TaskExtension
Higher-level Task combinators for Elixir — race, all_settled, bounded-concurrency map, and sequential fallback chains.
Why?
Elixir's Task module gives you async, await, yield_many, and async_stream — solid low-level primitives. But every non-trivial project ends up hand-rolling the same patterns on top:
- Race — hit 3 replicas, take the first response, cancel the rest
- All settled — fire N requests, collect all results (successes and failures) without crashing
- Bounded map — process 10K items with at most 20 concurrent workers, abort on first error
- Fallback chain — try cache, then DB, then API, stop at first hit
JavaScript has Promise.race / Promise.allSettled / Promise.any. Go has errgroup. Java has CompletableFuture.anyOf / allOf. TaskExtension brings these patterns to Elixir with zero dependencies and no processes to start.
Installation
Add task_extension to your list of dependencies in mix.exs:
def deps do
  [
    {:task_extension, "~> 0.1.0"}
  ]
end

No supervision tree entry is needed: all functions are stateless.
Quick start
# Race: first success wins, cancel the rest
{:ok, :fast} = TaskExtension.race([
  fn -> Process.sleep(100); :slow end,
  fn -> :fast end
])

# All settled: run all, collect successes and failures
[{:ok, 1}, {:ok, 2}, {:error, _}] =
  TaskExtension.all_settled([
    fn -> 1 end,
    fn -> 2 end,
    fn -> raise "boom" end
  ])

# Map with bounded concurrency: cancel on first error
{:ok, [2, 4, 6]} = TaskExtension.map([1, 2, 3], fn x -> x * 2 end, max_concurrency: 2)

# Sequential fallback chain
{:ok, "from db"} = TaskExtension.first_ok([
  fn -> {:error, :cache_miss} end,
  fn -> {:ok, "from db"} end
])

Real-world examples
Fastest replica wins
Hit multiple service replicas concurrently, use the first response:
def get_user(id) do
  {:ok, user} = TaskExtension.race([
    fn -> Repo.replica(:us_east).get!(User, id) end,
    fn -> Repo.replica(:eu_west).get!(User, id) end,
    fn -> Repo.replica(:ap_south).get!(User, id) end
  ], timeout: 5_000)
  user
end

Fan-out with error collection
Send notifications to multiple channels, collect all results without crashing:
def notify_all(user, message) do
  TaskExtension.all_settled([
    fn -> Mailer.send(user.email, message) end,
    fn -> SMS.send(user.phone, message) end,
    fn -> Push.send(user.device_token, message) end
  ])
end
# => [{:ok, :sent}, {:error, :invalid_phone}, {:ok, :delivered}]

Bounded batch processing
Process thousands of items with controlled concurrency — abort on first failure:
def process_uploads(files) do
  case TaskExtension.map(files, &upload_to_s3/1, max_concurrency: 10) do
    {:ok, urls} -> {:ok, urls}
    {:error, reason} -> {:error, "Upload failed: #{inspect(reason)}"}
  end
end

Cache / DB / API fallback chain
Try fast sources first, fall back to slower ones:
def get_config(key) do
  TaskExtension.first_ok([
    fn -> fetch_from_cache(key) end,
    fn -> fetch_from_db(key) end,
    fn -> fetch_from_api(key) end
  ])
end

Error handling
Task crashes never crash the caller. All functions use async_nolink internally, so raises, exits, and throws are captured as {:error, reason} tuples:
# race: crashes are skipped, next success wins
{:ok, :backup} = TaskExtension.race([
  fn -> raise "primary down" end,
  fn -> :backup end
])

# all_settled: crashes appear as {:error, reason} in results
[{:ok, 1}, {:error, {%RuntimeError{message: "boom"}, _stacktrace}}] =
  TaskExtension.all_settled([fn -> 1 end, fn -> raise "boom" end])

# map: first crash cancels everything
{:error, {%RuntimeError{}, _}} =
  TaskExtension.map([1, 2, 3], fn _ -> raise "boom" end)

# first_ok: crashes are skipped like any other failure
{:ok, :fallback} = TaskExtension.first_ok([
  fn -> raise "boom" end,
  fn -> :fallback end
])

Return values
| Function | Success | All fail | Timeout | Empty input |
|---|---|---|---|---|
| race/1,2 | {:ok, result} | {:error, :all_failed} | {:error, :timeout} | {:error, :empty} |
| all_settled/1,2 | [{:ok, _}, ...] | [{:error, _}, ...] | timed-out tasks get {:error, :timeout} | [] |
| map/2,3 | {:ok, results} | {:error, reason} | {:error, :timeout} | {:ok, []} |
| first_ok/1,2 | {:ok, result} | {:error, :all_failed} | {:error, :all_failed} | {:error, :empty} |
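The all_settled row, where each timed-out task gets its own {:error, :timeout} entry, can be reproduced with the standard library alone. The following is a minimal sketch under stated assumptions, not TaskExtension's source: the module name SettledSketch, the :timeout option name, and its 5-second default are all hypothetical.

```elixir
defmodule SettledSketch do
  # Hypothetical module for illustration; not TaskExtension's real implementation.
  def all_settled(funs, opts \\ []) when is_list(funs) do
    # Assumed option name and default value.
    timeout = Keyword.get(opts, :timeout, 5_000)
    {:ok, sup} = Task.Supervisor.start_link()

    try do
      funs
      |> Enum.map(&Task.Supervisor.async_nolink(sup, &1))
      |> Task.yield_many(timeout)
      |> Enum.map(fn {task, outcome} ->
        # A nil outcome means the task is still running: kill it, but keep a
        # reply that may have arrived in the meantime.
        case outcome || Task.shutdown(task, :brutal_kill) do
          {:ok, value} -> {:ok, value}
          {:exit, reason} -> {:error, reason}
          nil -> {:error, :timeout}
        end
      end)
    after
      # The temporary supervisor never outlives the call.
      Supervisor.stop(sup)
    end
  end
end
```

Task.yield_many/2 returns results in input order, which is why the output list lines up with the list of functions. An empty input yields [], matching the last column of the table.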
How it works
Each function call:
- Starts a temporary Task.Supervisor (no supervision tree entry needed)
- Spawns tasks with async_nolink, so task crashes are isolated from the caller
- Uses a receive loop to collect results as they arrive
- Shuts down remaining tasks and the supervisor when done
caller ──race([f1, f2, f3])──► temp Task.Supervisor
                                   │
                                   spawn f1 ─── running ─── crash ✗
                                   spawn f2 ─── running ─── {:ok, :fast} ✓ ◄── winner
                                   spawn f3 ─── running ─── shutdown ✗
                                   │
                               return {:ok, :fast}

No GenServer, no ETS, no state between calls.
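The steps above can be sketched for race using only the standard library. This is a rough illustration, not TaskExtension's actual code: the module name RaceSketch, the 5-second default timeout, and the choice to count any non-crashing return value as a success are assumptions made for the example.

```elixir
defmodule RaceSketch do
  # Hypothetical module for illustration; not TaskExtension's real implementation.

  def race(funs, opts \\ [])
  def race([], _opts), do: {:error, :empty}

  def race(funs, opts) when is_list(funs) do
    # Assumed default; the library's real default is not stated in this README.
    timeout = Keyword.get(opts, :timeout, 5_000)
    deadline = System.monotonic_time(:millisecond) + timeout

    # Step 1: temporary supervisor, linked to the caller for the call's duration.
    {:ok, sup} = Task.Supervisor.start_link()

    try do
      # Step 2: async_nolink monitors each task without linking it to the caller.
      pending =
        Map.new(funs, fn fun ->
          task = Task.Supervisor.async_nolink(sup, fun)
          {task.ref, task}
        end)

      # Step 3: wait for the first reply, skipping crashed tasks.
      {outcome, losers} = await_first(pending, deadline)

      # Step 4: stop whatever is still running.
      Enum.each(losers, fn {_ref, task} -> Task.shutdown(task, :brutal_kill) end)
      outcome
    after
      Supervisor.stop(sup)
    end
  end

  # Every task crashed before any of them replied.
  defp await_first(pending, _deadline) when map_size(pending) == 0 do
    {{:error, :all_failed}, pending}
  end

  defp await_first(pending, deadline) do
    remaining = max(deadline - System.monotonic_time(:millisecond), 0)

    receive do
      # A task replied: drop its monitor and declare it the winner.
      {ref, result} when is_map_key(pending, ref) ->
        Process.demonitor(ref, [:flush])
        {{:ok, result}, Map.delete(pending, ref)}

      # A task crashed: forget it and keep waiting for the others.
      {:DOWN, ref, :process, _pid, _reason} when is_map_key(pending, ref) ->
        await_first(Map.delete(pending, ref), deadline)
    after
      remaining -> {{:error, :timeout}, pending}
    end
  end
end
```

Tracking a single deadline rather than restarting the timeout on every message keeps the overall wait bounded even when several tasks crash one after another.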
Comparison with other languages
| Pattern | Elixir (TaskExtension) | JavaScript | Go | Java |
|---|---|---|---|---|
| First success | race/1 | Promise.any() | — | CompletableFuture.anyOf() |
| All results | all_settled/1 | Promise.allSettled() | errgroup | CompletableFuture.allOf() |
| Bounded map | map/3 | p-map | errgroup + sem | — |
| Fallback chain | first_ok/1 | — | — | — |
License
MIT — see LICENSE.