from abc import ABC, abstractmethod
from eventsourcing.exceptions import OperationalError, RecordConflictError
from eventsourcing.infrastructure.sequenceditem import (
SequencedItem,
SequencedItemFieldNames,
)
# Pipeline ID used by a record manager when its constructor is not given
# an explicit 'pipeline_id' argument.
DEFAULT_PIPELINE_ID = 0
class AbstractSequencedItemRecordManager(ABC):
    """
    Abstract base class for record managers.

    Converts sequenced items to records (and back again), and declares
    the operations for writing, reading and deleting records that
    concrete record managers must implement.
    """

    def __init__(
        self,
        record_class,
        sequenced_item_class=SequencedItem,
        contiguous_record_ids=False,
        application_name=None,
        pipeline_id=DEFAULT_PIPELINE_ID,
    ):
        """
        Initialises the record manager.

        :param record_class: class used to construct records; it is
            inspected for the attributes 'id', 'notification_id',
            'application_name' and 'pipeline_id'
        :param sequenced_item_class: class used to construct sequenced
            items; the record field names are derived from it
        :param contiguous_record_ids: requests contiguous record IDs;
            only effective if the record class has a notification ID
            attribute ('id' or 'notification_id')
        :param application_name: value supplied for the 'application_name'
            field of records, required if the record class has that attribute
        :param pipeline_id: value supplied for the 'pipeline_id' field
            of records, if the record class has that attribute
        :raises AssertionError: if the record class has 'application_name'
            but 'application_name' or 'contiguous_record_ids' is not set,
            or if the record class has 'pipeline_id' but not 'application_name'
        """
        self.record_class = record_class
        self.sequenced_item_class = sequenced_item_class
        self.field_names = SequencedItemFieldNames(self.sequenced_item_class)

        # Work out which attribute of the record class, if any,
        # holds the notification ID.
        if hasattr(self.record_class, "id"):
            self.notification_id_name = "id"
        elif hasattr(self.record_class, "notification_id"):
            self.notification_id_name = "notification_id"
        else:
            self.notification_id_name = ""

        # Contiguous IDs need a notification ID attribute. The result is
        # normalised with bool() because 'a and b' evaluates to one of its
        # operands, which would otherwise store the attribute name (a str)
        # in this flag and hand it on to clones.
        self.contiguous_record_ids = bool(
            contiguous_record_ids and self.notification_id_name
        )

        if hasattr(self.record_class, "application_name"):
            assert application_name, "'application_name' not set when required"
            assert (
                contiguous_record_ids
            ), "'contiguous_record_ids' not set when required"
        self.application_name = application_name

        if hasattr(self.record_class, "pipeline_id"):
            assert hasattr(
                self.record_class, "application_name"
            ), "'application_name' column not defined"
        self.pipeline_id = pipeline_id

    def clone(self, application_name, pipeline_id, **kwargs):
        """
        Returns a new record manager of the same type as this one.

        The clone has the same record class, sequenced item class and
        contiguous record IDs setting as this manager, but is constructed
        with the given application name and pipeline ID.

        :param application_name: application name for the clone
        :param pipeline_id: pipeline ID for the clone
        :param kwargs: extra keyword arguments for the constructor
        """
        return type(self)(
            record_class=self.record_class,
            contiguous_record_ids=self.contiguous_record_ids,
            sequenced_item_class=self.sequenced_item_class,
            application_name=application_name,
            pipeline_id=pipeline_id,
            **kwargs
        )

    @abstractmethod
    def record_sequenced_items(self, sequenced_item_or_items):
        """
        Writes sequenced item(s) into the datastore.
        """

    def record_sequenced_item(self, sequenced_item):
        """
        Writes sequenced item into the datastore.

        Delegates to record_sequenced_items() and returns its result.
        """
        return self.record_sequenced_items(sequenced_item)

    def get_item(self, sequence_id, position):
        """
        Gets sequenced item from the datastore.

        The record at the given position in the given sequence is
        obtained with get_record() and converted with from_record().
        """
        record = self.get_record(sequence_id, position)
        return self.from_record(record)

    @abstractmethod
    def get_record(self, sequence_id, position):
        """
        Gets record at position in sequence.
        """

    def get_items(
        self,
        sequence_id,
        gt=None,
        gte=None,
        lt=None,
        lte=None,
        limit=None,
        query_ascending=True,
        results_ascending=True,
    ):
        """
        Returns sequenced item generator.

        All arguments are passed unchanged to get_records(), and each
        record it returns is converted with from_record() as the
        generator is consumed.
        """
        records = self.get_records(
            sequence_id=sequence_id,
            gt=gt,
            gte=gte,
            lt=lt,
            lte=lte,
            limit=limit,
            query_ascending=query_ascending,
            results_ascending=results_ascending,
        )
        for record in records:
            yield self.from_record(record)

    def list_items(self, *args, **kwargs):
        """
        Returns list of sequenced items.

        Accepts the same arguments as get_items().
        """
        return list(self.get_items(*args, **kwargs))

    @abstractmethod
    def get_records(
        self,
        sequence_id,
        gt=None,
        gte=None,
        lt=None,
        lte=None,
        limit=None,
        query_ascending=True,
        results_ascending=True,
    ):
        """
        Returns records for a sequence.
        """

    def to_record(self, sequenced_item):
        """
        Constructs a record object from given sequenced item object.

        The record is given the field values of the sequenced item,
        plus this manager's application name and pipeline ID if the
        record class has those attributes.
        """
        record_kwargs = self.get_field_kwargs(sequenced_item)

        # Supply application_name, if needed.
        if hasattr(self.record_class, "application_name"):
            record_kwargs["application_name"] = self.application_name

        # Supply pipeline_id, if needed.
        if hasattr(self.record_class, "pipeline_id"):
            record_kwargs["pipeline_id"] = self.pipeline_id

        return self.record_class(**record_kwargs)

    def from_record(self, record):
        """
        Constructs and returns a sequenced item object, from given ORM object.
        """
        item_kwargs = self.get_field_kwargs(record)
        return self.sequenced_item_class(**item_kwargs)

    @abstractmethod
    def get_notifications(self, start=None, stop=None, *args, **kwargs):
        """
        Returns records sequenced by notification ID, from
        application, for pipeline, in given range.

        Args 'start' and 'stop' are positions in a zero-based
        integer sequence.
        """

    @abstractmethod
    def all_sequence_ids(self):
        """
        Returns all sequence IDs.
        """

    def list_sequence_ids(self):
        """
        Returns list of all sequence IDs, from all_sequence_ids().
        """
        return list(self.all_sequence_ids())

    @abstractmethod
    def delete_record(self, record):
        """
        Removes permanently given record from the table.
        """

    def get_field_kwargs(self, item):
        """
        Returns dict of field values, read from the attributes of the
        given object that are named by this manager's field names.
        """
        return {name: getattr(item, name) for name in self.field_names}

    def raise_sequenced_item_conflict(self):
        """
        Raises RecordConflictError for a position that is already taken.
        """
        msg = "Position already taken in sequence"
        raise RecordConflictError(msg)

    def raise_index_error(self, position):
        """
        Raises IndexError that mentions the given position.
        """
        raise IndexError("Sequence index out of range: {}".format(position))

    def raise_record_integrity_error(self, e):
        """
        Raises RecordConflictError constructed with the given error.
        """
        raise RecordConflictError(e)

    def raise_operational_error(self, e):
        """
        Raises OperationalError constructed with the given error.
        """
        raise OperationalError(e)
