AggregateEventDispatcher

class AggregateEventDispatcher(val namedAggregate: NamedAggregate, val name: String = "-", val parallelism: Int = MessageParallelism.DEFAULT_PARALLELISM, val messageFlux: Flux<EventStreamExchange>, val functionRegistrar: MessageFunctionRegistrar<MessageFunction<Any, DomainEventExchange<*>, Mono<*>>>, val eventHandler: EventHandler, val scheduler: Scheduler) : AbstractAggregateEventDispatcher<EventStreamExchange>

Dispatcher for processing domain events within a specific aggregate context.

This class distributes and processes domain event streams for a particular named aggregate. It extends AbstractAggregateEventDispatcher to provide a concrete implementation of event stream processing.

Constructors

constructor(namedAggregate: NamedAggregate, name: String = "-", parallelism: Int = MessageParallelism.DEFAULT_PARALLELISM, messageFlux: Flux<EventStreamExchange>, functionRegistrar: MessageFunctionRegistrar<MessageFunction<Any, DomainEventExchange<*>, Mono<*>>>, eventHandler: EventHandler, scheduler: Scheduler)
Creates a new AggregateEventDispatcher with the specified parameters.

Properties

open override val aggregateName: String

open override val contextName: String

open override val eventHandler: EventHandler
The handler for processing individual events

open override val functionRegistrar: MessageFunctionRegistrar<MessageFunction<Any, DomainEventExchange<*>, Mono<*>>>
The registrar containing event processing functions

open override val isDisposed: Boolean

open override val messageFlux: Flux<EventStreamExchange>
The flux of event stream exchanges to process

open override val name: String
The name of this dispatcher (default: derived from aggregate name)

open override val namedAggregate: NamedAggregate
The named aggregate this dispatcher handles

open override val parallelism: Int
The level of parallelism for processing (default: DEFAULT_PARALLELISM)

open override val scheduler: Scheduler
The scheduler for managing event processing concurrency

Functions

fun NamedAggregate.aggregateId(id: String = generateId(), tenantId: String = TenantId.DEFAULT_TENANT_ID): DefaultAggregateId
Creates an AggregateId for this NamedAggregate with the specified parameters.

Finds the aggregate type class associated with this named aggregate.

Converts this NamedAggregate to its corresponding AggregateMetadata.

fun cancel()

open override fun close()
Closes the dispatcher by canceling the subscription.

Creates a domain event exchange from an event stream exchange and domain event.

open fun currentContext(): Context?

open fun dispose()

Generates a unique ID string for this NamedAggregate.

fun NamedBoundedContext.getContextAlias(boundedContext: BoundedContext? = MetadataSearcher.metadata.contexts[contextName]): String

Gets the context alias prefix for this bounded context.

open override fun handleExchange(exchange: EventStreamExchange): Mono<Void>
Handles a message exchange by processing all events in the stream.
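
To illustrate what "processing all events in the stream" might look like, here is a minimal synchronous sketch. It is an assumption, not the library's implementation: the real method is reactive and returns Mono<Void>, whereas this version uses only the Kotlin standard library. `EventStub`, `StreamExchangeStub`, `EventFunction`, and `handleExchangeSketch` are hypothetical stand-ins for EventStreamExchange, DomainEventExchange, and the functions held by the registrar.

```kotlin
// Hypothetical stand-ins for the real exchange and event types.
data class EventStub(val type: String, val payload: String)
data class StreamExchangeStub(val aggregateId: String, val events: List<EventStub>)
typealias EventFunction = (EventStub) -> Unit

// Assumed flow: walk the stream in order and invoke every function
// registered for each event's type. Returns the number of invocations.
fun handleExchangeSketch(
    exchange: StreamExchangeStub,
    registry: Map<String, List<EventFunction>>,
): Int {
    var invocations = 0
    for (event in exchange.events) {
        // Events with no registered function are skipped.
        for (function in registry[event.type].orEmpty()) {
            function(event)
            invocations++
        }
    }
    return invocations
}

fun main() {
    val log = mutableListOf<String>()
    val registry: Map<String, List<EventFunction>> = mapOf(
        "OrderCreated" to listOf<EventFunction>({ e -> log += "created:${e.payload}" }),
        "OrderPaid" to listOf<EventFunction>({ e -> log += "paid:${e.payload}" }),
    )
    val exchange = StreamExchangeStub(
        aggregateId = "order-1",
        events = listOf(
            EventStub("OrderCreated", "o1"),
            EventStub("OrderPaid", "o1"),
            EventStub("OrderShipped", "o1"), // no function registered for this type
        ),
    )
    val count = handleExchangeSketch(exchange, registry)
    println("$count invocations: $log")
}
```

In this sketch the events are handled strictly in stream order, and an event type with no registered function is ignored rather than treated as an error. Whether the real dispatcher behaves the same way in that case is not stated on this page.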

Checks if the named aggregate is available locally at runtime.

fun onNext(value: Void?)

fun onSubscribe(s: Subscription?)

fun request(n: Long)

Finds the aggregate type class associated with this named aggregate, throwing an exception if not found.

open override fun run()
Starts the dispatcher by subscribing to the message flux.

open fun safeOnNext(value: Void)
Safely processes the next message.

open fun safeOnNextError(value: Void, throwable: Throwable)
Handles errors that occur during message processing.

Extension function to convert a NamedAggregate to an EventNamedAggregate.

open override fun EventStreamExchange.toGroupKey(): Int
Converts the exchange to a group key for parallel processing.
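
One common way to derive such a key is to hash the aggregate ID and reduce it modulo the parallelism, so that every exchange for the same aggregate lands in the same group and keeps its relative order. The sketch below assumes that strategy; this page does not confirm that toGroupKey() works this way. `ExchangeStub` and `toGroupKeySketch` are hypothetical names introduced only for illustration.

```kotlin
// Hypothetical stand-in for EventStreamExchange; only the aggregate ID matters here.
data class ExchangeStub(val aggregateId: String, val version: Int)

// Assumed strategy: hash the aggregate ID into one of `parallelism` groups.
// Int.mod returns a non-negative result for a positive divisor,
// so negative hash codes still map into 0 until parallelism.
fun ExchangeStub.toGroupKeySketch(parallelism: Int): Int {
    require(parallelism > 0) { "parallelism must be positive" }
    return aggregateId.hashCode().mod(parallelism)
}

fun main() {
    val parallelism = 4
    val exchanges = listOf(
        ExchangeStub("order-1", 1),
        ExchangeStub("order-2", 1),
        ExchangeStub("order-1", 2),
    )
    // Exchanges for the same aggregate always share a group.
    val groups = exchanges.groupBy { it.toGroupKeySketch(parallelism) }
    groups.forEach { (key, members) -> println("group $key -> $members") }
}
```

Under this assumption, different aggregates may share a group, but a single aggregate never spans two groups, which is what allows groups to run in parallel without reordering one aggregate's events.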

Converts this NamedAggregate to its string representation.

Converts this NamedAggregate to a string representation using the context alias.