Application 3 - Content management¶
This example demonstrates the use of namespaced IDs for both discovery of aggregate IDs and implementation of an application-wide rule (or “invariant”). This example also involves event-sourced logs, automatic snapshotting, and the use of the declarative syntax for domain models with non-trivial command methods.
This example also shows how to use a thread-specific context variable to set the value of a common event attribute without cluttering all the command methods with the same argument. In this example the ID of the user is recorded on each event, but the same technique can be used to set correlation and causation IDs on all events in a domain model.
Application¶
The ContentManagementApplication
class defines methods to create a new page, to get
the details for a page by its slug, to update the title of a page referenced by its slug,
to update the body of a page, and to change the page slug.
To get to a page, the slug is used to identify an index, and the index is used to get the
page ID, and then the page ID is used to get the body and title of the page. To change a
slug, the index objects for the old and the new are identified, the page ID is removed as
the reference from the old index and set as the reference on the new index. The indexes
are also used to implement an application-wide rule (or “invariant”) that a slug can be
used by only one page, such that if an attempt is made to change the slug of one page
to a slug that is already being used by another page, then a SlugConflictError
will be raised, and no changes made.
The application also demonstrates the “event-sourced log” recipe, by showing how all
the IDs of the Page
aggregates can be listed, by logging the page ID in a sequence
of stored events, and then selecting from this sequence when presenting a list of pages.
Please note, although the domain model (see below) involves a user_id
event attribute,
none of the application command methods mention a user_id
argument. Instead the value
is set in a context variable by callers of the application command methods (see the test below).
from typing import Any, Dict, Iterator, Optional, Union, cast
from uuid import NAMESPACE_URL, UUID, uuid5
from eventsourcing.application import AggregateNotFound, Application, EventSourcedLog
from eventsourcing.examples.contentmanagement.domainmodel import Index, Page, PageLogged
from eventsourcing.utils import EnvType
# Plain-dict representation of a page, as built by _details_from_page():
# the keys are "title", "slug", "body" and "modified_by".
PageDetailsType = Dict[str, Union[str, Any]]
class ContentManagementApplication(Application):
    """
    Application for creating, listing, and editing pages identified by slugs.

    Each slug has an Index aggregate whose ID is derived from the slug and
    whose ``ref`` is the ID of the page currently using that slug (or None
    when the slug is not in use). The IDs of all created pages are also
    recorded in an event-sourced page log, which is used to list the pages.
    """

    # Use gzip as the compressor.
    env = {"COMPRESSOR_TOPIC": "gzip"}
    # Automatic snapshotting interval for Page aggregates.
    snapshotting_intervals = {Page: 5}

    def __init__(self, env: Optional[EnvType] = None) -> None:
        """
        Constructs the application and its log of PageLogged events.
        """
        super().__init__(env)
        # The log has a fixed, namespaced ID so it can always be found again.
        self.page_log: EventSourcedLog[PageLogged] = EventSourcedLog(
            self.events, uuid5(NAMESPACE_URL, "/page_log"), PageLogged
        )

    def create_page(self, title: str, slug: str) -> None:
        """
        Creates a page with the given title and slug, logs the page ID, and
        creates an index entry that refers from the slug to the new page.
        """
        # NOTE(review): nothing here checks whether an index already exists
        # for this slug; presumably saving a second Index with the same ID is
        # rejected when the events are recorded - confirm.
        page = Page(title=title, slug=slug)
        page_logged = self.page_log.trigger_event(page_id=page.id)
        index_entry = Index(slug, ref=page.id)
        # Save everything together in one call.
        self.save(page, page_logged, index_entry)

    def get_page_details(self, slug: str) -> PageDetailsType:
        """
        Returns the details of the page currently using the given slug.

        Raises PageNotFound if the slug is not being used by a page.
        """
        page = self._get_page_by_slug(slug)
        return self._details_from_page(page)

    def _details_from_page(self, page: Page) -> PageDetailsType:
        """
        Returns a dict with the title, slug, body and modifier of a page.
        """
        return {
            "title": page.title,
            "slug": page.slug,
            "body": page.body,
            "modified_by": page.modified_by,
        }

    def update_title(self, slug: str, title: str) -> None:
        """
        Changes the title of the page currently using the given slug.

        Raises PageNotFound if the slug is not being used by a page.
        """
        page = self._get_page_by_slug(slug)
        page.update_title(title=title)
        self.save(page)

    def update_slug(self, old_slug: str, new_slug: str) -> None:
        """
        Changes the slug of a page from ``old_slug`` to ``new_slug``.

        The old index is made to refer to nothing, and the new index (created
        if it doesn't exist) is made to refer to the page. Nothing is saved
        unless all of these changes can be made.

        Changing a page's slug to the slug it already has is a no-op.

        Raises PageNotFound if ``old_slug`` is not being used by a page, and
        SlugConflictError if ``new_slug`` is being used by another page.
        """
        page = self._get_page_by_slug(old_slug)
        if new_slug == old_slug:
            # The page already uses this slug. Falling through would look up
            # the same index a second time and (unless the repository hands
            # back the very object modified below) find it still referring
            # to this page, wrongly reporting a conflict with "another" page.
            return
        page.update_slug(new_slug)
        old_index = self._get_index(old_slug)
        old_index.update_ref(None)
        try:
            new_index = self._get_index(new_slug)
        except AggregateNotFound:
            # The new slug has never been used: create its index.
            new_index = Index(new_slug, page.id)
        else:
            if new_index.ref is None:
                # The new slug was used previously, but has been released.
                new_index.update_ref(page.id)
            else:
                # Raising before save() means none of the changes are stored.
                raise SlugConflictError()
        self.save(page, old_index, new_index)

    def update_body(self, slug: str, body: str) -> None:
        """
        Changes the body of the page currently using the given slug.

        Raises PageNotFound if the slug is not being used by a page.
        """
        page = self._get_page_by_slug(slug)
        page.update_body(body)
        self.save(page)

    def _get_page_by_slug(self, slug: str) -> Page:
        """
        Returns the page that the index for the given slug refers to.

        Raises PageNotFound if there is no index for the slug, or if the
        index doesn't currently refer to a page.
        """
        try:
            index = self._get_index(slug)
        except AggregateNotFound as exc:
            # Chain explicitly so the traceback shows the underlying cause.
            raise PageNotFound(slug) from exc
        if index.ref is None:
            raise PageNotFound(slug)
        return self._get_page_by_id(index.ref)

    def _get_page_by_id(self, page_id: UUID) -> Page:
        """
        Returns the Page aggregate that has the given ID.
        """
        return cast(Page, self.repository.get(page_id))

    def _get_index(self, slug: str) -> Index:
        """
        Returns the Index aggregate whose ID is derived from the given slug.
        """
        return cast(Index, self.repository.get(Index.create_id(slug)))

    def get_pages(
        self,
        gt: Optional[int] = None,
        lte: Optional[int] = None,
        desc: bool = False,
        limit: Optional[int] = None,
    ) -> Iterator[PageDetailsType]:
        """
        Yields the details of pages selected from the page log.

        The arguments are passed straight to the page log's ``get()`` method
        to select which logged events are used.
        """
        for page_logged in self.page_log.get(gt, lte, desc, limit):
            page = self._get_page_by_id(page_logged.page_id)
            yield self._details_from_page(page)
