MonoCommandWaitNotifier

class MonoCommandWaitNotifier<E : MessageExchange<*, M>, M : Message<*, *>, CommandId, NamedBoundedContext, AggregateIdCapable>(commandWaitNotifier: CommandWaitNotifier, processingStage: CommandStage, messageExchange: E, source: Mono<Void>) : Mono<Void>

A Mono wrapper that automatically notifies wait strategies when command processing completes. This class intercepts the completion of a Mono operation and sends appropriate wait signals to registered wait strategies based on the processing stage and message exchange.

Parameters

E

The type of message exchange.

M

The type of message in the exchange.

commandWaitNotifier

The notifier used to send wait signals.

processingStage

The command processing stage being notified.

messageExchange

The message exchange containing processing context.

source

The original Mono operation to wrap.

Constructors

Link copied to clipboard
constructor(commandWaitNotifier: CommandWaitNotifier, processingStage: CommandStage, messageExchange: E, source: Mono<Void>)

Functions

Link copied to clipboard
fun and(other: Publisher<*>?): Mono<Void?>?
Link copied to clipboard
fun <P : Any?> as(transformer: Function<in Mono<Void?>?, P?>?): P?
Link copied to clipboard
@Nullable
open fun block(): Void?
@Nullable
open fun block(timeout: Duration?): Void?
Link copied to clipboard
open fun blockOptional(): Optional<Void?>?
open fun blockOptional(timeout: Duration?): Optional<Void?>?
Link copied to clipboard
fun cache(): Mono<Void?>?
fun cache(ttl: Duration?): Mono<Void?>?
fun cache(ttl: Duration?, timer: Scheduler?): Mono<Void?>?
fun cache(ttlForValue: Function<in Void?, Duration?>?, ttlForError: Function<Throwable?, Duration?>?, ttlForEmpty: Supplier<Duration?>?): Mono<Void?>?
fun cache(ttlForValue: Function<in Void?, Duration?>?, ttlForError: Function<Throwable?, Duration?>?, ttlForEmpty: Supplier<Duration?>?, timer: Scheduler?): Mono<Void?>?
Link copied to clipboard
fun cacheInvalidateIf(invalidationPredicate: Predicate<in Void?>?): Mono<Void?>?
Link copied to clipboard
fun cacheInvalidateWhen(invalidationTriggerGenerator: Function<in Void?, Mono<Void?>?>?): Mono<Void?>?
fun cacheInvalidateWhen(invalidationTriggerGenerator: Function<in Void?, Mono<Void?>?>?, onInvalidate: Consumer<in Void?>?): Mono<Void?>?
Link copied to clipboard
fun cancelOn(scheduler: Scheduler?): Mono<Void?>?
Link copied to clipboard
fun <E : Any?> cast(clazz: Class<E?>?): Mono<E?>?
Link copied to clipboard
fun checkpoint(): Mono<Void?>?
fun checkpoint(description: String?): Mono<Void?>?
fun checkpoint(@Nullable description: String?, forceStackTrace: Boolean): Mono<Void?>?
Link copied to clipboard
fun concatWith(other: Publisher<out Void?>?): Flux<Void?>?
Link copied to clipboard
fun contextCapture(): Mono<Void?>?
Link copied to clipboard
fun contextWrite(contextModifier: Function<Context?, Context?>?): Mono<Void?>?
fun contextWrite(contextToAppend: ContextView?): Mono<Void?>?
Link copied to clipboard
fun defaultIfEmpty(defaultV: Void?): Mono<Void?>?
Link copied to clipboard
fun delayElement(delay: Duration?): Mono<Void?>?
fun delayElement(delay: Duration?, timer: Scheduler?): Mono<Void?>?
Link copied to clipboard
fun delaySubscription(delay: Duration?): Mono<Void?>?
fun <U : Any?> delaySubscription(subscriptionDelay: Publisher<U?>?): Mono<Void?>?
fun delaySubscription(delay: Duration?, timer: Scheduler?): Mono<Void?>?
Link copied to clipboard
fun delayUntil(triggerProvider: Function<in Void?, out Publisher<*>?>?): Mono<Void?>?
Link copied to clipboard
fun <X : Any?> dematerialize(): Mono<X?>?
Link copied to clipboard
fun doAfterTerminate(afterTerminate: Runnable?): Mono<Void?>?
Link copied to clipboard
fun doFinally(onFinally: Consumer<SignalType?>?): Mono<Void?>?
Link copied to clipboard
fun doFirst(onFirst: Runnable?): Mono<Void?>?
Link copied to clipboard
fun doOnCancel(onCancel: Runnable?): Mono<Void?>?
Link copied to clipboard
fun <R : Any?> doOnDiscard(type: Class<R?>?, discardHook: Consumer<in R?>?): Mono<Void?>?
Link copied to clipboard
fun doOnEach(signalConsumer: Consumer<in Signal<Void?>?>?): Mono<Void?>?
Link copied to clipboard
fun doOnError(onError: Consumer<in Throwable?>?): Mono<Void?>?
fun <E : Throwable?> doOnError(exceptionType: Class<E?>?, onError: Consumer<in E?>?): Mono<Void?>?
fun doOnError(predicate: Predicate<in Throwable?>?, onError: Consumer<in Throwable?>?): Mono<Void?>?
Link copied to clipboard
fun doOnNext(onNext: Consumer<in Void?>?): Mono<Void?>?
Link copied to clipboard
fun doOnRequest(consumer: LongConsumer?): Mono<Void?>?
Link copied to clipboard
fun doOnSubscribe(onSubscribe: Consumer<in Subscription?>?): Mono<Void?>?
Link copied to clipboard
fun doOnSuccess(onSuccess: Consumer<in Void?>?): Mono<Void?>?
Link copied to clipboard
fun doOnTerminate(onTerminate: Runnable?): Mono<Void?>?
Link copied to clipboard
fun elapsed(): Mono<Tuple2<Long?, Void?>?>?
fun elapsed(scheduler: Scheduler?): Mono<Tuple2<Long?, Void?>?>?
Link copied to clipboard
fun expand(expander: Function<in Void?, out Publisher<out Void?>?>?): Flux<Void?>?
fun expand(expander: Function<in Void?, out Publisher<out Void?>?>?, capacityHint: Int): Flux<Void?>?
Link copied to clipboard
fun expandDeep(expander: Function<in Void?, out Publisher<out Void?>?>?): Flux<Void?>?
fun expandDeep(expander: Function<in Void?, out Publisher<out Void?>?>?, capacityHint: Int): Flux<Void?>?
Link copied to clipboard
fun filter(tester: Predicate<in Void?>?): Mono<Void?>?
Link copied to clipboard
fun filterWhen(asyncPredicate: Function<in Void?, out Publisher<Boolean?>?>?): Mono<Void?>?
Link copied to clipboard
fun Mono<*>.finallyAck(exchange: MessageExchange<*, *>): Mono<Void>

