DCB 3 - Enduring Objects and Groups

Here we implement the course subscriptions challenge with a higher-level style for DCB that uses enduring objects and a group.

Domain model

The domain model defines Student and Course as enduring objects.

The Student has an ID, a name, and a maximum number of courses.

class Student(EnduringObject[Decision, StudentID]):
    """Enduring object for a student.

    Holds the student's name, the maximum number of courses the student may
    join, and the IDs of the courses currently joined.
    """

    class Registered(InitialDecision):
        """Initial decision carrying the student's ID, name and course limit."""

        student_id: StudentID
        name: str
        max_courses: int

    class NameUpdated(Decision):
        """Decision recording a new name for the student."""

        name: str

    class MaxCoursesUpdated(Decision):
        """Decision recording a new course limit for the student."""

        max_courses: int

    def __init__(self, name: str, max_courses: int) -> None:
        """Initialise a student who has not joined any courses yet.

        NOTE(review): presumably the framework records a ``Registered``
        decision from these arguments and assigns ``self.id`` - confirm
        against ``EnduringObject``.
        """
        self.name = name
        self.max_courses = max_courses
        self.course_ids: list[CourseID] = []

    @event(NameUpdated)
    def update_name(self, name: str) -> None:
        """Command: change the student's name."""
        self.name = name

    @event(MaxCoursesUpdated)
    def update_max_courses(self, max_courses: int) -> None:
        """Command: change the maximum number of courses for the student."""
        self.max_courses = max_courses

    # The two underscore methods below are projection handlers, not commands:
    # the decisions are triggered by the StudentAndCourse group.

    @event(StudentJoinedCourse)
    def _(self, course_id: CourseID) -> None:
        """Apply a StudentJoinedCourse decision to this student.

        Raises:
            TooManyCoursesError: if the student is already at the limit.
        """
        if len(self.course_ids) >= self.max_courses:
            raise TooManyCoursesError
        self.course_ids.append(course_id)

    @event(StudentLeftCourse)
    def _(self, course_id: CourseID) -> None:
        """Apply a StudentLeftCourse decision to this student.

        Raises:
            NotAlreadyJoinedError: if the student has not joined the course.
        """
        # Guard explicitly so that the same domain error is raised as by
        # Course, rather than the bare ValueError that list.remove() raises
        # for a missing item.
        if course_id not in self.course_ids:
            raise NotAlreadyJoinedError
        self.course_ids.remove(course_id)

The Course has an ID, a name, and a number of places for students.

class Course(EnduringObject[Decision, CourseID]):
    """Enduring object for a course.

    Holds the course's name, its number of places, and the IDs of the
    students who have currently joined.
    """

    class Registered(InitialDecision):
        """Initial decision carrying the course's ID, name and places."""

        course_id: CourseID
        name: str
        places: int

    class NameUpdated(Decision):
        """Decision recording a new name for the course."""

        name: str

    class PlacesUpdated(Decision):
        """Decision recording a new number of places for the course."""

        places: int

    def __init__(self, name: str, places: int) -> None:
        """Initialise a course that no students have joined yet."""
        self.name, self.places = name, places
        self.student_ids: list[StudentID] = []

    @event(NameUpdated)
    def update_name(self, name: str) -> None:
        """Command: change the course's name."""
        self.name = name

    @event(PlacesUpdated)
    def update_places(self, places: int) -> None:
        """Command: change the number of places on the course."""
        self.places = places

    # The two underscore methods below are projection handlers, not commands:
    # the decisions are triggered by the StudentAndCourse group.

    @event(StudentJoinedCourse)
    def _(self, student_id: StudentID) -> None:
        """Apply a StudentJoinedCourse decision to this course.

        Raises:
            AlreadyJoinedError: if the student has already joined.
            FullyBookedError: if every place is already taken.
        """
        enrolled = self.student_ids
        if student_id in enrolled:
            raise AlreadyJoinedError
        if self.places <= len(enrolled):
            raise FullyBookedError
        enrolled.append(student_id)

    @event(StudentLeftCourse)
    def _(self, student_id: StudentID) -> None:
        """Apply a StudentLeftCourse decision to this course.

        Raises:
            NotAlreadyJoinedError: if the student has not joined.
        """
        enrolled = self.student_ids
        if student_id not in enrolled:
            raise NotAlreadyJoinedError
        enrolled.remove(student_id)

