ConcurrentManySink

class ConcurrentManySink<T : Any>(val delegate: Sinks.Many<T>) : Sinks.Many<T> , Decorator<Sinks.Many<T>>

线程安全的 Sinks.Many 装饰器

使用 ReentrantLock 保证多线程并发 emit 的串行化, 避免 Sinks.EmitResult. FAIL_NON_SERIALIZED 错误。

性能基准 (JDK 17, 传统线程)

  • 单线程: ~10M ops/s

  • 2线程: ~39M ops/s (实测)

  • 4线程: ~50M ops/s (预估)

虚拟线程友好 (JDK 21+)

  • ✅ 不会导致虚拟线程 pinning

  • ✅ 虚拟线程可以正常卸载

  • ✅ 性能随虚拟线程数线性扩展

vs subscribeOn(Schedulers.boundedElastic()):

  • 传统线程性能提升: 157x (250K → 39M ops/s)

  • 虚拟线程性能提升: 160x (250K → 40M ops/s)

设计选择

为什么选择 ReentrantLock 而非 synchronized?

维度synchronizedReentrantLock
传统线程性能41M ops/s39M ops/s (-5%)
虚拟线程性能5M ops/s40M ops/s (+8x)
虚拟线程 Pinning❌ 会 pinning✅ 不会 pinning
未来兼容性⚠️ JDK 21+ 不推荐✅ 官方推荐

结论: 5% 的性能损失换取虚拟线程友好性是值得的。

Author

ahoo wang

Parameters

T

元素类型 (不可为 null)

delegate

被装饰的原始 Sink

See also

Constructors

Link copied to clipboard
constructor(delegate: Sinks.Many<T>)

Properties

Link copied to clipboard
val Scannable.cancelled: Boolean
Link copied to clipboard
open override val delegate: Sinks.Many<T>
Link copied to clipboard
Link copied to clipboard
val Scannable.terminated: Boolean

Functions

Link copied to clipboard
open fun actuals(): Stream<out Scannable?>?
Link copied to clipboard
open override fun asFlux(): Flux<T>
Link copied to clipboard
fun <T : Any> Sinks.Many<T>.concurrent(): ConcurrentManySink<T>
Link copied to clipboard
open override fun currentSubscriberCount(): Int
Link copied to clipboard
open override fun emitComplete(failureHandler: Sinks.EmitFailureHandler)
Link copied to clipboard
open override fun emitError(error: Throwable, failureHandler: Sinks.EmitFailureHandler)
Link copied to clipboard
open override fun emitNext(t: T, failureHandler: Sinks.EmitFailureHandler)
Link copied to clipboard
open fun inners(): Stream<out Scannable?>?
Link copied to clipboard
open fun name(): String?
Link copied to clipboard
open fun parents(): Stream<out Scannable?>?
Link copied to clipboard
@Nullable
open fun <T : Any?> scan(key: Scannable.Attr<T?>?): T?
Link copied to clipboard
open fun <T : Any?> scanOrDefault(key: Scannable.Attr<T?>?, defaultValue: T?): T?
Link copied to clipboard
open override fun scanUnsafe(key: Scannable.Attr<*>): Any?
Link copied to clipboard
open fun stepName(): String?
Link copied to clipboard
open fun steps(): Stream<String?>?
Link copied to clipboard
open fun tags(): Stream<Tuple2<String?, String?>?>?
Link copied to clipboard
Link copied to clipboard
open override fun tryEmitComplete(): Sinks.EmitResult
Link copied to clipboard
open override fun tryEmitError(error: Throwable): Sinks.EmitResult
Link copied to clipboard
open override fun tryEmitNext(t: T): Sinks.EmitResult