Application 4 - Searchable timestamps

This example demonstrates how to extend the library’s application recorder classes to support retrieving aggregates at a particular point in time with both PostgreSQL and SQLite.

Application

The application class SearchableTimestampsApplication extends the BookingApplication presented in the cargo shipping example. It extends the application _record() method by setting in the processing event a list of Cargo events that will be used by the recorder to insert event timestamps into an index. It also introduces a get_cargo_at_timestamp() method that expects a tracking_id and a timestamp argument, then returns a Cargo aggregate as it was at the specified time.

from __future__ import annotations

from typing import TYPE_CHECKING, List, cast

from eventsourcing.examples.cargoshipping.application import BookingApplication
from eventsourcing.examples.cargoshipping.domainmodel import Cargo
from eventsourcing.examples.searchabletimestamps.persistence import (
    SearchableTimestampsRecorder,
)

if TYPE_CHECKING:  # pragma: nocover
    from datetime import datetime
    from uuid import UUID

    from eventsourcing.application import ProcessingEvent
    from eventsourcing.persistence import Recording


class SearchableTimestampsApplication(BookingApplication):
    def _record(self, processing_event: ProcessingEvent) -> List[Recording]:
        event_timestamps_data = [
            (e.originator_id, e.timestamp, e.originator_version)
            for e in processing_event.events
            if isinstance(e, Cargo.Event)
        ]
        processing_event.saved_kwargs["event_timestamps_data"] = event_timestamps_data
        return super()._record(processing_event)

    def get_cargo_at_timestamp(self, tracking_id: UUID, timestamp: datetime) -> Cargo:
        recorder = cast(SearchableTimestampsRecorder, self.recorder)
        version = recorder.get_version_at_timestamp(tracking_id, timestamp)
        return cast(Cargo, self.repository.get(tracking_id, version=version))

Persistence

The recorder classes SearchableTimestampsApplicationRecorder extend the PostgreSQL and SQLite ApplicationRecorder classes by creating a table that contains rows with the originator ID, timestamp, and originator version of aggregate events. The define SQL statements that insert and select from the rows of the table. They define a get_version_at_timestamp() method which returns the version of an aggregate at a particular time.

from __future__ import annotations

from abc import abstractmethod
from typing import TYPE_CHECKING

from eventsourcing.persistence import ApplicationRecorder

if TYPE_CHECKING:  # pragma: nocover
    from datetime import datetime
    from uuid import UUID


class SearchableTimestampsRecorder(ApplicationRecorder):
    @abstractmethod
    def get_version_at_timestamp(
        self, originator_id: UUID, timestamp: datetime
    ) -> int | None:
        """
        Returns originator version at timestamp for given originator ID.
        """

The application recorder classes extend the _insert_events() method by inserting rows, according to the event timestamp data passed down from the application.

The infrastructure factory classes SearchableTimestampsInfrastructureFactory extend the PostgreSQL and SQLite Factory classes by overriding the application_recorder() method so that a SearchableTimestampsApplicationRecorder is constructed as the application recorder.

PostgreSQL

from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any, List, Tuple, cast
from uuid import UUID

from eventsourcing.domain import Aggregate
from eventsourcing.examples.searchabletimestamps.persistence import (
    SearchableTimestampsRecorder,
)
from eventsourcing.postgres import (
    Factory,
    PostgresApplicationRecorder,
    PostgresDatastore,
)

if TYPE_CHECKING:  # pragma: nocover
    from psycopg import Cursor
    from psycopg.rows import DictRow

    from eventsourcing.persistence import ApplicationRecorder, StoredEvent


