gaffer

A reliable job queue implemented in Erlang.

Features

Road Map

Usage

Shell

For simple jobs, pass an anonymous function as a worker:

1> ok = gaffer:ensure_queue(#{
       name => greetings,
       driver => ets,
       worker => fun(#{payload := #{~"name" := Name}}) ->
           io:format(~"Hello, ~s!~n", [Name]),
           complete
       end
   }).
ok
2> gaffer:insert(greetings, #{~"name" => ~"world"}).
#{id => <<...>>, queue => greetings, state => available, ...}
Hello, world!

Application

Define a worker

Implement the gaffer_worker behaviour:

-module(email_sender).
-behaviour(gaffer_worker).
-export([perform/1]).

perform(#{payload := #{~"to" := To, ~"body" := Body}}) ->
    logger:info(~"Sending email to ~s: ~s", [To, Body]),
    complete.

The perform/1 callback can return:

Crashes are treated as failures and their reason recorded.

Create a queue

Driver = gaffer_driver_pgo:start(#{
    pool => my_pool,
    start => #{host => ~"localhost", database => ~"my_app", pool_size => 5}
}),
gaffer:ensure_queue(#{
    name => emails,
    driver => {gaffer_driver_pgo, Driver},
    worker => email_sender
}).

Insert a job

Job = gaffer:insert(emails, #{~"to" => ~"user@example.com", ~"body" => ~"Welcome!"}).

Job States

---
title: Job States
---
stateDiagram-v2
    [*] --> available
    available --> executing: polled

    executing --> available: schedule
    executing --> available: failure

    executing --> completed: complete

    executing --> failed: failure when Attempts >= Max

    executing --> cancelled: cancel
    available --> cancelled: cancel

    completed --> [*]
    failed --> [*]
    cancelled --> [*]
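
The transitions in the diagram can also be read as a small pure function. The module below is only an illustrative model of the diagram, not gaffer's implementation: the module name, the `attempts` and `max_attempts` keys, and the choice to count an attempt when a job is polled are all assumptions made for the sketch.

```erlang
-module(job_states_sketch).
-export([next/2, is_terminal/1]).

%% Hypothetical job shape: #{state, attempts, max_attempts}.
%% Returns {ok, NewJob} for a transition shown in the diagram,
%% {error, invalid_transition} otherwise.

%% available --> executing: polled (assumption: the attempt is counted here)
next(#{state := available, attempts := N} = Job, polled) ->
    {ok, Job#{state := executing, attempts := N + 1}};
%% executing --> available: schedule
next(#{state := executing} = Job, schedule) ->
    {ok, Job#{state := available}};
%% executing --> failed: failure when Attempts >= Max
next(#{state := executing, attempts := N, max_attempts := Max} = Job, failure)
  when N >= Max ->
    {ok, Job#{state := failed}};
%% executing --> available: failure (attempts remain)
next(#{state := executing} = Job, failure) ->
    {ok, Job#{state := available}};
%% executing --> completed: complete
next(#{state := executing} = Job, complete) ->
    {ok, Job#{state := completed}};
%% executing | available --> cancelled: cancel
next(#{state := State} = Job, cancel)
  when State =:= executing; State =:= available ->
    {ok, Job#{state := cancelled}};
next(_Job, _Event) ->
    {error, invalid_transition}.

%% Terminal states have no outgoing transitions in the diagram.
is_terminal(State) ->
    lists:member(State, [completed, failed, cancelled]).
```

Under these assumptions, a job with `max_attempts => 1` that is polled and then fails moves straight to `failed`, while one with attempts remaining returns to `available`.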

Chains

Jobs can be tagged with a chain to serialize their execution within a queue. Jobs sharing a chain value run in normal queue order (by priority, then by insert time) as if they were the only jobs in the queue, while other jobs in the queue keep executing concurrently.

Ordering is strict: an earlier job that is retried or rescheduled still blocks later jobs in its chain until it reaches a terminal state. Once a job is in a final state (completed, failed, or cancelled), the next job in the same chain becomes eligible to run.

Chains are scoped to a single queue. The same value used in two queues forms two independent chains, and forward does not carry the chain value into the destination queue (the value stays in the forwarded payload for inspection).

gaffer:insert(emails, #{~"to" => ~"a@example.com"}, #{chain => ~"user-42"}),
gaffer:insert(emails, #{~"to" => ~"b@example.com"}, #{chain => ~"user-42"}).
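
To make the blocking rule concrete, here is a minimal sketch of how eligibility within one queue could be computed from the description above. It is not how gaffer selects jobs internally. The module name, the job map keys (`id`, `chain`, `priority`, `inserted_at`, `state`), the use of `undefined` for unchained jobs, and the assumption that a lower `priority` value runs first are all hypothetical.

```erlang
-module(chain_sketch).
-export([eligible/1]).

%% Returns the ids of jobs that could be picked up next, in queue order.
eligible(Jobs) ->
    %% Jobs in a terminal state no longer block their chain.
    Live = [J || #{state := S} = J <- Jobs, not is_terminal(S)],
    Sorted = lists:sort(fun order/2, Live),
    {Ids, _SeenChains} = lists:foldl(fun pick/2, {[], #{}}, Sorted),
    lists:reverse(Ids).

is_terminal(State) ->
    lists:member(State, [completed, failed, cancelled]).

%% Assumed ordering: ascending priority, then ascending insert time.
order(#{priority := P1, inserted_at := T1},
      #{priority := P2, inserted_at := T2}) ->
    {P1, T1} =< {P2, T2}.

%% Unchained jobs only need to be available.
pick(#{chain := undefined, state := available, id := Id}, {Ids, Seen}) ->
    {[Id | Ids], Seen};
pick(#{chain := undefined}, Acc) ->
    Acc;
%% The first live job of a chain is its head. It is eligible only if it
%% is available; every later job in the same chain is blocked.
pick(#{chain := Chain, state := State, id := Id}, {Ids, Seen}) ->
    case maps:is_key(Chain, Seen) of
        true -> {Ids, Seen};
        false when State =:= available -> {[Id | Ids], Seen#{Chain => true}};
        false -> {Ids, Seen#{Chain => true}}
    end.
```

In this model, while the first `user-42` job is executing (or waiting for a retry), the second one stays blocked and only unchained jobs are returned. Once the first job reaches a terminal state, the second becomes eligible.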

Configuration

Queues are configured via gaffer:queue_conf() maps:

Hooks

Gaffer notifies registered hooks after queue and job events. Each hook receives an event path (a list of atoms) and a payload map carrying an actor field that identifies which Gaffer process or public API call caused the event.

Hooks can be registered per queue via the hooks configuration option or globally via the gaffer application's hooks environment variable.

Hooks are also the recommended way to collect queue metrics: per-state counters, timestamps, throughput, and latency can all be derived from the job lifecycle events.

See gaffer_hooks for the full list of events and their payload shapes.
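
As a rough illustration of the metrics idea, the sketch below keeps a per-event counter and a last-seen timestamp in a plain map, keyed by event path. It deliberately does not show how a hook is registered or what signature gaffer expects; consult gaffer_hooks for that. The module name, the function names, and any event paths passed to it (for example `[job, complete]`) are hypothetical placeholders, not real gaffer events.

```erlang
-module(hook_metrics_sketch).
-export([new/0, record/3, count/2]).

%% Metrics are a map from event path to #{count, last_seen}.
new() ->
    #{}.

%% Record one occurrence of EventPath. The payload is ignored here; a
%% real collector could read fields such as the actor from it.
record(EventPath, _Payload, Metrics) when is_list(EventPath) ->
    Now = erlang:monotonic_time(millisecond),
    maps:update_with(
        EventPath,
        fun(#{count := N} = Entry) ->
            Entry#{count := N + 1, last_seen := Now}
        end,
        #{count => 1, last_seen => Now},
        Metrics
    ).

%% Number of times EventPath has been recorded (0 if never seen).
count(EventPath, Metrics) ->
    maps:get(count, maps:get(EventPath, Metrics, #{}), 0).
```

Throughput and latency could be derived in the same style by comparing timestamps of related events for the same job, assuming the payload identifies the job.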

Changelog

See the Releases page.

Code of Conduct

This project's code of conduct is the Contributor Covenant Code of Conduct.

Contributing

First of all, thank you for contributing your time and energy.

If you want to request a new feature, please open an issue first so we can discuss it.

Bug reports and questions are also welcome. If you found a bug, first check that you are using the latest version of the application. If you have a question, search the issue tracker first, since it may already have been answered.

Contributions will be subject to the MIT License. You will retain the copyright.

For more information check out CONTRIBUTING.md.

Security

This project's security policy is made explicit in SECURITY.md.

Conventions

Versions

This project adheres to Semantic Versioning.

License

This project uses the MIT License.