from abc import ABC, abstractmethod
from typing import (
Any,
Dict,
Generic,
Iterable,
Iterator,
List,
NamedTuple,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
Union,
)
from uuid import UUID
from eventsourcing.exceptions import OperationalError, RecordConflictError
from eventsourcing.infrastructure.sequenceditem import (
SequencedItem,
SequencedItemFieldNames,
)
from eventsourcing.infrastructure.sequenceditemmapper import AbstractSequencedItemMapper
from eventsourcing.whitehead import TEvent
# Pipeline ID used by record managers when none is given explicitly.
DEFAULT_PIPELINE_ID = 0

# Marker string constant (not referenced by the code in this section).
EVENT_NOT_NOTIFIABLE = "event-not-notifiable"

# Shape of the keyword arguments describing a tracking record, as accepted
# by the "tracking_kwargs" parameter of write_records().
TrackingKwargs = Dict[str, Union[str, int]]
class AbstractRecordManager(ABC):
    """
    Interface for objects that write sequenced items into a datastore as
    records, and read them back again.
    """

    # Class-level boolean flags. Judging by their names they describe
    # optional capabilities; presumably concrete record managers override
    # the ones that do not apply to them.
    has_integrated_snapshots = False
    can_limit_get_records = True
    can_lt_lte_get_records = True
    can_list_sequence_ids = True
    can_delete_records = True

    def __init__(self, **kwargs: Any):
        """
        Initialises record manager. Keyword arguments are accepted and
        ignored at this level.
        """

    @property
    @abstractmethod
    def record_class(self) -> Any:
        """
        The record class used by this record manager.
        """

    @abstractmethod
    def record_items(self, sequenced_items: Iterable[NamedTuple]) -> None:
        """
        Writes the given sequenced items into the datastore.
        """

    def record_item(self, sequenced_item: NamedTuple) -> None:
        """
        Writes one sequenced item into the datastore, by delegating to
        record_items() with a single-element list.
        """
        batch = [sequenced_item]
        self.record_items(batch)

    @abstractmethod
    def get_item(self, sequence_id: UUID, position: int) -> NamedTuple:
        """
        Returns the sequenced item at the given position of a sequence.
        """

    @abstractmethod
    def get_items(
        self,
        sequence_id: UUID,
        gt: Optional[int] = None,
        gte: Optional[int] = None,
        lt: Optional[int] = None,
        lte: Optional[int] = None,
        limit: Optional[int] = None,
        query_ascending: bool = True,
        results_ascending: bool = True,
    ) -> Iterator[NamedTuple]:
        """
        Iterates over the sequenced items of a sequence, optionally bounded
        by position (gt/gte/lt/lte) and limited in number.
        """

    @abstractmethod
    def get_record(self, sequence_id: UUID, position: int) -> Any:
        """
        Returns the record at the given position of a sequence.
        """

    @abstractmethod
    def get_records(
        self,
        sequence_id: UUID,
        gt: Optional[int] = None,
        gte: Optional[int] = None,
        lt: Optional[int] = None,
        lte: Optional[int] = None,
        limit: Optional[int] = None,
        query_ascending: bool = True,
        results_ascending: bool = True,
    ) -> Sequence[Any]:
        """
        Returns the records of a sequence, optionally bounded by position
        (gt/gte/lt/lte) and limited in number.
        """

    @abstractmethod
    def all_sequence_ids(self) -> Iterable[UUID]:
        """
        Returns the IDs of all sequences in the datastore.
        """

    @abstractmethod
    def delete_record(self, record: Any) -> None:
        """
        Permanently removes the given record from the datastore.
        """


