Wiki application

This example demonstrates several things: the use of version 5 UUIDs both to discover aggregate IDs and to implement an application-wide rule (or “invariant”); the declarative syntax for domain models with a “non-trivial” command method; automatic snapshotting; automatically setting a common attribute on all events without mentioning this attribute in the command methods; and a recipe for an event-sourced log.

Domain model

In the domain model below, the Page aggregate has a base class Event with a user_id dataclass field. This field is excluded from the event's __init__ method (init=False), and so does not need to be matched by parameters in the command method signatures. Its default factory gets the attribute value from a Python context variable. All the concrete event classes of the aggregate inherit from this base event class.
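The mechanism can be tried in isolation with plain dataclasses. The following is a minimal sketch that does not use the library at all; the names current_user, BaseEvent and TitleUpdated are hypothetical and exist only for this illustration.

```python
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional
from uuid import UUID, uuid4

# Hypothetical context variable, standing in for user_id_cvar.
current_user: ContextVar[Optional[UUID]] = ContextVar("current_user", default=None)


@dataclass
class BaseEvent:
    # init=False keeps user_id out of __init__, so callers never pass it.
    # The default factory reads the context variable at construction time.
    user_id: Optional[UUID] = field(default_factory=current_user.get, init=False)


@dataclass
class TitleUpdated(BaseEvent):
    title: str


user = uuid4()
token = current_user.set(user)
event = TitleUpdated(title="Welcome")
assert event.user_id == user

# After resetting the context variable, new events get the default (None).
current_user.reset(token)
assert TitleUpdated(title="Welcome again").user_id is None
```

Because the value is read when the event object is constructed, whatever is in the context variable at that moment is what gets recorded on the event.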

The update_body() command method does a “non-trivial” amount of work before the BodyUpdated event is triggered: it creates a “diff” of the current version of the body and the new version. It then triggers an event which contains the diff. When the event is applied, the body is updated by “patching” the current version of the body with this diff.

The Index aggregate has a version 5 UUID which is a function of a slug. The Index and Page aggregates are used in combination to maintain editable pages of text, with editable titles, and with editable “slugs” that can be used in page URLs.
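The reason a version 5 UUID helps with discovery is that it is deterministic: the same namespace and name always give the same ID, so the index ID can be recomputed from the slug alone. A minimal sketch using only the standard library, with a hypothetical helper named index_id_for that mirrors the "/slugs/{slug}" naming used by Index.create_id():

```python
from uuid import NAMESPACE_URL, UUID, uuid5


def index_id_for(slug: str) -> UUID:
    # Same namespace and name always produce the same version 5 UUID.
    return uuid5(NAMESPACE_URL, f"/slugs/{slug}")


# The ID can be recomputed at any time from the slug alone.
assert index_id_for("welcome") == index_id_for("welcome")

# Different slugs give different IDs.
assert index_id_for("welcome") != index_id_for("welcome-visitors")

assert index_id_for("welcome").version == 5
```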

A PageLogged event is also defined, and used to define a “page log” in the application.

from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional
from uuid import NAMESPACE_URL, UUID, uuid5

from eventsourcing.domain import Aggregate, AggregateEvent, event
from eventsourcing.examples.wiki.utils import apply_patch, create_diff

user_id_cvar: ContextVar[Optional[UUID]] = ContextVar("user_id", default=None)


@dataclass
class Page(Aggregate):
    title: str
    slug: str
    body: str = ""
    modified_by: Optional[UUID] = field(default=None, init=False)

    class Event(Aggregate.Event["Page"]):
        user_id: Optional[UUID] = field(default_factory=user_id_cvar.get, init=False)

        def apply(self, aggregate: "Page") -> None:
            aggregate.modified_by = self.user_id

    @event("SlugUpdated")
    def update_slug(self, slug: str) -> None:
        self.slug = slug

    @event("TitleUpdated")
    def update_title(self, title: str) -> None:
        self.title = title

    def update_body(self, body: str) -> None:
        self._update_body(create_diff(old=self.body, new=body))

    @event("BodyUpdated")
    def _update_body(self, diff: str) -> None:
        self.body = apply_patch(old=self.body, diff=diff)


@dataclass
class Index(Aggregate):
    slug: str
    ref: Optional[UUID]

    @staticmethod
    def create_id(slug: str) -> UUID:
        return uuid5(NAMESPACE_URL, f"/slugs/{slug}")

    @event("RefChanged")
    def update_ref(self, ref: Optional[UUID]) -> None:
        self.ref = ref


class PageLogged(AggregateEvent[Aggregate]):
    page_id: UUID

The create_diff() and apply_patch() functions use the Unix command line tools diff and patch, so these tools need to be available on the system where the example is run.

import os
from tempfile import TemporaryDirectory


def create_diff(old: str, new: str) -> str:
    return run("diff %s %s > %s", old, new)


def apply_patch(old: str, diff: str) -> str:
    return run("patch -s %s %s -o %s", old, diff)


def run(cmd: str, a: str, b: str) -> str:
    with TemporaryDirectory() as td:
        a_path = os.path.join(td, "a")
        b_path = os.path.join(td, "b")
        c_path = os.path.join(td, "c")
        with open(a_path, "w") as a_file:
            a_file.write(a)
        with open(b_path, "w") as b_file:
            b_file.write(b)
        os.system(cmd % (a_path, b_path, c_path))
        with open(c_path, "r") as c_file:
            return c_file.read()
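Where the diff and patch tools are not to hand, the same round trip can be sketched in pure Python. This is not how the example's utils module works: it swaps in difflib.SequenceMatcher from the standard library, and the function names make_edits and apply_edits, as well as the edit format (a list of (start, end, replacement) tuples), are hypothetical.

```python
from difflib import SequenceMatcher
from typing import List, Tuple

Edit = Tuple[int, int, str]  # (start in old, end in old, replacement text)


def make_edits(old: str, new: str) -> List[Edit]:
    # Keep only the spans of old that change, with their replacement text.
    matcher = SequenceMatcher(a=old, b=new, autojunk=False)
    return [
        (i1, i2, new[j1:j2])
        for tag, i1, i2, j1, j2 in matcher.get_opcodes()
        if tag != "equal"
    ]


def apply_edits(old: str, edits: List[Edit]) -> str:
    # Edits are ordered and non-overlapping, so copy the unchanged
    # text between them and substitute the replacements.
    parts = []
    position = 0
    for start, end, replacement in edits:
        parts.append(old[position:start])
        parts.append(replacement)
        position = end
    parts.append(old[position:])
    return "".join(parts)


old_body = "Welcome to my wiki"
new_body = "Welcome to this wiki"
edits = make_edits(old_body, new_body)
assert apply_edits(old_body, edits) == new_body
```

As in the domain model, only the edits would need to be stored in an event; the new body is recovered by applying them to the current body.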

Application

The application provides methods to create a new page, get the details of a page by its slug, update the title of a page referenced by its slug, update the body of a page, and change the slug of a page. Please note that none of these methods mention a user_id argument.

To get a page, the slug is used to identify an index, the index is used to get the page ID, and the page ID is then used to get the body and title of the page. To change a slug, the index objects for the old and the new slugs are identified, and the page ID is removed as the reference from the old index and set as the reference on the new index.

The indexes are also used to implement an application-wide rule (or “invariant”) that a slug can be used by only one page: if an attempt is made to change the slug of one page to a slug that is already being used by another page, a SlugConflictError is raised and no changes are made.
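The slug rule can be illustrated without any event sourcing. In the sketch below a plain dict stands in for the Index aggregates, mapping each slug to a page ID or to None once the slug has been released. The function change_slug is hypothetical, and the sketch does not model persistence or concurrency.

```python
from typing import Dict, Optional
from uuid import UUID, uuid4


class SlugConflictError(Exception):
    """Raised when the target slug already refers to another page."""


def change_slug(
    index: Dict[str, Optional[UUID]], old_slug: str, new_slug: str
) -> None:
    page_id = index[old_slug]
    # Check the rule before changing anything, so that a conflict
    # leaves the index exactly as it was.
    if index.get(new_slug) is not None:
        raise SlugConflictError(new_slug)
    index[old_slug] = None  # release the old slug
    index[new_slug] = page_id  # claim the new slug


page_a, page_b = uuid4(), uuid4()
index = {"welcome": page_a, "about": page_b}

change_slug(index, "welcome", "welcome-visitors")
assert index["welcome"] is None
assert index["welcome-visitors"] == page_a

# A released slug can be claimed again.
change_slug(index, "welcome-visitors", "welcome")
assert index["welcome"] == page_a
```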

The application also demonstrates the “event-sourced log” recipe. The ID of each Page aggregate is logged when the page is created, as an event in a sequence of stored events, and a list of pages is presented by selecting from this sequence.
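The paging over the log can be reasoned about with a small sketch. It assumes that logged events have contiguous version numbers starting from 1, and that events are selected in descending order from an upper bound of the last version minus the offset. The function page_versions is hypothetical and only computes which versions a given limit and offset would select.

```python
from typing import List, Optional


def page_versions(
    last_version: Optional[int], limit: int = 10, offset: int = 0
) -> List[int]:
    # An empty log has no last version, so nothing is selected.
    if last_version is None:
        return []
    # Skip the newest `offset` entries by lowering the upper bound.
    upper = last_version - offset
    # Walk downwards from the upper bound to version 1, newest first.
    return list(range(upper, 0, -1))[:limit]


# With five logged pages, the newest three come first.
assert page_versions(5, limit=3) == [5, 4, 3]

# Skipping three leaves only the two oldest entries.
assert page_versions(5, limit=3, offset=3) == [2, 1]
```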

from typing import (
    Any,
    Dict,
    Generic,
    Iterator,
    Mapping,
    Optional,
    Type,
    Union,
    cast,
)
from uuid import NAMESPACE_URL, UUID, uuid5

from eventsourcing.application import AggregateNotFound, Application
from eventsourcing.domain import Aggregate, AggregateEvent, TDomainEvent
from eventsourcing.examples.wiki.domainmodel import Index, Page, PageLogged
from eventsourcing.persistence import EventStore

PageDetailsType = Dict[str, Union[str, Any]]


class WikiApplication(Application[Aggregate]):
    env = {"COMPRESSOR_TOPIC": "gzip"}
    snapshotting_intervals = {Page: 5}

    def __init__(self, env: Optional[Mapping[str, str]] = None) -> None:
        super().__init__(env)
        self.page_log: Log[PageLogged] = Log(
            self.events, uuid5(NAMESPACE_URL, "/page_log"), PageLogged
        )

    def create_page(self, title: str, slug: str) -> None:
        page = Page(title=title, slug=slug)
        page_logged = self.page_log.trigger_event(page_id=page.id)
        index_entry = Index(slug, ref=page.id)
        self.save(page, page_logged, index_entry)

    def get_page_details(self, slug: str) -> PageDetailsType:
        page = self._get_page_by_slug(slug)
        return self._details_from_page(page)

    def _details_from_page(self, page: Page) -> PageDetailsType:
        return {
            "title": page.title,
            "slug": page.slug,
            "body": page.body,
            "modified_by": page.modified_by,
        }

    def update_title(self, slug: str, title: str) -> None:
        page = self._get_page_by_slug(slug)
        page.update_title(title=title)
        self.save(page)

    def update_slug(self, old_slug: str, new_slug: str) -> None:
        page = self._get_page_by_slug(old_slug)
        page.update_slug(new_slug)
        old_index = self._get_index(old_slug)
        old_index.update_ref(None)
        try:
            new_index = self._get_index(new_slug)
        except AggregateNotFound:
            new_index = Index(new_slug, page.id)
        else:
            if new_index.ref is None:
                new_index.update_ref(page.id)
            else:
                raise SlugConflictError()
        self.save(page, old_index, new_index)

    def update_body(self, slug: str, body: str) -> None:
        page = self._get_page_by_slug(slug)
        page.update_body(body)
        self.save(page)

    def _get_page_by_slug(self, slug: str) -> Page:
        try:
            index = self._get_index(slug)
        except AggregateNotFound:
            raise PageNotFound(slug)
        if index.ref is None:
            raise PageNotFound(slug)
        page_id = index.ref
        return self._get_page_by_id(page_id)

    def _get_page_by_id(self, page_id: UUID) -> Page:
        return cast(Page, self.repository.get(page_id))

    def _get_index(self, slug: str) -> Index:
        return cast(Index, self.repository.get(Index.create_id(slug)))

    def get_pages(self, limit: int = 10, offset: int = 0) -> Iterator[PageDetailsType]:
        for page_logged in self.page_log.get(limit, offset):
            page = self._get_page_by_id(page_logged.page_id)
            yield self._details_from_page(page)


class Log(Generic[TDomainEvent]):
    def __init__(
        self,
        events: EventStore[AggregateEvent[Aggregate]],
        originator_id: UUID,
        logged_cls: Type[TDomainEvent],
    ):
        self.events = events
        self.originator_id = originator_id
        self.logged_cls = logged_cls

    def trigger_event(self, **kwargs: Any) -> TDomainEvent:
        last_logged = self._get_last_logged()
        if last_logged:
            next_originator_version = last_logged.originator_version + 1
        else:
            next_originator_version = Aggregate.INITIAL_VERSION
        return self.logged_cls(  # type: ignore
            originator_id=self.originator_id,
            originator_version=next_originator_version,
            timestamp=self.logged_cls.create_timestamp(),
            **kwargs,
        )

    def get(self, limit: int = 10, offset: int = 0) -> Iterator[TDomainEvent]:
        # Calculate lte.
        lte = None
        if offset > 0:
            last = self._get_last_logged()
            if last:
                lte = last.originator_version - offset

        # Get logged events.
        return cast(
            Iterator[TDomainEvent],
            self.events.get(
                originator_id=self.originator_id,
                lte=lte,
                desc=True,
                limit=limit,
            ),
        )

    def _get_last_logged(
        self,
    ) -> Optional[TDomainEvent]:
        events = self.events.get(originator_id=self.originator_id, desc=True, limit=1)
        try:
            return cast(TDomainEvent, next(events))
        except StopIteration:
            return None


class PageNotFound(Exception):
    """
    Raised when a page is not found.
    """


class SlugConflictError(Exception):
    """
    Raised when updating a page to a slug used by another page.
    """

Test case

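The test case below sets a user ID in the context variable. A page is created and updated in various ways, and all the Page events are then checked to make sure they have the user ID that was set in the context variable. The test also checks that a snapshot was created, that pages can be listed with a limit and an offset, and that a slug already used by another page cannot be reused.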

from typing import cast
from unittest import TestCase
from uuid import uuid4

from eventsourcing.examples.wiki.application import (
    PageNotFound,
    SlugConflictError,
    WikiApplication,
)
from eventsourcing.examples.wiki.domainmodel import Index, Page, user_id_cvar
from eventsourcing.system import NotificationLogReader


class TestWiki(TestCase):
    def test(self) -> None:

        # Set user_id context variable.
        user_id = uuid4()
        user_id_cvar.set(user_id)

        # Construct application.
        app = WikiApplication()

        # Check the page doesn't exist.
        with self.assertRaises(PageNotFound):
            app.get_page_details(slug="welcome")

        # Check the list of pages is empty.
        pages = list(app.get_pages())
        self.assertEqual(len(pages), 0)
        pages = list(app.get_pages(offset=1))
        self.assertEqual(len(pages), 0)

        # Create a page.
        app.create_page(title="Welcome", slug="welcome")

        # Present page identified by the given slug.
        page = app.get_page_details(slug="welcome")

        # Check we got a dict that has the given title and slug.
        self.assertEqual(page["title"], "Welcome")
        self.assertEqual(page["slug"], "welcome")
        self.assertEqual(page["body"], "")
        self.assertEqual(page["modified_by"], user_id)

        # Update the title.
        app.update_title(slug="welcome", title="Welcome Visitors")

        # Check the title was updated.
        page = app.get_page_details(slug="welcome")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["modified_by"], user_id)

        # Update the slug.
        app.update_slug(old_slug="welcome", new_slug="welcome-visitors")

        # Check the index was updated.
        with self.assertRaises(PageNotFound):
            app.get_page_details(slug="welcome")

        # Check we can get the page by the new slug.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["slug"], "welcome-visitors")

        # Update the body.
        app.update_body(slug="welcome-visitors", body="Welcome to my wiki")

        # Check the body was updated.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(page["body"], "Welcome to my wiki")

        # Update the body.
        app.update_body(slug="welcome-visitors", body="Welcome to this wiki")

        # Check the body was updated.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(page["body"], "Welcome to this wiki")

        # Update the body.
        app.update_body(
            slug="welcome-visitors",
            body="""
Welcome to this wiki!

This is a wiki about...
""",
        )

        # Check the body was updated.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(
            page["body"],
            """
Welcome to this wiki!

This is a wiki about...
""",
        )

        # Check all the Page events have the user_id.
        for notification in NotificationLogReader(app.log).read(start=1):
            domain_event = app.mapper.to_domain_event(notification)
            if isinstance(domain_event, Page.Event):
                self.assertEqual(domain_event.user_id, user_id)

        # Change user_id context variable.
        user_id = uuid4()
        user_id_cvar.set(user_id)

        # Update the body.
        app.update_body(
            slug="welcome-visitors",
            body="""
Welcome to this wiki!

This is a wiki about us!
""",
        )

        # Check 'modified_by' changed.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["modified_by"], user_id)

        # Check a snapshot was created by now.
        assert app.snapshots
        index = cast(Index, app.repository.get(Index.create_id("welcome-visitors")))
        assert index.ref
        self.assertTrue(len(list(app.snapshots.get(index.ref))))

        # Create some more pages and list all the pages.
        app.create_page("Page 2", "page-2")
        app.create_page("Page 3", "page-3")
        app.create_page("Page 4", "page-4")
        app.create_page("Page 5", "page-5")

        pages = list(app.get_pages())
        self.assertEqual(pages[0]["title"], "Page 5")
        self.assertEqual(pages[0]["slug"], "page-5")
        self.assertEqual(pages[1]["title"], "Page 4")
        self.assertEqual(pages[1]["slug"], "page-4")
        self.assertEqual(pages[2]["title"], "Page 3")
        self.assertEqual(pages[2]["slug"], "page-3")
        self.assertEqual(pages[3]["title"], "Page 2")
        self.assertEqual(pages[3]["slug"], "page-2")
        self.assertEqual(pages[4]["title"], "Welcome Visitors")
        self.assertEqual(pages[4]["slug"], "welcome-visitors")

        pages = list(app.get_pages(limit=3))
        self.assertEqual(len(pages), 3)
        self.assertEqual(pages[0]["slug"], "page-5")
        self.assertEqual(pages[1]["slug"], "page-4")
        self.assertEqual(pages[2]["slug"], "page-3")

        pages = list(app.get_pages(limit=3, offset=3))
        self.assertEqual(len(pages), 2)
        self.assertEqual(pages[0]["slug"], "page-2")
        self.assertEqual(pages[1]["slug"], "welcome-visitors")

        pages = list(app.get_pages(offset=3))
        self.assertEqual(len(pages), 2)
        self.assertEqual(pages[0]["slug"], "page-2")
        self.assertEqual(pages[1]["slug"], "welcome-visitors")

        # Check we can't change the slug of a page to one
        # that is being used by another page.
        with self.assertRaises(SlugConflictError):
            app.update_slug("page-2", "page-3")

        # Check we can change the slug of a page to one
        # that was previously being used.
        app.update_slug("welcome-visitors", "welcome")

        page = app.get_page_details(slug="welcome")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["modified_by"], user_id)