Bedrock Job Queue
A durable, distributed job queue for Elixir built on Bedrock. Based on the ideas in Apple's QuiCK paper.
Features
- Topic-based routing - Route jobs to worker modules by topic
- Priority ordering - Lower priority numbers are processed first
- Scheduled jobs - Delay jobs or schedule for a specific time
- Automatic retries - Failed jobs retry with exponential backoff
- Lease-based recovery - Running jobs are leased and become visible again if a worker stops before completion
- Automatic lease extension - Long-running jobs keep their lease alive while the worker is still running
- Multi-tenant - Isolate jobs by queue ID (tenant, shop, etc.)
- Transactional - Jobs are enqueued atomically within Bedrock transactions
How It Works
Bedrock Job Queue follows the broad shape of Apple's QuiCK design:
- Jobs are stored in per-queue item ranges, ordered by priority and visibility time.
- A pointer index lets scanners find queues with visible work without scanning every queue.
- A manager leases one ready queue at a time to avoid a thundering herd across consumers.
- The manager leases individual jobs before dispatching them to workers.
- While a job is running, a lease extender periodically extends the job lease.
- Successful jobs are completed and removed; failed jobs are requeued with backoff or moved to dead letter storage after retries are exhausted.
The queue provides at-least-once delivery. Job handlers should be idempotent when they perform external side effects.
Long-Running Jobs
Workers do not need to predict exactly how long a job will take. When a job is leased for execution, the consumer starts a lease extender that refreshes the lease before it expires. If the worker crashes, exits, or the node goes away, lease extension stops and the job eventually becomes visible for another consumer to claim.
Installation
Add bedrock_job_queue to your dependencies in mix.exs:
def deps do
[
{:bedrock_job_queue, "~> 0.1"}
]
endQuick Start
1. Define your JobQueue
defmodule MyApp.JobQueue do
use Bedrock.JobQueue,
otp_app: :my_app,
repo: MyApp.Repo,
workers: %{
"email:send" => MyApp.Jobs.SendEmail,
"user:welcome" => MyApp.Jobs.WelcomeUser
}
end2. Create job modules
defmodule MyApp.Jobs.SendEmail do
use Bedrock.JobQueue.Job,
topic: "email:send",
priority: 50,
max_retries: 3
@impl true
def perform(%{to: to, subject: subject, body: body}, _meta) do
MyApp.Mailer.send(to, subject, body)
:ok
end
end3. Add to your supervision tree
children = [
MyApp.Cluster,
{MyApp.JobQueue, concurrency: 10, batch_size: 5}
]4. Enqueue jobs
alias MyApp.JobQueue
# Immediate processing
JobQueue.enqueue("tenant_1", "email:send", %{
to: "user@example.com",
subject: "Hello",
body: "Welcome!"
})
# Schedule for a specific time
JobQueue.enqueue("tenant_1", "email:send", payload,
at: ~U[2024-01-15 10:00:00Z]
)
# Delay by duration
JobQueue.enqueue("tenant_1", "cleanup", payload,
in: :timer.hours(1)
)
# With priority (lower = higher priority)
JobQueue.enqueue("tenant_1", "urgent", payload,
priority: 0
)Job Return Values
Jobs can return the following values from perform/2:
| Return Value | Behavior |
|---|---|
:ok | Job completed successfully |
{:ok, result} | Job completed with result |
{:error, reason} | Job failed, will retry with backoff |
{:snooze, ms} | Reschedule job after delay and count it in retry accounting |
{:discard, reason} | Discard job without retrying |
Job Metadata
The second argument to perform/2 is a metadata map for the current job:
| Field | Description |
|---|---|
:topic | Topic string used to route the job |
:queue_id | Queue or tenant identifier used for fairness |
:item_id | Unique identifier for the queued item |
:attempt | Current 1-based attempt number |
Interactive Tutorial
Try the Coffee Shop tutorial in Livebook to explore job queues interactively.
Documentation
Full documentation is available on HexDocs.
License
MIT License - see LICENSE for details.