Application 4 - Searchable timestamps¶
This example demonstrates how to extend the library’s application recorder classes to support retrieving aggregates at a particular point in time with both PostgreSQL and SQLite.
Application¶
The application class SearchableTimestampsApplication
extends the BookingApplication
presented in the cargo shipping example. It extends
the application _record()
method by setting in the processing event the originator ID, timestamp, and originator version of each Cargo
event, which the recorder will use to insert event timestamps into an index. It also
introduces a get_cargo_at_timestamp()
method that expects a tracking_id
and a
timestamp
argument, then returns a Cargo
aggregate as it was at the specified time.
from __future__ import annotations
from typing import TYPE_CHECKING, cast
from eventsourcing.application import AggregateNotFoundError
from examples.cargoshipping.application import BookingApplication
from examples.cargoshipping.domainmodel import Cargo
if TYPE_CHECKING:
from datetime import datetime
from uuid import UUID
from eventsourcing.application import ProcessingEvent
from eventsourcing.persistence import Recording
from examples.searchabletimestamps.persistence import SearchableTimestampsRecorder
class CargoNotFoundError(AggregateNotFoundError):
    """Raised when no Cargo version is found for a tracking ID at a timestamp."""
class SearchableTimestampsApplication(BookingApplication):
    """Booking application whose Cargo aggregates can be retrieved as of a timestamp."""

    def _record(self, processing_event: ProcessingEvent[UUID]) -> list[Recording[UUID]]:
        """Record the processing event, passing Cargo event timestamps along.

        One ``(originator_id, timestamp, originator_version)`` tuple is built
        for every ``Cargo.Event`` in the processing event. The list is stored
        in ``saved_kwargs`` under ``"event_timestamps_data"`` before the
        superclass does the recording.
        """
        cargo_events = (
            event for event in processing_event.events if isinstance(event, Cargo.Event)
        )
        processing_event.saved_kwargs["event_timestamps_data"] = [
            (event.originator_id, event.timestamp, event.originator_version)
            for event in cargo_events
        ]
        return super()._record(processing_event)

    def get_cargo_at_timestamp(self, tracking_id: UUID, timestamp: datetime) -> Cargo:
        """Return the Cargo aggregate at the version the recorder reports for a time.

        Raises:
            CargoNotFoundError: If the recorder reports no version for the
                given tracking ID and timestamp.
        """
        timestamps_recorder = cast("SearchableTimestampsRecorder", self.recorder)
        version_at_timestamp = timestamps_recorder.get_version_at_timestamp(
            tracking_id, timestamp
        )
        if version_at_timestamp is None:
            raise CargoNotFoundError((tracking_id, timestamp))
        cargo = self.repository.get(tracking_id, version=version_at_timestamp)
        return cast("Cargo", cargo)
Persistence¶
The recorder classes SearchableTimestampsApplicationRecorder
extend the PostgreSQL
and SQLite ApplicationRecorder
classes by creating a table that contains rows
with the originator ID, timestamp, and originator version of aggregate events. They
define SQL statements that insert and select from the rows of the table.
They define a get_version_at_timestamp()
method which returns the version of
an aggregate at a particular time.
from __future__ import annotations
from abc import abstractmethod
from typing import TYPE_CHECKING
from eventsourcing.persistence import ApplicationRecorder
if TYPE_CHECKING:
from datetime import datetime
from uuid import UUID
class SearchableTimestampsRecorder(ApplicationRecorder):
    """Application recorder interface that can look up an aggregate version by time."""

    @abstractmethod
    def get_version_at_timestamp(self, originator_id: UUID, timestamp: datetime) -> int | None:
        """Returns originator version at timestamp for given originator ID.

        The return annotation allows ``None`` for the case where no version
        is available.
        """