Ensures the exchange is acknowledged after Mono completion, even on error.

Link copied to clipboard
fun <R : Any?> flatMap(transformer: Function<in Void?, out Mono<out R?>?>?): Mono<R?>?
Link copied to clipboard
fun <R : Any?> flatMapIterable(mapper: Function<in Void?, out Iterable<out R?>?>?): Flux<R?>?
Link copied to clipboard
fun <R : Any?> flatMapMany(mapper: Function<in Void?, out Publisher<out R?>?>?): Flux<R?>?
fun <R : Any?> flatMapMany(mapperOnNext: Function<in Void?, out Publisher<out R?>?>?, mapperOnError: Function<in Throwable?, out Publisher<out R?>?>?, mapperOnComplete: Supplier<out Publisher<out R?>?>?): Flux<R?>?
Link copied to clipboard
fun flux(): Flux<Void?>?
Link copied to clipboard
fun <R : Any?> handle(handler: BiConsumer<in Void?, SynchronousSink<R?>?>?): Mono<R?>?
Link copied to clipboard
fun hasElement(): Mono<Boolean?>?
Link copied to clipboard
fun hide(): Mono<Void?>?
Link copied to clipboard
fun ignoreElement(): Mono<Void?>?
Link copied to clipboard
fun log(): Mono<Void?>?
fun log(@Nullable category: String?): Mono<Void?>?
fun log(logger: Logger?): Mono<Void?>?
fun log(@Nullable category: String?, level: Level?, vararg options: SignalType?): Mono<Void?>?
fun log(@Nullable category: String?, level: Level?, showOperatorLine: Boolean, vararg options: SignalType?): Mono<Void?>?
fun log(logger: Logger?, level: Level?, showOperatorLine: Boolean, vararg options: SignalType?): Mono<Void?>?
Link copied to clipboard
fun <T : Any> Mono<T>.logErrorResume(): Mono<T>

