返回顶部
b

batch-processing-patterns批量处理模式

批量处理与长时任务编排模式。涵盖队列管理、并发调度、中断恢复、熔断器、远程任务轮询、进度报告和反风控策略。适用于批量文件处理、AI API 调用、爬虫和后台任务场景。

作者: admin | 来源: ClawHub
源自
ClawHub
版本
V 1.0.0
安全检测
已通过
440
下载量
免费
免费
0
收藏
概述
安装方式
版本历史

batch-processing-patterns

批量处理与长时任务指南

来自生产级桌面应用的实战经验,覆盖批量文件处理、远程 API 轮询、并发控制和错误恢复。

适用场景

  • - 批量文件处理(转码、压缩、水印)
  • 远程 AI 任务轮询(视频生成、语音合成)
  • 爬虫/批量 HTTP 请求
  • 后台队列任务

1. 批处理架构

任务队列
├── 并发调度器(动态调整并发数)
│ ├── Worker 1 → processItem()
│ ├── Worker 2 → processItem()
│ └── Worker N → processItem()
├── 中止控制器(shouldStop + 子进程清理)
├── 熔断器(连续失败 N 次暂停)
├── 跳过检查(断点续传 / 前置过滤)
└── 进度报告(per-item + overall)

核心原则

  1. 1. 逐项处理,逐项报告 — 不等全部完成
  2. 中断即停 — 每个 item 之间检查 abort 信号
  3. 失败不中断 — 单项失败标记 failed,继续处理其他
  4. 熔断保护 — 连续失败超阈值暂停整个队列

2. 并发调度

自适应并发池

根据每个 item 的处理耗时动态调整并发数:

typescript
class AdaptiveScheduler {
private concurrency: number;
private running = 0;
private queue: (() => void)[] = [];

constructor(
private min: number,
private max: number,
private slowThresholdMs: number
) {
this.concurrency = Math.ceil((min + max) / 2);
}

async run(fn: () => Promise): Promise {
if (this.running >= this.concurrency) {
await new Promise((resolve) => this.queue.push(resolve));
}
this.running++;
const start = Date.now();

try {
return await fn();
} finally {
const elapsed = Date.now() - start;
this.running--;

// 自适应调整
if (elapsed > this.slowThresholdMs && this.concurrency > this.min) {
this.concurrency--;
} else if (elapsed < this.slowThresholdMs / 2 && this.concurrency < this.max) {
this.concurrency++;
}

if (this.queue.length > 0) {
this.queue.shift()!();
}
}
}
}

预设配置

场景初始最小最大慢阈值
CPU 密集(FFmpeg 转码)41CPU 核数3s
API 调用(AI 服务)
3 | 1 | 5 | 8s | | 文件 I/O | 2 | 1 | 3 | 30s | | 串行(必须顺序) | 1 | 1 | 1 | - |

简易并发池(不需要自适应时)

typescript
async function runPool(
items: T[],
fn: (item: T) => Promise,
concurrency: number,
signal?: AbortSignal
): Promise<{ completed: number; failed: number }> {
let completed = 0, failed = 0;
const running = new Set>();

for (const item of items) {
if (signal?.aborted) break;

const p = fn(item)
.then(() => { completed++; })
.catch(() => { failed++; });

running.add(p.then(() => { running.delete(p); }));

if (running.size >= concurrency) {
await Promise.race(running);
}
}
await Promise.all(running);
return { completed, failed };
}



3. 中断与恢复

AbortController 模式

typescript
class BatchAbortController {
private _aborted = false;
private callbacks: (() => void)[] = [];

get aborted() { return this._aborted; }

abort() {
if (this._aborted) return; // 幂等
this._aborted = true;
this.callbacks.forEach((cb) => cb());
}

onAbort(cb: () => void) {
if (this._aborted) { cb(); return; }
this.callbacks.push(cb);
}

reset() {
this._aborted = false;
this.callbacks = [];
}
}

断点续传(shouldSkip)

typescript
const completedSet = new Set(loadCompletedFromDisk());

function shouldSkip(item: FileItem): string | null {
if (completedSet.has(item.path)) return 已完成(断点续传);
if (item.size <= targetSize) return 已满足目标条件;
return null; // 正常处理
}

