dcb — Dynamic consistency boundaries

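From version 9.5, this library supports Dynamic Consistency Boundaries (DCB). This page introduces the DCB specification, then describes the basic DCB objects, the DCB recorders, and the higher-level abstractions that the library provides.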

Introduction to DCB

Dynamic Consistency Boundaries is a significant variant of event sourcing presented in a humorously provocative way as “killing the aggregate”.

The DCB Specification

A novel scheme has been proposed that uses a single sequence of events, an application sequence in the terminology of this library.

Events

Each event in DCB has one “type” string, some binary “data”, and any number of “tag” strings. Recorded events have an assigned “position” in the sequence, and are called “sequenced events”. These objects correspond to the stored event and notification objects previously defined in this library.

Reading

When reading events from a DCB event store, a reader can supply a “query” for selecting events. If no query is provided, or the query does not have any query items, then all events will be selected. A reader may also specify a sequence position after which to select events.

A query can have any number of “query items”. Each query item can have any number of “types” and “tags”. An event is selected by a query if it is matched by any of the query items. A query item matches an event when two conditions both hold: the event’s type is mentioned in the query item’s types (or the query item has zero types), and all of the query item’s tags are mentioned in the event’s tags. In this way, a query item with more types is more inclusive, and a query item with more tags is more restrictive.
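
The matching rule can be sketched in a few lines of plain Python. This is only an illustration of the rule as stated above, not the library's implementation; the names Event, QueryItem, item_matches and query_selects are hypothetical stand-ins.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    type: str
    tags: frozenset = frozenset()


@dataclass(frozen=True)
class QueryItem:
    types: frozenset = frozenset()
    tags: frozenset = frozenset()


def item_matches(item: QueryItem, event: Event) -> bool:
    # Type clause: passes if the item names no types, or names the event's type.
    type_ok = not item.types or event.type in item.types
    # Tag clause: every tag on the item must also be on the event.
    tags_ok = item.tags <= event.tags
    return type_ok and tags_ok


def query_selects(items: list, event: Event) -> bool:
    # A query with no items selects everything; otherwise any one item suffices.
    return not items or any(item_matches(item, event) for item in items)


joined = Event(
    type="StudentJoinedCourse",
    tags=frozenset({"student:123", "course:456"}),
)

by_student = QueryItem(tags=frozenset({"student:123"}))
by_other_course = QueryItem(tags=frozenset({"student:123", "course:789"}))
by_type_only = QueryItem(types=frozenset({"CourseRegistered"}))
```

Here by_student matches the event, by_other_course does not (one of its tags is missing from the event), and by_type_only does not (the type differs). A query containing both by_type_only and by_student still selects the event, because one matching item is enough.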

Writing

When writing new events to a DCB event store, a writer can supply an “append condition” to ensure consistency of recorded state. An append condition can include a query to select conflicting events, and a position after which the query should be applied.

If the append condition fails, because conflicting events have been recorded, then an “integrity error” is raised and the new events are not recorded. Otherwise, all the new events are recorded. Each recorded event is assigned a new position in the application sequence, and thereby becomes a “sequenced event”.

A command method will usually read a selection of sequenced events, and then project the events into a “decision model” with which new events will be generated. When a command method writes new events, the same query that was used for reading can also be used in the append condition. The highest “last known position” at the time of reading can be used as the append condition position.

The command method’s query, used both when reading and writing, therefore defines the “dynamic consistency boundary” for the command.
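
The read, decide, append cycle can be sketched with a tiny in-memory store. This is a simplified illustration under stated assumptions: the query is reduced to a single tag, and TinyStore, IntegrityError and their method signatures are hypothetical, not the library's API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    type: str
    tags: frozenset


class IntegrityError(Exception):
    pass


class TinyStore:
    """Hypothetical in-memory store; positions start at 1."""

    def __init__(self):
        self.events = []  # list index + 1 is the event's position

    def read(self, tag, after=0):
        # Select events carrying the tag with position greater than 'after'.
        return [
            (position, event)
            for position, event in enumerate(self.events, start=1)
            if position > after and tag in event.tags
        ]

    def head(self):
        # Position of the last recorded event (0 if the store is empty).
        return len(self.events)

    def append(self, new_events, fail_if_tag=None, after=0):
        # Fail if any conflicting event was recorded after the given position.
        if fail_if_tag is not None and self.read(fail_if_tag, after):
            raise IntegrityError(fail_if_tag)
        self.events.extend(new_events)
        return len(self.events)


store = TinyStore()
student_tags = frozenset({"student:123"})
store.append([Event("StudentRegistered", student_tags)])

# Two command invocations read the same boundary at the same moment.
last_known = store.head()

# The first writer succeeds, because nothing conflicting was recorded since.
store.append(
    [Event("StudentNameUpdated", student_tags)],
    fail_if_tag="student:123",
    after=last_known,
)

# The second writer still holds the stale position, so its append is rejected.
try:
    store.append(
        [Event("StudentNameUpdated", student_tags)],
        fail_if_tag="student:123",
        after=last_known,
    )
    conflict_detected = False
except IntegrityError:
    conflict_detected = True
```

The rejected writer would normally read again, rebuild its decision model, and retry with a fresh position.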

Discussion

The multi-dimensional and cross-cutting possibilities offered by combining query items are impressive. However, this presents a technical challenge when implementing support for DCB applications. Firstly, the selections of events have to be correct for all possible sets of query items. Secondly, it is a technical challenge to achieve performance for DCB applications that is comparable to that enjoyed by “traditional” event-sourced aggregates.

A sustained effort has been made here to implement persistence for DCB applications in a way that is both correct and performant. At first, an attempt was made to use GIN indexes in PostgreSQL, with array operators, then with text vectors, and then with full text search techniques. Many others have tried this too, in different ways. It is commonly experienced to be slow with any significant volume of recorded events. In consequence, an alternative implementation in PostgreSQL was developed that uses B-trees with a separate table for tags. This was much faster, especially when coded with common table expressions (CTEs). Finally, the idea of using B-trees with CTEs in PostgreSQL was distilled into a specialist DCB event store written in Rust, now called UmaDB.

Furthermore, we have searched for higher-level abstractions with which domain logic can be more easily expressed, and have developed an application layer that can support such domain models. Ideas previously developed in this library for serialisation, mapping, and declarative syntax, have been applied along with ideas for implementing business logic with vertical slices.

DCB Objects

Here we present an implementation in Python of the basic objects for DCB that are described in the specification and discussed in the introduction above.

See this example of using the basic DCB objects to meet the course subscriptions challenge.

DCB Query Item

A DCBQueryItem defines a criterion for matching events. A query item will match an event if one of its types matches the event’s type or the query item’s types attribute is empty, and if each of its tags matches one of the event’s tags or the query item’s tags attribute is empty.

from eventsourcing.dcb.api import DCBQueryItem

student_query_item = DCBQueryItem(
    types=["StudentRegistered"],
    tags=["student:123"],
)

DCB Query

A DCBQuery defines criteria for selecting events in an event store. An event is selected if it is matched by any query item included in its items attribute, or if the items attribute is empty.

from eventsourcing.dcb.api import DCBQuery

student_query = DCBQuery(
    items=[student_query_item],
)

DCB Event

A DCBEvent represents a settled collection of facts to be appended, or that has already been recorded, in an event store.

from eventsourcing.dcb.api import DCBEvent

student_registered = DCBEvent(
    type="StudentRegistered",
    data=b'{"student_id": "student:123", "name": "Sara", "max_courses": 5}',
    tags=["student:123"],
)

course_registered = DCBEvent(
    type="CourseRegistered",
    data=b'{"course_id": "course:456", "name": "History", "max_students": 30}',
    tags=["course:456"],
)

DCB Sequenced Event

A DCBSequencedEvent represents a recorded event along with its assigned sequence number.

from eventsourcing.dcb.api import DCBSequencedEvent

DCBSequencedEvent(
    event=student_registered,
    position=56316,
)

DCBSequencedEvent(
    event=course_registered,
    position=56317,
)

DCB Append Condition

A DCBAppendCondition causes an append request to fail if events match the query value of its fail_if_events_match attribute, optionally after a sequence number.

from eventsourcing.dcb.api import DCBAppendCondition

append_condition = DCBAppendCondition(
    fail_if_events_match=student_query,
    after=None,
)

DCB Recorders

The term “recorder” here corresponds to the notion “event store” in the DCB specification, and refers to the notion of dealing with records. Following the terminology in this library, the term “recorder” is used for objects that deal with domain events that have been serialised into a common format, and the term “event store” is used both for higher-level objects that deal with domain event objects of different types, and for specialist event store databases.

Abstract base class

The abstract base class DCBRecorder defines the read() and append() method signatures described in the DCB specification for an “event store”. These methods depend on the DCB Objects described above.

class DCBRecorder(ABC):
    @abstractmethod
    def read(
        self,
        query: DCBQuery | None = None,
        *,
        after: int | None = None,
        limit: int | None = None,
    ) -> DCBReadResponse:
        """
        Returns all events, unless 'after' is given then only those with position
        greater than 'after', and unless any query items are given, then only those
        that match at least one query item. An event matches a query item if its type
        is in the item types or there are no item types, and if all the item tags are
        in the event tags.
        """

    @abstractmethod
    def append(
        self, events: Sequence[DCBEvent], condition: DCBAppendCondition | None = None
    ) -> int:
        """
        Appends given events to the event store, unless the condition fails.
        """

    @abstractmethod
    def subscribe(
        self,
        query: DCBQuery | None = None,
        *,
        after: int | None = None,
    ) -> DCBSubscription[Self]:
        """
        Returns all events, unless 'after' is given then only those with position
        greater than 'after', and unless any query items are given, then only those
        that match at least one query item. An event matches a query item if its type
        is in the item types or there are no item types, and if all the item tags are
        in the event tags. The subscription will block when the last recorded event
        is received, and then continue when new events are recorded.
        """

We have made two enhancements that go beyond the DCB specification.

The first enhancement is the standard idea of returning an int from the append() method. This value represents the position of the last appended event. By returning this position, systems implemented with CQRS that transition from a “write” view to an eventually-consistent “read” view can wait for new events to be processed, by forwarding this value in the read request, avoiding the stale read model problem.
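
As a rough sketch of how a read side might use the returned position, the hypothetical ReadModel below tracks the last position it has processed and lets a caller block until it has caught up. The class, its methods, and the timer standing in for an event-processing component are all assumptions made for illustration.

```python
import threading


class ReadModel:
    """Hypothetical read model that tracks the last position it has processed."""

    def __init__(self):
        self._position = 0
        self._cond = threading.Condition()

    def process(self, position):
        # Called by the event-processing component for each newly processed event.
        with self._cond:
            self._position = position
            self._cond.notify_all()

    def wait_for(self, position, timeout=1.0):
        # Block until the read model has reached 'position', or until timeout.
        with self._cond:
            return self._cond.wait_for(
                lambda: self._position >= position, timeout
            )


read_model = ReadModel()
appended_position = 42  # stands in for the int returned by append()

# Simulate the read model catching up shortly after the write.
worker = threading.Timer(0.05, read_model.process, args=(appended_position,))
worker.start()

caught_up = read_model.wait_for(appended_position)
worker.join()
```

A read request that forwards appended_position can call wait_for() before querying, so that it does not observe state from before its own write.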

The second enhancement is to support subscriptions, with the subscribe() method, so that readers can continue receiving newly recorded events.

Read response

The read() method returns a DCBReadResponse, which is a Python iterator that returns DCBSequencedEvent objects. It has a head property that allows a reader to obtain a “last known position” that corresponds to the last recorded event in the database at the time of reading, rather than the sequence number of the last event it receives. This gives a better value for subsequent append conditions.
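
The difference between the head and the position of the last received event can be shown with a small stand-in. ReadResponse and read() below are hypothetical simplifications (a list of rows rather than an iterator, and a single tag rather than a query).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ReadResponse:
    # Hypothetical stand-in for a read response: matching rows plus the head.
    rows: list
    head: Optional[int]


def read(recorded: list, tag: str) -> ReadResponse:
    # 'recorded' is a list of tag sets; list index + 1 is the position.
    rows = [
        (position, tags)
        for position, tags in enumerate(recorded, start=1)
        if tag in tags
    ]
    # The head is the position of the last recorded event, matched or not.
    head = len(recorded) if recorded else None
    return ReadResponse(rows=rows, head=head)


recorded = [
    {"student:123"},
    {"course:456"},
    {"student:123", "course:456"},
    {"course:456"},
    {"course:789"},
]

response = read(recorded, "student:123")
last_received = response.rows[-1][0]

no_match = read(recorded, "student:999")
```

In this sketch the last matching event is at position 3 while the head is 5. When nothing matches, there is no last received position at all, yet the head is still available.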

Subscription

The subscribe() method returns a DCBSubscription object, which is a Python iterator that returns DCBSequencedEvent objects. It can be used as a context manager.

The following sections describe various implementations of the DCBRecorder interface.

In-memory DCB recorder

The InMemoryDCBRecorder class implements the DCBRecorder interface using only Python objects.

from eventsourcing.dcb.popo import InMemoryDCBRecorder

in_memory_recorder = InMemoryDCBRecorder()

# Conditionally append new events.
in_memory_recorder.append(
    events=[student_registered],
    condition=append_condition,
)

# Read previously appended events.
student_events = in_memory_recorder.read(
    query=student_query,
)

It can be used by a DCB application by setting the PERSISTENCE_MODULE environment variable to "eventsourcing.dcb.popo".

DCB events are stored in memory, and “deep copied” when appending and when reading to avoid any corruption of sequenced events. If you click through to the source code, you can see the DCB query logic for selecting events, and the append condition logic that is implemented in the append method. This is extremely fast for relatively low volumes of events, and is ideal for use in unit test suites.
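
The effect of copying on append and on read can be seen in a minimal sketch. CopyingStore is a hypothetical stand-in that stores plain dicts; it is not the InMemoryDCBRecorder source.

```python
import copy


class CopyingStore:
    """Hypothetical store that copies events on the way in and on the way out."""

    def __init__(self):
        self._events = []

    def append(self, event: dict) -> int:
        self._events.append(copy.deepcopy(event))
        return len(self._events)

    def read(self) -> list:
        return copy.deepcopy(self._events)


store = CopyingStore()
event = {"type": "StudentRegistered", "tags": ["student:123"]}
store.append(event)

# Mutating the caller's object after appending does not affect the record.
event["tags"].append("course:456")

# Mutating a read result does not affect the record either.
first_read = store.read()
first_read[0]["type"] = "Tampered"

second_read = store.read()
```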

Postgres DCB recorder

The PostgresDCBRecorderTT class implements DCBRecorder in PostgreSQL using the following approach.

It can be used by a DCB application by setting the PERSISTENCE_MODULE environment variable to "eventsourcing.dcb.postgres_tt", along with other settings such as the database name and password (see examples).

This is our third attempt to implement the complex DCB query logic in PostgreSQL. The first attempt used array columns and array operators. The second attempt used text search. Both of these first two attempts gave poor results. The third attempt, explained below, gives much better results.

The general idea is that tags tend to follow from individual enduring objects in the real world, whereas event type strings follow from software object classes in a domain model. Therefore tags tend to have high cardinality, whereas type strings tend to have low cardinality. Therefore tags tend to be highly selective in queries, whereas type strings do not. Mixing these up in an SQL select statement causes the PostgreSQL query planner to find sub-optimal solutions.

This implementation uses a secondary table of tags indexed with a B-tree. Queries select tags first, and then filter by position and type. The sequence positions on the main table are also indexed with a B-tree that “covers” the type column. In this way, recorded events can be selected by tag, filtered by type, ordered and limited, using only B-tree indexes, without touching the main table of recorded events.
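
The "tags first, then type" idea can be tried out with SQLite from the Python standard library. This is only an analogue: the table names, columns, and the select_positions() helper are hypothetical, it handles a single query item with at least one tag and one type, and it is not the schema or the functions used by PostgresDCBRecorderTT.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (
        position INTEGER PRIMARY KEY,
        type TEXT NOT NULL,
        data BLOB NOT NULL
    );
    -- Secondary table of tags; its primary key is a B-tree on (tag, position).
    CREATE TABLE tags (
        tag TEXT NOT NULL,
        position INTEGER NOT NULL REFERENCES events (position),
        PRIMARY KEY (tag, position)
    ) WITHOUT ROWID;
    -- Index on position that also carries the type column.
    CREATE INDEX events_position_type ON events (position, type);
    """
)


def append(type_, data, tags):
    cur = conn.execute(
        "INSERT INTO events (type, data) VALUES (?, ?)", (type_, data)
    )
    position = cur.lastrowid
    conn.executemany(
        "INSERT INTO tags (tag, position) VALUES (?, ?)",
        [(tag, position) for tag in set(tags)],
    )
    return position


def select_positions(tags, types, after=0):
    tags = sorted(set(tags))
    tag_marks = ",".join("?" * len(tags))
    type_marks = ",".join("?" * len(types))
    sql = f"""
        WITH tagged AS (
            -- Step 1: positions that carry ALL of the given tags.
            SELECT position FROM tags
            WHERE tag IN ({tag_marks}) AND position > ?
            GROUP BY position
            HAVING COUNT(*) = ?
        )
        -- Step 2: filter those positions by type, in sequence order.
        SELECT e.position FROM events AS e
        JOIN tagged AS t ON t.position = e.position
        WHERE e.type IN ({type_marks})
        ORDER BY e.position
    """
    params = [*tags, after, len(tags), *types]
    return [row[0] for row in conn.execute(sql, params)]


append("StudentRegistered", b"{}", ["student:123"])
append("CourseRegistered", b"{}", ["course:456"])
append("StudentJoinedCourse", b"{}", ["student:123", "course:456"])

student_positions = select_positions(
    ["student:123"], ["StudentRegistered", "StudentJoinedCourse"]
)
joined_positions = select_positions(
    ["student:123", "course:456"], ["StudentJoinedCourse"]
)
```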

Conditional append operations use separate multi-clause “fail condition” and “unconditional append” CTE statements. Having two separate statements in a PL/pgSQL function means each can be planned separately, whilst conditional append operations can be performed efficiently with one client-server round-trip. The function parameters include lists of DCB query items and DCB events, defined as composite arrays of custom types. A multi-clause CTE statement is also used to select events for read operations. These functions are executed as prepared statements.

Logging execution times directly from the database shows that the database typically executes conditional appends in 100-200 μs with millions of recorded events.

See the speedrun example for a comparative report and analysis of the performance.

UmaDB DCB recorder

UmaDB is a specialist event store for DCB written in Rust. The Python package eventsourcing_umadb implements DCBRecorder by adapting the Python client for UmaDB.

It can be used by a DCB application by setting the PERSISTENCE_MODULE environment variable to "eventsourcing_umadb".

UmaDB uses the same idea as the Postgres DCB recorder of filtering first by tags. UmaDB follows the copy-on-write MVCC design of LMDB.

See the speedrun example for a comparative report and analysis of the performance.

Higher-level Abstractions

The following sections describe higher-level abstractions for event sourcing with DCB.

The higher-level abstractions shown below introduce the notion “enduring object”, which is quite like “event-sourced aggregate” but with some important differences; the notion “group”, which is a collection of many enduring objects that can also make decisions which affect the whole group; and the notion “slice”, which can be used with “vertical slice architecture”.

The more general abstraction that has been derived is the notion of a decision-making “perspective”. In general, a “perspective” is a selective view of the past, an apprehension of decisions already made, at the beginning of the process of creating a new decision. Here, a “perspective” is a selection of decisions already recorded, that forms the “datum” for a decision model which generates new decisions.

In order to reach towards a coherent scheme, some of the names have been borrowed from the highly relevant event-oriented modern process philosophy of Alfred North Whitehead.

“The ‘settlement’ which an actual entity ‘finds’ is its datum. It is to be conceived as a limited perspective of the ‘settled’ world provided by the eternal objects concerned. This datum is ‘decided’ by the settled world. It is ‘prehended’ by the new superseding entity. The datum is the objective content of the experience. The decision, providing the datum, is a transference of self-limited appetition; the settled world provides the ‘real potentiality’ that its many actualities be felt compatibly; and the new concrescence starts from this datum. The perspective is provided by the elimination of incompatibilities. The final stage, the ‘decision’ is how the actual entity, having attained its individual ‘satisfaction’ thereby adds a determinate condition to the settlement for the future beyond itself. Thus the ‘datum’ is the ‘decision received’ and the ‘decision’ is the ‘decision transmitted’. Between these two decisions, received and transmitted, there lie the two stages, ‘process’ and ‘satisfaction’.”

“A nexus enjoys ‘personal order’ when (a) it is a ‘society’ and (b) when the genetic relatedness of its members orders these members ‘serially’. By this ‘serial ordering’ arising from the genetic relatedness, it is meant that any member of the nexus — excluding the first and the last, if there be such — constitutes a ‘cut’ in the nexus, so that (a) this member inherits from all members on one side of the cut, and from no members on the other side of the cut, and (b) if A and B are two members of the nexus and B inherits from A, then the side of B’s cut, inheriting from B, forms part of the side of A’s cut, inheriting from A, and the side of A’s cut from which A inherits forms part of the side of B’s cut from which B inherits. Thus the nexus forms a single line of inheritance of its defining characteristic. Such a nexus is called an ‘enduring object’.”

The abstractions supporting the higher-level styles of domain modelling with DCB are described briefly below with short examples.

Decision

The Decision class is defined as the root of the decision class hierarchy in domain models that use DCB.

It represents the general notion of giving form to the settled production of new facts in a domain model. Concrete subclasses will each define a name and a collection of attributes, and will be somehow serializable and deserializable. Abstract subclasses may involve some kind of declarative syntax that supports automatic serialisation, such as we see with Pydantic and MessagePack.

The examples below define decision types as Python data classes.

from dataclasses import dataclass
from eventsourcing.dcb.domain import Decision


@dataclass
class StudentRegistered(Decision):
    student_id: str
    name: str
    max_courses: int


@dataclass
class CourseRegistered(Decision):
    course_id: str
    name: str
    max_students: int


student_registered = StudentRegistered(
    student_id="student:123",
    name="Sara",
    max_courses=5,
)

course_registered = CourseRegistered(
    course_id="course:456",
    name="History",
    max_students=30,
)

Additionally, the class InitialDecision can be used to represent the first decision in a modelled set of decisions that have serial order (a sequence). It extends Decision with an “originator topic” type hint that may be implemented to represent the type of thing to which the sequence belongs. All the members of such a sequence of decisions are likely each to be tagged with a common tag, and perhaps also other tags for cross-cutting decisions or other classifications. The common tag in this case is likely to represent the continuity ID of the thing to which the whole sequence belongs.

TDecision

The type variable TDecision represents any subclass of Decision. It will be used as a type parameter to define generic types that can be specialized to work with one kind of decision or another.

Tagged

The generic class Tagged encapsulates a Decision, along with some tag strings. It corresponds to the “typed and tagged” lower-level DCB event type.

class Tagged(Generic[TDecision]):
    def __init__(self, tags: list[str], decision: TDecision) -> None:
        self.tags = tags
        self.decision = decision

from eventsourcing.dcb.domain import Tagged

tagged_decision = Tagged[StudentRegistered](
    tags=["student:123"],
    decision=student_registered,
)

Mapper

The class DCBMapper is an abstract base class that defines an interface for converting between the higher-level Decision instances and the lower-level DCBEvent instances.

class DCBMapper(ABC, Generic[TDecision]):
    @abstractmethod
    def to_dcb_event(self, event: Tagged[TDecision]) -> DCBEvent:
        raise NotImplementedError  # pragma: no cover

    @abstractmethod
    def to_domain_event(self, event: DCBEvent) -> Tagged[TDecision]:
        raise NotImplementedError  # pragma: no cover

Concrete subclasses will implement or invoke some kind of serialization and deserialization functionality, for example by using JSON, Pydantic, MessagePack, or Protobuf.

import json

class JSONMapper(DCBMapper[Decision]):
    def __init__(self, registered_types: list[type[Decision]]) -> None:
        self.registered_types = {t.__qualname__: t for t in registered_types}

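    def to_dcb_event(self, event: Tagged[Decision]) -> DCBEvent:
        return DCBEvent(
            type=type(event.decision).__qualname__,
            # DCB event data is binary, so encode the JSON string as bytes.
            data=json.dumps(event.decision.as_dict()).encode("utf-8"),
            tags=event.tags,
        )

    def to_domain_event(self, event: DCBEvent) -> Tagged[Decision]:
        # Look up the registered decision class by the event's type string.
        decision_cls = self.registered_types[event.type]
        return Tagged(
            tags=event.tags,
            decision=decision_cls(**json.loads(event.data)),
        )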


json_mapper = JSONMapper(
    registered_types=[
        StudentRegistered,
        CourseRegistered,
    ]
)

# Convert from tagged decision to DCB event.
dcb_event = json_mapper.to_dcb_event(tagged_decision)

# Convert from DCB event to tagged decision.
tagged_decision = json_mapper.to_domain_event(dcb_event)

Usually this requires some kind of alignment with the decision classes. For example, the module eventsourcing.dcb.msgpack defines a mapper and decision base classes that work together using the super fast and compact msgpack format. To use this module, you will need to install the Python msgspec package.

from eventsourcing.dcb.msgpack import Decision, MessagePackMapper, InitialDecision


class StudentJoinedCourse(Decision):
    student_id: str
    course_id: str


msgpack_mapper = MessagePackMapper()

tagged_decision = Tagged(
    tags=["student:123", "course:456"],
    decision=StudentJoinedCourse(
        student_id="student:123",
        course_id="course:456",
    )
)

dcb_event = msgpack_mapper.to_dcb_event(tagged_decision)

tagged_decision = msgpack_mapper.to_domain_event(dcb_event)

Selector

A Selector defines a criterion for a consistency boundary in a domain model, in terms of decision types and tag strings. It corresponds to the lower-level query item.

from eventsourcing.dcb.domain import Selector

student_selector = Selector(
    types=[StudentRegistered],
    tags=["student:123"],
)

course_selector = Selector(
    types=[CourseRegistered],
    tags=["course:456"],
)

A list of selectors corresponds to the lower-level query.

student_consistency_boundary = [student_selector]
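
The correspondence between selectors and the lower-level query items can be sketched as a small conversion function. Everything here is a hypothetical stand-in: the Selector and QueryItem classes are simplified, and mapping a decision class to its class name as the type string is an assumption, not a statement about how the library derives type strings.

```python
from dataclasses import dataclass, field


@dataclass
class Selector:
    types: list = field(default_factory=list)
    tags: list = field(default_factory=list)


@dataclass
class QueryItem:
    types: list
    tags: list


def to_query_items(cb) -> list:
    # Accept either one selector or a sequence of selectors.
    selectors = [cb] if isinstance(cb, Selector) else list(cb)
    return [
        QueryItem(
            # Assumption: a decision class maps to its class name as type string.
            types=[decision_cls.__name__ for decision_cls in selector.types],
            tags=list(selector.tags),
        )
        for selector in selectors
    ]


class StudentRegistered:
    pass


class CourseRegistered:
    pass


student_selector = Selector(types=[StudentRegistered], tags=["student:123"])
course_selector = Selector(types=[CourseRegistered], tags=["course:456"])

single = to_query_items(student_selector)
both = to_query_items([student_selector, course_selector])
```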

Event store

A DCBEventStore encapsulates both a mapper and a recorder. It has methods for reading and appending tagged decisions.

The read() method returns an iterator of matching tagged events. The optional cb parameter is a consistency boundary for selecting events. The argument can be either a list of selectors, or an individual selector. The optional after parameter is a sequence number after which events will be read.

The append() method has an events parameter, which is a list of tagged decisions. The optional cb parameter is a consistency boundary for detecting conflicting events. The argument can be either a list of selectors, or an individual selector. The optional after parameter represents a sequence number after which conflicting events will be detected.

from eventsourcing.dcb.persistence import DCBEventStore

event_store = DCBEventStore(
    mapper=json_mapper,
    recorder=in_memory_recorder,
)

# Read already appended student events.
student_events = event_store.read(
    cb=student_selector,
)

# Append new course events.
event_store.append(
    events=[tagged_decision],
    cb=course_selector,
)

# Read already appended course events.
course_events = event_store.read(
    cb=course_selector,
)

Perspective

The Perspective class is an abstract base class for different kinds of decision models. It defines an abstract method consistency_boundary() that must be implemented by subclasses to return a selector or a sequence of selectors. It also defines a last_known_position attribute for keeping track of the last known sequence number when a perspective is reconstructed from sequenced events. The consistency boundary of a perspective is used as a query when reading events that will be used to reconstruct the perspective. The consistency boundary and the last known position are used as an append condition when appending new events.

The Perspective class also provides trigger_event() for creating and appending new tagged decisions to an internal list, and collect_events() for collecting all new tagged decisions.

from eventsourcing.dcb.domain import Perspective

class MyPerspective(Perspective):
    def consistency_boundary(self) -> list[Selector]:
        return [Selector(tags=["tag1"]), Selector(tags=["tag2"])]


# Construct a perspective.
perspective = MyPerspective()

# Get consistency boundary.
cb = perspective.consistency_boundary()

# Update "last known position", usually after selecting tagged
# decisions and updating the state of the perspective.
perspective.last_known_position = 1234

# Generate new tagged decisions.
perspective.trigger_event(
    Decision,
    tags=["tag1", "tag2"],
)

# Collect new decisions, usually before appending them to an event
# store using the consistency boundary and the last known position.
new_decisions = perspective.collect_events()

# Append new decisions, using the same consistency boundary and the
# "last known position" when the perspective was reconstructed....

Enduring object

The generic base class EnduringObject extends the perspective class, and is similar to event-sourced aggregates. Each instance has a unique continuity ID, which is stored in the id attribute. The continuity ID is used as a tag in its consistency boundary, and to tag new decisions.

Enduring objects can have command methods decorated with the library’s event decorator. Calling a decorated command method will generate a new tagged decision. The method body will be used to project tagged events into the current state of the enduring object.

Enduring object subclasses must be associated with an InitialDecision class whose attributes match the arguments of its initializer __init__() method. This association can be made either by defining a subclass of InitialDecision as a nested class on the enduring object subclass, or by mentioning a subclass of InitialDecision in an event decorator in the __init__() method.

Enduring object instances can be created by calling the subclass. This will trigger a new “initial decision”, that will be used to construct the enduring object instance. The “initial decision” can be collected from the enduring object instance by calling collect_events(). The examples below show a student and course modelled as enduring objects. The StudentJoinedCourse decision class, defined in the mapper example above, is included in the projection of both enduring objects, in preparation for the group example in the next section.

from eventsourcing.domain import event
from eventsourcing.dcb.domain import EnduringObject


class Student(EnduringObject[Decision, str]):
    class Registered(InitialDecision):
        student_id: str
        name: str
        max_courses: int

    class NameUpdated(Decision):
        name: str

    @event(Registered)
    def __init__(self, name: str, max_courses: int) -> None:
        self.name = name
        self.max_courses = max_courses
        self.course_ids: list[str] = []

    @event(NameUpdated)
    def update_name(self, name: str) -> None:
        self.name = name

    @event(StudentJoinedCourse)
    def _(self, course_id: str) -> None:
        self.course_ids.append(course_id)


class Course(EnduringObject[Decision, str]):
    class Registered(InitialDecision):
        course_id: str
        name: str
        max_students: int

    def __init__(self, name: str, max_students: int) -> None:
        self.name = name
        self.max_students = max_students
        self.student_ids: list[str] = []

    @event(StudentJoinedCourse)
    def _(self, student_id: str) -> None:
        self.student_ids.append(student_id)


# Create a new student.
student = Student(
    name="Sara",
    max_courses=5,
)
assert student.name == "Sara"
assert student.max_courses == 5

# Update the student name.
student.update_name("Sara P")
assert student.name == "Sara P"

# Collect new events.
tagged_decisions = student.collect_events()
assert len(tagged_decisions) == 2

# Create a new course...
course = Course(
    name="History",
    max_students=30,
)
assert course.name == "History"
assert course.max_students == 30

See the DCB examples for a more complete example.

The advantage of enduring objects is the conceptual unity of having everything together in one place. However, this aligns enduring objects with the central criticism of event-sourced aggregates motivating DCB: that including all events in the consistency boundary, regardless of whether they are actually required for any particular operation, increases contention unnecessarily. Following this comes the accumulation of all commands and queries in a single class, tending towards large units of code that are hard to understand. See slices for an alternative higher-level abstraction.

Group

The Group class extends the perspective class, and supports cross-cutting decision-making across many enduring objects. The consistency boundary of a group is the union of the consistency boundaries of the enduring objects in the group.

A group is constructed with already existing enduring objects. Its command methods can trigger new tagged decisions, using the group’s trigger_event() method. Decisions created by a group will be tagged with all the continuity IDs of the enduring objects in the group. If an enduring object’s projection includes that decision, its state will be evolved accordingly.
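
The two rules above (the boundary is a union, and new decisions carry every member's continuity ID) can be sketched with simplified stand-ins. Member, Selector, group_boundary() and group_tags() are hypothetical names used only for this illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Selector:
    tags: tuple


class Member:
    """Hypothetical enduring object with a continuity ID."""

    def __init__(self, id: str) -> None:
        self.id = id

    def consistency_boundary(self) -> list:
        # An enduring object selects events tagged with its continuity ID.
        return [Selector(tags=(self.id,))]


def group_boundary(members: list) -> list:
    # Union of the members' boundaries, dropping duplicates but keeping order.
    seen = set()
    result = []
    for member in members:
        for selector in member.consistency_boundary():
            if selector not in seen:
                seen.add(selector)
                result.append(selector)
    return result


def group_tags(members: list) -> list:
    # Decisions made by the group are tagged with every member's ID.
    return [member.id for member in members]


student = Member("student:123")
course = Member("course:456")

boundary = group_boundary([student, course])
tags = group_tags([student, course])
```

A decision tagged with both IDs is matched by either selector in the union, which is why it can be projected by the student and by the course.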

The example below uses the student and course enduring objects in a group that triggers a StudentJoinedCourse event in the student_joins_course() method that applies to both the student and the course. Similarly, a student_leaves_course() method could be implemented for this group.

from eventsourcing.dcb.domain import Group


class StudentAndCourse(Group[Decision]):
    def __init__(
        self,
        student: Student | None,
        course: Course | None,
    ) -> None:
        self.student = student
        self.course = course

    def student_joins_course(self) -> None:
        # Enforce business rules.
        assert len(self.student.course_ids) < self.student.max_courses
        assert len(self.course.student_ids) < self.course.max_students
        assert self.student.id not in self.course.student_ids
        # The DCB magic: one event for "one fact".
        self.trigger_event(
            StudentJoinedCourse,
            student_id=self.student.id,
            course_id=self.course.id,
        )


group = StudentAndCourse(student, course)
group.student_joins_course()

assert student.id in course.student_ids
assert course.id in student.course_ids

Using groups to trigger cross-cutting events like this demonstrates the “one fact magic” of DCB. However, because the consistency boundary for a group is the union of the consistency boundaries for the members of a group, the criticism of enduring objects applies even more to groups: that including all events in the consistency boundary, regardless of whether they are actually required for any particular operation, increases contention unnecessarily. This corresponds exactly to recording new events from more than one aggregate in the same transaction. The difference with groups is that one event can affect many enduring objects.

Slice

The Slice class extends the perspective class, and is designed to support “vertical slice architecture” with DCB. The idea of “vertical slices” is that individual use cases can be implemented with pieces of code that are entirely independent of each other. Slices can support both command and query use cases.

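The three important aspects of a slice, as illustrated by the example below, are its consistency boundary, its projection, and its execute() method.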

The consistency boundary for a slice can be used both to select events for the slice’s projection, if it has one, and to select conflicting events when appending any new events to an event store.

The example below shows a slice for updating a student’s name. The consistency_boundary() involves decision classes mentioned in the projection by using the projected_types attribute, which automatically collects all decision classes mentioned in the slice’s @event decorators.

from eventsourcing.dcb.domain import Selector, Slice

class UpdateStudentName(Slice[Decision]):
    def __init__(self, student_id: StudentID, new_name: str) -> None:
        self.student_id = student_id
        self.new_name = new_name
        self.name = ""
        self.student_was_registered: bool = False

    def consistency_boundary(self) -> Selector:
        return Selector(
            types=self.projected_types,
            tags=[self.student_id],
        )

    @event(Student.Registered)
    def _(self, name: str) -> None:
        self.name = name
        self.student_was_registered = True

    @event(Student.NameUpdated)
    def _(self, name: str) -> None:
        self.name = name

    def execute(self) -> None:
        assert self.student_was_registered
        assert self.name != self.new_name
        self.trigger_event(
            Student.NameUpdated,
            tags=[self.student_id],
            name=self.new_name,
        )

See the DCB examples for a more complete set of examples.

The advantage of using slices is that individual use cases can be implemented with pieces of code that are entirely independent of each other, and with consistency boundaries that include only what is necessary. However, this may come at the cost of some repetition of business logic, increasing the volume of code, which tends to increase the chances of introducing coding errors. See enduring objects for an alternative higher-level abstraction.

Mixing styles

The decision classes used in the UpdateStudentName slice are those defined above on the Student enduring object. In these examples, the decision classes are defined as nested classes, but defining them as module-level classes would work just as well.

This shows that it is possible to develop a domain model with enduring objects and rework your code to use slices. Similarly, with a little care, it is possible to start with slices and rework your code to use enduring objects.

Indeed, it is possible to have some parts of your domain model defined with enduring objects, and groups, and to have other parts defined using slices. This is demonstrated in the example application below.

Repository

The DCBRepository class is provided to support working with perspectives of different kinds.

A repository is constructed with an event store.

from eventsourcing.dcb.application import DCBRepository

repository = DCBRepository(
    eventstore=DCBEventStore(
        mapper=MessagePackMapper(),
        recorder=InMemoryDCBRecorder(),
    ),
)

The save() method collects and appends new decisions.

student = Student(
    name="Sara",
    max_courses=5,
)

course = Course(
    name="History",
    max_students=30,
)

repository.save(student)
repository.save(course)

The get() method reconstructs an enduring object for a given continuity ID.

student = repository.get(student.id)
course = repository.get(course.id)

assert student.name == "Sara"
assert student.max_courses == 5
assert student.course_ids == []

assert course.name == "History"
assert course.max_students == 30
assert course.student_ids == []

The get_many() method reconstructs many enduring objects for a given sequence of continuity IDs.

student, course = repository.get_many(student.id, course.id)

assert student.name == "Sara"
assert student.max_courses == 5
assert student.course_ids == []

assert course.name == "History"
assert course.max_students == 30
assert course.student_ids == []

The get_group() method constructs a group and its enduring objects for a given sequence of continuity IDs.

group = repository.get_group(StudentAndCourse, student.id, course.id)
group.student_joins_course()

repository.save(group)

# Check the student has joined the course.
student = repository.get(student.id)
course = repository.get(course.id)
assert course.id in student.course_ids
assert student.id in course.student_ids

The advance() method selects and applies decisions to a perspective. It can be used to update a slice to its current state before calling its execute method.

update_student_name = UpdateStudentName(student_id=student.id, new_name="Sara P")
assert update_student_name.name == ""
assert update_student_name.student_was_registered is False

repository.advance(update_student_name)
assert update_student_name.name == "Sara"
assert update_student_name.student_was_registered is True

update_student_name.execute()
assert update_student_name.name == "Sara P"
assert update_student_name.student_was_registered is True

repository.save(update_student_name)

Application

An application object brings together a stand-alone domain model and supportive persistence infrastructure, and implements commands and queries that support user interfaces.

Just like the library’s original application class, DCBApplication selects and constructs a DCB recorder at run-time, according to its environment variable configuration. This means we can define a DCB application independently of persistence infrastructure, and then run it in different ways at different times.

The DCBApplication class also supports the higher-level abstractions described above. It has a repository to support working with perspectives. It also has a method do() which supports working with slices by advancing and executing a slice, then saving new decisions.
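The sequence that do() is described as performing can be written out in terms of the repository methods shown earlier. The function below is a sketch under that description only; the library's actual implementation may differ. RecordingRepository and RecordingSlice are hypothetical stand-ins used to show the order of calls.

```python
from typing import Any, Protocol, TypeVar


class SliceLike(Protocol):
    def execute(self) -> None: ...


class RepositoryLike(Protocol):
    def advance(self, p: Any) -> Any: ...
    def save(self, p: Any) -> int: ...


TSlice = TypeVar("TSlice", bound=SliceLike)


def do(repository: RepositoryLike, s: TSlice) -> TSlice:
    """Advance the slice to current state, execute it, then save new decisions."""
    repository.advance(s)
    s.execute()
    repository.save(s)
    return s


# Hypothetical stand-ins that only record the order of calls.
calls: list[str] = []


class RecordingRepository:
    def advance(self, p: Any) -> Any:
        calls.append("advance")
        return p

    def save(self, p: Any) -> int:
        calls.append("save")
        return 0


class RecordingSlice:
    def execute(self) -> None:
        calls.append("execute")


s = RecordingSlice()
result = do(RecordingRepository(), s)
```

Returning the slice lets a caller inspect its state after execution, which matches the TSlice return type in the code reference.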

It is also possible to use the basic DCB objects directly with an application, and to extend DCBApplication to support any other higher-level style you may wish to invent.

The example below shows how to write command and query methods using enduring objects, groups, and slices.

from eventsourcing.dcb.application import DCBApplication


class CourseSubscriptions(DCBApplication):
    def register_student(self, name: str) -> str:
        student = Student(name=name, max_courses=5)
        self.repository.save(student)
        return student.id

    def update_student_name(self, student_id: str, new_name: str) -> None:
        self.do(UpdateStudentName(student_id, new_name))

    def register_course(self, name: str) -> str:
        course = Course(name=name, max_students=30)
        self.repository.save(course)
        return course.id

    def enrol_student_on_course(self, student_id: str, course_id: str) -> None:
        group = self.repository.get_group(StudentAndCourse, student_id, course_id)
        group.student_joins_course()
        self.repository.save(group)

    def list_courses_for_student(self, student_id: str) -> list[str]:
        student: Student = self.repository.get(student_id)
        return [c.name for c in self.repository.get_many(*student.course_ids)]

    def list_students_for_course(self, course_id: str) -> list[str]:
        course: Course = self.repository.get(course_id)
        return [s.name for s in self.repository.get_many(*course.student_ids)]


# Construct app to use MessagePack and in-memory persistence.
app = CourseSubscriptions(env={
    "PERSISTENCE_MODULE": "eventsourcing.dcb.popo",
    "MAPPER_TOPIC": "eventsourcing.dcb.msgpack:MessagePackMapper",
})

# Construct enduring objects.
student_id = app.register_student("Sara")
course_id = app.register_course("History")

# Update the student name using a vertical slice.
app.update_student_name(student_id, "Sara P")

# Enrol the student on the course using a group.
app.enrol_student_on_course(student_id, course_id)

# Query for student and course names.
assert "Sara P" in app.list_students_for_course(course_id)
assert "History" in app.list_courses_for_student(student_id)

Read the examples pages for more discussion and examples of DCB.

Code reference

class eventsourcing.dcb.api.DCBQueryItem(types: list[str] = <factory>, tags: list[str] = <factory>)

Bases: object

types: list[str]
tags: list[str]
__init__(types: list[str] = <factory>, tags: list[str] = <factory>) -> None

class eventsourcing.dcb.api.DCBQuery(items: list[DCBQueryItem] = <factory>)

Bases: object

items: list[DCBQueryItem]
__init__(items: list[DCBQueryItem] = <factory>) -> None

class eventsourcing.dcb.api.DCBAppendCondition(fail_if_events_match: DCBQuery = <factory>, after: int | None = None)

Bases: object

fail_if_events_match: DCBQuery
after: int | None = None
__init__(fail_if_events_match: DCBQuery = <factory>, after: int | None = None) -> None

class eventsourcing.dcb.api.DCBEvent(type: str, data: bytes, tags: list[str] = <factory>)

Bases: object

type: str
data: bytes
tags: list[str]
__init__(type: str, data: bytes, tags: list[str] = <factory>) -> None

class eventsourcing.dcb.api.DCBSequencedEvent(event: DCBEvent, position: int)

Bases: object

event: DCBEvent
position: int
__init__(event: DCBEvent, position: int) -> None

class eventsourcing.dcb.api.DCBReadResponse

Bases: Iterator[DCBSequencedEvent], ABC

abstract property head: int | None

class eventsourcing.dcb.api.DCBRecorder

Bases: ABC

abstract read(query: DCBQuery | None = None, *, after: int | None = None, limit: int | None = None) -> DCBReadResponse

Returns all events, unless ‘after’ is given then only those with position greater than ‘after’, and unless any query items are given, then only those that match at least one query item. An event matches a query item if its type is in the item types or there are no item types, and if all the item tags are in the event tags.

abstract append(events: Sequence[DCBEvent], condition: DCBAppendCondition | None = None) -> int

Appends given events to the event store, unless the condition fails.

abstract subscribe(query: DCBQuery | None = None, *, after: int | None = None) -> DCBSubscription[Self]

Returns all events, unless ‘after’ is given then only those with position greater than ‘after’, and unless any query items are given, then only those that match at least one query item. An event matches a query item if its type is in the item types or there are no item types, and if all the item tags are in the event tags. The subscription will block when the last recorded event is received, and then continue when new events are recorded.
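The blocking behaviour described here can be pictured with a condition variable. The sketch below is a minimal illustration using only the standard library, assuming a single in-process log; TinyLog and TinySubscription are hypothetical and do not show how the library's recorders implement subscriptions.

```python
import threading


class TinyLog:
    """Hypothetical append-only log; positions start at 1."""

    def __init__(self) -> None:
        self.events: list[str] = []
        self.changed = threading.Condition()

    def append(self, event: str) -> int:
        with self.changed:
            self.events.append(event)
            self.changed.notify_all()  # Wake any waiting subscriptions.
            return len(self.events)


class TinySubscription:
    """Iterates over recorded events, blocking when it has caught up."""

    def __init__(self, log: TinyLog, after: int = 0) -> None:
        self.log = log
        self.position = after
        self.stopped = False

    def __iter__(self) -> "TinySubscription":
        return self

    def __next__(self) -> tuple[int, str]:
        with self.log.changed:
            # Block while there is nothing new and nobody has called stop().
            while not self.stopped and self.position >= len(self.log.events):
                self.log.changed.wait()
            if self.stopped:
                raise StopIteration
            self.position += 1
            return self.position, self.log.events[self.position - 1]

    def stop(self) -> None:
        with self.log.changed:
            self.stopped = True
            self.log.changed.notify_all()


log = TinyLog()
log.append("a")
subscription = TinySubscription(log)
first = next(subscription)  # Already recorded, returns immediately.

# Record another event shortly afterwards, from a different thread.
timer = threading.Timer(0.05, log.append, args=("b",))
timer.start()
second = next(subscription)  # Blocks until the timer thread appends.
timer.join()

subscription.stop()
remaining = list(subscription)  # A stopped subscription yields nothing more.
```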

class eventsourcing.dcb.api.DCBSubscription(recorder: TDCBRecorder_co, query: DCBQuery | None = None, after: int | None = None)

Bases: Iterator[DCBSequencedEvent], Generic[TDCBRecorder_co]

__init__(recorder: TDCBRecorder_co, query: DCBQuery | None = None, after: int | None = None) -> None
stop() -> None

Stops the subscription.

class eventsourcing.dcb.application.DCBApplication(env: Mapping[str, str] | None = None)

Bases: object

name = 'DCBApplication'
__init__(env: Mapping[str, str] | None = None)
env: Mapping[str, str] = {'PERSISTENCE_MODULE': 'eventsourcing.dcb.popo'}
construct_env(name: str, env: Mapping[str, str] | None = None) -> Environment

Constructs environment from which application will be configured.

do(s: TSlice) -> TSlice

Advances and executes a slice, then saves new decisions.

close() -> None

class eventsourcing.dcb.application.DCBRepository(eventstore: DCBEventStore[TDecision])

Bases: Generic[TDecision]

__init__(eventstore: DCBEventStore[TDecision])
save(p: Perspective[TDecision]) -> int
get(enduring_object_id: str) -> EnduringObject[TDecision, str]
get_many(*enduring_object_ids: str) -> list[EnduringObject[TDecision, str] | None]
get_group(cls: type[TGroup], *enduring_object_ids: str) -> TGroup
advance(p: TPerspective) -> TPerspective

class eventsourcing.dcb.domain.Decision

Bases: AbstractDecision

as_dict() -> dict[str, Any]
mutate(obj: TPerspective | None) -> TPerspective | None
apply(obj: Any) -> None

class eventsourcing.dcb.domain.InitialDecision

Bases: Decision

originator_topic: str
mutate(obj: TPerspective | None) -> TPerspective | None
classmethod id_attr_name(enduring_object_class: type[EnduringObject[Any, TID]]) -> TID

class eventsourcing.dcb.domain.TDecision

A type variable representing any subclass of Decision.

alias of TypeVar(‘TDecision’, bound=Decision)

has_default()

class eventsourcing.dcb.domain.Tagged(tags: list[str], decision: TDecision)

Bases: Generic[TDecision]

__init__(tags: list[str], decision: TDecision) -> None

class eventsourcing.dcb.domain.MetaPerspective(name, bases, namespace, /, **kwargs)

Bases: ABCMeta

class eventsourcing.dcb.domain.Perspective(*_: Any, **__: Any)

Bases: ABC, Generic[TDecision]

last_known_position: int | None
new_decisions: list[Tagged[TDecision]]
abstract consistency_boundary() -> Selector | Sequence[Selector]
trigger_event(decision_cls: Callable[P, TDecision], tags: Sequence[str] = (), *args: P.args, **kwargs: P.kwargs) -> None

Constructs new tagged decision and appends to list of uncommitted events.

collect_events() -> Sequence[Tagged[TDecision]]

Drains list of triggered events.

class eventsourcing.dcb.domain.MetaSupportsEventDecorator(name, bases, namespace, /, **kwargs)

Bases: MetaPerspective

__init__(name: str, bases: tuple[type, ...], namespace: dict[str, Any]) -> None

class eventsourcing.dcb.domain.MetaEnduringObject(name, bases, namespace, /, **kwargs)

Bases: MetaSupportsEventDecorator

__init__(name: str, bases: tuple[type, ...], namespace: dict[str, Any]) -> None

class eventsourcing.dcb.domain.EnduringObject(**kwargs: Any)

Bases: Perspective[TDecision], Generic[TDecision, TID]

id: TID
consistency_boundary() -> list[Selector]
trigger_event(decision_cls: Callable[P, TDecision], tags: Sequence[str] = (), *args: P.args, **kwargs: P.kwargs) -> None

Constructs new tagged decision and appends to list of uncommitted events.

projected_types = []

class eventsourcing.dcb.domain.Group(*args: Any, **kwargs: Any)

Bases: Perspective[TDecision]

consistency_boundary() -> list[Selector]
trigger_event(decision_cls: Callable[P, TDecision], tags: Sequence[str] = (), *args: P.args, **kwargs: P.kwargs) -> None

Constructs new tagged decision and appends to list of uncommitted events.

class eventsourcing.dcb.domain.Selector(types: Sequence[type[Decision]] = (), tags: Sequence[str] = ())

Bases: object

types: Sequence[type[Decision]] = ()
tags: Sequence[str] = ()
__init__(types: Sequence[type[Decision]] = (), tags: Sequence[str] = ()) -> None

class eventsourcing.dcb.domain.MetaSlice(name, bases, namespace, /, **kwargs)

Bases: MetaSupportsEventDecorator

__init__(name: str, bases: tuple[type, ...], namespace: dict[str, Any]) -> None

class eventsourcing.dcb.domain.Slice(*_: Any, **__: Any)

Bases: Perspective[TDecision]

execute() -> None
do_projection = False
projected_types = []
last_known_position: int | None
new_decisions: list[Tagged[TDecision]]

class eventsourcing.dcb.persistence.DCBMapper

Bases: ABC, Generic[TDecision]

abstract to_dcb_event(event: Tagged[TDecision]) -> DCBEvent
abstract to_domain_event(event: DCBEvent) -> Tagged[TDecision]

class eventsourcing.dcb.persistence.DCBEventStore(mapper: DCBMapper[TDecision], recorder: DCBRecorder)

Bases: Generic[TDecision]

__init__(mapper: DCBMapper[TDecision], recorder: DCBRecorder)
append(events: Sequence[Tagged[TDecision]], cb: Selector | Sequence[Selector] | None = None, after: int | None = None) -> int
read(cb: Selector | Sequence[Selector] | None = None, *, after: int | None = None) -> DCBEventStoreReadResponse[TDecision]

class eventsourcing.dcb.persistence.DCBEventStoreReadResponse(dcb_read_response: DCBReadResponse, mapper: DCBMapper[TDecision])

Bases: Iterator[Tagged[TDecision]]

__init__(dcb_read_response: DCBReadResponse, mapper: DCBMapper[TDecision])
property head: int | None

exception eventsourcing.dcb.persistence.NotFoundError

Bases: Exception

class eventsourcing.dcb.persistence.DCBInfrastructureFactory(env: Environment | Mapping[str, str] | None)

Bases: BaseInfrastructureFactory[TTrackingRecorder], ABC

abstract dcb_recorder() -> DCBRecorder

class eventsourcing.dcb.persistence.DCBListenNotifySubscription(recorder: TDCBRecorder_co, query: DCBQuery | None = None, after: int | None = None)

Bases: DCBSubscription[TDCBRecorder_co]

__init__(recorder: TDCBRecorder_co, query: DCBQuery | None = None, after: int | None = None) -> None
stop() -> None

Stops the subscription.

class eventsourcing.dcb.popo.InMemoryDCBRecorder

Bases: DCBRecorder, POPORecorder

__init__() -> None
read(query: DCBQuery | None = None, *, after: int | None = None, limit: int | None = None) -> DCBReadResponse

Returns all events, unless ‘after’ is given then only those with position greater than ‘after’, and unless any query items are given, then only those that match at least one query item. An event matches a query item if its type is in the item types or there are no item types, and if all the item tags are in the event tags.

append(events: Sequence[DCBEvent], condition: DCBAppendCondition | None = None) -> int

Appends given events to the event store, unless the condition fails.

subscribe(query: DCBQuery | None = None, *, after: int | None = None) -> InMemorySubscription

Returns all events, unless ‘after’ is given then only those with position greater than ‘after’, and unless any query items are given, then only those that match at least one query item. An event matches a query item if its type is in the item types or there are no item types, and if all the item tags are in the event tags. The subscription will block when the last recorded event is received, and then continue when new events are recorded.

listen(event: Event) -> None
unlisten(event: Event) -> None

class eventsourcing.dcb.popo.InMemorySubscription(recorder: InMemoryDCBRecorder, query: DCBQuery | None = None, after: int | None = None)

Bases: DCBListenNotifySubscription[InMemoryDCBRecorder]

__init__(recorder: InMemoryDCBRecorder, query: DCBQuery | None = None, after: int | None = None) -> None
stop() -> None

Stops the subscription.

class eventsourcing.dcb.popo.SimpleDCBReadResponse(events: Iterator[DCBSequencedEvent], head: int | None = None)

Bases: DCBReadResponse

__init__(events: Iterator[DCBSequencedEvent], head: int | None = None)
property head: int | None

class eventsourcing.dcb.popo.InMemoryDCBFactory(env: Environment | Mapping[str, str] | None)

Bases: POPOFactory, DCBInfrastructureFactory[POPOTrackingRecorder]

dcb_recorder() -> DCBRecorder

class eventsourcing.dcb.postgres_tt.PostgresDCBRecorderTT(datastore: PostgresDatastore, *, events_table_name: str = 'dcb_events')

Bases: DCBRecorder, PostgresRecorder

__init__(datastore: PostgresDatastore, *, events_table_name: str = 'dcb_events')
format(sql: SQL) -> Composed
read(query: DCBQuery | None = None, *, after: int | None = None, limit: int | None = None) -> DCBReadResponse

Returns all events, unless ‘after’ is given then only those with position greater than ‘after’, and unless any query items are given, then only those that match at least one query item. An event matches a query item if its type is in the item types or there are no item types, and if all the item tags are in the event tags.

subscribe(query: DCBQuery | None = None, *, after: int | None = None) -> PostgresDCBSubscription

Returns all events, unless ‘after’ is given then only those with position greater than ‘after’, and unless any query items are given, then only those that match at least one query item. An event matches a query item if its type is in the item types or there are no item types, and if all the item tags are in the event tags. The subscription will block when the last recorded event is received, and then continue when new events are recorded.

append(events: Sequence[DCBEvent], condition: DCBAppendCondition | None = None) -> int

Appends given events to the event store, unless the condition fails.

construct_psycopg_dcb_events(dcb_events: Sequence[DCBEvent]) -> list[PsycopgDCBEvent]
construct_psycopg_query_items(query_items: Sequence[DCBQueryItem]) -> list[PsycopgDCBQueryItem]
has_one_query_item_one_type(query: DCBQuery) -> bool
all_query_items_have_tags(query: DCBQuery) -> bool
execute(cursor: Cursor[DictRow], statement: Composed, params: Params | None = None, *, explain: bool = False, prepare: bool = True) -> None

class eventsourcing.dcb.postgres_tt.PsycopgDCBEvent(type, data, tags)

Bases: NamedTuple

type: str

Alias for field number 0

data: bytes

Alias for field number 1

tags: list[str]

Alias for field number 2

class eventsourcing.dcb.postgres_tt.PsycopgDCBQueryItem(types, tags)

Bases: NamedTuple

types: list[str]

Alias for field number 0

tags: list[str]

Alias for field number 1

class eventsourcing.dcb.postgres_tt.PostgresDCBSubscription(recorder: PostgresDCBRecorderTT, query: DCBQuery | None = None, after: int | None = None)

Bases: DCBListenNotifySubscription[PostgresDCBRecorderTT]

__init__(recorder: PostgresDCBRecorderTT, query: DCBQuery | None = None, after: int | None = None) -> None

class eventsourcing.dcb.postgres_tt.PostgresTTDCBFactory(env: Environment | Mapping[str, str] | None)

Bases: BasePostgresFactory[PostgresTrackingRecorder], DCBInfrastructureFactory[PostgresTrackingRecorder]

dcb_recorder() -> DCBRecorder

class eventsourcing.dcb.msgpack.Decision

Bases: Struct, Decision

as_dict() -> dict[str, Any]

class eventsourcing.dcb.msgpack.MessagePackMapper

Bases: DCBMapper[Decision]

to_dcb_event(event: Tagged[TDecision]) -> DCBEvent
to_domain_event(event: DCBEvent) -> Tagged[Decision]

class eventsourcing.dcb.msgpack.InitialDecision(originator_topic: str)

Bases: Decision, InitialDecision

originator_topic: str