Wiki application

This example demonstrates several features together: version-5 UUIDs, used both to discover aggregate IDs and to implement an application-wide rule (or “invariant”); the declarative syntax for domain models with a “non-trivial” command method; automatic snapshotting; automatic setting of a common attribute on all events, without mentioning that attribute in the command methods; and a recipe for an event-sourced log.


The application provides methods to create a new page, get the details of a page by its slug, update the title of a page referenced by its slug, update the body of a page, and change the page slug. Note that none of these methods has a user_id argument. To get a page, the slug identifies an index, the index gives the page ID, and the page ID is then used to get the body and title of the page. To change a slug, the index objects for the old and new slugs are identified, the page ID is removed as the reference from the old index, and it is set as the reference on the new index. The indexes also implement an application-wide rule (or “invariant”) that a slug can be used by only one page: if an attempt is made to change the slug of one page to a slug already used by another page, a SlugConflictError is raised and no changes are made.
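The slug-change rule can be illustrated without the library. The following is a minimal sketch, assuming a plain dictionary stands in for the stored Index aggregates; the names SlugIndex, SlugConflict and move are hypothetical and exist only for this illustration.

```python
from typing import Dict, Optional
from uuid import UUID, uuid4


class SlugConflict(Exception):
    """Raised when the target slug already refers to another page."""


class SlugIndex:
    """In-memory stand-in for the Index aggregates (hypothetical helper)."""

    def __init__(self) -> None:
        # Maps slug -> page ID, or None once the slug has been released.
        self.refs: Dict[str, Optional[UUID]] = {}

    def move(self, old_slug: str, new_slug: str) -> None:
        page_id = self.refs.get(old_slug)
        if page_id is None:
            raise KeyError(old_slug)
        # Check the invariant before changing anything, so that a
        # conflict leaves both index entries untouched.
        if self.refs.get(new_slug) is not None:
            raise SlugConflict(new_slug)
        self.refs[old_slug] = None
        self.refs[new_slug] = page_id


index = SlugIndex()
welcome_id, about_id = uuid4(), uuid4()
index.refs["welcome"] = welcome_id
index.refs["about"] = about_id

# Releases "welcome" and points "welcome-visitors" at the same page.
index.move("welcome", "welcome-visitors")
```

A released slug keeps its entry with a None reference, which is why a later page can take over a slug that was previously in use.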

The application also demonstrates the “event-sourced log” recipe by showing how the IDs of all the Page aggregates can be listed: the ID of each new page is logged in a sequence of stored events when the page is created, and the list of pages is presented by selecting from this sequence.
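The selection behaviour of such a log can be sketched in plain Python. This is not the library's implementation: SimpleLog and Logged are hypothetical names, the log is held in a list rather than in stored events, and positions are assumed to start at 1. The gt, lte, desc and limit parameters mirror those of the get_pages() method.

```python
from dataclasses import dataclass
from typing import Iterator, List, Optional
from uuid import UUID, uuid4


@dataclass(frozen=True)
class Logged:
    position: int  # 1-based position in the log (assumption)
    page_id: UUID


class SimpleLog:
    """Append-only list standing in for a sequence of stored events."""

    def __init__(self) -> None:
        self._events: List[Logged] = []

    def append(self, page_id: UUID) -> Logged:
        logged = Logged(position=len(self._events) + 1, page_id=page_id)
        self._events.append(logged)
        return logged

    def get(
        self,
        gt: Optional[int] = None,
        lte: Optional[int] = None,
        desc: bool = False,
        limit: Optional[int] = None,
    ) -> Iterator[Logged]:
        # Filter by position first, then order, then truncate.
        selected = [
            e
            for e in self._events
            if (gt is None or e.position > gt)
            and (lte is None or e.position <= lte)
        ]
        if desc:
            selected.reverse()
        if limit is not None:
            selected = selected[:limit]
        return iter(selected)


log = SimpleLog()
page_ids = [uuid4() for _ in range(5)]
for page_id in page_ids:
    log.append(page_id)

newest_three = [e.position for e in log.get(desc=True, limit=3)]
first_two_desc = [e.position for e in log.get(desc=True, limit=3, lte=2)]
```

Because the limit is applied after filtering, combining limit=3 with lte=2 yields only two entries.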

from typing import Any, Dict, Iterator, Optional, Union, cast
from uuid import NAMESPACE_URL, UUID, uuid5

from eventsourcing.application import (
    AggregateNotFound,
    Application,
    EventSourcedLog,
)
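# The module path is missing from the listing; the path below is an assumption.
from eventsourcing.examples.wiki.domainmodel import Index, Page, PageLogged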
from eventsourcing.utils import EnvType

PageDetailsType = Dict[str, Union[str, Any]]

class WikiApplication(Application):
    env = {"COMPRESSOR_TOPIC": "gzip"}
    snapshotting_intervals = {Page: 5}

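    def __init__(self, env: Optional[EnvType] = None) -> None:
        super().__init__(env)
        # The page log is a sequence of PageLogged events with a fixed version-5 UUID.
        self.page_log: EventSourcedLog[PageLogged] = EventSourcedLog(
            self.events, uuid5(NAMESPACE_URL, "/page_log"), PageLogged
        )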

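    def create_page(self, title: str, slug: str) -> None:
        page = Page(title=title, slug=slug)
        page_logged = self.page_log.trigger_event(page_id=page.id)
        index_entry = Index(slug, ref=page.id)
        # Save the page, the log event and the index entry together.
        self.save(page, page_logged, index_entry)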

    def get_page_details(self, slug: str) -> PageDetailsType:
        page = self._get_page_by_slug(slug)
        return self._details_from_page(page)

    def _details_from_page(self, page: Page) -> PageDetailsType:
        return {
            "title": page.title,
            "slug": page.slug,
            "body": page.body,
            "modified_by": page.modified_by,
        }

    def update_title(self, slug: str, title: str) -> None:
        page = self._get_page_by_slug(slug)
        page.update_title(title=title)
        self.save(page)

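    def update_slug(self, old_slug: str, new_slug: str) -> None:
        page = self._get_page_by_slug(old_slug)
        page.update_slug(new_slug)
        old_index = self._get_index(old_slug)
        old_index.update_ref(None)
        try:
            new_index = self._get_index(new_slug)
        except AggregateNotFound:
            new_index = Index(new_slug, page.id)
        else:
            if new_index.ref is None:
                new_index.update_ref(page.id)
            else:
                # Nothing has been saved yet, so no changes are made.
                raise SlugConflictError()
        self.save(page, old_index, new_index)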

    def update_body(self, slug: str, body: str) -> None:
        page = self._get_page_by_slug(slug)
        page.update_body(body)
        self.save(page)

    def _get_page_by_slug(self, slug: str) -> Page:
        try:
            index = self._get_index(slug)
        except AggregateNotFound:
            raise PageNotFound(slug)
        if index.ref is None:
            raise PageNotFound(slug)
        page_id = index.ref
        return self._get_page_by_id(page_id)

    def _get_page_by_id(self, page_id: UUID) -> Page:
        return cast(Page, self.repository.get(page_id))

    def _get_index(self, slug: str) -> Index:
        return cast(Index, self.repository.get(Index.create_id(slug)))

    def get_pages(
        self,
        gt: Optional[int] = None,
        lte: Optional[int] = None,
        desc: bool = False,
        limit: Optional[int] = None,
    ) -> Iterator[PageDetailsType]:
        for page_logged in self.page_log.get(gt, lte, desc, limit):
            page = self._get_page_by_id(page_logged.page_id)
            yield self._details_from_page(page)

class PageNotFound(Exception):
    """
    Raised when a page is not found.
    """

class SlugConflictError(Exception):
    """
    Raised when updating a page to a slug used by another page.
    """

Domain model

In the domain model below, the Page aggregate has a base event class, Event, which defines a user_id dataclass field. The field is excluded from the event's __init__ method, and so does not need to be matched by parameters in the command method signatures. Its default factory gets the attribute value from a Python context variable. All the concrete event classes of the Page aggregate inherit from this base class.
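The mechanism can be seen in isolation with a plain dataclass. This is a minimal sketch using only the standard library; AuditedEvent and current_user are hypothetical names, and the class is not an aggregate event.

```python
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional
from uuid import UUID, uuid4

current_user: ContextVar[Optional[UUID]] = ContextVar("current_user", default=None)


@dataclass(frozen=True)
class AuditedEvent:
    title: str
    # Not an __init__ parameter: the value is read from the context
    # variable each time an instance is constructed.
    user_id: Optional[UUID] = field(default_factory=current_user.get, init=False)


# No user has been set yet, so the context variable's default applies.
before = AuditedEvent(title="Draft")

alice = uuid4()
current_user.set(alice)
after = AuditedEvent(title="Welcome")
```

Callers construct events with only the title, and passing user_id explicitly is rejected with a TypeError because the field is not part of the generated __init__.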

The update_body() command method does a “non-trivial” amount of work before the BodyUpdated event is triggered: it creates a “diff” of the current version of the body and the new version, then triggers an event that contains the diff. The event is applied to the body by “patching” the current version of the body with this diff.
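The diff-then-patch round trip can be sketched with the standard library's difflib instead of the Unix tools used by this example. This is a swapped-in technique for illustration only: make_diff and patch_text are hypothetical names, and the diff here is a list of edit operations rather than a string.

```python
from difflib import SequenceMatcher
from typing import List, Tuple

# One edit: replace old[start:end] with the given text.
Op = Tuple[int, int, str]


def make_diff(old: str, new: str) -> List[Op]:
    """Record only the regions of `old` that change, in order."""
    matcher = SequenceMatcher(a=old, b=new, autojunk=False)
    return [
        (i1, i2, new[j1:j2])
        for tag, i1, i2, j1, j2 in matcher.get_opcodes()
        if tag != "equal"
    ]


def patch_text(old: str, diff: List[Op]) -> str:
    """Rebuild the new text from the old text and the recorded edits."""
    parts = []
    pos = 0
    for start, end, replacement in diff:
        parts.append(old[pos:start])  # unchanged text before the edit
        parts.append(replacement)
        pos = end
    parts.append(old[pos:])  # unchanged tail
    return "".join(parts)


old_body = "Welcome to my wiki"
new_body = "Welcome to this wiki"
diff = make_diff(old_body, new_body)
patched = patch_text(old_body, diff)
```

Storing the diff in the event rather than the full body keeps each event small when a long page receives a short edit, at the cost of needing the previous body to reconstruct the current one.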

The Index aggregate has a version-5 UUID which is a function of a slug. The Index and Page aggregates are used in combination to maintain editable pages of text, with editable titles, and with editable “slugs” that can be used in page URLs.
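Because a version-5 UUID is a deterministic function of a namespace and a name, the ID of an index can be computed from the slug alone, without looking anything up. A short sketch using only the standard library, with the same namespace and path format as the create_id() method of the Index aggregate (index_id is a hypothetical name):

```python
from uuid import NAMESPACE_URL, UUID, uuid5


def index_id(slug: str) -> UUID:
    # Same inputs always give the same UUID.
    return uuid5(NAMESPACE_URL, f"/slugs/{slug}")


first = index_id("welcome")
second = index_id("welcome")
other = index_id("welcome-visitors")
```

Here first and second are equal, while other differs, so each slug maps to exactly one index aggregate.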

A PageLogged event is also defined, and used to define a “page log” in the application.

from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional
from uuid import NAMESPACE_URL, UUID, uuid5

from eventsourcing.domain import Aggregate, LogEvent, event
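# The module path is missing from the listing; the path below is an assumption.
from eventsourcing.examples.wiki.utils import apply_patch, create_diff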

user_id_cvar: ContextVar[Optional[UUID]] = ContextVar("user_id", default=None)

@dataclass
class Page(Aggregate):
    title: str
    slug: str
    body: str = ""
    modified_by: Optional[UUID] = field(default=None, init=False)

    class Event(Aggregate.Event["Page"]):
        user_id: Optional[UUID] = field(default_factory=user_id_cvar.get, init=False)

        def apply(self, aggregate: "Page") -> None:
            aggregate.modified_by = self.user_id

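    # Event names other than "BodyUpdated" are assumed; the decorators
    # are missing from the listing.
    @event("SlugUpdated")
    def update_slug(self, slug: str) -> None:
        self.slug = slug

    @event("TitleUpdated")
    def update_title(self, title: str) -> None:
        self.title = title

    def update_body(self, body: str) -> None:
        # Create the diff before the event is triggered.
        self._update_body(create_diff(old=self.body, new=body))

    @event("BodyUpdated")
    def _update_body(self, diff: str) -> None:
        self.body = apply_patch(old=self.body, diff=diff)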

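@dataclass
class Index(Aggregate):
    slug: str
    ref: Optional[UUID]

    @staticmethod
    def create_id(slug: str) -> UUID:
        # The index ID is a deterministic function of the slug.
        return uuid5(NAMESPACE_URL, f"/slugs/{slug}")

    # The event name is assumed; the decorator is missing from the listing.
    @event("RefChanged")
    def update_ref(self, ref: Optional[UUID]) -> None:
        self.ref = ref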

class PageLogged(LogEvent):
    page_id: UUID

The create_diff() and apply_patch() functions use the Unix command line tools patch and diff.

import os
from tempfile import TemporaryDirectory

def create_diff(old: str, new: str) -> str:
    return run("diff %s %s > %s", old, new)

def apply_patch(old: str, diff: str) -> str:
    return run("patch -s %s %s -o %s", old, diff)

def run(cmd: str, a: str, b: str) -> str:
    with TemporaryDirectory() as td:
        a_path = os.path.join(td, "a")
        b_path = os.path.join(td, "b")
        c_path = os.path.join(td, "c")
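        with open(a_path, "w") as a_file:
            a_file.write(a)
        with open(b_path, "w") as b_file:
            b_file.write(b)
        # The command writes its result to the third path.
        os.system(cmd % (a_path, b_path, c_path))
        with open(c_path, "r") as c_file:
            return c_file.read()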

Test case

The test case below sets a user ID in the context variable. A page is created and updated in various ways. At the end, all the page events are checked to make sure they all have the user ID that was set in the context variable.

from typing import cast
from unittest import TestCase
from uuid import uuid4

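# The module paths are missing from the listing; the paths below are assumptions.
from eventsourcing.examples.wiki.application import (
    PageNotFound,
    SlugConflictError,
    WikiApplication,
)
from eventsourcing.examples.wiki.domainmodel import Index, Page, user_id_cvar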
from eventsourcing.system import NotificationLogReader

class TestWiki(TestCase):
    def test(self) -> None:

        # Set user_id context variable.
        user_id = uuid4()
        user_id_cvar.set(user_id)

        # Construct application.
        app = WikiApplication()

        # Check the page doesn't exist.
        with self.assertRaises(PageNotFound):
            app.get_page_details(slug="welcome")

        # Check the list of pages is empty.
        pages = list(app.get_pages())
        self.assertEqual(len(pages), 0)

        # Create a page.
        app.create_page(title="Welcome", slug="welcome")

        # Present page identified by the given slug.
        page = app.get_page_details(slug="welcome")

        # Check we got a dict that has the given title and slug.
        self.assertEqual(page["title"], "Welcome")
        self.assertEqual(page["slug"], "welcome")
        self.assertEqual(page["body"], "")
        self.assertEqual(page["modified_by"], user_id)

        # Update the title.
        app.update_title(slug="welcome", title="Welcome Visitors")

        # Check the title was updated.
        page = app.get_page_details(slug="welcome")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["modified_by"], user_id)

        # Update the slug.
        app.update_slug(old_slug="welcome", new_slug="welcome-visitors")

        # Check the index was updated.
        with self.assertRaises(PageNotFound):
            app.get_page_details(slug="welcome")

        # Check we can get the page by the new slug.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["slug"], "welcome-visitors")

        # Update the body.
        app.update_body(slug="welcome-visitors", body="Welcome to my wiki")

        # Check the body was updated.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(page["body"], "Welcome to my wiki")

        # Update the body.
        app.update_body(slug="welcome-visitors", body="Welcome to this wiki")

        # Check the body was updated.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(page["body"], "Welcome to this wiki")

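        # Update the body.
        app.update_body(
            slug="welcome-visitors",
            body="""
Welcome to this wiki!

This is a wiki about...
""",
        )

        # Check the body was updated.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(
            page["body"],
            """
Welcome to this wiki!

This is a wiki about...
""",
        )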

        # Check all the Page events have the user_id.
        for notification in NotificationLogReader(app.notification_log).read(start=1):
            domain_event = app.mapper.to_domain_event(notification)
            if isinstance(domain_event, Page.Event):
                self.assertEqual(domain_event.user_id, user_id)

        # Change user_id context variable.
        user_id = uuid4()
        user_id_cvar.set(user_id)

        # Update the body.
        app.update_body(
            slug="welcome-visitors",
            body="""
Welcome to this wiki!

This is a wiki about us!
""",
        )

        # Check 'modified_by' changed.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["modified_by"], user_id)

        # Check a snapshot was created by now.
        assert app.snapshots
        index = cast(Index, app.repository.get(Index.create_id("welcome-visitors")))
        assert index.ref

        # Create some more pages and list all the pages.
        app.create_page("Page 2", "page-2")
        app.create_page("Page 3", "page-3")
        app.create_page("Page 4", "page-4")
        app.create_page("Page 5", "page-5")

        pages = list(app.get_pages(desc=True))
        self.assertEqual(pages[0]["title"], "Page 5")
        self.assertEqual(pages[0]["slug"], "page-5")
        self.assertEqual(pages[1]["title"], "Page 4")
        self.assertEqual(pages[1]["slug"], "page-4")
        self.assertEqual(pages[2]["title"], "Page 3")
        self.assertEqual(pages[2]["slug"], "page-3")
        self.assertEqual(pages[3]["title"], "Page 2")
        self.assertEqual(pages[3]["slug"], "page-2")
        self.assertEqual(pages[4]["title"], "Welcome Visitors")
        self.assertEqual(pages[4]["slug"], "welcome-visitors")

        pages = list(app.get_pages(desc=True, limit=3))
        self.assertEqual(len(pages), 3)
        self.assertEqual(pages[0]["slug"], "page-5")
        self.assertEqual(pages[1]["slug"], "page-4")
        self.assertEqual(pages[2]["slug"], "page-3")

        pages = list(app.get_pages(desc=True, limit=3, lte=2))
        self.assertEqual(len(pages), 2)
        self.assertEqual(pages[0]["slug"], "page-2")
        self.assertEqual(pages[1]["slug"], "welcome-visitors")

        pages = list(app.get_pages(desc=True, lte=2))
        self.assertEqual(len(pages), 2)
        self.assertEqual(pages[0]["slug"], "page-2")
        self.assertEqual(pages[1]["slug"], "welcome-visitors")

        # Check we can't change the slug of a page to one
        # that is being used by another page.
        with self.assertRaises(SlugConflictError):
            app.update_slug("page-2", "page-3")

        # Check we can change the slug of a page to one
        # that was previously being used.
        app.update_slug("welcome-visitors", "welcome")

        page = app.get_page_details(slug="welcome")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["modified_by"], user_id)