RedisOban
A high-performance, Redis-backed job processing library for Elixir, inspired by Oban and BullMQ.
Why RedisOban?
Traditional job queues in Elixir (like Oban) often use PostgreSQL for persistence. While reliable, this can put significant pressure on your primary database in high-throughput environments. RedisOban offloads job storage and coordination to Redis, providing:
- Reduced DB Pressure: Move high-churn job data out of Postgres.
- Lower Latency: Redis's in-memory operations and atomic primitives (like
BRPOPLPUSH) enable faster job handling. - Reliability: Uses a processing list pattern to ensure jobs aren't lost if a worker node crashes.
Features
- Concurrent Processing: Configurable worker concurrency per node.
- State Tracking: Track jobs through
enqueued,processing,completed, andfailedstates. - Reliable Consumer: Automatic rescue of stalled jobs (e.g., node crashes) via the
Rescuerprocess. - Retry Logic: Automatic retries with configurable max attempts.
- Clean Worker API: Simple behavior-based workers.
Architecture
- Storage: Full job metadata is stored in Redis Keys (
oban:job:{id}). - Queuing: Job IDs are moved between lists:
oban:queue:{name}: Pending jobs.oban:processing:{name}: Active jobs (for reliability).
- Poller: A GenServer that manages a pool of
Taskprocesses to execute jobs concurrently. - Rescuer: A watchdog process that recovers jobs stuck in the
processinglist for too long.
Usage
1. Define a Worker
defmodule MyApp.Workers.EchoWorker do
use RedisOban.Worker
@impl true
def perform(%{"message" => message}) do
IO.puts("Received: #{message}")
:ok
end
end2. Enqueue a Job
job = RedisOban.Job.new(MyApp.Workers.EchoWorker, %{"message" => "Hello World"})
RedisOban.insert(job)3. Configuration
In your application.ex:
children = [
{Redix, name: RedisOban.Redis},
{Task.Supervisor, name: RedisOban.TaskSupervisor},
{RedisOban.Poller, [queue: "default", concurrency: 10]},
{RedisOban.Rescuer, [queue: "default"]}
]Examples
You can find several example scripts in the examples/ directory. Run them with mix run:
# Basic enqueuing and processing
mix run examples/basic_usage.exs
# Delayed jobs
mix run examples/scheduled_jobs.exs
# Priority demonstration
mix run examples/priority_queues.exs
# Failure and retry logic
mix run examples/retries.exsFuture Roadmap
- Scheduled Jobs: Support for delayed execution using Sorted Sets.
- Priority Queues: Assign weights to different queues.
- Dashboard: A LiveView-based UI for monitoring.
- Unique Jobs: Prevent duplicate enqueuing of identical jobs.