DAG-based job workflows for Oban: dependency ordering, fan-out/synthesis, result passing, retry idempotency, and per-step LLM cost tracking — built entirely on Oban OSS, no Oban Pro required.
Features
- Directed acyclic graphs of Oban jobs with named dependencies, validated for cycles before insertion (Kahn's algorithm).
- Self-gating execution — each job checks its dependencies at runtime and snoozes, proceeds, or cancels accordingly. No external scheduler.
- Completion-triggered rescheduling so downstream steps start promptly instead of waiting out a snooze timer.
- Result passing between steps, stored in the engine's own table (never in oban_jobs.meta).
- Retry idempotency: a retried step that already produced a result returns it without re-running side effects (important for paid LLM calls).
- Multi-model fan-out — run the same step across several models and synthesize the results.
- Observability: telemetry for every transition, optional per-step token/cost
stats, optional full context-window capture, and live step events over
Phoenix.PubSub for building a LiveView dashboard.
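To make the cycle check in the list above concrete, here is a minimal, self-contained sketch of a layered form of Kahn's algorithm. The DagCheck module and its sort/1 function are hypothetical names for illustration only; they are not part of Baton's API, and Baton's own validation may differ in detail.

```elixir
defmodule DagCheck do
  @moduledoc "Hypothetical helper (not part of Baton): orders steps or reports a cycle."

  # deps maps each step to the steps it depends on, e.g. %{parse: [:fetch]}.
  # Assumes every dependency name is also a key in the map.
  def sort(deps), do: do_sort(deps, [])

  # Nothing left to place: the accumulated order is a valid topological sort.
  defp do_sort(remaining, sorted) when map_size(remaining) == 0 do
    {:ok, Enum.reverse(sorted)}
  end

  defp do_sort(remaining, sorted) do
    # A step is ready once its list of unmet dependencies is empty.
    ready = for {step, []} <- remaining, do: step

    case Enum.sort(ready) do
      # Steps remain but none is ready, so they must depend on each other.
      [] ->
        {:error, :cycle}

      ready ->
        remaining =
          remaining
          |> Map.drop(ready)
          |> Map.new(fn {step, unmet} -> {step, unmet -- ready} end)

        do_sort(remaining, Enum.reverse(ready) ++ sorted)
    end
  end
end
```

Calling DagCheck.sort(%{fetch: [], parse: [:fetch], store: [:parse]}) yields the steps in dependency order, while a map such as %{a: [:b], b: [:a]} is rejected before anything would be inserted.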
Installation
def deps do
  [
    {:baton, "~> 0.1"},
    {:oban, "~> 2.17"}
  ]
end
Add the schema via a migration:
defmodule MyApp.Repo.Migrations.AddBaton do
  use Ecto.Migration

  # Omit :version to install the latest schema. The migration is idempotent
  # (create_if_not_exists), so to upgrade an existing install you can ship a new
  # migration that simply calls Baton.Migration.up/0 again.
  def up, do: Baton.Migration.up()
  def down, do: Baton.Migration.down()
end
Configure (the repo is inherited from Oban automatically):
config :baton,
  oban_name: Oban,
  pubsub: MyApp.PubSub,      # only for live events/dashboard
  pricing: MyApp.LLMPricing  # only if tracking cost

config :my_app, Oban,
  plugins: [
    Oban.Plugins.Pruner,
    {Baton.Plugin, interval: :timer.seconds(60)}
  ],
  queues: [default: 20]
Data retention
Baton's tables (workflow_nodes, workflow_step_stats,
workflow_debug_logs, workflow_completions) have no foreign key to
oban_jobs, so Oban's Pruner does not clean them up — left alone they grow
without bound. Enable pruning on Baton.Plugin to delete Baton rows
once their backing Oban job has been pruned:
{Baton.Plugin,
 interval: :timer.seconds(60),
 prune: true,                      # off by default
 debug_log_max_age: 24 * 60 * 60}  # optional: cap debug logs at 24h (seconds)
This piggybacks on Oban's Pruner, so there's a single retention policy. For
this to be safe, the Pruner's max_age must exceed your longest workflow's
runtime (which Baton already requires for correct dependency gating) — set
it generously, e.g. {Oban.Plugins.Pruner, max_age: 60 * 60 * 24}.
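Putting the two settings together, a combined plugin list might look like the following. This is a sketch that only reuses the options shown above; the 24-hour max_age is an illustrative value, so pick one that comfortably exceeds your own longest workflow.

```elixir
config :my_app, Oban,
  plugins: [
    # Keep Oban jobs for 24 hours (seconds); must exceed the longest workflow's runtime.
    {Oban.Plugins.Pruner, max_age: 60 * 60 * 24},
    # Sweep every 60 seconds and delete Baton rows whose Oban job has been pruned.
    {Baton.Plugin, interval: :timer.seconds(60), prune: true}
  ],
  queues: [default: 20]
```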
Usage
defmodule MyApp.Steps.Fetch do
  use Baton.Worker, queue: :default

  @impl true
  def perform_workflow(%Oban.Job{args: %{"url" => url}}) do
    # fetch/1 stands in for your own HTTP call; it is not defined here.
    {:ok, %{body: fetch(url)}}
  end
end
Baton.new(workflow_name: "ingest")
|> Baton.add(:fetch, MyApp.Steps.Fetch.new(%{url: "https://example.com"}))
|> Baton.add(:parse, MyApp.Steps.Parse.new(%{}), deps: [:fetch])
|> Baton.add(:store, MyApp.Steps.Store.new(%{}), deps: [:parse])
|> Baton.insert!()
See the getting started guide, the building a workflow guide (fan-out/fan-in, pruning, and a live LiveView), and the multi-model guide.
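As a rough picture of the fan-out/fan-in shape those guides cover, the pipeline above can be widened so that two steps share one dependency and a final step waits on both. This sketch uses only the calls shown in the Usage example. MyApp.Steps.Summarize, MyApp.Steps.Classify, and MyApp.Steps.Merge are hypothetical workers, and it assumes :deps accepts more than one step name.

```elixir
# Summarize, Classify, and Merge are hypothetical Baton.Worker modules.
Baton.new(workflow_name: "ingest-fanout")
|> Baton.add(:fetch, MyApp.Steps.Fetch.new(%{url: "https://example.com"}))
# Fan-out: both steps depend only on :fetch, so neither waits for the other.
|> Baton.add(:summarize, MyApp.Steps.Summarize.new(%{}), deps: [:fetch])
|> Baton.add(:classify, MyApp.Steps.Classify.new(%{}), deps: [:fetch])
# Fan-in: :merge is gated on both branches (assumes multi-name :deps).
|> Baton.add(:merge, MyApp.Steps.Merge.new(%{}), deps: [:summarize, :classify])
|> Baton.insert!()
```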
Integrating with Phoenix LiveView
Baton ships no LiveView of its own. Instead, every step transition is
broadcast over Phoenix.PubSub, so you render progress however you like. (The
same transitions are also emitted as telemetry — see Baton.Telemetry — if
you'd rather not use Phoenix at all.)
1. Point Baton at your PubSub
A Phoenix app already starts one in its supervision tree ({Phoenix.PubSub, name: MyApp.PubSub}). Tell Baton to use it:
config :baton, pubsub: MyApp.PubSub
If :pubsub is left unset, broadcasting is a no-op and the engine runs fine
without Phoenix — only telemetry is emitted.
2. Topics and message shape
Each transition is published on two topics so views can subscribe at the granularity they need:
- "workflow:all": every event from every workflow (index views)
- "workflow:<workflow_id>": one workflow's events (detail views)
Don't build these strings by hand — use the helpers in Baton.Events. The
message is always:
{:workflow_step_updated, %{
  workflow_id: "uuid",
  workflow_label: "patent:US11234567B2", # the :workflow_name you passed to new/1
  step_name: "assess_quality",
  worker: "MyApp.Patent.AssessQuality",
  state: "completed",                    # see below
  job_id: 123,
  attempt: 1,
  has_result: true,
  error: nil,                            # an error string on failure, else nil
  timestamp: ~U[2026-06-14 18:00:00Z]
}}
state is one of "executing", "snoozed", "completed", "retryable",
"discarded", or "cancelled".
When the last step in a workflow settles, a single terminal event is published on the same two topics:
{:workflow_finished, %{
  workflow_id: "uuid",
  workflow_label: "patent:US11234567B2",
  outcome: :completed,                   # or :failed
  failed_steps: [],                      # step names that were cancelled/discarded
  timestamp: ~U[2026-06-14 18:00:24Z]
}}
Use it to flip the page to a done state, redirect, or fire a notification
without polling. (The same signal is available as
[:baton, :workflow, :finished] telemetry if you're not using PubSub.)
Requires Baton.Plugin for crash-case coverage. When a step fails by returning {:error, reason}, the finished event fires immediately. But if a step hard-crashes (raises/exits) or is killed by Oban, the worker never gets to announce it. In that case Baton.Plugin's periodic sweep detects the settled workflow and broadcasts :workflow_finished with outcome: :failed as a backstop (typically within one sweep interval). Make sure the plugin is in your Oban plugins: list (see Installation); without it, workflows that die from a hard crash won't emit a terminal event.
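If you are not using PubSub, the same terminal signal can be picked up from telemetry. The sketch below attaches a handler to the [:baton, :workflow, :finished] event using the standard :telemetry.attach/4 call. MyApp.BatonTelemetry and the handler id are hypothetical, and because the measurement and metadata keys are not listed here, the handler simply logs the metadata map as-is.

```elixir
defmodule MyApp.BatonTelemetry do
  @moduledoc "Hypothetical handler module: logs when a Baton workflow finishes."
  require Logger

  @event [:baton, :workflow, :finished]

  # Call once at startup, e.g. from your Application.start/2.
  def attach do
    :telemetry.attach("my-app-baton-finished", @event, &__MODULE__.handle_event/4, nil)
  end

  # Standard :telemetry handler signature. The metadata shape is an unknown
  # here, so it is only inspected, not pattern-matched.
  def handle_event(@event, _measurements, metadata, _config) do
    Logger.info("Baton workflow finished: #{inspect(metadata)}")
  end
end
```

Passing a captured module function rather than an anonymous function is the form the :telemetry documentation recommends for handlers.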
3. Subscribe in a LiveView
defmodule MyAppWeb.WorkflowLive do
  use MyAppWeb, :live_view

  alias Baton.Events

  @impl true
  def mount(%{"id" => workflow_id}, _session, socket) do
    if connected?(socket), do: Events.subscribe_workflow(workflow_id)
    {:ok, assign(socket, workflow_id: workflow_id, steps: %{})}
  end

  @impl true
  def handle_info({:workflow_step_updated, %{step_name: name} = event}, socket) do
    {:noreply, update(socket, :steps, &Map.put(&1, name, event))}
  end

  # ... render @steps ...
end
For an index of all running workflows, subscribe with Events.subscribe_all/0
and key your state by event.workflow_id. A complete, copy-paste pair of
detail and index LiveViews lives in
examples/my_app/live/workflow_live.ex.
Seeding initial state
PubSub only delivers events that occur after mount, so a fresh page load (or
a step that completed before the user opened the view) won't be reflected by
events alone. Seed @steps from the database on mount using Baton.Query,
then let incoming events keep it current — and handle {:workflow_finished, _}
to react when the whole workflow is done.
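One way to keep the seed-then-stream logic easy to test is to fold both message types through a small pure reducer and call it from handle_info/2. This is a sketch under assumptions: MyApp.WorkflowState is a hypothetical module, and the seed map is whatever you build from your Baton.Query results (no specific Baton.Query function is assumed).

```elixir
defmodule MyApp.WorkflowState do
  @moduledoc "Hypothetical pure reducer for Baton PubSub messages."

  # seed_steps: a map of step name => last known step data, loaded on mount.
  def new(seed_steps \\ %{}) do
    %{steps: seed_steps, outcome: nil, failed_steps: []}
  end

  # A step transition replaces whatever was known about that step.
  def apply_event(state, {:workflow_step_updated, %{step_name: name} = event}) do
    %{state | steps: Map.put(state.steps, name, event)}
  end

  # The terminal event records the overall outcome and any failed steps.
  def apply_event(state, {:workflow_finished, %{outcome: outcome, failed_steps: failed}}) do
    %{state | outcome: outcome, failed_steps: failed}
  end

  # Ignore any other message that arrives on the topic.
  def apply_event(state, _other), do: state
end
```

In a LiveView, a single handle_info(msg, socket) clause could then apply MyApp.WorkflowState.apply_event/2 to an assign holding this state, so seeded rows and live events share one code path.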
How it compares to Oban Pro Workflow
Baton covers DAG ordering, fan-out/fan-in, dynamic workflows, result passing, and dependency-failure cascading. It adds cycle detection, retry idempotency, multi-model fan-out, and LLM cost tracking. The main mechanical difference is that completion uses snooze-based gating plus an opportunistic reschedule rather than Pro's event-driven completion; correctness does not depend on the reschedule.
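The snooze-based gating mentioned above can be pictured as a small decision function that each job runs over its dependencies' states. The sketch below illustrates the idea only and is not Baton's internal code. GateSketch, the :proceed atom, and the 5-second default are hypothetical, while {:snooze, seconds} and {:cancel, reason} mirror return values that Oban workers accept.

```elixir
defmodule GateSketch do
  @moduledoc "Hypothetical illustration of snooze-based dependency gating."

  # States treated as a failed dependency in this sketch.
  @failed ~w(cancelled discarded)

  # dep_states: the current state string of each dependency.
  def decide(dep_states, snooze_seconds \\ 5) do
    cond do
      # A failed dependency cascades: this step can never run.
      Enum.any?(dep_states, &(&1 in @failed)) -> {:cancel, :dependency_failed}
      # Every dependency finished (or there are none): run the step.
      Enum.all?(dep_states, &(&1 == "completed")) -> :proceed
      # Otherwise check again later.
      true -> {:snooze, snooze_seconds}
    end
  end
end
```

Because every job re-evaluates this on each attempt, a missed or delayed reschedule only costs latency (at most one snooze interval), which is why correctness does not depend on it.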
License
MIT — see LICENSE.