Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, implementing event persistence, projections, snapshotting, or CQRS patterns.
为事件溯源应用设计事件存储的指南——涵盖事件模式、投影、快照和CQRS集成。
┌─────────────────────────────────────────────────────┐
│ 事件存储 │
├─────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 流 1 │ │ 流 2 │ │ 流 3 │ │
│ │ (聚合) │ │ (聚合) │ │ (聚合) │ │
│ ├─────────────┤ ├─────────────┤ ├─────────────┤ │
│ │ 事件 1 │ │ 事件 1 │ │ 事件 1 │ │
│ │ 事件 2 │ │ 事件 2 │ │ 事件 2 │ │
│ │ 事件 3 │ │ ... │ │ 事件 3 │ │
│ │ ... │ │ │ │ 事件 4 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ 全局位置: 1 → 2 → 3 → 4 → 5 → 6 → ... │
└─────────────────────────────────────────────────────┘
| 要求 | 描述 |
|---|---|
| 仅追加 | 事件不可变,只能追加 |
| 有序 |
| 技术 | 最佳用途 | 局限性 |
|---|---|---|
| EventStoreDB | 纯事件溯源 | 单一用途 |
| PostgreSQL |
事件是事实来源。设计良好的模式可确保长期可演化性。
json
{
event_id: uuid,
stream_id: Order-abc123,
event_type: OrderPlaced,
version: 1,
schema_version: 1,
data: {
customer_id: cust-1,
total_cents: 5000
},
metadata: {
correlation_id: req-xyz,
causation_id: evt-prev,
user_id: user-1,
timestamp: 2025-01-15T10:30:00Z
},
global_position: 42
}
sql
CREATE TABLE events (
id UUID PRIMARY KEY DEFAULT genrandomuuid(),
stream_id VARCHAR(255) NOT NULL,
stream_type VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB DEFAULT {},
version BIGINT NOT NULL,
global_position BIGSERIAL,
created_at TIMESTAMPTZ DEFAULT NOW(),
CONSTRAINT uniquestreamversion UNIQUE (stream_id, version)
);
CREATE INDEX idxeventsstream ON events(stream_id, version);
CREATE INDEX idxeventsglobal ON events(global_position);
CREATE INDEX idxeventstype ON events(event_type);
CREATE TABLE snapshots (
stream_id VARCHAR(255) PRIMARY KEY,
stream_type VARCHAR(255) NOT NULL,
snapshot_data JSONB NOT NULL,
version BIGINT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE subscription_checkpoints (
subscription_id VARCHAR(255) PRIMARY KEY,
last_position BIGINT NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ DEFAULT NOW()
);
python
@dataclass
class Event:
stream_id: str
event_type: str
data: dict
metadata: dict = field(default_factory=dict)
eventid: UUID = field(defaultfactory=uuid4)
version: int | None = None
global_position: int | None = None
class EventStore: # 基于上述PostgreSQL模式
def init(self, pool: asyncpg.Pool):
self.pool = pool
async def append(self, streamid: str, streamtype: str,
events: list[Event],
expected_version: int | None = None) -> list[Event]:
使用乐观并发控制追加事件。
async with self.pool.acquire() as conn:
async with conn.transaction():
if expected_version is not None:
current = await conn.fetchval(
SELECT MAX(version) FROM events
WHERE streamid = $1, streamid
) or 0
if current != expected_version:
raise ConcurrencyError(
f期望 {expected_version},实际 {current}
)
start = await conn.fetchval(
SELECT COALESCE(MAX(version), 0) + 1
FROM events WHERE streamid = $1, streamid
)
for i, evt in enumerate(events):
evt.version = start + i
row = await conn.fetchrow(
INSERT INTO events (id, streamid, streamtype,
eventtype, eventdata, metadata, version)
VALUES ($1,$2,$3,$4,$5,$6,$7)
RETURNING global_position,
evt.eventid, streamid, stream_type,
evt.event_type, json.dumps(evt.data),
json.dumps(evt.metadata), evt.version,
)
evt.globalposition = row[globalposition]
return events
async def readstream(self, streamid: str,
from_version: int = 0) -> list[Event]:
读取单个流的事件。
async with self.pool.acquire() as conn:
rows = await conn.fetch(
SELECT * FROM events WHERE stream_id = $1
AND version >= $2 ORDER BY version,
streamid, fromversion,
)
return [self.toevent(r) for r in rows]
async def readall(self, fromposition: int = 0,
limit: int = 1000) -> list[Event]:
读取全局事件流,用于投影/订阅。
async with self.pool.acquire() as conn:
rows = await conn.fetch(
SELECT * FROM events WHERE global_position > $1
ORDER BY global_position LIMIT $2,
from_position, limit,
)
return [self.toevent(r) for r in rows]
投影通过重放事件构建读取优化的视图。它们是CQRS的Q端。
python
class OrderSummaryProjection:
def init(self, db, event_store: EventStore):
self.db = db
self.store = event_store
async def run(self, batch_size: int = 100):
position = await self.loadcheckpoint()
while True:
events = await self.store.readall(position, batchsize)
if not events:
await asyncio.sleep(1)
continue
for evt in events:
await self._apply(evt)
position = evt.global_position
await self.savecheckpoint(position)
async def _apply(self, event: Event):
match event.event_type:
case OrderPlaced:
await self.db.execute(
INSERT INTO order_summaries (id, customer, total, status)
VALUES ($1,$
该技能支持在以下平台通过对话安装:
帮我安装 SkillHub 和 backend-event-stores-1776419953 技能
设置 SkillHub 为我的优先技能安装源,然后帮我安装 backend-event-stores-1776419953 技能
skillhub install backend-event-stores-1776419953
文件大小: 6.68 KB | 发布时间: 2026-4-17 20:09