Both classes include the StudentJoinedCourse and StudentLeftCourse events in their projections, but neither has a command method for triggering them.

The cross-cutting concern of a student joining and leaving a course is implemented with the group StudentAndCourse.

class StudentAndCourse(Group[Decision]):
    """Group of one student and one course, for joining and leaving."""

    def __init__(
        self,
        student: Student | None,
        course: Course | None,
    ) -> None:
        """Bind the student and the course, rejecting missing members.

        Raises:
            CourseNotFoundError: if ``course`` is None (checked first).
            StudentNotFoundError: if ``student`` is None.
        """
        if course is None:
            raise CourseNotFoundError
        if student is None:
            raise StudentNotFoundError
        self.student, self.course = student, course

    def student_joins_course(self) -> None:
        """Trigger a single StudentJoinedCourse decision for the pair."""
        # One fact, one event: the same decision carries both IDs.
        student_id, course_id = self.student.id, self.course.id
        self.trigger_event(
            StudentJoinedCourse,
            student_id=student_id,
            course_id=course_id,
        )

    def student_leaves_course(self) -> None:
        """Trigger a single StudentLeftCourse decision for the pair."""
        # One fact, one event: the same decision carries both IDs.
        student_id, course_id = self.student.id, self.course.id
        self.trigger_event(
            StudentLeftCourse,
            student_id=student_id,
            course_id=course_id,
        )
class StudentJoinedCourse(Decision):
    """Decision recording that a student joined a course.

    Triggered by StudentAndCourse.student_joins_course() and projected by
    both Student and Course.
    """
    student_id: StudentID
    course_id: CourseID
class StudentLeftCourse(Decision):
    """Decision recording that a student left a course.

    Triggered by StudentAndCourse.student_leaves_course() and projected by
    both Student and Course.
    """
    student_id: StudentID
    course_id: CourseID

Application

The application class EnrolmentWithEnduringObjects implements the enrolment interface. Unlike the previous example, its methods are all very short three-line blocks, which mostly initialise or reconstruct a “perspective” (line 1), make a new decision (line 2), and then append new events to the database (line 3). Because this style is so easy to code, we added more methods just for fun!

This version looks a lot like the application that uses event-sourced aggregates in the first example.

One interesting difference is the repository has methods not only to get one enduring object, but also to get many enduring objects in one request, and to get a “group” of enduring objects of a particular type.