# Type variable bounded by AbstractRecordManager, for generic classes that
# are parameterised by the kind of record manager they use.
TRecordManager = TypeVar("TRecordManager", bound=AbstractRecordManager)
class BaseRecordManager(AbstractRecordManager):
    """
    Record manager that converts sequenced items to and from instances of a
    record class, and supplies "application_name" / "pipeline_id" values
    when the record class defines those attributes.
    """

    def __init__(
        self,
        record_class: type,
        sequenced_item_class: Type[SequencedItem] = SequencedItem,  # type: ignore
        contiguous_record_ids: bool = False,
        application_name: str = "",
        pipeline_id: int = DEFAULT_PIPELINE_ID,
        **kwargs: Any
    ):
        """
        Initialises record manager.

        :param record_class: class used to construct records; its attributes
            also decide which optional columns are written.
        :param sequenced_item_class: class of the sequenced items.
        :param contiguous_record_ids: request contiguous record IDs; only
            honoured when the record class has an "id" or "notification_id"
            attribute.
        :param application_name: value written to an "application_name"
            attribute; required when the record class has one.
        :param pipeline_id: value written to a "pipeline_id" attribute.
        :param kwargs: accepted and ignored here.
        """
        self._record_class = record_class
        self.sequenced_item_class = sequenced_item_class
        self.field_names = SequencedItemFieldNames(self.sequenced_item_class)

        # Name of the record attribute holding the notification ID,
        # or "" when the record class has neither candidate attribute.
        if hasattr(self.record_class, "id"):
            self.notification_id_name = "id"
        elif hasattr(self.record_class, "notification_id"):
            self.notification_id_name = "notification_id"
        else:
            self.notification_id_name = ""

        # Contiguous IDs need a notification ID attribute to be meaningful.
        self.contiguous_record_ids = bool(
            contiguous_record_ids and self.notification_id_name
        )

        # Configuration checks (note: plain asserts, skipped under "python -O").
        if hasattr(self.record_class, "application_name"):
            assert application_name, "'application_name' not set when required"
            assert (
                contiguous_record_ids
            ), "'contiguous_record_ids' not set when required"
        self.application_name = application_name
        if hasattr(self.record_class, "pipeline_id"):
            assert hasattr(
                self.record_class, "application_name"
            ), "'application_name' column not defined"
        self._pipeline_id = pipeline_id

    @property
    def record_class(self) -> type:
        """The record class given to the constructor."""
        return self._record_class

    @property
    def pipeline_id(self) -> int:
        """Pipeline ID written into records that have a "pipeline_id"."""
        return self._pipeline_id

    @pipeline_id.setter
    def pipeline_id(self, pipeline_id: int) -> None:
        self._pipeline_id = pipeline_id

    def clone(
        self, application_name: str, pipeline_id: int, **kwargs: Any
    ) -> "BaseRecordManager":
        """
        Returns a new record manager of the same type and configuration,
        but for the given application name and pipeline ID.
        """
        return type(self)(
            record_class=self.record_class,
            contiguous_record_ids=self.contiguous_record_ids,
            sequenced_item_class=self.sequenced_item_class,
            application_name=application_name,
            pipeline_id=pipeline_id,
            **kwargs
        )

    def get_item(self, sequence_id: UUID, position: int) -> SequencedItem:
        """
        Gets sequenced item from the datastore.
        """
        return self.from_record(self.get_record(sequence_id, position))

    def get_items(
        self,
        sequence_id: UUID,
        gt: Optional[int] = None,
        gte: Optional[int] = None,
        lt: Optional[int] = None,
        lte: Optional[int] = None,
        limit: Optional[int] = None,
        query_ascending: bool = True,
        results_ascending: bool = True,
    ) -> Iterator[SequencedItem]:
        """
        Returns sequenced item generator, converting each record returned
        by get_records() with from_record().
        """
        records = self.get_records(
            sequence_id=sequence_id,
            gt=gt,
            gte=gte,
            lt=lt,
            lte=lte,
            limit=limit,
            query_ascending=query_ascending,
            results_ascending=results_ascending,
        )
        yield from map(self.from_record, records)

    def list_items(self, *args: Any, **kwargs: Any) -> List[SequencedItem]:
        """
        Returns list of sequenced items (arguments as for get_items()).
        """
        return list(self.get_items(*args, **kwargs))

    def to_record(self, sequenced_item: NamedTuple) -> object:
        """
        Constructs a record object from given sequenced item object,
        adding "application_name" and "pipeline_id" values when the record
        class defines those attributes.
        """
        kwargs = self.get_field_kwargs(sequenced_item)
        # Supply application_name, if needed.
        if hasattr(self.record_class, "application_name"):
            kwargs["application_name"] = self.application_name
        # Supply pipeline_id, if needed.
        if hasattr(self.record_class, "pipeline_id"):
            kwargs["pipeline_id"] = self.pipeline_id
        return self.record_class(**kwargs)

    def from_record(self, record: object) -> SequencedItem:
        """
        Constructs and returns a sequenced item object, from given record.
        """
        kwargs = self.get_field_kwargs(record)
        return self.sequenced_item_class(**kwargs)

    def list_sequence_ids(self) -> List[UUID]:
        """Returns all sequence IDs as a list."""
        return list(self.all_sequence_ids())

    def get_field_kwargs(self, item: object) -> Dict[str, Any]:
        """
        Returns a dict mapping each sequenced item field name to the value
        of that attribute on ``item``.
        """
        return {name: getattr(item, name) for name in self.field_names}

    def raise_sequenced_item_conflict(self, exp: Any = None) -> None:
        """
        Raises RecordConflictError for a position already taken in a
        sequence. When ``exp`` is an exception it becomes the message and
        the explicit cause of the raised error.
        """
        msg = "Position already taken in sequence"
        error = RecordConflictError(exp or msg)
        if isinstance(exp, BaseException):
            raise error from exp
        # No cause to chain; do not use "from None", which would hide the
        # implicit context of an exception currently being handled.
        raise error

    def raise_index_error(self, position: int) -> None:
        """Raises IndexError for a position outside the sequence."""
        raise IndexError("Sequence index out of range: {}".format(position))

    def raise_record_integrity_error(self, e: Exception) -> None:
        """Re-raises an integrity error as RecordConflictError, chained."""
        raise RecordConflictError(e) from e

    def raise_operational_error(self, e: Exception) -> None:
        """Re-raises a datastore error as OperationalError, chained."""
        raise OperationalError(e) from e
