MainDispatcher

Abstract base class for message dispatchers that manage multiple aggregate dispatchers.

This class coordinates the dispatching of messages to multiple named aggregates by creating individual dispatchers for each aggregate and managing their lifecycle. It provides a framework for implementing dispatchers that need to handle messages across different aggregates, ensuring proper initialization, starting, and graceful shutdown.

Subclasses must implement the abstract methods to define how messages are received for each aggregate and how individual aggregate dispatchers are created.

Example usage:

class MyMainDispatcher : MainDispatcher<MyMessage>() {
override val namedAggregates = setOf(myAggregate1, myAggregate2)

override fun receiveMessage(namedAggregate: NamedAggregate): Flux<MyMessage> {
// Implementation to receive messages for the aggregate
return myMessageFlux
}

override fun newAggregateDispatcher(
namedAggregate: NamedAggregate,
messageFlux: Flux<MyMessage>
): MessageDispatcher {
// Implementation to create dispatcher for the aggregate
return MyAggregateDispatcher(namedAggregate, messageFlux)
}
}

val dispatcher = MyMainDispatcher()
dispatcher.start()
// ... application logic ...
dispatcher.stopGracefully().block()

Parameters

T

The type of message being dispatched, must be a non-null type.

See also

Inheritors

Constructors

Link copied to clipboard
constructor()

Types

Link copied to clipboard
object Companion

Properties

Link copied to clipboard
abstract val name: String
Link copied to clipboard

The set of named aggregates that this dispatcher will manage.

Functions

Link copied to clipboard
open override fun close()

Closes this resource by performing a graceful shutdown.

Link copied to clipboard
abstract fun newAggregateDispatcher(namedAggregate: NamedAggregate, messageFlux: Flux<T>): MessageDispatcher

Creates a new message dispatcher for a specific named aggregate.

Link copied to clipboard
abstract fun receiveMessage(namedAggregate: NamedAggregate): Flux<T>

Creates a flux of messages for the specified named aggregate.

Link copied to clipboard
open override fun start()

Starts the dispatcher by running all aggregate dispatchers.

Link copied to clipboard
open fun stop()

Synchronously closes this resource with graceful shutdown.

open fun stop(timeout: Duration)

Synchronously closes this resource with graceful shutdown within a specified timeout.

Link copied to clipboard
open override fun stopGracefully(): Mono<Void>

Stops the dispatcher gracefully by shutting down all aggregate dispatchers.