AggregateDispatcher
Abstract dispatcher for handling message exchanges for a specific aggregate with graceful shutdown support.
This dispatcher provides a robust framework for processing message exchanges in parallel, with built-in metrics collection, error handling, and graceful shutdown capabilities. Message exchanges are grouped by key for parallel processing, ensuring ordered execution within each group while allowing concurrent processing across different groups.
Key features:
Parallel message processing with configurable parallelism level
Metrics collection for monitoring dispatcher performance
Graceful shutdown that waits for active tasks to complete
Error handling through SafeSubscriber integration
Scheduler-based execution for resource management
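The grouping behaviour described above can be illustrated with plain Reactor operators. This is a minimal sketch, not the dispatcher's actual implementation: `DemoExchange`, `dispatch`, and the `handle` callback are hypothetical names introduced here, and the use of `groupBy`, `flatMap`, and `concatMap` is an assumption about one way to get ordered execution within a group and concurrency across groups.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

// Hypothetical stand-in for a message exchange; not a type from the library.
data class DemoExchange(val aggregateId: String, val sequence: Int)

// Map an exchange to one of `parallelism` groups.
// floorMod keeps the key non-negative even when hashCode() is negative.
fun DemoExchange.toGroupKey(parallelism: Int): Int =
    Math.floorMod(aggregateId.hashCode(), parallelism)

// Hypothetical dispatch function: groups run concurrently,
// exchanges inside one group are handled one after another.
fun dispatch(
    exchanges: Flux<DemoExchange>,
    parallelism: Int,
    handle: (DemoExchange) -> Mono<Void>
): Mono<Void> =
    exchanges
        .groupBy { it.toGroupKey(parallelism) }
        .flatMap({ group ->
            group
                .publishOn(Schedulers.boundedElastic())
                // concatMap subscribes to one handler at a time, preserving order.
                .concatMap { handle(it) }
        }, parallelism) // at most `parallelism` groups are processed at once
        .then()

fun main() {
    val handled = ConcurrentLinkedQueue<DemoExchange>()
    val exchanges = Flux.range(0, 12).map { DemoExchange("order-${it % 3}", it) }

    dispatch(exchanges, parallelism = 4) { exchange ->
        Mono.fromRunnable<Void> { handled.add(exchange) }
    }.block()

    println("handled ${handled.size} exchanges")
}
```

Because the group key is derived from the aggregate ID, all exchanges for one aggregate land in the same group and are therefore handled in arrival order.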
Example usage:
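    // commandBus and commandHandler are assumed to be available in the enclosing scope.
    class CustomAggregateDispatcher(
        override val parallelism: Int = 4,
        override val scheduler: Scheduler = Schedulers.boundedElastic(),
        override val messageFlux: Flux<CommandExchange> = commandBus.receive("my-aggregate")
    ) : AggregateDispatcher<CommandExchange>() {

        // Route every exchange of the same aggregate to the same group.
        // floorMod keeps the key non-negative when hashCode() is negative.
        override fun CommandExchange.toGroupKey(): Int {
            return Math.floorMod(command.aggregateId.hashCode(), parallelism)
        }

        // Handle one exchange and acknowledge it once handling succeeds.
        override fun handleExchange(exchange: CommandExchange): Mono<Void> {
            return commandHandler.handle(exchange)
                .doOnSuccess { exchange.acknowledge() }
        }
    }

    // Usage
    val dispatcher = CustomAggregateDispatcher()
    dispatcher.start()

    // Graceful shutdown: blocks until active tasks have completed
    dispatcher.stopGracefully().block()
Parameters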
The type of message exchange being handled; it must implement MessageExchange.
See also
for the interface this class implements
for error handling capabilities
for the exchange type contract
Functions
Creates an AggregateId for this NamedAggregate with the specified parameters.
Finds the aggregate type class associated with this named aggregate.
Converts this NamedAggregate to its corresponding AggregateMetadata.
Generates a unique ID string for this NamedAggregate.
Gets the context alias prefix for this bounded context.
Handles a single message exchange.
Checks if the named aggregate is available locally at runtime.
Materializes this NamedAggregate into a MaterializedNamedAggregate.
Finds the aggregate type class associated with this named aggregate, throwing an exception if not found.
Safely processes the next message.
Handles errors that occur during message processing.
Performs a graceful shutdown of the dispatcher.
Extension function to convert a NamedAggregate to an EventNamedAggregate.
Converts a message exchange to a grouping key for parallel processing.
Converts this NamedAggregate to its string representation.
Converts this NamedAggregate to a string representation using the context alias.
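The graceful shutdown listed among the functions above waits for active tasks to complete. The following is a minimal sketch of that idea using only the Kotlin and Java standard libraries. `ActiveTaskTracker` and `execute` are hypothetical names, and the counter-plus-future approach is an assumption for illustration, not the dispatcher's actual mechanism.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper, not part of the library: counts in-flight tasks and
// completes a future once shutdown was requested and no task is still running.
class ActiveTaskTracker {
    private val active = AtomicInteger(0)
    private val stopping = AtomicBoolean(false)
    private val drained = CompletableFuture<Unit>()

    /** Runs [task] unless shutdown has started; returns false if it was rejected. */
    fun execute(task: () -> Unit): Boolean {
        // Register first, then check the flag, so a concurrent shutdown
        // either sees this task as active or this task sees the flag.
        active.incrementAndGet()
        if (stopping.get()) {
            release()
            return false
        }
        try {
            task()
        } finally {
            release()
        }
        return true
    }

    /** Rejects new tasks and returns a future that completes when active tasks finish. */
    fun stopGracefully(): CompletableFuture<Unit> {
        stopping.set(true)
        if (active.get() == 0) drained.complete(Unit)
        return drained
    }

    private fun release() {
        if (active.decrementAndGet() == 0 && stopping.get()) drained.complete(Unit)
    }
}

fun main() {
    val tracker = ActiveTaskTracker()
    val worker = Thread { tracker.execute { Thread.sleep(100) } }
    worker.start()

    Thread.sleep(20)                  // give the worker time to start its task
    tracker.stopGracefully().join()   // returns once the running task has finished
    println("accepted after stop: ${tracker.execute { }}")
}
```

Registering the task before checking the stop flag is the design choice that matters here: it closes the window in which a task could start after shutdown had already observed zero active tasks.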