class PageNotFound(Exception):
    """
    Raised when no page can be found for a given slug.
    """
class SlugConflictError(Exception):
    """
    Raised when changing a page's slug to one that another page is using.
    """
Domain model¶
In the domain model below, the `Page` aggregate has a base class `Event`, which is defined with a `user_id` dataclass field. This base aggregate event class is inherited by all of its concrete aggregate event classes.
The `user_id` field is defined not to be included in the `__init__` method of the aggregate’s event classes (`init=False`), and so it does not need to be matched by parameters in the aggregate command method signatures. Instead, this field gets the event attribute value from a Python context variable (`default_factory=user_id_cvar.get`).
The `update_body()` command method does some work on the command arguments before the `BodyUpdated` event is triggered. It creates a “diff” of the current version of the `body` and the new version. It then triggers an event, which contains the diff, by calling `_update_body()`. The event is applied to the `body` by patching the current version of the `body` with the `diff` that has been encapsulated by the event object.
The `Index` aggregate has a version-5 UUID which is a function of a `slug`.
The `Index` and `Page` aggregates are used in combination to maintain editable pages of text, with editable titles, and with editable “slugs” that can be used in page URLs.
A `PageLogged` event is also defined, and used to define a “page log” in the application.
from __future__ import annotations
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional, cast
from uuid import NAMESPACE_URL, UUID, uuid5
from eventsourcing.domain import Aggregate, DomainEvent, event
from eventsourcing.examples.contentmanagement.utils import apply_patch, create_diff
# Context variable holding the ID of the current user (None by default).
# Callers set it before calling application command methods, and Page events
# read it as the default value of their user_id attribute.
user_id_cvar: ContextVar[Optional[UUID]] = ContextVar("user_id", default=None)
@dataclass
class Page(Aggregate):
    """
    A page of text that has a title, a slug, and a body.
    """

    title: str
    slug: str
    body: str = ""
    # Not a constructor argument: assigned by Page.Event.apply() from the
    # user_id of the event being applied.
    modified_by: Optional[UUID] = field(default=None, init=False)

    class Event(Aggregate.Event):
        """
        Base class of Page events, carrying the ID of the acting user.
        """

        # Excluded from __init__ (init=False); the default value is read
        # from the user_id_cvar context variable.
        user_id: Optional[UUID] = field(default_factory=user_id_cvar.get, init=False)

        def apply(self, aggregate: Aggregate) -> None:
            """
            Records this event's user ID as the page's modifier.
            """
            page = cast(Page, aggregate)
            page.modified_by = self.user_id

    @event("SlugUpdated")
    def update_slug(self, slug: str) -> None:
        """
        Sets the slug of the page.
        """
        self.slug = slug

    @event("TitleUpdated")
    def update_title(self, title: str) -> None:
        """
        Sets the title of the page.
        """
        self.title = title

    def update_body(self, body: str) -> None:
        """
        Changes the body of the page to the given text.

        Only the diff between the current body and the new body is passed
        on to the event-triggering method, rather than the whole new body.
        """
        diff = create_diff(old=self.body, new=body)
        self._update_body(diff)

    @event("BodyUpdated")
    def _update_body(self, diff: str) -> None:
        """
        Patches the current body with the given diff.
        """
        patched_body = apply_patch(old=self.body, diff=diff)
        self.body = patched_body
