Application 3 - Content management

This example demonstrates the use of namespaced IDs for both discovery of aggregate IDs and implementation of an application-wide rule (or “invariant”). This example also involves event-sourced logs, automatic snapshotting, and the use of the declarative syntax for domain models with non-trivial command methods.

This example also shows how to use a thread-specific context variable to set the value of a common event attribute without cluttering all the command methods with the same argument. In this example the ID of the user is recorded on each event, but the same technique can be used to set correlation and causation IDs on all events in a domain model.

Domain model

The Page class is an aggregate defined to have a title, a slug, a body, and a modified_by attribute. It defines an aggregate event base class, Event, which has a user_id attribute that is defined with a data class field object. All the aggregate event classes of Page inherit from its Event class. The Event class defines an apply() method that sets the aggregate’s modified_by attribute to the value of the event’s user_id attribute.

@dataclass
class Page(Aggregate):
    title: str
    """The title of the page."""

    slug: str
    """The slug of the page - used in URLs."""

    body: str
    """The proper content of the page."""

    modified_by: UUID | None = field(init=False)
    """The ID of the user who last modified the page."""

    class Event(Aggregate.Event):
        user_id: UUID | None = field(default_factory=user_id_cvar.get, init=False)

        def apply(self, aggregate: Aggregate) -> None:
            """Sets the aggregate's `modified_by` attribute to the
            value of the event's `user_id` attribute.
            """
            cast("Page", aggregate).modified_by = self.user_id

    @event("SlugUpdated")
    def update_slug(self, slug: str) -> None:
        self.slug = slug

    @event("TitleUpdated")
    def update_title(self, title: str) -> None:
        self.title = title

    def update_body(self, body: str) -> None:
        diff = create_diff(old=self.body, new=body)
        self._update_body(diff=diff)

    class Created(Aggregate.Created, Event):
        title: str
        slug: str
        body: str

    class BodyUpdated(Event):
        diff: str

    @event(BodyUpdated)
    def _update_body(self, diff: str) -> None:
        new_body = apply_diff(old=self.body, diff=diff)
        self.body = new_body

The user_id attribute is defined as a dataclass field that is not included in __init__ methods (init=False), and so it does not need to be matched by parameters in the aggregate command method signatures. Instead, the data class field gets the event attribute value from a Python context variable (default_factory=user_id_cvar.get). That is why none of the command method signatures need to mention this as one of their arguments, but still all the aggregate events will carry the ID of the user that executed the command.
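The mechanism can be tried in isolation using only the standard library. The following is a minimal sketch, not the library's implementation: ExampleEvent is a hypothetical plain data class standing in for an aggregate event class, and only the name user_id_cvar is taken from the example.

```python
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional
from uuid import UUID, uuid4

# Context variable with a default of None, like user_id_cvar in the example.
user_id_cvar: ContextVar[Optional[UUID]] = ContextVar("user_id", default=None)


@dataclass(frozen=True)
class ExampleEvent:
    """Hypothetical stand-in for an aggregate event class."""

    name: str
    # Not an __init__ parameter: the value is read from the context
    # variable at the moment the event object is constructed.
    user_id: Optional[UUID] = field(default_factory=user_id_cvar.get, init=False)


# Constructed before any user ID has been set, so user_id is None.
before = ExampleEvent(name="before")

# Constructed after a user ID has been set, so user_id is that ID.
current_user = uuid4()
user_id_cvar.set(current_user)
after = ExampleEvent(name="after")
```

Because the default factory is called at construction time, the event records whichever user ID is current when the command runs, not the value that was current when the class was defined.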

The update_body() command method is a non-trivial command method, in that it does some work on the command method arguments before triggering a domain event: it creates a “diff” of the current version of the body and the new version. For this reason, it is not decorated with the event decorator.

After creating a “diff” of the page body, it calls the “private” method _update_body(), which is decorated with the event decorator, and so triggers a BodyUpdated event. The event is applied to the page’s body by patching the current value with the diff that has been encapsulated by the event object.
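The split between a public method that does the work and a private method that records the result can be sketched without the library. This is an illustration under stated assumptions: PageSketch and its pending_events list are hypothetical, and the standard library's difflib stands in for the Unix diff and patch tools used by the example. Note that an ndiff delta carries the full text of both versions, so it is not a compact diff.

```python
import difflib
from typing import List, Tuple


class PageSketch:
    """Hypothetical stand-in for the Page aggregate (no event sourcing library)."""

    def __init__(self, body: str = "") -> None:
        self.body = body
        # Recorded (event name, delta) pairs, in place of real domain events.
        self.pending_events: List[Tuple[str, List[str]]] = []

    def update_body(self, body: str) -> None:
        # Public command: do the work on the argument first (compute a delta).
        old_lines = self.body.splitlines(keepends=True)
        new_lines = body.splitlines(keepends=True)
        delta = list(difflib.ndiff(old_lines, new_lines))
        self._update_body(delta)

    def _update_body(self, delta: List[str]) -> None:
        # "Private" method: record the event, then apply it to the state.
        self.pending_events.append(("BodyUpdated", delta))
        # restore(delta, 2) rebuilds the second (new) sequence from the delta.
        self.body = "".join(difflib.restore(delta, 2))


page = PageSketch()
page.update_body("Hello\n")
page.update_body("Hello\nWorld\n")
```

The point of the split is that the recorded event holds the computed delta, not the raw command argument, so replaying the events reproduces the body without redoing the comparison.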

The Slug aggregate assists the domain model by carrying a page_id. Its ID is a version-5 UUID that is a function of its name.
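A version-5 UUID is deterministic: the same namespace and name always give the same ID, which is what makes it possible to find a slug aggregate from nothing but the slug string. A short sketch using the standard library, where create_slug_id is a hypothetical free function that mirrors the expression used by the example's create_id() method:

```python
from uuid import NAMESPACE_URL, UUID, uuid5


def create_slug_id(name: str) -> UUID:
    # Same namespace and path format as the example's Slug.create_id().
    return uuid5(NAMESPACE_URL, f"/slugs/{name}")


first = create_slug_id("welcome")
second = create_slug_id("welcome")
other = create_slug_id("welcome-visitors")
```

Here first and second are equal, while other differs, so no lookup table is needed to go from a slug string to its aggregate ID.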

The Slug and Page aggregates are used in combination to maintain editable pages of text, with editable titles, and with editable “slugs” that can be used in page URLs. The slug’s name is taken from the URL, and used to discover the aggregate ID of the page it is currently associated with.

@dataclass
class Slug(Aggregate):
    name: str
    page_id: UUID | None

    class Event(Aggregate.Event):
        pass

    @staticmethod
    def create_id(name: str) -> UUID:
        return uuid5(NAMESPACE_URL, f"/slugs/{name}")

    @event("PageUpdated")
    def update_page(self, page_id: UUID | None) -> None:
        self.page_id = page_id

A PageLogged event is also defined, and used to define a “page log”. The page log can be used to discover all the pages that have been created.

class PageLogged(DomainEvent):
    page_id: UUID

Application

The ContentManagement application encapsulates the Page and Slug aggregates. It defines methods to create a new page, to get the content of a page by its slug, and to update the title, body, and slug of a page.

class ContentManagement(Application[UUID]):
    env: ClassVar[dict[str, str]] = {"CONTENTMANAGEMENT_COMPRESSOR_TOPIC": "gzip"}
    snapshotting_intervals: ClassVar[
        dict[type[MutableOrImmutableAggregate[UUID]], int]
    ] = {Page: 5}

    def __init__(self, env: EnvType | None = None) -> None:
        super().__init__(env)
        self.page_log: EventSourcedLog[PageLogged] = EventSourcedLog(
            self.events, uuid5(NAMESPACE_URL, "/page_log"), PageLogged
        )

    def create_page(self, title: str, slug: str) -> int:
        page = Page(title=title, slug=slug, body="")
        page_logged = self.page_log.trigger_event(page_id=page.id)
        index_entry = Slug(slug, page_id=page.id)
        recordings = self.save(page, page_logged, index_entry)
        return recordings[-1].notification.id

    def get_page_by_slug(self, slug: str) -> PageDetailsType:
        page = self._get_page_by_slug(slug)
        return self._details_from_page(page)

    def get_page_by_id(self, page_id: UUID) -> PageDetailsType:
        page = self._get_page_by_id(page_id)
        return self._details_from_page(page)

    def _details_from_page(self, page: Page) -> PageDetailsType:
        return {
            "title": page.title,
            "slug": page.slug,
            "body": page.body,
            "modified_by": page.modified_by,
        }

    def update_title(self, slug: str, title: str) -> int:
        page = self._get_page_by_slug(slug)
        page.update_title(title=title)
        recordings = self.save(page)
        return recordings[-1].notification.id

    def update_slug(self, old_slug: str, new_slug: str) -> int:
        page = self._get_page_by_slug(old_slug)
        page.update_slug(new_slug)
        old_slug_aggregate = self._get_slug(old_slug)
        old_slug_aggregate.update_page(None)
        try:
            new_slug_aggregate = self._get_slug(new_slug)
        except AggregateNotFoundError:
            new_slug_aggregate = Slug(new_slug, page.id)
        else:
            if new_slug_aggregate.page_id is None:
                new_slug_aggregate.update_page(page.id)
            else:
                raise SlugConflictError
        recordings = self.save(page, old_slug_aggregate, new_slug_aggregate)
        return recordings[-1].notification.id

    def update_body(self, slug: str, body: str) -> int:
        page = self._get_page_by_slug(slug)
        page.update_body(body)
        recordings = self.save(page)
        return recordings[-1].notification.id

    def _get_page_by_slug(self, slug: str) -> Page:
        try:
            index = self._get_slug(slug)
        except AggregateNotFoundError:
            raise PageNotFoundError(slug) from None
        if index.page_id is None:
            raise PageNotFoundError(slug)
        page_id = index.page_id
        return self._get_page_by_id(page_id)

    def _get_page_by_id(self, page_id: UUID) -> Page:
        return self.repository.get(page_id)

    def _get_slug(self, slug: str) -> Slug:
        return self.repository.get(Slug.create_id(slug))

    def get_pages(
        self,
        *,
        gt: int | None = None,
        lte: int | None = None,
        desc: bool = False,
        limit: int | None = None,
    ) -> Iterator[PageDetailsType]:
        for page_logged in self.page_log.get(gt=gt, lte=lte, desc=desc, limit=limit):
            page = self._get_page_by_id(page_logged.page_id)
            yield self._details_from_page(page)

To get to a page, a slug aggregate ID is computed from a slug string, and the slug aggregate is used to get the page aggregate ID.

To change a page’s slug, the slug aggregates for the old and the new slug strings are obtained, the page ID is removed as the page ID of the old slug, and it is set as the page ID of the new slug. The slugs are also used to implement an application-wide rule (or “invariant”) that a slug can be used by only one page. If an attempt is made to change the slug of one page to a slug that is already being used by another page, then a SlugConflictError will be raised, and no changes will be saved.
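The rule can be illustrated with an in-memory sketch. Everything here is hypothetical apart from the names SlugConflictError and create_id: SlugIndexSketch keeps slug entries in a dictionary in place of stored Slug aggregates, and it does not address concurrent writers, which in the application is a matter for the atomic save() call.

```python
from typing import Dict, Optional
from uuid import NAMESPACE_URL, UUID, uuid4, uuid5


class SlugConflictError(Exception):
    """Raised when a slug is already used by another page."""


class SlugIndexSketch:
    """Hypothetical in-memory stand-in for the stored Slug aggregates."""

    def __init__(self) -> None:
        # Maps slug aggregate ID to the page ID it currently points at.
        self._page_ids: Dict[UUID, Optional[UUID]] = {}

    @staticmethod
    def create_id(name: str) -> UUID:
        return uuid5(NAMESPACE_URL, f"/slugs/{name}")

    def lookup(self, name: str) -> Optional[UUID]:
        return self._page_ids.get(self.create_id(name))

    def assign(self, name: str, page_id: UUID) -> None:
        if self.lookup(name) is not None:
            raise SlugConflictError(name)
        self._page_ids[self.create_id(name)] = page_id

    def move(self, old_name: str, new_name: str) -> None:
        page_id = self.lookup(old_name)
        if page_id is None:
            raise KeyError(old_name)
        # Check the invariant before changing anything.
        if self.lookup(new_name) is not None:
            raise SlugConflictError(new_name)
        # Free the old slug, then point the new slug at the page.
        self._page_ids[self.create_id(old_name)] = None
        self._page_ids[self.create_id(new_name)] = page_id


index = SlugIndexSketch()
page_a, page_b = uuid4(), uuid4()
index.assign("page-2", page_a)
index.assign("page-3", page_b)

try:
    index.move("page-2", "page-3")  # "page-3" is in use by another page
    conflict_raised = False
except SlugConflictError:
    conflict_raised = True

index.move("page-2", "page-two")  # a free slug can be taken
```

A freed slug keeps its entry with no page ID, which is why a slug that was previously used by one page can later be taken by another.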

The application also demonstrates the “event-sourced log” recipe, by showing how all the IDs of the Page aggregates can be listed, by logging the page ID in a sequence of stored events, and then selecting from this sequence when presenting a list of pages.
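The selection arguments accepted by get_pages() can be pictured with a plain list. This is a sketch of the selection semantics only, not of EventSourcedLog itself: select_entries is a hypothetical function, and it assumes that log positions start at 1, that gt is exclusive, and that lte is inclusive, which matches the results checked in the test case.

```python
from typing import List, Optional, Sequence


def select_entries(
    entries: Sequence[str],
    *,
    gt: Optional[int] = None,
    lte: Optional[int] = None,
    desc: bool = False,
    limit: Optional[int] = None,
) -> List[str]:
    # Pair each entry with its 1-based position in the log.
    selected = [
        entry
        for position, entry in enumerate(entries, start=1)
        if (gt is None or position > gt) and (lte is None or position <= lte)
    ]
    if desc:
        selected.reverse()
    if limit is not None:
        selected = selected[:limit]
    return selected


# Slugs in the order the pages were logged.
log = ["welcome-visitors", "page-2", "page-3", "page-4", "page-5"]

newest_three = select_entries(log, desc=True, limit=3)
first_two_desc = select_entries(log, desc=True, limit=3, lte=2)
```

With these assumptions, newest_three holds the last three logged slugs in reverse order, and first_two_desc holds only two items because lte=2 bounds the selection before the limit applies.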

Please note, although all the Page aggregate events have a user_id attribute, none of the aggregate or application command methods mention a user_id argument. Instead the value can be set in a context variable by callers of the application command methods, for example in an interface or presentation layer after a user request has been authenticated (see the test case below).
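On the caller's side, one way to scope the user ID to a single request is to set the context variable, call the command, and then restore the previous value. The sketch below is an assumption about how an interface layer might do this: call_as_user is a hypothetical helper, and the user_id_cvar defined here is a local stand-in for the one in the domain model.

```python
from contextvars import ContextVar
from typing import Callable, Optional, TypeVar
from uuid import UUID, uuid4

T = TypeVar("T")

# Local stand-in for the domain model's user_id_cvar.
user_id_cvar: ContextVar[Optional[UUID]] = ContextVar("user_id", default=None)


def call_as_user(user_id: UUID, command: Callable[[], T]) -> T:
    """Hypothetical helper: run a command with the given user ID set."""
    token = user_id_cvar.set(user_id)
    try:
        return command()
    finally:
        # Restore whatever value was set before this call.
        user_id_cvar.reset(token)


authenticated_user = uuid4()
# The command here simply reads the variable, to show what a command would see.
seen_by_command = call_as_user(authenticated_user, user_id_cvar.get)
seen_afterwards = user_id_cvar.get()
```

Resetting with the token means a user ID from one request cannot leak into later work done in the same context.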

The application also demonstrates the automatic snapshotting of aggregates at regular intervals. In this case, the class attribute snapshotting_intervals specifies that a page will be snapshotted every 5 events.
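The effect of the interval can be sketched as a simple check on the aggregate version. This assumes that a snapshot falls due whenever the version is a multiple of the interval, which is consistent with the test case (no snapshot at version 4, a snapshot at version 5) but is not taken from the library's source. The function is_snapshot_due is hypothetical.

```python
def is_snapshot_due(version: int, interval: int) -> bool:
    # Assumption: snapshots are taken at versions 5, 10, 15, ... for interval 5.
    return version % interval == 0


# Versions in 1..11 at which a Page would be snapshotted with an interval of 5.
due_versions = [v for v in range(1, 12) if is_snapshot_due(v, 5)]
```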

Diff and patch utilities

The create_diff() and apply_diff() functions use the Unix command line tools diff and patch.

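import os
from pathlib import Path
from tempfile import TemporaryDirectory


def create_diff(old: str, new: str) -> str:
    # Compare the old and new text with `diff`, returning its output.
    return run("diff %s %s > %s", old, new)


def apply_diff(old: str, diff: str) -> str:
    # Patch the old text with the diff, returning the patched text.
    return run("patch -s %s %s -o %s", old, diff)


def run(cmd: str, a: str, b: str) -> str:
    # Write both inputs to temporary files, run the command,
    # and read the result back from a third file.
    with TemporaryDirectory() as td:
        a_path = Path(td) / "a"
        b_path = Path(td) / "b"
        c_path = Path(td) / "c"
        with a_path.open("w") as a_file:
            a_file.write(a)
        with b_path.open("w") as b_file:
            b_file.write(b)
        os.system(cmd % (a_path, b_path, c_path))  # noqa: S605
        with c_path.open() as c_file:
            return c_file.read()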

Test case

The TestContentManagement test case creates and updates pages in various ways. It sets a user ID in the user_id_cvar context variable before application methods are called. At the end, all the page events are checked to make sure they all have the user ID that was set in the context variable.

class TestContentManagement(TestCase):
    def test(self) -> None:
        # Construct application.
        app = ContentManagement()

        # Check the page doesn't exist.
        with self.assertRaises(PageNotFoundError):
            app.get_page_by_slug(slug="welcome")

        # Check the list of pages is empty.
        pages = list(app.get_pages())
        self.assertEqual(len(pages), 0)

        # Create a page.
        user_id1 = uuid4()
        user_id_cvar.set(user_id1)
        app.create_page(title="Welcome", slug="welcome")

        # Present page identified by the given slug.
        page = app.get_page_by_slug(slug="welcome")

        # Check we got a dict that has the given title and slug.
        self.assertEqual(page["title"], "Welcome")
        self.assertEqual(page["slug"], "welcome")
        self.assertEqual(page["body"], "")
        self.assertEqual(page["modified_by"], user_id1)

        # Update the title.
        user_id2 = uuid4()
        user_id_cvar.set(user_id2)
        app.update_title(slug="welcome", title="Welcome Visitors")

        # Check the title was updated.
        page = app.get_page_by_slug(slug="welcome")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["slug"], "welcome")
        self.assertEqual(page["body"], "")
        self.assertEqual(page["modified_by"], user_id2)

        # Update the slug.
        user_id3 = uuid4()
        user_id_cvar.set(user_id3)
        app.update_slug(old_slug="welcome", new_slug="welcome-visitors")

        # Check the slug was updated.
        with self.assertRaises(PageNotFoundError):
            app.get_page_by_slug(slug="welcome")

        # Check we can get the page by the new slug.
        page = app.get_page_by_slug(slug="welcome-visitors")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["slug"], "welcome-visitors")
        self.assertEqual(page["body"], "")
        self.assertEqual(page["modified_by"], user_id3)

        # Update the body.
        user_id4 = uuid4()
        user_id_cvar.set(user_id4)
        app.update_body(slug="welcome-visitors", body="Welcome to my wiki!")

        # Check the body was updated.
        page = app.get_page_by_slug(slug="welcome-visitors")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["slug"], "welcome-visitors")
        self.assertEqual(page["body"], "Welcome to my wiki!")
        self.assertEqual(page["modified_by"], user_id4)

        # Check we are on version 4.
        page_id = cast(
            "Slug", app.repository.get(Slug.create_id("welcome-visitors"))
        ).page_id
        assert page_id is not None
        page_aggregate_v4: Page = app.repository.get(page_id)
        self.assertEqual(page_aggregate_v4.version, 4)

        # Check there are no snapshots.
        assert app.snapshots is not None
        self.assertFalse(len(list(app.snapshots.get(page_id))))

        # Update the body (should trigger a snapshot).
        user_id5 = uuid4()
        user_id_cvar.set(user_id5)
        app.update_body(
            slug="welcome-visitors",
            body="""
Welcome to this wiki!

This is a wiki about...
""",
        )

        # Check we are on version 5.
        page_aggregate_v5: Page = app.repository.get(page_id)
        self.assertEqual(page_aggregate_v5.version, 5)

        # Check the body was updated.
        page = app.get_page_by_slug(slug="welcome-visitors")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["slug"], "welcome-visitors")
        self.assertEqual(
            page["body"],
            """
Welcome to this wiki!

This is a wiki about...
""",
        )
        self.assertEqual(page["modified_by"], user_id5)

        # Check a snapshot was taken.
        self.assertTrue(len(list(app.snapshots.get(page_id))))

        # Check all the Page events have the correct user IDs.
        user_ids = iter([user_id1, user_id2, user_id3, user_id4, user_id5])
        for notification in NotificationLogReader(app.notification_log).read(start=1):
            domain_event = app.mapper.to_domain_event(notification)
            if isinstance(domain_event, Page.Event):
                self.assertEqual(domain_event.user_id, next(user_ids))

        # Create some more pages.
        app.create_page("Page 2", "page-2")
        app.create_page("Page 3", "page-3")
        app.create_page("Page 4", "page-4")
        app.create_page("Page 5", "page-5")

        # List all the pages.
        pages = list(app.get_pages(desc=True))
        self.assertEqual(pages[0]["title"], "Page 5")
        self.assertEqual(pages[0]["slug"], "page-5")
        self.assertEqual(pages[1]["title"], "Page 4")
        self.assertEqual(pages[1]["slug"], "page-4")
        self.assertEqual(pages[2]["title"], "Page 3")
        self.assertEqual(pages[2]["slug"], "page-3")
        self.assertEqual(pages[3]["title"], "Page 2")
        self.assertEqual(pages[3]["slug"], "page-2")
        self.assertEqual(pages[4]["title"], "Welcome Visitors")
        self.assertEqual(pages[4]["slug"], "welcome-visitors")

        pages = list(app.get_pages(desc=True, limit=3))
        self.assertEqual(len(pages), 3)
        self.assertEqual(pages[0]["slug"], "page-5")
        self.assertEqual(pages[1]["slug"], "page-4")
        self.assertEqual(pages[2]["slug"], "page-3")

        pages = list(app.get_pages(desc=True, limit=3, lte=2))
        self.assertEqual(len(pages), 2)
        self.assertEqual(pages[0]["slug"], "page-2")
        self.assertEqual(pages[1]["slug"], "welcome-visitors")

        pages = list(app.get_pages(desc=True, lte=2))
        self.assertEqual(len(pages), 2)
        self.assertEqual(pages[0]["slug"], "page-2")
        self.assertEqual(pages[1]["slug"], "welcome-visitors")

        # Check we can't change the slug of a page to one
        # that is being used by another page.
        app.get_page_by_slug(slug="page-3")
        with self.assertRaises(SlugConflictError):
            app.update_slug("page-2", "page-3")

        # Check we can change the slug of a page to one
        # that was previously being used by another page.
        app.get_page_by_slug(slug="welcome-visitors")
        with self.assertRaises(PageNotFoundError):
            app.get_page_by_slug(slug="welcome")
        slug: Slug = app.repository.get(Slug.create_id("welcome"))
        self.assertIsNone(slug.page_id)

        app.update_slug("welcome-visitors", "welcome")

        page = app.get_page_by_slug(slug="welcome")
        self.assertEqual(page["title"], "Welcome Visitors")

Code reference

examples.contentmanagement.domainmodel.user_id_cvar: ContextVar[UUID | None] = <ContextVar name='user_id' default=None>

Context variable holding a user ID for the current thread.

class examples.contentmanagement.domainmodel.Page(*args: 'Any', **kwargs: 'Any')

Bases: Aggregate

title: str

The title of the page.

slug: str

The slug of the page - used in URLs.

body: str

The proper content of the page.

modified_by: UUID | None

The ID of the user who last modified the page.

class Event(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime')

Bases: Event

user_id: UUID | None
apply(aggregate: Aggregate) -> None

Sets the aggregate’s modified_by attribute to the value of the event’s user_id attribute.

originator_id_type

alias of UUID

update_slug(slug: str) -> None
update_title(title: str) -> None
update_body(body: str) -> None
class Created(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime', originator_topic: 'str')

Bases: Created, Event

title: str
slug: str
body: str
originator_id_type

alias of UUID

class BodyUpdated(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime', diff: 'str')

Bases: DecoratedFuncCaller, BodyUpdated

_update_body(diff: str) -> None
class SlugUpdated(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime')

Bases: DecoratedFuncCaller, Event

slug: str
class TitleUpdated(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime')

Bases: DecoratedFuncCaller, Event

title: str
class examples.contentmanagement.domainmodel.Slug(*args: 'Any', **kwargs: 'Any')

Bases: Aggregate

name: str
page_id: UUID | None
class Created(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime', originator_topic: 'str')

Bases: Created, Event

originator_id_type

alias of UUID

name: str
page_id: UUID | None
class Event(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime')

Bases: Event

originator_id_type

alias of UUID

timestamp: datetime

Timezone-aware datetime object representing when an event occurred.

originator_id: TAggregateID

UUID identifying an aggregate to which the event belongs.

originator_version: int

Integer identifying the version of the aggregate when the event occurred.

class PageUpdated(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime')

Bases: DecoratedFuncCaller, Event

page_id: UUID | None
static create_id(name: str) -> UUID

Returns a new aggregate ID.

update_page(page_id: UUID | None) -> None
class examples.contentmanagement.domainmodel.PageLogged(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime')

Bases: DomainEvent

page_id: UUID
class examples.contentmanagement.application.ContentManagement(env: EnvType | None = None)

Bases: Application[UUID]

env: ClassVar[dict[str, str]] = {'CONTENTMANAGEMENT_COMPRESSOR_TOPIC': 'gzip'}
snapshotting_intervals: ClassVar[dict[type[MutableOrImmutableAggregate[UUID]], int]] = {<class 'examples.contentmanagement.domainmodel.Page'>: 5}
create_page(title: str, slug: str) -> int
get_page_by_slug(slug: str) -> dict[str, str | Any]
get_page_by_id(page_id: UUID) -> dict[str, str | Any]
update_title(slug: str, title: str) -> int
update_slug(old_slug: str, new_slug: str) -> int
update_body(slug: str, body: str) -> int
get_pages(*, gt: int | None = None, lte: int | None = None, desc: bool = False, limit: int | None = None) -> Iterator[PageDetailsType]
name = 'ContentManagement'
exception examples.contentmanagement.application.PageNotFoundError

Bases: Exception

Raised when a page is not found.

exception examples.contentmanagement.application.SlugConflictError

Bases: Exception

Raised when updating a page to a slug used by another page.

examples.contentmanagement.utils.create_diff(old: str, new: str) -> str
examples.contentmanagement.utils.apply_diff(old: str, diff: str) -> str
examples.contentmanagement.utils.run(cmd: str, a: str, b: str) -> str
class examples.contentmanagement.test.TestContentManagement(methodName='runTest')

Bases: TestCase

test() -> None