BeexQueue

CIHex.pmLicense

A Redis-backed job queue library for Elixir

BeexQueue is an Elixir port of the Node.js bee-queue library, designed to provide seamless interoperability with existing Node.js bee-queue systems. It uses Redis as the backend with the same key structure, JSON serialization, and atomic operations for full compatibility.

Key Features

Requirements

Installation

Add beexqueue to your list of dependencies in mix.exs:

def deps do
  [
    {:beexqueue, "~> 0.1.0"}
  ]
end

Then run:

mix deps.get

Quick Start

# Create a queue
queue = BeexQueue.new("my-queue")

# Create and enqueue a job
{:ok, job} = BeexQueue.create_job(queue, %{task: "process_data", data: [1, 2, 3]})
:ok = BeexQueue.save(job)

# Process jobs
BeexQueue.process(queue, fn(job) ->
  IO.inspect(job.data)
  :ok
end)

Configuration

Basic Redis Configuration

Configure Redis connection in your config/config.exs:

config :beexqueue, redis_url: "redis://localhost:6379"

Advanced Queue Configuration

BeexQueue supports extensive configuration options:

config :beexqueue,
  redis_url: "redis://localhost:6379",
  default_queue_settings: %{
    # Redis connection settings
    redis: %{host: "localhost", port: 6379},

    # Queue behavior
    prefix: "bq",                           # Redis key prefix
    is_worker: true,                        # Whether to process jobs
    stall_interval: 5000,                   # Stall check interval (ms)
    near_term_window: 1_200_000,            # Delayed job window (ms)

    # Delayed job processing
    activate_delayed_jobs: false,           # Enable delayed job activation
    delayed_debounce: 1000,                 # Debounce delay for delayed jobs

    # Event publishing
    send_events: true,                      # Publish job events to Redis

    # Job storage
    store_jobs: true,                       # Store job instances in memory
    remove_on_success: false,               # Remove jobs from Redis on success
    remove_on_failure: false,               # Remove jobs from Redis on failure

    # Performance tuning
    redis_scan_count: 100,                  # Batch size for Redis SCAN
    initial_redis_failure_retry_delay: 1000,# Initial retry delay for Redis failures

    # Connection management
    auto_connect: true,                     # Auto-connect to Redis on creation
    ensure_scripts: true                    # Cache Lua scripts in Redis
  }

Queue-Specific Configuration

Override settings when creating individual queues:

# Custom Redis connection
{:ok, queue} = BeexQueue.new("custom_queue",
  redis: %{host: "redis.example.com", port: 6380, password: "secret"},
  stall_interval: 10000,
  send_events: false
)

# High-performance queue for time-sensitive jobs
{:ok, fast_queue} = BeexQueue.new("fast_queue",
  stall_interval: 1000,
  near_term_window: 300_000,
  activate_delayed_jobs: true,
  redis_scan_count: 500
)

# Memory-efficient queue for large volumes
{:ok, efficient_queue} = BeexQueue.new("efficient_queue",
  store_jobs: false,
  remove_on_success: true,
  remove_on_failure: true
)

Redis Key Structure

BeexQueue uses the same Redis key structure as bee-queue for interoperability:

Usage Examples

Basic Job Processing

# Create a queue
{:ok, queue} = BeexQueue.new("email-queue")

# Create and save a job
{:ok, job} = BeexQueue.create_job(queue, %{to: "user@example.com", subject: "Hello"})
{:ok, saved_job} = BeexQueue.save(job)

# Process jobs with a handler function
BeexQueue.process(queue, fn(job) ->
  # Your job logic here
  IO.puts("Sending email to #{job.data["to"]}")
  send_email(job.data)
  :ok  # Return :ok for success
end)

Advanced Job Options

# Job with retries and timeout
{:ok, job} = BeexQueue.create_job(queue,
  %{task: "process_payment", amount: 100},
  %{
    retries: 3,
    timeout: 30000,  # 30 seconds
    backoff: %{strategy: "exponential", delay: 1000}
  }
)
BeexQueue.save(job)

Backoff Strategies

# Immediate retry (no delay)
{:ok, job} = BeexQueue.create_job(queue, data,
  %{retries: 3, backoff: %{strategy: "immediate"}}
)

# Fixed delay between retries
{:ok, job} = BeexQueue.create_job(queue, data,
  %{retries: 3, backoff: %{strategy: "fixed", delay: 5000}}
)

# Exponential backoff (delay doubles each retry)
{:ok, job} = BeexQueue.create_job(queue, data,
  %{retries: 3, backoff: %{strategy: "exponential", delay: 1000}}
)

Delayed Jobs

# Schedule job to run in 1 hour
delay_ms = 60 * 60 * 1000
{:ok, job} = BeexQueue.create_job(queue, job_data, %{delay: delay_ms})
BeexQueue.save(job)

