Application 3 - Content management

This example demonstrates the use of namespaced IDs for both discovery of aggregate IDs and implementation of an application-wide rule (or “invariant”). This example also involves event-sourced logs, automatic snapshotting, and the use of the declarative syntax for domain models with non-trivial command methods.

This example also shows how to use a thread-specific context variable to set the value of a common event attribute without cluttering all the command methods with the same argument. In this example the ID of the user is recorded on each event, but the same technique can be used to set correlation and causation IDs on all events in a domain model.

Application

The ContentManagementApplication class defines methods to create a new page, to get the details for a page by its slug, to update the title of a page referenced by its slug, to update the body of a page, and to change the page slug.

To get to a page, the slug is used to identify an index, and the index is used to get the page ID, and then the page ID is used to get the body and title of the page. To change a slug, the index objects for the old and the new slugs are identified, the page ID is removed as the reference from the old index and set as the reference on the new index. The indexes are also used to implement an application-wide rule (or “invariant”) that a slug can be used by only one page, such that if an attempt is made to change the slug of one page to a slug that is already being used by another page, then a SlugConflictError will be raised, and no changes made.

The application also demonstrates the “event-sourced log” recipe, by showing how all the IDs of the Page aggregates can be listed, by logging the page ID in a sequence of stored events, and then selecting from this sequence when presenting a list of pages.

Please note, although the domain model (see below) involves a user_id event attribute, none of the application command methods mention a user_id argument. Instead the value is set in a context variable by callers of the application command methods (see the test below).

from __future__ import annotations

from typing import Any, Dict, Iterator, Optional, Union, cast
from uuid import NAMESPACE_URL, UUID, uuid5

from eventsourcing.application import AggregateNotFound, Application, EventSourcedLog
from eventsourcing.examples.contentmanagement.domainmodel import Index, Page, PageLogged
from eventsourcing.utils import EnvType

# Type of the page details handed back to callers: a dict with the keys
# "title", "slug", "body" and "modified_by" (see _details_from_page()).
PageDetailsType = Dict[str, Union[str, Any]]


