gaffer
A reliable job queue implemented in Erlang.
Features
- Priority-based execution
- Per-queue concurrency limits (local and global)
- Pluggable storage drivers (ETS for dev/test, Postgres for production)
- Lifecycle hooks (pre/post on queue and job events)
- Dead-letter queues (on_discard)
- Queue introspection and automatic/manual job pruning
- Delayed job scheduling
- Automatic retries with backoff
- Drain and flush (graceful shutdown)
- Job execution timeouts
- Worker shutdown timeouts
Usage
Shell
For simple jobs, pass an anonymous function as a worker:
1> ok = gaffer:ensure_queue(#{
    name => greetings,
    driver => ets,
    worker => fun(#{payload := #{~"name" := Name}}) ->
        io:format(~"Hello, ~s!~n", [Name]),
        complete
    end
}).
ok
2> gaffer:insert(greetings, #{~"name" => ~"world"}).
#{id => <<...>>, queue => greetings, state => available, ...}
Hello, world!
Application
Define a worker
Implement the gaffer_worker behaviour:
-module(email_sender).
-behaviour(gaffer_worker).
-export([perform/1]).

perform(#{payload := #{~"to" := To, ~"body" := Body}}) ->
    logger:info(~"Sending email to ~s: ~s", [To, Body]),
    complete.
The perform/1 callback can return:
- complete - mark the job as completed
- {complete, Result} - complete with a result
- {fail, Reason} - fail and retry (up to max_attempts)
- {cancel, Reason} - cancel the job permanently
- {schedule, Timestamp} - reschedule the job for later
Crashes are treated as failures, and their reason is recorded.
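As a rough illustration of how these return values might be combined in one worker, here is a minimal sketch. The module name charge_worker, the "amount" payload key, and the charge/1 helper are all hypothetical; only the perform/1 callback and its return values come from the list above. In a real application the module would also declare -behaviour(gaffer_worker).

```erlang
-module(charge_worker).
-export([perform/1]).

%% Valid payload: attempt the work and map its outcome to a return value.
perform(#{payload := #{~"amount" := Amount}}) when is_integer(Amount), Amount > 0 ->
    case charge(Amount) of
        %% Success: complete and keep the result.
        {ok, Receipt} -> {complete, Receipt};
        %% Error from the helper: fail, so the job is retried (up to max_attempts).
        {error, Reason} -> {fail, Reason}
    end;
%% Anything else can never succeed, so cancel instead of retrying.
perform(#{payload := _Malformed}) ->
    {cancel, invalid_payload}.

%% Hypothetical stand-in for a real payment call.
charge(Amount) when Amount > 1000 -> {error, gateway_unavailable};
charge(Amount) -> {ok, #{charged => Amount}}.
```

The design choice here is to reserve {fail, Reason} for errors worth retrying and {cancel, Reason} for input that will never become valid, so retries are not spent on hopeless jobs.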
Create a queue
Driver = gaffer_driver_pgo:start(#{
    pool => my_pool,
    start => #{host => ~"localhost", database => ~"my_app", pool_size => 5}
}),
gaffer:ensure_queue(#{
    name => emails,
    driver => {gaffer_driver_pgo, Driver},
    worker => email_sender
}).
Insert a job
Job = gaffer:insert(emails, #{~"to" => ~"user@example.com", ~"body" => ~"Welcome!"}).
Configuration
Queues are configured via gaffer:queue_conf() maps:
- name (atom(), required): Queue identifier.
- worker (module() | fun/1, required): Worker callback module or function.
- driver ({module(), state()}): Storage driver.
- max_workers (pos_integer(), default = 1): Max concurrent workers per node.
- global_max_workers (pos_integer() | infinity, default = infinity): Max concurrent workers across all nodes.
- poll_interval (pos_integer() | infinity, default = 1000): Polling interval in ms.
- max_attempts (pos_integer(), default = 3): Max execution attempts.
- timeout (pos_integer(), default = 30000): Execution timeout in ms.
- backoff ([non_neg_integer()], default = [1000]): Retry backoff schedule in ms.
- priority (integer(), default = 0): Default job priority. Can be negative. Jobs with higher values are claimed first.
- shutdown_timeout (pos_integer(), default = 5000): Worker shutdown grace period in ms.
- on_discard (atom()): Dead-letter queue name.
- hooks ([hook()], default = []): Lifecycle hook modules or funs.
- prune (prune_conf()): Pruning configuration. A per-queue pruner periodically deletes jobs in terminal states older than the configured max age.
  - interval (pos_integer() | infinity): Prune interval in ms.
  - max_age (#{job_state() | '_' => age()}): Per-state max age in milliseconds. infinity means never prune. '_' sets a default for all states. Default: completed, discarded, and cancelled jobs are pruned immediately; others are kept indefinitely.
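To show how several of these options fit together, here is a sketch of a fuller configuration for the emails queue. The module name queue_examples, the emails_dead queue name, and every numeric value are illustrative assumptions. It also assumes that age() accepts a plain integer number of milliseconds and that the dead-letter queue is created separately; how backoff entries map to individual attempts is not specified here.

```erlang
-module(queue_examples).
-export([emails_conf/1]).

%% Build a queue configuration map. Driver is the state returned by
%% gaffer_driver_pgo:start/1, as in "Create a queue".
emails_conf(Driver) ->
    #{
        name => emails,
        worker => email_sender,
        driver => {gaffer_driver_pgo, Driver},
        max_workers => 5,               % per node
        global_max_workers => 20,       % across all nodes
        max_attempts => 5,
        backoff => [1000, 5000, 30000], % retry backoff schedule in ms
        timeout => 60000,               % execution timeout in ms
        on_discard => emails_dead,      % hypothetical dead-letter queue name
        prune => #{
            interval => 60000,          % run the pruner every minute
            %% Keep completed jobs for an hour; never prune other states.
            max_age => #{completed => 3600000, '_' => infinity}
        }
    }.
```

The resulting map would then be passed to gaffer:ensure_queue/1, for example gaffer:ensure_queue(queue_examples:emails_conf(Driver)).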
Changelog
See the Releases page.
Code of Conduct
Find this project's code of conduct in Contributor Covenant Code of Conduct.
Contributing
First of all, thank you for contributing with your time and energy.
If you want to request a new feature, open an issue first so we can discuss it.
Bug reports and questions are also welcome. If you found a bug, first check that you are using the latest version of the application. If you have a question, search the issue database first, since it may already have been answered.
Contributions will be subject to the MIT License. You will retain the copyright.
For more information check out CONTRIBUTING.md.
Security
This project's security policy is made explicit in SECURITY.md.
Conventions
Versions
This project adheres to Semantic Versioning.
License
This project uses the MIT License.