AggregateCommandDispatcher

class AggregateCommandDispatcher<C : Any, S : Any>(val aggregateMetadata: AggregateMetadata<C, S>, val parallelism: Int = MessageParallelism.DEFAULT_PARALLELISM, val scheduler: Scheduler, val messageFlux: Flux<ServerCommandExchange<*>>, val name: String = "-", aggregateProcessorFactory: AggregateProcessorFactory, commandHandler: CommandHandler, serviceProvider: ServiceProvider) : AggregateMessageDispatcher<ServerCommandExchange<*>>

Aggregate command dispatcher grouped by named aggregate.

This dispatcher manages command processing for a specific named aggregate, ensuring proper parallelism and thread affinity. Each aggregate ID is bound to one worker thread, but one worker can handle multiple aggregate IDs, providing efficient resource utilization.

Key characteristics:

  • One AggregateId binds to one Worker (Thread)

  • One Worker can be bound by multiple aggregateIds

  • Workers have aggregate ID affinity for consistent processing

Parameters

C

The type of the command aggregate root.

S

The type of the state aggregate.

parallelism

The level of parallelism for message processing.

scheduler

The scheduler for handling messages.

messageFlux

The flux of command exchanges to process.

name

The name of this dispatcher.

aggregateProcessorFactory

Factory for creating aggregate processors.

commandHandler

The command handler for processing commands.

serviceProvider

Provider for accessing services.

Constructors

Link copied to clipboard
constructor(aggregateMetadata: AggregateMetadata<C, S>, parallelism: Int = MessageParallelism.DEFAULT_PARALLELISM, scheduler: Scheduler, messageFlux: Flux<ServerCommandExchange<*>>, name: String = "-", aggregateProcessorFactory: AggregateProcessorFactory, commandHandler: CommandHandler, serviceProvider: ServiceProvider)

Properties

Link copied to clipboard

The metadata for the aggregate being dispatched.

Link copied to clipboard
open override val aggregateName: String
Link copied to clipboard
open override val contextName: String
Link copied to clipboard
open override val isDisposed: Boolean
Link copied to clipboard
open override val messageFlux: Flux<ServerCommandExchange<*>>
Link copied to clipboard
open override val name: String
Link copied to clipboard
open override val namedAggregate: NamedAggregate
Link copied to clipboard
open override val parallelism: Int
Link copied to clipboard
open override val scheduler: Scheduler

Functions

Link copied to clipboard
fun NamedAggregate.aggregateId(id: String = generateId(), tenantId: String = TenantId.DEFAULT_TENANT_ID): DefaultAggregateId

Creates an AggregateId for this NamedAggregate with the specified parameters.

Link copied to clipboard

Finds the aggregate type class associated with this named aggregate.

Link copied to clipboard

Converts this NamedAggregate to its corresponding AggregateMetadata.

Link copied to clipboard
fun cancel()
Link copied to clipboard
open override fun close()

Closes the dispatcher by canceling the subscription.

Link copied to clipboard
open fun currentContext(): Context?
Link copied to clipboard
open fun dispose()
Link copied to clipboard

Generates a unique ID string for this NamedAggregate.

Link copied to clipboard
fun NamedBoundedContext.getContextAlias(boundedContext: BoundedContext? = MetadataSearcher.metadata.contexts[contextName]): String
Link copied to clipboard

Gets the context alias prefix for this bounded context.

Link copied to clipboard
open override fun handleExchange(exchange: ServerCommandExchange<*>): Mono<Void>

Handles a single command exchange by setting up the processing context and delegating to the command handler.

Link copied to clipboard

Checks if the named aggregate is available locally at runtime.

Link copied to clipboard
Link copied to clipboard
Link copied to clipboard
Link copied to clipboard
Link copied to clipboard
fun onNext(value: Void?)
Link copied to clipboard
fun onSubscribe(s: Subscription?)
Link copied to clipboard
fun request(n: Long)
Link copied to clipboard
Link copied to clipboard

Finds the aggregate type class associated with this named aggregate, throwing an exception if not found.

Link copied to clipboard
open override fun run()

Starts the dispatcher by subscribing to the message flux.

Link copied to clipboard
open fun safeOnNext(value: Void)

Safely processes the next message.

Link copied to clipboard
open fun safeOnNextError(value: Void, throwable: Throwable)

Handles errors that occur during message processing.

Link copied to clipboard

Extension function to convert a NamedAggregate to an EventNamedAggregate.

Link copied to clipboard
open override fun ServerCommandExchange<*>.toGroupKey(): Int

Generates a group key for the command exchange to ensure proper parallelism and ordering.

Link copied to clipboard

Converts this NamedAggregate to its string representation.

Link copied to clipboard

Converts this NamedAggregate to a string representation using the context alias.