Source code for eventsourcing.domain.model.timebucketedlog

import datetime
from decimal import Decimal
from typing import Any, Optional, Union
from uuid import UUID, uuid5

from dateutil.relativedelta import relativedelta

from eventsourcing.domain.model.entity import TimestampedVersionedEntity
from eventsourcing.domain.model.events import (
    EventWithOriginatorID,
    EventWithTimestamp,
    LoggedEvent,
    publish,
)
from eventsourcing.domain.model.repository import AbstractEntityRepository
from eventsourcing.exceptions import RepositoryKeyError
from eventsourcing.utils.times import (
    datetime_from_timestamp,
    decimaltimestamp,
    utc_timezone,
)
from eventsourcing.utils.topic import get_topic

Namespace_Timebuckets = UUID("0d7ee297-a976-4c29-91ff-84ffc79d8155")

ONE_YEAR = relativedelta(years=1)
ONE_MONTH = relativedelta(months=1)
ONE_DAY = relativedelta(days=1)
ONE_HOUR = relativedelta(hours=1)
ONE_MINUTE = relativedelta(minutes=1)
ONE_SECOND = relativedelta(seconds=1)

BUCKET_SIZES = {
    "year": ONE_YEAR,
    "month": ONE_MONTH,
    "day": ONE_DAY,
    "hour": ONE_HOUR,
    "minute": ONE_MINUTE,
    "second": ONE_SECOND,
}


[docs]class Timebucketedlog(TimestampedVersionedEntity):
[docs] class Event(TimestampedVersionedEntity.Event): """Supertype for events of time-bucketed log."""
[docs] class Started(TimestampedVersionedEntity.Created, Event): pass
[docs] class BucketSizeChanged(Event, TimestampedVersionedEntity.AttributeChanged): pass
[docs] def __init__(self, name: UUID, bucket_size: Optional[str] = None, **kwargs: Any): super(Timebucketedlog, self).__init__(**kwargs) self._name = name self._bucket_size = bucket_size
@property def name(self) -> UUID: return self._name @property def started_on(self) -> Decimal: return self.__created_on__ @property def bucket_size(self) -> str: assert self._bucket_size return self._bucket_size def log_message(self, message: str) -> "MessageLogged": assert isinstance(message, str) bucket_id = make_timebucket_id(self.name, decimaltimestamp(), self.bucket_size) event = MessageLogged(originator_id=bucket_id, message=message) publish([event]) return event
[docs]class TimebucketedlogRepository(AbstractEntityRepository):
[docs] def get_or_create(self, log_name: UUID, bucket_size: str) -> Timebucketedlog: """ Gets or creates a log. """ try: return self[log_name] except RepositoryKeyError: return start_new_timebucketedlog(log_name, bucket_size=bucket_size)
def start_new_timebucketedlog( name: UUID, bucket_size: Optional[str] = None ) -> Timebucketedlog: if bucket_size is None: bucket_size = "year" if bucket_size not in BUCKET_SIZES: raise ValueError( "Bucket size '{}' not supported, must be one of: {}" "".format(bucket_size, BUCKET_SIZES.keys()) ) event = Timebucketedlog.Started( originator_id=name, name=name, bucket_size=bucket_size, originator_topic=get_topic(Timebucketedlog), ) entity = event.__mutate__(None) assert entity is not None publish([event]) return entity
[docs]class MessageLogged(EventWithTimestamp, EventWithOriginatorID, LoggedEvent):
[docs] def __init__(self, message: str, originator_id: UUID): super(MessageLogged, self).__init__( originator_id=originator_id, message=message )
@property def message(self) -> str: return self.__dict__["message"]
def make_timebucket_id( log_id: UUID, timestamp: Union[Decimal, float], bucket_size: str ) -> UUID: d = datetime_from_timestamp(timestamp) assert isinstance(d, datetime.datetime) if bucket_size.startswith("year"): boundary = "{:04}".format(d.year) elif bucket_size.startswith("month"): boundary = "{:04}-{:02}".format(d.year, d.month) elif bucket_size.startswith("day"): boundary = "{:04}-{:02}-{:02}".format(d.year, d.month, d.day) elif bucket_size.startswith("hour"): boundary = "{:04}-{:02}-{:02}_{:02}".format(d.year, d.month, d.day, d.hour) elif bucket_size.startswith("minute"): boundary = "{:04}-{:02}-{:02}_{:02}-{:02}".format( d.year, d.month, d.day, d.hour, d.minute ) elif bucket_size.startswith("second"): boundary = "{:04}-{:02}-{:02}_{:02}-{:02}-{:02}".format( d.year, d.month, d.day, d.hour, d.minute, d.second ) else: raise ValueError("Bucket size not supported: {}".format(bucket_size)) return uuid5(Namespace_Timebuckets, log_id.hex + "_" + boundary) def next_bucket_starts(timestamp: float, bucket_size: str) -> float: starts = bucket_starts(timestamp, bucket_size) duration = bucket_duration(bucket_size) return timestamp_from_datetime((starts + duration)) def previous_bucket_starts(timestamp: float, bucket_size: str) -> float: starts = bucket_starts(timestamp, bucket_size) duration = bucket_duration(bucket_size) return timestamp_from_datetime((starts - duration)) def bucket_starts(timestamp: float, bucket_size: str) -> datetime.datetime: dt = datetime_from_timestamp(timestamp) assert isinstance(dt, datetime.datetime) if bucket_size.startswith("year"): return datetime.datetime(dt.year, 1, 1, tzinfo=utc_timezone) elif bucket_size.startswith("month"): return datetime.datetime(dt.year, dt.month, 1, tzinfo=utc_timezone) elif bucket_size.startswith("day"): return datetime.datetime(dt.year, dt.month, dt.day, tzinfo=utc_timezone) elif bucket_size.startswith("hour"): return datetime.datetime( dt.year, dt.month, dt.day, dt.hour, tzinfo=utc_timezone ) elif bucket_size.startswith("minute"): return datetime.datetime( dt.year, dt.month, dt.day, dt.hour, dt.minute, tzinfo=utc_timezone ) elif bucket_size.startswith("second"): return datetime.datetime( dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, tzinfo=utc_timezone, ) else: raise ValueError("Bucket size not supported: {}".format(bucket_size)) def bucket_duration(bucket_size: str) -> relativedelta: try: return BUCKET_SIZES[bucket_size] except KeyError: raise ValueError( "Bucket size not supported: {}. Must be one of: {}" "".format(bucket_size, BUCKET_SIZES.keys()) ) # Todo: Move to general utils? def timestamp_from_datetime(dt: datetime.datetime) -> float: assert dt.tzinfo, "Datetime object does not have tzinfo" return dt.timestamp()