class EnrolmentWithEnduringObjects(DCBApplication, EnrolmentInterface):
    """Enrolment application built on enduring objects and a group.

    Most methods load or create an object, call one command on it, and save
    it through the repository.
    """

    # Entries from DCBApplication.env are unpacked after MAPPER_TOPIC, so
    # they win on any key the two mappings share.
    env: Mapping[str, str] = {
        "MAPPER_TOPIC": get_topic(MessagePackMapper),
        **DCBApplication.env,
    }

    def register_student(self, name: str, max_courses: int) -> StudentID:
        """Create and save a new student, returning its ID."""
        new_student = Student(name=name, max_courses=max_courses)
        self.repository.save(new_student)
        return new_student.id

    def register_course(self, name: str, places: int) -> CourseID:
        """Create and save a new course, returning its ID."""
        new_course = Course(name=name, places=places)
        self.repository.save(new_course)
        return new_course.id

    def join_course(self, student_id: StudentID, course_id: CourseID) -> None:
        """Enrol the student on the course via the StudentAndCourse group."""
        pair = self.repository.get_group(StudentAndCourse, student_id, course_id)
        pair.student_joins_course()
        self.repository.save(pair)

    def leave_course(self, student_id: StudentID, course_id: CourseID) -> None:
        """Remove the student from the course via the StudentAndCourse group."""
        pair = self.repository.get_group(StudentAndCourse, student_id, course_id)
        pair.student_leaves_course()
        self.repository.save(pair)

    def list_students_for_course(self, course_id: CourseID) -> list[str]:
        """Return the names of the students who have joined the course."""
        enrolled_ids = self.get_course(course_id).student_ids
        found = self.repository.get_many(*enrolled_ids)
        # Entries that come back as None are skipped.
        return [cast(Student, obj).name for obj in found if obj is not None]

    def list_courses_for_student(self, student_id: StudentID) -> list[str]:
        """Return the names of the courses the student has joined."""
        joined_ids = self.get_student(student_id).course_ids
        found = self.repository.get_many(*joined_ids)
        # Entries that come back as None are skipped.
        return [cast(Course, obj).name for obj in found if obj is not None]

    def update_student_name(self, student_id: StudentID, name: str) -> None:
        """Rename the student and save the change."""
        target = self.get_student(student_id)
        target.update_name(name)
        self.repository.save(target)

    def update_max_courses(self, student_id: StudentID, max_courses: int) -> None:
        """Change the student's course limit and save the change."""
        target = self.get_student(student_id)
        target.update_max_courses(max_courses)
        self.repository.save(target)

    def update_course_name(self, course_id: CourseID, name: str) -> None:
        """Rename the course and save the change."""
        target = self.get_course(course_id)
        target.update_name(name)
        self.repository.save(target)

    def update_places(self, course_id: CourseID, max_courses: int) -> None:
        """Change the course's number of places and save the change.

        NOTE(review): the second parameter is named ``max_courses`` but is
        passed to ``Course.update_places`` as the number of places. The name
        is kept because renaming it would break keyword callers.
        """
        target = self.get_course(course_id)
        target.update_places(max_courses)
        self.repository.save(target)

    def get_student(self, student_id: StudentID) -> Student:
        """Return the repository's object for ``student_id``, typed as Student."""
        loaded = self.repository.get(student_id)
        return cast(Student, loaded)

    def get_course(self, course_id: CourseID) -> Course:
        """Return the repository's object for ``course_id``, typed as Course."""
        loaded = self.repository.get(course_id)
        return cast(Course, loaded)

Test case

The enrolment test case is extended for EnrolmentWithEnduringObjects.

It has some extra steps to cover the extra methods that we have implemented.

