Shigoto 仕事

PostgreSQL-backed background job processing for Erlang.

Shigoto (仕事, "work") is a job queue built on PostgreSQL's FOR UPDATE SKIP LOCKED, which lets several nodes claim jobs from the same table without two of them picking up the same job. No Redis or external broker is needed: if you have PostgreSQL, you have a job queue.
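
To illustrate the claiming pattern that FOR UPDATE SKIP LOCKED enables, here is a minimal sketch of a claim query issued through pgo. The module name, the `jobs` table, and its columns are hypothetical; this is not Shigoto's actual schema or internal query.

```erlang
-module(claim_sketch).
-export([claim_sql/0, claim/3]).

%% SQL that claims up to $2 available jobs from queue $1.
%% Rows already locked by another transaction are skipped rather than
%% waited on, so concurrent pollers do not claim the same row.
%% Table and column names are hypothetical.
claim_sql() ->
    <<"UPDATE jobs SET state = 'executing' "
      "WHERE id IN ("
      "SELECT id FROM jobs "
      "WHERE state = 'available' AND queue = $1 "
      "ORDER BY priority DESC, id "
      "LIMIT $2 "
      "FOR UPDATE SKIP LOCKED) "
      "RETURNING id">>.

%% Run the claim against a pgo pool (needs pgo and a running database).
claim(Pool, Queue, Limit) ->
    pgo:query(claim_sql(), [Queue, Limit], #{pool => Pool}).
```

Because locked rows are skipped instead of blocking, adding more polling nodes does not serialize them behind one another.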

Features

Core

Scheduling

Reliability

Resilience (seki integration)

Operations

Testing

Quick Start

Add to your deps:

{deps, [
    {shigoto, {git, "https://github.com/Taure/shigoto.git", {branch, "main"}}}
]}.

Configure in sys.config:

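{shigoto, [
    %% pgo pool name (required)
    {pool, my_app_db},
    %% {QueueName, ConcurrencyLimit} pairs
    {queues, [{<<"default">>, 10}, {<<"emails">>, 5}]},
    %% Milliseconds between polls
    {poll_interval, 5000},
    %% {Name, Schedule, Worker, Args} tuples
    {cron, [
        {<<"daily_cleanup">>, <<"0 3 * * *">>, my_cleanup_worker, #{}}
    ]}
]}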

Run the migration:

shigoto_migration:up(my_app_db).

Define a worker:

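-module(my_email_worker).
-behaviour(shigoto_worker).
-export([perform/1]).

%% perform/1 receives the job's args map and returns ok on success.
%% send_email/2 is a placeholder for your own delivery function.
perform(#{<<"to">> := To, <<"subject">> := Subject}) ->
    send_email(To, Subject),
    ok.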

Enqueue jobs:

%% Simple insert
shigoto:insert(#{
    worker => my_email_worker,
    args => #{<<"to">> => <<"user@example.com">>, <<"subject">> => <<"Welcome">>}
}).

%% Scheduled for later
shigoto:insert(#{
    worker => my_cleanup_worker,
    scheduled_at => {{2026, 3, 20}, {3, 0, 0}}
}).

%% With priority and queue
shigoto:insert(#{
    worker => my_urgent_worker,
    args => #{},
    priority => 10,
    queue => <<"critical">>
}).

Job Lifecycle

available → executing → completed
                     ↘ retryable → available (retry with backoff)
                     ↘ discarded (max attempts reached)
                     ↘ snoozed → available (rescheduled, attempt preserved)

Jobs can also be cancelled via shigoto:cancel/2 and retried via shigoto:retry/2.
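
The transitions out of executing can be modelled as a small pure function. This is only an illustrative sketch of the diagram above, not Shigoto's internal code; the module name, the function, and its arguments are hypothetical.

```erlang
-module(lifecycle_sketch).
-export([next_state/3]).

%% Map the result of perform/1 to the next job state.
%% Attempt is the number of the attempt that just ran and
%% MaxAttempts is the worker's limit (both hypothetical arguments).
next_state(ok, _Attempt, _MaxAttempts) ->
    completed;
next_state({snooze, _Seconds}, _Attempt, _MaxAttempts) ->
    %% Snoozed jobs become available again later without
    %% using up an attempt.
    snoozed;
next_state({error, _Reason}, Attempt, MaxAttempts) when Attempt >= MaxAttempts ->
    %% No attempts left.
    discarded;
next_state({error, _Reason}, _Attempt, _MaxAttempts) ->
    %% Goes back to available after the backoff delay.
    retryable.
```

For example, `lifecycle_sketch:next_state({error, timeout}, 3, 3)` evaluates to `discarded`, while the same error on attempt 1 evaluates to `retryable`.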

Worker Callbacks

All callbacks are optional except perform/1. A "-" in the Default column means no default is listed.

Callback              Default        Description
perform/1             required       Execute the job. Return ok, {error, Reason}, or {snooze, Seconds}
max_attempts/0        3              Maximum retry attempts before discarding
queue/0               <<"default">>  Default queue name
priority/0            0              Default priority (higher = claimed first)
timeout/0             300000         Execution timeout in milliseconds
unique/0              -              Uniqueness constraints
tags/0                []             Default tags for filtering
backoff/2             exponential    Custom retry delay: (Attempt, Error) -> Seconds
rate_limit/0          -              Seki rate limiter config
concurrency/0         -              Max concurrent executions per node (seki bulkhead)
global_concurrency/0  -              Max concurrent executions across all nodes
circuit_breaker/0     -              Per-worker circuit breaker thresholds
middleware/0          []             Worker-specific middleware chain
on_discard/2          -              Called when a job is permanently discarded
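
As a sketch of how several of these callbacks might be combined, the worker below overrides the queue, priority, timeout, attempt limit, and backoff. The module name, the argument keys, and the values are hypothetical. It also assumes the attempt number passed to backoff/2 starts at 1, which the table does not state.

```erlang
-module(report_worker).
-behaviour(shigoto_worker).
-export([perform/1, max_attempts/0, queue/0, priority/0, timeout/0, backoff/2]).

%% Each clause shows one of the three documented return values.
perform(#{<<"report_id">> := Id}) when is_integer(Id) ->
    %% Real work would go here.
    ok;
perform(#{<<"retry_in">> := Seconds}) ->
    %% Ask to be rescheduled in Seconds seconds.
    {snooze, Seconds};
perform(_Args) ->
    {error, bad_args}.

max_attempts() -> 5.
queue() -> <<"reports">>.
priority() -> 5.
timeout() -> 60000.   %% milliseconds

%% Exponential delay in seconds (2, 4, 8, ...), capped at 300.
%% Assumes Attempt starts at 1.
backoff(Attempt, _Error) ->
    min(300, 1 bsl Attempt).
```

Capping the delay keeps late retries from drifting hours apart. The cap of 300 seconds is an arbitrary choice for this example.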

Configuration

Option              Default                Description
pool                required               pgo pool name
queues              [{<<"default">>, 10}]  Queue names and concurrency limits
poll_interval       5000                   Milliseconds between polling
cron                []                     {Name, Schedule, Worker, Args} tuples
prune_after_days    14                     Days to keep completed/discarded jobs
shutdown_timeout    15000                  Milliseconds to wait for in-flight jobs
middleware          []                     Global middleware chain
encryption_key      -                      32-byte AES-256-GCM key
encryption_keys     []                     Ordered key list for rotation (newest first)
heartbeat_interval  30000                  Stale job detection interval
load_shedding       -                      CoDel config map for seki
queue_weights       #{}                    Weighted polling distribution
fair_queues         []                     Queues using partition-key fair claiming
notifier            -                      LISTEN/NOTIFY connection config
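
As an example of overriding a few of these defaults, the sys.config fragment below shortens job retention and lengthens the shutdown grace period. The values are illustrative only, and heartbeat_interval is assumed to be in milliseconds like the other intervals, which the table does not state.

```erlang
{shigoto, [
    {pool, my_app_db},
    {queues, [{<<"default">>, 10}]},
    %% Keep completed/discarded jobs for 7 days instead of 14.
    {prune_after_days, 7},
    %% Wait up to 30 seconds for in-flight jobs on shutdown.
    {shutdown_timeout, 30000},
    %% Check for stale jobs every 15 seconds (unit assumed to be ms).
    {heartbeat_interval, 15000}
]}
```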

Supervision Tree

shigoto_sup (one_for_one)
  ├─ shigoto_executor_sup    — simple_one_for_one for job execution
  ├─ shigoto_queue_sup       — one gen_server per queue
  │    ├─ shigoto_queue:default
  │    └─ shigoto_queue:emails
  ├─ shigoto_cron            — cron scheduling with leader election
  ├─ shigoto_pruner          — hourly archival and cleanup
  ├─ shigoto_heartbeat       — periodic heartbeat updates
  └─ shigoto_notifier        — LISTEN/NOTIFY (optional)

Guides

Ecosystem

Requirements

License

MIT