Source code for eventsourcing.infrastructure.sqlalchemy.records

from sqlalchemy import DECIMAL, LargeBinary, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql.schema import Column, Index
from sqlalchemy.sql.sqltypes import BigInteger, Integer, Text
from sqlalchemy_utils.types.uuid import UUIDType

# Declarative base class; the record classes defined below subclass it.
Base = declarative_base()


# Please note, the record classes with an indexed ID ('WithID') are
# more or less equivalent to the similarly named Django classes. The
# record classes without ID ('NoID') are more or less equivalent to the
# Cassandra classes, in that they don't have a record ID, so must be
# accessed by sequence ID, optionally with position, and also their
# application sequence must be constructed elsewhere such as with a big
# array. Without record IDs, the maximum rate at which events can be
# written in parallel will be greater, because the IDs don't need to be
# generated and because there isn't a record ID index to be updated.
# Also, if there is no record ID index, the table can be sharded
# with the sequence ID being used to select a shard. The drawback is
# that the application sequence will need to be constructed elsewhere.


class IntegerSequencedWithIDRecord(Base):
    """Item of an integer-positioned sequence, stored with a record ID.

    Maps to the ``integer_sequenced_items`` table. The ``id`` column is
    the primary key, and a separate unique index covers the
    ``(sequence_id, position)`` pair.
    """

    __tablename__ = "integer_sequenced_items"

    # Record ID: primary key (BigInteger, with Integer as the SQLite variant).
    id = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        primary_key=True,
        index=True,
        unique=True,
    )

    # ID of the sequence the item belongs to (e.g. an entity or aggregate ID).
    sequence_id = Column(UUIDType(), nullable=False)

    # Integer position (index) of the item within its sequence.
    position = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        nullable=False,
    )

    # Topic of the item (e.g. path to the domain event class).
    topic = Column(Text(), nullable=False)

    # Serialized state of the item (a dict, possibly encrypted).
    state = Column(LargeBinary())

    # A given position can be occupied only once within a sequence.
    __table_args__ = (
        Index(
            "integer_sequenced_items_sequence_id_position_index",
            "sequence_id",
            "position",
            unique=True,
        ),
    )
class IntegerSequencedNoIDRecord(Base):
    """Item of an integer-positioned sequence, stored without a record ID.

    Maps to the ``integer_sequenced_items_noid`` table. The composite
    primary key is ``(sequence_id, position)``.
    """

    __tablename__ = "integer_sequenced_items_noid"

    # ID of the sequence the item belongs to (e.g. an entity or aggregate
    # ID); first part of the composite primary key.
    sequence_id = Column(UUIDType(), primary_key=True)

    # Integer position (index) of the item within its sequence; second part
    # of the composite primary key.
    position = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        primary_key=True,
    )

    # Topic of the item (e.g. path to the domain event class).
    topic = Column(Text(), nullable=False)

    # Serialized state of the item (a dict, possibly encrypted).
    state = Column(LargeBinary())
# Shorter alias for the 'WithID' variant of the integer-sequenced record class.
IntegerSequencedRecord = IntegerSequencedWithIDRecord
class TimestampSequencedWithIDRecord(Base):
    """Item of a timestamp-positioned sequence, stored with a record ID.

    Maps to the ``timestamp_sequenced_items`` table. The ``id`` column is
    the auto-incrementing primary key. A unique index covers the
    ``(sequence_id, position)`` pair, and a second, non-unique index
    covers ``position`` on its own.
    """

    __tablename__ = "timestamp_sequenced_items"

    # Record ID: auto-incrementing primary key (BigInteger, with Integer as
    # the SQLite variant).
    id = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        primary_key=True,
        index=True,
        unique=True,
        autoincrement=True,
    )

    # ID of the sequence the item belongs to (e.g. an entity or aggregate ID).
    sequence_id = Column(UUIDType(), nullable=False)

    # Timestamp position of the item within its sequence, held as a DECIMAL.
    position = Column(
        DECIMAL(24, 6, 6),
        nullable=False,
        unique=False,
    )

    # Topic of the item (e.g. path to the domain event class).
    topic = Column(Text(), nullable=False)

    # Serialized state of the item (a dict, possibly encrypted).
    state = Column(LargeBinary())

    __table_args__ = (
        # A given position can be occupied only once within a sequence.
        Index(
            "timestamp_sequenced_items_sequence_id_position_index",
            "sequence_id",
            "position",
            unique=True,
        ),
        # Non-unique index on the position column alone.
        Index(
            "timestamp_sequenced_items_position_index",
            "position",
            unique=False,
        ),
    )
