Arrea

Async process orchestrator and telemetry for Elixir.

Arrea is an OTP-based library that provides parallel process execution, worker management, circuit breaker protection, command validation, and built-in telemetry for monitoring your Elixir applications.

Quick Start

Add Arrea to your mix.exs:

def deps do
[
{:arrea, "~> 0.1.0"}
]
end

Arrea starts its supervision tree automatically when added as a dependency. No manual setup required.

Execute a single command

# Shell command
{:ok, result} = Arrea.execute("echo hello")
# Or a function
{:ok, result} = Arrea.execute(fn -> :work end)

Run commands in parallel

{:ok, result} = Arrea.run(
[
fn -> Process.sleep(100); 1 end,
fn -> Process.sleep(100); 2 end,
fn -> Process.sleep(100); 3 end
],
workers: 2
)

Subscribe to events

:ok = Arrea.subscribe()
receive do
{:leader_event, %{type: :finished, worker_id: id}} ->
IO.puts("Worker #{id} finished")
{:leader_event, event} ->
IO.inspect(event, label: "Event")
end
:ok = Arrea.unsubscribe()

Get stats

{:ok, stats} = Arrea.stats()
# => %{
# total_workers: 10,
# active_workers: 3,
# completed_tasks: 42,
# failed_tasks: 2
# }

Features

CLI

Arrea includes a command-line interface built with Alaja:

# Build the escript
mix escript.build
# Run locally
./arrea run --command "echo hello"
# Install to ~/bin
mix install

arrea run

Execute shell commands in parallel with progress tracking.

# Single command
arrea run --command "echo hello"
# Multiple commands (parallel)
arrea run --command "echo a" --command "echo b"
# With worker limit
arrea run --command "sleep 1" --command "sleep 2" --parallel 2
# Custom timeout (ms)
arrea run --command "sleep 10" --timeout 5000
# Quiet mode (suppress progress)
arrea run --command "echo done" --quiet
# Custom shell
arrea run --command "echo $0" --shell zsh
# With ASDF version
arrea run --command "mix test" --asdf-elixir 1.18.0
# With mise version
arrea run --command "node -v" --mise-node 20.0.0

arrea config

Manage Arrea engine configuration at runtime.

# Show all config
arrea config --show
# Get a value
arrea config get max_workers
# Set a value
arrea config set max_workers 200
arrea config set default_policy stop
arrea config set asdf_enabled true
arrea config set log_level debug

arrea action

Execute Arrea commands from JSON input (stdin, file, or inline).

# From stdin
echo '{"command":"run","args":["--command","echo hello"]}' | arrea action
# From file
arrea action --file ./pipeline.json
# Inline JSON
arrea action --data '{"command":"run","args":["--command","echo hi","--quiet"]}'
# Batch actions
arrea action --data '{
"actions": [
{"command": "run", "args": ["--command", "echo first"], "order": 0},
{"command": "run", "args": ["--command", "echo second", "--quiet"], "order": 1}
]
}'

Architecture

┌─────────────────────────────────────────────────────┐
Arrea (Facade) │
└───────────────────────┬─────────────────────────────┘
┌───────────────────────▼─────────────────────────────┐
│ Arrea.Leader (GenServer) │
│ Coordinates execution, manages workers, │
│ broadcasts {:leader_event, event} to subscribers
└───────────────────────┬─────────────────────────────┘
┌───────────────────────▼─────────────────────────────┐
Arrea.WorkerSupervisor (DynamicSupervisor)
Spawns ephemeral workers
└───────────────────────┬─────────────────────────────┘
┌───────────────────────▼─────────────────────────────┐
Arrea.Worker (GenServer)
Executes individual tasks, handles policies,
reports progress via Leader
└─────────────────────────────────────────────────────┘
Arrea.Monitor (GenServer) Tracks worker lifecycle, provides stats
Arrea.CircuitBreaker Fault tolerance for external dependencies

