Tasque

Tasque is an asynchronous, bounded-concurrency task queue for Elixir. It lets you enqueue anonymous functions or MFA tuples, run them under a supervised Task.Supervisor, and receive results back via standard OTP messages.

It is a good fit when you need bounded parallelism, per-task timeouts, and OTP-friendly result delivery without introducing a separate job system.


Good use cases for this library include:

Why Tasque?

Quick Start

Add a queue to your supervision tree:

children = [
  {Tasque, name: MyApp.Queue, max_concurrency: 10}
]

Supervisor.start_link(children, strategy: :one_for_one)

Then enqueue work and await the result:

{:ok, ref} = Tasque.queue_task(MyApp.Queue, fn -> expensive_work() end)
{:ok, result} = Tasque.await(ref)

You can also consume results directly from the caller mailbox:

{:ok, ref} = Tasque.queue_task(MyApp.Queue, fn -> String.upcase("hello") end)

receive do
  {:tasque_result, ^ref, {:ok, result}} -> result
  {:tasque_result, ^ref, {:exit, reason}} -> {:error, reason}
end

Task Formats

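Tasks can be provided as either an anonymous function, as in fn -> expensive_work() end, or an MFA tuple of the form {module, function, args}, as in {String, :upcase, ["hello"]}.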

Invalid task formats return {:error, :invalid_task}.

Result Format

Every task result is delivered to the calling process as:

{:tasque_result, ref, outcome}

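The outcomes shown in this README are {:ok, result} for a task that returns a value and {:exit, reason} for a task that exits, including {:exit, :timeout} when a per-task timeout fires.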
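The message shape above can be consumed with a small helper when several tasks are in flight at once. The following is a minimal sketch, not part of Tasque: MyApp.Results, collect/2, and the {:error, :no_reply} fallback are hypothetical names chosen for illustration. It relies only on the {:tasque_result, ref, outcome} message format described here.

```elixir
defmodule MyApp.Results do
  # Hypothetical helper, not part of Tasque.
  # Collects one outcome per ref, in the order the refs were given.
  # Must be called from the process that enqueued the tasks, because
  # results are delivered to that process's mailbox.
  def collect(refs, wait_ms \\ 5_000) do
    Enum.map(refs, fn ref ->
      receive do
        # Selective receive: only the message for this ref matches, so
        # results that arrive out of order stay in the mailbox until
        # their turn comes.
        {:tasque_result, ^ref, outcome} -> outcome
      after
        # wait_ms applies to each ref separately, not to the whole batch.
        wait_ms -> {:error, :no_reply}
      end
    end)
  end
end
```

With a list of refs returned by Tasque.queue_task, MyApp.Results.collect(refs) would return a list of outcome tuples in the same order as the refs.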

Timeouts

Per-task timeouts start when a task is enqueued, so they include both queue wait time and execution time. If a timeout fires before dispatch, the task is dropped from the queue and the caller receives {:exit, :timeout}. If it fires while the task is running, the task process is terminated and the caller receives the same timeout result.

This is separate from Tasque.await/2, whose timeout only controls how long the caller waits for a result. An await/2 timeout does not cancel the task.
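To make the distinction concrete, here is a hedged sketch of a caller-side wait that keeps the two timeouts apart. It reads the result message directly, because this README does not say what Tasque.await/2 returns when its own timeout expires. MyApp.Deadline, wait/2, and the error atoms are hypothetical names, not part of Tasque.

```elixir
defmodule MyApp.Deadline do
  # Hypothetical helper, not part of Tasque.
  # Waits up to wait_ms for the result of ref and reports which kind of
  # timeout, if any, was hit.
  def wait(ref, wait_ms) do
    receive do
      {:tasque_result, ^ref, {:ok, value}} ->
        {:ok, value}

      # The per-task timeout fired: the queue dropped or terminated the task.
      {:tasque_result, ^ref, {:exit, :timeout}} ->
        {:error, :task_timed_out}

      # The task exited for any other reason.
      {:tasque_result, ^ref, {:exit, reason}} ->
        {:error, {:task_exited, reason}}
    after
      # Only the caller stopped waiting. The task itself is not cancelled
      # and its result message may still arrive later.
      wait_ms -> {:error, :still_pending}
    end
  end
end
```

Given the semantics described above, a task enqueued with timeout: 100 that sleeps for a full second should produce {:error, :task_timed_out} from MyApp.Deadline.wait(ref, 500), whereas the same task enqueued without a per-task timeout should produce {:error, :still_pending} and keep running.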

Naming

The queue :name supports the standard OTP naming forms: a local atom such as MyApp.Queue, a {:global, term} tuple, or a {:via, module, term} tuple.

Tasque derives matching companion names for its supervisor processes using the same naming strategy.
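As an illustration of a non-atom name, the sketch below registers a queue through Elixir's Registry using a {:via, module, term} tuple, one of the standard OTP naming forms. MyApp.QueueRegistry and the :io key are hypothetical, and the sketch assumes queue_task accepts the same :via tuple that was passed as :name.

```elixir
# Assumption: Tasque accepts a :via tuple wherever it accepts a queue name.
queue_name = {:via, Registry, {MyApp.QueueRegistry, :io}}

children = [
  # The Registry must start before any queue that registers through it.
  {Registry, keys: :unique, name: MyApp.QueueRegistry},
  {Tasque, name: queue_name, max_concurrency: 50}
]

Supervisor.start_link(children, strategy: :one_for_one)

# Enqueue against the same name used at startup.
{:ok, ref} = Tasque.queue_task(queue_name, fn -> :done end)
```

Registry-based names are useful when queues are created dynamically, for example one per tenant, because the key can be any term rather than a pre-declared atom.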

Examples

Different workloads often benefit from different concurrency limits. For example, CPU-bound work is usually best capped near the number of schedulers, while I/O-heavy work can often tolerate a higher limit:

children = [
  {Tasque, name: MyApp.CpuQueue, max_concurrency: System.schedulers_online()},
  {Tasque, name: MyApp.IoQueue, max_concurrency: 50}
]

You can then route work to the appropriate queue:

{:ok, image_ref} =
  Tasque.queue_task(MyApp.CpuQueue, fn -> render_thumbnail(image) end)

{:ok, api_ref} =
  Tasque.queue_task(MyApp.IoQueue, fn -> fetch_remote_profile(user_id) end, timeout: 5_000)

{:ok, thumbnail} = Tasque.await(image_ref)
{:ok, profile} = Tasque.await(api_ref)

MFA tasks work the same way:

{:ok, ref} = Tasque.queue_task(MyApp.IoQueue, {String, :upcase, ["hello"]})
{:ok, "HELLO"} = Tasque.await(ref)

Caveats

Installation

The package can be installed by adding tasque to your list of dependencies in mix.exs:

def deps do
  [
    {:tasque, "~> 1.0.0"}
  ]
end

Documentation can be found at https://hexdocs.pm/tasque.