Examples

This library contains a few example applications and systems.

this page is under development — please check back soon

import unittest

Bank accounts

Test first…

class TestBankAccounts(unittest.TestCase):
    def test(self):
        app = BankAccounts()

        # Check account not found error.
        with self.assertRaises(AccountNotFoundError):
            app.get_balance(uuid4())

        # Create an account.
        account_id1 = app.open_account(
            full_name="Alice",
            email_address="alice@example.com",
        )

        # Check balance.
        self.assertEqual(app.get_balance(account_id1), Decimal("0.00"))

        # Deposit funds.
        app.deposit_funds(
            credit_account_id=account_id1,
            amount=Decimal("200.00"),
        )

        # Check balance.
        self.assertEqual(app.get_balance(account_id1), Decimal("200.00"))

        # Withdraw funds.
        app.withdraw_funds(
            debit_account_id=account_id1,
            amount=Decimal("50.00"),
        )

        # Check balance.
        self.assertEqual(app.get_balance(account_id1), Decimal("150.00"))

        # Fail to withdraw funds - insufficient funds.
        with self.assertRaises(InsufficientFundsError):
            app.withdraw_funds(
                debit_account_id=account_id1,
                amount=Decimal("151.00"),
            )

        # Check balance - should be unchanged.
        self.assertEqual(app.get_balance(account_id1), Decimal("150.00"))

        # Create another account.
        account_id2 = app.open_account(
            full_name="Bob",
            email_address="bob@example.com",
        )

        # Transfer funds.
        app.transfer_funds(
            debit_account_id=account_id1,
            credit_account_id=account_id2,
            amount=Decimal("100.00"),
        )

        # Check balances.
        self.assertEqual(app.get_balance(account_id1), Decimal("50.00"))
        self.assertEqual(app.get_balance(account_id2), Decimal("100.00"))

        # Fail to transfer funds - insufficient funds.
        with self.assertRaises(InsufficientFundsError):
            app.transfer_funds(
                debit_account_id=account_id1,
                credit_account_id=account_id2,
                amount=Decimal("1000.00"),
            )

        # Check balances - should be unchanged.
        self.assertEqual(app.get_balance(account_id1), Decimal("50.00"))
        self.assertEqual(app.get_balance(account_id2), Decimal("100.00"))

        # Close account.
        app.close_account(account_id1)

        # Fail to transfer funds - account closed.
        with self.assertRaises(AccountClosedError):
            app.transfer_funds(
                debit_account_id=account_id1,
                credit_account_id=account_id2,
                amount=Decimal("50.00"),
            )

        # Fail to transfer funds - account closed.
        with self.assertRaises(AccountClosedError):
            app.transfer_funds(
                debit_account_id=account_id2,
                credit_account_id=account_id1,
                amount=Decimal("50.00"),
            )

        # Fail to withdraw funds - account closed.
        with self.assertRaises(AccountClosedError):
            app.withdraw_funds(
                debit_account_id=account_id1,
                amount=Decimal("1.00"),
            )

        # Fail to deposit funds - account closed.
        with self.assertRaises(AccountClosedError):
            app.deposit_funds(
                credit_account_id=account_id1,
                amount=Decimal("1000.00"),
            )

        # Check balance - should be unchanged.
        self.assertEqual(app.get_balance(account_id1), Decimal("50.00"))

        # Check overdraft limit.
        self.assertEqual(
            app.get_overdraft_limit(account_id2),
            Decimal("0.00"),
        )

        # Set overdraft limit.
        app.set_overdraft_limit(
            account_id=account_id2,
            overdraft_limit=Decimal("500.00"),
        )

        # Can't set negative overdraft limit.
        with self.assertRaises(AssertionError):
            app.set_overdraft_limit(
                account_id=account_id2,
                overdraft_limit=Decimal("-500.00"),
            )

        # Check overdraft limit.
        self.assertEqual(
            app.get_overdraft_limit(account_id2),
            Decimal("500.00"),
        )

        # Withdraw funds.
        app.withdraw_funds(
            debit_account_id=account_id2,
            amount=Decimal("500.00"),
        )

        # Check balance - should be overdrawn.
        self.assertEqual(
            app.get_balance(account_id2),
            Decimal("-400.00"),
        )

        # Fail to withdraw funds - insufficient funds.
        with self.assertRaises(InsufficientFundsError):
            app.withdraw_funds(
                debit_account_id=account_id2,
                amount=Decimal("101.00"),
            )

        # Fail to set overdraft limit - account closed.
        with self.assertRaises(AccountClosedError):
            app.set_overdraft_limit(
                account_id=account_id1,
                overdraft_limit=Decimal("500.00"),
            )