class TestEnrolmentWithEnduringObjects(EnrolmentTestCase):
    """Run the enrolment test case, plus extra steps, against
    EnrolmentWithEnduringObjects."""

    def test_enrolment_in_memory(self) -> None:
        """Check the application constructed with its default environment."""
        self.assert_implementation(EnrolmentWithEnduringObjects())

    def test_enrolment_with_postgres(self) -> None:
        """Check the application configured with the Postgres settings below."""
        env = {
            "PERSISTENCE_MODULE": "eventsourcing.dcb.postgres_tt",
            "POSTGRES_DBNAME": "eventsourcing",
            "POSTGRES_HOST": "127.0.0.1",
            "POSTGRES_PORT": "5432",
            "POSTGRES_USER": "eventsourcing",
            "POSTGRES_PASSWORD": "eventsourcing",
        }
        try:
            self.assert_implementation(EnrolmentWithEnduringObjects(env))
        finally:
            # Always clean up, whether or not the assertions passed.
            drop_tables()

    def test_enrolment_with_umadb(self) -> None:
        """Check the application configured with the UmaDB settings below."""
        env = {
            "PERSISTENCE_MODULE": "eventsourcing_umadb",
            "UMADB_URI": "http://127.0.0.1:50051",
        }
        self.assert_implementation(EnrolmentWithEnduringObjects(env))

    def assert_implementation(self, app: EnrolmentInterface) -> None:
        """Run the inherited checks, then exercise the extra methods of
        EnrolmentWithEnduringObjects.

        The steps are stateful: each one relies on the state left behind by
        the steps before it.
        """
        super().assert_implementation(app)

        # Narrow the type so the extra (non-interface) methods can be used.
        assert isinstance(app, EnrolmentWithEnduringObjects)
        # Register student.
        student_id = app.register_student(name="Max", max_courses=4)

        # Update student name.
        app.update_student_name(student_id, "Maxine")
        student = app.get_student(student_id)
        self.assertEqual("Maxine", student.name)

        # Update max_courses.
        app.update_max_courses(student_id, 10)
        student = app.get_student(student_id)
        self.assertEqual(10, student.max_courses)

        # Register course.
        course_id = app.register_course(name="Bio", places=3)

        # Update course name.
        app.update_course_name(course_id, "Biology")
        course = app.get_course(course_id)
        self.assertEqual("Biology", course.name)

        # Update places.
        app.update_places(course_id, 10)
        course = app.get_course(course_id)
        self.assertEqual(10, course.places)

        # Join course: both objects should reflect the single decision.
        app.join_course(student_id=student_id, course_id=course_id)
        student = app.get_student(student_id)
        course = app.get_course(course_id)
        self.assertEqual(student.course_ids, [course_id])
        self.assertEqual(course.student_ids, [student_id])

        # Leave course: both objects should be back to empty lists.
        app.leave_course(student_id=student_id, course_id=course_id)
        student = app.get_student(student_id)
        course = app.get_course(course_id)
        self.assertEqual(student.course_ids, [])
        self.assertEqual(course.student_ids, [])

        # Can operate on enduring objects in group.
        group = app.repository.get_group(StudentAndCourse, student_id, course_id)
        group.student.update_max_courses(100)
        app.repository.save(group.student)
        student = app.get_student(student_id)
        self.assertEqual(100, student.max_courses)

        # Check a concurrent change to max_courses raises IntegrityError:
        # the student is updated after the group was loaded but before the
        # group is saved.
        group = app.repository.get_group(StudentAndCourse, student_id, course_id)
        group.student_joins_course()
        app.update_max_courses(student_id, 1)
        with self.assertRaises(IntegrityError):
            app.repository.save(group)

        # Check a concurrent change to the student's name also raises
        # IntegrityError when the group is saved.
        group = app.repository.get_group(StudentAndCourse, student_id, course_id)
        group.student_joins_course()
        app.update_student_name(student_id, "Maxy")
        with self.assertRaises(IntegrityError):
            app.repository.save(group)

        # Check get_many() preserves order, for both argument orders.
        objs = app.repository.get_many(course_id, student_id)
        self.assertEqual([course_id, student_id], [o.id for o in objs if o])
        objs = app.repository.get_many(student_id, course_id)
        self.assertEqual([student_id, course_id], [o.id for o in objs if o])

        # Can't call non-command underscore methods.
        with self.assertRaisesRegex(ProgrammingError, "cannot be used"):
            student._(course_id=course_id)

It has some extra steps to cover the extra methods that were added to make further use of the more declarative syntax for DCB, such as a student leaving a course, changes of name of students and courses, and changes to the number of “places” a course has and the “max courses” for a student.

The extra steps also show the command methods of enduring objects in a group can be executed. New events from the group and from its enduring objects are collected when a group is saved.

The extra steps also show that concurrent changes to enduring objects conflict with the student-course group, both for relevant events (such as changes to the ‘max_courses’ and ‘places’ numbers which are relevant when a student joins a course) and for irrelevant events (such as changes to a student’s name). It would be possible to reconstruct enduring objects and groups with more restricted consistency boundaries, but then attributes that would be updated from events that are not included will have stale values.

Discussion

This is just one way of styling a slightly higher-level abstraction over the objects and methods described in the DCB specification.

Overall, we think it expresses the concerns in the “course subscriptions” challenge clearly and concisely. The conceptual overhead is relatively low. However, the full flexibility of DCB is not utilised by this approach, since each consistency boundary for an enduring object includes its full sequence of events. This means that commands which in fact are not contentious, such as changing the name of a student whilst the student is being enrolled on a course, could conflict if undertaken concurrently, which is unnecessary. This aspect is remedied by using the “vertical slices” style that is shown in the next example, perhaps at the cost of increased conceptual overhead.

Code reference

class examples.dcb_enrolment_with_enduring_objects.application.StudentJoinedCourse(student_id: StudentID, course_id: CourseID)[source]

