gabsurd

A Gleam SDK for the Absurd durable workflow system. Provides type-safe database access via Parrot (sqlc) code generation and OTP worker actors for task processing.

Overview

gabsurd wraps the Absurd PostgreSQL schema with two layers:

  1. Generated SQL layer (gabsurd/sql) — Type-safe query functions generated by Parrot (sqlc for Gleam). All database access goes through Absurd's PL/pgSQL functions.
  2. SDK layer — Hand-written Gleam modules providing ergonomic APIs for queues, tasks, events, checkpoints, and worker actors.

Quick Start

1. Install & Setup

# gleam.toml
[dependencies]
gabsurd = { path = "..." }
gleam_erlang = ">= 1.3.0 and < 2.0.0"
gleam_otp = ">= 1.2.0 and < 2.0.0"
gleam_time = ">= 1.8.0 and < 2.0.0"

Ensure the Absurd schema is loaded into your PostgreSQL database.

2. Connect to the Database

import gabsurd/client
let assert Ok(started) = client.start("postgresql://user:pass@localhost:5432/mydb")
let db = started.data

3. Create a Queue

import gabsurd/queue
let assert Ok(Nil) = queue.create(db, "emails")

4. Spawn Tasks

import gleam/json
import gabsurd/task
let assert Ok(info) =
task.spawn(
db,
"emails",
"send_welcome",
json.object([#("to", json.string("user@example.com"))]),
task.new_options() |> task.with_max_attempts(3),
)

5. Start a Worker

import gleam/dynamic/decode
import gleam/json
import gleam/option
import gabsurd/worker.{Complete, Handler}
let email_handler = Handler(
task_name: "send_welcome",
execute: fn(ctx) {
// Process the task...
Complete(json.object([#("sent", json.bool(True))]))
},
on_error: option.None,
)
let config =
worker.new(db, "emails", [email_handler])
|> worker.with_poll_interval(1000)
|> worker.with_batch_size(10)
// Start a single worker
let assert Ok(started) = worker.start(config)

6. Worker Pool with Supervisor

import gleam/otp/static_supervisor
let pool = worker.pool_child_specs("email_workers", config, 4)
// pool is a List(ChildSpecification) with exactly 4 elements
let [first, second, third, fourth] = pool
let assert Ok(sup) =
static_supervisor.new(static_supervisor.OneForOne)
|> static_supervisor.add(first)
|> static_supervisor.add(second)
|> static_supervisor.add(third)
|> static_supervisor.add(fourth)
|> static_supervisor.start()

API Reference

gabsurd/client

Database connection management.

FunctionDescription
start(url)Connect to PostgreSQL, returns StartResult(Db)

gabsurd/queue

Queue lifecycle management.

FunctionDescription
create(db, name)Create an unpartitioned queue
create_with_mode(db, name, mode)Create a queue with storage mode
drop(db, name)Drop a queue and all its data
list(db)List all queue names

gabsurd/task

Task lifecycle operations.

FunctionDescription
spawn(db, queue, name, params, options)Create a new task
claim(db, queue, worker_id, timeout, qty)Claim available tasks
complete(db, queue, run_id, state)Mark a run as completed
fail(db, queue, run_id, reason)Fail a run (uses queue retry policy)
fail_with_retry(db, queue, run_id, reason, retry_at)Fail a run and schedule retry
cancel(db, queue, task_id)Cancel a task
get_result(db, queue, task_id)Get task result as TaskResult record
retry(db, queue, task_id, options)Retry a failed task
extend_claim(db, queue, run_id, extend_by)Manually extend a claim lease (heartbeat)
schedule_run(db, queue, run_id, defer_seconds)Reschedule a run for future execution
new_options()Create empty spawn options
with_max_attempts(options, n)Set max attempts
with_retry_strategy(options, strategy)Set retry strategy
with_cancellation(options, policy)Set cancellation policy
with_headers(options, json)Set headers metadata
with_idempotency_key(options, key)Prevent duplicate spawning

gabsurd/event

Event coordination for workflows.

FunctionDescription
emit(db, queue, event_name, payload)Emit an event
await(db, queue, task_id, run_id, step, event, timeout)Await an event

gabsurd/checkpoint

Workflow checkpoint persistence.

FunctionDescription
set(db, queue, task_id, step, state, run_id, extend_claim_by)Save a checkpoint (pass claim_timeout to extend lease)
get(db, queue, task_id, step, include_pending)Retrieve a checkpoint, returns Result(Option(Checkpoint), Error)

gabsurd/worker

OTP worker actor for task processing.

TypeDescription
Handler(task_name, execute, on_error)Handler for a specific task type
HandlerResultReturn type: Complete(json), Fail(json), or Suspend
ConfigWorker configuration
WorkerRunning worker handle
FunctionDescription
-----------------------
new(db, queue, handlers)Create config with defaults
with_poll_interval(config, ms)Set poll interval (default: 5000ms)
with_batch_size(config, n)Set tasks claimed per poll (default: 1)
with_claim_timeout(config, secs)Set claim timeout (default: 30s)
with_worker_id(config, id)Set worker ID
with_max_backoff(config, ms)Set max error backoff (default: 60000ms)
start(config)Start a worker actor
stop(worker)Stop a worker gracefully
child_spec(name, config)Create supervisor child spec
pool_child_specs(name, config, count)Create N child specs for a pool

gabsurd/context

Execution context passed to worker handlers.

TypeDescription
ContextEncapsulates db, queue, claim, claim_timeout
EventResultReceived(String) or Suspended
FunctionDescription
task_id(ctx)Task's unique identifier
run_id(ctx)Current run's unique identifier
params(ctx)Task parameters as raw JSON string
task_name(ctx)Task name
attempt(ctx)Current attempt number
step(ctx, name, decoder, run)Idempotent step — skips if checkpoint exists
get_checkpoint(ctx, name)Get raw JSON state for a step
set_checkpoint(ctx, name, state)Persist a checkpoint and extend lease
heartbeat(ctx)Extend the claim lease
await_event(ctx, event_name, timeout)Await an event, returns EventResult

gabsurd/utility

Maintenance and introspection.

FunctionDescription
cleanup_queue(db, queue)Clean up completed/failed tasks
get_schema_version(db)Get the installed schema version

Architecture

┌─────────────────────────────────────┐
Your Application
├─────────────────────────────────────┤
worker.gleamtask.gleam │ ← SDK Layer (hand-written)
context.gleamqueue.gleam
event.gleamcheckpoint.gleam
utility.gleam │ │
├─────────────────────────────────────┤
sql.gleam (generated) │ ← Parrot codegen
├─────────────────────────────────────┤
param.gleamclient.gleam │ ← pog driver adapter
├─────────────────────────────────────┤
PostgreSQL + absurd.sql │ ← Database
└─────────────────────────────────────┘

Worker Design

Workers use the Handler Record pattern — each task type gets a Handler with:

The handler receives a Context that encapsulates the database connection, queue, claim details, and claim timeout. It provides context.step(ctx, ...) for idempotent steps and context.await_event(ctx, ...) for event-driven workflows.

The HandlerResult type has three variants:

The worker polls the queue using process.send_after, claims tasks, constructs a Context, dispatches to matching handlers by task_name, and handles the result.

Idempotent Steps

context.step(ctx, name, decoder, run) is the primary building block for durable workflows:

Unknown Task Deferral

Tasks with no registered handler are deferred (rescheduled with a delay) rather than failed. This supports rolling deployments where a new task type may arrive before its handler code is deployed.

Error Backoff

On transient claim errors, the worker backs off exponentially up to max_backoff (default 60s), resetting on success.

For pools, use pool_child_specs to generate N workers with globally unique IDs inside a static_supervisor.

Examples

Multi-Step Workflow with Checkpoints

Durable workflows survive crashes. Each step uses context.step — on retry, completed steps are skipped. Steps also extend the worker's claim lease so the task never times out mid-workflow.

import gleam/dynamic/decode
import gleam/json
import gleam/option
import gleam/result
import gabsurd/client
import gabsurd/context
import gabsurd/worker.{Complete, Fail, Handler}
let process_order = Handler(
task_name: "process_order",
execute: fn(ctx) {
case order_workflow(ctx) {
Ok(Nil) -> Complete(json.object([#("status", json.string("completed"))]))
Error(e) -> Fail(json.string(error_to_string(e)))
}
},
on_error: option.None,
)
fn order_workflow(ctx) -> Result(Nil, client.GabsurdError) {
let params = decode_params(context.params(ctx))
// Step 1: Charge credit card (skip if checkpoint exists)
use _ <- result.try(context.step(ctx, "charge", decode.success(Nil), fn() {
charge_card(params)
json.null()
}))
// Step 2: Reserve inventory (skip if done)
use _ <- result.try(context.step(ctx, "reserve", decode.success(Nil), fn() {
reserve_inventory(params)
json.null()
}))
// Step 3: Send confirmation
use _ <- result.try(context.step(ctx, "notify", decode.success(Nil), fn() {
send_confirmation(params)
json.null()
}))
Ok(Nil)
}

Steps That Pass Data Forward

fn order_workflow(ctx) -> Result(Nil, client.GabsurdError) {
// Charge and get back the charge_id
use charge_id <- result.try(
context.step(ctx, "charge", charge_id_decoder(), fn() {
let result = charge_card(decode_params(context.params(ctx)))
json.object([#("charge_id", json.string(result.id))])
}),
)
// Use charge_id in the next step
use _ <- result.try(context.step(ctx, "capture", decode.success(Nil), fn() {
capture_charge(charge_id)
json.null()
}))
Ok(Nil)
}
// Decoder using Gleam's continuation-based decode.field API:
fn charge_id_decoder() -> decode.Decoder(String) {
use charge_id <- decode.field("charge_id", decode.string)
decode.success(charge_id)
}

Event-Driven Workflow

Tasks can suspend waiting for an external event (e.g., a webhook callback). Call context.await_event and return Suspend when the task needs to sleep. When event.emit fires, the task wakes up and the handler runs again.

import gleam/dynamic/decode
import gleam/json
import gleam/option
import gleam/result
import gabsurd/client
import gabsurd/context
import gabsurd/context.{Received, Suspended}
import gabsurd/worker.{Complete, Fail, Handler, Suspend}
let generate_report = Handler(
task_name: "generate_report",
execute: fn(ctx) {
// Start the remote job (idempotent — skips if already done)
use job_id <- result.try(
context.step(ctx, "start", job_id_decoder(), fn() {
let id = start_remote_job(context.params(ctx))
json.object([#("job_id", json.string(id))])
}),
)
// Wait for the remote service to POST back
case context.await_event(ctx, "report_complete_" <> job_id, 3600) {
Ok(Received(payload)) -> Complete(json.string(payload))
Ok(Suspended) -> Suspend
Error(e) -> Fail(json.string("event error"))
}
},
on_error: option.None,
)
// Decoder for the job_id stored in the checkpoint:
fn job_id_decoder() -> decode.Decoder(String) {
use job_id <- decode.field("job_id", decode.string)
decode.success(job_id)
}

Then in your webhook HTTP handler (separate process):

import gabsurd/event
// When the remote service calls back:
let assert Ok(Nil) = event.emit(
db, "reports",
"report_complete_" <> job_id, // must match event_name from await
json.object([#("url", json.string(download_url))]),
)

Idempotent Task Spawning

Use with_idempotency_key to prevent duplicate tasks — critical for payment processing where double-spawning means double-charging.

let assert Ok(info) =
task.spawn(
db, "payments", "charge_card",
json.object([#("amount", json.int(9999))]),
task.new_options()
|> task.with_idempotency_key("order-" <> order_id),
)
// If the caller crashes and retries, the same task is returned
// (info.created will be False on subsequent calls)

Retry Strategies

// Exponential backoff for flaky APIs
let assert Ok(_) =
task.spawn(
db, "integrations", "sync_crm",
json.object([#("contact_id", json.string(id))]),
task.new_options()
|> task.with_max_attempts(5)
|> task.with_retry_strategy(task.ExponentialRetry(
base_seconds: 10, factor: 2.0, max_seconds: 300.0,
)),
)
// Retries at ~10s, 20s, 40s, 80s, capped at 300s

Long-Running Tasks with Manual Heartbeats

For tasks that do a single long operation without checkpoints:

import gleam/list
import gleam/json
import gabsurd/worker.{Complete, Handler}
let handler = Handler(
task_name: "batch_import",
execute: fn(ctx) {
list.each(records, fn(batch) {
process_batch(batch)
// Keep the lease alive after each batch
let _ = context.heartbeat(ctx)
})
Complete(json.object([#("imported", json.int(list.length(records)))]))
},
on_error: option.None,
)

Development

# Start PostgreSQL
bash bin/postgres.sh
# Reset database
bash bin/reset_db.sh
# Run tests
DATABASE_URL="postgresql://gabsurd:gabsurd@127.0.0.1:5432/gabsurd" gleam test
# Regenerate SQL (after changing queries.sql)
DATABASE_URL="..." PATH="$(nix eval --raw 'nixpkgs#postgresql_17.out')/bin:$PATH" gleam run -m parrot

License

Apache-2.0