Extension function to log errors and resume with empty Mono.

Link copied to clipboard
fun <R : Any?> map(mapper: Function<in Void?, out R?>?): Mono<R?>?
Link copied to clipboard
fun <R : Any?> mapNotNull(mapper: Function<in Void?, out R?>?): Mono<R?>?
Link copied to clipboard
fun materialize(): Mono<Signal<Void?>?>?
Link copied to clipboard
fun mergeWith(other: Publisher<out Void?>?): Flux<Void?>?
Link copied to clipboard
fun metrics(): Mono<Void?>?
Link copied to clipboard
fun name(name: String?): Mono<Void?>?
Link copied to clipboard
fun <U : Any?> ofType(clazz: Class<U?>?): Mono<U?>?
Link copied to clipboard
fun onErrorComplete(): Mono<Void?>?
fun onErrorComplete(type: Class<out Throwable?>?): Mono<Void?>?
fun onErrorComplete(predicate: Predicate<in Throwable?>?): Mono<Void?>?
Link copied to clipboard
fun onErrorContinue(errorConsumer: BiConsumer<Throwable?, Any?>?): Mono<Void?>?
fun <E : Throwable?> onErrorContinue(type: Class<E?>?, errorConsumer: BiConsumer<Throwable?, Any?>?): Mono<Void?>?
fun <E : Throwable?> onErrorContinue(errorPredicate: Predicate<E?>?, errorConsumer: BiConsumer<Throwable?, Any?>?): Mono<Void?>?
Link copied to clipboard
fun onErrorMap(mapper: Function<in Throwable?, out Throwable?>?): Mono<Void?>?
fun <E : Throwable?> onErrorMap(type: Class<E?>?, mapper: Function<in E?, out Throwable?>?): Mono<Void?>?
fun onErrorMap(predicate: Predicate<in Throwable?>?, mapper: Function<in Throwable?, out Throwable?>?): Mono<Void?>?
Link copied to clipboard
fun onErrorResume(fallback: Function<in Throwable?, out Mono<out Void?>?>?): Mono<Void?>?
fun <E : Throwable?> onErrorResume(type: Class<E?>?, fallback: Function<in E?, out Mono<out Void?>?>?): Mono<Void?>?
fun onErrorResume(predicate: Predicate<in Throwable?>?, fallback: Function<in Throwable?, out Mono<out Void?>?>?): Mono<Void?>?
Link copied to clipboard
fun onErrorReturn(fallbackValue: Void?): Mono<Void?>?
fun <E : Throwable?> onErrorReturn(type: Class<E?>?, fallbackValue: Void?): Mono<Void?>?
fun onErrorReturn(predicate: Predicate<in Throwable?>?, fallbackValue: Void?): Mono<Void?>?
Link copied to clipboard
fun onErrorStop(): Mono<Void?>?
Link copied to clipboard
fun onTerminateDetach(): Mono<Void?>?
Link copied to clipboard
fun or(other: Mono<out Void?>?): Mono<Void?>?
Link copied to clipboard
fun <R : Any?> publish(transform: Function<in Mono<Void?>?, out Mono<out R?>?>?): Mono<R?>?
Link copied to clipboard
fun publishOn(scheduler: Scheduler?): Mono<Void?>?
Link copied to clipboard
fun repeat(): Flux<Void?>?
fun repeat(predicate: BooleanSupplier?): Flux<Void?>?
fun repeat(numRepeat: Long): Flux<Void?>?
fun repeat(numRepeat: Long, predicate: BooleanSupplier?): Flux<Void?>?
Link copied to clipboard
fun repeatWhen(repeatFactory: Function<Flux<Long?>?, out Publisher<*>?>?): Flux<Void?>?
Link copied to clipboard
fun repeatWhenEmpty(repeatFactory: Function<Flux<Long?>?, out Publisher<*>?>?): Mono<Void?>?
fun repeatWhenEmpty(maxRepeat: Int, repeatFactory: Function<Flux<Long?>?, out Publisher<*>?>?): Mono<Void?>?
Link copied to clipboard
fun retry(): Mono<Void?>?
fun retry(numRetries: Long): Mono<Void?>?
Link copied to clipboard
fun retryWhen(retrySpec: Retry?): Mono<Void?>?
Link copied to clipboard
fun share(): Mono<Void?>?
Link copied to clipboard
fun single(): Mono<Void?>?
Link copied to clipboard
fun singleOptional(): Mono<Optional<Void?>?>?
Link copied to clipboard
open override fun subscribe(actual: CoreSubscriber<in Void>)
fun subscribe(actual: Subscriber<in Void?>?)
fun subscribe(): Disposable?
fun subscribe(consumer: Consumer<in Void?>?): Disposable?
fun subscribe(@Nullable consumer: Consumer<in Void?>?, errorConsumer: Consumer<in Throwable?>?): Disposable?
fun subscribe(@Nullable consumer: Consumer<in Void?>?, @Nullable errorConsumer: Consumer<in Throwable?>?, @Nullable completeConsumer: Runnable?): Disposable?
fun subscribe(@Nullable consumer: Consumer<in Void?>?, @Nullable errorConsumer: Consumer<in Throwable?>?, @Nullable completeConsumer: Runnable?, @Nullable subscriptionConsumer: Consumer<in Subscription?>?): Disposable?
fun subscribe(@Nullable consumer: Consumer<in Void?>?, @Nullable errorConsumer: Consumer<in Throwable?>?, @Nullable completeConsumer: Runnable?, @Nullable initialContext: Context?): Disposable?
Link copied to clipboard
fun subscribeOn(scheduler: Scheduler?): Mono<Void?>?
Link copied to clipboard
fun <E : Subscriber<in Void?>?> subscribeWith(subscriber: E?): E?
Link copied to clipboard
fun switchIfEmpty(alternate: Mono<out Void?>?): Mono<Void?>?
Link copied to clipboard
fun tag(key: String?, value: String?): Mono<Void?>?
Link copied to clipboard
fun <M> Mono<M>.tagSource(source: String): Mono<M>

