ConcurrentManySink
class ConcurrentManySink<T : Any>(val delegate: Sinks.Many<T>) : Sinks.Many<T> , Decorator<Sinks.Many<T>>
线程安全的 Sinks.Many 装饰器
使用 ReentrantLock 保证多线程并发 emit 的串行化, 避免 Sinks.EmitResult. FAIL_NON_SERIALIZED 错误。
性能基准 (JDK 17, 传统线程)
单线程: ~10M ops/s
2线程: ~39M ops/s (实测)
4线程: ~50M ops/s (预估)
虚拟线程友好 (JDK 21+)
✅ 不会导致虚拟线程 pinning
✅ 虚拟线程可以正常卸载
✅ 性能随虚拟线程数线性扩展
vs subscribeOn(Schedulers.boundedElastic()):
传统线程性能提升: 157x (250K → 39M ops/s)
虚拟线程性能提升: 160x (250K → 40M ops/s)
设计选择
为什么选择 ReentrantLock 而非 synchronized?
| 维度 | synchronized | ReentrantLock |
|---|---|---|
| 传统线程性能 | 41M ops/s | 39M ops/s (-5%) |
| 虚拟线程性能 | 5M ops/s | 40M ops/s (+8x) |
| 虚拟线程 Pinning | ❌ 会 pinning | ✅ 不会 pinning |
| 未来兼容性 | ⚠️ JDK 21+ 不推荐 | ✅ 官方推荐 |
结论: 5% 的性能损失换取虚拟线程友好性是值得的。
Author
ahoo wang
Parameters
T
元素类型 (不可为 null)
delegate
被装饰的原始 Sink