Application 3 - Content management¶
This example demonstrates the use of namespaced IDs for both discovery of aggregate IDs and implementation of an application-wide rule (or “invariant”). This example also involves event-sourced logs, automatic snapshotting, and the use of the declarative syntax for domain models with non-trivial command methods.
This example also shows how to use a thread-specific context variable to set the value of a common event attribute without cluttering all the command methods with the same argument. In this example the ID of the user is recorded on each event, but the same technique can be used to set correlation and causation IDs on all events in a domain model.
Domain model¶
The Page class is an aggregate defined
to have a title,
a slug,
a body,
and a modified_by attribute.
It defines a aggregate event base class, Event,
which has a user_id attribute that is
defined with a data class field object. All the aggregate event classes of Page
inherit from its Event class. The
Event class defines a
apply() method that sets the
aggregate’s modified_by attribute
to the value of the events’s user_id attribute.
@dataclass
class Page(Aggregate):
title: str
"""The title of the page."""
slug: str
"""The slug of the page - used in URLs."""
body: str
"""The proper content of the page."""
modified_by: UUID | None = field(init=False)
"""The ID of the user who last modified the page."""
class Event(Aggregate.Event):
user_id: UUID | None = field(default_factory=user_id_cvar.get, init=False)
def apply(self, aggregate: Aggregate) -> None:
"""Sets the aggregate's `modified_by` attribute to the
value of the event's `user_id` attribute.
"""
cast("Page", aggregate).modified_by = self.user_id
@event("SlugUpdated")
def update_slug(self, slug: str) -> None:
self.slug = slug
@event("TitleUpdated")
def update_title(self, title: str) -> None:
self.title = title
def update_body(self, body: str) -> None:
diff = create_diff(old=self.body, new=body)
self._update_body(diff=diff)
class Created(Aggregate.Created, Event):
title: str
slug: str
body: str
class BodyUpdated(Event):
diff: str
@event(BodyUpdated)
def _update_body(self, diff: str) -> None:
new_body = apply_diff(old=self.body, diff=diff)
self.body = new_body
The user_id attribute is defined
as a dataclass field that is not included in __init__ methods (init=False),
and so it does not need to be matched by parameters in the aggregate command method signatures.
Instead, the data class field gets the event attribute value from a Python context variable
(default_factory=user_id_cvar.get). That is why none of the command method signatures
need to mention this as one of their arguments, but still all the aggregate events will carry
the ID of the user that executed the command.
The update_body() command method is a
non-trivial command methods, in that is does some work
on the command method arguments before triggering a domain event: it creates a “diff” of the
current version of the body and the
new version. For this reason, it is not decorated with the event decorator.
After creating a “diff” of the page page, it calls the “private” method
_update_body(), which is decorated with the
event decorator, and so triggers a
BodyUpdated event. The event is applied
to the page’s body by patching the
current value with the diff
that has been encapsulated by the event object.
The Slug assists the domain model by carrying
a page_id. This aggregate’s ID is a
version-5 UUID, that is a function of its name.
The Slug and Page
aggregates are used in combination to maintain editable pages of text, with editable titles, and with editable
“slugs” that can be used in page URLs. The slug’s name is taken from the URL, and used to discover the aggregate ID
of the page it is currently associated with.
@dataclass
class Slug(Aggregate):
name: str
page_id: UUID | None
class Event(Aggregate.Event):
pass
@staticmethod
def create_id(name: str) -> UUID:
return uuid5(NAMESPACE_URL, f"/slugs/{name}")
@event("PageUpdated")
def update_page(self, page_id: UUID | None) -> None:
self.page_id = page_id
A PageLogged event is also defined, and used to define a “page log”. The page log
can be used to discover all the pages that have been created.
class PageLogged(DomainEvent):
page_id: UUID
Application¶
The ContentManagement application encapsulates
the Page and Slug
aggregates. It defines methods to create a new page, to get the content of a page by its slug, and to update
the title, body, and slug of a page.
class ContentManagement(Application[UUID]):
env: ClassVar[dict[str, str]] = {"CONTENTMANAGEMENT_COMPRESSOR_TOPIC": "gzip"}
snapshotting_intervals: ClassVar[
dict[type[MutableOrImmutableAggregate[UUID]], int]
] = {Page: 5}
def __init__(self, env: EnvType | None = None) -> None:
super().__init__(env)
self.page_log: EventSourcedLog[PageLogged] = EventSourcedLog(
self.events, uuid5(NAMESPACE_URL, "/page_log"), PageLogged
)
def create_page(self, title: str, slug: str) -> int:
page = Page(title=title, slug=slug, body="")
page_logged = self.page_log.trigger_event(page_id=page.id)
index_entry = Slug(slug, page_id=page.id)
recordings = self.save(page, page_logged, index_entry)
return recordings[-1].notification.id
def get_page_by_slug(self, slug: str) -> PageDetailsType:
page = self._get_page_by_slug(slug)
return self._details_from_page(page)
def get_page_by_id(self, page_id: UUID) -> PageDetailsType:
page = self._get_page_by_id(page_id)
return self._details_from_page(page)
def _details_from_page(self, page: Page) -> PageDetailsType:
return {
"title": page.title,
"slug": page.slug,
"body": page.body,
"modified_by": page.modified_by,
}
def update_title(self, slug: str, title: str) -> int:
page = self._get_page_by_slug(slug)
page.update_title(title=title)
recordings = self.save(page)
return recordings[-1].notification.id
def update_slug(self, old_slug: str, new_slug: str) -> int:
page = self._get_page_by_slug(old_slug)
page.update_slug(new_slug)
old_slug_aggregate = self._get_slug(old_slug)
old_slug_aggregate.update_page(None)
try:
new_slug_aggregate = self._get_slug(new_slug)
except AggregateNotFoundError:
new_slug_aggregate = Slug(new_slug, page.id)
else:
if new_slug_aggregate.page_id is None:
new_slug_aggregate.update_page(page.id)
else:
raise SlugConflictError
recordings = self.save(page, old_slug_aggregate, new_slug_aggregate)
return recordings[-1].notification.id
def update_body(self, slug: str, body: str) -> int:
page = self._get_page_by_slug(slug)
page.update_body(body)
recordings = self.save(page)
return recordings[-1].notification.id
def _get_page_by_slug(self, slug: str) -> Page:
try:
index = self._get_slug(slug)
except AggregateNotFoundError:
raise PageNotFoundError(slug) from None
if index.page_id is None:
raise PageNotFoundError(slug)
page_id = index.page_id
return self._get_page_by_id(page_id)
def _get_page_by_id(self, page_id: UUID) -> Page:
return self.repository.get(page_id)
def _get_slug(self, slug: str) -> Slug:
return self.repository.get(Slug.create_id(slug))
def get_pages(
self,
*,
gt: int | None = None,
lte: int | None = None,
desc: bool = False,
limit: int | None = None,
) -> Iterator[PageDetailsType]:
for page_logged in self.page_log.get(gt=gt, lte=lte, desc=desc, limit=limit):
page = self._get_page_by_id(page_logged.page_id)
yield self._details_from_page(page)
To get to a page, a slug aggregate ID is computed from a slug string, and the slug aggregate is used to get the page aggregate ID.
To change a page’s slug, the slug aggregates for the old and the new slug strings are obtained,
the page ID is removed as the page ID of the old slug, and it is set as the page ID of the new slug.
The slugs are also used to implement an application-wide rule (or “invariant”) that a slug can be
used by only one page. If an attempt is made to change the slug of one page to a slug that is already
being used by another page, then a SlugConflictError
will be raised, and no changes made.
The application also demonstrates the “event-sourced log” recipe, by showing how all
the IDs of the Page aggregates can be listed, by logging
the page ID in a sequence of stored events, and then selecting from this sequence when presenting a list
of pages.
Please note, although all the Page aggregate events
have a user_id attribute,
none of the aggregate or application command methods mention a user_id argument. Instead the value
can be set in a context variable by callers of the application command methods, for example in an
interface or presentation layer after a user request has been authenticated (see the test case below).
The application also demonstrates the automatic snapshotting of aggregates at regular intervals. In
this case, the class attribute snapshotting_intervals
specifies that a page will be snapshotted every 5 events.
Diff and patch utilities¶
The create_diff() and apply_diff() functions use the Unix command line
tools diff and patch.
def create_diff(old: str, new: str) -> str:
return run("diff %s %s > %s", old, new)
def apply_diff(old: str, diff: str) -> str:
return run("patch -s %s %s -o %s", old, diff)
def run(cmd: str, a: str, b: str) -> str:
with TemporaryDirectory() as td:
a_path = Path(td) / "a"
b_path = Path(td) / "b"
c_path = Path(td) / "c"
with a_path.open("w") as a_file:
a_file.write(a)
with b_path.open("w") as b_file:
b_file.write(b)
os.system(cmd % (a_path, b_path, c_path)) # noqa: S605
with c_path.open() as c_file:
return c_file.read()
Test case¶
The TestContentManagement test case creates and updates pages
in various ways. It sets a user ID in user_id_cvar context
variable before application methods are called. At the end, all the page events are checked to make sure
they all have the user ID that was set in the context variable.
class TestContentManagement(TestCase):
def test(self) -> None:
# Construct application.
app = ContentManagement()
# Check the page doesn't exist.
with self.assertRaises(PageNotFoundError):
app.get_page_by_slug(slug="welcome")
# Check the list of pages is empty.
pages = list(app.get_pages())
self.assertEqual(len(pages), 0)
# Create a page.
user_id1 = uuid4()
user_id_cvar.set(user_id1)
app.create_page(title="Welcome", slug="welcome")
# Present page identified by the given slug.
page = app.get_page_by_slug(slug="welcome")
# Check we got a dict that has the given title and slug.
self.assertEqual(page["title"], "Welcome")
self.assertEqual(page["slug"], "welcome")
self.assertEqual(page["body"], "")
self.assertEqual(page["modified_by"], user_id1)
# Update the title.
user_id2 = uuid4()
user_id_cvar.set(user_id2)
app.update_title(slug="welcome", title="Welcome Visitors")
# Check the title was updated.
page = app.get_page_by_slug(slug="welcome")
self.assertEqual(page["title"], "Welcome Visitors")
self.assertEqual(page["slug"], "welcome")
self.assertEqual(page["body"], "")
self.assertEqual(page["modified_by"], user_id2)
# Update the slug.
user_id3 = uuid4()
user_id_cvar.set(user_id3)
app.update_slug(old_slug="welcome", new_slug="welcome-visitors")
# Check the slug was updated.
with self.assertRaises(PageNotFoundError):
app.get_page_by_slug(slug="welcome")
# Check we can get the page by the new slug.
page = app.get_page_by_slug(slug="welcome-visitors")
self.assertEqual(page["title"], "Welcome Visitors")
self.assertEqual(page["slug"], "welcome-visitors")
self.assertEqual(page["body"], "")
self.assertEqual(page["modified_by"], user_id3)
# Update the body.
user_id4 = uuid4()
user_id_cvar.set(user_id4)
app.update_body(slug="welcome-visitors", body="Welcome to my wiki!")
# Check the body was updated.
page = app.get_page_by_slug(slug="welcome-visitors")
self.assertEqual(page["title"], "Welcome Visitors")
self.assertEqual(page["slug"], "welcome-visitors")
self.assertEqual(page["body"], "Welcome to my wiki!")
self.assertEqual(page["modified_by"], user_id4)
# Check we are on version 4.
page_id = cast(
"Slug", app.repository.get(Slug.create_id("welcome-visitors"))
).page_id
assert page_id is not None
page_aggregate_v4: Page = app.repository.get(page_id)
self.assertEqual(page_aggregate_v4.version, 4)
# Check there are no snapshots.
assert app.snapshots is not None
self.assertFalse(len(list(app.snapshots.get(page_id))))
# Update the body (should trigger a snapshot).
user_id5 = uuid4()
user_id_cvar.set(user_id5)
app.update_body(
slug="welcome-visitors",
body="""
Welcome to this wiki!
This is a wiki about...
""",
)
# Check we are on version 5.
page_aggregate_v5: Page = app.repository.get(page_id)
self.assertEqual(page_aggregate_v5.version, 5)
# Check the body was updated.
page = app.get_page_by_slug(slug="welcome-visitors")
self.assertEqual(page["title"], "Welcome Visitors")
self.assertEqual(page["slug"], "welcome-visitors")
self.assertEqual(
page["body"],
"""
Welcome to this wiki!
This is a wiki about...
""",
)
self.assertEqual(page["modified_by"], user_id5)
# Check a snapshot was taken.
self.assertTrue(len(list(app.snapshots.get(page_id))))
# Check all the Page events have the correct user IDs.
user_ids = iter([user_id1, user_id2, user_id3, user_id4, user_id5])
for notification in NotificationLogReader(app.notification_log).read(start=1):
domain_event = app.mapper.to_domain_event(notification)
if isinstance(domain_event, Page.Event):
self.assertEqual(domain_event.user_id, next(user_ids))
# Create some more pages.
app.create_page("Page 2", "page-2")
app.create_page("Page 3", "page-3")
app.create_page("Page 4", "page-4")
app.create_page("Page 5", "page-5")
# List all the pages.
pages = list(app.get_pages(desc=True))
self.assertEqual(pages[0]["title"], "Page 5")
self.assertEqual(pages[0]["slug"], "page-5")
self.assertEqual(pages[1]["title"], "Page 4")
self.assertEqual(pages[1]["slug"], "page-4")
self.assertEqual(pages[2]["title"], "Page 3")
self.assertEqual(pages[2]["slug"], "page-3")
self.assertEqual(pages[3]["title"], "Page 2")
self.assertEqual(pages[3]["slug"], "page-2")
self.assertEqual(pages[4]["title"], "Welcome Visitors")
self.assertEqual(pages[4]["slug"], "welcome-visitors")
pages = list(app.get_pages(desc=True, limit=3))
self.assertEqual(len(pages), 3)
self.assertEqual(pages[0]["slug"], "page-5")
self.assertEqual(pages[1]["slug"], "page-4")
self.assertEqual(pages[2]["slug"], "page-3")
pages = list(app.get_pages(desc=True, limit=3, lte=2))
self.assertEqual(len(pages), 2)
self.assertEqual(pages[0]["slug"], "page-2")
self.assertEqual(pages[1]["slug"], "welcome-visitors")
pages = list(app.get_pages(desc=True, lte=2))
self.assertEqual(len(pages), 2)
self.assertEqual(pages[0]["slug"], "page-2")
self.assertEqual(pages[1]["slug"], "welcome-visitors")
# Check we can't change the slug of a page to one
# that is being used by another page.
app.get_page_by_slug(slug="page-3")
with self.assertRaises(SlugConflictError):
app.update_slug("page-2", "page-3")
# Check we can change the slug of a page to one
# that was previously being used by another page.
app.get_page_by_slug(slug="welcome-visitors")
with self.assertRaises(PageNotFoundError):
app.get_page_by_slug(slug="welcome")
slug: Slug = app.repository.get(Slug.create_id("welcome"))
self.assertIsNone(slug.page_id)
app.update_slug("welcome-visitors", "welcome")
page = app.get_page_by_slug(slug="welcome")
self.assertEqual(page["title"], "Welcome Visitors")
Code reference¶
- examples.contentmanagement.domainmodel.user_id_cvar: ContextVar[UUID | None] = <ContextVar name='user_id' default=None>¶
Context variable holding a user ID for the current thread.
- class examples.contentmanagement.domainmodel.Page(*args: 'Any', **kwargs: 'Any')[source]¶
Bases:
Aggregate- title: str¶
The title of the page.
- slug: str¶
The slug of the page - used in URLs.
- body: str¶
The proper content of the page.
- modified_by: UUID | None¶
The ID of the user who last modified the page.
- class Event(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime')[source]¶
Bases:
Event- user_id: UUID | None¶
- apply(aggregate: Aggregate) None[source]¶
Sets the aggregate’s modified_by attribute to the value of the event’s user_id attribute.
- originator_id_type¶
alias of
UUID
- class Created(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime', originator_topic: 'str')[source]¶
-
- title: str¶
- slug: str¶
- body: str¶
- originator_id_type¶
alias of
UUID
- class BodyUpdated(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime', diff: 'str')[source]¶
Bases:
DecoratedFuncCaller,BodyUpdated
- class SlugUpdated(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime')¶
Bases:
DecoratedFuncCaller,Event- slug: str¶
- class TitleUpdated(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime')¶
Bases:
DecoratedFuncCaller,Event- title: str¶
- class examples.contentmanagement.domainmodel.Slug(*args: 'Any', **kwargs: 'Any')[source]¶
Bases:
Aggregate- name: str¶
- page_id: UUID | None¶
- class Created(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime', originator_topic: 'str')¶
-
- originator_id_type¶
alias of
UUID
- name: str¶
- page_id: UUID | None¶
- class Event(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime')[source]¶
Bases:
Event- originator_id_type¶
alias of
UUID
- timestamp: datetime¶
Timezone-aware
datetimeobject representing when an event occurred.
- originator_id: TAggregateID¶
UUID identifying an aggregate to which the event belongs.
- originator_version: int¶
Integer identifying the version of the aggregate when the event occurred.
- class PageUpdated(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime')¶
Bases:
DecoratedFuncCaller,Event- page_id: UUID | None¶
- class examples.contentmanagement.domainmodel.PageLogged(self, originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime')[source]¶
Bases:
DomainEvent- page_id: UUID¶
- class examples.contentmanagement.application.ContentManagement(env: EnvType | None = None)[source]¶
Bases:
Application[UUID]- env: ClassVar[dict[str, str]] = {'CONTENTMANAGEMENT_COMPRESSOR_TOPIC': 'gzip'}¶
- snapshotting_intervals: ClassVar[dict[type[MutableOrImmutableAggregate[UUID]], int]] = {<class 'examples.contentmanagement.domainmodel.Page'>: 5}¶
- get_pages(*, gt: int | None = None, lte: int | None = None, desc: bool = False, limit: int | None = None) Iterator[PageDetailsType][source]¶
- name = 'ContentManagement'¶
- exception examples.contentmanagement.application.PageNotFoundError[source]¶
Bases:
ExceptionRaised when a page is not found.