Examples
This library contains a few example applications and systems.
This page is under development — please check back soon.
import unittest
Bank accounts
Test first…
class TestBankAccounts(unittest.TestCase):
    """Scenario test for the BankAccounts application.

    A single test method walks through the whole account life cycle; each
    step relies on the balances left behind by the previous one.
    """

    def test(self) -> None:
        """Open, deposit, withdraw, transfer, close and overdraw accounts."""
        app = BankAccounts()

        # Check account not found error.
        with self.assertRaises(AccountNotFoundError):
            app.get_balance(uuid4())

        # Create an account.
        account_id1 = app.open_account(
            full_name="Alice",
            email_address="alice@example.com",
        )

        # Check balance.
        self.assertEqual(app.get_balance(account_id1), Decimal("0.00"))

        # Deposit funds.
        app.deposit_funds(
            credit_account_id=account_id1,
            amount=Decimal("200.00"),
        )

        # Check balance.
        self.assertEqual(app.get_balance(account_id1), Decimal("200.00"))

        # Withdraw funds.
        app.withdraw_funds(
            debit_account_id=account_id1,
            amount=Decimal("50.00"),
        )

        # Check balance.
        self.assertEqual(app.get_balance(account_id1), Decimal("150.00"))

        # Fail to withdraw funds - insufficient funds.
        # (151.00 is one more than the 150.00 balance, and no overdraft is set.)
        with self.assertRaises(InsufficientFundsError):
            app.withdraw_funds(
                debit_account_id=account_id1,
                amount=Decimal("151.00"),
            )

        # Check balance - should be unchanged.
        self.assertEqual(app.get_balance(account_id1), Decimal("150.00"))

        # Create another account.
        account_id2 = app.open_account(
            full_name="Bob",
            email_address="bob@example.com",
        )

        # Transfer funds.
        app.transfer_funds(
            debit_account_id=account_id1,
            credit_account_id=account_id2,
            amount=Decimal("100.00"),
        )

        # Check balances.
        self.assertEqual(app.get_balance(account_id1), Decimal("50.00"))
        self.assertEqual(app.get_balance(account_id2), Decimal("100.00"))

        # Fail to transfer funds - insufficient funds.
        with self.assertRaises(InsufficientFundsError):
            app.transfer_funds(
                debit_account_id=account_id1,
                credit_account_id=account_id2,
                amount=Decimal("1000.00"),
            )

        # Check balances - should be unchanged.
        self.assertEqual(app.get_balance(account_id1), Decimal("50.00"))
        self.assertEqual(app.get_balance(account_id2), Decimal("100.00"))

        # Close account.
        app.close_account(account_id1)

        # Fail to transfer funds - account closed.
        # (The closed account is the debit side.)
        with self.assertRaises(AccountClosedError):
            app.transfer_funds(
                debit_account_id=account_id1,
                credit_account_id=account_id2,
                amount=Decimal("50.00"),
            )

        # Fail to transfer funds - account closed.
        # (The closed account is the credit side.)
        with self.assertRaises(AccountClosedError):
            app.transfer_funds(
                debit_account_id=account_id2,
                credit_account_id=account_id1,
                amount=Decimal("50.00"),
            )

        # Fail to withdraw funds - account closed.
        with self.assertRaises(AccountClosedError):
            app.withdraw_funds(
                debit_account_id=account_id1,
                amount=Decimal("1.00"),
            )

        # Fail to deposit funds - account closed.
        with self.assertRaises(AccountClosedError):
            app.deposit_funds(
                credit_account_id=account_id1,
                amount=Decimal("1000.00"),
            )

        # Check balance - should be unchanged.
        self.assertEqual(app.get_balance(account_id1), Decimal("50.00"))

        # Check overdraft limit.
        self.assertEqual(
            app.get_overdraft_limit(account_id2),
            Decimal("0.00"),
        )

        # Set overdraft limit.
        app.set_overdraft_limit(
            account_id=account_id2,
            overdraft_limit=Decimal("500.00"),
        )

        # Can't set negative overdraft limit.
        with self.assertRaises(AssertionError):
            app.set_overdraft_limit(
                account_id=account_id2,
                overdraft_limit=Decimal("-500.00"),
            )

        # Check overdraft limit.
        self.assertEqual(
            app.get_overdraft_limit(account_id2),
            Decimal("500.00"),
        )

        # Withdraw funds.
        app.withdraw_funds(
            debit_account_id=account_id2,
            amount=Decimal("500.00"),
        )

        # Check balance - should be overdrawn.
        # (100.00 - 500.00 = -400.00, which is within the 500.00 limit.)
        self.assertEqual(
            app.get_balance(account_id2),
            Decimal("-400.00"),
        )

        # Fail to withdraw funds - insufficient funds.
        # (-400.00 - 101.00 = -501.00, which is below the -500.00 floor.)
        with self.assertRaises(InsufficientFundsError):
            app.withdraw_funds(
                debit_account_id=account_id2,
                amount=Decimal("101.00"),
            )

        # Fail to set overdraft limit - account closed.
        with self.assertRaises(AccountClosedError):
            app.set_overdraft_limit(
                account_id=account_id1,
                overdraft_limit=Decimal("500.00"),
            )
