返回顶部
d

dual-stream-architecture双流事件架构

Dual-stream event publishing combining Kafka for durability with Redis Pub/Sub for real-time delivery. Use when building event-driven systems needing both guaranteed delivery and low-latency updates. Triggers on dual stream, event publishing, Kafka Redis, real-time events, pub/sub, streaming architecture.

作者: admin | 来源: ClawHub
源自
ClawHub
版本
V 1.0.0
安全检测
已通过
836
下载量
免费
免费
0
收藏
概述
安装方式
版本历史

dual-stream-architecture

双流架构

同时将事件发布到Kafka(持久化)和Redis Pub/Sub(实时),适用于既需要保证交付又需要即时更新的系统。

安装

OpenClaw / Moltbot / Clawbot

bash
npx clawhub@latest install dual-stream-architecture



适用场景

  • - 既需要持久化又需要实时性的事件驱动系统
  • 推送实时更新的WebSocket/SSE后端
  • 实时展示事件的仪表盘
  • Kafka消费者存在延迟但用户期望即时更新

核心模式

go
type DualPublisher struct {
kafka *kafka.Writer
redis *redis.Client
logger *slog.Logger
}

func (p *DualPublisher) Publish(ctx context.Context, event Event) error {
// 1. Kafka:关键路径 - 必须成功
payload, _ := json.Marshal(event)
err := p.kafka.WriteMessages(ctx, kafka.Message{
Key: []byte(event.SourceID),
Value: payload,
})
if err != nil {
return fmt.Errorf(kafka发布失败: %w, err)
}

// 2. Redis:尽力而为 - 不中断操作
p.publishToRedis(ctx, event)

return nil
}

func (p *DualPublisher) publishToRedis(ctx context.Context, event Event) {
// 轻量级负载(完整事件在Kafka中)
notification := map[string]interface{}{
id: event.ID,
type: event.Type,
source_id: event.SourceID,
}

payload, _ := json.Marshal(notification)
channel := fmt.Sprintf(events:%s:%s, event.SourceType, event.SourceID)

// 即发即忘 - 记录错误但不传播
if err := p.redis.Publish(ctx, channel, payload).Err(); err != nil {
p.logger.Warn(redis发布失败, error, err)
}
}



架构

┌──────────────┐ ┌─────────────────┐ ┌──────────────┐
│ 数据摄取器 │────▶│ 双流发布器 │────▶│ Kafka │──▶ 事件处理器
│ │ │ │ │ (持久化) │
└──────────────┘ │ │ └──────────────┘
│ │ ┌──────────────┐
│ │────▶│ Redis PubSub │──▶ WebSocket网关
│ │ │ (实时) │
└─────────────────┘ └──────────────┘



频道命名规范

events:{sourcetype}:{sourceid}

示例:

  • - events:user:octocat - 用户octocat的事件
  • events:repo:owner/repo - 仓库的事件
  • events:org:microsoft - 组织的事件



批量发布

适用于高吞吐量场景:

go
func (p *DualPublisher) PublishBatch(ctx context.Context, events []Event) error {
// 1. 批量发送到Kafka
messages := make([]kafka.Message, len(events))
for i, event := range events {
payload, _ := json.Marshal(event)
messages[i] = kafka.Message{
Key: []byte(event.SourceID),
Value: payload,
}
}

if err := p.kafka.WriteMessages(ctx, messages...); err != nil {
return fmt.Errorf(kafka批量发送失败: %w, err)
}

// 2. Redis:使用管道提高效率
pipe := p.redis.Pipeline()
for _, event := range events {
channel := fmt.Sprintf(events:%s:%s, event.SourceType, event.SourceID)
notification, _ := json.Marshal(map[string]interface{}{
id: event.ID,
type: event.Type,
})
pipe.Publish(ctx, channel, notification)
}

if _, err := pipe.Exec(ctx); err != nil {
p.logger.Warn(redis批量发送失败, error, err)
}

return nil
}



决策树


需求原因
不能丢失事件仅Kafka需要确认,已复制
用户即时看到
仅Redis | 亚毫秒级交付 |
| 持久化+实时 | 双流 | 此模式 |
| 高流量(>1万/秒) | Kafka,批量Redis | Redis可能成为瓶颈 |
| 每频道多订阅者 | Redis + 本地扇出 | 不要过度使用Redis |


相关技能


绝对禁止

  • - 绝对不要在Redis错误时失败 — Redis是尽力而为。记录日志并继续。
  • 绝对不要向Redis发送完整负载 — 只发送ID,客户端从API获取。
  • 绝对不要为每个事件创建Redis频道 — 使用源级别频道。
  • 绝对不要跳过Kafka处理不重要的事件 — 所有事件都进入Kafka以便重放。
  • 绝对不要使用Redis Pub/Sub进行持久化 — 消息即发即忘。

边界情况

情况解决方案
Redis宕机记录警告,仅使用Kafka继续
客户端中途连接
查询API获取最近事件,然后订阅 | | 高频道基数 | 使用通配符模式或聚合频道 | | Kafka背压 | 内存中缓冲并设置超时,满则失败 | | 需要事件重放 | 从Kafka偏移量消费,而非Redis |

标签

skill ai

通过对话安装

该技能支持在以下平台通过对话安装:

OpenClaw WorkBuddy QClaw Kimi Claude

方式一:安装 SkillHub 和技能

帮我安装 SkillHub 和 dual-stream-architecture-1776420002 技能

方式二:设置 SkillHub 为优先技能安装源

设置 SkillHub 为我的优先技能安装源,然后帮我安装 dual-stream-architecture-1776420002 技能

通过命令行安装

skillhub install dual-stream-architecture-1776420002

下载

⬇ 下载 dual-stream-architecture v1.0.0(免费)

文件大小: 3.77 KB | 发布时间: 2026-4-17 19:50

v1.0.0 最新 2026-4-17 19:50
Initial release of the dual-stream-architecture skill: publish events to Kafka and Redis Pub/Sub for both durability and real-time delivery.

- Provides a Go implementation of simultaneous event publishing to Kafka (guaranteed delivery) and Redis Pub/Sub (sub-ms updates).
- Includes usage recommendations, example code, channel conventions, and a decision tree for selecting stream strategies.
- Outlines best practices (e.g., Redis as best-effort, channel naming, batching for throughput).
- Documents critical edge cases and common pitfalls.
- Links to related skills for real-time dashboards and service architectures.

Archiver·手机版·闲社网·闲社论坛·羊毛社区· 多链控股集团有限公司 · 苏ICP备2025199260号-1

Powered by Discuz! X5.0   © 2024-2025 闲社网·线报更新论坛·羊毛分享社区·http://xianshe.com

p2p_official_large
返回顶部