persistence
— Infrastructure¶
This module provides a cohesive mechanism for storing domain events.
The entire mechanism is encapsulated by the library’s event store object class. An event store stores and retrieves domain events. The event store uses a mapper to convert domain events to stored events, and it uses a recorder to insert stored events in a datastore.
A mapper converts domain event objects of various types to
stored event objects when domain events are stored in the event
store. It also converts stored events objects back to domain
event objects when domain events are retrieved from the event
store. A mapper uses an extensible transcoder that can be set up
with additional transcoding objects that serialise and deserialise
particular types of object, such as Python’s UUID
,
datetime
and Decimal
objects.
A mapper may use a compressor to compress and decompress the state
of stored event objects, and may use a cipher to encode and decode
the state of stored event objects. If both a compressor and a cipher
are being used by a mapper, the state of any stored event objects will
be compressed and then encoded when storing domain events, and will be
decoded and then decompressed when retrieving domain events.
A recorder inserts stored event objects in a datastore when domain events are stored in an event store, and selects stored events from a datastore when domain events are retrieved from an event store. Depending on the type of the recorder it may be possible to select the stored events as event notifications, and it may be possible atomically to record tracking records along with the stored events,
Transcoder¶
A transcoder is used by a mapper to serialise and deserialise the state of domain model event objects.
The library’s JSONTranscoder
class
can be constructed without any arguments.
from eventsourcing.persistence import JSONTranscoder
transcoder = JSONTranscoder()
The transcoder
object has methods encode()
and decode()
which are used to perform the
serialisation and deserialisation. The serialised state is a Python bytes
object.
data = transcoder.encode({"a": 1})
copy = transcoder.decode(data)
assert copy == {"a": 1}
The library’s JSONTranscoder
uses the Python
json
module. And so, by default, only the basic object types supported by that
module can be encoded and decoded. The transcoder can be extended by registering
transcodings for the other types of object used in your domain model’s event objects.
A transcoding will convert other types of object to a representation of the non-basic
type of object that uses the basic types that are supported. The transcoder method
register()
is used to register
individual transcodings with the transcoder.
Transcodings¶
In order to encode and decode non-basic types of object that are not supported by
the transcoder by default, custom transcodings need to be defined in code and
registered with the transcoder using the transcoder object’s
register()
method. A transcoding
will encode an instance of a non-basic type of object that cannot by default be
encoded by the transcoder into a basic type of object that can be encoded by the
transcoder, and will decode that representation into the original type of object.
This makes it possible to transcode custom value objects, including custom types
that contain custom types. The transcoder works recursively through the object
and so included custom types do not need to be encoded by the transcoder, but
will be converted subsequently.
The library includes a limited collection of custom transcoding objects. For
example, the library’s UUIDAsHex
class
transcodes a Python UUID
objects as a hexadecimal string.
from uuid import uuid4
from eventsourcing.persistence import UUIDAsHex
transcoding = UUIDAsHex()
id1 = uuid4()
data = transcoding.encode(id1)
copy = transcoding.decode(data)
assert copy == id1
The library’s DatetimeAsISO
class
transcodes Python datetime
objects as ISO strings.
from datetime import datetime
from eventsourcing.persistence import (
DatetimeAsISO,
)
transcoding = DatetimeAsISO()
datetime1 = datetime(2021, 12, 31, 23, 59, 59)
data = transcoding.encode(datetime1)
copy = transcoding.decode(data)
assert copy == datetime1
The library’s DecimalAsStr
class
transcodes Python Decimal
objects as decimal strings.
from decimal import Decimal
from eventsourcing.persistence import (
DecimalAsStr,
)
transcoding = DecimalAsStr()
decimal1 = Decimal("1.2345")
data = transcoding.encode(decimal1)
copy = transcoding.decode(data)
assert copy == decimal1
Transcodings are registered with the transcoder using the transcoder object’s
register()
method.
transcoder.register(UUIDAsHex())
transcoder.register(DatetimeAsISO())
transcoder.register(DecimalAsStr())
data = transcoder.encode(id1)
copy = transcoder.decode(data)
assert copy == id1
data = transcoder.encode(datetime1)
copy = transcoder.decode(data)
assert copy == datetime1
data = transcoder.encode(decimal1)
copy = transcoder.decode(data)
assert copy == decimal1
Attempting to serialize an unsupported type will result in a Python TypeError
.
from datetime import date
date1 = date(2021, 12, 31)
try:
data = transcoder.encode(date1)
except TypeError as e:
assert e.args[0] == (
"Object of type <class 'datetime.date'> is not serializable. "
"Please register a custom transcoding for this type."
)
else:
raise AssertionError("TypeError not raised")
Attempting to deserialize an unsupported type will also result in a Python TypeError
.
try:
JSONTranscoder().decode(data)
except TypeError as e:
assert e.args[0] == (
"Data serialized with name 'decimal_str' is not deserializable. "
"Please register a custom transcoding for this type."
)
else:
raise AssertionError("TypeError not raised")
The library’s abstract base class Transcoding
can be subclassed to define custom transcodings for other object types. To define
a custom transcoding, simply subclass this base class, assign to the class attribute
type
the class transcoded type, and assign a string to the name
.
Then define a encode()
that converts
an instance of that type to a representation that uses a basic type, and a
decode()
method that will convert
that representation back to an instance of that type.
from eventsourcing.persistence import Transcoding
from typing import Union
class DateAsISO(Transcoding):
type = date
name = "date_iso"
def encode(self, obj: date) -> str:
return obj.isoformat()
def decode(self, data: str) -> date:
return date.fromisoformat(data)
transcoder.register(DateAsISO())
data = transcoder.encode(date1)
copy = transcoder.decode(data)
assert copy == date1
Please note, due to the way the Python json
module works, it isn’t
currently possible to transcode subclasses of the basic Python types that
are supported by default, such as dict
, list
, tuple
,
str
, int
, float
, and bool
. This behaviour
also means an encoded tuple
will be decoded as a list
.
This behaviour is coded in Python as C code, and can’t be suspended without
avoiding the use of this C code and thereby incurring a performance penalty
in the transcoding of domain event objects.
data = transcoder.encode((1, 2, 3))
copy = transcoder.decode(data)
assert isinstance(copy, list)
assert copy == [1, 2, 3]
Custom or non-basic types that contain other custom or non-basic types can be
supported in the transcoder by registering a transcoding for each non-basic type.
The transcoding for the type which contains non-basic types must return an object
that represents that type by involving the included non-basic objects, and this
representation will be subsequently transcoded by the transcoder using the applicable
transcoding for the included non-basic types. In the example below, SimpleCustomValue
has a UUID
and a date
as its id
and data
attributes.
The transcoding for SimpleCustomValue
returns a Python dict
that includes
the non-basic UUID
and date
objects. The class ComplexCustomValue
simply has a ComplexCustomValue
object as its value
attribute, and its
transcoding simply returns that object.
from uuid import UUID
class SimpleCustomValue:
def __init__(self, id: UUID, date: date):
self.id = id
self.date = date
def __eq__(self, other):
return (
isinstance(other, SimpleCustomValue) and
self.id == other.id and self.date == other.date
)
class ComplexCustomValue:
def __init__(self, value: SimpleCustomValue):
self.value = value
def __eq__(self, other):
return (
isinstance(other, ComplexCustomValue) and
self.value == other.value
)
class SimpleCustomValueAsDict(Transcoding):
type = SimpleCustomValue
name = "simple_custom_value"
def encode(self, obj: SimpleCustomValue) -> dict:
return {"id": obj.id, "date": obj.date}
def decode(self, data: dict) -> SimpleCustomValue:
assert isinstance(data, dict)
return SimpleCustomValue(**data)
class ComplexCustomValueAsDict(Transcoding):
type = ComplexCustomValue
name = "complex_custom_value"
def encode(self, obj: ComplexCustomValue) -> SimpleCustomValue:
return obj.value
def decode(self, data: SimpleCustomValue) -> ComplexCustomValue:
assert isinstance(data, SimpleCustomValue)
return ComplexCustomValue(data)
The custom value object transcodings can be registered with the transcoder.
transcoder.register(SimpleCustomValueAsDict())
transcoder.register(ComplexCustomValueAsDict())
We can now transcode an instance of ComplexCustomValueAsDict
.
obj1 = ComplexCustomValue(
SimpleCustomValue(
id=UUID("b2723fe2c01a40d2875ea3aac6a09ff5"),
date=date(2000, 2, 20)
)
)
data = transcoder.encode(obj1)
copy = transcoder.decode(data)
assert copy == obj1
As you can see from the bytes representation below, the transcoder puts the return value
of each transcoding’s encode()
method in a Python dict
that has two values
_data_
and _type_
. The _data_
value is the return value of the
transcoding’s encode()
method, and the _type_
value is the name of the
transcoding. For this reason, it is necessary to avoid defining model objects to have a
Python dict
that has only two attributes _data_
and _type_
, and
avoid defining transcodings that return such a thing.
expected_data = (
b'{"_type_": "complex_custom_value", "_data_": {"_type_": '
b'"simple_custom_value", "_data_": {"id": {"_type_": '
b'"uuid_hex", "_data_": "b2723fe2c01a40d2875ea3aac6a09ff5"},'
b' "date": {"_type_": "date_iso", "_data_": "2000-02-20"}'
b'}}}'
)
assert data == expected_data
Stored event objects¶
A stored event object is a common object type that can be used to represent domain event objects of different types. By using a common object for the representation of different types of domain events objects, the domain event objects can be stored and retrieved in a standard way.
The library’s StoredEvent
class
is a Python frozen dataclass that can be used to hold information
about a domain event object between it being serialised and being
recorded in a datastore, and between it be retrieved from a datastore
from an aggregate sequence and being deserialised as a domain event object.
from uuid import uuid4
from eventsourcing.persistence import StoredEvent
stored_event = StoredEvent(
originator_id=uuid4(),
originator_version=1,
state="{}",
topic="eventsourcing.model:DomainEvent",
)
Mapper¶
A mapper maps between domain event objects and stored event objects. It brings together a transcoder, and optionally a cipher and a compressor. It is used by an event store.
The library’s Mapper
class
must be constructed with a transcoder object.
from eventsourcing.persistence import Mapper
mapper = Mapper(transcoder=transcoder)
The from_domain_event()
method of the
mapper
object converts DomainEvent
objects to
StoredEvent
objects.
from eventsourcing.domain import DomainEvent, TZINFO
domain_event1 = DomainEvent(
originator_id = id1,
originator_version = 1,
timestamp = datetime.now(tz=TZINFO),
)
stored_event1 = mapper.from_domain_event(domain_event1)
assert isinstance(stored_event1, StoredEvent)
The to_domain_event()
method of the
mapper
object converts StoredEvent
objects to
DomainEvent
objects.
assert mapper.to_domain_event(stored_event1) == domain_event1
Encryption¶
Using a cryptographic cipher with your mapper will make the state of your application encrypted “at rest” and “on the wire”.
Without encryption, the state of the domain event will be visible in the
recorded stored events in your database. For example, the timestamp
of the domain event in the example above (domain_event1
) is visible
in the stored event (stored_event1
).
assert domain_event1.timestamp.isoformat() in str(stored_event1.state)
The library’s AESCipher
class can
be used to cryptographically encode and decode the state of stored
events. It must be constructed with a cipher key. The class method
create_key()
can be used to
generate a cipher key. The AES cipher key must be either 16, 24, or
32 bytes long. Please note, the same cipher key must be used to
decrypt stored events as that which was used to encrypt stored events.
from eventsourcing.cipher import AESCipher
key = AESCipher.create_key(num_bytes=32) # 16, 24, or 32
cipher = AESCipher(cipher_key=key)
mapper = Mapper(
transcoder=transcoder,
cipher=cipher,
)
stored_event1 = mapper.from_domain_event(domain_event1)
assert isinstance(stored_event1, StoredEvent)
assert mapper.to_domain_event(stored_event1) == domain_event1
With encryption, the state of the domain event will not be visible in the stored event. This feature can be used to implement “application-level encryption” in an event-sourced application.
assert domain_event1.timestamp.isoformat() not in str(stored_event1.state)
The library’s AESCipher
class uses the
AES cipher
from the PyCryptodome library
in GCM mode.
AES is a very fast and secure symmetric block cipher, and is the de facto
standard for symmetric encryption. Galois/Counter Mode (GCM) is a mode of
operation for symmetric block ciphers that is designed to provide both data
authenticity and confidentiality, and is widely adopted for its performance.
The mapper expects an instance of the abstract base class Cipher
,
and AESCipher
implements this abstract base class,
so if you want to use another cipher strategy simply implement the base class.
Compression¶
A compressor can be used to reduce the size of stored events.
The library’s ZlibCompressor
class
can be used to compress and decompress the state of stored events. The
size of the state of a compressed and encrypted stored event will be
less than or equal to the size of the state of a stored event that is
encrypted but not compressed.
from eventsourcing.compressor import ZlibCompressor
compressor = ZlibCompressor()
mapper = Mapper(
transcoder=transcoder,
cipher=cipher,
compressor=compressor,
)
stored_event2 = mapper.from_domain_event(domain_event1)
assert mapper.to_domain_event(stored_event2) == domain_event1
assert len(stored_event2.state) <= len(stored_event1.state)
The library’s ZlibCompressor
class
uses Python’s zlib
module.
The mapper expects an instance of the abstract base class
Compressor
, and
ZlibCompressor
implements this
abstract base class, so if you want to use another compression
strategy simply implement the base class.
Notification objects¶
Event notifications are used to propagate the state of an event sourced application in a reliable way. The stored events can be positioned in a “total order” by giving each a new domain event a notification ID that is higher that any previously recorded event. By recording the domain events atomically with their notification IDs, there will never be a domain event that is not available to be passed as a message across a network, and there will never be a message passed across a network that doesn’t correspond to a recorded event. This solves the “dual writing” problem that occurs when separately a domain model is updated and then a message is put on a message queue.
The library’s Notification
class
is a Python frozen dataclass that can be used to hold information
about a domain event object when being transmitted as an item in a
section of a notification log.
It will be returned when selecting event notifications from a
recorder, and presented in an application by a
notification log.
from uuid import uuid4
from eventsourcing.persistence import Notification
stored_event = Notification(
id=123,
originator_id=uuid4(),
originator_version=1,
state="{}",
topic="eventsourcing.model:DomainEvent",
)
Tracking objects¶
A tracking object can be used to encapsulate the position of an event notification in an upstream application’s notification log. A tracking object can be passed into a process recorder along with new stored event objects, and recorded atomically with those objects. By ensuring the uniqueness of recorded tracking objects, we can ensure that a domain event notification is never processed twice. By recording the position of the last event notification that has been processed, we can ensure to resume processing event notifications at the correct position. This constructs “exactly once” semantics when processing event notifications, by solving the “dual writing” problem that occurs when separately an event notification is consumed from a message queue with updates made to materialized view, and then an acknowledgement is sent back to the message queue.
The library’s Tracking
class
is a Python frozen dataclass that can be used to hold the notification
ID of a notification that has been processed.
from uuid import uuid4
from eventsourcing.persistence import Tracking
tracking = Tracking(
notification_id=123,
application_name="bounded_context1",
)
Recorder¶
A recorder adapts a database management system for the purpose of recording stored events. It is used by an event store.
The library’s Recorder
class
is an abstract base for concrete recorder classes that will insert
stored event objects in a particular datastore.
There are three flavours of recorder: “aggregate recorders” are the simplest and simply store domain events in aggregate sequences; “application recorders” extend aggregate recorders by storing domain events with a total order; “process recorders” extend application recorders by supporting the recording of domain events atomically with “tracking” objects that record the position in a total ordering of domain events that is being processed. The “aggregate recorder” can be used for storing snapshots.
The library includes in its sqlite
module
recorder classes for SQLite that use the Python sqlite3
module, and in its postgres
module recorders for
PostgreSQL that use the third party psycopg2
module.
Recorder classes are conveniently constructed by using an infrastructure factory. For illustrative purposes, the direct use of the library’s SQLite recorders is shown below. The other persistence modules follow a similar naming scheme and pattern of use.
from eventsourcing.sqlite import SQLiteAggregateRecorder
from eventsourcing.sqlite import SQLiteApplicationRecorder
from eventsourcing.sqlite import SQLiteProcessRecorder
from eventsourcing.sqlite import SQLiteDatastore
datastore = SQLiteDatastore(db_name=":memory:")
aggregate_recorder = SQLiteAggregateRecorder(datastore, "snapshots")
aggregate_recorder.create_table()
application_recorder = SQLiteApplicationRecorder(datastore)
application_recorder.create_table()
datastore = SQLiteDatastore(db_name=":memory:")
process_recorder = SQLiteProcessRecorder(datastore)
process_recorder.create_table()
The library also includes in the popo
module recorders
that use “plain old Python objects”, which simply keep stored events in a
data structure in memory, and provides the fastest alternative for rapid
development of event sourced applications (~4x faster than using SQLite, and
~20x faster than using PostgreSQL).
Recorders compatible with this version of the library for popular ORMs such as SQLAlchemy and Django, specialist event stores such as EventStoreDB and AxonDB, and NoSQL databases such as DynamoDB and MongoDB are forthcoming.
Event store¶
An event store provides a common interface for storing and retrieving domain event objects. It combines a mapper and a recorder, so that domain event objects can be converted to stored event objects and then stored event objects can be recorded in a datastore.
The library’s EventStore
class must
be constructed with a mapper and a recorder.
The EventStore
has an object method
put()
which can be used to
store a list of new domain event objects. If any of these domain event
objects conflict with any already existing domain event object (because
they have the same aggregate ID and version number), an exception will
be raised and none of the new events will be stored.
The EventStore
has an object method
get()
which can be used to
get a list of domain event objects. Only the originator_id
argument
is required, which is the ID of the aggregate for which existing events
are wanted. The arguments gt
, lte
, limit
, and desc
condition the selection of events to be greater than a particular version
number, less then or equal to a particular version number, limited in
number, or selected in a descending fashion. The selection is by default
ascending, unlimited, and otherwise unrestricted such that all the previously
stored domain event objects for a particular aggregate will be returned
in the order in which they were created.
from eventsourcing.persistence import EventStore
event_store = EventStore(
mapper=mapper,
recorder=application_recorder,
)
event_store.put([domain_event1])
domain_events = list(event_store.get(id1))
assert domain_events == [domain_event1]
Infrastructure factory¶
An infrastructure factory helps with the construction of the persistence infrastructure objects mentioned above. By reading and responding to particular environment variables, the persistence infrastructure of an event-sourced application can be easily configured in different ways at different times.
The library’s InfrastructureFactory
class
is a base class for concrete infrastructure factories that help with the construction
of persistence objects that use a particular database in a particular way.
The class method construct()
will, by default, construct the library’s “plain old Python objects”
infrastructure Factory
, which uses recorders that simply
keep stored events in a data structure in memory (see eventsourcing.popo
).
from eventsourcing.persistence import InfrastructureFactory
factory = InfrastructureFactory.construct()
recorder = factory.application_recorder()
mapper = factory.mapper(transcoder=transcoder)
event_store = factory.event_store(
mapper=mapper,
recorder=recorder,
)
event_store.put([domain_event1])
stored_events = list(event_store.get(id1))
assert stored_events == [domain_event1]
SQLite¶
The module eventsourcing.sqlite
supports storing events in SQLite.
The library’s SQLite Factory
uses environment variables
SQLITE_DBNAME
and CREATE_TABLE
.
The SQLITE_DBNAME
value is the name of a database, normally a file path, but
the special name :memory:
can be used to create an in-memory database.
If the tables already exist, the CREATE_TABLE
may be set to a “false” value ("n"
,
"no"
, "f"
, "false"
, "off"
, or "0"
). This value is by default “true”
which is normally okay because the tables are created only if they do not exist.
import os
os.environ["INFRASTRUCTURE_FACTORY"] = "eventsourcing.sqlite:Factory"
os.environ["SQLITE_DBNAME"] = ":memory:"
factory = InfrastructureFactory.construct()
recorder = factory.application_recorder()
mapper = factory.mapper(transcoder=transcoder)
event_store = factory.event_store(
mapper=mapper,
recorder=recorder,
)
event_store.put([domain_event1])
stored_events = list(event_store.get(id1))
assert stored_events == [domain_event1]
PostgreSQL¶
The module eventsourcing.postgres
supports storing events in PostgresSQL.
The library’s PostgreSQL Factory
uses environment variables
POSTGRES_DBNAME
, POSTGRES_HOST
, POSTGRES_PORT
, POSTGRES_USER
,
POSTGRES_PASSWORD
, and CREATE_TABLE
.
The values of POSTGRES_DBNAME
, POSTGRES_HOST
, POSTGRES_PORT
, POSTGRES_USER
, and
POSTGRES_PASSWORD
are used to set the name of a database, the database server’s
host name, the database user name, and the password for that user.
If the tables already exist, the CREATE_TABLE
may be set to a “false” value ("n"
,
"no"
, "f"
, "false"
, "off"
, or "0"
). This value is by default “true”
which is normally okay because the tables are created only if they do not exist.
import os
os.environ["INFRASTRUCTURE_FACTORY"] = "eventsourcing.postgres:Factory"
os.environ["POSTGRES_DBNAME"] = "eventsourcing"
os.environ["POSTGRES_HOST"] = "127.0.0.1"
os.environ["POSTGRES_PORT"] = "5432"
os.environ["POSTGRES_USER"] = "eventsourcing"
os.environ["POSTGRES_PASSWORD"] = "eventsourcing"
factory = InfrastructureFactory.construct()
recorder = factory.application_recorder()
mapper = factory.mapper(transcoder=transcoder)
event_store = factory.event_store(
mapper=mapper,
recorder=recorder,
)
event_store.put([domain_event1])
stored_events = list(event_store.get(id1))
assert stored_events == [domain_event1]
Classes¶
-
class
eventsourcing.persistence.
Transcoding
[source]¶ Bases:
abc.ABC
Abstract base class for custom transcodings.
-
type
¶ Object type of transcoded object.
-
name
¶ Name of transcoding.
-
-
class
eventsourcing.persistence.
Transcoder
[source]¶ Bases:
abc.ABC
Abstract base class for transcoders.
-
class
eventsourcing.persistence.
JSONTranscoder
[source]¶ Bases:
eventsourcing.persistence.Transcoder
Extensible transcoder that uses the Python
json
module.
-
class
eventsourcing.persistence.
UUIDAsHex
[source]¶ Bases:
eventsourcing.persistence.Transcoding
Transcoding that represents
UUID
objects as hex values.-
type
¶ alias of
uuid.UUID
-
-
class
eventsourcing.persistence.
DecimalAsStr
[source]¶ Bases:
eventsourcing.persistence.Transcoding
Transcoding that represents
Decimal
objects as strings.-
type
¶ alias of
decimal.Decimal
-
-
class
eventsourcing.persistence.
DatetimeAsISO
[source]¶ Bases:
eventsourcing.persistence.Transcoding
Transcoding that represents
datetime
objects as ISO strings.-
type
¶ alias of
datetime.datetime
-
-
class
eventsourcing.persistence.
StoredEvent
(originator_id: uuid.UUID, originator_version: int, topic: str, state: bytes)[source]¶ Bases:
object
Frozen dataclass that represents
DomainEvent
objects, such as aggregateEvent
objects andSnapshot
objects.Constructor parameters:
Parameters: - originator_id (int) – ID of the originating aggregate
- originator_id – version of the originating aggregate
- topic (str) – topic of the domain event object class
- state (bytes) – serialised state of the domain event object
-
class
eventsourcing.persistence.
Cipher
(cipher_key: str)[source]¶ Bases:
abc.ABC
Base class for ciphers.
-
class
eventsourcing.persistence.
Mapper
(transcoder: eventsourcing.persistence.Transcoder, compressor: Optional[eventsourcing.persistence.Compressor] = None, cipher: Optional[eventsourcing.persistence.Cipher] = None)[source]¶ Bases:
typing.Generic
Converts between domain event objects and
StoredEvent
objects.Uses a
Transcoder
, and optionally a cryptographic cipher and compressor.-
__init__
(transcoder: eventsourcing.persistence.Transcoder, compressor: Optional[eventsourcing.persistence.Compressor] = None, cipher: Optional[eventsourcing.persistence.Cipher] = None)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
from_domain_event
(domain_event: TDomainEvent) → eventsourcing.persistence.StoredEvent[source]¶ Converts the given domain event to a
StoredEvent
object.
-
to_domain_event
(stored: eventsourcing.persistence.StoredEvent) → TDomainEvent[source]¶ Converts the given
StoredEvent
to a domain event object.
-
-
class
eventsourcing.persistence.
Recorder
[source]¶ Bases:
abc.ABC
Abstract base class for stored event recorders.
-
class
eventsourcing.persistence.
AggregateRecorder
[source]¶ Bases:
eventsourcing.persistence.Recorder
Abstract base class for recorders that record and retrieve stored events for domain model aggregates.
-
class
eventsourcing.persistence.
Notification
(originator_id: uuid.UUID, originator_version: int, topic: str, state: bytes, id: int)[source]¶ Bases:
eventsourcing.persistence.StoredEvent
Frozen dataclass that represents domain event notifications.
-
class
eventsourcing.persistence.
ApplicationRecorder
[source]¶ Bases:
eventsourcing.persistence.AggregateRecorder
Abstract base class for recorders that record and retrieve stored events for domain model aggregates.
Extends the behaviour of aggregate recorders by recording aggregate events in a total order that allows the stored events also to be retrieved as event notifications.
-
class
eventsourcing.persistence.
ProcessRecorder
[source]¶ Bases:
eventsourcing.persistence.ApplicationRecorder
Abstract base class for recorders that record and retrieve stored events for domain model aggregates.
Extends the behaviour of applications recorders by recording aggregate events with tracking information that records the position of a processed event notification in a notification log.
-
class
eventsourcing.persistence.
EventStore
(mapper: eventsourcing.persistence.Mapper[~TDomainEvent][TDomainEvent], recorder: eventsourcing.persistence.AggregateRecorder)[source]¶ Bases:
typing.Generic
Stores and retrieves domain events.
-
__init__
(mapper: eventsourcing.persistence.Mapper[~TDomainEvent][TDomainEvent], recorder: eventsourcing.persistence.AggregateRecorder)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
-
class
eventsourcing.persistence.
InfrastructureFactory
(application_name: str)[source]¶ Bases:
abc.ABC
Abstract base class for infrastructure factories.
-
classmethod
construct
(application_name: str = '') → eventsourcing.persistence.InfrastructureFactory[source]¶ Constructs concrete infrastructure factory for given named application. Reads and resolves infrastructure factory class topic from environment variable ‘INFRASTRUCTURE_FACTORY’.
-
__init__
(application_name: str)[source]¶ Initialises infrastructure factory object with given application name.
-
getenv
(key: str, default: Optional[str] = None, application_name: str = '') → Optional[str][source]¶ Returns value of environment variable defined by given key.
-
mapper
(transcoder: eventsourcing.persistence.Transcoder, application_name: str = '') → eventsourcing.persistence.Mapper[source]¶ Constructs a mapper.
-
cipher
(application_name: str) → Optional[eventsourcing.persistence.Cipher][source]¶ Reads environment variables ‘CIPHER_TOPIC’ and ‘CIPHER_KEY’ to decide whether or not to construct a cipher.
-
compressor
(application_name: str) → Optional[eventsourcing.persistence.Compressor][source]¶ Reads environment variable ‘COMPRESSOR_TOPIC’ to decide whether or not to construct a compressor.
-
static
event_store
(**kwargs) → eventsourcing.persistence.EventStore[source]¶ Constructs an event store.
-
aggregate_recorder
(purpose: str = 'events') → eventsourcing.persistence.AggregateRecorder[source]¶ Constructs an aggregate recorder.
-
application_recorder
() → eventsourcing.persistence.ApplicationRecorder[source]¶ Constructs an application recorder.
-
classmethod
-
class
eventsourcing.persistence.
Tracking
(application_name: str, notification_id: int)[source]¶ Bases:
object
Frozen dataclass representing the position of a domain event
Notification
in an application’s notification log.
-
class
eventsourcing.popo.
POPOAggregateRecorder
[source]¶
-
class
eventsourcing.popo.
POPOApplicationRecorder
[source]¶ Bases:
eventsourcing.persistence.ApplicationRecorder
,eventsourcing.popo.POPOAggregateRecorder
-
class
eventsourcing.popo.
POPOProcessRecorder
[source]¶ Bases:
eventsourcing.persistence.ProcessRecorder
,eventsourcing.popo.POPOApplicationRecorder
-
class
eventsourcing.popo.
Factory
(application_name: str)[source]¶ Bases:
eventsourcing.persistence.InfrastructureFactory
-
aggregate_recorder
(purpose: str = 'events') → eventsourcing.persistence.AggregateRecorder[source]¶ Constructs an aggregate recorder.
-
-
class
eventsourcing.sqlite.
SQLiteAggregateRecorder
(datastore: eventsourcing.sqlite.SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶ Bases:
eventsourcing.persistence.AggregateRecorder
-
__init__
(datastore: eventsourcing.sqlite.SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
-
class
eventsourcing.sqlite.
SQLiteApplicationRecorder
(datastore: eventsourcing.sqlite.SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶ Bases:
eventsourcing.sqlite.SQLiteAggregateRecorder
,eventsourcing.persistence.ApplicationRecorder
-
__init__
(datastore: eventsourcing.sqlite.SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
-
class
eventsourcing.sqlite.
SQLiteProcessRecorder
(datastore: eventsourcing.sqlite.SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶ Bases:
eventsourcing.sqlite.SQLiteApplicationRecorder
,eventsourcing.persistence.ProcessRecorder
-
class
eventsourcing.sqlite.
Factory
(application_name: str)[source]¶ Bases:
eventsourcing.persistence.InfrastructureFactory
-
__init__
(application_name: str)[source]¶ Initialises infrastructure factory object with given application name.
-
aggregate_recorder
(purpose: str = 'events') → eventsourcing.persistence.AggregateRecorder[source]¶ Constructs an aggregate recorder.
-
-
class
eventsourcing.postgres.
PostgresAggregateRecorder
(datastore: eventsourcing.postgres.PostgresDatastore, events_table_name: str)[source]¶ Bases:
eventsourcing.persistence.AggregateRecorder
-
__init__
(datastore: eventsourcing.postgres.PostgresDatastore, events_table_name: str)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
-
class
eventsourcing.postgres.
PostgresApplicationRecorder
(datastore: eventsourcing.postgres.PostgresDatastore, events_table_name: str = 'stored_events')[source]¶ Bases:
eventsourcing.postgres.PostgresAggregateRecorder
,eventsourcing.persistence.ApplicationRecorder
-
__init__
(datastore: eventsourcing.postgres.PostgresDatastore, events_table_name: str = 'stored_events')[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
-
class
eventsourcing.postgres.
PostgresProcessRecorder
(datastore: eventsourcing.postgres.PostgresDatastore, events_table_name: str, tracking_table_name: str)[source]¶ Bases:
eventsourcing.postgres.PostgresApplicationRecorder
,eventsourcing.persistence.ProcessRecorder
-
class
eventsourcing.postgres.
Factory
(application_name: str)[source]¶ Bases:
eventsourcing.persistence.InfrastructureFactory
-
__init__
(application_name: str)[source]¶ Initialises infrastructure factory object with given application name.
-
aggregate_recorder
(purpose: str = 'events') → eventsourcing.persistence.AggregateRecorder[source]¶ Constructs an aggregate recorder.
-