DCB 1 - Course Subscriptions¶
On this page, we are setting the scene for a series of examples that illustrate the support offered by this library for “dynamic consistency boundaries”.
We will define and validate a test case and an interface for the course subscriptions challenge. Over the next few pages, we will explore the basic objects and methods described in the “dynamic consistency boundaries” specification, and then look at some higher-level styles for DCB. Alongside these developments, we will also present an assessment of successively better support for the technical challenge of implementing a DCB event store.
Introduction¶
The course subscription challenge is often used when discussing dynamic consistency boundaries. The challenge is to enforce a rule when enrolling students on courses that no student can join more than a given number of courses, and no course can accept more than a given number of students. The idea is that this is either difficult or impossible with “traditional” event-sourced aggregates without much accidental complexity, and that DCB allows more straightforward implementations.
In the section Aggregates and DCB we can see how to implement the course subscriptions challenge using “traditional” event-sourced aggregates, by extending the consistency boundary to include more than one aggregate. In the Basic DCB Objects example, you can see how to implement the “course subscriptions” challenge in Python using these basic DCB objects. Whilst the code is relatively verbose, the DCB approach can be understood directly without any extra abstractions. The later examples Enduring Objects and Groups and Vertical Slices with DCB present the alternative higher-level abstractions supported by this library that are perhaps more usable.
Enrolment test case¶
The EnrolmentTestCase below checks an implementation can register
students and courses, and that students can join courses, with some particular conditions that should lead to
particular errors. It depends on the Enrolment interface defined below.
class EnrolmentTestCase(TestCase):
def assert_implementation(self, app: EnrolmentInterface) -> None:
# Register courses.
dcb = app.register_course("Dynamic Consistency Boundaries", places=5)
maths = app.register_course("Maths", places=5)
biology = app.register_course("Biology", places=5)
french = app.register_course("French", places=5)
spanish = app.register_course("Spanish", places=5)
# Register students.
sara = app.register_student("Sara", max_courses=3)
mollie = app.register_student("Mollie", max_courses=3)
allard = app.register_student("Allard", max_courses=3)
grace = app.register_student("Grace", max_courses=3)
bastian = app.register_student("Bastian", max_courses=3)
greg = app.register_student("Greg", max_courses=3)
katherine = app.register_student("Katherine", max_courses=3)
# Enrol students for "Dynamic Consistency Boundaries" course.
app.join_course(sara, dcb)
app.join_course(mollie, dcb)
app.join_course(allard, dcb)
app.join_course(grace, dcb)
app.join_course(bastian, dcb)
# Greg can't join because the course is full.
with self.assertRaises(FullyBookedError):
app.join_course(greg, dcb)
# Greg joins other courses instead.
app.join_course(greg, french)
app.join_course(greg, spanish)
app.join_course(greg, maths)
# Greg has enough to do already.
with self.assertRaises(TooManyCoursesError):
app.join_course(greg, biology)
# Katherine also does "French".
app.join_course(katherine, french)
# Katherine already does "French".
with self.assertRaises(AlreadyJoinedError):
app.join_course(katherine, french)
# Course not found.
with self.assertRaises(CourseNotFoundError):
app.join_course(grace, CourseID("not-a-course"))
# Student not found.
with self.assertRaises(StudentNotFoundError):
app.join_course(StudentID("not-a-student"), dcb)
# List students for "Dynamic Consistency Boundaries" course.
students = app.list_students_for_course(dcb)
self.assertEqual(students, ["Sara", "Mollie", "Allard", "Grace", "Bastian"])
# List students for "French" course.
students = app.list_students_for_course(french)
self.assertEqual(students, ["Greg", "Katherine"])
# List Sara's courses.
courses = app.list_courses_for_student(sara)
self.assertEqual(courses, ["Dynamic Consistency Boundaries"])
# List Greg's courses.
courses = app.list_courses_for_student(greg)
self.assertEqual(courses, ["French", "Spanish", "Maths"])
Enrolment interface¶
The course subscriptions challenge can be expressed firstly as an interface.
The EnrolmentInterface will be used across
all the examples in the following pages. We have defined methods for registering students,
for registering courses, for joining students with courses, for listing students for
a course, and for listing courses for a student, along with some exception classes.
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import NewType
StudentID = NewType("StudentID", str)
CourseID = NewType("CourseID", str)
class EnrolmentInterface(ABC):
@abstractmethod
def register_student(self, name: str, max_courses: int) -> StudentID:
"""
Register a new student.
"""
@abstractmethod
def register_course(self, name: str, places: int) -> CourseID:
"""
Register a new course.
"""
@abstractmethod
def join_course(self, student_id: StudentID, course_id: CourseID) -> None:
"""
Enrol a student on a course.
"""
@abstractmethod
def list_students_for_course(self, course_id: CourseID) -> list[str]:
"""
List students enrolled on a course.
"""
@abstractmethod
def list_courses_for_student(self, student_id: StudentID) -> list[str]:
"""
List courses enrolled by a student.
"""
class StudentNotFoundError(Exception):
"""
Raised when a student is not registered.
"""
class CourseNotFoundError(Exception):
"""
Raised when a course is not registered.
"""
class TooManyCoursesError(Exception):
"""
Raised when a student is already enrolled to a maximum number of courses.
"""
class FullyBookedError(Exception):
"""
Raised when a course already has a maximum number of enrolled students.
"""
class NotAlreadyJoinedError(Exception):
"""
Raised when a student is not already enrolled on a course.
"""
class AlreadyJoinedError(Exception):
"""
Raised when a student is already enrolled on a course.
"""
Aggregates and DCB¶
Before we continue with DCB, let’s implement the course subscriptions challenge with “traditional” event-sourced aggregates. This will allow us to validate the interface and to demonstrate the test case is effective.
The central critique motivating DCB is that the aggregates of DDD establish strict and rigid consistency boundaries that may eventually become inappropriate and difficult to refactor. This may be true. We will investigate later how comparatively easy or difficult it is to refactor sequences of events recorded by DCB applications and by event-sourced applications.
Another of the arguments motivating DCB is that, “by definition, the aggregate is the boundary of consistency” and so it is impossible to implement the “course subscriptions” challenge using event-sourced aggregates without the accidental complexity of awkward tricks.
Whatever the arguments are against aggregates, it is more important that a proposition be interesting than that it be true. DCB is indeed an interesting novel proposition.
Enrolment with aggregates¶
The domain model shown below defines Course, an event-sourced aggregate
class for courses that students can join, and Student, an event-sourced
aggregate class for students that may join courses.
class Course(Aggregate[CourseID]):
@staticmethod
def create_id() -> CourseID:
return CourseID("course-" + str(uuid4()))
def __init__(self, name: str, places: int) -> None:
self.name = name
self.places = places
self.student_ids: list[StudentID] = []
@event("StudentAccepted")
def accept_student(self, student_id: StudentID) -> None:
if len(self.student_ids) >= self.places:
raise FullyBookedError
if student_id in self.student_ids:
raise AlreadyJoinedError
self.student_ids.append(student_id)
class Student(Aggregate[StudentID]):
@staticmethod
def create_id() -> StudentID:
return StudentID("student-" + str(uuid4()))
def __init__(self, name: str, max_courses: int) -> None:
self.name = name
self.max_courses = max_courses
self.course_ids: list[CourseID] = []
@event("CourseJoined")
def join_course(self, course_id: CourseID) -> None:
if len(self.course_ids) >= self.max_courses:
raise TooManyCoursesError
self.course_ids.append(course_id)
These aggregate classes are implemented using the concise declarative syntax supported by this library. These aggregate classes are coded to use string IDs as demonstrated in example 11.
The EnrolmentWithAggregates application uses
the Course and Student
aggregate classes to implement the enrolment interface.
The “consistency boundary” for joining a course includes a student and a course.
class EnrolmentWithAggregates(Application[str], EnrolmentInterface):
env: ClassVar[dict[str, str]] = {
"MAPPER_TOPIC": get_topic(MessagePackMapper),
"ORIGINATOR_ID_TYPE": "text",
}
def register_student(self, name: str, max_courses: int) -> StudentID:
student = Student(name, max_courses=max_courses)
self.save(student)
return student.id
def register_course(self, name: str, places: int) -> CourseID:
course = Course(name, places=places)
self.save(course)
return course.id
def join_course(self, student_id: StudentID, course_id: CourseID) -> None:
course = self.get_course(course_id)
student = self.get_student(student_id)
course.accept_student(student_id)
student.join_course(course_id)
self.save(course, student)
def list_students_for_course(self, course_id: CourseID) -> list[str]:
course = self.get_course(course_id)
return [self.get_student(s).name for s in course.student_ids]
def list_courses_for_student(self, student_id: StudentID) -> list[str]:
student = self.get_student(student_id)
return [self.get_course(s).name for s in student.course_ids]
def get_student(self, student_id: StudentID) -> Student:
try:
return self.repository.get(student_id)
except AggregateNotFoundError:
raise StudentNotFoundError from None
def get_course(self, course_id: CourseID) -> Course:
try:
return self.repository.get(course_id)
except AggregateNotFoundError:
raise CourseNotFoundError from None
This meets the “course subscriptions” challenge with event-sourced aggregates, without tricks and without accidental complexity. It shows that it is possible and straightforward to extend the transactional consistency boundary when using event-sourced aggregates to include more than one aggregate. Indeed, this is a legitimate technique.
At the time of writing, this possibility is not mentioned in the list of traditional approaches on the dynamic consistency boundaries website, which lists only three options: eventual consistency, larger aggregate, reservation pattern.
Test case¶
The test case below calls assert_implementation()
with an instance of EnrolmentWithAggregates, configured to use an
in-memory event store and to use PostgreSQL. The third test method shows more explicitly that extending the
transactional consistency boundary when using event-sourced aggregates to include more than one aggregate is
technically sound, by checking that the recorded consistency of the course-student nexus is guarded against
concurrent operations.
class TestEnrolmentWithAggregates(EnrolmentTestCase):
def test_enrolment_in_memory(self) -> None:
self.assert_implementation(EnrolmentWithAggregates())
def test_enrolment_with_postgres(self) -> None:
env = {
"PERSISTENCE_MODULE": "eventsourcing.postgres",
"POSTGRES_DBNAME": "eventsourcing",
"POSTGRES_HOST": "127.0.0.1",
"POSTGRES_PORT": "5432",
"POSTGRES_USER": "eventsourcing",
"POSTGRES_PASSWORD": "eventsourcing",
}
try:
app = EnrolmentWithAggregates(env)
self.assert_implementation(app)
finally:
drop_tables()
def test_consistency_boundary(self) -> None:
app = EnrolmentWithAggregates()
# Register courses.
french = app.register_course("French", places=5)
# Register students.
sara = app.register_student("Sara", max_courses=3)
bastian = app.register_student("Bastian", max_courses=3)
# Try to break recorded consistency with concurrent operation.
assert isinstance(app, EnrolmentWithAggregates)
student = app.get_student(sara)
course = app.get_course(french)
student.join_course(course.id)
course.accept_student(student.id)
# During this operation, Bastian joins French.
app.join_course(bastian, french)
# Can't proceed with concurrent operation because course changed.
with self.assertRaises(IntegrityError):
app.save(student, course)
# Check Sara doesn't have French, and French doesn't have Sara.
self.assertNotIn("Sara", app.list_students_for_course(french))
self.assertNotIn("French", app.list_courses_for_student(sara))
Summary¶
Implementing the course subscriptions challenge with “traditional” event-sourced aggregates was straightforward. The application didn’t have any accidental complexity and performed well.
The transactional consistency boundary can legitimately be extended in include more than one aggregate. The meaning of “not less than” is “greater than or equal to”. It has been a common misapprehension that the “consistency boundary” notion in DDD is equal to one aggregate. The actual idea from DDD is that a database transactional consistency boundary must not be less than one aggregate. A consistency boundary that includes more than one aggregate, or indeed other things, has always been permitted by DDD.
Nevertheless, there are other reasons why DCB is an interesting novel approach for event sourcing, so let’s continue by implementing the course subscriptions challenge directly with basic DCB objects.
Code reference¶
- class examples.dcb_enrolment.interface.EnrolmentInterface[source]¶
Bases:
ABC- abstract join_course(student_id: StudentID, course_id: CourseID) None[source]¶
Enrol a student on a course.
- exception examples.dcb_enrolment.interface.StudentNotFoundError[source]¶
Bases:
ExceptionRaised when a student is not registered.
- exception examples.dcb_enrolment.interface.CourseNotFoundError[source]¶
Bases:
ExceptionRaised when a course is not registered.
- exception examples.dcb_enrolment.interface.TooManyCoursesError[source]¶
Bases:
ExceptionRaised when a student is already enrolled to a maximum number of courses.
- exception examples.dcb_enrolment.interface.FullyBookedError[source]¶
Bases:
ExceptionRaised when a course already has a maximum number of enrolled students.
- exception examples.dcb_enrolment.interface.NotAlreadyJoinedError[source]¶
Bases:
ExceptionRaised when a student is not already enrolled on a course.
- exception examples.dcb_enrolment.interface.AlreadyJoinedError[source]¶
Bases:
ExceptionRaised when a student is already enrolled on a course.
- class examples.dcb_enrolment.test_enrolment.EnrolmentTestCase(methodName='runTest')[source]¶
Bases:
TestCase- assert_implementation(app: EnrolmentInterface) None[source]¶
- class examples.dcb_enrolment.domainmodel.DomainEvent(originator_id: str, originator_version: int, timestamp: datetime)[source]¶
Bases:
Struct- originator_id: str¶
- originator_version: int¶
- timestamp: datetime¶
- class examples.dcb_enrolment.domainmodel.MsgspecStringIDEvent(originator_id: str, originator_version: int, timestamp: datetime)[source]¶
Bases:
DomainEvent,CanMutateAggregate[str]- originator_id_type¶
alias of
str
- class examples.dcb_enrolment.domainmodel.MsgspecStringIDCreatedEvent(originator_id: str, originator_version: int, timestamp: datetime, originator_topic: str)[source]¶
Bases:
DomainEvent,CanInitAggregate[str]- originator_topic: str¶
String describing the path to an aggregate class.
- originator_id_type¶
alias of
str
- class examples.dcb_enrolment.domainmodel.MsgspecStringIDAggregate(*args: Any, **kwargs: Any)[source]¶
Bases:
BaseAggregate[TID]- class Event(originator_id: str, originator_version: int, timestamp: datetime)[source]¶
Bases:
MsgspecStringIDEvent- originator_id_type¶
alias of
str
- class examples.dcb_enrolment.domainmodel.Aggregate(*args: Any, **kwargs: Any)[source]¶
Bases:
MsgspecStringIDAggregate[TID]
- class examples.dcb_enrolment.domainmodel.Student(*args: Any, **kwargs: Any)[source]¶
Bases:
Aggregate[StudentID]- class CourseJoined(originator_id: str, originator_version: int, timestamp: datetime, course_id: CourseID)¶
Bases:
DecoratedFuncCaller,Event- course_id: CourseID¶
- class examples.dcb_enrolment.domainmodel.Course(*args: Any, **kwargs: Any)[source]¶
Bases:
Aggregate[CourseID]- class Event(originator_id: str, originator_version: int, timestamp: datetime)¶
Bases:
Event- originator_id_type¶
alias of
str
- class StudentAccepted(originator_id: str, originator_version: int, timestamp: datetime, student_id: StudentID)¶
Bases:
DecoratedFuncCaller,Event- student_id: StudentID¶
- class examples.dcb_enrolment.application.EnrolmentWithAggregates(env: Mapping[str, str] | None = None)[source]¶
Bases:
Application[str],EnrolmentInterface- env: ClassVar[dict[str, str]] = {'MAPPER_TOPIC': 'examples.aggregate9.msgpack:MessagePackMapper', 'ORIGINATOR_ID_TYPE': 'text'}¶
- list_students_for_course(course_id: CourseID) list[str][source]¶
List students enrolled on a course.
- list_courses_for_student(student_id: StudentID) list[str][source]¶
List courses enrolled by a student.
- name = 'EnrolmentWithAggregates'¶