Source code for examples.ftsprocess.application

from __future__ import annotations

from typing import TYPE_CHECKING, ClassVar, cast
from uuid import UUID

from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.system import ProcessApplication
from examples.contentmanagement.domainmodel import Page
from examples.contentmanagement.utils import apply_diff
from examples.ftscontentmanagement.persistence import FtsRecorder, PageInfo

if TYPE_CHECKING:
    from eventsourcing.application import ProcessingEvent
    from eventsourcing.domain import DomainEventProtocol


class FtsProcess(ProcessApplication[UUID]):
    """Process application that turns ``Page`` events into search-index records.

    ``policy`` reacts to ``Page.Created`` and ``Page.BodyUpdated`` events by
    collecting ``PageInfo`` records on the processing event, and ``search``
    delegates a query to the application's recorder.
    """

    # NOTE(review): presumably selects gzip compression for stored data;
    # confirm against the application environment documentation.
    env: ClassVar[dict[str, str]] = {"COMPRESSOR_TOPIC": "gzip"}

    @singledispatchmethod
    def policy(
        self,
        domain_event: DomainEventProtocol[UUID],
        processing_event: ProcessingEvent[UUID],
    ) -> None:
        """Collect a page insert or update for the given domain event.

        :param domain_event: The event being processed. Only ``Page.Created``
            and ``Page.BodyUpdated`` are acted upon; anything else is ignored.
        :param processing_event: Receives the ``PageInfo`` records through
            ``collect_events`` (as ``insert_pages`` or ``update_pages``).
        """
        if isinstance(domain_event, Page.Created):
            # A new page: every indexed field comes straight from the event.
            created_info = PageInfo(
                id=domain_event.originator_id,
                slug=domain_event.slug,
                title=domain_event.title,
                body=domain_event.body,
            )
            processing_event.collect_events(insert_pages=[created_info])
            return

        if not isinstance(domain_event, Page.BodyUpdated):
            return

        # The event only carries a diff, so the currently recorded page is
        # read back and the diff is applied to its body.
        fts_recorder = cast("FtsRecorder", self.recorder)
        originator_id = domain_event.originator_id
        current = fts_recorder.select_page(originator_id)
        patched_body = apply_diff(current.body, domain_event.diff)
        updated_info = PageInfo(
            id=originator_id,
            slug=current.slug,
            title=current.title,
            body=patched_body,
        )
        processing_event.collect_events(update_pages=[updated_info])

    def search(self, query: str) -> list[UUID]:
        """Return the result of ``search_pages(query)`` on the recorder.

        :param query: The search query, passed through unchanged.
        :return: The list of IDs produced by the recorder.
        """
        fts_recorder = cast("FtsRecorder", self.recorder)
        matches = fts_recorder.search_pages(query)
        return matches