Application 3 - Content management¶
This example demonstrates the use of version-5 UUIDs for both discovery of aggregate IDs and also to implement an application-wide rule (or “invariant”), the use of the declarative syntax for domain models with a “non-trivial” command method, automatic snapshotting, automatic setting of a common attribute on all events without needing to mention this attribute in the command methods, and a recipe for an event-sourced log.
Application¶
The application provides methods to create a new page, get the details for a page by its
slug, update the title of a page referenced by its slug, update the body of a page,
and change the page slug. Please note that none of these methods mention a user_id
argument. To get to a page, the slug is used to identify an index, and the index is used
to get the page ID, and then the page ID is used to get the body and title of
the page. To change a slug, the index objects for the old and the new are identified,
the page ID is removed as the reference from the old index and set as the reference
on the new index. The indexes are also used to implement a application-wide rule (or
“invariant”) that a slug can be used by only one page, such that if an attempt is made
to change the slug of one page to a slug that is already being used by another page,
then a SlugConflictError
will be raised, and no changes made.
The application also demonstrates the “event-sourced log” recipe, by showing how all the
IDs of the Page
aggregates can be listed, by logging the IDs when a new page is
created, in a sequence of stored events, and then selecting from this sequence when
presenting a list of pages.
from typing import Any, Dict, Iterator, Optional, Union, cast
from uuid import NAMESPACE_URL, UUID, uuid5
from eventsourcing.application import AggregateNotFound, Application, EventSourcedLog
from eventsourcing.examples.wiki.domainmodel import Index, Page, PageLogged
from eventsourcing.utils import EnvType
PageDetailsType = Dict[str, Union[str, Any]]
class WikiApplication(Application):
env = {"COMPRESSOR_TOPIC": "gzip"}
snapshotting_intervals = {Page: 5}
def __init__(self, env: Optional[EnvType] = None) -> None:
super().__init__(env)
self.page_log: EventSourcedLog[PageLogged] = EventSourcedLog(
self.events, uuid5(NAMESPACE_URL, "/page_log"), PageLogged
)
def create_page(self, title: str, slug: str) -> None:
page = Page(title=title, slug=slug)
page_logged = self.page_log.trigger_event(page_id=page.id)
index_entry = Index(slug, ref=page.id)
self.save(page, page_logged, index_entry)
def get_page_details(self, slug: str) -> PageDetailsType:
page = self._get_page_by_slug(slug)
return self._details_from_page(page)
def _details_from_page(self, page: Page) -> PageDetailsType:
return {
"title": page.title,
"slug": page.slug,
"body": page.body,
"modified_by": page.modified_by,
}
def update_title(self, slug: str, title: str) -> None:
page = self._get_page_by_slug(slug)
page.update_title(title=title)
self.save(page)
def update_slug(self, old_slug: str, new_slug: str) -> None:
page = self._get_page_by_slug(old_slug)
page.update_slug(new_slug)
old_index = self._get_index(old_slug)
old_index.update_ref(None)
try:
new_index = self._get_index(new_slug)
except AggregateNotFound:
new_index = Index(new_slug, page.id)
else:
if new_index.ref is None:
new_index.update_ref(page.id)
else:
raise SlugConflictError()
self.save(page, old_index, new_index)
def update_body(self, slug: str, body: str) -> None:
page = self._get_page_by_slug(slug)
page.update_body(body)
self.save(page)
def _get_page_by_slug(self, slug: str) -> Page:
try:
index = self._get_index(slug)
except AggregateNotFound:
raise PageNotFound(slug)
if index.ref is None:
raise PageNotFound(slug)
page_id = index.ref
return self._get_page_by_id(page_id)
def _get_page_by_id(self, page_id: UUID) -> Page:
return cast(Page, self.repository.get(page_id))
def _get_index(self, slug: str) -> Index:
return cast(Index, self.repository.get(Index.create_id(slug)))
def get_pages(
self,
gt: Optional[int] = None,
lte: Optional[int] = None,
desc: bool = False,
limit: Optional[int] = None,
) -> Iterator[PageDetailsType]:
for page_logged in self.page_log.get(gt, lte, desc, limit):
page = self._get_page_by_id(page_logged.page_id)
yield self._details_from_page(page)
class PageNotFound(Exception):
"""
Raised when a page is not found.
"""
class SlugConflictError(Exception):
"""
Raised when updating a page to a slug used by another page.
"""
Domain model¶
In the domain model below, the Page
aggregate has a base class Event
which is defined with a user_id
dataclass field
that is defined not
to be included in its __init__
method, and so does not need to be matched
by parameters in the command method signatures. It has a default factory which
gets the event attribute value from a Python context variable. This base aggregate
event class is inherited by all its concrete aggregate event classes.
The update_body()
command method does a “non-trival” amount of work
before the BodyUpdated
event is triggered, by creating a “diff” of the
current version of the body
and the new version. It then triggers an event,
which contains the diff. The event is applied to the body
by “patching” the
current version of the body
with this diff.
The Index
aggregate has a version-5 UUID which is a function of a slug
.
The Index
and Page
aggregates are used in combination to maintain editable
pages of text, with editable titles, and with editable “slugs” that can be used in page URLs.
A PageLogged
event is also defined, and used to define a “page log” in the application.
from __future__ import annotations
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional, cast
from uuid import NAMESPACE_URL, UUID, uuid5
from eventsourcing.domain import Aggregate, LogEvent, event
from eventsourcing.examples.wiki.utils import apply_patch, create_diff
user_id_cvar: ContextVar[Optional[UUID]] = ContextVar("user_id", default=None)
@dataclass
class Page(Aggregate):
title: str
slug: str
body: str = ""
modified_by: Optional[UUID] = field(default=None, init=False)
class Event(Aggregate.Event):
user_id: Optional[UUID] = field(default_factory=user_id_cvar.get, init=False)
def apply(self, aggregate: Aggregate) -> None:
cast(Page, aggregate).modified_by = self.user_id
@event("SlugUpdated")
def update_slug(self, slug: str) -> None:
self.slug = slug
@event("TitleUpdated")
def update_title(self, title: str) -> None:
self.title = title
def update_body(self, body: str) -> None:
self._update_body(create_diff(old=self.body, new=body))
@event("BodyUpdated")
def _update_body(self, diff: str) -> None:
self.body = apply_patch(old=self.body, diff=diff)
@dataclass
class Index(Aggregate):
slug: str
ref: Optional[UUID]
class Event(Aggregate.Event):
pass
@staticmethod
def create_id(slug: str) -> UUID:
return uuid5(NAMESPACE_URL, f"/slugs/{slug}")
@event("RefChanged")
def update_ref(self, ref: Optional[UUID]) -> None:
self.ref = ref
class PageLogged(LogEvent):
page_id: UUID
The create_diff()
and apply_patch()
functions use the Unix command line
tools patch
and diff
.
import os
from tempfile import TemporaryDirectory
def create_diff(old: str, new: str) -> str:
return run("diff %s %s > %s", old, new)
def apply_patch(old: str, diff: str) -> str:
return run("patch -s %s %s -o %s", old, diff)
def run(cmd: str, a: str, b: str) -> str:
with TemporaryDirectory() as td:
a_path = os.path.join(td, "a")
b_path = os.path.join(td, "b")
c_path = os.path.join(td, "c")
with open(a_path, "w") as a_file:
a_file.write(a)
with open(b_path, "w") as b_file:
b_file.write(b)
os.system(cmd % (a_path, b_path, c_path))
with open(c_path, "r") as c_file:
return c_file.read()
Test case¶
The test case below sets a user ID in the context variable. A page is created and updated in various ways. At the end, all the page events are checked to make sure they all have the user ID that was set in the context variable.
from typing import cast
from unittest import TestCase
from uuid import uuid4
from eventsourcing.examples.wiki.application import (
PageNotFound,
SlugConflictError,
WikiApplication,
)
from eventsourcing.examples.wiki.domainmodel import Index, Page, user_id_cvar
from eventsourcing.system import NotificationLogReader
class TestWiki(TestCase):
def test(self) -> None:
# Set user_id context variable.
user_id = uuid4()
user_id_cvar.set(user_id)
# Construct application.
app = WikiApplication()
# Check the page doesn't exist.
with self.assertRaises(PageNotFound):
app.get_page_details(slug="welcome")
# Check the list of pages is empty.
pages = list(app.get_pages())
self.assertEqual(len(pages), 0)
# Create a page.
app.create_page(title="Welcome", slug="welcome")
# Present page identified by the given slug.
page = app.get_page_details(slug="welcome")
# Check we got a dict that has the given title and slug.
self.assertEqual(page["title"], "Welcome")
self.assertEqual(page["slug"], "welcome")
self.assertEqual(page["body"], "")
self.assertEqual(page["modified_by"], user_id)
# Update the title.
app.update_title(slug="welcome", title="Welcome Visitors")
# Check the title was updated.
page = app.get_page_details(slug="welcome")
self.assertEqual(page["title"], "Welcome Visitors")
self.assertEqual(page["modified_by"], user_id)
# Update the slug.
app.update_slug(old_slug="welcome", new_slug="welcome-visitors")
# Check the index was updated.
with self.assertRaises(PageNotFound):
app.get_page_details(slug="welcome")
# Check we can get the page by the new slug.
page = app.get_page_details(slug="welcome-visitors")
self.assertEqual(page["title"], "Welcome Visitors")
self.assertEqual(page["slug"], "welcome-visitors")
# Update the body.
app.update_body(slug="welcome-visitors", body="Welcome to my wiki")
# Check the body was updated.
page = app.get_page_details(slug="welcome-visitors")
self.assertEqual(page["body"], "Welcome to my wiki")
# Update the body.
app.update_body(slug="welcome-visitors", body="Welcome to this wiki")
# Check the body was updated.
page = app.get_page_details(slug="welcome-visitors")
self.assertEqual(page["body"], "Welcome to this wiki")
# Update the body.
app.update_body(
slug="welcome-visitors",
body="""
Welcome to this wiki!
This is a wiki about...
""",
)
# Check the body was updated.
page = app.get_page_details(slug="welcome-visitors")
self.assertEqual(
page["body"],
"""
Welcome to this wiki!
This is a wiki about...
""",
)
# Check all the Page events have the user_id.
for notification in NotificationLogReader(app.notification_log).read(start=1):
domain_event = app.mapper.to_domain_event(notification)
if isinstance(domain_event, Page.Event):
self.assertEqual(domain_event.user_id, user_id)
# Change user_id context variable.
user_id = uuid4()
user_id_cvar.set(user_id)
# Update the body.
app.update_body(
slug="welcome-visitors",
body="""
Welcome to this wiki!
This is a wiki about us!
""",
)
# Check 'modified_by' changed.
page = app.get_page_details(slug="welcome-visitors")
self.assertEqual(page["title"], "Welcome Visitors")
self.assertEqual(page["modified_by"], user_id)
# Check a snapshot was created by now.
assert app.snapshots
index = cast(Index, app.repository.get(Index.create_id("welcome-visitors")))
assert index.ref
self.assertTrue(len(list(app.snapshots.get(index.ref))))
# Create some more pages and list all the pages.
app.create_page("Page 2", "page-2")
app.create_page("Page 3", "page-3")
app.create_page("Page 4", "page-4")
app.create_page("Page 5", "page-5")
pages = list(app.get_pages(desc=True))
self.assertEqual(pages[0]["title"], "Page 5")
self.assertEqual(pages[0]["slug"], "page-5")
self.assertEqual(pages[1]["title"], "Page 4")
self.assertEqual(pages[1]["slug"], "page-4")
self.assertEqual(pages[2]["title"], "Page 3")
self.assertEqual(pages[2]["slug"], "page-3")
self.assertEqual(pages[3]["title"], "Page 2")
self.assertEqual(pages[3]["slug"], "page-2")
self.assertEqual(pages[4]["title"], "Welcome Visitors")
self.assertEqual(pages[4]["slug"], "welcome-visitors")
pages = list(app.get_pages(desc=True, limit=3))
self.assertEqual(len(pages), 3)
self.assertEqual(pages[0]["slug"], "page-5")
self.assertEqual(pages[1]["slug"], "page-4")
self.assertEqual(pages[2]["slug"], "page-3")
pages = list(app.get_pages(desc=True, limit=3, lte=2))
self.assertEqual(len(pages), 2)
self.assertEqual(pages[0]["slug"], "page-2")
self.assertEqual(pages[1]["slug"], "welcome-visitors")
pages = list(app.get_pages(desc=True, lte=2))
self.assertEqual(len(pages), 2)
self.assertEqual(pages[0]["slug"], "page-2")
self.assertEqual(pages[1]["slug"], "welcome-visitors")
# Check we can't change the slug of a page to one
# that is being used by another page.
with self.assertRaises(SlugConflictError):
app.update_slug("page-2", "page-3")
# Check we can change the slug of a page to one
# that was previously being used.
app.update_slug("welcome-visitors", "welcome")
page = app.get_page_details(slug="welcome")
self.assertEqual(page["title"], "Welcome Visitors")
self.assertEqual(page["modified_by"], user_id)