class ContentManagementApplication(Application):
    """
    Application for creating, presenting and editing pages of content.

    Pages are Page aggregates. Each slug has an Index aggregate, whose ID
    is derived from the slug and whose 'ref' holds the ID of the page that
    is using the slug (or None). The IDs of all created pages are recorded
    in an event-sourced log, which is used to list the pages.
    """

    env = {"COMPRESSOR_TOPIC": "gzip"}
    snapshotting_intervals = {Page: 5}

    def __init__(self, env: Optional[EnvType] = None) -> None:
        """
        Construct the application and its page log.

        :param env: optional environment, passed to the Application base class.
        """
        super().__init__(env)
        self.page_log: EventSourcedLog[PageLogged] = EventSourcedLog(
            self.events, uuid5(NAMESPACE_URL, "/page_log"), PageLogged
        )

    def create_page(self, title: str, slug: str) -> None:
        """
        Create a new page with the given title and slug.

        The new page, the page log event, and the index entry for the slug
        are saved together in a single call to save().
        """
        page = Page(title=title, slug=slug)
        page_logged = self.page_log.trigger_event(page_id=page.id)
        index_entry = Index(slug, ref=page.id)
        self.save(page, page_logged, index_entry)

    def get_page_by_slug(self, slug: str) -> PageDetailsType:
        """
        Return the details of the page that is using the given slug.

        :raises PageNotFound: if no page is using the slug.
        """
        page = self._get_page_by_slug(slug)
        return self._details_from_page(page)

    def get_page_by_id(self, page_id: UUID) -> PageDetailsType:
        """Return the details of the page that has the given ID."""
        page = self._get_page_by_id(page_id)
        return self._details_from_page(page)

    def _details_from_page(self, page: Page) -> PageDetailsType:
        """Return a dict with the title, slug, body and modifier of a page."""
        return {
            "title": page.title,
            "slug": page.slug,
            "body": page.body,
            "modified_by": page.modified_by,
        }

    def update_title(self, slug: str, title: str) -> None:
        """
        Change the title of the page that is using the given slug.

        :raises PageNotFound: if no page is using the slug.
        """
        page = self._get_page_by_slug(slug)
        page.update_title(title=title)
        self.save(page)

    def update_slug(self, old_slug: str, new_slug: str) -> None:
        """
        Change the slug of a page from old_slug to new_slug.

        The reference is removed from the index of the old slug and set on
        the index of the new slug. The page and both indexes are saved
        together, so nothing is saved if an error is raised.

        :raises PageNotFound: if no page is using old_slug.
        :raises SlugConflictError: if new_slug is being used by another page.
        """
        page = self._get_page_by_slug(old_slug)
        if new_slug == old_slug:
            # Nothing to change. Without this guard, the page's own index
            # entry would be mistaken for a conflicting page below.
            return
        page.update_slug(new_slug)
        old_index = self._get_index(old_slug)
        old_index.update_ref(None)
        try:
            new_index = self._get_index(new_slug)
        except AggregateNotFound:
            # The new slug has never been used: create its index.
            new_index = Index(new_slug, page.id)
        else:
            if new_index.ref is None:
                # The new slug was used previously, but is free now.
                new_index.update_ref(page.id)
            else:
                raise SlugConflictError()
        self.save(page, old_index, new_index)

    def update_body(self, slug: str, body: str) -> None:
        """
        Change the body of the page that is using the given slug.

        :raises PageNotFound: if no page is using the slug.
        """
        page = self._get_page_by_slug(slug)
        page.update_body(body)
        self.save(page)

    def _get_page_by_slug(self, slug: str) -> Page:
        """
        Return the Page aggregate that is using the given slug.

        :raises PageNotFound: if there is no index for the slug, or if
            the index does not currently refer to a page.
        """
        try:
            index = self._get_index(slug)
        except AggregateNotFound as e:
            # Chain explicitly, so the original lookup failure is kept
            # as the cause of the PageNotFound error.
            raise PageNotFound(slug) from e
        if index.ref is None:
            raise PageNotFound(slug)
        page_id = index.ref
        return self._get_page_by_id(page_id)

    def _get_page_by_id(self, page_id: UUID) -> Page:
        """Return the Page aggregate that has the given ID."""
        return cast(Page, self.repository.get(page_id))

    def _get_index(self, slug: str) -> Index:
        """Return the Index aggregate whose ID is derived from the given slug."""
        return cast(Index, self.repository.get(Index.create_id(slug)))

    def get_pages(
        self,
        gt: Optional[int] = None,
        lte: Optional[int] = None,
        desc: bool = False,
        limit: Optional[int] = None,
    ) -> Iterator[PageDetailsType]:
        """
        Yield the details of pages recorded in the page log.

        The arguments are passed, unchanged and in this order, to the
        get() method of the page log, which selects the logged events.
        """
        for page_logged in self.page_log.get(gt, lte, desc, limit):
            page = self._get_page_by_id(page_logged.page_id)
            yield self._details_from_page(page)


class PageNotFound(Exception):
    """Error raised when a requested page cannot be found."""


class SlugConflictError(Exception):
    """Error raised when a page's new slug is already used by another page."""

Domain model

In the domain model below, the Page aggregate has a base class Event which is defined with a user_id data class field. This base aggregate event class is inherited by all its concrete aggregate event classes. The user_id field is defined not to be included in the __init__ method of the aggregate’s event classes (init=False), and so it does not need to be matched by parameters in the aggregate command method signatures. Instead, this field gets the event attribute value from a Python context variable (default_factory=user_id_cvar.get).

The update_body() command method does some work on the command arguments before the BodyUpdated event is triggered. It creates a “diff” of the current version of the body and the new version. It then triggers an event, which contains the diff, by calling _update_body(). The event is applied to the body by patching the current version of the body with the diff that has been encapsulated by the event object.

The Index aggregate has a version-5 UUID which is a function of a slug. The Index and Page aggregates are used in combination to maintain editable pages of text, with editable titles, and with editable “slugs” that can be used in page URLs.

