BullMQ for Elixir

Hex.pmHex.pmDocumentationLicense: MIT

A powerful, fast, and robust job queue for Elixir backed by Redis. This is an Elixir port of the popular BullMQ library for Node.js, providing full compatibility with existing BullMQ queues.

Features

Installation

Add bullmq to your list of dependencies in mix.exs:

def deps do
  [
    {:bullmq, "~> 1.0"}
  ]
end

Quick Start

1. Add Jobs to a Queue

# Add a job using stateless API (recommended for most use cases)
{:ok, job} = BullMQ.Queue.add("emails", "send-welcome", %{
  to: "user@example.com",
  template: "welcome"
}, connection: :my_redis)

# Add a delayed job
{:ok, job} = BullMQ.Queue.add("emails", "reminder", %{message: "Don't forget!"},
  connection: :my_redis,
  delay: 60_000  # 1 minute
)

# Add a prioritized job
{:ok, job} = BullMQ.Queue.add("emails", "urgent", %{},
  connection: :my_redis,
  priority: 1  # Lower = higher priority
)

2. Process Jobs with a Worker

defmodule MyApp.EmailWorker do
  def process(%BullMQ.Job{name: "send-welcome", data: data}) do
    MyApp.Mailer.send_welcome(data["to"], data["template"])
    {:ok, %{sent: true}}
  end

  def process(%BullMQ.Job{name: name}) do
    {:error, "Unknown job type: #{name}"}
  end
end

# Start a worker with event callbacks
{:ok, worker} = BullMQ.Worker.start_link(
  queue: "emails",
  connection: :my_redis,
  processor: &MyApp.EmailWorker.process/1,
  concurrency: 5,
  on_completed: fn job, result ->
    IO.puts("Job #{job.id} completed with #{inspect(result)}")
  end,
  on_failed: fn job, reason ->
    IO.puts("Job #{job.id} failed: #{reason}")
  end
)

3. Add to Your Supervision Tree

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      # Start Redis connection
      {Redix, name: :my_redis, host: "localhost", port: 6379},

      # Start email worker
      {BullMQ.Worker,
        queue: "emails",
        connection: :my_redis,
        processor: &MyApp.EmailWorker.process/1,
        concurrency: 5
      }
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end

Advanced Features

Job Options

BullMQ.Queue.add("tasks", "process-data", %{data: "..."},
  connection: :my_redis,
  priority: 1,              # Lower = higher priority
  delay: 60_000,            # Delay 60 seconds
  attempts: 5,              # Retry up to 5 times
  backoff: %{
    type: "exponential",
    delay: 1000
  },
  remove_on_complete: true, # Clean up after completion
  remove_on_fail: 100       # Keep last 100 failed jobs
)

Worker Event Callbacks

Workers support event callbacks similar to Node.js:

{:ok, worker} = BullMQ.Worker.start_link(
  queue: "tasks",
  connection: :my_redis,
  processor: &process/1,
  on_completed: fn job, result -> handle_completion(job, result) end,
  on_failed: fn job, reason -> handle_failure(job, reason) end,
  on_active: fn job -> handle_active(job) end,
  on_stalled: fn job_id -> handle_stalled(job_id) end
)

Queue Events (Real-time Subscriptions)

Subscribe to queue-level events using Redis Streams:

{:ok, events} = BullMQ.QueueEvents.start_link(
  queue: "tasks",
  connection: :my_redis
)

BullMQ.QueueEvents.subscribe(events)

receive do
  {:bullmq_event, :completed, %{"jobId" => id}} ->
    IO.puts("Job #{id} completed!")
  {:bullmq_event, :failed, %{"jobId" => id, "failedReason" => reason}} ->
    IO.puts("Job #{id} failed: #{reason}")
end

Rate Limiting

{:ok, worker} = BullMQ.Worker.start_link(
  queue: "api-calls",
  connection: :my_redis,
  processor: &process/1,
  limiter: %{max: 100, duration: 60_000}  # 100 per minute
)

Job Schedulers (Repeatable Jobs)

