事件存储
事件存储是事件溯源架构的持久化基石。与传统的 CRUD 数据库覆盖状态并丢弃历史不同,事件存储充当每个领域事件的不可变、仅追加的账本。每一次状态变更 -- OrderCreated、ItemAdded、PaymentProcessed -- 都被记录且永远不能被修改或删除。
事件溯源
在传统架构中,数据库只存储当前状态,历史变更记录往往会丢失。而在事件溯源架构中:
- 完整历史:每一次状态变更都作为事件永久存储
- 可追溯性:通过重放事件可以重建任意时间点的状态
- 审计友好:天然支持操作审计和数据分析
- 解耦消费者:投影、Saga 和外部系统独立订阅同一事件流
核心接口
EventStore 接口定义了事件存储的核心操作:
interface EventStore {
fun append(eventStream: DomainEventStream): Mono<Void>
fun load(
aggregateId: AggregateId,
headVersion: Int = 1,
tailVersion: Int = Int.MAX_VALUE - 1
): Flux<DomainEventStream>
fun load(
aggregateId: AggregateId,
headEventTime: Long,
tailEventTime: Long
): Flux<DomainEventStream>
}领域事件流
DomainEventStream 表示单个命令产生的领域事件集合:
interface DomainEventStream : EventMessage<DomainEventStream, List<DomainEvent<*>>> {
val aggregateId: AggregateId
val size: Int
}关键特性:
- 一对一:一个命令产生一个事件流
- 原子性:流中的所有事件作为单个单元持久化
- 不可变性:事件一旦创建就不能被修改
核心概念
| 概念 | 描述 | 源码 |
|---|---|---|
DomainEvent | 关于聚合内过去业务行为的不可变事实 | DomainEvent.kt:52-95 |
DomainEventStream | 单个命令产生的有序领域事件批次 | DomainEventStream.kt:51-125 |
EventStore | 追加和加载事件流的核心接口 | EventStore.kt:27-98 |
SnapshotRepository | 通过带版本的快照检查点优化聚合加载 | SnapshotRepository.kt:27-58 |
聚合状态重建
框架不在传统数据库中存储当前的聚合状态。相反,每个聚合的状态是其事件历史的函数。
EventSourcingStateAggregateRepository 实现了这种重建机制:
- 快照优先加载:在请求最新版本时,仓库首先从快照仓库加载。如果存在快照,它将作为增量重放的起点。
- 全新聚合创建:如果不存在快照,通过
StateAggregateFactory创建新的聚合实例。 - 事件应用:事件按版本顺序重放,每次调用
stateAggregate.onSourcing(it)来变更内存中的状态。
事件溯源生命周期
下图展示了从命令接收、事件持久化、总线发布到下游处理的完整生命周期:
架构
框架定义了清晰的接口层次结构,支持多种持久化后端。每个实现都扩展了 AbstractEventStore,后者提供集中的日志记录、输入验证和错误映射。
AbstractEventStore 应用模板方法模式来集中处理横切关注点:
append()(公开、具体):记录操作日志,委托给appendStream(),并升级版本冲突异常。load()(公开、具体):验证版本/时间范围,然后委托给loadStream()。appendStream()/loadStream()(受保护、抽象):每个后端实现存储特定的逻辑。
异常处理
事件存储定义了层次化的类型异常:
| 异常类型 | 描述 | 行为 |
|---|---|---|
EventVersionConflictException | 并发写入导致的版本冲突 | 实现 RecoverableException -- 可安全重试 |
DuplicateAggregateIdException | 尝试创建已存在的聚合 | 致命 -- 表示 ID 冲突 |
DuplicateRequestIdException | 相同命令已被处理 | 幂等 -- 成功情况,不是错误 |
实现对比
| 特性 | MongoDB | Redis | R2DBC | 内存 |
|---|---|---|---|---|
| 持久性 | 持久(磁盘) | 可配置 | 持久(SQL) | 易失(内存) |
| 版本范围查询 | 是 | 是 (ZRANGEBYSCORE) | 是 (SQL BETWEEN) | 是 (内存) |
| 时间范围查询 | 是 | 否 | 是 (SQL BETWEEN) | 是 (内存) |
| 并发控制 | 唯一复合索引 | Lua 脚本(原子) | 唯一 SQL 索引 | 同步映射 |
| 分片支持 | 分片集合 | Redis 集群 | ShardingEventStreamSchema | 不适用 |
| 生产就绪 | 高 | 中 | 高 | 仅开发/测试 |
| 关键类 | MongoEventStore.kt | RedisEventStore.kt | R2dbcEventStore.kt | InMemoryEventStore.kt |
每种实现的存储模式
MongoDB 为每种聚合类型使用独立的集合。集合名称由聚合的上下文名称和聚合名称派生(例如 order_event_stream)。文档使用唯一复合索引 (aggregate_id, version) 和 (aggregate_id, request_id) 进行索引 (EventStreamSchemaInitializer.kt:51-69)。
Redis 将事件流存储在按聚合 ID 键的有序集合中。每个成员是 JSON 序列化的 DomainEventStream,按版本号评分。追加操作使用 Lua 脚本实现原子性 -- 在单个事务中检查版本冲突和重复请求 ID (RedisEventStore.kt:44-65)。不支持时间范围加载。
R2DBC 为每种聚合类型使用关系表(<aggregateName>_event_stream)。(aggregate_id, version) 和 request_id 上的唯一索引强制执行相同的不变量。ShardingEventStreamSchema 变体支持表分片以实现水平扩展部署 (EventStreamSchema.kt:47-53)。
配置
wow:
eventsourcing:
store:
storage: mongo # 事件存储类型 (mongo, r2dbc, redis, in_memory)
snapshot:
enabled: true
strategy: version_offset # all, version_offset
version-offset: 10
storage: mongo| 属性 | 类型 | 默认值 | 描述 |
|---|---|---|---|
wow.eventsourcing.store.storage | StorageType | mongo | 事件存储后端 |
wow.eventsourcing.snapshot.enabled | Boolean | true | 启用快照机制 |
wow.eventsourcing.snapshot.strategy | Strategy | all | 快照策略 (all, version_offset) |
wow.eventsourcing.snapshot.version-offset | Int | 5 | 版本间隔阈值 |
wow.eventsourcing.snapshot.storage | StorageType | mongo | 快照存储后端 |
最佳实践
选择合适的后端:MongoDB 和 R2DBC 推荐用于生产环境。MongoDB 适合模式灵活和水平扩展。R2DBC 适合组织已有关系数据库的场景。Redis 适合高吞吐量、数据量较低的场景。
为长期聚合启用快照:将
strategy设置为version_offset,偏移量设为 5-20,以避免拥有大量事件的聚合出现线性性能下降。监控版本冲突:偶尔出现
EventVersionConflictException是正常的。高频出现则表明存在竞争 -- 考虑重新设计聚合边界。利用请求幂等性:
requestId字段保证重试命令不会产生重复事件 -- 对于至少一次投递至关重要。保持事件不可变且声明式:事件应代表简单事实,而非条件逻辑。聚合的溯源函数只是将事件叠加到状态上。
仅在测试中使用内存存储:
InMemoryEventStore是线程安全的但具有易失性。请勿部署到生产环境。