Source code for eventsourcing.infrastructure.base

from abc import ABCMeta, abstractmethod

import six

from eventsourcing.exceptions import OperationalError, RecordConflictError
from eventsourcing.infrastructure.sequenceditem import SequencedItem, SequencedItemFieldNames


[docs]class AbstractSequencedItemRecordManager(six.with_metaclass(ABCMeta)): def __init__(self, record_class, sequenced_item_class=SequencedItem, contiguous_record_ids=False, application_id=None, pipeline_id=-1): self.record_class = record_class self.sequenced_item_class = sequenced_item_class self.field_names = SequencedItemFieldNames(self.sequenced_item_class) self.contiguous_record_ids = contiguous_record_ids and hasattr(self.record_class, 'id') if hasattr(self.record_class, 'application_id'): assert application_id, "'application_id' not set when required" assert contiguous_record_ids, "'contiguous_record_ids' not set when required" self.application_id = application_id if hasattr(self.record_class, 'pipeline_id'): assert hasattr(self.record_class, 'application_id'), "'application_id' column not defined" self.pipeline_id = pipeline_id
[docs] @abstractmethod def append(self, sequenced_item_or_items): """ Writes sequenced item into the datastore. """
[docs] @abstractmethod def get_item(self, sequence_id, position): """ Gets sequenced item from the datastore. """
[docs] def list_items(self, *args, **kwargs): return list(self.get_items(*args, **kwargs))
[docs] def get_items(self, sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True): """ Returns sequenced items. """ records = self.get_records( sequence_id=sequence_id, gt=gt, gte=gte, lt=lt, lte=lte, limit=limit, query_ascending=query_ascending, results_ascending=results_ascending, ) for item in six.moves.map(self.from_record, records): yield item
[docs] @abstractmethod def get_records(self, sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True): """ Returns records for a sequence. """
[docs] def to_record(self, sequenced_item): """ Constructs and returns an ORM object, from given sequenced item object. """ kwargs = self.get_field_kwargs(sequenced_item) if hasattr(self.record_class, 'application_id'): kwargs['application_id'] = self.application_id return self.record_class(**kwargs)
[docs] def from_record(self, record): """ Constructs and returns a sequenced item object, from given ORM object. """ kwargs = self.get_field_kwargs(record) return self.sequenced_item_class(**kwargs)
[docs] @abstractmethod def get_notifications(self, start=None, stop=None, *args, **kwargs): """ Returns records sequenced by notification ID, from application, for pipeline, in given range. """
[docs] @abstractmethod def all_sequence_ids(self): """ Returns all sequence IDs. """
[docs] def list_sequence_ids(self): return list(self.all_sequence_ids())
[docs] @abstractmethod def delete_record(self, record): """ Removes permanently given record from the table. """
[docs] def get_field_kwargs(self, item): return {name: getattr(item, name) for name in self.field_names}
[docs] def raise_sequenced_item_conflict(self): msg = "Position already taken in sequence" raise RecordConflictError(msg)
[docs] def raise_index_error(self, position): raise IndexError("Sequence index out of range: {}".format(position))
[docs] def raise_record_integrity_error(self, e): raise RecordConflictError(e)
[docs] def raise_operational_error(self, e): raise OperationalError(e)
[docs]class ACIDRecordManager(AbstractSequencedItemRecordManager): """ ACID record managers can write tracking records and event records in an atomic transaction, needed for atomic processing in process applications. """ tracking_record_class = None tracking_record_field_names = [ 'application_id', 'upstream_application_id', 'pipeline_id', 'notification_id', # 'originator_id', # 'originator_version', ]
[docs] @abstractmethod def get_max_record_id(self): """Return maximum notification ID in pipeline."""
[docs]class RelationalRecordManager(ACIDRecordManager): def __init__(self, *args, **kwargs): super(RelationalRecordManager, self).__init__(*args, **kwargs) self._insert_select_max = None self._insert_values = None self._insert_tracking_record = None
[docs] def append(self, sequenced_item_or_items): # Convert sequenced item(s) to database record(s). records = self.to_records(sequenced_item_or_items) # Write records. self.write_records(records)
[docs] def to_records(self, sequenced_item_or_items): if isinstance(sequenced_item_or_items, list): records = [self.to_record(i) for i in sequenced_item_or_items] else: records = [self.to_record(sequenced_item_or_items)] return records
[docs] def write_records(self, records, tracking_kwargs=None): """ Creates records in the database. :param tracking_kwargs: """
@property def insert_select_max(self): """ SQL statement that inserts records with contiguous IDs, by selecting max ID from indexed table records. """ if self._insert_select_max is None: if hasattr(self.record_class, 'application_id'): # Todo: Maybe make it support application_id with pipeline_id? assert hasattr(self.record_class, 'pipeline_id') tmpl = self._insert_select_max_where_application_id_tmpl else: tmpl = self._insert_select_max_tmpl self._insert_select_max = self._prepare_insert( tmpl=tmpl, record_class=self.record_class, field_names=list(self.field_names), ) return self._insert_select_max
[docs] @abstractmethod def _prepare_insert(self, tmpl, record_class, field_names, placeholder_for_id=False): """ Compile SQL statement with placeholders for bind parameters. """
_insert_select_max_tmpl = ( "INSERT INTO {tablename} (id, {columns}) " "SELECT COALESCE(MAX({tablename}.id), 0) + 1, {placeholders} " "FROM {tablename};" ) _insert_select_max_where_application_id_tmpl = ( "INSERT INTO {tablename} (id, {columns}) " "SELECT COALESCE(MAX({tablename}.id), 0) + 1, {placeholders} " "FROM {tablename} WHERE application_id=:application_id AND pipeline_id=:pipeline_id;" ) @property def insert_values(self): """ SQL statement that inserts records without ID. """ if self._insert_values is None: self._insert_values = self._prepare_insert( tmpl=self._insert_values_tmpl, placeholder_for_id=True, record_class=self.record_class, field_names=self.field_names, ) return self._insert_values @property def insert_tracking_record(self): """ SQL statement that inserts tracking records. """ if self._insert_tracking_record is None: self._insert_tracking_record = self._prepare_insert( tmpl=self._insert_values_tmpl, placeholder_for_id=True, record_class=self.tracking_record_class, field_names=self.tracking_record_field_names, ) return self._insert_tracking_record _insert_values_tmpl = ( "INSERT INTO {tablename} ({columns}) " "VALUES ({placeholders});" )
[docs] @abstractmethod def get_record_table_name(self, record_class): """ Returns table name - used in raw queries. :rtype: str """
[docs] def clone(self, application_id, pipeline_id, **kwargs): return type(self)( record_class=self.record_class, contiguous_record_ids=self.contiguous_record_ids, sequenced_item_class=self.sequenced_item_class, application_id=application_id, pipeline_id=pipeline_id, **kwargs )
[docs]class AbstractTrackingRecordManager(six.with_metaclass(ABCMeta)): @property @abstractmethod def record_class(self): """Returns tracking record class."""
[docs] @abstractmethod def get_max_record_id(self, application_name, upstream_application_name, pipeline_id): """Returns maximum record ID for given application name."""