Source code for examples.cargoshipping.domainmodel

from __future__ import annotations

from datetime import datetime, timedelta
from enum import Enum
from typing import cast
from uuid import UUID, uuid4

from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.domain import Aggregate


class Location(Enum):
    """Closed set of places a cargo can be booked from, routed via or sent to.

    Every member's value is the same string as its name, so a member can be
    looked up either by name (``Location["TOKYO"]``) or by value
    (``Location("TOKYO")``).
    """

    HAMBURG = "HAMBURG"
    HONGKONG = "HONGKONG"
    NEWYORK = "NEWYORK"
    STOCKHOLM = "STOCKHOLM"
    TOKYO = "TOKYO"

    NLRTM = "NLRTM"
    USDAL = "USDAL"
    AUMEL = "AUMEL"
class Leg:
    """One hop of an itinerary, travelled on a single voyage.

    Args:
        origin: Name of the location where the leg starts.
        destination: Name of the location where the leg ends.
        voyage_number: Identifier of the voyage that carries the cargo
            along this leg.
    """

    def __init__(self, origin: str, destination: str, voyage_number: str):
        # Plain attribute storage; no validation or normalisation is applied.
        self.origin: str = origin
        self.destination: str = destination
        self.voyage_number: str = voyage_number
class Itinerary:
    """A planned route for a cargo, expressed as an ordered sequence of legs.

    Args:
        origin: Name of the location where the itinerary starts.
        destination: Name of the location where the itinerary ends.
        legs: The legs to travel, in order.
    """

    def __init__(self, origin: str, destination: str, legs: tuple[Leg, ...]):
        # Stored as given; the legs are not checked against origin/destination.
        self.origin: str = origin
        self.destination: str = destination
        self.legs: tuple[Leg, ...] = legs
class HandlingActivity(Enum):
    """Kinds of handling that can be recorded against a cargo.

    Every member's value is the same string as its name.
    """

    RECEIVE = "RECEIVE"
    LOAD = "LOAD"
    UNLOAD = "UNLOAD"
    CLAIM = "CLAIM"
# Custom static types.
#
# LegDetails / ItineraryDetails: dictionary shapes for a leg and an itinerary.
# NextExpectedActivity: (activity, location, voyage number), or None when
# nothing further is expected.
LegDetails = dict[str, str]
ItineraryDetails = dict[str, str | list[LegDetails] | None]
NextExpectedActivity = tuple[HandlingActivity, Location, str] | None

