dcb — Dynamic consistency boundaries¶
From version 9.5, this library supports Dynamic Consistency Boundaries (DCB) by providing:
an implementation in Python of the basic objects defined in the specification
a range of event stores that work in memory, with PostgreSQL, and with UmaDB
some higher-level abstractions to make working with DCB easier
support in the projections module for eventually-consistent materialized views
Introduction to DCB¶
Dynamic Consistency Boundaries is a significant variant of event sourcing presented in a humorously provocative way as “killing the aggregate”.
The DCB Specification¶
A novel scheme has been proposed that uses a single sequence of events, an application sequence in the terminology of this library.
Events¶
Each event in DCB has one “type” string, some binary “data”, and any number of “tag” strings. Recorded events have an assigned “position” in the sequence, and are called “sequenced events”. These objects correspond to the stored event and notification objects previously defined in this library.
Reading¶
When reading events from a DCB event store, a reader can supply a “query” for selecting events. If no query is provided, or the query does not have any query items, then all events will be selected. A reader may also specify a sequence position after which to select events.
A query can have any number of “query items”. Each query item can have any number of “types” and “tags”. An event is selected by a query if matched by any of the query items. An event is matched if the event’s type is mentioned in the query item’s types, or if the query item has zero types, and if all query item’s tags are mentioned in the event’s tags. In this way, a query item with more types is more inclusive, and a query item with more tags is more restrictive.
Writing¶
When writing new events to a DCB event store, a writer can supply an “append condition” to ensure consistency of recorded state. An append condition can include a query to select conflicting events, and a position after which the query should be applied.
If the append condition fails, because conflicting events have been recorded, then an “integrity error” is raised and the new events are not recorded. Otherwise, all the new events are recorded. Each recorded event is assigned a new position in the application sequence, and thereby becomes a “sequenced event”.
A command method will usually read a selection of sequenced events, and then project the events into a “decision model” with which new events will be generated. When a command method writes new events, the same query that was used for reading can also be used in the append condition. The highest “last known position” at the time of reading can be used as the append condition position.
The command method’s query, used both when reading and writing, therefore defines the “dynamic consistency boundary” for the command.
Discussion¶
The multi-dimensional and cross-cutting possibilities offered by combining query items is impressive. However, this presents a technical challenge when implementing support for DCB applications. Firstly, the selections of events have to be correct for all possible sets of query items. But then also, it will be a technical challenge to achieve performance times for DCB applications that is comparable to that enjoyed by “traditional” event-sourced aggregates.
A sustained effort has been made here to implement persistence for DCB applications is a way that is both correct and performant. At first, an attempt was made to use GIN indexes in PostgreSQL, with both array operators and then with text vectors and then with full text search techniques. Many others have tried this too, in different ways. It is commonly experienced to be slow with any significant volume of recorded events. In consequence, an alternative implementation in PostgreSQL was developed that uses B-trees with a separate table for tags. This was much faster, especially when coded with common table expressions. Finally, the idea of using B-trees with CTEs in PostgreSQL was distilled into a specialist DCB event store written in Rust, now called UmaDB.
Furthermore, we have searched for higher-level abstractions with which domain logic can be more easily expressed, and have developed an application layer that can support such domain models. Ideas previously developed in this library for serialisation, mapping, and declarative syntax, have been applied along with ideas for implementing business logic with vertical slices.
DCB Objects¶
Here we present an implementation in Python of the basic objects for DCB that are described in the specification and discussed in the introduction above.
See this example of using the basic DCB objects to meet the course subscriptions challenge.
DCB Query Item¶
A DCBQueryItem defines a criterion for matching events.
A query item will match an event if one of its types matches the event’s type or the
query item’s types attribute is empty, and if all of its tags match one of the event’s tags
or the query item’s tags attribute is empty.
from eventsourcing.dcb.api import DCBQueryItem
student_query_item = DCBQueryItem(
types=["StudentRegistered"],
tags=["student:123"],
)
DCB Query¶
A DCBQuery defines criteria for selecting events in an event store.
An event is selected if it is matched by any query item included in items attribute,
or if the items attribute is empty.
from eventsourcing.dcb.api import DCBQuery
student_query = DCBQuery(
items=[student_query_item],
)
DCB Event¶
A DCBEvent represents a settled collection of facts to be appended, or
that has already been recorded, in an event store.
from eventsourcing.dcb.api import DCBEvent
student_registered = DCBEvent(
type="StudentRegistered",
data=b'{"student_id": "student:123", "name": "Sara", "max_courses": 5}',
tags=["student:123"],
)
course_registered = DCBEvent(
type="CourseRegistered",
data=b'{"course_id": "course:456", "name": "History", "max_students": 30}',
tags=["course:456"],
)
DCB Sequenced Event¶
A DCBSequencedEvent represents a recorded event along
with its assigned sequence number.
from eventsourcing.dcb.api import DCBSequencedEvent
DCBSequencedEvent(
event=student_registered,
position=56316,
)
DCBSequencedEvent(
event=course_registered,
position=56317,
)
DCB Append Condition¶
A DCBAppendCondition causes an append request to fail if events
match the query value of its fail_if_events_match attribute, optionally
after a sequence number.
from eventsourcing.dcb.api import DCBAppendCondition
append_condition = DCBAppendCondition(
fail_if_events_match=student_query,
after=None,
)
DCB Recorders¶
The term “recorder” here corresponds to the notion “event store” in the DCB specification, and refers to the notion of dealing with records. Following the terminology in this library, the term “recorder” is used for dealing with domain events that have been serialised into a common format, and the term “event store” is used both for a higher-level objects that deal with domain event objects of different types, and for specialist event store databases.
Abstract base class¶
The abstract base class DCBRecorder defines the
read() and append()
method signatures described in the DCB specification for an “event store”. These methods depend on
the DCB Objects described above.
class DCBRecorder(ABC):
@abstractmethod
def read(
self,
query: DCBQuery | None = None,
*,
after: int | None = None,
limit: int | None = None,
) -> DCBReadResponse:
"""
Returns all events, unless 'after' is given then only those with position
greater than 'after', and unless any query items are given, then only those
that match at least one query item. An event matches a query item if its type
is in the item types or there are no item types, and if all the item tags are
in the event tags.
"""
@abstractmethod
def append(
self, events: Sequence[DCBEvent], condition: DCBAppendCondition | None = None
) -> int:
"""
Appends given events to the event store, unless the condition fails.
"""
@abstractmethod
def subscribe(
self,
query: DCBQuery | None = None,
*,
after: int | None = None,
) -> DCBSubscription[Self]:
"""
Returns all events, unless 'after' is given then only those with position
greater than 'after', and unless any query items are given, then only those
that match at least one query item. An event matches a query item if its type
is in the item types or there are no item types, and if all the item tags are
in the event tags. The subscription will block when the last recorded event
is received, and then continue when new events are recorded.
"""
We have made two enhancements that go beyond the DCB specification.
The first enhancement is the standard idea to return int from the append()
method. This value represents the position of the last appended event. By returning this position, systems
implemented with CQRS that transition from a “write” view to an eventually-consistent “read” view can
wait for new events to be processed, by forwarding this value in the read request, avoiding the stale
read model problem.
The second enhancement is to support subscriptions, with the subscribe()
method, so that readers can continue receiving newly recorded events.
Read response¶
The read() method returns a DCBReadResponse,
which is a Python iterator that returns DCBSequencedEvent objects.
It has a head property that allows a reader to obtain a
“last known position” that corresponds to the last recorded event in the database at the time of reading,
rather than the sequence number of the last event it receives. This gives a better value for subsequent
append conditions.
Subscription¶
The subscribe() method returns a DCBSubscription
object, which is a Python iterator that returns DCBSequencedEvent objects. It
can be used as a context manager.
The following sections describe various implementations of the DCBRecorder interface.
In-memory DCB recorder¶
The InMemoryDCBRecorder class implements the DCBRecorder
interface using only Python objects.
from eventsourcing.dcb.popo import InMemoryDCBRecorder
in_memory_recorder = InMemoryDCBRecorder()
# Conditionally append new events.
in_memory_recorder.append(
events=[student_registered],
condition=append_condition,
)
# Read previously appended events.
student_events = in_memory_recorder.read(
query=student_query,
)
It can be used by a DCB application by setting the PERSISTENCE_MODULE environment variable to "eventsourcing.dcb.popo".
DCB events are stored in memory, and “deep copied” when appending and when reading to avoid any corruption of sequenced events. If you click through to the source code, you can see the DCB query logic for selecting events, and the append condition logic that is implemented in the append method. This is extremely fast for relatively low volumes of events, and is ideal for use in unit test suites.
Postgres DCB recorder¶
The PostgresDCBRecorderTT class implements
DCBRecorder in PostgreSQL using the following approach.
It can be used by a DCB application by setting the PERSISTENCE_MODULE environment
variable to "eventsourcing.dcb.postgres_tt", along with other settings such as the database name
and password (see examples).
This is our third attempt to implement the complex DCB query logic in PostgreSQL. The first attempt used array columns and array operators. The second attempt used text search. Both of these first two attempts gave poor results. The third attempt, explained below, gives much better results.
The general idea is that tags tend to follow from individual enduring objects in the real world, whereas event type strings follow from software object classes in a domain model. Therefore tags tend to have high cardinality, whereas type strings tend to have low cardinality. Therefore tags tend to be highly selective in queries, whereas type strings do not. Mixing these up in an SQL select statement causes the PostgreSQL query planner to find sub-optimal solutions.
This implementation uses a secondary table of tags indexed with a B-tree. Queries select tags first, and then filter by position and type. The sequence positions on the main table are also indexed with a B-tree that “covers” the type column. In this way, recorded events can be selected by tag, filtered by type, ordered and limited, using only B-tree indexes, without touching the main table of recorded events.
Conditional append operations use separate multi-clause “fail condition” and “unconditional append” CTE statements. Having two separate statements in a PL/pgSQL function means each can be planned separately, whilst conditional append operations can be performed efficiently with one client-server round-trip. The function parameters include lists of DCB query items and DCB events, defined as composite arrays of custom types. A multi-clause CTE statement is also used to select events for read operations. These functions are executed as prepared statements.
Logging execution times directly from the database shows that the database typically executes conditional appends in 100-200 μs with millions of recorded events.
See the speedrun example for a comparative report and analysis of the performance.
UmaDB DCB recorder¶
UmaDB is a specialist event store for DCB written in Rust. The Python package
eventsourcing_umadb implements DCBRecorder by adapting the Python client for UmaDB.
It can be used by a DCB application by setting the PERSISTENCE_MODULE environment variable to "eventsourcing_umadb".
UmaDB uses the same idea as the Postgres DCB recorder of filtering first by tags. UmaDB follows the copy-on-write MVCC design of LMDB.
See the speedrun example for a comparative report and analysis of the performance.
Higher-level Abstractions¶
The following sections describe higher-level abstractions for event sourcing with DCB.
The higher-level abstractions shown below introduces the notion “enduring object” which is quite like “event-sourced aggregate” but with some important differences, the notion “group”, which is a collection of many enduring objects that can also make decisions which affect the whole group, and the notion “slice” which can be used with “vertical slice architecture”.
The more general abstraction that has been derived is the notion of a decision-making “perspective”. In general, a “perspective” is a selective view of the past, an apprehension of decisions already made, at the beginning of the process of creating a new decision. Here, a “perspective” is a selection of decisions already recorded, that forms the “datum” for a decision model which generates new decision.
In order to reach towards a coherent scheme, some of the names have been borrowed from the highly relevant event-oriented modern process philosophy of Alfred North Whitehead.
“The ‘settlement’ which an actual entity ‘finds’ is its datum. It is to be conceived as a limited perspective of the ‘settled’ world provided by the eternal objects concerned. This datum is ‘decided’ by the settled world. It is ‘prehended’ by the new superseding entity. The datum is the objective content of the experience. The decision, providing the datum, is a transference of self-limited appetition; the settled world provides the ‘real potentiality’ that its many actualities be felt compatibly; and the new concrescence starts from this datum. The perspective is provided by the elimination of incompatibilities. The final stage, the ‘decision’ is how the actual entity, having attained its individual ‘satisfaction’ thereby adds a determinate condition to the settlement for the future beyond itself. Thus the ‘datum’ is the ‘decision received’ and the ‘decision’ is the ‘decision transmitted’. Between these two decisions, received and transmitted, there lie the two stages, ‘process’ and ‘satisfaction’.”
“A nexus enjoys ‘personal order’ when (a) it is a ‘society’ and (b) when the genetic relatedness of its members orders these members ‘serially’. By this ‘serial ordering’ arising from the genetic relatedness, it is meant that any member of the nexus — excluding the first and the last, if there be such — constitutes a ‘cut’ in the nexus, so that (a) this member inherits from all members on one side of the cut, and from no members on the other side of the cut, and (b) if A and B are two members of the nexus and B inherits from A, then the side of B’s cut, inheriting from B, forms part of the side of A’s cut, inheriting from A, and the side of A’s cut from which A inherits forms part of the side of B’s cut from which B inherits. Thus the nexus forms a single line of inheritance of its defining characteristic. Such a nexus is called an ‘enduring object’.”
The abstractions supporting the higher-level styles of domain modelling with DCB are described briefly below with short examples.
Decision¶
The Decision class is defined as the root of
the decision class hierarchy in domain models that uses DCB.
It represents the general notion of giving form to the settled production of new facts in a domain model. Concrete subclasses will each define a name and a collection of attributes, and will be somehow serializable and deserializable. Abstract subclasses may involve some kind of declarative syntax that supports automatic serialisation, such as we see with Pydantic and MessagePack.
The examples below define decision types as Python data classes.
from dataclasses import dataclass
from eventsourcing.dcb.domain import Decision
@dataclass
class StudentRegistered(Decision):
student_id: str
name: str
max_courses: int
@dataclass
class CourseRegistered(Decision):
course_id: str
name: str
max_students: int
student_registered = StudentRegistered(
student_id="student:123",
name="Sara",
max_courses=5,
)
course_registered = CourseRegistered(
course_id="course:456",
name="History",
max_students=30,
)
Additionally, the class InitialDecision can be used to represent the first
decision in a modelled set of decisions that have serial order (a sequence). It extends Decision
with an “originator topic” type hint that may be implemented to represent the type of thing to which the sequence
belongs. All the members of such a sequence of decisions are likely each to be tagged with a common tag,
and perhaps also other tags for cross-cutting decisions or other classifications. The common tag in this case
is likely to represent the continuity ID of the thing to which the whole sequence belongs.
TDecision¶
The type variable TDecision represents
any subclass of Decision. It will be used
as a type parameter to define generic types that can be specialized to work
with one kind of decision or another.
Tagged¶
The generic class Tagged encapsulates a
Decision, along with some tag strings.
It corresponds to the “typed and tagged” lower-level DCB event type.
class Tagged(Generic[TDecision]):
def __init__(self, tags: list[str], decision: TDecision) -> None:
self.tags = tags
self.decision = decision
from eventsourcing.dcb.domain import Tagged
tagged_decision = Tagged[StudentRegistered](
tags=["student:123"],
decision=student_registered,
)
Mapper¶
The class DCBMapper is an abstract base class that
defines an interface for converting between the higher-level Decision
instances and the lower-level DCBEvent instances.
class DCBMapper(ABC, Generic[TDecision]):
@abstractmethod
def to_dcb_event(self, event: Tagged[TDecision]) -> DCBEvent:
raise NotImplementedError # pragma: no cover
@abstractmethod
def to_domain_event(self, event: DCBEvent) -> Tagged[TDecision]:
raise NotImplementedError # pragma: no cover
Concrete subclasses will implement or invoke some kind of serialization and deserialization functionality, for example by using JSON, Pydantic, MessagePack, or Protobuf.
import json
class JSONMapper(DCBMapper[Decision]):
def __init__(self, registered_types: list[type[Decision]]) -> None:
self.registered_types = {t.__qualname__: t for t in registered_types}
def to_dcb_event(self, event: Tagged[Decision]) -> DCBEvent:
return DCBEvent(
type=type(event.decision).__qualname__,
data=json.dumps(event.decision.as_dict()),
tags=event.tags,
)
def to_domain_event(self, event: DCBEvent) -> Tagged[Decision]:
return Tagged(
tags=event.tags,
decision = self.registered_types[event.type](**json.loads(event.data))
)
json_mapper = JSONMapper(
registered_types=[
StudentRegistered,
CourseRegistered,
]
)
# Convert from tagged decision to DCB event.
dcb_event = json_mapper.to_dcb_event(tagged_decision)
# Convert from DCB event to tagged decision.
tagged_decision = json_mapper.to_domain_event(dcb_event)
Usually this requires some kind of alignment with the decision classes. For example,
the module eventsourcing.dcb.msgpack defines a mapper and decision base classes
that work together using the super fast and compact msgpack format.
To use this module, you will need to install the Python msgspec package.
from eventsourcing.dcb.msgpack import Decision, MessagePackMapper, InitialDecision
class StudentJoinedCourse(Decision):
student_id: str
course_id: str
msgpack_mapper = MessagePackMapper()
tagged_decision = Tagged(
tags=["student:123", "course:456"],
decision=StudentJoinedCourse(
student_id="student:123",
course_id="course:123",
)
)
dcb_event = msgpack_mapper.to_dcb_event(tagged_decision)
tagged_decision = msgpack_mapper.to_domain_event(dcb_event)
Selector¶
A Selector defines a criterion for a consistency boundary
in a domain model, in terms of decision types and tag strings. It corresponds to the
lower-level query item.
from eventsourcing.dcb.domain import Selector
student_selector = Selector(
types=[StudentRegistered],
tags=["student:123"],
)
course_selector = Selector(
types=[CourseRegistered],
tags=["course:456"],
)
A list of selectors corresponds to the lower-level query.
student_consistency_boundary = [student_selector]
Event store¶
A DCBEventStore encapsulates both a mapper and a recorder.
It has methods for reading and appending tagged decisions.
The read() method returns an iterator of matching
tagged events. The optional cb parameter is a consistency boundary for selecting events.
The argument can be either a list of selectors, or an individual selector. The optional after parameter
is a sequence number after which events will be read.
The append() method has an events parameter, which
is a list of tagged decisions. The optional cb parameter is a consistency boundary
for detecting conflicting events. The argument can be either a list of selectors, or an individual selector.
The optional after parameter represents a sequence number after which conflicting events will be detected.
from eventsourcing.dcb.persistence import DCBEventStore
event_store = DCBEventStore(
mapper=json_mapper,
recorder=in_memory_recorder,
)
# Read already appended student events.
student_events = event_store.read(
cb=student_selector,
)
# Append new course events.
event_store.append(
events=[tagged_decision],
cb=course_selector,
)
# Read already appended course events.
course_events = event_store.read(
cb=course_selector,
)
Perspective¶
The Perspective class is an abstract base class for different
kinds of decision models. It defines an abstract method consistency_boundary()
that must be implemented by subclasses to return a selector or a sequence of selectors. It also
defines a last_known_position attribute for keeping track of the last
known sequence number when a perspective is reconstructed from sequenced events. The consistency boundary of a perspective
is used as a query when reading events that will be used to reconstruct the perspective. The consistency boundary and
the last known position is used as an append condition when appending new events.
The Perspective class also provides
trigger_event() for creating and appending new tagged decisions
to an internal list, and collect_events() for collecting
all new tagged decisions.
from eventsourcing.dcb.domain import Perspective
class MyPerspective(Perspective):
def consistency_boundary(self) -> list[Selector]:
return [Selector(tags=["tag1"]), Selector(tags=["tag2"])]
# Construct a perspective.
perspective = MyPerspective()
# Get consistency boundary.
cb = perspective.consistency_boundary()
# Update "last known position", usually after selecting tagged
# decisions and updating the state of the perspective.
perspective.last_known_position = 1234
# Generate new tagged decisions.
perspective.trigger_event(
Decision,
tags=["tag1", "tag2"],
)
# Collect new decisions, usually before append them into an event
# store using the consistency boundary and the last known position.
new_decisions = perspective.collect_events()
# Append new decisions, using the same consistency boundary and the
# "last known position" when the perspective was reconstructed....
Enduring object¶
The generic base class EnduringObject extends the perspective class,
and is similar to event-sourced aggregates. Each instance has a unique continuity ID,
which is stored in the id attribute. The continuity ID is used as a tag
in its consistency boundary, and to tag new decisions.
Enduring objects can have command methods decorated with the library’s event decorator. Calling a decorated command method will generate a new tagged decision. The method body will be used to project tagged events into the current state of the enduring object.
Enduring object subclasses must be associated with an InitialDecision class
whose attributes match the arguments of its initializer __init__() method. This association can be made
either by defining a subclass of InitialDecision as a nested class on
the enduring object subclass, or by mentioning a subclass of InitialDecision
in an event decorator in the __init__() method.
Enduring object instances can be created by calling the subclass. This will trigger a new “initial decision”,
that will be used to construct the enduring object instance. The “initial decision” can be collected from
the enduring object instance by calling collect_events(). The
examples below show a student and course modelled as enduring objects. The StudentJoinedCourse decision class,
defined in the mapper example above, is included in the projection of both enduring objects,
in preparation for the group example in the next section.
from eventsourcing.domain import event
from eventsourcing.dcb.domain import EnduringObject
class Student(EnduringObject[Decision, str]):
class Registered(InitialDecision):
student_id: str
name: str
max_courses: int
class NameUpdated(Decision):
name: str
@event(Registered)
def __init__(self, name: str, max_courses: int) -> None:
self.name = name
self.max_courses = max_courses
self.course_ids: list[str] = []
@event(NameUpdated)
def update_name(self, name: str) -> None:
self.name = name
@event(StudentJoinedCourse)
def _(self, course_id: str) -> None:
self.course_ids.append(course_id)
class Course(EnduringObject[Decision, str]):
class Registered(InitialDecision):
course_id: str
name: str
max_students: int
def __init__(self, name: str, max_students: int) -> None:
self.name = name
self.max_students = max_students
self.student_ids: list[str] = []
@event(StudentJoinedCourse)
def _(self, student_id: str) -> None:
self.student_ids.append(student_id)
# Create a new student.
student = Student(
name="Sara",
max_courses=5,
)
assert student.name == "Sara"
assert student.max_courses == 5
# Update the student name.
student.update_name("Sara P")
assert student.name == "Sara P"
# Collect new events.
tagged_decisions = student.collect_events()
assert len(tagged_decisions) == 2
# Create a new course...
course = Course(
name="History",
max_students=30,
)
assert course.name == "History"
assert course.max_students == 30
See the DCB examples for a more complete example.
The advantage of enduring objects is the conceptual unity of having everything together in one place. However, this aligns enduring objects with the central criticism of event-sourced aggregates motivating DCB: that including all events in the consistency boundary, regardless of whether they are actually required for any particular operation, increases contention unnecessarily. Following this comes the accumulation of all commands and queries in a single class, tending towards large units of code that are hard to understand. See slices for an alternative higher-level abstraction.
Group¶
The Group class extends the perspective class, and supports
cross-cutting decision-making across many enduring objects. The consistency boundary of a group is the union
of the consistency boundaries of the enduring objects in the group.
A group is constructed with already existing enduring objects. Its command methods can trigger new tagged
decisions, using the group’s trigger_event() method. Decisions created by
a group will be tagged with all the continuity IDs of the enduring objects in the group. If an enduring
object’s projection includes that decision, its state will be evolved accordingly.
The example below uses the student and course enduring objects in a group that
triggers a StudentJoinedCourse event in the student_joins_course() method that applies to both
the student and the course. Similarly, a student_leaves_course() method could be implemented for this group.
from eventsourcing.dcb.domain import Group
class StudentAndCourse(Group[Decision]):
def __init__(
self,
student: Student | None,
course: Course | None,
) -> None:
self.student = student
self.course = course
def student_joins_course(self) -> None:
# Enforce business rules.
assert len(self.student.course_ids) < self.student.max_courses
assert len(self.course.student_ids) < self.course.max_students
assert self.student.id not in self.course.student_ids
# The DCB magic: one event for "one fact".
self.trigger_event(
StudentJoinedCourse,
student_id=self.student.id,
course_id=self.course.id,
)
group = StudentAndCourse(student, course)
group.student_joins_course()
assert student.id in course.student_ids
assert course.id in student.course_ids
Using groups to trigger cross-cutting events like this demonstrates the “one fact magic” of DCB. However, because the consistency boundary for a group is the union of the consistency boundaries for the members of a group, the criticism of enduring objects applies even more to groups: that including all events in the consistency boundary, regardless of whether they are actually required for any particular operation, increases contention unnecessarily. This corresponds exactly to recording new events from more than one aggregate in the same transaction. The difference with groups is that one event can affect many enduring objects.
Slice¶
The Slice class extends the perspective class, and
is designed to support “vertical slice architecture” with DCB. The idea of “vertical slices” is that individual
use cases can be implemented with pieces of code that are entirely independent of each other.
Slices can support both command and query use cases.
The three important aspects of a slice are:
Consistency Boundary — implement
consistency_boundary()to return selectors.Projection — use the event decorator to define how selected decisions evolve state.
Command Action — implement
execute()to generate new decisions.
The consistency boundary for a slice can be used both to select events for the slice’s projection, if it has one, and to select conflicting events when appending any new events to an event store.
The example below shows a slice for updating a student’s name. The consistency_boundary()
involves decision classes mentioned in the projection by using the projected_types attribute, which
automatically collects all decision classes mentioned in the slice’s @event decorators.
from eventsourcing.dcb.domain import Slice
class UpdateStudentName(Slice[Decision]):
def __init__(self, student_id: StudentID, new_name: str) -> None:
self.student_id = student_id
self.new_name = new_name
self.name = ""
self.student_was_registered: bool = False
def consistency_boundary(self) -> Selector:
return Selector(
types=self.projected_types,
tags=[self.student_id],
)
@event(Student.Registered)
def _(self, name: str) -> None:
self.name = name
self.student_was_registered = True
@event(Student.NameUpdated)
def _(self, name: str) -> None:
self.name = name
def execute(self) -> None:
assert self.student_was_registered
assert self.name != self.new_name
self.trigger_event(
Student.NameUpdated,
tags=[self.student_id],
name=self.new_name,
)
See the DCB examples for a more complete set of examples.
The advantage of using slices is that individual use cases can be implemented with pieces of code that are entirely independent of each other, and with consistency boundaries that include only what is necessary. However, this may come at the cost of some repetition of business logic, increasing the volume of code, which tends to increase the chances of introducing coding errors. See enduring objects for an alternative higher-level abstraction.
Mixing styles¶
The decision classes used in the UpdateStudentName slice are those defined above on the Student enduring
object. In these examples, the decision classes are defined as nested classes, but defining them as module-level
classes would work just as well.
This shows that it is possible to develop a domain model with enduring objects and rework your code to use slices. Similarly, with a little care, it is possible to start with slices and rework your code to use enduring objects.
Indeed, it is possible to have some parts of your domain model defined with enduring objects, and groups, and to have other parts defined using slices. This is demonstrated in the example application below.
Repository¶
The DCBRepository class is provided to support working with
perspectives of different kinds.
A repository is constructed with an event store.
from eventsourcing.dcb.application import DCBRepository
repository = DCBRepository(
eventstore=DCBEventStore(
mapper=MessagePackMapper(),
recorder=InMemoryDCBRecorder(),
),
)
The save() method collects and appends new decisions.
student = Student(
name="Sara",
max_courses=5,
)
course = Course(
name="History",
max_students=30,
)
repository.save(student)
repository.save(course)
The get() method reconstructs an enduring object
for a given continuity ID.
student = repository.get(student.id)
course = repository.get(course.id)
assert student.name == "Sara"
assert student.max_courses == 5
assert student.course_ids == []
assert course.name == "History"
assert course.max_students == 30
assert course.student_ids == []
The get_many() method reconstructs many enduring objects
for a given sequence of continuity IDs.
student, course = repository.get_many(student.id, course.id)
assert student.name == "Sara"
assert student.max_courses == 5
assert student.course_ids == []
assert course.name == "History"
assert course.max_students == 30
assert course.student_ids == []
The get_group() method constructs a group
and its enduring object for a given sequence of continuity IDs.
group = repository.get_group(StudentAndCourse, student.id, course.id)
group.student_joins_course()
repository.save(group)
# Check the student has joined the course.
student = repository.get(student.id)
course = repository.get(course.id)
assert course.id in student.course_ids
assert student.id in course.student_ids
The advance() method selects and applies
decisions to a perspective. It can be used to update a slice to its current
state before calling its execute method.
update_student_name = UpdateStudentName(student_id=student.id, new_name="Sara P")
assert update_student_name.name == ""
assert update_student_name.student_was_registered is False
repository.advance(update_student_name)
assert update_student_name.name == "Sara"
assert update_student_name.student_was_registered is True
update_student_name.execute()
assert update_student_name.name == "Sara P"
assert update_student_name.student_was_registered is True
repository.save(update_student_name)
Application¶
An application object brings together a stand-alone domain model and supportive persistence infrastructure, and implements commands and queries that support user interfaces.
Just like the library’s original application class,
DCBApplication selects and constructs a
DCB recorder at run-time, according to its environment variable configuration.
This means we can define a DCB application independently of persistence infrastructure, and then run it
in different ways at different times.
The DCBApplication class also supports the higher-level
abstractions described above. It has a repository to support working
with perspectives. It also has a method do()
which supports working with slices by advancing and executing a slice, then saving new decisions.
It is also possible to use the basic DCB objects directly with an application, and to extend
DCBApplication to support any other higher-level style you may wish to invent.
The example below shows how to write command and query methods using enduring objects, groups, and slices.
from eventsourcing.dcb.application import DCBApplication
class CourseSubscriptions(DCBApplication):
def register_student(self, name: str) -> str:
student = Student(name=name, max_courses=5)
self.repository.save(student)
return student.id
def update_student_name(self, student_id: str, new_name: str) -> None:
self.do(UpdateStudentName(student_id, new_name))
def register_course(self, name: str) -> str:
course = Course(name=name, max_students=30)
self.repository.save(course)
return course.id
def enrol_student_on_course(self, student_id: str, course_id: str) -> None:
group = self.repository.get_group(StudentAndCourse, student_id, course_id)
group.student_joins_course()
self.repository.save(group)
def list_courses_for_student(self, student_id: str) -> list[str]:
student: Student = self.repository.get(student_id)
return [c.name for c in self.repository.get_many(*student.course_ids)]
def list_students_for_course(self, course_id: str) -> list[str]:
course: Course = self.repository.get(course_id)
return [s.name for s in self.repository.get_many(*course.student_ids)]
# Construct app to use MessagePack and in-memory persistence.
app = CourseSubscriptions(env={
"PERSISTENCE_MODULE": "eventsourcing.dcb.popo",
"MAPPER_TOPIC": "eventsourcing.dcb.msgpack:MessagePackMapper",
})
# Construct enduring objects.
student_id = app.register_student("Sara")
course_id = app.register_course("History")
# Update the student name using a vertical slice.
app.update_student_name(student_id, "Sara P")
# Enrol the student on the course using a group.
app.enrol_student_on_course(student_id, course_id)
# Query for student and course names.
assert "Sara P" in app.list_students_for_course(course_id)
assert "History" in app.list_courses_for_student(student_id)
Read the examples pages for more discussion and examples of DCB.
Code reference¶
- class eventsourcing.dcb.api.DCBQueryItem(types: 'list[str]' = <factory>, tags: 'list[str]' = <factory>)[source]¶
Bases:
object- types: list[str]¶
- tags: list[str]¶
- __init__(types: list[str] = <factory>, tags: list[str] = <factory>) None¶
- class eventsourcing.dcb.api.DCBQuery(items: 'list[DCBQueryItem]' = <factory>)[source]¶
Bases:
object- items: list[DCBQueryItem]¶
- __init__(items: list[~eventsourcing.dcb.api.DCBQueryItem] = <factory>) None¶
- class eventsourcing.dcb.api.DCBAppendCondition(fail_if_events_match: 'DCBQuery' = <factory>, after: 'int | None' = None)[source]¶
Bases:
object- after: int | None = None¶
- __init__(fail_if_events_match: ~eventsourcing.dcb.api.DCBQuery = <factory>, after: int | None = None) None¶
- class eventsourcing.dcb.api.DCBEvent(type: 'str', data: 'bytes', tags: 'list[str]' = <factory>)[source]¶
Bases:
object- type: str¶
- data: bytes¶
- tags: list[str]¶
- __init__(type: str, data: bytes, tags: list[str] = <factory>) None¶
- class eventsourcing.dcb.api.DCBSequencedEvent(event: 'DCBEvent', position: 'int')[source]¶
Bases:
object- position: int¶
- class eventsourcing.dcb.api.DCBReadResponse[source]¶
Bases:
Iterator[DCBSequencedEvent],ABC- abstract property head: int | None¶
- class eventsourcing.dcb.api.DCBRecorder[source]¶
Bases:
ABC- abstract read(query: DCBQuery | None = None, *, after: int | None = None, limit: int | None = None) DCBReadResponse[source]¶
Returns all events, unless ‘after’ is given then only those with position greater than ‘after’, and unless any query items are given, then only those that match at least one query item. An event matches a query item if its type is in the item types or there are no item types, and if all the item tags are in the event tags.
- abstract append(events: Sequence[DCBEvent], condition: DCBAppendCondition | None = None) int[source]¶
Appends given events to the event store, unless the condition fails.
- abstract subscribe(query: DCBQuery | None = None, *, after: int | None = None) DCBSubscription[Self][source]¶
Returns all events, unless ‘after’ is given then only those with position greater than ‘after’, and unless any query items are given, then only those that match at least one query item. An event matches a query item if its type is in the item types or there are no item types, and if all the item tags are in the event tags. The subscription will block when the last recorded event is received, and then continue when new events are recorded.
- class eventsourcing.dcb.api.DCBSubscription(recorder: TDCBRecorder_co, query: DCBQuery | None = None, after: int | None = None)[source]¶
Bases:
Iterator[DCBSequencedEvent],Generic[TDCBRecorder_co]
- class eventsourcing.dcb.application.DCBApplication(env: Mapping[str, str] | None = None)[source]¶
Bases:
object- name = 'DCBApplication'¶
- env: Mapping[str, str] = {'PERSISTENCE_MODULE': 'eventsourcing.dcb.popo'}¶
- construct_env(name: str, env: Mapping[str, str] | None = None) Environment[source]¶
Constructs environment from which application will be configured.
- class eventsourcing.dcb.application.DCBRepository(eventstore: DCBEventStore[TDecision])[source]¶
Bases:
Generic[TDecision]- __init__(eventstore: DCBEventStore[TDecision])[source]¶
- save(p: Perspective[TDecision]) int[source]¶
- get(enduring_object_id: str) EnduringObject[TDecision, str][source]¶
- get_many(*enduring_object_ids: str) list[EnduringObject[TDecision, str] | None][source]¶
- class eventsourcing.dcb.domain.InitialDecision[source]¶
Bases:
Decision- originator_topic: str¶
- classmethod id_attr_name(enduring_object_class: type[EnduringObject[Any, TID]]) TID[source]¶
- class eventsourcing.dcb.domain.TDecision¶
A type variable representing any subclass of
Decision.alias of TypeVar(‘TDecision’, bound=
Decision)- has_default()¶
- class eventsourcing.dcb.domain.Tagged(tags: list[str], decision: TDecision)[source]¶
Bases:
Generic[TDecision]
- class eventsourcing.dcb.domain.MetaPerspective(name, bases, namespace, /, **kwargs)[source]¶
Bases:
ABCMeta
- class eventsourcing.dcb.domain.Perspective(*_: Any, **__: Any)[source]¶
Bases:
ABC,Generic[TDecision]- last_known_position: int | None¶
- class eventsourcing.dcb.domain.MetaSupportsEventDecorator(name, bases, namespace, /, **kwargs)[source]¶
Bases:
MetaPerspective
- class eventsourcing.dcb.domain.MetaEnduringObject(name, bases, namespace, /, **kwargs)[source]¶
Bases:
MetaSupportsEventDecorator
- class eventsourcing.dcb.domain.EnduringObject(**kwargs: Any)[source]¶
Bases:
Perspective[TDecision],Generic[TDecision,TID]- id: TID¶
- trigger_event(decision_cls: Callable[P, TDecision], tags: Sequence[str] = (), *args: P.args, **kwargs: P.kwargs) None[source]¶
Constructs new tagged decision and appends to list of uncommitted events.
- projected_types = []¶
- class eventsourcing.dcb.domain.Group(*args: Any, **kwargs: Any)[source]¶
Bases:
Perspective[TDecision]
- class eventsourcing.dcb.domain.Selector(types: 'Sequence[type[Decision]]' = (), tags: 'Sequence[str]' = ())[source]¶
Bases:
object- tags: Sequence[str] = ()¶
- class eventsourcing.dcb.domain.MetaSlice(name, bases, namespace, /, **kwargs)[source]¶
Bases:
MetaSupportsEventDecorator
- class eventsourcing.dcb.domain.Slice(*_: Any, **__: Any)[source]¶
Bases:
Perspective[TDecision]- do_projection = False¶
- projected_types = []¶
- last_known_position: int | None¶
- class eventsourcing.dcb.persistence.DCBEventStore(mapper: DCBMapper[TDecision], recorder: DCBRecorder)[source]¶
Bases:
Generic[TDecision]- __init__(mapper: DCBMapper[TDecision], recorder: DCBRecorder)[source]¶
- class eventsourcing.dcb.persistence.DCBEventStoreReadResponse(dcb_read_response: DCBReadResponse, mapper: DCBMapper[TDecision])[source]¶
Bases:
Iterator[Tagged[TDecision]]- __init__(dcb_read_response: DCBReadResponse, mapper: DCBMapper[TDecision])[source]¶
- property head: int | None¶
- class eventsourcing.dcb.persistence.DCBInfrastructureFactory(env: Environment | Mapping[str, str] | None)[source]¶
Bases:
BaseInfrastructureFactory[TTrackingRecorder],ABC- abstract dcb_recorder() DCBRecorder[source]¶
- class eventsourcing.dcb.persistence.DCBListenNotifySubscription(recorder: TDCBRecorder_co, query: DCBQuery | None = None, after: int | None = None)[source]¶
Bases:
DCBSubscription[TDCBRecorder_co]
- class eventsourcing.dcb.popo.InMemoryDCBRecorder[source]¶
Bases:
DCBRecorder,POPORecorder- read(query: DCBQuery | None = None, *, after: int | None = None, limit: int | None = None) DCBReadResponse[source]¶
Returns all events, unless ‘after’ is given then only those with position greater than ‘after’, and unless any query items are given, then only those that match at least one query item. An event matches a query item if its type is in the item types or there are no item types, and if all the item tags are in the event tags.
- append(events: Sequence[DCBEvent], condition: DCBAppendCondition | None = None) int[source]¶
Appends given events to the event store, unless the condition fails.
- subscribe(query: DCBQuery | None = None, *, after: int | None = None) InMemorySubscription[source]¶
Returns all events, unless ‘after’ is given then only those with position greater than ‘after’, and unless any query items are given, then only those that match at least one query item. An event matches a query item if its type is in the item types or there are no item types, and if all the item tags are in the event tags. The subscription will block when the last recorded event is received, and then continue when new events are recorded.
- class eventsourcing.dcb.popo.InMemorySubscription(recorder: InMemoryDCBRecorder, query: DCBQuery | None = None, after: int | None = None)[source]¶
Bases:
DCBListenNotifySubscription[InMemoryDCBRecorder]- __init__(recorder: InMemoryDCBRecorder, query: DCBQuery | None = None, after: int | None = None) None[source]¶
- class eventsourcing.dcb.popo.SimpleDCBReadResponse(events: Iterator[DCBSequencedEvent], head: int | None = None)[source]¶
Bases:
DCBReadResponse- __init__(events: Iterator[DCBSequencedEvent], head: int | None = None)[source]¶
- property head: int | None¶
- class eventsourcing.dcb.popo.InMemoryDCBFactory(env: Environment | Mapping[str, str] | None)[source]¶
Bases:
POPOFactory,DCBInfrastructureFactory[POPOTrackingRecorder]- dcb_recorder() DCBRecorder[source]¶
- class eventsourcing.dcb.postgres_tt.PostgresDCBRecorderTT(datastore: PostgresDatastore, *, events_table_name: str = 'dcb_events')[source]¶
Bases:
DCBRecorder,PostgresRecorder- __init__(datastore: PostgresDatastore, *, events_table_name: str = 'dcb_events')[source]¶
- read(query: DCBQuery | None = None, *, after: int | None = None, limit: int | None = None) DCBReadResponse[source]¶
Returns all events, unless ‘after’ is given then only those with position greater than ‘after’, and unless any query items are given, then only those that match at least one query item. An event matches a query item if its type is in the item types or there are no item types, and if all the item tags are in the event tags.
- subscribe(query: DCBQuery | None = None, *, after: int | None = None) PostgresDCBSubscription[source]¶
Returns all events, unless ‘after’ is given then only those with position greater than ‘after’, and unless any query items are given, then only those that match at least one query item. An event matches a query item if its type is in the item types or there are no item types, and if all the item tags are in the event tags. The subscription will block when the last recorded event is received, and then continue when new events are recorded.
- append(events: Sequence[DCBEvent], condition: DCBAppendCondition | None = None) int[source]¶
Appends given events to the event store, unless the condition fails.
- construct_psycopg_dcb_events(dcb_events: Sequence[DCBEvent]) list[PsycopgDCBEvent][source]¶
- construct_psycopg_query_items(query_items: Sequence[DCBQueryItem]) list[PsycopgDCBQueryItem][source]¶
- class eventsourcing.dcb.postgres_tt.PsycopgDCBEvent(type, data, tags)[source]¶
Bases:
NamedTuple- type: str¶
Alias for field number 0
- data: bytes¶
Alias for field number 1
- tags: list[str]¶
Alias for field number 2
- class eventsourcing.dcb.postgres_tt.PsycopgDCBQueryItem(types, tags)[source]¶
Bases:
NamedTuple- types: list[str]¶
Alias for field number 0
- tags: list[str]¶
Alias for field number 1
- class eventsourcing.dcb.postgres_tt.PostgresDCBSubscription(recorder: PostgresDCBRecorderTT, query: DCBQuery | None = None, after: int | None = None)[source]¶
Bases:
DCBListenNotifySubscription[PostgresDCBRecorderTT]- __init__(recorder: PostgresDCBRecorderTT, query: DCBQuery | None = None, after: int | None = None) None[source]¶
- class eventsourcing.dcb.postgres_tt.PostgresTTDCBFactory(env: Environment | Mapping[str, str] | None)[source]¶
Bases:
BasePostgresFactory[PostgresTrackingRecorder],DCBInfrastructureFactory[PostgresTrackingRecorder]- dcb_recorder() DCBRecorder[source]¶
- class eventsourcing.dcb.msgpack.InitialDecision(originator_topic: str)[source]¶
Bases:
Decision,InitialDecision- originator_topic: str¶