HephaestusEcto
Ecto/PostgreSQL storage adapter for Hephaestus workflow engine.
Persists workflow instances across VM restarts using a single workflow_instances table with JSONB state and GIN indexing.
Version 0.3.0 introduces mandatory business keys (key::value ID format),
new query filters (:id, :status_in), and converts the id column from UUID to varchar(255).
Installation
Add to your mix.exs:
def deps do
[
{:hephaestus_ecto, "~> 0.3.0"}
]
endSetup
1. Generate the migration
mix hephaestus_ecto.gen.migration
mix ecto.migrate
This creates the workflow_instances table with:
idvarchar(255) primary key (business key formatkey::value)workflowandstatusstring columns with B-tree indexesworkflow_versioninteger column (default1)-
composite index on
workflowandworkflow_version stateJSONB column with GIN index (jsonb_path_ops)- Timestamps
1a. Version-aware upgrades
Generated migrations can keep using the original API:
def up, do: HephaestusEcto.Migration.up()
def down, do: HephaestusEcto.Migration.down()Or target a specific schema version during staged rollouts:
def up, do: HephaestusEcto.Migration.up(version: 2)
def down, do: HephaestusEcto.Migration.down(version: 1)To inspect the applied version at runtime:
HephaestusEcto.Migration.migrated_version()2. Configure your workflow engine
defmodule MyApp.Hephaestus do
use Hephaestus,
storage: {HephaestusEcto.Storage, repo: MyApp.Repo},
runner: Hephaestus.Runtime.Runner.Local
endThat's it. Instances are now persisted to PostgreSQL.
How it works
Storage adapter
HephaestusEcto.Storage implements the Hephaestus.Runtime.Storage behaviour:
| Callback | Behavior |
|---|---|
get/1 |
Fetch instance by ID. Returns {:ok, instance} or {:error, :not_found} |
put/1 | Upsert instance (insert or replace on conflict) |
delete/1 |
Remove instance. Idempotent — returns :ok even if not found |
query/1 |
Filter by :id, :status, :status_in, :workflow, :workflow_version, and/or :workflow_family |
The adapter uses persistent_term to store the Repo reference — no GenServer process overhead. The Repo module is resolved once at startup and looked up in constant time on every call.
Serialization
Workflow instances contain Elixir-specific types (atoms, MapSets, module references) that can't be stored directly in JSONB. The Serializer handles the conversion:
| Elixir type | DB representation |
|---|---|
| Atoms | "Elixir.MyApp.Step" strings |
| MapSets | Sorted string lists |
| DateTime | ISO 8601 strings |
| Atom map keys | String keys |
All deserialization uses String.to_existing_atom/1 — no arbitrary atom creation from database values.
Serializer.to_db/1 returns a 5-tuple:
{id, workflow, status, workflow_version, state}workflow_version is stored in its own column, while state remains JSONB.
Schema
Single table, simple structure:
workflow_instances
├── id VARCHAR(255) (primary key, business key format "key::value")
├── workflow STRING (module name)
├── workflow_version INTEGER (workflow schema version, default 1)
├── status STRING (pending | running | waiting | completed | failed)
├── state JSONB (serialized context, steps, history)
└── timestampsQuerying instances
# By exact instance ID
HephaestusEcto.Storage.query(id: "orderid::abc123")
# By status
HephaestusEcto.Storage.query(status: :running)
# By multiple statuses (active instances)
HephaestusEcto.Storage.query(status_in: [:pending, :running, :waiting])
# By workflow
HephaestusEcto.Storage.query(workflow: MyApp.OrderWorkflow)
# Combined (AND semantics)
HephaestusEcto.Storage.query(id: "orderid::abc123", status_in: [:pending, :running], workflow: MyApp.OrderWorkflow)
# By exact workflow version
HephaestusEcto.Storage.query(workflow_version: 2)
# By workflow family prefix
HephaestusEcto.Storage.query(workflow_family: "Elixir.MyApp.Workflows.Payment.")
For JSONB queries on the state field, use the GIN index directly via Ecto:
import Ecto.Query
from(i in HephaestusEcto.Schema.Instance,
where: fragment("state @> ?", ^%{"context" => %{"initial" => %{"order_id" => 123}}})
)
|> MyApp.Repo.all()Named instances
Multiple storage instances can coexist (e.g., for multi-tenant setups):
# Start with a name
HephaestusEcto.Storage.start_link(repo: MyApp.Repo, name: :tenant_a)
# Use the name in calls
HephaestusEcto.Storage.get(:tenant_a, instance_id)
HephaestusEcto.Storage.query(:tenant_a, status: :running)Requirements
- Elixir ~> 1.19
- PostgreSQL 9.4+ (for JSONB and GIN indexes)
- Ecto SQL ~> 3.10
License
MIT