The application class BankAccounts…
class BankAccounts(Application):
    """Application object exposing the bank-account use cases.

    Every command method loads the account(s) it needs with
    ``get_account``, calls a method on the ``BankAccount`` aggregate and
    then passes the aggregate(s) to ``self.save``.
    """

    def open_account(self, full_name: str, email_address: str) -> UUID:
        """Open a new account, save it and return its ID."""
        account = BankAccount.open(
            full_name=full_name,
            email_address=email_address,
        )
        self.save(account)
        return account.id

    def get_account(self, account_id: UUID) -> BankAccount:
        """Return the account aggregate stored under ``account_id``.

        Raises:
            AccountNotFoundError: if the repository raises
                ``AggregateNotFound`` for this ID.
        """
        try:
            aggregate = self.repository.get(account_id)
        except AggregateNotFound as err:
            # Chain explicitly so the traceback shows the repository error
            # as the direct cause of the domain-level error.
            raise AccountNotFoundError(account_id) from err
        assert isinstance(aggregate, BankAccount)
        return aggregate

    def get_balance(self, account_id: UUID) -> Decimal:
        """Return the current balance of the account."""
        return self.get_account(account_id).balance

    def deposit_funds(self, credit_account_id: UUID, amount: Decimal) -> None:
        """Credit ``amount`` to the account."""
        account = self.get_account(credit_account_id)
        account.append_transaction(amount)
        self.save(account)

    def withdraw_funds(self, debit_account_id: UUID, amount: Decimal) -> None:
        """Debit ``amount`` from the account.

        The amount is negated before it is appended, so a debit is stored
        as a negative transaction.
        """
        account = self.get_account(debit_account_id)
        account.append_transaction(-amount)
        self.save(account)

    def transfer_funds(
        self,
        debit_account_id: UUID,
        credit_account_id: UUID,
        amount: Decimal,
    ) -> None:
        """Move ``amount`` from the debit account to the credit account.

        Both transactions are appended before anything is saved, so an
        error raised by either account leaves neither of them saved.
        """
        debit_account = self.get_account(debit_account_id)
        credit_account = self.get_account(credit_account_id)
        debit_account.append_transaction(-amount)
        credit_account.append_transaction(amount)
        # Both aggregates go to one save() call.
        # NOTE(review): presumably this records both sides together —
        # confirm against Application.save.
        self.save(debit_account, credit_account)

    def set_overdraft_limit(self, account_id: UUID, overdraft_limit: Decimal) -> None:
        """Set the overdraft limit of the account."""
        account = self.get_account(account_id)
        account.set_overdraft_limit(overdraft_limit)
        self.save(account)

    def get_overdraft_limit(self, account_id: UUID) -> Decimal:
        """Return the overdraft limit of the account."""
        return self.get_account(account_id).overdraft_limit

    def close_account(self, account_id: UUID) -> None:
        """Close the account."""
        account = self.get_account(account_id)
        account.close()
        self.save(account)
class AccountNotFoundError(Exception):
    """Raised by BankAccounts.get_account when no account exists for an ID."""