The application recorder classes extend the _insert_events()
method by inserting rows,
according to the event timestamp data passed down from the application.
The infrastructure factory classes SearchableTimestampsInfrastructureFactory
extend the
PostgreSQL and SQLite Factory
classes by overriding the application_recorder()
method
so that a SearchableTimestampsApplicationRecorder
is constructed as the application recorder.
PostgreSQL¶
from __future__ import annotations
from typing import TYPE_CHECKING, Any, cast
from psycopg.sql import SQL, Identifier
from eventsourcing.postgres import (
PostgresApplicationRecorder,
PostgresDatastore,
PostgresFactory,
)
from examples.searchabletimestamps.persistence import SearchableTimestampsRecorder
if TYPE_CHECKING:
from collections.abc import Sequence
from datetime import datetime
from uuid import UUID
from psycopg import Cursor
from psycopg.rows import DictRow
from eventsourcing.persistence import ApplicationRecorder, StoredEvent
class SearchableTimestampsApplicationRecorder(
    SearchableTimestampsRecorder, PostgresApplicationRecorder
):
    """PostgreSQL application recorder that also maintains an event timestamps table.

    The extra table holds one row per indexed event with the columns
    ``originator_id``, ``timestamp`` and ``originator_version``.
    """

    def __init__(
        self,
        datastore: PostgresDatastore,
        events_table_name: str = "stored_events",
        event_timestamps_table_name: str = "event_timestamps",
    ):
        """Prepare the SQL statements for the event timestamps table.

        Args:
            datastore: PostgreSQL datastore used for connections and schema name.
            events_table_name: Name of the stored events table.
            event_timestamps_table_name: Name of the timestamps index table.
        """
        super().__init__(datastore, events_table_name=events_table_name)
        self.check_table_name_length(event_timestamps_table_name)
        self.event_timestamps_table_name = event_timestamps_table_name

        # NOTE(review): the primary key is (originator_id, timestamp), so two
        # events of one aggregate carrying an identical timestamp would
        # conflict on insert -- confirm this cannot occur in practice.
        self.create_table_statements.append(
            SQL(
                "CREATE TABLE IF NOT EXISTS {0}.{1} ("
                "originator_id uuid NOT NULL, "
                "timestamp timestamp with time zone, "
                "originator_version bigint NOT NULL, "
                "PRIMARY KEY "
                "(originator_id, timestamp))"
            ).format(
                Identifier(self.datastore.schema),
                Identifier(self.event_timestamps_table_name),
            )
        )
        self.insert_event_timestamp_statement = SQL(
            "INSERT INTO {0}.{1} VALUES (%s, %s, %s)"
        ).format(
            Identifier(self.datastore.schema),
            Identifier(self.event_timestamps_table_name),
        )
        # Latest version whose timestamp is not after the requested time.
        self.select_event_timestamp_statement = SQL(
            "SELECT originator_version FROM {0}.{1} WHERE "
            "originator_id = %s AND "
            "timestamp <= %s "
            "ORDER BY originator_version DESC "
            "LIMIT 1"
        ).format(
            Identifier(self.datastore.schema),
            Identifier(self.event_timestamps_table_name),
        )

    def _insert_events(
        self,
        curs: Cursor[DictRow],
        stored_events: Sequence[StoredEvent],
        **kwargs: Any,
    ) -> None:
        """Insert event timestamp rows, then insert the stored events.

        The timestamp rows are read from the ``event_timestamps_data`` keyword
        argument. When it is absent (or ``None``) no timestamp rows are
        written and the stored events are still inserted, instead of failing
        with a ``TypeError`` from iterating ``None``.
        """
        # Insert event timestamps.
        event_timestamps_data = cast(
            "list[tuple[UUID, datetime, int]]",
            kwargs.get("event_timestamps_data") or [],
        )
        for event_timestamp_data in event_timestamps_data:
            curs.execute(
                query=self.insert_event_timestamp_statement,
                params=event_timestamp_data,
                prepare=True,
            )
        super()._insert_events(curs, stored_events, **kwargs)

    def get_version_at_timestamp(
        self, originator_id: UUID, timestamp: datetime
    ) -> int | None:
        """Return the highest indexed version at or before ``timestamp``.

        Returns:
            The ``originator_version`` of the matching row, or ``None`` when
            the table has no row for ``originator_id`` with a timestamp less
            than or equal to ``timestamp``.
        """
        with self.datastore.get_connection() as conn, conn.cursor() as curs:
            curs.execute(
                query=self.select_event_timestamp_statement,
                params=(originator_id, timestamp),
                prepare=True,
            )
            # The statement has LIMIT 1, so at most one row is available.
            row = curs.fetchone()
            return None if row is None else row["originator_version"]