@dataclass
class Index(Aggregate):
    """
    Refers from a slug to the ID of the page using that slug, if any.
    """

    slug: str
    ref: Optional[UUID]

    class Event(Aggregate.Event):
        """
        Base class of Index events.
        """

    @staticmethod
    def create_id(slug: str) -> UUID:
        """
        Returns the version-5 UUID of the index for the given slug.
        """
        name = f"/slugs/{slug}"
        return uuid5(NAMESPACE_URL, name)

    @event("RefChanged")
    def update_ref(self, ref: Optional[UUID]) -> None:
        """
        Sets the page ID the index refers to (None for no page).
        """
        self.ref = ref
class PageLogged(DomainEvent):
    """
    Domain event that carries the ID of a page, for use in a page log.
    """

    page_id: UUID
The `create_diff()` and `apply_patch()` functions use the Unix command-line tools `diff` and `patch`, respectively.
import os
from tempfile import TemporaryDirectory
def create_diff(old: str, new: str) -> str:
    """
    Returns the output of the ``diff`` command comparing ``old`` with ``new``.
    """
    # Placeholders: file holding 'old', file holding 'new', output file.
    diff_command = "diff %s %s > %s"
    return run(diff_command, old, new)
def apply_patch(old: str, diff: str) -> str:
    """
    Returns the text written by the ``patch`` command when ``diff`` is
    applied to ``old``.
    """
    # Placeholders: file holding 'old', file holding 'diff', output file.
    patch_command = "patch -s %s %s -o %s"
    return run(patch_command, old, diff)