The aggregate class BankAccount…
class BankAccount(Aggregate):
    """Bank account aggregate.

    State changes are made by triggering the nested event classes
    (``Opened``, ``TransactionAppended``, ``OverdraftLimitSet``,
    ``Closed``); each event's ``apply`` method mutates the aggregate.
    """

    def __init__(self, full_name: str, email_address: str):
        self.full_name = full_name
        self.email_address = email_address
        # A new account starts empty, with no overdraft, and open.
        self.balance = Decimal("0.00")
        self.overdraft_limit = Decimal("0.00")
        self.is_closed = False

    @classmethod
    def open(cls, full_name: str, email_address: str) -> "BankAccount":
        """Create a new account with a freshly generated UUID."""
        return cls._create(
            cls.Opened,
            id=uuid4(),
            full_name=full_name,
            email_address=email_address,
        )

    class Opened(AggregateCreated):
        # Carries the attributes given to ``open``.
        full_name: str
        email_address: str

    def append_transaction(
        self, amount: Decimal, transaction_id: Optional[UUID] = None
    ) -> None:
        """Append a signed transaction to the account.

        Args:
            amount: positive to credit, negative to debit.
            transaction_id: optional identifier recorded on the event.

        Raises:
            AccountClosedError: if the account has been closed.
            InsufficientFundsError: if the resulting balance would fall
                below ``-overdraft_limit``.
        """
        self.check_account_is_not_closed()
        self.check_has_sufficient_funds(amount)
        self.trigger_event(
            self.TransactionAppended,
            amount=amount,
            transaction_id=transaction_id,
        )

    def check_account_is_not_closed(self) -> None:
        """Raise AccountClosedError if the account has been closed."""
        if self.is_closed:
            raise AccountClosedError({"account_id": self.id})

    def check_has_sufficient_funds(self, amount: Decimal) -> None:
        """Raise InsufficientFundsError if ``amount`` would breach the limit."""
        # The lowest permitted balance is minus the overdraft limit.
        if self.balance + amount < -self.overdraft_limit:
            raise InsufficientFundsError({"account_id": self.id})

    class TransactionAppended(AggregateEvent):
        amount: Decimal
        # Optional because append_transaction defaults it to None.
        transaction_id: Optional[UUID]

        def apply(self, aggregate: "BankAccount") -> None:
            aggregate.balance += self.amount

    def set_overdraft_limit(self, overdraft_limit: Decimal) -> None:
        """Set how far below zero the balance may go.

        Zero is accepted: it is the limit every account starts with.

        Raises:
            AssertionError: if ``overdraft_limit`` is negative.
            AccountClosedError: if the account has been closed.
        """
        # Raise explicitly rather than use ``assert``: assert statements
        # are removed under ``python -O``, which would disable the check.
        # The exception type is kept for existing callers.
        if overdraft_limit < Decimal("0.00"):
            raise AssertionError("Overdraft limit must not be negative")
        self.check_account_is_not_closed()
        self.trigger_event(
            self.OverdraftLimitSet,
            overdraft_limit=overdraft_limit,
        )

    class OverdraftLimitSet(AggregateEvent):
        overdraft_limit: Decimal

        def apply(self, aggregate: "BankAccount") -> None:
            aggregate.overdraft_limit = self.overdraft_limit

    def close(self) -> None:
        """Close the account.

        No check is made that the account is still open, so closing twice
        triggers a second ``Closed`` event.
        """
        self.trigger_event(self.Closed)

    class Closed(AggregateEvent):
        def apply(self, aggregate: "BankAccount") -> None:
            aggregate.is_closed = True
class TransactionError(Exception):
    """Base class for AccountClosedError and InsufficientFundsError."""
class AccountClosedError(TransactionError):
    """Raised by BankAccount.check_account_is_not_closed for a closed account."""
class InsufficientFundsError(TransactionError):
    """Raised when a transaction would take the balance below the overdraft limit."""
