AggregateMessageDispatcher

Abstract dispatcher for handling message exchanges for a specific aggregate.

This dispatcher groups message exchanges by a key for parallel processing, applies metrics, and handles each exchange on a specified scheduler.
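The description above can be illustrated with a minimal Reactor sketch. It is not the library's actual implementation: `DemoExchange`, `handled`, and the free functions `handleExchange` and `dispatch` are hypothetical stand-ins, and the modulo-based key is an assumption. It only shows how grouping by a key, moving each group onto a scheduler, and handling a group's exchanges one at a time fit together.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical stand-in for a message exchange; not a type from the library.
data class DemoExchange(val aggregateId: String, val sequence: Int)

// Records the handled sequence numbers per aggregate so ordering can be inspected.
val handled = ConcurrentHashMap<String, MutableList<Int>>()

// Hypothetical handler: records the exchange and completes empty.
fun handleExchange(exchange: DemoExchange): Mono<Void> =
    Mono.fromRunnable<Void> {
        handled.computeIfAbsent(exchange.aggregateId) { CopyOnWriteArrayList<Int>() }
            .add(exchange.sequence)
    }

// Groups exchanges by key, then handles each group sequentially on the scheduler.
// Keys fall in 0 until parallelism, so the number of groups never exceeds
// the flatMap concurrency.
fun dispatch(messages: Flux<DemoExchange>, parallelism: Int, scheduler: Scheduler): Flux<Void> =
    messages
        .groupBy { Math.floorMod(it.aggregateId.hashCode(), parallelism) }
        .flatMap({ group -> group.publishOn(scheduler).concatMap { handleExchange(it) } }, parallelism)

fun main() {
    val scheduler = Schedulers.newParallel("demo-dispatcher", 4)
    val messages = Flux.range(0, 12).map { DemoExchange("order-${it % 3}", it) }
    dispatch(messages, 4, scheduler).blockLast()
    scheduler.dispose()
    handled.forEach { (id, sequences) -> println("$id -> $sequences") }
}
```

Because every exchange of one aggregate lands in the same group and each group is drained with `concatMap`, exchanges for a single aggregate are handled in arrival order, while different groups may run in parallel.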

Parameters

T

The type of message exchange being handled

Inheritors

Constructors

constructor()

Types

object Companion

Properties

open override val aggregateName: String
open override val contextName: String
open override val isDisposed: Boolean
abstract val messageFlux: Flux<T>

The flux of message exchanges to be processed.

abstract val name: String
abstract val parallelism: Int

The level of parallelism for processing grouped exchanges.

abstract val scheduler: Scheduler

The scheduler to use for processing message exchanges.

Functions

fun NamedAggregate.aggregateId(id: String = generateId(), tenantId: String = TenantId.DEFAULT_TENANT_ID): DefaultAggregateId

Creates an AggregateId for this NamedAggregate with the specified parameters.


Finds the aggregate type class associated with this named aggregate.


Converts this NamedAggregate to its corresponding AggregateMetadata.

fun cancel()
open override fun close()

Closes the dispatcher by canceling the subscription.

open fun currentContext(): Context?
open fun dispose()

Generates a unique ID string for this NamedAggregate.

fun NamedBoundedContext.getContextAlias(boundedContext: BoundedContext? = MetadataSearcher.metadata.contexts[contextName]): String

Gets the context alias prefix for this bounded context.

abstract fun handleExchange(exchange: T): Mono<Void>

Handles a single message exchange.


Checks if the named aggregate is available locally at runtime.

fun onNext(value: Void?)
fun onSubscribe(s: Subscription?)
fun request(n: Long)

Finds the aggregate type class associated with this named aggregate, throwing an exception if not found.

open override fun run()

Starts the dispatcher by subscribing to the message flux.

open fun safeOnNext(value: Void)

Safely processes the next message.

open fun safeOnNextError(value: Void, throwable: Throwable)

Handles errors that occur during message processing.


Extension function to convert a NamedAggregate to an EventNamedAggregate.

abstract fun T.toGroupKey(): Int

Converts a message exchange to a grouping key for parallel processing.
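One plausible way to derive such a key, shown here only as a sketch, is to hash the aggregate identifier and reduce it modulo the parallelism. The helper `groupKeyOf` and the use of a plain `String` id are assumptions made for illustration; the source does not state how concrete dispatchers compute the key.

```kotlin
// Hypothetical helper: maps an aggregate id onto one of `parallelism` groups.
fun groupKeyOf(aggregateId: String, parallelism: Int): Int {
    require(parallelism > 0) { "parallelism must be positive" }
    // floorMod keeps the result in 0 until parallelism, even for negative hash codes.
    return Math.floorMod(aggregateId.hashCode(), parallelism)
}

fun main() {
    val ids = listOf("order-1", "order-2", "order-1", "order-3")
    for (id in ids) {
        // The same id always yields the same group, so its exchanges stay together.
        println("$id -> group ${groupKeyOf(id, 4)}")
    }
}
```

A stable key of this kind keeps all exchanges for one aggregate in the same group, so they can be processed in order, while bounding the number of groups by the parallelism.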


Converts this NamedAggregate to its string representation.


Converts this NamedAggregate to a string representation using the context alias.