OPQ: One Pooled Queue

Build StatusCodeBeatCoverageHex VersionHex DocsTotal DownloadLicenseLast Updated

Elixir Queue!

A simple, in-memory queue with worker pooling and rate limiting in Elixir. OPQ leverages Erlang’s queue module and Elixir’s GenStage.

Originally built to support Crawler.

Features

See Hex documentation.

Installation

def deps do
  [{:opq, "~> 4.0"}]
end

Usage

A simple example:

{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, fn -> IO.inspect("hello") end)
OPQ.enqueue(opq, fn -> IO.inspect("world") end)

Specify module, function and arguments:

{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, IO, :inspect, ["hello"])
OPQ.enqueue(opq, IO, :inspect, ["world"])

Specify a custom name for the queue:

OPQ.init(name: :items)

OPQ.enqueue(:items, fn -> IO.inspect("hello") end)
OPQ.enqueue(:items, fn -> IO.inspect("world") end)

Start as part of a supervision tree:

Note, when starting as part of a supervision tree, the :name option must be provided.

children = [
  {OPQ, name: :items}
]

Specify a custom worker to process items in the queue:

defmodule CustomWorker do
  def start_link(item) do
    Task.start_link(fn ->
      Agent.update(:bucket, &[item | &1])
    end)
  end
end

Agent.start_link(fn -> [] end, name: :bucket)

{:ok, opq} = OPQ.init(worker: CustomWorker)

OPQ.enqueue(opq, "hello")
OPQ.enqueue(opq, "world")

Agent.get(:bucket, & &1) # => ["world", "hello"]

Rate limit:

{:ok, opq} = OPQ.init(workers: 1, interval: 1000)

Task.async(fn ->
  OPQ.enqueue(opq, fn -> IO.inspect("hello") end)
  OPQ.enqueue(opq, fn -> IO.inspect("world") end)
end)

If no interval is supplied, the ratelimiter will be bypassed.

Check the queue and number of available workers:

{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, fn -> Process.sleep(1000) end)

{status, queue, available_workers} = OPQ.info(opq) # => {:normal, #OPQ.Queue<[]>, 9}

Process.sleep(1200)

{status, queue, available_workers} = OPQ.info(opq) # => {:normal, #OPQ.Queue<[]>, 10}

If you just need to get the queue itself:

OPQ.queue(opq) # => #OPQ.Queue<[]>

Queue

OPQ implements Enumerable, so you can perform enumerable functions on the queue:

{:ok, opq} = OPQ.init()

queue = OPQ.queue(opq)

Enum.count(queue) # => 0
Enum.empty?(queue) # => true

Stop the queue:

{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, fn -> IO.inspect("hello") end)
OPQ.stop(opq)
OPQ.enqueue(opq, fn -> IO.inspect("world") end) # => (EXIT) no process...

Pause and resume the queue:

{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, fn -> IO.inspect("hello") end) # => "hello"
OPQ.pause(opq)
OPQ.info(opq) # => {:paused, {[], []}, 10}
OPQ.enqueue(opq, fn -> IO.inspect("world") end)
OPQ.resume(opq) # => "world"
OPQ.info(opq) # => {:normal, {[], []}, 10}

Configurations

Option Type Default Value Description

| :name | atom/module | pid | The name of the queue. | :worker | module | OPQ.Worker | The worker that processes each item from the queue. | :workers | integer | 10 | Maximum number of workers. | :interval | integer | 0 | Rate limit control - number of milliseconds before asking for more items to process, defaults to 0 which is effectively no rate limit. | :timeout | integer | 5000 | Number of milliseconds allowed to perform the work, it should always be set to higher than :interval.

Changelog

Please see CHANGELOG.md.

License

Licensed under MIT.