from __future__ import annotations
from collections.abc import Sequence
from typing import TYPE_CHECKING, cast
from eventsourcing.dcb.application import (
DCBApplication,
)
from eventsourcing.dcb.domain import (
EnduringObject,
Group,
)
from eventsourcing.dcb.msgpack import (
Decision,
InitialDecision,
MessagePackMapper,
)
from eventsourcing.domain import event
from eventsourcing.utils import get_topic
from examples.dcb_enrolment.interface import (
AlreadyJoinedError,
CourseID,
CourseNotFoundError,
EnrolmentInterface,
FullyBookedError,
NotAlreadyJoinedError,
StudentID,
StudentNotFoundError,
TooManyCoursesError,
)
if TYPE_CHECKING:
from collections.abc import Mapping
class StudentJoinedCourse(Decision):
    """Decision stating that a student has joined a course.

    Both parties are identified by ID, so one decision describes the fact
    for the student and for the course.
    """

    student_id: StudentID
    course_id: CourseID
class StudentLeftCourse(Decision):
    """Decision stating that a student has left a course.

    Both parties are identified by ID, so one decision describes the fact
    for the student and for the course.
    """

    student_id: StudentID
    course_id: CourseID
class Student(EnduringObject[Decision, StudentID]):
    """Enduring object for a student.

    Holds the student's name, the maximum number of courses they may join,
    and the IDs of the courses they have currently joined.
    """

    class Registered(InitialDecision):
        """Initial decision for a student: ID, name and course limit."""

        student_id: StudentID
        name: str
        max_courses: int

    class NameUpdated(Decision):
        """Decision carrying a new name for the student."""

        name: str

    class MaxCoursesUpdated(Decision):
        """Decision carrying a new course limit for the student."""

        max_courses: int

    def __init__(self, name: str, max_courses: int) -> None:
        """Set the name and course limit; the student starts with no courses."""
        self.name = name
        self.max_courses = max_courses
        self.course_ids: list[CourseID] = []

    @event(NameUpdated)
    def update_name(self, name: str) -> None:
        """Replace the student's name."""
        self.name = name

    @event(MaxCoursesUpdated)
    def update_max_courses(self, max_courses: int) -> None:
        """Replace the maximum number of courses the student may join."""
        self.max_courses = max_courses

    @event(StudentJoinedCourse)
    def _(self, course_id: CourseID) -> None:
        """Record that the student joined ``course_id``.

        Raises:
            TooManyCoursesError: if the student already has ``max_courses``
                (or more) courses.
        """
        if len(self.course_ids) >= self.max_courses:
            raise TooManyCoursesError
        self.course_ids.append(course_id)

    @event(StudentLeftCourse)
    def _(self, course_id: CourseID) -> None:
        """Record that the student left ``course_id``.

        Raises:
            NotAlreadyJoinedError: if ``course_id`` is not among the
                student's joined courses.
        """
        # Guard explicitly so callers get the domain error rather than the
        # bare ValueError that list.remove() raises for a missing item; the
        # Course handler for this decision reports the same condition with
        # NotAlreadyJoinedError.
        if course_id not in self.course_ids:
            raise NotAlreadyJoinedError
        self.course_ids.remove(course_id)
class Course(EnduringObject[Decision, CourseID]):
    """Enduring object for a course.

    Holds the course's name, its number of places, and the IDs of the
    students who have currently joined.
    """

    class Registered(InitialDecision):
        """Initial decision for a course: ID, name and number of places."""

        course_id: CourseID
        name: str
        places: int

    class NameUpdated(Decision):
        """Decision carrying a new name for the course."""

        name: str

    class PlacesUpdated(Decision):
        """Decision carrying a new number of places for the course."""

        places: int

    def __init__(self, name: str, places: int) -> None:
        """Set the name and number of places; the course starts empty."""
        self.name, self.places = name, places
        self.student_ids: list[StudentID] = []

    @event(NameUpdated)
    def update_name(self, name: str) -> None:
        """Replace the course's name."""
        self.name = name

    @event(PlacesUpdated)
    def update_places(self, places: int) -> None:
        """Replace the number of places on the course."""
        self.places = places

    @event(StudentJoinedCourse)
    def _(self, student_id: StudentID) -> None:
        """Record that ``student_id`` joined the course.

        Raises:
            AlreadyJoinedError: if the student is already on the course.
            FullyBookedError: if every place is already taken.
        """
        enrolled = self.student_ids
        # Duplicate membership is reported before capacity is considered.
        if student_id in enrolled:
            raise AlreadyJoinedError
        if self.places <= len(enrolled):
            raise FullyBookedError
        enrolled.append(student_id)

    @event(StudentLeftCourse)
    def _(self, student_id: StudentID) -> None:
        """Record that ``student_id`` left the course.

        Raises:
            NotAlreadyJoinedError: if the student is not on the course.
        """
        enrolled = self.student_ids
        if student_id not in enrolled:
            raise NotAlreadyJoinedError
        enrolled.remove(student_id)
