Source code for eventsourcing.infrastructure.django.models

from django.db import models


class IntegerSequencedRecord(models.Model):
    """Django model for sequenced items positioned by an integer index.

    Rows are stored in the ``integer_sequenced_items`` table, and each
    (sequence_id, position) pair may occur at most once.
    """

    # Auto-incrementing primary key.
    id = models.BigAutoField(primary_key=True)

    # Identifier of the sequence, for example an entity or aggregate ID.
    sequence_id = models.UUIDField()

    # Integer index of the item within its sequence.
    position = models.BigIntegerField()

    # Item topic, for example the path to a domain event class.
    topic = models.TextField()

    # Item state: a serialized dict, which may be encrypted.
    state = models.BinaryField()

    class Meta:
        db_table = "integer_sequenced_items"
        unique_together = [("sequence_id", "position")]
class TimestampSequencedRecord(models.Model):
    """Django model for sequenced items positioned by a timestamp.

    Rows are stored in the ``timestamp_sequenced_items`` table, each
    (sequence_id, position) pair may occur at most once, and the
    ``position`` column carries an additional index named
    ``position_idx``.
    """

    # Auto-incrementing primary key.
    id = models.BigAutoField(primary_key=True)

    # Identifier of the sequence, for example an entity or aggregate ID.
    sequence_id = models.UUIDField()

    # Timestamp of the item within its sequence, kept as a decimal with
    # at most 24 digits, 6 of which follow the decimal point.
    position = models.DecimalField(max_digits=24, decimal_places=6)

    # Item topic, for example the path to a domain event class.
    topic = models.TextField()

    # Item state: a serialized dict, which may be encrypted.
    state = models.BinaryField()

    class Meta:
        db_table = "timestamp_sequenced_items"
        unique_together = [("sequence_id", "position")]
        indexes = [
            models.Index(fields=["position"], name="position_idx"),
        ]
class SnapshotRecord(models.Model):
    """Django model for snapshot items positioned by an integer index.

    Rows are stored in the ``snapshots`` table, and each
    (sequence_id, position) pair may occur at most once.
    """

    # Auto-incrementing primary key.
    uid = models.BigAutoField(primary_key=True)

    # Identifier of the sequence, for example an entity or aggregate ID.
    sequence_id = models.UUIDField()

    # Integer index of the item within its sequence.
    position = models.BigIntegerField()

    # Item topic, for example the path to a domain event class.
    topic = models.TextField()

    # Item state: a serialized dict, which may be encrypted.
    state = models.BinaryField()

    class Meta:
        db_table = "snapshots"
        unique_together = [("sequence_id", "position")]
class EntitySnapshotRecord(models.Model):
    """Django model for entity snapshots, scoped by application name.

    Rows are stored in the ``entity_snapshots`` table, and each
    (application_name, originator_id, originator_version) triple may
    occur at most once.
    """

    # Auto-incrementing primary key.
    uid = models.BigAutoField(primary_key=True)

    # Name of the application, limited to 32 characters.
    application_name = models.CharField(max_length=32)

    # Identifier of the originator, for example an entity or aggregate ID.
    originator_id = models.UUIDField()

    # Version of the originator for this item in the sequence.
    originator_version = models.BigIntegerField()

    # Item topic, for example the path to a domain event class.
    topic = models.TextField()

    # Item state: a serialized dict, which may be encrypted.
    state = models.BinaryField()

    class Meta:
        db_table = "entity_snapshots"
        unique_together = [
            ("application_name", "originator_id", "originator_version"),
        ]
class StoredEventRecord(models.Model):
    """Django model for stored events, scoped by application name.

    Rows are stored in the ``stored_events`` table. Two combinations of
    columns must each be unique:
    (application_name, originator_id, originator_version) and
    (application_name, pipeline_id, notification_id).
    """

    # Auto-incrementing primary key.
    uid = models.BigAutoField(primary_key=True)

    # Name of the application, limited to 32 characters.
    application_name = models.CharField(max_length=32)

    # Identifier of the originator, for example an entity or aggregate ID.
    originator_id = models.UUIDField()

    # Version of the originator for this item in the sequence.
    originator_version = models.BigIntegerField()

    # Pipeline ID; the column accepts NULL.
    pipeline_id = models.IntegerField(null=True)

    # Notification ID; the column accepts NULL.
    notification_id = models.BigIntegerField(null=True)

    # Item topic, for example the path to a domain event class.
    topic = models.TextField()

    # Item state: a serialized dict, which may be encrypted.
    state = models.BinaryField()

    # Causal dependencies, held as text.
    causal_dependencies = models.TextField()

    class Meta:
        db_table = "stored_events"
        unique_together = [
            ("application_name", "originator_id", "originator_version"),
            ("application_name", "pipeline_id", "notification_id"),
        ]
class NotificationTrackingRecord(models.Model):
    """Django model for notification tracking rows.

    Rows are stored in the ``notification_tracking`` table, and each
    (application_name, upstream_application_name, pipeline_id,
    notification_id) combination may occur at most once.
    """

    # Auto-incrementing primary key.
    uid = models.BigAutoField(primary_key=True)

    # Name of the application, limited to 32 characters.
    application_name = models.CharField(max_length=32)

    # Name of the upstream application, limited to 32 characters.
    upstream_application_name = models.CharField(max_length=32)

    # Pipeline ID.
    pipeline_id = models.IntegerField()

    # Notification ID.
    notification_id = models.BigIntegerField()

    class Meta:
        db_table = "notification_tracking"
        unique_together = [
            (
                "application_name",
                "upstream_application_name",
                "pipeline_id",
                "notification_id",
            ),
        ]