def run(cmd: str, a: str, b: str) -> str:
    """
    Runs a shell command on two temporary input files and returns the
    content of the output file written by the command.

    The strings ``a`` and ``b`` are written to files in a fresh temporary
    directory. The ``cmd`` template must have three ``%s`` placeholders,
    which are filled, in order, with the (shell-quoted) paths of the file
    holding ``a``, the file holding ``b``, and the output file.

    Raises FileNotFoundError if the command doesn't create the output file.
    """
    # Imported locally so this function doesn't depend on new module imports.
    from shlex import quote

    with TemporaryDirectory() as td:
        a_path = os.path.join(td, "a")
        b_path = os.path.join(td, "b")
        c_path = os.path.join(td, "c")
        # Use an explicit encoding so results don't depend on the locale.
        with open(a_path, "w", encoding="utf-8") as a_file:
            a_file.write(a)
        with open(b_path, "w", encoding="utf-8") as b_file:
            b_file.write(b)
        # Quote the paths: the temporary directory may be located under a
        # path containing spaces or other characters special to the shell.
        # The exit status is not treated as an error here, because "diff"
        # exits with a non-zero status whenever its inputs differ.
        os.system(cmd % (quote(a_path), quote(b_path), quote(c_path)))
        with open(c_path, "r", encoding="utf-8") as c_file:
            return c_file.read()
