Source code for examples.cargoshipping.domainmodel

from __future__ import annotations

from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, Union, cast
from uuid import UUID, uuid4

from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.domain import Aggregate


[docs] class Location(Enum): """Locations in the world.""" HAMBURG = "HAMBURG" HONGKONG = "HONGKONG" NEWYORK = "NEWYORK" STOCKHOLM = "STOCKHOLM" TOKYO = "TOKYO" NLRTM = "NLRTM" USDAL = "USDAL" AUMEL = "AUMEL"
[docs] class Leg: """Leg of an itinerary.""" def __init__( self, origin: str, destination: str, voyage_number: str, ): self.origin: str = origin self.destination: str = destination self.voyage_number: str = voyage_number
[docs] class Itinerary: """An itinerary along which cargo is shipped.""" def __init__( self, origin: str, destination: str, legs: tuple[Leg, ...], ): self.origin = origin self.destination = destination self.legs = legs
[docs] class HandlingActivity(Enum): RECEIVE = "RECEIVE" LOAD = "LOAD" UNLOAD = "UNLOAD" CLAIM = "CLAIM"
# Custom static types. LegDetails = dict[str, str] ItineraryDetails = dict[str, Union[str, list[LegDetails]]] NextExpectedActivity = Optional[tuple[HandlingActivity, Location, str]] # Some routes from one location to another. REGISTERED_ROUTES = { ("HONGKONG", "STOCKHOLM"): [ Itinerary( origin="HONGKONG", destination="STOCKHOLM", legs=( Leg( origin="HONGKONG", destination="NEWYORK", voyage_number="V1", ), Leg( origin="NEWYORK", destination="STOCKHOLM", voyage_number="V2", ), ), ) ], ("TOKYO", "STOCKHOLM"): [ Itinerary( origin="TOKYO", destination="STOCKHOLM", legs=( Leg( origin="TOKYO", destination="HAMBURG", voyage_number="V3", ), Leg( origin="HAMBURG", destination="STOCKHOLM", voyage_number="V4", ), ), ) ], }
[docs] class Cargo(Aggregate): """The Cargo aggregate is an event-sourced domain model aggregate that specifies the routing from origin to destination, and can track what happens to the cargo after it has been booked. """ def __init__( self, origin: Location, destination: Location, arrival_deadline: datetime, ): self._origin: Location = origin self._destination: Location = destination self._arrival_deadline: datetime = arrival_deadline self._transport_status: str = "NOT_RECEIVED" self._routing_status: str = "NOT_ROUTED" self._is_misdirected: bool = False self._estimated_time_of_arrival: datetime | None = None self._next_expected_activity: NextExpectedActivity = None self._route: Itinerary | None = None self._last_known_location: Location | None = None self._current_voyage_number: str | None = None @property def origin(self) -> Location: return self._origin @property def destination(self) -> Location: return self._destination @property def arrival_deadline(self) -> datetime: return self._arrival_deadline @property def transport_status(self) -> str: return self._transport_status @property def routing_status(self) -> str: return self._routing_status @property def is_misdirected(self) -> bool: return self._is_misdirected @property def estimated_time_of_arrival( self, ) -> datetime | None: return self._estimated_time_of_arrival @property def next_expected_activity(self) -> NextExpectedActivity: return self._next_expected_activity @property def route(self) -> Itinerary | None: return self._route @property def last_known_location(self) -> Location | None: return self._last_known_location @property def current_voyage_number(self) -> str | None: return self._current_voyage_number
[docs] @classmethod def new_booking( cls, origin: Location, destination: Location, arrival_deadline: datetime, ) -> Cargo: return cls._create( event_class=cls.BookingStarted, id=uuid4(), origin=origin, destination=destination, arrival_deadline=arrival_deadline, )
[docs] class BookingStarted(Aggregate.Created): origin: Location destination: Location arrival_deadline: datetime
[docs] class Event(Aggregate.Event):
[docs] def apply(self, aggregate: Aggregate) -> None: cast("Cargo", aggregate).when(self)
[docs] @singledispatchmethod def when(self, event: Event) -> None: """Default method to apply an aggregate event to the aggregate object."""
[docs] def change_destination(self, destination: Location) -> None: self.trigger_event( self.DestinationChanged, destination=destination, )
[docs] class DestinationChanged(Event): destination: Location
@when.register def _(self, event: Cargo.DestinationChanged) -> None: self._destination = event.destination
[docs] def assign_route(self, itinerary: Itinerary) -> None: self.trigger_event(self.RouteAssigned, route=itinerary)
[docs] class RouteAssigned(Event): route: Itinerary
@when.register def _(self, event: Cargo.RouteAssigned) -> None: self._route = event.route self._routing_status = "ROUTED" self._estimated_time_of_arrival = Cargo.Event.create_timestamp() + timedelta( weeks=1 ) self._next_expected_activity = (HandlingActivity.RECEIVE, self.origin, "") self._is_misdirected = False
[docs] def register_handling_event( self, tracking_id: UUID, voyage_number: str | None, location: Location, handling_activity: HandlingActivity, ) -> None: self.trigger_event( self.HandlingEventRegistered, tracking_id=tracking_id, voyage_number=voyage_number, location=location, handling_activity=handling_activity, )
[docs] class HandlingEventRegistered(Event): tracking_id: UUID voyage_number: str location: Location handling_activity: str
@when.register def _(self, event: Cargo.HandlingEventRegistered) -> None: assert self.route is not None if event.handling_activity == HandlingActivity.RECEIVE: self._transport_status = "IN_PORT" self._last_known_location = event.location self._next_expected_activity = ( HandlingActivity.LOAD, event.location, self.route.legs[0].voyage_number, ) elif event.handling_activity == HandlingActivity.LOAD: self._transport_status = "ONBOARD_CARRIER" self._current_voyage_number = event.voyage_number for leg in self.route.legs: if ( leg.origin == event.location.value and leg.voyage_number == event.voyage_number ): self._next_expected_activity = ( HandlingActivity.UNLOAD, Location[leg.destination], event.voyage_number, ) break else: msg = ( f"Can't find leg with origin={event.location} " f"and voyage_number={event.voyage_number}" ) raise ValueError(msg) elif event.handling_activity == HandlingActivity.UNLOAD: self._current_voyage_number = None self._last_known_location = event.location self._transport_status = "IN_PORT" if event.location == self.destination: self._next_expected_activity = ( HandlingActivity.CLAIM, event.location, "", ) elif event.location.value in [leg.destination for leg in self.route.legs]: for i, leg in enumerate(self.route.legs): if leg.voyage_number == event.voyage_number: next_leg: Leg = self.route.legs[i + 1] assert Location[next_leg.origin] == event.location self._next_expected_activity = ( HandlingActivity.LOAD, event.location, next_leg.voyage_number, ) break else: self._is_misdirected = True self._next_expected_activity = None elif event.handling_activity == HandlingActivity.CLAIM: self._next_expected_activity = None self._transport_status = "CLAIMED" else: msg = f"Unsupported handling activity: {event.handling_activity}" raise ValueError(msg)