Data Hub Skill
多 Agent 数字货币量化交易系统的内存数据共享中枢。
When to Use
- - Agent 需要写入或读取行情、指标、情报、风控状态
- 编排器(Orchestrator)推送最新行情和技术指标
- 分析官(Analyst)发布研报情报
- 风控卫士(Guard)更新全局风控状态
- 任意 Agent 调用
get_summary() 获取全局数据快照
When NOT to Use
- - 需要持久化历史数据 → 使用数据库
- 直接执行交易下单 → 使用交易 Skill
- 获取实时行情源数据 → 使用行情 API Skill
依赖
CODEBLOCK0
数据模型
所有写入数据必须通过 Pydantic 校验,防止 LLM 幻觉导致脏数据。
CODEBLOCK1
内存命名空间
INLINECODE1 采用三级树状结构:category → key → value
| 命名空间 | Key | Value 类型 | 写入权限 (Agent_ID) | 维护策略 |
|---|
| INLINECODE3 | symbol (如 BTC_USDT) | dict (MarketDataModel) | INLINECODE5 | 覆盖式更新 |
| INLINECODE6 |
symbol |
Dict[str, List[float]] |
Default_Orchestrator | 滑动窗口 (上限 50) |
|
intelligence | symbol | dict (IntelligenceModel) |
Analyst_Officer | TTL 自动过期清除 |
|
risk_audit |
"global_state" | dict (RiskAuditModel) |
Guard_Agent | 覆盖式更新 + 持久化快照 |
核心 API
push_data — 写入数据
CODEBLOCK2
调用示例:
CODEBLOCK3
get_summary — 读取全局快照(触发懒惰清理)
CODEBLOCK4
规则
1. 写入权限隔离
每个命名空间只允许指定的 Agent 写入。错误的 agent_id 会被拒绝并返回错误信息,不会抛出异常。
2. 异步锁约束
所有 self._memory 读写必须在 async with self._lock: 内执行。锁内禁止任何网络 IO,只允许纯 CPU 的字典级读写,防止死锁。
3. LLM 友好型报错
绝不抛出 Exception 中断进程。捕获 ValidationError 后返回结构化英文错误信息,引导 LLM 自我纠错:
CODEBLOCK5
4. 懒惰清理(Lazy Janitor)
不后台轮询,在 get_summary() 时按需清理:
- - 行情陈旧检测:
now() - timestamp > 10秒 → 标记 INLINECODE19 - 情报过期检测:
now() - created_at > ttl_seconds → 内容替换为 INLINECODE21
5. 滑动窗口
INLINECODE22 命名空间对每个指标键维护 FIFO 队列,上限 50 条。超出时丢弃最早的数据。
Data Hub Skill
多 Agent 数字货币量化交易系统的内存数据共享中枢。
何时使用
- - Agent 需要写入或读取行情、指标、情报、风控状态
- 编排器(Orchestrator)推送最新行情和技术指标
- 分析官(Analyst)发布研报情报
- 风控卫士(Guard)更新全局风控状态
- 任意 Agent 调用 get_summary() 获取全局数据快照
何时不使用
- - 需要持久化历史数据 → 使用数据库
- 直接执行交易下单 → 使用交易 Skill
- 获取实时行情源数据 → 使用行情 API Skill
依赖
python
import time, json, asyncio
from typing import Dict, List, Any, Optional
from pydantic import BaseModel, Field, ValidationError
数据模型
所有写入数据必须通过 Pydantic 校验,防止 LLM 幻觉导致脏数据。
python
行情数据
class MarketDataModel(BaseModel):
last_price: float = Field(..., gt=0, description=最新成交价)
volume_24h: float = Field(default=0.0, ge=0)
timestamp: float = Field(default_factory=time.time)
情报数据(带 TTL)
class IntelligenceModel(BaseModel):
author: str = Field(..., description=研报生成者ID)
content: str = Field(..., description=研报正文内容)
ttl_seconds: int = Field(default=1800, description=数据有效时长(秒),默认30分钟)
created
at: float = Field(defaultfactory=time.time)
风控状态
class RiskAuditModel(BaseModel):
global_lock: bool = Field(default=False, description=是否触发全局爆仓保护,若为True则拦截开仓)
max
positionallowance: float = Field(..., ge=0, description=当前允许的最大下单量(U或币本位))
current_drawdown: float = Field(default=0.0, description=当前账户回撤比例)
指标数据:Dict[str, List[float]],通过代码逻辑限制滑动窗口上限 50
内存命名空间
self._memory 采用三级树状结构:category → key → value
| 命名空间 | Key | Value 类型 | 写入权限 (AgentID) | 维护策略 |
|---|
| marketstate | symbol (如 BTCUSDT) | dict (MarketDataModel) | DefaultOrchestrator | 覆盖式更新 |
| indicators |
symbol | Dict[str, List[float]] | Default_Orchestrator | 滑动窗口 (上限 50) |
| intelligence | symbol | dict (IntelligenceModel) | Analyst_Officer | TTL 自动过期清除 |
| risk
audit | globalstate | dict (RiskAuditModel) | Guard_Agent | 覆盖式更新 + 持久化快照 |
核心 API
push_data — 写入数据
python
async def pushdata(self, agentid: str, category: str, key: str, data: dict) -> dict:
调用示例:
python
编排器推送行情
await hub.push
data(DefaultOrchestrator, market
state, BTCUSDT, {
last_price: 65000.5,
volume_24h: 1234567.89
})
编排器推送指标
await hub.push
data(DefaultOrchestrator, indicators, BTC_USDT, {
rsi: [45.2, 48.1, 52.3],
ma20: [64800.0, 64950.0, 65100.0]
})
分析官发布研报
await hub.push
data(AnalystOfficer, intelligence, BTC_USDT, {
author: Analyst_Officer,
content: BTC 短期看涨,建议轻仓做多,
ttl_seconds: 1800
})
风控卫士更新状态
await hub.push
data(GuardAgent, risk
audit, globalstate, {
global_lock: False,
max
positionallowance: 10000.0,
current_drawdown: 0.05
})
get_summary — 读取全局快照(触发懒惰清理)
python
await hub.get_summary()
规则
1. 写入权限隔离
每个命名空间只允许指定的 Agent 写入。错误的 agent_id 会被拒绝并返回错误信息,不会抛出异常。
2. 异步锁约束
所有 self.memory 读写必须在 async with self.lock: 内执行。锁内禁止任何网络 IO,只允许纯 CPU 的字典级读写,防止死锁。
3. LLM 友好型报错
绝不抛出 Exception 中断进程。捕获 ValidationError 后返回结构化英文错误信息,引导 LLM 自我纠错:
[VALIDATIONERROR] Expected float for lastprice, got string. Please fix and retry.
[PERMISSIONDENIED] Agent AnalystOfficer cannot write to marketstate. Only DefaultOrchestrator is authorized.
4. 懒惰清理(Lazy Janitor)
不后台轮询,在 get_summary() 时按需清理:
- - 行情陈旧检测:now() - timestamp > 10秒 → 标记 isstale = True
- 情报过期检测:now() - createdat > ttl_seconds → 内容替换为 [EXPIRED] Analyst report is outdated.
5. 滑动窗口
indicators 命名空间对每个指标键维护 FIFO 队列,上限 50 条。超出时丢弃最早的数据。