A PageLogged event is also defined, and used to define a “page log” in the application.

from __future__ import annotations

from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional, cast
from uuid import NAMESPACE_URL, UUID, uuid5

from eventsourcing.domain import Aggregate, DomainEvent, event
from eventsourcing.examples.contentmanagement.utils import apply_patch, create_diff

# Context variable holding the ID of the current user (None by default).
# Page events take the value of their user_id field from this variable,
# through the default factory of that field.
user_id_cvar: ContextVar[Optional[UUID]] = ContextVar("user_id", default=None)


class Page(Aggregate):
    """
    Aggregate for a page of content, which has a title, a slug, a body,
    and the ID of the user that last modified it.
    """

    class Event(Aggregate.Event):
        """
        Base class for the events of a Page.

        The user_id field is not a parameter of the event's __init__
        (init=False). Its value is obtained from the user_id_cvar
        context variable instead.
        """

        user_id: Optional[UUID] = field(default_factory=user_id_cvar.get, init=False)

        def apply(self, aggregate: Aggregate) -> None:
            """Set the event's user_id as the 'modified_by' value of the page."""
            cast(Page, aggregate).modified_by = self.user_id

    class Created(Event, Aggregate.Created):
        """Created event of a Page, with the title, slug and body of the page."""

        title: str
        slug: str
        body: str

    def __init__(self, title: str, slug: str, body: str = ""):
        """Initialise the title, slug and body; 'modified_by' starts as None."""
        self.title = title
        self.slug = slug
        self.body = body
        self.modified_by: Optional[UUID] = None

    @event("SlugUpdated")
    def update_slug(self, slug: str) -> None:
        """Set the slug of the page (decorated as a "SlugUpdated" event)."""
        self.slug = slug

    @event("TitleUpdated")
    def update_title(self, title: str) -> None:
        """Set the title of the page (decorated as a "TitleUpdated" event)."""
        self.title = title

    def update_body(self, body: str) -> None:
        """
        Change the body of the page to the given text.

        A diff of the current body and the new body is created, and the
        diff (rather than the new body) is passed on to _update_body().
        """
        self._update_body(create_diff(old=self.body, new=body))

    class BodyUpdated(Event):
        """Event of a Page that carries a diff to be applied to the body."""

        diff: str

    @event(BodyUpdated)
    def _update_body(self, diff: str) -> None:
        """Patch the current body with the given diff (BodyUpdated event)."""
        self.body = apply_patch(old=self.body, diff=diff)


@dataclass
class Index(Aggregate):
    """
    Aggregate that associates a slug with the ID of a page.

    The create_id() method gives the version-5 UUID for a slug, so that
    the index for a slug can be found from the slug alone.
    """

    # The slug of this index.
    slug: str
    # The ID of the page referred to by this index, or None if there is none.
    ref: Optional[UUID]

    # Base class for the events of an Index; adds nothing to Aggregate.Event.
    class Event(Aggregate.Event):
        pass

    @staticmethod
    def create_id(slug: str) -> UUID:
        """Return the version-5 UUID (URL namespace) for "/slugs/{slug}"."""
        return uuid5(NAMESPACE_URL, f"/slugs/{slug}")

    @event("RefChanged")
    def update_ref(self, ref: Optional[UUID]) -> None:
        """Set the reference of the index (decorated as a "RefChanged" event)."""
        self.ref = ref


class PageLogged(DomainEvent):
    """Domain event that carries the ID of a page, for use in a page log."""

    page_id: UUID

The create_diff() and apply_patch() functions use the Unix command line tools patch and diff.

from __future__ import annotations

import os
import shlex
from tempfile import TemporaryDirectory


def create_diff(old: str, new: str) -> str:
    """Return the output of the ``diff`` tool for the texts ``old`` and ``new``."""
    command_template = "diff %s %s > %s"
    return run(command_template, old, new)