Static extension function to tag a Mono publisher with a specified source identifier. This is used for metrics tagging to identify the component that generated the metrics.

Link copied to clipboard
fun take(duration: Duration?): Mono<Void?>?
fun take(duration: Duration?, timer: Scheduler?): Mono<Void?>?
Link copied to clipboard
fun takeUntilOther(other: Publisher<*>?): Mono<Void?>?
Link copied to clipboard
fun tap(listenerGenerator: Function<ContextView?, SignalListener<Void?>?>?): Mono<Void?>?
fun tap(simpleListenerGenerator: Supplier<SignalListener<Void?>?>?): Mono<Void?>?
fun tap(listenerFactory: SignalListenerFactory<Void?, *>?): Mono<Void?>?
Link copied to clipboard
fun then(): Mono<Void?>?
fun <V : Any?> then(other: Mono<V?>?): Mono<V?>?
Link copied to clipboard
fun <R> Mono<*>.thenDefer(defer: () -> Mono<R>): Mono<R>

Chains a deferred Mono creation after this Mono completes.

Link copied to clipboard
fun thenEmpty(other: Publisher<Void?>?): Mono<Void?>?
Link copied to clipboard
fun <V : Any?> thenMany(other: Publisher<V?>?): Flux<V?>?
Link copied to clipboard
fun <E : MessageExchange<*, M>, M : Message<*, *>, CommandId, NamedBoundedContext, AggregateIdCapable> Mono<Void>.thenNotifyAndForget(commandWaitNotifier: CommandWaitNotifier, processingStage: CommandStage, messageExchange: E): Mono<Void>

