"""Source code for examples.contentmanagement.application."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, ClassVar, Union
from uuid import NAMESPACE_URL, UUID, uuid5

from eventsourcing.application import (
    AggregateNotFoundError,
    Application,
    EventSourcedLog,
)
from examples.contentmanagement.domainmodel import Page, PageLogged, Slug

if TYPE_CHECKING:
    from collections.abc import Iterator

    from eventsourcing.domain import MutableOrImmutableAggregate
    from eventsourcing.utils import EnvType

# Plain-dict view of a page handed back to callers. It is built by
# ContentManagement._details_from_page with the keys "title", "slug",
# "body" and "modified_by".
PageDetailsType = dict[str, Union[str, Any]]


class ContentManagement(Application):
    """Event-sourced application for creating, editing and listing pages.

    Pages are found by slug through ``Slug`` index aggregates, and each
    created page is also recorded in an event-sourced page log, which
    ``get_pages()`` reads in order to list pages.
    """

    # Environment defaults for this application: "gzip" is used as the
    # compressor topic.
    env: ClassVar[dict[str, str]] = {"CONTENTMANAGEMENT_COMPRESSOR_TOPIC": "gzip"}
    # Snapshotting interval configured for ``Page`` aggregates.
    snapshotting_intervals: ClassVar[
        dict[type[MutableOrImmutableAggregate], int]
    ] = {Page: 5}

    def __init__(self, env: EnvType | None = None) -> None:
        """Construct the application and its page log.

        ``env`` is passed through unchanged to ``Application.__init__``.
        """
        super().__init__(env)
        # The log's ID is a name-based UUID (uuid5) computed from a fixed
        # name, so it is the same every time the application is constructed.
        self.page_log: EventSourcedLog[PageLogged] = EventSourcedLog(
            self.events, uuid5(NAMESPACE_URL, "/page_log"), PageLogged
        )

    def create_page(self, title: str, slug: str) -> int:
        """Create a page with an empty body and index it under ``slug``.

        The new page, a page-log event carrying the page ID, and the slug
        index entry are all passed to one ``save()`` call.

        Returns the notification ID of the last recording returned by
        ``save()``.
        """
        page = Page(title=title, slug=slug, body="")
        page_logged = self.page_log.trigger_event(page_id=page.id)
        index_entry = Slug(slug, page_id=page.id)
        recordings = self.save(page, page_logged, index_entry)
        return recordings[-1].notification.id

    def get_page_by_slug(self, slug: str) -> PageDetailsType:
        """Return the details of the page currently indexed under ``slug``.

        Raises ``PageNotFoundError`` if the slug has no index entry or the
        entry no longer refers to a page.
        """
        page = self._get_page_by_slug(slug)
        return self._details_from_page(page)

    def get_page_by_id(self, page_id: UUID) -> PageDetailsType:
        """Return the details of the page with the given aggregate ID."""
        page = self._get_page_by_id(page_id)
        return self._details_from_page(page)

    def _details_from_page(self, page: Page) -> PageDetailsType:
        """Copy the title, slug, body and modifier of ``page`` into a dict."""
        return {
            "title": page.title,
            "slug": page.slug,
            "body": page.body,
            "modified_by": page.modified_by,
        }

    def update_title(self, slug: str, title: str) -> int:
        """Change the title of the page indexed under ``slug``.

        Returns the notification ID of the last recording returned by
        ``save()``. Raises ``PageNotFoundError`` if the slug does not
        resolve to a page.
        """
        page = self._get_page_by_slug(slug)
        page.update_title(title=title)
        recordings = self.save(page)
        return recordings[-1].notification.id

    def update_slug(self, old_slug: str, new_slug: str) -> int:
        """Move the page indexed under ``old_slug`` to ``new_slug``.

        The page, the old index entry (released by pointing it at ``None``)
        and the new index entry are passed to one ``save()`` call.

        Returns the notification ID of the last recording returned by
        ``save()``.

        Raises ``PageNotFoundError`` if ``old_slug`` does not resolve to a
        page, and ``SlugConflictError`` (carrying ``new_slug``) if the index
        entry for ``new_slug`` already refers to a page.
        """
        page = self._get_page_by_slug(old_slug)
        page.update_slug(new_slug)

        # Release the old slug so that it can be claimed again later.
        old_slug_aggregate = self._get_slug(old_slug)
        old_slug_aggregate.update_page(None)

        try:
            new_slug_aggregate = self._get_slug(new_slug)
        except AggregateNotFoundError:
            # The new slug has never been used: create its index entry.
            new_slug_aggregate = Slug(new_slug, page.id)
        else:
            if new_slug_aggregate.page_id is None:
                # The slug was used before but has since been released.
                new_slug_aggregate.update_page(page.id)
            else:
                # Nothing has been saved at this point, so the changes made
                # above to the in-memory aggregates are simply dropped.
                # NOTE(review): presumably also reached when new_slug equals
                # old_slug, since the stored entry still refers to the page
                # -- confirm whether that is intended.
                raise SlugConflictError(new_slug)

        recordings = self.save(page, old_slug_aggregate, new_slug_aggregate)
        return recordings[-1].notification.id

    def update_body(self, slug: str, body: str) -> int:
        """Replace the body of the page indexed under ``slug``.

        Returns the notification ID of the last recording returned by
        ``save()``. Raises ``PageNotFoundError`` if the slug does not
        resolve to a page.
        """
        page = self._get_page_by_slug(slug)
        page.update_body(body)
        recordings = self.save(page)
        return recordings[-1].notification.id

    def _get_page_by_slug(self, slug: str) -> Page:
        """Resolve ``slug`` through its index entry and load the page.

        Raises ``PageNotFoundError`` if there is no index entry for the slug
        or if the entry's ``page_id`` is ``None``.
        """
        try:
            index = self._get_slug(slug)
        except AggregateNotFoundError:
            # Hide the repository-level error behind the domain error.
            raise PageNotFoundError(slug) from None
        if index.page_id is None:
            raise PageNotFoundError(slug)
        page_id = index.page_id
        return self._get_page_by_id(page_id)

    def _get_page_by_id(self, page_id: UUID) -> Page:
        """Load the page aggregate with the given ID from the repository."""
        return self.repository.get(page_id)

    def _get_slug(self, slug: str) -> Slug:
        """Load the index aggregate whose ID ``Slug.create_id`` gives for ``slug``."""
        return self.repository.get(Slug.create_id(slug))

    def get_pages(
        self,
        *,
        gt: int | None = None,
        lte: int | None = None,
        desc: bool = False,
        limit: int | None = None,
    ) -> Iterator[PageDetailsType]:
        """Yield the details of pages recorded in the page log.

        ``gt``, ``lte``, ``desc`` and ``limit`` are forwarded unchanged to
        ``self.page_log.get()``. Each page is loaded from the repository only
        when the generator reaches its log entry.
        """
        for page_logged in self.page_log.get(gt=gt, lte=lte, desc=desc, limit=limit):
            page = self._get_page_by_id(page_logged.page_id)
            yield self._details_from_page(page)
class PageNotFoundError(Exception):
    """Raised when a page is not found.

    The application raises it with the slug that was looked up as the
    exception argument, either when the slug has no index entry or when the
    entry no longer refers to a page.
    """
class SlugConflictError(Exception):
    """Raised when updating a page to a slug used by another page.

    That is, the index entry of the requested slug already refers to a page.
    """