Dual-stream event publishing combining Kafka for durability with Redis Pub/Sub for real-time delivery. Use when building event-driven systems needing both guaranteed delivery and low-latency updates. Triggers on dual stream, event publishing, Kafka Redis, real-time events, pub/sub, streaming architecture.
同时将事件发布到Kafka(持久化)和Redis Pub/Sub(实时),适用于既需要保证交付又需要即时更新的系统。
bash
npx clawhub@latest install dual-stream-architecture
go
type DualPublisher struct {
kafka *kafka.Writer
redis *redis.Client
logger *slog.Logger
}
func (p *DualPublisher) Publish(ctx context.Context, event Event) error {
// 1. Kafka:关键路径 - 必须成功
payload, _ := json.Marshal(event)
err := p.kafka.WriteMessages(ctx, kafka.Message{
Key: []byte(event.SourceID),
Value: payload,
})
if err != nil {
return fmt.Errorf(kafka发布失败: %w, err)
}
// 2. Redis:尽力而为 - 不中断操作
p.publishToRedis(ctx, event)
return nil
}
func (p *DualPublisher) publishToRedis(ctx context.Context, event Event) {
// 轻量级负载(完整事件在Kafka中)
notification := map[string]interface{}{
id: event.ID,
type: event.Type,
source_id: event.SourceID,
}
payload, _ := json.Marshal(notification)
channel := fmt.Sprintf(events:%s:%s, event.SourceType, event.SourceID)
// 即发即忘 - 记录错误但不传播
if err := p.redis.Publish(ctx, channel, payload).Err(); err != nil {
p.logger.Warn(redis发布失败, error, err)
}
}
┌──────────────┐ ┌─────────────────┐ ┌──────────────┐
│ 数据摄取器 │────▶│ 双流发布器 │────▶│ Kafka │──▶ 事件处理器
│ │ │ │ │ (持久化) │
└──────────────┘ │ │ └──────────────┘
│ │ ┌──────────────┐
│ │────▶│ Redis PubSub │──▶ WebSocket网关
│ │ │ (实时) │
└─────────────────┘ └──────────────┘
events:{sourcetype}:{sourceid}
示例:
适用于高吞吐量场景:
go
func (p *DualPublisher) PublishBatch(ctx context.Context, events []Event) error {
// 1. 批量发送到Kafka
messages := make([]kafka.Message, len(events))
for i, event := range events {
payload, _ := json.Marshal(event)
messages[i] = kafka.Message{
Key: []byte(event.SourceID),
Value: payload,
}
}
if err := p.kafka.WriteMessages(ctx, messages...); err != nil {
return fmt.Errorf(kafka批量发送失败: %w, err)
}
// 2. Redis:使用管道提高效率
pipe := p.redis.Pipeline()
for _, event := range events {
channel := fmt.Sprintf(events:%s:%s, event.SourceType, event.SourceID)
notification, _ := json.Marshal(map[string]interface{}{
id: event.ID,
type: event.Type,
})
pipe.Publish(ctx, channel, notification)
}
if _, err := pipe.Exec(ctx); err != nil {
p.logger.Warn(redis批量发送失败, error, err)
}
return nil
}
| 需求 | 流 | 原因 |
|---|---|---|
| 不能丢失事件 | 仅Kafka | 需要确认,已复制 |
| 用户即时看到 |
| 情况 | 解决方案 |
|---|---|
| Redis宕机 | 记录警告,仅使用Kafka继续 |
| 客户端中途连接 |
该技能支持在以下平台通过对话安装:
帮我安装 SkillHub 和 dual-stream-architecture-1776420002 技能
设置 SkillHub 为我的优先技能安装源,然后帮我安装 dual-stream-architecture-1776420002 技能
skillhub install dual-stream-architecture-1776420002
文件大小: 3.77 KB | 发布时间: 2026-4-17 19:50