persistence
— Infrastructure¶
This module provides a cohesive mechanism for storing domain events.
The entire mechanism is encapsulated by the library’s event store object class. An event store stores and retrieves domain events. The event store uses a mapper to convert domain events to stored events, and it uses a recorder to insert stored events in a datastore.
A mapper converts domain event objects of various types to
stored event objects when domain events are stored in the event
store. It also converts stored event objects back to domain
event objects when domain events are retrieved from the event
store. A mapper uses an extensible transcoder that can be set up
with additional transcoding objects that serialise and deserialise
particular types of object, such as Python’s UUID, datetime,
and Decimal objects.
A mapper may use a compressor to compress and decompress the state
of stored event objects, and may use a cipher to encode and decode
the state of stored event objects. If both a compressor and a cipher
are being used by a mapper, the state of any stored event objects will
be compressed and then encoded when storing domain events, and will be
decoded and then decompressed when retrieving domain events.
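The order of these two steps matters, because well-encrypted data looks statistically random and therefore barely compresses. The sketch below illustrates this using only the standard library. It is not the library's mapper: `os.urandom()` is used as a stand-in for ciphertext, which is an assumption made purely for illustration.

```python
import os
import zlib

# Repetitive plaintext, similar in spirit to serialised JSON state.
plaintext = b'{"name": "value", "amount": 100}' * 50

# Compressing the plaintext first shrinks it considerably.
compressed = zlib.compress(plaintext)

# Stand-in for ciphertext (assumption): random bytes of the same length.
pseudo_ciphertext = os.urandom(len(plaintext))

# Compressing after "encryption" gains nothing useful.
compressed_after = zlib.compress(pseudo_ciphertext)

print(len(plaintext), len(compressed), len(compressed_after))
```

Compressing before encrypting keeps the benefit of compression, which is why the state is compressed first when storing and decompressed last when retrieving.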
A recorder inserts stored event objects in a datastore when domain events are stored in an event store, and selects stored events from a datastore when domain events are retrieved from an event store. Depending on the type of the recorder, it may be possible to select the stored events as event notifications, and it may be possible to record tracking records atomically along with the stored events.
Transcoder¶
A transcoder is used by a mapper to serialise and deserialise the state of domain model event objects.
The library’s JSONTranscoder
class
can be constructed without any arguments.
from eventsourcing.persistence import JSONTranscoder
transcoder = JSONTranscoder()
The transcoder
object has methods encode()
and decode()
which are used to perform the
serialisation and deserialisation. The serialised state is a Python bytes
object.
data = transcoder.encode({"a": 1})
copy = transcoder.decode(data)
assert copy == {"a": 1}
The library’s JSONTranscoder
uses the Python
json
module. And so, by default, only the basic object types supported by that
module can be encoded and decoded. The transcoder can be extended by registering
transcodings for the other types of object used in your domain model’s event objects.
A transcoding will convert other types of object to a representation of the non-basic
type of object that uses the basic types that are supported. The transcoder method
register()
is used to register
individual transcodings with the transcoder.
Transcodings¶
In order to encode and decode non-basic types of object that are not supported by
the transcoder by default, custom transcodings need to be defined in code and
registered with the transcoder using the transcoder object’s
register()
method. A transcoding
will encode an instance of a non-basic type of object that cannot by default be
encoded by the transcoder into a basic type of object that can be encoded by the
transcoder, and will decode that representation into the original type of object.
This makes it possible to transcode custom value objects, including custom types
that contain custom types. The transcoder works recursively through the object,
and so custom types included in another custom type do not need to be encoded by
the containing type’s transcoding. They will be converted subsequently by their own
transcodings.
The library includes a limited collection of custom transcoding objects. For
example, the library’s UUIDAsHex class
transcodes Python UUID objects as hexadecimal strings.
from uuid import uuid4
from eventsourcing.persistence import UUIDAsHex
transcoding = UUIDAsHex()
id1 = uuid4()
data = transcoding.encode(id1)
copy = transcoding.decode(data)
assert copy == id1
The library’s DatetimeAsISO
class
transcodes Python datetime
objects as ISO strings.
from datetime import datetime
from eventsourcing.persistence import (
DatetimeAsISO,
)
transcoding = DatetimeAsISO()
datetime1 = datetime(2021, 12, 31, 23, 59, 59)
data = transcoding.encode(datetime1)
copy = transcoding.decode(data)
assert copy == datetime1
The library’s DecimalAsStr
class
transcodes Python Decimal
objects as decimal strings.
from decimal import Decimal
from eventsourcing.persistence import (
DecimalAsStr,
)
transcoding = DecimalAsStr()
decimal1 = Decimal("1.2345")
data = transcoding.encode(decimal1)
copy = transcoding.decode(data)
assert copy == decimal1
Transcodings are registered with the transcoder using the transcoder object’s
register()
method.
transcoder.register(UUIDAsHex())
transcoder.register(DatetimeAsISO())
transcoder.register(DecimalAsStr())
data = transcoder.encode(id1)
copy = transcoder.decode(data)
assert copy == id1
data = transcoder.encode(datetime1)
copy = transcoder.decode(data)
assert copy == datetime1
data = transcoder.encode(decimal1)
copy = transcoder.decode(data)
assert copy == decimal1
Attempting to serialize an unsupported type will result in a Python TypeError
.
from datetime import date
date1 = date(2021, 12, 31)
try:
    data = transcoder.encode(date1)
except TypeError as e:
    assert e.args[0] == (
        "Object of type <class 'datetime.date'> is not serializable. "
        "Please define and register a custom transcoding for this type."
    )
else:
    raise AssertionError("TypeError not raised")
Attempting to deserialize an unsupported type will also result in a Python TypeError
.
try:
    JSONTranscoder().decode(data)
except TypeError as e:
    assert e.args[0] == (
        "Data serialized with name 'decimal_str' is not deserializable. "
        "Please register a custom transcoding for this type."
    )
else:
    raise AssertionError("TypeError not raised")
The library’s abstract base class Transcoding
can be subclassed to define custom transcodings for other object types. To define
a custom transcoding, subclass this base class, assign the type of object to be
transcoded to the class attribute type, and assign a string to the class attribute
name. Then define an encode() method that converts an instance of that type to a
representation that uses a basic type, and a decode() method that converts that
representation back to an instance of that type.
from eventsourcing.persistence import Transcoding

class DateAsISO(Transcoding):
    type = date
    name = "date_iso"

    def encode(self, obj: date) -> str:
        return obj.isoformat()

    def decode(self, data: str) -> date:
        return date.fromisoformat(data)

transcoder.register(DateAsISO())
data = transcoder.encode(date1)
copy = transcoder.decode(data)
assert copy == date1
Please note, due to the way the Python json module works, it isn’t
currently possible to transcode subclasses of the basic Python types that
are supported by default, such as dict, list, tuple, str, int, float, and bool.
This behaviour also means an encoded tuple will be decoded as a list.
This behaviour is implemented in C code within Python, and can’t be suspended without
avoiding the use of this C code and thereby incurring a performance penalty
in the transcoding of domain event objects.
data = transcoder.encode((1, 2, 3))
copy = transcoder.decode(data)
assert isinstance(copy, list)
assert copy == [1, 2, 3]
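The same behaviour can be observed with the standard library json module on its own. In the sketch below, Money is a hypothetical subclass of int introduced only for illustration. The `default` hook, which is the extension point json offers for unsupported types, is never called for it, so there is no opportunity to apply a custom encoding.

```python
import json

class Money(int):
    """Hypothetical subclass of a basic type, for illustration only."""

calls = []

def default(obj):
    # json calls this hook only for objects it cannot serialise natively.
    calls.append(type(obj).__name__)
    raise TypeError(f"Object of type {type(obj)} is not serializable")

encoded = json.dumps({"price": Money(5), "pair": (1, 2)}, default=default)
decoded = json.loads(encoded)

# The subclass was serialised as a plain int, and the tuple as a JSON array.
print(encoded, calls)
```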
Custom or non-basic types that contain other custom or non-basic types can be
supported in the transcoder by registering a transcoding for each non-basic type.
The transcoding for the type which contains non-basic types must return an object
that represents that type by involving the included non-basic objects, and this
representation will be subsequently transcoded by the transcoder using the applicable
transcoding for the included non-basic types. In the example below, SimpleCustomValue
has a UUID and a date as its id and date attributes.
The transcoding for SimpleCustomValue returns a Python dict that includes
the non-basic UUID and date objects. The class ComplexCustomValue
simply has a SimpleCustomValue object as its value attribute, and its
transcoding simply returns that object.
from uuid import UUID

class SimpleCustomValue:
    def __init__(self, id: UUID, date: date):
        self.id = id
        self.date = date

    def __eq__(self, other):
        return (
            isinstance(other, SimpleCustomValue) and
            self.id == other.id and self.date == other.date
        )

class ComplexCustomValue:
    def __init__(self, value: SimpleCustomValue):
        self.value = value

    def __eq__(self, other):
        return (
            isinstance(other, ComplexCustomValue) and
            self.value == other.value
        )

class SimpleCustomValueAsDict(Transcoding):
    type = SimpleCustomValue
    name = "simple_custom_value"

    def encode(self, obj: SimpleCustomValue) -> dict:
        return {"id": obj.id, "date": obj.date}

    def decode(self, data: dict) -> SimpleCustomValue:
        assert isinstance(data, dict)
        return SimpleCustomValue(**data)

class ComplexCustomValueAsDict(Transcoding):
    type = ComplexCustomValue
    name = "complex_custom_value"

    def encode(self, obj: ComplexCustomValue) -> SimpleCustomValue:
        return obj.value

    def decode(self, data: SimpleCustomValue) -> ComplexCustomValue:
        assert isinstance(data, SimpleCustomValue)
        return ComplexCustomValue(data)
The custom value object transcodings can be registered with the transcoder.
transcoder.register(SimpleCustomValueAsDict())
transcoder.register(ComplexCustomValueAsDict())
We can now transcode an instance of ComplexCustomValue.
obj1 = ComplexCustomValue(
SimpleCustomValue(
id=UUID("b2723fe2c01a40d2875ea3aac6a09ff5"),
date=date(2000, 2, 20)
)
)
data = transcoder.encode(obj1)
copy = transcoder.decode(data)
assert copy == obj1
As you can see from the bytes representation below, the transcoder puts the return value
of each transcoding’s encode() method in a Python dict that has two keys,
_data_ and _type_. The _data_ value is the return value of the
transcoding’s encode() method, and the _type_ value is the name of the
transcoding. For this reason, it is necessary to avoid defining model objects that have a
Python dict with only the two keys _data_ and _type_, and to
avoid defining transcodings that return such a thing.
expected_data = (
b'{"_type_": "complex_custom_value", "_data_": {"_type_": '
b'"simple_custom_value", "_data_": {"id": {"_type_": '
b'"uuid_hex", "_data_": "b2723fe2c01a40d2875ea3aac6a09ff5"},'
b' "date": {"_type_": "date_iso", "_data_": "2000-02-20"}'
b'}}}'
)
assert data == expected_data
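To make the role of the _type_ and _data_ keys concrete, here is a minimal stand-alone sketch of this kind of envelope, built on the standard json module's `default` and `object_hook` parameters. It is not the library's implementation. The ENCODERS and DECODERS registries and the wrap() and unwrap() functions are hypothetical names introduced for illustration.

```python
import json
from datetime import date
from uuid import UUID

# Hypothetical registries: type -> (name, encoder), and name -> decoder.
ENCODERS = {
    UUID: ("uuid_hex", lambda obj: obj.hex),
    date: ("date_iso", lambda obj: obj.isoformat()),
}
DECODERS = {
    "uuid_hex": UUID,
    "date_iso": date.fromisoformat,
}

def wrap(obj):
    # Called by json.dumps() for each object it cannot serialise natively.
    try:
        name, encode = ENCODERS[type(obj)]
    except KeyError:
        raise TypeError(f"Object of type {type(obj)} is not serializable") from None
    return {"_type_": name, "_data_": encode(obj)}

def unwrap(d):
    # Called by json.loads() for every decoded dict, innermost first.
    if set(d) == {"_type_", "_data_"}:
        return DECODERS[d["_type_"]](d["_data_"])
    return d

value = {"id": UUID("b2723fe2c01a40d2875ea3aac6a09ff5"), "date": date(2000, 2, 20)}
encoded = json.dumps(value, default=wrap).encode("utf8")
decoded = json.loads(encoded, object_hook=unwrap)
assert decoded == value
```

Because unwrap() treats any dict with exactly these two keys as an envelope, an ordinary dict of that shape in the model would be misread when decoding, which is the reason for the restriction described above.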
Stored event objects¶
A stored event object is a common object type that can be used to represent domain event objects of different types. By using a common object for the representation of different types of domain events objects, the domain event objects can be stored and retrieved in a standard way.
The library’s StoredEvent class
is a Python frozen dataclass that can be used to hold information
about a domain event object between it being serialised and being
recorded in a datastore, and between it being retrieved from a datastore
from an aggregate sequence and being deserialised as a domain event object.
from uuid import uuid4
from eventsourcing.persistence import StoredEvent
stored_event = StoredEvent(
    originator_id=uuid4(),
    originator_version=1,
    state=b"{}",  # the serialised state is a bytes object
    topic="eventsourcing.model:DomainEvent",
)
Mapper¶
A mapper maps between domain event objects and stored event objects. It brings together a transcoder, and optionally a cipher and a compressor. It is used by an event store.
The library’s Mapper
class
must be constructed with a transcoder object.
from eventsourcing.persistence import Mapper
mapper = Mapper(transcoder=transcoder)
The from_domain_event()
method of the
mapper
object converts DomainEvent
objects to
StoredEvent
objects.
from eventsourcing.domain import DomainEvent, TZINFO
domain_event1 = DomainEvent(
originator_id = id1,
originator_version = 1,
timestamp = datetime.now(tz=TZINFO),
)
stored_event1 = mapper.from_domain_event(domain_event1)
assert isinstance(stored_event1, StoredEvent)
The to_domain_event()
method of the
mapper
object converts StoredEvent
objects to
DomainEvent
objects.
assert mapper.to_domain_event(stored_event1) == domain_event1
Encryption¶
Using a cryptographic cipher with your mapper will make the state of your application encrypted “at rest” and “on the wire”.
Without encryption, the state of the domain event will be visible in the
recorded stored events in your database. For example, the timestamp
of the domain event in the example above (domain_event1
) is visible
in the stored event (stored_event1
).
assert domain_event1.timestamp.isoformat() in str(stored_event1.state)
The library’s AESCipher
class can
be used to cryptographically encode and decode the state of stored
events. It must be constructed with a cipher key. The class method
create_key()
can be used to
generate a cipher key. The AES cipher key must be either 16, 24, or
32 bytes long. Please note, the same cipher key must be used to
decrypt stored events as that which was used to encrypt stored events.
from eventsourcing.cipher import AESCipher
key = AESCipher.create_key(num_bytes=32) # 16, 24, or 32
cipher = AESCipher(cipher_key=key)
mapper = Mapper(
transcoder=transcoder,
cipher=cipher,
)
stored_event1 = mapper.from_domain_event(domain_event1)
assert isinstance(stored_event1, StoredEvent)
assert mapper.to_domain_event(stored_event1) == domain_event1
With encryption, the state of the domain event will not be visible in the stored event. This feature can be used to implement “application-level encryption” in an event-sourced application.
assert domain_event1.timestamp.isoformat() not in str(stored_event1.state)
The library’s AESCipher
class uses the
AES cipher
from the PyCryptodome library
in GCM mode.
AES is a very fast and secure symmetric block cipher, and is the de facto
standard for symmetric encryption. Galois/Counter Mode (GCM) is a mode of
operation for symmetric block ciphers that is designed to provide both data
authenticity and confidentiality, and is widely adopted for its performance.
The mapper expects an instance of the abstract base class Cipher
,
and AESCipher
implements this abstract base class,
so if you want to use another cipher strategy simply implement the base class.
Compression¶
A compressor can be used to reduce the size of stored events.
The library’s ZlibCompressor
class
can be used to compress and decompress the state of stored events. The
size of the state of a compressed and encrypted stored event will be
less than or equal to the size of the state of a stored event that is
encrypted but not compressed.
from eventsourcing.compressor import ZlibCompressor
compressor = ZlibCompressor()
mapper = Mapper(
transcoder=transcoder,
cipher=cipher,
compressor=compressor,
)
stored_event2 = mapper.from_domain_event(domain_event1)
assert mapper.to_domain_event(stored_event2) == domain_event1
assert len(stored_event2.state) <= len(stored_event1.state)
The library’s ZlibCompressor
class
uses Python’s zlib
module.
The mapper expects an instance of the abstract base class
Compressor
, and
ZlibCompressor
implements this
abstract base class, so if you want to use another compression
strategy simply implement the base class.
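As a sketch of what an alternative compression strategy might look like, the class below wraps Python's bz2 module. It assumes that the Compressor interface consists of compress() and decompress() methods that take and return bytes, which is not shown in this section. Bz2Compressor is a hypothetical name, and in an application it would subclass Compressor rather than stand alone.

```python
import bz2

class Bz2Compressor:
    """Stand-alone sketch of a compressor (assumed interface)."""

    def compress(self, data: bytes) -> bytes:
        # Reduce the size of the serialised state.
        return bz2.compress(data)

    def decompress(self, data: bytes) -> bytes:
        # Restore the original serialised state.
        return bz2.decompress(data)

compressor = Bz2Compressor()
state = b'{"a": 1}' * 100
compressed_state = compressor.compress(state)
assert compressor.decompress(compressed_state) == state
```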
Notification objects¶
Event notifications are used to propagate the state of an event-sourced application in a reliable way. The stored events can be positioned in a “total order” by giving each new domain event a notification ID that is higher than that of any previously recorded event. By recording the domain events atomically with their notification IDs, there will never be a domain event that is not available to be passed as a message across a network, and there will never be a message passed across a network that doesn’t correspond to a recorded event. This solves the “dual writing” problem that occurs when a domain model is updated and then, separately, a message is put on a message queue.
The library’s Notification
class
is a Python frozen dataclass that can be used to hold information
about a domain event object when being transmitted as an item in a
section of a notification log.
It will be returned when selecting event notifications from a
recorder, and presented in an application by a
notification log.
from uuid import uuid4
from eventsourcing.persistence import Notification
notification = Notification(
    id=123,
    originator_id=uuid4(),
    originator_version=1,
    state=b"{}",  # the serialised state is a bytes object
    topic="eventsourcing.model:DomainEvent",
)
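The idea of recording events atomically with their notification IDs can be sketched with the standard sqlite3 module. This is an illustration under stated assumptions, not the library's recorder: the table name, column names, and the insert_events() helper are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE stored_events ("
    "notification_id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "originator_id TEXT NOT NULL, "
    "originator_version INTEGER NOT NULL, "
    "topic TEXT NOT NULL, "
    "state BLOB NOT NULL, "
    "UNIQUE (originator_id, originator_version))"
)

def insert_events(events):
    # One transaction: either every event gets a notification ID, or none does.
    with conn:
        conn.executemany(
            "INSERT INTO stored_events "
            "(originator_id, originator_version, topic, state) "
            "VALUES (?, ?, ?, ?)",
            events,
        )

insert_events([("a1", 1, "Created", b"{}"), ("a1", 2, "Updated", b"{}")])
try:
    # The second item conflicts with an existing version of aggregate "a1".
    insert_events([("a2", 1, "Created", b"{}"), ("a1", 2, "Updated", b"{}")])
except sqlite3.IntegrityError:
    pass  # the whole batch is rolled back, including the event for "a2"

rows = conn.execute(
    "SELECT notification_id, originator_id, originator_version "
    "FROM stored_events ORDER BY notification_id"
).fetchall()
```

Since the notification ID is assigned by the same insert that records the event, there is no separate message to publish, and so nothing that can be written in one place but lost in the other.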
Tracking objects¶
A tracking object can be used to encapsulate the position of an event notification in an upstream application’s notification log. A tracking object can be passed into a process recorder along with new stored event objects, and recorded atomically with those objects. By ensuring the uniqueness of recorded tracking objects, we can ensure that a domain event notification is never processed twice. By recording the position of the last event notification that has been processed, we can ensure that processing of event notifications resumes at the correct position. This provides “exactly once” semantics when processing event notifications, by solving the “dual writing” problem that occurs when an event notification is consumed from a message queue, updates are made to a materialized view, and then, separately, an acknowledgement is sent back to the message queue.
The library’s Tracking
class
is a Python frozen dataclass that can be used to hold the notification
ID of a notification that has been processed.
from uuid import uuid4
from eventsourcing.persistence import Tracking
tracking = Tracking(
notification_id=123,
application_name="bounded_context1",
)
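The way unique tracking records prevent double processing can be sketched with the standard sqlite3 module. The schema and the process() and max_tracking_id() helpers below are hypothetical and are not the library's process recorder.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events (id INTEGER PRIMARY KEY AUTOINCREMENT, state BLOB)"
)
conn.execute(
    "CREATE TABLE tracking ("
    "application_name TEXT, notification_id INTEGER, "
    "PRIMARY KEY (application_name, notification_id))"
)

def process(application_name, notification_id, new_state):
    # New events and the tracking record are written in one transaction.
    with conn:
        conn.execute("INSERT INTO events (state) VALUES (?)", (new_state,))
        conn.execute(
            "INSERT INTO tracking VALUES (?, ?)",
            (application_name, notification_id),
        )

def max_tracking_id(application_name):
    # Position from which processing should resume (0 if nothing processed).
    row = conn.execute(
        "SELECT MAX(notification_id) FROM tracking WHERE application_name = ?",
        (application_name,),
    ).fetchone()
    return row[0] or 0

process("bounded_context1", 123, b"{}")
try:
    process("bounded_context1", 123, b"{}")  # same notification again
    processed_twice = True
except sqlite3.IntegrityError:
    processed_twice = False  # rolled back: no duplicate event was recorded

event_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```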
Recorder¶
A recorder adapts a database management system for the purpose of recording stored events. It is used by an event store.
The library’s Recorder
class
is an abstract base for concrete recorder classes that will insert
stored event objects in a particular datastore.
There are three flavours of recorder: “aggregate recorders” are the simplest and simply store domain events in aggregate sequences; “application recorders” extend aggregate recorders by storing domain events with a total order; “process recorders” extend application recorders by supporting the recording of domain events atomically with “tracking” objects that record the position in a total ordering of domain events that is being processed. The “aggregate recorder” can be used for storing snapshots.
The library includes in its sqlite
module
recorder classes for SQLite that use the Python sqlite3
module, and in its postgres
module recorders for
PostgreSQL that use the third party psycopg2
module.
Recorder classes are conveniently constructed by using an infrastructure factory. For illustrative purposes, the direct use of the library’s SQLite recorders is shown below. The other persistence modules follow a similar naming scheme and pattern of use.
from eventsourcing.sqlite import SQLiteAggregateRecorder
from eventsourcing.sqlite import SQLiteApplicationRecorder
from eventsourcing.sqlite import SQLiteProcessRecorder
from eventsourcing.sqlite import SQLiteDatastore
datastore = SQLiteDatastore(db_name=":memory:")
aggregate_recorder = SQLiteAggregateRecorder(datastore, "snapshots")
aggregate_recorder.create_table()
application_recorder = SQLiteApplicationRecorder(datastore)
application_recorder.create_table()
datastore = SQLiteDatastore(db_name=":memory:")
process_recorder = SQLiteProcessRecorder(datastore)
process_recorder.create_table()
The library also includes in the popo
module recorders
that use “plain old Python objects”, which simply keep stored events in a
data structure in memory, and provides the fastest alternative for rapid
development of event sourced applications (~4x faster than using SQLite, and
~20x faster than using PostgreSQL).
Recorders compatible with this version of the library for popular ORMs such as SQLAlchemy and Django, specialist event stores such as EventStoreDB and AxonDB, and NoSQL databases such as DynamoDB and MongoDB are forthcoming.
Event store¶
An event store provides a common interface for storing and retrieving domain event objects. It combines a mapper and a recorder, so that domain event objects can be converted to stored event objects and then stored event objects can be recorded in a datastore.
The library’s EventStore
class must
be constructed with a mapper and a recorder.
The EventStore
has an object method
put()
which can be used to
store a list of new domain event objects. If any of these domain event
objects conflict with any already existing domain event object (because
they have the same aggregate ID and version number), an exception will
be raised and none of the new events will be stored.
The EventStore
has an object method
get()
which can be used to
get a list of domain event objects. Only the originator_id
argument
is required, which is the ID of the aggregate for which existing events
are wanted. The arguments gt
, lte
, limit
, and desc
condition the selection of events to be greater than a particular version
number, less than or equal to a particular version number, limited in
number, or selected in a descending fashion. The selection is by default
ascending, unlimited, and otherwise unrestricted such that all the previously
stored domain event objects for a particular aggregate will be returned
in the order in which they were created.
from eventsourcing.persistence import EventStore
event_store = EventStore(
mapper=mapper,
recorder=application_recorder,
)
event_store.put([domain_event1])
domain_events = list(event_store.get(id1))
assert domain_events == [domain_event1]
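The effect of the gt, lte, limit, and desc arguments can be sketched in plain Python over a list of version numbers. The select_versions() function is a hypothetical illustration, and it assumes that the limit is applied after ordering, as with an SQL LIMIT clause.

```python
def select_versions(versions, gt=None, lte=None, desc=False, limit=None):
    # Keep versions within the (gt, lte] range, in ascending order.
    selected = [
        v for v in sorted(versions)
        if (gt is None or v > gt) and (lte is None or v <= lte)
    ]
    # Optionally reverse the order, then truncate.
    if desc:
        selected.reverse()
    if limit is not None:
        selected = selected[:limit]
    return selected

versions = [1, 2, 3, 4, 5]
everything = select_versions(versions)
after_two = select_versions(versions, gt=2)
latest_two_up_to_three = select_versions(versions, lte=3, desc=True, limit=2)
```

Selecting in descending order with a limit of one is a convenient way to obtain only the most recent item of a sequence, for example the latest snapshot.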
Infrastructure factory¶
An infrastructure factory helps with the construction of the persistence infrastructure objects mentioned above. By reading and responding to particular environment variables, the persistence infrastructure of an event-sourced application can be easily configured in different ways at different times.
The library’s InfrastructureFactory
class
is a base class for concrete infrastructure factories that help with the construction
of persistence objects that use a particular database in a particular way.
The class method construct()
will, by default, construct the library’s “plain old Python objects”
infrastructure Factory
, which uses recorders that simply
keep stored events in a data structure in memory (see eventsourcing.popo
).
from eventsourcing.persistence import InfrastructureFactory
factory = InfrastructureFactory.construct()
recorder = factory.application_recorder()
mapper = factory.mapper(transcoder=transcoder)
event_store = factory.event_store(
mapper=mapper,
recorder=recorder,
)
event_store.put([domain_event1])
stored_events = list(event_store.get(id1))
assert stored_events == [domain_event1]
The optional environment variables COMPRESSOR_TOPIC
, CIPHER_KEY
, and CIPHER_TOPIC
may
be used to enable compression and encryption of stored events when using POPO infrastructure.
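The topic variables name a class by a string. Assuming a topic has the form "module:name", as in the "eventsourcing.model:DomainEvent" topic used earlier, resolving one could look like the sketch below. The resolve_topic() function is a hypothetical helper written for illustration, and is not the library's own resolver.

```python
import importlib

def resolve_topic(topic: str):
    # Split "package.module:Outer.Inner" into module and attribute path.
    module_name, _, attr_path = topic.partition(":")
    obj = importlib.import_module(module_name)
    # Walk down the attribute path, one name at a time.
    for name in attr_path.split("."):
        obj = getattr(obj, name)
    return obj

decimal_class = resolve_topic("decimal:Decimal")
fromkeys = resolve_topic("collections:OrderedDict.fromkeys")
```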
SQLite¶
The module eventsourcing.sqlite
supports storing events in SQLite.
The library’s SQLite Factory
uses various
environment variables to control the construction and configuration of its
persistence infrastructure.
The environment variable SQLITE_DBNAME
is required to set the name of a database,
normally a file path, but the special name :memory:
can be used to create an
in-memory database.
The optional environment variable SQLITE_LOCK_TIMEOUT
may be used to adjust the SQLite timeout
value. A file-based SQLite database will have its journal mode set to use write-ahead
logging (WAL), which allows reading to proceed concurrently with writing. Writing
is serialised with a lock. Setting this value to a positive number of seconds will cause
attempts to lock the SQLite database for writing to timeout after that duration. By default
this value is 5 (seconds).
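For reference, the two SQLite settings involved can be seen with Python's sqlite3 module directly. This is a sketch of the underlying mechanism rather than of the library's code, and the temporary file path is created only for the illustration.

```python
import os
import sqlite3
import tempfile

# WAL requires a file-based database; an in-memory database cannot use it.
db_path = os.path.join(tempfile.mkdtemp(), "events.db")

# The timeout argument is how long, in seconds, to wait for a lock.
conn = sqlite3.connect(db_path, timeout=10)

# The pragma returns the journal mode that is now in effect.
journal_mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
conn.close()
```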
The optional environment variables COMPRESSOR_TOPIC
, CIPHER_KEY
, and CIPHER_TOPIC
may
be used to enable compression and encryption of stored events.
The optional environment variable CREATE_TABLE
may be used to control whether database tables are created.
If the tables already exist, the CREATE_TABLE
may be set to a “false” value ("n"
,
"no"
, "f"
, "false"
, "off"
, or "0"
). This value is by default “true”
which is normally okay because the tables are created only if they do not exist.
import os
os.environ["INFRASTRUCTURE_FACTORY"] = "eventsourcing.sqlite:Factory"
os.environ["SQLITE_DBNAME"] = ":memory:"
os.environ["SQLITE_LOCK_TIMEOUT"] = "10"
factory = InfrastructureFactory.construct()
recorder = factory.application_recorder()
mapper = factory.mapper(transcoder=transcoder)
event_store = factory.event_store(
mapper=mapper,
recorder=recorder,
)
event_store.put([domain_event1])
stored_events = list(event_store.get(id1))
assert stored_events == [domain_event1]
PostgreSQL¶
The module eventsourcing.postgres
supports storing events in PostgreSQL.
The library’s PostgreSQL Factory
uses various
environment variables to control the construction and configuration of its persistence
infrastructure.
The environment variables POSTGRES_DBNAME
, POSTGRES_HOST
, POSTGRES_PORT
,
POSTGRES_USER
, and POSTGRES_PASSWORD
are required to set the name of a database,
the database server’s host name and port number, and the database user name and password.
The optional environment variable POSTGRES_CONN_MAX_AGE
is used to control the length of time in
seconds before a connection is closed. By default this value is not set, and connections will
be reused indefinitely (or until an operational database error is encountered). If this
value is set to a positive integer, the connection will be closed after this number of
seconds from the time it was created, but only when the connection is idle. If this value
is set to zero, each connection will only be used for one transaction. Setting this value
to an empty string has the same effect as not setting this value. Setting this value to
any other value will cause an environment error exception to be raised. If your database
terminates idle connections after some time, you should set POSTGRES_CONN_MAX_AGE
to a
lower value, so that attempts are not made to use connections that have been terminated
by the database server.
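The rules for this value can be summarised in a short sketch. The parse_conn_max_age() and is_expired() functions are hypothetical helpers, and the use of EnvironmentError and of float values are assumptions made for illustration, not the library's code.

```python
from typing import Mapping, Optional

def parse_conn_max_age(env: Mapping[str, str]) -> Optional[float]:
    raw = env.get("POSTGRES_CONN_MAX_AGE")
    # Unset or empty: connections are reused indefinitely.
    if raw is None or raw == "":
        return None
    try:
        value = float(raw)
    except ValueError:
        raise EnvironmentError(f"Invalid POSTGRES_CONN_MAX_AGE: {raw!r}") from None
    if value < 0:
        raise EnvironmentError(f"Invalid POSTGRES_CONN_MAX_AGE: {raw!r}")
    return value

def is_expired(created_at: float, now: float, max_age: Optional[float]) -> bool:
    # Checked when a connection is idle; zero means one transaction each.
    return max_age is not None and now - created_at >= max_age

unset = parse_conn_max_age({})
ten = parse_conn_max_age({"POSTGRES_CONN_MAX_AGE": "10"})
zero = parse_conn_max_age({"POSTGRES_CONN_MAX_AGE": "0"})
```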
The optional environment variable POSTGRES_PRE_PING
may be used to enable pessimistic
disconnection handling. Setting this to a “true” value ("y"
, "yes"
, "t"
, "true"
,
"on"
, or "1"
) means database connections will be checked that they are usable before
executing statements, and database connections remade if the connection is not usable. This
value is by default “false”, meaning connections will not be checked before they are reused.
Enabling this option will incur a small impact on performance.
The optional environment variable POSTGRES_LOCK_TIMEOUT
may be used to enable a timeout
on acquiring an ‘EXCLUSIVE’ mode table lock when inserting stored events. To avoid interleaving
of inserts when writing events, an ‘EXCLUSIVE’ mode table lock is acquired when inserting events.
This effectively serialises writing events. It prevents concurrent transactions interleaving inserts,
which would potentially cause notification log readers that are tailing the application notification
log to miss event notifications. Reading from the table can proceed concurrently with other readers
and writers, since selecting acquires an ‘ACCESS SHARE’ lock which does not block and is not blocked
by the ‘EXCLUSIVE’ lock. This issue of interleaving inserts by concurrent writers is not exhibited
by SQLite, which supports concurrent readers when its journal mode is set to use write ahead logging.
By default, this timeout has the value of 0 seconds, which means attempts to acquire the lock will
not timeout. Setting this value to a positive integer number of seconds will cause attempts to obtain this
lock to timeout after that duration has passed. The lock will be released when the transaction ends.
The optional environment variable POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT
may be used to
timeout sessions that are idle in a transaction. If a transaction cannot be ended for some reason,
perhaps because the database server cannot be reached, the transaction may remain in an idle
state and any locks will continue to be held. By timing out the session, transactions will be ended,
locks will be released, and the connection slot will be freed. By default, this timeout has the value
of 0 seconds, which means sessions in an idle transaction will not timeout. Setting this value to a
positive integer number of seconds will cause sessions in an idle transaction to timeout after that duration
has passed.
The optional environment variables COMPRESSOR_TOPIC
, CIPHER_KEY
, and CIPHER_TOPIC
may
be used to enable compression and encryption of stored events.
The optional environment variable CREATE_TABLE
may be used to control whether database tables are created.
If the tables already exist, the CREATE_TABLE
may be set to a “false” value ("n"
,
"no"
, "f"
, "false"
, "off"
, or "0"
). This value is by default “true”
which is normally okay because the tables are created only if they do not exist.
import os
os.environ["INFRASTRUCTURE_FACTORY"] = "eventsourcing.postgres:Factory"
os.environ["POSTGRES_DBNAME"] = "eventsourcing"
os.environ["POSTGRES_HOST"] = "127.0.0.1"
os.environ["POSTGRES_PORT"] = "5432"
os.environ["POSTGRES_USER"] = "eventsourcing"
os.environ["POSTGRES_PASSWORD"] = "eventsourcing"
os.environ["POSTGRES_CONN_MAX_AGE"] = "10"
os.environ["POSTGRES_PRE_PING"] = "y"
os.environ["POSTGRES_LOCK_TIMEOUT"] = "5"
os.environ["POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT"] = "5"
factory = InfrastructureFactory.construct()
recorder = factory.application_recorder()
mapper = factory.mapper(transcoder=transcoder)
event_store = factory.event_store(
mapper=mapper,
recorder=recorder,
)
event_store.put([domain_event1])
stored_events = list(event_store.get(id1))
assert stored_events == [domain_event1]
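The “true” and “false” spellings listed for POSTGRES_PRE_PING and CREATE_TABLE suggest a parser along the following lines. The env_flag() function is a hypothetical helper. Treating values case-insensitively and raising ValueError for anything unrecognised are assumptions, not documented behaviour.

```python
TRUE_VALUES = {"y", "yes", "t", "true", "on", "1"}
FALSE_VALUES = {"n", "no", "f", "false", "off", "0"}

def env_flag(env, name, default):
    # Return the default when the variable is not set at all.
    raw = env.get(name)
    if raw is None:
        return default
    value = raw.strip().lower()  # assumption: case-insensitive matching
    if value in TRUE_VALUES:
        return True
    if value in FALSE_VALUES:
        return False
    raise ValueError(f"{name} has an unrecognised value: {raw!r}")

create_table = env_flag({"CREATE_TABLE": "no"}, "CREATE_TABLE", default=True)
pre_ping = env_flag({"POSTGRES_PRE_PING": "y"}, "POSTGRES_PRE_PING", default=False)
unset_create_table = env_flag({}, "CREATE_TABLE", default=True)
```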
Classes¶
-
class
eventsourcing.persistence.
Transcoding
[source]¶ Bases:
abc.ABC
Abstract base class for custom transcodings.
-
type
¶ Object type of transcoded object.
-
name
¶ Name of transcoding.
-
-
class
eventsourcing.persistence.
Transcoder
[source]¶ Bases:
abc.ABC
Abstract base class for transcoders.
-
class
eventsourcing.persistence.
JSONTranscoder
[source]¶ Bases:
eventsourcing.persistence.Transcoder
Extensible transcoder that uses the Python
json
module.
-
class
eventsourcing.persistence.
UUIDAsHex
[source]¶ Bases:
eventsourcing.persistence.Transcoding
Transcoding that represents
UUID
objects as hex values.
-
type
¶ alias of
uuid.UUID
-
-
class
eventsourcing.persistence.
DecimalAsStr
[source]¶ Bases:
eventsourcing.persistence.Transcoding
Transcoding that represents
Decimal
objects as strings.
-
type
¶ alias of
decimal.Decimal
-
-
class
eventsourcing.persistence.
DatetimeAsISO
[source]¶ Bases:
eventsourcing.persistence.Transcoding
Transcoding that represents
datetime
objects as ISO strings.
-
type
¶ alias of
datetime.datetime
-
-
class
eventsourcing.persistence.
StoredEvent
(originator_id: uuid.UUID, originator_version: int, topic: str, state: bytes)[source]¶ Bases:
object
Frozen dataclass that represents DomainEvent objects, such as aggregate Event objects and Snapshot objects.
Constructor parameters:
Parameters: - originator_id (UUID) – ID of the originating aggregate
- originator_version (int) – version of the originating aggregate
- topic (str) – topic of the domain event object class
- state (bytes) – serialised state of the domain event object
-
class
eventsourcing.persistence.
Cipher
(cipher_key: str)[source]¶ Bases:
abc.ABC
Base class for ciphers.
-
class
eventsourcing.persistence.
Mapper
(transcoder: eventsourcing.persistence.Transcoder, compressor: Optional[eventsourcing.persistence.Compressor] = None, cipher: Optional[eventsourcing.persistence.Cipher] = None)[source]¶ Bases:
typing.Generic
Converts between domain event objects and StoredEvent objects.
Uses a Transcoder, and optionally a cryptographic cipher and compressor.
-
__init__
(transcoder: eventsourcing.persistence.Transcoder, compressor: Optional[eventsourcing.persistence.Compressor] = None, cipher: Optional[eventsourcing.persistence.Cipher] = None)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
from_domain_event
(domain_event: TDomainEvent) → eventsourcing.persistence.StoredEvent[source]¶ Converts the given domain event to a
StoredEvent
object.
-
to_domain_event
(stored: eventsourcing.persistence.StoredEvent) → TDomainEvent[source]¶ Converts the given
StoredEvent
to a domain event object.
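The ordering described in the module introduction (compress and then encrypt when storing, decrypt and then decompress when retrieving) can be sketched with the standard library. This is not the library's Mapper: `ToyXorCipher`, `to_state` and `from_state` are hypothetical names, and the XOR cipher is deliberately trivial and insecure, used only to make the ordering visible.

```python
import json
import zlib
from itertools import cycle


class ToyXorCipher:
    """Hypothetical, insecure cipher used only to illustrate ordering."""

    def __init__(self, key: bytes):
        if not key:
            raise ValueError("key must not be empty")
        self.key = key

    def encrypt(self, plaintext: bytes) -> bytes:
        return bytes(b ^ k for b, k in zip(plaintext, cycle(self.key)))

    def decrypt(self, ciphertext: bytes) -> bytes:
        # XOR with the same key is its own inverse.
        return self.encrypt(ciphertext)


def to_state(event_attrs: dict, cipher: ToyXorCipher) -> bytes:
    state = json.dumps(event_attrs, sort_keys=True).encode("utf8")  # serialise
    state = zlib.compress(state)  # compress first...
    return cipher.encrypt(state)  # ...then encrypt


def from_state(state: bytes, cipher: ToyXorCipher) -> dict:
    state = cipher.decrypt(state)  # decrypt first...
    state = zlib.decompress(state)  # ...then decompress
    return json.loads(state.decode("utf8"))  # deserialise


cipher = ToyXorCipher(key=b"not-a-real-key")
attrs = {"name": "example", "amount": "1.50"}
state = to_state(attrs, cipher)
recovered = from_state(state, cipher)
```

Compressing before encrypting matters because well-encrypted bytes look random and generally do not compress.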
-
-
exception
eventsourcing.persistence.
RecordConflictError
[source]¶ Bases:
Exception
Legacy exception, replaced with IntegrityError.
-
exception
eventsourcing.persistence.
PersistenceError
[source]¶ Bases:
Exception
The base class of the other exceptions in this module.
Exception class names follow PEP 249: https://www.python.org/dev/peps/pep-0249/#exceptions
-
exception
eventsourcing.persistence.
InterfaceError
[source]¶ Bases:
eventsourcing.persistence.PersistenceError
Exception raised for errors that are related to the database interface rather than the database itself.
-
exception
eventsourcing.persistence.
DatabaseError
[source]¶ Bases:
eventsourcing.persistence.PersistenceError
Exception raised for errors that are related to the database.
-
exception
eventsourcing.persistence.
DataError
[source]¶ Bases:
eventsourcing.persistence.DatabaseError
Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range, etc.
-
exception
eventsourcing.persistence.
OperationalError
[source]¶ Bases:
eventsourcing.persistence.DatabaseError
Exception raised for errors that are related to the database’s operation and not necessarily under the control of the programmer, e.g. an unexpected disconnect occurs, the data source name is not found, a transaction could not be processed, a memory allocation error occurred during processing, etc.
-
exception
eventsourcing.persistence.
IntegrityError
[source]¶ Bases:
eventsourcing.persistence.DatabaseError
, eventsourcing.persistence.RecordConflictError
Exception raised when the relational integrity of the database is affected, e.g. a foreign key check fails.
-
exception
eventsourcing.persistence.
InternalError
[source]¶ Bases:
eventsourcing.persistence.DatabaseError
Exception raised when the database encounters an internal error, e.g. the cursor is not valid anymore, the transaction is out of sync, etc.
-
exception
eventsourcing.persistence.
ProgrammingError
[source]¶ Bases:
eventsourcing.persistence.DatabaseError
Exception raised for programming errors, e.g. table not found or already exists, syntax error in the SQL statement, wrong number of parameters specified, etc.
-
exception
eventsourcing.persistence.
NotSupportedError
[source]¶ Bases:
eventsourcing.persistence.DatabaseError
Exception raised in case a method or database API was used which is not supported by the database, e.g. calling the rollback() method on a connection that does not support transaction or has transactions turned off.
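The base classes listed above imply that code written against the legacy RecordConflictError still catches IntegrityError. The sketch below reproduces only the documented inheritance relationships with locally defined stand-in classes. It does not import the library, and the `insert_once` helper is hypothetical.

```python
# Local stand-ins that mirror the documented "Bases" of each exception.
class RecordConflictError(Exception):
    pass


class PersistenceError(Exception):
    pass


class DatabaseError(PersistenceError):
    pass


class IntegrityError(DatabaseError, RecordConflictError):
    pass


def insert_once(records: set, key: str) -> None:
    """Hypothetical insert that rejects duplicate keys."""
    if key in records:
        raise IntegrityError(f"duplicate key: {key}")
    records.add(key)


records: set = set()
insert_once(records, "aggregate-1:version-1")

# A handler for the legacy exception also catches the newer one.
try:
    insert_once(records, "aggregate-1:version-1")
    caught_as_legacy = False
except RecordConflictError:
    caught_as_legacy = True

# A handler for the common base class catches it as well.
try:
    insert_once(records, "aggregate-1:version-1")
    caught_as_base = False
except PersistenceError:
    caught_as_base = True
```

Catching PersistenceError is the broadest option here, whereas catching IntegrityError isolates conflicts such as a duplicate aggregate version.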
-
class
eventsourcing.persistence.
Recorder
[source]¶ Bases:
abc.ABC
Abstract base class for stored event recorders.
-
class
eventsourcing.persistence.
AggregateRecorder
[source]¶ Bases:
eventsourcing.persistence.Recorder
Abstract base class for recorders that record and retrieve stored events for domain model aggregates.
-
class
eventsourcing.persistence.
Notification
(originator_id: uuid.UUID, originator_version: int, topic: str, state: bytes, id: int)[source]¶ Bases:
eventsourcing.persistence.StoredEvent
Frozen dataclass that represents domain event notifications.
-
class
eventsourcing.persistence.
ApplicationRecorder
[source]¶ Bases:
eventsourcing.persistence.AggregateRecorder
Abstract base class for recorders that record and retrieve stored events for domain model aggregates.
Extends the behaviour of aggregate recorders by recording aggregate events in a total order, which allows the stored events to be retrieved as event notifications as well.
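To make "total order" concrete, the following in-memory sketch assigns each stored event a position in a single sequence and returns events as notifications carrying that position as `id`. All class and method names here are hypothetical illustrations. The sketch assumes notification IDs start at 1, and it is not the library's recorder implementation.

```python
from dataclasses import dataclass
from typing import List
from uuid import UUID, uuid4


@dataclass(frozen=True)
class StoredEventSketch:
    originator_id: UUID
    originator_version: int
    topic: str
    state: bytes


@dataclass(frozen=True)
class NotificationSketch(StoredEventSketch):
    id: int  # position of the event in the total order


class InMemoryApplicationRecorderSketch:
    def __init__(self) -> None:
        # One list for all aggregates gives a single total order.
        self._events: List[StoredEventSketch] = []

    def insert_events(self, stored_events: List[StoredEventSketch]) -> None:
        self._events.extend(stored_events)

    def select_events(self, originator_id: UUID) -> List[StoredEventSketch]:
        # Per-aggregate view, as an aggregate recorder would provide.
        return [e for e in self._events if e.originator_id == originator_id]

    def select_notifications(self, start: int, limit: int) -> List[NotificationSketch]:
        # Assumes start >= 1; IDs are list positions counted from 1.
        window = self._events[start - 1 : start - 1 + limit]
        return [
            NotificationSketch(e.originator_id, e.originator_version, e.topic, e.state, id=start + i)
            for i, e in enumerate(window)
        ]


first_id, second_id = uuid4(), uuid4()
recorder = InMemoryApplicationRecorderSketch()
recorder.insert_events([
    StoredEventSketch(first_id, 1, "mymodule:Created", b"{}"),
    StoredEventSketch(second_id, 1, "mymodule:Created", b"{}"),
    StoredEventSketch(first_id, 2, "mymodule:Updated", b"{}"),
])
notifications = recorder.select_notifications(start=2, limit=10)
```

The same events can be read either per aggregate or as one ordered stream, which is the distinction between the aggregate and application recorder roles.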
-
class
eventsourcing.persistence.
ProcessRecorder
[source]¶ Bases:
eventsourcing.persistence.ApplicationRecorder
Abstract base class for recorders that record and retrieve stored events for domain model aggregates.
Extends the behaviour of application recorders by recording aggregate events together with tracking information, which records the position of a processed event notification in a notification log.
-
class
eventsourcing.persistence.
EventStore
(mapper: eventsourcing.persistence.Mapper[TDomainEvent], recorder: eventsourcing.persistence.AggregateRecorder)[source]¶ Bases:
typing.Generic
Stores and retrieves domain events.
-
__init__
(mapper: eventsourcing.persistence.Mapper[TDomainEvent], recorder: eventsourcing.persistence.AggregateRecorder)[source]¶ Initialises the event store with a mapper and a recorder.
-
-
class
eventsourcing.persistence.
InfrastructureFactory
(application_name: str, env: Mapping[KT, VT_co])[source]¶ Bases:
abc.ABC
Abstract base class for infrastructure factories.
-
classmethod
construct
(application_name: str = '', env: Optional[Mapping[KT, VT_co]] = None) → eventsourcing.persistence.InfrastructureFactory[source]¶ Constructs concrete infrastructure factory for given named application. Reads and resolves infrastructure factory class topic from environment variable ‘INFRASTRUCTURE_FACTORY’.
-
__init__
(application_name: str, env: Mapping[KT, VT_co])[source]¶ Initialises infrastructure factory object with given application name.
-
getenv
(key: str, default: Optional[str] = None, application_name: str = '') → Optional[str][source]¶ Returns value of environment variable defined by given key.
-
mapper
(transcoder: eventsourcing.persistence.Transcoder, application_name: str = '') → eventsourcing.persistence.Mapper[source]¶ Constructs a mapper.
-
cipher
(application_name: str) → Optional[eventsourcing.persistence.Cipher][source]¶ Reads environment variables ‘CIPHER_TOPIC’ and ‘CIPHER_KEY’ to decide whether or not to construct a cipher.
-
compressor
(application_name: str) → Optional[eventsourcing.persistence.Compressor][source]¶ Reads environment variable ‘COMPRESSOR_TOPIC’ to decide whether or not to construct a compressor.
-
static
event_store
(**kwargs) → eventsourcing.persistence.EventStore[source]¶ Constructs an event store.
-
aggregate_recorder
(purpose: str = 'events') → eventsourcing.persistence.AggregateRecorder[source]¶ Constructs an aggregate recorder.
-
application_recorder
() → eventsourcing.persistence.ApplicationRecorder[source]¶ Constructs an application recorder.
-
-
class
eventsourcing.persistence.
Tracking
(application_name: str, notification_id: int)[source]¶ Bases:
object
Frozen dataclass representing the position of a domain event
Notification
in an application’s notification log.
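A tracking record pairs the name of an upstream application with the ID of a notification that has been processed. The sketch below illustrates how such records could be used to avoid processing the same notification twice. The class names, the `record` method and the use of ValueError are hypothetical choices for this example, not the library's behaviour.

```python
from dataclasses import dataclass
from typing import Set


@dataclass(frozen=True)
class TrackingSketch:
    """Stand-in with the same two fields as the documented Tracking."""

    application_name: str
    notification_id: int


class InMemoryTrackingSketch:
    def __init__(self) -> None:
        # Frozen dataclasses are hashable, so they can be kept in a set.
        self._seen: Set[TrackingSketch] = set()

    def record(self, tracking: TrackingSketch) -> None:
        # Reject a position that has already been recorded.
        if tracking in self._seen:
            raise ValueError(f"already processed: {tracking}")
        self._seen.add(tracking)

    def max_tracking_id(self, application_name: str) -> int:
        # Highest processed notification ID for the application, or 0.
        return max(
            (t.notification_id for t in self._seen if t.application_name == application_name),
            default=0,
        )


tracker = InMemoryTrackingSketch()
tracker.record(TrackingSketch("orders", 1))
tracker.record(TrackingSketch("orders", 2))

try:
    tracker.record(TrackingSketch("orders", 2))
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True

resume_from = tracker.max_tracking_id("orders") + 1
```

Recording the tracking position together with any new events in one atomic step is what lets processing resume from the correct position after a restart.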
-
class
eventsourcing.popo.
POPOAggregateRecorder
[source]¶
-
class
eventsourcing.popo.
POPOApplicationRecorder
[source]¶ Bases:
eventsourcing.persistence.ApplicationRecorder
, eventsourcing.popo.POPOAggregateRecorder
-
class
eventsourcing.popo.
POPOProcessRecorder
[source]¶ Bases:
eventsourcing.persistence.ProcessRecorder
, eventsourcing.popo.POPOApplicationRecorder
-
class
eventsourcing.popo.
Factory
(application_name: str, env: Mapping[KT, VT_co])[source]¶ Bases:
eventsourcing.persistence.InfrastructureFactory
-
aggregate_recorder
(purpose: str = 'events') → eventsourcing.persistence.AggregateRecorder[source]¶ Constructs an aggregate recorder.
-
-
class
eventsourcing.sqlite.
SQLiteAggregateRecorder
(datastore: eventsourcing.sqlite.SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶ Bases:
eventsourcing.persistence.AggregateRecorder
-
__init__
(datastore: eventsourcing.sqlite.SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶ Initialises the recorder with a datastore and the name of the events table.
-
-
class
eventsourcing.sqlite.
SQLiteApplicationRecorder
(datastore: eventsourcing.sqlite.SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶ Bases:
eventsourcing.sqlite.SQLiteAggregateRecorder
, eventsourcing.persistence.ApplicationRecorder
-
__init__
(datastore: eventsourcing.sqlite.SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶ Initialises the recorder with a datastore and the name of the events table.
-
-
class
eventsourcing.sqlite.
SQLiteProcessRecorder
(datastore: eventsourcing.sqlite.SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶ Bases:
eventsourcing.sqlite.SQLiteApplicationRecorder
, eventsourcing.persistence.ProcessRecorder
-
class
eventsourcing.sqlite.
Factory
(application_name: str, env: Mapping[KT, VT_co])[source]¶ Bases:
eventsourcing.persistence.InfrastructureFactory
-
__init__
(application_name: str, env: Mapping[KT, VT_co])[source]¶ Initialises infrastructure factory object with given application name.
-
aggregate_recorder
(purpose: str = 'events') → eventsourcing.persistence.AggregateRecorder[source]¶ Constructs an aggregate recorder.
-
-
class
eventsourcing.postgres.
PostgresAggregateRecorder
(datastore: eventsourcing.postgres.PostgresDatastore, events_table_name: str)[source]¶ Bases:
eventsourcing.persistence.AggregateRecorder
-
__init__
(datastore: eventsourcing.postgres.PostgresDatastore, events_table_name: str)[source]¶ Initialises the recorder with a datastore and the name of the events table.
-
-
class
eventsourcing.postgres.
PostgresApplicationRecorder
(datastore: eventsourcing.postgres.PostgresDatastore, events_table_name: str = 'stored_events')[source]¶ Bases:
eventsourcing.postgres.PostgresAggregateRecorder
, eventsourcing.persistence.ApplicationRecorder
-
__init__
(datastore: eventsourcing.postgres.PostgresDatastore, events_table_name: str = 'stored_events')[source]¶ Initialises the recorder with a datastore and the name of the events table.
-
-
class
eventsourcing.postgres.
PostgresProcessRecorder
(datastore: eventsourcing.postgres.PostgresDatastore, events_table_name: str, tracking_table_name: str)[source]¶ Bases:
eventsourcing.postgres.PostgresApplicationRecorder
, eventsourcing.persistence.ProcessRecorder
-
class
eventsourcing.postgres.
Factory
(application_name: str, env: Mapping[KT, VT_co])[source]¶ Bases:
eventsourcing.persistence.InfrastructureFactory
-
__init__
(application_name: str, env: Mapping[KT, VT_co])[source]¶ Initialises infrastructure factory object with given application name.
-
aggregate_recorder
(purpose: str = 'events') → eventsourcing.persistence.AggregateRecorder[source]¶ Constructs an aggregate recorder.
-