Run the test…
# Build a suite holding the single scenario test and run it.
suite = unittest.TestSuite([TestBankAccounts("test")])
runner = unittest.TextTestRunner()
result = runner.run(suite)
# Fail if the documented example does not pass.
assert result.wasSuccessful()
Cargo shipping
Test first…
class TestBookingService(unittest.TestCase):
    """Tests for BookingService, driven only through str/datetime arguments."""

    def setUp(self) -> None:
        """Give every test a BookingService over a new BookingApplication."""
        self.service = BookingService(BookingApplication())

    def test_admin_can_book_new_cargo(self) -> None:
        """Book a cargo, read its details back, then change the destination."""
        arrival_deadline = datetime.now(tz=TZINFO) + timedelta(weeks=3)

        cargo_id = self.service.book_new_cargo(
            origin="NLRTM",
            destination="USDAL",
            arrival_deadline=arrival_deadline,
        )

        cargo_details = self.service.get_cargo_details(cargo_id)
        self.assertTrue(cargo_details["id"])
        self.assertEqual(cargo_details["origin"], "NLRTM")
        self.assertEqual(cargo_details["destination"], "USDAL")

        self.service.change_destination(cargo_id, destination="AUMEL")
        cargo_details = self.service.get_cargo_details(cargo_id)
        self.assertEqual(cargo_details["destination"], "AUMEL")
        # Changing the destination must leave the deadline as booked.
        self.assertEqual(
            cargo_details["arrival_deadline"],
            arrival_deadline,
        )

    def test_scenario_cargo_from_hongkong_to_stockholm(
        self,
    ) -> None:
        """Book, route, misdirect, reroute and finally claim a cargo."""
        # Test setup: A cargo should be shipped from
        # Hongkong to Stockholm, and it should arrive
        # in no more than two weeks.
        origin = "HONGKONG"
        destination = "STOCKHOLM"
        arrival_deadline = datetime.now(tz=TZINFO) + timedelta(weeks=2)

        # Use case 1: booking.

        # A new cargo is booked, and the unique tracking
        # id is assigned to the cargo.
        tracking_id = self.service.book_new_cargo(origin, destination, arrival_deadline)

        # The tracking id can be used to lookup the cargo
        # in the repository.
        # Important: The cargo, and thus the domain model,
        # is responsible for determining the status of the
        # cargo, whether it is on the right track or not
        # and so on. This is core domain logic. Tracking
        # the cargo basically amounts to presenting
        # information extracted from the cargo aggregate
        # in a suitable way.
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(
            cargo_details["transport_status"],
            "NOT_RECEIVED",
        )
        self.assertEqual(cargo_details["routing_status"], "NOT_ROUTED")
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(
            cargo_details["estimated_time_of_arrival"],
            None,
        )
        self.assertEqual(cargo_details["next_expected_activity"], None)

        # Use case 2: routing.
        #
        # A number of possible routes for this cargo is
        # requested and may be presented to the customer
        # in some way for him/her to choose from.
        # Selection could be affected by things like price
        # and time of delivery, but this test simply uses
        # an arbitrary selection to mimic that process.
        routes_details = self.service.request_possible_routes_for_cargo(tracking_id)
        route_details = select_preferred_itinerary(routes_details)

        # The cargo is then assigned to the selected
        # route, described by an itinerary.
        self.service.assign_route(tracking_id, route_details)

        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(
            cargo_details["transport_status"],
            "NOT_RECEIVED",
        )
        self.assertEqual(cargo_details["routing_status"], "ROUTED")
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertTrue(cargo_details["estimated_time_of_arrival"])
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("RECEIVE", "HONGKONG"),
        )

        # Use case 3: handling

        # A handling event registration attempt will be
        # formed from parsing the data coming in as a
        # handling report either via the web service
        # interface or as an uploaded CSV file. The
        # handling event factory tries to create a
        # HandlingEvent from the attempt, and if the
        # factory decides that this is a plausible
        # handling event, it is stored. If the attempt
        # is invalid, for example if no cargo exists for
        # the specified tracking id, the attempt is
        # rejected.
        #
        # Handling begins: cargo is received in Hongkong.
        self.service.register_handling_event(tracking_id, None, "HONGKONG", "RECEIVE")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["transport_status"], "IN_PORT")
        self.assertEqual(
            cargo_details["last_known_location"],
            "HONGKONG",
        )
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("LOAD", "HONGKONG", "V1"),
        )

        # Load onto voyage V1.
        self.service.register_handling_event(tracking_id, "V1", "HONGKONG", "LOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], "V1")
        self.assertEqual(
            cargo_details["last_known_location"],
            "HONGKONG",
        )
        self.assertEqual(
            cargo_details["transport_status"],
            "ONBOARD_CARRIER",
        )
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("UNLOAD", "NEWYORK", "V1"),
        )

        # Incorrectly unload in Tokyo.
        self.service.register_handling_event(tracking_id, "V1", "TOKYO", "UNLOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], None)
        self.assertEqual(cargo_details["last_known_location"], "TOKYO")
        self.assertEqual(cargo_details["transport_status"], "IN_PORT")
        self.assertEqual(cargo_details["is_misdirected"], True)
        self.assertEqual(cargo_details["next_expected_activity"], None)

        # Reroute.
        routes_details = self.service.request_possible_routes_for_cargo(tracking_id)
        route_details = select_preferred_itinerary(routes_details)
        self.service.assign_route(tracking_id, route_details)

        # Load in Tokyo.
        self.service.register_handling_event(tracking_id, "V3", "TOKYO", "LOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], "V3")
        self.assertEqual(cargo_details["last_known_location"], "TOKYO")
        self.assertEqual(
            cargo_details["transport_status"],
            "ONBOARD_CARRIER",
        )
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("UNLOAD", "HAMBURG", "V3"),
        )

        # Unload in Hamburg.
        self.service.register_handling_event(tracking_id, "V3", "HAMBURG", "UNLOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], None)
        self.assertEqual(cargo_details["last_known_location"], "HAMBURG")
        self.assertEqual(cargo_details["transport_status"], "IN_PORT")
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("LOAD", "HAMBURG", "V4"),
        )

        # Load in Hamburg
        self.service.register_handling_event(tracking_id, "V4", "HAMBURG", "LOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], "V4")
        self.assertEqual(cargo_details["last_known_location"], "HAMBURG")
        self.assertEqual(
            cargo_details["transport_status"],
            "ONBOARD_CARRIER",
        )
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("UNLOAD", "STOCKHOLM", "V4"),
        )

        # Unload in Stockholm
        self.service.register_handling_event(tracking_id, "V4", "STOCKHOLM", "UNLOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], None)
        self.assertEqual(
            cargo_details["last_known_location"],
            "STOCKHOLM",
        )
        self.assertEqual(cargo_details["transport_status"], "IN_PORT")
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("CLAIM", "STOCKHOLM"),
        )

        # Finally, cargo is claimed in Stockholm.
        self.service.register_handling_event(tracking_id, None, "STOCKHOLM", "CLAIM")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], None)
        self.assertEqual(
            cargo_details["last_known_location"],
            "STOCKHOLM",
        )
        self.assertEqual(cargo_details["transport_status"], "CLAIMED")
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(cargo_details["next_expected_activity"], None)
Interface…
class BookingService(object):
    """
    Presents an application interface that uses
    simple types of object (str, bool, datetime).

    Tracking IDs cross this boundary as strings and are converted with
    ``UUID(...)``; locations and handling activities cross as enum member
    names and are looked up with ``Location[...]`` and
    ``HandlingActivity[...]``.
    """

    def __init__(self, app: BookingApplication):
        self.app = app

    def book_new_cargo(
        self,
        origin: str,
        destination: str,
        arrival_deadline: datetime,
    ) -> str:
        """Book a new cargo and return its tracking ID as a string.

        Raises:
            KeyError: if ``origin`` or ``destination`` is not a
                ``Location`` member name.
        """
        tracking_id = self.app.book_new_cargo(
            Location[origin],
            Location[destination],
            arrival_deadline,
        )
        return str(tracking_id)

    def get_cargo_details(self, tracking_id: str) -> CargoDetails:
        """Return the cargo's state as a dict of simple values.

        Raises:
            Exception: if the cargo's next expected activity is neither
                ``None`` nor a tuple of length 2 or 3.
        """
        cargo = self.app.get_cargo(UUID(tracking_id))

        # Present 'next_expected_activity'.
        # The first two items are enums and are presented by value; the
        # optional third item (the voyage number) is passed through as is.
        next_expected_activity: Optional[Union[Tuple[Any, Any], Tuple[Any, Any, Any]]]
        if cargo.next_expected_activity is None:
            next_expected_activity = None
        elif len(cargo.next_expected_activity) == 2:
            next_expected_activity = (
                cargo.next_expected_activity[0].value,
                cargo.next_expected_activity[1].value,
            )
        elif len(cargo.next_expected_activity) == 3:
            next_expected_activity = (
                cargo.next_expected_activity[0].value,
                cargo.next_expected_activity[1].value,
                cargo.next_expected_activity[2],
            )
        else:
            raise Exception(
                "Invalid next expected activity: {}".format(
                    cargo.next_expected_activity
                )
            )

        # Present 'last_known_location'.
        if cargo.last_known_location is None:
            last_known_location = None
        else:
            last_known_location = cargo.last_known_location.value

        # Present the cargo details.
        return {
            "id": str(cargo.id),
            "origin": cargo.origin.value,
            "destination": cargo.destination.value,
            "arrival_deadline": cargo.arrival_deadline,
            "transport_status": cargo.transport_status,
            "routing_status": cargo.routing_status,
            "is_misdirected": cargo.is_misdirected,
            "estimated_time_of_arrival": cargo.estimated_time_of_arrival,
            "next_expected_activity": next_expected_activity,
            "last_known_location": last_known_location,
            "current_voyage_number": cargo.current_voyage_number,
        }

    def change_destination(self, tracking_id: str, destination: str) -> None:
        """Change the cargo's destination to the named location."""
        self.app.change_destination(UUID(tracking_id), Location[destination])

    def request_possible_routes_for_cargo(
        self, tracking_id: str
    ) -> List[ItineraryDetails]:
        """Return the possible routes for the cargo as itinerary dicts."""
        routes = self.app.request_possible_routes_for_cargo(UUID(tracking_id))
        return [self.dict_from_itinerary(route) for route in routes]

    def dict_from_itinerary(self, itinerary: Itinerary) -> ItineraryDetails:
        """Convert an ``Itinerary`` and its legs into plain dicts."""
        legs_details: List[LegDetails] = [
            {
                "origin": leg.origin,
                "destination": leg.destination,
                "voyage_number": leg.voyage_number,
            }
            for leg in itinerary.legs
        ]
        route_details: ItineraryDetails = {
            "origin": itinerary.origin,
            "destination": itinerary.destination,
            "legs": legs_details,
        }
        return route_details

    def assign_route(
        self,
        tracking_id: str,
        route_details: ItineraryDetails,
    ) -> None:
        """Assign to the cargo the possible route equal to ``route_details``.

        The possible routes are requested again and each is converted with
        ``dict_from_itinerary`` for comparison; the first equal one is
        assigned.
        """
        cargo_id = UUID(tracking_id)
        for route in self.app.request_possible_routes_for_cargo(cargo_id):
            if route_details == self.dict_from_itinerary(route):
                self.app.assign_route(cargo_id, route)
                # Stop at the first match so that an equal route
                # registered twice is not assigned a second time.
                return
        # NOTE(review): when nothing matches, no route is assigned and no
        # error is raised — consider raising here.

    def register_handling_event(
        self,
        tracking_id: str,
        voyage_number: Optional[str],
        location: str,
        handling_activity: str,
    ) -> None:
        """Register a handling event for the cargo.

        Raises:
            KeyError: if ``location`` or ``handling_activity`` is not a
                member name of ``Location`` / ``HandlingActivity``.
        """
        self.app.register_handling_event(
            UUID(tracking_id),
            voyage_number,
            Location[location],
            HandlingActivity[handling_activity],
        )
