Application 4 - Searchable timestamps¶
This example demonstrates how to extend the library’s application recorder classes to support retrieving aggregates at a particular point in time with both PostgreSQL and SQLite.
Application¶
The application class SearchableTimestampsApplication
extends the BookingApplication
presented in the cargo shipping example. It extends
the application _record()
method by setting in the processing event a list of Cargo
events that will be used by the recorder to insert event timestamps into an index. It also
introduces a get_cargo_at_timestamp()
method that expects a tracking_id
and a
timestamp
argument, then returns a Cargo
aggregate as it was at the specified time.
from datetime import datetime
from typing import List, cast
from uuid import UUID
from eventsourcing.application import ProcessingEvent
from eventsourcing.examples.cargoshipping.application import BookingApplication
from eventsourcing.examples.cargoshipping.domainmodel import Cargo
from eventsourcing.examples.searchabletimestamps.persistence import (
SearchableTimestampsRecorder,
)
from eventsourcing.persistence import Recording
class SearchableTimestampsApplication(BookingApplication):
def _record(self, processing_event: ProcessingEvent) -> List[Recording]:
event_timestamps_data = [
(e.originator_id, e.timestamp, e.originator_version)
for e in processing_event.events
if isinstance(e, Cargo.Event)
]
processing_event.saved_kwargs["event_timestamps_data"] = event_timestamps_data
return super()._record(processing_event)
def get_cargo_at_timestamp(self, tracking_id: UUID, timestamp: datetime) -> Cargo:
recorder = cast(SearchableTimestampsRecorder, self.recorder)
version = recorder.get_version_at_timestamp(tracking_id, timestamp)
return cast(Cargo, self.repository.get(tracking_id, version=version))
Persistence¶
The recorder classes SearchableTimestampsApplicationRecorder
extend the PostgreSQL
and SQLite ApplicationRecorder
classes by creating a table that contains rows
with the originator ID, timestamp, and originator version of aggregate events. The
define SQL statements that insert and select from the rows of the table.
They define a get_version_at_timestamp()
method which returns the version of
an aggregate at a particular time.
from abc import abstractmethod
from datetime import datetime
from typing import Optional
from uuid import UUID
from eventsourcing.persistence import ApplicationRecorder
class SearchableTimestampsRecorder(ApplicationRecorder):
@abstractmethod
def get_version_at_timestamp(
self, originator_id: UUID, timestamp: datetime
) -> Optional[int]:
"""
Returns originator version at timestamp for given originator ID.
"""
The application recorder classes extend the _insert_events()
method by inserting rows,
according to the event timestamp data passed down from the application.
The infrastructure factory classes SearchableTimestampsInfrastructureFactory
extend the
PostgreSQL and SQLite Factory
classes by overriding the application_recorder()
method
so that a SearchableTimestampsApplicationRecorder
is constructed as the application recorder.
PostgreSQL¶
from datetime import datetime
from typing import Any, List, Optional, Sequence, Tuple, cast
from uuid import UUID
from eventsourcing.domain import Aggregate
from eventsourcing.examples.searchabletimestamps.persistence import (
SearchableTimestampsRecorder,
)
from eventsourcing.persistence import ApplicationRecorder, StoredEvent
from eventsourcing.postgres import (
Factory,
PostgresApplicationRecorder,
PostgresConnection,
PostgresCursor,
PostgresDatastore,
)
class SearchableTimestampsApplicationRecorder(
SearchableTimestampsRecorder, PostgresApplicationRecorder
):
def __init__(
self,
datastore: PostgresDatastore,
events_table_name: str = "stored_events",
event_timestamps_table_name: str = "event_timestamps",
):
self.check_table_name_length(event_timestamps_table_name, datastore.schema)
self.event_timestamps_table_name = event_timestamps_table_name
super().__init__(datastore, events_table_name)
self.insert_event_timestamp_statement = (
f"INSERT INTO {self.event_timestamps_table_name} VALUES ($1, $2, $3)"
)
self.insert_event_timestamp_statement_name = (
f"insert_{event_timestamps_table_name}".replace(".", "_")
)
self.select_event_timestamp_statement = (
f"SELECT originator_version FROM {self.event_timestamps_table_name} WHERE "
f"originator_id = $1 AND "
f"timestamp <= $2 "
"ORDER BY originator_version DESC "
"LIMIT 1"
)
self.select_event_timestamp_statement_name = (
f"select_{event_timestamps_table_name}".replace(".", "_")
)
def construct_create_table_statements(self) -> List[str]:
statements = super().construct_create_table_statements()
statements.append(
"CREATE TABLE IF NOT EXISTS "
f"{self.event_timestamps_table_name} ("
"originator_id uuid NOT NULL, "
"timestamp timestamp with time zone, "
"originator_version bigint NOT NULL, "
"PRIMARY KEY "
"(originator_id, timestamp))"
)
return statements
def _prepare_insert_events(self, conn: PostgresConnection) -> None:
super()._prepare_insert_events(conn)
self._prepare(
conn,
self.insert_event_timestamp_statement_name,
self.insert_event_timestamp_statement,
)
def _insert_events(
self,
c: PostgresCursor,
stored_events: List[StoredEvent],
**kwargs: Any,
) -> Optional[Sequence[int]]:
notification_ids = super()._insert_events(c, stored_events, **kwargs)
# Insert event timestamps.
event_timestamps_data = cast(
List[Tuple[UUID, datetime, int]], kwargs.get("event_timestamps_data")
)
for event_timestamp_data in event_timestamps_data:
statement_alias = self.statement_name_aliases[
self.insert_event_timestamp_statement_name
]
c.execute(f"EXECUTE {statement_alias}(%s, %s, %s)", event_timestamp_data)
return notification_ids
def get_version_at_timestamp(
self, originator_id: UUID, timestamp: datetime
) -> Optional[int]:
with self.datastore.get_connection() as conn:
self._prepare(
conn,
self.select_event_timestamp_statement_name,
self.select_event_timestamp_statement,
)
with conn.transaction(commit=False) as curs:
statement_alias = self.statement_name_aliases[
self.select_event_timestamp_statement_name
]
curs.execute(
f"EXECUTE {statement_alias}(%s, %s)", [originator_id, timestamp]
)
for row in curs.fetchall():
return row["originator_version"]
else:
return Aggregate.INITIAL_VERSION - 1
class SearchableTimestampsInfrastructureFactory(Factory):
def application_recorder(self) -> ApplicationRecorder:
prefix = (self.datastore.schema + ".") if self.datastore.schema else ""
prefix += self.env.name.lower() or "stored"
events_table_name = prefix + "_events"
event_timestamps_table_name = prefix + "_timestamps"
recorder = SearchableTimestampsApplicationRecorder(
datastore=self.datastore,
events_table_name=events_table_name,
event_timestamps_table_name=event_timestamps_table_name,
)
recorder.create_table()
return recorder
del Factory
SQLite¶
from datetime import datetime
from typing import Any, List, Optional, Sequence, Tuple, cast
from uuid import UUID
from eventsourcing.domain import Aggregate
from eventsourcing.examples.searchabletimestamps.persistence import (
SearchableTimestampsRecorder,
)
from eventsourcing.persistence import ApplicationRecorder, StoredEvent
from eventsourcing.sqlite import (
Factory,
SQLiteApplicationRecorder,
SQLiteCursor,
SQLiteDatastore,
)
class SearchableTimestampsApplicationRecorder(
SearchableTimestampsRecorder, SQLiteApplicationRecorder
):
def __init__(
self,
datastore: SQLiteDatastore,
events_table_name: str = "stored_events",
event_timestamps_table_name: str = "event_timestamps",
):
self.event_timestamps_table_name = event_timestamps_table_name
super().__init__(datastore, events_table_name)
self.insert_event_timestamp_statement = (
f"INSERT INTO {self.event_timestamps_table_name} VALUES (?, ?, ?)"
)
self.select_event_timestamp_statement = (
f"SELECT originator_version FROM {self.event_timestamps_table_name} WHERE "
f"originator_id = ? AND "
f"timestamp <= ? "
"ORDER BY originator_version DESC "
"LIMIT 1"
)
def construct_create_table_statements(self) -> List[str]:
statements = super().construct_create_table_statements()
statements.append(
"CREATE TABLE IF NOT EXISTS "
f"{self.event_timestamps_table_name} ("
"originator_id TEXT, "
"timestamp timestamp, "
"originator_version INTEGER, "
"PRIMARY KEY "
"(originator_id, timestamp))"
)
return statements
def _insert_events(
self,
c: SQLiteCursor,
stored_events: List[StoredEvent],
**kwargs: Any,
) -> Optional[Sequence[int]]:
notification_ids = super()._insert_events(c, stored_events, **kwargs)
# Insert event timestamps.
event_timestamps_data = cast(
List[Tuple[UUID, datetime, int]], kwargs["event_timestamps_data"]
)
for originator_id, timestamp, originator_version in event_timestamps_data:
c.execute(
self.insert_event_timestamp_statement,
(originator_id.hex, timestamp, originator_version),
)
return notification_ids
def get_version_at_timestamp(
self, originator_id: UUID, timestamp: datetime
) -> Optional[int]:
with self.datastore.transaction(commit=False) as c:
c.execute(
self.select_event_timestamp_statement, (originator_id.hex, timestamp)
)
for row in c.fetchall():
return row["originator_version"]
else:
return Aggregate.INITIAL_VERSION - 1
class SearchableTimestampsInfrastructureFactory(Factory):
def application_recorder(self) -> ApplicationRecorder:
recorder = SearchableTimestampsApplicationRecorder(datastore=self.datastore)
recorder.create_table()
return recorder
del Factory
Test case¶
The test case SearchableTimestampsTestCase
uses the application to evolve the
state of a Cargo
aggregate. The aggregate is then reconstructed as it was at
particular times in its evolution. The test is executed twice, with the application
configured for both PostgreSQL and SQLite.
import os
from datetime import timedelta
from time import sleep
from typing import Dict
from unittest import TestCase
from eventsourcing.application import AggregateNotFound
from eventsourcing.domain import create_utc_datetime_now
from eventsourcing.examples.cargoshipping.domainmodel import Location
from eventsourcing.examples.searchabletimestamps.application import (
SearchableTimestampsApplication,
)
from eventsourcing.postgres import PostgresDatastore
from eventsourcing.tests.postgres_utils import drop_postgres_table
class SearchableTimestampsTestCase(TestCase):
env: Dict[str, str]
def test(self) -> None:
# Construct application.
app = SearchableTimestampsApplication(env=self.env)
timestamp0 = create_utc_datetime_now()
sleep(1e-5)
# Book new cargo.
tracking_id = app.book_new_cargo(
origin=Location["NLRTM"],
destination=Location["USDAL"],
arrival_deadline=create_utc_datetime_now() + timedelta(weeks=3),
)
timestamp1 = create_utc_datetime_now()
sleep(1e-5)
# Change destination.
app.change_destination(tracking_id, destination=Location["AUMEL"])
timestamp2 = create_utc_datetime_now()
sleep(1e-5)
# View the state of the cargo tracking at particular times.
with self.assertRaises(AggregateNotFound):
app.get_cargo_at_timestamp(tracking_id, timestamp0)
cargo_at_timestamp1 = app.get_cargo_at_timestamp(tracking_id, timestamp1)
self.assertEqual(cargo_at_timestamp1.destination, Location["USDAL"])
cargo_at_timestamp2 = app.get_cargo_at_timestamp(tracking_id, timestamp2)
self.assertEqual(cargo_at_timestamp2.destination, Location["AUMEL"])
class WithSQLite(SearchableTimestampsTestCase):
env = {
"PERSISTENCE_MODULE": "eventsourcing.examples.searchabletimestamps.sqlite",
"SQLITE_DBNAME": ":memory:",
}
class WithPostgreSQL(SearchableTimestampsTestCase):
env = {"PERSISTENCE_MODULE": "eventsourcing.examples.searchabletimestamps.postgres"}
def setUp(self) -> None:
super().setUp()
os.environ["POSTGRES_DBNAME"] = "eventsourcing"
os.environ["POSTGRES_HOST"] = "127.0.0.1"
os.environ["POSTGRES_PORT"] = "5432"
os.environ["POSTGRES_USER"] = "eventsourcing"
os.environ["POSTGRES_PASSWORD"] = "eventsourcing"
self.drop_tables()
def tearDown(self) -> None:
self.drop_tables()
super().tearDown()
def drop_tables(self) -> None:
db = PostgresDatastore(
os.environ["POSTGRES_DBNAME"],
os.environ["POSTGRES_HOST"],
os.environ["POSTGRES_PORT"],
os.environ["POSTGRES_USER"],
os.environ["POSTGRES_PASSWORD"],
)
drop_postgres_table(db, "public.searchabletimestampsapplication_events")
drop_postgres_table(db, "public.searchabletimestampsapplication_timestamps")
db.close()
del SearchableTimestampsTestCase