返回顶部
e

event-store事件存储设计

Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, implementing event persistence, projections, snapshotting, or CQRS patterns.

作者: admin | 来源: ClawHub
源自
ClawHub
版本
V 1.0.0
安全检测
已通过
821
下载量
免费
免费
0
收藏
概述
安装方式
版本历史

event-store

事件存储

为事件溯源应用设计事件存储的指南——涵盖事件模式、投影、快照和CQRS集成。

何时使用此技能

  • - 设计事件溯源基础设施
  • 选择事件存储技术
  • 实现自定义事件存储
  • 从事件流构建投影
  • 为聚合性能添加快照
  • 将CQRS与事件溯源集成

核心概念

事件存储架构

┌─────────────────────────────────────────────────────┐
│ 事件存储 │
├─────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 流 1 │ │ 流 2 │ │ 流 3 │ │
│ │ (聚合) │ │ (聚合) │ │ (聚合) │ │
│ ├─────────────┤ ├─────────────┤ ├─────────────┤ │
│ │ 事件 1 │ │ 事件 1 │ │ 事件 1 │ │
│ │ 事件 2 │ │ 事件 2 │ │ 事件 2 │ │
│ │ 事件 3 │ │ ... │ │ 事件 3 │ │
│ │ ... │ │ │ │ 事件 4 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ 全局位置: 1 → 2 → 3 → 4 → 5 → 6 → ... │
└─────────────────────────────────────────────────────┘

事件存储要求

要求描述
仅追加事件不可变,只能追加
有序
按流和全局排序 | | 版本化 | 乐观并发控制 | | 订阅 | 实时事件通知 | | 幂等 | 安全处理重复写入 |

技术对比

技术最佳用途局限性
EventStoreDB纯事件溯源单一用途
PostgreSQL
现有Postgres技术栈 | 需手动实现 | | Kafka | 高吞吐量流 | 不适合按流查询 | | DynamoDB | 无服务器、AWS原生 | 查询局限性 |

事件模式设计

事件是事实来源。设计良好的模式可确保长期可演化性。

事件信封结构

json
{
event_id: uuid,
stream_id: Order-abc123,
event_type: OrderPlaced,
version: 1,
schema_version: 1,
data: {
customer_id: cust-1,
total_cents: 5000
},
metadata: {
correlation_id: req-xyz,
causation_id: evt-prev,
user_id: user-1,
timestamp: 2025-01-15T10:30:00Z
},
global_position: 42
}

模式演化规则

  1. 1. 自由添加字段——新的可选字段始终安全
  2. 永不删除或重命名字段——改为引入新事件类型
  3. 对事件类型进行版本控制——当模式发生重大变化时使用OrderPlacedV2
  4. 读取时向上转换——在反序列化器中将旧版本转换为当前形态

PostgreSQL事件存储模式

sql
CREATE TABLE events (
id UUID PRIMARY KEY DEFAULT genrandomuuid(),
stream_id VARCHAR(255) NOT NULL,
stream_type VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB DEFAULT {},
version BIGINT NOT NULL,
global_position BIGSERIAL,
created_at TIMESTAMPTZ DEFAULT NOW(),
CONSTRAINT uniquestreamversion UNIQUE (stream_id, version)
);

CREATE INDEX idxeventsstream ON events(stream_id, version);
CREATE INDEX idxeventsglobal ON events(global_position);
CREATE INDEX idxeventstype ON events(event_type);