def select_preferred_itinerary(
    itineraries: List[ItineraryDetails],
) -> ItineraryDetails:
    """Return the first of the offered itineraries.

    Raises IndexError when ``itineraries`` is empty.
    """
    preferred = itineraries[0]
    return preferred
Application…
class BookingApplication(Application):
    """Application object for booking, routing and tracking Cargo aggregates."""

    def register_transcodings(self, transcoder: Transcoder) -> None:
        """Add the cargo-shipping transcodings to the inherited ones."""
        super().register_transcodings(transcoder)
        for transcoding in (
            LocationAsName(),
            HandlingActivityAsName(),
            ItineraryAsDict(),
            LegAsDict(),
        ):
            transcoder.register(transcoding)

    def book_new_cargo(
        self,
        origin: Location,
        destination: Location,
        arrival_deadline: datetime,
    ) -> UUID:
        """Create and save a new cargo booking, returning its ID."""
        booking = Cargo.new_booking(origin, destination, arrival_deadline)
        self.save(booking)
        return booking.id

    def change_destination(self, tracking_id: UUID, destination: Location) -> None:
        """Load the cargo, change its destination and save it."""
        booking = self.get_cargo(tracking_id)
        booking.change_destination(destination)
        self.save(booking)

    def request_possible_routes_for_cargo(self, tracking_id: UUID) -> List[Itinerary]:
        """Look up the registered routes from the cargo's position to its destination.

        The starting point is the last known location, or the origin when
        no location has been recorded yet.

        Raises:
            Exception: if REGISTERED_ROUTES has no entry for the pair.
        """
        booking = self.get_cargo(tracking_id)
        start = booking.last_known_location or booking.origin
        route_key = (start.value, booking.destination.value)
        try:
            return REGISTERED_ROUTES[route_key]
        except KeyError:
            raise Exception(
                "Can't find routes from {} to {}".format(*route_key)
            )

    def assign_route(self, tracking_id: UUID, itinerary: Itinerary) -> None:
        """Load the cargo, assign the itinerary as its route and save it."""
        booking = self.get_cargo(tracking_id)
        booking.assign_route(itinerary)
        self.save(booking)

    def register_handling_event(
        self,
        tracking_id: UUID,
        voyage_number: Optional[str],
        location: Location,
        handing_activity: HandlingActivity,
    ) -> None:
        """Load the cargo, register the handling event on it and save it."""
        # The parameter is spelled "handing_activity" (sic); it is kept
        # unchanged here so keyword callers continue to work.
        booking = self.get_cargo(tracking_id)
        booking.register_handling_event(
            tracking_id,
            voyage_number,
            location,
            handing_activity,
        )
        self.save(booking)

    def get_cargo(self, tracking_id: UUID) -> Cargo:
        """Return the Cargo aggregate stored under ``tracking_id``."""
        aggregate = self.repository.get(tracking_id)
        assert isinstance(aggregate, Cargo)
        return aggregate
