Honeydew 💪🏻🍈

Honeydew ("Honey, do!") is a pluggable job queue + worker pool for Elixir.

Honeydew attempts to provide "at least once" job execution; it's possible that circumstances could conspire to execute a job, yet prevent Honeydew from reporting that success back to the queue. I encourage you to write your jobs idempotently.

Honeydew isn't intended as a simple resource pool; the user's code isn't executed in the requesting process. Though you may use it as such, there are likely other alternatives that would fit your situation better.

## tl;dr

### Queue Feature Support

| Queue                  | filter | status | cancel | in-memory | disk-backed |
|------------------------|--------|--------|--------|-----------|-------------|
| ErlangQueue (`:queue`) | ✅*    | ✅*    |        |           |             |
| Mnesia                 | ✅*    | ✅*    |        | ✅ (ets)  | ✅ (dets)   |

## Getting Started

In your mix.exs file:

```elixir
defp deps do
  [{:honeydew, "~> 1.0.0-rc3"}]
end
```

You can run honeydew on a single node, or with components distributed over a cluster.

### Local Queue Example


There's an uncaring firehose of data pointed at us, and we need to store it all in our database, Riak. The requester isn't expecting a response, and we can't afford to drop a write due to overloaded workers.

Let's create a worker module. Honeydew will call our worker's `init/1` and keep the state from an `{:ok, state}` return.

Our workers are going to call functions from our module; the last argument will be the worker's state, `riak` in this case.

```elixir
defmodule Riak do
  @moduledoc """
    This is an example Worker to interface with Riak.
    You'll need to add the erlang riak driver to your mix.exs:
    `{:riakc, ">= 2.4.1"}`
  """

  def init([ip, port]) do
    :riakc_pb_socket.start_link(ip, port) # returns {:ok, riak}
  end

  def up?(riak) do
    :riakc_pb_socket.ping(riak) == :pong
  end

  def put(bucket, key, obj, content_type, riak) do
    :ok = :riakc_pb_socket.put(riak, :riakc_obj.new(bucket, key, obj, content_type))
  end

  def get(bucket, key, riak) do
    case :riakc_pb_socket.get(riak, bucket, key) do
      {:ok, obj} -> :riakc_obj.get_value(obj)
      {:error, :notfound} -> nil
      error -> error
    end
  end
end
```

Then we'll start both a queue and workers in our supervision tree.

```elixir
defmodule App do
  def start do
    children = [
      Honeydew.queue_spec(:riak),
      Honeydew.worker_spec(:riak, {Riak, ['127.0.0.1', 8087]}, num: 5, init_retry_secs: 10)
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end
```

A task is simply a tuple with the name of a function and its arguments, or a `fn`.
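For instance, assuming the `:riak` queue from above is running, both forms can be enqueued the same way:

```elixir
# module-function-args tuple: the worker calls Riak.put/5,
# appending its own state (the riak connection) as the last argument
{:put, ["bucket", "key", "value", "text/plain"]} |> Honeydew.async(:riak)

# a bare atom works for a zero-argument function, as with :up? below

# anonymous function: receives the worker's state as its argument
fn riak -> :riakc_pb_socket.ping(riak) end |> Honeydew.async(:riak)
```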

We'll add tasks to the queue using `async/3` and wait for responses with `yield/2`. To tell Honeydew that we expect a response from the job, we'll specify `reply: true`, like so:

```elixir
iex(1)> :up? |> Honeydew.async(:riak, reply: true) |> Honeydew.yield
{:ok, true}

iex(2)> {:put, ["bucket", "key", "value", "text/plain"]} |> Honeydew.async(:riak)
%Honeydew.Job{by: nil, failure_private: nil, from: nil, monitor: nil,
 private: -576460752303422557, queue: :riak, result: nil,
 task: {:put, ["bucket", "key", "value", "text/plain"]}}

iex(3)> {:get, ["bucket", "key"]} |> Honeydew.async(:riak, reply: true) |> Honeydew.yield
{:ok, "value"}

# our worker is holding a riak connection (a pid) as its state, let's ask that pid what its state is.
iex(4)> fn riak -> {riak, :sys.get_state(riak)} end |> Honeydew.async(:riak, reply: true) |> Honeydew.yield
{:ok,
 {#PID<0.256.0>,
  {:state, '127.0.0.1', 8087, false, false, #Port<0.8365>, false, :gen_tcp,
   :undefined, {[], []}, 1, [], :infinity, :undefined, :undefined, :undefined,
   :undefined, [], 100}}}
```

If you pass `reply: true` but never call `yield/2` to read the result, the reply messages will accumulate in your process's mailbox. Don't do that.
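`yield/2` also takes a timeout in milliseconds; like `Task.yield/2`, it presumably returns `nil` if no reply arrives in time, and you can hold onto the job to try again later. A minimal sketch, assuming the `:riak` queue from above:

```elixir
job = :up? |> Honeydew.async(:riak, reply: true)

case Honeydew.yield(job, 1_000) do
  {:ok, result} -> result
  nil -> :timed_out  # no reply within 1 second; the reply may still arrive later
end
```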

(Ignoring the response of the `:put` above is just an example; you probably want to check the return value of a database insert unless you have a good reason to ignore it.)

The `Honeydew.Job` struct above is used to track the status of a job; you can pass it to `cancel/1` if you want to try to kill the job.

### Distributed Queue Example


Say we've got some pretty heavy tasks that we want to distribute over a farm of background job processing nodes; they're too heavy to process on our client-facing nodes. In a distributed Erlang scenario, you have the option of distributing Honeydew's various components around different nodes in your cluster. Honeydew is basically a simple collection of queue processes and worker processes; it detects when nodes go up and down, and reconnects workers.

To start a global queue, pass a `{:global, name}` tuple when you start Honeydew's components.

In this example, we'll use the Mnesia queue with stateless workers.

We'll start the queue on node `queue@dax` with:

```elixir
defmodule QueueApp do
  def start do
    nodes = [node()]

    children = [
      Honeydew.queue_spec({:global, :my_queue}, queue: {Honeydew.Queue.Mnesia, [nodes, [disc_copies: nodes], []]})
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end
```

```elixir
iex(queue@dax)1> QueueApp.start
{:ok, #PID<0.209.0>}
```

And we'll run our workers on `background@dax` with:

```elixir
defmodule HeavyTask do
  def work_really_hard(secs) do
    :timer.sleep(1_000 * secs)
    IO.puts "I worked really hard for #{secs} secs!"
  end
end

defmodule WorkerApp do
  def start do
    children = [
      Honeydew.worker_spec({:global, :my_queue}, HeavyTask, num: 10)
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end
```

```elixir
iex(background@dax)1> Node.ping :queue@dax
:pong

iex(background@dax)2> WorkerApp.start
{:ok, #PID<0.205.0>}
```

(Note that in this case our worker is stateless, so we left out `init/1`.)

You can connect the nodes together at any point in the process; Honeydew will automatically detect where its components are running.

Then on any node in the cluster, we can enqueue a job:

```elixir
iex(clientfacing@dax)1> Node.ping :queue@dax
:pong

iex(clientfacing@dax)2> {:work_really_hard, [5]} |> Honeydew.async({:global, :my_queue})
%Honeydew.Job{by: nil, failure_private: nil, from: nil, monitor: nil,
 private: {false, -576460752303423485}, queue: {:global, :my_queue},
 result: nil, task: {:work_really_hard, [5]}}
```

The job will run on the worker node; five seconds later it'll print `I worked really hard for 5 secs!`


There's one important caveat that you should note: Honeydew doesn't yet support OTP failover/takeover, so please be careful in production. I'll send you three emoji of your choice if you submit a PR. :)

### Suspend and Resume
You can suspend a queue (halt the distribution of new jobs to workers), by calling `Honeydew.suspend(:my_queue)`, then resume with `Honeydew.resume(:my_queue)`.

### Cancelling Jobs
To cancel a job that hasn't yet run, use `Honeydew.cancel/2`. If the job was successfully cancelled before execution, `:ok` will be returned. If the job wasn't present in the queue, `nil`. If the job is currently being executed, `{:error, :in_progress}`.
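A sketch of handling each of those return values, using the `Honeydew.Job` struct that `async` returns:

```elixir
job = {:work_really_hard, [5]} |> Honeydew.async({:global, :my_queue})

case Honeydew.cancel(job) do
  :ok                     -> IO.puts "cancelled before execution"
  nil                     -> IO.puts "job wasn't in the queue (perhaps it already ran)"
  {:error, :in_progress}  -> IO.puts "too late, a worker is already running it"
end
```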

### Queue Options
There are various options you can pass to `queue_spec/2` and `worker_spec/3`, see the [Honeydew](https://github.com/koudelka/honeydew/blob/master/lib/honeydew.ex) module.

### Failure Modes
When a worker crashes, a monitoring process runs the `handle_failure/4` function from the selected module on the queue's node. Honeydew ships with two failure modes at present:

- `Honeydew.FailureMode.Abandon`: Simply forgets about the job.
- `Honeydew.FailureMode.Requeue`: Removes the job from the original queue, and places it on another.

See `Honeydew.queue_spec/2` to select a failure mode.
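Selecting a failure mode might look something like the sketch below. The exact shape of `Requeue`'s arguments (the `[to: ...]` list) is an assumption here — check the `Honeydew.queue_spec/2` docs for the current form:

```elixir
children = [
  # hypothetical: jobs that crash on :my_queue are re-enqueued onto :failed_jobs
  Honeydew.queue_spec(:my_queue,
    failure_mode: {Honeydew.FailureMode.Requeue, [to: :failed_jobs]}),

  # a second queue to hold the failed jobs for later inspection or retry
  Honeydew.queue_spec(:failed_jobs)
]
```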

## The Dungeon

### Job Lifecycle
In general, a job goes through the following stages: