Source code for examples.dcb_enrolment_with_enduring_objects.application

from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING, cast

from eventsourcing.dcb.application import (
    DCBApplication,
)
from eventsourcing.dcb.domain import (
    EnduringObject,
    Group,
)
from eventsourcing.dcb.msgpack import (
    Decision,
    InitialDecision,
    MessagePackMapper,
)
from eventsourcing.domain import event
from eventsourcing.utils import get_topic
from examples.dcb_enrolment.interface import (
    AlreadyJoinedError,
    CourseID,
    CourseNotFoundError,
    EnrolmentInterface,
    FullyBookedError,
    NotAlreadyJoinedError,
    StudentID,
    StudentNotFoundError,
    TooManyCoursesError,
)

if TYPE_CHECKING:
    from collections.abc import Mapping


class StudentJoinedCourse(Decision):
    """Decision recording that a student joined a course.

    Triggered by ``StudentAndCourse.student_joins_course`` and handled by the
    ``@event(StudentJoinedCourse)`` methods on both ``Student`` and ``Course``.
    """

    student_id: StudentID
    course_id: CourseID
class StudentLeftCourse(Decision):
    """Decision recording that a student left a course.

    Triggered by ``StudentAndCourse.student_leaves_course`` and handled by the
    ``@event(StudentLeftCourse)`` methods on both ``Student`` and ``Course``.
    """

    student_id: StudentID
    course_id: CourseID
class Student(EnduringObject[Decision, StudentID]):
    """A student who may join a limited number of courses.

    State held on the instance: ``name``, ``max_courses`` and ``course_ids``
    (the IDs of the courses currently joined).

    NOTE(review): ``id`` is never assigned in this class; it is presumably
    supplied by ``EnduringObject`` -- confirm against the library.
    """

    class Registered(InitialDecision):
        """Initial decision for a student, carrying the registration details."""

        student_id: StudentID
        name: str
        max_courses: int

    class NameUpdated(Decision):
        """Decision carrying a student's new name."""

        name: str

    class MaxCoursesUpdated(Decision):
        """Decision carrying a student's new course limit."""

        max_courses: int

    def __init__(self, name: str, max_courses: int) -> None:
        """Initialise a student who has not joined any course yet.

        Args:
            name: The student's name.
            max_courses: Upper bound on the number of courses joined at once.
        """
        self.name = name
        self.max_courses = max_courses
        self.course_ids: list[CourseID] = []

    @event(NameUpdated)
    def update_name(self, name: str) -> None:
        """Replace the student's name (associated with ``NameUpdated``)."""
        self.name = name

    @event(MaxCoursesUpdated)
    def update_max_courses(self, max_courses: int) -> None:
        """Replace the course limit (associated with ``MaxCoursesUpdated``).

        Courses already joined are left untouched, even if they now exceed
        the new limit.
        """
        self.max_courses = max_courses

    # Handler for StudentJoinedCourse: refuses the join once the student is
    # at (or above) the course limit, otherwise records the course ID.
    @event(StudentJoinedCourse)
    def _(self, course_id: CourseID) -> None:
        joined = self.course_ids
        limit_reached = len(joined) >= self.max_courses
        if limit_reached:
            raise TooManyCoursesError
        joined.append(course_id)

    # Handler for StudentLeftCourse: forgets the course ID. There is no
    # membership check here, so list.remove raises ValueError if the ID is
    # not present.
    @event(StudentLeftCourse)
    def _(self, course_id: CourseID) -> None:
        joined = self.course_ids
        joined.remove(course_id)
class Course(EnduringObject[Decision, CourseID]):
    """A course with a limited number of places.

    State held on the instance: ``name``, ``places`` and ``student_ids``
    (the IDs of the students currently on the course).

    NOTE(review): ``id`` is never assigned in this class; it is presumably
    supplied by ``EnduringObject`` -- confirm against the library.
    """

    class Registered(InitialDecision):
        """Initial decision for a course, carrying the registration details."""

        course_id: CourseID
        name: str
        places: int

    class NameUpdated(Decision):
        """Decision carrying a course's new name."""

        name: str

    class PlacesUpdated(Decision):
        """Decision carrying a course's new number of places."""

        places: int

    def __init__(self, name: str, places: int) -> None:
        """Initialise a course that no student has joined yet.

        Args:
            name: The course's name.
            places: Upper bound on the number of students on the course.
        """
        self.name = name
        self.places = places
        self.student_ids: list[StudentID] = []

    @event(NameUpdated)
    def update_name(self, name: str) -> None:
        """Replace the course's name (associated with ``NameUpdated``)."""
        self.name = name

    @event(PlacesUpdated)
    def update_places(self, places: int) -> None:
        """Replace the number of places (associated with ``PlacesUpdated``).

        Students already on the course are left untouched, even if they now
        exceed the new number of places.
        """
        self.places = places

    # Handler for StudentJoinedCourse. The duplicate check runs before the
    # capacity check, so a student who is already on a full course gets
    # AlreadyJoinedError rather than FullyBookedError.
    @event(StudentJoinedCourse)
    def _(self, student_id: StudentID) -> None:
        members = self.student_ids
        if student_id in members:
            raise AlreadyJoinedError
        course_is_full = len(members) >= self.places
        if course_is_full:
            raise FullyBookedError
        members.append(student_id)

    # Handler for StudentLeftCourse: only a student currently on the course
    # can leave it.
    @event(StudentLeftCourse)
    def _(self, student_id: StudentID) -> None:
        members = self.student_ids
        if student_id not in members:
            raise NotAlreadyJoinedError
        members.remove(student_id)
class StudentAndCourse(Group[Decision]):
    """Group pairing one ``Student`` with one ``Course``.

    Used to trigger a single decision that mentions both the student and
    the course.
    """

    def __init__(
        self,
        student: Student | None,
        course: Course | None,
    ) -> None:
        """Build the group, rejecting a missing member.

        Args:
            student: The student, or ``None`` if it was not found.
            course: The course, or ``None`` if it was not found.

        Raises:
            CourseNotFoundError: If ``course`` is ``None``. This is checked
                first, so it also wins when both arguments are ``None``.
            StudentNotFoundError: If ``student`` is ``None``.
        """
        if course is None:
            raise CourseNotFoundError
        if student is None:
            raise StudentNotFoundError

        self.student = student
        self.course = course

    def student_joins_course(self) -> None:
        """Trigger one ``StudentJoinedCourse`` decision for this pair."""
        student_id = self.student.id
        course_id = self.course.id
        # The DCB magic: one event for "one fact".
        self.trigger_event(
            StudentJoinedCourse,
            student_id=student_id,
            course_id=course_id,
        )

    def student_leaves_course(self) -> None:
        """Trigger one ``StudentLeftCourse`` decision for this pair."""
        student_id = self.student.id
        course_id = self.course.id
        # The DCB magic: one event for "one fact".
        self.trigger_event(
            StudentLeftCourse,
            student_id=student_id,
            course_id=course_id,
        )
class EnrolmentWithEnduringObjects(DCBApplication, EnrolmentInterface):
    """Enrolment application built on ``Student`` and ``Course`` objects.

    Every operation loads what it needs through ``self.repository``, calls a
    method on the loaded object or group, and saves it back.
    """

    # Selects MessagePackMapper through the "MAPPER_TOPIC" setting. The base
    # class settings are unpacked last, so a "MAPPER_TOPIC" key in
    # DCBApplication.env (if there is one) would override the value set here.
    env: Mapping[str, str] = {
        "MAPPER_TOPIC": get_topic(MessagePackMapper),
        **DCBApplication.env,
    }

    def register_student(self, name: str, max_courses: int) -> StudentID:
        """Create and save a new student, returning its ID."""
        new_student = Student(name=name, max_courses=max_courses)
        self.repository.save(new_student)
        return new_student.id

    def register_course(self, name: str, places: int) -> CourseID:
        """Create and save a new course, returning its ID."""
        new_course = Course(name=name, places=places)
        self.repository.save(new_course)
        return new_course.id

    def join_course(self, student_id: StudentID, course_id: CourseID) -> None:
        """Enrol the student on the course and save the result.

        Errors raised while building the ``StudentAndCourse`` group or while
        handling the decision propagate to the caller.
        """
        pair = self.repository.get_group(StudentAndCourse, student_id, course_id)
        pair.student_joins_course()
        self.repository.save(pair)

    def leave_course(self, student_id: StudentID, course_id: CourseID) -> None:
        """Remove the student from the course and save the result.

        Errors raised while building the ``StudentAndCourse`` group or while
        handling the decision propagate to the caller.
        """
        pair = self.repository.get_group(StudentAndCourse, student_id, course_id)
        pair.student_leaves_course()
        self.repository.save(pair)

    def list_students_for_course(self, course_id: CourseID) -> list[str]:
        """Return the names of the students on the given course.

        Entries that the repository returns as ``None`` are skipped.
        """
        enrolled_ids = self.get_course(course_id).student_ids
        found = self.repository.get_many(*enrolled_ids)
        return [
            cast(Student, student).name
            for student in found
            if student is not None
        ]

    def list_courses_for_student(self, student_id: StudentID) -> list[str]:
        """Return the names of the courses the given student has joined.

        Entries that the repository returns as ``None`` are skipped.
        """
        joined_ids = self.get_student(student_id).course_ids
        found = self.repository.get_many(*joined_ids)
        return [
            cast(Course, course).name
            for course in found
            if course is not None
        ]

    def update_student_name(self, student_id: StudentID, name: str) -> None:
        """Load the student, change its name and save it."""
        target = self.get_student(student_id)
        target.update_name(name)
        self.repository.save(target)

    def update_max_courses(self, student_id: StudentID, max_courses: int) -> None:
        """Load the student, change its course limit and save it."""
        target = self.get_student(student_id)
        target.update_max_courses(max_courses)
        self.repository.save(target)

    def update_course_name(self, course_id: CourseID, name: str) -> None:
        """Load the course, change its name and save it."""
        target = self.get_course(course_id)
        target.update_name(name)
        self.repository.save(target)

    def update_places(self, course_id: CourseID, max_courses: int) -> None:
        """Load the course, change its number of places and save it.

        Despite its name, ``max_courses`` is the new number of places; it is
        passed straight to ``Course.update_places``. The parameter name is
        left as-is because renaming it would break keyword callers.
        """
        target = self.get_course(course_id)
        target.update_places(max_courses)
        self.repository.save(target)

    def get_student(self, student_id: StudentID) -> Student:
        """Return what the repository holds for ``student_id`` as a ``Student``.

        ``cast`` only informs the type checker; nothing is verified at runtime.
        """
        found = self.repository.get(student_id)
        return cast(Student, found)

    def get_course(self, course_id: CourseID) -> Course:
        """Return what the repository holds for ``course_id`` as a ``Course``.

        ``cast`` only informs the type checker; nothing is verified at runtime.
        """
        found = self.repository.get(course_id)
        return cast(Course, found)
# Type alias for a sequence of Decision subclasses (the classes themselves,
# not instances of them).
DecisionTypes = Sequence[type[Decision]]