gabsurd
A Gleam SDK for the Absurd durable workflow system. Provides type-safe database access via Parrot (sqlc) code generation and OTP worker actors for task processing.
Overview
gabsurd wraps the Absurd PostgreSQL schema with two layers:
- Generated SQL layer (gabsurd/sql): Type-safe query functions generated by Parrot (sqlc for Gleam). All database access goes through Absurd's PL/pgSQL functions.
- SDK layer: Hand-written Gleam modules providing ergonomic APIs for queues, tasks, events, checkpoints, and worker actors.
Quick Start
1. Install & Setup
# gleam.toml
[dependencies]
gabsurd = { path = "..." }
gleam_erlang = ">= 1.3.0 and < 2.0.0"
gleam_otp = ">= 1.2.0 and < 2.0.0"
gleam_time = ">= 1.8.0 and < 2.0.0"
Ensure the Absurd schema is loaded into your PostgreSQL database.
2. Connect to the Database
import gabsurd/client
let assert Ok(started) = client.start("postgresql://user:pass@localhost:5432/mydb")
let db = started.data
3. Create a Queue
import gabsurd/queue
let assert Ok(Nil) = queue.create(db, "emails")
4. Spawn Tasks
import gleam/json
import gabsurd/task
let assert Ok(info) =
  task.spawn(
    db,
    "emails",
    "send_welcome",
    json.object([#("to", json.string("user@example.com"))]),
    task.new_options() |> task.with_max_attempts(3),
  )
5. Start a Worker
import gleam/dynamic/decode
import gleam/json
import gleam/option
import gabsurd/worker.{Complete, Handler}
let email_handler = Handler(
  task_name: "send_welcome",
  execute: fn(ctx) {
    // Process the task...
    Complete(json.object([#("sent", json.bool(True))]))
  },
  on_error: option.None,
)
let config =
  worker.new(db, "emails", [email_handler])
  |> worker.with_poll_interval(1000)
  |> worker.with_batch_size(10)
// Start a single worker
let assert Ok(started) = worker.start(config)
6. Worker Pool with Supervisor
import gleam/otp/static_supervisor
let pool = worker.pool_child_specs("email_workers", config, 4)
// pool is a List(ChildSpecification) with exactly 4 elements
// A list pattern is not exhaustive, so `let assert` is required here
let assert [first, second, third, fourth] = pool
let assert Ok(sup) =
  static_supervisor.new(static_supervisor.OneForOne)
  |> static_supervisor.add(first)
  |> static_supervisor.add(second)
  |> static_supervisor.add(third)
  |> static_supervisor.add(fourth)
  |> static_supervisor.start()
API Reference
gabsurd/client
Database connection management.
| Function | Description |
|---|---|
| start(url) | Connect to PostgreSQL, returns StartResult(Db) |
gabsurd/queue
Queue lifecycle management.
| Function | Description |
|---|---|
| create(db, name) | Create an unpartitioned queue |
| create_with_mode(db, name, mode) | Create a queue with storage mode |
| drop(db, name) | Drop a queue and all its data |
| list(db) | List all queue names |
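
The table above can be exercised end to end with a short round trip. This is only a sketch: the queue name "scratch" and the function name queue_roundtrip are hypothetical, and the return shapes of queue.list and queue.drop (a Result wrapping the names, and a Result on drop) are assumptions inferred from the descriptions rather than confirmed signatures.

```gleam
import gabsurd/queue

/// Hypothetical helper: create a throwaway queue, list queues, drop it again.
pub fn queue_roundtrip(db) {
  // Documented in Quick Start: create returns Ok(Nil) on success.
  let assert Ok(Nil) = queue.create(db, "scratch")
  // Assumption: list returns Ok with the queue names.
  let assert Ok(names) = queue.list(db)
  // Assumption: drop returns Ok on success; its payload is ignored here.
  let assert Ok(_) = queue.drop(db, "scratch")
  names
}
```

Because drop removes a queue and all its data, a round trip like this is best kept to test databases.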
gabsurd/task
Task lifecycle operations.
| Function | Description |
|---|---|
| spawn(db, queue, name, params, options) | Create a new task |
| claim(db, queue, worker_id, timeout, qty) | Claim available tasks |
| complete(db, queue, run_id, state) | Mark a run as completed |
| fail(db, queue, run_id, reason) | Fail a run (uses queue retry policy) |
| fail_with_retry(db, queue, run_id, reason, retry_at) | Fail a run and schedule retry |
| cancel(db, queue, task_id) | Cancel a task |
| get_result(db, queue, task_id) | Get task result as TaskResult record |
| retry(db, queue, task_id, options) | Retry a failed task |
| extend_claim(db, queue, run_id, extend_by) | Manually extend a claim lease (heartbeat) |
| schedule_run(db, queue, run_id, defer_seconds) | Reschedule a run for future execution |
| new_options() | Create empty spawn options |
| with_max_attempts(options, n) | Set max attempts |
| with_retry_strategy(options, strategy) | Set retry strategy |
| with_cancellation(options, policy) | Set cancellation policy |
| with_headers(options, json) | Set headers metadata |
| with_idempotency_key(options, key) | Prevent duplicate spawning |
gabsurd/event
Event coordination for workflows.
| Function | Description |
|---|---|
| emit(db, queue, event_name, payload) | Emit an event |
| await(db, queue, task_id, run_id, step, event, timeout) | Await an event |
gabsurd/checkpoint
Workflow checkpoint persistence.
| Function | Description |
|---|---|
| set(db, queue, task_id, step, state, run_id, extend_claim_by) | Save a checkpoint (pass claim_timeout to extend lease) |
| get(db, queue, task_id, step, include_pending) | Retrieve a checkpoint, returns Result(Option(Checkpoint), Error) |
gabsurd/worker
OTP worker actor for task processing.
| Type | Description |
|---|---|
| Handler(task_name, execute, on_error) | Handler for a specific task type |
| HandlerResult | Return type: Complete(json), Fail(json), or Suspend |
| Config | Worker configuration |
| Worker | Running worker handle |
| Function | Description |
| ---------- | ------------- |
| new(db, queue, handlers) | Create config with defaults |
| with_poll_interval(config, ms) | Set poll interval (default: 5000ms) |
| with_batch_size(config, n) | Set tasks claimed per poll (default: 1) |
| with_claim_timeout(config, secs) | Set claim timeout (default: 30s) |
| with_worker_id(config, id) | Set worker ID |
| with_max_backoff(config, ms) | Set max error backoff (default: 60000ms) |
| start(config) | Start a worker actor |
| stop(worker) | Stop a worker gracefully |
| child_spec(name, config) | Create supervisor child spec |
| pool_child_specs(name, config, count) | Create N child specs for a pool |
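
As a rough illustration of how the builder functions in this table compose, the sketch below overrides every documented default. The function name tuned_config, the queue name, the worker ID string, and all numeric values are hypothetical; the argument types (Int for durations and counts, String for the ID) are assumptions based on the units shown in the table.

```gleam
import gabsurd/worker

/// Hypothetical helper: build a config with every default overridden.
pub fn tuned_config(db, handlers) {
  worker.new(db, "emails", handlers)
  // Poll every 2 seconds instead of the default 5000ms.
  |> worker.with_poll_interval(2000)
  // Claim up to 5 tasks per poll instead of 1.
  |> worker.with_batch_size(5)
  // Hold each claim for 120 seconds instead of 30.
  |> worker.with_claim_timeout(120)
  // Assumed to take a String identifier.
  |> worker.with_worker_id("emails-1")
  // Cap error backoff at 30 seconds instead of 60000ms.
  |> worker.with_max_backoff(30_000)
}
```

The resulting config can then be passed to worker.start, worker.child_spec, or worker.pool_child_specs as listed above.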
gabsurd/context
Execution context passed to worker handlers.
| Type | Description |
|---|---|
| Context | Encapsulates db, queue, claim, claim_timeout |
| EventResult | Received(String) or Suspended |
| Function | Description |
|---|---|
| task_id(ctx) | Task's unique identifier |
| run_id(ctx) | Current run's unique identifier |
| params(ctx) | Task parameters as raw JSON string |
| task_name(ctx) | Task name |
| attempt(ctx) | Current attempt number |
| step(ctx, name, decoder, run) | Idempotent step: skips if checkpoint exists |
| get_checkpoint(ctx, name) | Get raw JSON state for a step |
| set_checkpoint(ctx, name, state) | Persist a checkpoint and extend lease |
| heartbeat(ctx) | Extend the claim lease |
| await_event(ctx, event_name, timeout) | Await an event, returns EventResult |
gabsurd/utility
Maintenance and introspection.
| Function | Description |
|---|---|
| cleanup_queue(db, queue) | Clean up completed/failed tasks |
| get_schema_version(db) | Get the installed schema version |
Architecture
┌─────────────────────────────────────┐
│ Your Application │
├─────────────────────────────────────┤
│ worker.gleam │ task.gleam │ ← SDK Layer (hand-written)
│ context.gleam │ queue.gleam │
│ event.gleam │ checkpoint.gleam │
│ utility.gleam │ │
├─────────────────────────────────────┤
│ sql.gleam (generated) │ ← Parrot codegen
├─────────────────────────────────────┤
│ param.gleam │ client.gleam │ ← pog driver adapter
├─────────────────────────────────────┤
│ PostgreSQL + absurd.sql │ ← Database
└─────────────────────────────────────┘
Worker Design
Workers use the Handler Record pattern — each task type gets a Handler with:
- task_name: Which tasks this handler processes
- execute: The business logic, fn(Context) -> HandlerResult
- on_error: Optional hook for logging/metrics when execute returns Fail
The handler receives a Context that encapsulates the database connection, queue,
claim details, and claim timeout. It provides context.step(ctx, ...) for
idempotent steps and context.await_event(ctx, ...) for event-driven workflows.
The HandlerResult type has three variants:
- Complete(json): mark the task as successfully done
- Fail(json): mark the task as failed (triggers retry if attempts remain)
- Suspend: the task is waiting for an event; skip complete/fail
The worker polls the queue using process.send_after, claims tasks, constructs
a Context, dispatches to matching handlers by task_name, and handles the result.
Idempotent Steps
context.step(ctx, name, decoder, run) is the primary building block for durable workflows:
- On first execution: calls run(), persists the result as a checkpoint, extends the lease
- On retry: finds the checkpoint, decodes the stored value, skips execution
- decoder is a gleam/dynamic/decode.Decoder(a): Gleam's json.Json is write-only, so a decoder is required to recover typed values from the database
- For steps that don't need a return value, use decode.success(Nil)
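
The skip-on-retry behaviour described above can be modelled in a few lines. This is a simplified sketch, not gabsurd's implementation: the Store type, this step function, and the use of an in-memory dict in place of the checkpoint table are all hypothetical, and lease extension and decoding are left out.

```gleam
import gleam/dict.{type Dict}

/// Hypothetical in-memory stand-in for the checkpoint table.
pub type Store =
  Dict(String, String)

/// Run `run` only if no checkpoint named `name` exists yet.
/// Returns the (possibly updated) store together with the step's value.
pub fn step(store: Store, name: String, run: fn() -> String) -> #(Store, String) {
  case dict.get(store, name) {
    // Checkpoint found: reuse the stored value and skip `run`.
    Ok(saved) -> #(store, saved)
    // No checkpoint: execute, then persist the result under `name`.
    Error(Nil) -> {
      let value = run()
      #(dict.insert(store, name, value), value)
    }
  }
}

pub fn demo() -> #(String, String) {
  let #(store, first) = step(dict.new(), "charge", fn() { "ch_1" })
  // Simulated retry: the second closure is never called.
  let #(_, second) = step(store, "charge", fn() { "ch_2" })
  #(first, second)
}
```

In this model demo() returns "ch_1" for both values, which mirrors why side effects placed inside a step are not repeated when a task is retried.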
Unknown Task Deferral
Tasks with no registered handler are deferred (rescheduled with a delay) rather than failed. This supports rolling deployments where a new task type may arrive before its handler code is deployed.
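
One way to picture the deferral rule is as a lookup that falls back to rescheduling. The sketch below is an illustration under assumptions: SketchHandler, Dispatch, and dispatch are hypothetical names, handlers are reduced to String-to-String functions, and the delay value is supplied by the caller because the actual deferral delay is not stated here.

```gleam
import gleam/list

/// Hypothetical, simplified handler: maps raw params to a raw result.
pub type SketchHandler {
  SketchHandler(task_name: String, execute: fn(String) -> String)
}

/// Outcome of dispatching one claimed task.
pub type Dispatch {
  Ran(result: String)
  Deferred(delay_seconds: Int)
}

/// Find the handler registered for `task_name`; defer when none matches.
pub fn dispatch(
  handlers: List(SketchHandler),
  task_name: String,
  params: String,
  defer_seconds: Int,
) -> Dispatch {
  case list.find(handlers, fn(h) { h.task_name == task_name }) {
    Ok(handler) -> Ran(handler.execute(params))
    // No handler deployed yet: reschedule instead of failing the task.
    Error(Nil) -> Deferred(defer_seconds)
  }
}
```

Deferring rather than failing means an unknown task does not consume retry attempts while the deployment that adds its handler is still rolling out.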
Error Backoff
On transient claim errors, the worker backs off exponentially up to max_backoff
(default 60s), resetting on success.
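
A capped exponential backoff with reset can be sketched as a small state record. The Backoff type, the doubling factor, and the choice of base interval are assumptions made for illustration; only the cap (max_backoff) and the reset on success come from the description above.

```gleam
import gleam/int

/// Hypothetical backoff state; gabsurd's internal representation may differ.
pub type Backoff {
  Backoff(current_ms: Int, base_ms: Int, max_ms: Int)
}

/// After a failed claim: double the wait, never exceeding `max_ms`.
pub fn on_error(backoff: Backoff) -> Backoff {
  Backoff(..backoff, current_ms: int.min(backoff.current_ms * 2, backoff.max_ms))
}

/// After a successful claim: return to the base interval.
pub fn on_success(backoff: Backoff) -> Backoff {
  Backoff(..backoff, current_ms: backoff.base_ms)
}
```

Starting from a 5000ms base with a 60000ms cap, four consecutive errors in this model give waits of 10000, 20000, 40000, and 60000ms, and a single success returns the wait to 5000ms.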
For pools, use pool_child_specs to generate N workers with globally unique IDs
inside a static_supervisor.
Examples
Multi-Step Workflow with Checkpoints
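Durable workflows survive crashes. Each step uses context.step, so on retry
completed steps are skipped. Each completed step also extends the worker's
claim lease, which keeps the claim alive between steps; a single step that
runs longer than the claim timeout should still call context.heartbeat.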
import gleam/dynamic/decode
import gleam/json
import gleam/option
import gleam/result
import gabsurd/client
import gabsurd/context
import gabsurd/worker.{Complete, Fail, Handler}
let process_order = Handler(
  task_name: "process_order",
  execute: fn(ctx) {
    case order_workflow(ctx) {
      Ok(Nil) -> Complete(json.object([#("status", json.string("completed"))]))
      Error(e) -> Fail(json.string(error_to_string(e)))
    }
  },
  on_error: option.None,
)
fn order_workflow(ctx) -> Result(Nil, client.GabsurdError) {
  let params = decode_params(context.params(ctx))
  // Step 1: Charge credit card (skip if checkpoint exists)
  use _ <- result.try(context.step(ctx, "charge", decode.success(Nil), fn() {
    charge_card(params)
    json.null()
  }))
  // Step 2: Reserve inventory (skip if done)
  use _ <- result.try(context.step(ctx, "reserve", decode.success(Nil), fn() {
    reserve_inventory(params)
    json.null()
  }))
  // Step 3: Send confirmation
  use _ <- result.try(context.step(ctx, "notify", decode.success(Nil), fn() {
    send_confirmation(params)
    json.null()
  }))
  Ok(Nil)
}
Steps That Pass Data Forward
fn order_workflow(ctx) -> Result(Nil, client.GabsurdError) {
  // Charge and get back the charge_id
  use charge_id <- result.try(
    context.step(ctx, "charge", charge_id_decoder(), fn() {
      let result = charge_card(decode_params(context.params(ctx)))
      json.object([#("charge_id", json.string(result.id))])
    }),
  )
  // Use charge_id in the next step
  use _ <- result.try(context.step(ctx, "capture", decode.success(Nil), fn() {
    capture_charge(charge_id)
    json.null()
  }))
  Ok(Nil)
}
// Decoder using Gleam's continuation-based decode.field API:
fn charge_id_decoder() -> decode.Decoder(String) {
  use charge_id <- decode.field("charge_id", decode.string)
  decode.success(charge_id)
}
Event-Driven Workflow
Tasks can suspend waiting for an external event (e.g., a webhook callback).
Call context.await_event and return Suspend when the task needs to sleep.
When event.emit fires, the task wakes up and the handler runs again.
import gleam/dynamic/decode
import gleam/json
import gleam/option
import gabsurd/context.{Received, Suspended}
import gabsurd/worker.{Complete, Fail, Handler, Suspend}
let generate_report = Handler(
  task_name: "generate_report",
  execute: fn(ctx) {
    // Start the remote job (idempotent: skips if already done)
    let started =
      context.step(ctx, "start", job_id_decoder(), fn() {
        let id = start_remote_job(context.params(ctx))
        json.object([#("job_id", json.string(id))])
      })
    case started {
      // execute must return a HandlerResult, so a step error becomes Fail
      Error(_) -> Fail(json.string("step error"))
      Ok(job_id) ->
        // Wait for the remote service to POST back
        case context.await_event(ctx, "report_complete_" <> job_id, 3600) {
          Ok(Received(payload)) -> Complete(json.string(payload))
          Ok(Suspended) -> Suspend
          Error(_) -> Fail(json.string("event error"))
        }
    }
  },
  on_error: option.None,
)
// Decoder for the job_id stored in the checkpoint:
fn job_id_decoder() -> decode.Decoder(String) {
  use job_id <- decode.field("job_id", decode.string)
  decode.success(job_id)
}
Then in your webhook HTTP handler (separate process):
import gabsurd/event
// When the remote service calls back:
let assert Ok(Nil) = event.emit(
  db, "reports",
  "report_complete_" <> job_id, // must match event_name from await
  json.object([#("url", json.string(download_url))]),
)
Idempotent Task Spawning
Use with_idempotency_key to prevent duplicate tasks — critical for
payment processing where double-spawning means double-charging.
let assert Ok(info) =
  task.spawn(
    db, "payments", "charge_card",
    json.object([#("amount", json.int(9999))]),
    task.new_options()
    |> task.with_idempotency_key("order-" <> order_id),
  )
// If the caller crashes and retries, the same task is returned
// (info.created will be False on subsequent calls)
Retry Strategies
// Exponential backoff for flaky APIs
let assert Ok(_) =
  task.spawn(
    db, "integrations", "sync_crm",
    json.object([#("contact_id", json.string(id))]),
    task.new_options()
    |> task.with_max_attempts(5)
    |> task.with_retry_strategy(task.ExponentialRetry(
      base_seconds: 10, factor: 2.0, max_seconds: 300.0,
    )),
  )
// Retries at ~10s, 20s, 40s, 80s, capped at 300s
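
The delays in the comment above are consistent with a schedule of the form min(base * factor^(n - 1), max) for the n-th retry. The sketch below works that arithmetic through; the formula is an assumption inferred from the listed values, and schedule and build are hypothetical helpers, not part of gabsurd.

```gleam
import gleam/float
import gleam/list

/// Delays in seconds for the first `retries` retries, assuming each delay is
/// the previous one multiplied by `factor`, capped at `max`.
pub fn schedule(base: Float, factor: Float, max: Float, retries: Int) -> List(Float) {
  build(base, factor, max, retries, [])
}

fn build(
  delay: Float,
  factor: Float,
  max: Float,
  remaining: Int,
  acc: List(Float),
) -> List(Float) {
  case remaining <= 0 {
    True -> list.reverse(acc)
    False ->
      // Record the capped delay, then grow the uncapped value for the next retry.
      build(delay *. factor, factor, max, remaining - 1, [
        float.min(delay, max),
        ..acc
      ])
  }
}
```

With base 10.0, factor 2.0, and max 300.0, four retries give 10.0, 20.0, 40.0, and 80.0 seconds, matching max_attempts(5). The cap would first apply on the sixth retry, where 320.0 is reduced to 300.0.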
Long-Running Tasks with Manual Heartbeats
For tasks that do a single long operation without checkpoints:
import gleam/json
import gleam/list
import gleam/option
import gabsurd/context
import gabsurd/worker.{Complete, Handler}
let handler = Handler(
  task_name: "batch_import",
  execute: fn(ctx) {
    // `records` (a list of batches) and `process_batch` are application-defined
    list.each(records, fn(batch) {
      process_batch(batch)
      // Keep the lease alive after each batch
      let _ = context.heartbeat(ctx)
    })
    Complete(json.object([#("imported", json.int(list.length(records)))]))
  },
  on_error: option.None,
)
Development
# Start PostgreSQL
bash bin/postgres.sh
# Reset database
bash bin/reset_db.sh
# Run tests
DATABASE_URL="postgresql://gabsurd:gabsurd@127.0.0.1:5432/gabsurd" gleam test
# Regenerate SQL (after changing queries.sql)
DATABASE_URL="..." PATH="$(nix eval --raw 'nixpkgs#postgresql_17.out')/bin:$PATH" gleam run -m parrot
License
Apache-2.0