from abc import ABCMeta, abstractmethod
import six
from eventsourcing.domain.model.decorators import retry
from eventsourcing.exceptions import RecordIDConflict, SequencedItemConflict
from eventsourcing.infrastructure.sequenceditem import SequencedItem, SequencedItemFieldNames
class AbstractRecordManager(six.with_metaclass(ABCMeta)):
    """
    Abstract base class for record managers.

    Converts between sequenced item objects and database record objects,
    and declares the read/write operations that concrete record managers
    must implement.
    """

    def __init__(self, record_class, sequenced_item_class=SequencedItem, contiguous_record_ids=False):
        """
        Initialises the record manager.

        :param record_class: class used to construct database records.
        :param sequenced_item_class: class used to construct sequenced items.
        :param contiguous_record_ids: whether contiguous record IDs are wanted;
            only retained as truthy if ``record_class`` has an ``id`` attribute.
        """
        self.record_class = record_class
        self.sequenced_item_class = sequenced_item_class
        self.field_names = SequencedItemFieldNames(self.sequenced_item_class)
        # Contiguous record IDs can only be honoured when the record class has an 'id' attribute.
        self.contiguous_record_ids = contiguous_record_ids and hasattr(self.record_class, 'id')

    @abstractmethod
    def append(self, sequenced_item_or_items):
        """
        Writes sequenced item into the datastore.
        """

    @abstractmethod
    def get_item(self, sequence_id, eq):
        """
        Reads sequenced item from the datastore.
        """

    def list_items(self, *args, **kwargs):
        """
        Returns a list of the items obtained from get_items().

        All arguments are passed through to get_items() unchanged.
        """
        return list(self.get_items(*args, **kwargs))

    @abstractmethod
    def get_items(self, sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None,
                  query_ascending=True, results_ascending=True):
        """
        Reads sequenced items from the datastore.
        """

    def all_items(self, *args, **kwargs):
        """
        Returns all items across all sequences.

        All arguments are passed through to all_records(); each record
        is converted with from_record(). The result is whatever
        ``six.moves.map`` returns, not necessarily a list.
        """
        return six.moves.map(self.from_record, self.all_records(*args, **kwargs))

    def to_record(self, sequenced_item):
        """
        Returns a database record, from given sequenced item.

        :raises AssertionError: if the given object is not an instance
            of the configured sequenced item class.
        """
        # Check we got a sequenced item.
        assert isinstance(sequenced_item, self.sequenced_item_class), (type(sequenced_item), self.sequenced_item_class)

        # Construct and return an ORM object.
        kwargs = self.get_field_kwargs(sequenced_item)
        return self.record_class(**kwargs)

    def from_record(self, record):
        """
        Returns a sequenced item instance, from given database record.
        """
        kwargs = self.get_field_kwargs(record)
        return self.sequenced_item_class(**kwargs)

    @abstractmethod
    def all_records(self, start=None, stop=None, *args, **kwargs):
        """
        Returns all records in the table (possibly in chronological order, depending on database).
        """

    @abstractmethod
    def delete_record(self, record):
        """
        Removes permanently given record from the table.
        """

    def get_field_kwargs(self, item):
        """
        Returns a dict mapping each name in ``self.field_names`` to the
        value of the attribute with that name on the given object.

        Used for both sequenced items and records, so both kinds of
        object must expose attributes under the same field names.
        """
        return {name: getattr(item, name) for name in self.field_names}

    def raise_sequenced_item_conflict(self):
        """
        Raises SequencedItemConflict exception.
        """
        msg = "Position already taken in sequence"
        raise SequencedItemConflict(msg)

    def raise_index_error(self, eq):
        """
        Raises IndexError mentioning the given position.
        """
        raise IndexError("Sequence index out of range: {}".format(eq))