class StudentAndCourse(Group[Decision]):
    """Group of one student and one course, used to join or leave a course."""

    def __init__(
        self,
        student: Student | None,
        course: Course | None,
    ) -> None:
        """Keep the student and course, rejecting a missing one.

        Raises:
            CourseNotFoundError: if ``course`` is ``None``.
            StudentNotFoundError: if ``student`` is ``None``.
        """
        # The course is checked first, so a missing course is the error
        # reported when both arguments are None.
        if course is None:
            raise CourseNotFoundError
        if student is None:
            raise StudentNotFoundError
        self.student, self.course = student, course

    def student_joins_course(self) -> None:
        """Trigger a StudentJoinedCourse decision for this student and course."""
        # The DCB magic: one event for "one fact".
        student, course = self.student, self.course
        self.trigger_event(
            StudentJoinedCourse,
            student_id=student.id,
            course_id=course.id,
        )

    def student_leaves_course(self) -> None:
        """Trigger a StudentLeftCourse decision for this student and course."""
        # The DCB magic: one event for "one fact".
        student, course = self.student, self.course
        self.trigger_event(
            StudentLeftCourse,
            student_id=student.id,
            course_id=course.id,
        )
class EnrolmentWithEnduringObjects(DCBApplication, EnrolmentInterface):
    """Enrolment application built on Student and Course enduring objects."""

    # MAPPER_TOPIC is listed first; entries unpacked from DCBApplication.env
    # come after it and therefore take precedence on a key clash.
    env: Mapping[str, str] = {
        "MAPPER_TOPIC": get_topic(MessagePackMapper),
        **DCBApplication.env,
    }

    def register_student(self, name: str, max_courses: int) -> StudentID:
        """Create and save a student, returning the new student's ID."""
        new_student = Student(name=name, max_courses=max_courses)
        self.repository.save(new_student)
        return new_student.id

    def register_course(self, name: str, places: int) -> CourseID:
        """Create and save a course, returning the new course's ID."""
        new_course = Course(name=name, places=places)
        self.repository.save(new_course)
        return new_course.id

    def join_course(self, student_id: StudentID, course_id: CourseID) -> None:
        """Load the student/course group, enrol the student, and save."""
        pair = self.repository.get_group(StudentAndCourse, student_id, course_id)
        pair.student_joins_course()
        self.repository.save(pair)

    def leave_course(self, student_id: StudentID, course_id: CourseID) -> None:
        """Load the student/course group, withdraw the student, and save."""
        pair = self.repository.get_group(StudentAndCourse, student_id, course_id)
        pair.student_leaves_course()
        self.repository.save(pair)

    def list_students_for_course(self, course_id: CourseID) -> list[str]:
        """Return the names of the students who have joined the course.

        Entries that the repository returns as ``None`` are skipped.
        """
        member_ids = self.get_course(course_id).student_ids
        return [
            cast(Student, found).name
            for found in self.repository.get_many(*member_ids)
            if found is not None
        ]

    def list_courses_for_student(self, student_id: StudentID) -> list[str]:
        """Return the names of the courses the student has joined.

        Entries that the repository returns as ``None`` are skipped.
        """
        joined_ids = self.get_student(student_id).course_ids
        return [
            cast(Course, found).name
            for found in self.repository.get_many(*joined_ids)
            if found is not None
        ]

    def update_student_name(self, student_id: StudentID, name: str) -> None:
        """Load the student, change their name, and save."""
        target = self.get_student(student_id)
        target.update_name(name)
        self.repository.save(target)

    def update_max_courses(self, student_id: StudentID, max_courses: int) -> None:
        """Load the student, change their course limit, and save."""
        target = self.get_student(student_id)
        target.update_max_courses(max_courses)
        self.repository.save(target)

    def update_course_name(self, course_id: CourseID, name: str) -> None:
        """Load the course, change its name, and save."""
        target = self.get_course(course_id)
        target.update_name(name)
        self.repository.save(target)

    def update_places(self, course_id: CourseID, max_courses: int) -> None:
        """Load the course, change its number of places, and save.

        Despite its name, ``max_courses`` is the new number of places: it is
        passed straight to ``Course.update_places``.
        """
        target = self.get_course(course_id)
        target.update_places(max_courses)
        self.repository.save(target)

    def get_student(self, student_id: StudentID) -> Student:
        """Fetch the object stored under ``student_id``, typed as a Student.

        The ``cast`` only informs the type checker; nothing is verified at
        runtime.
        """
        loaded = self.repository.get(student_id)
        return cast(Student, loaded)

    def get_course(self, course_id: CourseID) -> Course:
        """Fetch the object stored under ``course_id``, typed as a Course.

        The ``cast`` only informs the type checker; nothing is verified at
        runtime.
        """
        loaded = self.repository.get(course_id)
        return cast(Course, loaded)
# Type alias: a sequence of Decision classes (the classes themselves, not
# instances of them).
DecisionTypes = Sequence[type[Decision]]