数据流
本页追踪数据在 Wow 框架中流动的完整生命周期,从命令到达 Gateway 到投影、Saga 和快照完成更新的全过程。
高层管道
阶段一:命令到达与 Gateway 处理
旅程始于客户端通过 CommandGateway 发送命令。这可以通过 WebFlux 端点或直接调用 Gateway 来完成。
验证
DefaultCommandGateway 执行两个级别的验证:
自验证:如果命令体实现了
CommandValidator,首先调用其validate()方法。[wow-core/src/main/kotlin/me/ahoo/wow/command/DefaultCommandGateway.kt:62]Bean 验证:Jakarta
Validator检查所有约束注解(@NotNull、@Size等)。[wow-core/src/main/kotlin/me/ahoo/wow/command/DefaultCommandGateway.kt:66]
幂等性检查
发送前,Gateway 检查命令的 requestId 是否已在该聚合根上处理过。AggregateIdempotencyCheckerProvider 提供每个聚合的检查器。如果检测到重复,抛出 DuplicateRequestIdException。[wow-core/src/main/kotlin/me/ahoo/wow/command/DefaultCommandGateway.kt:77]
等待策略注册
如果提供了等待策略,Gateway:
- 将等待端点传播到命令消息头
- 通过
WaitStrategyRegistrar注册策略以进行信号路由 - 在完成(成功、错误或取消)时设置清理
[wow-core/src/main/kotlin/me/ahoo/wow/command/DefaultCommandGateway.kt:217]
阶段二:命令分发
命令总线将命令路由到相应的 AggregateProcessor。CommandDispatcher 订阅命令总线并为每个聚合创建分发器:
CommandDispatcher
CommandDispatcher 为所有本地注册的聚合订阅 CommandBus。它为每个聚合类型创建 AggregateCommandDispatcher,确保同一聚合 ID 的命令通过 AggregateScheduler 顺序处理。[wow-core/src/main/kotlin/me/ahoo/wow/modeling/command/dispatcher/CommandDispatcher.kt:34]
过滤器链
命令处理器使用过滤器链模式。链中的两个关键过滤器:
- AggregateProcessorFilter — 调用
AggregateProcessor.process()方法 - SendDomainEventStreamFilter — 将产生的
DomainEventStream发布到DomainEventBus
[wow-core/src/main/kotlin/me/ahoo/wow/modeling/command/dispatcher/SendDomainEventStreamFilter.kt:26]
阶段三:聚合处理
这是写端的核心。CommandAggregate 处理命令并产生领域事件。
预处理检查
SimpleCommandAggregate 在执行命令函数前执行多项验证检查:
版本冲突检查 — 如果命令携带预期的
aggregateVersion,必须与当前状态版本匹配。[wow-core/src/main/kotlin/me/ahoo/wow/modeling/command/SimpleCommandAggregate.kt:92]初始化检查 — 如果聚合根未初始化,非创建命令将被拒绝。[wow-core/src/main/kotlin/me/ahoo/wow/modeling/command/SimpleCommandAggregate.kt:99]
所有权检查 — 如果命令指定了
ownerId,必须与聚合根的所有者匹配。[wow-core/src/main/kotlin/me/ahoo/wow/modeling/command/SimpleCommandAggregate.kt:102]删除检查 — 如果聚合根处于已删除状态,除
RecoverAggregate外的命令将被拒绝。[wow-core/src/main/kotlin/me/ahoo/wow/modeling/command/SimpleCommandAggregate.kt:111]
CommandState 状态机
CommandState 枚举管理处理生命周期:
[wow-core/src/main/kotlin/me/ahoo/wow/modeling/command/CommandAggregate.kt:65]
状态上的 Event Sourcing
命令函数产生 DomainEventStream 后,事件通过 onSourcing() 应用到 StateAggregate。这在事件持久化之前更新内存状态。如果没有找到匹配的 Sourcing 方法,事件会被静默忽略(但版本号仍会更新)。[wow-core/src/main/kotlin/me/ahoo/wow/modeling/state/StateAggregate.kt:31]
事件持久化
事件通过 append() 持久化到 EventStore。此操作是原子性的,强制执行:
- 版本排序 — 事件版本必须等于
expectedNextVersion(当前版本 + 1) - 聚合 ID 唯一性 — 新聚合根的第一个事件必须使用唯一的聚合 ID
- 请求 ID 去重 — 防止同一命令产生两次事件
[wow-core/src/main/kotlin/me/ahoo/wow/eventsourcing/EventStore.kt:38]
阶段四:事件发布
事件流持久化后,SendDomainEventStreamFilter 将其发布到 DomainEventBus:
DomainEventBus 接口支持两种拓扑:
- LocalDomainEventBus — 单实例部署的进程内事件投递
- DistributedDomainEventBus — 通过 Kafka 实现跨进程投递的分布式部署
[wow-core/src/main/kotlin/me/ahoo/wow/event/DomainEventBus.kt:55]
阶段五:事件分发到处理器
DomainEventDispatcher 从总线接收事件并分发给已注册的处理器。它使用组合模式将事件流分发与状态事件分发分离:
CompositeEventDispatcher
CompositeEventDispatcher 管理两个并行子分发器:
- EventStreamDispatcher — 订阅
DomainEventBus,分发给具有FunctionKind.EVENT的处理器(投影和 Saga) - StateEventDispatcher — 订阅
StateEventBus,分发给具有FunctionKind.STATE_EVENT的处理器(快照策略)
两个子分发器都使用 AggregateSchedulerSupplier 确保每个聚合的排序保证。相同聚合 ID 的事件始终按顺序处理,即使跨越不同的处理器类型。[wow-core/src/main/kotlin/me/ahoo/wow/event/dispatcher/CompositeEventDispatcher.kt:96]
投影处理
投影接收领域事件并更新读模型。DefaultProjectionHandler 使用带有 LogResumeErrorHandler 的过滤器链实现容错 — 如果投影失败,错误会被记录,处理继续进行下一个事件。[wow-core/src/main/kotlin/me/ahoo/wow/projection/ProjectionHandler.kt:36]
Saga 处理
无状态 Saga 接收领域事件并可以产生新命令。DefaultStatelessSagaHandler 也使用过滤器链模式。Saga 在不维护自身状态的情况下协调跨聚合边界的长时间运行业务流程。[wow-core/src/main/kotlin/me/ahoo/wow/saga/stateless/StatelessSagaHandler.kt:36]
快照创建
快照策略评估状态事件并在满足条件时创建快照:
- SimpleSnapshotStrategy — 每个事件后创建快照
- VersionOffsetSnapshotStrategy — 按可配置的版本间隔创建快照
[wow-core/src/main/kotlin/me/ahoo/wow/eventsourcing/snapshot/SimpleSnapshotStrategy.kt:25]
阶段六:等待策略通知
命令处理完成后,等待策略在每个处理阶段接收信号:
等待阶段
WaitStrategy 支持在不同处理阶段等待:
| 阶段 | 含义 |
|---|---|
SENT | 命令已被 CommandBus 接受 |
PROCESSED | 命令已被聚合根执行,事件已持久化 |
PROJECTED | 投影已处理事件 |
SNAPSHOT | 快照已创建 |
WaitingForStage 工厂为每个阶段创建策略:
WaitingForStage.sent(commandId)— 等待命令发送WaitingForStage.processed(commandId)— 等待事件持久化WaitingForStage.snapshot(commandId)— 等待快照创建
[wow-core/src/main/kotlin/me/ahoo/wow/command/CommandGateway.kt:145]
信号路由
当下游处理器(投影、Saga、快照)完成时,它通过 CommandWaitNotifier 发送 WaitSignal。WaitStrategyRegistrar 根据 waitCommandId 将信号路由到正确的策略。[wow-core/src/main/kotlin/me/ahoo/wow/command/wait/WaitStrategy.kt:104]
聚合加载(读路径)
当需要为新命令加载聚合根时,框架重建其状态:
加载过程:
- 加载快照 — 如果聚合根存在快照,从该状态和版本开始
- 加载剩余事件 — 从
EventStore获取快照版本之后的所有事件 - 重放事件 — 通过
onSourcing()将每个事件流应用到StateAggregate
EventStore.load() 方法支持按版本范围或时间范围加载,默认从版本 1 加载所有事件。[wow-core/src/main/kotlin/me/ahoo/wow/eventsourcing/EventStore.kt:54]
错误处理
数据流在每个阶段都包含错误处理:
错误函数
SimpleCommandAggregate 支持每个命令类型的错误函数。如果为命令类型注册了错误函数,处理失败时会调用该函数,允许聚合根产生补偿事件。[wow-core/src/main/kotlin/me/ahoo/wow/modeling/command/SimpleCommandAggregate.kt:150]
投影/Saga 错误恢复
投影和 Saga 使用 LogResumeErrorHandler — 错误被记录但处理继续进行下一个事件。这确保失败的投影不会阻塞其他处理器。