class RelationalRecordManager(AbstractRecordManager):
    """
    Abstract record manager for relational databases.

    Implements append() and get_items() in terms of the abstract methods
    _write_records() and get_records(), lazily prepares raw SQL insert
    statements from templates, and classifies integrity errors as either
    record ID conflicts or sequenced item conflicts.
    """

    def __init__(self, *args, **kwargs):
        """
        Accepts the same arguments as AbstractRecordManager.
        """
        super(RelationalRecordManager, self).__init__(*args, **kwargs)
        # Prepared statements are compiled lazily, on first property access.
        self._insert_select_max = None
        self._insert_values = None

    def append(self, sequenced_item_or_items):
        """
        Writes one sequenced item, or a list of sequenced items, as records.

        Only ``list`` instances are treated as a collection of items; any
        other object is converted as a single sequenced item.
        """
        # Convert sequenced item(s) to database record(s).
        # NOTE(review): the check is limited to ``list``; presumably a single
        # sequenced item may itself be tuple-like - confirm before widening.
        if isinstance(sequenced_item_or_items, list):
            records = [self.to_record(i) for i in sequenced_item_or_items]
        else:
            records = [self.to_record(sequenced_item_or_items)]
        self.write_records(records)

    @retry(RecordIDConflict, max_attempts=100, wait=0.005)
    def write_records(self, records):
        """
        Calls _write_records() implemented by concrete classes.

        Retries call in case of a RecordIDConflict.
        """
        self._write_records(records)

    @abstractmethod
    def _write_records(self, records):
        """
        Actually creates records in the database.
        """

    @property
    def insert_select_max(self):
        """
        SQL statement that inserts records with contiguous IDs,
        by selecting max ID from indexed table records.

        Prepared from the template on first access, then cached.
        """
        if self._insert_select_max is None:
            self._insert_select_max = self._prepare_insert(self._insert_select_max_tmpl)
        return self._insert_select_max

    @abstractmethod
    def _prepare_insert(self, tmpl):
        """
        Compile SQL statement with placeholders for bind parameters.

        The parameter must be named ``tmpl``, since it is also passed
        by keyword.
        """

    # Template for an insert that computes the next ID as MAX(id) + 1
    # (or 1 when MAX(id) is NULL).
    _insert_select_max_tmpl = (
        "INSERT INTO {tablename} (id, {columns}) "
        "SELECT COALESCE(MAX({tablename}.id), 0) + 1, {placeholders} "
        "FROM {tablename};"
    )

    @property
    def insert_values(self):
        """
        SQL statement that inserts records without ID.

        Prepared from the template on first access, then cached.
        """
        if self._insert_values is None:
            self._insert_values = self._prepare_insert(tmpl=self._insert_values_tmpl)
        return self._insert_values

    # Template for a plain insert that does not supply an ID value.
    _insert_values_tmpl = (
        "INSERT INTO {tablename} ({columns}) "
        "VALUES ({placeholders});"
    )

    def get_items(self, sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None,
                  query_ascending=True, results_ascending=True):
        """
        Returns items of a sequence.

        This is a generator: records are obtained from get_records() with
        the same arguments, and each is converted with from_record().
        """
        records = self.get_records(
            sequence_id=sequence_id,
            gt=gt,
            gte=gte,
            lt=lt,
            lte=lte,
            limit=limit,
            query_ascending=query_ascending,
            results_ascending=results_ascending,
        )
        for item in six.moves.map(self.from_record, records):
            yield item

    @abstractmethod
    def get_records(self, sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None,
                    query_ascending=True, results_ascending=True):
        """
        Returns records for a sequence.
        """

    # # Todo: Drop this, it doesn't really help.
    # def __getitem__(self, item=None):
    #     assert isinstance(item, slice), type(item)
    #     # start = item.start or 0
    #     # assert start >= 0, start
    #     return self.all_records(start=item.start, stop=item.stop)

    @abstractmethod
    def get_max_record_id(self):
        """Return maximum ID of existing records."""

    @property
    @abstractmethod
    def record_table_name(self):
        """
        Returns table name - used in raw queries,
        and to detect record ID conflicts.

        :rtype: str
        """

    def raise_after_integrity_error(self, e):
        """
        Raises a conflict exception after a database integrity error.

        When contiguous record IDs are enabled and the text of the error
        matches a known primary key violation message, raises
        RecordIDConflict. In all other cases raises SequencedItemConflict.

        :param e: the integrity error; only ``str(e)`` is inspected.
        :raises RecordIDConflict: if a record ID conflict is identified.
        :raises SequencedItemConflict: otherwise.
        """
        error = str(e)

        # Try to identify record ID conflicts.
        if self.contiguous_record_ids:
            table_name = self.record_table_name

            # Assume record ID is primary key.
            # - SQLite
            if "UNIQUE constraint failed: {}.id".format(table_name) in error:
                self.raise_record_id_conflict()

            # - MySQL: the key is reported either as 'PRIMARY' or, in newer
            #   server versions, qualified with the table name.
            elif 'Duplicate entry' in error and (
                "for key 'PRIMARY'" in error
                or "for key '{}.PRIMARY'".format(table_name) in error
            ):
                self.raise_record_id_conflict()

            # - PostgreSQL
            elif 'duplicate key value violates unique constraint "{}_pkey"'.format(table_name) in error:
                self.raise_record_id_conflict()

        # Anything not identified as a record ID conflict is treated
        # as a conflict on the position in the sequence.
        self.raise_sequenced_item_conflict()

    @staticmethod
    def raise_record_id_conflict():
        """
        Raises RecordIDConflict exception.
        """
        msg = "There was a record ID conflict"
        raise RecordIDConflict(msg)