The application class BankAccounts

class BankAccounts(Application):
    def open_account(self, full_name: str, email_address: str) -> UUID:
        account = BankAccount.open(
            full_name=full_name,
            email_address=email_address,
        )
        self.save(account)
        return account.id

    def get_account(self, account_id: UUID) -> BankAccount:
        try:
            aggregate = self.repository.get(account_id)
        except AggregateNotFound:
            raise AccountNotFoundError(account_id)
        else:
            assert isinstance(aggregate, BankAccount)
            return aggregate

    def get_balance(self, account_id: UUID) -> Decimal:
        account = self.get_account(account_id)
        return account.balance

    def deposit_funds(self, credit_account_id: UUID, amount: Decimal) -> None:
        account = self.get_account(credit_account_id)
        account.append_transaction(amount)
        self.save(account)

    def withdraw_funds(self, debit_account_id: UUID, amount: Decimal) -> None:
        account = self.get_account(debit_account_id)
        account.append_transaction(-amount)
        self.save(account)

    def transfer_funds(
        self,
        debit_account_id: UUID,
        credit_account_id: UUID,
        amount: Decimal,
    ) -> None:
        debit_account = self.get_account(debit_account_id)
        credit_account = self.get_account(credit_account_id)
        debit_account.append_transaction(-amount)
        credit_account.append_transaction(amount)
        self.save(debit_account, credit_account)

    def set_overdraft_limit(self, account_id: UUID, overdraft_limit: Decimal) -> None:
        account = self.get_account(account_id)
        account.set_overdraft_limit(overdraft_limit)
        self.save(account)

    def get_overdraft_limit(self, account_id: UUID) -> Decimal:
        account = self.get_account(account_id)
        return account.overdraft_limit

    def close_account(self, account_id: UUID) -> None:
        account = self.get_account(account_id)
        account.close()
        self.save(account)
class AccountNotFoundError(Exception):
    pass

The aggregate class BankAccount

class BankAccount(Aggregate):
    def __init__(self, full_name: str, email_address: str):
        self.full_name = full_name
        self.email_address = email_address
        self.balance = Decimal("0.00")
        self.overdraft_limit = Decimal("0.00")
        self.is_closed = False

    @classmethod
    def open(cls, full_name: str, email_address: str) -> "BankAccount":
        return cls._create(
            cls.Opened,
            id=uuid4(),
            full_name=full_name,
            email_address=email_address,
        )

    class Opened(AggregateCreated):
        full_name: str
        email_address: str

    def append_transaction(
        self, amount: Decimal, transaction_id: Optional[UUID] = None
    ) -> None:
        self.check_account_is_not_closed()
        self.check_has_sufficient_funds(amount)
        self.trigger_event(
            self.TransactionAppended,
            amount=amount,
            transaction_id=transaction_id,
        )

    def check_account_is_not_closed(self) -> None:
        if self.is_closed:
            raise AccountClosedError({"account_id": self.id})

    def check_has_sufficient_funds(self, amount: Decimal) -> None:
        if self.balance + amount < -self.overdraft_limit:
            raise InsufficientFundsError({"account_id": self.id})

    class TransactionAppended(AggregateEvent):
        amount: Decimal
        transaction_id: UUID

        def apply(self, aggregate: "BankAccount") -> None:
            aggregate.balance += self.amount

    def set_overdraft_limit(self, overdraft_limit: Decimal) -> None:
        assert overdraft_limit > Decimal("0.00")
        self.check_account_is_not_closed()
        self.trigger_event(
            self.OverdraftLimitSet,
            overdraft_limit=overdraft_limit,
        )

    class OverdraftLimitSet(AggregateEvent):
        overdraft_limit: Decimal

        def apply(self, aggregate: "BankAccount") -> None:
            aggregate.overdraft_limit = self.overdraft_limit

    def close(self) -> None:
        self.trigger_event(self.Closed)

    class Closed(AggregateEvent):
        def apply(self, aggregate: "BankAccount") -> None:
            aggregate.is_closed = True
