Event Bus
The Event Bus is the central nervous system of Wow's event-driven architecture. It receives domain event streams from aggregates after they process commands and routes them to all interested consumers -- projections, sagas, event processors, and external systems. The bus guarantees ordered delivery per aggregate ID so that consumers always see events in the exact sequence they were produced.
Architecture Overview
The Event Bus is built on a layered abstraction that decouples the what (domain event transport) from the how (in-memory, Kafka, Redis). Every bus implementation starts from the same base contract and gains capabilities as it moves from local to distributed.
The layered design allows the framework to use the same bus contract whether processing events locally (single JVM, zero network overhead) or in a distributed cluster (Kafka-backed, multi-node). The default deployment uses a LocalFirstDomainEventBus that combines both strategies.
Core Interfaces
The Event Bus type hierarchy builds incrementally from a generic message bus to domain-event-specific contracts:
| Interface | Role | Extends | Source |
|---|---|---|---|
MessageBus<M, E> | Base contract: send() and receive() | AutoCloseable | MessageBus.kt:31-53 |
DomainEventBus | Event bus for DomainEventStream payloads | MessageBus, TopicKindCapable | DomainEventBus.kt:39-44 |
LocalDomainEventBus | In-process bus with subscriber counting | DomainEventBus, LocalMessageBus | DomainEventBus.kt:55-57 |
DistributedDomainEventBus | Cross-process / cross-node bus | DomainEventBus, DistributedMessageBus | DomainEventBus.kt:68-70 |
The TopicKind is always TOPIC_KIND.EVENT_STREAM for all domain event buses (DomainEventBus.kt:42-43), distinguishing them from command buses (COMMAND) and state event buses (STATE).
NoOpDomainEventBus
For testing and scenarios where event publishing is intentionally disabled, the singleton NoOpDomainEventBus (DomainEventBus.kt:81-97) silently discards all sends and returns empty fluxes on receive.
Event Publishing & Receiving Flow
The end-to-end flow from aggregate command processing through event delivery follows a publish-subscribe model with ordered per-aggregate delivery.
The sequence diagram reveals two critical design decisions:
Local-first routing (step 8--13): Events are delivered to in-process consumers before hitting the distributed bus. This gives projections and sagas running on the same node near-zero latency for event handling.
Duplicate prevention (final note): Distributed consumers check
isLocalHandled()and skip events already processed locally, preventing double-processing when a node is both producer and consumer.
Domain Event Stream
The DomainEventStream is the fundamental unit transported over the bus. It groups all domain events produced by a single command execution into one atomic payload.
| Property | Type | Description | Source |
|---|---|---|---|
id | String | Globally unique stream ID (generated via generateGlobalId()) | DomainEventStream.kt:91 |
requestId | String | Correlation ID linking stream to originating HTTP request | DomainEventStream.kt:92 |
header | Header | Message header containing metadata and propagation flags | DomainEventStream.kt:93 |
body | List<DomainEvent<*>> | Ordered list of domain events (must not be empty) | DomainEventStream.kt:94 |
aggregateId | AggregateId | Derived from the first event's aggregate ID | DomainEventStream.kt:97 |
version | Int | Aggregate version after applying this stream (from first event) | DomainEventStream.kt:106 |
size | Int | Number of domain events in the stream | DomainEventStream.kt:110 |
Events within a stream are sequentially numbered starting from DEFAULT_EVENT_SEQUENCE (1). The isLast flag on each event signals whether the stream continues or concludes, enabling consumers to batch-complete processing when the final event arrives.
Event Stream Factory
The toDomainEventStream() extension function creates a stream from command processing results. It flattens the output into individual events, assigns sequential IDs, and wraps them in a SimpleDomainEventStream:
fun Any.toDomainEventStream(
upstream: CommandMessage<*>,
aggregateVersion: Int = Version.UNINITIALIZED_VERSION,
...
): DomainEventStream {
val events = flatEvent().toDomainEvents(
streamVersion = aggregateVersion + 1,
aggregateId = upstream.aggregateId,
command = upstream,
...
)
return SimpleDomainEventStream(
id = generateGlobalId(),
requestId = upstream.requestId,
header = header,
body = events,
)
}The flatEvent() (DomainEventStreamFactory.kt:43-56) normalizes single events, arrays, and iterables into a consistent Iterable<Any> before creating individual DomainEvent instances.
Bus Implementations
Wow provides four concrete bus implementations covering the full spectrum from simple testing to production distributed clusters:
| Implementation | Class | Type | Use Case | Source |
|---|---|---|---|---|
| NoOp | NoOpDomainEventBus | Singleton | Testing, event publishing disabled | DomainEventBus.kt:81-97 |
| InMemory | InMemoryDomainEventBus | LocalDomainEventBus | Single-process apps, unit tests | InMemoryDomainEventBus.kt:38-54 |
| Kafka | KafkaDomainEventBus | DistributedDomainEventBus | Multi-node production | KafkaDomainEventBus.kt:22-41 |
| LocalFirst | LocalFirstDomainEventBus | DomainEventBus (hybrid) | Default production (combines InMemory + Kafka) | LocalFirstDomainEventBus.kt:38-42 |
Local-First Routing Strategy
The LocalFirstDomainEventBus is the default production bus. It implements a two-tier delivery strategy that optimizes for the common case where event consumers (projections, sagas) run in the same JVM as the aggregate.
The routing logic (LocalFirstMessageBus.kt:130-149) always sends a copy to the distributed bus regardless of local success or failure. This ensures other nodes always receive events, while local consumers benefit from sub-millisecond delivery via reactive sinks.
On the receive side (LocalFirstMessageBus.kt:160-170), the bus merges local and distributed fluxes, filtering out distributed events that were already processed locally via isLocalHandled().
Event Dispatcher Pipeline
Once the Event Bus delivers an EventStreamExchange to the dispatcher, a multi-stage pipeline processes each domain event through registered handler functions:
Dispatcher Components
| Component | Responsibility | Key Behavior | Source |
|---|---|---|---|
DomainEventDispatcher | Top-level coordinator | Creates EventStreamDispatcher + StateEventDispatcher; starts/stops both | DomainEventDispatcher.kt:44-84 |
EventStreamDispatcher | Routes event streams by aggregate | Groups flux by NamedAggregate, delegates to per-aggregate dispatchers | EventStreamDispatcher.kt:27-49 |
AggregateEventDispatcher | Per-aggregate event processing | Iterates events in order via concatMap, applies function filtering | AggregateEventDispatcher.kt:53-80 |
DomainEventFunctionRegistrar | Registers event handler functions | Resolves @EventProcessor classes → MessageFunction set via annotation metadata | DomainEventFunctionRegistrar.kt:92-112 |
DefaultDomainEventHandler | Filter chain executor | Runs FilterChain<DomainEventExchange<*>> with LogResumeErrorHandler | DomainEventHandler.kt:57-64 |
DomainEventFunctionFilter | Invokes user handler function | Sets ServiceProvider on exchange, calls eventFunction.invoke(exchange) | DomainEventFunctionFilter.kt:42-71 |
The CompositeEventDispatcher (CompositeEventDispatcher.kt:64-138) combines an EventStreamDispatcher (for regular domain events) and a StateEventDispatcher (for state-change events), filtering functions by FunctionKind.EVENT and FunctionKind.STATE_EVENT respectively.
Event Processor Annotation Metadata
When a class is annotated with @EventProcessor, the EventProcessorParser (EventProcessorParser.kt:34-36) scans methods annotated with @OnEvent or @OnStateEvent and converts them into MessageFunction instances. These are registered in the DomainEventFunctionRegistrar so that the dispatcher can match incoming events to the correct handler.
Each @OnEvent annotation can optionally specify aggregate name filters (vararg val value: String) to limit which aggregates the handler receives events from (OnEvent.kt:66-78).
Event Processor Lifecycle
Event processors follow a well-defined lifecycle managed by the dispatcher:
| Phase | Action | Details | Trigger |
|---|---|---|---|
| Discovery | DomainEventFunctionRegistrar.resolveProcessor() | Scans @EventProcessor classes, extracts @OnEvent methods, creates MessageFunction instances | Spring context refresh |
| Registration | registerProcessor(processor) | Registers each MessageFunction keyed by event type + aggregate name | After discovery |
| Subscription | MessageBus.receive(namedAggregates) | Subscribes to event streams for relevant aggregates | MessageDispatcher.start() |
| Grouping | EventStreamDispatcher groups by NamedAggregate | Routes events to per-aggregate schedulers | On each incoming batch |
| Matching | supportedFunctions(event) + event.match() | Finds registered functions that handle the given event type | Per event |
| Invocation | DomainEventFunctionFilter.filter() | Sets service provider, invokes handler via filter chain | Per event-to-function match |
| Acknowledgment | finallyAck(exchange) | Commits offset (Kafka) or marks processed (in-memory) | After all events in stream processed |
| Shutdown | MessageDispatcher.stopGracefully() | Waits for in-flight processing, closes subscriptions | Application shutdown |
Kafka Integration
When deployed in a distributed environment, Wow uses Apache Kafka as the event bus transport. Each named aggregate maps to a dedicated Kafka topic:
Topic Naming Convention
wow.{contextName}.{aggregateName}.eventFor example, an "order" aggregate in the default context produces the topic wow.order.event.
The naming is controlled by DefaultEventStreamTopicConverter (AggregateTopicConverter.kt:38-46), which prefixes the aggregate string representation with wow. (configurable via wow.kafka.topic-prefix).
Message Serialization
AbstractKafkaBus (AbstractKafkaBus.kt:39-131) handles serialization:
- Key:
message.aggregateId.id(the aggregate's string ID) -- ensures ordering per aggregate within a partition - Value: JSON serialization via
message.toJsonString()/toObject<DomainEventStream> - Partition:
null(let Kafka assign by key hash, guaranteeing per-aggregate ordering) - Timestamp:
message.createTime
Offset Acknowledgment
KafkaEventStreamExchange (KafkaEventStreamExchange.kt:22-32) wraps the ReceiverOffset and calls receiverOffset.acknowledge() when the dispatcher's finallyAck completes. This ensures at-least-once delivery with manual offset commits.
Kafka Configuration Properties
Configuration is managed by KafkaProperties (KafkaProperties.kt:27-68), bound to the wow.kafka prefix:
| Property | Type | Default | Description | Source |
|---|---|---|---|---|
wow.kafka.enabled | Boolean | true | Enable Kafka integration | KafkaProperties.kt:29 |
wow.kafka.bootstrap-servers | List<String> | Required | Kafka broker addresses | KafkaProperties.kt:30 |
wow.kafka.topic-prefix | String | wow. | Prefix for all topic names | KafkaProperties.kt:31 |
wow.kafka.properties | Map<String,String> | {} | Common Kafka client properties | KafkaProperties.kt:35 |
wow.kafka.producer | Map<String,String> | {} | Producer-specific overrides | KafkaProperties.kt:36 |
wow.kafka.consumer | Map<String,String> | {} | Consumer-specific overrides | KafkaProperties.kt:37 |
Event Bus Configuration Properties
The event bus type is selected via EventProperties (EventProperties.kt:21-30):
| Property | Type | Default | Description |
|---|---|---|---|
wow.event.bus.type | BusType | kafka | Event bus implementation (kafka, redis, in_memory, no_op) |
wow.event.bus.local-first.enabled | Boolean | true | Enable LocalFirstDomainEventBus wrapping (in-memory + distributed) |
YAML Example:
wow:
event:
bus:
type: kafka
local-first:
enabled: true
kafka:
enabled: true
bootstrap-servers:
- localhost:9092
topic-prefix: "wow."
producer:
acks: all
retries: 3
consumer:
auto-offset-reset: earliestSpring Boot Auto-Configuration
The KafkaAutoConfiguration (KafkaAutoConfiguration.kt:43-127) registers beans conditionally:
KafkaDomainEventBusis created only whenwow.event.bus.type=kafka(or default) andwow.kafka.enabled=trueDefaultEventStreamTopicConverteruses the configuredtopicPrefixReceiverOptionsCustomizerallows per-service overrides of Kafka consumer settings- The bus is wired into
LocalFirstDomainEventBuswhenlocal-first.enabled=true
Event Upgrade Pipeline
When an event's revision does not match the current schema version, Wow's event upgrader system (in wow-core/src/main/kotlin/me/ahoo/wow/event/upgrader/) transparently migrates legacy event formats before they reach consumers. This ensures backward compatibility as domain models evolve without requiring consumer changes.
Key Design Decisions
Why ordered delivery per aggregate? Using aggregateId.id as the Kafka partition key (AbstractKafkaBus.kt:106) guarantees that all events for a given aggregate instance arrive in the order they were produced. This is essential for projections that reconstruct aggregate state and sagas that depend on sequential business milestones.
Why local-first by default? The framework's primary optimization target is the case where the aggregate, its projections, and its sagas all run in the same JVM. LocalFirstDomainEventBus eliminates network round-trips for the 80% case while still ensuring distributed visibility.
Why stream-based (not event-per-message)? Transporting events as a DomainEventStream (all events from one command execution) rather than individual events reduces message overhead, preserves atomic context, and allows consumers to implement efficient batch completion logic via the isLast flag.
Related Pages
| Page | Description |
|---|---|
| Command Bus | Command routing and dispatch architecture |
| Event Store | How domain events are persisted and loaded |
| Event Processor | Creating and configuring event handlers |
| Saga Processor | Distributed transaction orchestration via events |
| Kafka Configuration | Kafka connection and topic configuration |
| Event Configuration | Event bus type and routing settings |
| Architecture Overview | High-level framework architecture |
| CQRS Reference | CQRS patterns in Wow |