class ACIDRecordManager(AbstractSequencedItemRecordManager):
    """
    ACID record managers can write tracking records and event records
    in an atomic transaction, needed for atomic processing in process
    applications.
    """

    # Names of the fields of tracking records.
    tracking_record_field_names = [
        "application_name",
        "upstream_application_name",
        "pipeline_id",
        "notification_id",
    ]

    def __init__(self, tracking_record_class=None, *args, **kwargs):
        """
        Initialises the record manager.

        :param tracking_record_class: class of tracking records; since
            this is the first positional parameter, arguments for the
            base class constructor are best passed by keyword
        :param args: positional arguments for the base class constructor
        :param kwargs: keyword arguments for the base class constructor
        """
        super(ACIDRecordManager, self).__init__(*args, **kwargs)
        # The tracking record class is optional: None is accepted here.
        self.tracking_record_class = tracking_record_class

    def clone(self, application_name, pipeline_id, **kwargs):
        """
        Returns a new record manager of the same type as this one,
        constructed with this manager's tracking record class.

        The signature matches the base class method, so the application
        name and pipeline ID can be given either positionally or by
        keyword.

        :param application_name: application name for the clone
        :param pipeline_id: pipeline ID for the clone
        :param kwargs: extra keyword arguments for the constructor
        """
        return super(ACIDRecordManager, self).clone(
            application_name=application_name,
            pipeline_id=pipeline_id,
            tracking_record_class=self.tracking_record_class,
            **kwargs
        )

    @abstractmethod
    def write_records(
        self,
        records,
        tracking_kwargs=None,
        orm_objs_pending_save=None,
        orm_objs_pending_delete=None,
    ):
        """
        Writes tracking, event and notification records for a process event.

        :param records: records to be written
        :param tracking_kwargs: presumably the field values of a tracking
            record to be written with the records (to be confirmed
            against the concrete implementations)
        :param orm_objs_pending_save: presumably other ORM objects to be
            saved along with the records (to be confirmed)
        :param orm_objs_pending_delete: presumably other ORM objects to be
            deleted along with the records (to be confirmed)
        """

    @abstractmethod
    def get_max_record_id(self):
        """Return maximum notification ID in pipeline."""

    @abstractmethod
    def get_max_tracking_record_id(self, upstream_application_name):
        """Return maximum tracking record ID for notification from upstream
        application in pipeline."""

    @abstractmethod
    def has_tracking_record(
        self, upstream_application_name, pipeline_id, notification_id
    ):
        """
        True if tracking record exists for notification from upstream in pipeline.
        """

    def get_pipeline_and_notification_id(self, sequence_id, position):
        """
        Returns pipeline ID and notification ID for
        event at given position in given sequence.

        The notification ID is read from the record attribute named by
        'notification_id_name', so the record class needs to have a
        notification ID attribute.

        :returns: tuple of (pipeline ID, notification ID)
        """
        # Todo: Optimise query by selecting only two columns,
        # pipeline_id and id (notification ID)?
        record = self.get_record(sequence_id, position)
        notification_id = getattr(record, self.notification_id_name)
        return record.pipeline_id, notification_id