class SearchableTimestampsApplicationRecorder(
    SearchableTimestampsRecorder, PostgresApplicationRecorder
):
    def __init__(
        self,
        datastore: PostgresDatastore,
        events_table_name: str = "stored_events",
        event_timestamps_table_name: str = "event_timestamps",
    ):
        self.check_table_name_length(event_timestamps_table_name, datastore.schema)
        self.event_timestamps_table_name = event_timestamps_table_name
        super().__init__(datastore, events_table_name)
        self.insert_event_timestamp_statement = (
            f"INSERT INTO {self.event_timestamps_table_name} VALUES (%s, %s, %s)"
        )
        self.select_event_timestamp_statement = (
            f"SELECT originator_version FROM {self.event_timestamps_table_name} WHERE "
            "originator_id = %s AND "
            "timestamp <= %s "
            "ORDER BY originator_version DESC "
            "LIMIT 1"
        )

    def construct_create_table_statements(self) -> List[str]:
        statements = super().construct_create_table_statements()
        statements.append(
            "CREATE TABLE IF NOT EXISTS "
            f"{self.event_timestamps_table_name} ("
            "originator_id uuid NOT NULL, "
            "timestamp timestamp with time zone, "
            "originator_version bigint NOT NULL, "
            "PRIMARY KEY "
            "(originator_id, timestamp))"
        )
        return statements

    def _insert_events(
        self,
        c: Cursor[DictRow],
        stored_events: List[StoredEvent],
        **kwargs: Any,
    ) -> None:
        # Insert event timestamps.
        event_timestamps_data = cast(
            List[Tuple[UUID, datetime, int]], kwargs.get("event_timestamps_data")
        )
        for event_timestamp_data in event_timestamps_data:
            c.execute(
                query=self.insert_event_timestamp_statement,
                params=event_timestamp_data,
                prepare=True,
            )
        super()._insert_events(c, stored_events, **kwargs)

    def get_version_at_timestamp(
        self, originator_id: UUID, timestamp: datetime
    ) -> int | None:
        with self.datastore.transaction(commit=False) as curs:
            curs.execute(
                query=self.select_event_timestamp_statement,
                params=(originator_id, timestamp),
                prepare=True,
            )
            for row in curs.fetchall():
                version = row["originator_version"]
                break
            else:
                version = Aggregate.INITIAL_VERSION - 1
            return version


class SearchableTimestampsInfrastructureFactory(Factory):
    def application_recorder(self) -> ApplicationRecorder:
        prefix = (self.datastore.schema + ".") if self.datastore.schema else ""
        prefix += self.env.name.lower() or "stored"
        events_table_name = prefix + "_events"
        event_timestamps_table_name = prefix + "_timestamps"
        recorder = SearchableTimestampsApplicationRecorder(
            datastore=self.datastore,
            events_table_name=events_table_name,
            event_timestamps_table_name=event_timestamps_table_name,
        )
        recorder.create_table()
        return recorder


del Factory

SQLite

from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any, List, Sequence, Tuple, cast
from uuid import UUID

from eventsourcing.domain import Aggregate
from eventsourcing.examples.searchabletimestamps.persistence import (
    SearchableTimestampsRecorder,
)
from eventsourcing.sqlite import (
    Factory,
    SQLiteApplicationRecorder,
    SQLiteCursor,
    SQLiteDatastore,
)

if TYPE_CHECKING:  # pragma: nocover
    from eventsourcing.persistence import ApplicationRecorder, StoredEvent


class SearchableTimestampsApplicationRecorder(
    SearchableTimestampsRecorder, SQLiteApplicationRecorder
):
    def __init__(
        self,
        datastore: SQLiteDatastore,
        events_table_name: str = "stored_events",
        event_timestamps_table_name: str = "event_timestamps",
    ):
        self.event_timestamps_table_name = event_timestamps_table_name
        super().__init__(datastore, events_table_name)
        self.insert_event_timestamp_statement = (
            f"INSERT INTO {self.event_timestamps_table_name} VALUES (?, ?, ?)"
        )
        self.select_event_timestamp_statement = (
            f"SELECT originator_version FROM {self.event_timestamps_table_name} WHERE "
            "originator_id = ? AND "
            "timestamp <= ? "
            "ORDER BY originator_version DESC "
            "LIMIT 1"
        )

    def construct_create_table_statements(self) -> List[str]:
        statements = super().construct_create_table_statements()
        statements.append(
            "CREATE TABLE IF NOT EXISTS "
            f"{self.event_timestamps_table_name} ("
            "originator_id TEXT, "
            "timestamp timestamp, "
            "originator_version INTEGER, "
            "PRIMARY KEY "
            "(originator_id, timestamp))"
        )
        return statements

    def _insert_events(
        self,
        c: SQLiteCursor,
        stored_events: List[StoredEvent],
        **kwargs: Any,
    ) -> Sequence[int] | None:
        notification_ids = super()._insert_events(c, stored_events, **kwargs)

        # Insert event timestamps.
        event_timestamps_data = cast(
            List[Tuple[UUID, datetime, int]], kwargs["event_timestamps_data"]
        )
        for originator_id, timestamp, originator_version in event_timestamps_data:
            c.execute(
                self.insert_event_timestamp_statement,
                (originator_id.hex, timestamp, originator_version),
            )

        return notification_ids

    def get_version_at_timestamp(
        self, originator_id: UUID, timestamp: datetime
    ) -> int | None:
        with self.datastore.transaction(commit=False) as c:
            c.execute(
                self.select_event_timestamp_statement, (originator_id.hex, timestamp)
            )
            for row in c.fetchall():
                version = row["originator_version"]
                break
            else:
                version = Aggregate.INITIAL_VERSION - 1
            return version


