HephaestusOban
Oban-based runner adapter for Hephaestus workflow engine.
Turns each workflow step into a durable Oban job with retry/backoff, advisory lock serialization, and zero-contention parallel execution via an auxiliary step_results table.
Installation
Add to your mix.exs:
def deps do
[
{:hephaestus_oban, "~> 0.1.0"}
]
endSetup
1. Generate and run migrations
Requires HephaestusEcto migration to be run first (FK reference).
mix hephaestus_ecto.gen.migration
mix hephaestus_oban.gen.migration
mix ecto.migrate2. Configure your workflow engine
defmodule MyApp.Hephaestus do
use Hephaestus,
storage: {HephaestusEcto.Storage, repo: MyApp.Repo},
runner: {HephaestusOban.Runner, oban: MyApp.Oban}
end3. Add to your supervision tree
children = [
MyApp.Repo,
{Oban, name: MyApp.Oban, repo: MyApp.Repo, queues: [hephaestus: 10]},
MyApp.Hephaestus
]How it works
Architecture
start_instance(Workflow, context)
|
+-- Instance.new() --> persist via HephaestusEcto.Storage
+-- Oban.insert(AdvanceWorker)
|
v
AdvanceWorker (single Instance writer, advisory lock)
| Engine.advance() --> active_steps: {StepA, StepB, StepC}
+-- enqueue 3x ExecuteStepWorker
|
+-- ExecuteStepWorker(StepA) --+
+-- ExecuteStepWorker(StepB) --+ parallel, zero contention
+-- ExecuteStepWorker(StepC) --+
|
each: execute step |
INSERT step_results |
enqueue AdvanceWorker |
v
AdvanceWorker (serialized via unique + advisory lock)
| apply step_results --> Engine.complete_step + activate_transitions
| persist Instance
|
+-- :completed --> done
+-- :waiting --> awaits ResumeWorker
+-- active --> enqueue ExecuteStepWorkers (next wave)Three workers
| Worker | Role | Writes to Instance? |
|---|---|---|
| AdvanceWorker |
Orchestrator. Reads step_results, applies Engine transitions, persists Instance. Serialized per instance via Oban unique + pg_advisory_xact_lock. | Yes (single writer) |
| ExecuteStepWorker |
Executes a single step. Writes result to step_results table, enqueues AdvanceWorker. Idempotent via existence check. | No |
| ResumeWorker |
Handles external events and durable timers. Writes to step_results, enqueues AdvanceWorker. | No |
Concurrency model
ExecuteStepWorkers run in parallel during fan-out. They never write to the Instance directly — each inserts its own row into hephaestus_step_results (zero contention). The AdvanceWorker is the single writer for the Instance, serialized via Oban unique constraint + PostgreSQL advisory lock. All Instance mutations happen atomically inside a Repo.transaction with pg_advisory_xact_lock.
Failure handling
When an ExecuteStepWorker exhausts all retries (discarded by Oban), the FailureHandler telemetry listener detects it and enqueues an AdvanceWorker, which marks the workflow as :failed and cancels remaining pending jobs.
Retry configuration
Retry config resolves with most-specific-wins priority:
Step.retry_config/0— per-step override (optional callback)Workflow.default_retry_config/0— per-workflow default (optional callback)-
Library default —
%{max_attempts: 5, backoff: :exponential, max_backoff: 60_000}
Async steps and durable timers
# Step returns {:async} --> instance moves to :waiting
# Resume with external event:
MyApp.Hephaestus.resume(instance_id, :payment_confirmed)
# Schedule a durable timer (survives VM restarts):
MyApp.Hephaestus.schedule_resume(instance_id, :wait_step, 30_000)
# Returns {:ok, job_id} — cancellable via Oban.cancel_job/1Database schema
step_results table
Auxiliary table for zero-contention parallel step execution:
hephaestus_step_results
+-- id UUID (primary key)
+-- instance_id UUID (FK -> workflow_instances, ON DELETE CASCADE)
+-- step_ref STRING (module name)
+-- event STRING (step event or "__async__" sentinel)
+-- context_updates JSONB (step output data)
+-- processed BOOLEAN (consumed by AdvanceWorker)
+-- inserted_at TIMESTAMPIndexes:
-
Partial index on
instance_id WHERE NOT processed— fast pending lookups -
Partial unique index on
(instance_id, step_ref) WHERE NOT processed— idempotent inserts viaON CONFLICT DO NOTHING
Queue configuration
# Default: single queue for all workers
{Oban, queues: [hephaestus: 10]}
# Advanced: separate queues for orchestration vs execution
{Oban, queues: [hephaestus_advance: 5, hephaestus_execute: 20]}
The hephaestus: 10 means up to 10 Oban jobs run concurrently. In a fan-out of 20 steps, only 10 execute at once — the rest wait. Adjust based on your workload.
Error handling
| Scenario | Handler | Outcome |
|---|---|---|
Step returns {:error, reason} | Oban retry with backoff | Retried up to max_attempts |
| Step exhausts all retries | FailureHandler (telemetry) |
Workflow marked :failed |
| Step crashes/raises | Oban catches, treats as error | Same retry flow |
| AdvanceWorker fails | Oban retry | Idempotent — re-applies unprocessed step_results |
| ResumeWorker fails | Oban retry | Idempotent — INSERT deduplicated via unique index |
| DB connection lost | Oban retry | All workers are idempotent |
Requirements
- Elixir ~> 1.19
- Oban >= 2.14
- PostgreSQL (for advisory locks and JSONB)
- Hephaestus ~> 0.1
- HephaestusEcto ~> 0.1
License
MIT