Bases: Decision

student_id: StudentID
course_id: CourseID
class examples.dcb_enrolment_with_enduring_objects.application.StudentLeftCourse(student_id: StudentID, course_id: CourseID)[source]

Bases: Decision

student_id: StudentID
course_id: CourseID
class examples.dcb_enrolment_with_enduring_objects.application.Student(**kwargs: Any)[source]

Bases: EnduringObject[Decision, StudentID]

class Registered(originator_topic: str, student_id: StudentID, name: str, max_courses: int)[source]

Bases: InitialDecision

student_id: StudentID
name: str
max_courses: int
class NameUpdated(name: str)[source]

Bases: Decision

name: str
class MaxCoursesUpdated(max_courses: int)[source]

Bases: Decision

max_courses: int
__init__(name: str, max_courses: int) None[source]
update_name(name: str) None[source]
update_max_courses(max_courses: int) None[source]
projected_types = [<class 'examples.dcb_enrolment_with_enduring_objects.application.Student.NameUpdated'>, <class 'examples.dcb_enrolment_with_enduring_objects.application.Student.MaxCoursesUpdated'>, <class 'examples.dcb_enrolment_with_enduring_objects.application.StudentJoinedCourse'>, <class 'examples.dcb_enrolment_with_enduring_objects.application.StudentLeftCourse'>]
id: TID
last_known_position: int | None
new_decisions: list[Tagged[TDecision]]
class examples.dcb_enrolment_with_enduring_objects.application.Course(**kwargs: Any)[source]

Bases: EnduringObject[Decision, CourseID]

class Registered(originator_topic: str, course_id: CourseID, name: str, places: int)[source]

Bases: InitialDecision

course_id: CourseID
name: str
places: int
class NameUpdated(name: str)[source]

Bases: Decision

name: str
class PlacesUpdated(places: int)[source]

Bases: Decision

places: int
__init__(name: str, places: int) None[source]
update_name(name: str) None[source]
update_places(places: int) None[source]
projected_types = [<class 'examples.dcb_enrolment_with_enduring_objects.application.Course.NameUpdated'>, <class 'examples.dcb_enrolment_with_enduring_objects.application.Course.PlacesUpdated'>, <class 'examples.dcb_enrolment_with_enduring_objects.application.StudentJoinedCourse'>, <class 'examples.dcb_enrolment_with_enduring_objects.application.StudentLeftCourse'>]
id: TID
last_known_position: int | None
new_decisions: list[Tagged[TDecision]]
class examples.dcb_enrolment_with_enduring_objects.application.StudentAndCourse(*args: Any, **kwargs: Any)[source]

Bases: Group[Decision]

__init__(student: Student | None, course: Course | None) None[source]
student_joins_course() None[source]
student_leaves_course() None[source]
class examples.dcb_enrolment_with_enduring_objects.application.EnrolmentWithEnduringObjects(env: Mapping[str, str] | None = None)[source]

Bases: DCBApplication, EnrolmentInterface

env: Mapping[str, str] = {'MAPPER_TOPIC': 'eventsourcing.dcb.msgpack:MessagePackMapper', 'PERSISTENCE_MODULE': 'eventsourcing.dcb.popo'}
register_student(name: str, max_courses: int) StudentID[source]

Register a new student.

register_course(name: str, places: int) CourseID[source]

Register a new course.

join_course(student_id: StudentID, course_id: CourseID) None[source]

Enrol a student on a course.

leave_course(student_id: StudentID, course_id: CourseID) None[source]
list_students_for_course(course_id: CourseID) list[str][source]

List students enrolled on a course.

list_courses_for_student(student_id: StudentID) list[str][source]

List courses enrolled by a student.

update_student_name(student_id: StudentID, name: str) None[source]
update_max_courses(student_id: StudentID, max_courses: int) None[source]
update_course_name(course_id: CourseID, name: str) None[source]
update_places(course_id: CourseID, max_courses: int) None[source]
get_student(student_id: StudentID) Student[source]
get_course(course_id: CourseID) Course[source]
name = 'EnrolmentWithEnduringObjects'