Source code for eventsourcing.example.domainmodel

import uuid

from eventsourcing.domain.model.entity import AbstractEntityRepository, TimestampedVersionedEntity, mutate_entity
from eventsourcing.domain.model.events import publish
from eventsourcing.domain.model.decorators import mutator, attribute


[docs]class Example(TimestampedVersionedEntity): """ An example event sourced domain model entity. """
[docs] class Event(TimestampedVersionedEntity.Event): """Layer supertype."""
[docs] class Created(Event, TimestampedVersionedEntity.Created): """Published when an Example is created."""
[docs] class AttributeChanged(Event, TimestampedVersionedEntity.AttributeChanged): """Published when an Example is created."""
[docs] class Discarded(Event, TimestampedVersionedEntity.Discarded): """Published when an Example is discarded."""
[docs] class Heartbeat(Event, TimestampedVersionedEntity.Event): """Published when a heartbeat in the entity occurs (see below)."""
def __init__(self, foo='', a='', b='', **kwargs): super(Example, self).__init__(**kwargs) self._foo = foo self._a = a self._b = b self._count_heartbeats = 0 @attribute def foo(self): """An example attribute.""" @attribute def a(self): """An example attribute.""" @attribute def b(self): """Another example attribute."""
[docs] def beat_heart(self, number_of_beats=1): self._assert_not_discarded() while number_of_beats > 0: event = self.Heartbeat(originator_id=self._id, originator_version=self._version) self._apply_and_publish(event) number_of_beats -= 1
[docs] def count_heartbeats(self): return self._count_heartbeats
@classmethod def _mutate(cls, initial, event): return example_mutator(initial or cls, event)
[docs]@mutator def example_mutator(initial, event, ): return mutate_entity(initial, event)
[docs]@example_mutator.register(Example.Heartbeat) def heartbeat_mutator(self, event): self._validate_originator(event) assert isinstance(self, Example), self self._count_heartbeats += 1 self._increment_version() return self
[docs]class AbstractExampleRepository(AbstractEntityRepository): pass
[docs]def create_new_example(foo='', a='', b=''): """ Factory method for example entities. :rtype: Example """ entity_id = uuid.uuid4() event = Example.Created(originator_id=entity_id, foo=foo, a=a, b=b) entity = Example._mutate(initial=None, event=event) publish(event=event) return entity