Source code for examples.ftscontentmanagement.persistence

from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TYPE_CHECKING

from eventsourcing.persistence import Recorder

if TYPE_CHECKING:
    from collections.abc import Sequence
    from uuid import UUID


[docs] @dataclass(frozen=True) class PageInfo: id: UUID slug: str title: str body: str
[docs] class FtsRecorder(Recorder, ABC):
[docs] @abstractmethod def insert_pages(self, pages: Sequence[PageInfo]) -> None: """Insert a sequence of pages (id, slug, title, body)."""
[docs] @abstractmethod def update_pages(self, pages: Sequence[PageInfo]) -> None: """Update a sequence of pages (id, slug, title, body)."""
[docs] @abstractmethod def search_pages(self, query: str) -> list[UUID]: """Returns IDs for pages that match query."""
[docs] @abstractmethod def select_page(self, page_id: UUID) -> PageInfo: """Returns slug, title and body for given ID."""
[docs] def search(self, query: str) -> Sequence[PageInfo]: pages = [] for page_id in self.search_pages(query): page = self.select_page(page_id) pages.append(page) return pages