class HandlingActivityAsName(Transcoding):
    """Transcoding that represents a HandlingActivity by its member name."""

    type = HandlingActivity
    name = "handling_activity"

    def encode(self, obj: HandlingActivity) -> str:
        """Return the member name of ``obj``."""
        member_name = obj.name
        return member_name

    def decode(self, data: str) -> HandlingActivity:
        """Look the member up by the name produced by ``encode``."""
        assert isinstance(data, str)
        activity = HandlingActivity[data]
        return activity
class ItineraryAsDict(Transcoding):
    """Transcoding that represents an Itinerary by its attribute dict."""

    type = Itinerary
    name = "itinerary"

    def encode(self, obj: Itinerary) -> dict:
        """Return the instance's own ``__dict__`` (not a copy)."""
        attributes = obj.__dict__
        return attributes

    def decode(self, data: dict) -> Itinerary:
        """Rebuild the Itinerary by passing the dict as keyword arguments."""
        assert isinstance(data, dict)
        itinerary = Itinerary(**data)
        return itinerary
class LegAsDict(Transcoding):
    """Transcoding that represents a Leg by its attribute dict."""

    type = Leg
    name = "leg"

    def encode(self, obj: Leg) -> dict:
        """Return the instance's own ``__dict__`` (not a copy)."""
        attributes = obj.__dict__
        return attributes

    def decode(self, data: dict) -> Leg:
        """Rebuild the Leg by passing the dict as keyword arguments."""
        assert isinstance(data, dict)
        leg = Leg(**data)
        return leg
class LocationAsName(Transcoding):
    """Transcoding that represents a Location by its member name."""

    type = Location
    name = "location"

    def encode(self, obj: Location) -> str:
        """Return the member name of ``obj``."""
        member_name = obj.name
        return member_name

    def decode(self, data: str) -> Location:
        """Look the member up by the name produced by ``encode``."""
        assert isinstance(data, str)
        location = Location[data]
        return location
