Oban (Oban v2.19.1)
View SourceOban is a robust background job framework which uses PostgreSQL, MySQL, or SQLite3 for persistence.
Features
Oban's primary goals are reliability, consistency and observability.
Oban is a powerful and flexible library that can handle a wide range of background job use cases, and it is well-suited for systems of any size. It provides a simple and consistent API for scheduling and performing jobs, and it is built to be fault-tolerant and easy to monitor.
Oban is fundamentally different from other background job processing tools because it retains job data for historic metrics and inspection. You can leave your application running indefinitely without worrying about jobs being lost or orphaned due to crashes.
Advantages Over Other Tools
Fewer Dependencies — If you are running a web app there is a very good chance that you're running on top of a SQL database. Running your job queue within a SQL database minimizes system dependencies and simplifies data backups.
Transactional Control — Enqueue a job along with other database changes, ensuring that everything is committed or rolled back atomically.
Database Backups — Jobs are stored inside of your primary database, which means they are backed up together with the data that they relate to.
Advanced Features
Isolated Queues — Jobs are stored in a single table but are executed in distinct queues. Each queue runs in isolation, ensuring that a job in a single slow queue can't back up other faster queues.
Queue Control — Queues can be started, stopped, paused, resumed and scaled independently at runtime locally or across all running nodes (even in environments like Heroku, without distributed Erlang).
Resilient Queues — Failing queries won't crash the entire supervision tree, instead a backoff mechanism will safely retry them again in the future.
Job Canceling — Jobs can be canceled in the middle of execution regardless of which node they are running on. This stops the job at once and flags it as
cancelled
.Triggered Execution — Insert triggers ensure that jobs are dispatched on all connected nodes as soon as they are inserted into the database.
Unique Jobs — Duplicate work can be avoided through unique job controls. Uniqueness can be enforced at the argument, queue, worker and even sub-argument level for any period of time.
Scheduled Jobs — Jobs can be scheduled at any time in the future, down to the second.
Periodic (CRON) Jobs — Automatically enqueue jobs on a cron-like schedule. Duplicate jobs are never enqueued, no matter how many nodes you're running.
Job Priority — Prioritize jobs within a queue to run ahead of others with ten levels of granularity.
Historic Metrics — After a job is processed the row isn't deleted. Instead, the job is retained in the database to provide metrics. This allows users to inspect historic jobs and to see aggregate data at the job, queue or argument level.
Node Metrics — Every queue records metrics to the database during runtime. These are used to monitor queue health across nodes and may be used for analytics.
Graceful Shutdown — Queue shutdown is delayed so that slow jobs can finish executing before shutdown. When shutdown starts queues are paused and stop executing new jobs. Any jobs left running after the shutdown grace period may be rescued later.
Telemetry Integration — Job life-cycle events are emitted via Telemetry integration. This enables simple logging, error reporting and health checkups without plug-ins.
🌟 Oban Pro
An official set of extensions, plugins, and workers that expand what Oban is capable of is available as a licensed package. It includes features like:
- 🖇️ Workflows
- 🎨 Decorators
- ⛓️ Chains
- 🏗️ Structured Jobs
- 🪝 Worker Hooks
- 🌎 Global Limits
- 🔪 Queue Partitioning
- 🎢 Dynamic Queues
Plus much more. Learn more about Oban Pro
Engines
Oban ships with engines for PostgreSQL, MySQL, and SQLite3. Each engine supports the same core functionality, though they have differing levels of maturity and suitability for production.
Requirements
Oban requires:
- Elixir 1.15+
- Erlang 24+
- PostgreSQL 12.0+, MySQL 8.0+, or SQLite3 3.37.0+
Installation
See the installation guide for details on installing and configuring Oban in your application.
Quick Getting Started
Configure queues and an Ecto repo for Oban to use:
# In config/config.exs config :my_app, Oban, repo: MyApp.Repo, queues: [mailers: 20]
Define a worker to process jobs in the
mailers
queue (seeOban.Worker
):defmodule MyApp.MailerWorker do use Oban.Worker, queue: :mailers @impl Oban.Worker def perform(%Oban.Job{args: %{"email" => email} = _args}) do _ = Email.deliver(email) :ok end end
Enqueue a job (see the documentation):
%{email: %{to: "[email protected]", body: "Hello from Oban!"}} |> MyApp.MailerWorker.new() |> Oban.insert()
The magic happens! Oban executes the job when there is available bandwidth in the
mailer
queue.
Learning
Learn the fundamentals of Oban, all the way through to preparing for production with our LiveBook powered Oban Training curriculum.
Summary
Types
The name of an Oban instance. This is used to identify instances in the internal registry for configuration lookup.
Functions
Creates a facade for Oban
functions and automates fetching configuration from the application
environment.
Cancel many jobs based on a queryable and mark them as cancelled
to prevent them from running.
Cancel an executing
, available
, scheduled
or retryable
job and mark it as cancelled
to
prevent it from running. If the job is currently executing
it will be killed and otherwise it
is ignored.
Check the current state of all queue producers.
Check the current state of a queue producer.
Retrieve the Oban.Config
struct for a named Oban supervision tree.
Delete many jobs based on a queryable.
Delete a job that's not currently executing
.
Synchronously execute all available jobs in a queue.
Insert a new job into the database for execution.
Put a job insert operation into an Ecto.Multi
.
Similar to insert/3
, but raises an Ecto.InvalidChangesetError
if the job can't be inserted.
Insert multiple jobs into the database for execution.
Put an insert_all
operation into an Ecto.Multi
.
Pause all running queues to prevent them from executing any new jobs.
Pause a running queue, preventing it from executing any new jobs. All running jobs will remain running until they are finished.
Resume executing jobs in all paused queues.
Resume executing jobs in a paused queue.
Retries all jobs that match on the given queryable.
Sets a job as available
, adding attempts if already maxed out. Jobs currently available
or
executing
are ignored. The job is scheduled for immediate execution.
Scale the concurrency for a queue.
Starts an Oban
supervision tree linked to the current process.
Start a new supervised queue.
Shutdown a queue's supervision tree and stop running jobs for that queue.
Returns the pid of the root Oban process for the given name.
Types
@type changeset_or_fun() :: Oban.Job.changeset() | Oban.Job.changeset_fun()
@type changeset_wrapper() :: %{ :changesets => Oban.Job.changeset_list(), optional(atom()) => term() }
@type changesets_or_wrapper() :: Oban.Job.changeset_list() | changeset_wrapper()
@type changesets_or_wrapper_or_fun() :: changesets_or_wrapper() | Oban.Job.changeset_list_fun()
@type drain_option() :: {:queue, queue_name()} | {:with_limit, pos_integer()} | {:with_recursion, boolean()} | {:with_safety, boolean()} | {:with_scheduled, boolean() | DateTime.t()}
@type drain_result() :: %{ cancelled: non_neg_integer(), discard: non_neg_integer(), failure: non_neg_integer(), snoozed: non_neg_integer(), success: non_neg_integer() }
@type multi() :: Ecto.Multi.t()
@type multi_name() :: Ecto.Multi.name()
@type name() :: term()
The name of an Oban instance. This is used to identify instances in the internal registry for configuration lookup.
@type oban_node() :: String.t()
@type option() :: {:dispatch_cooldown, pos_integer()} | {:engine, module()} | {:get_dynamic_repo, nil | (-> pid() | atom()) | {module(), atom(), list()}} | {:log, false | Logger.level()} | {:name, name()} | {:node, oban_node()} | {:notifier, module() | {module(), Keyword.t()}} | {:peer, false | module() | {module(), Keyword.t()}} | {:plugins, false | [module() | {module() | Keyword.t()}]} | {:prefix, false | String.t()} | {:queues, false | [{queue_name(), pos_integer() | Keyword.t()}]} | {:repo, module()} | {:shutdown_grace_period, non_neg_integer()} | {:stage_interval, timeout()} | {:testing, :disabled | :inline | :manual}
@type queue_option() :: {:local_only, boolean()} | {:node, oban_node()} | {:queue, queue_name()}
@type queue_state() :: %{ :limit => pos_integer(), :node => oban_node(), :paused => boolean(), :queue => queue_name(), :running => [pos_integer()], :started_at => DateTime.t(), :updated_at => DateTime.t(), optional(atom()) => any() }
Functions
Creates a facade for Oban
functions and automates fetching configuration from the application
environment.
Facade modules support configuration via the application environment under an OTP application
key that you specify with :otp_app
. For example, to define a facade:
defmodule MyApp.Oban do
use Oban, otp_app: :my_app
end
Then, you can configure the facade with:
config :my_app, MyApp.Oban, repo: MyApp.Repo
Then you can include MyApp.Oban
in your application's supervision tree without passing extra
options:
defmodule MyApp.Application do
use Application
def start(_type, _args) do
children = [
MyApp.Repo,
MyApp.Oban
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
Calling Functions
Facade modules allow you to call Oban
functions on instances with custom names (rather than
Oban
), without passing a Oban.name/0
as the first argument.
For example:
# Instead of:
Oban.config(MyApp.Oban)
# You can do:
MyApp.Oban.config()
Facades also make piping into Oban functions far more convenient:
%{some: :args}
|> MyWorker.new()
|> MyOban.insert()
Merging Configuration
All configuration can be provided through the use
macro or through the application
configuration, and options from the application supersedes those passed through use
.
Configuration is prioritized in this order:
- Options passed through
use
- Options pulled from the OTP app specified by
:otp_app
viaApplication.get_env/3
- Options passed through a child spec in the supervisor
@spec cancel_all_jobs(name(), queryable :: Ecto.Queryable.t()) :: {:ok, non_neg_integer()}
Cancel many jobs based on a queryable and mark them as cancelled
to prevent them from running.
Any currently executing
jobs are killed. If executing jobs happen to fail before cancellation
then the state is set to cancelled
. However, any that complete successfully will remain
completed
.
Only jobs with the statuses executing
, available
, scheduled
, or retryable
can be cancelled.
Example
Cancel all jobs:
Oban.cancel_all_jobs(Oban.Job)
{:ok, 9}
Cancel all jobs for a specific worker:
Oban.Job
|> Ecto.Query.where(worker: "MyApp.MyWorker")
|> Oban.cancel_all_jobs()
{:ok, 2}
@spec cancel_job(name(), job_or_id :: Oban.Job.t() | integer()) :: :ok
Cancel an executing
, available
, scheduled
or retryable
job and mark it as cancelled
to
prevent it from running. If the job is currently executing
it will be killed and otherwise it
is ignored.
If an executing job happens to fail before it can be cancelled the state is set to cancelled
.
However, if it manages to complete successfully then the state will still be completed
.
Example
Cancel a job:
Oban.cancel_job(job)
:ok
Cancel a job for a custom instance:
Oban.cancel_job(MyOban, job)
:ok
@spec check_all_queues(name()) :: [queue_state()]
Check the current state of all queue producers.
Example
Get information about all running queues for the default instance:
Enum.map(Oban.check_all_queues(), &Map.take(&1, [:queue, :limit]))
[%{queue: "default", limit: 10}, %{queue: "other", limit: 5}]
Get information for an alternate instance:
Oban.check_all_queues(Other.Oban)
@spec check_queue(name(), opts :: [{:queue, queue_name()}]) :: nil | queue_state()
Check the current state of a queue producer.
This allows you to introspect on a queue's health by retrieving key attributes of the producer's
state; values such as the current limit
, the running
job ids, and when the producer was
started.
Options
:queue
- a string or atom specifying the queue to check, required
Example
Get details about the default
queue:
Oban.check_queue(queue: :default)
%{
limit: 10,
node: "me@local",
paused: false,
queue: "default",
running: [100, 102],
started_at: ~D[2020-10-07 15:31:00],
updated_at: ~D[2020-10-07 15:31:00]
}
Attempt to get details about a queue that isn't running locally:
Oban.check_queue(queue: :not_really_running)
nil
@spec config(name()) :: Oban.Config.t()
Retrieve the Oban.Config
struct for a named Oban supervision tree.
Example
Retrieve the default Oban
instance config:
%Oban.Config{} = Oban.config()
Retrieve the config for an instance started with a custom name:
%Oban.Config{} = Oban.config(MyCustomOban)
@spec delete_all_jobs(name(), queryable :: Ecto.Queryable.t()) :: {:ok, non_neg_integer()}
Delete many jobs based on a queryable.
Only jobs that aren't executing
may be deleted.
Example
Delete all jobs for a specific worker:
Oban.Job
|> Ecto.Query.where(worker: "MyApp.MyWorker")
|> Oban.delete_all_jobs()
{:ok, 9}
@spec delete_job(name(), job_or_id :: Oban.Job.t() | integer()) :: :ok
Delete a job that's not currently executing
.
Example
Delete a job:
Oban.delete_job(job)
:ok
Delete a job for a custom instance:
Oban.delete_job(MyOban, job)
:ok
@spec drain_queue(name(), [drain_option()]) :: drain_result()
Synchronously execute all available jobs in a queue.
All execution happens within the current process and it is guaranteed not to raise an error or exit.
Draining a queue from within the current process is especially useful for testing. Jobs that are
enqueued by a process when Ecto
is in sandbox mode are only visible to that process. Calling
drain_queue/2
allows you to control when the jobs are executed and to wait synchronously for
all jobs to complete.
Failures & Retries
Draining a queue uses the same execution mechanism as regular job dispatch. That means that any job failures or crashes are captured and result in a retry. Retries are scheduled in the future with backoff and won't be retried immediately.
By default jobs are executed in safe
mode, just as they are in production. Safe mode catches
any errors or exits and records the formatted error in the job's errors
array. That means
exceptions and crashes are not bubbled up to the calling process.
If you expect jobs to fail, would like to track failures, or need to check for specific errors
you can pass the with_safety: false
flag. See the "Options" section below for more details.
Scheduled Jobs
By default, drain_queue/2
will execute all currently available jobs. In order to execute
scheduled jobs, you may pass the with_scheduled: true
which will cause all scheduled jobs to
be marked as available
beforehand. To run jobs scheduled up to a specific point in time, pass
a DateTime
instead.
Options
:queue
- a string or atom specifying the queue to drain, required:with_limit
— the maximum number of jobs to drain at once. When recursion is enabled this is how many jobs are processed per-iteration.:with_recursion
— whether to keep draining a queue repeatedly when jobs insert more jobs:with_safety
— whether to silently catch errors when draining, defaults totrue
. Whenfalse
, raised exceptions or unhandled exits are reraised (unhandled exits are wrapped inOban.CrashError
).:with_scheduled
— whether to include any scheduled jobs when draining, defaultfalse
. Whentrue
, drains all scheduled jobs. When aDateTime
is provided, drains all jobs scheduled up to, and including, that point in time.
Example
Drain a queue with three available jobs, two of which succeed and one of which fails:
Oban.drain_queue(queue: :default)
%{failure: 1, snoozed: 0, success: 2}
Drain a queue including any scheduled jobs:
Oban.drain_queue(queue: :default, with_scheduled: true)
%{failure: 0, snoozed: 0, success: 1}
Drain a queue including jobs scheduled up to a minute:
Oban.drain_queue(queue: :default, with_scheduled: DateTime.add(DateTime.utc_now(), 60, :second))
Drain a queue and assert an error is raised:
assert_raise RuntimeError, fn -> Oban.drain_queue(queue: :risky, with_safety: false) end
Drain a queue repeatedly until there aren't any more jobs to run. This is particularly useful for testing jobs that enqueue other jobs:
Oban.drain_queue(queue: :default, with_recursion: true)
%{failure: 1, snoozed: 0, success: 2}
Drain only the top (by scheduled time and priority) five jobs off a queue:
Oban.drain_queue(queue: :default, with_limit: 5)
%{failure: 0, snoozed: 0, success: 1}
Drain a queue recursively, only one job at a time:
Oban.drain_queue(queue: :default, with_limit: 1, with_recursion: true)
%{failure: 0, snoozed: 0, success: 3}
@spec insert(name(), Oban.Job.changeset(), Keyword.t()) :: {:ok, Oban.Job.t()} | {:error, Oban.Job.changeset() | term()}
@spec insert(multi(), multi_name(), changeset_or_fun()) :: multi()
Insert a new job into the database for execution.
This and the other insert
variants are the recommended way to enqueue jobs because they
support features like unique jobs.
See the section on "Unique Jobs" for more details.
Example
Insert a single job:
{:ok, job} = Oban.insert(MyApp.Worker.new(%{id: 1}))
Insert a job while ensuring that it is unique within the past 30 seconds:
{:ok, job} = Oban.insert(MyApp.Worker.new(%{id: 1}, unique: [period: 30]))
Insert a job using a custom timeout:
{:ok, job} = Oban.insert(MyApp.Worker.new(%{id: 1}), timeout: 10_000)
Insert a job using an alternative instance name:
{:ok, job} = Oban.insert(MyOban, MyApp.Worker.new(%{id: 1}))
@spec insert(name(), multi(), multi_name(), changeset_or_fun(), Keyword.t()) :: multi()
Put a job insert operation into an Ecto.Multi
.
Like insert/2
, this variant is recommended over Ecto.Multi.insert
because it supports all of
Oban's features, i.e. unique jobs.
See the section on "Unique Jobs" for more details.
Example
Ecto.Multi.new()
|> Oban.insert("job-1", MyApp.Worker.new(%{id: 1}))
|> Oban.insert("job-2", fn _ -> MyApp.Worker.new(%{id: 2}) end)
|> MyApp.Repo.transaction()
@spec insert!(name(), Oban.Job.changeset(), opts :: Keyword.t()) :: Oban.Job.t()
Similar to insert/3
, but raises an Ecto.InvalidChangesetError
if the job can't be inserted.
Example
Insert a single job:
job = Oban.insert!(MyApp.Worker.new(%{id: 1}))
Insert a job using a custom timeout:
job = Oban.insert!(MyApp.Worker.new(%{id: 1}), timeout: 10_000)
Insert a job using an alternative instance name:
job = Oban.insert!(MyOban, MyApp.Worker.new(%{id: 1}))
@spec insert_all( name() | multi(), changesets_or_wrapper() | multi_name(), Keyword.t() | changesets_or_wrapper_or_fun() ) :: [Oban.Job.t()] | multi()
Insert multiple jobs into the database for execution.
There are a few important differences between this function and Ecto.Repo.insert_all/3
:
- This function always returns a list rather than a tuple of
{count, records}
- This function accepts a list of changesets rather than a list of maps or keyword lists
Error Handling and Rollbacks
If insert_all
encounters an issue, the function will raise an error based on your database
adapter. This behavior is valuable in conjunction with Ecto.Repo.transaction/2
because it
allows for rollbacks.
For example, an invalid changeset raises:
* (Ecto.InvalidChangesetError) could not perform insert because changeset is invalid.
Dolphin Engine and Generated Values
MySQL doesn't return anything on insertion into the database. That means any values generated by
the database, namely the primary key and timestamps, aren't included in the job structs returned
from insert_all
.
🌟 Unique Jobs and Batching
Only the Smart Engine in Oban
Pro supports bulk unique jobs, automatic insert batching, and minimizes
parameters sent over the wire. With the basic engine, you must use insert/3
to insert unique
jobs one at a time.
Options
Accepts any of Ecto's "Shared Options" such as timeout
and log
.
Example
Insert a list of 100 jobs at once:
1..100
|> Enum.map(&MyApp.Worker.new(%{id: &1}))
|> Oban.insert_all()
Insert a stream of jobs at once (be sure the stream terminates!):
(fn -> MyApp.Worker.new(%{}))
|> Stream.repeatedly()
|> Stream.take(100)
|> Oban.insert_all()
Insert with a custom timeout:
1..100
|> Enum.map(&MyApp.Worker.new(%{id: &1}))
|> Oban.insert_all(timeout: 10_000)
Insert with an alternative instance name:
changesets = Enum.map(1..100, &MyApp.Worker.new(%{id: &1}))
jobs = Oban.insert_all(MyOban, changesets)
@spec insert_all( name(), multi(), multi_name(), changesets_or_wrapper_or_fun(), Keyword.t() ) :: multi()
Put an insert_all
operation into an Ecto.Multi
.
This function supports the same features and has the same caveats as insert_all/2
.
Example
Insert job changesets within a multi:
changesets = Enum.map(0..100, &MyApp.Worker.new(%{id: &1}))
Ecto.Multi.new()
|> Oban.insert_all(:jobs, changesets)
|> MyApp.Repo.transaction()
Insert job changesets using a function:
Ecto.Multi.new()
|> Ecto.Multi.insert(:user, user_changeset)
|> Oban.insert_all(:jobs, fn %{user: user} ->
email_job = EmailWorker.new(%{id: user.id})
staff_job = StaffWorker.new(%{id: user.id})
[email_job, staff_job]
end)
|> MyApp.Repo.transaction()
@spec pause_all_queues(name(), opts :: [queue_all_option()]) :: :ok | {:error, Exception.t()}
Pause all running queues to prevent them from executing any new jobs.
See pause_queue/2
for options and details.
Example
Pause all queues:
Oban.pause_all_queues()
Pause all queues on the local node:
Oban.pause_all_queues(local_only: true)
Pause all queues on a specific node:
Oban.pause_all_queues(node: "worker.1")
Pause all queues for an alternative instance name:
Oban.pause_all_queues(MyOban)
@spec pause_queue(name(), opts :: [queue_option()]) :: :ok | {:error, Exception.t()}
Pause a running queue, preventing it from executing any new jobs. All running jobs will remain running until they are finished.
When shutdown begins all queues are paused.
Options
:queue
- a string or atom specifying the queue to pause, required:local_only
- whether the queue will be paused only on the local node, default:false
:node
- restrict pausing to a particular node
Note: by default, Oban does not verify that the given queue exists unless :local_only
is set to true
as even if the queue does not exist locally, it might be running on
another node.
Example
Pause the default queue:
Oban.pause_queue(queue: :default)
:ok
Pause the default queue, but only on the local node:
Oban.pause_queue(queue: :default, local_only: true)
:ok
Pause the default queue only on a particular node:
Oban.pause_queue(queue: :default, node: "worker.1")
:ok
@spec resume_all_queues(name(), opts :: [queue_all_option()]) :: :ok | {:error, Exception.t()}
Resume executing jobs in all paused queues.
See resume_queue/2
for options and details.
Example
Resume all queues:
Oban.resume_all_queues()
Resume all queues on the local node:
Oban.resume_all_queues(local_only: true)
Resume all queues on a specific node:
Oban.resume_all_queues(node: "worker.1")
Resume all queues for an alternative instance name:
Oban.resume_all_queues(MyOban)
@spec resume_queue(name(), opts :: [queue_option()]) :: :ok | {:error, Exception.t()}
Resume executing jobs in a paused queue.
Options
:queue
- a string or atom specifying the queue to resume, required:local_only
- whether the queue will be resumed only on the local node, default:false
:node
- restrict resuming to a particular node
Note: by default, Oban does not verify that the given queue exists unless :local_only
is set to true
as even if the queue does not exist locally, it might be running on
another node.
Example
Resume a paused default queue:
Oban.resume_queue(queue: :default)
Resume the default queue, but only on the local node:
Oban.resume_queue(queue: :default, local_only: true)
Resume the default queue only on a particular node:
Oban.resume_queue(queue: :default, node: "worker.1")
@spec retry_all_jobs(name(), queryable :: Ecto.Queryable.t()) :: {:ok, non_neg_integer()}
Retries all jobs that match on the given queryable.
If no queryable is given, Oban will retry all jobs that aren't currently available
or
executing
. Note that regardless of constraints, it will never retry available
or
executing
jobs.
Example
Retries jobs in any state other than available
or executing
:
Oban.retry_all_jobs(Oban.Job)
{:ok, 9}
Retries jobs with the retryable
state:
Oban.Job
|> Ecto.Query.where(state: "retryable")
|> Oban.retry_all_jobs()
{:ok, 3}
Retries all inactive jobs with priority 0
Oban.Job
|> Ecto.Query.where(priority: 0)
|> Oban.retry_all_jobs()
{:ok, 5}
@spec retry_job(name(), job_or_id :: Oban.Job.t() | integer()) :: :ok
Sets a job as available
, adding attempts if already maxed out. Jobs currently available
or
executing
are ignored. The job is scheduled for immediate execution.
Example
Retry a job:
Oban.retry_job(job)
:ok
@spec scale_queue(name(), opts :: [queue_option() | {:limit, pos_integer()}]) :: :ok | {:error, Exception.t()}
Scale the concurrency for a queue.
Options
:queue
- a string or atom specifying the queue to scale, required:limit
— the new concurrency limit, required:local_only
— whether the queue will be scaled only on the local node, default:false
:node
- restrict scaling to a particular node
In addition, all engine-specific queue options are passed along after validation.
Note: by default, Oban does not verify that the given queue exists unless :local_only
is set to true
as even if the queue does not exist locally, it might be running on
another node.
Example
Scale a queue up, triggering immediate execution of queued jobs:
Oban.scale_queue(queue: :default, limit: 50)
:ok
Scale the queue back down, allowing executing jobs to finish:
Oban.scale_queue(queue: :default, limit: 5)
:ok
Scale the queue only on the local node:
Oban.scale_queue(queue: :default, limit: 10, local_only: true)
:ok
Scale the queue on a particular node:
Oban.scale_queue(queue: :default, limit: 10, node: "worker.1")
:ok
@spec start_link([option()]) :: Supervisor.on_start()
Starts an Oban
supervision tree linked to the current process.
Options
These options are required; without them the supervisor won't start:
:repo
— specifies the Ecto repo used to insert and retrieve jobs
Primary Options
These options determine what the system does at a high level, i.e. which queues to run:
:engine
— facilitates inserting, fetching, and otherwise managing jobs.There are three built-in engines:
Oban.Engines.Basic
for Postgres databases,Oban.Engines.Lite
for SQLite3 databases, andOban.Engines.Inline
for simplified testing (only available for:inline
testing mode).When the
Oban.Engines.Lite
engine is used the:notifier
and:peer
are automatically set toPG
andisolated
mode, respectively.Additional engines, such as Oban Pro's
SmartEngine
with advanced functionality for Postgres, are also available as an add-on.Defaults to the
Basic
engine for Postgres.:log
— eitherfalse
to disable logging or a standard log level (:error
,:warning
,:info
,:debug
, etc.). This determines whether queries are logged or not; overriding the repo's configured log level. Defaults tofalse
, where no queries are logged.:name
— used for supervisor registration, it must be unique across an entire VM instance. Defaults toOban
when no name is provided.:node
— used to identify the node that the supervision tree is running in. If no value is provided it will use thenode
name in a distributed system, or thehostname
in an isolated node. See "Node Name" below.:notifier
— used to relay messages between processes, nodes, and the Postgres database.There are two built-in notifiers:
Oban.Notifiers.Postgres
, which uses Postgres PubSub; andOban.Notifiers.PG
, which uses process groups with distributed erlang. Defaults to the Postgres notifier.:peer
— used to specify which peer module to use for cluster leadership.There are two built-in peers:
Oban.Peers.Database
, which uses table-based leadership through theoban_peers
table; andOban.Peers.Global
, which uses global locks through distributed Erlang.Leadership can be disabled by setting
peer: false
, but note that centralized plugins likeCron
won't run without leadership.Defaults to the
Database
peer.:plugins
— a list or modules or module/option tuples that are started as children of an Oban supervisor. Any supervisable module is a valid plugin, i.e. aGenServer
or anAgent
. May also be set tofalse
to disable plugins and disable leadership.:prefix
— the query prefix, or schema, to use for inserting and executing jobs. Anoban_jobs
table must exist within the prefix. See the "Prefix Support" section in the module documentation for more details.:queues
— a keyword list where the keys are queue names and the values are the concurrency setting or a keyword list of queue options. For example, setting queues to[default: 10, exports: 5]
would start the queuesdefault
andexports
with a combined concurrency level of 15. The concurrency setting specifies how many jobs each queue will run concurrently.Queues accept additional override options to customize their behavior, e.g. by setting
paused
ordispatch_cooldown
for a specific queue.Using an empty list or
false
prevents any queues from starting on init.:testing
— a mode that controls how an instance is configured for testing. When set to:inline
or:manual
queues, peers, and plugins are automatically disabled. Defaults to:disabled
, no test mode.
Twiddly Options
Additional options used to tune system behaviour. These are primarily useful for testing or troubleshooting and don't usually need modification.
:dispatch_cooldown
— the minimum number of milliseconds a producer will wait before fetching and running more jobs. A slight cooldown period prevents a producer from flooding with messages and thrashing the database. The cooldown period directly impacts a producer's throughput: jobs per second for a single queue is calculated by(1000 / cooldown) * limit
. For example, with a5ms
cooldown and a queue limit of25
a single queue can run 5,000 jobs/sec.The default is
5ms
and the minimum is1ms
, which is likely faster than the database can return new jobs to run.:insert_trigger
— whether to dispatch notifications to relevant queues as jobs are inserted into the database. At high load, e.g. thousands or more job inserts per second, notifications may become a bottleneck.The trigger mechanism is designed to make jobs execute immediately after insert, rather than up to
:stage_interval
(1 second) afterwards, and it can safely be disabled to improve insert throughput.Defaults to
true
, with triggering enabled.:shutdown_grace_period
— the amount of time a queue will wait for executing jobs to complete before hard shutdown, specified in milliseconds. The default is15_000
, or 15 seconds.:stage_interval
— the number of milliseconds between making scheduled jobs available and notifying relevant queues that jobs are available. This is directly tied to the resolution ofscheduled
orretryable
jobs and how frequently the database is checked for jobs to run. To minimize database load, only5_000
jobs are staged at each interval.Only the leader node stages jobs and notifies queues when the
:notifier's
pubsub notifications are functional. If pubsub messages can't get through then staging switches to a less efficient "local" mode in which all nodes poll for jobs to run.Setting the interval to
:infinity
disables staging entirely. The default is1_000ms
.
Example
Start a stand-alone Oban
instance:
{:ok, pid} = Oban.start_link(repo: MyApp.Repo, queues: [default: 10])
To start an Oban
instance within an application's supervision tree:
def start(_type, _args) do
children = [MyApp.Repo, {Oban, repo: MyApp.Repo, queues: [default: 10]}]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end
Start multiple, named Oban
supervisors within a supervision tree:
children = [
MyApp.Repo,
{Oban, name: Oban.A, repo: MyApp.Repo, queues: [default: 10]},
{Oban, name: Oban.B, repo: MyApp.Repo, queues: [special: 10]},
]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
Start a local Oban
instance for SQLite:
{:ok, pid} = Oban.start_link(engine: Oban.Engines.Lite, repo: MyApp.Repo)
Node Name
When the node
value has not been configured it is generated based on the environment:
- If the local node is alive (e.g. in a distributed system, or when running from a mix release) the node name is used
- In a Heroku environment the system environment's
DYNO
value is used - Otherwise, the system hostname is used
When running a mix release on a Heroku node, the node is alive even if not part of a
distributed system. In order to use the DYNO
value, configure the node value using runtime
configuration via config/runtime.exs
:
config :my_app, Oban,
node: System.get_env("DYNO", "nonode@nohost")
@spec start_queue(name(), opts :: Keyword.t()) :: :ok | {:error, Exception.t()}
Start a new supervised queue.
By default this starts a new supervised queue across all nodes running Oban on the same database
and prefix. You can pass the option local_only: true
if you prefer to start the queue only on
the local node.
Options
:queue
- a string or atom specifying the queue to start, required:local_only
- whether the queue will be started only on the local node, default:false
:limit
- set the concurrency limit, required:paused
— set whether the queue starts in the "paused" state, optional
In addition, all engine-specific queue options are passed along after validation.
Example
Start the :priority
queue with a concurrency limit of 10 across the connected nodes.
Oban.start_queue(queue: :priority, limit: 10)
:ok
Start the :media
queue with a concurrency limit of 5 only on the local node.
Oban.start_queue(queue: :media, limit: 5, local_only: true)
:ok
Start the :media
queue on a particular node.
Oban.start_queue(queue: :media, limit: 5, node: "worker.1")
:ok
Start the :media
queue in a paused
state.
Oban.start_queue(queue: :media, limit: 5, paused: true)
:ok
@spec stop_queue(name(), opts :: [queue_option()]) :: :ok | {:error, Exception.t()}
Shutdown a queue's supervision tree and stop running jobs for that queue.
By default this action will occur across all the running nodes. Still, if you prefer to stop the
queue's supervision tree and stop running jobs for that queue only on the local node, you can
pass the option: local_only: true
The shutdown process pauses the queue first and allows current jobs to exit gracefully, provided they finish within the shutdown limit.
Note: by default, Oban does not verify that the given queue exists unless :local_only
is set to true
as even if the queue does not exist locally, it might be running on
another node.
Options
:queue
- a string or atom specifying the queue to stop, required:local_only
- whether the queue will be stopped only on the local node, default:false
:node
- restrict stopping to a particular node
Example
Stop a running queue on all nodes:
Oban.stop_queue(queue: :default)
:ok
Stop the queue only on the local node:
Oban.stop_queue(queue: :media, local_only: true)
:ok
Stop the queue only on a particular node:
Oban.stop_queue(queue: :media, node: "worker.1")
:ok
Returns the pid of the root Oban process for the given name.
Example
Find the default instance:
Oban.whereis(Oban)
Find a dynamically named instance:
Oban.whereis({:oban, 1})