返回顶部
d

durable-workflow持久工作流

|

作者: admin | 来源: ClawHub
源自
ClawHub
版本
V 1.0.1
安全检测
已通过
150
下载量
免费
免费
0
收藏
概述
安装方式
版本历史

durable-workflow

持久化工作流模式

构建能够应对API故障、超时和意外状态的自动化流程——无需在每次出错时从头重建。

核心原则

多步骤工作流中的每一步都必须回答三个问题:

  1. 1. 我完成了什么?(检查点)
  2. 如果这一步失败了我该怎么做?(恢复)
  3. 如果出现问题谁来发现?(告警)

忽略其中任何一点,工作流最终都会静默失败。

脚本

scripts/ 目录中提供了可直接使用的实现:

脚本用途
workflow-template.js包含检查点、重试、死信队列、退出处理器的完整工作流骨架
lock.js
基于文件的进程锁——防止并发运行 |

workflow-template.js

复制并填写步骤中的待办事项:

bash
cp scripts/workflow-template.js my-workflow.js
node my-workflow.js # 运行(或重新运行——从最后一个检查点恢复)
WORKFLOWSTATEPATH=/tmp/state.json node my-workflow.js # 自定义状态路径

特性:原子状态保存、指数退避、超时包装器、死信队列、异常退出日志记录。

lock.js

防止同一工作流的两个实例同时运行:

javascript
const { withLock, LockError } = require(./lock);

withLock(/tmp/my-workflow.lock, async () => {
// 同一时间只有一个进程运行此代码块
await runWorkflow();
}).catch(e => {
if (e.name === LockError) {
console.error(已在运行中:, e.message);
} else {
throw e;
}
});

模式1:检查点状态

在每个有意义的步骤后保存进度。永远不要在网络调用之间信任内存中的状态。

javascript
// checkpoint.js 模式
const state = loadState(workflow-id) || { step: 0, results: [] };

if (state.step < 1) {
state.results.push(await fetchData());
state.step = 1;
saveState(workflow-id, state);
}
if (state.step < 2) {
state.results.push(await processData(state.results[0]));
state.step = 2;
saveState(workflow-id, state);
}
// 从任何步骤重新启动——已完成的步骤将被跳过

模式2:断路器

停止对故障服务的持续请求。在N次失败后断开电路,冷却后半开。

javascript
class CircuitBreaker {
constructor(threshold = 3, cooldownMs = 30000) {
this.failures = 0; this.threshold = threshold;
this.state = closed; this.nextRetry = 0;
}
async call(fn) {
if (this.state === open) {
if (Date.now() < this.nextRetry) throw new Error(电路已断开);
this.state = half-open;
}
try {
const result = await fn();
this.failures = 0; this.state = closed;
return result;
} catch (e) {
this.failures++;
if (this.failures >= this.threshold) {
this.state = open;
this.nextRetry = Date.now() + this.cooldownMs;
}
throw e;
}
}
}

模式3:带抖动的指数退避

javascript
async function withRetry(fn, maxAttempts = 4, baseDelayMs = 1000) {
for (let attempt = 0; attempt < maxAttempts; attempt++) {
try { return await fn(); }
catch (e) {
if (attempt === maxAttempts - 1) throw e;
const delay = baseDelayMs Math.pow(2, attempt) + Math.random() 500;
await new Promise(r => setTimeout(r, delay));
}
}
}

模式4:死信队列

当某一步在所有重试后仍然失败时,不要静默丢弃它。将其路由到可审查的位置。

javascript
async function processWithDLQ(items, processFn, dlqPath) {
const failed = [];
for (const item of items) {
try { await withRetry(() => processFn(item)); }
catch (e) { failed.push({ item, error: e.message, failedAt: new Date() }); }
}
if (failed.length) {
const existing = fs.existsSync(dlqPath) ? JSON.parse(fs.readFileSync(dlqPath)) : [];
fs.writeFileSync(dlqPath, JSON.stringify([...existing, ...failed], null, 2));
}
}

模式5:幂等操作

设计每一步,使其执行两次产生与执行一次相同的结果。