// 在批处理循环中
for (const item of items) {
if (abortController.aborted) break;

const skipReason = shouldSkip(item);
if (skipReason) {
onItemSkip(item, skipReason);
continue;
}

try {
await processItem(item);
completedSet.add(item.path);
saveCompletedToDisk(completedSet); // 持久化进度
onItemComplete(item);
} catch (err) {
onItemError(item, err);
}
}



4. 熔断器

连续失败过多时自动暂停,避免无意义的重试浪费资源。

typescript
class CircuitBreaker {
private consecutiveFailures = 0;

constructor(
private maxFailures: number = 5,
private onTrip?: (failures: number) => void
) {}

recordSuccess() {
this.consecutiveFailures = 0;
}

recordFailure(): boolean {
this.consecutiveFailures++;
if (this.consecutiveFailures >= this.maxFailures) {
this.onTrip?.(this.consecutiveFailures);
return true; // tripped
}
return false;
}

get isTripped() {
return this.consecutiveFailures >= this.maxFailures;
}

reset() {
this.consecutiveFailures = 0;
}
}



5. 远程任务轮询

递归 setTimeout 模式(推荐)

typescript
function startPolling(taskId: string, interval = 5000) {
const poll = async () => {
try {
const result = await queryTaskStatus(taskId);

if (result.status === completed) {
onComplete(result);
return; // 停止轮询
}
if (result.status === failed) {
onFailed(result);
return;
}

// 超时检查
if (Date.now() - startTime > MAXPOLLTIME) {
onTimeout(taskId);
return;
}

setTimeout(poll, interval); // 继续
} catch (err) {
if (isCriticalError(err)) return; // 停止
setTimeout(poll, interval); // 瞬态错误继续
}
};

poll();
}

关键要点

  • - 用 setTimeout 而非 setInterval — 确保前一次完成后再调度下一次
  • 超时保护 — 总轮询时间有上限,避免永久轮询
  • 错误分类 — 瞬态错误继续轮询,致命错误立即停止
  • 去重 — 用 Set 防止同一任务重复轮询

批量轮询(多任务顺序轮询)

typescript
// 多任务并发轮询可能触发限流,改为顺序轮询
const pollAll = async () => {
const activeTasks = getActiveTasks();

for (const task of activeTasks) {
if (aborted) return;
await pollOne(task.id);
// 任务间延迟,尊重 QPS
if (activeTasks.length > 1) {
await sleep(2000);
}
}

if (getActiveTasks().length > 0) {
setTimeout(pollAll, POLL_INTERVAL);
}
};



6. 错误分类与处理


错误类型行为示例
瞬态网络错误重试timeout, ECONNRESET
401 Unauthorized
停止 | API Key 无效 |
| 403 / 余额不足 | 停止 | 账户问题 |
| 429 Too Many Requests | 退避重试 | 限流 |
| 500+ Server Error | 有限重试(3 次) | 服务端异常 |
| 文件不存在 | 跳过此项 | 输入文件被删 |
| 磁盘空间不足 | 停止全部 | ENOSPC |

指数退避重试

typescript
async function with

标签

skill ai

通过对话安装

该技能支持在以下平台通过对话安装:

OpenClaw WorkBuddy QClaw Kimi Claude

方式一:安装 SkillHub 和技能

帮我安装 SkillHub 和 batch-processing-patterns-1776284786 技能

方式二:设置 SkillHub 为优先技能安装源

设置 SkillHub 为我的优先技能安装源,然后帮我安装 batch-processing-patterns-1776284786 技能

通过命令行安装

skillhub install batch-processing-patterns-1776284786

下载

⬇ 下载 batch-processing-patterns v1.0.0(免费)

文件大小: 5.04 KB | 发布时间: 2026-4-16 17:55

v1.0.0 最新 2026-4-16 17:55
批处理与长时任务编排模式。自适应并发池、AbortController、熔断器、远程任务轮询、错误分类、反风控策略。

Archiver·手机版·闲社网·闲社论坛·智能体自动化市场· 多链控股集团有限公司 · 苏ICP备2025199260号-1

Powered by Discuz! X5.0   © 2024-2026 闲社网·AI智能体论坛·AI自动化解决方案·http://xianshe.com

p2p_official_large
返回顶部