aion_client

Gleam caller SDK for Aion workflow servers. It exposes connect plus the seven workflow operations: start, signal, query, cancel, list, describe, and subscribe.

Install

gleam add aion_client

Server prerequisite

Run an aion-server that implements the AW workflow API. The example test mirrors the AL-007 fixture values and can be run with:

export AION_SERVER_URL=http://127.0.0.1:8080
export AION_AUTH_TOKEN=dev-token # optional
gleam test

See test/example_test.gleam for a complete seven-operation flow using the SDK surface and fixture transport. The default transport reports Unavailable until the server HTTP/WebSocket adapter is supplied by the embedding application.

Connect

import aion_client
import gleam/option.{Some}
let assert Ok(client) =
aion_client.connect(aion_client.Config(
endpoint: "http://127.0.0.1:8080",
bearer_token: Some("dev-token"),
namespace: "conformance",
tls: False,
))

start

Typed payloads use a Gleam JSON encoder. The idempotency key makes retrying the same start safe; conflicting reuse is surfaced as error.AlreadyExists.

import gleam/json
import gleam/option.{Some}
let assert Ok(handle) =
aion_client.start(
client,
aion_client.StartOptions(
workflow_id: "echo-readme",
workflow_type: "conformance.echo",
task_queue: "conformance",
idempotency_key: Some("readme-seven-operations"),
),
#("hello", 1),
fn(input) {
let #(message, counter) = input
json.object([
#("message", json.string(message)),
#("counter", json.int(counter)),
])
},
)

signal

import aion_client/handle as workflow_handle
let assert Ok(Nil) =
workflow_handle.signal(handle, "record", "signal-observed", fn(value) {
json.object([#("value", json.string(value))])
})

query

import gleam/dynamic/decode
let assert Ok(last_signal) =
workflow_handle.query(handle, "state", Nil, fn(_) { json.null() }, decode.string)

list

let assert Ok(summaries) =
aion_client.list(client, aion_client.ListOptions(namespace: Some("conformance")))

describe

let assert Ok(description) = workflow_handle.describe(handle)

cancel

Cancellation is a cooperative request: success means the server accepted the request.

let assert Ok(Nil) = workflow_handle.cancel(handle, "caller requested cancellation")

subscribe

handle.subscribe returns an EventStream. Stream collection yields EventItem, StreamError, or StreamEnd; transient reconnect/resume semantics are exercised in the example test through stream.subscribe_with_stub.

import aion_client/stream
let events = workflow_handle.subscribe(handle, decode.string) |> stream.collect

Typed and raw payloads

Typed operations accept encoders/decoders from gleam/json and gleam/dynamic/decode. The raw escape hatch is the public payload.Payload(content_type:, bytes:) type and *_raw functions:

import aion_client/payload
let raw = payload.Payload(content_type: payload.json_content_type, bytes: "{\"value\":\"raw\"}")
let assert Ok(Nil) = workflow_handle.signal_raw(handle, "record", raw)

Branching on errors

All operations return Result(_, error.Error), so callers can branch on the shared taxonomy.

import aion_client/error
import gleam/io
case workflow_handle.query(handle, "state", Nil, fn(_) { json.null() }, decode.string) {
Ok(state) -> io.println(state)
Error(error.QueryTimeout) -> io.println("query timed out; use a longer timeout")
Error(error.AlreadyExists) -> io.println("idempotency key conflict")
Error(error.Unavailable) -> io.println("server or stream transport is unavailable")
Error(_) -> io.println("workflow operation failed")
}