class SQLRecordManager(ACIDRecordManager):
    """
    Common aspects of SQL record managers, such as SQLAlchemy and Django record
    managers.
    """

    # Todo: This makes the subclasses harder to read and probably more brittle. So it
    #  might be better to inline this with the subclasses, so that each looks more
    #  like normal Django or SQLAlchemy code.
    # Todo: Also, the record manager test cases don't cover the notification log and
    #  tracking record functionality needed by ProcessApplication, and should so
    #  that other record managers can more easily be developed.

    # Template for inserting a record with the next contiguous notification ID,
    # which is selected as one more than the current maximum (or 1 if none).
    _insert_select_max_tmpl = (
        "INSERT INTO {tablename} ({notification_id}, {columns}) "
        "SELECT COALESCE(MAX({tablename}.{notification_id}), 0) + 1, {placeholders} "
        "FROM "
        "{tablename}"
    )

    # Suffix appended to the template above when the record class has an
    # 'application_name' attribute. It is None here, so it has to be given
    # a string value elsewhere (e.g. by a subclass) before it can be used.
    _where_application_name_tmpl = None

    # Template for inserting a record without selecting an ID.
    _insert_values_tmpl = (
        "INSERT INTO {tablename} ({columns}) "
        "VALUES ({placeholders});"
    )

    def __init__(self, *args, **kwargs):
        """
        Initialises the record manager, with empty caches for the
        prepared insert statements.
        """
        super(SQLRecordManager, self).__init__(*args, **kwargs)
        # Prepared statements are compiled on first use by the properties below.
        self._insert_select_max = None
        self._insert_values = None
        self._insert_tracking_record = None

    def record_sequenced_items(self, sequenced_item_or_items):
        """
        Writes sequenced item(s) into the datastore, by converting
        them to records and passing the records to write_records().
        """
        self.write_records(self.to_records(sequenced_item_or_items))

    def to_records(self, sequenced_item_or_items):
        """
        Returns list of records constructed from the given sequenced item,
        or from each of the items if a list of sequenced items is given.
        """
        # Only a list is treated as a collection of items; any other
        # object is treated as a single sequenced item.
        items = (
            sequenced_item_or_items
            if isinstance(sequenced_item_or_items, list)
            else [sequenced_item_or_items]
        )
        return [self.to_record(item) for item in items]

    @property
    def insert_select_max(self):
        """
        SQL statement that inserts records with contiguous IDs,
        by selecting max ID from indexed table records.

        The statement is prepared on first access and then reused.
        """
        statement = self._insert_select_max
        if statement is None:
            record_class = self.record_class
            if hasattr(record_class, "application_name"):
                # Todo: Maybe make it support application_name without pipeline_id?
                assert hasattr(record_class, "pipeline_id"), record_class
                template = (
                    self._insert_select_max_tmpl + self._where_application_name_tmpl
                )
            else:
                template = self._insert_select_max_tmpl
            statement = self._prepare_insert(
                tmpl=template,
                record_class=record_class,
                field_names=list(self.field_names),
            )
            self._insert_select_max = statement
        return statement

    @abstractmethod
    def _prepare_insert(
        self, tmpl, record_class, field_names, placeholder_for_id=False
    ):
        """
        Compile SQL statement with placeholders for bind parameters.

        :param tmpl: statement template (one of the class templates)
        :param record_class: class of the records to be inserted
        :param field_names: names of the record fields to be inserted
        :param placeholder_for_id: passed as True by the properties that
            use the plain 'insert values' template
        """

    @property
    def insert_values(self):
        """
        SQL statement that inserts records without ID.

        The statement is prepared on first access and then reused.
        """
        statement = self._insert_values
        if statement is None:
            statement = self._prepare_insert(
                tmpl=self._insert_values_tmpl,
                placeholder_for_id=True,
                record_class=self.record_class,
                field_names=self.field_names,
            )
            self._insert_values = statement
        return statement

    @property
    def insert_tracking_record(self):
        """
        SQL statement that inserts tracking records.

        The statement is prepared on first access and then reused.
        """
        statement = self._insert_tracking_record
        if statement is None:
            statement = self._prepare_insert(
                tmpl=self._insert_values_tmpl,
                placeholder_for_id=True,
                record_class=self.tracking_record_class,
                field_names=self.tracking_record_field_names,
            )
            self._insert_tracking_record = statement
        return statement

    @abstractmethod
    def get_record_table_name(self, record_class):
        """
        Returns table name - used in raw queries.

        :rtype: str
        """