InMemoryCommandBus

class InMemoryCommandBus(val sinkSupplier: (NamedAggregate) -> Sinks.Many<CommandMessage<*>> = { Sinks.many().unicast().onBackpressureBuffer() }) : InMemoryMessageBus<CommandMessage<*>, ServerCommandExchange<*>> , LocalCommandBus

In-memory implementation of CommandBus for local command processing. This bus uses unicast sinks to ensure each command has exactly one consumer, making it suitable for single-instance or testing scenarios.

Author

ahoo wang

Parameters

sinkSupplier

Function that creates a unicast sink for each named aggregate. Defaults to unicast with backpressure buffer.

Constructors

Link copied to clipboard
constructor(sinkSupplier: (NamedAggregate) -> Sinks.Many<CommandMessage<*>> = { Sinks.many().unicast().onBackpressureBuffer() })

Properties

Link copied to clipboard
open override val sinkSupplier: (NamedAggregate) -> Sinks.Many<CommandMessage<*>>

Supplier for creating sinks for command distribution. Uses unicast mode to ensure each command reaches exactly one consumer.

Link copied to clipboard
open override val topicKind: TopicKind

The topic kind for command messages, always returns TopicKind.COMMAND

Functions

Link copied to clipboard
open override fun close()

Closes the message bus and releases any resources. Default implementation does nothing.

Link copied to clipboard

Creates a server command exchange for the given command message. This exchange handles the command processing lifecycle.

Link copied to clipboard

Wraps a LocalCommandBus with metrics collection capabilities. Returns a MetricLocalCommandBus that collects metrics on command operations.

Link copied to clipboard
open override fun receive(namedAggregates: Set<NamedAggregate>): Flux<ServerCommandExchange<*>>

Receives messages for the specified named aggregates.

Link copied to clipboard
open override fun send(message: CommandMessage<*>): Mono<Void>

Sends a message through the in-memory bus.

Link copied to clipboard
open override fun subscriberCount(namedAggregate: NamedAggregate): Int

Returns the number of subscribers for the specified named aggregate.