TaskExtension


Higher-level Task combinators for Elixir — race, all_settled, bounded-concurrency map, and sequential fallback chains.

Why?

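Elixir's Task module gives you async, await, yield_many, and async_stream: solid low-level primitives. But every non-trivial project ends up hand-rolling the same patterns on top: racing tasks for the first success, collecting every result without crashing, mapping with bounded concurrency, and trying fallbacks in sequence.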

JavaScript has Promise.race / Promise.allSettled / Promise.any. Go has errgroup. Java has CompletableFuture.anyOf / allOf. TaskExtension brings these patterns to Elixir with zero dependencies and no processes for you to start or supervise.

Installation

Add task_extension to your list of dependencies in mix.exs:

def deps do
  [
    {:task_extension, "~> 0.1.0"}
  ]
end

No supervision tree entry needed — all functions are stateless.

Quick start

# Race — first success wins, cancel the rest
{:ok, :fast} = TaskExtension.race([
  fn -> Process.sleep(100); :slow end,
  fn -> :fast end
])

# All settled — run all, collect successes and failures
[{:ok, 1}, {:ok, 2}, {:error, _}] =
  TaskExtension.all_settled([
    fn -> 1 end,
    fn -> 2 end,
    fn -> raise "boom" end
  ])

# Map with bounded concurrency — cancel on first error
{:ok, [2, 4, 6]} = TaskExtension.map([1, 2, 3], fn x -> x * 2 end, max_concurrency: 2)

# Sequential fallback chain
{:ok, "from db"} = TaskExtension.first_ok([
  fn -> {:error, :cache_miss} end,
  fn -> {:ok, "from db"} end
])

Real-world examples

Fastest replica wins

Hit multiple service replicas concurrently, use the first response:

def get_user(id) do
  {:ok, user} = TaskExtension.race([
    fn -> Repo.replica(:us_east).get!(User, id) end,
    fn -> Repo.replica(:eu_west).get!(User, id) end,
    fn -> Repo.replica(:ap_south).get!(User, id) end
  ], timeout: 5_000)

  user
end

Fan-out with error collection

Send notifications to multiple channels, collect all results without crashing:

def notify_all(user, message) do
  TaskExtension.all_settled([
    fn -> Mailer.send(user.email, message) end,
    fn -> SMS.send(user.phone, message) end,
    fn -> Push.send(user.device_token, message) end
  ])
end
# => [{:ok, :sent}, {:error, :invalid_phone}, {:ok, :delivered}]
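
Once the results are back, you usually want to know which channels failed. The following is a minimal sketch, assuming results come back in the same order as the input functions (as the output above suggests). NotifyReport and the channel names are hypothetical and not part of TaskExtension.

```elixir
defmodule NotifyReport do
  # Hypothetical helper: pairs each channel name with its result and
  # keeps only the failures as {channel, reason} tuples.
  def failures(channels, results) do
    channels
    |> Enum.zip(results)
    |> Enum.flat_map(fn
      {channel, {:error, reason}} -> [{channel, reason}]
      {_channel, {:ok, _value}} -> []
    end)
  end
end

# Sample data copied from the result shown above.
results = [{:ok, :sent}, {:error, :invalid_phone}, {:ok, :delivered}]
IO.inspect(NotifyReport.failures([:email, :sms, :push], results))
```

The failures list can then feed a retry queue or a log line without touching the channels that succeeded.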

Bounded batch processing

Process thousands of items with controlled concurrency — abort on first failure:

def process_uploads(files) do
  case TaskExtension.map(files, &upload_to_s3/1, max_concurrency: 10) do
    {:ok, urls} -> {:ok, urls}
    {:error, reason} -> {:error, "Upload failed: #{inspect(reason)}"}
  end
end
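
For comparison, here is roughly what this pattern looks like when hand-rolled on the standard library. It is a sketch under assumptions, not TaskExtension's implementation: BoundedMapSketch is a hypothetical module, the 5-second per-task timeout is an arbitrary choice, and the first error is reported in input order rather than arrival order.

```elixir
defmodule BoundedMapSketch do
  # Hypothetical illustration built on Task.Supervisor.async_stream_nolink/4.
  def map(items, fun, max_concurrency) do
    {:ok, sup} = Task.Supervisor.start_link()

    stream =
      Task.Supervisor.async_stream_nolink(sup, items, fun,
        max_concurrency: max_concurrency,
        timeout: 5_000,
        on_timeout: :kill_task
      )

    # Halting the reduction stops the stream, which shuts down
    # any tasks that are still running.
    result =
      Enum.reduce_while(stream, {:ok, []}, fn
        {:ok, value}, {:ok, acc} -> {:cont, {:ok, [value | acc]}}
        {:exit, reason}, _acc -> {:halt, {:error, reason}}
      end)

    Supervisor.stop(sup)

    # Results were accumulated in reverse; errors pass through unchanged.
    with {:ok, reversed} <- result, do: {:ok, Enum.reverse(reversed)}
  end
end

IO.inspect(BoundedMapSketch.map([1, 2, 3], fn x -> x * 2 end, 2))
```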

Cache / DB / API fallback chain

Try fast sources first, fall back to slower ones:

def get_config(key) do
  TaskExtension.first_ok([
    fn -> fetch_from_cache(key) end,
    fn -> fetch_from_db(key) end,
    fn -> fetch_from_api(key) end
  ])
end
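
To make the semantics concrete, a sequential fallback chain can be sketched in a few lines. This is a simplified stand-in, not the library's code: FallbackSketch is a hypothetical module, it runs each function in the calling process, and it has no timeout handling. It assumes a source succeeds when it returns an {:ok, value} tuple, as in the quick start.

```elixir
defmodule FallbackSketch do
  # Hypothetical illustration of a sequential fallback chain.
  def first_ok([]), do: {:error, :empty}

  def first_ok(funs) when is_list(funs) do
    # find_value stops at the first truthy result, so later
    # (slower) sources are never called after a success.
    Enum.find_value(funs, {:error, :all_failed}, fn fun ->
      case safe_call(fun) do
        {:ok, _value} = ok -> ok
        _other -> nil
      end
    end)
  end

  # Converts raises, throws, and exits into an {:error, _} tuple.
  defp safe_call(fun) do
    fun.()
  rescue
    exception -> {:error, exception}
  catch
    kind, reason -> {:error, {kind, reason}}
  end
end

IO.inspect(
  FallbackSketch.first_ok([
    fn -> {:error, :cache_miss} end,
    fn -> {:ok, "from db"} end
  ])
)
```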

Error handling

Task crashes never crash the caller. All functions use async_nolink internally, so raises, exits, and throws are captured as {:error, reason} tuples:

# race — crashes are skipped, next success wins
{:ok, :backup} = TaskExtension.race([
  fn -> raise "primary down" end,
  fn -> :backup end
])

# all_settled — crashes appear as {:error, reason} in results
[{:ok, 1}, {:error, {%RuntimeError{message: "boom"}, _stacktrace}}] =
  TaskExtension.all_settled([fn -> 1 end, fn -> raise "boom" end])

# map — first crash cancels everything
{:error, {%RuntimeError{}, _}} =
  TaskExtension.map([1, 2, 3], fn _ -> raise "boom" end)

# first_ok — crashes are skipped like any other failure
{:ok, :fallback} = TaskExtension.first_ok([
  fn -> raise "boom" end,
  fn -> {:ok, :fallback} end
])
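
The mechanism behind this isolation can be shown with the standard library alone. The sketch below is illustrative and assumes nothing about TaskExtension's internals beyond the use of async_nolink. CrashCapture is a hypothetical module, and the mapping to {:error, reason} is one plausible way to do it.

```elixir
defmodule CrashCapture do
  # Hypothetical illustration: run one function under a temporary
  # Task.Supervisor and convert crashes and timeouts into tuples.
  def run(fun, timeout \\ 1_000) do
    {:ok, sup} = Task.Supervisor.start_link()

    try do
      # async_nolink monitors the task without linking it to the
      # caller, so a crash arrives as a message, not as an exit signal.
      task = Task.Supervisor.async_nolink(sup, fun)

      case Task.yield(task, timeout) || Task.shutdown(task, :brutal_kill) do
        {:ok, value} -> {:ok, value}
        {:exit, reason} -> {:error, reason}
        nil -> {:error, :timeout}
      end
    after
      Supervisor.stop(sup)
    end
  end
end

IO.inspect(CrashCapture.run(fn -> 42 end))
```

The crashed task still logs its error report as usual; only the caller is protected.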

Return values

| Function | Success | All fail | Timeout | Empty input |
|---|---|---|---|---|
| race/1,2 | {:ok, result} | {:error, :all_failed} | {:error, :timeout} | {:error, :empty} |
| all_settled/1,2 | [{:ok, _}, ...] | [{:error, _}, ...] | timed-out tasks get {:error, :timeout} | [] |
| map/2,3 | {:ok, results} | {:error, reason} | {:error, :timeout} | {:ok, []} |
| first_ok/1,2 | {:ok, result} | {:error, :all_failed} | {:error, :all_failed} | {:error, :empty} |
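
Because every outcome is a plain tuple, callers can handle all four race cases with ordinary pattern matching. A small sketch follows; RaceOutcome is a hypothetical helper, not part of TaskExtension, and the messages are placeholders.

```elixir
defmodule RaceOutcome do
  # Hypothetical helper: one clause per documented race/1,2 return shape.
  def describe({:ok, result}), do: "winner: #{inspect(result)}"
  def describe({:error, :all_failed}), do: "every task failed"
  def describe({:error, :timeout}), do: "no task finished in time"
  def describe({:error, :empty}), do: "no tasks were given"
end

# In real code the argument would come from TaskExtension.race/1,2.
IO.puts(RaceOutcome.describe({:ok, :fast}))
```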

How it works

Each function call:

  1. Starts a temporary Task.Supervisor (no supervision tree entry needed)
  2. Spawns tasks with async_nolink — task crashes are isolated from the caller
  3. Uses a receive loop to collect results as they arrive
  4. Shuts down remaining tasks and the supervisor when done

caller ──race([f1, f2, f3])──► temp Task.Supervisor
                                    │
                               spawn f1 ─── running ─── crash ✗
                               spawn f2 ─── running ─── {:ok, :fast} ✓ ◄── winner
                               spawn f3 ─── running ─── shutdown ✗
                                    │
                               return {:ok, :fast}

No GenServer, no ETS, no state between calls.
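
The four steps can be sketched for race as follows. This is an illustration of the approach described above, not the library's source: RaceSketch is a hypothetical module, the 5-second default timeout is an assumption, and any normal return value is treated as a success.

```elixir
defmodule RaceSketch do
  # Hypothetical illustration of the four steps; not TaskExtension's code.
  def race(funs, timeout \\ 5_000)
  def race([], _timeout), do: {:error, :empty}

  def race(funs, timeout) do
    # 1. Temporary supervisor that lives only for this call.
    {:ok, sup} = Task.Supervisor.start_link()

    # 2. async_nolink isolates task crashes from the caller.
    tasks =
      Map.new(funs, fn fun ->
        task = Task.Supervisor.async_nolink(sup, fun)
        {task.ref, task}
      end)

    deadline = System.monotonic_time(:millisecond) + timeout
    {result, leftover} = collect(tasks, deadline)

    # 4. Kill the losers (this also flushes their messages), then
    #    stop the supervisor.
    Enum.each(Map.values(leftover), &Task.shutdown(&1, :brutal_kill))
    Supervisor.stop(sup)
    result
  end

  # Every task crashed before any of them replied.
  defp collect(tasks, _deadline) when map_size(tasks) == 0 do
    {{:error, :all_failed}, tasks}
  end

  # 3. Receive loop: take replies and crash notices as they arrive.
  defp collect(tasks, deadline) do
    remaining = max(deadline - System.monotonic_time(:millisecond), 0)

    receive do
      {ref, value} when is_map_key(tasks, ref) ->
        Process.demonitor(ref, [:flush])
        {{:ok, value}, Map.delete(tasks, ref)}

      {:DOWN, ref, :process, _pid, _reason} when is_map_key(tasks, ref) ->
        collect(Map.delete(tasks, ref), deadline)
    after
      remaining -> {{:error, :timeout}, tasks}
    end
  end
end

IO.inspect(RaceSketch.race([fn -> Process.sleep(100); :slow end, fn -> :fast end]))
```

Keying the tasks by their monitor reference lets the receive guards ignore unrelated messages in the caller's mailbox.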

Comparison with other languages

| Pattern | Elixir (TaskExtension) | JavaScript | Go | Java |
|---|---|---|---|---|
| First success | race/1 | Promise.any() | | CompletableFuture.anyOf() |
| All results | all_settled/1 | Promise.allSettled() | errgroup | CompletableFuture.allOf() |
| Bounded map | map/3 | p-map | errgroup + sem | |
| Fallback chain | first_ok/1 | | | |

License

MIT — see LICENSE.