class SearchableTimestampsInfrastructureFactory(PostgresFactory):
    """PostgreSQL factory that builds the searchable-timestamps application recorder."""

    def application_recorder(self) -> ApplicationRecorder:
        """Construct the recorder, create its tables, and return it.

        Both table names share a prefix: the lower-cased environment name,
        or ``"stored"`` when that name is empty.
        """
        table_prefix = self.env.name.lower() or "stored"
        timestamps_recorder = SearchableTimestampsApplicationRecorder(
            datastore=self.datastore,
            events_table_name=table_prefix + "_events",
            event_timestamps_table_name=table_prefix + "_timestamps",
        )
        timestamps_recorder.create_table()
        return timestamps_recorder
# Remove the imported base factory name from this module's namespace.
# NOTE(review): presumably so that only the factory subclass defined here is
# found when the module is inspected for a factory class -- confirm.
del PostgresFactory
SQLite¶
from __future__ import annotations
import datetime
import sqlite3
from typing import TYPE_CHECKING, Any, cast
from eventsourcing.sqlite import (
SQLiteApplicationRecorder,
SQLiteCursor,
SQLiteDatastore,
SQLiteFactory,
)
from examples.searchabletimestamps.persistence import SearchableTimestampsRecorder
if TYPE_CHECKING:
from collections.abc import Sequence
from uuid import UUID
from eventsourcing.persistence import ApplicationRecorder, StoredEvent
def adapt_date_iso(val: datetime.date) -> str:
    """Serialise a ``datetime.date`` as an ISO 8601 string."""
    iso_text = val.isoformat()
    return iso_text
def adapt_datetime_iso(val: datetime.datetime) -> str:
    """Serialise a ``datetime.datetime`` as an ISO 8601 string.

    The value is formatted with ``isoformat()``, so a timezone-aware value
    keeps its UTC offset in the output.
    """
    iso_text = val.isoformat()
    return iso_text
def adapt_datetime_epoch(val: datetime.datetime) -> int:
    """Serialise a ``datetime.datetime`` as a whole-second Unix timestamp."""
    epoch_seconds = val.timestamp()
    # int() discards the fractional part of the POSIX timestamp.
    return int(epoch_seconds)
# Register module-wide sqlite3 adapters: date and datetime parameters are
# written using the ISO 8601 adapter functions.
sqlite3.register_adapter(datetime.date, adapt_date_iso)
sqlite3.register_adapter(datetime.datetime, adapt_datetime_iso)
# Disabled alternative: adapt datetimes with the Unix timestamp adapter instead.
# sqlite3.register_adapter(datetime.datetime, adapt_datetime_epoch)
def convert_date(val: bytes) -> datetime.date:
    """Parse ISO 8601 date bytes into a ``datetime.date``."""
    iso_text = val.decode()
    return datetime.date.fromisoformat(iso_text)
def convert_datetime(val: bytes) -> datetime.datetime:
    """Parse ISO 8601 datetime bytes into a ``datetime.datetime``."""
    iso_text = val.decode()
    return datetime.datetime.fromisoformat(iso_text)
def convert_timestamp(val: bytes) -> datetime.datetime:
    """Parse integer Unix epoch bytes into a UTC ``datetime.datetime``."""
    epoch_seconds = int(val)
    return datetime.datetime.fromtimestamp(epoch_seconds, tz=datetime.timezone.utc)
