from __future__ import annotations
from collections.abc import Sequence
from typing import TYPE_CHECKING
from uuid import uuid4
from eventsourcing.dcb.application import (
DCBApplication,
)
from eventsourcing.dcb.domain import (
Selector,
Slice,
)
from eventsourcing.dcb.msgpack import Decision, MessagePackMapper
from eventsourcing.domain import event
from eventsourcing.utils import get_topic
from examples.dcb_enrolment.interface import (
AlreadyJoinedError,
CourseID,
CourseNotFoundError,
EnrolmentInterface,
FullyBookedError,
NotAlreadyJoinedError,
StudentID,
StudentNotFoundError,
TooManyCoursesError,
)
if TYPE_CHECKING:
from collections.abc import Mapping
[docs]
class StudentJoinedCourse(Decision):
student_id: StudentID
course_id: CourseID
[docs]
class StudentLeftCourse(Decision):
student_id: StudentID
course_id: CourseID
[docs]
class StudentRegistered(Decision):
student_id: StudentID
name: str
max_courses: int
[docs]
class StudentNameUpdated(Decision):
student_id: StudentID
name: str
[docs]
class StudentMaxCoursesUpdated(Decision):
student_id: StudentID
max_courses: int
[docs]
class CourseRegistered(Decision):
course_id: CourseID
name: str
places: int
[docs]
class CourseNameUpdated(Decision):
course_id: CourseID
name: str
[docs]
class CoursePlacesUpdated(Decision):
course_id: CourseID
places: int
[docs]
class RegisterStudent(Slice[Decision]):
[docs]
def __init__(self, name: str, max_courses: int):
self.student_id = StudentID(f"student-{uuid4()}")
self.name = name
self.max_courses = max_courses
[docs]
def consistency_boundary(self) -> Selector:
return Selector(types=[StudentRegistered], tags=[self.student_id])
[docs]
def execute(self) -> None:
self.trigger_event(
StudentRegistered,
tags=[self.student_id],
student_id=self.student_id,
name=self.name,
max_courses=self.max_courses,
)
[docs]
class UpdateStudentName(Slice[Decision]):
[docs]
def __init__(self, student_id: StudentID, name: str) -> None:
self.id = student_id
self.name = name
self.student_was_registered: bool = False
[docs]
def consistency_boundary(self) -> Selector:
return Selector(types=[StudentRegistered, StudentNameUpdated], tags=[self.id])
@event(StudentRegistered)
def _(self) -> None:
self.student_was_registered = True
[docs]
def execute(self) -> None:
assert self.student_was_registered
self.trigger_event(
StudentNameUpdated,
tags=[self.id],
student_id=self.id,
name=self.name,
)
[docs]
class UpdateMaxCourses(Slice[Decision]):
[docs]
def __init__(self, student_id: StudentID, max_courses: int) -> None:
self.student_was_registered: bool = False
self.id = student_id
self.max_courses = max_courses
[docs]
def consistency_boundary(self) -> Selector:
return Selector(
types=[StudentRegistered, StudentMaxCoursesUpdated],
tags=[self.id],
)
@event(StudentRegistered)
def _(self) -> None:
self.student_was_registered = True
[docs]
def execute(self) -> None:
assert self.student_was_registered
self.trigger_event(
StudentMaxCoursesUpdated,
tags=[self.id],
student_id=self.id,
max_courses=self.max_courses,
)
[docs]
class RegisterCourse(Slice[Decision]):
[docs]
def __init__(self, name: str, places: int):
self.course_id = CourseID(f"course-{uuid4()}")
self.name = name
self.places = places
[docs]
def consistency_boundary(self) -> Selector:
return Selector(types=[CourseRegistered], tags=[self.course_id])
[docs]
def execute(self) -> None:
self.trigger_event(
CourseRegistered,
tags=[self.course_id],
course_id=self.course_id,
name=self.name,
places=self.places,
)
[docs]
class UpdateCourseName(Slice[Decision]):
[docs]
def __init__(self, course_id: CourseID, name: str) -> None:
self.id = course_id
self.name = name
self.course_was_registered: bool = False
[docs]
def consistency_boundary(self) -> Selector:
return Selector(types=[CourseRegistered, CourseNameUpdated], tags=[self.id])
@event(CourseRegistered)
def _(self) -> None:
self.course_was_registered = True
[docs]
def execute(self) -> None:
assert self.course_was_registered
self.trigger_event(
CourseNameUpdated,
tags=[self.id],
course_id=self.id,
name=self.name,
)
[docs]
class UpdatePlaces(Slice[Decision]):
[docs]
def __init__(self, course_id: CourseID, places: int) -> None:
self.id = course_id
self.places = places
self.course_was_registered: bool = False
[docs]
def consistency_boundary(self) -> Selector:
return Selector(types=[CourseRegistered, CoursePlacesUpdated], tags=[self.id])
@event(CourseRegistered)
def _(self) -> None:
self.course_was_registered = True
[docs]
def execute(self) -> None:
assert self.course_was_registered
self.trigger_event(
CoursePlacesUpdated,
tags=[self.id],
course_id=self.id,
places=self.places,
)
[docs]
class StudentJoinsCourse(Slice[Decision]):
[docs]
def __init__(self, student_id: StudentID, course_id: CourseID) -> None:
self.student_id = student_id
self.course_id = course_id
self.course_was_registered = False
self.student_was_registered = False
self.student_max_courses = 0
self.course_places = 0
self.students_on_course: list[StudentID] = []
self.courses_for_student: list[CourseID] = []
[docs]
def consistency_boundary(self) -> list[Selector]:
return [
Selector(
types=[
StudentRegistered,
StudentMaxCoursesUpdated,
StudentJoinedCourse,
StudentLeftCourse,
],
tags=[self.student_id],
),
Selector(
types=[
CourseRegistered,
CoursePlacesUpdated,
StudentJoinedCourse,
StudentLeftCourse,
],
tags=[self.course_id],
),
]
@event(StudentRegistered)
def _(self, max_courses: int) -> None:
self.student_was_registered = True
self.student_max_courses = max_courses
@event(CourseRegistered)
def _(self, places: int) -> None:
self.course_was_registered = True
self.course_places = places
@event(StudentJoinedCourse)
def _(self, student_id: StudentID, course_id: CourseID) -> None:
if student_id == self.student_id:
self.courses_for_student.append(course_id)
if course_id == self.course_id:
self.students_on_course.append(student_id)
@event(StudentLeftCourse)
def _(self, student_id: StudentID, course_id: CourseID) -> None:
if student_id == self.student_id:
self.courses_for_student.remove(course_id)
if course_id == self.course_id:
self.students_on_course.remove(student_id)
@event(StudentMaxCoursesUpdated)
def _(self, max_courses: int) -> None:
self.student_max_courses = max_courses
@event(CoursePlacesUpdated)
def _(self, places: int) -> None:
self.course_places = places
[docs]
def execute(self) -> None:
if not self.course_was_registered:
raise CourseNotFoundError(self.course_id)
if not self.student_was_registered:
raise StudentNotFoundError(self.student_id)
if len(self.students_on_course) >= self.course_places:
raise FullyBookedError(self.course_id)
if len(self.courses_for_student) >= self.student_max_courses:
raise TooManyCoursesError(self.student_id)
if self.student_id in self.students_on_course:
raise AlreadyJoinedError((self.student_id, self.course_id))
self.trigger_event(
StudentJoinedCourse,
tags=[self.student_id, self.course_id],
student_id=self.student_id,
course_id=self.course_id,
)
[docs]
class StudentLeavesCourse(Slice[Decision]):
[docs]
def __init__(self, student_id: StudentID, course_id: CourseID) -> None:
self.student_id = student_id
self.course_id = course_id
self.course_was_registered = False
self.student_was_registered = False
self.students_on_course: list[StudentID] = []
self.courses_for_student: list[CourseID] = []
[docs]
def consistency_boundary(self) -> list[Selector]:
return [
Selector(
types=[StudentRegistered, StudentJoinedCourse, StudentLeftCourse],
tags=[self.student_id],
),
Selector(
types=[CourseRegistered, StudentJoinedCourse, StudentLeftCourse],
tags=[self.course_id],
),
]
@event(StudentRegistered)
def _(self) -> None:
self.student_was_registered = True
@event(CourseRegistered)
def _(self) -> None:
self.course_was_registered = True
@event(StudentJoinedCourse)
def _(self, student_id: StudentID, course_id: CourseID) -> None:
if student_id == self.student_id:
self.courses_for_student.append(course_id)
if course_id == self.course_id:
self.students_on_course.append(student_id)
@event(StudentLeftCourse)
def _(self, student_id: StudentID, course_id: CourseID) -> None:
if student_id == self.student_id:
self.courses_for_student.remove(course_id)
if course_id == self.course_id:
self.students_on_course.remove(student_id)
[docs]
def execute(self) -> None:
if not self.course_was_registered:
raise CourseNotFoundError
if not self.student_was_registered:
raise StudentNotFoundError
if self.student_id not in self.students_on_course:
raise NotAlreadyJoinedError
self.trigger_event(
StudentLeftCourse,
tags=[self.student_id, self.course_id],
student_id=self.student_id,
course_id=self.course_id,
)
[docs]
class StudentsIDs(Slice[Decision]):
[docs]
def __init__(self, course_id: CourseID) -> None:
self.course_id = course_id
self.student_ids: list[StudentID] = []
[docs]
def consistency_boundary(self) -> Selector:
return Selector(types=type(self).projected_types, tags=[self.course_id])
@event(StudentJoinedCourse)
def _(self, student_id: StudentID) -> None:
self.student_ids.append(student_id)
@event(StudentLeftCourse)
def _(self, student_id: StudentID) -> None:
self.student_ids.remove(student_id)
[docs]
class StudentNames(Slice[Decision]):
[docs]
def __init__(self, student_ids: list[StudentID]) -> None:
self.student_id_names: dict[StudentID, str | None] = dict.fromkeys(
student_ids, None
)
[docs]
def consistency_boundary(self) -> list[Selector]:
return [
Selector(types=type(self).projected_types, tags=[student_id])
for student_id in self.student_id_names
]
@event(StudentRegistered)
def _(self, student_id: StudentID, name: str) -> None:
self.student_id_names[student_id] = name
@event(StudentNameUpdated)
def _(self, student_id: StudentID, name: str) -> None:
self.student_id_names[student_id] = name
@property
def names(self) -> list[str]:
return [n for n in self.student_id_names.values() if n]
[docs]
class CourseIDs(Slice[Decision]):
[docs]
def __init__(self, student_id: StudentID) -> None:
self.student_id = student_id
self.course_ids: list[CourseID] = []
[docs]
def consistency_boundary(self) -> Selector:
return Selector(types=type(self).projected_types, tags=[self.student_id])
@event(StudentJoinedCourse)
def _(self, course_id: CourseID) -> None:
self.course_ids.append(course_id)
@event(StudentLeftCourse)
def _(self, course_id: CourseID) -> None:
self.course_ids.remove(course_id)
[docs]
class CourseNames(Slice[Decision]):
[docs]
def __init__(self, course_ids: list[CourseID]) -> None:
self.course_id_names: dict[CourseID, str | None] = dict.fromkeys(
course_ids, None
)
[docs]
def consistency_boundary(self) -> list[Selector]:
return [
Selector(types=type(self).projected_types, tags=[student_id])
for student_id in self.course_id_names
]
@event(CourseRegistered)
def _(self, course_id: CourseID, name: str) -> None:
self.course_id_names[course_id] = name
@event(CourseNameUpdated)
def _(self, course_id: CourseID, name: str) -> None:
self.course_id_names[course_id] = name
@property
def names(self) -> list[str]:
return [n for n in self.course_id_names.values() if n]
[docs]
class Student(Slice[Decision]):
[docs]
def __init__(self, student_id: StudentID) -> None:
self.student_was_registered: bool = False
self.id = student_id
self.name: str = ""
self.max_courses: int = 0
self.course_ids: list[CourseID] = []
[docs]
def consistency_boundary(self) -> Selector:
return Selector(tags=[self.id])
@event(StudentRegistered)
def _(self, name: str, max_courses: int) -> None:
self.student_was_registered = True
self.name = name
self.max_courses = max_courses
@event(StudentNameUpdated)
def _(self, name: str) -> None:
self.name = name
@event(StudentMaxCoursesUpdated)
def _(self, max_courses: int) -> None:
self.max_courses = max_courses
@event(StudentJoinedCourse)
def _(self, course_id: CourseID) -> None:
self.course_ids.append(course_id)
@event(StudentLeftCourse)
def _(self, course_id: CourseID) -> None:
self.course_ids.remove(course_id)
[docs]
class Course(Slice[Decision]):
[docs]
def __init__(self, course_id: CourseID) -> None:
self.course_was_registered: bool = False
self.id = course_id
self.name: str = ""
self.places = 0
self.student_ids: list[StudentID] = []
[docs]
def consistency_boundary(self) -> Selector:
return Selector(tags=[self.id])
@event(CourseRegistered)
def _(self, name: str, places: int) -> None:
self.course_was_registered = True
self.name = name
self.places = places
@event(CourseNameUpdated)
def _(self, name: str) -> None:
self.name = name
@event(CoursePlacesUpdated)
def _(self, places: int) -> None:
self.places = places
@event(StudentJoinedCourse)
def _(self, student_id: StudentID) -> None:
self.student_ids.append(student_id)
@event(StudentLeftCourse)
def _(self, student_id: StudentID) -> None:
self.student_ids.remove(student_id)
[docs]
class EnrolmentWithVerticalSlices(DCBApplication, EnrolmentInterface):
env: Mapping[str, str] = {
"MAPPER_TOPIC": get_topic(MessagePackMapper),
**DCBApplication.env,
}
[docs]
def register_student(self, name: str, max_courses: int) -> StudentID:
return self.do(RegisterStudent(name, max_courses)).student_id
[docs]
def register_course(self, name: str, places: int) -> CourseID:
return self.do(RegisterCourse(name, places)).course_id
[docs]
def join_course(self, student_id: StudentID, course_id: CourseID) -> None:
self.do(StudentJoinsCourse(student_id, course_id))
[docs]
def leave_course(self, student_id: StudentID, course_id: CourseID) -> None:
self.do(StudentLeavesCourse(student_id, course_id))
[docs]
def list_students_for_course(self, course_id: CourseID) -> list[str]:
return self.do(StudentNames(self.do(StudentsIDs(course_id)).student_ids)).names
[docs]
def list_courses_for_student(self, student_id: StudentID) -> list[str]:
return self.do(CourseNames(self.do(CourseIDs(student_id)).course_ids)).names
[docs]
def update_student_name(self, student_id: StudentID, name: str) -> None:
self.do(UpdateStudentName(student_id, name))
[docs]
def update_max_courses(self, student_id: StudentID, max_courses: int) -> None:
self.do(UpdateMaxCourses(student_id, max_courses))
[docs]
def update_course_name(self, course_id: CourseID, name: str) -> None:
self.do(UpdateCourseName(course_id, name))
[docs]
def update_places(self, course_id: CourseID, places: int) -> None:
self.do(UpdatePlaces(course_id, places))
[docs]
def get_student(self, student_id: StudentID) -> Student:
return self.do(Student(student_id=student_id))
[docs]
def get_course(self, course_id: CourseID) -> Course:
return self.do(Course(course_id=course_id))
DecisionTypes = Sequence[type[Decision]]