class TransactionError(Exception):
    pass
class AccountClosedError(TransactionError):
    pass
class InsufficientFundsError(TransactionError):
    pass

Run the test…

suite = unittest.TestSuite()
suite.addTest(TestBankAccounts("test"))

runner = unittest.TextTestRunner()
result = runner.run(suite)

assert result.wasSuccessful()

Cargo shipping

Test first…

class TestBookingService(unittest.TestCase):
    def setUp(self) -> None:
        self.service = BookingService(BookingApplication())

    def test_admin_can_book_new_cargo(self) -> None:
        arrival_deadline = datetime.now(tz=TZINFO) + timedelta(weeks=3)

        cargo_id = self.service.book_new_cargo(
            origin="NLRTM",
            destination="USDAL",
            arrival_deadline=arrival_deadline,
        )

        cargo_details = self.service.get_cargo_details(cargo_id)
        self.assertTrue(cargo_details["id"])
        self.assertEqual(cargo_details["origin"], "NLRTM")
        self.assertEqual(cargo_details["destination"], "USDAL")

        self.service.change_destination(cargo_id, destination="AUMEL")
        cargo_details = self.service.get_cargo_details(cargo_id)
        self.assertEqual(cargo_details["destination"], "AUMEL")
        self.assertEqual(
            cargo_details["arrival_deadline"],
            arrival_deadline,
        )

    def test_scenario_cargo_from_hongkong_to_stockholm(
        self,
    ) -> None:
        # Test setup: A cargo should be shipped from
        # Hongkong to Stockholm, and it should arrive
        # in no more than two weeks.
        origin = "HONGKONG"
        destination = "STOCKHOLM"
        arrival_deadline = datetime.now(tz=TZINFO) + timedelta(weeks=2)

        # Use case 1: booking.

        # A new cargo is booked, and the unique tracking
        # id is assigned to the cargo.
        tracking_id = self.service.book_new_cargo(origin, destination, arrival_deadline)

        # The tracking id can be used to lookup the cargo
        # in the repository.
        # Important: The cargo, and thus the domain model,
        # is responsible for determining the status of the
        # cargo, whether it is on the right track or not
        # and so on. This is core domain logic. Tracking
        # the cargo basically amounts to presenting
        # information extracted from the cargo aggregate
        # in a suitable way.
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(
            cargo_details["transport_status"],
            "NOT_RECEIVED",
        )
        self.assertEqual(cargo_details["routing_status"], "NOT_ROUTED")
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(
            cargo_details["estimated_time_of_arrival"],
            None,
        )
        self.assertEqual(cargo_details["next_expected_activity"], None)

        # Use case 2: routing.
        #
        # A number of possible routes for this cargo is
        # requested and may be presented to the customer
        # in some way for him/her to choose from.
        # Selection could be affected by things like price
        # and time of delivery, but this test simply uses
        # an arbitrary selection to mimic that process.
        routes_details = self.service.request_possible_routes_for_cargo(tracking_id)
        route_details = select_preferred_itinerary(routes_details)

        # The cargo is then assigned to the selected
        # route, described by an itinerary.
        self.service.assign_route(tracking_id, route_details)

        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(
            cargo_details["transport_status"],
            "NOT_RECEIVED",
        )
        self.assertEqual(cargo_details["routing_status"], "ROUTED")
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertTrue(cargo_details["estimated_time_of_arrival"])
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("RECEIVE", "HONGKONG"),
        )

        # Use case 3: handling

        # A handling event registration attempt will be
        # formed from parsing the data coming in as a
        # handling report either via the web service
        # interface or as an uploaded CSV file. The
        # handling event factory tries to create a
        # HandlingEvent from the attempt, and if the
        # factory decides that this is a plausible
        # handling event, it is stored. If the attempt
        # is invalid, for example if no cargo exists for
        # the specified tracking id, the attempt is
        # rejected.
        #
        # Handling begins: cargo is received in Hongkong.
        self.service.register_handling_event(tracking_id, None, "HONGKONG", "RECEIVE")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["transport_status"], "IN_PORT")
        self.assertEqual(
            cargo_details["last_known_location"],
            "HONGKONG",
        )
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("LOAD", "HONGKONG", "V1"),
        )

        # Load onto voyage V1.
        self.service.register_handling_event(tracking_id, "V1", "HONGKONG", "LOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], "V1")
        self.assertEqual(
            cargo_details["last_known_location"],
            "HONGKONG",
        )
        self.assertEqual(
            cargo_details["transport_status"],
            "ONBOARD_CARRIER",
        )
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("UNLOAD", "NEWYORK", "V1"),
        )

        # Incorrectly unload in Tokyo.
        self.service.register_handling_event(tracking_id, "V1", "TOKYO", "UNLOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], None)
        self.assertEqual(cargo_details["last_known_location"], "TOKYO")
        self.assertEqual(cargo_details["transport_status"], "IN_PORT")
        self.assertEqual(cargo_details["is_misdirected"], True)
        self.assertEqual(cargo_details["next_expected_activity"], None)

        # Reroute.
        routes_details = self.service.request_possible_routes_for_cargo(tracking_id)
        route_details = select_preferred_itinerary(routes_details)
        self.service.assign_route(tracking_id, route_details)

        # Load in Tokyo.
        self.service.register_handling_event(tracking_id, "V3", "TOKYO", "LOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], "V3")
        self.assertEqual(cargo_details["last_known_location"], "TOKYO")
        self.assertEqual(
            cargo_details["transport_status"],
            "ONBOARD_CARRIER",
        )
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("UNLOAD", "HAMBURG", "V3"),
        )

        # Unload in Hamburg.
        self.service.register_handling_event(tracking_id, "V3", "HAMBURG", "UNLOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], None)
        self.assertEqual(cargo_details["last_known_location"], "HAMBURG")
        self.assertEqual(cargo_details["transport_status"], "IN_PORT")
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("LOAD", "HAMBURG", "V4"),
        )

        # Load in Hamburg
        self.service.register_handling_event(tracking_id, "V4", "HAMBURG", "LOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], "V4")
        self.assertEqual(cargo_details["last_known_location"], "HAMBURG")
        self.assertEqual(
            cargo_details["transport_status"],
            "ONBOARD_CARRIER",
        )
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("UNLOAD", "STOCKHOLM", "V4"),
        )

        # Unload in Stockholm
        self.service.register_handling_event(tracking_id, "V4", "STOCKHOLM", "UNLOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], None)
        self.assertEqual(
            cargo_details["last_known_location"],
            "STOCKHOLM",
        )
        self.assertEqual(cargo_details["transport_status"], "IN_PORT")
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("CLAIM", "STOCKHOLM"),
        )

        # Finally, cargo is claimed in Stockholm.
        self.service.register_handling_event(tracking_id, None, "STOCKHOLM", "CLAIM")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], None)
        self.assertEqual(
            cargo_details["last_known_location"],
            "STOCKHOLM",
        )
        self.assertEqual(cargo_details["transport_status"], "CLAIMED")
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(cargo_details["next_expected_activity"], None)