# Register module-wide sqlite3 converters, keyed by declared type name.
# NOTE(review): the "timestamp" converter parses Unix epoch integers, whereas
# the datetime adapter registered in this module writes ISO 8601 text --
# confirm whether type detection is enabled on the connections used here,
# because int() cannot parse an ISO 8601 string.
sqlite3.register_converter("date", convert_date)
sqlite3.register_converter("datetime", convert_datetime)
sqlite3.register_converter("timestamp", convert_timestamp)
class SearchableTimestampsApplicationRecorder(
    SearchableTimestampsRecorder, SQLiteApplicationRecorder
):
    """SQLite application recorder that also maintains an event timestamps table.

    The extra table holds one row per indexed event with the columns
    ``originator_id`` (stored as the UUID's hex string), ``timestamp`` and
    ``originator_version``.
    """

    def __init__(
        self,
        datastore: SQLiteDatastore,
        events_table_name: str = "stored_events",
        event_timestamps_table_name: str = "event_timestamps",
    ):
        """Prepare the SQL statements for the event timestamps table.

        Args:
            datastore: SQLite datastore used for transactions.
            events_table_name: Name of the stored events table.
            event_timestamps_table_name: Name of the timestamps index table.
        """
        # Assigned before super().__init__().
        # NOTE(review): presumably the base initialiser calls
        # construct_create_table_statements(), which reads this attribute --
        # confirm before reordering.
        self.event_timestamps_table_name = event_timestamps_table_name
        super().__init__(datastore, events_table_name)
        self.insert_event_timestamp_statement = (
            f"INSERT INTO {self.event_timestamps_table_name} VALUES (?, ?, ?)"
        )
        # Latest version whose timestamp is not after the requested time.
        self.select_event_timestamp_statement = (
            f"SELECT originator_version FROM {self.event_timestamps_table_name} WHERE "
            "originator_id = ? AND "
            "timestamp <= ? "
            "ORDER BY originator_version DESC "
            "LIMIT 1"
        )

    def construct_create_table_statements(self) -> list[str]:
        """Return the base create-table statements plus the timestamps table's."""
        statements = super().construct_create_table_statements()
        statements.append(
            "CREATE TABLE IF NOT EXISTS "
            f"{self.event_timestamps_table_name} ("
            "originator_id TEXT, "
            "timestamp timestamp, "
            "originator_version INTEGER, "
            "PRIMARY KEY "
            "(originator_id, timestamp))"
        )
        return statements

    def _insert_events(
        self,
        c: SQLiteCursor,
        stored_events: Sequence[StoredEvent],
        **kwargs: Any,
    ) -> Sequence[int] | None:
        """Insert the stored events, then insert event timestamp rows.

        The timestamp rows are read from the ``event_timestamps_data`` keyword
        argument. When it is absent (or ``None``) no timestamp rows are
        written, matching how the PostgreSQL recorder reads this optional
        argument with ``kwargs.get()``.

        Returns:
            Whatever the superclass ``_insert_events()`` returned.
        """
        notification_ids = super()._insert_events(c, stored_events, **kwargs)

        # Insert event timestamps.
        event_timestamps_data = cast(
            "list[tuple[UUID, datetime.datetime, int]]",
            kwargs.get("event_timestamps_data") or [],
        )
        for originator_id, timestamp, originator_version in event_timestamps_data:
            c.execute(
                self.insert_event_timestamp_statement,
                (originator_id.hex, timestamp, originator_version),
            )
        return notification_ids

    def get_version_at_timestamp(
        self, originator_id: UUID, timestamp: datetime.datetime
    ) -> int | None:
        """Return the highest indexed version at or before ``timestamp``.

        Returns:
            The ``originator_version`` of the matching row, or ``None`` when
            the table has no row for ``originator_id`` with a timestamp less
            than or equal to ``timestamp``.
        """
        with self.datastore.transaction(commit=False) as c:
            c.execute(
                self.select_event_timestamp_statement, (originator_id.hex, timestamp)
            )
            # The statement has LIMIT 1, so there is at most one row.
            rows = c.fetchall()
            if not rows:
                return None
            return rows[0]["originator_version"]
class SearchableTimestampsInfrastructureFactory(SQLiteFactory):
    """SQLite factory that builds the searchable-timestamps application recorder."""

    def application_recorder(self) -> ApplicationRecorder:
        """Construct the recorder with default table names, create its tables, return it."""
        timestamps_recorder = SearchableTimestampsApplicationRecorder(
            datastore=self.datastore
        )
        timestamps_recorder.create_table()
        return timestamps_recorder