class RecordManagerWithNotifications(BaseRecordManager):
    """
    Record manager that can also present its records as notifications,
    i.e. records sequenced by notification ID.
    """

    @abstractmethod
    def get_max_notification_id(self) -> int:
        """Return maximum notification ID in pipeline."""

    @abstractmethod
    def get_notification_records(
        self,
        start: Optional[int] = None,
        stop: Optional[int] = None,
        *args: Any,
        **kwargs: Any
    ) -> Iterable:
        """
        Returns records sequenced by notification ID, from
        application, for pipeline, in given range.

        Args 'start' and 'stop' are positions in a zero-based
        integer sequence.
        """

    def get_notifications(
        self,
        start: Optional[int] = None,
        stop: Optional[int] = None,
        *args: Any,
        **kwargs: Any
    ) -> Iterable:
        """
        Yields one notification dict (see create_notification_from_record)
        for each record returned by get_notification_records() for the
        given range. Extra arguments are forwarded unchanged.
        """
        # Forward start/stop positionally. Passing them as keywords together
        # with *args raised TypeError ("multiple values for argument
        # 'start'") as soon as any extra positional argument was given.
        records = self.get_notification_records(start, stop, *args, **kwargs)
        for record in records:
            yield self.create_notification_from_record(record)

    def create_notification_from_record(self, record: Any) -> Dict[str, Any]:
        """
        Builds a notification dict from a record: "id" taken from the
        notification ID attribute, the topic and state fields, and
        "causal_dependencies" when the record has that attribute.

        NOTE(review): if the record class has no notification ID attribute
        (notification_id_name == ""), getattr() raises AttributeError.
        """
        notification = {
            "id": getattr(record, self.notification_id_name),
            self.field_names.topic: getattr(record, self.field_names.topic),
            self.field_names.state: getattr(record, self.field_names.state),
        }
        if hasattr(record, "causal_dependencies"):
            notification["causal_dependencies"] = record.causal_dependencies
        return notification