All processes run under Arrea.Supervisor with :rest_for_one strategy, using two Registries (Arrea.Registry for workers, Arrea.CircuitBreaker.Registry for circuit breakers). With :rest_for_one, only the processes that depend on a failed process are restarted, minimizing disruption to active batches.

API

Arrea.execute/2

Execute a single command (binary shell command or zero-arity function).

@spec execute(binary() | (-> term()), keyword()) ::
{:ok, Arrea.Result.t()} | {:error, Arrea.Error.t()}

Options:

Arrea.run/2

Execute multiple commands in parallel.

@spec run([binary() | (-> term())], keyword()) ::
{:ok, Arrea.Result.t()} | {:error, Arrea.Error.t()}

Options:

Arrea.subscribe/0 / Arrea.unsubscribe/0

Subscribe (or unsubscribe) the calling process to Leader events.

Messages received are {:leader_event, event} where event is a map with at least a :type key:

TypeExtra keys
:worker_startedworker_id
:progressworker_id, percent, task_index, total
:finishedworker_id
:errorworker_id, reason
:resultworker_id, data
@spec subscribe() :: :ok
@spec unsubscribe() :: :ok

Arrea.stats/0

Get current engine statistics (provided by Arrea.Monitor).

@spec stats() :: {:ok, map()} | {:error, :monitor_unavailable}

Arrea.max_workers/0

Get the configured max workers.

@spec max_workers() :: non_neg_integer()

Configuration

Priority (lowest to highest)

For use as a library:

  1. @default in Arrea.Config — compile-time baseline
  2. config :arrea, :engine, [...] in the consuming project's config.exs — overrides defaults
  3. Arrea.Config.set/2 at runtime — overrides static config for the current session
  4. Opts passed directly to functions — highest priority, per-call

For use as a CLI binary:

  1. @default baseline
  2. Application env (from config.exs if applicable)
  3. arrea config set KEY VALUE — session-level, persists while the binary process is running
  4. CLI args — highest priority, per-invocation only

config.exs example

Accepts both keyword list and map:

config :arrea, :engine,
max_workers: 100,
max_commands_per_batch: 500,
default_policy: :retry,
max_retries: 3,
retry_delay: 1_000,
restart_limit: 3,
circuit_breaker_threshold: 5,
circuit_breaker_timeout: 60_000,
validation_rules: [:no_rm_rf, :no_sudo, :no_dd, :no_mkfs, :no_fork_bomb],
telemetry_enabled: true,
log_level: :info
KeyTypeDefaultDescription
max_workersinteger100Maximum parallel workers
max_commands_per_batchinteger500Max commands per batch
default_policyatom:retryDefault error policy for workers
max_retriesinteger3Max retry attempts
retry_delayinteger1_000Delay between retries (ms)
restart_limitinteger3Worker restart limit
circuit_breaker_thresholdinteger5Failures before circuit opens
circuit_breaker_timeoutinteger60_000Time before half-open attempt (ms)
validation_ruleslistsee belowBlocked command patterns
asdf_enabledbooleantrueEnable ASDF version management
telemetry_enabledbooleantrueEnable telemetry
log_levelatom:infoLog verbosity
shellstringnilDefault shell (e.g. "/bin/zsh")

Validation rules (default):

Runtime config

Arrea.Config.get(:max_workers) # => 100
Arrea.Config.set(:max_workers, 50) # persists for the current VM session
Arrea.Config.all() # => full effective config map

Telemetry Events

Arrea emits the following :telemetry events:

Worker events

EventMeasurementsMetadata
[:arrea, :worker, :started]worker_id
[:arrea, :worker, :completed]durationworker_id
[:arrea, :worker, :error]worker_id, reason
[:arrea, :worker, :message]worker_id

Task events

EventMeasurementsMetadata
[:arrea, :task, :started]
[:arrea, :task, :completed]duration
[:arrea, :task, :error]worker_id, reason

Engine events