# Remove the imported base factory name from this module's namespace.
# NOTE(review): presumably so that only the factory subclass defined here is
# found when the module is inspected for a factory class -- confirm.
del SQLiteFactory
Test case¶
The test case SearchableTimestampsTestCase
uses the application to evolve the
state of a Cargo
aggregate. The aggregate is then reconstructed as it was at
particular times in its evolution. The test is executed twice, with the application
configured for both PostgreSQL and SQLite.
from __future__ import annotations
import os
from datetime import timedelta
from time import sleep
from typing import ClassVar
from unittest import TestCase
from eventsourcing.domain import datetime_now_with_tzinfo
from eventsourcing.tests.postgres_utils import drop_tables
from examples.cargoshipping.domainmodel import Location
from examples.searchabletimestamps.application import (
CargoNotFoundError,
SearchableTimestampsApplication,
)
class SearchableTimestampsTestCase(TestCase):
    """Books and re-routes a cargo, then checks its state at three captured instants."""

    # Application environment; declared here, assigned by the subclasses.
    env: ClassVar[dict[str, str]]

    def test(self) -> None:
        """Check that a cargo is reconstructed as it was at each captured timestamp."""
        # Construct application.
        application = SearchableTimestampsApplication(env=self.env)

        # Captured before the cargo is booked.
        before_booking = datetime_now_with_tzinfo()
        sleep(1e-5)

        # Book new cargo.
        deadline = datetime_now_with_tzinfo() + timedelta(weeks=3)
        tracking_id = application.book_new_cargo(
            origin=Location["NLRTM"],
            destination=Location["USDAL"],
            arrival_deadline=deadline,
        )

        # Captured after booking, before the destination change.
        after_booking = datetime_now_with_tzinfo()
        sleep(1e-5)

        # Change destination.
        application.change_destination(tracking_id, destination=Location["AUMEL"])

        # Captured after the destination change.
        after_change = datetime_now_with_tzinfo()
        sleep(1e-5)

        # View the state of the cargo tracking at particular times.
        with self.assertRaises(CargoNotFoundError):
            application.get_cargo_at_timestamp(tracking_id, before_booking)

        booked_cargo = application.get_cargo_at_timestamp(tracking_id, after_booking)
        self.assertEqual(booked_cargo.destination, Location["USDAL"])

        rerouted_cargo = application.get_cargo_at_timestamp(tracking_id, after_change)
        self.assertEqual(rerouted_cargo.destination, Location["AUMEL"])
class WithSQLite(SearchableTimestampsTestCase):
    """Runs the searchable timestamps test with the SQLite persistence module."""

    # Environment naming the SQLite persistence module and an in-memory database.
    env: ClassVar[dict[str, str]] = {
        "PERSISTENCE_MODULE": "examples.searchabletimestamps.sqlite",
        "SQLITE_DBNAME": ":memory:",
    }
class WithPostgreSQL(SearchableTimestampsTestCase):
    """Runs the searchable timestamps test with the PostgreSQL persistence module."""

    env: ClassVar[dict[str, str]] = {
        "PERSISTENCE_MODULE": "examples.searchabletimestamps.postgres"
    }

    def setUp(self) -> None:
        """Drop tables, then set the PostgreSQL connection environment variables."""
        drop_tables()
        super().setUp()
        os.environ.update(
            {
                "POSTGRES_DBNAME": "eventsourcing",
                "POSTGRES_HOST": "127.0.0.1",
                "POSTGRES_PORT": "5432",
                "POSTGRES_USER": "eventsourcing",
                "POSTGRES_PASSWORD": "eventsourcing",  # noqa: S105
            }
        )

    def tearDown(self) -> None:
        """Run the base teardown, then drop tables again."""
        super().tearDown()
        drop_tables()
# Remove the base test case name from this module's namespace; it declares
# `env` without assigning a value.
# NOTE(review): presumably this keeps test discovery from collecting and
# running the base class directly -- confirm.
del SearchableTimestampsTestCase