Source code for examples.shopvertical.common

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, cast

from examples.aggregate7.immutablemodel import Immutable
from examples.aggregate7.orjsonpydantic import PydanticApplication
from examples.shopvertical.events import DomainEvents

if TYPE_CHECKING:
    from collections.abc import Sequence
    from uuid import UUID


[docs] class Command(Immutable, ABC):
[docs] @abstractmethod def handle(self, events: DomainEvents) -> DomainEvents: pass # pragma: no cover
[docs] @abstractmethod def execute(self) -> int | None: pass # pragma: no cover
[docs] class Query(Immutable, ABC):
[docs] @abstractmethod def execute(self) -> Any: pass # pragma: no cover
class _Globals: app = PydanticApplication()
[docs] def reset_application() -> None: _Globals.app = PydanticApplication()
[docs] def get_events(originator_id: UUID) -> DomainEvents: return cast(DomainEvents, tuple(_Globals.app.events.get(originator_id)))
[docs] def put_events(events: DomainEvents) -> int | None: recordings = _Globals.app.events.put(events) return recordings[-1].notification.id if recordings else None
[docs] def get_all_events(topics: Sequence[str] = ()) -> DomainEvents: return cast( DomainEvents, tuple( map( _Globals.app.mapper.to_domain_event, _Globals.app.recorder.select_notifications( start=None, limit=1000000, topics=topics, ), ) ), )