EventMeasurementsMetadata
[:arrea, :engine, :execute, :start]command
[:arrea, :engine, :execute, :stop]durationcommand, success
[:arrea, :engine, :execute, :error]durationcommand, reason
[:arrea, :engine, :run, :start]count, workers
[:arrea, :engine, :run, :stop]batch_id

Circuit breaker events

EventMeasurementsMetadata
[:arrea, :circuit_breaker, :open]breaker_id
[:arrea, :circuit_breaker, :closed]breaker_id
[:arrea, :circuit_breaker, :trip]breaker_id, failure_count

Communication events

EventMeasurementsMetadata
[:arrea, :communication, :message_sent]
[:arrea, :communication, :message_received]
[:arrea, :communication, :error]
[:arrea, :communication, :retry]

UI events (CLI / alaja components)

EventMeasurementsMetadata
[:arrea, :ui, :render]
[:arrea, :ui, :keypress]
[:arrea, :ui, :focus_change]

Validation / Execution / System events

EventMeasurementsMetadata
[:arrea, :validation, :passed]
[:arrea, :validation, :failed]
[:arrea, :execution, :started]
[:arrea, :execution, :completed]
[:arrea, :execution, :failed]
[:arrea, :system, :started]
[:arrea, :system, :stopped]

Attaching a custom handler

:telemetry.attach(
"my-handler",
[:arrea, :worker, :completed],
fn _event, measurements, metadata, _config ->
IO.puts("Worker #{metadata.worker_id} finished in #{measurements.duration}ms")
end,
nil
)

Built-in metrics and debug

# Setup built-in ETS metrics (worker/task/circuit breaker counters)
Arrea.Telemetry.setup()
# Get current metrics snapshot
Arrea.Telemetry.get_current()
# Attach debug handler for development
Arrea.Telemetry.attach()
# Measure a function with telemetry
Arrea.Telemetry.measure(fn -> do_work() end, metadata: %{tag: "batch-1"})

Policies

Arrea provides configurable error policies for workers:

# Default policy (retry 3 times with 1s delay)
policy = Arrea.Policies.default()
# Strict policy (stop on first error)
policy = Arrea.Policies.strict()
# Tolerant policy (retry up to 10 times with 2s delay)
policy = Arrea.Policies.tolerant(max_retries: 10, retry_delay: 2000)
# Custom handler
policy = Arrea.Policies.custom(fn error, retry_count, context ->
if retry_count < 5, do: :retry, else: :stop
end)

Workers without an explicit policy fall back to Arrea.Config.get(:default_policy), which defaults to :retry.

Policy maps support the following fields:

FieldTypeDefaultDescription
on_error:retry | :stop | :continue | function:retryAction on task error
on_warning:log | :notify | :continue | :promote_to_error:logAction on warning
on_timeout:retry | :stop | :continue:retryAction on timeout
max_retriesinteger3Maximum retry attempts
retry_delayinteger1000Delay between retries (ms)
timeoutinteger30000Per-task timeout (ms)

Command Validation

Arrea validates all shell commands before execution, blocking dangerous patterns:

iex> Arrea.Validation.Validator.validate_command("echo hello")
{:ok, "echo hello"}
iex> Arrea.Validation.Validator.validate_command("rm -rf /")
{:error, {:dangerous_command, "rm -rf"}}
iex> Arrea.Validation.Validator.validate_command("$(whoami)")
{:error, :possible_injection}

Inter-worker Messaging

Workers can send messages to each other:

# Structured message
Arrea.Worker.send_message(:worker_1, %{type: :ping})
# Route a message to another worker
Arrea.Worker.send_message(:worker_1, {:send_to_worker, :worker_2, %{type: :data, value: 42}})

Dependencies

Installation

Add arrea to your mix.exs dependencies:

def deps do
[
{:arrea, "~> 0.1.0"}
]
end

Then run:

mix deps.get

License

MIT License. See the source repository for details.