Application 4 - Searchable timestamps

This example demonstrates how to extend the library’s application recorder classes to support retrieving aggregates as they were at a particular point in time, with both PostgreSQL and SQLite.

Application

The application class SearchableTimestampsApplication extends the BookingApplication presented in the cargo shipping example. It extends the application _record() method by setting in the processing event a list of tuples containing the originator ID, timestamp, and originator version of the Cargo events being recorded, which the recorder uses to insert event timestamps into an index. It also introduces a get_cargo_at_timestamp() method that expects tracking_id and timestamp arguments and returns the Cargo aggregate as it was at the specified time.

from datetime import datetime
from typing import List, cast
from uuid import UUID

from eventsourcing.application import ProcessingEvent
from eventsourcing.examples.cargoshipping.application import BookingApplication
from eventsourcing.examples.cargoshipping.domainmodel import Cargo
from eventsourcing.examples.searchabletimestamps.persistence import (
    SearchableTimestampsRecorder,
)
from eventsourcing.persistence import Recording


class SearchableTimestampsApplication(BookingApplication):
    def _record(self, processing_event: ProcessingEvent) -> List[Recording]:
        event_timestamps_data = [
            (e.originator_id, e.timestamp, e.originator_version)
            for e in processing_event.events
            if isinstance(e, Cargo.Event)
        ]
        processing_event.saved_kwargs["event_timestamps_data"] = event_timestamps_data
        return super()._record(processing_event)

    def get_cargo_at_timestamp(self, tracking_id: UUID, timestamp: datetime) -> Cargo:
        recorder = cast(SearchableTimestampsRecorder, self.recorder)
        version = recorder.get_version_at_timestamp(tracking_id, timestamp)
        return cast(Cargo, self.repository.get(tracking_id, version=version))
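
For orientation, here is a minimal usage sketch. It is not part of the example’s source files, and it assumes the SQLite persistence module presented below, configured with an in-memory database.

from datetime import timedelta
from time import sleep

from eventsourcing.domain import create_utc_datetime_now
from eventsourcing.examples.cargoshipping.domainmodel import Location
from eventsourcing.examples.searchabletimestamps.application import (
    SearchableTimestampsApplication,
)

app = SearchableTimestampsApplication(
    env={
        "PERSISTENCE_MODULE": "eventsourcing.examples.searchabletimestamps.sqlite",
        "SQLITE_DBNAME": ":memory:",
    }
)

# Book new cargo, then note the time.
tracking_id = app.book_new_cargo(
    origin=Location["NLRTM"],
    destination=Location["USDAL"],
    arrival_deadline=create_utc_datetime_now() + timedelta(weeks=3),
)
timestamp1 = create_utc_datetime_now()
sleep(1e-5)

# Change the destination after timestamp1.
app.change_destination(tracking_id, destination=Location["AUMEL"])

# Reconstruct the cargo as it was at timestamp1.
cargo = app.get_cargo_at_timestamp(tracking_id, timestamp1)
assert cargo.destination == Location["USDAL"]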

Persistence

The recorder classes named SearchableTimestampsApplicationRecorder extend the PostgreSQL and SQLite ApplicationRecorder classes by creating a table that contains rows with the originator ID, timestamp, and originator version of aggregate events. They define SQL statements that insert into and select from the rows of this table. They also implement a get_version_at_timestamp() method, declared as abstract on the SearchableTimestampsRecorder base class shown below, which returns the version an aggregate had at a particular time.

from abc import abstractmethod
from datetime import datetime
from typing import Optional
from uuid import UUID

from eventsourcing.persistence import ApplicationRecorder


class SearchableTimestampsRecorder(ApplicationRecorder):
    @abstractmethod
    def get_version_at_timestamp(
        self, originator_id: UUID, timestamp: datetime
    ) -> Optional[int]:
        """
        Returns originator version at timestamp for given originator ID.
        """

The application recorder classes extend the _insert_events() method by inserting rows according to the event timestamp data passed down from the application.

The infrastructure factory classes named SearchableTimestampsInfrastructureFactory extend the PostgreSQL and SQLite Factory classes by overriding the application_recorder() method, so that a SearchableTimestampsApplicationRecorder is constructed as the application recorder.

PostgreSQL

from datetime import datetime
from typing import Any, List, Optional, Sequence, Tuple, cast
from uuid import UUID

from eventsourcing.domain import Aggregate
from eventsourcing.examples.searchabletimestamps.persistence import (
    SearchableTimestampsRecorder,
)
from eventsourcing.persistence import ApplicationRecorder, StoredEvent
from eventsourcing.postgres import (
    Factory,
    PostgresApplicationRecorder,
    PostgresConnection,
    PostgresCursor,
    PostgresDatastore,
)


class SearchableTimestampsApplicationRecorder(
    SearchableTimestampsRecorder, PostgresApplicationRecorder
):
    def __init__(
        self,
        datastore: PostgresDatastore,
        events_table_name: str = "stored_events",
        event_timestamps_table_name: str = "event_timestamps",
    ):
        self.check_table_name_length(event_timestamps_table_name, datastore.schema)
        self.event_timestamps_table_name = event_timestamps_table_name
        super().__init__(datastore, events_table_name)
        self.insert_event_timestamp_statement = (
            f"INSERT INTO {self.event_timestamps_table_name} VALUES ($1, $2, $3)"
        )
        self.insert_event_timestamp_statement_name = (
            f"insert_{event_timestamps_table_name}".replace(".", "_")
        )
        self.select_event_timestamp_statement = (
            f"SELECT originator_version FROM {self.event_timestamps_table_name} WHERE "
            f"originator_id = $1 AND "
            f"timestamp <= $2 "
            "ORDER BY originator_version DESC "
            "LIMIT 1"
        )

        self.select_event_timestamp_statement_name = (
            f"select_{event_timestamps_table_name}".replace(".", "_")
        )

    def construct_create_table_statements(self) -> List[str]:
        statements = super().construct_create_table_statements()
        statements.append(
            "CREATE TABLE IF NOT EXISTS "
            f"{self.event_timestamps_table_name} ("
            "originator_id uuid NOT NULL, "
            "timestamp timestamp with time zone, "
            "originator_version bigint NOT NULL, "
            "PRIMARY KEY "
            "(originator_id, timestamp))"
        )
        return statements

    def _prepare_insert_events(self, conn: PostgresConnection) -> None:
        super()._prepare_insert_events(conn)
        self._prepare(
            conn,
            self.insert_event_timestamp_statement_name,
            self.insert_event_timestamp_statement,
        )

    def _insert_events(
        self,
        c: PostgresCursor,
        stored_events: List[StoredEvent],
        **kwargs: Any,
    ) -> Optional[Sequence[int]]:
        notification_ids = super()._insert_events(c, stored_events, **kwargs)

        # Insert event timestamps.
        event_timestamps_data = cast(
            List[Tuple[UUID, datetime, int]], kwargs.get("event_timestamps_data")
        )
        for event_timestamp_data in event_timestamps_data:
            statement_alias = self.statement_name_aliases[
                self.insert_event_timestamp_statement_name
            ]
            c.execute(f"EXECUTE {statement_alias}(%s, %s, %s)", event_timestamp_data)
        return notification_ids

    def get_version_at_timestamp(
        self, originator_id: UUID, timestamp: datetime
    ) -> Optional[int]:
        with self.datastore.get_connection() as conn:
            self._prepare(
                conn,
                self.select_event_timestamp_statement_name,
                self.select_event_timestamp_statement,
            )
            with conn.transaction(commit=False) as curs:
                statement_alias = self.statement_name_aliases[
                    self.select_event_timestamp_statement_name
                ]
                curs.execute(
                    f"EXECUTE {statement_alias}(%s, %s)", [originator_id, timestamp]
                )
                for row in curs.fetchall():
                    return row["originator_version"]
                else:
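                    # No rows selected: return a version lower than the first,
                    # so that repository.get() raises AggregateNotFound.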
                    return Aggregate.INITIAL_VERSION - 1


class SearchableTimestampsInfrastructureFactory(Factory):
    def application_recorder(self) -> ApplicationRecorder:
        prefix = (self.datastore.schema + ".") if self.datastore.schema else ""
        prefix += self.env.name.lower() or "stored"
        events_table_name = prefix + "_events"
        event_timestamps_table_name = prefix + "_timestamps"
        recorder = SearchableTimestampsApplicationRecorder(
            datastore=self.datastore,
            events_table_name=events_table_name,
            event_timestamps_table_name=event_timestamps_table_name,
        )
        recorder.create_table()
        return recorder


del Factory
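
The imported base Factory class is deleted at the end of the module, presumably so that SearchableTimestampsInfrastructureFactory is the only infrastructure factory class the persistence module exposes when it is selected with the PERSISTENCE_MODULE environment variable. The SQLite module below follows the same pattern.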

SQLite

from datetime import datetime
from typing import Any, List, Optional, Sequence, Tuple, cast
from uuid import UUID

from eventsourcing.domain import Aggregate
from eventsourcing.examples.searchabletimestamps.persistence import (
    SearchableTimestampsRecorder,
)
from eventsourcing.persistence import ApplicationRecorder, StoredEvent
from eventsourcing.sqlite import (
    Factory,
    SQLiteApplicationRecorder,
    SQLiteCursor,
    SQLiteDatastore,
)


class SearchableTimestampsApplicationRecorder(
    SearchableTimestampsRecorder, SQLiteApplicationRecorder
):
    def __init__(
        self,
        datastore: SQLiteDatastore,
        events_table_name: str = "stored_events",
        event_timestamps_table_name: str = "event_timestamps",
    ):
        self.event_timestamps_table_name = event_timestamps_table_name
        super().__init__(datastore, events_table_name)
        self.insert_event_timestamp_statement = (
            f"INSERT INTO {self.event_timestamps_table_name} VALUES (?, ?, ?)"
        )
        self.select_event_timestamp_statement = (
            f"SELECT originator_version FROM {self.event_timestamps_table_name} WHERE "
            f"originator_id = ? AND "
            f"timestamp <= ? "
            "ORDER BY originator_version DESC "
            "LIMIT 1"
        )

    def construct_create_table_statements(self) -> List[str]:
        statements = super().construct_create_table_statements()
        statements.append(
            "CREATE TABLE IF NOT EXISTS "
            f"{self.event_timestamps_table_name} ("
            "originator_id TEXT, "
            "timestamp timestamp, "
            "originator_version INTEGER, "
            "PRIMARY KEY "
            "(originator_id, timestamp))"
        )
        return statements

    def _insert_events(
        self,
        c: SQLiteCursor,
        stored_events: List[StoredEvent],
        **kwargs: Any,
    ) -> Optional[Sequence[int]]:
        notification_ids = super()._insert_events(c, stored_events, **kwargs)

        # Insert event timestamps.
        event_timestamps_data = cast(
            List[Tuple[UUID, datetime, int]], kwargs["event_timestamps_data"]
        )
        for originator_id, timestamp, originator_version in event_timestamps_data:
            c.execute(
                self.insert_event_timestamp_statement,
                (originator_id.hex, timestamp, originator_version),
            )

        return notification_ids

    def get_version_at_timestamp(
        self, originator_id: UUID, timestamp: datetime
    ) -> Optional[int]:
        with self.datastore.transaction(commit=False) as c:
            c.execute(
                self.select_event_timestamp_statement, (originator_id.hex, timestamp)
            )
            for row in c.fetchall():
                return row["originator_version"]
            else:
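                # No rows selected: return a version lower than the first,
                # so that repository.get() raises AggregateNotFound.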
                return Aggregate.INITIAL_VERSION - 1


class SearchableTimestampsInfrastructureFactory(Factory):
    def application_recorder(self) -> ApplicationRecorder:
        recorder = SearchableTimestampsApplicationRecorder(datastore=self.datastore)
        recorder.create_table()
        return recorder


del Factory

Test case

The test case SearchableTimestampsTestCase uses the application to evolve the state of a Cargo aggregate and then reconstructs the aggregate as it was at particular times in its evolution. The test is executed twice, once with the application configured for SQLite and once for PostgreSQL.

import os
from datetime import timedelta
from time import sleep
from typing import Dict
from unittest import TestCase

from eventsourcing.application import AggregateNotFound
from eventsourcing.domain import create_utc_datetime_now
from eventsourcing.examples.cargoshipping.domainmodel import Location
from eventsourcing.examples.searchabletimestamps.application import (
    SearchableTimestampsApplication,
)
from eventsourcing.postgres import PostgresDatastore
from eventsourcing.tests.postgres_utils import drop_postgres_table


class SearchableTimestampsTestCase(TestCase):
    env: Dict[str, str]

    def test(self) -> None:
        # Construct application.
        app = SearchableTimestampsApplication(env=self.env)
        timestamp0 = create_utc_datetime_now()
        sleep(1e-5)

        # Book new cargo.
        tracking_id = app.book_new_cargo(
            origin=Location["NLRTM"],
            destination=Location["USDAL"],
            arrival_deadline=create_utc_datetime_now() + timedelta(weeks=3),
        )
        timestamp1 = create_utc_datetime_now()
        sleep(1e-5)

        # Change destination.
        app.change_destination(tracking_id, destination=Location["AUMEL"])
        timestamp2 = create_utc_datetime_now()
        sleep(1e-5)

        # View the state of the cargo tracking at particular times.
        with self.assertRaises(AggregateNotFound):
            app.get_cargo_at_timestamp(tracking_id, timestamp0)

        cargo_at_timestamp1 = app.get_cargo_at_timestamp(tracking_id, timestamp1)
        self.assertEqual(cargo_at_timestamp1.destination, Location["USDAL"])

        cargo_at_timestamp2 = app.get_cargo_at_timestamp(tracking_id, timestamp2)
        self.assertEqual(cargo_at_timestamp2.destination, Location["AUMEL"])


class WithSQLite(SearchableTimestampsTestCase):
    env = {
        "PERSISTENCE_MODULE": "eventsourcing.examples.searchabletimestamps.sqlite",
        "SQLITE_DBNAME": ":memory:",
    }


class WithPostgreSQL(SearchableTimestampsTestCase):
    env = {"PERSISTENCE_MODULE": "eventsourcing.examples.searchabletimestamps.postgres"}

    def setUp(self) -> None:
        super().setUp()
        os.environ["POSTGRES_DBNAME"] = "eventsourcing"
        os.environ["POSTGRES_HOST"] = "127.0.0.1"
        os.environ["POSTGRES_PORT"] = "5432"
        os.environ["POSTGRES_USER"] = "eventsourcing"
        os.environ["POSTGRES_PASSWORD"] = "eventsourcing"
        self.drop_tables()

    def tearDown(self) -> None:
        self.drop_tables()
        super().tearDown()

    def drop_tables(self) -> None:
        db = PostgresDatastore(
            os.environ["POSTGRES_DBNAME"],
            os.environ["POSTGRES_HOST"],
            os.environ["POSTGRES_PORT"],
            os.environ["POSTGRES_USER"],
            os.environ["POSTGRES_PASSWORD"],
        )
        drop_postgres_table(db, "public.searchabletimestampsapplication_events")
        drop_postgres_table(db, "public.searchabletimestampsapplication_timestamps")
        db.close()


del SearchableTimestampsTestCase
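
The base test case class is deleted at the end of the module so that test runners discover and run only the concrete WithSQLite and WithPostgreSQL cases.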