from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.persistence import Tracking, TrackingRecorder
from eventsourcing.postgres import PostgresTrackingRecorder
from eventsourcing.projection import Projection
from examples.contentmanagement.domainmodel import Page # noqa: TC001
from examples.contentmanagement.utils import apply_diff
from examples.ftscontentmanagement.persistence import FtsRecorder, PageInfo
from examples.ftscontentmanagement.postgres import PostgresFtsRecorder
if TYPE_CHECKING:
from collections.abc import Sequence
from eventsourcing.domain import DomainEventProtocol, TAggregateID
[docs]
class FtsViewInterface(FtsRecorder, TrackingRecorder, ABC):
[docs]
@abstractmethod
def insert_pages_with_tracking(
self, pages: Sequence[PageInfo], tracking: Tracking
) -> None:
pass
[docs]
@abstractmethod
def update_pages_with_tracking(
self, pages: Sequence[PageInfo], tracking: Tracking
) -> None:
pass
[docs]
class FtsProjection(Projection[FtsViewInterface]):
[docs]
@singledispatchmethod
def process_event(
self, domain_event: DomainEventProtocol[TAggregateID], tracking: Tracking
) -> None:
self.view.insert_tracking(tracking)
[docs]
@process_event.register
def page_created(self, domain_event: Page.Created, tracking: Tracking) -> None:
new_page = PageInfo(
id=domain_event.originator_id,
slug=domain_event.slug,
title=domain_event.title,
body=domain_event.body,
)
self.view.insert_pages_with_tracking([new_page], tracking)
[docs]
@process_event.register
def body_updated(self, domain_event: Page.BodyUpdated, tracking: Tracking) -> None:
page_id = domain_event.originator_id
old_page = self.view.select_page(page_id)
new_page = PageInfo(
id=page_id,
slug=old_page.slug,
title=old_page.title,
body=apply_diff(old_page.body, domain_event.diff),
)
self.view.update_pages_with_tracking([new_page], tracking)
[docs]
class PostgresFtsView(PostgresFtsRecorder, PostgresTrackingRecorder, FtsViewInterface):
[docs]
def insert_pages_with_tracking(
self, pages: Sequence[PageInfo], tracking: Tracking
) -> None:
with self.datastore.transaction(commit=True) as curs:
self._insert_pages(curs, pages)
self._insert_tracking(curs, tracking)
[docs]
def update_pages_with_tracking(
self, pages: Sequence[PageInfo], tracking: Tracking
) -> None:
with self.datastore.transaction(commit=True) as curs:
self._update_pages(curs, pages)
self._insert_tracking(curs, tracking)