class RecordManagerWithTracking(RecordManagerWithNotifications):
    """
    ACID record managers can write tracking records and event records
    in an atomic transaction, needed for atomic processing in process
    applications.
    """

    # Column names of a tracking record; SQL subclasses use them to build
    # the tracking-record INSERT statement.
    tracking_record_field_names = [
        "application_name",
        "upstream_application_name",
        "pipeline_id",
        "notification_id",
    ]

    def __init__(
        self, tracking_record_class: Optional[type] = None, *args: Any, **kwargs: Any
    ) -> None:
        """
        :param tracking_record_class: class of tracking records; may be
            None (the default). Remaining arguments go to the superclass.
        """
        super().__init__(*args, **kwargs)
        self.tracking_record_class = tracking_record_class

    def clone(
        self, application_name: str, pipeline_id: int, **kwargs: Any
    ) -> "BaseRecordManager":
        """
        Like the superclass clone(), but also carries over the tracking
        record class.
        """
        kwargs["tracking_record_class"] = self.tracking_record_class
        return super().clone(
            application_name=application_name, pipeline_id=pipeline_id, **kwargs
        )

    @abstractmethod
    def write_records(
        self,
        records: Iterable[Any],
        tracking_kwargs: Optional[TrackingKwargs] = None,
        orm_objs_pending_save: Optional[Sequence[Any]] = None,
        orm_objs_pending_delete: Optional[Sequence[Any]] = None,
    ) -> None:
        """
        Writes tracking, event and notification records for a process event.

        :param records: records to write.
        :param tracking_kwargs: values for a tracking record, if any.
        :param orm_objs_pending_save: further objects to save.
        :param orm_objs_pending_delete: further objects to delete.
        """

    def to_records(self, sequenced_items: Iterable[NamedTuple]) -> Iterable[Any]:
        """Lazily converts sequenced items to records with to_record()."""
        converter = self.to_record
        return map(converter, sequenced_items)

    @abstractmethod
    def get_max_tracking_record_id(self, upstream_application_name: str) -> int:
        """Return maximum tracking record ID for notification from upstream
        application in pipeline."""

    @abstractmethod
    def has_tracking_record(
        self, upstream_application_name: str, pipeline_id: int, notification_id: int
    ) -> bool:
        """
        True if tracking record exists for notification from upstream in pipeline.
        """

    def get_pipeline_and_notification_id(
        self, sequence_id: UUID, position: int
    ) -> Tuple:
        """
        Returns a (pipeline ID, notification ID) pair for the event at the
        given position in the given sequence.
        """
        # Todo: Optimise query by selecting only two columns,
        # pipeline_id and id (notification ID)?
        found = self.get_record(sequence_id, position)
        notification_id = getattr(found, self.notification_id_name)
        pipeline_id = found.pipeline_id
        return pipeline_id, notification_id
