persistence — Persistence¶
This module provides a cohesive mechanism for storing and retrieving domain events.
This module and the concrete persistence modules that adapt particular database management systems are the most important parts of this library. The other modules (domain, application, projection, and system) serve primarily as guiding examples of how to use the persistence modules to build event-sourced applications and event-driven systems.
Requirements¶
These requirements were written after industry-wide discussions about what is and isn’t event sourcing demonstrated the need for such a statement. They effectively summarise the technical character of an adequate persistence mechanism for event sourcing, and happen to describe what this module essentially implements.
In summary: there needs to be one sequence of events for each aggregate, and usually one sequence for the application as a whole; the positions in these sequences must be occupied uniquely, with new additions inserted at the end of the sequence; each event should be recorded in both kinds of sequence atomically; and this atomic recording can be extended to include unique records that track which event notification has been processed when new events result from processing an event notification.
1. We need a universal type for storing domain events, because an application will have different types of domain event and we want to record all of the events in the same way. The term ‘stored event’ shall be used to refer to objects of this type.
2. We need to record each domain event in a sequence for its aggregate, because we also need to select the events for an aggregate when reconstructing an aggregate from its events. The term ‘aggregate sequence’ shall be used to refer to the sequence of events of an individual aggregate.
3. We need domain events to be recorded in sequential order in their aggregate sequence, because domain events will be generated in sequential order, and used in sequential order to reconstruct the state of the aggregate.
4. We need domain events to be recorded uniquely in their aggregate sequence, so that only one domain event can be recorded at any given position in its aggregate sequence. Aggregate events and snapshots will therefore need to be stored separately. This requirement provides optimistic concurrency control, but it also protects against any subsequent over-writing of recorded domain events.
5. We sometimes need aggregate events to be positioned in a global sequence of event notifications for the application as a whole. The term ‘application sequence’ shall be used to refer to the sequence of events of an application as a whole.
6. We need event notifications to be recorded in sequential order in their application sequence, because we also need to propagate event notifications in the order that they were recorded.
7. We need event notifications to be recorded uniquely in their application sequence, so that only one aggregate event can be recorded at any given position in its application sequence. This requirement protects against any concurrent writing or subsequent over-writing of recorded event notifications.
8. When recording an aggregate event in both an aggregate sequence and an application sequence, we need atomic recording of aggregate events with event notifications, because we need to exclude the possibility that an aggregate event will appear in one sequence but not in the other. That is, we need to avoid dual-writing in the recording of aggregate events and event notifications.
9. We sometimes need to record a notification tracking object that indicates both the position in an application sequence of an event notification that has been processed, and the application to which that sequence belongs. We need tracking records to be recorded uniquely. This requirement supports knowing what has been processed and protects against subsequent over-writing of recorded notification tracking records.
10. When tracking event notifications, we need atomic recording of tracking objects with new aggregate events (or any other new application state) generated from processing the event notification represented by that tracking object, because we need to exclude the possibility that a tracking object will be recorded without the consequences of processing the event notification it represents, and vice versa. That is, we need to avoid dual writing in the consumption of event notifications. This effectively provides “exactly once” semantics for the processing of event notifications into recorded application state changes.
11. When recording aggregate events in an application sequence, we need the “insert order” and the “commit order” to be the same, so that those following an application sequence don’t overlook events that were inserted earlier in the sequence but committed later in time. This is a constraint on concurrent recording of the application sequence, which effectively serialises the recording of aggregate events in an application sequence.
12. When recording aggregate events in an application sequence, we want to know the positions of the aggregate events in the application sequence, so that we can detect when those aggregate events have been processed by another application in an event-driven system.
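As a rough illustration of requirements 2 to 8 and 12, the sketch below uses Python's sqlite3 module with a single table that serves as both kinds of sequence. The table name, column names, and the insert_events() helper are hypothetical and are not taken from this library's persistence modules. The sketch does not address concurrent writers (requirement 11).

```python
import sqlite3
from uuid import uuid4

# Hypothetical schema. The auto-incrementing primary key gives each row a
# unique position in the application sequence, and the UNIQUE constraint gives
# each event a unique position in its aggregate sequence.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE stored_events (
        notification_id INTEGER PRIMARY KEY AUTOINCREMENT,
        originator_id TEXT NOT NULL,
        originator_version INTEGER NOT NULL,
        topic TEXT NOT NULL,
        state BLOB NOT NULL,
        UNIQUE (originator_id, originator_version)
    )
    """
)


def insert_events(events):
    """Insert all events in one transaction and return their notification IDs."""
    notification_ids = []
    with conn:  # commits on success, rolls back if an exception is raised
        for originator_id, version, topic, state in events:
            cursor = conn.execute(
                "INSERT INTO stored_events "
                "(originator_id, originator_version, topic, state) "
                "VALUES (?, ?, ?, ?)",
                (originator_id, version, topic, state),
            )
            notification_ids.append(cursor.lastrowid)
    return notification_ids


aggregate_id = uuid4().hex
ids = insert_events(
    [
        (aggregate_id, 1, "example:Created", b"{}"),
        (aggregate_id, 2, "example:Updated", b"{}"),
    ]
)

# A write at an occupied position is rejected, and the whole batch is rolled
# back, including the otherwise valid event at version 3.
try:
    insert_events(
        [
            (aggregate_id, 3, "example:Updated", b"{}"),
            (aggregate_id, 2, "example:Updated", b"{}"),
        ]
    )
except sqlite3.IntegrityError:
    conflict_detected = True
else:
    conflict_detected = False

count = conn.execute("SELECT COUNT(*) FROM stored_events").fetchone()[0]
```

Because each event is written once, as one row, it cannot appear in one sequence without appearing in the other.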
The sections below describe how these requirements are implemented by this module.
Overview¶
A stored event is the universal type of object used in the library to represent domain events of different types. By using a common type for the representation of domain events, all domain events can be stored and retrieved in a common way.
An aggregate sequence is a sequence of stored events for an aggregate. The originator version number of the event determines its position in its sequence.
An event notification is a stored event that also has a notification ID. The notification ID identifies the position of the event in an application sequence.
An application sequence is a sequence of event notifications for an application. It includes all stored events of all aggregate sequences.
A tracking object indicates the position of an event notification in an application sequence.
A recorder inserts stored event objects in a database when domain events are stored in an event store, and selects stored events from a database when domain events are retrieved from an event store. Some recorders atomically record stored events in an aggregate sequence. Some recorders atomically record stored events in both an aggregate sequence and an application sequence. Some recorders atomically record stored events in both an aggregate sequence and an application sequence along with a tracking record that indicates the position of an event notification that was processed when those stored events were generated.
A transcoder serializes and deserializes the state of a domain event.
A compressor compresses and decompresses the serialized state of a domain event. Compressed state may or may not also be encrypted after being compressed, or decrypted before being decompressed.
A cipher encrypts and decrypts the serialized state of a domain event. The serialized state may or may not have been compressed before being encrypted, or be decompressed after being decrypted.
A mapper converts domain events to stored events, and converts stored events back to domain events.
An event store stores and retrieves domain events. The event store uses a mapper to convert domain events to stored events, and it uses a recorder to insert stored events in a datastore.
An infrastructure factory helps with the construction of persistence infrastructure objects, providing a common interface for applications to construct and configure a particular persistence mechanism from a particular persistence module.
Stored event¶
Stored event objects represent any type of domain event objects in a way that allows the domain event objects to be reconstructed.
The library’s StoredEvent class
is a Python frozen data class.
from eventsourcing.persistence import StoredEvent
A StoredEvent has an originator_id
attribute which is a UUID that identifies the aggregate sequence to
which the domain event belongs. It has an originator_version attribute which
is a Python int that identifies the position of the domain event in that
sequence.
A stored event object also has a state attribute which is a Python
bytes object, that holds the serialized state of the domain event object. And it has a
topic attribute which is a Python
str that identifies the class of the domain event object that is being stored (see Topics).
from uuid import uuid4
stored_event = StoredEvent(
    originator_id=uuid4(),
    originator_version=1,
    topic="eventsourcing.model:DomainEvent",
    state=b'{"a": 4}',
)
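To show what being a frozen data class implies, here is a small sketch using only the standard library. StoredEventSketch is a hypothetical stand-in with the four attributes described above; it is not the library's class.

```python
from dataclasses import FrozenInstanceError, dataclass
from uuid import UUID, uuid4


@dataclass(frozen=True)
class StoredEventSketch:
    """Hypothetical stand-in with the four attributes described above."""

    originator_id: UUID
    originator_version: int
    topic: str
    state: bytes


event = StoredEventSketch(
    originator_id=uuid4(),
    originator_version=1,
    topic="eventsourcing.model:DomainEvent",
    state=b'{"a": 4}',
)

# Attributes of a frozen data class instance cannot be reassigned.
try:
    event.originator_version = 2  # type: ignore[misc]
except FrozenInstanceError:
    is_immutable = True
else:
    is_immutable = False
```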
Transcoder¶
A transcoder serializes and deserializes the state of domain events.
The library’s JSONTranscoder class
can be constructed without any arguments.
from eventsourcing.persistence import Transcoder, JSONTranscoder
transcoder = JSONTranscoder()
The transcoder object has methods encode()
and decode() which are used to perform the
serialisation and deserialisation. The serialised state is a Python bytes object.
json_bytes = transcoder.encode({"a": 4})
copy = transcoder.decode(json_bytes)
assert copy == {"a": 4}
The library’s JSONTranscoder uses the Python
json module. And so, by default, only the object types supported by that
module can be encoded and decoded.
The transcoder can be extended by registering
transcodings for the other types of object used in your domain model’s event objects.
A transcoding will convert other types of object to a representation of the object
that uses other types that are supported. The transcoder method
register() is used to register
individual transcodings with the transcoder.
Transcodings¶
In order to encode and decode types of object that are not supported by
the transcoder by default, custom transcodings need to be defined in code and
registered with the transcoder using the transcoder object’s
register() method. A transcoding
will encode an instance of a type of object into a representation of that object
that can be encoded by the transcoder, and will decode that representation into
the original type of object. This makes it possible to transcode custom value objects,
including custom types that contain custom types.
The library includes a limited collection of custom transcoding objects. For
example, the library’s UUIDAsHex class
transcodes Python UUID objects as hexadecimal strings.
from uuid import UUID
from eventsourcing.persistence import Transcoding, UUIDAsHex
uuid_transcoding = UUIDAsHex()
id1 = UUID("ffffffffffffffffffffffffffffffff")
python_str = uuid_transcoding.encode(id1)
copy = uuid_transcoding.decode(python_str)
assert copy == id1
The library’s DatetimeAsISO class
transcodes Python datetime objects as ISO strings.
from datetime import datetime
from eventsourcing.persistence import DatetimeAsISO
datetime_transcoding = DatetimeAsISO()
datetime1 = datetime(2021, 12, 31, 23, 59, 59)
python_str = datetime_transcoding.encode(datetime1)
copy = datetime_transcoding.decode(python_str)
assert copy == datetime1
The library’s DecimalAsStr class
transcodes Python Decimal objects as decimal strings.
from decimal import Decimal
from eventsourcing.persistence import DecimalAsStr
decimal_transcoding = DecimalAsStr()
decimal1 = Decimal("1.2345")
python_str = decimal_transcoding.encode(decimal1)
copy = decimal_transcoding.decode(python_str)
assert copy == decimal1
Transcodings are registered with the transcoder using the transcoder object’s
register() method.
transcoder.register(UUIDAsHex())
transcoder.register(DatetimeAsISO())
transcoder.register(DecimalAsStr())
json_bytes = transcoder.encode(id1)
copy = transcoder.decode(json_bytes)
assert copy == id1
json_bytes = transcoder.encode(datetime1)
copy = transcoder.decode(json_bytes)
assert copy == datetime1
json_bytes = transcoder.encode(decimal1)
copy = transcoder.decode(json_bytes)
assert copy == decimal1
Attempting to serialize an unsupported type will result in a Python TypeError.
from datetime import date
date1 = date(2021, 12, 31)
try:
    JSONTranscoder().encode(date1)
except TypeError as e:
    assert e.args[0] == (
        "Object of type <class 'datetime.date'> is not serializable. "
        "Please define and register a custom transcoding for this type."
    )
else:
    raise AssertionError("TypeError not raised")
Attempting to deserialize an unsupported type will also result in a Python TypeError.
try:
    JSONTranscoder().decode(json_bytes)
except TypeError as e:
    assert e.args[0] == (
        "Data serialized with name 'decimal_str' is not deserializable. "
        "Please register a custom transcoding for this type."
    )
else:
    raise AssertionError("TypeError not raised")
The library’s abstract base class Transcoding
can be subclassed to define custom transcodings for other object types. To define
a custom transcoding, subclass this base class, assign the type to be transcoded
to the class attribute type, and assign a string to the class attribute
name. Then define an encode()
method that converts an instance of that type to a representation that uses a basic
type, and a decode() method that will
convert that representation back to an instance of that type.
from eventsourcing.persistence import Transcoding
class DateAsISO(Transcoding):
    type = date
    name = "date_iso"

    def encode(self, obj: date) -> str:
        return obj.isoformat()

    def decode(self, data: str) -> date:
        return date.fromisoformat(data)
transcoder.register(DateAsISO())
json_bytes = transcoder.encode(date1)
copy = transcoder.decode(json_bytes)
assert copy == date1
Please note, due to the way the Python json module works, it isn’t
currently possible to transcode subclasses of the basic Python types that
are supported by default, such as dict, list, tuple,
str, int, float, and bool. This behaviour
also means an encoded tuple will be decoded as a list.
If you don’t want tuples to be converted to lists, please avoid using tuples
in event objects. This behaviour is implemented in the C code of Python’s json
module, and can’t be suspended without avoiding the use of this C code and thereby
incurring a performance penalty in the transcoding of domain event objects.
json_bytes = transcoder.encode((1, 2, 3))
copy = transcoder.decode(json_bytes)
assert isinstance(copy, list)
assert copy == [1, 2, 3]
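The same behaviour can be observed with the standard json module alone. In the sketch below, Money is a hypothetical int subclass and default() is a hook of the kind json.dumps() accepts for unsupported types. The hook is never called, because json serialises tuples and subclasses of basic types natively.

```python
import json


class Money(int):
    """A hypothetical int subclass used only for this illustration."""


calls = []


def default(obj):
    # json calls this hook only for objects it cannot serialise natively.
    calls.append(type(obj).__name__)
    raise TypeError(f"Object of type {type(obj).__name__} is not serializable")


encoded = json.dumps({"amount": Money(5), "pair": (1, 2)}, default=default)
decoded = json.loads(encoded)

# The hook was not consulted, so no transcoding could have been applied.
hook_was_called = bool(calls)
```

After decoding, the Money value comes back as a plain int and the tuple comes back as a list.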
Custom or non-basic types that contain other custom or non-basic types can be
supported in the transcoder by registering a transcoding for each non-basic type.
The transcoding for the type which contains non-basic types must return an object
that represents that type by involving the included non-basic objects, and this
representation will be subsequently transcoded by the transcoder using the applicable
transcoding for the included non-basic types. In the example below, SimpleCustomValue
has a UUID and a date as its id and date attributes.
The transcoding for SimpleCustomValue returns a Python dict that includes
the non-basic UUID and date objects. The class ComplexCustomValue
simply has a SimpleCustomValue object as its value attribute, and its
transcoding simply returns that object.
from typing import Any
from uuid import UUID
class SimpleCustomValue:
    def __init__(self, id: UUID, date: date):
        self.id = id
        self.date = date

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, SimpleCustomValue) and
            self.id == other.id and self.date == other.date
        )

class ComplexCustomValue:
    def __init__(self, value: SimpleCustomValue):
        self.value = value

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, ComplexCustomValue) and
            self.value == other.value
        )

class SimpleCustomValueAsDict(Transcoding):
    type = SimpleCustomValue
    name = "simple_custom_value"

    def encode(self, obj: SimpleCustomValue) -> dict[str, Any]:
        return {"id": obj.id, "date": obj.date}

    def decode(self, data: dict[str, Any]) -> SimpleCustomValue:
        assert isinstance(data, dict)
        return SimpleCustomValue(**data)

class ComplexCustomValueAsDict(Transcoding):
    type = ComplexCustomValue
    name = "complex_custom_value"

    def encode(self, obj: ComplexCustomValue) -> SimpleCustomValue:
        return obj.value

    def decode(self, data: SimpleCustomValue) -> ComplexCustomValue:
        assert isinstance(data, SimpleCustomValue)
        return ComplexCustomValue(data)
The custom value object transcodings can be registered with the transcoder.
transcoder.register(SimpleCustomValueAsDict())
transcoder.register(ComplexCustomValueAsDict())
We can now transcode an instance of ComplexCustomValue.
obj1 = ComplexCustomValue(
    SimpleCustomValue(
        id=UUID("b2723fe2c01a40d2875ea3aac6a09ff5"),
        date=date(2000, 2, 20)
    )
)
json_bytes = transcoder.encode(obj1)
copy = transcoder.decode(json_bytes)
assert copy == obj1
As you can see from the bytes representation below, the transcoder puts the return value
of each transcoding’s encode() method in a Python dict that has two keys,
_data_ and _type_. The _data_ value is the return value of the
transcoding’s encode() method, and the _type_ value is the name of the
transcoding. For this reason, it is necessary to avoid defining model objects that
have a Python dict with only the two keys _data_ and _type_, and to
avoid defining transcodings that return such a thing.
assert json_bytes == (
b'{"_type_":"complex_custom_value","_data_":{"_type_":'
b'"simple_custom_value","_data_":{"id":{"_type_":'
b'"uuid_hex","_data_":"b2723fe2c01a40d2875ea3aac6a09ff5"},'
b'"date":{"_type_":"date_iso","_data_":"2000-02-20"}'
b'}}}'
)
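One way to picture this wrapping is with the default and object_hook arguments of the standard json module. The sketch below is not the library's implementation: the TRANSCODINGS registry and the wrap() and unwrap() functions are hypothetical. It also shows why a plain dict with only the keys _type_ and _data_ is ambiguous.

```python
import json
from datetime import date
from uuid import UUID

# Hypothetical registry: name -> (type, encode function, decode function).
TRANSCODINGS = {
    "uuid_hex": (UUID, lambda obj: obj.hex, lambda data: UUID(data)),
    "date_iso": (date, lambda obj: obj.isoformat(), date.fromisoformat),
}


def wrap(obj):
    """Called by json for unsupported types; wraps the encoded value."""
    for name, (type_, encode, _) in TRANSCODINGS.items():
        if isinstance(obj, type_):
            return {"_type_": name, "_data_": encode(obj)}
    raise TypeError(f"Object of type {type(obj)} is not serializable")


def unwrap(dct):
    """Called by json for every decoded dict; unwraps only the envelopes."""
    if set(dct) == {"_type_", "_data_"}:
        _, _, decode = TRANSCODINGS[dct["_type_"]]
        return decode(dct["_data_"])
    return dct


original = {
    "id": UUID("b2723fe2c01a40d2875ea3aac6a09ff5"),
    "date": date(2000, 2, 20),
}
encoded = json.dumps(original, default=wrap, separators=(",", ":")).encode("utf8")
decoded = json.loads(encoded, object_hook=unwrap)

# A plain dict that happens to look like an envelope does not survive a
# round trip: it is decoded as if it were a wrapped date.
ambiguous = {"_type_": "date_iso", "_data_": "2000-02-20"}
round_tripped = json.loads(json.dumps(ambiguous), object_hook=unwrap)
```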
Mapper¶
A mapper maps between domain event objects and stored event objects. It brings together a transcoder, and optionally a cipher and a compressor. It is used by an event store.
The library’s Mapper class
must be constructed with a transcoder object.
from eventsourcing.persistence import Mapper
mapper = Mapper[UUID](transcoder=transcoder)
The Mapper class defines a to_stored_event()
method, which converts DomainEvent objects to StoredEvent
objects.
from eventsourcing.domain import DomainEvent
class MyDomainEvent(DomainEvent):
    obj: ComplexCustomValue

domain_event = MyDomainEvent(
    originator_id=id1,
    originator_version=1,
    timestamp=MyDomainEvent.create_timestamp(),
    obj=obj1,
)
stored_event = mapper.to_stored_event(domain_event)
assert isinstance(stored_event, StoredEvent)
The Mapper class defines a to_domain_event()
method, which converts StoredEvent objects to DomainEvent
objects.
assert mapper.to_domain_event(stored_event) == domain_event
Compression¶
The Mapper class has an optional constructor argument, compressor,
which accepts Compressor objects. A compressor will compress and decompress
the state of stored events, and can be used to reduce the size of stored events, reducing the transport time
between an application and its database, and reducing the size of the database files.
The library’s ZlibCompressor class
implements the abstract base class Compressor.
It can be used to compress and decompress the state of stored events using
Python’s zlib module.
from eventsourcing.compressor import ZlibCompressor
compressor = ZlibCompressor()
mapper = Mapper(
    transcoder=transcoder,
    compressor=compressor,
)
compressed_stored_event = mapper.to_stored_event(domain_event)
assert mapper.to_domain_event(compressed_stored_event) == domain_event
The compressed state of a stored event will normally be much smaller than the state of a stored event that is not compressed.
assert len(compressed_stored_event.state) < len(stored_event.state)
If you want to use another compression strategy, then implement the
Compressor base class.
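As a minimal sketch of another compression strategy, the class below uses Python's lzma module. It assumes that the Compressor interface consists of compress() and decompress() methods that take and return bytes; please check the base class before relying on this. To stay self-contained, the sketch does not subclass Compressor, and LzmaCompressor is a hypothetical name.

```python
import lzma


class LzmaCompressor:
    """Hypothetical compressor; in an application it would subclass Compressor."""

    def compress(self, data: bytes) -> bytes:
        return lzma.compress(data)

    def decompress(self, data: bytes) -> bytes:
        return lzma.decompress(data)


lzma_compressor = LzmaCompressor()

# Repetitive serialized state compresses well.
state = b'{"name": "example"}' * 100
compressed = lzma_compressor.compress(state)
restored = lzma_compressor.decompress(compressed)
```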
Encryption¶
The Mapper class has an optional constructor argument, cipher,
which accepts Cipher objects. A cipher will encrypt and decrypt the state
of stored events within an event-sourced application, inhibiting disclosure of sensitive information in
case of unauthorised access to an application’s database files and backups, or network interception.
The library’s eventsourcing.cipher.AESCipher class
implements the abstract base class Cipher.
It can be used to cryptographically encode and decode the state of stored
events using the AES cipher
from the PyCryptodome library
in GCM mode.
AES is a very fast and secure symmetric block cipher, and is the de facto
standard for symmetric encryption. Galois/Counter Mode (GCM) is a mode of
operation for symmetric block ciphers that is designed to provide both data
authenticity and confidentiality, and is widely adopted for its performance.
Alternatively, the library’s eventsourcing.cryptography.AESCipher class
also implements the abstract base class Cipher and
is functionally equivalent, but uses the Python cryptography library,
and should work as a drop-in replacement for eventsourcing.cipher.AESCipher.
A Cipher is constructed with an
Environment object so that
encryption can be configured using environment variables.
The AESCipher reads a cipher key
from environment variable CIPHER_KEY. The static method
create_key() can be used to
generate a cipher key. The AES cipher key must be either 16, 24, or
32 bytes long. Please note, the same cipher key must be used to
decrypt stored events as that which was used to encrypt stored events.
from eventsourcing.cipher import AESCipher
from eventsourcing.utils import Environment
key = AESCipher.create_key(num_bytes=32) # 16, 24, or 32
environment = Environment()
environment["CIPHER_KEY"] = key
cipher = AESCipher(environment)
mapper = Mapper(
    transcoder=transcoder,
    cipher=cipher,
)
encrypted_stored_event = mapper.to_stored_event(domain_event)
assert mapper.to_domain_event(encrypted_stored_event) == domain_event
The state of an encrypted stored event will normally be slightly larger than the state of a stored event that is not encrypted.
assert len(encrypted_stored_event.state) > len(stored_event.state)
If you want to use a different cipher strategy, then implement the base
class Cipher.
Compression and encryption¶
Stored events can be both compressed and encrypted.
mapper = Mapper(
    transcoder=transcoder,
    cipher=cipher,
    compressor=compressor,
)
compressed_and_encrypted = mapper.to_stored_event(domain_event)
assert mapper.to_domain_event(compressed_and_encrypted) == domain_event
The state of a stored event that is both compressed and encrypted will usually be significantly smaller than the state of a stored event that is neither compressed nor encrypted. But it will normally be marginally larger than the state of a stored event that is compressed but not encrypted.
assert len(compressed_and_encrypted.state) < len(stored_event.state)
assert len(compressed_and_encrypted.state) > len(compressed_stored_event.state)
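The order of operations described in the overview (compress, then encrypt; decrypt, then decompress) can be sketched with the standard library. The toy_xor() function is an insecure placeholder standing in for a real cipher, and to_state() and from_state() are hypothetical names, not methods of the Mapper class.

```python
import json
import zlib
from itertools import cycle


def toy_xor(data: bytes, key: bytes) -> bytes:
    """Insecure placeholder for a real cipher; XOR is its own inverse."""
    return bytes(b ^ k for b, k in zip(data, cycle(key)))


def to_state(obj: dict, key: bytes) -> bytes:
    serialized = json.dumps(obj).encode("utf8")  # transcode
    compressed = zlib.compress(serialized)  # then compress
    return toy_xor(compressed, key)  # then encrypt


def from_state(state: bytes, key: bytes) -> dict:
    compressed = toy_xor(state, key)  # decrypt
    serialized = zlib.decompress(compressed)  # then decompress
    return json.loads(serialized)  # then decode


key = b"not-a-real-key"
obj = {"a": 4, "items": ["x"] * 50}
state = to_state(obj, key)
restored = from_state(state, key)
```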
Notification¶
Event notifications are used to propagate the state of an event-sourced application in a reliable way. A stored event can be positioned in a “total order”, by attributing to each stored event a notification ID that is higher than any that has been previously recorded. An event notification object brings together the attributes of a stored event with its notification ID.
The library’s Notification class
is a Python frozen data class. It is a subclass of StoredEvent.
from eventsourcing.persistence import Notification
assert issubclass(Notification, StoredEvent)
The Notification class extends StoredEvent
with an id attribute which is a Python int that represents
the position of a stored event in an application sequence.
notification = Notification(
    id=123,
    originator_id=uuid4(),
    originator_version=1,
    state=b"{}",
    topic="eventsourcing.model:DomainEvent",
)
This class is used when selecting from and subscribing to an application sequence with a recorder. Event notifications from application sequences are also presented by an application in its notification log.
By recording aggregate events atomically with notification IDs, there will never be an aggregate event that is not available to be passed as an event notification message across a network, and there will never be an event notification message passed across a network that doesn’t correspond to a recorded aggregate event. This solves the problem of “dual writing” in the production of event notifications that occurs when a domain model is updated and then separately a message is put on a message queue, a problem of reliability that may cause catastrophic inconsistencies in the state of a system.
Tracking¶
A tracking object identifies the position of an event in an application sequence. This is useful when the results of processing an event are to be stored by an event processing component, so that we can ensure each event is processed only once, and so that we can resume processing events from the correct position.
The library’s Tracking class
is a Python frozen data class.
from eventsourcing.persistence import Tracking
The Tracking class has a notification_id
attribute which is a Python int that indicates the position in an application
sequence of an event notification that has been processed. And it has an
application_name attribute which is a Python str that identifies
the name of that application.
tracking = Tracking(
    notification_id=123,
    application_name="bounded_context1",
)
By recording, with uniqueness constraints, a tracking object that represents the position of an event notification in its application sequence atomically with new state that results from processing that event notification, we can ensure that domain events are processed “at most once”. And, by resuming event processing from the position indicated by the last recorded tracking object, we can ensure that domain events are processed “at least once”. Together, these construct “exactly once” semantics, and solve the problem of “dual writing” in the consumption of event notifications that occurs when an event notification is consumed from a message queue with updates made to a materialised view and then separately an acknowledgement is sent back to the message queue, a problem of reliability that may cause catastrophic inconsistencies in the state of a system.
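The idea can be sketched with Python's sqlite3 module. The table names, the process() function, and the resume_position() function are hypothetical and serve only to illustrate the technique. A uniqueness constraint on the tracking table makes a repeated attempt fail, and the transaction makes the state change fail with it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tracking ("
    "application_name TEXT, notification_id INTEGER, "
    "PRIMARY KEY (application_name, notification_id))"
)
conn.execute("CREATE TABLE view_totals (name TEXT PRIMARY KEY, total INTEGER)")
conn.execute("INSERT INTO view_totals VALUES ('orders', 0)")
conn.commit()


def process(application_name: str, notification_id: int) -> bool:
    """Record new state and the tracking object in one transaction."""
    try:
        with conn:  # commit both writes, or roll both back
            conn.execute(
                "UPDATE view_totals SET total = total + 1 WHERE name = 'orders'"
            )
            conn.execute(
                "INSERT INTO tracking VALUES (?, ?)",
                (application_name, notification_id),
            )
    except sqlite3.IntegrityError:
        return False  # already processed, so the update was rolled back
    return True


def resume_position(application_name: str) -> int:
    """Return the last processed position, or 0 if nothing was processed."""
    row = conn.execute(
        "SELECT MAX(notification_id) FROM tracking WHERE application_name = ?",
        (application_name,),
    ).fetchone()
    return row[0] or 0


# Notification 2 is delivered twice, but changes the view only once.
results = [process("bounded_context1", n) for n in (1, 2, 2, 3)]
total = conn.execute("SELECT total FROM view_totals").fetchone()[0]
position = resume_position("bounded_context1")
```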
Recorders¶
A recorder object adapts a database management system for the purposes of event sourcing.
This library defines four kinds of recorder.
An aggregate recorder simply stores domain events in aggregate sequences, without also positioning the stored events in a total order. An aggregate recorder can be used for storing snapshots of aggregates in an application, and also for storing aggregate events in an application that will not provide event notifications.
An application recorder extends an aggregate recorder by also positioning stored events in an application sequence. Application recorders can be used for storing aggregate events in applications that will provide event notifications.
A tracking recorder supports the atomic recording of a tracking object along with new state. The tracking object indicates the position in an application sequence of an event notification that was being processed when the new state was generated.
A process recorder combines the methods of an application recorder and a tracking recorder so that stored events can be recorded atomically with a tracking object.
The library has an abstract base class for each kind of recorder. These abstract base classes are implemented in concrete “persistence modules” that each encapsulate a particular database management system (DBMS) or object-relational mapper (ORM). This library includes a persistence module that keeps events in memory using “plain old Python objects”, which is very fast and useful for development but it isn’t suitable for production because events are not written to disk. This library also includes a persistence module for SQLite and a persistence module for PostgreSQL. Other persistence modules are available.
Recorder classes are conveniently constructed by using an infrastructure factory.
Aggregate recorder¶
The library’s AggregateRecorder class
is an abstract base class for recording events in aggregate sequences.
The methods insert_events()
and select_events() define method
signatures for inserting and selecting stored event objects.
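As a rough illustration of what an implementation of these two methods has to do, here is a minimal in-memory sketch. It is not the library's in-memory recorder. StoredEventSketch, InMemoryAggregateRecorder, and PositionOccupiedError are hypothetical names, and the gt, lte, desc, and limit arguments are interpreted as version bounds, ordering, and a maximum count, which is an assumption about their meaning.

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass(frozen=True)
class StoredEventSketch:
    """Hypothetical stand-in for the library's StoredEvent."""

    originator_id: UUID
    originator_version: int
    topic: str
    state: bytes


class PositionOccupiedError(Exception):
    """Hypothetical error raised when a position is already occupied."""


class InMemoryAggregateRecorder:
    """Illustrative only; keeps one dict of versions per aggregate."""

    def __init__(self) -> None:
        self._sequences: dict[UUID, dict[int, StoredEventSketch]] = defaultdict(dict)

    def insert_events(self, stored_events: list[StoredEventSketch]) -> None:
        # Check every position first so the batch is recorded all-or-nothing.
        seen = set()
        for e in stored_events:
            key = (e.originator_id, e.originator_version)
            existing = self._sequences.get(e.originator_id, {})
            if key in seen or e.originator_version in existing:
                raise PositionOccupiedError(key)
            seen.add(key)
        for e in stored_events:
            self._sequences[e.originator_id][e.originator_version] = e

    def select_events(
        self, originator_id, *, gt=None, lte=None, desc=False, limit=None
    ) -> list[StoredEventSketch]:
        events = sorted(
            self._sequences.get(originator_id, {}).values(),
            key=lambda e: e.originator_version,
            reverse=desc,
        )
        if gt is not None:
            events = [e for e in events if e.originator_version > gt]
        if lte is not None:
            events = [e for e in events if e.originator_version <= lte]
        return events[:limit] if limit is not None else events


recorder = InMemoryAggregateRecorder()
aggregate_id = uuid4()
recorder.insert_events(
    [StoredEventSketch(aggregate_id, v, "example:Event", b"{}") for v in (1, 2, 3, 4)]
)
selected = recorder.select_events(aggregate_id, gt=1, lte=3)
latest = recorder.select_events(aggregate_id, desc=True, limit=1)
```

Selecting with desc=True and limit=1 is one way to find the latest recorded event of an aggregate, for example when looking up a snapshot.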
class AggregateRecorder(Recorder, ABC):
    """Abstract base class for inserting and selecting stored events."""

    @abstractmethod
    def insert_events(
        self, stored_events: Sequence[StoredEvent], **kwargs: Any
    ) -> Sequence[int] | None:
        """Writes stored events into database."""

    @abstractmethod
    def select_events(
        self,
        originator_id: UUID | str,
        *,
        gt: int | None = None,
        lte: int | None = None,
        desc: bool = False,
        limit: int | None = None,
    ) -> Sequence[StoredEvent]:
        """Reads stored events from database."""
Application recorder¶
The library’s ApplicationRecorder class is an abstract base class
for recording stored event objects in both aggregate and application sequences.
ApplicationRecorder is a subclass of
the aggregate recorder class.
The select_notifications() method
defines a method signature for selecting notification objects
from an application sequence.
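The selection rules can be sketched over plain integers. The function select_notification_ids() is hypothetical. Its start, limit, stop, and inclusive_of_start parameters mirror the abstract method's signature, and the semantics follow the method's docstring.

```python
from __future__ import annotations


def select_notification_ids(
    recorded_ids,
    start: int | None,
    limit: int,
    stop: int | None = None,
    *,
    inclusive_of_start: bool = True,
) -> list[int]:
    """Return up to `limit` IDs between `start` and `stop`, in ascending order."""
    selected: list[int] = []
    for notification_id in sorted(recorded_ids):
        if len(selected) >= limit:
            break
        if start is not None:
            if inclusive_of_start and notification_id < start:
                continue
            if not inclusive_of_start and notification_id <= start:
                continue
        if stop is not None and notification_id > stop:
            break
        selected.append(notification_id)
    return selected


recorded = range(1, 11)
from_three = select_notification_ids(recorded, start=3, limit=5)
after_three = select_notification_ids(
    recorded, start=3, limit=5, inclusive_of_start=False
)
up_to_four = select_notification_ids(recorded, start=None, limit=10, stop=4)
```

Passing inclusive_of_start=False is convenient when start is the position of the last notification that was already processed.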
The max_notification_id() method
defines a method signature for discovering the last notification ID of the application
sequence, for example when estimating progress when processing the event notifications
of an application.
The subscribe() method
defines a method signature for “subscribing” to the application sequence. This
allows iterating over already recorded events, and then also events recorded
after the subscription has begun.
class ApplicationRecorder(AggregateRecorder):
    """Abstract base class for recording events in both aggregate
    and application sequences.
    """

    @abstractmethod
    def select_notifications(
        self,
        start: int | None,
        limit: int,
        stop: int | None = None,
        topics: Sequence[str] = (),
        *,
        inclusive_of_start: bool = True,
    ) -> Sequence[Notification]:
        """Returns a list of Notification objects representing events from an
        application sequence. If `inclusive_of_start` is True (the default),
        the returned Notification objects will have IDs greater than or equal
        to `start` and less than or equal to `stop`. If `inclusive_of_start`
        is False, the Notification objects will have IDs greater than `start`
        and less than or equal to `stop`.
        """

    @abstractmethod
    def max_notification_id(self) -> int | None:
        """Returns the largest notification ID in an application sequence,
        or None if no stored events have been recorded.
        """

    @abstractmethod
    def subscribe(
        self, gt: int | None = None, topics: Sequence[str] = ()
    ) -> Subscription[ApplicationRecorder]:
        """Returns an iterator of Notification objects representing events from an
        application sequence.

        The iterator will block after the last recorded event has been yielded, but
        will then continue yielding newly recorded events when they are recorded.

        Notifications will have IDs greater than the optional `gt` argument.
        """
Tracking recorder¶
The library’s TrackingRecorder class is an abstract base class
for recording tracking objects and querying successfully recorded tracking
objects.
The insert_tracking() method defines a method signature for
recording tracking objects.
The max_tracking_id() method defines a method signature for
discovering the notification ID of the last successfully recorded tracking object. This is useful when resuming
the processing of event notifications from an application sequence.
The has_tracking_id() method defines a method signature for
discovering whether an event notification has been successfully processed, and can be used by user interfaces
to poll for an eventually-consistent materialised view of the state of an event-sourced application to be updated.
The wait() method defines a method for waiting until a tracking
object has been recorded, and can be used by user interfaces to wait for an eventually-consistent materialised
view of the state of an event-sourced application to be updated. It calls
has_tracking_id() with exponential backoff until
the given timeout (seconds), optionally interrupted by the setting of a given event.
class TrackingRecorder(Recorder, ABC):
    """Abstract base class for recorders that record tracking
    objects atomically with other state.
    """

    @abstractmethod
    def insert_tracking(self, tracking: Tracking) -> None:
        """Records a tracking object."""

    @abstractmethod
    def max_tracking_id(self, application_name: str) -> int | None:
        """Returns the largest notification ID across all recorded tracking objects
        for the named application, or None if no tracking objects have been recorded.
        """

    def has_tracking_id(
        self, application_name: str, notification_id: int | None
    ) -> bool:
        """Returns True if given notification_id is None or a tracking
        object with the given application_name and a notification ID greater
        than or equal to the given notification_id has been recorded.
        """
        if notification_id is None:
            return True
        max_tracking_id = self.max_tracking_id(application_name)
        return max_tracking_id is not None and max_tracking_id >= notification_id

    def wait(
        self,
        application_name: str,
        notification_id: int | None,
        timeout: float = 1.0,
        interrupt: Event | None = None,
    ) -> None:
        """Block until a tracking object with the given application name and a
        notification ID greater than or equal to the given value has been recorded.
        Polls max_tracking_id() with exponential backoff until the timeout
        is reached, or until the optional interrupt event is set.
        The timeout argument should be a floating point number specifying a
        timeout for the operation in seconds (or fractions thereof). The default
        is 1.0 seconds.
        Raises TimeoutError if the timeout is reached.
        Raises WaitInterruptedError if the `interrupt` is set before `timeout` is reached.
        """
        deadline = monotonic() + timeout
        sleep_interval_ms = 100.0
        max_sleep_interval_ms = 800.0
        while True:
            if self.has_tracking_id(application_name, notification_id):
                break
            if interrupt:
                if interrupt.wait(timeout=sleep_interval_ms / 1000):
                    raise WaitInterruptedError
            else:
                sleep(sleep_interval_ms / 1000)
            remaining = deadline - monotonic()
            if remaining < 0:
                msg = (
                    f"Timed out waiting for notification {notification_id} "
                    f"from application '{application_name}' to be processed"
                )
                raise TimeoutError(msg)
            sleep_interval_ms = min(
                sleep_interval_ms * 2, remaining * 1000, max_sleep_interval_ms
            )
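The backoff used by wait() can be easier to follow as a list of sleep intervals. The sketch below is not part of the library: `backoff_schedule_ms` is a hypothetical helper that replays the arithmetic of the loop above under the simplifying assumption that polling itself takes no time, and it treats a remaining time of exactly zero as a timeout so that the simulation terminates.

```python
def backoff_schedule_ms(timeout_s, initial_ms=100.0, max_ms=800.0):
    """Return the sleep intervals (in milliseconds) that the polling loop
    would use before timing out, assuming each poll is instantaneous."""
    remaining_ms = timeout_s * 1000
    interval_ms = initial_ms
    schedule = []
    while True:
        # Sleep for the current interval, then see how much time is left.
        schedule.append(interval_ms)
        remaining_ms -= interval_ms
        if remaining_ms <= 0:
            break  # the real method raises TimeoutError at this point
        # Double the interval, capped by the remaining time and the maximum.
        interval_ms = min(interval_ms * 2, remaining_ms, max_ms)
    return schedule


default_schedule = backoff_schedule_ms(1.0)
longer_schedule = backoff_schedule_ms(3.0)
```

Under these assumptions the default one-second timeout polls after 100, 200, 400, and 300 milliseconds, and longer timeouts settle at the 800 millisecond maximum.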
Process recorder¶
The library’s ProcessRecorder class is an abstract base class
for recording stored event objects in both aggregate and application sequences,
along with tracking objects. ProcessRecorder is a
subclass of the application recorder and tracking recorder
classes.
Concrete process recorders will extend the insert_events()
method so that a tracking object will be recorded within the same transaction
as the events.
class ProcessRecorder(TrackingRecorder, ApplicationRecorder, ABC):
    pass
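To illustrate why recording a tracking object in the same transaction as the events matters, here is a minimal sketch using only the standard library sqlite3 module. It is not the library's implementation: the table layout and the `insert_events_with_tracking` function are assumptions made for this example. A uniqueness constraint on the tracking table means that processing the same upstream notification twice fails, and the events from the second attempt are rolled back with it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical tables: one for stored events, one for tracking records.
conn.execute(
    "CREATE TABLE events (originator_id TEXT, originator_version INTEGER, "
    "state BLOB, PRIMARY KEY (originator_id, originator_version))"
)
conn.execute(
    "CREATE TABLE tracking (application_name TEXT, notification_id INTEGER, "
    "PRIMARY KEY (application_name, notification_id))"
)


def insert_events_with_tracking(conn, events, tracking):
    """Insert events and a tracking record in one transaction.
    Returns False, leaving the database unchanged, if a uniqueness
    constraint is violated."""
    try:
        with conn:  # commits on success, rolls back on an exception
            conn.executemany("INSERT INTO events VALUES (?, ?, ?)", events)
            conn.execute("INSERT INTO tracking VALUES (?, ?)", tracking)
    except sqlite3.IntegrityError:
        return False
    return True


first = insert_events_with_tracking(conn, [("a1", 1, b"{}")], ("upstream", 21))
# Processing upstream notification 21 again must not record new events.
second = insert_events_with_tracking(conn, [("a1", 2, b"{}")], ("upstream", 21))
event_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

The second call fails on the tracking record, so only the event from the first call remains recorded.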
Persistence modules¶
Persistence modules encapsulate a particular database management system (DBMS) or object-relational mapper (ORM), by implementing recorder classes.
POPO module¶
The eventsourcing.popo persistence module has recorders
that use “plain old Python objects” to keep stored events in a
data structure in memory. These recorders provide the fastest
way of running an application, and thereby support rapid development
of event-sourced applications. Stored events can be recorded
and retrieved in microseconds, allowing a test suite to run in milliseconds.
The POPOAggregateRecorder class implements the
aggregate recorder abstract base class.
from eventsourcing.popo import POPOAggregateRecorder
# Construct aggregate recorder.
aggregate_recorder = POPOAggregateRecorder()
# Insert stored events.
aggregate_recorder.insert_events([stored_event])
# Select stored events from an aggregate sequence.
recorded_events = aggregate_recorder.select_events(stored_event.originator_id)
assert recorded_events[0] == stored_event
The POPOApplicationRecorder class
implements the application recorder abstract base class
by extending POPOAggregateRecorder.
from eventsourcing.popo import POPOApplicationRecorder
# Construct application recorder.
application_recorder = POPOApplicationRecorder()
# Insert stored events.
application_recorder.insert_events([stored_event])
# Select stored events from an aggregate sequence.
recorded_events = application_recorder.select_events(stored_event.originator_id)
assert recorded_events[0] == stored_event
# Select notifications from the application sequence.
notifications = application_recorder.select_notifications(start=1, limit=10)
assert notifications[0].id == 1
assert notifications[0].originator_id == stored_event.originator_id
assert notifications[0].originator_version == stored_event.originator_version
assert notifications[0].state == stored_event.state
assert notifications[0].topic == stored_event.topic
# Subscribe to application sequence from a tracked position.
max_tracking_id = None
with application_recorder.subscribe(gt=max_tracking_id) as subscription:
    for notification in subscription:
        max_tracking_id = notification.id
        assert notification.id == 1
        assert notification.originator_id == stored_event.originator_id
        assert notification.originator_version == stored_event.originator_version
        assert notification.state == stored_event.state
        assert notification.topic == stored_event.topic
        subscription.stop()
The POPOProcessRecorder class
implements the process recorder abstract base class
by extending POPOApplicationRecorder.
from eventsourcing.popo import POPOProcessRecorder
# Construct process recorder.
process_recorder = POPOProcessRecorder()
# Define a tracking object.
tracking = Tracking(notification_id=21, application_name="upstream")
# Insert stored events atomically with a tracking object.
process_recorder.insert_events([stored_event], tracking=tracking)
# Select stored events from an aggregate sequence.
recorded_events = process_recorder.select_events(stored_event.originator_id)
assert recorded_events[0] == stored_event
# Select notifications from the application sequence.
notifications = process_recorder.select_notifications(start=1, limit=10)
assert notifications[0].id == 1
assert notifications[0].originator_id == stored_event.originator_id
assert notifications[0].originator_version == stored_event.originator_version
assert notifications[0].state == stored_event.state
assert notifications[0].topic == stored_event.topic
# Get latest tracked position.
assert process_recorder.max_tracking_id("upstream") == 21
SQLite module¶
The eventsourcing.sqlite persistence module supports recording events in
SQLite. Recorder classes use the Python sqlite3
module.
The SQLiteDatastore class encapsulates a SQLite database
and provides a connection pool.
from eventsourcing.sqlite import SQLiteDatastore
# Create an SQLite database.
datastore = SQLiteDatastore(db_name=":memory:")
The SQLiteAggregateRecorder class implements
the aggregate recorder abstract base class, and provides a method
to create a database table for stored events.
from eventsourcing.sqlite import SQLiteAggregateRecorder
# Create an SQLite database.
datastore = SQLiteDatastore(db_name=":memory:")
# Construct aggregate recorder and create database table.
aggregate_recorder = SQLiteAggregateRecorder(datastore)
aggregate_recorder.create_table()
# Insert stored events.
aggregate_recorder.insert_events([stored_event])
# Select stored events from an aggregate sequence.
recorded_events = aggregate_recorder.select_events(stored_event.originator_id)
The SQLiteApplicationRecorder class
implements the application recorder abstract base class
by extending SQLiteAggregateRecorder.
Please note, the SQLiteApplicationRecorder class
does not implement the ApplicationRecorder.subscribe()
method, and so does not support subscribing to application sequences.
from eventsourcing.sqlite import SQLiteApplicationRecorder
# Create an SQLite database.
datastore = SQLiteDatastore(db_name=":memory:")
# Construct application recorder and create database table.
application_recorder = SQLiteApplicationRecorder(datastore)
application_recorder.create_table()
# Insert stored events.
application_recorder.insert_events([stored_event])
# Select stored events from an aggregate sequence.
recorded_events = application_recorder.select_events(stored_event.originator_id)
# Select notifications from the application sequence.
notifications = application_recorder.select_notifications(start=1, limit=10)
The SQLiteTrackingRecorder class implements the
tracking recorder abstract base class, and provides
a method to create a database table for tracking records.
from eventsourcing.sqlite import SQLiteTrackingRecorder
# Construct tracking recorder and create table.
tracking_recorder = SQLiteTrackingRecorder(datastore)
tracking_recorder.create_table()
# Construct tracking object.
tracking = Tracking(notification_id=21, application_name="upstream")
# Insert tracking object.
tracking_recorder.insert_tracking(tracking=tracking)
# Get latest tracked position.
assert tracking_recorder.max_tracking_id("upstream") == 21
# Check if an event notification has been processed.
assert tracking_recorder.has_tracking_id("upstream", 21)
assert not tracking_recorder.has_tracking_id("upstream", 22)
The SQLiteProcessRecorder class
implements the process recorder abstract base class
by extending SQLiteApplicationRecorder.
from eventsourcing.sqlite import SQLiteProcessRecorder
# Create an SQLite database.
datastore = SQLiteDatastore(db_name=":memory:")
# Construct process recorder and create database table.
process_recorder = SQLiteProcessRecorder(datastore)
process_recorder.create_table()
# Construct a tracking object.
tracking = Tracking(notification_id=21, application_name="upstream")
# Insert stored events atomically with a tracking object.
process_recorder.insert_events([stored_event], tracking=tracking)
# Select stored events from an aggregate sequence.
recorded_events = process_recorder.select_events(stored_event.originator_id)
# Select notifications from the application sequence.
notifications = process_recorder.select_notifications(start=1, limit=10)
# Get latest tracked position.
assert process_recorder.max_tracking_id("upstream") == 21
PostgreSQL module¶
The eventsourcing.postgres persistence module supports storing events in
PostgreSQL using the third party Psycopg v3
package. This code is tested with PostgreSQL versions 12, 13, 14, 15, 16, and 17.
The PostgresDatastore class encapsulates a Postgres database
and provides a connection pool.
from eventsourcing.postgres import PostgresDatastore
# Construct datastore object.
datastore = PostgresDatastore(
    dbname="eventsourcing",
    host="127.0.0.1",
    port="5432",
    user="eventsourcing",
    password="eventsourcing",
)
The PostgresAggregateRecorder class implements
the aggregate recorder abstract base class, and provides
a method to create a database table for stored events.
from eventsourcing.postgres import PostgresAggregateRecorder
# Construct aggregate recorder and create table.
aggregate_recorder = PostgresAggregateRecorder(datastore)
aggregate_recorder.create_table()
# Insert stored events.
aggregate_recorder.insert_events([stored_event])
# Select stored events from an aggregate sequence.
recorded_events = aggregate_recorder.select_events(stored_event.originator_id)
assert recorded_events[0] == stored_event, (recorded_events[0], stored_event)
The PostgresApplicationRecorder class
implements the application recorder abstract base class
by extending PostgresAggregateRecorder.
from eventsourcing.postgres import PostgresApplicationRecorder
# Construct application recorder and create table.
application_recorder = PostgresApplicationRecorder(datastore)
application_recorder.create_table()
# Insert stored events.
application_recorder.insert_events([stored_event])
# Select stored events from an aggregate sequence.
recorded_events = application_recorder.select_events(stored_event.originator_id)
assert recorded_events[0] == stored_event
# Select notifications from the application sequence.
notifications = application_recorder.select_notifications(start=1, limit=10)
assert notifications[0].id == 1
assert notifications[0].originator_id == stored_event.originator_id
assert notifications[0].originator_version == stored_event.originator_version
# Subscribe to application sequence from a tracked position.
max_tracking_id = None
with application_recorder.subscribe(gt=max_tracking_id) as subscription:
    for notification in subscription:
        max_tracking_id = notification.id
        assert notification.id == 1
        assert notification.originator_id == stored_event.originator_id
        assert notification.originator_version == stored_event.originator_version
        subscription.stop()
The PostgresTrackingRecorder class implements the
tracking recorder abstract base class, and provides
a method to create a database table for tracking records.
from eventsourcing.postgres import PostgresTrackingRecorder
# Construct tracking recorder and create table.
tracking_recorder = PostgresTrackingRecorder(datastore)
tracking_recorder.create_table()
# Construct tracking object.
tracking = Tracking(notification_id=21, application_name="upstream")
# Insert tracking object.
tracking_recorder.insert_tracking(tracking=tracking)
# Get latest tracked position.
assert tracking_recorder.max_tracking_id("upstream") == 21
# Check if an event notification has been processed.
assert tracking_recorder.has_tracking_id("upstream", 21)
assert not tracking_recorder.has_tracking_id("upstream", 22)
The PostgresProcessRecorder class
implements the process recorder abstract base class
by combining and extending PostgresApplicationRecorder and
PostgresTrackingRecorder.
from eventsourcing.postgres import PostgresProcessRecorder
# Construct process recorder and create table.
process_recorder = PostgresProcessRecorder(datastore)
process_recorder.create_table()
# Construct tracking object.
tracking = Tracking(notification_id=21, application_name="upstream")
# Insert stored events atomically with tracking object.
process_recorder.insert_events([stored_event], tracking=tracking)
# Select stored events from an aggregate sequence.
recorded_events = process_recorder.select_events(stored_event.originator_id)
assert recorded_events[0] == stored_event
# Select notifications from the application sequence.
notifications = process_recorder.select_notifications(start=1, limit=10)
assert notifications[0].id == 1
assert notifications[0].originator_id == stored_event.originator_id
assert notifications[0].originator_version == stored_event.originator_version
# Get latest tracked position.
assert process_recorder.max_tracking_id("upstream") == 21
Other persistence modules¶
Other persistence modules are available or under development. There are extension projects that support using popular ORMs for persistence such as Django and SQLAlchemy, specialist event stores such as Axon Server and KurrentDB, and popular NoSQL databases such as DynamoDB.
Event store¶
The library’s EventStore class provides methods for storing
and retrieving domain event objects. It combines a mapper
and a recorder, so that domain event objects will be converted to stored event
objects before being recorded, and stored events will be converted to domain event objects after
being selected.
The EventStore class must be constructed with a mapper
and a recorder.
The EventStore class defines an object method
put() which can be used to
store a list of new domain event objects. If any of these domain event
objects conflict with any already existing domain event object (because
they have the same aggregate ID and version number), an exception will
be raised and none of the new events will be stored.
The EventStore class defines an object method
get() which can be used to
get a list of domain event objects. Only the originator_id argument
is required, which is the ID of the aggregate for which existing events
are wanted. The arguments gt, lte, limit, and desc
condition the selection of events to be greater than a particular version
number, less than or equal to a particular version number, limited in
number, or selected in a descending fashion. The selection is by default
ascending, unlimited, and otherwise unrestricted such that all the previously
stored domain event objects for a particular aggregate will be returned
in the order in which they were created.
from eventsourcing.persistence import EventStore
application_recorder = POPOApplicationRecorder()
event_store = EventStore[UUID](
    mapper=mapper,
    recorder=application_recorder,
)
event_store.put([domain_event])
domain_events = list(event_store.get(id1))
assert domain_events == [domain_event]
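The effect of the gt, lte, limit, and desc arguments can be sketched on a plain list of version numbers. This is only an illustration of the selection semantics described above, not the library's code; `select_versions` is a hypothetical helper.

```python
def select_versions(versions, gt=None, lte=None, desc=False, limit=None):
    """Select version numbers the way the arguments are described:
    strictly greater than gt, less than or equal to lte, in ascending
    order unless desc is True, and truncated to limit items."""
    selected = [
        v
        for v in versions
        if (gt is None or v > gt) and (lte is None or v <= lte)
    ]
    selected.sort(reverse=desc)
    if limit is not None:
        selected = selected[:limit]
    return selected


all_versions = select_versions([1, 2, 3, 4, 5])
middle = select_versions([1, 2, 3, 4, 5], gt=2, lte=4)
latest = select_versions([1, 2, 3, 4, 5], desc=True, limit=1)
```

Combining desc with a limit of one is a common way to fetch only the most recent item in a sequence.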
Infrastructure factory¶
To help with the construction of the persistence infrastructure objects mentioned above, and to support configuring applications with environment variables, the library provides an “infrastructure factory”.
The library’s InfrastructureFactory class
is an abstract base class for infrastructure factories, which provides a
standard way for applications to select, configure, and construct recorder
classes from persistence modules. Each persistence module has a concrete
infrastructure factory.
from eventsourcing.persistence import InfrastructureFactory
The class method construct()
will construct a concrete infrastructure factory from a persistence module.
By default, it will construct the infrastructure factory from the library’s “plain old Python objects” persistence module.
factory = InfrastructureFactory.construct()
assert type(factory).__name__ == "POPOFactory"
assert type(factory).__module__ == "eventsourcing.popo"
The factory method aggregate_recorder()
will construct an aggregate recorder from the selected persistence module.
recorder = factory.aggregate_recorder()
assert isinstance(recorder, AggregateRecorder)
assert type(recorder).__name__ == "POPOAggregateRecorder"
assert type(recorder).__module__ == "eventsourcing.popo"
The factory method application_recorder()
will construct an application recorder from the selected persistence module.
recorder = factory.application_recorder()
assert isinstance(recorder, ApplicationRecorder)
assert type(recorder).__name__ == "POPOApplicationRecorder"
assert type(recorder).__module__ == "eventsourcing.popo"
The factory method tracking_recorder()
will construct a tracking recorder from the selected persistence module.
recorder = factory.tracking_recorder()
assert isinstance(recorder, TrackingRecorder)
assert type(recorder).__name__ == "POPOTrackingRecorder"
assert type(recorder).__module__ == "eventsourcing.popo"
The factory method process_recorder()
will construct a process recorder from the selected persistence module.
recorder = factory.process_recorder()
assert isinstance(recorder, ProcessRecorder)
assert type(recorder).__name__ == "POPOProcessRecorder"
assert type(recorder).__module__ == "eventsourcing.popo"
The factory method transcoder()
will construct a transcoder object.
transcoder = factory.transcoder()
transcoder.register(UUIDAsHex())
transcoder.register(DatetimeAsISO())
transcoder.register(DateAsISO())
transcoder.register(ComplexCustomValueAsDict())
transcoder.register(SimpleCustomValueAsDict())
The factory method mapper()
will construct a mapper object.
mapper = factory.mapper(transcoder=transcoder)
The factory method event_store()
will construct an event store object.
event_store = factory.event_store(
    mapper=mapper,
    recorder=recorder,
)
event_store.put([domain_event])
domain_events = list(event_store.get(id1))
assert domain_events == [domain_event]
Environment variables¶
Environment variables can be used to select and configure persistence infrastructure. In this way, an event-sourced application can be defined independently of any particular persistence infrastructure, and then easily configured in different ways at different times.
You can set the environment variable PERSISTENCE_MODULE to the topic of a
persistence module before calling construct().
This will condition the construct() method to
construct a factory from the specified persistence module.
For example, to explicitly select the eventsourcing.popo persistence module,
use the module name string 'eventsourcing.popo' as the value of PERSISTENCE_MODULE.
environ = Environment()
environ["PERSISTENCE_MODULE"] = "eventsourcing.popo"
factory = InfrastructureFactory.construct(environ)
assert type(factory).__module__ == "eventsourcing.popo"
The environment variables COMPRESSOR_TOPIC, CIPHER_TOPIC, and CIPHER_KEY may be
used to enable compression and encryption of stored events.
The environment variables MAPPER_TOPIC and TRANSCODER_TOPIC can be used to
select alternative mappers and transcoders.
Persistence modules use their own particular set of environment variables, of which some are required and some are optional.
Environment object¶
The Environment class is used to encapsulate a set of environment
variables. It can be constructed with an optional name argument and a mapping of key-value pairs,
for example os.environ.
environ = Environment(
    name="MyApp",
    env={
        "PERSISTENCE_MODULE": "eventsourcing.popo",
    },
)
The Environment class has a get() method
which can be used to get a value for a key.
value = environ.get("PERSISTENCE_MODULE")
assert value == "eventsourcing.popo"
The get() method first searches for environment variables prefixed
with the uppercase name, which can be used to distinguish between environment variables defined for different
applications. It falls back onto the standard names, and returns None if no value is found.
import os
os.environ["MYAPPLICATION_PERSISTENCE_MODULE"] = "eventsourcing.postgres"
os.environ["MYPROJECTION_PERSISTENCE_MODULE"] = "eventsourcing.sqlite"
environ = Environment(name="MyApplication", env=os.environ)
value = environ.get("PERSISTENCE_MODULE")
assert value == "eventsourcing.postgres"
environ = Environment(name="MyProjection", env=os.environ)
value = environ.get("PERSISTENCE_MODULE")
assert value == "eventsourcing.sqlite"
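The prefixed lookup can be pictured with a few lines of plain Python. This is a sketch of the behaviour described above rather than the Environment class itself; `lookup` is a hypothetical function operating on an ordinary dict.

```python
def lookup(env, name, key):
    """Look for NAME_KEY first (name in uppercase), then fall back to KEY,
    and return None if neither is present."""
    if name:
        prefixed_key = f"{name.upper()}_{key}"
        if prefixed_key in env:
            return env[prefixed_key]
    return env.get(key)


env = {
    "MYAPPLICATION_PERSISTENCE_MODULE": "eventsourcing.postgres",
    "PERSISTENCE_MODULE": "eventsourcing.popo",
}
prefixed = lookup(env, "MyApplication", "PERSISTENCE_MODULE")
fallback = lookup(env, "OtherApp", "PERSISTENCE_MODULE")
missing = lookup(env, "OtherApp", "SQLITE_DBNAME")
```

Here the application named "MyApplication" gets its own setting, while any other application falls back to the unprefixed variable.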
SQLite environment¶
Similarly, the SQLite module can be selected and configured with environment variables.
environ = Environment()
environ["PERSISTENCE_MODULE"] = "eventsourcing.sqlite"
environ["SQLITE_DBNAME"] = ":memory:"
environ["SQLITE_LOCK_TIMEOUT"] = "10"
environ["SQLITE_SINGLE_ROW_TRACKING"] = "t"
environ["CREATE_TABLE"] = "t"
The environment variable SQLITE_DBNAME is required to set the name of a database.
The value of this variable is normally a file path, but an in-memory SQLite database
can also be specified using this variable.
Writing to a file-based SQLite database is serialised with a lock. The lock timeout can
be adjusted by setting the environment variable SQLITE_LOCK_TIMEOUT. Setting this
value to a positive number of seconds will cause attempts to lock the SQLite database
for writing to time out after that duration. By default this value is 5 (seconds).
Please note, a file-based SQLite database will have its journal mode set to use
write-ahead logging (WAL), which allows reading to proceed concurrently with
writing.
The optional environment variable SQLITE_SINGLE_ROW_TRACKING may be used to disable the
single-row tracking implementation of tracking recorders and continue with the legacy multi-row
tracking implementation. Single-row tracking, which is the default, uses one row in a tracking table
per application name. Multi-row tracking records a new row for each tracking object. Setting this
variable to a “true” value ("y", "yes", "t", "true", "on", or "1") has no effect because that is the
default. Setting it to a “false” value ("n", "no", "f", "false", "off",
or "0") will mean that tracking recorders will continue with the legacy multi-row tracking
implementation, unless a table for single-row tracking has already been created, in which case an exception
will be raised at runtime. Migration from multi-row tracking to single-row tracking will automatically happen
when a tracking recorder is constructed unless this value is “false” or CREATE_TABLE is “false”.
The optional environment variable CREATE_TABLE controls whether or not database tables are
created when a recorder is constructed by a factory. If the tables already exist, CREATE_TABLE
may be set to a “false” value ("n", "no", "f", "false", "off", or "0").
This value is by default “true”, which is normally okay because the tables are created only if they
do not exist.
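The “true” and “false” strings listed above could be interpreted along the following lines. This is a sketch only: `parse_flag` is a hypothetical helper, and both the case-insensitive matching and the ValueError for unrecognised values are assumptions of this example rather than documented behaviour.

```python
TRUE_VALUES = {"y", "yes", "t", "true", "on", "1"}
FALSE_VALUES = {"n", "no", "f", "false", "off", "0"}


def parse_flag(value, default=True):
    """Convert an environment variable string to a bool.
    Returns the default when the variable is not set."""
    if value is None:
        return default
    normalised = value.strip().lower()  # assumption: case is ignored
    if normalised in TRUE_VALUES:
        return True
    if normalised in FALSE_VALUES:
        return False
    raise ValueError(f"Not a recognised true or false value: {value!r}")


create_table = parse_flag("t")
single_row_tracking = parse_flag("off")
unset = parse_flag(None)
```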
Having configured the environment to use SQLite, the infrastructure can be constructed and used in a standard way.
factory = InfrastructureFactory.construct(environ)
recorder = factory.application_recorder()
assert isinstance(recorder, SQLiteApplicationRecorder)
mapper = factory.mapper(transcoder=transcoder)
event_store = factory.event_store(
    mapper=mapper,
    recorder=recorder,
)
event_store.put([domain_event])
domain_events = list(event_store.get(id1))
assert domain_events == [domain_event]
Various alternatives for specifying SQLite databases, both file-based and in-memory, are listed below.
# Configure SQLite database URI. Either use a file-based DB;
environ['SQLITE_DBNAME'] = '/path/to/your/sqlite-db'
# or use an in-memory DB with cache not shared, only works with single thread;
environ['SQLITE_DBNAME'] = ':memory:'
# or use an unnamed in-memory DB with shared cache, works with multiple threads;
environ['SQLITE_DBNAME'] = 'file::memory:?mode=memory&cache=shared'
# or use a named in-memory DB with shared cache, to create distinct databases.
environ['SQLITE_DBNAME'] = 'file:application1?mode=memory&cache=shared'
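The difference between a private and a shared-cache in-memory database can be seen with the standard library sqlite3 module alone. The sketch below does not use the eventsourcing library; the database name `sketch_db` and the table `t` are made up for the example.

```python
import sqlite3

# A named in-memory database with a shared cache is visible to every
# connection in the process that opens the same URI.
uri = "file:sketch_db?mode=memory&cache=shared"
writer = sqlite3.connect(uri, uri=True)
reader = sqlite3.connect(uri, uri=True)
writer.execute("CREATE TABLE t (x INTEGER)")
writer.execute("INSERT INTO t VALUES (1)")
writer.commit()
shared_rows = reader.execute("SELECT x FROM t").fetchall()

# With ":memory:" each connection gets its own private database,
# so a table created by one connection does not exist in another.
private_a = sqlite3.connect(":memory:")
private_b = sqlite3.connect(":memory:")
private_a.execute("CREATE TABLE t (x INTEGER)")
try:
    private_b.execute("SELECT x FROM t")
    private_table_visible = True
except sqlite3.OperationalError:
    private_table_visible = False
```

A shared in-memory database lasts only while at least one connection to it remains open.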
As above, the optional environment variables COMPRESSOR_TOPIC, CIPHER_KEY,
and CIPHER_TOPIC may be used to enable compression and encryption of stored
events recorded in SQLite.
PostgreSQL environment¶
Similarly, the PostgreSQL module can be selected and configured with environment variables.
environ = Environment()
environ["PERSISTENCE_MODULE"] = "eventsourcing.postgres"
environ["POSTGRES_DBNAME"] = "eventsourcing"
environ["POSTGRES_HOST"] = "127.0.0.1"
environ["POSTGRES_PORT"] = "5432"
environ["POSTGRES_USER"] = "eventsourcing"
environ["POSTGRES_PASSWORD"] = "eventsourcing"
environ["POSTGRES_CONNECT_TIMEOUT"] = "30"
environ["POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT"] = "5"
environ["POSTGRES_POOL_SIZE"] = "5"
environ["POSTGRES_MAX_OVERFLOW"] = "10"
environ["POSTGRES_MAX_WAITING"] = "0"
environ["POSTGRES_CONN_MAX_AGE"] = ""
environ["POSTGRES_PRE_PING"] = "n"
environ["POSTGRES_LOCK_TIMEOUT"] = "5"
environ["POSTGRES_SCHEMA"] = "public"
environ["POSTGRES_SINGLE_ROW_TRACKING"] = "y"
environ["CREATE_TABLE"] = "t"
The environment variables POSTGRES_DBNAME, POSTGRES_HOST, POSTGRES_PORT,
POSTGRES_USER, and POSTGRES_PASSWORD (or POSTGRES_GET_PASSWORD_TOPIC) are
required to set the name of a database, the database server’s host name and port number,
and the database user name and password.
As an alternative to setting a fixed password in POSTGRES_PASSWORD, you can set
POSTGRES_GET_PASSWORD_TOPIC to indicate the topic of a function
that returns passwords. This function will be called each time a new database
connection is created. This variable supports using database services authenticated with Identity
and Access Management (IAM), sometimes referred to as token-based authentication, for which
the password is a token that is changed perhaps every 15 minutes. If POSTGRES_GET_PASSWORD_TOPIC
is set, the POSTGRES_PASSWORD variable is not required and will be ignored. The value
of this variable should be resolvable using resolve_topic() to
a Python function that expects no arguments and returns a Python str.
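As a rough illustration of what "resolvable to a function" means, the sketch below resolves a string of the form "module:name" with importlib and calls the result with no arguments. It is a simplified stand-in written for this example, not the library's resolve_topic() function, and it uses the standard library function secrets.token_hex purely as a convenient zero-argument callable that returns a str.

```python
import importlib


def resolve_topic_sketch(topic):
    """Resolve 'package.module:attr.path' to a Python object.
    A simplified, hypothetical stand-in for a topic resolver."""
    module_name, _, attr_path = topic.partition(":")
    obj = importlib.import_module(module_name)
    for attr_name in attr_path.split("."):
        obj = getattr(obj, attr_name)
    return obj


# Stand-in for a real password function: callable with no arguments,
# returns a str. A real function would fetch a short-lived auth token.
get_password = resolve_topic_sketch("secrets:token_hex")
password = get_password()
```

In a real deployment the topic would point at your own function, for example one that requests a fresh token from the cloud provider's client library.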
The optional environment variable POSTGRES_CONNECT_TIMEOUT may be used to set
the maximum time in seconds that a client can wait to receive a connection from the pool.
If set, an integer value is required. The default value is 30.
The optional environment variable POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT may be used to
time out sessions that are idle in a transaction. If a transaction cannot be ended for some reason,
perhaps because the database server cannot be reached, the transaction may remain in an idle
state and any locks will continue to be held. By timing out the session, transactions will be ended,
locks will be released, and the connection slot will be freed. A value of 0 means sessions in an idle
transaction will not time out. Setting this value to a positive integer number of seconds will cause
sessions in an idle transaction to time out after that duration has passed. The default value is 5.
The optional environment variable POSTGRES_POOL_SIZE is used to control the maximum number of
database connections that will be kept open in the connection pool. A value of 0 means there will
be zero connections maintained in the pool, and each access to the database will cause a new connection
to be made. If set, an integer value is required. The default value is 5. Please note, the pool will
only create a connection when there isn’t one in the pool and a connection is needed, so that if your
application is single-threaded, only one connection will be created, even if the pool size is configured
to be greater than 1.
The optional environment variable POSTGRES_MAX_OVERFLOW is used to control the maximum number
of additional connections that can be opened, above the pool size. The maximum number of connections
that can be opened is the sum of POSTGRES_POOL_SIZE and POSTGRES_MAX_OVERFLOW. However
connections that are returned to the pool when it is full will be immediately closed. If set, an
integer value is required. The default value is 10.
The optional environment variable POSTGRES_MAX_WAITING is used to control the maximum number
of connection requests that can be queued to the pool, after which new requests will fail.
If set, an integer is required. The default value is 0, which means there is no queue limit.
The optional environment variable POSTGRES_CONN_MAX_AGE is used to control the length of time
in seconds before a connection is closed when returned to the pool. If this value is zero, each
connection will only be used once. If your database terminates idle connections after some
time, you should set POSTGRES_CONN_MAX_AGE to a lower value, so that attempts are not made to
use connections that have been terminated by the database server, and so that your connections are
not suddenly terminated in the middle of a transaction. If set, a float is required. The default
value is 3600.0 (one hour).
The optional environment variable POSTGRES_PRE_PING may be used to enable pessimistic
disconnection handling. Setting this to a “true” value ("y", "yes", "t", "true",
"on", or "1") means database connections will be checked that they are usable before
executing statements, and database connections remade if the connection is not usable. This
value is by default “false”, meaning connections will not be checked before they are reused.
Enabling this option will incur a small impact on performance.
The optional environment variable POSTGRES_LOCK_TIMEOUT may be used to enable a timeout
on acquiring an ‘EXCLUSIVE’ mode table lock when inserting stored events. To avoid interleaving
of inserts when writing events, an ‘EXCLUSIVE’ mode table lock is acquired when inserting events.
This avoids a potential issue where insert order and commit order are not the same.
Locking the table effectively serialises writing events. It prevents concurrent transactions
interleaving inserts, which would potentially cause notification log readers that are tailing
the application notification log to miss event notifications. Reading from the table can proceed
concurrently with other readers and writers, since selecting acquires an ‘ACCESS SHARE’ lock which
does not block and is not blocked by the ‘EXCLUSIVE’ lock. This issue of interleaving inserts
by concurrent writers is not exhibited by SQLite, which locks the entire database for writing,
effectively serialising the operations of concurrent writers. When its journal mode is set to
use write-ahead logging, reading can proceed concurrently with writing. By default, this timeout
has the value of 0 seconds, which means attempts to acquire the lock will not time out. Setting
this value to a positive integer number of seconds will cause attempts to obtain this lock to
time out after that duration has passed. The lock will be released when the transaction ends.
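The problem that the table lock avoids can be reproduced with a small simulation in plain Python. It involves no database: `tail_notifications` is a hypothetical function in which a reader polls after each commit and remembers the highest notification ID it has seen, as a reader tailing a notification log would.

```python
def tail_notifications(commit_order):
    """Simulate a reader that polls after every commit, always selecting
    committed notification IDs greater than its last seen position.
    Returns the IDs it saw and the committed IDs it never saw."""
    committed = set()
    position = 0
    seen = []
    for notification_id in commit_order:
        committed.add(notification_id)
        new_ids = sorted(i for i in committed if i > position)
        seen.extend(new_ids)
        if new_ids:
            position = new_ids[-1]
    missed = sorted(committed - set(seen))
    return seen, missed


# Two writers are allocated IDs 1 and 2 in insert order.
# If commits happen in the same order, nothing is missed.
in_order_seen, in_order_missed = tail_notifications([1, 2])
# If the writer holding ID 2 commits first, the reader advances past
# ID 1 before it becomes visible and never sees it.
interleaved_seen, interleaved_missed = tail_notifications([2, 1])
```

Serialising writers with a lock forces commit order to match insert order, which rules out the second case.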
The optional environment variable POSTGRES_SCHEMA may be used to qualify the table names
used by the recorders with a schema name. Setting this will create tables in
a specific PostgreSQL schema. See the PostgreSQL Schemas documentation for more
information about creating and using PostgreSQL schemas safely.
The optional environment variable POSTGRES_SINGLE_ROW_TRACKING may be used to disable the
single-row tracking implementation of tracking recorders, which is the default and uses one row
in a tracking table per application name, and instead continue with the legacy multi-row tracking
implementation, which records a new row for each tracking object. Setting this to a “true” value
("y", "yes", "t", "true", "on", or "1") has no effect because that is the
default. Setting this to a “false” value ("n", "no", "f", "false", "off",
or "0") means that tracking recorders will continue with the legacy multi-row tracking
implementation, unless a table for single-row tracking has already been created, in which case an
exception will be raised at runtime. Migration from multi-row tracking to single-row tracking happens
automatically when a tracking recorder is constructed, unless this value is “false” or CREATE_TABLE is “false”.
The optional environment variable CREATE_TABLE controls whether or not database tables are
created when a recorder is constructed by a factory. If the tables already exist, CREATE_TABLE
may be set to a “false” value ("n", "no", "f", "false", "off", or "0").
This value is “true” by default, which is normally okay because the tables are created only if they
do not exist.
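The “true” and “false” values accepted by the boolean options above can be summarised in code. The helper below is a hypothetical stand-in written for illustration, not the library's own parser. In particular, matching case-insensitively and raising ValueError for unrecognised values are assumptions.

```python
TRUE_VALUES = {"y", "yes", "t", "true", "on", "1"}
FALSE_VALUES = {"n", "no", "f", "false", "off", "0"}


def parse_bool_option(value: str) -> bool:
    """Interpret an environment variable value as a boolean (hypothetical helper)."""
    normalised = value.strip().lower()  # assumption: case-insensitive matching
    if normalised in TRUE_VALUES:
        return True
    if normalised in FALSE_VALUES:
        return False
    msg = f"Not a recognised boolean value: {value!r}"
    raise ValueError(msg)


# Example settings for tables that already exist, keeping single-row tracking.
environ = {"CREATE_TABLE": "n", "POSTGRES_SINGLE_ROW_TRACKING": "yes"}
create_table = parse_bool_option(environ.get("CREATE_TABLE", "y"))
single_row = parse_bool_option(environ.get("POSTGRES_SINGLE_ROW_TRACKING", "y"))
print(create_table, single_row)
```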
Having configured the environment to use PostgreSQL, the infrastructure can be constructed and used in a standard way.
factory = InfrastructureFactory.construct(environ)
recorder = factory.application_recorder()
assert isinstance(recorder, PostgresApplicationRecorder)
mapper = factory.mapper(transcoder=transcoder)
event_store = factory.event_store(
    mapper=mapper,
    recorder=recorder,
)
event_store.put([domain_event])
domain_events = list(event_store.get(id1))
assert domain_events == [domain_event]
As above, the optional environment variables COMPRESSOR_TOPIC, CIPHER_KEY,
and CIPHER_TOPIC may be used to enable compression and encryption of stored
events recorded in PostgreSQL.
Code reference¶
- class eventsourcing.persistence.Queue(maxsize: int = 0)[source]¶
Bases: Queue[_T]
- put(item: _T, block: bool = True, timeout: float | None = None) None[source]¶
Put an item into the queue.
If optional args ‘block’ is true and ‘timeout’ is None (the default), block if necessary until a free slot is available. If ‘timeout’ is a non-negative number, it blocks at most ‘timeout’ seconds and raises the Full exception if no free slot was available within that time. Otherwise (‘block’ is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (‘timeout’ is ignored in that case).
Raises ShutDown if the queue has been shut down.
- get(block: bool = True, timeout: float | None = None) _T[source]¶
Remove and return an item from the queue.
If optional args ‘block’ is true and ‘timeout’ is None (the default), block if necessary until an item is available. If ‘timeout’ is a non-negative number, it blocks at most ‘timeout’ seconds and raises the Empty exception if no item was available within that time. Otherwise (‘block’ is false), return an item if one is immediately available, else raise the Empty exception (‘timeout’ is ignored in that case).
Raises ShutDown if the queue has been shut down and is empty, or if the queue has been shut down immediately.
- shutdown(immediate: bool = False) None[source]¶
Shut-down the queue, making queue gets and puts raise ShutDown.
By default, gets will only raise once the queue is empty. Set ‘immediate’ to True to make gets raise immediately instead.
All blocked callers of put() and get() will be unblocked. If ‘immediate’, a task is marked as done for each item remaining in the queue, which may unblock callers of join().
- exception eventsourcing.persistence.ShutDown[source]¶
Bases: Exception
Raised when put/get with shut-down queue.
- class eventsourcing.persistence.Transcoding[source]¶
Bases: ABC
Abstract base class for custom transcodings.
- exception eventsourcing.persistence.TranscodingNotRegisteredError[source]¶
Bases: EventSourcingError, TypeError
Raised when a transcoding isn’t registered with JSONTranscoder.
- class eventsourcing.persistence.JSONTranscoder[source]¶
Bases: Transcoder
Extensible transcoder that uses the Python json module.
- register(transcoding: Transcoding) None[source]¶
Registers given transcoding with the transcoder.
- class eventsourcing.persistence.UUIDAsHex[source]¶
Bases: Transcoding
Transcoding that represents UUID objects as hex values.
- type¶
alias of UUID
- class eventsourcing.persistence.DecimalAsStr[source]¶
Bases: Transcoding
Transcoding that represents Decimal objects as strings.
- type¶
alias of Decimal
- class eventsourcing.persistence.DatetimeAsISO[source]¶
Bases: Transcoding
Transcoding that represents datetime objects as ISO strings.
- type¶
alias of datetime
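The idea behind transcodings is to teach a JSON encoder about types that the json module cannot serialise natively. The following is a minimal sketch of that idea using only the standard library. It does not use or reproduce the library's JSONTranscoder: the class names `SketchTranscoder` and `UUIDHexSketch`, and the `_type_` and `_data_` keys, are assumptions made for illustration.

```python
import json
from uuid import UUID, uuid4


class UUIDHexSketch:
    """Hypothetical transcoding: represents UUID objects as hex strings."""

    type = UUID
    name = "uuid_hex"

    def encode(self, obj: UUID) -> str:
        return obj.hex

    def decode(self, data: str) -> UUID:
        return UUID(data)


class SketchTranscoder:
    """Hypothetical JSON transcoder with a registry of transcodings."""

    def __init__(self) -> None:
        self.by_type = {}
        self.by_name = {}

    def register(self, transcoding) -> None:
        self.by_type[transcoding.type] = transcoding
        self.by_name[transcoding.name] = transcoding

    def _default(self, obj):
        # Called by json for objects it cannot serialise natively.
        try:
            transcoding = self.by_type[type(obj)]
        except KeyError:
            raise TypeError(f"No transcoding registered for {type(obj)}") from None
        return {"_type_": transcoding.name, "_data_": transcoding.encode(obj)}

    def _object_hook(self, d: dict):
        # Called by json for every decoded JSON object.
        if set(d) == {"_type_", "_data_"}:
            return self.by_name[d["_type_"]].decode(d["_data_"])
        return d

    def encode(self, obj) -> bytes:
        return json.dumps(obj, default=self._default).encode("utf8")

    def decode(self, data: bytes):
        return json.loads(data.decode("utf8"), object_hook=self._object_hook)


transcoder = SketchTranscoder()
transcoder.register(UUIDHexSketch())
state = {"id": uuid4(), "name": "example"}
assert transcoder.decode(transcoder.encode(state)) == state
```

Encoding a value whose type has no registered transcoding raises TypeError in this sketch, which parallels the role of TranscodingNotRegisteredError described above.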
- class eventsourcing.persistence.StoredEvent(originator_id: UUID | str, originator_version: int, topic: str, state: bytes)[source]¶
Bases: object
Frozen dataclass that represents DomainEvent objects, such as aggregate Event objects and Snapshot objects.
- originator_id: UUID | str¶
ID of the originating aggregate.
- originator_version: int¶
Position in an aggregate sequence.
- topic: str¶
Topic of a domain event object class.
- state: bytes¶
Serialised state of a domain event object.
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(originator_id: UUID | str, originator_version: int, topic: str, state: bytes) None¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
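Because StoredEvent is described as a frozen dataclass, instances compare by value and reject attribute assignment. The sketch below mirrors the documented fields in a hypothetical stand-in class, `StoredEventSketch`, so that it runs without the library installed. The topic string and state bytes are made-up values.

```python
from dataclasses import FrozenInstanceError, dataclass
from typing import Union
from uuid import UUID, uuid4


@dataclass(frozen=True)
class StoredEventSketch:
    """Hypothetical stand-in with the same four fields as StoredEvent."""

    originator_id: Union[UUID, str]
    originator_version: int
    topic: str
    state: bytes


originator_id = uuid4()
first = StoredEventSketch(originator_id, 1, "example:Created", b"{}")
same = StoredEventSketch(originator_id, 1, "example:Created", b"{}")
assert first == same  # value equality from the generated __eq__
assert hash(first) == hash(same)  # hashable, so usable in sets and as dict keys

try:
    first.originator_version = 2  # rejected by the generated __setattr__
except FrozenInstanceError:
    print("attributes cannot be reassigned")
```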
- class eventsourcing.persistence.Cipher(environment: Environment)[source]¶
Bases: ABC
Base class for ciphers.
- abstract __init__(environment: Environment)[source]¶
Initialises cipher with given environment.
- exception eventsourcing.persistence.MapperDeserialisationError[source]¶
Bases: EventSourcingError, ValueError
Raised when deserialization fails in a Mapper.
- class eventsourcing.persistence.Mapper(transcoder: Transcoder, compressor: Compressor | None = None, cipher: Cipher | None = None)[source]¶
Bases: Generic[TAggregateID]
Converts between domain event objects and StoredEvent objects.
Uses a Transcoder, and optionally a cryptographic cipher and compressor.
- __init__(transcoder: Transcoder, compressor: Compressor | None = None, cipher: Cipher | None = None)[source]¶
- to_stored_event(domain_event: DomainEventProtocol[TAggregateID]) StoredEvent[source]¶
Converts the given domain event to a StoredEvent object.
- to_domain_event(stored_event: StoredEvent) DomainEventProtocol[TAggregateID][source]¶
Converts the given StoredEvent to a domain event object.
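The round trip performed by a mapper can be sketched with the standard library alone. The example below is an illustration under stated assumptions, not the library's Mapper: `EventSketch`, `to_stored` and `to_domain` are hypothetical names, the state is serialised with json, zlib stands in for the optional compressor, and the optional cipher step is omitted.

```python
import json
import zlib
from dataclasses import dataclass
from typing import Any, Dict, Tuple
from uuid import UUID, uuid4


@dataclass(frozen=True)
class EventSketch:
    """Hypothetical domain event used only for this illustration."""

    originator_id: UUID
    originator_version: int
    payload: Dict[str, Any]


def to_stored(event: EventSketch) -> Tuple[UUID, int, str, bytes]:
    """Return (originator_id, originator_version, topic, state)."""
    topic = f"{EventSketch.__module__}:{EventSketch.__qualname__}"
    # Serialise first, then compress. A cipher would encrypt after this step.
    state = zlib.compress(json.dumps(event.payload).encode("utf8"))
    return event.originator_id, event.originator_version, topic, state


def to_domain(stored: Tuple[UUID, int, str, bytes]) -> EventSketch:
    # The topic is ignored here because this sketch has one event class.
    originator_id, originator_version, _topic, state = stored
    payload = json.loads(zlib.decompress(state).decode("utf8"))
    return EventSketch(originator_id, originator_version, payload)


event = EventSketch(uuid4(), 1, {"what": "dinosaurs"})
assert to_domain(to_stored(event)) == event
```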
- exception eventsourcing.persistence.RecordConflictError[source]¶
Bases: EventSourcingError
Legacy exception, replaced with IntegrityError.
- exception eventsourcing.persistence.PersistenceError[source]¶
Bases: EventSourcingError
The base class of the other exceptions in this module.
Exception class names follow https://www.python.org/dev/peps/pep-0249/#exceptions
- exception eventsourcing.persistence.InterfaceError[source]¶
Bases: PersistenceError
Exception raised for errors that are related to the database interface rather than the database itself.
- exception eventsourcing.persistence.DatabaseError[source]¶
Bases: PersistenceError
Exception raised for errors that are related to the database.
- exception eventsourcing.persistence.DataError[source]¶
Bases: DatabaseError
Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range, etc.
- exception eventsourcing.persistence.OperationalError[source]¶
Bases: DatabaseError
Exception raised for errors that are related to the database’s operation and not necessarily under the control of the programmer, e.g. an unexpected disconnect occurs, the data source name is not found, a transaction could not be processed, a memory allocation error occurred during processing, etc.
- exception eventsourcing.persistence.IntegrityError[source]¶
Bases: DatabaseError, RecordConflictError
Exception raised when the relational integrity of the database is affected, e.g. a foreign key check fails.
- exception eventsourcing.persistence.InternalError[source]¶
Bases: DatabaseError
Exception raised when the database encounters an internal error, e.g. the cursor is not valid anymore, the transaction is out of sync, etc.
- exception eventsourcing.persistence.ProgrammingError[source]¶
Bases: DatabaseError
Exception raised for database programming errors, e.g. table not found or already exists, syntax error in the SQL statement, wrong number of parameters specified, etc.
- exception eventsourcing.persistence.NotSupportedError[source]¶
Bases: DatabaseError
Exception raised in case a method or database API was used which is not supported by the database, e.g. calling the rollback() method on a connection that does not support transaction or has transactions turned off.
- exception eventsourcing.persistence.WaitInterruptedError[source]¶
Bases: PersistenceError
Raised when waiting for a tracking record is interrupted.
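The exception names above follow PEP 249, so they line up with the exceptions raised by DB-API drivers. As an illustration only, the sketch below uses the standard library's sqlite3 module directly rather than this library's recorders. The table definition is an assumption, chosen to show how a uniqueness constraint on positions in an aggregate sequence surfaces as an integrity error.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE stored_events ("
    "originator_id TEXT, originator_version INTEGER, topic TEXT, state BLOB, "
    "PRIMARY KEY (originator_id, originator_version))"
)
row = ("aggregate-1", 1, "example:Created", b"{}")
conn.execute("INSERT INTO stored_events VALUES (?, ?, ?, ?)", row)

conflict = None
try:
    # A second event at the same position in the same aggregate sequence.
    conn.execute("INSERT INTO stored_events VALUES (?, ?, ?, ?)", row)
except sqlite3.IntegrityError as e:
    conflict = e
    print("conflict:", e)

# As in PEP 249, the integrity error is a kind of database error.
assert issubclass(sqlite3.IntegrityError, sqlite3.DatabaseError)
```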
- class eventsourcing.persistence.AggregateRecorder[source]¶
Bases: Recorder, ABC
Abstract base class for inserting and selecting stored events.
- abstract insert_events(stored_events: Sequence[StoredEvent], **kwargs: Any) Sequence[int] | None[source]¶
Writes stored events into database.
- abstract select_events(originator_id: UUID | str, *, gt: int | None = None, lte: int | None = None, desc: bool = False, limit: int | None = None) Sequence[StoredEvent][source]¶
Reads stored events from database.
- class eventsourcing.persistence.Notification(originator_id: UUID | str, originator_version: int, topic: str, state: bytes, id: int)[source]¶
Bases: StoredEvent
Frozen dataclass that represents domain event notifications.
- id: int¶
Position in an application sequence.
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(originator_id: UUID | str, originator_version: int, topic: str, state: bytes, id: int) None¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
- class eventsourcing.persistence.ApplicationRecorder[source]¶
Bases: AggregateRecorder
Abstract base class for recording events in both aggregate and application sequences.
- abstract select_notifications(start: int | None, limit: int, stop: int | None = None, topics: Sequence[str] = (), *, inclusive_of_start: bool = True) Sequence[Notification][source]¶
Returns a list of Notification objects representing events from an application sequence. If inclusive_of_start is True (the default), the returned Notification objects will have IDs greater than or equal to start and less than or equal to stop. If inclusive_of_start is False, the Notification objects will have IDs greater than start and less than or equal to stop.
- abstract max_notification_id() int | None[source]¶
Returns the largest notification ID in an application sequence, or None if no stored events have been recorded.
- abstract subscribe(gt: int | None = None, topics: Sequence[str] = ()) Subscription[ApplicationRecorder][source]¶
Returns an iterator of Notification objects representing events from an application sequence.
The iterator will block after the last recorded event has been yielded, but will then continue yielding newly recorded events when they are recorded.
Notifications will have IDs greater than the optional gt argument.
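The ID ranges described for select_notifications() can be modelled over a plain list of integers. The function below is a hypothetical in-memory sketch, not code from any recorder. Treating a start of None as "no lower bound" and applying limit after the range filter are assumptions.

```python
from typing import List, Optional


def select_ids(
    ids: List[int],
    start: Optional[int],
    limit: int,
    stop: Optional[int] = None,
    *,
    inclusive_of_start: bool = True,
) -> List[int]:
    """Hypothetical in-memory model of the ID range described above."""
    selected = []
    for i in sorted(ids):
        if start is not None:
            if inclusive_of_start and i < start:
                continue
            if not inclusive_of_start and i <= start:
                continue
        if stop is not None and i > stop:
            continue
        selected.append(i)
    return selected[:limit]


ids = list(range(1, 11))
print(select_ids(ids, start=3, limit=4))  # IDs >= 3
print(select_ids(ids, start=3, limit=4, inclusive_of_start=False))  # IDs > 3
print(select_ids(ids, start=None, limit=10, stop=2))  # IDs <= 2
```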
- class eventsourcing.persistence.TrackingRecorder[source]¶
Bases: Recorder, ABC
Abstract base class for recorders that record tracking objects atomically with other state.
- abstract max_tracking_id(application_name: str) int | None[source]¶
Returns the largest notification ID across all recorded tracking objects for the named application, or None if no tracking objects have been recorded.
- has_tracking_id(application_name: str, notification_id: int | None) bool[source]¶
Returns True if given notification_id is None or a tracking object with the given application_name and a notification ID greater than or equal to the given notification_id has been recorded.
- wait(application_name: str, notification_id: int | None, timeout: float = 1.0, interrupt: Event | None = None) None[source]¶
Block until a tracking object with the given application name and a notification ID greater than or equal to the given value has been recorded.
Polls max_tracking_id() with exponential backoff until the timeout is reached, or until the optional interrupt event is set.
The timeout argument should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). The default is 1.0 seconds.
Raises TimeoutError if the timeout is reached.
Raises WaitInterruptedError if the interrupt is set before the timeout is reached.
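The polling behaviour described for wait() can be sketched as follows. This is a minimal stand-alone illustration, not the library's implementation: `wait_for_tracking`, the `get_max_tracking_id` callable, the initial delay and the doubling factor are all assumptions, and the hypothetical `WaitInterruptedSketch` class stands in for the library's interruption exception.

```python
import threading
import time
from typing import Callable, Optional


class WaitInterruptedSketch(RuntimeError):
    """Hypothetical stand-in for the library's interruption exception."""


def wait_for_tracking(
    get_max_tracking_id: Callable[[], Optional[int]],
    notification_id: int,
    timeout: float = 1.0,
    interrupt: Optional[threading.Event] = None,
) -> None:
    deadline = time.monotonic() + timeout
    delay = 0.005  # assumed initial polling interval
    while True:
        max_id = get_max_tracking_id()
        if max_id is not None and max_id >= notification_id:
            return
        if interrupt is not None and interrupt.is_set():
            raise WaitInterruptedSketch
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError
        time.sleep(min(delay, remaining))
        delay *= 2  # exponential backoff between polls


# Simulate another thread recording a tracking object shortly afterwards.
recorded = {"max_id": None}
threading.Timer(0.05, recorded.update, kwargs={"max_id": 3}).start()
wait_for_tracking(lambda: recorded["max_id"], notification_id=3)
print("tracking record 3 is available")
```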
- class eventsourcing.persistence.ProcessRecorder[source]¶
Bases: TrackingRecorder, ApplicationRecorder, ABC
- class eventsourcing.persistence.Recording(domain_event: DomainEventProtocol[TAggregateID], notification: Notification)[source]¶
Bases: Generic[TAggregateID]
Represents the recording of a domain event.
- domain_event: DomainEventProtocol[TAggregateID]¶
The domain event that has been recorded.
- notification: Notification¶
A Notification that represents the domain event in the application sequence.
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(domain_event: DomainEventProtocol[TAggregateID], notification: Notification) None¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
- class eventsourcing.persistence.EventStore(mapper: Mapper[TAggregateID], recorder: AggregateRecorder)[source]¶
Bases: Generic[TAggregateID]
Stores and retrieves domain events.
- __init__(mapper: Mapper[TAggregateID], recorder: AggregateRecorder)[source]¶
- put(domain_events: Sequence[DomainEventProtocol[TAggregateID]], **kwargs: Any) list[Recording[TAggregateID]][source]¶
Stores domain events in aggregate sequence.
- get(originator_id: TAggregateID, *, gt: int | None = None, lte: int | None = None, desc: bool = False, limit: int | None = None) Iterator[DomainEventProtocol[TAggregateID]][source]¶
Retrieves domain events from aggregate sequence.
- exception eventsourcing.persistence.InfrastructureFactoryError[source]¶
Bases: EventSourcingError
Raised when an infrastructure factory cannot be created.
- class eventsourcing.persistence.BaseInfrastructureFactory(env: Environment | Mapping[str, str] | None)[source]¶
Bases: ABC, Generic[TTrackingRecorder]
Abstract base class for infrastructure factories.
- __init__(env: Environment | Mapping[str, str] | None)[source]¶
Initialises infrastructure factory object with given application name.
- classmethod construct(env: Environment | None = None) Self[source]¶
Constructs concrete infrastructure factory for given named application. Reads and resolves persistence topic from environment variable ‘PERSISTENCE_MODULE’.
- transcoder() Transcoder[source]¶
Constructs a transcoder.
- cipher() Cipher | None[source]¶
Reads environment variables ‘CIPHER_TOPIC’ and ‘CIPHER_KEY’ to decide whether or not to construct a cipher.
- compressor() Compressor | None[source]¶
Reads environment variable ‘COMPRESSOR_TOPIC’ to decide whether or not to construct a compressor.
- class eventsourcing.persistence.InfrastructureFactory(env: Environment | Mapping[str, str] | None)[source]¶
Bases: BaseInfrastructureFactory[TTrackingRecorder]
Abstract base class for Application factories.
- mapper(transcoder: Transcoder | None = None, mapper_class: type[Mapper[TAggregateID]] | None = None) Mapper[TAggregateID][source]¶
Constructs a mapper.
- event_store(mapper: Mapper[TAggregateID] | None = None, recorder: AggregateRecorder | None = None) EventStore[TAggregateID][source]¶
Constructs an event store.
- abstract aggregate_recorder(purpose: str = 'events') AggregateRecorder[source]¶
Constructs an aggregate recorder.
- abstract application_recorder() ApplicationRecorder[source]¶
Constructs an application recorder.
- abstract tracking_recorder(tracking_recorder_class: type[TTrackingRecorder] | None = None) TTrackingRecorder[source]¶
Constructs a tracking recorder.
- abstract process_recorder() ProcessRecorder[source]¶
Constructs a process recorder.
- class eventsourcing.persistence.Tracking(application_name: str, notification_id: int)[source]¶
Bases: object
Frozen dataclass representing the position of a domain event Notification in an application’s notification log.
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(application_name: str, notification_id: int) None¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
- class eventsourcing.persistence.Cursor[source]¶
Bases: ABC
- class eventsourcing.persistence.Connection(max_age: float | None = None)[source]¶
Bases: ABC, Generic[TCursor]
- exception eventsourcing.persistence.ConnectionPoolClosedError[source]¶
Bases: EventSourcingError
Raised when using a connection pool that is already closed.
- exception eventsourcing.persistence.ConnectionNotFromPoolError[source]¶
Bases: EventSourcingError
Raised when putting a connection in the wrong pool.
Bases: OperationalError, TimeoutError
Raised when a request to get a connection from a connection pool times out.
- class eventsourcing.persistence.ConnectionPool(*, pool_size: int = 5, max_overflow: int = 10, pool_timeout: float = 30.0, max_age: float | None = None, pre_ping: bool = False, mutually_exclusive_read_write: bool = False)[source]¶
Bases: ABC, Generic[TConnection]
- __init__(*, pool_size: int = 5, max_overflow: int = 10, pool_timeout: float = 30.0, max_age: float | None = None, pre_ping: bool = False, mutually_exclusive_read_write: bool = False) None[source]¶
Initialises a new connection pool.
The ‘pool_size’ argument specifies the maximum number of connections that will be put into the pool when connections are returned. The default value is 5.
The ‘max_overflow’ argument specifies the additional number of connections that can be issued by the pool, above the ‘pool_size’. The default value is 10.
The ‘pool_timeout’ argument specifies the maximum time in seconds to keep requests for connections waiting. Requests are kept waiting if the number of connections currently in use is not less than the sum of ‘pool_size’ and ‘max_overflow’. The default value is 30.0.
The ‘max_age’ argument specifies the time in seconds until a connection will automatically be closed. Connections are only closed in this way once they are no longer in use. Connections that are in use will not be closed automatically. The default value is None, meaning connections will not be automatically closed in this way.
The ‘mutually_exclusive_read_write’ argument specifies whether requests for connections for writing will be kept waiting whilst connections for reading are in use. It also specifies whether requests for connections for reading will be kept waiting whilst a connection for writing is in use. The default value is false, meaning reading and writing will not be mutually exclusive in this way.
- property num_in_use: int¶
Indicates the total number of connections currently in use.
- property num_in_pool: int¶
Indicates the number of connections currently in the pool.
- get_connection(timeout: float | None = None, *, is_writer: bool | None = None) TConnection[source]¶
Issues connections, or raises ConnectionPoolExhausted error. Provides “fairness” on attempts to get connections, meaning that connections are issued in the same order as they are requested.
The ‘timeout’ argument overrides the timeout specified by the constructor argument ‘pool_timeout’. The default value is None, meaning the ‘pool_timeout’ argument will not be overridden.
The optional ‘is_writer’ argument can be used to request a connection for writing (true) or a connection for reading (false). If the value of this argument is None, which is the default, the writing and reading interlocking mechanism is not activated. Only one connection for writing will be issued, which means requests for connections for writing are kept waiting whilst another connection for writing is in use.
If reading and writing are mutually exclusive, requests for connections for writing are kept waiting whilst connections for reading are in use, and requests for connections for reading are kept waiting whilst a connection for writing is in use.
- put_connection(conn: TConnection) None[source]¶
Returns connections to the pool, or closes connection if the pool is full.
Unlocks write lock after writer has returned, and updates count of readers when readers are returned.
Notifies waiters when connections have been returned, and when there are no longer any readers.
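How ‘pool_size’, ‘max_overflow’ and ‘pool_timeout’ interact can be sketched with a semaphore and a queue of idle connections. The code below is a simplified illustration, not the library's ConnectionPool: `PoolSketch`, `FakeConnection` and `PoolExhaustedSketch` are hypothetical names, and fairness, ‘max_age’, ‘pre_ping’ and the read/write interlocking are deliberately left out.

```python
import threading
from collections import deque
from typing import Callable, Deque, Optional


class PoolExhaustedSketch(TimeoutError):
    """Hypothetical error raised when no connection becomes available in time."""


class FakeConnection:
    """Hypothetical connection that only records whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class PoolSketch:
    def __init__(
        self,
        create: Callable[[], FakeConnection],
        *,
        pool_size: int = 5,
        max_overflow: int = 10,
        pool_timeout: float = 30.0,
    ) -> None:
        self._create = create
        self._pool_size = pool_size
        self._pool_timeout = pool_timeout
        self._idle: Deque[FakeConnection] = deque()
        self._lock = threading.Lock()
        # At most pool_size + max_overflow connections may be in use at once.
        self._capacity = threading.Semaphore(pool_size + max_overflow)

    def get_connection(self, timeout: Optional[float] = None) -> FakeConnection:
        wait = self._pool_timeout if timeout is None else timeout
        if not self._capacity.acquire(timeout=wait):
            raise PoolExhaustedSketch
        with self._lock:
            if self._idle:
                return self._idle.popleft()
        return self._create()

    def put_connection(self, conn: FakeConnection) -> None:
        with self._lock:
            if len(self._idle) < self._pool_size:
                self._idle.append(conn)  # keep for reuse
            else:
                conn.close()  # pool is full, so close the overflow connection
        self._capacity.release()


pool = PoolSketch(FakeConnection, pool_size=1, max_overflow=1, pool_timeout=0.01)
first, second = pool.get_connection(), pool.get_connection()
try:
    pool.get_connection()
except PoolExhaustedSketch:
    print("pool exhausted")
pool.put_connection(first)  # returned to the pool
pool.put_connection(second)  # pool already full, so closed
```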
- class eventsourcing.persistence.Subscription(recorder: TApplicationRecorder_co, gt: int | None = None, topics: Sequence[str] = ())[source]¶
Bases: Iterator[Notification], Generic[TApplicationRecorder_co]
- __init__(recorder: TApplicationRecorder_co, gt: int | None = None, topics: Sequence[str] = ()) None[source]¶
- abstract __next__() Notification[source]¶
Returns the next Notification object in the application sequence.
- class eventsourcing.persistence.ListenNotifySubscription(recorder: TApplicationRecorder_co, gt: int | None = None, topics: Sequence[str] = ())[source]¶
Bases: Subscription[TApplicationRecorder_co]
- __init__(recorder: TApplicationRecorder_co, gt: int | None = None, topics: Sequence[str] = ()) None[source]¶
- __next__() Notification[source]¶
Returns the next Notification object in the application sequence.
- class eventsourcing.popo.POPOAggregateRecorder[source]¶
Bases: POPORecorder, AggregateRecorder
- insert_events(stored_events: Sequence[StoredEvent], **kwargs: Any) Sequence[int] | None[source]¶
Writes stored events into database.
- select_events(originator_id: UUID | str, *, gt: int | None = None, lte: int | None = None, desc: bool = False, limit: int | None = None) Sequence[StoredEvent][source]¶
Reads stored events from database.
- class eventsourcing.popo.POPOApplicationRecorder[source]¶
Bases: POPOAggregateRecorder, ApplicationRecorder
- insert_events(stored_events: Sequence[StoredEvent], **kwargs: Any) Sequence[int] | None[source]¶
Writes stored events into database.
- select_notifications(start: int | None, limit: int, stop: int | None = None, topics: Sequence[str] = (), *, inclusive_of_start: bool = True) Sequence[Notification][source]¶
Returns a list of Notification objects representing events from an application sequence. If inclusive_of_start is True (the default), the returned Notification objects will have IDs greater than or equal to start and less than or equal to stop. If inclusive_of_start is False, the Notification objects will have IDs greater than start and less than or equal to stop.
- max_notification_id() int | None[source]¶
Returns the largest notification ID in an application sequence, or None if no stored events have been recorded.
- subscribe(gt: int | None = None, topics: Sequence[str] = ()) Subscription[ApplicationRecorder][source]¶
Returns an iterator of Notification objects representing events from an application sequence.
The iterator will block after the last recorded event has been yielded, but will then continue yielding newly recorded events when they are recorded.
Notifications will have IDs greater than the optional gt argument.
- class eventsourcing.popo.POPOSubscription(recorder: POPOApplicationRecorder, gt: int | None = None, topics: Sequence[str] = ())[source]¶
Bases: ListenNotifySubscription[POPOApplicationRecorder]
- __init__(recorder: POPOApplicationRecorder, gt: int | None = None, topics: Sequence[str] = ()) None[source]¶
- class eventsourcing.popo.POPOTrackingRecorder[source]¶
Bases: POPORecorder, TrackingRecorder
- class eventsourcing.popo.POPOProcessRecorder[source]¶
Bases: POPOTrackingRecorder, POPOApplicationRecorder, ProcessRecorder
- class eventsourcing.popo.POPOFactory(env: Environment | Mapping[str, str] | None)[source]¶
Bases: InfrastructureFactory[POPOTrackingRecorder]
- aggregate_recorder(purpose: str = 'events') AggregateRecorder[source]¶
Constructs an aggregate recorder.
- application_recorder() ApplicationRecorder[source]¶
Constructs an application recorder.
- tracking_recorder(tracking_recorder_class: type[POPOTrackingRecorder] | None = None) POPOTrackingRecorder[source]¶
Constructs a tracking recorder.
- process_recorder() ProcessRecorder[source]¶
Constructs a process recorder.
- eventsourcing.popo.Factory¶
alias of POPOFactory
- class eventsourcing.sqlite.SQLiteConnection(sqlite_conn: Connection, max_age: float | None)[source]¶
Bases: Connection[SQLiteCursor]
- cursor() SQLiteCursor[source]¶
Creates new cursor.
- class eventsourcing.sqlite.SQLiteConnectionPool(*, db_name: str, lock_timeout: int | None = None, pool_size: int = 5, max_overflow: int = 10, pool_timeout: float = 5.0, max_age: float | None = None, pre_ping: bool = False)[source]¶
Bases: ConnectionPool[SQLiteConnection]
- __init__(*, db_name: str, lock_timeout: int | None = None, pool_size: int = 5, max_overflow: int = 10, pool_timeout: float = 5.0, max_age: float | None = None, pre_ping: bool = False)[source]¶
Initialises a new connection pool.
The ‘pool_size’ argument specifies the maximum number of connections that will be put into the pool when connections are returned. The default value is 5.
The ‘max_overflow’ argument specifies the additional number of connections that can be issued by the pool, above the ‘pool_size’. The default value is 10.
The ‘pool_timeout’ argument specifies the maximum time in seconds to keep requests for connections waiting. Requests are kept waiting if the number of connections currently in use is not less than the sum of ‘pool_size’ and ‘max_overflow’. The default value in the signature of this class is 5.0.
The ‘max_age’ argument specifies the time in seconds until a connection will automatically be closed. Connections are only closed in this way once they are no longer in use. Connections that are in use will not be closed automatically. The default value is None, meaning connections will not be automatically closed in this way.
The ‘mutually_exclusive_read_write’ argument specifies whether requests for connections for writing will be kept waiting whilst connections for reading are in use. It also specifies whether requests for connections for reading will be kept waiting whilst a connection for writing is in use. The default value is false, meaning reading and writing will not be mutually exclusive in this way.
- class eventsourcing.sqlite.SQLiteAggregateRecorder(datastore: SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶
Bases: SQLiteRecorder, AggregateRecorder
- insert_events(stored_events: Sequence[StoredEvent], **kwargs: Any) Sequence[int] | None[source]¶
Writes stored events into database.
- select_events(originator_id: UUID | str, *, gt: int | None = None, lte: int | None = None, desc: bool = False, limit: int | None = None) Sequence[StoredEvent][source]¶
Reads stored events from database.
- class eventsourcing.sqlite.SQLiteApplicationRecorder(datastore: SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶
Bases: SQLiteAggregateRecorder, ApplicationRecorder
- select_notifications(start: int | None, limit: int, stop: int | None = None, topics: Sequence[str] = (), *, inclusive_of_start: bool = True) Sequence[Notification][source]¶
Returns a list of event notifications from ‘start’, limited by ‘limit’.
- subscribe(gt: int | None = None, topics: Sequence[str] = ()) Subscription[ApplicationRecorder][source]¶
This method is not implemented on this class.
- class eventsourcing.sqlite.SQLiteTrackingRecorder(datastore: SQLiteDatastore, **kwargs: Any)[source]¶
Bases: SQLiteRecorder, TrackingRecorder
- class eventsourcing.sqlite.SQLiteProcessRecorder(datastore: SQLiteDatastore, *, events_table_name: str = 'stored_events')[source]¶
Bases: SQLiteTrackingRecorder, SQLiteApplicationRecorder, ProcessRecorder
- class eventsourcing.sqlite.SQLiteFactory(env: Environment | Mapping[str, str] | None)[source]¶
Bases: InfrastructureFactory[SQLiteTrackingRecorder]
- aggregate_recorder_class¶
alias of SQLiteAggregateRecorder
- application_recorder_class¶
alias of SQLiteApplicationRecorder
- tracking_recorder_class¶
alias of SQLiteTrackingRecorder
- process_recorder_class¶
alias of SQLiteProcessRecorder
- __init__(env: Environment | Mapping[str, str] | None)[source]¶
Initialises infrastructure factory object with given application name.
- aggregate_recorder(purpose: str = 'events') AggregateRecorder[source]¶
Constructs an aggregate recorder.
- application_recorder() ApplicationRecorder[source]¶
Constructs an application recorder.
- tracking_recorder(tracking_recorder_class: type[SQLiteTrackingRecorder] | None = None) SQLiteTrackingRecorder[source]¶
Constructs a tracking recorder.
- process_recorder() ProcessRecorder[source]¶
Constructs a process recorder.
- eventsourcing.sqlite.Factory¶
alias of SQLiteFactory
- class eventsourcing.postgres.PgStoredEvent(originator_id, originator_version, topic, state)[source]¶
Bases: NamedTuple
- originator_id: UUID | str¶
Alias for field number 0
- originator_version: int¶
Alias for field number 1
- topic: str¶
Alias for field number 2
- state: bytes¶
Alias for field number 3
- class eventsourcing.postgres.ConnectionPool(conninfo: ConninfoParam = '', *, connection_class: type[CT] = <class 'psycopg.Connection'>, kwargs: KwargsParam | None = None, min_size: int = 4, max_size: int | None = None, open: bool | None = None, configure: ConnectionCB[CT] | None = None, check: ConnectionCB[CT] | None = None, reset: ConnectionCB[CT] | None = None, name: str | None = None, close_returns: bool = False, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 3600.0, max_idle: float = 600.0, reconnect_timeout: float = 300.0, reconnect_failed: ConnectFailedCB | None = None, num_workers: int = 3, get_password_func: Callable[[], str] | None = None)[source]¶
Bases: ConnectionPool[CT], Generic[CT]
- __init__(conninfo: ConninfoParam = '', *, connection_class: type[CT] = <class 'psycopg.Connection'>, kwargs: KwargsParam | None = None, min_size: int = 4, max_size: int | None = None, open: bool | None = None, configure: ConnectionCB[CT] | None = None, check: ConnectionCB[CT] | None = None, reset: ConnectionCB[CT] | None = None, name: str | None = None, close_returns: bool = False, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 3600.0, max_idle: float = 600.0, reconnect_timeout: float = 300.0, reconnect_failed: ConnectFailedCB | None = None, num_workers: int = 3, get_password_func: Callable[[], str] | None = None) None[source]¶
- class eventsourcing.postgres.PostgresDatastore(dbname: str, host: str, port: str | int, user: str, password: str, *, connect_timeout: float = 5.0, idle_in_transaction_session_timeout: float = 0, pool_size: int = 1, max_overflow: int = 0, max_waiting: int = 0, conn_max_age: float = 3600.0, pre_ping: bool = False, lock_timeout: int = 0, schema: str = '', pool_open_timeout: float | None = None, get_password_func: Callable[[], str] | None = None, single_row_tracking: bool = True, originator_id_type: Literal['uuid', 'text'] = 'uuid', enable_db_functions: bool = False)[source]¶
Bases: object
- __init__(dbname: str, host: str, port: str | int, user: str, password: str, *, connect_timeout: float = 5.0, idle_in_transaction_session_timeout: float = 0, pool_size: int = 1, max_overflow: int = 0, max_waiting: int = 0, conn_max_age: float = 3600.0, pre_ping: bool = False, lock_timeout: int = 0, schema: str = '', pool_open_timeout: float | None = None, get_password_func: Callable[[], str] | None = None, single_row_tracking: bool = True, originator_id_type: Literal['uuid', 'text'] = 'uuid', enable_db_functions: bool = False)[source]¶
- after_connect_func() Callable[[Connection[Any]], None][source]¶
- get_connection() Iterator[Connection[DictRow]][source]¶
- class eventsourcing.postgres.PostgresRecorder(datastore: PostgresDatastore)[source]¶
Bases:
object
Base class for recorders that use PostgreSQL.
- MAX_IDENTIFIER_LEN = 63¶
- __init__(datastore: PostgresDatastore)[source]¶
- class eventsourcing.postgres.PostgresAggregateRecorder(datastore: PostgresDatastore, *, events_table_name: str = 'stored_events')[source]¶
Bases:
PostgresRecorder, AggregateRecorder
- __init__(datastore: PostgresDatastore, *, events_table_name: str = 'stored_events')[source]¶
- construct_pg_stored_event(originator_id: UUID | str, originator_version: int, topic: str, state: bytes) PgStoredEvent[source]¶
- insert_events(stored_events: Sequence[StoredEvent], **kwargs: Any) Sequence[int] | None[source]¶
Writes stored events into the database.
- select_events(originator_id: UUID | str, *, gt: int | None = None, lte: int | None = None, desc: bool = False, limit: int | None = None) Sequence[StoredEvent][source]¶
Reads stored events from the database.
- sql_create_statements: list[Composed]¶
- class eventsourcing.postgres.PostgresApplicationRecorder(datastore: PostgresDatastore, *, events_table_name: str = 'stored_events')[source]¶
Bases:
PostgresAggregateRecorder, ApplicationRecorder
- __init__(datastore: PostgresDatastore, *, events_table_name: str = 'stored_events')[source]¶
- insert_events(stored_events: Sequence[StoredEvent], **kwargs: Any) Sequence[int] | None[source]¶
Writes stored events into the database.
- select_notifications(start: int | None, limit: int, stop: int | None = None, topics: Sequence[str] = (), *, inclusive_of_start: bool = True) Sequence[Notification][source]¶
Returns a list of event notifications from ‘start’, limited by ‘limit’.
- subscribe(gt: int | None = None, topics: Sequence[str] = ()) Subscription[ApplicationRecorder][source]¶
Returns an iterator of Notification objects representing events from an application sequence.
The iterator will block after the last recorded event has been yielded, but will then continue yielding newly recorded events when they are recorded.
Notifications will have IDs greater than the optional gt argument.
- sql_create_statements: list[Composed]¶
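The selection arguments of select_notifications() can be illustrated with a small in-memory sketch. This is not the PostgreSQL implementation, which queries the events table. The `Notification` dataclass and the free function below are hypothetical stand-ins, and treating `stop` as an inclusive upper bound is an assumption that the text above does not confirm.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class Notification:
    """Hypothetical stand-in for an event notification."""

    id: int
    topic: str


def select_notifications(
    recorded: Sequence[Notification],
    start: int | None,
    limit: int,
    stop: int | None = None,
    topics: Sequence[str] = (),
    *,
    inclusive_of_start: bool = True,
) -> list[Notification]:
    """Filter an in-memory sequence that is assumed to be ordered by id."""
    selected: list[Notification] = []
    for notification in recorded:
        # Stop as soon as 'limit' notifications have been collected.
        if len(selected) >= limit:
            break
        if start is not None:
            # 'inclusive_of_start' decides whether 'start' itself is returned.
            if inclusive_of_start and notification.id < start:
                continue
            if not inclusive_of_start and notification.id <= start:
                continue
        # Assumption: 'stop' is an inclusive upper bound.
        if stop is not None and notification.id > stop:
            continue
        # An empty 'topics' sequence means no filtering by topic.
        if topics and notification.topic not in topics:
            continue
        selected.append(notification)
    return selected
```

With `inclusive_of_start=False`, a reader that remembers the last notification ID it processed can pass that ID directly as `start` without adding one.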
- class eventsourcing.postgres.PostgresSubscription(recorder: PostgresApplicationRecorder, gt: int | None = None, topics: Sequence[str] = ())[source]¶
Bases:
ListenNotifySubscription[PostgresApplicationRecorder]
- __init__(recorder: PostgresApplicationRecorder, gt: int | None = None, topics: Sequence[str] = ()) None[source]¶
- class eventsourcing.postgres.PostgresTrackingRecorder(datastore: PostgresDatastore, *, tracking_table_name: str = 'notification_tracking', **kwargs: Any)[source]¶
Bases:
PostgresRecorder, TrackingRecorder
- __init__(datastore: PostgresDatastore, *, tracking_table_name: str = 'notification_tracking', **kwargs: Any)[source]¶
- max_tracking_id(application_name: str) int | None[source]¶
Returns the largest notification ID across all recorded tracking objects for the named application, or None if no tracking objects have been recorded.
- has_tracking_id(application_name: str, notification_id: int | None) bool[source]¶
Returns True if the given notification_id is None, or if a tracking object with the given application_name and a notification ID greater than or equal to the given notification_id has been recorded.
- sql_create_statements: list[Composed]¶
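The documented behaviour of max_tracking_id() and has_tracking_id() can be sketched with an in-memory model. The `InMemoryTracking` class and its `record()` helper are hypothetical and exist only for this illustration. The PostgreSQL recorder stores tracking records in a database table instead.

```python
from __future__ import annotations


class InMemoryTracking:
    """Hypothetical in-memory stand-in for a tracking recorder."""

    def __init__(self) -> None:
        # Maps an upstream application name to its recorded notification IDs.
        self._recorded: dict[str, set[int]] = {}

    def record(self, application_name: str, notification_id: int) -> None:
        # Illustrative helper: remember that a notification was processed.
        self._recorded.setdefault(application_name, set()).add(notification_id)

    def max_tracking_id(self, application_name: str) -> int | None:
        # Largest recorded ID for the application, or None if nothing recorded.
        ids = self._recorded.get(application_name)
        return max(ids) if ids else None

    def has_tracking_id(
        self, application_name: str, notification_id: int | None
    ) -> bool:
        # None is always treated as "already tracked".
        if notification_id is None:
            return True
        # Otherwise a recorded ID at or beyond the given ID must exist.
        max_id = self.max_tracking_id(application_name)
        return max_id is not None and max_id >= notification_id
```

Because has_tracking_id() compares against IDs greater than or equal to the given one, it only needs to know the largest recorded ID for each application.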
- class eventsourcing.postgres.PostgresProcessRecorder(datastore: PostgresDatastore, *, events_table_name: str = 'stored_events', tracking_table_name: str = 'notification_tracking')[source]¶
Bases:
PostgresTrackingRecorder, PostgresApplicationRecorder, ProcessRecorder
- __init__(datastore: PostgresDatastore, *, events_table_name: str = 'stored_events', tracking_table_name: str = 'notification_tracking')[source]¶
- sql_create_statements: list[Composed]¶
- tracking_table_exists: bool¶
- tracking_migration_previous: int | None¶
- tracking_migration_current: int | None¶
- has_checked_for_multi_row_tracking_table: bool¶
- class eventsourcing.postgres.BasePostgresFactory(env: Environment | Mapping[str, str] | None)[source]¶
Bases:
BaseInfrastructureFactory[TTrackingRecorder]
- POSTGRES_DBNAME = 'POSTGRES_DBNAME'¶
- POSTGRES_HOST = 'POSTGRES_HOST'¶
- POSTGRES_PORT = 'POSTGRES_PORT'¶
- POSTGRES_USER = 'POSTGRES_USER'¶
- POSTGRES_PASSWORD = 'POSTGRES_PASSWORD'¶
- POSTGRES_GET_PASSWORD_TOPIC = 'POSTGRES_GET_PASSWORD_TOPIC'¶
- POSTGRES_CONNECT_TIMEOUT = 'POSTGRES_CONNECT_TIMEOUT'¶
- POSTGRES_CONN_MAX_AGE = 'POSTGRES_CONN_MAX_AGE'¶
- POSTGRES_PRE_PING = 'POSTGRES_PRE_PING'¶
- POSTGRES_MAX_WAITING = 'POSTGRES_MAX_WAITING'¶
- POSTGRES_LOCK_TIMEOUT = 'POSTGRES_LOCK_TIMEOUT'¶
- POSTGRES_POOL_SIZE = 'POSTGRES_POOL_SIZE'¶
- POSTGRES_MAX_OVERFLOW = 'POSTGRES_MAX_OVERFLOW'¶
- POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT = 'POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT'¶
- POSTGRES_SCHEMA = 'POSTGRES_SCHEMA'¶
- POSTGRES_SINGLE_ROW_TRACKING = 'SINGLE_ROW_TRACKING'¶
- ORIGINATOR_ID_TYPE = 'ORIGINATOR_ID_TYPE'¶
- POSTGRES_ENABLE_DB_FUNCTIONS = 'POSTGRES_ENABLE_DB_FUNCTIONS'¶
- CREATE_TABLE = 'CREATE_TABLE'¶
- __init__(env: Environment | Mapping[str, str] | None)[source]¶
Initialises infrastructure factory object with the given environment.
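The constants above name the environment variables read by the factory. A mapping of the kind accepted by the `env` argument might look like the following sketch. All values are placeholders. The `PERSISTENCE_MODULE` key is an assumption that does not appear in this section, and the accepted format of each value is not specified here. Note that the values of `POSTGRES_SINGLE_ROW_TRACKING` and `ORIGINATOR_ID_TYPE` carry no `POSTGRES_` prefix.

```python
# Placeholder settings keyed by the variable names listed above.
env: dict[str, str] = {
    # Assumption: selects this module as the persistence module.
    "PERSISTENCE_MODULE": "eventsourcing.postgres",
    "POSTGRES_DBNAME": "eventsourcing",
    "POSTGRES_HOST": "127.0.0.1",
    "POSTGRES_PORT": "5432",
    "POSTGRES_USER": "eventsourcing",
    "POSTGRES_PASSWORD": "change-me",
    # Optional settings, given as strings like all environment variables.
    "POSTGRES_CONNECT_TIMEOUT": "5",
    "POSTGRES_POOL_SIZE": "5",
    "POSTGRES_MAX_OVERFLOW": "10",
    "POSTGRES_SCHEMA": "public",
    # This variable name has no POSTGRES_ prefix.
    "ORIGINATOR_ID_TYPE": "uuid",
}
```

The same names could instead be set in the process environment, which keeps credentials out of source code.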
- class eventsourcing.postgres.PostgresFactory(env: Environment | Mapping[str, str] | None)[source]¶
Bases:
BasePostgresFactory[PostgresTrackingRecorder], InfrastructureFactory[PostgresTrackingRecorder]
- aggregate_recorder_class¶
alias of
PostgresAggregateRecorder
- application_recorder_class¶
alias of
PostgresApplicationRecorder
- tracking_recorder_class¶
alias of
PostgresTrackingRecorder
- process_recorder_class¶
alias of
PostgresProcessRecorder
- aggregate_recorder(purpose: str = 'events') AggregateRecorder[source]¶
Constructs an aggregate recorder.
- application_recorder() ApplicationRecorder[source]¶
Constructs an application recorder.
- tracking_recorder(tracking_recorder_class: type[TPostgresTrackingRecorder] | None = None) TPostgresTrackingRecorder[source]¶
Constructs a tracking recorder.
- process_recorder() ProcessRecorder[source]¶
Constructs a process recorder.
- eventsourcing.postgres.Factory¶
alias of
PostgresFactory