Phlox
A graph-based orchestration engine for Elixir, built for AI agent pipelines and general enough for anything.
Phlox is an Elixir port of PocketFlow by The Pocket, redesigned around Elixir idioms: explicit data threading,
Behaviour-based nodes, and a pure orchestration loop built from the ground up for OTP supervision.
Current LLM frameworks are bloated. You only need one clean abstraction: a graph.
Phlox gives you that graph, plus OTP supervision, composable middleware, persistent checkpoints with rewind, typed shared state, node-declared interceptors, and swappable LLM providers, in a library that fits comfortably in your head.
Table of Contents
- Why Phlox?
- Installation
- Core Concept: The Graph
- Your First Flow
- Node Lifecycle In Depth
- Routing and Branching
- Retry Logic
- Batch Processing
- Fan-Out: Mid-Flow Sub-Flows
- The DSL
- Middleware
- Checkpoints and Resume
- Typed Shared State
- Interceptors
- OTP: Supervised Flows
- Real-Time Monitoring
- Adapters: Phoenix and Datastar
- LLM Providers
- Telemetry
- Design Patterns
- Testing Your Nodes
- API Reference
- Attribution
Why Phlox?
Most LLM orchestration frameworks make three mistakes:
- Too much magic β implicit state, hidden retries, framework-owned I/O clients
- Not enough OTP β no supervision, no fault tolerance, no introspection
- Hard to test β nodes are tangled with the graph; you can't test them in isolation
Phlox fixes all three. A node is just a module. The graph is just a map. State is just a map passed by value. Every node is independently testable. Every flow runs under OTP supervision.
| | LangChain | CrewAI | Phlox |
|---|---|---|---|
| Core abstraction | Chain / Agent | Crew / Task | Graph of nodes |
| Elixir-native | ❌ | ❌ | ✅ |
| OTP supervision | ❌ | ❌ | ✅ |
| Composable middleware | ❌ | ❌ | ✅ |
| Persistent checkpoints | ❌ | ❌ | ✅ |
| Typed shared state | ❌ | ❌ | ✅ |
| Node-declared interceptors | ❌ | ❌ | ✅ |
| Step-through debugging | ❌ | ❌ | ✅ |
| Real-time monitoring | ❌ | ❌ | ✅ |
| Node isolation | ❌ | ❌ | ✅ |
| Zero required runtime deps | ❌ | ❌ | ✅ |
Installation
# mix.exs
def deps do
  [
    {:phlox, "~> 0.3"},
    # Optional: enables telemetry events + Monitor
    {:telemetry, "~> 1.0"},
    # Optional: enables typed shared state with full spec algebra
    {:gladius, "~> 0.6"},
    # Optional: enables LLM provider adapters
    {:req, "~> 0.5"},
    # Optional: enables Ecto checkpoint adapter
    {:ecto_sql, "~> 3.0"},
    {:postgrex, ">= 0.0.0"},
    # Optional: enables Phoenix adapter
    {:phoenix_live_view, "~> 1.0"}
  ]
end

Phlox has zero required runtime dependencies. Every integration (telemetry, Gladius, Ecto, Phoenix, Datastar) activates automatically when its deps are present. No configuration needed.
Core Concept: The Graph
Everything in Phlox is a directed graph of nodes.
[FetchNode] ──default──▶ [ParseNode] ──default──▶ [StoreNode]
     │
     └──"error"──▶ [ErrorNode]

- Nodes are Elixir modules implementing Phlox.Node. They receive data, do work, and pass updated data forward.
- Edges are action strings returned by post/4. Phlox follows the matching edge to find the next node.
- Shared state is a plain %{} map threaded through every node by value: no mutation, no hidden state, no surprises.
Your First Flow
defmodule MyApp.FetchNode do
  use Phlox.Node

  def prep(shared, _params), do: Map.fetch!(shared, :url)

  def exec(url, _params) do
    case HTTPoison.get(url) do
      {:ok, %{status_code: 200, body: body}} -> {:ok, body}
      {:ok, %{status_code: code}} -> {:error, "HTTP #{code}"}
      {:error, reason} -> {:error, inspect(reason)}
    end
  end

  def post(shared, _prep, {:ok, body}, _params) do
    {:default, Map.put(shared, :body, body)}
  end

  def post(shared, _prep, {:error, reason}, _params) do
    {"error", Map.put(shared, :error, reason)}
  end
end

defmodule MyApp.WordCountNode do
  use Phlox.Node

  def prep(shared, _params), do: Map.fetch!(shared, :body)

  def exec(body, _params), do: body |> String.split(~r/\s+/, trim: true) |> length()

  def post(shared, _prep, count, _params) do
    {:default, Map.put(shared, :word_count, count)}
  end
end

# Wire the graph
alias Phlox.Graph

flow =
  Graph.new()
  |> Graph.add_node(:fetch, MyApp.FetchNode, %{}, max_retries: 3)
  |> Graph.add_node(:count, MyApp.WordCountNode, %{})
  |> Graph.connect(:fetch, :count)
  |> Graph.start_at(:fetch)
  |> Graph.to_flow!()

# Run it
{:ok, result} = Phlox.run(flow, %{url: "https://example.com"})
IO.puts("Word count: #{result.word_count}")

Node Lifecycle In Depth
Every node runs three phases:
shared ──▶ prep(shared, params)
              │
              ▼ prep_res
           exec(prep_res, params)   ◀── retry loop + interceptors wrap this
              │
              ▼ exec_res
           post(shared, prep_res, exec_res, params)
              │
              ▼ {action, new_shared}

prep/2 reads from shared and returns data for exec. Pure, no side effects.
exec/2 does the work: HTTP calls, LLM requests, DB queries. It has no access
to shared, only what prep handed it. This is what makes every node
independently testable.
post/4 updates state and decides the next action. It returns {action, new_shared}.
Use :default for the normal forward path.
exec_fallback/3 is called when all retries are exhausted. The default re-raises.
Override it to degrade gracefully.
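As a sketch of graceful degradation, a node can override exec_fallback/3 to return a placeholder instead of crashing the flow (the node name, fallback value, and argument order are illustrative assumptions, not part of the documented API):

```elixir
defmodule MyApp.SummariseNode do
  use Phlox.Node

  def prep(shared, _params), do: Map.fetch!(shared, :text)

  def exec(text, params) do
    Phlox.LLM.chat!(params.llm, [%{role: "user", content: text}])
  end

  # Called only after every retry has raised. Returning a value here lets
  # the flow continue instead of crashing. (Argument order is an assumption.)
  def exec_fallback(_prep_res, _params, _error), do: "(summary unavailable)"

  def post(shared, _prep, summary, _params) do
    {:default, Map.put(shared, :summary, summary)}
  end
end
```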
Routing and Branching
Branches are pattern-matched post/4 clauses returning different action strings.
defmodule MyApp.RouterNode do
  use Phlox.Node

  def prep(shared, _p), do: Map.fetch!(shared, :request_type)
  def exec(type, _p), do: type

  def post(shared, _p, :summarise, _p2), do: {"summarise", shared}
  def post(shared, _p, :translate, _p2), do: {"translate", shared}
  def post(shared, _p, _unknown, _p2), do: {"error", shared}
end

Retry Logic
Graph.add_node(:fetch, MyApp.FetchNode, %{}, max_retries: 3, wait_ms: 1000)
With max_retries: 3, wait_ms: 1000:
attempt 1 → raises → sleep 1000ms
attempt 2 → raises → sleep 1000ms
attempt 3 → raises → exec_fallback/3 called

Batch Processing
Sequential: Implement exec_one/2. prep/2 returns the list.
defmodule MyApp.TranslateManyNode do
  use Phlox.BatchNode

  def prep(shared, _p), do: Map.fetch!(shared, :texts)
  def exec_one(text, params), do: MyLLM.translate(text, to: params.target_language)
  def post(shared, _prep, translations, _p), do: {:default, Map.put(shared, :translations, translations)}
end

Parallel: Add parallel: true. Results are always in input order.

use Phlox.BatchNode, parallel: true, max_concurrency: 10, timeout: 15_000

Middleware
Middleware wraps the entire node lifecycle: before_node fires before prep,
after_node fires after post. Middlewares compose in onion order.

before_node  (mw1 → mw2 → mw3)
      │
 prep → exec → post   ◀── node work
      │
after_node   (mw3 → mw2 → mw1)

Implementing a middleware
defmodule MyApp.Middleware.CostTracker do
  @behaviour Phlox.Middleware

  @impl true
  def before_node(shared, _ctx) do
    {:cont, Map.put(shared, :_node_start, System.monotonic_time())}
  end

  @impl true
  def after_node(shared, action, _ctx) do
    elapsed = System.monotonic_time() - shared._node_start
    costs = Map.update(shared, :_costs, [elapsed], &[elapsed | &1])
    {:cont, Map.delete(costs, :_node_start), action}
  end
end

Using middleware
Phlox.Pipeline.orchestrate(flow, flow.start_id, shared,
  middlewares: [
    Phlox.Middleware.Validate,      # typed state enforcement
    MyApp.Middleware.CostTracker,   # cost tracking
    Phlox.Middleware.Checkpoint     # persistent checkpoints
  ],
  run_id: "my-run-001",
  metadata: %{checkpoint: {Phlox.Checkpoint.Memory, []}}
)

Halting
Return {:halt, reason} from any callback to abort the flow. The pipeline
raises Phlox.HaltedError with the reason, node id, middleware module, and
phase (:before_node or :after_node).
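For instance, a budget-guard middleware could halt a run before a node executes. This is a sketch: the :_cost key, the limit, and the module name are illustrative assumptions layered on the documented {:halt, reason} contract:

```elixir
defmodule MyApp.Middleware.BudgetGuard do
  @behaviour Phlox.Middleware

  @max_cost 100

  @impl true
  def before_node(shared, _ctx) do
    # Abort the whole flow once accumulated cost crosses the budget;
    # the pipeline then raises Phlox.HaltedError with this reason.
    if Map.get(shared, :_cost, 0) > @max_cost do
      {:halt, :budget_exceeded}
    else
      {:cont, shared}
    end
  end

  @impl true
  def after_node(shared, action, _ctx), do: {:cont, shared, action}
end
```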
Checkpoints and Resume
Checkpoint middleware persists shared after each node completes, creating
an append-only event log. Flows survive restarts, and you can rewind to any
node to re-execute from a known-good state.
Enabling checkpoints
# In-memory (development/testing)
{:ok, _} = Phlox.Checkpoint.Memory.start_link()
Phlox.Pipeline.orchestrate(flow, flow.start_id, shared,
  middlewares: [Phlox.Middleware.Checkpoint],
  run_id: "ingest-001",
  metadata: %{
    checkpoint: {Phlox.Checkpoint.Memory, []},
    flow_name: "IngestPipeline"
  }
)

# Ecto/Postgres (production)
# 1. Add {:ecto_sql, "~> 3.0"} to deps
# 2. Run: mix phlox.gen.migration && mix ecto.migrate
Phlox.Pipeline.orchestrate(flow, flow.start_id, shared,
  middlewares: [Phlox.Middleware.Checkpoint],
  run_id: "ingest-001",
  metadata: %{
    checkpoint: {Phlox.Checkpoint.Ecto, repo: MyApp.Repo},
    flow_name: "IngestPipeline"
  }
)

Resume after crash
{:ok, result} = Phlox.Resume.resume("ingest-001",
  flow: my_flow,
  checkpoint: {Phlox.Checkpoint.Ecto, repo: MyApp.Repo}
)

Rewind to a specific node
Detected a hallucination at node :embed? Rewind to the checkpoint saved
after :chunk and re-execute everything downstream:
{:ok, result} = Phlox.Resume.rewind("ingest-001", :chunk,
  flow: my_flow,
  checkpoint: {Phlox.Checkpoint.Ecto, repo: MyApp.Repo}
)

Checkpoint history
Every node completion is an immutable event with a monotonic sequence number:
{:ok, history} = Phlox.Checkpoint.Memory.history("ingest-001")
# [
#   %{sequence: 1, node_id: :fetch, next_node_id: :parse, event_type: :node_completed, ...},
#   %{sequence: 2, node_id: :parse, next_node_id: :embed, event_type: :node_completed, ...},
#   %{sequence: 3, node_id: :embed, next_node_id: nil, event_type: :flow_completed, ...}
# ]

Typed Shared State
Declare what shared must look like entering and leaving each node.
Uses Gladius for the full spec algebra,
or plain functions as a zero-dep fallback.
With Gladius
defmodule MyApp.EmbedNode do
  use Phlox.Node
  use Phlox.Typed
  import Gladius

  input open_schema(%{
    required(:text) => string(:filled?),
    required(:language) => string(:filled?)
  })

  output open_schema(%{
    required(:text) => string(:filled?),
    required(:language) => string(:filled?),
    required(:embedding) => list_of(spec(is_list()))
  })

  def prep(shared, _p), do: {shared.text, shared.language}
  def exec({text, lang}, _p), do: MyLLM.embed(text, lang)
  def post(shared, _p, emb, _p2), do: {:default, Map.put(shared, :embedding, emb)}
end
Gladius specs are parse-don't-validate: conform/2 returns a shaped value
with coercions, transforms, and defaults applied. The middleware replaces
shared with the shaped output, so specs actively participate in data flow.
Without Gladius (plain functions)
input fn shared ->
  if is_binary(shared[:text]), do: :ok, else: {:error, ":text required"}
end

Enabling validation

Phlox.Pipeline.orchestrate(flow, flow.start_id, shared,
  middlewares: [Phlox.Middleware.Validate]
)

Nodes without specs are silently skipped, so the middleware is safe to include globally.
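An output check takes the same plain-function form; a hypothetical sketch mirroring the input example (the key and message are illustrative):

```elixir
output fn shared ->
  # Reject the node's result if it didn't produce the expected key.
  if is_list(shared[:embedding]), do: :ok, else: {:error, ":embedding must be a list"}
end
```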
Interceptors
Interceptors are the complement to middleware. Where middleware wraps the
entire node lifecycle and is configured at the pipeline level, interceptors
wrap just exec/2 and are declared by the node itself.
Middleware = what the framework does to every node.
Interceptors = what nodes declare they want done to themselves.

Declaring interceptors

defmodule MyApp.EmbedNode do
  use Phlox.Node

  intercept MyApp.Interceptor.Cache, ttl: :timer.minutes(5)
  intercept MyApp.Interceptor.RateLimit, max: 10, per: :second

  def exec(text, _params), do: MyLLM.embed(text)
end

How they execute
Interceptors run inside the retry loop, wrapping each attempt:
Retry loop:
  attempt 1 → before_exec → exec → after_exec → (raises)
  attempt 2 → before_exec → exec → after_exec → success

Implementing an interceptor
defmodule MyApp.Interceptor.Cache do
  @behaviour Phlox.Interceptor

  @impl true
  def before_exec(prep_res, ctx) do
    case lookup(prep_res, ctx.interceptor_opts[:ttl]) do
      {:hit, value} -> {:skip, value}   # short-circuit exec entirely
      :miss -> {:cont, prep_res}
    end
  end

  @impl true
  def after_exec(exec_res, ctx) do
    store(ctx.prep_res, exec_res, ctx.interceptor_opts[:ttl])
    {:cont, exec_res}
  end
end

Access boundary

Interceptors see prep_res and exec_res but not shared. They cannot
change routing. This is the fundamental distinction from middleware.
OTP: Supervised Flows
FlowServer
Wraps a flow in a GenServer with middleware and interceptor support:
{:ok, pid} = Phlox.FlowServer.start_link(
  flow: my_flow,
  shared: %{url: "..."},
  middlewares: [Phlox.Middleware.Validate, Phlox.Middleware.Checkpoint],
  run_id: "job-001",
  metadata: %{checkpoint: {Phlox.Checkpoint.Memory, []}}
)

# Run to completion
{:ok, final} = Phlox.FlowServer.run(pid)

# Or step through, with middleware hooks firing per step
{:continue, :parse, shared} = Phlox.FlowServer.step(pid)
{:done, final} = Phlox.FlowServer.step(pid)

# Resume from checkpoint
{:ok, pid} = Phlox.FlowServer.start_link(
  flow: my_flow,
  resume: "job-001",
  checkpoint: {Phlox.Checkpoint.Memory, []}
)

FlowSupervisor
Spawn named flows at runtime under a DynamicSupervisor:
{:ok, _} = Phlox.FlowSupervisor.start_flow(:my_job, flow, shared)
{:ok, result} = Phlox.FlowServer.run(Phlox.FlowSupervisor.server(:my_job))
Phlox.FlowSupervisor.running()   # => [:my_job, ...]
Phlox.FlowSupervisor.stop_flow(:my_job)

Real-Time Monitoring
Phlox.Monitor tracks every running flow in ETS via telemetry events.
# Query
snapshot = Phlox.Monitor.get("my-flow-id")

# Subscribe
:ok = Phlox.Monitor.subscribe("my-flow-id")
receive do
  {:phlox_monitor, :node_done, snapshot} -> IO.inspect(snapshot)
end

Adapters: Phoenix and Datastar
Both adapters read from Phlox.Monitor. A Phoenix app using Datastar for its
frontend can run both simultaneously.
Phoenix LiveView
defmodule MyAppWeb.JobLive do
  use MyAppWeb, :live_view
  use Phlox.Adapter.Phoenix

  def mount(%{"flow_id" => flow_id}, _session, socket) do
    {:ok, phlox_subscribe(socket, flow_id)}
  end
end

Datastar SSE

# In your router
get "/phlox/stream/:flow_id", Phlox.Adapter.Datastar.Plug, []

LLM Providers
Phlox ships with a Phlox.LLM behaviour and adapters for major providers.
The provider is injected via node params; swapping providers is a one-line change.
Available adapters
| Adapter | Provider | Free tier | Requires |
|---|---|---|---|
| Phlox.LLM.Google | Gemini via AI Studio | 1,500 req/day | GOOGLE_AI_KEY |
| Phlox.LLM.Groq | Llama/Mistral on Groq | 14,400 req/day | GROQ_API_KEY |
| Phlox.LLM.Anthropic | Claude API | $5 free credits | ANTHROPIC_API_KEY |
| Phlox.LLM.Ollama | Local models | Unlimited | Ollama installed |
Using in a node
defmodule MyApp.ThinkNode do
  use Phlox.Node

  def prep(shared, _p), do: shared.prompt

  def exec(prompt, params) do
    provider = Map.fetch!(params, :llm)

    messages = [
      %{role: "system", content: "You are a helpful assistant."},
      %{role: "user", content: prompt}
    ]

    Phlox.LLM.chat!(provider, messages, params[:llm_opts] || [])
  end

  def post(shared, _p, response, _p2) do
    {:default, Map.put(shared, :response, response)}
  end
end

Swapping providers

# Development: free
Graph.add_node(:think, ThinkNode, %{llm: Phlox.LLM.Google, llm_opts: [model: "gemini-2.5-flash"]})

# Production: highest quality
Graph.add_node(:think, ThinkNode, %{llm: Phlox.LLM.Anthropic, llm_opts: [model: "claude-sonnet-4-6"]})

Included example: Code Review Brain Trust
{:ok, result} = Phlox.Examples.CodeReview.run(~S"""
  def fetch_user(id) do
    query = "SELECT * FROM users WHERE id = #{id}"
    Repo.query!(query)
  end
  """,
  llm: Phlox.LLM.Groq,
  llm_opts: [model: "llama-3.3-70b-versatile"],
  language: "elixir"
)

IO.puts(result.final_review)

Three specialist agents (security, logic, style) analyze code from different angles, then a synthesizer combines, deduplicates, and ranks findings.
Telemetry
Phlox emits telemetry events when :telemetry is present. Zero overhead when absent.
| Event | Measurements | When |
|---|---|---|
| [:phlox, :flow, :start] | system_time | flow begins |
| [:phlox, :flow, :stop] | duration | flow finishes |
| [:phlox, :node, :start] | system_time | before prep/2 |
| [:phlox, :node, :stop] | duration | after post/4 |
| [:phlox, :node, :exception] | duration | node raises after all retries |
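Since these are standard :telemetry events, you can attach a handler with :telemetry.attach_many/4. A sketch of a per-node duration logger (the handler id, log format, and the assumption that the node id appears in event metadata as :node_id are all illustrative):

```elixir
defmodule MyApp.PhloxLogger do
  require Logger

  def attach do
    :telemetry.attach_many(
      "phlox-logger",
      [[:phlox, :node, :stop], [:phlox, :node, :exception]],
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # duration arrives in native time units; convert before logging
  def handle_event([:phlox, :node, event], %{duration: duration}, metadata, _config) do
    ms = System.convert_time_unit(duration, :native, :millisecond)
    Logger.info("phlox node #{inspect(metadata[:node_id])} #{event} in #{ms}ms")
  end
end
```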
Design Patterns
Simple Pipeline
defmodule MyApp.IngestFlow do
  use Phlox.DSL

  node :fetch, FetchNode, %{}, max_retries: 3
  node :clean, CleanNode, %{}
  node :embed, EmbedNode, %{model: "ada-002"}
  node :store, StoreNode, %{}

  connect :fetch, :clean
  connect :clean, :embed
  connect :embed, :store

  start_at :fetch
end

Agent Loop (ReAct)

connect :think, :act, action: "continue"
connect :think, :finish, action: "done"
connect :act, :think   # loop back

Multi-Agent Brain Trust
Three specialists review from different angles, then a synthesizer combines.
See Phlox.Examples.CodeReview for a working implementation.
Testing Your Nodes
Because exec/2 has no access to shared, nodes are trivially testable in isolation:
test "translates text to target language" do
  result = MyApp.TranslateNode.exec(%{text: "hello", language: "fr"}, %{})
  assert result == {:ok, "bonjour"}
end

test "routes to :default on success" do
  {action, new_shared} = MyApp.TranslateNode.post(%{}, nil, {:ok, "bonjour"}, %{})
  assert action == :default
  assert new_shared.translation == "bonjour"
end

API Reference
Module map
| Module | Purpose |
|---|---|
| Phlox | top-level: run/2 |
| Phlox.Node | behaviour + __using__ macro + intercept |
| Phlox.BatchNode | exec_one/2 per item |
| Phlox.FanOutNode | sub_flow per item |
| Phlox.BatchFlow | full-flow fan-out |
| Phlox.DSL | declarative flow definition |
| Phlox.Graph | builder: new/add_node/connect/start_at/to_flow[!] |
| Phlox.Flow | %Flow{} struct + run/2 |
| Phlox.Runner | orchestrate/3 (pure, no middleware, no interceptors) |
| Phlox.Pipeline | orchestrate/4 (middleware + interceptor support) |
| Phlox.Retry | run/3 (exec with retry + optional interceptor wrapping) |
| Phlox.Middleware | behaviour: before_node/2, after_node/3 |
| Phlox.Middleware.Validate | enforces Phlox.Typed specs at node boundaries |
| Phlox.Middleware.Checkpoint | persists shared after each node |
| Phlox.Interceptor | behaviour: before_exec/2, after_exec/2 + intercept macro |
| Phlox.Typed | use macro: input/1, output/1 spec declarations |
| Phlox.Checkpoint | behaviour: save/load_latest/load_at/history/delete |
| Phlox.Checkpoint.Memory | in-memory adapter (Agent-backed) |
| Phlox.Checkpoint.Ecto | Postgres adapter (append-only event log) |
| Phlox.Resume | resume/2, rewind/3 |
| Phlox.HaltedError | raised when middleware/interceptor halts |
| Phlox.FlowServer | GenServer: run/step/state/reset + middleware + resume |
| Phlox.FlowSupervisor | DynamicSupervisor: start_flow/stop_flow/whereis |
| Phlox.Monitor | GenServer+ETS: get/list/subscribe/unsubscribe |
| Phlox.Telemetry | soft-dep telemetry emission |
| Phlox.Adapter.Phoenix | LiveView mixin + FlowMonitor component |
| Phlox.Adapter.Datastar | SSE streaming via Plug |
| Phlox.LLM | behaviour: chat/2 |
| Phlox.LLM.Anthropic | Claude API adapter |
| Phlox.LLM.Google | Gemini AI Studio adapter |
| Phlox.LLM.Groq | Groq inference adapter |
| Phlox.LLM.Ollama | Local model adapter |

Key invariants
- exec/2 has NO access to shared: only what prep/2 returned.
- post/4 must return {action, new_shared}: always a tuple.
- Middleware wraps the node lifecycle: it sees shared and action.
- Interceptors wrap exec/2: they see prep_res and exec_res, not shared.
- Runner stays pure: Pipeline adds middleware/interceptors on top.
- Checkpoint events are append-only: immutable once written.
- The shared key :phlox_flow_id is reserved for telemetry.
Attribution
Phlox is a port of PocketFlow
by The Pocket, originally written in Python.
PocketFlow's three-phase node lifecycle (prep → exec → post), graph wiring
model, and batch flow pattern are the direct inspiration for this library.
The Elixir design (explicit data threading, OTP supervision, behaviour-based
nodes, composable middleware, persistent checkpoints with rewind, typed shared
state via Gladius, node-declared interceptors,
FlowServer, FlowSupervisor, Monitor, real-time adapters, LLM provider
abstraction, and telemetry) is Phlox's own contribution.
License
MIT