Interface…

class BookingService(object):
    """
    Presents an application interface that uses
    simple types of object (str, bool, datetime).
    """

    def __init__(self, app: BookingApplication):
        self.app = app

    def book_new_cargo(
        self,
        origin: str,
        destination: str,
        arrival_deadline: datetime,
    ) -> str:
        tracking_id = self.app.book_new_cargo(
            Location[origin],
            Location[destination],
            arrival_deadline,
        )
        return str(tracking_id)

    def get_cargo_details(self, tracking_id: str) -> CargoDetails:
        cargo = self.app.get_cargo(UUID(tracking_id))

        # Present 'next_expected_activity'.
        next_expected_activity: Optional[Union[Tuple[Any, Any], Tuple[Any, Any, Any]]]
        if cargo.next_expected_activity is None:
            next_expected_activity = None
        elif len(cargo.next_expected_activity) == 2:
            next_expected_activity = (
                cargo.next_expected_activity[0].value,
                cargo.next_expected_activity[1].value,
            )
        elif len(cargo.next_expected_activity) == 3:
            next_expected_activity = (
                cargo.next_expected_activity[0].value,
                cargo.next_expected_activity[1].value,
                cargo.next_expected_activity[2],
            )
        else:
            raise Exception(
                "Invalid next expected activity: {}".format(
                    cargo.next_expected_activity
                )
            )

        # Present 'last_known_location'.
        if cargo.last_known_location is None:
            last_known_location = None
        else:
            last_known_location = cargo.last_known_location.value

        # Present the cargo details.
        return {
            "id": str(cargo.id),
            "origin": cargo.origin.value,
            "destination": cargo.destination.value,
            "arrival_deadline": cargo.arrival_deadline,
            "transport_status": cargo.transport_status,
            "routing_status": cargo.routing_status,
            "is_misdirected": cargo.is_misdirected,
            "estimated_time_of_arrival": cargo.estimated_time_of_arrival,
            "next_expected_activity": next_expected_activity,
            "last_known_location": last_known_location,
            "current_voyage_number": cargo.current_voyage_number,
        }

    def change_destination(self, tracking_id: str, destination: str) -> None:
        self.app.change_destination(UUID(tracking_id), Location[destination])

    def request_possible_routes_for_cargo(self, tracking_id: str) -> List[dict]:
        routes = self.app.request_possible_routes_for_cargo(UUID(tracking_id))
        return [self.dict_from_itinerary(route) for route in routes]

    def dict_from_itinerary(self, itinerary: Itinerary) -> ItineraryDetails:
        legs_details = []
        for leg in itinerary.legs:
            leg_details: LegDetails = {
                "origin": leg.origin,
                "destination": leg.destination,
                "voyage_number": leg.voyage_number,
            }
            legs_details.append(leg_details)
        route_details: ItineraryDetails = {
            "origin": itinerary.origin,
            "destination": itinerary.destination,
            "legs": legs_details,
        }
        return route_details

    def assign_route(
        self,
        tracking_id: str,
        route_details: ItineraryDetails,
    ) -> None:
        routes = self.app.request_possible_routes_for_cargo(UUID(tracking_id))
        for route in routes:
            if route_details == self.dict_from_itinerary(route):
                self.app.assign_route(UUID(tracking_id), route)

    def register_handling_event(
        self,
        tracking_id: str,
        voyage_number: Optional[str],
        location: str,
        handling_activity: str,
    ) -> None:
        self.app.register_handling_event(
            UUID(tracking_id),
            voyage_number,
            Location[location],
            HandlingActivity[handling_activity],
        )
