Data Flow
This page traces the complete lifecycle of data as it flows through the Wow framework, from the moment a command arrives at the gateway to the point where projections, sagas, and snapshots have been updated.
High-Level Pipeline
Phase 1: Command Arrival and Gateway Processing
The journey begins when a client sends a command through the CommandGateway. This can happen via a WebFlux endpoint or by calling the gateway directly.
Validation
The DefaultCommandGateway performs two levels of validation:
Self-validation: If the command body implements
CommandValidator, itsvalidate()method is called first. [wow-core/src/main/kotlin/me/ahoo/wow/command/DefaultCommandGateway.kt:62]Bean validation: The Jakarta
Validatorchecks all constraint annotations (@NotNull,@Size, etc.). [wow-core/src/main/kotlin/me/ahoo/wow/command/DefaultCommandGateway.kt:66]
Idempotency Check
Before sending, the gateway checks whether the command's requestId has already been processed for this aggregate. The AggregateIdempotencyCheckerProvider provides per-aggregate checkers. If a duplicate is detected, DuplicateRequestIdException is thrown. [wow-core/src/main/kotlin/me/ahoo/wow/command/DefaultCommandGateway.kt:77]
Wait Strategy Registration
If a wait strategy is provided, the gateway:
- Propagates the wait endpoint into the command message header
- Registers the strategy with
WaitStrategyRegistrarfor signal routing - Sets up cleanup on completion (success, error, or cancel)
[wow-core/src/main/kotlin/me/ahoo/wow/command/DefaultCommandGateway.kt:217]
Phase 2: Command Dispatching
The command bus routes the command to the appropriate AggregateProcessor. The CommandDispatcher subscribes to the command bus and creates per-aggregate dispatchers:
CommandDispatcher
The CommandDispatcher subscribes to the CommandBus for all locally registered aggregates. It creates an AggregateCommandDispatcher per aggregate type, ensuring that commands for the same aggregate ID are processed sequentially through the AggregateScheduler. [wow-core/src/main/kotlin/me/ahoo/wow/modeling/command/dispatcher/CommandDispatcher.kt:34]
Filter Chain
The command handler uses a filter chain pattern. Two key filters in the chain:
- AggregateProcessorFilter — invokes the
AggregateProcessor.process()method - SendDomainEventStreamFilter — publishes the resulting
DomainEventStreamto theDomainEventBus
[wow-core/src/main/kotlin/me/ahoo/wow/modeling/command/dispatcher/SendDomainEventStreamFilter.kt:26]
Phase 3: Aggregate Processing
This is the core of the write side. The CommandAggregate processes the command and produces domain events.
Pre-Processing Checks
The SimpleCommandAggregate performs several validation checks before executing the command function:
Version conflict check — If the command carries an expected
aggregateVersion, it must match the current state version. [wow-core/src/main/kotlin/me/ahoo/wow/modeling/command/SimpleCommandAggregate.kt:92]Initialization check — Non-create commands are rejected if the aggregate has not been initialized. [wow-core/src/main/kotlin/me/ahoo/wow/modeling/command/SimpleCommandAggregate.kt:99]
Ownership check — If the command specifies an
ownerId, it must match the aggregate's owner. [wow-core/src/main/kotlin/me/ahoo/wow/modeling/command/SimpleCommandAggregate.kt:102]Deletion check — Commands (except
RecoverAggregate) are rejected if the aggregate is in a deleted state. [wow-core/src/main/kotlin/me/ahoo/wow/modeling/command/SimpleCommandAggregate.kt:111]
CommandState Machine
The CommandState enum manages the processing lifecycle:
[wow-core/src/main/kotlin/me/ahoo/wow/modeling/command/CommandAggregate.kt:65]
Event Sourcing on State
After the command function produces a DomainEventStream, the events are applied to the StateAggregate via onSourcing(). This updates the in-memory state before the events are persisted. If no matching sourcing method is found, the event is silently ignored (but the version number is still updated). [wow-core/src/main/kotlin/me/ahoo/wow/modeling/state/StateAggregate.kt:31]
Event Persistence
Events are persisted to the EventStore via append(). This operation is atomic and enforces:
- Version ordering — the event version must equal
expectedNextVersion(current version + 1) - Aggregate ID uniqueness — the first event for a new aggregate must use a unique aggregate ID
- Request ID deduplication — prevents the same command from producing events twice
[wow-core/src/main/kotlin/me/ahoo/wow/eventsourcing/EventStore.kt:38]
Phase 4: Event Publishing
After the event stream is persisted, the SendDomainEventStreamFilter publishes it to the DomainEventBus:
The DomainEventBus interface supports two topologies:
- LocalDomainEventBus — in-process event delivery for single-instance deployments
- DistributedDomainEventBus — cross-process delivery via Kafka for distributed deployments
[wow-core/src/main/kotlin/me/ahoo/wow/event/DomainEventBus.kt:55]
Phase 5: Event Dispatching to Handlers
The DomainEventDispatcher receives events from the bus and dispatches them to registered handlers. It uses a composite pattern that separates event stream dispatching from state event dispatching:
CompositeEventDispatcher
The CompositeEventDispatcher manages two parallel sub-dispatchers:
- EventStreamDispatcher — subscribes to
DomainEventBus, dispatches to handlers withFunctionKind.EVENT(projections and sagas) - StateEventDispatcher — subscribes to
StateEventBus, dispatches to handlers withFunctionKind.STATE_EVENT(snapshot strategies)
Both sub-dispatchers use AggregateSchedulerSupplier to ensure per-aggregate ordering guarantees. Events for the same aggregate ID are always processed sequentially, even across different handler types. [wow-core/src/main/kotlin/me/ahoo/wow/event/dispatcher/CompositeEventDispatcher.kt:96]
Projection Processing
Projections receive domain events and update read models. The DefaultProjectionHandler uses a filter chain with LogResumeErrorHandler for fault tolerance — if a projection fails, the error is logged and processing continues with the next event. [wow-core/src/main/kotlin/me/ahoo/wow/projection/ProjectionHandler.kt:36]
Saga Processing
Stateless sagas receive domain events and can produce new commands. The DefaultStatelessSagaHandler also uses a filter chain pattern. Sagas coordinate long-running business processes across aggregate boundaries without maintaining their own state. [wow-core/src/main/kotlin/me/ahoo/wow/saga/stateless/StatelessSagaHandler.kt:36]
Snapshot Creation
The snapshot strategy evaluates state events and creates snapshots when criteria are met:
- SimpleSnapshotStrategy — creates a snapshot after every event
- VersionOffsetSnapshotStrategy — creates a snapshot at configurable version intervals
[wow-core/src/main/kotlin/me/ahoo/wow/eventsourcing/snapshot/SimpleSnapshotStrategy.kt:25]
Phase 6: Wait Strategy Notification
After the command has been processed, the wait strategy receives signals at each processing stage:
Wait Stages
The WaitStrategy supports waiting at different processing stages:
| Stage | Meaning |
|---|---|
SENT | Command accepted by the CommandBus |
PROCESSED | Command executed by the aggregate, events persisted |
PROJECTED | Projections have processed the events |
SNAPSHOT | Snapshot has been created |
The WaitingForStage factory creates strategies for each stage:
WaitingForStage.sent(commandId)— wait until the command is sentWaitingForStage.processed(commandId)— wait until events are persistedWaitingForStage.snapshot(commandId)— wait until snapshot is created
[wow-core/src/main/kotlin/me/ahoo/wow/command/CommandGateway.kt:145]
Signal Routing
When a downstream processor (projection, saga, snapshot) completes, it sends a WaitSignal through the CommandWaitNotifier. The WaitStrategyRegistrar routes signals to the correct strategy based on waitCommandId. [wow-core/src/main/kotlin/me/ahoo/wow/command/wait/WaitStrategy.kt:104]
Aggregate Loading (Read Path)
When an aggregate needs to be loaded for a new command, the framework reconstructs its state:
The loading process:
- Load snapshot — If a snapshot exists for the aggregate, start from that state and version
- Load remaining events — Fetch all events after the snapshot version from the
EventStore - Replay events — Apply each event stream to the
StateAggregateviaonSourcing()
The EventStore.load() method supports loading by version range or time range, and defaults to loading all events from version 1. [wow-core/src/main/kotlin/me/ahoo/wow/eventsourcing/EventStore.kt:54]
Error Handling
The data flow includes error handling at every phase:
Error Functions
The SimpleCommandAggregate supports per-command-type error functions. If an error function is registered for the command type, it is invoked when processing fails, allowing the aggregate to produce compensating events. [wow-core/src/main/kotlin/me/ahoo/wow/modeling/command/SimpleCommandAggregate.kt:150]
Projection/Saga Error Recovery
Projections and sagas use LogResumeErrorHandler — errors are logged but processing continues with the next event. This ensures that a failing projection does not block other handlers.
Related Pages
- Architecture Overview — layered architecture and CQRS patterns
- Module Dependencies — detailed module dependency graph