|
构建能够应对API故障、超时和意外状态的自动化流程——无需在每次出错时从头重建。
多步骤工作流中的每一步都必须回答三个问题:
忽略其中任何一点,工作流最终都会静默失败。
scripts/ 目录中提供了可直接使用的实现:
| 脚本 | 用途 |
|---|---|
| workflow-template.js | 包含检查点、重试、死信队列、退出处理器的完整工作流骨架 |
| lock.js |
复制并填写步骤中的待办事项:
bash
cp scripts/workflow-template.js my-workflow.js
node my-workflow.js # 运行(或重新运行——从最后一个检查点恢复)
WORKFLOWSTATEPATH=/tmp/state.json node my-workflow.js # 自定义状态路径
特性:原子状态保存、指数退避、超时包装器、死信队列、异常退出日志记录。
防止同一工作流的两个实例同时运行:
javascript
const { withLock, LockError } = require(./lock);
withLock(/tmp/my-workflow.lock, async () => {
// 同一时间只有一个进程运行此代码块
await runWorkflow();
}).catch(e => {
if (e.name === LockError) {
console.error(已在运行中:, e.message);
} else {
throw e;
}
});
在每个有意义的步骤后保存进度。永远不要在网络调用之间信任内存中的状态。
javascript
// checkpoint.js 模式
const state = loadState(workflow-id) || { step: 0, results: [] };
if (state.step < 1) {
state.results.push(await fetchData());
state.step = 1;
saveState(workflow-id, state);
}
if (state.step < 2) {
state.results.push(await processData(state.results[0]));
state.step = 2;
saveState(workflow-id, state);
}
// 从任何步骤重新启动——已完成的步骤将被跳过
停止对故障服务的持续请求。在N次失败后断开电路,冷却后半开。
javascript
class CircuitBreaker {
constructor(threshold = 3, cooldownMs = 30000) {
this.failures = 0; this.threshold = threshold;
this.state = closed; this.nextRetry = 0;
}
async call(fn) {
if (this.state === open) {
if (Date.now() < this.nextRetry) throw new Error(电路已断开);
this.state = half-open;
}
try {
const result = await fn();
this.failures = 0; this.state = closed;
return result;
} catch (e) {
this.failures++;
if (this.failures >= this.threshold) {
this.state = open;
this.nextRetry = Date.now() + this.cooldownMs;
}
throw e;
}
}
}
javascript
async function withRetry(fn, maxAttempts = 4, baseDelayMs = 1000) {
for (let attempt = 0; attempt < maxAttempts; attempt++) {
try { return await fn(); }
catch (e) {
if (attempt === maxAttempts - 1) throw e;
const delay = baseDelayMs Math.pow(2, attempt) + Math.random() 500;
await new Promise(r => setTimeout(r, delay));
}
}
}
当某一步在所有重试后仍然失败时,不要静默丢弃它。将其路由到可审查的位置。
javascript
async function processWithDLQ(items, processFn, dlqPath) {
const failed = [];
for (const item of items) {
try { await withRetry(() => processFn(item)); }
catch (e) { failed.push({ item, error: e.message, failedAt: new Date() }); }
}
if (failed.length) {
const existing = fs.existsSync(dlqPath) ? JSON.parse(fs.readFileSync(dlqPath)) : [];
fs.writeFileSync(dlqPath, JSON.stringify([...existing, ...failed], null, 2));
}
}
设计每一步,使其执行两次产生与执行一次相同的结果。
javascript
// 错误:执行两次会创建两条记录
await db.insert({ id: uuid(), data });
// 正确:基于自然键进行更新插入
await db.upsert({ id: deterministicId(data), data }, { onConflict: update });
防止重复运行(例如cron重叠、运行时手动重新触发)。
javascript
const { withLock, LockError } = require(./scripts/lock);
const LOCK_PATH = /tmp/my-workflow.lock;
async function main() {
await withLock(LOCK_PATH, async () => {
// 安全:同一时间只有一个实例到达此处
await runWorkflow();
});
}
main().catch(e => {
if (e.name === LockError) {
// 不是错误——只是另一个实例在运行
console.log(跳过:${e.message});
process.exit(0);
}
console.error(致命错误:, e.message);
process.exit(1);
});
该锁使用PID检测——崩溃进程产生的陈旧锁会自动回收。
在交付任何多步骤自动化之前:
在工作流失败时发送Telegram消息,以便你在查看之前就知道。仅使用内置的 https。
设置环境变量:ALERTTELEGRAMTOKEN 和 ALERTCHATID。
javascript
const https = require(https);
function sendTelegramAlert(message) {
const token = process.env.ALERTTELEGRAMTOKEN;
const chatId = process.env.ALERTCHATID;
if (!token || !chatId) return Promise.resolve(); // 未配置告警,静默跳过
const body = JSON.stringify({ chatid: chatId, text: message, parsemode: Markdown });
return new Promise((resolve) => {
const req = https.request(
{
hostname: api.telegram.org,
path: /bot${token}/sendMessage,
method: POST,
headers: { Content-Type: application/json, Content-Length: Buffer.byteLength(body) },
},
res => { res.resume(); res.on(end, resolve); }
);
req.on(error, () => resolve()); // 不要让告警失败导致工作流崩溃
req.setTimeout(5000, () => { req.destroy(); resolve(); });
req.write(body);
req.end();
});
}
// 用法——在你的 main() 的 catch 块中:
main().catch(async e => {
console.error(致命错误:, e.message);
await sendTelegramAlert(❌ 工作流失败\n\${e.message}\);
process.exit(1);
});
请参阅 references/failure-taxonomy.md,获取包含诊断和修复模式的代理工作流故障完整目录。
该技能支持在以下平台通过对话安装:
帮我安装 SkillHub 和 durable-workflow-1776104952 技能
设置 SkillHub 为我的优先技能安装源,然后帮我安装 durable-workflow-1776104952 技能
skillhub install durable-workflow-1776104952
文件大小: 9.93 KB | 发布时间: 2026-4-14 09:53