HephaestusOban

Oban-based runner adapter for the Hephaestus workflow engine.

Turns each workflow step into a durable Oban job with retry/backoff, advisory lock serialization, and zero-contention parallel execution via an auxiliary step_results table.

Installation

Add to your mix.exs:

def deps do
  [
    {:hephaestus_oban, "~> 0.2.0"}
  ]
end

Setup

1. Generate and run migrations

Run the HephaestusEcto migration first: the hephaestus_step_results table created by HephaestusOban has a foreign key to its workflow_instances table.

mix hephaestus_ecto.gen.migration
mix hephaestus_oban.gen.migration
mix ecto.migrate

2. Configure your workflow engine

defmodule MyApp.Hephaestus do
  use Hephaestus,
    storage: {HephaestusEcto.Storage, repo: MyApp.Repo},
    runner: {HephaestusOban.Runner, oban: MyApp.Oban}
end

3. Add to your supervision tree

children = [
  MyApp.Repo,
  {Oban, name: MyApp.Oban, repo: MyApp.Repo, queues: [hephaestus: 10]},
  MyApp.Hephaestus
]

How it works

Architecture

start_instance(Workflow, context)
  |
  +-- Instance.new() --> persist via HephaestusEcto.Storage
  +-- Oban.insert(AdvanceWorker)
       |
       v
  AdvanceWorker (single Instance writer, advisory lock)
  | Engine.advance() --> active_steps: {StepA, StepB, StepC}
  +-- enqueue 3x ExecuteStepWorker
       |
       +-- ExecuteStepWorker(StepA) --+
       +-- ExecuteStepWorker(StepB) --+  parallel, zero contention
       +-- ExecuteStepWorker(StepC) --+
                                      |
       each: execute step             |
             INSERT step_results      |
             enqueue AdvanceWorker    |
                                      v
  AdvanceWorker (serialized via unique + advisory lock)
  | apply step_results --> Engine.complete_step + activate_transitions
  | persist Instance
  |
  +-- :completed --> done
  +-- :waiting   --> awaits ResumeWorker
  +-- active     --> enqueue ExecuteStepWorkers (next wave)
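The fan-out / fan-in loop in the diagram can be modeled in plain Elixir as a rough sketch. Everything here is hypothetical: `WaveSketch`, the `transitions` map, and `execute/1` stand in for Engine.advance, ExecuteStepWorker, and the step_results table. No Oban or database is involved, and the real engine's API may differ.

```elixir
defmodule WaveSketch do
  # Hypothetical model of the AdvanceWorker loop. `transitions` maps each step
  # to the steps it activates once it completes (an empty list means terminal).

  # Stand-in for ExecuteStepWorker: "runs" a step and returns a step_results-like row.
  def execute(step), do: %{step_ref: step, event: :done, context_updates: %{step => true}}

  # Runs waves until no steps are active, returning {context, waves}.
  def run(transitions, start_steps), do: loop(transitions, start_steps, %{}, [])

  defp loop(_transitions, [], context, waves), do: {context, Enum.reverse(waves)}

  defp loop(transitions, active, context, waves) do
    # Fan-out: each active step runs in its own process and yields its own row.
    results =
      active
      |> Task.async_stream(&execute/1)
      |> Enum.map(fn {:ok, row} -> row end)

    # Fan-in: a single writer applies every row and computes the next wave.
    context = Enum.reduce(results, context, &Map.merge(&2, &1.context_updates))

    next =
      results
      |> Enum.flat_map(&Map.get(transitions, &1.step_ref, []))
      |> Enum.uniq()

    loop(transitions, next, context, [active | waves])
  end
end

transitions = %{start: [:a, :b, :c], a: [:join], b: [:join], c: [:join], join: []}
{_context, waves} = WaveSketch.run(transitions, [:start])
# waves == [[:start], [:a, :b, :c], [:join]]
```

Only the reduce step touches `context`, mirroring the single-writer rule; the parallel tasks only produce rows.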

Three workers

Worker | Role | Writes to Instance?
AdvanceWorker | Orchestrator. Reads step_results, applies Engine transitions, persists the Instance. Serialized per instance via Oban unique jobs + pg_advisory_xact_lock. | Yes (single writer)
ExecuteStepWorker | Executes a single step. Writes its result to the step_results table and enqueues an AdvanceWorker. Idempotent via an existence check. | No
ResumeWorker | Handles external events and durable timers. Writes to step_results and enqueues an AdvanceWorker. | No
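The "idempotent via existence check" behaviour of ExecuteStepWorker can be sketched as follows, assuming a result is identified by its (instance_id, step_ref) pair. A MapSet stands in for the hephaestus_step_results table, and `IdempotentStep.perform/4` is a hypothetical name, not the library's worker code.

```elixir
defmodule IdempotentStep do
  # Hypothetical sketch: `results` is a MapSet of {instance_id, step_ref} keys
  # standing in for rows already present in hephaestus_step_results.
  def perform(results, instance_id, step_ref, run_fun) do
    key = {instance_id, step_ref}

    if MapSet.member?(results, key) do
      # A retry after the result was already recorded: do not run the step again.
      {:skipped, results}
    else
      _output = run_fun.()
      # Record the result; the real worker would then enqueue an AdvanceWorker.
      {:executed, MapSet.put(results, key)}
    end
  end
end
```

Against a real database, a check followed by an insert can race unless it runs under a lock or is backed by a unique index; the sketch ignores that.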

Concurrency model

ExecuteStepWorkers run in parallel during fan-out. They never write to the Instance directly: each inserts its own row into hephaestus_step_results, so parallel steps never compete for the same row (zero contention). The AdvanceWorker is the single writer for the Instance, serialized per instance via Oban unique jobs plus a PostgreSQL advisory lock. All Instance mutations happen atomically inside a Repo.transaction that holds pg_advisory_xact_lock; PostgreSQL releases that lock automatically when the transaction commits or rolls back.
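pg_advisory_xact_lock takes a single bigint key (or two integer keys), so a per-instance lock needs a stable 64-bit key derived from the instance id. The README does not say how HephaestusOban derives it; the sketch below assumes a SHA-256 based derivation with a hypothetical "hephaestus:" namespace prefix, purely for illustration. `AdvisoryKey` is a hypothetical module.

```elixir
defmodule AdvisoryKey do
  # Hypothetical: derive a signed 64-bit key (a PostgreSQL bigint) from an instance id.
  # The same id always maps to the same key, so concurrent AdvanceWorkers for one
  # instance contend on one lock, while different instances almost never collide.
  def for_instance(instance_id) when is_binary(instance_id) do
    digest = :crypto.hash(:sha256, "hephaestus:" <> instance_id)
    <<key::signed-integer-size(64), _rest::binary>> = digest
    key
  end

  # SQL an orchestrator could run inside its transaction; PostgreSQL releases the
  # lock automatically at commit or rollback.
  def lock_sql, do: "SELECT pg_advisory_xact_lock($1)"
end
```

Inside a `Repo.transaction/1` this could be issued with something like `MyApp.Repo.query!(AdvisoryKey.lock_sql(), [AdvisoryKey.for_instance(id)])`, which blocks until any other transaction holding the same key finishes.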

Failure handling

When an ExecuteStepWorker exhausts all retries (discarded by Oban), the FailureHandler telemetry listener detects it and enqueues an AdvanceWorker, which marks the workflow as :failed and cancels remaining pending jobs.
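The FailureHandler's decision can be sketched as a pure function. The README does not document which telemetry event or metadata the handler uses; this sketch assumes it sees a job map with `worker`, `attempt`, `max_attempts`, and `args` fields (names borrowed from Oban.Job) and only reacts to step jobs that have run out of attempts. The worker name string and `FailureSketch` are hypothetical.

```elixir
defmodule FailureSketch do
  # Hypothetical worker name; the README does not give the module's full name.
  @execute_worker "HephaestusOban.ExecuteStepWorker"

  # Retries exhausted on a step job: request an AdvanceWorker for that instance,
  # which would mark the workflow :failed and cancel remaining pending jobs.
  def on_failure(%{worker: @execute_worker, attempt: attempt, max_attempts: max_attempts, args: args})
      when attempt >= max_attempts do
    {:enqueue_advance, Map.fetch!(args, "instance_id")}
  end

  # Attempts remain, or a different worker: leave it to Oban's normal retry flow.
  def on_failure(_job), do: :ignore
end
```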

Retry configuration

Retry configuration is resolved with most-specific-wins priority:

  1. Step.retry_config/0 — per-step override (optional callback)
  2. Workflow.default_retry_config/0 — per-workflow default (optional callback)
  3. Library default — %{max_attempts: 5, backoff: :exponential, max_backoff: 60_000}
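The lookup order above can be sketched with `function_exported?/3`, a common way to detect an optional callback. This assumes the first config found is used as a whole; the README does not say whether keys from different levels are merged. `RetryConfig.resolve/2` and the example step and workflow modules are hypothetical.

```elixir
defmodule RetryConfig do
  @library_default %{max_attempts: 5, backoff: :exponential, max_backoff: 60_000}

  # Most specific wins: step override, then workflow default, then library default.
  def resolve(step, workflow) do
    cond do
      exports?(step, :retry_config) -> step.retry_config()
      exports?(workflow, :default_retry_config) -> workflow.default_retry_config()
      true -> @library_default
    end
  end

  # An optional callback is present only if the module exports it with arity 0.
  defp exports?(module, fun) do
    Code.ensure_loaded?(module) and function_exported?(module, fun, 0)
  end
end

# Hypothetical step and workflow modules used for illustration.
defmodule FlakyStep do
  def retry_config, do: %{max_attempts: 10, backoff: :exponential, max_backoff: 120_000}
end

defmodule PlainStep do
end

defmodule PaymentsFlow do
  def default_retry_config, do: %{max_attempts: 3, backoff: :exponential, max_backoff: 30_000}
end

defmodule PlainFlow do
end
```

With these modules, `RetryConfig.resolve(PlainStep, PaymentsFlow)` falls through to the workflow default, and `RetryConfig.resolve(PlainStep, PlainFlow)` returns the library default.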

Async steps and durable timers

# Step returns {:async} --> instance moves to :waiting
# Resume with external event:
MyApp.Hephaestus.resume(instance_id, :payment_confirmed)

# Schedule a durable timer (survives VM restarts):
MyApp.Hephaestus.schedule_resume(instance_id, :wait_step, 30_000)
# Returns {:ok, job_id}; cancel it with Oban.cancel_job(MyApp.Oban, job_id)
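A durable timer can be an ordinary Oban job with a future scheduled_at, which is why it survives VM restarts: the schedule lives in the oban_jobs table rather than in process memory. The sketch below builds such job attributes in plain Elixir, assuming the 30_000 delay is in milliseconds and that ResumeWorker args carry the instance id and step name; `TimerSketch` and the arg names are hypothetical.

```elixir
defmodule TimerSketch do
  # Hypothetical: attributes a ResumeWorker job for a durable timer might carry.
  def resume_job(instance_id, step, delay_ms, now \\ DateTime.utc_now()) do
    %{
      args: %{"instance_id" => instance_id, "step" => Atom.to_string(step)},
      # The job waits in the database until this time, then becomes available to run.
      scheduled_at: DateTime.add(now, delay_ms, :millisecond)
    }
  end
end
```

With Oban, attributes like these would typically be passed as `Worker.new(args, scheduled_at: ...)`; Oban also accepts `schedule_in:` in seconds.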

Observability — Workflow metadata and tags

All Oban jobs are automatically tagged with workflow metadata for filtering in Oban Web.

Define tags and metadata on your workflow

defmodule MyApp.Workflows.OnboardFlow do
  use Hephaestus.Workflow,
    tags: ["onboarding", "growth"],
    metadata: %{"team" => "growth"}

  @impl true
  def start, do: MyApp.Steps.ValidateUser

  @impl true
  def transit(MyApp.Steps.ValidateUser, :valid, _ctx), do: MyApp.Steps.SendWelcome
  def transit(MyApp.Steps.SendWelcome, :sent, _ctx), do: Hephaestus.Steps.Done
end

What gets set on every Oban job

Field | Value | Example
meta.heph_workflow | Workflow short name (snake_case) | "onboard_flow"
meta.instance_id | Workflow execution UUID | "CBD700A6-..."
meta.step | Step short name (when applicable) | "validate_user"
meta.* | Custom metadata from the workflow definition | "team": "growth"
tags | Workflow short name + custom tags | ["onboard_flow", "onboarding", "growth"]
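The short names in the table match the last segment of the module name run through Macro.underscore/1. The sketch below assumes that derivation and shows how the meta map and tag list could be assembled; `JobTags` and its argument order are hypothetical.

```elixir
defmodule JobTags do
  # MyApp.Workflows.OnboardFlow -> "onboard_flow"
  def short_name(module) do
    module |> Module.split() |> List.last() |> Macro.underscore()
  end

  # Builds meta and tags for one job. `step` is nil for jobs that are not tied
  # to a single step (for example an AdvanceWorker).
  def build(workflow, instance_id, step, custom_tags, custom_meta) do
    base = %{"heph_workflow" => short_name(workflow), "instance_id" => instance_id}
    base = if step, do: Map.put(base, "step", short_name(step)), else: base

    %{
      # Automatic fields win if custom metadata uses the same key.
      meta: Map.merge(custom_meta, base),
      tags: Enum.uniq([short_name(workflow) | custom_tags])
    }
  end
end
```

For the OnboardFlow workflow above, a ValidateUser job would get tags `["onboard_flow", "onboarding", "growth"]` and meta with "heph_workflow", "instance_id", "step", and "team".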

Filtering in Oban Web

tags:onboard_flow             → all jobs for this workflow type
meta.instance_id:CBD700...    → all jobs for a specific execution
meta.step:validate_user       → all executions of a specific step
meta.team:growth              → custom filter

Workflows without tags/metadata still get automatic heph_workflow, instance_id, and step fields.

Database schema

step_results table

Auxiliary table for zero-contention parallel step execution:

hephaestus_step_results
+-- id               UUID (primary key)
+-- instance_id      UUID (FK -> workflow_instances, ON DELETE CASCADE)
+-- step_ref         STRING (module name)
+-- event            STRING (step event or "__async__" sentinel)
+-- context_updates  JSONB (step output data)
+-- processed        BOOLEAN (consumed by AdvanceWorker)
+-- inserted_at      TIMESTAMP
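The processed flag is what lets a retried AdvanceWorker re-apply only unprocessed step_results. A plain-Elixir sketch, with maps standing in for rows and `ApplyResults.apply_pending/2` as a hypothetical name; treating any "__async__" row as moving the instance to :waiting is an assumption based on the sentinel described above.

```elixir
defmodule ApplyResults do
  # Hypothetical: applies unprocessed rows to the instance context in insertion
  # order. Returns {status, context, rows} with every applied row marked processed.
  def apply_pending(context, rows) do
    {pending, done} = Enum.split_with(rows, &(not &1.processed))

    context =
      pending
      |> Enum.sort_by(& &1.inserted_at, DateTime)
      |> Enum.reduce(context, fn row, acc -> Map.merge(acc, row.context_updates) end)

    status = if Enum.any?(pending, &(&1.event == "__async__")), do: :waiting, else: :running
    rows = done ++ Enum.map(pending, fn row -> %{row | processed: true} end)
    {status, context, rows}
  end
end
```

Running `apply_pending/2` a second time on the returned rows finds nothing pending and leaves the context unchanged, which is the idempotence the error-handling table relies on.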

Indexes:

Queue configuration

# Default: single queue for all workers
{Oban, name: MyApp.Oban, repo: MyApp.Repo, queues: [hephaestus: 10]}

# Advanced: separate queues for orchestration vs execution
{Oban, name: MyApp.Oban, repo: MyApp.Repo, queues: [hephaestus_advance: 5, hephaestus_execute: 20]}

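hephaestus: 10 sets the queue's concurrency limit: at most 10 jobs from that queue run at once on each node, since Oban queue limits are local to a node. With a single queue, AdvanceWorker jobs share those slots with ExecuteStepWorker jobs. In a fan-out of 20 steps, only 10 execute at once and the rest wait. Adjust the limit to your workload.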
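To see the effect of a limit of 10 on a 20-step fan-out, the sketch below uses Task.async_stream/3 with max_concurrency: 10 as a stand-in for an Oban queue and records the peak number of jobs in flight. It models only the limit, not Oban's scheduling or polling; `QueueSketch` is hypothetical.

```elixir
defmodule QueueSketch do
  # Runs `jobs` simulated steps with at most `limit` in flight and returns
  # {completed_count, peak_in_flight}.
  def run(jobs, limit) do
    # Slot 1: current in-flight count; slot 2: highest count observed.
    counters = :atomics.new(2, signed: true)

    completed =
      1..jobs
      |> Task.async_stream(
        fn _i ->
          now = :atomics.add_get(counters, 1, 1)
          bump_peak(counters, now)
          Process.sleep(20)
          :atomics.sub(counters, 1, 1)
        end,
        max_concurrency: limit
      )
      |> Enum.count()

    {completed, :atomics.get(counters, 2)}
  end

  # Raise the stored peak to `now` if it is higher, retrying on concurrent updates.
  defp bump_peak(counters, now) do
    peak = :atomics.get(counters, 2)

    if now > peak and :atomics.compare_exchange(counters, 2, peak, now) != :ok do
      bump_peak(counters, now)
    end
  end
end
```

`QueueSketch.run(20, 10)` completes all 20 jobs while never having more than 10 in flight, so the fan-out takes roughly two rounds of step duration.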

Error handling

Scenario | Handler | Outcome
Step returns {:error, reason} | Oban retry with backoff | Retried up to max_attempts
Step exhausts all retries | FailureHandler (telemetry) | Workflow marked :failed
Step crashes/raises | Oban catches it and treats it as an error | Same retry flow
AdvanceWorker fails | Oban retry | Idempotent: re-applies unprocessed step_results
ResumeWorker fails | Oban retry | Idempotent: INSERT deduplicated via unique index
DB connection lost | Oban retry | All workers are idempotent
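The "retry with backoff" rows can be made concrete with a capped exponential schedule. The README gives only backoff: :exponential and max_backoff: 60_000; the 1_000 base delay, the millisecond unit, the absence of jitter, and the doubling formula below are assumptions for illustration. `BackoffSketch` is hypothetical.

```elixir
defmodule BackoffSketch do
  # Hypothetical capped exponential backoff: base * 2^(attempt - 1), never above the cap.
  def delay_ms(attempt, base_ms \\ 1_000, max_backoff_ms \\ 60_000) when attempt >= 1 do
    min(base_ms * Integer.pow(2, attempt - 1), max_backoff_ms)
  end

  # Delay after each failed attempt for a job allowed `max_attempts` attempts in total
  # (the final attempt has no retry after it).
  def schedule(max_attempts), do: Enum.map(1..(max_attempts - 1)//1, &delay_ms/1)
end
```

Under these assumptions the library default of 5 attempts waits 1, 2, 4, and 8 seconds between tries, and the 60_000 cap only matters from the seventh attempt on. An Oban worker's backoff/1 callback returns seconds, so a real implementation would convert.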

Requirements

License

MIT