def apply_patch(old: str, diff: str) -> str:
    """
    Return the text that results from applying ``diff`` to ``old``.

    The patching is done by the ``patch`` command line tool, except when
    ``diff`` is empty, in which case ``old`` is returned as it is.

    :param old: the text to be patched.
    :param diff: a diff, such as the output of ``create_diff()``.
    """
    if not diff:
        # An empty diff describes no changes. Don't hand it to ``patch``,
        # which is not certain to write an output file for an empty patch,
        # in which case reading the result would fail.
        return old
    return run("patch -s %s %s -o %s", old, diff)


def run(cmd: str, a: str, b: str) -> str:
    """
    Run a shell command on two input texts and return the text it outputs.

    The texts are written to two files in a temporary directory, the
    command is executed by the shell, and then the contents of a third
    file in the same directory is returned.

    :param cmd: shell command template with three ``%s`` placeholders,
        which are replaced (in this order) with the path of the file that
        contains ``a``, the path of the file that contains ``b``, and the
        path of the file from which the result will be read.
    :param a: text of the first input file.
    :param b: text of the second input file.
    :return: the contents of the output file.
    :raises FileNotFoundError: if the command did not create the output file.

    The exit status of the command is not checked.
    """
    with TemporaryDirectory() as td:
        a_path = os.path.join(td, "a")
        b_path = os.path.join(td, "b")
        c_path = os.path.join(td, "c")
        # Use a fixed encoding, and newline="" to switch off newline
        # translation, so that the texts (including any "\r" characters)
        # pass through the files exactly as they are.
        with open(a_path, "w", encoding="utf-8", newline="") as a_file:
            a_file.write(a)
        with open(b_path, "w", encoding="utf-8", newline="") as b_file:
            b_file.write(b)
        # Quote the paths, in case the temporary directory has a name
        # that is not safe to use unquoted in a shell command.
        quoted_paths = tuple(shlex.quote(p) for p in (a_path, b_path, c_path))
        os.system(cmd % quoted_paths)
        with open(c_path, "r", encoding="utf-8", newline="") as c_file:
            return c_file.read()

Test case

The test case below sets a user ID in the context variable. A page is created and updated in various ways. At the end, all the page events are checked to make sure they all have the user ID that was set in the context variable.

from __future__ import annotations

from typing import cast
from unittest import TestCase
from uuid import uuid4

from eventsourcing.examples.contentmanagement.application import (
    ContentManagementApplication,
    PageNotFound,
    SlugConflictError,
)
from eventsourcing.examples.contentmanagement.domainmodel import (
    Index,
    Page,
    user_id_cvar,
)
from eventsourcing.system import NotificationLogReader


