BeexQueue
A Redis-backed job queue library for Elixir
BeexQueue is an Elixir port of the Node.js bee-queue library, designed to provide seamless interoperability with existing Node.js bee-queue systems. It uses Redis as the backend with the same key structure, JSON serialization, and atomic operations for full compatibility.
Key Features
- Job Queue Management: Create, enqueue, and process jobs with custom handlers
- Retry Support: Configurable retry counts with multiple backoff strategies
- Delayed Jobs: Schedule jobs to run at future times using Redis sorted sets
- Stall Detection: Automatic detection and re-enqueueing of stalled jobs
- Cross-Language Compatibility: Full interoperability with Node.js bee-queue
- Concurrent Processing: Support for multiple workers and concurrency control
- Backoff Strategies: Immediate, fixed, and exponential backoff support
- Event Publishing: Optional Redis pub/sub event publishing
- Job Progress Tracking: Built-in progress reporting and status management
- Comprehensive Testing: Unit, integration, and cross-language interop tests
Requirements
- Elixir ~> 1.15
- Erlang ~> 26.0
- Redis (default: redis://localhost:6379)
Installation
Add beexqueue to your list of dependencies in mix.exs:
def deps do
  [
    {:beexqueue, "~> 0.1.0"}
  ]
end
Then run:
mix deps.get
Quick Start
# Create a queue
{:ok, queue} = BeexQueue.new("my-queue")
# Create and enqueue a job
{:ok, job} = BeexQueue.create_job(queue, %{task: "process_data", data: [1, 2, 3]})
{:ok, _saved_job} = BeexQueue.save(job)
# Process jobs
BeexQueue.process(queue, fn job ->
  IO.inspect(job.data)
  :ok
end)
Configuration
Basic Redis Configuration
Configure the Redis connection in your config/config.exs:
config :beexqueue, redis_url: "redis://localhost:6379"
Advanced Queue Configuration
BeexQueue supports extensive configuration options:
config :beexqueue,
  redis_url: "redis://localhost:6379",
  default_queue_settings: %{
    # Redis connection settings
    redis: %{host: "localhost", port: 6379},
    # Queue behavior
    prefix: "bq", # Redis key prefix
    is_worker: true, # Whether to process jobs
    stall_interval: 5000, # Stall check interval (ms)
    near_term_window: 1_200_000, # Delayed job window (ms)
    # Delayed job processing
    activate_delayed_jobs: false, # Enable delayed job activation
    delayed_debounce: 1000, # Debounce delay for delayed jobs (ms)
    # Event publishing
    send_events: true, # Publish job events to Redis
    # Job storage
    store_jobs: true, # Store job instances in memory
    remove_on_success: false, # Remove jobs from Redis on success
    remove_on_failure: false, # Remove jobs from Redis on failure
    # Performance tuning
    redis_scan_count: 100, # Batch size for Redis SCAN
    initial_redis_failure_retry_delay: 1000, # Initial retry delay for Redis failures (ms)
    # Connection management
    auto_connect: true, # Auto-connect to Redis on creation
    ensure_scripts: true # Cache Lua scripts in Redis
  }
Queue-Specific Configuration
Override settings when creating individual queues:
# Custom Redis connection
{:ok, queue} = BeexQueue.new("custom_queue",
  redis: %{host: "redis.example.com", port: 6380, password: "secret"},
  stall_interval: 10_000,
  send_events: false
)
# High-performance queue for time-sensitive jobs
{:ok, fast_queue} = BeexQueue.new("fast_queue",
  stall_interval: 1000,
  near_term_window: 300_000,
  activate_delayed_jobs: true,
  redis_scan_count: 500
)
# Memory-efficient queue for large volumes
{:ok, efficient_queue} = BeexQueue.new("efficient_queue",
  store_jobs: false,
  remove_on_success: true,
  remove_on_failure: true
)
Redis Key Structure
BeexQueue uses the same Redis key structure as bee-queue for interoperability:
- bq:<queue_name>:id: Auto-incrementing job ID counter
- bq:<queue_name>:jobs: Hash mapping job ID to JSON-serialized job data
- bq:<queue_name>:waiting: List of pending job IDs
- bq:<queue_name>:active: List of active job IDs
- bq:<queue_name>:succeeded: Set of completed job IDs
- bq:<queue_name>:failed: Set of failed job IDs
- bq:<queue_name>:delayed: Sorted set for delayed jobs (score = Unix timestamp in ms)
- bq:<queue_name>:stalling:<id>: Expiring keys for stall detection
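As a rough illustration of the layout above, key names can be assembled with plain string interpolation. This is a sketch, not BeexQueue's implementation: the KeySketch module and its function names are hypothetical, the prefix is assumed to be the default "bq", and the delayed-job score simply follows the description above (the Unix time in milliseconds at which the job becomes due).

```elixir
defmodule KeySketch do
  @moduledoc "Hypothetical helper; illustrates the key layout only."

  # Assumed default prefix, as in the configuration shown earlier.
  @prefix "bq"

  # Builds keys such as "bq:emails:waiting".
  def key(queue_name, suffix), do: "#{@prefix}:#{queue_name}:#{suffix}"

  # Score for the delayed sorted set: the time the job becomes due, in ms.
  def delayed_score(now_ms, delay_ms), do: now_ms + delay_ms
end

IO.puts(KeySketch.key("emails", "waiting"))
IO.puts(KeySketch.key("emails", "stalling:42"))
IO.inspect(KeySketch.delayed_score(System.os_time(:millisecond), 60_000))
```

Because a Node.js producer and an Elixir consumer must agree on these strings exactly, keeping key construction in one place is a common way to avoid drift.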
Usage Examples
Basic Job Processing
# Create a queue
{:ok, queue} = BeexQueue.new("email-queue")
# Create and save a job
{:ok, job} = BeexQueue.create_job(queue, %{to: "user@example.com", subject: "Hello"})
{:ok, saved_job} = BeexQueue.save(job)
# Process jobs with a handler function
BeexQueue.process(queue, fn job ->
  # Your job logic here
  IO.puts("Sending email to #{job.data["to"]}")
  send_email(job.data)
  :ok # Return :ok for success
end)
Advanced Job Options
# Job with retries and timeout
{:ok, job} = BeexQueue.create_job(queue,
  %{task: "process_payment", amount: 100},
  %{
    retries: 3,
    timeout: 30_000, # 30 seconds
    backoff: %{strategy: "exponential", delay: 1000}
  }
)
BeexQueue.save(job)
Backoff Strategies
# Immediate retry (no delay)
{:ok, job} = BeexQueue.create_job(queue, data,
  %{retries: 3, backoff: %{strategy: "immediate"}}
)
# Fixed delay between retries
{:ok, job} = BeexQueue.create_job(queue, data,
  %{retries: 3, backoff: %{strategy: "fixed", delay: 5000}}
)
# Exponential backoff (delay doubles each retry)
{:ok, job} = BeexQueue.create_job(queue, data,
  %{retries: 3, backoff: %{strategy: "exponential", delay: 1000}}
)
Delayed Jobs
# Schedule job to run in 1 hour
delay_ms = 60 * 60 * 1000
{:ok, job} = BeexQueue.create_job(queue, job_data, %{delay: delay_ms})
BeexQueue.save(job)
# Schedule job for specific time
future_time = DateTime.utc_now() |> DateTime.add(2, :hour)
delay_ms = DateTime.diff(future_time, DateTime.utc_now(), :millisecond)
{:ok, job} = BeexQueue.create_job(queue, job_data, %{delay: delay_ms})
BeexQueue.save(job)
Job Progress Tracking
BeexQueue.process(queue, fn job ->
  # Report progress during long-running jobs
  BeexQueue.Job.progress(job, 25)
  do_step_1()
  BeexQueue.Job.progress(job, 50)
  do_step_2()
  BeexQueue.Job.progress(job, 100)
  :ok
end)
Multiple Workers and Concurrency
# Process with 5 concurrent workers
BeexQueue.process(queue, handler_function, concurrency: 5)
# High-throughput processing
{:ok, high_volume_queue} = BeexQueue.new("high-volume",
  stall_interval: 1000,
  redis_scan_count: 200
)
BeexQueue.process(high_volume_queue, handler, concurrency: 10)
Error Handling and Job Failures
BeexQueue.process(queue, fn job ->
  case process_job(job.data) do
    {:ok, result} ->
      {:ok, result}
    {:error, :temporary_failure} ->
      # Job will be retried based on retry settings
      {:error, "Temporary failure"}
    {:error, :permanent_failure} ->
      # Job will fail immediately
      {:error, "Permanent failure"}
  end
end)
Job Status Monitoring
# Get all waiting jobs
{:ok, waiting_jobs} = BeexQueue.get_jobs(queue, "waiting")
# Get all active jobs
{:ok, active_jobs} = BeexQueue.get_jobs(queue, "active")
# Get specific job by ID
{:ok, job} = BeexQueue.get_job(queue, "job_123")
# Get queue statistics
stats = BeexQueue.stats(queue)
IO.inspect(stats) # %{waiting: 5, active: 2, succeeded: 100, failed: 3}
Architecture
BeexQueue follows Elixir best practices with:
- Supervision: Uses Task.Supervisor for workers and periodic tasks
- State Management: Queue state managed through structs (no global state)
- Error Handling: Returns {:ok, value} or {:error, reason} tuples
- Logging: Uses Elixir's Logger for debugging and monitoring
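To make the supervision and tagged-tuple points concrete, here is a minimal sketch using only the Elixir standard library. It is not BeexQueue's internal code: the handler and the sample inputs are made up for illustration, and max_concurrency merely plays the role that a concurrency option would play in a worker pool.

```elixir
# Start an anonymous Task.Supervisor (BeexQueue's own supervision tree is not shown here).
{:ok, sup} = Task.Supervisor.start_link()

# Hypothetical handler that follows the {:ok, value} / {:error, reason} convention.
handler = fn data ->
  if is_integer(data), do: {:ok, data * 2}, else: {:error, :not_an_integer}
end

results =
  sup
  |> Task.Supervisor.async_stream_nolink([1, "two", 3], handler, max_concurrency: 2)
  |> Enum.map(fn
    # The task finished; pass the handler's tagged tuple through.
    {:ok, result} -> result
    # The task crashed; surface that as an error tuple instead of raising.
    {:exit, reason} -> {:error, {:crashed, reason}}
  end)

IO.inspect(results)
```

Running the tasks unlinked means a crashing handler shows up as an {:exit, reason} entry rather than taking the caller down, which is the property a job worker usually wants.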
Development
Prerequisites
Ensure you have the following installed (matching our CI environment):
- Elixir 1.18.3 (CI uses 1.18.x)
- Erlang OTP 27.2 (CI uses 27.x)
- Node.js 18.20.4 (CI uses 18.x for interop tests)
- Docker (for Redis and other services)
We recommend using asdf for version management. A .tool-versions file is included in the repository.
# Install asdf if you haven't already
git clone https://github.com/asdf-vm/asdf.git ~/.asdf
# Install required versions
asdf install
Quick Setup (CI-Compatible Environment)
For the fastest setup that exactly matches CI:
# Clone and enter the repository
git clone https://github.com/nathanpotter/beexqueue.git
cd beexqueue
# Run the automated setup script (includes all CI checks)
./bin/setup
This script will:
- ✅ Verify you have the correct tool versions
- ✅ Install all Elixir and Node.js dependencies
- ✅ Start Redis using Docker Compose
- ✅ Run all CI checks (format, credo, compile, dialyzer)
- ✅ Run the complete test suite
Manual Setup
If you prefer to set up manually:
Clone the repository
git clone https://github.com/nathanpotter/beexqueue.git
cd beexqueue
Install Elixir dependencies
mix local.rebar --force
mix local.hex --force
mix deps.get
Install Node.js dependencies (for interop tests)
cd test/interop
npm install
cd ../..
Start Redis
# Using Docker Compose (recommended - matches CI)
docker compose up -d redis
# Verify Redis is running
docker exec beexqueue-redis-1 redis-cli ping
Verify your setup
# Compile the project
mix compile --warnings-as-errors
# Run formatting check
mix format --check-formatted
# Run linting
mix credo --strict
# Run type checking
mix dialyzer
# Run tests
mix test
Development Workflow
Use the development helper script for common tasks:
# Run all tests
./bin/dev test
# Run specific test types
./bin/dev unit # Unit tests only
./bin/dev integration # Integration tests only
./bin/dev interop # Interop tests only
# Code quality
./bin/dev format # Format code
./bin/dev lint # Run Credo
./bin/dev dialyzer # Run Dialyzer
# CI simulation
./bin/dev ci # Run full CI check locally
# Redis management
./bin/dev redis:start # Start Redis
./bin/dev redis:stop # Stop Redis
# Cleanup
./bin/dev clean # Clean build artifacts
Running CI Checks Locally
To ensure your changes pass CI before pushing:
# Run the exact same checks as CI
./bin/dev ci
# Or run individual checks
mix format --check-formatted
mix credo
mix compile --warnings-as-errors
mix dialyzer
mix test
Redis Development
Monitor Redis keys during development:
# List all BeexQueue keys
redis-cli KEYS "bq:*"
# Monitor Redis commands in real-time
redis-cli MONITOR
# Inspect a single job's data (the jobs hash maps job ID to JSON)
redis-cli HGET "bq:my_queue:jobs" 1
Testing
Running Tests
# Run all tests
mix test
# Run specific test files
mix test test/beexqueue/job_test.exs
mix test test/beexqueue/integration_test.exs
# Doctests (examples in @doc attributes) run with the rest of the suite under mix test
# Run tests with detailed per-test output
mix test --trace
Test Types
Unit Tests (test/beexqueue/):
- Pure function testing with mocked Redis
- Job serialization/deserialization
- Configuration validation
- Backoff strategy calculations
Integration Tests (test/beexqueue/integration_test.exs):
- Full Redis integration
- Job lifecycle testing
- Queue operations
- Concurrency testing
Interop Tests (test/beexqueue/interop_test.exs):
- Node.js ↔ Elixir job compatibility
- Cross-language data preservation
- Bidirectional job processing
- JSON serialization compatibility
Interoperability Testing
BeexQueue includes comprehensive tests for Node.js bee-queue compatibility:
# Set up Node.js dependencies for interop tests
mix setup_interop
# Run interop tests
mix test --only interop
# Run interop tests with verbose output
mix test test/beexqueue/interop_test.exs --trace
Interop Test Scenarios:
- Node.js Producer → Elixir Consumer: Jobs created by bee-queue processed by BeexQueue
- Elixir Producer → Node.js Consumer: Jobs created by BeexQueue processed by bee-queue
- Bidirectional Flow: Both systems working together concurrently
- Data Integrity: Complex data structures, nested objects, arrays
- Error Handling: Retry logic and failure scenarios
Prerequisites for Interop Tests:
- Node.js 12+
- Redis running on localhost:6379
- Internet connection (for npm install)
See test/interop/README.md for detailed interop test documentation.
Code Quality
# Format code
mix format
# Check formatting (CI)
mix format --check-formatted
# Run linter
mix credo
# Run linter in strict mode
mix credo --strict
# Type checking
mix dialyzer
# Generate documentation
mix docs
Redis Monitoring
Monitor Redis keys during development:
# List all BeexQueue keys
redis-cli KEYS "bq:*"
# Inspect jobs hash
redis-cli HGETALL "bq:my-queue:jobs"
# Check queue lengths
redis-cli LLEN "bq:my-queue:waiting"
redis-cli LLEN "bq:my-queue:active"
redis-cli SCARD "bq:my-queue:succeeded"
redis-cli SCARD "bq:my-queue:failed"
Dependencies
Runtime Dependencies
- redix ~> 1.5: Redis client for Elixir
- jason ~> 1.4: JSON serialization for job data
Development and Testing Dependencies
- credo ~> 1.7: Code quality linter and style checker
- dialyxir ~> 1.4: Static type analysis with Dialyzer
- ex_doc ~> 0.34: Documentation generator
- mox ~> 1.1: Mocking framework for unit tests
Compatibility
BeexQueue is designed to be fully compatible with Node.js bee-queue:
- Same Redis key structure and operations
- Compatible JSON serialization of job data
- Matching atomic operations (BRPOPLPUSH, MULTI/EXEC)
- Cross-language job interoperability
- Shared retry logic and backoff strategies
- Identical stall detection mechanisms
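For retries to behave the same on both sides, each side has to compute the same wait before a given attempt. The sketch below shows one way the three strategies described in this README could be computed. BackoffSketch is a hypothetical module, not BeexQueue's API, and it assumes the first exponential retry waits the base delay, with the delay doubling on each later retry.

```elixir
defmodule BackoffSketch do
  @moduledoc "Hypothetical module; not part of BeexQueue's API."

  # `attempt` is the 1-based retry number; all delays are in milliseconds.
  def delay_ms(%{strategy: "immediate"}, _attempt), do: 0
  def delay_ms(%{strategy: "fixed", delay: delay}, _attempt), do: delay

  # Assumption: the first retry waits `delay`, and each later retry doubles it.
  def delay_ms(%{strategy: "exponential", delay: delay}, attempt) when attempt >= 1 do
    delay * Integer.pow(2, attempt - 1)
  end
end

for attempt <- 1..3 do
  IO.inspect(BackoffSketch.delay_ms(%{strategy: "exponential", delay: 1000}, attempt))
end
```

With a base delay of 1000 ms and three retries, this schedule waits 1000, 2000, and 4000 ms in turn.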
Interoperability with Node.js bee-queue
BeexQueue provides seamless interoperability with the original Node.js bee-queue library, allowing you to:
- Migrate gradually between Node.js and Elixir systems
- Share job queues between different parts of your application
- Process jobs created by either system with either worker
- Maintain data integrity across language boundaries
Interop Features
- Bidirectional Job Processing: Jobs created in Node.js can be processed by Elixir workers and vice versa
- Data Structure Preservation: Complex nested objects, arrays, and primitive types are maintained
- JSON Compatibility: Uses identical JSON serialization format
- Redis Key Compatibility: Same key structure and atomic operations
- Job Options Compatibility: Retry counts, timeouts, and backoff strategies work identically
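One practical consequence of JSON serialization is that map keys come back as strings, which is why the examples in this README create jobs with atom keys but read them with job.data["to"]. The sketch below mimics that effect on keys without calling a JSON library. JsonShapeSketch is a hypothetical helper, and it assumes the decoder returns string keys (Jason's default behavior).

```elixir
defmodule JsonShapeSketch do
  @moduledoc "Hypothetical helper showing what a JSON round trip does to map keys."

  # Maps: convert every key to a string and recurse into the values.
  def stringify_keys(map) when is_map(map) do
    Map.new(map, fn {key, value} -> {to_string(key), stringify_keys(value)} end)
  end

  # Lists: recurse into each element.
  def stringify_keys(list) when is_list(list), do: Enum.map(list, &stringify_keys/1)

  # Numbers, strings, booleans, and nil pass through unchanged.
  def stringify_keys(other), do: other
end

IO.inspect(JsonShapeSketch.stringify_keys(%{to: "user@example.com", meta: %{tags: [%{id: 1}]}}))
```

Writing handlers against string keys from the start keeps them working regardless of whether the job was produced by Elixir or by Node.js.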
Setting Up Interop Tests
# Install Node.js dependencies for bee-queue
mix setup_interop
# Verify setup
cd test/interop && npm list
Running Interop Tests
# Run all interop tests
mix test --only interop
# Run with detailed output
mix test test/beexqueue/interop_test.exs --trace
Interop Test Scenarios
The interop tests cover comprehensive compatibility scenarios:
Node.js Producer → Elixir Consumer
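// Node.js producer
const Queue = require("bee-queue");
const queue = new Queue("test-queue");
const job = queue.createJob({ message: "from Node.js" });
// save() returns a promise; top-level await is unavailable in CommonJS modules
job.save().then((saved) => console.log(`Saved job ${saved.id}`));
# Elixir consumer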
{:ok, queue} = BeexQueue.new("test-queue")
BeexQueue.process(queue, fn job ->
  IO.puts("Processing: #{job.data["message"]}")
  :ok
end)
Elixir Producer → Node.js Consumer
# Elixir producer
{:ok, queue} = BeexQueue.new("test-queue")
{:ok, job} = BeexQueue.create_job(queue, %{message: "from Elixir"})
BeexQueue.save(job)
// Node.js consumer
const Queue = require("bee-queue");
const queue = new Queue("test-queue");
queue.process((job, done) => {
  console.log(`Processing: ${job.data.message}`);
  done();
});
Data Structure Compatibility
Both systems preserve complex data structures:
// Node.js
job.data = {
  nested: { array: [1, 2, 3], boolean: true, null: null },
  timestamp: new Date().toISOString(),
};
# Elixir
job_data = %{
  "nested" => %{
    "array" => [1, 2, 3],
    "boolean" => true,
    "null" => nil
  },
  "timestamp" => DateTime.utc_now() |> DateTime.to_iso8601()
}
Production Interop Setup
For production interoperability:
- Use the same Redis instance for both Node.js and Elixir applications
- Configure identical queue names across systems
- Set compatible job options (retries, timeouts, backoff)
- Monitor both systems for job processing statistics
- Test data serialization thoroughly before deployment
Troubleshooting Interop Issues
- Version Compatibility: Ensure bee-queue versions are compatible
- Redis Connection: Verify both systems connect to the same Redis instance
- Data Types: Check that data types are serializable in both languages
- Job Options: Validate that retry and backoff configurations match
- Network Latency: Account for network delays in distributed setups
See test/interop/README.md for detailed documentation and troubleshooting guides.
Contributing
We welcome contributions! Please follow these guidelines:
Development Workflow
Fork the repository and create a feature branch
Follow coding standards:
- Use mix format to format your code
- Run mix credo to check code quality
- Add tests for new features
- Ensure compatibility with Node.js bee-queue
Testing:
- Write comprehensive tests
- Aim for high test coverage
- Test both success and error scenarios
Documentation:
- Add @doc and @moduledoc for public APIs
- Update README if needed
- Include examples in documentation
Commits:
- Use conventional commit format
- Write clear, descriptive commit messages
Pull Request Process
- Create a Pull Request with a clear description
- Ensure CI passes - all tests, linting, and formatting checks
- Code Review - address any feedback from maintainers
- Merge - once approved, your PR will be merged
Code Quality Tools
Before submitting a PR, run these commands:
# Format code
mix format
# Run linter
mix credo
# Run type checker
mix dialyzer
# Run tests with coverage
mix test --cover
Issues
- Bug reports: Use the issue template with detailed reproduction steps
- Feature requests: Describe the use case and proposed API
- Questions: Check existing issues and discussions first
License
This project is licensed under the MIT License - see the LICENSE file for details.
Support
- Documentation: HexDocs
- Issues: GitHub Issues
- Discussions: GitHub Discussions