AggregateDispatcher

Abstract dispatcher for handling message exchanges for a specific aggregate with graceful shutdown support.

This dispatcher provides a robust framework for processing message exchanges in parallel, with built-in metrics collection, error handling, and graceful shutdown capabilities. Message exchanges are grouped by key for parallel processing, ensuring ordered execution within each group while allowing concurrent processing across different groups.

Key features:

  • Parallel message processing with configurable parallelism level

  • Metrics collection for monitoring dispatcher performance

  • Graceful shutdown that waits for active tasks to complete

  • Error handling through SafeSubscriber integration

  • Scheduler-based execution for resource management
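The grouping behaviour described above (ordered within a group, concurrent across groups) can be illustrated with a small stand-alone sketch. It is not the dispatcher's implementation: it uses plain java.util.concurrent executors instead of Reactor, and Message, groupKey and processGrouped are hypothetical names introduced only for this example.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical message type for illustration; not part of the documented API.
data class Message(val aggregateId: String, val sequence: Int)

// Maps an aggregate id to one of `parallelism` groups.
fun groupKey(aggregateId: String, parallelism: Int): Int =
    Math.floorMod(aggregateId.hashCode(), parallelism)

// Each group gets its own single-threaded lane, so messages in one group keep
// their submission order while different groups can run concurrently.
// Returns the order in which sequences were processed, per aggregate id.
fun processGrouped(messages: List<Message>, parallelism: Int): Map<String, List<Int>> {
    val lanes = List(parallelism) { Executors.newSingleThreadExecutor() }
    val processed = ConcurrentHashMap<String, MutableList<Int>>()
    for (message in messages) {
        lanes[groupKey(message.aggregateId, parallelism)].execute {
            processed.computeIfAbsent(message.aggregateId) { CopyOnWriteArrayList<Int>() }
                .add(message.sequence)
        }
    }
    // Stop accepting work, then wait for every lane to drain.
    lanes.forEach { it.shutdown() }
    lanes.forEach { it.awaitTermination(5, TimeUnit.SECONDS) }
    return processed
}

fun main() {
    val ids = listOf("order-1", "order-2", "order-3")
    val messages = (1..5).flatMap { seq -> ids.map { Message(it, seq) } }
    val result = processGrouped(messages, parallelism = 4)
    println(result["order-1"]) // [1, 2, 3, 4, 5]
}
```

Because every message for a given aggregate id lands in the same lane, its sequence numbers come out in submission order regardless of how the lanes interleave.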

Example usage:

class CustomAggregateDispatcher(
    override val parallelism: Int = 4,
    override val scheduler: Scheduler = Schedulers.boundedElastic(),
    // commandBus is assumed to be available in the enclosing scope
    override val messageFlux: Flux<CommandExchange> = commandBus.receive("my-aggregate")
) : AggregateDispatcher<CommandExchange>() {

    override fun CommandExchange.toGroupKey(): Int {
        // floorMod keeps the key in 0 until parallelism even when hashCode() is negative
        return Math.floorMod(command.aggregateId.hashCode(), parallelism)
    }

    override fun handleExchange(exchange: CommandExchange): Mono<Void> {
        // commandHandler is assumed to be available in the enclosing scope
        return commandHandler.handle(exchange)
            .doOnSuccess { exchange.acknowledge() }
    }
}

// Usage
val dispatcher = CustomAggregateDispatcher()
dispatcher.start()

// Graceful shutdown
dispatcher.stopGracefully().block()

Parameters

T

The type of message exchange being handled; it must implement MessageExchange.

See also

for the interface this class implements

for error handling capabilities

for the exchange type contract

Inheritors

Constructors

constructor()

Types

object Companion

Properties

open override val aggregateName: String
open override val contextName: String
open override val isDisposed: Boolean
abstract val messageFlux: Flux<T>

The flux of message exchanges to be processed.

abstract val name: String
abstract val parallelism: Int

The level of parallelism for processing grouped exchanges.

abstract val scheduler: Scheduler

The scheduler to use for processing message exchanges.

Functions

fun NamedAggregate.aggregateId(id: String = generateId(), tenantId: String = TenantId.DEFAULT_TENANT_ID): DefaultAggregateId

Creates an AggregateId for this NamedAggregate with the specified parameters.


Finds the aggregate type class associated with this named aggregate.


Converts this NamedAggregate to its corresponding AggregateMetadata.

fun cancel()
open override fun close()

Closes this resource by performing a graceful shutdown.

open fun currentContext(): Context?
open fun dispose()

Generates a unique ID string for this NamedAggregate.

fun NamedBoundedContext.getContextAlias(boundedContext: BoundedContext? = MetadataSearcher.metadata.contexts[contextName]): String

Gets the context alias prefix for this bounded context.

abstract fun handleExchange(exchange: T): Mono<Void>

Handles a single message exchange.


Checks if the named aggregate is available locally at runtime.

fun onNext(value: Void?)
fun onSubscribe(s: Subscription?)
fun request(n: Long)

Finds the aggregate type class associated with this named aggregate, throwing an exception if not found.

open fun safeOnNext(value: Void)

Safely processes the next message.

open fun safeOnNextError(value: Void, throwable: Throwable)

Handles errors that occur during message processing.
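As a rough illustration of the "safe" processing pattern named here, the following sketch routes a failure from the per-message callback to an error callback instead of letting it propagate. SafeHandler and its parameters are hypothetical and written only for this example; the actual SafeSubscriber behaviour may differ.

```kotlin
// Hypothetical helper for illustration; not the library's SafeSubscriber.
class SafeHandler<T>(
    private val onNext: (T) -> Unit,
    private val onNextError: (T, Throwable) -> Unit
) {
    // Processes one value; an exception is handed to onNextError rather than rethrown,
    // so one bad message does not stop the values that follow it.
    fun accept(value: T) {
        try {
            onNext(value)
        } catch (e: Exception) {
            onNextError(value, e)
        }
    }
}

fun main() {
    val handled = mutableListOf<Int>()
    val failures = mutableListOf<String>()
    val handler = SafeHandler<Int>(
        onNext = { value ->
            require(value % 2 == 0) { "odd value: $value" }
            handled.add(value)
        },
        onNextError = { value, error -> failures.add("$value -> ${error.message}") }
    )
    listOf(2, 3, 4).forEach(handler::accept)
    println(handled)  // [2, 4]
    println(failures) // [3 -> odd value: 3]
}
```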

open override fun start()

Starts the dispatcher by subscribing to the message flux.

open fun stop()

Synchronously closes this resource with graceful shutdown.

open fun stop(timeout: Duration)

Synchronously closes this resource with graceful shutdown within a specified timeout.

open override fun stopGracefully(): Mono<Void>

Performs a graceful shutdown of the dispatcher.
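The general idea of a graceful shutdown (stop accepting new work, then wait for in-flight tasks) can be sketched without Reactor as follows. This is only an analogy under stated assumptions: DrainingWorker and its members are hypothetical names, and the dispatcher's own stopGracefully() returns a Mono<Void> rather than blocking.

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical worker for illustration; not part of the documented API.
class DrainingWorker(threads: Int) {
    private val executor: ExecutorService = Executors.newFixedThreadPool(threads)
    val completed = AtomicInteger(0)

    // Accepts a task unless shutdown has begun; returns false when the task is rejected.
    fun submit(task: () -> Unit): Boolean =
        try {
            executor.execute {
                task()
                completed.incrementAndGet()
            }
            true
        } catch (e: RejectedExecutionException) {
            false
        }

    // Stops accepting new tasks, then waits up to timeoutMillis for active tasks to finish.
    // Returns true if all accepted tasks completed within the timeout.
    fun stopGracefully(timeoutMillis: Long): Boolean {
        executor.shutdown()
        return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)
    }
}

fun main() {
    val worker = DrainingWorker(threads = 2)
    repeat(4) { worker.submit { Thread.sleep(50) } }
    val drained = worker.stopGracefully(timeoutMillis = 2_000)
    val acceptedAfterStop = worker.submit { }
    println("drained=$drained completed=${worker.completed.get()}") // drained=true completed=4
    println("acceptedAfterStop=$acceptedAfterStop")                 // acceptedAfterStop=false
}
```

The key property is that tasks already accepted are allowed to finish, while submissions made after shutdown begins are rejected.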


Extension function to convert a NamedAggregate to an EventNamedAggregate.

abstract fun T.toGroupKey(): Int

Converts a message exchange to a grouping key for parallel processing.
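When a grouping key is derived from a hash code, the choice of reduction matters because hashCode() can be negative. The sketch below compares the plain remainder with Math.floorMod; remainderKey and floorModKey are hypothetical helper names, and whether a negative key matters in practice depends on how the dispatcher consumes the key.

```kotlin
// Plain remainder: the result takes the sign of the hash, so it can be negative.
fun remainderKey(hash: Int, parallelism: Int): Int = hash % parallelism

// floorMod: the result is always in 0 until parallelism for a positive parallelism.
fun floorModKey(hash: Int, parallelism: Int): Int = Math.floorMod(hash, parallelism)

fun main() {
    println(remainderKey(-7, 4)) // -3
    println(floorModKey(-7, 4))  // 1
    println(floorModKey(7, 4))   // 3
}
```

With the plain remainder, the number of distinct keys can reach 2 * parallelism - 1, which is more groups than the configured parallelism suggests.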


Converts this NamedAggregate to its string representation.


Converts this NamedAggregate to a string representation using the context alias.