aion_flow

Typed Gleam SDK for authoring durable Aion workflows. Use it to define workflow entry points, declare typed activities, receive signals, expose read-only queries, use deterministic timers/time/randomness, and test workflow code in Gleam.

Workflow code should be deterministic: do not read wall clocks or ambient entropy directly. Use aion/workflow.now, aion/workflow.random, aion/workflow.random_int, and timer primitives so replay sees the same command stream.

Use in this repository

The package is named aion_flow and targets Erlang/BEAM. Repository examples use it as a local path dependency rather than relying on a published package:

[dependencies]
aion_flow = { path = "../../gleam/aion_flow" }

Public modules:

Define a workflow

A workflow definition names an entry function and carries codecs for input, output, and workflow errors:

import aion/workflow
pub fn definition() {
workflow.define(
"hello_world",
request_codec(),
greeting_codec(),
workflow_error_codec(),
run,
)
}
pub fn run(input: Request) -> Result(Greeting, String) {
// Durable workflow logic goes here.
Ok(Greeting(message: "Hello, " <> input.name <> "!"))
}

workflow.define(name, input_codec, output_codec, error_codec, entry_fn) returns a typed WorkflowDefinition(input, output, workflow_error) that package tooling and tests can inspect with workflow.name, workflow.input_codec, workflow.output_codec, workflow.error_codec, and workflow.entry_fn.

Declare and call activities

Activities are typed values. activity.new stores the activity name, typed input, codecs, and local runner shape; workflow.run records the dispatch and returns Result(output, error.ActivityError).

import aion/activity
import aion/error
import aion/workflow
fn greet(name: String) -> activity.Activity(String, String) {
activity.new(
"greet",
name,
string_codec(),
string_codec(),
fn(value) { Ok("Hello, " <> value <> "!") },
)
}
pub fn run(input: Request) -> Result(String, String) {
case workflow.run(greet(input.name)) {
Ok(message) -> Ok(message)
Error(error.Retryable(message:, details: _)) -> Error(message)
Error(error.Terminal(message:, details: _)) -> Error(message)
Error(_) -> Error("activity failed")
}
}

An activity created with activity.new has no hidden retry, timeout, or heartbeat defaults. Add policies explicitly with activity.retry, activity.timeout, and activity.heartbeat when a workflow needs them.

For homogeneous fanout, use workflow.all, workflow.race, or workflow.map over activity values.

Codecs

All workflow, activity, signal, and query payloads cross engine boundaries through aion/codec.Codec(a):

import aion/codec
import gleam/dynamic/decode
import gleam/json
fn string_codec() -> codec.Codec(String) {
codec.json_codec(json.string, decode.string)
}

For custom records, provide a JSON encoder and a gleam/dynamic/decode.Decoder for the same shape.

Signals

Signals are named, typed messages delivered to a running workflow. Construct a SignalRef(payload) once and receive it from workflow code:

import aion/signal
import aion/workflow
fn approval_signal() -> signal.SignalRef(Bool) {
signal.new("approval", bool_codec())
}
pub fn wait_for_approval() -> Result(Bool, String) {
case workflow.receive(approval_signal()) {
Ok(approved) -> Ok(approved)
Error(_) -> Error("approval signal failed")
}
}

signal.send(workflow_id, reference, payload) is available for callers already inside the engine/Gleam-client boundary. Network-facing callers should use the client SDKs.

Timers, deterministic time, and timeouts

Use workflow primitives instead of wall-clock functions:

import aion/duration
import aion/error
import aion/workflow
import gleam/result
pub fn pause_then_read_time() -> Result(workflow.Timestamp, error.EngineError) {
use _ <- result.try(workflow.sleep(duration.minutes(5)))
workflow.now()
}

The timer API also includes workflow.start_timer, workflow.cancel_timer, workflow.timer_id, and workflow.with_timeout.

Queries

Queries are read-only and record no workflow events. A handler returns a typed value through a codec; by convention it must not dispatch activities or mutate workflow state.

import aion/error
import aion/query
pub fn register_state_query(
current_status: fn() -> String,
) -> Result(Nil, error.QueryError) {
query.handler("state", string_codec(), current_status)
}

query.dispatch(name, value_codec) is provided for callers inside the engine boundary and for the Gleam test harness.

Minimal example

import aion/activity
import aion/codec
import aion/duration
import aion/error
import aion/signal
import aion/workflow
import gleam/dynamic/decode
import gleam/json
import gleam/result
type Request {
Request(name: String)
}
fn string_codec() -> codec.Codec(String) {
codec.json_codec(json.string, decode.string)
}
fn request_codec() -> codec.Codec(Request) {
codec.json_codec(request_to_json, request_decoder())
}
fn request_to_json(request: Request) -> json.Json {
json.object([#("name", json.string(request.name))])
}
fn request_decoder() -> decode.Decoder(Request) {
use name <- decode.field("name", decode.string)
decode.success(Request(name: name))
}
fn greet_activity(name: String) -> activity.Activity(String, String) {
activity.new("greet", name, string_codec(), string_codec(), fn(name) {
Ok("Hello, " <> name <> "!")
})
}
fn bool_codec() -> codec.Codec(Bool) {
codec.json_codec(json.bool, decode.bool)
}
fn approval_signal() -> signal.SignalRef(Bool) {
signal.new("approval", bool_codec())
}
pub fn definition() {
workflow.define("hello_world", request_codec(), string_codec(), string_codec(), run)
}
pub fn run(input: Request) -> Result(String, String) {
use greeting <- result.try(
case workflow.run(greet_activity(input.name)) {
Ok(value) -> Ok(value)
Error(_) -> Error("activity failed")
},
)
use _ <- result.try(
case workflow.sleep(duration.seconds(1)) {
Ok(value) -> Ok(value)
Error(_) -> Error("timer failed")
},
)
use approved <- result.try(
case workflow.receive(approval_signal()) {
Ok(value) -> Ok(value)
Error(_) -> Error("signal failed")
},
)
case approved {
True -> Ok(greeting)
False -> Error("not approved")
}
}

For a complete end-to-end sample that builds a Gleam workflow, packages it as .aion, starts aion-server, runs a Python worker, and starts a workflow over HTTP, see ../../examples/hello-world/README.md.