# Create a scheduler with cron pattern
{:ok, job} = BullMQ.JobScheduler.upsert(:my_redis, "maintenance", "cleanup",
  %{pattern: "0 * * * *"},  # Every hour
  "cleanup-job",
  %{type: "hourly"},
  prefix: "bull"
)

# Create an interval-based scheduler
{:ok, job} = BullMQ.JobScheduler.upsert(:my_redis, "heartbeats", "ping",
  %{every: 60_000},  # Every minute
  "heartbeat",
  %{},
  prefix: "bull"
)

# List schedulers
{:ok, schedulers} = BullMQ.JobScheduler.list(:my_redis, "maintenance", prefix: "bull")

# Remove a scheduler
{:ok, removed} = BullMQ.JobScheduler.remove(:my_redis, "maintenance", "cleanup", prefix: "bull")

Job Progress

def process(%BullMQ.Job{} = job) do
  Enum.each(1..100, fn i ->
    do_work(i)
    BullMQ.Worker.update_progress(job, i)
  end)

  {:ok, "done"}
end

Queue Getters

# Get job counts
{:ok, counts} = BullMQ.Queue.get_counts("emails", connection: :my_redis)
# => %{waiting: 10, active: 2, delayed: 5, completed: 100, failed: 3, ...}

# Get jobs in a specific state
{:ok, jobs} = BullMQ.Queue.get_jobs("emails", [:waiting, :delayed],
  connection: :my_redis, start: 0, end: 9)

# Get a specific job
{:ok, job} = BullMQ.Queue.get_job("emails", "job-id-123", connection: :my_redis)

# Get job state
{:ok, state} = BullMQ.Queue.get_job_state("emails", "job-id-123", connection: :my_redis)
# => :waiting | :active | :delayed | :completed | :failed

Queue Operations

# Pause the queue
:ok = BullMQ.Queue.pause("emails", connection: :my_redis)

# Resume the queue
:ok = BullMQ.Queue.resume("emails", connection: :my_redis)

# Check if paused
{:ok, is_paused} = BullMQ.Queue.paused?("emails", connection: :my_redis)

# Drain the queue (remove all waiting jobs)
:ok = BullMQ.Queue.drain("emails", connection: :my_redis)

# Remove a specific job
:ok = BullMQ.Queue.remove_job("emails", "job-id-123", connection: :my_redis)

# Retry a failed job
:ok = BullMQ.Queue.retry_job("emails", "job-id-123", connection: :my_redis)

Graceful Shutdown

Workers automatically wait for active jobs to complete when closing:

# Close worker and wait for active jobs to finish
:ok = BullMQ.Worker.close(worker)

# Force close without waiting
:ok = BullMQ.Worker.close(worker, force: true)

Documentation

Full documentation is available at HexDocs.

Requirements

Compatibility

This library is fully compatible with the Node.js BullMQ library. Jobs can be added from Node.js and processed by Elixir workers, and vice versa. They share the same Redis data structures and Lua scripts.

License

MIT License - see LICENSE for details.

Contributing

Contributions are welcome! Please see our Contributing Guide.

Commit Convention

This project uses Conventional Commits with automated releases via semantic-release. For Elixir-specific changes, add [elixir] tag to your commit message:

# Bug fix (patch release: 0.0.x)
git commit -m "fix(worker): handle job timeout correctly [elixir]"

# New feature (minor release: 0.x.0)
git commit -m "feat(queue): add bulk job operations [elixir]"

# Breaking change (major release: x.0.0)
git commit -m "feat(worker)!: change processor callback signature [elixir]"
Commit Type Version Bump Example
fix(...): ... [elixir] Patch fix(scripts): correct ARGV order [elixir]
feat(...): ... [elixir] Minor feat(queue): add getJobCounts [elixir]
feat(...)!: ... [elixir] Major feat(worker)!: new API [elixir]
docs(...): ... [elixir] None docs(readme): update examples [elixir]
chore(...): ... [elixir] None chore(deps): update redix [elixir]

Credits

This is an Elixir port of BullMQ by Taskforce.sh.