from abc import ABC, abstractmethod
from eventsourcing.exceptions import OperationalError, RecordConflictError
from eventsourcing.infrastructure.sequenceditem import SequencedItem, SequencedItemFieldNames
DEFAULT_PIPELINE_ID = 0
[docs]class AbstractSequencedItemRecordManager(ABC):
def __init__(self, record_class, sequenced_item_class=SequencedItem,
contiguous_record_ids=False, application_name=None,
pipeline_id=DEFAULT_PIPELINE_ID):
self.record_class = record_class
self.sequenced_item_class = sequenced_item_class
self.field_names = SequencedItemFieldNames(self.sequenced_item_class)
if hasattr(self.record_class, 'id'):
self.notification_id_name = 'id'
elif hasattr(self.record_class, 'notification_id'):
self.notification_id_name = 'notification_id'
else:
self.notification_id_name = ''
self.contiguous_record_ids = contiguous_record_ids and self.notification_id_name
if hasattr(self.record_class, 'application_name'):
assert application_name, "'application_name' not set when required"
assert contiguous_record_ids, "'contiguous_record_ids' not set when required"
self.application_name = application_name
if hasattr(self.record_class, 'pipeline_id'):
assert hasattr(self.record_class, 'application_name'), "'application_name' column not defined"
self.pipeline_id = pipeline_id
[docs] def clone(self, application_name, pipeline_id, **kwargs):
return type(self)(
record_class=self.record_class,
contiguous_record_ids=self.contiguous_record_ids,
sequenced_item_class=self.sequenced_item_class,
application_name=application_name,
pipeline_id=pipeline_id,
**kwargs
)
[docs] @abstractmethod
def record_sequenced_items(self, sequenced_item_or_items):
"""
Writes sequenced item(s) into the datastore.
"""
[docs] def record_sequenced_item(self, sequenced_item):
"""
Writes sequenced item into the datastore.
"""
return self.record_sequenced_items(sequenced_item)
[docs] def get_item(self, sequence_id, position):
"""
Gets sequenced item from the datastore.
"""
return self.from_record(self.get_record(sequence_id, position))
[docs] @abstractmethod
def get_record(self, sequence_id, position):
"""
Gets record at position in sequence.
"""
[docs] def get_items(self, sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None,
query_ascending=True, results_ascending=True):
"""
Returns sequenced item generator.
"""
records = self.get_records(
sequence_id=sequence_id,
gt=gt,
gte=gte,
lt=lt,
lte=lte,
limit=limit,
query_ascending=query_ascending,
results_ascending=results_ascending,
)
for item in map(self.from_record, records):
yield item
[docs] def list_items(self, *args, **kwargs):
"""
Returns list of sequenced items.
"""
return list(self.get_items(*args, **kwargs))
[docs] @abstractmethod
def get_records(self, sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None,
query_ascending=True, results_ascending=True):
"""
Returns records for a sequence.
"""
[docs] def to_record(self, sequenced_item):
"""
Constructs a record object from given sequenced item object.
"""
kwargs = self.get_field_kwargs(sequenced_item)
# Supply application_name, if needed.
if hasattr(self.record_class, 'application_name'):
kwargs['application_name'] = self.application_name
# Supply pipeline_id, if needed.
if hasattr(self.record_class, 'pipeline_id'):
kwargs['pipeline_id'] = self.pipeline_id
return self.record_class(**kwargs)
[docs] def from_record(self, record):
"""
Constructs and returns a sequenced item object, from given ORM object.
"""
kwargs = self.get_field_kwargs(record)
return self.sequenced_item_class(**kwargs)
[docs] @abstractmethod
def get_notifications(self, start=None, stop=None, *args, **kwargs):
"""
Returns records sequenced by notification ID, from application, for pipeline, in given range.
Args 'start' and 'stop' are positions in a zero-based integer sequence.
"""
[docs] @abstractmethod
def all_sequence_ids(self):
"""
Returns all sequence IDs.
"""
[docs] def list_sequence_ids(self):
return list(self.all_sequence_ids())
[docs] @abstractmethod
def delete_record(self, record):
"""
Removes permanently given record from the table.
"""
[docs] def get_field_kwargs(self, item):
return {name: getattr(item, name) for name in self.field_names}
[docs] def raise_sequenced_item_conflict(self):
msg = "Position already taken in sequence"
raise RecordConflictError(msg)
[docs] def raise_index_error(self, position):
raise IndexError("Sequence index out of range: {}".format(position))
[docs] def raise_record_integrity_error(self, e):
raise RecordConflictError(e)
[docs] def raise_operational_error(self, e):
raise OperationalError(e)
[docs]class ACIDRecordManager(AbstractSequencedItemRecordManager):
"""
ACID record managers can write tracking records and event records
in an atomic transaction, needed for atomic processing in process
applications.
"""
tracking_record_class = None
tracking_record_field_names = [
'application_name',
'upstream_application_name',
'pipeline_id',
'notification_id',
]
[docs] @abstractmethod
def write_records(self, records, tracking_kwargs=None):
"""
Writes tracking, event and notification records for a process event.
"""
[docs] @abstractmethod
def get_max_record_id(self):
"""Return maximum notification ID in pipeline."""
[docs] @abstractmethod
def get_max_tracking_record_id(self, upstream_application_name):
"""Return maximum tracking record ID for notification from upstream application in pipeline."""
[docs] @abstractmethod
def has_tracking_record(self, upstream_application_name, pipeline_id, notification_id):
"""
True if tracking record exists for notification from upstream in pipeline.
"""
[docs] def get_pipeline_and_notification_id(self, sequence_id, position):
"""
Returns pipeline ID and notification ID for
event at given position in given sequence.
"""
# Todo: Optimise query by selecting only two columns, pipeline_id and id (notification ID)?
record = self.get_record(sequence_id, position)
notification_id = getattr(record, self.notification_id_name)
return record.pipeline_id, notification_id
[docs]class SQLRecordManager(ACIDRecordManager):
"""
This is has code common to (extracted from) the SQLAlchemy and Django record managers.
This makes the subclasses harder to read and probably more brittle. So it might be better
to inline this with the subclasses, so that each looks more like normal Django or SQLAlchemy
code. Also, the record manager test cases don't cover the notification log and tracking record
functionality needed by ProcessApplication, and should so that other record managers can more
easily be developed.
"""
def __init__(self, *args, **kwargs):
super(SQLRecordManager, self).__init__(*args, **kwargs)
self._insert_select_max = None
self._insert_values = None
self._insert_tracking_record = None
[docs] def record_sequenced_items(self, sequenced_item_or_items):
# Convert sequenced item(s) to database record(s).
records = self.to_records(sequenced_item_or_items)
# Write records.
self.write_records(records)
[docs] def to_records(self, sequenced_item_or_items):
if isinstance(sequenced_item_or_items, list):
records = [self.to_record(i) for i in sequenced_item_or_items]
else:
records = [self.to_record(sequenced_item_or_items)]
return records
@property
def insert_select_max(self):
"""
SQL statement that inserts records with contiguous IDs,
by selecting max ID from indexed table records.
"""
if self._insert_select_max is None:
if hasattr(self.record_class, 'application_name'):
# Todo: Maybe make it support application_name without pipeline_id?
assert hasattr(self.record_class, 'pipeline_id'), self.record_class
tmpl = self._insert_select_max_tmpl + self._where_application_name_tmpl
else:
tmpl = self._insert_select_max_tmpl
self._insert_select_max = self._prepare_insert(
tmpl=tmpl,
record_class=self.record_class,
field_names=list(self.field_names),
)
return self._insert_select_max
[docs] @abstractmethod
def _prepare_insert(self, tmpl, record_class, field_names, placeholder_for_id=False):
"""
Compile SQL statement with placeholders for bind parameters.
"""
_insert_select_max_tmpl = (
"INSERT INTO {tablename} ({notification_id}, {columns}) "
"SELECT COALESCE(MAX({tablename}.{notification_id}), 0) + 1, {placeholders} "
"FROM ""{tablename}"
)
_where_application_name_tmpl = None
@property
def insert_values(self):
"""
SQL statement that inserts records without ID.
"""
if self._insert_values is None:
self._insert_values = self._prepare_insert(
tmpl=self._insert_values_tmpl,
placeholder_for_id=True,
record_class=self.record_class,
field_names=self.field_names,
)
return self._insert_values
@property
def insert_tracking_record(self):
"""
SQL statement that inserts tracking records.
"""
if self._insert_tracking_record is None:
self._insert_tracking_record = self._prepare_insert(
tmpl=self._insert_values_tmpl,
placeholder_for_id=True,
record_class=self.tracking_record_class,
field_names=self.tracking_record_field_names,
)
return self._insert_tracking_record
_insert_values_tmpl = (
"INSERT INTO {tablename} ({columns}) "
"VALUES ({placeholders});"
)
[docs] @abstractmethod
def get_record_table_name(self, record_class):
"""
Returns table name - used in raw queries.
:rtype: str
"""