def select_preferred_itinerary(
    itineraries: List[ItineraryDetails],
) -> ItineraryDetails:
    return itineraries[0]

Application…

class BookingApplication(Application):
    def register_transcodings(self, transcoder: Transcoder) -> None:
        super(BookingApplication, self).register_transcodings(transcoder)
        transcoder.register(LocationAsName())
        transcoder.register(HandlingActivityAsName())
        transcoder.register(ItineraryAsDict())
        transcoder.register(LegAsDict())

    def book_new_cargo(
        self,
        origin: Location,
        destination: Location,
        arrival_deadline: datetime,
    ) -> UUID:
        cargo = Cargo.new_booking(origin, destination, arrival_deadline)
        self.save(cargo)
        return cargo.id

    def change_destination(self, tracking_id: UUID, destination: Location) -> None:
        cargo = self.get_cargo(tracking_id)
        cargo.change_destination(destination)
        self.save(cargo)

    def request_possible_routes_for_cargo(self, tracking_id: UUID) -> List[Itinerary]:
        cargo = self.get_cargo(tracking_id)
        from_location = (cargo.last_known_location or cargo.origin).value
        to_location = cargo.destination.value
        try:
            possible_routes = REGISTERED_ROUTES[(from_location, to_location)]
        except KeyError:
            raise Exception(
                "Can't find routes from {} to {}".format(from_location, to_location)
            )

        return possible_routes

    def assign_route(self, tracking_id: UUID, itinerary: Itinerary) -> None:
        cargo = self.get_cargo(tracking_id)
        cargo.assign_route(itinerary)
        self.save(cargo)

    def register_handling_event(
        self,
        tracking_id: UUID,
        voyage_number: Optional[str],
        location: Location,
        handing_activity: HandlingActivity,
    ) -> None:
        cargo = self.get_cargo(tracking_id)
        cargo.register_handling_event(
            tracking_id,
            voyage_number,
            location,
            handing_activity,
        )
        self.save(cargo)

    def get_cargo(self, tracking_id: UUID) -> Cargo:
        cargo = self.repository.get(tracking_id)
        assert isinstance(cargo, Cargo)
        return cargo
