Source code for examples.contentmanagement.domainmodel

from __future__ import annotations

from dataclasses import dataclass, field
from uuid import NAMESPACE_URL, UUID, uuid5

from eventsourcing.domain import Aggregate, DomainEvent, event
from examples.contentmanagement.utils import apply_diff, create_diff


[docs] @dataclass class Page(Aggregate): title: str """The title of the page.""" slug: str """The slug of the page - used in URLs.""" body: str """The proper content of the page.""" modified_by: UUID | None = field(init=False) """The ID of the user who last modified the page."""
[docs] class Event(Aggregate.Event):
[docs] def apply(self, aggregate: Page) -> None: """Sets the aggregate's `modified_by` attribute to the value of the event's metadata `user_id` value. """ aggregate.modified_by = self.get_user_id()
[docs] def get_user_id(self) -> UUID: return UUID(self.metadata["user_id"])
[docs] @event("SlugUpdated") def update_slug(self, slug: str) -> None: self.slug = slug
[docs] @event("TitleUpdated") def update_title(self, title: str) -> None: self.title = title
[docs] def update_body(self, body: str) -> None: diff = create_diff(old=self.body, new=body) self._update_body(diff=diff)
[docs] class Created(Aggregate.Created, Event): title: str slug: str body: str
[docs] class BodyUpdated(Event): diff: str
[docs] @event(BodyUpdated) def _update_body(self, diff: str) -> None: new_body = apply_diff(old=self.body, diff=diff) self.body = new_body
[docs] @dataclass class Slug(Aggregate): name: str page_id: UUID | None
[docs] class Event(Aggregate.Event): pass
[docs] @staticmethod def create_id(name: str) -> UUID: return uuid5(NAMESPACE_URL, f"/slugs/{name}")
[docs] @event("PageUpdated") def update_page(self, page_id: UUID | None) -> None: self.page_id = page_id
[docs] class PageLogged(DomainEvent): page_id: UUID