Extension function that wraps a Mono to automatically notify wait strategies on completion. This provides a convenient way to add wait notification behavior to any Mono operation in the command processing pipeline.

Link copied to clipboard
fun <V : Any?> thenReturn(value: V?): Mono<V?>?
Link copied to clipboard
fun Mono<*>.thenRunnable(runnable: () -> Unit): Mono<Void>

Executes a runnable after this Mono completes successfully.

Link copied to clipboard
fun <T> Mono<T>.throwNotFoundIfEmpty(errorMsg: String = ErrorCodes.NOT_FOUND_MESSAGE, cause: Throwable? = null): Mono<T>

Throws NotFoundResourceException if the Mono is empty.

Link copied to clipboard
fun timed(): Mono<Timed<Void?>?>?
fun timed(clock: Scheduler?): Mono<Timed<Void?>?>?
Link copied to clipboard
fun timeout(timeout: Duration?): Mono<Void?>?
fun <U : Any?> timeout(firstTimeout: Publisher<U?>?): Mono<Void?>?
fun timeout(timeout: Duration?, fallback: Mono<out Void?>?): Mono<Void?>?
fun timeout(timeout: Duration?, timer: Scheduler?): Mono<Void?>?
fun <U : Any?> timeout(firstTimeout: Publisher<U?>?, fallback: Mono<out Void?>?): Mono<Void?>?
fun timeout(timeout: Duration?, @Nullable fallback: Mono<out Void?>?, timer: Scheduler?): Mono<Void?>?
Link copied to clipboard
fun timestamp(): Mono<Tuple2<Long?, Void?>?>?
fun timestamp(scheduler: Scheduler?): Mono<Tuple2<Long?, Void?>?>?
Link copied to clipboard
fun <T> Mono<T>.toBlockable(scheduler: Scheduler = Schedulers.boundedElastic()): Mono<T>

Extension function that makes a Mono blockable by scheduling it on a separate thread if needed. If the current thread is non-blocking (reactive), it subscribes the Mono on the provided scheduler. If already on a blocking thread, returns the Mono unchanged.

Link copied to clipboard
Link copied to clipboard
fun <V : Any?> transform(transformer: Function<in Mono<Void?>?, out Publisher<V?>?>?): Mono<V?>?
Link copied to clipboard
fun <V : Any?> transformDeferred(transformer: Function<in Mono<Void?>?, out Publisher<V?>?>?): Mono<V?>?
Link copied to clipboard
fun <V : Any?> transformDeferredContextual(transformer: BiFunction<in Mono<Void?>?, in ContextView?, out Publisher<V?>?>?): Mono<V?>?
Link copied to clipboard
fun <T2 : Any?> zipWhen(rightGenerator: Function<Void?, Mono<out T2?>?>?): Mono<Tuple2<Void?, T2?>?>?
fun <T2 : Any?, O : Any?> zipWhen(rightGenerator: Function<Void?, Mono<out T2?>?>?, combinator: BiFunction<Void?, T2?, O?>?): Mono<O?>?
Link copied to clipboard
fun <T2 : Any?> zipWith(other: Mono<out T2?>?): Mono<Tuple2<Void?, T2?>?>?
fun <T2 : Any?, O : Any?> zipWith(other: Mono<out T2?>?, combinator: BiFunction<in Void?, in T2?, out O?>?): Mono<O?>?