class SQLRecordManager(RecordManagerWithTracking):
    """
    Common aspects of SQL record managers, such as SQLAlchemy and Django record
    managers.

    Raw INSERT statements are rendered from string templates and cached on
    first use; subclasses supply the dialect-specific parts through
    make_placeholder(), get_record_table_name() and
    _where_application_name_tmpl.
    """

    # Todo: This makes the subclasses harder to read and probably more brittle. So it
    # might be better to inline this with the subclasses, so that each looks more
    # like normal Django or SQLAlchemy code.
    # Todo: Also, the record manager test cases don't cover the notification log and
    # tracking record functionality needed by ProcessApplication, and should so
    # that other record managers can more easily be developed.

    def __init__(self, *args: Any, **kwargs: Any):
        super(SQLRecordManager, self).__init__(*args, **kwargs)
        # Statements are prepared lazily by the properties below.
        self._insert_select_max = None
        self._insert_values = None
        self._insert_tracking_record = None

    def record_items(self, sequenced_items: Iterable[NamedTuple]) -> None:
        """Converts sequenced items to records and writes them."""
        # Convert sequenced item(s) to database record(s).
        records = self.to_records(sequenced_items)
        # Write records.
        self.write_records(records)

    @property
    def insert_select_max(self) -> Any:
        """
        SQL statement that inserts records with contiguous IDs,
        by selecting max ID from indexed table records.

        With transaction isolation level of "read committed" this should
        generate records with a contiguous sequence of integer IDs, using
        an indexed ID column, the database-side SQL max function, the
        insert-select-from form, and optimistic concurrency control.
        """
        if self._insert_select_max is None:
            tmpl = self._insert_select_max_tmpl
            if hasattr(self.record_class, "application_name"):
                # Todo: Maybe make it support application_name without pipeline_id?
                assert hasattr(self.record_class, "pipeline_id"), self.record_class
                # Subclass-supplied WHERE clause (presumably restricting the
                # MAX() to this application's rows).
                tmpl = tmpl + self._where_application_name_tmpl
            self._insert_select_max = self._prepare_insert(
                tmpl=tmpl,
                record_class=self.record_class,
                field_names=list(self.field_names),
            )
        return self._insert_select_max

    def _prepare_insert(
        self,
        tmpl: str,
        record_class: type,
        field_names: List[str],
        placeholder_for_id: bool = False,
    ) -> Any:
        """
        Renders an INSERT statement from the template ``tmpl``.

        The columns "application_name", "pipeline_id" and
        "causal_dependencies" are appended when ``record_class`` has them,
        and the notification ID column is appended when
        ``placeholder_for_id`` is true and this record manager has one.

        :param tmpl: format string using {tablename}, {columns},
            {placeholders} and/or {notification_id}.
        :param record_class: record class whose table is targeted.
        :param field_names: initial column names; this list is not modified.
        :param placeholder_for_id: include the notification ID column.
        :return: the rendered statement string.
        """
        # Work on a copy: callers may pass shared lists (such as the
        # class-level tracking_record_field_names) which must not grow.
        columns = list(field_names)
        optional_columns = ("application_name", "pipeline_id", "causal_dependencies")
        for column in optional_columns:
            if hasattr(record_class, column) and column not in columns:
                columns.append(column)
        id_name = self.notification_id_name
        if id_name and placeholder_for_id and id_name not in columns:
            columns.append(id_name)
        return tmpl.format(
            tablename=self.get_record_table_name(record_class),
            columns=", ".join(columns),
            placeholders=", ".join([self.make_placeholder(c) for c in columns]),
            notification_id=id_name,
        )

    @abstractmethod
    def make_placeholder(self, field_name: str) -> str:
        """
        Returns "placeholder" string for late binding of values to query.
        Depends on record manager's adapted database system or adapted ORM.
        """

    # New ID is one more than the table's current maximum (0 when empty).
    _insert_select_max_tmpl = (
        "INSERT INTO {tablename} ({notification_id}, {columns}) "
        "SELECT COALESCE(MAX({tablename}.{notification_id}), 0) + 1, {placeholders} "
        "FROM "
        "{tablename}"
    )

    @property
    @abstractmethod
    def _where_application_name_tmpl(self) -> str:
        """
        Returns template string for "WHERE" clause of SQL statement.
        """

    @property
    def insert_values(self) -> Any:
        """
        SQL statement that inserts records without ID.
        """
        if self._insert_values is None:
            self._insert_values = self._prepare_insert(
                tmpl=self._insert_values_tmpl,
                placeholder_for_id=True,
                record_class=self.record_class,
                field_names=list(self.field_names),
            )
        return self._insert_values

    @property
    def insert_tracking_record(self) -> Any:
        """
        SQL statement that inserts tracking records.
        """
        if self._insert_tracking_record is None:
            assert self.tracking_record_class is not None
            self._insert_tracking_record = self._prepare_insert(
                tmpl=self._insert_values_tmpl,
                placeholder_for_id=False,
                record_class=self.tracking_record_class,
                # Pass a copy so the shared class-level list is never mutated.
                field_names=list(self.tracking_record_field_names),
            )
        return self._insert_tracking_record

    _insert_values_tmpl = (
        "INSERT INTO {tablename} ({columns}) " "VALUES ({placeholders})"
    )

    @abstractmethod
    def get_record_table_name(self, record_class: type) -> str:
        """
        Returns table name - used in raw queries.
        :rtype: str
        """
