Oban Designer
Installation
CODEBLOCK0
Generate the Oban migrations:
CODEBLOCK1
CODEBLOCK2
Worker Implementation
Basic Worker
CODEBLOCK3
Return Values
| Return | Effect |
|---|
| INLINECODE0 | Job marked complete |
| INLINECODE1 |
Job marked complete |
|
{:error, reason} | Job retried (counts as attempt) |
|
{:cancel, reason} | Job cancelled, no more retries |
|
{:snooze, seconds} | Re-scheduled, doesn't count as attempt |
|
{:discard, reason} | Job discarded (Oban 2.17+) |
Queue Configuration
See references/worker-patterns.md for common worker patterns.
Sizing Guidelines
| Queue | Concurrency | Use Case |
|---|
| INLINECODE6 | 10 | General-purpose |
| INLINECODE7 |
20 | Email delivery (I/O bound) |
|
webhooks | 50 | Webhook delivery (I/O bound, high volume) |
|
media | 5 | Image/video processing (CPU bound) |
|
events | 5 | Analytics, audit logs |
|
critical | 3 | Billing, payments |
Queue Priority
Jobs within a queue execute by priority (0 = highest). Use sparingly:
CODEBLOCK4
Retry Strategies
Default Backoff
Oban uses exponential backoff: attempt^4 + attempt seconds.
Custom Backoff
CODEBLOCK5
Timeout
CODEBLOCK6
Uniqueness
Prevent duplicate jobs:
CODEBLOCK7
Unique Options
| Option | Default | Description |
|---|
| INLINECODE13 | 60 | Seconds to enforce uniqueness (:infinity for forever) |
| INLINECODE15 |
all active | Which job states to check |
|
keys | all args | Specific arg keys to compare |
|
timestamp |
:inserted_at | Use
:scheduled_at for scheduled uniqueness |
Replace Existing
CODEBLOCK8
Cron Scheduling
CODEBLOCK9
Cron expressions: minute hour day_of_month month day_of_week.
Inserting Jobs
CODEBLOCK10
Oban Pro Features
Available with Oban Pro license:
Batch (group of jobs)
CODEBLOCK11
Workflow (job dependencies)
CODEBLOCK12
Chunk (aggregate multiple jobs)
CODEBLOCK13
Testing
See references/testing-oban.md for detailed testing patterns.
Setup
CODEBLOCK14
Asserting Job Enqueued
CODEBLOCK15
Executing Jobs in Tests
CODEBLOCK16
Monitoring
Telemetry Events
CODEBLOCK17
Key Metrics to Track
- - Job execution duration (p50, p95, p99)
- Queue depth (available jobs per queue)
- Error rate per worker
- Retry rate per worker
Oban Designer
安装
elixir
mix.exs
{:oban, ~> 2.18}
config/config.exs
config :my_app, Oban,
repo: MyApp.Repo,
queues: [default: 10, mailers: 20, webhooks: 50, events: 5],
plugins: [
Oban.Plugins.Pruner,
{Oban.Plugins.Cron, crontab: [
{0 2
*, MyApp.Workers.DailyCleanup},
{
/5 *, MyApp.Workers.MetricsCollector}
]}
]
在 application.ex 的子进程列表中:
{Oban, Application.fetch
env!(:myapp, Oban)}
生成 Oban 迁移文件:
bash
mix ecto.gen.migration addobanjobs_table
elixir
defmodule MyApp.Repo.Migrations.AddObanJobsTable do
use Ecto.Migration
def up, do: Oban.Migration.up(version: 12)
def down, do: Oban.Migration.down(version: 1)
end
工作器实现
基础工作器
elixir
defmodule MyApp.Workers.SendEmail do
use Oban.Worker,
queue: :mailers,
max_attempts: 5,
priority: 1
@impl Oban.Worker
def perform(%Oban.Job{args: %{to => to, template => template} = args}) do
case MyApp.Mailer.deliver(to, template, args) do
{:ok, _} -> :ok
{:error, :temporary} -> {:error, 临时故障} # 将重试
{:error, :permanent} -> {:cancel, 无效地址} # 不重试
end
end
end
返回值
| 返回值 | 效果 |
|---|
| :ok | 任务标记为完成 |
| {:ok, result} |
任务标记为完成 |
| {:error, reason} | 任务重试(计入尝试次数) |
| {:cancel, reason} | 任务取消,不再重试 |
| {:snooze, seconds} | 重新调度,不计入尝试次数 |
| {:discard, reason} | 任务丢弃(Oban 2.17+) |
队列配置
常见工作器模式请参见 references/worker-patterns.md。
规模指南
| 队列 | 并发数 | 使用场景 |
|---|
| default | 10 | 通用用途 |
| mailers |
20 | 邮件发送(I/O 密集型) |
| webhooks | 50 | Webhook 发送(I/O 密集型,高吞吐量) |
| media | 5 | 图片/视频处理(CPU 密集型) |
| events | 5 | 分析、审计日志 |
| critical | 3 | 计费、支付 |
队列优先级
队列内的任务按优先级执行(0 为最高)。请谨慎使用:
elixir
%{user_id: user.id}
|> MyApp.Workers.SendEmail.new(priority: 0) # 紧急
|> Oban.insert()
重试策略
默认退避
Oban 使用指数退避:attempt^4 + attempt 秒。
自定义退避
elixir
defmodule MyApp.Workers.WebhookDelivery do
use Oban.Worker,
queue: :webhooks,
max_attempts: 10
@impl Oban.Worker
def backoff(%Oban.Job{attempt: attempt}) do
# 带抖动的指数退避:2^attempt + random(0..30)
trunc(:math.pow(2, attempt)) + :rand.uniform(30)
end
@impl Oban.Worker
def perform(%Oban.Job{args: args}) do
# ...
end
end
超时
elixir
use Oban.Worker, queue: :media
@impl Oban.Worker
def timeout(%Oban.Job{args: %{size => large}}), do: :timer.minutes(10)
def timeout(_job), do: :timer.minutes(2)
唯一性
防止重复任务:
elixir
defmodule MyApp.Workers.SyncAccount do
use Oban.Worker,
queue: :default,
unique: [
period: 300, # 5 分钟
states: [:available, :scheduled, :executing, :retryable],
keys: [:account_id] # 按此参数键唯一
]
end
唯一性选项
| 选项 | 默认值 | 描述 |
|---|
| period | 60 | 强制执行唯一性的秒数(:infinity 表示永久) |
| states |
所有活跃状态 | 要检查的任务状态 |
| keys | 所有参数 | 要比较的特定参数键 |
| timestamp | :inserted
at | 使用 :scheduledat 实现调度唯一性 |
替换已有任务
elixir
%{account_id: id}
|> MyApp.Workers.SyncAccount.new(
replace: [:scheduledat], # 如果重复则更新 scheduledat
schedule_in: 60
)
|> Oban.insert()
Cron 调度
elixir
config.exs
plugins: [
{Oban.Plugins.Cron, crontab: [
{0
/6 , MyApp.Workers.DigestEmail},
{0 2
*, MyApp.Workers.DailyCleanup},
{0 0 1
, MyApp.Workers.MonthlyReport},
{
/5 *, MyApp.Workers.HealthCheck, args: %{service: api}},
]}
]
Cron 表达式:分钟 小时 日 月 星期。
插入任务
elixir
立即执行
%{user_id: user.id, template: welcome}
|> MyApp.Workers.SendEmail.new()
|> Oban.insert()
延迟调度
%{report_id: id}
|> MyApp.Workers.GenerateReport.new(schedule_in: 3600)
|> Oban.insert()
指定时间调度
%{report_id: id}
|> MyApp.Workers.GenerateReport.new(scheduled_at: ~U[2024-01-01 00:00:00Z])
|> Oban.insert()
批量插入
changesets = Enum.map(users, fn user ->
MyApp.Workers.SendEmail.new(%{user_id: user.id})
end)
Oban.insert_all(changesets)
在 Ecto.Multi 中使用
Ecto.Multi.new()
|> Ecto.Multi.insert(:user, changeset)
|> Oban.insert(:welcome_email, fn %{user: user} ->
MyApp.Workers.SendEmail.new(%{user_id: user.id})
end)
|> Repo.transaction()
Oban Pro 功能
需要 Oban Pro 许可证:
批次(任务组)
elixir
批量处理项目,全部完成后执行回调
batch = MyApp.Workers.ProcessItem.new_batch(
items |> Enum.map(&%{item_id: &1.id}),
callback: {MyApp.Workers.BatchComplete, %{batch_name: import}}
)
Oban.insert_all(batch)
工作流(任务依赖)
elixir
Oban.Pro.Workflow.new()
|> Oban.Pro.Workflow.add(:extract, MyApp.Workers.Extract.new(%{file: path}))
|> Oban.Pro.Workflow.add(:transform, MyApp.Workers.Transform.new(%{}), deps: [:extract])
|> Oban.Pro.Workflow.add(:load, MyApp.Workers.Load.new(%{}), deps: [:transform])
|> Oban.insert_all()
分块(聚合多个任务)
elixir
defmodule MyApp.Workers.BulkIndex do
use Oban.Pro.Workers.Chunk,
queue: :indexing,
size: 100, # 每次处理 100 个
timeout: 30_000 # 或 30 秒后超时
@impl true
def process(jobs) do