Automate repetitive tasks with scripts, workflows, and schedules. Create efficient automation for file operations, data processing, API calls, and scheduled jobs. Use when automating repetitive work, creating workflows, or scheduling tasks. Triggers on "automate", "workflow", "schedule", "repetitive", "batch", "cron".
将重复性工作转化为自动化工作流。节省时间、减少错误、扩展操作规模。
批量重命名:
python
import os
import re
def batch_rename(directory, pattern, replacement):
重命名匹配模式的文件
for filename in os.listdir(directory):
if re.match(pattern, filename):
new_name = re.sub(pattern, replacement, filename)
os.rename(
os.path.join(directory, filename),
os.path.join(directory, new_name)
)
print(f已重命名: {filename} -> {new_name})
批量转换:
python
from PIL import Image
import os
def convertimages(inputdir, output_dir, format=webp):
将所有图像转换为指定格式
os.makedirs(outputdir, existok=True)
for filename in os.listdir(input_dir):
if filename.lower().endswith((.png, .jpg, .jpeg)):
img = Image.open(os.path.join(input_dir, filename))
name = os.path.splitext(filename)[0]
img.save(os.path.join(output_dir, f{name}.{format}), format.upper())
print(f已转换: {filename})
整理文件:
python
import os
import shutil
def organizebytype(directory):
将文件移动到类型文件夹中
extensions = {
images: [.jpg, .jpeg, .png, .gif, .webp],
documents: [.pdf, .doc, .docx, .txt, .md],
videos: [.mp4, .mov, .avi, .mkv],
audio: [.mp3, .wav, .flac],
code: [.py, .js, .ts, .go, .rs],
}
for filename in os.listdir(directory):
filepath = os.path.join(directory, filename)
if os.path.isfile(filepath):
ext = os.path.splitext(filename)[1].lower()
for folder, exts in extensions.items():
if ext in exts:
target = os.path.join(directory, folder)
os.makedirs(target, exist_ok=True)
shutil.move(filepath, os.path.join(target, filename))
print(f已将 {filename} 移动到 {folder}/)
break
批量转换:
python
import pandas as pd
def processcsvbatch(inputdir, outputfile, transform_func):
处理多个CSV文件并合并
dfs = []
for filename in os.listdir(input_dir):
if filename.endswith(.csv):
df = pd.readcsv(os.path.join(inputdir, filename))
df = transform_func(df)
dfs.append(df)
combined = pd.concat(dfs, ignore_index=True)
combined.tocsv(outputfile, index=False)
print(f已将 {len(dfs)} 个文件处理为 {output_file})
数据管道:
python
def create_pipeline(steps):
创建可复用的数据管道
def pipeline(data):
result = data
for step in steps:
result = step(result)
return result
return pipeline
cleandata = pipeline(rawdata)
限速API客户端:
python
import time
from functools import wraps
def ratelimit(callsper_second=2):
装饰器,用于限制API调用频率
mininterval = 1.0 / callsper_second
last_call = [0.0]
def decorator(func):
@wraps(func)
def wrapper(args, *kwargs):
elapsed = time.time() - last_call[0]
if elapsed < min_interval:
time.sleep(min_interval - elapsed)
last_call[0] = time.time()
return func(args, *kwargs)
return wrapper
return decorator
@rate_limit(2) # 每秒2次调用
def api_call(endpoint, data):
return requests.post(endpoint, json=data)
批量API调用:
python
def batchapicalls(items, endpoint, batch_size=100):
批量处理API调用
results = []
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
# 处理批次
response = requests.post(endpoint, json={items: batch})
if response.status_code == 200:
results.extend(response.json())
else:
print(f批次 {i//batchsize} 失败: {response.statuscode})
time.sleep(1) # 限速
return results
带退避的重试:
python
import time
import random
def retrywithbackoff(func, maxretries=3, basedelay=1):
使用指数退避重试失败的调用
for attempt in range(max_retries):
try:
return func()
except Exception as e:
if attempt == max_retries - 1:
raise
delay = base_delay (2 * attempt) + random.random()
print(f重试 {attempt + 1}/{max_retries},等待 {delay:.1f}秒: {e})
time.sleep(delay)
Cron作业:
bash
Python调度器:
python
import schedule
import time
def job():
print(正在运行定时任务...)
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at(09:00).do(job)
schedule.every().monday.do(job)
while True:
schedule.run_pending()
time.sleep(60)
OpenClaw Cron:
bash
openclaw cron add \
--name 每日报告 \
--schedule 0 9 * \
--task 生成每日报告并发送到Slack
python
def sequential_workflow(steps):
按顺序运行步骤
results = []
for i, step in enumerate(steps):
try:
result = stepaction)
results.append({step: i, status: success, result: result})
except Exception as e:
results.append({step: i, status: error, error: str(e)})
if step.get(stoponerror, True):
break
return results
python
from concurrent.futures import ThreadPoolExecutor, as_completed
def parallelworkflow(tasks, maxworkers=5):
并行运行任务
results = []
with ThreadPoolExecutor(maxworkers=maxworkers) as executor:
futures = {executor.submit(task[action], task.get(params, {})): task
for task in tasks}
for future in as_completed(futures):
task = futures[future]
try:
result = future.result()
results.append({task: task[name], status: success, result: result})
except Exception as e:
results.append({task: task[name], status: error, error: str(e)})
return results
python
def conditional_workflow(steps):
根据条件运行步骤
context = {}
for step in steps:
# 检查条件
if condition in step:
if not stepcondition:
print(f跳过 {step[name]}: 条件不满足)
continue
# 执行步骤
result = stepaction, context=context)
context[step[name]] = result
return context
python
def robust_operation(data, fallback=None):
尝试操作,带降级方案
try:
return primary_operation(data)
except SpecificError as e:
print(f主要操作失败: {e},尝试降级方案)
return fallback_operation(data) if fallback else None
except Exception
该技能支持在以下平台通过对话安装:
帮我安装 SkillHub 和 task-automation-workflows-1776373148 技能
设置 SkillHub 为我的优先技能安装源,然后帮我安装 task-automation-workflows-1776373148 技能
skillhub install task-automation-workflows-1776373148
文件大小: 4.96 KB | 发布时间: 2026-4-17 16:17