CREATE TABLE snapshots (
stream_id VARCHAR(255) PRIMARY KEY,
stream_type VARCHAR(255) NOT NULL,
snapshot_data JSONB NOT NULL,
version BIGINT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE subscription_checkpoints (
subscription_id VARCHAR(255) PRIMARY KEY,
last_position BIGINT NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ DEFAULT NOW()
);

事件存储实现

python
@dataclass
class Event:
stream_id: str
event_type: str
data: dict
metadata: dict = field(default_factory=dict)
eventid: UUID = field(defaultfactory=uuid4)
version: int | None = None
global_position: int | None = None

class EventStore: # 基于上述PostgreSQL模式
def init(self, pool: asyncpg.Pool):
self.pool = pool

async def append(self, streamid: str, streamtype: str,
events: list[Event],
expected_version: int | None = None) -> list[Event]:
使用乐观并发控制追加事件。
async with self.pool.acquire() as conn:
async with conn.transaction():
if expected_version is not None:
current = await conn.fetchval(
SELECT MAX(version) FROM events
WHERE streamid = $1, streamid
) or 0
if current != expected_version:
raise ConcurrencyError(
f期望 {expected_version},实际 {current}
)

start = await conn.fetchval(
SELECT COALESCE(MAX(version), 0) + 1
FROM events WHERE streamid = $1, streamid
)
for i, evt in enumerate(events):
evt.version = start + i
row = await conn.fetchrow(
INSERT INTO events (id, streamid, streamtype,
eventtype, eventdata, metadata, version)
VALUES ($1,$2,$3,$4,$5,$6,$7)
RETURNING global_position,
evt.eventid, streamid, stream_type,
evt.event_type, json.dumps(evt.data),
json.dumps(evt.metadata), evt.version,
)
evt.globalposition = row[globalposition]
return events

async def readstream(self, streamid: str,
from_version: int = 0) -> list[Event]:
读取单个流的事件。
async with self.pool.acquire() as conn:
rows = await conn.fetch(
SELECT * FROM events WHERE stream_id = $1
AND version >= $2 ORDER BY version,
streamid, fromversion,
)
return [self.toevent(r) for r in rows]

async def readall(self, fromposition: int = 0,
limit: int = 1000) -> list[Event]:
读取全局事件流,用于投影/订阅。
async with self.pool.acquire() as conn:
rows = await conn.fetch(
SELECT * FROM events WHERE global_position > $1
ORDER BY global_position LIMIT $2,
from_position, limit,
)
return [self.toevent(r) for r in rows]

投影

投影通过重放事件构建读取优化的视图。它们是CQRS的Q端。

投影生命周期

  1. 1. 从检查点开始——从上次处理的全局位置恢复
  2. 应用事件——为每个相关事件类型更新读取模型
  3. 保存检查点——将新位置与读取模型原子化持久化

投影示例

python
class OrderSummaryProjection:
def init(self, db, event_store: EventStore):
self.db = db
self.store = event_store

async def run(self, batch_size: int = 100):
position = await self.loadcheckpoint()
while True:
events = await self.store.readall(position, batchsize)
if not events:
await asyncio.sleep(1)
continue
for evt in events:
await self._apply(evt)
position = evt.global_position
await self.savecheckpoint(position)

async def _apply(self, event: Event):
match event.event_type:
case OrderPlaced:
await self.db.execute(
INSERT INTO order_summaries (id, customer, total, status)
VALUES ($1,$

标签

skill ai

通过对话安装

该技能支持在以下平台通过对话安装:

OpenClaw WorkBuddy QClaw Kimi Claude

方式一:安装 SkillHub 和技能

帮我安装 SkillHub 和 backend-event-stores-1776419953 技能

方式二:设置 SkillHub 为优先技能安装源

设置 SkillHub 为我的优先技能安装源,然后帮我安装 backend-event-stores-1776419953 技能

通过命令行安装

skillhub install backend-event-stores-1776419953

下载

⬇ 下载 event-store v1.0.0(免费)

文件大小: 6.68 KB | 发布时间: 2026-4-17 20:09

v1.0.0 最新 2026-4-17 20:09
Initial release with comprehensive guidance on designing and implementing event stores for event-sourced systems.

- Covers core event store concepts, requirements, and architectural patterns
- Provides technology comparisons for EventStoreDB, PostgreSQL, Kafka, and DynamoDB
- Includes recommended event schema/envelope design and schema evolution strategies
- Supplies a sample PostgreSQL schema for events, snapshots, and subscription checkpoints
- Offers code for a basic async Python event store (append, read_stream, read_all)
- Explains projection patterns and their role in building CQRS read models

Archiver·手机版·闲社网·闲社论坛·羊毛社区· 多链控股集团有限公司 · 苏ICP备2025199260号-1

Powered by Discuz! X5.0   © 2024-2025 闲社网·线报更新论坛·羊毛分享社区·http://xianshe.com

p2p_official_large
返回顶部