Event Store
The Event Store is the persistence backbone of the event sourcing architecture. Unlike traditional CRUD databases that overwrite state and discard history, the event store acts as an immutable, append-only ledger of every domain event. Every state change — an OrderCreated, an ItemAdded, a PaymentProcessed — is recorded and can never be modified or deleted.
Event Sourcing
In traditional architectures, databases store only the current state, and the history of changes is often lost. In an event sourcing architecture:
- Complete History: Every state change is permanently stored as an event
- Traceability: State at any point in time can be reconstructed by replaying events
- Audit-Friendly: Naturally supports operation auditing and data analysis
- Decoupled Consumers: Projections, sagas, and external systems independently subscribe to the same event stream
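The traceability point can be made concrete with a small sketch. The event and state types below (OrderEvent, OrderState, applyEvent, replay) are hypothetical and exist only for illustration; they are not the framework's API. The idea shown is simply that state at any version is a left fold over the events up to that version.

```kotlin
// Hypothetical event types for illustration; not part of the framework's API.
sealed interface OrderEvent { val version: Int }
data class OrderCreated(override val version: Int, val orderId: String) : OrderEvent
data class ItemAdded(override val version: Int, val sku: String, val quantity: Int) : OrderEvent
data class PaymentProcessed(override val version: Int, val amountCents: Long) : OrderEvent

// Hypothetical state that is derived purely from events.
data class OrderState(
    val orderId: String? = null,
    val items: Map<String, Int> = emptyMap(),
    val paid: Boolean = false,
    val version: Int = 0
)

// Applies one event to the previous state and returns the next state.
fun applyEvent(state: OrderState, event: OrderEvent): OrderState = when (event) {
    is OrderCreated -> state.copy(orderId = event.orderId, version = event.version)
    is ItemAdded -> {
        val newQuantity = (state.items[event.sku] ?: 0) + event.quantity
        state.copy(items = state.items + (event.sku to newQuantity), version = event.version)
    }
    is PaymentProcessed -> state.copy(paid = true, version = event.version)
}

// Rebuilds the state as of upToVersion by replaying events in version order.
fun replay(events: List<OrderEvent>, upToVersion: Int = Int.MAX_VALUE): OrderState =
    events.sortedBy { it.version }
        .takeWhile { it.version <= upToVersion }
        .fold(OrderState(), ::applyEvent)
```

Calling replay(events, upToVersion = 2) on a four-event history would return the order as it looked after its second event, which is the property that makes auditing and point-in-time debugging possible.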
Core Interface
The EventStore interface defines the core operations for event storage:
```kotlin
interface EventStore {
    fun append(eventStream: DomainEventStream): Mono<Void>

    fun load(
        aggregateId: AggregateId,
        headVersion: Int = 1,
        tailVersion: Int = Int.MAX_VALUE - 1
    ): Flux<DomainEventStream>

    fun load(
        aggregateId: AggregateId,
        headEventTime: Long,
        tailEventTime: Long
    ): Flux<DomainEventStream>
}
```
Domain Event Stream
DomainEventStream represents a collection of domain events produced by a single command:
```kotlin
interface DomainEventStream : EventMessage<DomainEventStream, List<DomainEvent<*>>> {
    val aggregateId: AggregateId
    val size: Int
}
```
Key characteristics:
- One-to-One: One command produces one event stream
- Atomicity: All events in a stream are persisted as a single unit
- Immutability: Events cannot be modified once created
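As a rough illustration of these characteristics, the sketch below stores each stream as one unit and loads by version range. SimpleEventStream and SketchEventStore are hypothetical, synchronous stand-ins (the real interface is reactive), and the per-stream version field is an assumption inferred from the (aggregate_id, version) keys used by the storage backends.

```kotlin
// Simplified stand-in for DomainEventStream; the version field is an assumption
// based on the (aggregate_id, version) keys used by the storage backends.
data class SimpleEventStream(
    val aggregateId: String,
    val version: Int,
    val events: List<String>
) {
    val size: Int get() = events.size
}

// Hypothetical synchronous store; the real interface returns Mono and Flux.
class SketchEventStore {
    private val streams = mutableMapOf<String, MutableList<SimpleEventStream>>()

    // Stores the stream as one unit: all of its events become visible together.
    @Synchronized
    fun append(stream: SimpleEventStream) {
        // Copy the event list so later changes by the caller cannot alter stored history.
        val frozen = stream.copy(events = stream.events.toList())
        streams.getOrPut(stream.aggregateId) { mutableListOf() }.add(frozen)
    }

    // Returns the streams whose version lies in [headVersion, tailVersion], in version order.
    @Synchronized
    fun load(
        aggregateId: String,
        headVersion: Int = 1,
        tailVersion: Int = Int.MAX_VALUE - 1
    ): List<SimpleEventStream> =
        streams[aggregateId].orEmpty()
            .filter { it.version in headVersion..tailVersion }
            .sortedBy { it.version }
}
```

The default arguments mirror the version-based load() signature shown earlier, so calling load with only an aggregate ID returns the full history.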
Key Concepts
| Concept | Description | Source |
|---|---|---|
| DomainEvent | Immutable fact about a past business action within an aggregate | DomainEvent.kt:52-95 |
| DomainEventStream | Ordered batch of domain events produced by a single command | DomainEventStream.kt:51-125 |
| EventStore | Core interface for appending and loading event streams | EventStore.kt:27-98 |
| SnapshotRepository | Optimizes aggregate loading with versioned state checkpoints | SnapshotRepository.kt:27-58 |
Aggregate State Reconstruction
The framework does not store current aggregate state in a traditional database. Instead, every aggregate's state is a function of its event history.
The EventSourcingStateAggregateRepository implements this reconstruction:
- Snapshot-first loading: When requesting the latest version, the repository first loads from the snapshot repository. If a snapshot exists, it serves as the starting point for incremental replay.
- Fresh aggregate creation: If no snapshot exists, a new aggregate instance is created via the StateAggregateFactory.
- Event application: Events are replayed in version order, each calling stateAggregate.onSourcing(it) to mutate the in-memory state.
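The three steps above can be sketched as follows. This is a minimal illustration under stated assumptions, not the repository's actual code: CounterAggregate, VersionedEvent, Snapshot, and loadAggregate are hypothetical names, and only onSourcing echoes a name from the text.

```kotlin
// Hypothetical minimal types; only the onSourcing name comes from the text above.
data class VersionedEvent(val version: Int, val delta: Int)
data class Snapshot(val total: Int, val version: Int)

class CounterAggregate(var total: Int = 0, var version: Int = 0) {
    // Plays the role of onSourcing: mutate in-memory state from one event.
    fun onSourcing(event: VersionedEvent) {
        total += event.delta
        version = event.version
    }
}

fun loadAggregate(
    snapshot: Snapshot?,
    loadEvents: (headVersion: Int) -> List<VersionedEvent>
): CounterAggregate {
    // Steps 1 and 2: start from the snapshot if one exists, otherwise from a fresh aggregate.
    val aggregate = snapshot?.let { CounterAggregate(it.total, it.version) } ?: CounterAggregate()
    // Step 3: replay only the events after the starting version, in version order.
    loadEvents(aggregate.version + 1)
        .sortedBy { it.version }
        .forEach { aggregate.onSourcing(it) }
    return aggregate
}
```

Starting from a snapshot and starting from scratch should converge on the same state; the snapshot only shortens the replay.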
Event Sourcing Lifecycle
The following diagram illustrates the complete lifecycle from command receipt through event persistence, bus publication, and downstream processing:
Architecture
The framework defines a clean interface hierarchy with multiple persistence backends. Every implementation extends AbstractEventStore, which provides centralized logging, input validation, and error mapping.
The AbstractEventStore applies the template method pattern to centralize cross-cutting concerns:
- append() (public, concrete): Logs the operation, delegates to appendStream(), and upgrades version-conflict exceptions.
- load() (public, concrete): Validates version/time ranges, then delegates to loadStream().
- appendStream() / loadStream() (protected, abstract): Each backend implements storage-specific logic.
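A simplified, synchronous sketch of this template method arrangement is shown below. AbstractStoreSketch, MapBackedStore, and VersionConflictSketch are hypothetical names, the signatures are reduced to plain strings and integers, and the specific validation rules are assumptions; the real class is reactive and richer.

```kotlin
// Hypothetical typed exception that the template method maps backend failures to.
class VersionConflictSketch(message: String, cause: Throwable? = null) :
    RuntimeException(message, cause)

abstract class AbstractStoreSketch {
    val log = mutableListOf<String>()

    // Public template method: logging and error mapping live here, once.
    fun append(aggregateId: String, version: Int, payload: String) {
        log += "append $aggregateId@$version"
        try {
            appendStream(aggregateId, version, payload)
        } catch (e: IllegalStateException) {
            // Translate a backend-specific failure into the store's typed exception.
            throw VersionConflictSketch("version conflict on $aggregateId@$version", e)
        }
    }

    // Public template method: validates the range before touching storage.
    fun load(aggregateId: String, headVersion: Int = 1, tailVersion: Int = Int.MAX_VALUE - 1): List<String> {
        require(headVersion >= 1) { "headVersion must be >= 1" }            // assumed rule
        require(tailVersion >= headVersion) { "tailVersion must be >= headVersion" } // assumed rule
        return loadStream(aggregateId, headVersion, tailVersion)
    }

    protected abstract fun appendStream(aggregateId: String, version: Int, payload: String)
    protected abstract fun loadStream(aggregateId: String, headVersion: Int, tailVersion: Int): List<String>
}

// A backend only supplies storage logic; it inherits logging, validation, and error mapping.
class MapBackedStore : AbstractStoreSketch() {
    private val data = mutableMapOf<Pair<String, Int>, String>()

    override fun appendStream(aggregateId: String, version: Int, payload: String) {
        val key = aggregateId to version
        // Simulates the unique-key violation a real backend would raise.
        check(key !in data) { "duplicate key $key" }
        data[key] = payload
    }

    override fun loadStream(aggregateId: String, headVersion: Int, tailVersion: Int): List<String> =
        data.entries
            .filter { it.key.first == aggregateId && it.key.second in headVersion..tailVersion }
            .sortedBy { it.key.second }
            .map { it.value }
}
```

The benefit of this shape is that a new backend cannot forget to validate input or map errors, because callers can only reach it through the concrete public methods.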
Exception Handling
The event store defines a hierarchy of typed exceptions:
| Exception Type | Description | Behavior |
|---|---|---|
| EventVersionConflictException | Version conflict from concurrent writes | Implements RecoverableException; safe to retry |
| DuplicateAggregateIdException | Attempt to create an already-existing aggregate | Fatal; indicates ID collision |
| DuplicateRequestIdException | Same command was already processed | Idempotent; success case, not an error |
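One way a caller might act on the three behaviors in the table is sketched below. The exception classes, the Recoverable marker, the Outcome enum, and appendWithPolicy are all hypothetical stand-ins defined here for illustration; the framework's own retry handling may differ.

```kotlin
// Hypothetical stand-ins mirroring the table; not the framework's actual classes.
interface Recoverable
class VersionConflictError : RuntimeException("version conflict"), Recoverable
class DuplicateAggregateIdError : RuntimeException("aggregate already exists")
class DuplicateRequestIdError : RuntimeException("request already processed")

enum class Outcome { APPLIED, ALREADY_PROCESSED }

// Retries recoverable failures, treats a duplicate request as success,
// and lets fatal errors propagate to the caller unchanged.
fun appendWithPolicy(maxAttempts: Int = 3, attempt: () -> Unit): Outcome {
    require(maxAttempts >= 1) { "maxAttempts must be >= 1" }
    var remaining = maxAttempts
    while (true) {
        try {
            attempt()
            return Outcome.APPLIED
        } catch (e: DuplicateRequestIdError) {
            // The command was already handled; report success without writing again.
            return Outcome.ALREADY_PROCESSED
        } catch (e: RuntimeException) {
            remaining--
            // Give up on fatal errors immediately, and on recoverable ones when out of attempts.
            if (e !is Recoverable || remaining == 0) throw e
        }
    }
}
```

In a real command handler, each retry would reload the aggregate and re-run the command against the newer version rather than resubmitting the same events.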
Implementation Comparison
| Feature | MongoDB | Redis | R2DBC | In-Memory |
|---|---|---|---|---|
| Persistence | Durable (disk) | Configurable | Durable (SQL) | Volatile (memory) |
| Version range query | Yes | Yes (ZRANGEBYSCORE) | Yes (SQL BETWEEN) | Yes (in-memory) |
| Time range query | Yes | No | Yes (SQL BETWEEN) | Yes (in-memory) |
| Concurrency control | Unique compound index | Lua script (atomic) | Unique SQL index | Synchronized map |
| Sharding support | Sharded collections | Redis cluster | ShardingEventStreamSchema | N/A |
| Production readiness | High | Medium | High | Dev/Test only |
| Key class | MongoEventStore.kt | RedisEventStore.kt | R2dbcEventStore.kt | InMemoryEventStore.kt |
Storage Schema Per Implementation
MongoDB uses per-aggregate-type collections. The collection name is derived from the aggregate's context name and aggregate name (e.g., order_event_stream). Documents are indexed with a unique compound index on (aggregate_id, version) and another on (aggregate_id, request_id) (EventStreamSchemaInitializer.kt:51-69).
Redis stores event streams in a sorted set keyed by aggregate ID. Each member is a JSON-serialized DomainEventStream, scored by version number. Append operations use a Lua script for atomicity — checking version conflicts and duplicate request IDs in a single transaction (RedisEventStore.kt:44-65). Time-range loading is not supported.
R2DBC uses a relational table per aggregate type (<aggregateName>_event_stream). Unique indexes on (aggregate_id, version) and request_id enforce the same invariants. The ShardingEventStreamSchema variant supports table sharding for horizontally scaled deployments (EventStreamSchema.kt:47-53).
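The two uniqueness invariants that these schemas enforce can be modeled in memory as a pair of sets. The sketch follows the MongoDB variant, where both keys are compound with the aggregate ID. UniqueIndexSketch, VersionTaken, and RequestSeen are hypothetical names, and the order in which the two checks run is an assumption.

```kotlin
// Hypothetical error types standing in for the store's typed exceptions.
class VersionTaken(message: String) : RuntimeException(message)
class RequestSeen(message: String) : RuntimeException(message)

class UniqueIndexSketch {
    // Each set plays the role of one unique compound index.
    private val byVersion = HashSet<Pair<String, Int>>()
    private val byRequest = HashSet<Pair<String, String>>()

    // Checks both keys before writing either, so a rejected insert leaves no trace.
    @Synchronized
    fun insert(aggregateId: String, version: Int, requestId: String) {
        val requestKey = aggregateId to requestId
        val versionKey = aggregateId to version
        if (requestKey in byRequest) throw RequestSeen("request $requestId already stored for $aggregateId")
        if (versionKey in byVersion) throw VersionTaken("version $version already stored for $aggregateId")
        byRequest.add(requestKey)
        byVersion.add(versionKey)
    }
}
```

A version collision corresponds to two writers racing on the same aggregate, while a request collision corresponds to the same command being delivered twice.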
Configuration
```yaml
wow:
  eventsourcing:
    store:
      storage: mongo # Event store type (mongo, r2dbc, redis, in_memory)
    snapshot:
      enabled: true
      strategy: version_offset # all, version_offset
      version-offset: 10
      storage: mongo
```

| Property | Type | Default | Description |
|---|---|---|---|
| wow.eventsourcing.store.storage | StorageType | mongo | Event store backend |
| wow.eventsourcing.snapshot.enabled | Boolean | true | Enable snapshot mechanism |
| wow.eventsourcing.snapshot.strategy | Strategy | all | Snapshot strategy (all, version_offset) |
| wow.eventsourcing.snapshot.version-offset | Int | 5 | Version gap threshold |
| wow.eventsourcing.snapshot.storage | StorageType | mongo | Snapshot storage backend |
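To give a feel for how the two strategies might differ, here is a small decision function. It assumes that the version gap threshold means "take a snapshot once the aggregate is at least version-offset versions ahead of the last snapshot"; that reading, along with the names SnapshotStrategySketch and shouldSnapshot, is an assumption and not confirmed by the source.

```kotlin
// Hypothetical mirror of the two documented strategy values.
enum class SnapshotStrategySketch { ALL, VERSION_OFFSET }

// Decides whether to write a snapshot. The VERSION_OFFSET rule is an assumed
// interpretation of the "version gap threshold" described in the table above.
fun shouldSnapshot(
    strategy: SnapshotStrategySketch,
    currentVersion: Int,
    lastSnapshotVersion: Int,
    versionOffset: Int = 5 // documented default
): Boolean = when (strategy) {
    // Snapshot on every new version.
    SnapshotStrategySketch.ALL -> currentVersion > lastSnapshotVersion
    // Snapshot only when the gap since the last snapshot reaches the threshold.
    SnapshotStrategySketch.VERSION_OFFSET -> currentVersion - lastSnapshotVersion >= versionOffset
}
```

Under this reading, a larger offset trades fewer snapshot writes for a longer replay on load.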
Best Practices
- Choose the right backend: MongoDB and R2DBC are recommended for production. Choose MongoDB for schema flexibility and horizontal scaling, R2DBC if your organization operates relational databases, and Redis for high-throughput, lower-data-volume scenarios.
- Enable snapshots for long-lived aggregates: Set strategy to version_offset with an offset of 5-20 to avoid linear degradation for aggregates with many events.
- Monitor version conflicts: Occasional EventVersionConflictExceptions are normal. High frequency indicates contention; consider redesigning aggregate boundaries.
- Leverage request idempotency: The requestId field guarantees that retrying a command does not produce duplicate events, which is essential for at-least-once delivery.
- Keep events immutable and declarative: Events should represent simple facts rather than conditional logic. The aggregate's sourcing function simply overlays events onto state.
- Use In-Memory for testing only: InMemoryEventStore is thread-safe but volatile. Do not deploy it to production.
Related Topics
- Snapshot — Optimize aggregate loading with snapshots
- Command Gateway — How commands are routed to aggregates
- Saga — Distributed transactions across aggregates
- Projection — How projections consume event streams
- Business Intelligence — Leverage event streams for data analysis