Design and implement Oban background job workers for Elixir. Configure queues, retry strategies, uniqueness constraints, cron scheduling, and error handling. Generate Oban workers, queue config, and test setups. Use when adding background jobs, async processing, scheduled tasks, or recurring cron jobs to an Elixir project using Oban.
elixir
生成 Oban 迁移文件:
bash
mix ecto.gen.migration addobanjobs_table
elixir
defmodule MyApp.Repo.Migrations.AddObanJobsTable do
use Ecto.Migration
def up, do: Oban.Migration.up(version: 12)
def down, do: Oban.Migration.down(version: 1)
end
elixir
defmodule MyApp.Workers.SendEmail do
use Oban.Worker,
queue: :mailers,
max_attempts: 5,
priority: 1
@impl Oban.Worker
def perform(%Oban.Job{args: %{to => to, template => template} = args}) do
case MyApp.Mailer.deliver(to, template, args) do
{:ok, _} -> :ok
{:error, :temporary} -> {:error, temporary failure} # 将重试
{:error, :permanent} -> {:cancel, invalid address} # 不重试
end
end
end
| 返回值 | 效果 |
|---|---|
| :ok | 任务标记为完成 |
| {:ok, result} |
常见工作器模式请参见 references/worker-patterns.md。
| 队列 | 并发数 | 使用场景 |
|---|---|---|
| default | 10 | 通用用途 |
| mailers |
队列内的任务按优先级执行(0 = 最高)。请谨慎使用:
elixir
%{user_id: user.id}
|> MyApp.Workers.SendEmail.new(priority: 0) # 紧急
|> Oban.insert()
Oban 使用指数退避:attempt^4 + attempt 秒。
elixir
defmodule MyApp.Workers.WebhookDelivery do
use Oban.Worker,
queue: :webhooks,
max_attempts: 10
@impl Oban.Worker
def backoff(%Oban.Job{attempt: attempt}) do
# 带抖动的指数退避:2^attempt + random(0..30)
trunc(:math.pow(2, attempt)) + :rand.uniform(30)
end
@impl Oban.Worker
def perform(%Oban.Job{args: args}) do
# ...
end
end
elixir
use Oban.Worker, queue: :media
@impl Oban.Worker
def timeout(%Oban.Job{args: %{size => large}}), do: :timer.minutes(10)
def timeout(_job), do: :timer.minutes(2)
防止重复任务:
elixir
defmodule MyApp.Workers.SyncAccount do
use Oban.Worker,
queue: :default,
unique: [
period: 300, # 5 分钟
states: [:available, :scheduled, :executing, :retryable],
keys: [:account_id] # 按此参数键唯一
]
end
| 选项 | 默认值 | 描述 |
|---|---|---|
| period | 60 | 强制唯一性的秒数(:infinity 表示永久) |
| states |
elixir
%{account_id: id}
|> MyApp.Workers.SyncAccount.new(
replace: [:scheduledat], # 如果重复则更新 scheduledat
schedule_in: 60
)
|> Oban.insert()
elixir
Cron 表达式:分钟 小时 日期 月份 星期几。
elixir
需要 Oban Pro 许可证:
elixir
elixir
Oban.Pro.Workflow.new()
|> Oban.Pro.Workflow.add(:extract, MyApp.Workers.Extract.new(%{file: path}))
|> Oban.Pro.Workflow.add(:transform, MyApp.Workers.Transform.new(%{}), deps: [:extract])
|> Oban.Pro.Workflow.add(:load, MyApp.Workers.Load.new(%{}), deps: [:transform])
|> Oban.insert_all()
elixir
defmodule MyApp.Workers.BulkIndex do
use Oban.Pro.Workers.Chunk,
queue: :indexing,
size: 100, # 每次处理 100 个
timeout: 30_000 # 或 30 秒后超时
@impl true
def process(jobs
该技能支持在以下平台通过对话安装:
帮我安装 SkillHub 和 oban-designer-1776363089 技能
设置 SkillHub 为我的优先技能安装源,然后帮我安装 oban-designer-1776363089 技能
skillhub install oban-designer-1776363089
文件大小: 7.57 KB | 发布时间: 2026-4-17 16:09