class HandlingActivityAsName(Transcoding):
    type = HandlingActivity
    name = "handling_activity"

    def encode(self, obj: HandlingActivity) -> str:
        return obj.name

    def decode(self, data: str) -> HandlingActivity:
        assert isinstance(data, str)
        return HandlingActivity[data]
class ItineraryAsDict(Transcoding):
    type = Itinerary
    name = "itinerary"

    def encode(self, obj: Itinerary) -> dict:
        return obj.__dict__

    def decode(self, data: dict) -> Itinerary:
        assert isinstance(data, dict)
        return Itinerary(**data)
class LegAsDict(Transcoding):
    type = Leg
    name = "leg"

    def encode(self, obj: Leg) -> dict:
        return obj.__dict__

    def decode(self, data: dict) -> Leg:
        assert isinstance(data, dict)
        return Leg(**data)
class LocationAsName(Transcoding):
    type = Location
    name = "location"

    def encode(self, obj: Location) -> str:
        return obj.name

    def decode(self, data: str) -> Location:
        assert isinstance(data, str)
        return Location[data]

Domain model…

class Cargo(Aggregate):
    """
    The Cargo aggregate is an event-sourced domain model aggregate that
    specifies the routing from origin to destination, and can track what
    happens to the cargo after it has been booked.
    """

    def __init__(
        self,
        origin: Location,
        destination: Location,
        arrival_deadline: datetime,
    ):
        self._origin: Location = origin
        self._destination: Location = destination
        self._arrival_deadline: datetime = arrival_deadline
        self._transport_status: str = "NOT_RECEIVED"
        self._routing_status: str = "NOT_ROUTED"
        self._is_misdirected: bool = False
        self._estimated_time_of_arrival: Optional[datetime] = None
        self._next_expected_activity: NextExpectedActivity = None
        self._route: Optional[Itinerary] = None
        self._last_known_location: Optional[Location] = None
        self._current_voyage_number: Optional[str] = None

    @property
    def origin(self) -> Location:
        return self._origin

    @property
    def destination(self) -> Location:
        return self._destination

    @property
    def arrival_deadline(self) -> datetime:
        return self._arrival_deadline

    @property
    def transport_status(self) -> str:
        return self._transport_status

    @property
    def routing_status(self) -> str:
        return self._routing_status

    @property
    def is_misdirected(self) -> bool:
        return self._is_misdirected

    @property
    def estimated_time_of_arrival(
        self,
    ) -> Optional[datetime]:
        return self._estimated_time_of_arrival

    @property
    def next_expected_activity(self) -> Optional[Tuple]:
        return self._next_expected_activity

    @property
    def route(self) -> Optional[Itinerary]:
        return self._route

    @property
    def last_known_location(self) -> Optional[Location]:
        return self._last_known_location

    @property
    def current_voyage_number(self) -> Optional[str]:
        return self._current_voyage_number

    @classmethod
    def new_booking(
        cls,
        origin: Location,
        destination: Location,
        arrival_deadline: datetime,
    ) -> "Cargo":
        return cls._create(
            event_class=Cargo.BookingStarted,
            id=uuid4(),
            origin=origin,
            destination=destination,
            arrival_deadline=arrival_deadline,
        )

    class BookingStarted(AggregateCreated):
        origin: Location
        destination: Location
        arrival_deadline: datetime

    class Event(AggregateEvent["Cargo"]):
        def apply(self, aggregate: "Cargo") -> None:
            aggregate.apply(self)

    @singledispatchmethod
    def apply(self, event: "Cargo.Event") -> None:
        """
        Default aggregate projection.
        """

    def change_destination(self, destination: Location) -> None:
        self.trigger_event(
            self.DestinationChanged,
            destination=destination,
        )

    class DestinationChanged(Event):
        destination: Location

    @apply.register(DestinationChanged)
    def destination_changed(self, event: DestinationChanged) -> None:
        self._destination = event.destination

    def assign_route(self, itinerary: Itinerary) -> None:
        self.trigger_event(self.RouteAssigned, route=itinerary)

    class RouteAssigned(Event):
        route: Itinerary

    @apply.register(RouteAssigned)
    def route_assigned(self, event: RouteAssigned) -> None:
        self._route = event.route
        self._routing_status = "ROUTED"
        self._estimated_time_of_arrival = datetime.now(tz=TZINFO) + timedelta(weeks=1)
        self._next_expected_activity = (
            HandlingActivity.RECEIVE,
            self.origin,
        )
        self._is_misdirected = False

    def register_handling_event(
        self,
        tracking_id: UUID,
        voyage_number: Optional[str],
        location: Location,
        handling_activity: HandlingActivity,
    ) -> None:
        self.trigger_event(
            self.HandlingEventRegistered,
            tracking_id=tracking_id,
            voyage_number=voyage_number,
            location=location,
            handling_activity=handling_activity,
        )

    class HandlingEventRegistered(Event):
        tracking_id: UUID
        voyage_number: str
        location: Location
        handling_activity: str

    @apply.register(HandlingEventRegistered)
    def handling_event_registered(self, event: HandlingEventRegistered) -> None:
        assert self.route is not None
        if event.handling_activity == HandlingActivity.RECEIVE:
            self._transport_status = "IN_PORT"
            self._last_known_location = event.location
            self._next_expected_activity = (
                HandlingActivity.LOAD,
                event.location,
                self.route.legs[0].voyage_number,
            )
        elif event.handling_activity == HandlingActivity.LOAD:
            self._transport_status = "ONBOARD_CARRIER"
            self._current_voyage_number = event.voyage_number
            for leg in self.route.legs:
                if leg.origin == event.location.value:
                    if leg.voyage_number == event.voyage_number:
                        self._next_expected_activity = (
                            HandlingActivity.UNLOAD,
                            Location[leg.destination],
                            event.voyage_number,
                        )
                        break
            else:
                raise Exception(
                    "Can't find leg with origin={} and "
                    "voyage_number={}".format(
                        event.location,
                        event.voyage_number,
                    )
                )

        elif event.handling_activity == HandlingActivity.UNLOAD:
            self._current_voyage_number = None
            self._last_known_location = event.location
            self._transport_status = "IN_PORT"
            if event.location == self.destination:
                self._next_expected_activity = (
                    HandlingActivity.CLAIM,
                    event.location,
                )
            elif event.location.value in [leg.destination for leg in self.route.legs]:
                for i, leg in enumerate(self.route.legs):
                    if leg.voyage_number == event.voyage_number:
                        next_leg: Leg = self.route.legs[i + 1]
                        assert Location[next_leg.origin] == event.location
                        self._next_expected_activity = (
                            HandlingActivity.LOAD,
                            event.location,
                            next_leg.voyage_number,
                        )
                        break
            else:
                self._is_misdirected = True
                self._next_expected_activity = None

        elif event.handling_activity == HandlingActivity.CLAIM:
            self._next_expected_activity = None
            self._transport_status = "CLAIMED"

        else:
            raise Exception(
                "Unsupported handling event: {}".format(event.handling_activity)
            )