class SearchableTimestampsInfrastructureFactory(Factory):
    def application_recorder(self) -> ApplicationRecorder:
        recorder = SearchableTimestampsApplicationRecorder(datastore=self.datastore)
        recorder.create_table()
        return recorder


del Factory

Test case

The test case SearchableTimestampsTestCase uses the application to evolve the state of a Cargo aggregate. The aggregate is then reconstructed as it was at particular times in its evolution. The test is executed twice, with the application configured for both PostgreSQL and SQLite.

from __future__ import annotations

import os
from datetime import timedelta
from time import sleep
from typing import ClassVar, Dict
from unittest import TestCase

from eventsourcing.application import AggregateNotFoundError
from eventsourcing.domain import create_utc_datetime_now
from eventsourcing.examples.cargoshipping.domainmodel import Location
from eventsourcing.examples.searchabletimestamps.application import (
    SearchableTimestampsApplication,
)
from eventsourcing.postgres import PostgresDatastore
from eventsourcing.tests.postgres_utils import drop_postgres_table


class SearchableTimestampsTestCase(TestCase):
    env: ClassVar[Dict[str, str]]

    def test(self) -> None:
        # Construct application.
        app = SearchableTimestampsApplication(env=self.env)
        timestamp0 = create_utc_datetime_now()
        sleep(1e-5)

        # Book new cargo.
        tracking_id = app.book_new_cargo(
            origin=Location["NLRTM"],
            destination=Location["USDAL"],
            arrival_deadline=create_utc_datetime_now() + timedelta(weeks=3),
        )
        timestamp1 = create_utc_datetime_now()
        sleep(1e-5)

        # Change destination.
        app.change_destination(tracking_id, destination=Location["AUMEL"])
        timestamp2 = create_utc_datetime_now()
        sleep(1e-5)

        # View the state of the cargo tracking at particular times.
        with self.assertRaises(AggregateNotFoundError):
            app.get_cargo_at_timestamp(tracking_id, timestamp0)

        cargo_at_timestamp1 = app.get_cargo_at_timestamp(tracking_id, timestamp1)
        self.assertEqual(cargo_at_timestamp1.destination, Location["USDAL"])

        cargo_at_timestamp2 = app.get_cargo_at_timestamp(tracking_id, timestamp2)
        self.assertEqual(cargo_at_timestamp2.destination, Location["AUMEL"])


class WithSQLite(SearchableTimestampsTestCase):
    env: ClassVar[Dict[str, str]] = {
        "PERSISTENCE_MODULE": "eventsourcing.examples.searchabletimestamps.sqlite",
        "SQLITE_DBNAME": ":memory:",
    }


class WithPostgreSQL(SearchableTimestampsTestCase):
    env: ClassVar[Dict[str, str]] = {
        "PERSISTENCE_MODULE": "eventsourcing.examples.searchabletimestamps.postgres"
    }

    def setUp(self) -> None:
        super().setUp()
        os.environ["POSTGRES_DBNAME"] = "eventsourcing"
        os.environ["POSTGRES_HOST"] = "127.0.0.1"
        os.environ["POSTGRES_PORT"] = "5432"
        os.environ["POSTGRES_USER"] = "eventsourcing"
        os.environ["POSTGRES_PASSWORD"] = "eventsourcing"  # noqa: S105
        self.drop_tables()

    def tearDown(self) -> None:
        self.drop_tables()
        super().tearDown()

    def drop_tables(self) -> None:
        db = PostgresDatastore(
            os.environ["POSTGRES_DBNAME"],
            os.environ["POSTGRES_HOST"],
            os.environ["POSTGRES_PORT"],
            os.environ["POSTGRES_USER"],
            os.environ["POSTGRES_PASSWORD"],
        )
        drop_postgres_table(db, "public.searchabletimestampsapplication_events")
        drop_postgres_table(db, "public.searchabletimestampsapplication_timestamps")
        db.close()


del SearchableTimestampsTestCase