DCB 3 - Enduring Objects and Groups¶
Here we implement the course subscriptions challenge with a higher-level style for DCB that uses enduring objects and a group.
Domain model¶
The domain model defines Student
and Course as enduring objects.
The Student has an ID, a name, and
maximum number of courses.
class Student(EnduringObject[Decision, StudentID]):
class Registered(InitialDecision):
student_id: StudentID
name: str
max_courses: int
class NameUpdated(Decision):
name: str
class MaxCoursesUpdated(Decision):
max_courses: int
def __init__(self, name: str, max_courses: int) -> None:
self.name = name
self.max_courses = max_courses
self.course_ids: list[CourseID] = []
@event(NameUpdated)
def update_name(self, name: str) -> None:
self.name = name
@event(MaxCoursesUpdated)
def update_max_courses(self, max_courses: int) -> None:
self.max_courses = max_courses
@event(StudentJoinedCourse)
def _(self, course_id: CourseID) -> None:
if len(self.course_ids) >= self.max_courses:
raise TooManyCoursesError
self.course_ids.append(course_id)
@event(StudentLeftCourse)
def _(self, course_id: CourseID) -> None:
self.course_ids.remove(course_id)
The Course has an ID, a name, and
number of places for students.
class Course(EnduringObject[Decision, CourseID]):
class Registered(InitialDecision):
course_id: CourseID
name: str
places: int
class NameUpdated(Decision):
name: str
class PlacesUpdated(Decision):
places: int
def __init__(self, name: str, places: int) -> None:
self.name = name
self.places = places
self.student_ids: list[StudentID] = []
@event(NameUpdated)
def update_name(self, name: str) -> None:
self.name = name
@event(PlacesUpdated)
def update_places(self, places: int) -> None:
self.places = places
@event(StudentJoinedCourse)
def _(self, student_id: StudentID) -> None:
if student_id in self.student_ids:
raise AlreadyJoinedError
if len(self.student_ids) >= self.places:
raise FullyBookedError
self.student_ids.append(student_id)
@event(StudentLeftCourse)
def _(self, student_id: StudentID) -> None:
if student_id not in self.student_ids:
raise NotAlreadyJoinedError
self.student_ids.remove(student_id)
Both classes include the event The StudentJoinedCourse
in their projection, but neither has a command method for doing so.
The cross-cutting concern of a student joining and leaving a course is implemented with the group
StudentAndCourse.
class StudentAndCourse(Group[Decision]):
def __init__(
self,
student: Student | None,
course: Course | None,
) -> None:
if course is None:
raise CourseNotFoundError
if student is None:
raise StudentNotFoundError
self.student = student
self.course = course
def student_joins_course(self) -> None:
# The DCB magic: one event for "one fact".
self.trigger_event(
StudentJoinedCourse,
student_id=self.student.id,
course_id=self.course.id,
)
def student_leaves_course(self) -> None:
# The DCB magic: one event for "one fact".
self.trigger_event(
StudentLeftCourse,
student_id=self.student.id,
course_id=self.course.id,
)
class StudentJoinedCourse(Decision):
student_id: StudentID
course_id: CourseID
class StudentLeftCourse(Decision):
student_id: StudentID
course_id: CourseID
Application¶
The application class EnrolmentWithEnduringObjects
implements the enrolment interface. Unlike the previous example,
its methods are all very short three-line blocks, which mostly initialise or reconstruct a “perspective” (line 1),
make a new decision (line 2), and then append new events to the database (line 3). Because this style so easy to code,
we added more methods just for fun!
This version looks a lot like the application that uses event-sourced aggregates in the first example.
One interesting difference is the repository has methods not only to get one enduring object, but also to get many enduring objects in one request, and to get a “group” of enduring objects of a particular type.
class EnrolmentWithEnduringObjects(DCBApplication, EnrolmentInterface):
env: Mapping[str, str] = {
"MAPPER_TOPIC": get_topic(MessagePackMapper),
**DCBApplication.env,
}
def register_student(self, name: str, max_courses: int) -> StudentID:
student = Student(name=name, max_courses=max_courses)
self.repository.save(student)
return student.id
def register_course(self, name: str, places: int) -> CourseID:
course = Course(name=name, places=places)
self.repository.save(course)
return course.id
def join_course(self, student_id: StudentID, course_id: CourseID) -> None:
group = self.repository.get_group(StudentAndCourse, student_id, course_id)
group.student_joins_course()
self.repository.save(group)
def leave_course(self, student_id: StudentID, course_id: CourseID) -> None:
group = self.repository.get_group(StudentAndCourse, student_id, course_id)
group.student_leaves_course()
self.repository.save(group)
def list_students_for_course(self, course_id: CourseID) -> list[str]:
course = self.get_course(course_id)
students = self.repository.get_many(*course.student_ids)
return [cast(Student, c).name for c in students if c is not None]
def list_courses_for_student(self, student_id: StudentID) -> list[str]:
student = self.get_student(student_id)
courses = self.repository.get_many(*student.course_ids)
return [cast(Course, c).name for c in courses if c is not None]
def update_student_name(self, student_id: StudentID, name: str) -> None:
student = self.get_student(student_id)
student.update_name(name)
self.repository.save(student)
def update_max_courses(self, student_id: StudentID, max_courses: int) -> None:
student = self.get_student(student_id)
student.update_max_courses(max_courses)
self.repository.save(student)
def update_course_name(self, course_id: CourseID, name: str) -> None:
course = self.get_course(course_id)
course.update_name(name)
self.repository.save(course)
def update_places(self, course_id: CourseID, max_courses: int) -> None:
course = self.get_course(course_id)
course.update_places(max_courses)
self.repository.save(course)
def get_student(self, student_id: StudentID) -> Student:
return cast(Student, self.repository.get(student_id))
def get_course(self, course_id: CourseID) -> Course:
return cast(Course, self.repository.get(course_id))
Test case¶
The enrolment test case is extended for
EnrolmentWithEnduringObjects.
It has some extra steps to cover the extra methods that we have implemented.
class TestEnrolmentWithEnduringObjects(EnrolmentTestCase):
def test_enrolment_in_memory(self) -> None:
self.assert_implementation(EnrolmentWithEnduringObjects())
def test_enrolment_with_postgres(self) -> None:
env = {
"PERSISTENCE_MODULE": "eventsourcing.dcb.postgres_tt",
"POSTGRES_DBNAME": "eventsourcing",
"POSTGRES_HOST": "127.0.0.1",
"POSTGRES_PORT": "5432",
"POSTGRES_USER": "eventsourcing",
"POSTGRES_PASSWORD": "eventsourcing",
}
try:
self.assert_implementation(EnrolmentWithEnduringObjects(env))
finally:
drop_tables()
def test_enrolment_with_umadb(self) -> None:
env = {
"PERSISTENCE_MODULE": "eventsourcing_umadb",
"UMADB_URI": "http://127.0.0.1:50051",
}
self.assert_implementation(EnrolmentWithEnduringObjects(env))
def assert_implementation(self, app: EnrolmentInterface) -> None:
super().assert_implementation(app)
assert isinstance(app, EnrolmentWithEnduringObjects)
# Register student.
student_id = app.register_student(name="Max", max_courses=4)
# Update name.
app.update_student_name(student_id, "Maxine")
student = app.get_student(student_id)
self.assertEqual("Maxine", student.name)
# Update max_courses.
app.update_max_courses(student_id, 10)
student = app.get_student(student_id)
self.assertEqual(10, student.max_courses)
# Register course.
course_id = app.register_course(name="Bio", places=3)
# Update name.
app.update_course_name(course_id, "Biology")
course = app.get_course(course_id)
self.assertEqual("Biology", course.name)
# Update places.
app.update_places(course_id, 10)
course = app.get_course(course_id)
self.assertEqual(10, course.places)
# Join course.
app.join_course(student_id=student_id, course_id=course_id)
student = app.get_student(student_id)
course = app.get_course(course_id)
self.assertEqual(student.course_ids, [course_id])
self.assertEqual(course.student_ids, [student_id])
# Leave course.
app.leave_course(student_id=student_id, course_id=course_id)
student = app.get_student(student_id)
course = app.get_course(course_id)
self.assertEqual(student.course_ids, [])
self.assertEqual(course.student_ids, [])
# Can operate on enduring objects in group.
group = app.repository.get_group(StudentAndCourse, student_id, course_id)
group.student.update_max_courses(100)
app.repository.save(group.student)
student = app.get_student(student_id)
self.assertEqual(100, student.max_courses)
# Check concurrent change raises IntegrityError.
group = app.repository.get_group(StudentAndCourse, student_id, course_id)
group.student_joins_course()
app.update_max_courses(student_id, 1)
with self.assertRaises(IntegrityError):
app.repository.save(group)
# Check concurrent change raises IntegrityError.
group = app.repository.get_group(StudentAndCourse, student_id, course_id)
group.student_joins_course()
app.update_student_name(student_id, "Maxy")
with self.assertRaises(IntegrityError):
app.repository.save(group)
# Check get_many() preserves order.
objs = app.repository.get_many(course_id, student_id)
self.assertEqual([course_id, student_id], [o.id for o in objs if o])
objs = app.repository.get_many(student_id, course_id)
self.assertEqual([student_id, course_id], [o.id for o in objs if o])
# Can't call non-command underscore methods.
with self.assertRaisesRegex(ProgrammingError, "cannot be used"):
student._(course_id=course_id)
It has some extra steps to cover the extra methods that were added to make further use of the more declarative syntax for DCB, such as a student leaving a course, changes of name of students and courses, and changes to the number of “places” a course has and the “max courses” for student.
The extra steps also show the command methods of enduring objects in a group can be executed. New events from the group and from its enduring objects are collected when a group is saved.
The extra steps also show that concurrent changes to enduring objects conflict with the student-course group, both for relevant events (such as changes to the ‘max_courses’ and ‘places’ numbers which are relevant when a student joins a course) but also for irrelevant events (such as changed to a student’s name). It would be possible to reconstruct enduring objects and groups with more restricted consistency boundaries, but then attributes that would be updated from events that are not included will have stale values.
Discussion¶
This is just one way of styling a slightly higher-level abstraction over the objects and methods described in the DCB specification.
Overall, we think it expresses the concerns in the “course subscriptions” challenge clearly and concisely. The conceptual overhead is relatively low. However, the full flexibility of DCB is not utilised by this approach, since each consistency boundary for an enduring object includes its full sequence of events. This means that commands which in fact are not contentious, such as changing the name of a student whilst the student is being enrolled on a course, could conflict if undertaken concurrently, which is unnecessary. This aspect is remedied by using the “vertical slices” style that is shown in the next example, perhaps at the cost of increased conceptual overhead.
Code reference¶
- class examples.dcb_enrolment_with_enduring_objects.application.StudentJoinedCourse(student_id: StudentID, course_id: CourseID)[source]¶
Bases:
Decision- student_id: StudentID¶
- course_id: CourseID¶
- class examples.dcb_enrolment_with_enduring_objects.application.StudentLeftCourse(student_id: StudentID, course_id: CourseID)[source]¶
Bases:
Decision- student_id: StudentID¶
- course_id: CourseID¶
- class examples.dcb_enrolment_with_enduring_objects.application.Student(**kwargs: Any)[source]¶
Bases:
EnduringObject[Decision,StudentID]- class Registered(originator_topic: str, student_id: StudentID, name: str, max_courses: int)[source]¶
Bases:
InitialDecision- student_id: StudentID¶
- name: str¶
- max_courses: int¶
- projected_types = [<class 'examples.dcb_enrolment_with_enduring_objects.application.Student.NameUpdated'>, <class 'examples.dcb_enrolment_with_enduring_objects.application.Student.MaxCoursesUpdated'>, <class 'examples.dcb_enrolment_with_enduring_objects.application.StudentJoinedCourse'>, <class 'examples.dcb_enrolment_with_enduring_objects.application.StudentLeftCourse'>]¶
- id: TID¶
- last_known_position: int | None¶
- class examples.dcb_enrolment_with_enduring_objects.application.Course(**kwargs: Any)[source]¶
Bases:
EnduringObject[Decision,CourseID]- class Registered(originator_topic: str, course_id: CourseID, name: str, places: int)[source]¶
Bases:
InitialDecision- course_id: CourseID¶
- name: str¶
- places: int¶
- projected_types = [<class 'examples.dcb_enrolment_with_enduring_objects.application.Course.NameUpdated'>, <class 'examples.dcb_enrolment_with_enduring_objects.application.Course.PlacesUpdated'>, <class 'examples.dcb_enrolment_with_enduring_objects.application.StudentJoinedCourse'>, <class 'examples.dcb_enrolment_with_enduring_objects.application.StudentLeftCourse'>]¶
- id: TID¶
- last_known_position: int | None¶
- class examples.dcb_enrolment_with_enduring_objects.application.StudentAndCourse(*args: Any, **kwargs: Any)[source]¶
- class examples.dcb_enrolment_with_enduring_objects.application.EnrolmentWithEnduringObjects(env: Mapping[str, str] | None = None)[source]¶
Bases:
DCBApplication,EnrolmentInterface- env: Mapping[str, str] = {'MAPPER_TOPIC': 'eventsourcing.dcb.msgpack:MessagePackMapper', 'PERSISTENCE_MODULE': 'eventsourcing.dcb.popo'}¶
- list_students_for_course(course_id: CourseID) list[str][source]¶
List students enrolled on a course.
- list_courses_for_student(student_id: StudentID) list[str][source]¶
List courses enrolled by a student.
- name = 'EnrolmentWithEnduringObjects'¶