Source code for eventsourcing.domain.model.timebucketedlog

import datetime
from time import mktime, time
from uuid import UUID, uuid5

import six
from dateutil.relativedelta import relativedelta

from eventsourcing.domain.model.entity import AbstractEntityRepository, TimestampedVersionedEntity
from eventsourcing.domain.model.events import publish, EventWithTimestamp, EventWithOriginatorID, Logged
from eventsourcing.exceptions import RepositoryKeyError
from eventsourcing.utils.times import utc_timezone, decimaltimestamp, datetime_from_timestamp
from eventsourcing.utils.topic import get_topic

Namespace_Timebuckets = UUID('0d7ee297-a976-4c29-91ff-84ffc79d8155')

ONE_YEAR = relativedelta(years=1)
ONE_MONTH = relativedelta(months=1)
ONE_DAY = relativedelta(days=1)
ONE_HOUR = relativedelta(hours=1)
ONE_MINUTE = relativedelta(minutes=1)
ONE_SECOND = relativedelta(seconds=1)

BUCKET_SIZES = {
    'year': ONE_YEAR,
    'month': ONE_MONTH,
    'day': ONE_DAY,
    'hour': ONE_HOUR,
    'minute': ONE_MINUTE,
    'second': ONE_SECOND,
}


[docs]class Timebucketedlog(TimestampedVersionedEntity):
[docs] class Event(TimestampedVersionedEntity.Event): """Supertype for events of time-bucketed log."""
[docs] class Started(TimestampedVersionedEntity.Created, Event): pass
[docs] class BucketSizeChanged(Event, TimestampedVersionedEntity.AttributeChanged): pass
def __init__(self, name, bucket_size=None, **kwargs): super(Timebucketedlog, self).__init__(**kwargs) self._name = name self._bucket_size = bucket_size @property def name(self): return self._name @property def started_on(self): return self.__created_on__ @property def bucket_size(self): return self._bucket_size
[docs] def append_message(self, message): assert isinstance(message, six.string_types) bucket_id = make_timebucket_id(self.name, decimaltimestamp(), self.bucket_size) event = MessageLogged( originator_id=bucket_id, message=message, ) publish(event) return event
[docs]class TimebucketedlogRepository(AbstractEntityRepository):
[docs] def get_or_create(self, log_name, bucket_size): """ Gets or creates a log. :rtype: Timebucketedlog """ try: return self[log_name] except RepositoryKeyError: return start_new_timebucketedlog(log_name, bucket_size=bucket_size)
[docs]def start_new_timebucketedlog(name, bucket_size=None): if bucket_size is None: bucket_size = 'year' if bucket_size not in BUCKET_SIZES: raise ValueError("Bucket size '{}' not supported, must be one of: {}" "".format(bucket_size, BUCKET_SIZES.keys())) event = Timebucketedlog.Started( originator_id=name, name=name, bucket_size=bucket_size, originator_topic=get_topic(Timebucketedlog) ) entity = event.__mutate__() publish(event) return entity
[docs]class MessageLogged(EventWithTimestamp, EventWithOriginatorID, Logged): def __init__(self, message, originator_id): super(MessageLogged, self).__init__(originator_id=originator_id, message=message) @property def message(self): return self.__dict__['message']
[docs]def make_timebucket_id(log_id, timestamp, bucket_size): d = datetime_from_timestamp(timestamp) assert isinstance(d, datetime.datetime) if bucket_size.startswith('year'): boundary = '{:04}'.format( d.year ) elif bucket_size.startswith('month'): boundary = '{:04}-{:02}'.format( d.year, d.month ) elif bucket_size.startswith('day'): boundary = '{:04}-{:02}-{:02}'.format( d.year, d.month, d.day ) elif bucket_size.startswith('hour'): boundary = '{:04}-{:02}-{:02}_{:02}'.format( d.year, d.month, d.day, d.hour ) elif bucket_size.startswith('minute'): boundary = '{:04}-{:02}-{:02}_{:02}-{:02}'.format( d.year, d.month, d.day, d.hour, d.minute ) elif bucket_size.startswith('second'): boundary = '{:04}-{:02}-{:02}_{:02}-{:02}-{:02}'.format( d.year, d.month, d.day, d.hour, d.minute, d.second ) else: raise ValueError("Bucket size not supported: {}".format(bucket_size)) return uuid5(Namespace_Timebuckets, log_id.hex + '_' + boundary)
[docs]def next_bucket_starts(timestamp, bucket_size): starts = bucket_starts(timestamp, bucket_size) duration = bucket_duration(bucket_size) return timestamp_from_datetime((starts + duration))
[docs]def previous_bucket_starts(timestamp, bucket_size): starts = bucket_starts(timestamp, bucket_size) duration = bucket_duration(bucket_size) return timestamp_from_datetime((starts - duration))
[docs]def bucket_starts(timestamp, bucket_size): dt = datetime_from_timestamp(timestamp) assert isinstance(dt, datetime.datetime) if bucket_size.startswith('year'): return datetime.datetime(dt.year, 1, 1, tzinfo=utc_timezone) elif bucket_size.startswith('month'): return datetime.datetime(dt.year, dt.month, 1, tzinfo=utc_timezone) elif bucket_size.startswith('day'): return datetime.datetime(dt.year, dt.month, dt.day, tzinfo=utc_timezone) elif bucket_size.startswith('hour'): return datetime.datetime(dt.year, dt.month, dt.day, dt.hour, tzinfo=utc_timezone) elif bucket_size.startswith('minute'): return datetime.datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute, tzinfo=utc_timezone) elif bucket_size.startswith('second'): return datetime.datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, tzinfo=utc_timezone) else: raise ValueError("Bucket size not supported: {}".format(bucket_size))
[docs]def bucket_duration(bucket_size): try: return BUCKET_SIZES[bucket_size] except KeyError: raise ValueError("Bucket size not supported: {}. Must be one of: {}" "".format(bucket_size, BUCKET_SIZES.keys()))
# Todo: Move to general utils?
[docs]def timestamp_from_datetime(dt): assert dt.tzinfo, "Datetime object does not have tzinfo" try: # Only in Python 3.3+. return dt.timestamp() except AttributeError: return mktime(dt.timetuple()) + (dt.microsecond / 1000000.0)