class AbstractEventStore(ABC, Generic[TEvent, TRecordManager]):
    """
    Abstract base class for event stores: the operations other parts of the
    library expect an event store to provide.
    """

    def __init__(
        self, record_manager: TRecordManager, event_mapper: AbstractSequencedItemMapper,
    ):
        """
        Initialises event store object.

        :param record_manager: record manager used to store and read records
        :param event_mapper: sequenced item mapper used to convert events
        """
        self.record_manager = record_manager
        self.event_mapper = event_mapper

    @abstractmethod
    def store_events(self, events: Iterable[TEvent]) -> None:
        """
        Stores the given domain events for later retrieval.
        """

    @abstractmethod
    def iter_events(
        self,
        originator_id: UUID,
        gt: Optional[int] = None,
        gte: Optional[int] = None,
        lt: Optional[int] = None,
        lte: Optional[int] = None,
        limit: Optional[int] = None,
        is_ascending: bool = True,
        page_size: Optional[int] = None,
    ) -> Iterable[TEvent]:
        """
        Returns an iterable of the domain events of the given entity.
        """

    def list_events(self, *args: Any, **kwargs: Any) -> List[TEvent]:
        """
        Same as iter_events(), but collects the events into a list.
        """
        events = self.iter_events(*args, **kwargs)
        return list(events)

    @abstractmethod
    def get_event(self, originator_id: UUID, position: int) -> TEvent:
        """
        Returns the single domain event at the given position.
        """

    @abstractmethod
    def get_most_recent_event(
        self, originator_id: UUID, lt: Optional[int] = None, lte: Optional[int] = None
    ) -> Optional[TEvent]:
        """
        Returns the latest domain event of the given entity, if any.
        """

    @abstractmethod
    def all_events(self) -> Iterable[TEvent]:
        """
        Returns all domain events in the event store.

        This works by iterating over all sequences,
        so doesn't return events in order. Use a
        Notification Log to project application state.
        """

    def get_domain_events(
        self,
        originator_id: UUID,
        gt: Optional[int] = None,
        gte: Optional[int] = None,
        lt: Optional[int] = None,
        lte: Optional[int] = None,
        limit: Optional[int] = None,
        is_ascending: bool = True,
        page_size: Optional[int] = None,
    ) -> Iterable[TEvent]:
        """
        Deprecated. Please use iter_events() instead.

        Gets domain events from the sequence identified by `originator_id`,
        by delegating to iter_events() with the same arguments.

        :param originator_id: ID of a sequence of events
        :param gt: get items after this position
        :param gte: get items at or after this position
        :param lt: get items before this position
        :param lte: get items before or at this position
        :param limit: get limited number of items
        :param is_ascending: get items from lowest position
        :param page_size: restrict and repeat database query
        :return: iterable of domain events
        """
        query = dict(
            gt=gt,
            gte=gte,
            lt=lt,
            lte=lte,
            limit=limit,
            is_ascending=is_ascending,
            page_size=page_size,
        )
        return self.iter_events(originator_id=originator_id, **query)

    @abstractmethod
    def items_from_events(self, events: Iterable[TEvent]) -> Iterable[NamedTuple]:
        """
        Maps domain events to sequenced item namedtuples.

        :param events: An iterable of events.
        """