class HandlingActivity(Enum):
    RECEIVE = "RECEIVE"
    LOAD = "LOAD"
    UNLOAD = "UNLOAD"
    CLAIM = "CLAIM"
class Itinerary(object):
    """
    An itinerary along which cargo is shipped.
    """

    def __init__(
        self,
        origin: str,
        destination: str,
        legs: Tuple[Leg, ...],
    ):
        self.origin = origin
        self.destination = destination
        self.legs = legs
class Leg(object):
    """
    Leg of an itinerary.
    """

    def __init__(
        self,
        origin: str,
        destination: str,
        voyage_number: str,
    ):
        self.origin: str = origin
        self.destination: str = destination
        self.voyage_number: str = voyage_number
class Location(Enum):
    """
    Locations in the world.
    """

    HAMBURG = "HAMBURG"
    HONGKONG = "HONGKONG"
    NEWYORK = "NEWYORK"
    STOCKHOLM = "STOCKHOLM"
    TOKYO = "TOKYO"

    NLRTM = "NLRTM"
    USDAL = "USDAL"
    AUMEL = "AUMEL"

Run the test…

suite = unittest.TestSuite()
suite.addTest(TestBookingService("test_admin_can_book_new_cargo"))
suite.addTest(TestBookingService("test_scenario_cargo_from_hongkong_to_stockholm"))

runner = unittest.TextTestRunner()
result = runner.run(suite)

assert result.wasSuccessful()