from __future__ import annotations
import unittest
from typing import ClassVar
from uuid import uuid4
from eventsourcing.postgres import PostgresDatastore
from eventsourcing.projection import ProjectionRunner
from eventsourcing.tests.postgres_utils import drop_postgres_table
from examples.contentmanagement.application import ContentManagement
from examples.contentmanagement.domainmodel import user_id_cvar
from examples.ftsprojection.projection import FtsProjection, PostgresFtsView
[docs]
class TestFtsProjection(unittest.TestCase):
env: ClassVar[dict[str, str]] = {
"CONTENTMANAGEMENT_PERSISTENCE_MODULE": "eventsourcing.postgres",
"CONTENTMANAGEMENT_POSTGRES_DBNAME": "eventsourcing",
"CONTENTMANAGEMENT_POSTGRES_HOST": "127.0.0.1",
"CONTENTMANAGEMENT_POSTGRES_PORT": "5432",
"CONTENTMANAGEMENT_POSTGRES_USER": "eventsourcing",
"CONTENTMANAGEMENT_POSTGRES_PASSWORD": "eventsourcing",
"FTSPROJECTION_PERSISTENCE_MODULE": "eventsourcing.postgres",
"FTSPROJECTION_POSTGRES_DBNAME": "eventsourcing",
"FTSPROJECTION_POSTGRES_HOST": "127.0.0.1",
"FTSPROJECTION_POSTGRES_PORT": "5432",
"FTSPROJECTION_POSTGRES_USER": "eventsourcing",
"FTSPROJECTION_POSTGRES_PASSWORD": "eventsourcing",
}
[docs]
def test(self) -> None:
# Construct an instance of the application ("write model").
write_model = ContentManagement(env=self.env)
# Construct tracking recorder ("read model").
read_model = PostgresFtsView(
datastore=PostgresDatastore(
dbname="eventsourcing",
host="127.0.0.1",
port=5432,
user="eventsourcing",
password="eventsourcing", # noqa: S106
),
tracking_table_name="ftsprojection_tracking",
)
read_model.create_table()
# Create some content in the write model.
user_id = uuid4()
user_id_cvar.set(user_id)
write_model.create_page(title="Animals", slug="animals")
write_model.update_body(slug="animals", body="cat dog zebra")
write_model.create_page(title="Plants", slug="plants")
notification_id = write_model.update_body(
slug="plants", body="bluebell rose jasmine"
)
# Wait for the content to be processed (should time out).
with self.assertRaises(TimeoutError):
read_model.wait(write_model.name, notification_id)
# Search in the read model, expect no results.
self.assertEqual(0, len(read_model.search("dog")))
self.assertEqual(0, len(read_model.search("rose")))
self.assertEqual(0, len(read_model.search("zinc")))
# Run the projection (independently of read and write model objects).
_ = ProjectionRunner(
application_class=ContentManagement,
projection_class=FtsProjection,
view_class=PostgresFtsView,
env=self.env,
)
# Wait for content to be processed (projection catches up).
read_model.wait(write_model.name, notification_id)
# Search in the read model, expect results.
pages = read_model.search("dog")
self.assertEqual(1, len(pages))
self.assertEqual(pages[0].slug, "animals")
self.assertEqual(pages[0].body, "cat dog zebra")
pages = read_model.search("rose")
self.assertEqual(1, len(pages))
self.assertEqual(pages[0].slug, "plants")
self.assertEqual(pages[0].body, "bluebell rose jasmine")
pages = read_model.search("zinc")
self.assertEqual(0, len(pages))
# Search for multiple words in same page.
pages = read_model.search("dog cat")
self.assertEqual(1, len(pages))
self.assertEqual(pages[0].slug, "animals")
self.assertEqual(pages[0].body, "cat dog zebra")
# Search for multiple words in same page, expect no results.
pages = read_model.search("rose zebra")
self.assertEqual(0, len(pages))
# Search for alternative words, expect two results.
pages = read_model.search("rose OR zebra")
self.assertEqual(2, len(pages))
self.assertEqual({"animals", "plants"}, {p.slug for p in pages})
# Create some more content.
write_model.create_page(title="Minerals", slug="minerals")
notification_id = write_model.update_body(
slug="minerals", body="iron zinc calcium"
)
# Wait for content to be processed (projection continues processing).
read_model.wait(write_model.name, notification_id)
# Search for the new content in the read model.
pages = read_model.search("zinc")
self.assertEqual(1, len(pages))
self.assertEqual(pages[0].slug, "minerals")
self.assertEqual(pages[0].body, "iron zinc calcium")
[docs]
def setUp(self) -> None:
super().setUp()
self.drop_tables()
[docs]
def tearDown(self) -> None:
self.drop_tables()
super().tearDown()
[docs]
def drop_tables(self) -> None:
datastore = PostgresDatastore(
"eventsourcing",
"127.0.0.1",
"5432",
"eventsourcing",
"eventsourcing",
)
drop_postgres_table(datastore, "contentmanagement_events")
drop_postgres_table(datastore, "ftsprojection")
drop_postgres_table(datastore, "ftsprojection_tracking")