# Schedule job for specific time
future_time = DateTime.utc_now() |> DateTime.add(2, :hour)
delay_ms = DateTime.diff(future_time, DateTime.utc_now(), :millisecond)
{:ok, job} = BeexQueue.create_job(queue, job_data, %{delay: delay_ms})
BeexQueue.save(job)

Job Progress Tracking

BeexQueue.process(queue, fn(job) ->
  # Report progress during long-running jobs
  BeexQueue.Job.progress(job, 25)
  do_step_1()

  BeexQueue.Job.progress(job, 50)
  do_step_2()

  BeexQueue.Job.progress(job, 100)
  :ok
end)

Multiple Workers and Concurrency

# Process with 5 concurrent workers
BeexQueue.process(queue, handler_function, concurrency: 5)

# High-throughput processing
{:ok, high_volume_queue} = BeexQueue.new("high-volume",
  stall_interval: 1000,
  redis_scan_count: 200
)
BeexQueue.process(high_volume_queue, handler, concurrency: 10)

Error Handling and Job Failures

BeexQueue.process(queue, fn(job) ->
  case process_job(job.data) do
    {:ok, result} ->
      {:ok, result}

    {:error, :temporary_failure} ->
      # Job will be retried based on retry settings
      {:error, "Temporary failure"}

    {:error, :permanent_failure} ->
      # Job will fail immediately
      {:error, "Permanent failure"}
  end
end)

Job Status Monitoring

# Get all waiting jobs
{:ok, waiting_jobs} = BeexQueue.get_jobs(queue, "waiting")

# Get all active jobs
{:ok, active_jobs} = BeexQueue.get_jobs(queue, "active")

# Get specific job by ID
{:ok, job} = BeexQueue.get_job(queue, "job_123")

# Get queue statistics
stats = BeexQueue.stats(queue)
IO.inspect(stats)  # %{waiting: 5, active: 2, succeeded: 100, failed: 3}

Architecture

BeexQueue follows Elixir best practices with:

Development

Prerequisites

Ensure you have the following installed (matching our CI environment):

We recommend using asdf for version management. A .tool-versions file is included in the repository.

# Install asdf if you haven't already
git clone https://github.com/asdf-vm/asdf.git ~/.asdf

# Install required versions
asdf install

Quick Setup (CI-Compatible Environment)

For the fastest setup that exactly matches CI:

# Clone and enter the repository
git clone https://github.com/nathanpotter/beexqueue.git
cd beexqueue

# Run the automated setup script (includes all CI checks)
./bin/setup

This script will:

Manual Setup

If you prefer to set up manually:

  1. Clone the repository

    git clone https://github.com/nathanpotter/beexqueue.git
    cd beexqueue
  2. Install Elixir dependencies

    mix local.rebar --force
    mix local.hex --force
    mix deps.get
  3. Install Node.js dependencies (for interop tests)

    cd test/interop
    npm install
    cd ../..
  4. Start Redis

    # Using Docker Compose (recommended - matches CI)
    docker compose up -d redis
    
    # Verify Redis is running
    docker exec beexqueue-redis-1 redis-cli ping
  5. Verify your setup

    # Compile the project
    mix compile --warnings-as-errors
    
    # Run formatting check
    mix format --check-formatted
    
    # Run linting
    mix credo --strict
    
    # Run type checking
    mix dialyzer
    
    # Run tests
    mix test

Development Workflow

Use the development helper script for common tasks:

# Run all tests
./bin/dev test

# Run specific test types
./bin/dev unit          # Unit tests only
./bin/dev integration   # Integration tests only
./bin/dev interop       # Interop tests only

# Code quality
./bin/dev format        # Format code
./bin/dev lint          # Run Credo
./bin/dev dialyzer      # Run Dialyzer

# CI simulation
./bin/dev ci            # Run full CI check locally

# Redis management
./bin/dev redis:start   # Start Redis
./bin/dev redis:stop    # Stop Redis

# Cleanup
./bin/dev clean         # Clean build artifacts

Running CI Checks Locally

To ensure your changes pass CI before pushing:

# Run the exact same checks as CI
./bin/dev ci

# Or run individual checks
mix format --check-formatted
mix credo
mix compile --warnings-as-errors
mix dialyzer
mix test

Redis Development

Monitor Redis keys during development:

# List all BeexQueue keys
redis-cli KEYS "bq:*"

# Monitor Redis commands in real-time
redis-cli MONITOR

# Inspect job data
redis-cli HGETALL "bq:my_queue:jobs:1"

Testing

Running Tests

# Run all tests
mix test

# Run specific test files
mix test test/beexqueue/job_test.exs
mix test test/beexqueue/integration_test.exs

# Run doctests (examples in @doc attributes)
mix test --doctest

# Run tests with verbose output
mix test --verbose