# Some routes from one location to another, keyed by (origin, destination)
# location names. Each key maps to the list of itineraries for that pair.
REGISTERED_ROUTES: dict[tuple[str, str], list[Itinerary]] = {
    ("HONGKONG", "STOCKHOLM"): [
        Itinerary(
            origin="HONGKONG",
            destination="STOCKHOLM",
            legs=(
                Leg(origin="HONGKONG", destination="NEWYORK", voyage_number="V1"),
                Leg(origin="NEWYORK", destination="STOCKHOLM", voyage_number="V2"),
            ),
        ),
    ],
    ("TOKYO", "STOCKHOLM"): [
        Itinerary(
            origin="TOKYO",
            destination="STOCKHOLM",
            legs=(
                Leg(origin="TOKYO", destination="HAMBURG", voyage_number="V3"),
                Leg(origin="HAMBURG", destination="STOCKHOLM", voyage_number="V4"),
            ),
        ),
    ],
}
class Cargo(Aggregate):
    """Event-sourced aggregate that tracks a single cargo booking.

    A cargo is booked with an origin, a destination and an arrival deadline.
    Afterwards its destination can be changed, a route (an ``Itinerary``) can
    be assigned, and handling events can be registered against it. Each of
    these commands triggers a domain event, and each event is applied to the
    aggregate through the ``when`` single-dispatch method.

    Transport status is one of ``"NOT_RECEIVED"``, ``"IN_PORT"``,
    ``"ONBOARD_CARRIER"`` or ``"CLAIMED"``; routing status is
    ``"NOT_ROUTED"`` or ``"ROUTED"``.
    """

    def __init__(
        self,
        origin: Location,
        destination: Location,
        arrival_deadline: datetime,
    ):
        """Initialise a freshly booked cargo: not yet received, not yet routed.

        Args:
            origin: Where the cargo starts.
            destination: Where the cargo should end up.
            arrival_deadline: Latest acceptable arrival time.
        """
        self._origin: Location = origin
        self._destination: Location = destination
        self._arrival_deadline: datetime = arrival_deadline
        self._transport_status: str = "NOT_RECEIVED"
        self._routing_status: str = "NOT_ROUTED"
        self._is_misdirected: bool = False
        self._estimated_time_of_arrival: datetime | None = None
        self._next_expected_activity: NextExpectedActivity = None
        self._route: Itinerary | None = None
        self._last_known_location: Location | None = None
        self._current_voyage_number: str | None = None

    # Read-only views of the aggregate state. State only changes by applying
    # events in the ``when`` handlers below.

    @property
    def origin(self) -> Location:
        return self._origin

    @property
    def destination(self) -> Location:
        return self._destination

    @property
    def arrival_deadline(self) -> datetime:
        return self._arrival_deadline

    @property
    def transport_status(self) -> str:
        return self._transport_status

    @property
    def routing_status(self) -> str:
        return self._routing_status

    @property
    def is_misdirected(self) -> bool:
        return self._is_misdirected

    @property
    def estimated_time_of_arrival(
        self,
    ) -> datetime | None:
        return self._estimated_time_of_arrival

    @property
    def next_expected_activity(self) -> NextExpectedActivity:
        return self._next_expected_activity

    @property
    def route(self) -> Itinerary | None:
        return self._route

    @property
    def last_known_location(self) -> Location | None:
        return self._last_known_location

    @property
    def current_voyage_number(self) -> str | None:
        return self._current_voyage_number

    @classmethod
    def new_booking(
        cls,
        origin: Location,
        destination: Location,
        arrival_deadline: datetime,
    ) -> Cargo:
        """Create a new cargo with a fresh random ID via ``BookingStarted``.

        Args:
            origin: Where the cargo starts.
            destination: Where the cargo should end up.
            arrival_deadline: Latest acceptable arrival time.

        Returns:
            The newly created ``Cargo`` aggregate.
        """
        return cls._create(
            event_class=cls.BookingStarted,
            id=uuid4(),
            origin=origin,
            destination=destination,
            arrival_deadline=arrival_deadline,
        )

    class BookingStarted(Aggregate.Created):
        """Created event carrying the arguments given to ``new_booking``."""

        origin: Location
        destination: Location
        arrival_deadline: datetime

    class Event(Aggregate.Event):
        """Base class for the cargo's subsequent events.

        Applying one of these events routes it to ``Cargo.when``.
        """

        def apply(self, aggregate: Aggregate) -> None:
            """Apply this event by dispatching it to the aggregate's ``when``."""
            cast("Cargo", aggregate).when(self)

    @singledispatchmethod
    def when(self, event: Event) -> None:
        """Default method to apply an aggregate event to the aggregate object."""

    def change_destination(self, destination: Location) -> None:
        """Trigger ``DestinationChanged`` with the new destination."""
        self.trigger_event(
            self.DestinationChanged,
            destination=destination,
        )

    class DestinationChanged(Event):
        """The cargo's destination was changed."""

        destination: Location

    @when.register
    def _(self, event: Cargo.DestinationChanged) -> None:
        self._destination = event.destination

    def assign_route(self, itinerary: Itinerary) -> None:
        """Trigger ``RouteAssigned`` with the given itinerary."""
        self.trigger_event(self.RouteAssigned, route=itinerary)

    class RouteAssigned(Event):
        """An itinerary was assigned to the cargo."""

        route: Itinerary

    @when.register
    def _(self, event: Cargo.RouteAssigned) -> None:
        self._route = event.route
        self._routing_status = "ROUTED"
        # Base the ETA on the event's own timestamp rather than on the clock
        # at the moment the event is applied, so that re-applying the same
        # stored event always produces the same estimate.
        self._estimated_time_of_arrival = event.timestamp + timedelta(weeks=1)
        self._next_expected_activity = (HandlingActivity.RECEIVE, self.origin, "")
        self._is_misdirected = False

    def register_handling_event(
        self,
        tracking_id: UUID,
        voyage_number: str | None,
        location: Location,
        handling_activity: HandlingActivity,
    ) -> None:
        """Trigger ``HandlingEventRegistered`` with the given details.

        Args:
            tracking_id: Passed through to the event unchanged.
            voyage_number: Voyage involved in the activity, or ``None``.
            location: Where the activity took place.
            handling_activity: What was done to the cargo.

        Raises:
            ValueError: If the event cannot be applied. This happens when no
                route is assigned, when a LOAD matches no leg of the route,
                when an UNLOAD at an intermediate stop has no onward leg
                starting there, or when the activity is unsupported.
        """
        self.trigger_event(
            self.HandlingEventRegistered,
            tracking_id=tracking_id,
            voyage_number=voyage_number,
            location=location,
            handling_activity=handling_activity,
        )

    class HandlingEventRegistered(Event):
        """A handling activity was recorded for the cargo."""

        # Field types mirror the parameters of ``register_handling_event``.
        tracking_id: UUID
        voyage_number: str | None
        location: Location
        handling_activity: HandlingActivity

    @when.register
    def _(self, event: Cargo.HandlingEventRegistered) -> None:
        # Explicit check instead of ``assert`` so it still runs under ``-O``.
        route = self.route
        if route is None:
            msg = "Can't apply a handling event to cargo that has no assigned route"
            raise ValueError(msg)

        if event.handling_activity == HandlingActivity.RECEIVE:
            self._transport_status = "IN_PORT"
            self._last_known_location = event.location
            # After receipt the cargo should be loaded onto the first leg.
            self._next_expected_activity = (
                HandlingActivity.LOAD,
                event.location,
                route.legs[0].voyage_number,
            )

        elif event.handling_activity == HandlingActivity.LOAD:
            self._transport_status = "ONBOARD_CARRIER"
            self._current_voyage_number = event.voyage_number
            for leg in route.legs:
                if (
                    leg.origin == event.location.value
                    and leg.voyage_number == event.voyage_number
                ):
                    # ``leg.voyage_number`` equals the event's voyage number
                    # here and is always a ``str``.
                    self._next_expected_activity = (
                        HandlingActivity.UNLOAD,
                        Location[leg.destination],
                        leg.voyage_number,
                    )
                    break
            else:
                msg = (
                    f"Can't find leg with origin={event.location} "
                    f"and voyage_number={event.voyage_number}"
                )
                raise ValueError(msg)

        elif event.handling_activity == HandlingActivity.UNLOAD:
            self._current_voyage_number = None
            self._last_known_location = event.location
            self._transport_status = "IN_PORT"
            if event.location == self.destination:
                self._next_expected_activity = (
                    HandlingActivity.CLAIM,
                    event.location,
                    "",
                )
            elif any(leg.destination == event.location.value for leg in route.legs):
                legs = route.legs
                for index, leg in enumerate(legs):
                    if leg.voyage_number != event.voyage_number:
                        continue
                    # The matched voyage may be the final leg (for example
                    # after the destination was changed); report that
                    # clearly instead of failing with a bare IndexError.
                    if index + 1 >= len(legs):
                        msg = (
                            f"Can't find leg after voyage_number="
                            f"{event.voyage_number} to load at {event.location}"
                        )
                        raise ValueError(msg)
                    next_leg: Leg = legs[index + 1]
                    if Location[next_leg.origin] != event.location:
                        msg = (
                            f"Leg after voyage_number={event.voyage_number} "
                            f"starts at {next_leg.origin}, not at {event.location}"
                        )
                        raise ValueError(msg)
                    self._next_expected_activity = (
                        HandlingActivity.LOAD,
                        event.location,
                        next_leg.voyage_number,
                    )
                    break
                # NOTE: when no leg matches the event's voyage number the
                # previous expectation is deliberately left untouched.
            else:
                # Unloaded somewhere the route never calls at.
                self._is_misdirected = True
                self._next_expected_activity = None

        elif event.handling_activity == HandlingActivity.CLAIM:
            self._next_expected_activity = None
            self._transport_status = "CLAIMED"

        else:
            msg = f"Unsupported handling activity: {event.handling_activity}"
            raise ValueError(msg)