Domain model…
class Cargo(Aggregate):
    """
    The Cargo aggregate is an event-sourced domain model aggregate that
    specifies the routing from origin to destination, and can track what
    happens to the cargo after it has been booked.

    Command methods (change_destination, assign_route,
    register_handling_event) only trigger events. The state changes live
    in the handler methods registered on ``apply`` for each event class.
    """

    def __init__(
        self,
        origin: Location,
        destination: Location,
        arrival_deadline: datetime,
    ):
        self._origin: Location = origin
        self._destination: Location = destination
        self._arrival_deadline: datetime = arrival_deadline
        # A newly booked cargo has not been received, routed or located.
        self._transport_status: str = "NOT_RECEIVED"
        self._routing_status: str = "NOT_ROUTED"
        self._is_misdirected: bool = False
        self._estimated_time_of_arrival: Optional[datetime] = None
        self._next_expected_activity: NextExpectedActivity = None
        self._route: Optional[Itinerary] = None
        self._last_known_location: Optional[Location] = None
        self._current_voyage_number: Optional[str] = None

    # Read-only views of the private state set by the event handlers.

    @property
    def origin(self) -> Location:
        return self._origin

    @property
    def destination(self) -> Location:
        return self._destination

    @property
    def arrival_deadline(self) -> datetime:
        return self._arrival_deadline

    @property
    def transport_status(self) -> str:
        return self._transport_status

    @property
    def routing_status(self) -> str:
        return self._routing_status

    @property
    def is_misdirected(self) -> bool:
        return self._is_misdirected

    @property
    def estimated_time_of_arrival(
        self,
    ) -> Optional[datetime]:
        return self._estimated_time_of_arrival

    @property
    def next_expected_activity(self) -> Optional[Tuple]:
        # Either None, (activity, location) or
        # (activity, location, voyage_number); see the handlers below.
        return self._next_expected_activity

    @property
    def route(self) -> Optional[Itinerary]:
        return self._route

    @property
    def last_known_location(self) -> Optional[Location]:
        return self._last_known_location

    @property
    def current_voyage_number(self) -> Optional[str]:
        return self._current_voyage_number

    @classmethod
    def new_booking(
        cls,
        origin: Location,
        destination: Location,
        arrival_deadline: datetime,
    ) -> "Cargo":
        """Create a new cargo with a freshly generated UUID."""
        return cls._create(
            event_class=Cargo.BookingStarted,
            id=uuid4(),
            origin=origin,
            destination=destination,
            arrival_deadline=arrival_deadline,
        )

    class BookingStarted(AggregateCreated):
        origin: Location
        destination: Location
        arrival_deadline: datetime

    class Event(AggregateEvent["Cargo"]):
        # Base class for the events below: applying the event hands it
        # back to the aggregate's own ``apply`` dispatcher.
        def apply(self, aggregate: "Cargo") -> None:
            aggregate.apply(self)

    @singledispatchmethod
    def apply(self, event: "Cargo.Event") -> None:
        """
        Default aggregate projection.

        Does nothing; handlers for specific event classes are attached
        below with ``@apply.register(...)``.
        """

    def change_destination(self, destination: Location) -> None:
        """Trigger a DestinationChanged event."""
        self.trigger_event(
            self.DestinationChanged,
            destination=destination,
        )

    class DestinationChanged(Event):
        destination: Location

    @apply.register(DestinationChanged)
    def destination_changed(self, event: DestinationChanged) -> None:
        self._destination = event.destination

    def assign_route(self, itinerary: Itinerary) -> None:
        """Trigger a RouteAssigned event carrying the itinerary."""
        self.trigger_event(self.RouteAssigned, route=itinerary)

    class RouteAssigned(Event):
        route: Itinerary

    @apply.register(RouteAssigned)
    def route_assigned(self, event: RouteAssigned) -> None:
        self._route = event.route
        self._routing_status = "ROUTED"
        # NOTE(review): the ETA is taken from the wall clock when the event
        # is applied, so it presumably differs each time the aggregate is
        # rebuilt from its events — confirm.
        self._estimated_time_of_arrival = datetime.now(tz=TZINFO) + timedelta(weeks=1)
        # NOTE(review): this always expects RECEIVE at the origin, even
        # when the route is assigned after handling has begun — confirm
        # that is intended.
        self._next_expected_activity = (
            HandlingActivity.RECEIVE,
            self.origin,
        )
        # Assigning a route clears any earlier misdirection.
        self._is_misdirected = False

    def register_handling_event(
        self,
        tracking_id: UUID,
        voyage_number: Optional[str],
        location: Location,
        handling_activity: HandlingActivity,
    ) -> None:
        """Trigger a HandlingEventRegistered event with the given details."""
        self.trigger_event(
            self.HandlingEventRegistered,
            tracking_id=tracking_id,
            voyage_number=voyage_number,
            location=location,
            handling_activity=handling_activity,
        )

    class HandlingEventRegistered(Event):
        tracking_id: UUID
        # NOTE(review): register_handling_event passes Optional[str] here.
        voyage_number: str
        location: Location
        # NOTE(review): register_handling_event passes a HandlingActivity
        # here, and the handler compares it against HandlingActivity members.
        handling_activity: str

    @apply.register(HandlingEventRegistered)
    def handling_event_registered(self, event: HandlingEventRegistered) -> None:
        # Handling requires a route to have been assigned.
        # NOTE(review): as an ``assert`` this check is removed under ``python -O``.
        assert self.route is not None
        if event.handling_activity == HandlingActivity.RECEIVE:
            self._transport_status = "IN_PORT"
            self._last_known_location = event.location
            # Next: load onto the voyage of the route's first leg.
            self._next_expected_activity = (
                HandlingActivity.LOAD,
                event.location,
                self.route.legs[0].voyage_number,
            )

        elif event.handling_activity == HandlingActivity.LOAD:
            self._transport_status = "ONBOARD_CARRIER"
            self._current_voyage_number = event.voyage_number
            # Find the leg that starts here on this voyage; its
            # destination is where the cargo should be unloaded.
            for leg in self.route.legs:
                if leg.origin == event.location.value:
                    if leg.voyage_number == event.voyage_number:
                        self._next_expected_activity = (
                            HandlingActivity.UNLOAD,
                            Location[leg.destination],
                            event.voyage_number,
                        )
                        break
            else:
                # for/else: reached only when no leg matched (no break).
                raise Exception(
                    "Can't find leg with origin={} and "
                    "voyage_number={}".format(
                        event.location,
                        event.voyage_number,
                    )
                )

        elif event.handling_activity == HandlingActivity.UNLOAD:
            self._current_voyage_number = None
            self._last_known_location = event.location
            self._transport_status = "IN_PORT"
            if event.location == self.destination:
                # Arrived at the final destination: expect a claim.
                self._next_expected_activity = (
                    HandlingActivity.CLAIM,
                    event.location,
                )
            elif event.location.value in [leg.destination for leg in self.route.legs]:
                # Unloaded at an intermediate stop on the route: expect
                # loading onto the leg after the one just completed.
                for i, leg in enumerate(self.route.legs):
                    if leg.voyage_number == event.voyage_number:
                        # NOTE(review): raises IndexError if the matching
                        # leg is the last one on the route.
                        next_leg: Leg = self.route.legs[i + 1]
                        assert Location[next_leg.origin] == event.location
                        self._next_expected_activity = (
                            HandlingActivity.LOAD,
                            event.location,
                            next_leg.voyage_number,
                        )
                        break
                # NOTE(review): if no leg has this voyage number, the
                # previous next_expected_activity is left unchanged.
            else:
                # Unloaded somewhere that is not on the route.
                self._is_misdirected = True
                self._next_expected_activity = None

        elif event.handling_activity == HandlingActivity.CLAIM:
            self._next_expected_activity = None
            self._transport_status = "CLAIMED"

        else:
            raise Exception(
                "Unsupported handling event: {}".format(event.handling_activity)
            )