Test Types

Unit Tests (test/beexqueue/):

Integration Tests (test/beexqueue/integration_test.exs):

Interop Tests (test/beexqueue/interop_test.exs):

Interoperability Testing

BeexQueue includes comprehensive tests for Node.js bee-queue compatibility:

# Set up Node.js dependencies for interop tests
mix setup_interop

# Run interop tests
mix test --only interop

# Run interop tests with verbose output
mix test test/beexqueue/interop_test.exs --verbose

Interop Test Scenarios:

Prerequisites for Interop Tests:

See test/interop/README.md for detailed interop test documentation.

Code Quality

# Format code
mix format

# Check formatting (CI)
mix format --check-formatted

# Run linter
mix credo

# Run linter in strict mode
mix credo --strict

# Type checking
mix dialyzer

# Generate documentation
mix docs

Redis Monitoring

Monitor Redis keys during development:

# List all BeexQueue keys
redis-cli KEYS "bq:*"

# Inspect jobs hash
redis-cli HGETALL "bq:my-queue:jobs"

# Check queue lengths
redis-cli LLEN "bq:my-queue:waiting"
redis-cli LLEN "bq:my-queue:active"
redis-cli SCARD "bq:my-queue:succeeded"
redis-cli SCARD "bq:my-queue:failed"

Dependencies

Runtime Dependencies

Development and Testing Dependencies

Compatibility

BeexQueue is designed to be fully compatible with Node.js bee-queue:

Interoperability with Node.js bee-queue

BeexQueue provides seamless interoperability with the original Node.js bee-queue library, allowing you to:

Interop Features

Setting Up Interop Tests

# Install Node.js dependencies for bee-queue
mix setup_interop

# Verify setup
cd test/interop && npm list

Running Interop Tests

# Run all interop tests
mix test --only interop

# Run with detailed output
mix test test/beexqueue/interop_test.exs --verbose

Interop Test Scenarios

The interop tests cover comprehensive compatibility scenarios:

Node.js Producer → Elixir Consumer

// Node.js producer
const Queue = require("bee-queue");
const queue = new Queue("test-queue");
const job = queue.createJob({ message: "from Node.js" });
await job.save();
# Elixir consumer
{:ok, queue} = BeexQueue.new("test-queue")
BeexQueue.process(queue, fn(job) ->
  IO.puts("Processing: #{job.data["message"]}")
  :ok
end)

Elixir Producer → Node.js Consumer

# Elixir producer
{:ok, queue} = BeexQueue.new("test-queue")
{:ok, job} = BeexQueue.create_job(queue, %{message: "from Elixir"})
BeexQueue.save(job)
// Node.js consumer
const Queue = require("bee-queue");
const queue = new Queue("test-queue");
queue.process((job, done) => {
  console.log(`Processing: ${job.data.message}`);
  done();
});

Data Structure Compatibility

Both systems preserve complex data structures:

// Node.js
job.data = {
  nested: { array: [1, 2, 3], boolean: true, null: null },
  timestamp: new Date().toISOString(),
};
# Elixir
job_data = %{
  "nested" => %{
    "array" => [1, 2, 3],
    "boolean" => true,
    "null" => nil
  },
  "timestamp" => DateTime.utc_now() |> DateTime.to_iso8601()
}

Production Interop Setup

For production interoperability:

  1. Use the same Redis instance for both Node.js and Elixir applications
  2. Configure identical queue names across systems
  3. Set compatible job options (retries, timeouts, backoff)
  4. Monitor both systems for job processing statistics
  5. Test data serialization thoroughly before deployment

Troubleshooting Interop Issues

See test/interop/README.md for detailed documentation and troubleshooting guides.

Contributing

We welcome contributions! Please follow these guidelines:

Development Workflow

  1. Fork the repository and create a feature branch

  2. Follow coding standards:

    • Use mix format to format your code
    • Run mix credo to check code quality
    • Add tests for new features
    • Ensure compatibility with Node.js bee-queue
  3. Testing:

    • Write comprehensive tests
    • Aim for high test coverage
    • Test both success and error scenarios
  4. Documentation:

    • Add @doc and @moduledoc for public APIs
    • Update README if needed
    • Include examples in documentation
  5. Commits:

    • Use conventional commit format
    • Write clear, descriptive commit messages

Pull Request Process

  1. Create a Pull Request with a clear description
  2. Ensure CI passes - all tests, linting, and formatting checks
  3. Code Review - address any feedback from maintainers
  4. Merge - once approved, your PR will be merged

Code Quality Tools

Before submitting a PR, run these commands:

# Format code
mix format

# Run linter
mix credo

# Run type checker
mix dialyzer

# Run tests with coverage
mix test --cover

Issues

License

This project is licensed under the MIT License - see the LICENSE file for details.

Support