javascript
// 错误:执行两次会创建两条记录
await db.insert({ id: uuid(), data });

// 正确:基于自然键进行更新插入
await db.upsert({ id: deterministicId(data), data }, { onConflict: update });

模式6:实例锁

防止重复运行(例如cron重叠、运行时手动重新触发)。

javascript
const { withLock, LockError } = require(./scripts/lock);

const LOCK_PATH = /tmp/my-workflow.lock;

async function main() {
await withLock(LOCK_PATH, async () => {
// 安全:同一时间只有一个实例到达此处
await runWorkflow();
});
}

main().catch(e => {
if (e.name === LockError) {
// 不是错误——只是另一个实例在运行
console.log(跳过:${e.message});
process.exit(0);
}
console.error(致命错误:, e.message);
process.exit(1);
});

该锁使用PID检测——崩溃进程产生的陈旧锁会自动回收。

工作流设计检查清单

在交付任何多步骤自动化之前:

  • - [ ] 每一步在进入下一步之前保存状态
  • [ ] 外部API调用包装了重试+退避
  • [ ] 每次运行中调用超过一次的服务使用断路器
  • [ ] 失败的项目进入死信文件/队列,而不是 /dev/null
  • [ ] 工作流可以从任何步骤重新启动,而不会重复已完成的工作
  • [ ] 工作流异常退出时触发告警(不仅仅是异常时)
  • [ ] 所有外部调用设置超时(永远不要在没有截止时间的情况下 await fetch())
  • [ ] 如果由cron或多个调用者触发,则设置实例锁

告警

在工作流失败时发送Telegram消息,以便你在查看之前就知道。仅使用内置的 https。

设置环境变量:ALERTTELEGRAMTOKEN 和 ALERTCHATID。

javascript
const https = require(https);

function sendTelegramAlert(message) {
const token = process.env.ALERTTELEGRAMTOKEN;
const chatId = process.env.ALERTCHATID;
if (!token || !chatId) return Promise.resolve(); // 未配置告警,静默跳过

const body = JSON.stringify({ chatid: chatId, text: message, parsemode: Markdown });
return new Promise((resolve) => {
const req = https.request(
{
hostname: api.telegram.org,
path: /bot${token}/sendMessage,
method: POST,
headers: { Content-Type: application/json, Content-Length: Buffer.byteLength(body) },
},
res => { res.resume(); res.on(end, resolve); }
);
req.on(error, () => resolve()); // 不要让告警失败导致工作流崩溃
req.setTimeout(5000, () => { req.destroy(); resolve(); });
req.write(body);
req.end();
});
}

// 用法——在你的 main() 的 catch 块中:
main().catch(async e => {
console.error(致命错误:, e.message);
await sendTelegramAlert(❌ 工作流失败\n\${e.message}\);
process.exit(1);
});

常见故障模式

请参阅 references/failure-taxonomy.md,获取包含诊断和修复模式的代理工作流故障完整目录。

标签

skill ai

通过对话安装

该技能支持在以下平台通过对话安装:

OpenClaw WorkBuddy QClaw Kimi Claude

方式一:安装 SkillHub 和技能

帮我安装 SkillHub 和 durable-workflow-1776104952 技能

方式二:设置 SkillHub 为优先技能安装源

设置 SkillHub 为我的优先技能安装源,然后帮我安装 durable-workflow-1776104952 技能

通过命令行安装

skillhub install durable-workflow-1776104952

下载

⬇ 下载 durable-workflow v1.0.1(免费)

文件大小: 9.93 KB | 发布时间: 2026-4-14 09:53

v1.0.1 最新 2026-4-14 09:53
v1.0.1 of durable-workflow

- No file changes detected in this version.
- No user-facing updates or documentation changes.

Archiver·手机版·闲社网·闲社论坛·智能体自动化市场· 多链控股集团有限公司 · 苏ICP备2025199260号-1

Powered by Discuz! X5.0   © 2024-2026 闲社网·AI智能体论坛·AI自动化解决方案·http://xianshe.com

p2p_official_large
返回顶部