Event Sourcing¶
pyfly.eventsourcing is a port of org.fireflyframework.eventsourcing.
Aggregates emit DomainEvents; an EventStore persists them; a
repository replays the stream to reconstruct state; a snapshot store
truncates replay cost; a TransactionalOutbox provides at-least-once
delivery to a broker; ProjectionRunner updates read models.
Defining an aggregate¶
from dataclasses import dataclass
from pyfly.eventsourcing import AggregateRoot, DomainEvent
@dataclass
class OrderPlaced(DomainEvent):
order_id: str = ""
amount: int = 0
class Order(AggregateRoot):
def __init__(self) -> None:
super().__init__()
self.amount = 0
self.when(OrderPlaced, lambda agg, evt: setattr(agg, "amount", evt.amount))
AggregateRoot._dispatch routes each event to its registered handler
in the following order:
- A handler registered via
when(EventType, fn). - A method named
on_{event_type}on the aggregate class. - If neither exists,
EventHandlerExceptionis raised — a missing handler would silently corrupt reconstructed state, so the aggregate fails loudly rather than swallowing the event.
The version counter is incremented after each successfully dispatched
event regardless of which of the two handler paths was used.
Saving and loading¶
from pyfly.eventsourcing import (
InMemoryEventStore, InMemorySnapshotStore,
)
from pyfly.eventsourcing.repository import EventSourcedRepository
store = InMemoryEventStore()
snapshots = InMemorySnapshotStore()
repo = EventSourcedRepository(store, factory=Order, snapshots=snapshots)
order = Order()
order.id = "o-1"
order.apply(OrderPlaced(order_id="o-1", amount=99))
await repo.save(order)
# Restart, then:
recovered = await repo.load("o-1")
assert recovered.amount == 99
Envelope validation on load¶
EventSourcedRepository.load validates every replayed envelope:
- If
envelope.aggregate_iddoes not match the requested aggregate ID,EventHandlerExceptionis raised — this indicates a store bug or cross-aggregate data corruption. - If
envelope.aggregate_typeis set and does not match the aggregate's class name,EventHandlerExceptionis raised.
Snapshot interval crossing¶
Snapshots are taken when saving a batch crosses a multiple of
snapshot_interval (default 100), rather than on exact divisibility.
This handles the case where a single batch straddles the threshold:
# batch pushes version from 95 to 105: crosses the 100 boundary → snapshot taken
crossed_interval = (aggregate.version // snapshot_interval) > (previous_version // snapshot_interval)
Outbox pattern¶
from pyfly.eventsourcing import TransactionalOutbox
async def publish(envelope):
await broker.publish(envelope)
outbox = TransactionalOutbox(publish=publish, max_attempts=5)
await outbox.start()
await outbox.enqueue(envelope_for_event)
Projections¶
from pyfly.eventsourcing.projection import FunctionProjection, ProjectionRunner
async def read_model(envelope):
await db.upsert("orders_view", envelope)
runner = ProjectionRunner(FunctionProjection("orders_view", read_model), store)
await runner.start()
Durable event store providers¶
The event store adapter is chosen via pyfly.eventsourcing.store.provider
(default memory).
| Value | Class | Durable | Notes |
|---|---|---|---|
memory |
InMemoryEventStore |
No | Default; no extra deps. |
sqlalchemy |
SqlAlchemyEventStore |
Yes | Requires sqlalchemy[asyncio] + async driver. |
Memory¶
The default; suitable for development and tests. All events are lost on process restart.
SQLAlchemy¶
pyfly:
eventsourcing:
store:
provider: sqlalchemy
url: postgresql+asyncpg://user:pass@host/db # optional
When pyfly.eventsourcing.store.url is not set, the adapter falls back to
pyfly.data.relational.url. If neither is configured, it defaults to
sqlite+aiosqlite:///./app.db.
SqlAlchemyEventStore manages the table pyfly_event_store:
CREATE TABLE IF NOT EXISTS pyfly_event_store (
event_id VARCHAR(64) PRIMARY KEY,
aggregate_id VARCHAR(64) NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
sequence INTEGER NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload TEXT NOT NULL,
metadata TEXT NOT NULL,
occurred_at TIMESTAMP NOT NULL,
version INTEGER NOT NULL,
tenant_id VARCHAR(64) NULL,
UNIQUE (aggregate_id, sequence)
)
The UNIQUE (aggregate_id, sequence) constraint provides optimistic
locking: concurrent appenders with the same expected version race on the
constraint and the loser receives a ConcurrencyError. The version check
and the inserts happen inside a single database transaction so there is no
TOCTOU race; the unique constraint is an additional backstop against races
between concurrent writers.
Durable snapshot store providers¶
The snapshot store adapter is chosen via pyfly.eventsourcing.snapshot.provider
(default memory).
| Value | Class | Durable | Notes |
|---|---|---|---|
memory |
InMemorySnapshotStore |
No | Default; no extra deps. |
sqlalchemy |
SqlAlchemySnapshotStore |
Yes | Requires sqlalchemy[asyncio] + async driver. |
SQLAlchemy snapshot store¶
pyfly:
eventsourcing:
snapshot:
provider: sqlalchemy
url: postgresql+asyncpg://user:pass@host/db # optional; falls back to pyfly.data.relational.url
SqlAlchemySnapshotStore manages the table pyfly_snapshots:
CREATE TABLE IF NOT EXISTS pyfly_snapshots (
aggregate_id VARCHAR(64) PRIMARY KEY,
aggregate_type VARCHAR(255) NOT NULL,
sequence INTEGER NOT NULL,
payload TEXT NOT NULL,
created_at TIMESTAMP NOT NULL
)
Snapshots are upserted: a new snapshot row replaces the existing one only
when its sequence is newer than what is already stored
(WHERE pyfly_snapshots.sequence < EXCLUDED.sequence). This prevents
older snapshots from overwriting newer ones under concurrent saves.
EDA bridge — EventSourcingPublisher¶
EventSourcingPublisher forwards stored-event envelopes onto the EDA bus.
It is wired automatically when an EventPublisher bean is present in the
application context; when EDA is not configured the bean is absent (returns
None) and is silently skipped.
pyfly.eventsourcing.eda.destination sets the routing destination (topic /
exchange / subject). Each envelope is published with headers carrying
aggregate_id, aggregate_type, sequence, version, and optionally
tenant_id. String-valued entries from StoredEventEnvelope.metadata are
also promoted to headers.
Usage:
from pyfly.eventsourcing.publisher import EventSourcingPublisher
# Inject from the DI container (created automatically when EDA is active):
publisher: EventSourcingPublisher = container.get(EventSourcingPublisher)
await publisher.publish(envelope)
await publisher.publish_all(envelopes)
Auto-configuration¶
EventSourcingAutoConfiguration activates when
pyfly.eventsourcing.enabled=true and wires the following beans:
| Bean | Type | Description |
|---|---|---|
event_store |
EventStore |
Adapter selected by pyfly.eventsourcing.store.provider. |
snapshot_store |
SnapshotStore |
Adapter selected by pyfly.eventsourcing.snapshot.provider. |
event_sourcing_publisher |
EventSourcingPublisher \| None |
EDA bridge; None when no EventPublisher bean is present. |
Configuration reference¶
| Key | Default | Description |
|---|---|---|
pyfly.eventsourcing.enabled |
false |
Enable the event-sourcing module. |
pyfly.eventsourcing.store.provider |
memory |
Event store backend: memory or sqlalchemy. |
pyfly.eventsourcing.store.url |
(none) | Async SQLAlchemy URL for the event store. Falls back to pyfly.data.relational.url, then sqlite+aiosqlite:///./app.db. |
pyfly.eventsourcing.snapshot.provider |
memory |
Snapshot store backend: memory or sqlalchemy. |
pyfly.eventsourcing.snapshot.url |
(none) | Async SQLAlchemy URL for the snapshot store. Falls back to pyfly.data.relational.url, then sqlite+aiosqlite:///./app.db. |
pyfly.eventsourcing.eda.destination |
pyfly.events |
EDA routing destination for EventSourcingPublisher. |
Testing¶
The unit test suite in tests/eventsourcing/test_durable_adapters.py
covers SqlAlchemySnapshotStore against an in-memory SQLite engine,
EventSourcingPublisher with a fake bus, and the provider-selection logic
in EventSourcingAutoConfiguration.
SqlAlchemyEventStore is additionally covered by a real-Postgres
integration test in
tests/integration/test_eventsourcing_postgres_integration.py that runs
against a Testcontainers Postgres instance (gated by @requires_docker):