Source code for eventsourcing.infrastructure.popo.manager

from typing import Any, Dict, Iterable, List, NamedTuple, Optional, Sequence
from uuid import UUID

from readerwriterlock.rwlock import RWLockFair

from eventsourcing.exceptions import ProgrammingError, RecordConflictError
from eventsourcing.infrastructure.base import (
    EVENT_NOT_NOTIFIABLE,
    RecordManagerWithTracking,
    TrackingKwargs,
)


class PopoNotification(object):
    def __init__(
        self,
        notification_id: int,
        originator_id: UUID,
        originator_version: int,
        topic: str,
        state: str,
    ):
        self.notification_id = notification_id
        self.originator_id = originator_id
        self.originator_version = originator_version
        self.topic = topic
        self.state = state

    @property
    def id(self):
        return self.notification_id


[docs]class PopoRecordManager(RecordManagerWithTracking):
[docs] def __init__(self, *args: Any, **kwargs: Any): super(PopoRecordManager, self).__init__(*args, **kwargs) self._all_sequence_records: Dict[ Optional[str], Dict[UUID, Dict[int, NamedTuple]] ] = {} self._all_sequence_max: Dict = {} self._all_tracking_records: Dict = {} self._all_tracking_max: Dict = {} self._all_notification_records: Dict[Optional[str], Dict[int, Any]] = {} self._all_notification_max: Dict = {} self._rw_lock: RWLockFair = RWLockFair()
[docs] def all_sequence_ids(self) -> List[UUID]: ids: List[UUID] = [] with self._rw_lock.gen_rlock(): try: ids = list(self._all_sequence_records[self.application_name].keys()) except KeyError: pass return ids
[docs] def delete_record(self, record: Any) -> None: with self._rw_lock.gen_wlock(): sequence_records = self._get_sequence_records(record.sequence_id) position = getattr(record, self.field_names.position) sequence_records.pop(position, None)
[docs] def get_max_notification_id(self) -> int: with self._rw_lock.gen_rlock(): max_record_id = self._get_max_record_id() return max_record_id
def _get_max_record_id(self) -> int: try: max_notification_id = self._all_notification_max[self.application_name] except KeyError: return 0 else: return max_notification_id def _get_notification_records(self) -> Dict[int, Any]: notification_records: Dict[int, Any] = {} try: notification_records = self._all_notification_records[self.application_name] except KeyError: pass return notification_records
[docs] def get_notification_records( self, start: Optional[int] = None, stop: Optional[int] = None, *args: Any, **kwargs: Any ) -> Iterable: notifications = [] with self._rw_lock.gen_rlock(): notification_records = self._get_notification_records() if start is None: i = 1 else: i = start + 1 while True: if stop is not None and i >= stop + 1: break try: notification_record = notification_records[i] notification = PopoNotification( notification_id=notification_record["notification_id"], originator_id=getattr( notification_record["sequenced_item"], self.field_names.sequence_id, ), originator_version=getattr( notification_record["sequenced_item"], self.field_names.position, ), topic=notification_record["sequenced_item"].topic, state=notification_record["sequenced_item"].state, ) notifications.append(notification) except KeyError: break else: i += 1 return notifications
[docs] def get_max_tracking_record_id(self, upstream_application_name: str) -> int: max_id = 0 with self._rw_lock.gen_rlock(): try: app_records = self._all_tracking_records[self.application_name] upstream_records = app_records[upstream_application_name] except KeyError: pass else: if len(upstream_records): max_id = max(upstream_records) return max_id
[docs] def get_record(self, sequence_id: UUID, position: int) -> Any: with self._rw_lock.gen_rlock(): try: return self._get_sequence_records(sequence_id)[position] except KeyError: raise IndexError(self.application_name, sequence_id, position)
[docs] def get_records( self, sequence_id: UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, query_ascending: bool = True, results_ascending: bool = True, ) -> Sequence[Any]: start = None if gt is not None: start = gt + 1 if gte is not None: if start is None: start = gte else: start = max(start, gte) end = None if lt is not None: end = lt if lte is not None: if end is None: end = lte + 1 else: end = min(end, lte + 1) selected_records: List = [] with self._rw_lock.gen_rlock(): all_sequence_records = self._get_sequence_records(sequence_id) if not len(all_sequence_records): return [] if end is None: max_position = self._all_sequence_max[self.application_name][ sequence_id ] end = max_position + 1 if start is None: start = min(all_sequence_records.keys()) for position in range(start, end): try: record = all_sequence_records[position] except KeyError: pass else: selected_records.append(record) if not query_ascending: selected_records = list(reversed(selected_records)) if limit is not None: selected_records = list(selected_records)[:limit] if query_ascending != results_ascending: selected_records = list(reversed(selected_records)) return selected_records
def _get_sequence_records(self, sequence_id: UUID) -> Dict: try: return self._all_sequence_records[self.application_name][sequence_id] except KeyError: return {}
[docs] def has_tracking_record( self, upstream_application_name: str, pipeline_id: int, notification_id: int ) -> bool: with self._rw_lock.gen_rlock(): try: app_records = self._all_tracking_records[self.application_name] upstream_records = app_records[upstream_application_name] except KeyError: pass else: return notification_id in upstream_records
[docs] def record_items(self, sequenced_items: Iterable[NamedTuple]) -> None: records = self.to_records(sequenced_items) self.write_records(records=records)
[docs] def write_records( self, records: Iterable[Any], tracking_kwargs: Optional[TrackingKwargs] = None, orm_objs_pending_save: Optional[Sequence[Any]] = None, orm_objs_pending_delete: Optional[Sequence[Any]] = None, ) -> None: with self._rw_lock.gen_wlock(): # Write event and notification records. if self.notification_id_name: records = list(records) all_notification_ids = set( getattr(r, self.notification_id_name) for r in records ) if None in all_notification_ids: if len(all_notification_ids) > 1: raise ProgrammingError("Only some records have IDs") for record in records: self._insert_record(record) if tracking_kwargs: # Write a tracking record. upstream_application_name = tracking_kwargs["upstream_application_name"] application_name = tracking_kwargs["application_name"] notification_id = tracking_kwargs["notification_id"] assert application_name == self.application_name, ( application_name, self.application_name, ) try: app_tracking_records = self._all_tracking_records[application_name] except KeyError: app_tracking_records = {} self._all_tracking_records[ self.application_name ] = app_tracking_records try: upstream_tracking_records = app_tracking_records[ upstream_application_name ] except KeyError: upstream_tracking_records = set() app_tracking_records[ upstream_application_name ] = upstream_tracking_records if notification_id in upstream_tracking_records: raise RecordConflictError( (application_name, upstream_application_name, notification_id) ) upstream_tracking_records.add(notification_id)
def _insert_record(self, record: NamedTuple) -> None: position = getattr(record, self.field_names.position) if not isinstance(position, int): raise NotImplementedError( "Popo record manager only supports sequencing with integers, " "but position was a {}".format(type(position)) ) if self.notification_id_name: notification_id = getattr(record, self.notification_id_name) if notification_id != EVENT_NOT_NOTIFIABLE: if notification_id is not None: if not isinstance(notification_id, int): raise ProgrammingError( "%s must be an %s not %s: %s" % ( self.notification_id_name, int, type(notification_id), record.__dict__, ) ) sequence_id = getattr(record, self.field_names.sequence_id) try: application_records = self._all_sequence_records[self.application_name] except KeyError: sequence_records: Dict[int, NamedTuple] = {} application_records = {sequence_id: sequence_records} self._all_sequence_records[self.application_name] = application_records self._all_sequence_max[self.application_name] = {} else: try: sequence_records = application_records[sequence_id] except KeyError: sequence_records = {} application_records[sequence_id] = sequence_records if position in sequence_records: raise RecordConflictError(position, len(sequence_records)) if self.notification_id_name: # Just make sure we aren't making a gap in the sequence. if sequence_records: max_position = self._all_sequence_max[self.application_name][ sequence_id ] next_position = max_position + 1 else: next_position = 0 if position != next_position: raise AssertionError( "Next position for sequence {} is {}, not {}".format( sequence_id, next_position, position ) ) sequence_records[position] = record self._all_sequence_max[self.application_name][sequence_id] = position # Write a notification record. if self.notification_id_name: try: notification_records = self._all_notification_records[ self.application_name ] except KeyError: notification_records = {} self._all_notification_records[ self.application_name ] = notification_records if self.notification_id_name: notification_id = getattr(record, self.notification_id_name) if notification_id == EVENT_NOT_NOTIFIABLE: setattr(record, self.notification_id_name, None) else: if notification_id is None: notification_id = (self._get_max_record_id() or 0) + 1 setattr(record, self.notification_id_name, notification_id) notification_records[notification_id] = { "notification_id": notification_id, "sequenced_item": record, } self._all_notification_max[self.application_name] = notification_id
[docs] def to_record(self, sequenced_item: NamedTuple) -> object: return self.record_class(sequenced_item)
def to_records(self, sequenced_items: Iterable[NamedTuple]) -> Iterable[Any]: return (self.record_class(s) for s in sequenced_items)