Source code for eventsourcing.domain.model.timebucketedlog

import datetime
from uuid import UUID, uuid5

from dateutil.relativedelta import relativedelta

from eventsourcing.domain.model.entity import AbstractEntityRepository, TimestampedVersionedEntity
from eventsourcing.domain.model.events import EventWithOriginatorID, EventWithTimestamp, Logged, publish
from eventsourcing.exceptions import RepositoryKeyError
from eventsourcing.utils.times import datetime_from_timestamp, decimaltimestamp, utc_timezone
from eventsourcing.utils.topic import get_topic

Namespace_Timebuckets = UUID('0d7ee297-a976-4c29-91ff-84ffc79d8155')

ONE_YEAR = relativedelta(years=1)
ONE_MONTH = relativedelta(months=1)
ONE_DAY = relativedelta(days=1)
ONE_HOUR = relativedelta(hours=1)
ONE_MINUTE = relativedelta(minutes=1)
ONE_SECOND = relativedelta(seconds=1)

BUCKET_SIZES = {
    'year': ONE_YEAR,
    'month': ONE_MONTH,
    'day': ONE_DAY,
    'hour': ONE_HOUR,
    'minute': ONE_MINUTE,
    'second': ONE_SECOND,
}


[docs]class Timebucketedlog(TimestampedVersionedEntity):
[docs] class Event(TimestampedVersionedEntity.Event): """Supertype for events of time-bucketed log."""
[docs] class Started(TimestampedVersionedEntity.Created, Event): pass
[docs] class BucketSizeChanged(Event, TimestampedVersionedEntity.AttributeChanged): pass
[docs] def __init__(self, name, bucket_size=None, **kwargs): super(Timebucketedlog, self).__init__(**kwargs) self._name = name self._bucket_size = bucket_size
@property def name(self): return self._name @property def started_on(self): return self.__created_on__ @property def bucket_size(self): return self._bucket_size
[docs] def log_message(self, message): assert isinstance(message, str) bucket_id = make_timebucket_id(self.name, decimaltimestamp(), self.bucket_size) event = MessageLogged( originator_id=bucket_id, message=message, ) publish(event) return event
[docs]class TimebucketedlogRepository(AbstractEntityRepository):
[docs] def get_or_create(self, log_name, bucket_size): """ Gets or creates a log. :rtype: Timebucketedlog """ try: return self[log_name] except RepositoryKeyError: return start_new_timebucketedlog(log_name, bucket_size=bucket_size)
[docs]def start_new_timebucketedlog(name, bucket_size=None): if bucket_size is None: bucket_size = 'year' if bucket_size not in BUCKET_SIZES: raise ValueError("Bucket size '{}' not supported, must be one of: {}" "".format(bucket_size, BUCKET_SIZES.keys())) event = Timebucketedlog.Started( originator_id=name, name=name, bucket_size=bucket_size, originator_topic=get_topic(Timebucketedlog) ) entity = event.__mutate__() publish(event) return entity
[docs]class MessageLogged(EventWithTimestamp, EventWithOriginatorID, Logged):
[docs] def __init__(self, message, originator_id): super(MessageLogged, self).__init__(originator_id=originator_id, message=message)
@property def message(self): return self.__dict__['message']
[docs]def make_timebucket_id(log_id, timestamp, bucket_size): d = datetime_from_timestamp(timestamp) assert isinstance(d, datetime.datetime) if bucket_size.startswith('year'): boundary = '{:04}'.format( d.year ) elif bucket_size.startswith('month'): boundary = '{:04}-{:02}'.format( d.year, d.month ) elif bucket_size.startswith('day'): boundary = '{:04}-{:02}-{:02}'.format( d.year, d.month, d.day ) elif bucket_size.startswith('hour'): boundary = '{:04}-{:02}-{:02}_{:02}'.format( d.year, d.month, d.day, d.hour ) elif bucket_size.startswith('minute'): boundary = '{:04}-{:02}-{:02}_{:02}-{:02}'.format( d.year, d.month, d.day, d.hour, d.minute ) elif bucket_size.startswith('second'): boundary = '{:04}-{:02}-{:02}_{:02}-{:02}-{:02}'.format( d.year, d.month, d.day, d.hour, d.minute, d.second ) else: raise ValueError("Bucket size not supported: {}".format(bucket_size)) return uuid5(Namespace_Timebuckets, log_id.hex + '_' + boundary)
[docs]def next_bucket_starts(timestamp, bucket_size): starts = bucket_starts(timestamp, bucket_size) duration = bucket_duration(bucket_size) return timestamp_from_datetime((starts + duration))
[docs]def previous_bucket_starts(timestamp, bucket_size): starts = bucket_starts(timestamp, bucket_size) duration = bucket_duration(bucket_size) return timestamp_from_datetime((starts - duration))
[docs]def bucket_starts(timestamp, bucket_size): dt = datetime_from_timestamp(timestamp) assert isinstance(dt, datetime.datetime) if bucket_size.startswith('year'): return datetime.datetime(dt.year, 1, 1, tzinfo=utc_timezone) elif bucket_size.startswith('month'): return datetime.datetime(dt.year, dt.month, 1, tzinfo=utc_timezone) elif bucket_size.startswith('day'): return datetime.datetime(dt.year, dt.month, dt.day, tzinfo=utc_timezone) elif bucket_size.startswith('hour'): return datetime.datetime(dt.year, dt.month, dt.day, dt.hour, tzinfo=utc_timezone) elif bucket_size.startswith('minute'): return datetime.datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute, tzinfo=utc_timezone) elif bucket_size.startswith('second'): return datetime.datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, tzinfo=utc_timezone) else: raise ValueError("Bucket size not supported: {}".format(bucket_size))
[docs]def bucket_duration(bucket_size): try: return BUCKET_SIZES[bucket_size] except KeyError: raise ValueError("Bucket size not supported: {}. Must be one of: {}" "".format(bucket_size, BUCKET_SIZES.keys()))
# Todo: Move to general utils?
[docs]def timestamp_from_datetime(dt): assert dt.tzinfo, "Datetime object does not have tzinfo" return dt.timestamp()