import datetime
from decimal import Decimal
from typing import Any, Optional, Union
from uuid import UUID, uuid5
from dateutil.relativedelta import relativedelta
from eventsourcing.domain.model.entity import TimestampedVersionedEntity
from eventsourcing.domain.model.events import (
EventWithOriginatorID,
EventWithTimestamp,
LoggedEvent,
publish,
)
from eventsourcing.domain.model.repository import AbstractEntityRepository
from eventsourcing.exceptions import RepositoryKeyError
from eventsourcing.utils.times import (
datetime_from_timestamp,
decimaltimestamp,
utc_timezone,
)
from eventsourcing.utils.topic import get_topic
# Namespace UUID given to uuid5() when time-bucket IDs are derived.
Namespace_Timebuckets = UUID("0d7ee297-a976-4c29-91ff-84ffc79d8155")

# Length of a single bucket for each supported granularity.
ONE_YEAR = relativedelta(years=1)
ONE_MONTH = relativedelta(months=1)
ONE_DAY = relativedelta(days=1)
ONE_HOUR = relativedelta(hours=1)
ONE_MINUTE = relativedelta(minutes=1)
ONE_SECOND = relativedelta(seconds=1)

# Supported bucket size names, each mapped to the length of one bucket.
BUCKET_SIZES = dict(
    year=ONE_YEAR,
    month=ONE_MONTH,
    day=ONE_DAY,
    hour=ONE_HOUR,
    minute=ONE_MINUTE,
    second=ONE_SECOND,
)
class Timebucketedlog(TimestampedVersionedEntity):
    """
    Entity that records the name and bucket size of a time-bucketed log.

    Messages are not stored on the entity itself: ``log_message()``
    publishes a ``MessageLogged`` event whose originator ID is the ID of
    the time bucket covering the moment of the call.
    """

    class Event(TimestampedVersionedEntity.Event):
        """Supertype for events of time-bucketed log."""

    class Started(TimestampedVersionedEntity.Created, Event):
        """Event type constructed when a time-bucketed log is started."""

    class BucketSizeChanged(Event, TimestampedVersionedEntity.AttributeChanged):
        """Event type for a change of a log's bucket size."""

    def __init__(self, name: UUID, bucket_size: Optional[str] = None, **kwargs: Any):
        """
        Initialise the log entity.

        :param name: UUID naming the log.
        :param bucket_size: name of the bucket granularity, or ``None``.
        :param kwargs: forwarded unchanged to the base entity initialiser.
        """
        super().__init__(**kwargs)
        self._name = name
        self._bucket_size = bucket_size

    @property
    def name(self) -> UUID:
        """UUID naming this log."""
        return self._name

    @property
    def started_on(self) -> Decimal:
        """Value of the entity's ``__created_on__`` attribute."""
        return self.__created_on__

    @property
    def bucket_size(self) -> str:
        """
        Name of the bucket granularity used by this log.

        Asserts that a (truthy) bucket size has been set.
        """
        size = self._bucket_size
        assert size
        return size

    def log_message(self, message: str) -> "MessageLogged":
        """
        Publish a message to the bucket that covers the current time.

        :param message: text to log; asserted to be a ``str``.
        :return: the ``MessageLogged`` event that was published.
        """
        assert isinstance(message, str)
        # Gather the arguments in the order name, timestamp, bucket size.
        log_name = self.name
        now = decimaltimestamp()
        size = self.bucket_size
        logged = MessageLogged(
            originator_id=make_timebucket_id(log_name, now, size),
            message=message,
        )
        publish([logged])
        return logged
