HephaestusOban
Oban-based runner adapter for the Hephaestus workflow engine.
Turns each workflow step into a durable Oban job with retry/backoff, advisory lock serialization, and zero-contention parallel execution via an auxiliary step_results table.
Installation
Add to your mix.exs:
def deps do
  [
    {:hephaestus_oban, "~> 0.2.0"}
  ]
end
Setup
1. Generate and run migrations
The HephaestusEcto migration must run first, because hephaestus_step_results has a foreign key to the workflow_instances table.
mix hephaestus_ecto.gen.migration
mix hephaestus_oban.gen.migration
mix ecto.migrate
2. Configure your workflow engine
defmodule MyApp.Hephaestus do
  use Hephaestus,
    storage: {HephaestusEcto.Storage, repo: MyApp.Repo},
    runner: {HephaestusOban.Runner, oban: MyApp.Oban}
end
3. Add to your supervision tree
children = [
  MyApp.Repo,
  {Oban, name: MyApp.Oban, repo: MyApp.Repo, queues: [hephaestus: 10]},
  MyApp.Hephaestus
]
How it works
Architecture
start_instance(Workflow, context)
  |
  +-- Instance.new() --> persist via HephaestusEcto.Storage
  +-- Oban.insert(AdvanceWorker)
        |
        v
AdvanceWorker (single Instance writer, advisory lock)
  |  Engine.advance() --> active_steps: {StepA, StepB, StepC}
  +-- enqueue 3x ExecuteStepWorker
        |
        +-- ExecuteStepWorker(StepA) --+
        +-- ExecuteStepWorker(StepB) --+  parallel, zero contention
        +-- ExecuteStepWorker(StepC) --+
                                       |
           each: execute step          |
                 INSERT step_results   |
                 enqueue AdvanceWorker |
                                       v
AdvanceWorker (serialized via unique + advisory lock)
  |  apply step_results --> Engine.complete_step + activate_transitions
  |  persist Instance
  |
  +-- :completed --> done
  +-- :waiting   --> awaits ResumeWorker
  +-- active     --> enqueue ExecuteStepWorkers (next wave)
Three workers
| Worker | Role | Writes to Instance? |
|---|---|---|
| AdvanceWorker | Orchestrator. Reads step_results, applies Engine transitions, persists Instance. Serialized per instance via Oban unique + pg_advisory_xact_lock. | Yes (single writer) |
| ExecuteStepWorker | Executes a single step. Writes result to step_results table, enqueues AdvanceWorker. Idempotent via existence check. | No |
| ResumeWorker | Handles external events and durable timers. Writes to step_results, enqueues AdvanceWorker. | No |
Concurrency model
ExecuteStepWorkers run in parallel during fan-out. They never write to the Instance directly — each inserts its own row into hephaestus_step_results (zero contention). The AdvanceWorker is the single writer for the Instance, serialized via Oban unique constraint + PostgreSQL advisory lock. All Instance mutations happen atomically inside a Repo.transaction with pg_advisory_xact_lock.
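As a rough illustration of the serialization step, the sketch below derives an integer key from an instance ID and pairs it with the pg_advisory_xact_lock call that would open the transaction. The module name AdvisoryLockSketch, the :erlang.phash2/2 key derivation, and the instance ID are assumptions made for illustration; the library may derive its lock key differently.

```elixir
defmodule AdvisoryLockSketch do
  # Hypothetical helper, not part of HephaestusOban.

  # pg_advisory_xact_lock accepts a bigint key, so the instance ID is hashed
  # into an integer in the range 0..2^32-1 (an assumed derivation).
  def lock_key(instance_id) when is_binary(instance_id) do
    :erlang.phash2(instance_id, 4_294_967_296)
  end

  # Returns the SQL and its parameters. In a worker this would be the first
  # statement inside Repo.transaction/1, so PostgreSQL releases the lock
  # automatically on commit or rollback.
  def lock_query(instance_id) do
    {"SELECT pg_advisory_xact_lock($1)", [lock_key(instance_id)]}
  end
end

# Made-up instance ID, for demonstration only.
{sql, [key]} = AdvisoryLockSketch.lock_query("example-instance-id")
IO.puts("#{sql} with key #{key}")
```

Because the same instance ID always hashes to the same key, two AdvanceWorker jobs for one instance would queue behind each other, while jobs for different instances would usually proceed independently (hash collisions only cause extra waiting, not incorrect results).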
Failure handling
When an ExecuteStepWorker exhausts all retries (discarded by Oban), the FailureHandler telemetry listener detects it and enqueues an AdvanceWorker, which marks the workflow as :failed and cancels remaining pending jobs.
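The detection logic can be pictured as a telemetry handler that reacts only to discarded jobs. The sketch below is a guess at that shape, not the library's FailureHandler: the module name and return values are hypothetical, and it relies on Oban's [:oban, :job, :exception] event carrying :state and :job in its metadata.

```elixir
defmodule FailureHandlerSketch do
  # Hypothetical module. It has the four-argument shape expected by
  # :telemetry.attach/4, but returns a decision instead of enqueuing a job
  # so it can run without Oban.

  # A job that has exhausted its attempts is reported with state: :discard.
  def handle_event([:oban, :job, :exception], _measurements, %{state: :discard, job: job}, _config) do
    case job.meta do
      %{"instance_id" => instance_id, "step" => step} ->
        # The real handler would enqueue an AdvanceWorker here.
        {:enqueue_advance, instance_id, step}

      _other ->
        # Not a Hephaestus step job.
        :ignore
    end
  end

  # Retryable failures (state: :failure) and all other events are ignored.
  def handle_event(_event, _measurements, _metadata, _config), do: :ignore
end

job = %{meta: %{"instance_id" => "abc", "step" => "validate_user"}}

IO.inspect(
  FailureHandlerSketch.handle_event([:oban, :job, :exception], %{}, %{state: :discard, job: job}, nil)
)
```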
Retry configuration
Retry config resolves with most-specific-wins priority:
- Step.retry_config/0: per-step override (optional callback)
- Workflow.default_retry_config/0: per-workflow default (optional callback)
- Library default: %{max_attempts: 5, backoff: :exponential, max_backoff: 60_000}
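The lookup order above can be sketched as a simple fallback chain. This is a minimal illustration, assuming the first callback found replaces the whole config rather than being merged with the defaults; RetryConfigSketch and the Demo/Plain modules are hypothetical names.

```elixir
defmodule RetryConfigSketch do
  # Hypothetical resolver; the library's own resolution code may differ.
  @library_default %{max_attempts: 5, backoff: :exponential, max_backoff: 60_000}

  # Most specific wins: step callback, then workflow callback, then default.
  def resolve(step, workflow) do
    cond do
      exports?(step, :retry_config) -> step.retry_config()
      exports?(workflow, :default_retry_config) -> workflow.default_retry_config()
      true -> @library_default
    end
  end

  # Optional callbacks are detected by checking for a zero-arity export.
  defp exports?(module, fun) do
    Code.ensure_loaded?(module) and function_exported?(module, fun, 0)
  end
end

defmodule DemoStep do
  def retry_config, do: %{max_attempts: 3, backoff: :exponential, max_backoff: 10_000}
end

defmodule PlainStep do
end

defmodule DemoWorkflow do
  def default_retry_config, do: %{max_attempts: 8, backoff: :exponential, max_backoff: 30_000}
end

defmodule PlainWorkflow do
end

IO.inspect(RetryConfigSketch.resolve(DemoStep, DemoWorkflow))
IO.inspect(RetryConfigSketch.resolve(PlainStep, DemoWorkflow))
IO.inspect(RetryConfigSketch.resolve(PlainStep, PlainWorkflow))
```

The three calls resolve to the step override, the workflow default, and the library default, in that order.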
Async steps and durable timers
# Step returns {:async} --> instance moves to :waiting
# Resume with external event:
MyApp.Hephaestus.resume(instance_id, :payment_confirmed)
# Schedule a durable timer (survives VM restarts):
MyApp.Hephaestus.schedule_resume(instance_id, :wait_step, 30_000)
# Returns {:ok, job_id}; cancellable via Oban.cancel_job/1
Observability: Workflow metadata and tags
All Oban jobs are automatically tagged with workflow metadata for filtering in Oban Web.
Define tags and metadata on your workflow
defmodule MyApp.Workflows.OnboardFlow do
  use Hephaestus.Workflow,
    tags: ["onboarding", "growth"],
    metadata: %{"team" => "growth"}

  @impl true
  def start, do: MyApp.Steps.ValidateUser

  @impl true
  def transit(MyApp.Steps.ValidateUser, :valid, _ctx), do: MyApp.Steps.SendWelcome
  def transit(MyApp.Steps.SendWelcome, :sent, _ctx), do: Hephaestus.Steps.Done
end
What gets set on every Oban job
| Field | Value | Example |
|---|---|---|
| meta.heph_workflow | Workflow short name (snake_case) | "onboard_flow" |
| meta.instance_id | Workflow execution UUID | "CBD700A6-..." |
| meta.step | Step short name (when applicable) | "validate_user" |
| meta.* | Custom metadata from workflow definition | "team": "growth" |
| tags | Workflow short name + custom tags | ["onboard_flow", "onboarding", "growth"] |
Filtering in Oban Web
tags:onboard_flow → all jobs for this workflow type
meta.instance_id:CBD700... → all jobs for a specific execution
meta.step:validate_user → all executions of a specific step
meta.team:growth → custom filter
Workflows without tags/metadata still get automatic heph_workflow, instance_id, and step fields.
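To make the field derivation concrete, here is a small sketch of how the short names, meta map, and tag list could be built from module names. JobMetaSketch and its functions are hypothetical, and letting the automatic fields override colliding custom metadata keys is an assumption.

```elixir
defmodule JobMetaSketch do
  # Hypothetical helper, not the library's implementation.

  # MyApp.Workflows.OnboardFlow -> "onboard_flow"
  def short_name(module) do
    module |> Module.split() |> List.last() |> Macro.underscore()
  end

  # Builds the meta map and tag list for one job. `step` may be nil for
  # jobs that are not tied to a specific step.
  def build(workflow, instance_id, step, custom_tags, custom_meta) do
    base = %{"heph_workflow" => short_name(workflow), "instance_id" => instance_id}
    base = if step, do: Map.put(base, "step", short_name(step)), else: base

    %{
      # Assumption: automatic fields take precedence over custom keys.
      meta: Map.merge(custom_meta, base),
      tags: [short_name(workflow) | custom_tags]
    }
  end
end

IO.inspect(
  JobMetaSketch.build(
    MyApp.Workflows.OnboardFlow,
    "example-instance-id",
    MyApp.Steps.ValidateUser,
    ["onboarding", "growth"],
    %{"team" => "growth"}
  )
)
```

For the OnboardFlow workflow above, this yields the tags ["onboard_flow", "onboarding", "growth"] and a meta map containing heph_workflow, instance_id, step, and team.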
Database schema
step_results table
Auxiliary table for zero-contention parallel step execution:
hephaestus_step_results
+-- id UUID (primary key)
+-- instance_id UUID (FK -> workflow_instances, ON DELETE CASCADE)
+-- step_ref STRING (module name)
+-- event STRING (step event or "__async__" sentinel)
+-- context_updates JSONB (step output data)
+-- processed BOOLEAN (consumed by AdvanceWorker)
+-- inserted_at TIMESTAMP
Indexes:
- Partial index on instance_id WHERE NOT processed, for fast pending lookups
- Partial unique index on (instance_id, step_ref) WHERE NOT processed, for idempotent inserts via ON CONFLICT DO NOTHING
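The effect of the partial unique index can be modelled in memory. The sketch below is not database code: it imitates ON CONFLICT DO NOTHING on unprocessed rows with a plain list, and StepResultsSketch is a hypothetical name used only for this illustration.

```elixir
defmodule StepResultsSketch do
  # In-memory model of hephaestus_step_results; each row is a map.

  # Imitates INSERT ... ON CONFLICT DO NOTHING against the partial unique
  # index: a second unprocessed row for the same (instance_id, step_ref)
  # is silently dropped.
  def insert(rows, row) do
    duplicate? =
      Enum.any?(rows, fn r ->
        not r.processed and r.instance_id == row.instance_id and r.step_ref == row.step_ref
      end)

    if duplicate?, do: rows, else: rows ++ [Map.put(row, :processed, false)]
  end

  # Imitates the AdvanceWorker pass: fetch pending rows for one instance
  # and flag them as processed.
  def consume(rows, instance_id) do
    pending? = fn r -> r.instance_id == instance_id and not r.processed end
    pending = Enum.filter(rows, pending?)
    updated = Enum.map(rows, fn r -> if pending?.(r), do: %{r | processed: true}, else: r end)
    {pending, updated}
  end
end

rows =
  []
  |> StepResultsSketch.insert(%{instance_id: "i1", step_ref: "StepA", event: "done"})
  # A retried job inserting the same result again is a no-op.
  |> StepResultsSketch.insert(%{instance_id: "i1", step_ref: "StepA", event: "done"})
  |> StepResultsSketch.insert(%{instance_id: "i1", step_ref: "StepB", event: "done"})

{pending, rows} = StepResultsSketch.consume(rows, "i1")
IO.inspect({length(pending), length(rows)})
```

Because the uniqueness constraint covers only unprocessed rows, a step that runs again after its earlier result was consumed can still insert a fresh row.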
Queue configuration
# Default: single queue for all workers
{Oban, queues: [hephaestus: 10]}
# Advanced: separate queues for orchestration vs execution
{Oban, queues: [hephaestus_advance: 5, hephaestus_execute: 20]}
The hephaestus: 10 entry caps the queue at 10 concurrently running jobs on each node. In a fan-out of 20 steps, only 10 execute at once and the rest wait for a free slot. Adjust the limit to your workload.
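As a back-of-the-envelope check, the number of sequential batches a fan-out needs is the step count divided by the queue limit, rounded up. The helper below is a hypothetical sketch that assumes a single node, steps of equal duration, and no other jobs competing for the queue.

```elixir
defmodule QueueWavesSketch do
  # Hypothetical helper: ceiling division of steps by the concurrency limit.
  def waves(steps, limit) when is_integer(steps) and steps >= 0 and is_integer(limit) and limit > 0 do
    div(steps + limit - 1, limit)
  end
end

# 20 fan-out steps on a queue limited to 10 concurrent jobs
IO.inspect(QueueWavesSketch.waves(20, 10))
```

With a single shared queue, AdvanceWorker jobs occupy the same slots as step executions, which is one reason to consider the separate-queue layout shown above.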
Error handling
| Scenario | Handler | Outcome |
|---|---|---|
| Step returns {:error, reason} | Oban retry with backoff | Retried up to max_attempts |
| Step exhausts all retries | FailureHandler (telemetry) | Workflow marked :failed |
| Step crashes/raises | Oban catches, treats as error | Same retry flow |
| AdvanceWorker fails | Oban retry | Idempotent — re-applies unprocessed step_results |
| ResumeWorker fails | Oban retry | Idempotent — INSERT deduplicated via unique index |
| DB connection lost | Oban retry | All workers are idempotent |
Requirements
- Elixir ~> 1.19
- Oban >= 2.14
- PostgreSQL (for advisory locks and JSONB)
- Hephaestus ~> 0.1.3
- HephaestusEcto ~> 0.1
License
MIT