class TimestampSequencedNoIDRecord(Base):
    """Item of a timestamp-positioned sequence, stored without a record ID.

    Maps to the ``timestamp_sequenced_items_noid`` table. The composite
    primary key is ``(sequence_id, position)``; an additional non-unique
    index covers ``position`` on its own.
    """

    __tablename__ = "timestamp_sequenced_items_noid"

    # ID of the sequence the item belongs to (e.g. an entity or aggregate
    # ID); first part of the composite primary key.
    sequence_id = Column(UUIDType(), primary_key=True)

    # Timestamp position of the item within its sequence, held as a DECIMAL;
    # second part of the composite primary key.
    position = Column(DECIMAL(24, 6, 6), primary_key=True)

    # Topic of the item (e.g. path to the domain event class).
    topic = Column(Text(), nullable=False)

    # Serialized state of the item (a dict, possibly encrypted).
    state = Column(LargeBinary())

    # Non-unique index on the position column alone.
    __table_args__ = (
        Index(
            "timestamp_sequenced_items_noid_position_index",
            "position",
            unique=False,
        ),
    )
# Shorter alias for the 'NoID' variant of the timestamp-sequenced record
# class (whereas IntegerSequencedRecord aliases the 'WithID' variant).
TimestampSequencedRecord = TimestampSequencedNoIDRecord
class SnapshotRecord(Base):
    """Snapshot item, stored in the ``snapshots`` table.

    The composite primary key is ``(sequence_id, position)``.
    """

    __tablename__ = "snapshots"

    # ID of the sequence the item belongs to (e.g. an entity or aggregate
    # ID); first part of the composite primary key.
    sequence_id = Column(UUIDType(), primary_key=True)

    # Integer position (index) of the item within its sequence; second part
    # of the composite primary key.
    position = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        primary_key=True,
    )

    # Topic of the item (e.g. path to the domain entity class).
    topic = Column(Text(), nullable=False)

    # Serialized state of the item (a dict, possibly encrypted).
    state = Column(LargeBinary())
class EntitySnapshotRecord(Base):
    """Entity snapshot item, stored in the ``entity_snapshots`` table.

    The composite primary key is
    ``(application_name, originator_id, originator_version)``.
    """

    __tablename__ = "entity_snapshots"

    # Application name (at most 32 characters); part of the primary key.
    application_name = Column(String(length=32), primary_key=True)

    # Originator ID (e.g. an entity or aggregate ID); part of the primary key.
    originator_id = Column(UUIDType(), primary_key=True)

    # Originator version of the item in its sequence; part of the primary key.
    originator_version = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        primary_key=True,
    )

    # Topic of the item (e.g. path to the domain entity class).
    topic = Column(Text(), nullable=False)

    # Serialized state of the item (a dict, possibly encrypted).
    state = Column(LargeBinary())
class StoredEventRecord(Base):
    """Stored event item, kept in the ``stored_events`` table.

    The composite primary key is
    ``(application_name, originator_id, originator_version)``. A unique
    index additionally covers
    ``(application_name, pipeline_id, notification_id)``; both
    ``pipeline_id`` and ``notification_id`` are nullable.
    """

    __tablename__ = "stored_events"

    # Application name (at most 32 characters); part of the primary key.
    application_name = Column(String(length=32), primary_key=True)

    # Originator ID (e.g. an entity or aggregate ID); part of the primary key.
    originator_id = Column(UUIDType(), primary_key=True)

    # Originator version of the item in its sequence; part of the primary key.
    originator_version = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        primary_key=True,
    )

    # Pipeline ID (may be NULL).
    pipeline_id = Column(Integer(), nullable=True)

    # Notification ID (may be NULL).
    notification_id = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        nullable=True,
    )

    # Topic of the item (e.g. path to the domain event class).
    topic = Column(Text(), nullable=False)

    # Serialized state of the item (a dict, possibly encrypted).
    state = Column(LargeBinary())

    # Causal dependencies, stored as text.
    causal_dependencies = Column(Text())

    # Unique index over application name, pipeline ID and notification ID.
    __table_args__ = (
        Index(
            "stored_events_notification_index",
            "application_name",
            "pipeline_id",
            "notification_id",
            unique=True,
        ),
    )
class NotificationTrackingRecord(Base):
    """Tracking row, stored in the ``notification_tracking`` table.

    All four columns together form the composite primary key:
    ``(application_name, upstream_application_name, pipeline_id,
    notification_id)``.
    """

    __tablename__ = "notification_tracking"

    # Application name (at most 32 characters).
    application_name = Column(String(length=32), primary_key=True)

    # Upstream application name (at most 32 characters).
    upstream_application_name = Column(String(length=32), primary_key=True)

    # Pipeline ID.
    pipeline_id = Column(Integer(), primary_key=True)

    # Notification ID (BigInteger, with Integer as the SQLite variant).
    notification_id = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        primary_key=True,
    )