class TimebucketedlogRepository(AbstractEntityRepository):
    """Repository of ``Timebucketedlog`` entities, looked up by log name."""

    def get_or_create(self, log_name: UUID, bucket_size: str) -> Timebucketedlog:
        """
        Return the log called ``log_name``, starting a new one if needed.

        :param log_name: UUID used as the repository key.
        :param bucket_size: bucket granularity for a newly started log; not
            used when the log is found in the repository.
        :return: the existing or newly started log entity.
        """
        try:
            log = self[log_name]
        except RepositoryKeyError:
            # Not in the repository yet, so start a fresh log.
            log = start_new_timebucketedlog(log_name, bucket_size=bucket_size)
        return log
def start_new_timebucketedlog(
    name: UUID, bucket_size: Optional[str] = None
) -> Timebucketedlog:
    """
    Start a new time-bucketed log and return its entity.

    A ``Timebucketedlog.Started`` event is constructed, the entity is
    obtained by calling the event's ``__mutate__`` with ``None``, and the
    event is then published.

    :param name: UUID used both as the log's name and as the originator ID
        of the ``Started`` event.
    :param bucket_size: a key of ``BUCKET_SIZES``; ``None`` selects
        ``"year"``.
    :return: the new ``Timebucketedlog`` entity.
    :raises ValueError: if ``bucket_size`` is not a key of ``BUCKET_SIZES``.
    """
    if bucket_size is None:
        bucket_size = "year"
    if bucket_size not in BUCKET_SIZES:
        # Join the names explicitly: formatting ``BUCKET_SIZES.keys()``
        # would put the ``dict_keys([...])`` repr into the message.
        raise ValueError(
            "Bucket size '{}' not supported, must be one of: {}"
            "".format(bucket_size, ", ".join(BUCKET_SIZES))
        )
    event = Timebucketedlog.Started(
        originator_id=name,
        name=name,
        bucket_size=bucket_size,
        originator_topic=get_topic(Timebucketedlog),
    )
    entity = event.__mutate__(None)
    assert entity is not None
    publish([event])
    return entity
class MessageLogged(EventWithTimestamp, EventWithOriginatorID, LoggedEvent):
    """Event carrying one message logged under an originator ID."""

    def __init__(self, message: str, originator_id: UUID):
        """
        :param message: the logged text.
        :param originator_id: ID the event is recorded under;
            ``Timebucketedlog.log_message()`` passes a time-bucket ID here.
        """
        super().__init__(originator_id=originator_id, message=message)

    @property
    def message(self) -> str:
        """The logged text, read from the event's attribute dict."""
        return self.__dict__["message"]
def make_timebucket_id(
    log_id: UUID, timestamp: Union[Decimal, float], bucket_size: str
) -> UUID:
    """
    Derive the ID of the bucket of log ``log_id`` that contains ``timestamp``.

    The timestamp is converted with ``datetime_from_timestamp()`` and
    truncated to the bucket granularity as a zero-padded text boundary
    (for example ``YYYY-MM`` for months). The result is the version 5 UUID
    of ``"<log_id hex>_<boundary>"`` in the ``Namespace_Timebuckets``
    namespace.

    :param log_id: UUID of the log.
    :param timestamp: moment to locate.
    :param bucket_size: string starting with one of ``year``, ``month``,
        ``day``, ``hour``, ``minute`` or ``second``.
    :return: the bucket's UUID.
    :raises ValueError: if ``bucket_size`` has none of those prefixes.
    """
    moment = datetime_from_timestamp(timestamp)
    assert isinstance(moment, datetime.datetime)

    # Coarsest to finest: the unit at position N formats the first N fields.
    fields = ("year", "month", "day", "hour", "minute", "second")
    layouts = (
        "{:04}",
        "{:04}-{:02}",
        "{:04}-{:02}-{:02}",
        "{:04}-{:02}-{:02}_{:02}",
        "{:04}-{:02}-{:02}_{:02}-{:02}",
        "{:04}-{:02}-{:02}_{:02}-{:02}-{:02}",
    )
    for depth, (unit, layout) in enumerate(zip(fields, layouts), start=1):
        if bucket_size.startswith(unit):
            values = [getattr(moment, field) for field in fields[:depth]]
            boundary = layout.format(*values)
            break
    else:
        raise ValueError("Bucket size not supported: {}".format(bucket_size))
    return uuid5(Namespace_Timebuckets, log_id.hex + "_" + boundary)