class HandlingActivity(Enum):
    """Kinds of handling event; each member's value equals its name."""

    RECEIVE, LOAD, UNLOAD, CLAIM = "RECEIVE", "LOAD", "UNLOAD", "CLAIM"
class Itinerary(object):
    """
    An itinerary along which cargo is shipped.

    The attribute names must stay identical to the constructor parameter
    names: ItineraryAsDict encodes an instance as ``obj.__dict__`` and
    decodes it with ``Itinerary(**data)``.
    """

    def __init__(
        self,
        origin: str,
        destination: str,
        # String forward reference: signature annotations are evaluated
        # when the function is defined, so the bare name would require Leg
        # to be defined before this class.
        legs: Tuple["Leg", ...],
    ):
        self.origin: str = origin
        self.destination: str = destination
        self.legs: Tuple["Leg", ...] = legs
class Leg:
    """
    One leg of an itinerary: a single voyage between two locations.

    Origin and destination are held as plain strings.
    """

    def __init__(self, origin: str, destination: str, voyage_number: str):
        self.origin = origin
        self.destination = destination
        self.voyage_number = voyage_number
class Location(Enum):
    """
    Locations in the world.

    Each member's value is identical to its name.
    """

    # Locations used by the Hongkong-to-Stockholm scenario test.
    HAMBURG, HONGKONG, NEWYORK, STOCKHOLM, TOKYO = (
        "HAMBURG",
        "HONGKONG",
        "NEWYORK",
        "STOCKHOLM",
        "TOKYO",
    )
    # Location codes used by test_admin_can_book_new_cargo.
    NLRTM, USDAL, AUMEL = "NLRTM", "USDAL", "AUMEL"
Run the test…
# Build a suite from the two booking-service tests, in this order, and run it.
suite = unittest.TestSuite(
    [
        TestBookingService("test_admin_can_book_new_cargo"),
        TestBookingService("test_scenario_cargo_from_hongkong_to_stockholm"),
    ]
)
runner = unittest.TextTestRunner()
result = runner.run(suite)
# Fail if the documented example does not pass.
assert result.wasSuccessful()