class TestContentManagement(TestCase):
    """Exercises the content management application from end to end."""

    def test(self) -> None:
        """
        Create, update, list and re-slug pages, checking that the user ID
        set in the context variable is recorded on all the Page events.
        """
        # Set user_id context variable.
        user_id = uuid4()
        token = user_id_cvar.set(user_id)

        # Restore the context variable when the test is over, so the
        # user ID doesn't leak into tests that run after this one.
        self.addCleanup(user_id_cvar.reset, token)

        # Construct application.
        app = ContentManagementApplication()

        # Check the page doesn't exist.
        with self.assertRaises(PageNotFound):
            app.get_page_by_slug(slug="welcome")

        # Check the list of pages is empty.
        pages = list(app.get_pages())
        self.assertEqual(len(pages), 0)

        # Create a page.
        app.create_page(title="Welcome", slug="welcome")

        # Present page identified by the given slug.
        page = app.get_page_by_slug(slug="welcome")

        # Check we got a dict that has the given title and slug.
        self.assertEqual(page["title"], "Welcome")
        self.assertEqual(page["slug"], "welcome")
        self.assertEqual(page["body"], "")
        self.assertEqual(page["modified_by"], user_id)

        # Update the title.
        app.update_title(slug="welcome", title="Welcome Visitors")

        # Check the title was updated.
        page = app.get_page_by_slug(slug="welcome")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["modified_by"], user_id)

        # Update the slug.
        app.update_slug(old_slug="welcome", new_slug="welcome-visitors")

        # Check the index was updated.
        with self.assertRaises(PageNotFound):
            app.get_page_by_slug(slug="welcome")

        # Check we can get the page by the new slug.
        page = app.get_page_by_slug(slug="welcome-visitors")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["slug"], "welcome-visitors")

        # Update the body.
        app.update_body(slug="welcome-visitors", body="Welcome to my wiki")

        # Check the body was updated.
        page = app.get_page_by_slug(slug="welcome-visitors")
        self.assertEqual(page["body"], "Welcome to my wiki")

        # Update the body.
        app.update_body(slug="welcome-visitors", body="Welcome to this wiki")

        # Check the body was updated.
        page = app.get_page_by_slug(slug="welcome-visitors")
        self.assertEqual(page["body"], "Welcome to this wiki")

        # Update the body.
        app.update_body(
            slug="welcome-visitors",
            body="""
Welcome to this wiki!

This is a wiki about...
""",
        )

        # Check the body was updated.
        page = app.get_page_by_slug(slug="welcome-visitors")
        self.assertEqual(
            page["body"],
            """
Welcome to this wiki!

This is a wiki about...
""",
        )

        # Check all the Page events have the user_id.
        for notification in NotificationLogReader(app.notification_log).read(start=1):
            domain_event = app.mapper.to_domain_event(notification)
            if isinstance(domain_event, Page.Event):
                self.assertEqual(domain_event.user_id, user_id)

        # Change user_id context variable.
        user_id = uuid4()
        user_id_cvar.set(user_id)

        # Update the body.
        app.update_body(
            slug="welcome-visitors",
            body="""
Welcome to this wiki!

This is a wiki about us!
""",
        )

        # Check 'modified_by' changed.
        page = app.get_page_by_slug(slug="welcome-visitors")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["modified_by"], user_id)

        # Check a snapshot was created by now.
        assert app.snapshots
        index = cast(Index, app.repository.get(Index.create_id("welcome-visitors")))
        assert index.ref
        self.assertTrue(len(list(app.snapshots.get(index.ref))))

        # Create some more pages and list all the pages.
        app.create_page("Page 2", "page-2")
        app.create_page("Page 3", "page-3")
        app.create_page("Page 4", "page-4")
        app.create_page("Page 5", "page-5")

        pages = list(app.get_pages(desc=True))
        self.assertEqual(pages[0]["title"], "Page 5")
        self.assertEqual(pages[0]["slug"], "page-5")
        self.assertEqual(pages[1]["title"], "Page 4")
        self.assertEqual(pages[1]["slug"], "page-4")
        self.assertEqual(pages[2]["title"], "Page 3")
        self.assertEqual(pages[2]["slug"], "page-3")
        self.assertEqual(pages[3]["title"], "Page 2")
        self.assertEqual(pages[3]["slug"], "page-2")
        self.assertEqual(pages[4]["title"], "Welcome Visitors")
        self.assertEqual(pages[4]["slug"], "welcome-visitors")

        pages = list(app.get_pages(desc=True, limit=3))
        self.assertEqual(len(pages), 3)
        self.assertEqual(pages[0]["slug"], "page-5")
        self.assertEqual(pages[1]["slug"], "page-4")
        self.assertEqual(pages[2]["slug"], "page-3")

        pages = list(app.get_pages(desc=True, limit=3, lte=2))
        self.assertEqual(len(pages), 2)
        self.assertEqual(pages[0]["slug"], "page-2")
        self.assertEqual(pages[1]["slug"], "welcome-visitors")

        pages = list(app.get_pages(desc=True, lte=2))
        self.assertEqual(len(pages), 2)
        self.assertEqual(pages[0]["slug"], "page-2")
        self.assertEqual(pages[1]["slug"], "welcome-visitors")

        # Check we can't change the slug of a page to one
        # that is being used by another page.
        with self.assertRaises(SlugConflictError):
            app.update_slug("page-2", "page-3")

        # Check we can change the slug of a page to one
        # that was previously being used.
        app.update_slug("welcome-visitors", "welcome")

        page = app.get_page_by_slug(slug="welcome")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["modified_by"], user_id)