def next_bucket_starts(timestamp: float, bucket_size: str) -> float:
    """
    Return the timestamp at which the bucket after ``timestamp``'s begins.

    Computed as the start of the bucket containing ``timestamp`` plus one
    bucket duration.

    :param timestamp: moment inside the reference bucket.
    :param bucket_size: bucket granularity name.
    :return: start of the following bucket, as a timestamp.
    """
    following = bucket_starts(timestamp, bucket_size) + bucket_duration(bucket_size)
    return timestamp_from_datetime(following)
def previous_bucket_starts(timestamp: float, bucket_size: str) -> float:
    """
    Return the timestamp at which the bucket before ``timestamp``'s begins.

    Computed as the start of the bucket containing ``timestamp`` minus one
    bucket duration.

    :param timestamp: moment inside the reference bucket.
    :param bucket_size: bucket granularity name.
    :return: start of the preceding bucket, as a timestamp.
    """
    preceding = bucket_starts(timestamp, bucket_size) - bucket_duration(bucket_size)
    return timestamp_from_datetime(preceding)
def bucket_starts(timestamp: float, bucket_size: str) -> datetime.datetime:
    """
    Return the start of the bucket that contains ``timestamp``.

    The timestamp is converted with ``datetime_from_timestamp()`` and
    truncated to the bucket granularity; the result carries
    ``utc_timezone`` as its tzinfo.

    :param timestamp: moment to locate.
    :param bucket_size: string starting with one of ``year``, ``month``,
        ``day``, ``hour``, ``minute`` or ``second``.
    :return: datetime of the first instant of the bucket.
    :raises ValueError: if ``bucket_size`` has none of those prefixes.
    """
    moment = datetime_from_timestamp(timestamp)
    assert isinstance(moment, datetime.datetime)

    # Coarsest to finest: the unit at position N keeps the first N fields.
    fields = ("year", "month", "day", "hour", "minute", "second")
    for depth, unit in enumerate(fields, start=1):
        if bucket_size.startswith(unit):
            kept = [getattr(moment, field) for field in fields[:depth]]
            # datetime() needs year, month and day; absent ones start at 1.
            kept.extend([1] * (3 - depth))
            return datetime.datetime(*kept, tzinfo=utc_timezone)
    raise ValueError("Bucket size not supported: {}".format(bucket_size))
def bucket_duration(bucket_size: str) -> relativedelta:
    """
    Return the length of one bucket of the given size.

    :param bucket_size: a key of ``BUCKET_SIZES``.
    :return: the ``relativedelta`` registered for that size.
    :raises ValueError: if ``bucket_size`` is not a key of ``BUCKET_SIZES``.
    """
    try:
        return BUCKET_SIZES[bucket_size]
    except KeyError:
        # ``from None`` drops the KeyError from the traceback; the message
        # already names the bad key. Joining the names keeps the
        # ``dict_keys([...])`` repr out of the message.
        raise ValueError(
            "Bucket size not supported: {}. Must be one of: {}"
            "".format(bucket_size, ", ".join(BUCKET_SIZES))
        ) from None
# Todo: Move to general utils?
def timestamp_from_datetime(dt: datetime.datetime) -> float:
    """
    Convert a datetime that carries a tzinfo to a POSIX timestamp.

    :param dt: datetime whose ``tzinfo`` attribute is set.
    :return: the value of ``dt.timestamp()``.
    :raises AssertionError: if ``dt.tzinfo`` is falsy. The check is an
        ``assert`` statement, so it is skipped when Python runs with ``-O``.
    """
    zone = dt.tzinfo
    assert zone, "Datetime object does not have tzinfo"
    return dt.timestamp()