Test case¶
The test case below sets a user ID in the context variable. A page is created and updated in various ways. At the end, all the page events are checked to make sure they all have the user ID that was set in the context variable.
from typing import cast
from unittest import TestCase
from uuid import uuid4
from eventsourcing.examples.contentmanagement.application import (
ContentManagementApplication,
PageNotFound,
SlugConflictError,
)
from eventsourcing.examples.contentmanagement.domainmodel import (
Index,
Page,
user_id_cvar,
)
from eventsourcing.system import NotificationLogReader
class TestContentManagement(TestCase):
    """
    Exercises the content management application from end to end.
    """

    def test(self) -> None:
        """
        Creates, updates, lists and renames pages, checking along the way
        that the user ID set in the context variable is recorded on all the
        Page events and presented as the 'modified_by' value of the page.
        """
        # Set user_id context variable, and restore its previous value when
        # the test ends so the user ID doesn't leak into other tests.
        user_id = uuid4()
        token = user_id_cvar.set(user_id)
        self.addCleanup(user_id_cvar.reset, token)

        # Construct application.
        app = ContentManagementApplication()

        # Check the page doesn't exist.
        with self.assertRaises(PageNotFound):
            app.get_page_details(slug="welcome")

        # Check the list of pages is empty.
        pages = list(app.get_pages())
        self.assertEqual(len(pages), 0)

        # Create a page.
        app.create_page(title="Welcome", slug="welcome")

        # Present page identified by the given slug.
        page = app.get_page_details(slug="welcome")

        # Check we got a dict that has the given title and slug.
        self.assertEqual(page["title"], "Welcome")
        self.assertEqual(page["slug"], "welcome")
        self.assertEqual(page["body"], "")
        self.assertEqual(page["modified_by"], user_id)

        # Update the title.
        app.update_title(slug="welcome", title="Welcome Visitors")

        # Check the title was updated.
        page = app.get_page_details(slug="welcome")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["modified_by"], user_id)

        # Update the slug.
        app.update_slug(old_slug="welcome", new_slug="welcome-visitors")

        # Check the index was updated.
        with self.assertRaises(PageNotFound):
            app.get_page_details(slug="welcome")

        # Check we can get the page by the new slug.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["slug"], "welcome-visitors")

        # Update the body.
        app.update_body(slug="welcome-visitors", body="Welcome to my wiki")

        # Check the body was updated.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(page["body"], "Welcome to my wiki")

        # Update the body.
        app.update_body(slug="welcome-visitors", body="Welcome to this wiki")

        # Check the body was updated.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(page["body"], "Welcome to this wiki")

        # Update the body.
        app.update_body(
            slug="welcome-visitors",
            body="""
Welcome to this wiki!
This is a wiki about...
""",
        )

        # Check the body was updated.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(
            page["body"],
            """
Welcome to this wiki!
This is a wiki about...
""",
        )

        # Check all the Page events have the user_id.
        for notification in NotificationLogReader(app.notification_log).read(start=1):
            domain_event = app.mapper.to_domain_event(notification)
            if isinstance(domain_event, Page.Event):
                self.assertEqual(domain_event.user_id, user_id)

        # Change user_id context variable (the cleanup registered above
        # still restores the value the variable had before the test).
        user_id = uuid4()
        user_id_cvar.set(user_id)

        # Update the body.
        app.update_body(
            slug="welcome-visitors",
            body="""
Welcome to this wiki!
This is a wiki about us!
""",
        )

        # Check 'modified_by' changed.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["modified_by"], user_id)

        # Check a snapshot was created by now. The bare asserts narrow the
        # optional values before they are used.
        assert app.snapshots
        index = cast(Index, app.repository.get(Index.create_id("welcome-visitors")))
        assert index.ref
        self.assertTrue(len(list(app.snapshots.get(index.ref))))

        # Create some more pages and list all the pages.
        app.create_page("Page 2", "page-2")
        app.create_page("Page 3", "page-3")
        app.create_page("Page 4", "page-4")
        app.create_page("Page 5", "page-5")

        pages = list(app.get_pages(desc=True))
        self.assertEqual(pages[0]["title"], "Page 5")
        self.assertEqual(pages[0]["slug"], "page-5")
        self.assertEqual(pages[1]["title"], "Page 4")
        self.assertEqual(pages[1]["slug"], "page-4")
        self.assertEqual(pages[2]["title"], "Page 3")
        self.assertEqual(pages[2]["slug"], "page-3")
        self.assertEqual(pages[3]["title"], "Page 2")
        self.assertEqual(pages[3]["slug"], "page-2")
        self.assertEqual(pages[4]["title"], "Welcome Visitors")
        self.assertEqual(pages[4]["slug"], "welcome-visitors")

        pages = list(app.get_pages(desc=True, limit=3))
        self.assertEqual(len(pages), 3)
        self.assertEqual(pages[0]["slug"], "page-5")
        self.assertEqual(pages[1]["slug"], "page-4")
        self.assertEqual(pages[2]["slug"], "page-3")

        pages = list(app.get_pages(desc=True, limit=3, lte=2))
        self.assertEqual(len(pages), 2)
        self.assertEqual(pages[0]["slug"], "page-2")
        self.assertEqual(pages[1]["slug"], "welcome-visitors")

        pages = list(app.get_pages(desc=True, lte=2))
        self.assertEqual(len(pages), 2)
        self.assertEqual(pages[0]["slug"], "page-2")
        self.assertEqual(pages[1]["slug"], "welcome-visitors")

        # Check we can't change the slug of a page to one
        # that is being used by another page.
        with self.assertRaises(SlugConflictError):
            app.update_slug("page-2", "page-3")

        # Check we can change the slug of a page to one
        # that was previously being used.
        app.update_slug("welcome-visitors", "welcome")

        page = app.get_page_details(slug="welcome")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["modified_by"], user_id)