Cyclium
Autonomous agent framework for Elixir — actors monitor domains, run multi-turn episodes with budget enforcement, and produce persistent findings and typed outputs.
Cyclium is an Elixir library for building agentic systems that monitor domains, run multi-turn episodes, classify situations, and produce typed outputs. Actors declare expectations about how things should be; when triggers fire, episodes execute strategies that can gather data, call tools, synthesize with LLMs, and converge into findings and outputs. Think of it as an OTP-native agent framework where the episode — not the request — is the unit of work.
Key features
- Declarative Actor DSL — Define actors, expectations, triggers, and budgets in a compact macro-based syntax
- Strategy Pattern — Pluggable investigation logic with a clear init → observe → converge lifecycle
- Episode Runner — Budget-enforced execution loop with step journaling, checkpointing, and crash recovery
- Findings Lifecycle — Persistent observations with raise/update/clear semantics, upsert-by-key, causality chains, TTL expiration, severity escalation, and post-raise enrichment hooks
- Output Router — Deduplicated, adapter-based delivery (email, Slack, webhooks) with approval gates and adapter registry
- Event Bus — Phoenix.PubSub-backed event system connecting actors without coupling
- Workflow Engine — Multi-actor coordination with dependency graphs, failure policies, retry with backoff, cross-workflow episode dedup, and cancellation cascade
- Circuit Breaker — Per-expectation circuit breaker with configurable thresholds, half-open recovery, and optional in-flight episode cancellation
- Episode Sampling — Probabilistic firing control via sample_rate on expectations
- Service Level Tracking — Declarative performance objectives with breach detection and telemetry
- Adaptive Budgets — Advisory budget recommendations based on historical episode usage (p95 with headroom)
- Backpressure Controls — Per-actor concurrency limits with queue, drop, or shed-oldest overflow policies
- Debounce and Cooldown — Temporal controls to coalesce rapid-fire events and enforce minimum gaps
- Log Projection — Materialized human-readable logs at configurable verbosity (none → full_debug)
- Telemetry — 36 structured telemetry events for observability
- OTP-Native — No Oban or external job queue required; episodes run as Tasks under DynamicSupervisor
- Test Kit — Assertion macros and fakes for validating actors, strategies, synthesizers, output adapters, workflows, and checkpoint migrations in host apps
- SQL Server 2017 Compatible — Transaction-based upserts, denormalized query columns, no JSON operators in DDL
Who is this for?
Cyclium is designed for Elixir teams building autonomous agent systems where:
- Business rules define what should be true (SLAs, health thresholds, compliance checks)
- Episodes involve multiple steps: data gathering, LLM synthesis, tool calls, human approval
- Findings need to persist and evolve over time (raised → updated → cleared)
- Actions need deduplication, audit trails, and typed delivery through adapters
- Multiple actors need to coordinate through workflows with dependency ordering
- Real-time visibility into agent state is essential (Phoenix LiveView integration via Bus)
If you need a simple cron job or a one-shot script, Cyclium is overkill. Cyclium shines when you have ongoing, stateful processes that produce findings and outputs — and need the lifecycle, audit trail, and coordination to go with them.
How Cyclium differs
vs. Oban — Oban is a job queue: enqueue work, run it, done. Cyclium is an agent framework that manages stateful, multi-turn episodes with budgets, findings, outputs, and workflows. Episodes happen to run as OTP Tasks, so you don't need Oban — but the two solve different problems. You could use both: Oban for fire-and-forget jobs, Cyclium for ongoing autonomous processes.
vs. Sagents — Sagents is built for interactive AI conversations where users chat with LLM-powered agents in real time. Cyclium is built for autonomous operational agents that monitor domains, classify situations, and act — with or without an LLM in the loop. Cyclium's strategies can call LLMs via :synthesize, but they can also run purely deterministic logic. The execution model (expectations → episodes → findings → outputs) is designed for operational workflows, not chat.
vs. GenServers / custom OTP — You could build all of this with raw GenServers, but Cyclium gives you the episode lifecycle (budgets, journaling, checkpoints, crash recovery), the findings system (upsert-by-key, severity, evidence), the output router (deduplication, adapters, approval gates), the workflow engine (dependency graphs, failure policies), and the event bus — all wired together with telemetry and audit trails.
Strategy-driven vs. LLM-routed
Most agent frameworks put the LLM in the driver's seat — it decides which tool to call, when to stop, and how to recover from errors. Cyclium inverts this. The developer is the router: your next_step/2 function is a deterministic state machine that decides what happens next. The LLM is a powerful tool you call at specific points via :synthesize, but it never controls the flow.
This means you can mix deterministic and AI-powered steps in a single episode — gather data with a tool call, classify it with an LLM, then act on the result with another tool call — all under explicit developer control with budget enforcement at every turn:
def next_step(%{phase: :gather} = state, _ctx) do
{:tool_call, :erp_read, :search_pos, %{status: "STALLED"}}
end
def next_step(%{phase: :classify, po_data: data} = state, _ctx) do
{:synthesize, %{task: :classify_po, data: data}}
end
def next_step(%{phase: :act, classification: "vendor_delay"} = state, _ctx) do
{:tool_call, :email_write, :send_followup, build_email(state)}
end

The LLM is powerful, but it's not the control plane. You get repeatability, testability, and full visibility into exactly which steps ran and why — without sacrificing the ability to use AI where it adds value.
Architecture
The examples throughout this README use a client health monitoring system: a ClientHealthActor that evaluates client metrics (MRR, active users, support tickets) on each change and classifies health status, plus a ClientAdvisorActor that synthesizes an LLM-powered summary. See the demo application for the full working implementation.
Supervision tree
YourApp.Supervisor
├── YourApp.Repo
├── Phoenix.PubSub
├── YourApp.Actors.ClientHealthActor (GenServer)
├── YourApp.Actors.ClientAdvisorActor (GenServer)
├── Cyclium.Supervisor
│ ├── Cyclium.ActorSupervisor (DynamicSupervisor)
│ ├── Cyclium.EpisodeSupervisor (DynamicSupervisor)
│ │ └── Cyclium.EpisodeTask (one per running episode)
│ ├── Cyclium.TaskSupervisor (Task.Supervisor)
│ ├── Cyclium.Reconciler (optional — spec change detection)
│ ├── Cyclium.WorkflowEngine (optional — multi-actor workflows)
│ └── Cyclium.Findings.FindingSweep (optional — expiration + escalation)
└── YourAppWeb.Endpoint

Actors are GenServers that subscribe to Bus events and manage episode lifecycle. They're started by the consuming app's supervision tree, above Cyclium's supervisors. When a trigger fires, the actor creates an Episode row and starts an EpisodeTask under the EpisodeSupervisor. Each task resolves a strategy from the registry and runs the episode loop.
Key design principles:
- Actors own concurrency limits — they track active/queued episodes in-process
- Episodes are durable — the cyclium_episodes table is itself a work queue
- The Bus connects everything — actors, LiveViews, and workflows all subscribe to the same event stream
- Strategies are stateless modules — all state lives in the episode's strategy state map
Execution model
Bus event arrives
→ Actor.handle_info matches expectation trigger
→ Check debounce/cooldown → circuit breaker → sample_rate
→ Check concurrency (active < max?)
→ yes: create Episode row, start EpisodeTask under DynamicSupervisor
→ no: apply overflow policy (queue / drop / shed_oldest)
EpisodeTask starts
→ Resolve strategy (persistent_term registration from actor boot → registry override)
→ strategy.init(episode, trigger)
→ EpisodeRunner.execute_loop:
┌─────────────────────────────────────────────┐
│ check_budget → check_loop → increment_turn │
│ strategy.next_step(state, ctx) │
│ :done → journal, set done │
│ :converge → run converge pipeline │
│ {:tool_call} → exec tool, handle_result │
│ {:observe} → journal, handle_result │
│ {:synthesize} → journal, handle_result │
│ {:checkpoint} → save state, loop │
│ {:approval} → block, wait for human │
│ {:wait} → block, wait for external │
│ ... → loop │
└─────────────────────────────────────────────┘
Converge pipeline (post_converge):
1. Persist findings (raise/update/clear) → enrich → Bus events per finding
2. Route outputs through adapters → dedup by dedupe_key, deliver
3. Compute final episode status from delivery outcomes
4. Journal completion/failure step
5. Project log via LogProjector
6. Record service levels + check for breach
7. Record adaptive budget sample (if enabled)
8. Broadcast episode.completed/failed on Bus
9. Emit telemetry

Core concepts
Actors
An actor is a GenServer that owns one or more expectations. Each actor watches a domain (e.g., :procurement, :client_health) and fires episodes when triggers match.
defmodule MyApp.Actors.ClientHealthActor do
use Cyclium.Actor
actor do
domain(:client_health)
spec_rev("v0.1.0")
max_concurrent_episodes(5)
episode_overflow(:queue)
expectation(:client_should_be_healthy,
strategy: MyApp.Strategies.ClientHealth,
trigger: {:event, "client.health_check_requested"},
subject_key: :client_id,
debounce_ms: :timer.seconds(3),
budget: %{max_turns: 3, max_tokens: 1_000, max_wall_ms: 10_000}
)
expectation(:contract_review,
strategy: MyApp.Strategies.ContractReview,
trigger: {:schedule, :timer.hours(24)},
recovery_policy: :restart,
budget: %{max_turns: 12, max_tokens: 25_000, max_wall_ms: 120_000}
)
end
end

Trigger types:
- {:event, "event.name"} — fires when a matching Bus event arrives
- {:schedule, interval_ms} — fires on a recurring timer
- :manual — fires on explicit request
- :workflow — fires as part of a multi-actor workflow
- List — combine multiple triggers: [{:event, "client.health_check_requested"}, :workflow]
List triggers allow an expectation to fire from multiple sources. Event subscriptions and schedule timers are extracted from the list automatically. This is the recommended pattern when an expectation participates in a workflow but should also be independently triggerable:
expectation(:client_should_be_healthy,
strategy: MyApp.Strategies.ClientHealth,
trigger: [{:event, "client.health_check_requested"}, :workflow],
subject_key: :client_id,
debounce_ms: :timer.seconds(3),
budget: %{max_turns: 3, max_tokens: 1_000, max_wall_ms: 10_000}
)
The actor subscribes to "client.health_check_requested" for standalone use (with debounce), while the :workflow marker documents that workflows can also invoke this expectation. Workflow-triggered episodes bypass the actor GenServer entirely — the workflow engine creates episodes directly — so actor-level debounce/cooldown only applies to event triggers.
Backpressure options (episode_overflow):
- :queue — buffer excess episodes (default)
- :drop — discard when at capacity
- :shed_oldest — cancel the oldest queued episode to make room
Expectation options:
| Option | Default | Description |
|---|---|---|
| strategy | required | Strategy module for this expectation. Declared inline — Cyclium registers it when the actor boots |
| trigger | required | What fires the episode. Single trigger or list (e.g. [{:event, "..."}, :workflow]) |
| synthesizer | nil | Synthesizer module override for this expectation. Overrides the actor-level synthesizer(...) declaration |
| filter | %{} | Payload predicates — only fire when all match |
| debounce_ms | nil | Coalesce rapid events into one firing |
| cooldown_ms | nil | Minimum gap between firings |
| subject_key | nil | Payload key to scope debounce/cooldown per subject (e.g., :client_id) |
| budget | %{max_turns: 12, max_tokens: 25_000, max_wall_ms: 120_000} | Resource limits |
| log_strategy | :timeline | Controls materialized log verbosity AND step journal detail (see below) |
| outputs | [] | Declared output types (informational) |
| resources | [] | Declared capability dependencies (informational) |
| audit_level | :standard | Audit verbosity |
| retention_days | 90 | How long to keep episode data. Set higher for audit-sensitive workflows (e.g., 365). Retention is declarative — enforcement requires a scheduled cleanup job (not yet built) |
| sample_rate | nil | Float 0.0–1.0. When set, episodes fire probabilistically. nil or 1.0 = always fire. Force-fire bypasses sampling |
| circuit_breaker | nil | Circuit breaker config: %{threshold: 5, half_open_after_ms: 60_000, cancel_in_flight: false}. See Circuit Breaker |
| adaptive_budget | false | When true, records episode resource usage for advisory budget recommendations |
| service_levels | nil | Performance objectives: %{max_duration_ms: n, success_rate: 0.95, window_episodes: 20}. See Service Level Tracking |
| finding_enrichment | nil | Post-raise enrichment callback: fn finding, episode -> {:ok, %{summary: ...}} end or {Mod, :fun} |
| escalation_rules | nil | Time-based severity escalation: %{"class" => [%{after_minutes: 60, escalate_to: :high}]} |
| finding_ttl_seconds | nil | Default TTL for findings raised by this expectation. Individual findings can override with explicit ttl_seconds |
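Several of these options compose on a single expectation. A sketch of a more fully configured declaration — the strategy module, event name, and thresholds are illustrative, not taken from the demo app:

```elixir
expectation(:vendor_should_respond,
  strategy: MyApp.Strategies.VendorHealth,   # hypothetical strategy module
  trigger: {:event, "vendor.updated"},
  subject_key: :vendor_id,
  sample_rate: 0.25,                         # fire ~1 in 4 matching events
  circuit_breaker: %{threshold: 5, half_open_after_ms: 60_000, cancel_in_flight: false},
  adaptive_budget: true,                     # record usage for advisory recommendations
  service_levels: %{max_duration_ms: 30_000, success_rate: 0.95, window_episodes: 20},
  budget: %{max_turns: 6, max_tokens: 8_000, max_wall_ms: 30_000}
)
```

Sampling, the circuit breaker, and service levels operate independently: sampling decides whether a trigger fires at all, the breaker suppresses firing after repeated failures, and service levels only observe and report.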
Actor ID convention: Actor IDs are atoms in-process and strings in the database. Use identifier/1 in the DSL to set a stable actor ID that survives module renames:
actor do
identifier(:client_health) # explicit, rename-proof
domain(:health)
# ...
end
If omitted, the ID is derived from the module name (MyApp.Actors.ClientHealthActor → :client_health_actor). The boundary is at episode creation — Cyclium.Actor.Server calls to_string(state.actor_id) when building the episode params. Everything upstream is atoms, everything downstream (DB, strategies, findings) is strings. Actors with persisted episodes should always declare an explicit identifier.
Strategies
A strategy implements the investigation logic for an expectation. It's the brain of an episode — a stateless module that receives state and returns actions.
defmodule MyApp.Strategies.ClientHealth do
@behaviour Cyclium.EpisodeRunner.Strategy
@impl true
def init(_episode, trigger) do
client_id = trigger.payload["client_id"]
{:ok, %{client_id: client_id}}
end
@impl true
def next_step(state, _episode_ctx) do
:converge # go straight to classification
end
@impl true
def handle_result(state, _step, _result) do
{:ok, state}
end
@impl true
def converge(state, _episode_ctx) do
client = MyApp.Clients.get!(state.client_id)
{class, severity, summary} = classify(client)
{:ok, %Cyclium.ConvergeResult{
classification: %{"primary" => class, "severity" => to_string(severity)},
confidence: 1.0,
summary: summary,
findings: [
{:raise, %{
actor_id: "client_health_actor",
finding_key: "client:health:#{client.id}",
class: class,
severity: severity,
confidence: 1.0,
subject: %{kind: "client", id: client.id},
subject_kind: "client",
subject_id: client.id,
summary: summary,
evidence_refs: %{"active_users" => client.active_users}
}}
],
outputs: []
}}
end
end

Strategy callbacks:
| Callback | Purpose |
|---|---|
| init(episode, trigger) | Initialize state from trigger data. Return {:ok, state} |
| next_step(state, episode_ctx) | Decide the next action (see table below) |
| handle_result(state, step, result) | Process a step's outcome. Return {:ok, state}, {:retry, state}, or {:abort, reason} |
| converge(state, episode_ctx) | Produce findings, outputs, and classification. Return {:ok, ConvergeResult} |
| workflow_result(state, converge_result) | (optional) Extract data to pass to downstream workflow steps |
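The optional workflow_result/2 callback can be sketched as a projection over the final state and converge result. The exact return contract isn't spelled out here, so treat the shape below as an assumption — the idea is to hand downstream steps only what they need:

```elixir
@impl true
def workflow_result(state, converge_result) do
  # Illustrative: pass a small map downstream rather than the whole
  # strategy state. Field names are hypothetical.
  %{
    client_id: state.client_id,
    classification: converge_result.classification["primary"]
  }
end
```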
next_step return values:
| Return | Effect |
|---|---|
| :done | Episode complete (skip converge phase) |
| :converge | Run the converge pipeline |
| {:tool_call, capability, action, args} | Call a registered tool capability, pass result to handle_result |
| {:observe, data} | Journal data as an observation step, then pass {:ok, data} to handle_result. This is a synchronous in-process action — no external system is called. Use it to feed data you've already gathered into the strategy's result-handling flow |
| {:synthesize, prompt_ctx} | Request LLM synthesis via app-provided Cyclium.Synthesizer. The synthesizer calls the LLM, and the response flows to handle_result |
| {:checkpoint, phase_name} | Save strategy state for crash recovery |
| {:output, type, payload} | Propose an output inline (outside converge) |
| {:approval, request} | Block episode until human approval |
| {:wait, external_ref} | Block episode until external event resolves |
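A phase-driven next_step that checkpoints before an expensive step and gates the final action on human approval might look like this — phase names, tool names, and the approval payload shape are illustrative assumptions (handle_result is expected to advance :phase after each step):

```elixir
def next_step(%{phase: :gather} = state, _ctx),
  do: {:tool_call, :erp_read, :fetch_po, %{po_id: state.po_id}}

def next_step(%{phase: :checkpoint} = _state, _ctx),
  do: {:checkpoint, :pre_synthesis}   # crash recovery can resume from here

def next_step(%{phase: :classify} = state, _ctx),
  do: {:synthesize, %{task: :classify_po, data: state.po_data}}

def next_step(%{phase: :confirm} = state, _ctx),
  do: {:approval, %{question: "Send follow-up to vendor?", context: state.po_data}}

def next_step(%{phase: :done}, _ctx), do: :converge
```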
handle_result step kinds:
The step argument passed to handle_result/3 is an %EpisodeStep{} struct. Pattern-match on kind to distinguish which action produced the result:
| step.kind | Produced by | result on success | result on failure |
|---|---|---|---|
| :tool_call | {:tool_call, capability, action, args} | {:ok, tool_return_value} | {:error, {error_class, detail}} |
| :synthesis | {:synthesize, prompt_ctx} | {:ok, llm_response} | {:error, {error_class, detail}} |
| :observation | {:observe, data} | {:ok, data} | (never fails — data is passed through as-is) |
The step struct also carries tool_name (for :tool_call steps) which is useful for per-tool retry tracking:
def handle_result(state, %{kind: :synthesis}, {:ok, result}), do: ...
def handle_result(state, %{kind: :tool_call, tool_name: "erp_read"}, {:ok, data}), do: ...
def handle_result(state, %{kind: :observation}, {:ok, data}), do: ...

Multi-turn strategies
Strategies can run multiple turns before converging. next_step decides actions, handle_result absorbs outcomes — expensive work like LLM calls should be delegated to actions (:synthesize, :tool_call), not done inside handle_result.
Use a :phase field in state to drive progression. This is the recommended pattern — it makes flow explicit, prevents ambiguous matching in handle_result, and avoids accidental loops:
defmodule MyApp.Strategies.ClientAdvisor do
@behaviour Cyclium.EpisodeRunner.Strategy
@system_prompt "You are a customer success analyst. Assess the client's health."
@impl true
def init(_episode, trigger) do
# Load data in init — next_step should be pure routing, not queries
client_id = trigger.payload["client_id"]
client = MyApp.Clients.get!(client_id)
client_data = Map.take(client, [:name, :status, :mrr])
{:ok, %{phase: :synthesize, client_id: client_id, client_data: client_data, ai_summary: nil}}
end
# --- next_step: pure routing based on phase ---
@impl true
def next_step(%{phase: :synthesize} = state, _episode_ctx) do
{:synthesize, %{
system_prompt: @system_prompt,
user_message: "Client: #{state.client_data.name}, MRR: $#{state.client_data.mrr}"
}}
end
def next_step(%{phase: :done}, _episode_ctx), do: :converge
# --- handle_result: ALWAYS guard on phase ---
@impl true
def handle_result(%{phase: :synthesize} = state, _step, {:ok, result}) do
summary = if is_map(result), do: result[:text] || result["text"] || inspect(result), else: inspect(result)
{:ok, %{state | phase: :done, ai_summary: summary}}
end
def handle_result(%{phase: :synthesize} = state, %{kind: :synthesis}, {:error, {_class, _detail}}) do
# Transient failure — retry the same step (next_step will re-emit :synthesize)
{:retry, state}
end
def handle_result(_state, _step, {:error, reason}) do
{:abort, reason}
end
@impl true
def converge(state, _episode_ctx) do
{:ok, %Cyclium.ConvergeResult{
classification: %{"primary" => "ai_summary", "severity" => "low"},
confidence: 0.9,
summary: state.ai_summary,
findings: [
{:raise, %{
actor_id: "client_advisor_actor",
finding_key: "client:advisor:#{state.client_id}",
class: "ai_summary",
severity: :low,
confidence: 0.9,
subject_kind: "client",
subject_id: state.client_id,
summary: state.ai_summary
}}
],
outputs: []
}}
end
end

Key points:
- next_step is pure decision-making — it returns what action to take, never does expensive work itself
- handle_result absorbs outcomes — it pattern-matches on step kind and result, then updates state
- {:retry, state} re-enters the loop with the same state, letting next_step retry the action
- {:abort, reason} immediately fails the episode with the given reason
- The :synthesize action delegates LLM calls to the app-provided Cyclium.Synthesizer, keeping the strategy free of HTTP concerns
Multi-step patterns and pitfalls
Always guard handle_result on :phase. The most common multi-step bug is an unguarded handle_result clause that matches too broadly, causing the strategy to cycle between phases instead of progressing:
# BAD — matches ANY success result during any phase
def handle_result(state, _step, {:ok, result}) do
{:ok, %{state | phase: :done, ai_summary: inspect(result)}}
end
# GOOD — each clause is scoped to its phase
def handle_result(%{phase: :synthesize} = state, _step, {:ok, result}) do
{:ok, %{state | phase: :done, ai_summary: extract_text(result)}}
end
def handle_result(%{phase: :tool_result} = state, _step, {:ok, data}) do
{:ok, %{state | phase: :synthesize, gathered: data}}
end
Without phase guards, handle_result matches the wrong clause → phase doesn't advance → next_step re-emits the same action → loop. The budget will eventually kill it, but you'll burn turns for no reason.
Handle the no-synthesizer passthrough. When no Cyclium.Synthesizer is configured (or the registry returns nil for your actor), the runner passes prompt_ctx through as-is to handle_result with {:ok, prompt_ctx}. Your :synthesize phase handler must handle both the LLM response shape AND the raw passthrough:
def handle_result(%{phase: :synthesize} = state, _step, {:ok, result}) do
summary =
cond do
is_binary(result) -> result
is_map(result) && Map.has_key?(result, :text) -> result.text
is_map(result) && Map.has_key?(result, "text") -> result["text"]
true -> inspect(result) # passthrough — no synthesizer configured
end
{:ok, %{state | phase: :done, ai_summary: summary}}
end

Handle both trigger types if your actor can be triggered by workflows. Workflow-created episodes use %Cyclium.Trigger.Workflow{input: ...}, not %Cyclium.Trigger.Event{payload: ...}:
def init(_episode, trigger) do
client_id =
case trigger do
%Cyclium.Trigger.Event{payload: %{client_id: id}} -> id
%Cyclium.Trigger.Event{payload: payload} -> payload["client_id"]
%Cyclium.Trigger.Workflow{input: %{client_id: id}} -> id
%Cyclium.Trigger.Workflow{input: input} when is_map(input) -> input["client_id"]
_ -> nil
end
{:ok, %{phase: :gather, client_id: client_id}}
end

Loop detection
The episode runner automatically detects repeating step cycles. It fingerprints each next_step return value and watches for repeated patterns — a 1-step cycle (A, A, A), a 2-step cycle (A, B, A, B), or longer. When a cycle is detected, the episode fails immediately with error_class: "loop_detected".
This is a safety net, not a substitute for correct phase guards. If loop detection fires, it means your strategy has a bug — fix the root cause (usually a missing phase guard on handle_result).
Note that consecutive steps of the same kind but different data are fine — the fingerprint includes the full action payload via :erlang.phash2/1. A dispatch strategy that emits multiple :observe steps with different entity data won't trigger loop detection. Only identical actions repeated in a cycle will.
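The fingerprint behavior is easy to demonstrate with :erlang.phash2/1 directly — an illustration of the hashing idea, not Cyclium's internal code:

```elixir
# Identical action terms hash identically — repeated in sequence,
# this is what a 1-step cycle looks like to the detector.
a = :erlang.phash2({:observe, %{entity: "client-1"}})
b = :erlang.phash2({:observe, %{entity: "client-1"}})
a == b # => true

# Same step kind, different payload — distinct fingerprints in practice,
# so a dispatch loop over many entities is not flagged as a cycle.
c = :erlang.phash2({:observe, %{entity: "client-2"}})
a == c
```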
Episodes
An episode is one execution of a strategy. It tracks:
- Budget usage (turns, tokens, wall time)
- Step journal (every action recorded as an EpisodeStep)
- Classification and summary (set during converge)
- Status lifecycle: :running → :done | :failed | :blocked | :canceled | :partially_failed
Episodes run as Tasks under a DynamicSupervisor — no Oban required. The cyclium_episodes table serves as a durable work queue.
Querying episodes:
Cyclium.Episodes.get!(episode_id)
Cyclium.Episodes.list_by_status([:running, :done, :failed])
Cyclium.Episodes.list_steps(episode_id) # step journal
Cyclium.Episodes.get_log(episode_id) # materialized log
Cyclium.Episodes.cancel(episode_id) # cancellation sequence
# List episodes by actor(s)
Cyclium.Episodes.list_by_actors(["my_actor"], limit: 20, order: :desc)
# Filter by subject — DB-level JSON filtering across trigger types
# Checks trigger_ref.payload.<key> (event-triggered) and
# trigger_ref.input.<key> (workflow-triggered) in a single query.
Cyclium.Episodes.list_by_actors_and_subject(
["client_health_actor", "client_advisor_actor"],
:client_id,
client.id,
limit: 20, order: :desc
)

list_by_actors_and_subject/4 detects the repo adapter at runtime and uses the appropriate JSON text extraction — Postgres #>> or SQL Server JSON_VALUE. This is the recommended way to fetch episodes for a specific entity — it avoids pulling all episodes and filtering in memory.
Findings
A finding is a persistent observation about an entity. Findings have a lifecycle:
- Raise — create or update an active finding (upsert by finding_key)
- Update — modify mutable fields on an active finding
- Clear — mark a finding as resolved (idempotent)
# In your converge/2 callback:
findings: [
{:raise, %{
actor_id: "client_health_actor",
finding_key: "client:health:123",
class: "churned",
severity: :high, # :low | :medium | :high | :critical
confidence: 1.0,
subject: %{kind: "client", id: "123"},
subject_kind: "client", # denormalized for SQL Server compat
subject_id: "123",
summary: "Client has churned",
evidence_refs: %{"status" => "churned"}
}},
{:update, "client:health:123", %{confidence: 0.8}},
{:clear, "client:health:123"},
{:clear, "client:health:123", "customer reactivated"}
]

Finding key scoping — deduplicated vs. distinct:
The finding_key controls deduplication. An active finding with the same key is updated in place (last-writer-wins on mutable fields). Choose your key strategy based on intent:
- Deduplicated (default pattern): Use a stable key like "client:health:123". Repeated episodes update the same active finding — ideal for ongoing status tracking where you want one finding per subject.
- Distinct per episode: Include the episode ID in the key, e.g. "po_review:PO-1955:#{episode.id}". Each episode creates a separate finding — useful for audit trails or point-in-time snapshots where every run should produce its own record.
The episode_ctx map passed to converge/2 contains episode_id, so you can reference it directly:
# Deduplicated: one active finding per client, updated each run
def converge(state, _episode_ctx) do
{:ok, %Cyclium.ConvergeResult{
findings: [{:raise, %{finding_key: "client:health:#{state.client_id}", ...}}]
}}
end
# Distinct: one finding per episode run
def converge(state, episode_ctx) do
{:ok, %Cyclium.ConvergeResult{
findings: [{:raise, %{finding_key: "po_review:#{state.po_id}:#{episode_ctx.episode_id}", ...}}]
}}
end
Findings are queried via Cyclium.Findings.active_for/1:
Cyclium.Findings.active_for(actor: "client_health_actor")
Cyclium.Findings.active_for(subject: %{kind: "client", id: "123"})
Cyclium.Findings.active_for(finding_key: "client:health:123")
Cyclium.Findings.active_for(class: "churned")
Cyclium.Findings.active_for(caused_by: "parent:finding:key")

Causality chains: Findings can reference a parent finding via caused_by_key. This enables tracing root causes through chains of related findings:
# Raise a child finding linked to a parent
{:raise, %{finding_key: "vendor:delay:PO-123", caused_by_key: "vendor:health:acme", ...}}
# Query helpers
Cyclium.Findings.caused_by("parent:key") # direct children
Cyclium.Findings.causal_chain("child:key", 10) # walk chain upward (max depth)
Cyclium.Findings.root_cause("child:key") # find root (no caused_by_key)

TTL / Expiration: Findings can auto-expire after a duration. Declare a default TTL on the expectation, or pass ttl_seconds / expires_at per finding:
# Default TTL for all findings raised by this expectation
expectation(:check_temp,
strategy: MyApp.Strategies.TempCheck,
trigger: {:event, "sensor.updated"},
finding_ttl_seconds: 3600
)
# Or override per finding in converge:
{:raise, %{finding_key: "temp:alert:123", ttl_seconds: 7200, ...}}
Expired findings are cleared and active findings are escalated by Cyclium.Findings.FindingSweep, an optional GenServer that runs on a configurable interval:
# config/config.exs
config :cyclium, :finding_sweep, true
config :cyclium, :finding_sweep_interval_ms, 300_000 # 5 minutes (default)
config :cyclium, :finding_sweep_batch_size, 100 # per sweep (default)

Severity escalation: Time-based rules automatically escalate finding severity based on how long a finding has been active. Declare rules on the expectation:
expectation(:check_vendor,
strategy: MyApp.Strategies.VendorHealth,
trigger: {:event, "vendor.updated"},
escalation_rules: %{
"vendor_delay" => [
%{after_minutes: 60, escalate_to: :high},
%{after_minutes: 1440, escalate_to: :critical}
]
}
)
Escalation runs as part of the finding sweep cycle. Each sweep interval, every active finding matching a registered escalation pair (actor + expectation + classes) is loaded from the database and checked against the time-based rules. This means the sweep interval (finding_sweep_interval_ms) controls how often escalation is evaluated — rules with after_minutes granularity finer than the sweep interval won't fire any faster. For expectations with many active findings, keep this in mind when tuning the sweep interval and batch size. Application config (config :cyclium, :escalation_rules) is supported as a fallback.
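The rule evaluation itself reduces to comparing a finding's active duration against each rule's after_minutes threshold. A self-contained sketch of that comparison (not Cyclium's code — module and function names are made up for illustration):

```elixir
defmodule EscalationSketch do
  # Given when the finding was raised, the rules for its class, and "now",
  # return the severity of the longest-elapsed matching rule, or nil.
  def escalate_to(raised_at, rules, now) do
    minutes_active = DateTime.diff(now, raised_at, :second) / 60

    rules
    |> Enum.filter(fn %{after_minutes: m} -> minutes_active >= m end)
    |> Enum.max_by(fn %{after_minutes: m} -> m end, fn -> nil end)
    |> case do
      nil -> nil
      %{escalate_to: severity} -> severity
    end
  end
end

rules = [
  %{after_minutes: 60, escalate_to: :high},
  %{after_minutes: 1440, escalate_to: :critical}
]

raised = ~U[2024-01-01 00:00:00Z]
EscalationSketch.escalate_to(raised, rules, ~U[2024-01-01 00:30:00Z]) # => nil
EscalationSketch.escalate_to(raised, rules, ~U[2024-01-01 02:00:00Z]) # => :high
EscalationSketch.escalate_to(raised, rules, ~U[2024-01-02 01:00:00Z]) # => :critical
```

Because this comparison only runs once per sweep, a finding can sit past its after_minutes threshold for up to one sweep interval before escalating — which is exactly the granularity caveat above.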
Post-raise enrichment: An optional callback enriches findings immediately after they're raised. Declare it on the expectation:
expectation(:check_health,
strategy: MyApp.Strategies.ClientHealth,
trigger: {:event, "client.updated"},
finding_enrichment: fn finding, _episode ->
{:ok, %{summary: "Enriched: #{finding.summary}", confidence: 0.95}}
end
)
# Or use a module/function tuple:
expectation(:check_health,
strategy: MyApp.Strategies.ClientHealth,
trigger: {:event, "client.updated"},
finding_enrichment: {MyApp.FindingEnricher, :enrich}
)
The callback receives (finding, episode) and returns {:ok, %{...}} or :skip. Only safe fields are applied: evidence_refs, summary, confidence. Errors in the callback are logged — the finding persists unchanged. Application config (config :cyclium, :finding_enrichment) is supported as a fallback.
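A module/function enricher is a plain two-arity function following the contract above — {:ok, map} with only the safe fields, or :skip. A hypothetical module (the finding field access assumes the struct fields named elsewhere in this README):

```elixir
defmodule MyApp.FindingEnricher do
  # Hypothetical enricher: annotate critical findings, skip the rest.
  # Only evidence_refs, summary, and confidence are applied by Cyclium.
  def enrich(finding, _episode) do
    case finding.severity do
      :critical ->
        {:ok, %{
          summary: "[PAGE ON-CALL] " <> finding.summary,
          evidence_refs:
            Map.put(finding.evidence_refs || %{}, "enriched_at",
              DateTime.to_iso8601(DateTime.utc_now()))
        }}

      _ ->
        :skip
    end
  end
end
```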
Outputs
Outputs are typed proposals produced during converge. They flow through the Output Router, which handles deduplication (via dedupe_key) and delivery through app-provided adapters.
# In converge result:
outputs: [
%Cyclium.OutputProposal{
type: :email,
dedupe_key: "alert:client:123:#{Cyclium.Window.bucket(:h4, DateTime.utc_now())}",
payload: %{to: "team@co.com", subject: "Client 123 churned"},
requires_approval: false
}
]

Register adapters in config:
config :cyclium, :output_adapters, %{
email: MyApp.Adapters.Email,
slack: MyApp.Adapters.Slack
}
Adapters implement Cyclium.Output.Adapter:
defmodule MyApp.Adapters.Email do
@behaviour Cyclium.Output.Adapter
@impl true
def deliver(:email, payload, _ctx) do
case MyApp.Mailer.send(payload) do
:ok -> {:ok, %{message_id: "abc123"}}
{:error, reason} -> {:error, reason}
end
end
end

The adapter registry provides programmatic access:
Cyclium.Output.Adapter.resolve(:email) # => MyApp.Adapters.Email
Cyclium.Output.Adapter.resolve("slack") # => MyApp.Adapters.Slack
Cyclium.Output.Adapter.all() # => [:email, :slack]

Bus
The event bus connects actors, LiveViews, and workflows without coupling. It wraps Phoenix.PubSub.
# Publish a domain event (from your app code):
Cyclium.Bus.broadcast("client.updated", %{client_id: "123"})
# Subscribe to all events (actors do this automatically):
Cyclium.Bus.subscribe()
# Subscribe to a specific event:
Cyclium.Bus.subscribe("episode.completed")
# In a LiveView or GenServer:
def handle_info({:bus, "episode.completed", payload}, socket) do
# payload contains: episode_id, actor_id, status, workflow_instance_id
end

Runtime events emitted by Cyclium:
| Category | Events |
|---|---|
| Episode lifecycle | episode.completed, episode.failed, episode.canceled, episode.queued, episode.dropped |
| Expectations | expectation.triggered |
| Findings | finding.raised, finding.updated, finding.cleared |
| Outputs | output.delivered |
| Workflows | workflow.started, workflow.completed, workflow.failed |
| System | spec.updated |
Setup
1. Add dependency
# mix.exs
def deps do
[{:cyclium, path: "../cyclium_ex"}]
end
Dependencies pulled in: ecto, ecto_sql, jason, phoenix_pubsub.
2. Run migrations
# In a migration file:
def up do
Cyclium.Migrations.V1.up() # episodes, steps, checkpoints, findings, outputs
Cyclium.Migrations.V2.up() # episode_logs
Cyclium.Migrations.V3.up() # workflow_instances
Cyclium.Migrations.V4.up() # archived_at on episodes and findings
Cyclium.Migrations.V5.up() # unique index on episode dedupe_key
Cyclium.Migrations.V6.up() # work_claims table for lease-based coordination
# ...V7 through V13...
Cyclium.Migrations.V14.up() # trigger_requests table for deferred execution
# ...V15 through V18...
Cyclium.Migrations.V19.up() # SQL Server: convert legacy TEXT columns to nvarchar(max)
end
def down do
Cyclium.Migrations.V19.down()
Cyclium.Migrations.V14.down()
# ...V13 through V7...
Cyclium.Migrations.V6.down()
Cyclium.Migrations.V5.down()
Cyclium.Migrations.V4.down()
Cyclium.Migrations.V3.down()
Cyclium.Migrations.V2.down()
Cyclium.Migrations.V1.down()
end

Authoring migrations: do not use bare :text. On Ecto.Adapters.Tds it emits SQL Server's legacy non-Unicode TEXT type, which silently replaces emoji and other non-CP1252 characters with ?. Use {:string, size: :max} (which becomes nvarchar(max) on Tds and TEXT on Postgres/SQLite), or branch on repo().__adapter__() for finer control. V19 is the one-shot repair migration for columns that were already declared :text.
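Under those constraints, an adapter-branching migration might look like the following sketch. The table and column names are hypothetical, and the exact type mapping should be verified against your adapter:

```elixir
defmodule MyApp.Repo.Migrations.AddNoteBody do
  use Ecto.Migration

  def change do
    # Hypothetical example: pick a Unicode-safe large-text type per adapter,
    # as suggested above. nvarchar(max) preserves emoji on SQL Server, while
    # Postgres/SQLite keep their native TEXT type.
    text_type =
      case repo().__adapter__() do
        Ecto.Adapters.Tds -> :"nvarchar(max)"
        _ -> :text
      end

    alter table(:my_notes) do
      add :body, text_type
    end
  end
end
```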
3. Configure
# config.exs
config :cyclium, :repo, MyApp.Repo
# Optional: registry for strategy/synthesizer overrides (see "Strategy registry" section)
# config :cyclium, :strategy_registry, MyApp.StrategyRegistry
# Optional: episode runner (default: Cyclium.Runner.OTP)
# Use Cyclium.Runner.Deferred for trigger-only mode (see "Trigger-Only Mode" section)
config :cyclium, :runner, Cyclium.Runner.OTP
# Optional: node identity override for shared-name environments (see "Node Identity")
# config :cyclium, :node_identity, "my-unique-node-name"
# Optional: tool capabilities
config :cyclium, :capability_registry, %{
erp_read: MyApp.Tools.ERP,
vendor_api: MyApp.Tools.VendorAPI
}
# Optional: output adapters
config :cyclium, :output_adapters, %{
email: MyApp.Adapters.Email,
slack: MyApp.Adapters.Slack
}
# Optional: checkpoint schemas for versioned state migration
config :cyclium, :checkpoint_schemas, %{
{"client_health_actor", "client_should_be_healthy"} => MyApp.Checkpoints.HealthCheck
}
# Optional: enable reconciler for hot spec changes
config :cyclium, :reconciler, true
# Optional: register workflows
config :cyclium, :workflows, [MyApp.Workflows.ClientReview]

4. Declare strategies on expectations
The preferred approach is declaring the strategy module directly on each expectation in the actor DSL. Cyclium registers the mapping automatically when the actor GenServer boots — no separate registry needed:
defmodule MyApp.Actors.ClientHealthActor do
use Cyclium.Actor
actor do
domain(:client_health)
spec_rev("v0.1.0")
synthesizer(MyApp.Synthesizers.ClientHealth) # actor-level default
max_concurrent_episodes(5)
episode_overflow(:queue)
expectation(:client_should_be_healthy,
strategy: MyApp.Strategies.ClientHealth,
trigger: {:event, "client.health_check_requested"},
subject_key: :client_id,
debounce_ms: :timer.seconds(3),
budget: %{max_turns: 3, max_tokens: 1_000, max_wall_ms: 10_000}
)
expectation(:client_ai_summary,
strategy: MyApp.Strategies.ClientAdvisor,
synthesizer: MyApp.Synthesizers.FastSummary, # override for this expectation
trigger: {:event, "client.summary_requested"},
budget: %{max_turns: 5, max_tokens: 5_000, max_wall_ms: 60_000}
)
end
end

Optional: strategy registry for overrides
If you need to override a strategy or synthesizer without changing the actor code — for example, in a staging environment or during an A/B test — you can configure a registry:
# config.exs (optional)
config :cyclium, :strategy_registry, MyApp.StrategyRegistry
defmodule MyApp.StrategyRegistry do
# Only add clauses for explicit overrides — anything not matched here
# falls through to the actor's declared strategy.
def strategy_for("client_health_actor", _exp), do: MyApp.Strategies.ClientHealthV2
end

5. Supervision tree
# application.ex
children = [
MyApp.Repo,
{Phoenix.PubSub, name: MyApp.PubSub},
{Cyclium.Supervisor, pubsub: MyApp.PubSub},
MyApp.Actors.ClientHealthActor,
MyApp.Actors.ClientAdvisorActor,
MyAppWeb.Endpoint
]

Cyclium.Supervisor starts the DynamicSupervisors, TaskSupervisor, and optionally the Reconciler and WorkflowEngine.
Budgets
Every expectation declares a budget. The runner enforces all three dimensions:
budget: %{
max_turns: 12, # loop iterations (incremented every next_step call)
max_tokens: 25_000, # LLM token cost (incremented by tool_call results)
max_wall_ms: 120_000 # wall-clock deadline (enforced via Process.send_after)
}
When any limit is hit, the episode fails with error_class: "budget_exceeded". Wall time is enforced asynchronously — a :budget_wall_exceeded message interrupts the loop even if the strategy is blocked on a tool call.
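The three checks can be pictured as a single guard. This is an illustrative sketch, not Cyclium's internal code; the usage field names are assumptions:

```elixir
defmodule BudgetCheckSketch do
  # Illustrative: one guard over the three budget dimensions described above.
  # `usage` accumulates as the episode runs; any dimension tripping fails the
  # episode with error_class: "budget_exceeded".
  def exceeded?(usage, budget) do
    usage.turns >= budget.max_turns or
      usage.tokens >= budget.max_tokens or
      usage.wall_ms >= budget.max_wall_ms
  end
end
```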
Deduplication: actor-level vs. strategy-level
Cyclium provides three layers of temporal dedup:
Actor-level global (cooldown_ms) — enforced by the actor GenServer before an episode starts. Simple and zero-cost, but applies globally to the expectation — all subjects are blocked during the window.
expectation(:client_ai_summary,
strategy: MyApp.Strategies.ClientAdvisor,
trigger: {:event, "client.summary_requested"},
cooldown_ms: :timer.minutes(5) # no advisor episodes for ANY client for 5 min
)

Actor-level per-subject (subject_key + debounce_ms/cooldown_ms) — when subject_key is set, debounce and cooldown are scoped per subject value. Each unique subject gets its own independent trailing-edge timer and cooldown window. Client A and client B are debounced independently.
expectation(:client_ai_summary,
strategy: MyApp.Strategies.ClientAdvisor,
trigger: {:event, "client.summary_requested"},
subject_key: :client_id,
debounce_ms: :timer.seconds(10), # trailing-edge, per-client
cooldown_ms: :timer.minutes(5) # minimum gap, per-client
)
When subject_key is set but the payload doesn't contain that key, the subject value is nil and the key becomes {expectation_id, nil} — still isolated from real subjects, never crashing.
Strategy-level (Findings.recent?/2) — checked in init/2 using the existing finding for a specific subject. This is DB-backed, so it survives actor restarts unlike the in-memory actor-level dedup.
@summary_cooldown_ms :timer.minutes(5)
def init(_episode, trigger) do
client_id = trigger.payload["client_id"]
skip = Cyclium.Findings.recent?("client:advisor:#{client_id}", @summary_cooldown_ms)
{:ok, %{client_id: client_id, skip: skip}}
end
def next_step(%{skip: true}, _ctx), do: :done

When to use which:
| Scenario | Use |
|---|---|
| Rate-limit a high-frequency trigger globally | cooldown_ms on the expectation |
| Coalesce rapid events per subject before firing | subject_key + debounce_ms |
| Minimum gap between runs per subject | subject_key + cooldown_ms |
| Dedup that survives actor restarts (DB-backed) | Findings.recent?/2 in init/2 |
| Belt-and-suspenders | subject_key + debounce_ms as fast path, Findings.recent?/2 for persistence across restarts |
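The per-subject timer keying described above can be modeled as follows. This is an illustrative sketch of trailing-edge debounce, not Cyclium's actual actor code:

```elixir
defmodule DebounceSketch do
  # Illustrative model of per-subject trailing-edge debounce: each
  # {expectation, subject} pair owns an independent timer, and every new
  # event for that pair cancels and restarts its window.
  def on_event(timers, exp_id, subject, debounce_ms) do
    key = {exp_id, subject}  # subject may be nil — still isolated from real subjects

    # Trailing edge: cancel any pending timer for this subject, then restart it.
    case Map.fetch(timers, key) do
      {:ok, ref} -> Process.cancel_timer(ref)
      :error -> :ok
    end

    ref = Process.send_after(self(), {:debounce_fired, key}, debounce_ms)
    Map.put(timers, key, ref)
  end
end
```

Because each subject owns its own timer, a burst of events for client A never delays the window for client B.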
Workflows
Workflows coordinate multiple actors in a dependency graph. Data flows between steps via the workflow_result/2 strategy callback and input: functions on downstream steps.
Defining a workflow
defmodule MyApp.Workflows.ClientReview do
use Cyclium.Workflow
workflow do
trigger {:event, "client.updated"}
debounce_ms :timer.seconds(3)
subject_key :client_id
step :health_check,
actor: :client_health_actor,
expectation: :client_should_be_healthy
step :ai_summary,
actor: :client_advisor_actor,
expectation: :client_ai_summary,
depends_on: [:health_check],
input: fn _trigger, prior ->
# prior[:health_check] contains the map returned by
# the health strategy's workflow_result/2 callback
%{client_id: prior[:health_check].client_id}
end
on_failure :health_check, policy: :retry, max_step_attempts: 3, backoff_ms: 5_000
on_failure :ai_summary, policy: :abort
end
end

Workflow debounce: debounce_ms and subject_key coalesce rapid events before starting the workflow. When subject_key is set, each unique subject value gets its own debounce window — client A and client B debounce independently. Without subject_key, all events for the workflow share a single timer. Each new event resets the debounce window (trailing-edge). This is useful for workflows that trigger on high-frequency events like "entity.updated".
Episode reuse (cross-workflow dedup): By default, workflows reuse recent completed episodes when two workflows trigger the same actor + expectation + input within a 5-minute window. To disable this:
workflow do
disable_episode_reuse
trigger {:event, "client.review_requested"}
step :health_check, actor: :client_health_actor, expectation: :client_should_be_healthy
end

Cancellation cascade: When a workflow fails, pending and retrying steps are automatically canceled. To also clear active findings raised by the workflow's episodes, set clear_findings_on_cancel in the workflow instance metadata.
Passing data between steps
When a workflow step completes, the engine calls the strategy's optional workflow_result/2 callback to extract the data that downstream steps receive via prior. If workflow_result/2 is not implemented, downstream steps receive nil for that step's prior.
defmodule MyApp.Strategies.ClientHealth do
@behaviour Cyclium.EpisodeRunner.Strategy
# ... init, next_step, handle_result, converge as usual ...
# Optional: extract data for downstream workflow steps
@impl true
def workflow_result(state, _converge_result) do
# This map becomes prior[:health_check] in downstream input functions
%{client_id: state.client_id, classification: state.classification}
end
end

Configuration and usage
Register workflows in config:
config :cyclium, :workflows, [MyApp.Workflows.ClientReview]
The WorkflowEngine GenServer:
- Listens for trigger events on the Bus
- Creates a WorkflowInstance record to track execution
- Fires steps in dependency order (DAG validated at compile time)
- Passes data between steps via workflow_result/2 → input functions
- Applies failure policies per step: :abort (cancel all), :retry (with backoff), :pause (wait for manual intervention)
Workflows can also be started manually:
Cyclium.WorkflowEngine.start_workflow(
MyApp.Workflows.ClientReview,
%{client_id: "123"},
[]
)

Dynamic workflows
Dynamic workflows can be defined in the database and registered at runtime — no compiled modules required. They follow the same step-dependency model as compiled workflows but use declarative input mappings instead of Elixir functions.
Defining a workflow in the database
Insert a row into cyclium_workflow_definitions:
%Cyclium.Schemas.WorkflowDefinition{
workflow_id: "vendor_onboarding",
trigger_type: "event",
trigger_event: "vendor.registration_submitted",
steps: Jason.encode!([
%{
"id" => "compliance_check",
"actor_id" => "compliance_monitor",
"expectation" => "vendor_risk",
"depends_on" => [],
"input_map" => %{"vendor_id" => "trigger.vendor_id"},
"failure_policy" => "abort"
},
%{
"id" => "connector_setup",
"actor_id" => "integration_actor",
"expectation" => "setup_connector",
"depends_on" => ["compliance_check"],
"input_map" => %{
"vendor_id" => "trigger.vendor_id",
"risk_level" => "prior.compliance_check.classification.primary"
},
"failure_policy" => "retry",
"max_step_attempts" => 2,
"backoff_ms" => 30000
}
]),
enabled: true
}

Input mapping syntax
Dynamic workflows use dot-notation paths instead of Elixir functions:
| Path | Resolves to |
|---|---|
| "trigger.order_id" | trigger_ref["order_id"] |
| "prior.validate.classification.primary" | prior[:validate][:classification]["primary"] |
| "fast" (no prefix) | Static value "fast" |
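A resolver for these dot-notation paths might look like the following sketch. It is illustrative only — Cyclium's actual resolution logic and key-handling rules are not shown here, and the mixed string/atom lookup is an assumption:

```elixir
defmodule PathSketch do
  # "trigger.*" paths read from the trigger payload; "prior.<step>.*" paths
  # read from that step's workflow_result map; anything else is a literal.
  def resolve("trigger." <> path, trigger, _prior),
    do: dig(trigger, String.split(path, "."))

  def resolve("prior." <> path, _trigger, prior) do
    [step | rest] = String.split(path, ".")
    dig(prior[String.to_atom(step)], rest)
  end

  def resolve(static, _trigger, _prior), do: static

  # Walk nested maps, trying string keys first and atom keys as a fallback.
  defp dig(value, []), do: value
  defp dig(%{} = map, [key | rest]), do: dig(map[key] || map[String.to_atom(key)], rest)
  defp dig(_, _), do: nil
end
```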
Loading dynamic workflows
# At startup — loads all enabled definitions
Cyclium.DynamicWorkflow.Loader.load_all()
# Load a single workflow
Cyclium.DynamicWorkflow.Loader.load("vendor_onboarding")
# Reload after updating the definition in DB
Cyclium.DynamicWorkflow.Loader.reload("vendor_onboarding")
# Unregister a workflow
Cyclium.DynamicWorkflow.Loader.unload("vendor_onboarding")

Starting dynamic workflows manually
Cyclium.WorkflowEngine.start_dynamic_workflow(
"vendor_onboarding",
%{"vendor_id" => "v-123"}
)
Dynamic workflows are event-triggered (via Bus) like compiled workflows. The Watcher also listens for workflow_definition.created/updated/disabled events for automatic refresh.
Circuit breaker
Per-expectation circuit breaker prevents cascading failures when a tool or external service is down. When consecutive episode failures exceed a threshold, the circuit opens and rejects new episodes. After a cooldown period, one probe episode is allowed through (half-open state) — if it succeeds, the circuit closes.
expectation(:check_vendor_api,
strategy: MyApp.Strategies.VendorCheck,
trigger: {:event, "vendor.updated"},
circuit_breaker: %{
threshold: 5, # consecutive failures to trip
half_open_after_ms: 60_000, # cooldown before probe
cancel_in_flight: false # cancel running episodes when circuit trips
}
)

States: :closed (normal) → :open (rejecting) → :half_open (probe) → :closed
In-flight cancellation: When cancel_in_flight: true, tripping the circuit also cancels any running or blocked episodes for that actor + expectation, preventing wasted work against a known-broken dependency.
Scope: Circuit breakers are node-local (ETS-backed). In a cluster, each node tracks failures independently. This is intentional — a service might be unreachable from one node but fine from another. For cluster-wide coordination, combine with the Bus (circuit breaker events are broadcast).
Force-fired episodes bypass the circuit breaker check.
Query state: Cyclium.CircuitBreaker.get_state(actor_id, expectation_id)
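The state machine described above can be sketched as pure transition functions. This is a conceptual model with illustrative thresholds, not Cyclium's ETS-backed implementation:

```elixir
defmodule BreakerSketch do
  # Closed: count consecutive failures; trip to :open at the threshold.
  def record_failure(%{state: :closed, failures: n} = cb, threshold) do
    if n + 1 >= threshold,
      do: %{cb | state: :open, failures: n + 1},
      else: %{cb | failures: n + 1}
  end

  # A failed probe in :half_open reopens the circuit; failures while
  # already :open leave it unchanged.
  def record_failure(%{state: :half_open} = cb, _threshold), do: %{cb | state: :open}
  def record_failure(%{state: :open} = cb, _threshold), do: cb

  # After the cooldown, one probe episode is let through.
  def allow_probe(%{state: :open} = cb), do: %{cb | state: :half_open}

  # Any success closes the circuit and resets the failure count.
  def record_success(cb), do: %{cb | state: :closed, failures: 0}
end
```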
Episode sampling
Probabilistic episode firing for high-frequency triggers. Set sample_rate on an expectation to control what fraction of triggers actually fire episodes:
expectation(:health_check,
strategy: MyApp.Strategies.MetricsCheck,
trigger: {:event, "metrics.updated"},
sample_rate: 0.1 # fire ~10% of triggers
)

- nil or 1.0 = always fire (default)
- 0.0 = never fire
- Sampled-out episodes emit [:cyclium, :episode, :sampled_out] telemetry
- Force-fired episodes bypass sampling
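A minimal sketch of that decision, assuming the semantics listed above (this is not Cyclium's internal code):

```elixir
defmodule SampleSketch do
  # nil or >= 1.0 always fires; <= 0.0 never fires; otherwise fire with
  # probability `rate` per trigger.
  def fire?(nil), do: true
  def fire?(rate) when rate >= 1.0, do: true
  def fire?(rate) when rate <= 0.0, do: false
  def fire?(rate), do: :rand.uniform() < rate
end
```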
Service level tracking
Declarative performance objectives with automatic breach detection. Define success rate and duration thresholds per expectation:
expectation(:process_order,
strategy: MyApp.Strategies.OrderProcessor,
trigger: {:event, "order.created"},
service_levels: %{
max_duration_ms: 30_000, # p95 target
success_rate: 0.95, # 95% success target
window_episodes: 20 # rolling window size
}
)
Breaches emit [:cyclium, :service_levels, :breach] telemetry and a "service_levels.breach" Bus event with details:
%{type: :success_rate, current: 0.85, threshold: 0.95}
%{type: :duration, current: 45_000, threshold: 30_000}
Query metrics: Cyclium.ServiceLevels.metrics(actor_id, expectation_id) returns %{success_rate: f, p95_duration_ms: n, sample_count: n}.
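The rolling-window math behind those metrics can be sketched as below. The episode field names and the nearest-rank p95 method are assumptions; episodes are assumed to be passed most-recent-first:

```elixir
defmodule SLSketch do
  # Success rate and p95 duration over the last `window` episodes,
  # mirroring the metrics map described above.
  def metrics(episodes, window) do
    recent = Enum.take(episodes, window)
    durations = recent |> Enum.map(& &1.duration_ms) |> Enum.sort()
    idx = max(round(length(durations) * 0.95) - 1, 0)

    %{
      success_rate: Enum.count(recent, & &1.ok?) / length(recent),
      p95_duration_ms: Enum.at(durations, idx),
      sample_count: length(recent)
    }
  end
end
```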
Adaptive budgets
Advisory budget tracking based on historical episode resource usage. When enabled, Cyclium records turns, tokens, and wall time for each completed episode and recommends budgets based on p95 values with 25% headroom.
expectation(:classify_ticket,
strategy: MyApp.Strategies.TicketClassifier,
trigger: {:event, "ticket.created"},
adaptive_budget: true
)

Query recommendations:
# After enough samples (minimum 5):
Cyclium.AdaptiveBudget.recommend(actor_id, expectation_id)
# => %{max_turns: 8, max_tokens: 15_000, max_wall_ms: 25_000}
# Detailed stats:
Cyclium.AdaptiveBudget.stats(actor_id, expectation_id)
# => %{samples: 47, p50: %{...}, p95: %{...}, max: %{...}}

Adaptive budgets are advisory only — they do not automatically adjust episode budgets. Use the recommendations to tune your expectation configs over time.
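The "p95 with 25% headroom" recommendation can be sketched for a single dimension like so. The nearest-rank percentile method is an assumption, not necessarily what Cyclium uses internally:

```elixir
defmodule HeadroomSketch do
  # Recommend a budget for one dimension (turns, tokens, or wall ms):
  # take the p95 of historical samples and add 25% headroom.
  def recommend(samples) do
    sorted = Enum.sort(samples)
    idx = max(round(length(sorted) * 0.95) - 1, 0)
    p95 = Enum.at(sorted, idx)
    round(p95 * 1.25)
  end
end
```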
Logging and observability
Log strategies
Set per-expectation via log_strategy. Controls both what gets stored in step journal columns (args_redacted, result_ref) and what the materialized log renders:
| Strategy | Step journal args_redacted | Step journal result_ref | Materialized log |
|---|---|---|---|
| :none | omitted | omitted | none |
| :summary_only | omitted | omitted | one-line status summary |
| :timeline | tool name + action only | summary/IDs only | step-by-step with timestamps |
| :full_debug | surviving context after earlier layers | full result payload | timeline + args, results, errors |
Use :full_debug for audit-sensitive workflows where you need to reconstruct exactly what context an LLM had (EOX predictions, SKU classifications). Use :timeline for high-frequency episodes where you want the flow visible without storing full payloads.
Important: log_strategy is the last layer in a pipeline. By the time it runs, earlier layers have already trimmed the data. "Full" in :full_debug means "everything that survived the earlier layers", not necessarily everything the strategy originally passed. See the pipeline below.
The materialized log is built by LogProjector reading back from the already-stored step rows — it never re-processes the original payload. Whatever log_strategy stored is exactly what appears in the log.
Storage pipeline for synthesis steps
Every synthesis step passes through three filtering layers in order before anything reaches the database:
synthesis payload from next_step
│
▼
1. __transient__ stripping — keys listed in :__transient__ are removed from storage
│ but the synthesizer receives the full payload
▼
2. tool redact callbacks — redact/1 and redact_result/1 on tool steps
│ (not applicable to synthesis, but runs for tool_call steps)
▼
3. log_strategy filtering — controls final shape of args_redacted / result_ref
│
▼
cyclium_episode_steps (args_redacted, result_ref)
│
▼
cyclium_episode_logs (rendered by LogProjector from stored step rows)

Each layer has a different scope:
| Layer | Controlled by | Purpose |
|---|---|---|
__transient__ | Strategy (per synthesis call) | Pass bulk data to LLM without persisting it |
redact/1, redact_result/1 | Tool module | Trim domain-specific bulky fields from tool steps |
log_strategy | Expectation | Set overall verbosity for the whole episode |
Transient synthesis data
Sometimes a strategy needs to pass large data to the synthesizer (full record lists, raw API payloads) that the LLM needs for context but that you don't want persisted in the step journal or materialized log. Mark those keys under :__transient__ in the synthesis payload:
def next_step(state, _ctx) do
orders = load_orders(state.project_id)
{:synthesize, %{
project_id: state.project_id,
project_name: state.project_name,
order_count: length(orders), # small scalar — kept in storage
orders: serialize_orders(orders), # full list — synthesizer needs it, storage does not
evidence: build_evidence(orders), # small structured summary — kept in storage
__transient__: [:orders] # strip :orders before writing args_redacted
}}
end
The runner passes the full map (minus :__transient__ itself) to synthesizer.synthesize/2, then drops the listed keys before handing off to log_strategy filtering. The synthesizer receives orders; the stored step and rendered log do not, regardless of log_strategy.
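That split between "what the synthesizer sees" and "what storage keeps" can be modeled in a few lines. This is an illustrative sketch of the behavior described above, not the runner's actual code:

```elixir
defmodule TransientSketch do
  # The synthesizer receives the full payload minus the :__transient__ marker
  # itself; storage additionally drops every key the marker lists.
  def split(payload) do
    {transient_keys, for_synth} = Map.pop(payload, :__transient__, [])
    for_storage = Map.drop(for_synth, transient_keys)
    {for_synth, for_storage}
  end
end
```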
This is the right tool when:
- You need full detail for synthesis quality (long order lists, raw API responses, document text)
- You don't want that data in the audit trail or materialized log
- You still want other context fields (counts, summaries, IDs) persisted for debugging
It is not a substitute for log_strategy — if you want no storage at all for an episode type, use :none or :summary_only. __transient__ is surgical; log_strategy is wholesale.
Materialized logs are stored in cyclium_episode_logs by Cyclium.LogProjector and can be queried via Cyclium.Episodes.get_log(episode_id).
Telemetry
Cyclium emits 36 structured telemetry events under the [:cyclium, ...] prefix. Attach a handler for development:
Cyclium.Telemetry.attach_default_logger()

Key events:
| Event | Metadata |
|---|---|
[:cyclium, :episode, :completed] | episode_id, actor_id, output_count, finding_count |
[:cyclium, :episode, :failed] | episode_id, actor_id |
[:cyclium, :episode, :sampled_out] | actor_id, expectation_id |
[:cyclium, :step, :tool_call] | tool, action, episode_id |
[:cyclium, :step, :synthesis] | episode_id |
[:cyclium, :finding, :raised] | finding_key, actor_id, class |
[:cyclium, :finding, :cleared] | finding_key, actor_id, class |
[:cyclium, :finding, :expired] | count |
[:cyclium, :finding, :escalated] | finding_key, actor_id, class |
[:cyclium, :finding_sweep, :completed] | duration_ms, expired_count, escalated_count, node |
[:cyclium, :finding_sweep, :failed] | duration_ms, node, reason |
[:cyclium, :output, :delivered] | type, episode_id |
[:cyclium, :actor, :event_received] | actor_id, event_type |
[:cyclium, :actor, :overflow] | actor_id, policy |
[:cyclium, :circuit_breaker, :opened] | actor_id, expectation_id, consecutive_failures |
[:cyclium, :circuit_breaker, :closed] | actor_id, expectation_id |
[:cyclium, :circuit_breaker, :rejected] | actor_id, expectation_id |
[:cyclium, :service_levels, :breach] | actor_id, expectation_id, type, current, threshold |
[:cyclium, :workflow, :step_reused] | workflow_id, instance_id, step_id, reused_episode_id |
Full list: Cyclium.Telemetry.events/0
Step journal
Every episode action is recorded as an EpisodeStep with one of 16 kinds:
tool_call, synthesis, observation, checkpoint, output_proposed, output_delivered, output_failed, approval_requested, approval_resolved, wait_started, wait_resolved, finding_raised, finding_updated, finding_cleared, episode_completed, episode_failed
Each step records: step_no, kind, tool_name, args_redacted, result_ref, error_class, error_detail, cost_tokens, cost_ms, created_at.
Query steps: Cyclium.Episodes.list_steps(episode_id)
Checkpointing
Strategies can save state mid-episode for crash recovery. Return {:checkpoint, phase_name} from next_step/2 to persist the current state:
def next_step(state, _ctx) do
if state.phase == :data_collected do
{:checkpoint, "data_collected"}
else
{:tool_call, :erp_read, :read_po, %{"po_id" => state.po_id}}
end
end
The checkpoint saves the full strategy state map to the cyclium_episode_checkpoints table. On resume, EpisodeTask loads the latest checkpoint by checkpoint_no and passes it to the strategy — execution continues from where it left off.
Checkpoint schema versioning
If your strategy's state shape changes between deploys, register a checkpoint schema to migrate old checkpoints forward:
defmodule MyApp.Checkpoints.HealthCheck do
use Cyclium.CheckpointSchema, version: 2
# Migrate from version 1 -> 2
def migrate(1, state), do: {:ok, Map.put(state, :new_field, nil)}
def migrate(2, state), do: {:ok, state}
end

Register in config:
config :cyclium, :checkpoint_schemas, %{
{"client_health_actor", "client_should_be_healthy"} => MyApp.Checkpoints.HealthCheck
}
If migration fails, EpisodeTask falls back to a fresh strategy.init/2 — the episode restarts from scratch.
When to checkpoint vs. restart
Use checkpoints when:
- The strategy accumulates state across many turns that would be expensive to recompute (multi-turn LLM conversations, progressive data aggregation)
- Steps have non-idempotent side effects that shouldn't be repeated
Use restart (no checkpoint) when:
- The strategy re-queries all data from the DB each turn (most monitoring/health strategies)
- Side effects are idempotent (findings upsert by key, outputs are deduplicated)
- Episodes are short (< a few minutes)
Most strategies don't need checkpoints — recovery_policy: :restart on the expectation handles recovery by re-running the episode from scratch. See the Recovery section below.
Recovery
Cyclium provides built-in recovery for orphaned episodes after server restarts or deploys.
The problem
When a node shuts down (deploy, crash, scaling event), in-flight episodes are killed and left as :running in the database. Without recovery, these episodes stay orphaned forever.
Distributed episode claiming
In a multi-node cluster, all nodes run the same actors and receive the same Bus events via PG2. Without coordination, every node would independently create and run an episode for every trigger — tripling (or more) the work. Cyclium uses DB-based coordination with no Redis or leader election required.
Episode creation: dedupe_key
When a trigger fires, every node's actor calls maybe_fire_episode. Before inserting, the actor generates a deterministic dedupe_key:
- Schedule triggers: "schedule:{actor_id}:{expectation_id}:{date}" — one episode per schedule window
- Event triggers: "event:{actor_id}:{expectation_id}:{payload_hash}" — one episode per distinct event payload
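A sketch of how such deterministic keys could be built — the payload digest shown is an assumption; any stable hash of the payload works:

```elixir
defmodule DedupeKeySketch do
  # One episode per schedule window: the date string makes the key
  # identical on every node for the same window.
  def schedule_key(actor_id, exp_id, date), do: "schedule:#{actor_id}:#{exp_id}:#{date}"

  # One episode per distinct event payload: a stable digest of the payload
  # makes every node compute the same key for the same event.
  def event_key(actor_id, exp_id, payload) do
    hash = payload |> :erlang.phash2() |> Integer.to_string(16)
    "event:#{actor_id}:#{exp_id}:#{hash}"
  end
end
```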
A filtered unique index on dedupe_key (WHERE dedupe_key IS NOT NULL AND archived_at IS NULL) ensures only one node's insert succeeds. The other nodes receive a constraint violation and silently skip — no episode created, no work duplicated. Archived episodes are excluded from the constraint so that re-triggered work isn't blocked by old runs.
A random jitter (0-200ms) before insert spreads winners evenly across nodes, preventing one faster node from consistently claiming all episodes:
# In Actor.Server.maybe_fire_episode/3
Process.sleep(:rand.uniform(200))
enqueue_episode(state, episode_params)
The constraint violation is caught in enqueue_episode using the same pattern as the Output router:
{:error, %Ecto.Changeset{} = cs} ->
if has_dedupe_violation?(cs) do
Logger.debug("[#{state.actor_id}] Dedupe skip: #{params.dedupe_key}")
end
state

Recovery claims: optimistic update
After a restart, all surviving nodes run Cyclium.Recovery.sweep/1. Each node sees the same list of stale episodes, but only one node can claim each episode:
# Episodes.claim_for_recovery/1
from(e in Episode,
where: e.id == ^episode_id and e.status == :running and is_nil(e.archived_at)
)
|> repo().update_all(set: [phase: "recovering"])
This is an atomic UPDATE ... WHERE — the first node to execute it sets phase: "recovering" and gets {1, _} back. All other nodes get {0, _} (no rows affected) and skip. No locks, no races, no distributed coordination protocol needed.
Summary of coordination guarantees
| Scenario | Mechanism | Guarantee |
|---|---|---|
| Same trigger on N nodes | dedupe_key unique index | Exactly one episode created |
| Same orphan on N nodes | Optimistic UPDATE ... WHERE | Exactly one node recovers it |
| Archived episodes | Filtered unique index excludes archived_at IS NOT NULL | Re-triggering is not blocked by old runs |
| Node with lower latency | Random jitter (0-200ms) | Winners distributed evenly over time |
Recovery sweep
Cyclium.Recovery.sweep/1 finds stale :running episodes and recovers them:
- Query episodes where the most recent step journal entry is older than stale_after_ms (default: 2 minutes)
- Attempt an optimistic claim on each stale episode
- If claimed, apply the expectation's recovery_policy:
  - :restart — enqueue fresh (re-runs strategy.init/2)
  - :fail (default) — mark :failed with error_class: "orphaned"
- Emit [:cyclium, :recovery, :sweep] telemetry with counts
Policy resolution checks the compiled :actor_registry map first, then falls back to the cyclium_agent_definitions table for dynamic actors. Unknown actors default to :fail.
Workflow reconciliation
Cyclium.Recovery.reconcile_workflows/0 handles a related problem: workflow instances stuck in :running because the WorkflowEngine missed a Bus event during a restart.
The WorkflowEngine is purely event-driven — it advances workflows when it receives episode.completed or episode.failed Bus events. If the engine wasn't running when those events fired (e.g. during a deploy), the workflow step_states become stale and the workflow hangs.
Reconciliation fixes this by:
- Finding all :running/:blocked workflow instances
- For each step marked "running" in step_states, loading the actual episode
- If the episode has already reached a terminal state, re-broadcasting the appropriate Bus event
- The WorkflowEngine then handles the replayed event through its normal path — no special logic needed
Call reconcile_workflows/0 after sweep/1 and after workflow configs are registered (compiled modules booted, dynamic workflows loaded).
Setting recovery policy
Add recovery_policy to the expectation DSL:
actor do
expectation(:evaluate_project,
strategy: MyApp.Strategies.ProjectHealth,
trigger: {:event, "project_health.check_requested"},
recovery_policy: :restart,
budget: %{max_turns: 5, max_tokens: 25_000, max_wall_ms: 120_000}
)
end
Use :restart for idempotent strategies that re-query data from the DB. Use :fail (default) for strategies with non-idempotent side effects where automatic recovery could cause harm.
Wiring up recovery in your app
Add a delayed recovery task to your Cyclium supervisor with an :actor_registry map that maps actor identifier() strings to their modules. Cyclium looks up the recovery_policy from each actor's compiled expectations automatically. Dynamic actors not in the registry are resolved from the DB.
# Maps identifier (as DB string) → actor module for recovery sweep.
# Must match the identifier() declared in each actor's DSL block.
@actor_registry %{
"project_health_actor" => MyApp.Actors.ProjectHealthActor,
"client_health_actor" => MyApp.Actors.ClientHealthActor
}
children = [
{Cyclium.Supervisor, pubsub: MyApp.PubSub},
MyApp.Actors.ProjectHealthActor,
MyApp.Actors.ClientHealthActor,
{Task, fn ->
# Wait for cluster to settle after deploy
Process.sleep(:timer.minutes(2))
Cyclium.Recovery.sweep(actor_registry: @actor_registry)
Cyclium.Recovery.reconcile_workflows()
end}
]
For custom policy logic, pass :resolve_policy instead:
Cyclium.Recovery.sweep(
resolve_policy: fn episode ->
if episode.actor_id == "critical_actor", do: :restart, else: :fail
end
)

Deploy sequence
- Node gets SIGTERM -> 30s graceful shutdown timeout
- Episodes trap exits, try to finish current step within remaining time
- Episodes that don't finish stay :running in DB
- After boot, each node waits 2 minutes before running recovery sweep
- Sweep finds stale episodes via step journal recency
- First node to claim each orphan via optimistic update handles recovery
- Workflow reconciliation replays missed Bus events for stale workflow steps
Step Retry Helper
Cyclium.Strategy.Retry provides a lightweight helper for retrying failed steps within an episode. This is distinct from workflow-level retry (on_failure :step, policy: :retry), which retries entire episodes — the step retry helper retries individual steps (e.g. a synthesis call) within a single episode run.
The problem
When a strategy calls :synthesize and the LLM provider returns a transient error (timeout, rate limit, 503), the strategy needs to retry. Without a helper, you'd manually track attempt counts in the state map:
# Without helper — manual retry tracking
def handle_result(state, %{kind: :synthesis}, {:error, _}) do
if Map.get(state, :synthesis_retries, 0) < 3 do
{:retry, Map.update(state, :synthesis_retries, 1, &(&1 + 1))}
else
{:abort, "synthesis_failed"}
end
end

This is error-prone (forgetting to reset counters, tracking multiple step types, off-by-one errors).
Using Cyclium.Strategy.Retry
alias Cyclium.Strategy.Retry
# On success — reset the counter so future failures get fresh attempts
def handle_result(state, %{kind: :synthesis}, {:ok, result}) do
{:ok, state |> Retry.reset(:synthesis) |> Map.put(:assessment, result)}
end
# On failure — retry up to 3 times with 2-second backoff
def handle_result(state, %{kind: :synthesis} = step, {:error, _}) do
case Retry.check(state, step, max_attempts: 3, backoff_ms: 2_000) do
{:retry, new_state} -> {:retry, new_state}
{:give_up, _attempts, new_state} -> {:abort, "synthesis_failed_after_retries"}
end
end
When handle_result returns {:retry, state}, the runner calls do_loop → next_step again. The strategy should naturally re-emit the same step type (e.g. :synthesize) since its phase/state hasn't changed — only the internal __retries counter was updated.
Options
| Option | Default | Description |
|---|---|---|
| :max_attempts | 3 | Total attempts including the original |
| :backoff_ms | 0 | Milliseconds to sleep before retry |
| :step_key | step.kind | Key for tracking — use custom keys to track retries per tool name or phase |
Custom step keys
Track retries separately for different tool calls within the same episode:
def handle_result(state, %{kind: :tool_call, tool_name: tool} = step, {:error, _}) do
case Retry.check(state, step, step_key: {:tool, tool}, max_attempts: 2) do
{:retry, new_state} -> {:retry, new_state}
{:give_up, _attempts, new_state} -> {:ok, %{new_state | phase: :skip_tool}}
end
end
API reference
- Retry.check(state, step, opts) — returns {:retry, state} or {:give_up, count, state}
- Retry.reset(state, key) — clears the counter for one key (call on success)
- Retry.reset_all(state) — clears all retry tracking
Retry layers summary
| Layer | Scope | Mechanism | Backoff | Limit |
|---|---|---|---|---|
| Step retry (Strategy.Retry) | Within one episode | handle_result returns {:retry, state} | Optional (backoff_ms) | max_attempts per step key |
| Episode budget | Within one episode | Runner checks max_turns, max_tokens, max_wall_ms | N/A | Budget exhaustion |
| Workflow retry (on_failure) | Across episodes | WorkflowEngine creates new episode | backoff_ms (default 5s) | max_step_attempts (default 3) |
| Crash recovery | After restart | Recovery.sweep re-enqueues or fails | N/A | One attempt per recovery_policy |
Work Claims (Distributed Lease Coordination)
For clusters where multiple applications share the same database and actor definitions, work claims provide lease-based coordination to ensure at-most-once execution.
How it works
- Before executing an episode, EpisodeTask calls WorkClaims.gate_acquire/3 with the episode's dedupe_key
- If claimed successfully, a Heartbeat GenServer renews the lease periodically (every lease/3 seconds)
- On completion, the claim is marked :done; on crash, :failed
- If a node dies, the lease expires and another node can steal it
Work claims also coordinate trigger request dispatch — TriggerRequests.Poller acquires a claim per request before dispatching to prevent multiple full-mode nodes from processing the same deferred episode.
Configuration
# Use the built-in Ecto-based implementation:
config :cyclium, work_claims: Cyclium.WorkClaims.EctoClaims
# Or a SQL Server-optimized adapter in consuming apps:
config :cyclium, work_claims: MyApp.WorkClaims.SqlServer
# Lease duration (default: 120 seconds)
config :cyclium, work_claims_lease_seconds: 180
# Or omit work_claims entirely — no claiming, fully backwards compatible
When unconfigured, all gate_* functions return passthrough values with zero overhead.
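The gate pattern can be pictured as a thin dispatch on the configured adapter. This is a hypothetical sketch, not the library's actual implementation (GateSketch and the :passthrough value are assumptions):

```elixir
# Hypothetical sketch of the gate pattern: delegate to the configured
# work-claims adapter, or pass through when none is set.
defmodule GateSketch do
  def gate_acquire(dedupe_key, owner_node, opts \\ []) do
    case Application.get_env(:cyclium, :work_claims) do
      # No adapter configured: claiming is a no-op
      nil -> {:ok, :passthrough}
      # Adapter configured: delegate to its acquire/3 callback
      mod -> mod.acquire(dedupe_key, owner_node, opts)
    end
  end
end
```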
Writing a custom adapter
Implement the Cyclium.WorkClaims behaviour with 5 callbacks:
defmodule MyApp.WorkClaims.SqlServer do
@behaviour Cyclium.WorkClaims
@impl true
def acquire(dedupe_key, owner_node, opts) do
# Use hints: ["UPDLOCK"] for SQL Server lock acquisition
# Transaction-based: read with lock, then insert or update
end
@impl true
def renew(dedupe_key, owner_node, lease_seconds), do: # ...
def complete(dedupe_key, owner_node), do: # ...
def fail(dedupe_key, owner_node, error_detail), do: # ...
def reclaim_expired(limit), do: # ...
end
The default EctoClaims implementation uses plain transactions (no lock hints) and works with any Ecto adapter. For SQL Server, a custom adapter can pass hints: ["UPDLOCK"] on the read query inside the transaction for stronger concurrency guarantees.
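The concurrency-sensitive part of acquire/3 is the decision that the locked read feeds into. A minimal sketch of that decision logic, under the state names from the cyclium_work_claims table (ClaimDecision is a hypothetical helper, not a cyclium module):

```elixir
# Hypothetical decision an acquire/3 implementation might make after
# reading the existing claim row under a lock (or inside a transaction).
defmodule ClaimDecision do
  # No row yet: insert a fresh claim
  def decide(nil, _now), do: :insert_fresh

  # Row is held: steal only if the lease has expired
  def decide(%{state: "claimed", lease_until: lease_until}, now) do
    if DateTime.compare(now, lease_until) == :gt, do: :steal, else: :busy
  end

  # Terminal states: the key can be claimed again
  def decide(%{state: state}, _now) when state in ["done", "failed", "expired"],
    do: :reclaim
end
```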
Database table
The cyclium_work_claims table is created by V6 migration:
| Column | Type | Notes |
|---|---|---|
| dedupe_key | string(512) | Unique — matches the episode's dedupe_key |
| state | string(32) | claimed, done, failed, expired |
| owner_node | string(255) | Node holding the lease |
| lease_until | utc_datetime | When the lease expires |
| attempt | integer | Incremented on each steal/reclaim |
Integration with recovery
When work claims are configured, Recovery.sweep/1 uses gate_acquire to coordinate across nodes before claiming orphaned episodes. This provides two layers of coordination: the work claim lease prevents concurrent execution, and the optimistic claim_for_recovery update prevents duplicate recovery actions.
Lease tuning
The lease duration (work_claims_lease_seconds, default: 120s) controls the trade-off between availability and the "zombie window" — the time between a node crash and another node stealing the work.
| Setting | Zombie window | Heartbeat interval | Good for |
|---|---|---|---|
| 60s | ~60s | ~20s | Short tasks, fast failover |
| 120s (default) | ~120s | ~40s | Most workloads |
| 300s | ~5min | ~100s | Long-running tasks, flaky networks |
Guidelines:
- Heartbeat fires at lease / 3 — set the lease to at least 3x your worst-case DB round-trip time
- If your episodes typically run for minutes, 120s is fine — the heartbeat keeps the lease alive indefinitely
- If network partitions last >30s regularly, increase the lease to avoid false steals
- After a steal, the new node restarts the episode fresh (strategies should be idempotent)
Heartbeat failure modes
The heartbeat GenServer is linked to the EpisodeTask process:
- Heartbeat crashes — The EpisodeTask traps the EXIT and can restart the heartbeat. The lease has margin (only 1/3 expired per interval), so a brief restart is safe.
- EpisodeTask crashes — The heartbeat dies with it. The rescue block marks the claim as :failed. If it doesn't (hard kill), the lease expires naturally and another node can steal it.
- DB becomes unreachable — Heartbeat renewal fails, lease expires. When DB comes back, another node may steal. This is by design — if you can't reach the DB, you can't guarantee exclusive access.
- Lost ownership — If gate_renew returns {:error, :not_owner} (another node stole the claim), the heartbeat stops itself.
Idempotency guidance: Since lease expiry can cause a second node to start the same work, strategies that perform side effects should use idempotency keys. For DB writes, use unique constraints keyed by the episode's dedupe_key or step number. For external API calls, include an idempotency header derived from the episode ID + step number.
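A stable idempotency key can be derived from values every episode already has. A sketch (the helper name is hypothetical):

```elixir
# Derive a deterministic idempotency key from episode ID + step number,
# so a re-run after a lease steal repeats the same key and the external
# system (or a unique DB constraint) can deduplicate the side effect.
defmodule IdempotencyKey do
  def derive(episode_id, step_number) do
    :crypto.hash(:sha256, "#{episode_id}:#{step_number}")
    |> Base.encode16(case: :lower)
  end
end
```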
Telemetry events
All events are prefixed with [:cyclium, :work_claims, ...]:
| Event | Measurements | Metadata | Meaning |
|---|---|---|---|
| :acquired | count, duration_ms | dedupe_key, owner_node | Fresh claim acquired |
| :steal | count, duration_ms | dedupe_key, owner_node | Expired claim reclaimed (attempt > 1) |
| :busy | count, duration_ms | dedupe_key, owner_node | Claim denied — another node holds it |
| :renewed | count | dedupe_key, owner_node | Heartbeat renewal succeeded |
| :renew_failed | count | dedupe_key, owner_node | Heartbeat renewal failed (lost ownership) |
| :completed | count | dedupe_key, owner_node | Work finished, claim released |
| :failed | count | dedupe_key, owner_node | Work failed, claim released |
Key metrics to alert on:
- steal rate > 0 during normal operation → nodes are dying or leases are too short
- busy rate proportional to node count → expected (N-1 nodes get busy per dedupe key)
- renew_failed > 0 → possible clock drift or DB contention
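A handler for the steal alert might look like this sketch (the module is hypothetical; attaching assumes the standard :telemetry package that these events are emitted through):

```elixir
# Hypothetical handler that logs steal events (a steal during normal
# operation means a node died or the lease is too short).
defmodule StealAlert do
  require Logger

  def handle_event([:cyclium, :work_claims, :steal], _measurements, metadata, _config) do
    Logger.warning("work claim stolen: #{metadata[:dedupe_key]} by #{metadata[:owner_node]}")
  end

  # Attach at application start (assumes the :telemetry package):
  # :telemetry.attach("steal-alert", [:cyclium, :work_claims, :steal],
  #                   &StealAlert.handle_event/4, nil)
end
```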
Testing work claims
Unit tests: Use Cyclium.WorkClaims.FakeClaims — an Agent-backed in-memory implementation:
setup do
{:ok, _} = Cyclium.WorkClaims.FakeClaims.start_link()
Application.put_env(:cyclium, :work_claims, Cyclium.WorkClaims.FakeClaims)
on_exit(fn -> Application.delete_env(:cyclium, :work_claims) end)
end
test "second acquire is busy" do
assert {:ok, _} = Cyclium.WorkClaims.gate_acquire("key:1", "node-a")
Cyclium.WorkClaims.FakeClaims.set_busy("key:2")
assert {:error, :busy} = Cyclium.WorkClaims.gate_acquire("key:2", "node-b")
end
Integration tests (single node): Configure EctoClaims with your test repo. Verify:
- Two concurrent acquire calls on the same key — one succeeds, one gets :busy
- After complete, a new acquire on the same key succeeds (reclaim)
- After lease expiry (set a short lease), acquire steals from the previous owner
Multi-node tests: Deploy to a staging cluster with 2+ nodes. Use a test actor with a short schedule:
- Verify only one node's episode runs (check owner_node in cyclium_work_claims)
- Kill a node mid-episode, wait for lease expiry, verify another node steals and completes
- Monitor telemetry — steal events should only appear after the kill, never during normal operation
Node Identity
By default, Cyclium uses node() to identify the current BEAM instance for work claims, trigger requests, and recovery coordination. In environments where multiple instances share the same Erlang node name (e.g., dev containers all starting as app@app), this breaks lease semantics — every node looks like the same owner.
Cyclium.NodeIdentity provides a pluggable identity layer:
# Static override — set per instance via config or env var
config :cyclium, :node_identity, "dev-jane"
# MFA callback for dynamic resolution (hostname, env var, etc.)
config :cyclium, :node_identity, {MyApp.NodeIdentity, :resolve, []}
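The MFA form might point at a resolver like this sketch (MyApp.NodeIdentity and the env var name are assumptions, not cyclium conventions):

```elixir
# Hypothetical resolver: prefer an explicit env var, fall back to
# hostname plus OS pid so colliding app@app nodes stay distinguishable.
defmodule MyApp.NodeIdentity do
  def resolve do
    System.get_env("CYCLIUM_NODE_IDENTITY") || default_identity()
  end

  defp default_identity do
    {:ok, host} = :inet.gethostname()
    "#{host}-#{System.pid()}"
  end
end
```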
When unconfigured and running in non-distributed mode (:nonode@nohost), a random stable identity is generated per BEAM instance and stored in :persistent_term — unique for the process lifetime but not across restarts.
All work claim operations (EpisodeTask, Heartbeat, Runner.Deferred, TriggerRequests.Poller) use Cyclium.NodeIdentity.name() instead of raw node().
Multi-Stack Deployments
A stack is one logical cyclium cluster that shares a database with other clusters but runs its own Elixir nodes (its own Phoenix.PubSub, its own in-memory persistent_term / ETS registries). Typical reasons to run multiple stacks against one schema: independent release cadences, blast-radius isolation, or partitioning actors across regions.
The library contract is cluster-level
Cyclium itself exposes no per-actor DSL option for stacks. It reads :stack_slug once per cluster, stamps every row its actors produce with that value, and scopes Recovery to the matching slug. Which actors actually run on a given cluster is the consumer's decision, implemented in the host app's supervisor.
Episodes, workflow instances, and deferred trigger requests are stamped with the current source_stack at insert time. Cyclium.Recovery.sweep/1 and Cyclium.Recovery.reconcile_workflows/1 read :stack_slug by default and only scan rows from their own stack — this prevents a crashed cluster's work from being re-driven on a cluster whose persistent_term / PubSub state doesn't know about it.
Partitioning actors across stacks
The simplest approach is one actor list per deployment:
# On the stack_a cluster's host app:
config :cyclium, :stack_slug, System.get_env("CYCLIUM_STACK_SLUG") # "stack_a"
config :my_app, :cyclium_actors, [StackAOnlyActor, SharedActor]
# On the stack_b cluster's host app:
config :cyclium, :stack_slug, System.get_env("CYCLIUM_STACK_SLUG") # "stack_b"
config :my_app, :cyclium_actors, [StackBOnlyActor, SharedActor]
A richer approach declares the allowed stacks on each actor's child-spec and has the supervisor filter the list at init time — useful when the same release is deployed to every cluster and you want a single source of truth for which actor runs where:
config :my_app, :cyclium_actors, [
{SharedActor, []},
{StackAOnlyActor, stacks: [:stack_a]},
{CrossStackActor, stacks: [:stack_a, :stack_b]}
]
# In the supervisor's init/1:
stack = Application.get_env(:cyclium, :stack_slug)
children = Enum.filter(configured_actors, &actor_runs_on_stack?(&1, stack))
Either pattern is fine — cyclium doesn't care how the list was built, only what actually gets supervised. Because each cluster has its own node processes, PubSub, and persistent_term cache, a stack-local actor's strategy / budget / log-strategy lookups only exist on the cluster that supervises it — which is exactly why Recovery must be stack-scoped.
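The filter the supervisor snippet relies on might look like this (a sketch; actor_runs_on_stack?/2 is the host app's own helper, not a cyclium API):

```elixir
# Hypothetical supervisor-side filter: an actor with no :stacks
# restriction runs everywhere; otherwise the current slug must appear
# in its declared list.
defmodule StackFilter do
  def actor_runs_on_stack?({_module, opts}, stack) when is_list(opts) do
    case Keyword.get(opts, :stacks) do
      nil -> true
      stacks -> stack in stacks
    end
  end

  # Bare module entries (no opts) run on every stack
  def actor_runs_on_stack?(module, _stack) when is_atom(module), do: true
end
```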
Runtime configuration
For a single release that can be deployed into multiple stacks, drive the slug from an env var in runtime.exs:
# runtime.exs
config :cyclium, :stack_slug, System.get_env("CYCLIUM_STACK_SLUG")
Leaving :stack_slug unset (or nil) is the single-stack default: rows are stamped NULL and Recovery scans without a stack filter. Pre-migration rows with NULL source_stack are swept by any stack for one release so legacy episodes aren't orphaned — once all rows have been stamped, you can tighten Recovery by stamping a real slug on every cluster.
Related config
- :stack_slug — identity of this cluster (stamps rows and scopes Recovery)
- :trigger_poll_source_stack — separate filter on TriggerRequests.Poller: which stacks' deferred requests this full-mode node will pick up (often the same value as :stack_slug, but can be broader)
- Cyclium.StackSlug.current/0 — read the slug; returns nil when unset
Trigger-Only Mode (Deferred Execution)
In shared environments — dev machines on a common test DB, QC/sandbox instances, CI — running cyclium actors on every node leads to competing work claims and unpredictable execution. Conversely, disabling cyclium entirely on non-processing nodes leaves the UI hobbled: episodes never fire, statuses never update.
Trigger-only mode solves both problems by decoupling event processing from episode execution.
Three operating modes
| Mode | Actors start? | Events flow? | Episodes execute locally? | Trigger requests written? |
|---|---|---|---|---|
| :full | yes | yes | yes | no (direct) |
| :trigger_only | yes | yes | no | yes (to DB) |
| :disabled | no | no | no | no |
How it works
In :trigger_only mode, the actor supervision tree starts normally — Bus subscriptions, schedule timers, debounce, circuit breakers all work. But the runner is swapped to Cyclium.Runner.Deferred, which writes a row to cyclium_trigger_requests instead of spawning a Task. The episode record is still created in the DB so the UI can display it.
On :full mode nodes, a Cyclium.TriggerRequests.Poller watches the trigger requests table and dispatches deferred episodes to Runner.OTP for local execution. The poller uses WorkClaims.gate_acquire/3 (with dedupe key "trigger_request:<id>") to coordinate dispatch across nodes — only one full-mode node will pick up a given request. If work claims are not configured, the poller falls through to passthrough mode. The poller can be scoped by source_stack to only pick up requests from specific stacks.
Configuration
# Host app config — set per environment
config :my_app, :cyclium_mode, :full # :full | :trigger_only | :disabled
# On full-mode nodes: enable the poller
config :cyclium, :trigger_poller, true
config :cyclium, :trigger_poll_interval_ms, 5_000 # default
config :cyclium, :trigger_poll_source_stack, "stack_a" # nil = pick up all
# On trigger-only nodes: runner is set automatically
# config :cyclium, :runner, Cyclium.Runner.Deferred
# config :cyclium, :stack_slug, :stack_a
Deployment scenarios
Dev machines + shared test DB:
- QC/test node runs :full with the poller enabled
- Dev machines run :trigger_only — events flow, UI works, episodes defer to the processing node
- A dev who wants to process locally overrides to :full and narrows their actor list
Sandbox / feature-branch testing:
- Sandbox runs :trigger_only — UI flows complete, episode records exist, Bus events fire
- Designated processing node runs :full and picks up deferred triggers
Production (unchanged):
- All nodes run :full as before; work claims handle multi-node coordination
- The trigger requests table stays empty
Database table
The cyclium_trigger_requests table (V14 migration):
| Column | Type | Notes |
|---|---|---|
| episode_id | binary_id | FK to cyclium_episodes |
| actor_id | string | Actor that created the trigger |
| expectation_id | string | Expectation that fired |
| source_node | string | Node identity of the trigger-only instance |
| source_stack | string | Stack slug for scoped polling |
| status | string | pending, claimed, completed, expired |
| opts | map | Runner options (e.g., resume flag) |
| claimed_by | string | Node identity of the full-mode instance |
Indexed on (status, inserted_at) for efficient polling.
Runtime mode switching
Cyclium.Mode supports live mode changes without restart — both node-wide and per-actor:
# Switch the whole node (via remote console, admin endpoint, etc.)
Cyclium.Mode.set(:trigger_only) # stop local execution, defer to DB
Cyclium.Mode.set(:full) # resume local execution + polling
# Per-actor override — yield one actor to another node while keeping the rest
Cyclium.Mode.set_actor_override(:client_health, :trigger_only)
Cyclium.Mode.clear_actor_override(:client_health)
Cyclium.Mode.clear_all_overrides()
# Inspect current state
Cyclium.Mode.status()
# %{node_mode: :full, overrides: %{client_health: :trigger_only}, node_identity: "..."}
Mode reads are ETS-backed (read_concurrency: true) for zero overhead in hot paths. The trigger request poller self-gates on each cycle — it only polls when the node-wide mode is :full.
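The ETS-backed read pattern looks roughly like this (table and key names are illustrative, not cyclium's actual internals):

```elixir
# Illustrative ETS-backed mode storage: writes go through one owner
# process; reads are lock-free thanks to read_concurrency.
table = :ets.new(:mode_sketch, [:set, :public, read_concurrency: true])
:ets.insert(table, {:node_mode, :full})

read_mode = fn ->
  case :ets.lookup(table, :node_mode) do
    [{:node_mode, mode}] -> mode
    # Default when nothing has been written yet
    [] -> :full
  end
end

read_mode.()  # :full
```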
Dynamic Actors
Dynamic actors allow agent definitions to be stored in the database and hydrated into running supervised processes at runtime — without requiring compiled Elixir modules.
When to use dynamic actors
- Users create custom monitoring agents through a UI
- Agent definitions are stored per-tenant
- Agent configurations change frequently without requiring code deploys
How it works
A single Cyclium.DynamicActor GenServer module serves all DB-defined actors. Each instance is started with different config/expectations args under Cyclium.ActorSupervisor. This avoids runtime module pollution — no Module.create/3 needed.
Cyclium.ActorSupervisor (DynamicSupervisor)
├── MyApp.Agents.CompiledActor (compiled, use Cyclium.Actor)
├── Cyclium.DynamicActor (from DB: "user_monitor_1")
└── Cyclium.DynamicActor (from DB: "user_monitor_2")
Defining an agent in the database
Insert a row into cyclium_agent_definitions:
%Cyclium.Schemas.AgentDefinition{
actor_id: "custom_health_check",
domain: "monitoring",
strategy_template: "observe_classify_converge", # built-in template
config: Jason.encode!(%{max_concurrent_episodes: 3, episode_overflow: "queue"}),
expectations: Jason.encode!([
%{
id: "check_target",
trigger: %{type: "schedule", interval_ms: 300_000},
budget: %{max_turns: 5, max_tokens: 10_000, max_wall_ms: 60_000},
log_strategy: "timeline"
}
]),
enabled: true
}
Loading dynamic actors
# At application startup — loads all enabled definitions
Cyclium.DynamicActor.Loader.load_all()
# Load a single actor
Cyclium.DynamicActor.Loader.load("custom_health_check")
# Reload after updating the definition in DB
Cyclium.DynamicActor.Loader.reload("custom_health_check")
# Stop a dynamic actor
Cyclium.DynamicActor.Loader.stop("custom_health_check")
Database table
cyclium_agent_definitions (V7 migration):
| Column | Type | Notes |
|---|---|---|
| id | uuid | PK |
| actor_id | string(255) | Unique identifier |
| domain | string(255) | Grouping domain |
| config | text (JSON) | max_concurrent_episodes, episode_overflow, etc. |
| expectations | text (JSON) | Array of expectation definitions |
| strategy_ref | string(255) | Strategy module or registry lookup key |
| strategy_template | string(255) | Template name (e.g. "observe_synthesize_converge") |
| strategy_config | text (JSON) | Template parameters (gatherer, system_prompt, finding_config, etc.) |
| enabled | boolean | Soft toggle |
| created_by | string(255) | User/tenant |
Strategy templates (data-driven strategies)
Dynamic actors use strategy templates — built-in parameterized strategy modules that are configured via strategy_config JSON in the DB. The compiled app defines what data sources (gatherers) and outputs are available; the DB definition composes them.
| Template | Pattern | Use Case |
|---|---|---|
| "observe_synthesize_converge" | Gather → LLM → Finding | Health checks, advisors, analysis |
| "observe_classify_converge" | Gather → Rules → Finding | Threshold/rule-based monitoring |
| "dispatch" | Load entities → Broadcast events | Fan-out triggers |
Gatherers
Gatherers are compiled modules that know how to collect domain-specific data. The compiled app implements and registers them:
defmodule MyApp.Gatherers.ProjectData do
@behaviour Cyclium.Gatherer
@impl true
def gather(trigger_payload, _opts) do
project_id = trigger_payload["project_id"]
project = Repo.get!(Project, project_id)
orders = load_orders(project_id)
{:ok, %{project: project, orders: orders, order_count: length(orders)}}
end
end
Register in app config:
config :cyclium, :gatherer_registry, %{
"project_data" => MyApp.Gatherers.ProjectData,
"client_metrics" => MyApp.Gatherers.ClientMetrics
}
Observe → Synthesize → Converge
The main workhorse. Gathers data, sends to LLM, maps result to findings:
%AgentDefinition{
actor_id: "project_health_dynamic",
strategy_template: "observe_synthesize_converge",
strategy_config: Jason.encode!(%{
"gatherer" => "project_data",
"system_prompt" => "You are a project health analyst. Evaluate the project data and classify its health status.",
"finding_config" => %{
"actor_id_field" => "project_health_dynamic",
"finding_key_template" => "project:health:${subject_id}",
"class_field" => "class",
"severity_field" => "severity",
"summary_field" => "summary",
"subject_kind" => "project",
"subject_id_key" => "project_id"
},
"outputs" => ["email"]
}),
expectations: Jason.encode!([
%{id: "evaluate", trigger: %{type: "event", event_type: "project.check_requested"}}
])
}
Observe → Classify → Converge
Rule-based classification without LLM. Rules are evaluated in order, first match wins:
%AgentDefinition{
actor_id: "client_risk_monitor",
strategy_template: "observe_classify_converge",
strategy_config: Jason.encode!(%{
"gatherer" => "client_metrics",
"classify_rules" => [
%{"field" => "mrr", "op" => "lt", "value" => 500, "class" => "at_risk", "severity" => "high"},
%{"field" => "last_login_days_ago", "op" => "gt", "value" => 30, "class" => "inactive", "severity" => "medium"}
],
"default_class" => "healthy",
"default_severity" => "low",
"finding_config" => %{
"finding_key_template" => "client:risk:${subject_id}",
"subject_kind" => "client",
"subject_id_key" => "client_id"
}
})
}
Rule operators: lt, gt, eq, neq, in, not_in.
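The rule semantics (in-order evaluation, first match wins, defaults otherwise) can be sketched in a few lines. This is hypothetical illustration, not the template's source:

```elixir
# Hypothetical evaluator for classify_rules: the first matching rule
# wins; otherwise fall back to the default class/severity.
defmodule RuleSketch do
  def classify(data, rules, default_class, default_severity) do
    Enum.find_value(rules, %{"class" => default_class, "severity" => default_severity}, fn rule ->
      if rule_matches?(data, rule), do: Map.take(rule, ["class", "severity"])
    end)
  end

  defp rule_matches?(data, %{"field" => field, "op" => op, "value" => value}) do
    apply_op(op, Map.get(data, field), value)
  end

  defp apply_op("lt", a, b), do: a < b
  defp apply_op("gt", a, b), do: a > b
  defp apply_op("eq", a, b), do: a == b
  defp apply_op("neq", a, b), do: a != b
  defp apply_op("in", a, b), do: a in b
  defp apply_op("not_in", a, b), do: a not in b
end
```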
Dispatch
Fan-out pattern. Calls a gatherer that returns a list of entities, broadcasts an event for each:
%AgentDefinition{
actor_id: "project_dispatch",
strategy_template: "dispatch",
strategy_config: Jason.encode!(%{
"gatherer" => "active_projects",
"event_type" => "project.check_requested",
"entity_id_field" => "id",
"entity_payload_fields" => ["id", "name"]
})
}
Strategy resolution for dynamic actors
Dynamic actors use the same :persistent_term registration path as compiled actors. When a dynamic actor boots, Loader resolves the strategy from strategy_template and injects it into each expectation — init_state_from_config then registers it automatically. No strategy registry entries needed.
If you need to override a strategy without updating the DB record, add a strategy_for/2 clause to your registry as usual.
Custom templates can be registered in app config:
config :cyclium, :strategy_templates, %{
"my_custom_template" => MyApp.Strategies.CustomTemplate
}
Lifecycle and draining
Safe lifecycle operations for updating dynamic actors without losing in-flight episodes.
Drain and reload
# Graceful: waits for active episodes to finish, then reloads from DB
Cyclium.DynamicActor.Lifecycle.drain_and_reload("my_monitor")
# Graceful stop (waits for episodes)
Cyclium.DynamicActor.Lifecycle.drain_and_stop("my_monitor")
# Instant stop/reload (existing behavior, may lose in-flight episodes)
Cyclium.DynamicActor.Loader.stop("my_monitor")
Cyclium.DynamicActor.Loader.reload("my_monitor")
Event-driven refresh
Start the optional Watcher in your supervision tree for automatic refresh:
children = [
# ... your app ...
Cyclium.DynamicActor.Watcher
]
Then broadcast events when definitions change:
# Agent definitions:
Cyclium.Bus.broadcast("agent_definition.created", %{actor_id: "my_monitor"})
Cyclium.Bus.broadcast("agent_definition.updated", %{actor_id: "my_monitor"})
Cyclium.Bus.broadcast("agent_definition.disabled", %{actor_id: "my_monitor"})
# Workflow definitions:
Cyclium.Bus.broadcast("workflow_definition.created", %{workflow_id: "onboarding"})
Cyclium.Bus.broadcast("workflow_definition.updated", %{workflow_id: "onboarding"})
Cyclium.Bus.broadcast("workflow_definition.disabled", %{workflow_id: "onboarding"})
The Watcher handles each event appropriately — created loads, updated reloads, disabled stops/unloads. For actors, updates use drain-and-reload to preserve in-flight episodes.
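The dispatch can be pictured as a small event-to-action mapping (a sketch of the behaviour described above, not the Watcher's source; action names are illustrative):

```elixir
# Hypothetical mapping from Bus event to lifecycle action, mirroring the
# Watcher semantics: created loads, updated reloads (drain-and-reload
# for actors), disabled stops/unloads.
defmodule WatcherDispatchSketch do
  def action_for("agent_definition.created"), do: :load
  def action_for("agent_definition.updated"), do: :drain_and_reload
  def action_for("agent_definition.disabled"), do: :stop
  def action_for("workflow_definition.created"), do: :load
  def action_for("workflow_definition.updated"), do: :reload
  def action_for("workflow_definition.disabled"), do: :unload
  def action_for(_other), do: :ignore
end
```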
Deploy patterns
Rolling deploy:
# In application stop callback or shutdown hook:
Cyclium.DynamicActor.Lifecycle.stop_all(drain: true, timeout: 30_000)
Blue-green: The new instance calls Loader.load_all() on startup. Global name registration ensures only one instance runs per actor across the cluster.
Dry Runs / Simulations
Dry runs let you test what an actor would do without producing real findings, outputs, or side effects. Useful for validating agent configurations and "what if" testing.
How dry runs work
An episode with mode: "dry_run":
- Runs the full strategy loop (same next_step → handle_result cycle)
- Findings are NOT persisted — but are journaled for inspection (optionally persistable with prefixed keys via the persist_findings option)
- Outputs are NOT delivered — but output proposals are journaled
- Tool calls and synthesis can be overridden with mock responses
- Steps are fully journaled — complete audit trail of what would have happened
- Episode is tagged as mode: "dry_run" for filtering in UI and metrics
Force-firing a dry run
# Simplest: real tool calls and synthesis, skip persist
Cyclium.Episodes.force_fire("project_health_actor", "evaluate_project",
mode: :dry_run,
trigger_payload: %{project_id: 123}
)
# With mock overrides — skip real API calls
Cyclium.Episodes.force_fire("project_health_actor", "evaluate_project",
mode: :dry_run,
trigger_payload: %{project_id: 123},
overrides: %{
tool_overrides: %{"erp.get_orders" => [%{id: 1, status: "complete"}]},
synthesis_override: %{"class" => "healthy", "severity" => "low"}
}
)
Override resolution (layered)
Three sources of overrides, checked in priority order:
fire-time overrides > expectation-level DSL > real execution
- Fire-time overrides — passed to force_fire/3 as the :overrides option
- Expectation-level DSL — defined in the actor definition:
expectation(:evaluate_project,
strategy: MyApp.Strategies.ProjectHealth,
trigger: {:event, "project_health.check_requested"},
dry_run: [
tool_overrides: %{
{"erp", "get_orders"} => {:ok, %{orders: []}}
},
synthesis_override: {:ok, %{"class" => "healthy"}}
]
)
- No overrides — real tool calls and synthesis execute normally, only findings and outputs are skipped
Persisting findings in dry runs
By default, dry run findings are journaled but not persisted to the DB. You can opt in to persistence with prefixed keys so dry run findings don't collide with live ones:
# Persist with default "dry_run" prefix (finding_key becomes "dry_run:po_stalled:PO-123")
Cyclium.Episodes.force_fire("po_monitor", "check_pos",
mode: :dry_run,
overrides: %{persist_findings: true}
)
# Persist with custom prefix (finding_key becomes "experiment1:po_stalled:PO-123")
Cyclium.Episodes.force_fire("po_monitor", "check_pos",
mode: :dry_run,
overrides: %{persist_findings: "experiment1"}
)
The prefix is also applied to actor_id on persisted findings, so Findings.active_for(actor: "po_monitor") won't return dry run findings — use Findings.active_for(actor: "dry_run:po_monitor") instead, or use the mode-aware helper:
# Automatically prefixes filters when episode is a dry run with persist_findings enabled:
Cyclium.Findings.active_for_mode([actor: "po_monitor"], episode)
This can also be set at the expectation level in the actor DSL:
expectation(:check_pos,
strategy: MyApp.Strategies.PoCheck,
trigger: {:schedule, 300_000},
dry_run: [persist_findings: true]
)
Via the Actor GenServer
You can also fire dry runs through the actor's message interface:
GenServer.cast(MyApp.Agents.ProjectHealth, {:force_fire, :evaluate_project, mode: :dry_run})
Dry run results
The episode completes with full step journal. In the UI:
- "DRY RUN" badge on the episode
- Step timeline shows which steps used mock overrides (_dry_run: true in result_ref)
- Findings show what would have been created
- Full step-by-step debugging available
Workflow dry runs
Workflows support dry run mode — every step episode inherits the mode and opts from the workflow instance:
# Compiled workflow
WorkflowEngine.start_workflow(MyWorkflow, trigger_data, mode: :dry_run)
# Dynamic workflow with finding persistence
WorkflowEngine.start_dynamic_workflow("order_flow", trigger_data,
mode: :dry_run,
dry_run_opts: %{persist_findings: true}
)
All step episodes will run in dry run mode: findings are journaled (and optionally persisted with prefix), outputs are skipped. The workflow instance itself stores mode and dry_run_opts (V9 migration), so retries and subsequent steps also inherit the mode.
Per-step overrides allow targeting different mocks and options to individual steps:
WorkflowEngine.start_dynamic_workflow("vendor_onboarding", trigger_data,
mode: :dry_run,
dry_run_opts: %{
persist_findings: true,
steps: %{
"compliance_check" => %{
"synthesis_override" => %{"class" => "high_risk", "severity" => "high"}
},
"connector_setup" => %{
"tool_overrides" => %{"erp.create_vendor" => %{"id" => "mock-v-001"}},
"persist_findings" => "experiment1"
}
}
}
)
Global keys (like persist_findings: true) apply to all steps. Step-specific keys override globals for that step. The "steps" key itself is stripped from each episode's opts.
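The merge just described can be sketched as follows (hypothetical helper; cyclium's actual key handling may differ — string keys are used throughout for simplicity):

```elixir
# Hypothetical merge of global dry-run opts with per-step overrides:
# step-specific keys win, and the "steps" map itself is stripped.
defmodule StepOptsSketch do
  def opts_for_step(dry_run_opts, step_name) do
    {steps, globals} = Map.pop(dry_run_opts, "steps", %{})
    Map.merge(globals, Map.get(steps, step_name, %{}))
  end
end
```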
Strategies in workflow steps can use Findings.active_for_mode/3 to transparently query their own dry run findings when persist_findings is enabled.
Tools
External capabilities are registered as tools implementing Cyclium.Tool. Use use Cyclium.Tool for sensible defaults — the only required callback is call/3:
defmodule MyApp.Tools.ERP do
use Cyclium.Tool
@impl true
def call(:read_po, args, _ctx) do
case MyApp.ERP.get_po(args["po_id"]) do
{:ok, po} -> {:ok, po}
{:error, reason} -> {:error, reason}
end
end
end
Override optional callbacks as needed:
defmodule MyApp.Tools.VendorAPI do
use Cyclium.Tool
@impl true
def call(:send_notification, args, _ctx), do: # ...
# Strip credentials before journaling
@impl true
def redact(args), do: Map.drop(args, ["api_key"])
# Strip large payloads from results before journaling
@impl true
def redact_result(result) when is_list(result) do
%{count: length(result), ids: Enum.map(result, & &1.id)}
end
def redact_result(result), do: result
# Mark as having side effects (affects caching/retry behavior)
@impl true
def side_effect?, do: true
# Cache results for 5 minutes
@impl true
def cache_ttl, do: :timer.minutes(5)
# Cache key scope — same PO ID returns cached result
@impl true
def cache_scope(args), do: args["po_id"]
end
| Callback | Default | Description |
|---|---|---|
| call(action, args, ctx) | required | Execute the tool action |
| redact(args) | passthrough | Strip sensitive/bulky data from args before journaling |
| redact_result(result) | passthrough | Strip bulky data from results before journaling |
| side_effect?() | false | Whether the action mutates external state |
| cache_ttl() | :no_cache | How long to cache results (ms) |
| cache_scope(args) | "" | Cache key discriminator |
Register tools in config:
config :cyclium, :capability_registry, %{
erp_read: MyApp.Tools.ERP,
vendor_api: MyApp.Tools.VendorAPI
}
Strategies invoke tools via {:tool_call, :erp_read, :read_po, %{"po_id" => "PO-123"}}. The ToolExec wrapper handles capability resolution, caching, redaction, and error classification.
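The resolution step can be illustrated with a minimal dispatcher (ToolDispatchSketch and StubERP are hypothetical; the real ToolExec additionally layers caching, redaction, and error classification around the call):

```elixir
# Hypothetical capability resolution: look up the module registered
# under the capability key and invoke its call/3.
defmodule ToolDispatchSketch do
  def dispatch({:tool_call, capability, action, args}, registry) do
    case Map.fetch(registry, capability) do
      {:ok, module} -> module.call(action, args, %{})
      :error -> {:error, {:unknown_capability, capability}}
    end
  end
end

# A stub tool for illustration:
defmodule StubERP do
  def call(:read_po, %{"po_id" => id}, _ctx), do: {:ok, %{id: id, status: "open"}}
end
```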
Reconciler
The optional Cyclium.Reconciler watches for spec.updated Bus events and reconciles running actors when their configuration changes at runtime:
- Sends updated config to actor GenServers
- Cancels timers for removed expectations
- Starts timers for newly added schedule expectations
- Identifies orphaned blocked episodes (expectation removed) and cancels them
Enable via config:
config :cyclium, :reconciler, true

Or trigger manually:
Cyclium.Reconciler.reconcile_actor(actor_pid, new_module)

LiveView integration
Cyclium integrates with Phoenix LiveView via the Bus. Subscribe in your LiveView's mount and handle events:
defmodule MyAppWeb.DashboardLive do
use MyAppWeb, :live_view
def mount(_params, _session, socket) do
if connected?(socket), do: Cyclium.Bus.subscribe()
{:ok, assign(socket, findings: load_findings())}
end
def handle_info({:bus, event, _payload}, socket)
when event in ["finding.raised", "finding.updated", "finding.cleared"] do
{:noreply, assign(socket, findings: load_findings())}
end
def handle_info({:bus, _event, _payload}, socket) do
{:noreply, socket}
end
end

Database tables
All tables use binary_id primary keys and are SQL Server 2017 compatible (no JSON operators in DDL, application-layer upserts, denormalized columns for indexed queries).
| Table | Migration | Purpose |
|---|---|---|
| `cyclium_episodes` | V1 | Episode lifecycle, budget tracking, classification |
| `cyclium_episode_steps` | V1 | Step-by-step journal (16 step kinds) |
| `cyclium_episode_checkpoints` | V1 | Versioned strategy state snapshots |
| `cyclium_findings` | V1 | Persistent observations with raise/update/clear lifecycle |
| `cyclium_outputs` | V1 | Output proposals, delivery status, deduplication |
| `cyclium_episode_logs` | V2 | Materialized human-readable logs |
| `cyclium_workflow_instances` | V3, V9 | Workflow execution tracking, step states, dry run mode |
| `cyclium_work_claims` | V6 | Lease-based distributed work coordination |
| `cyclium_agent_definitions` | V7 | DB-stored actor definitions for dynamic actors |
| `cyclium_workflow_definitions` | V8 | DB-stored workflow definitions for dynamic workflows |
| `cyclium_trigger_requests` | V14 | Deferred episode execution for trigger-only nodes |
V4 adds archived_at to episodes and findings. V5 replaces the non-unique dedupe_key index on episodes with a filtered unique index (WHERE dedupe_key IS NOT NULL AND archived_at IS NULL) for multi-node coordination. V6 adds the cyclium_work_claims table for lease-based distributed coordination across clustered nodes. V7 adds cyclium_agent_definitions for dynamic actors and mode/dry_run_opts columns to episodes for simulation support. V8 adds cyclium_workflow_definitions for dynamic workflows. V10 adds caused_by_key (finding causality chains) and expires_at (TTL-based expiration) to cyclium_findings.
Window helpers
Cyclium.Window provides clock-aligned deduplication buckets for output dedupe_key construction:
Cyclium.Window.bucket(:h4, DateTime.utc_now()) # "2026-02-24T08" (4-hour windows)
Cyclium.Window.bucket(:h24, DateTime.utc_now()) # "2026-02-24" (daily)
Cyclium.Window.bucket(:h48, DateTime.utc_now()) # "2026-02-24" (every-other-day)
Cyclium.Window.bucket(:w1, DateTime.utc_now()) # "2026-W09" (ISO week)
Use these in dedupe_key to prevent duplicate outputs within a time window:
dedupe_key: "alert:client:#{id}:#{Cyclium.Window.bucket(:h4, DateTime.utc_now())}"

Batch helpers
Cyclium.Batch provides a lightweight struct for strategies that process data in grouped batches across multiple :synthesize calls. No new step types — strategies continue using :tool_call and :synthesize as normal.
# Group items semantically (e.g., by base item so variants are compared together)
groups = Cyclium.Batch.group_by(items, & &1.base_item_id)
batch = Cyclium.Batch.init(groups)
# Or chunk by fixed size
batch = items |> Cyclium.Batch.chunk(10) |> Cyclium.Batch.init()
In next_step, drive the loop:
case Cyclium.Batch.current_group(state.batch) do
nil -> :converge # all groups processed
{group_key, items} -> {:synthesize, build_prompt(group_key, items)}
end
In handle_result, advance:
batch = Cyclium.Batch.advance(state.batch, parsed_result)
{:ok, %{state | batch: batch}}
Progress tracking via Batch.group_count/1, Batch.processed_count/1, and Batch.done?/1.
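For example, a strategy might surface progress between groups (a sketch using only the helpers named above; the function name and string format are illustrative):

```elixir
# Sketch: summarize batch progress, e.g. for a log line or checkpoint state.
defp progress_line(batch) do
  done = Cyclium.Batch.processed_count(batch)
  total = Cyclium.Batch.group_count(batch)
  status = if Cyclium.Batch.done?(batch), do: "complete", else: "in progress"
  "#{done}/#{total} groups (#{status})"
end
```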
Per-item episodes vs. batch processing
Batch helpers are useful when a single episode needs to process many items in groups — but they're not the only pattern. An alternative is to fire one episode per item, driven by domain events. This is the pattern used by the project health actor.
The difference:
| Approach | When to use | Episode count |
|---|---|---|
| Batch (single episode) | Scheduled sweep over a large dataset; items need cross-comparison | One episode, many `:synthesize` turns |
| Per-item (many episodes) | Event-driven re-evaluation; each item is independent | One episode per item |
Per-item pattern — ProjectHealthActor:
The actor listens for "project.updated" events. Each event carries a project_id, and each project gets its own independent episode. There's no need to load the full dataset — the trigger tells you which item changed.
defmodule MyApp.Actors.ProjectHealthActor do
use Cyclium.Actor
actor do
domain(:project_health)
spec_rev("v0.1.0")
max_concurrent_episodes(5)
episode_overflow(:queue)
expectation(:project_should_be_healthy,
strategy: MyApp.Strategies.ProjectHealth,
trigger: {:event, "project.updated"},
subject_key: :project_id,
debounce_ms: :timer.seconds(2),
budget: %{max_turns: 3, max_tokens: 1_000, max_wall_ms: 10_000}
)
end
end

The strategy is single-turn — load the item, classify it deterministically, emit findings:
defmodule MyApp.Strategies.ProjectHealth do
@behaviour Cyclium.EpisodeRunner.Strategy
@impl true
def init(_episode, trigger) do
{:ok, %{project_id: trigger.payload["project_id"]}}
end
@impl true
def next_step(_state, _episode_ctx), do: :converge
@impl true
def handle_result(state, _step, _result), do: {:ok, state}
@impl true
def converge(state, episode_ctx) do
project = MyApp.Projects.get!(state.project_id)
percent_spent = project.spent / max(project.budget, 1)
days_remaining = Date.diff(project.due_date, Date.utc_today())
{class, severity, summary} = classify(project.status, percent_spent, days_remaining)
{:ok, %Cyclium.ConvergeResult{
classification: %{"primary" => class, "severity" => to_string(severity)},
confidence: 1.0,
summary: summary,
findings: [
{:raise, %{
actor_id: "project_health_actor",
finding_key: "project:health:#{project.id}:#{episode_ctx.episode_id}",
class: class,
severity: severity,
confidence: 1.0,
subject_kind: "project",
subject_id: project.id,
summary: summary,
evidence_refs: %{
"percent_spent" => percent_spent,
"days_remaining" => days_remaining
}
}}
],
outputs: []
}}
end
defp classify(:completed, _pct, _days), do: {"complete", :low, "Project completed"}
defp classify(_s, pct, _d) when pct > 1.0, do: {"over_budget", :high, "Over budget"}
defp classify(_s, pct, _d) when pct > 0.85, do: {"budget_risk", :medium, "Budget at risk"}
defp classify(_s, _pct, d) when d < 0, do: {"overdue", :high, "Past due date"}
defp classify(_s, _pct, d) when d < 3, do: {"schedule_risk", :medium, "Due date approaching"}
defp classify(_s, _pct, _d), do: {"healthy", :low, "On track"}
end

Why this works well:
- Reactive — evaluation happens the moment data changes, not on a polling schedule
- Isolated — each project gets its own episode with its own budget, journal, and findings
- Backpressure built in — `max_concurrent_episodes` + `debounce_ms` + `subject_key` prevent a burst of updates from overwhelming the system. Rapid updates to the same project coalesce into one episode
- Findings create an audit trail — the episode-scoped `finding_key` (`"project:health:#{id}:#{episode_id}"`) means each evaluation produces its own finding, giving you a point-in-time history of how health evolved
When to reach for Batch instead: If you need to process 500 items in one pass (e.g., a nightly SKU classification sweep), a single episode with Cyclium.Batch is more efficient than 500 separate episodes — fewer rows, one journal, and the ability to compare items within groups.
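To make the contrast concrete, here is a sketch of that nightly sweep as one batched strategy. The Batch and Strategy APIs are those shown in the sections above; `MyApp.Catalog.list_skus/0` and `build_prompt/1` are hypothetical stand-ins:

```elixir
defmodule MyApp.Strategies.NightlySkuSweep do
  @behaviour Cyclium.EpisodeRunner.Strategy

  @impl true
  def init(_episode, _trigger) do
    # Hypothetical data source; chunk the SKUs into groups of 10.
    batch = MyApp.Catalog.list_skus() |> Cyclium.Batch.chunk(10) |> Cyclium.Batch.init()
    {:ok, %{batch: batch, results: []}}
  end

  @impl true
  def next_step(%{batch: batch}, _ctx) do
    case Cyclium.Batch.current_group(batch) do
      nil -> :converge
      {_key, skus} -> {:synthesize, %{system: "Classify SKUs.", user: build_prompt(skus)}}
    end
  end

  @impl true
  def handle_result(state, %{kind: :synthesis}, {:ok, %{text: text}}) do
    {:ok, %{state | batch: Cyclium.Batch.advance(state.batch, text),
                    results: [text | state.results]}}
  end

  @impl true
  def converge(state, _ctx) do
    {:ok, %Cyclium.ConvergeResult{
      classification: %{"primary" => "sweep_complete"},
      confidence: 1.0,
      summary: "#{Cyclium.Batch.processed_count(state.batch)} groups classified",
      findings: [],
      outputs: []
    }}
  end

  defp build_prompt(skus), do: Enum.map_join(skus, "\n", & &1.name)
end
```

One journal, one budget, and every group's result lands in the same episode's state.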
Synthesizers
A synthesizer is the bridge between strategies and LLM infrastructure. It implements Cyclium.Synthesizer and is called when a strategy returns {:synthesize, prompt_ctx}. The synthesizer handles the actual LLM call, and its response flows back through handle_result/3.
defmodule MyApp.Synthesizers.ProjectAnalysis do
@behaviour Cyclium.Synthesizer
@impl true
def synthesize(prompt_ctx, _episode_ctx) do
case MyApp.LLM.chat(prompt_ctx.system, prompt_ctx.user) do
{:ok, text} -> {:ok, %{text: text}}
{:error, reason} -> {:error, :llm_error, reason}
end
end
@impl true
def estimate_tokens(prompt_ctx) do
# Rough estimate for budget tracking
String.length(prompt_ctx.user || "") |> div(4)
end
end

Attaching a synthesizer to an actor
Synthesizers can be declared at two levels:
Actor-level — inherited by all expectations in the actor that use :synthesize. Declare it when most or all expectations share the same synthesizer:
defmodule MyApp.Actors.ProjectAdvisorActor do
use Cyclium.Actor
actor do
domain(:project_advisory)
spec_rev("v0.1.0")
synthesizer(MyApp.Synthesizers.ProjectAnalysis)
max_concurrent_episodes(3)
episode_overflow(:queue)
expectation(:project_ai_summary,
strategy: MyApp.Strategies.ProjectAdvisor,
trigger: {:event, "project.summary_requested"},
budget: %{max_turns: 5, max_tokens: 10_000, max_wall_ms: 30_000}
)
expectation(:project_risk_assessment,
strategy: MyApp.Strategies.ProjectRisk,
trigger: {:event, "project.risk_review_requested"},
budget: %{max_turns: 8, max_tokens: 15_000, max_wall_ms: 60_000}
)
end
end
Both expectations above use MyApp.Synthesizers.ProjectAnalysis.
Expectation-level — overrides the actor-level synthesizer for a specific expectation. Use this when one expectation needs a different model or configuration:
actor do
domain(:project_advisory)
synthesizer(MyApp.Synthesizers.ProjectAnalysis) # default for this actor
expectation(:project_ai_summary,
strategy: MyApp.Strategies.ProjectAdvisor,
trigger: {:event, "project.summary_requested"},
synthesizer: MyApp.Synthesizers.FastSummary # override for this expectation
)
expectation(:project_risk_assessment,
strategy: MyApp.Strategies.ProjectRisk,
trigger: {:event, "project.risk_review_requested"}
# uses ProjectAnalysis (inherited from actor)
)
end
The strategy itself doesn't know or care which synthesizer is wired in — it just returns {:synthesize, prompt_ctx} and receives the result in handle_result:
def next_step(%{project_data: data, ai_summary: nil}, _ctx) do
{:synthesize, %{
system: "You are a project risk analyst.",
user: "Project: #{data.name}, #{data.percent_spent * 100}% spent, #{data.days_remaining} days left"
}}
end
def handle_result(state, %{kind: :synthesis}, {:ok, %{text: text}}) do
{:ok, %{state | ai_summary: text}}
end

When you don't need a synthesizer: If your strategy is purely deterministic (like the ProjectHealth strategy above), you don't need to declare a synthesizer at all. The synthesizer is only invoked when a strategy returns {:synthesize, prompt_ctx} — if your strategy never does, the synthesizer configuration is ignored.
LLM-provided confidence
Findings have a confidence field (0.0–1.0). For deterministic strategies, hardcoding 1.0 is fine. But when an LLM produces the assessment, you can ask it to self-report confidence and pass that through to the finding.
Step 1 — Add confidence to the tool schema in your synthesizer:
@tool_definition %{
type: "function",
function: %{
name: "project_health_assessment",
parameters: %{
type: "object",
properties: %{
class: %{type: "string", enum: ["healthy", "at_risk", "critical"]},
severity: %{type: "string", enum: ["low", "medium", "high", "critical"]},
summary: %{type: "string", description: "One sentence health summary"},
confidence: %{
type: "number",
minimum: 0.0,
maximum: 1.0,
description:
"How confident you are in this assessment (0.0–1.0). " <>
"Use lower values when data is sparse or ambiguous, " <>
"higher values when the evidence clearly supports the classification."
}
# ... other fields
},
required: ["class", "severity", "summary", "confidence"]
}
}
}
The description matters — it tells the LLM what the scale means, which produces more calibrated values than a bare "confidence" field.
Step 2 — Read it in converge and pass it to the finding:
def converge(state, episode_ctx) do
result = state.assessment
confidence = parse_confidence(result["confidence"])
{:ok, %ConvergeResult{
classification: %{"primary" => result["class"]},
confidence: confidence,
summary: result["summary"],
findings: [
{:raise, %{
finding_key: "project:health:#{state.project_id}:#{episode_ctx.episode_id}",
confidence: confidence,
# ... other finding fields
}}
]
}}
end
defp parse_confidence(val) when is_number(val), do: max(0.0, min(val, 1.0))
defp parse_confidence(_), do: 0.5
The parse_confidence/1 helper clamps to [0.0, 1.0] and falls back to 0.5 if the LLM returns something unexpected. The same confidence flows into both the ConvergeResult (episode-level) and the finding (queryable).
When to hardcode instead: If the classification is deterministic (rule-based, no LLM), use confidence: 1.0. The LLM confidence pattern is for cases where the assessment involves judgment — ambiguous data, sparse evidence, or nuanced classification where the LLM's certainty is genuinely informative.
Test kit
Cyclium ships a test kit in Cyclium.Test.* that host apps can use to smoke-test their definitions without running full episodes. Import the helpers with use:
Actor validation
defmodule MyApp.Actors.ClientHealthActorTest do
use ExUnit.Case, async: true
use Cyclium.Test.ActorCase
test "actor definition is valid" do
assert_valid_actor(MyApp.Actors.ClientHealthActor)
end
test "all expectations have strategies" do
assert_strategies_defined(MyApp.Actors.ClientHealthActor)
end
test "budgets are well-formed" do
assert_budgets_valid(MyApp.Actors.ClientHealthActor)
end
test "spec_rev is set" do
assert_spec_rev_set(MyApp.Actors.ClientHealthActor)
end
end

Strategy contract verification
defmodule MyApp.Strategies.ClientHealthTest do
use ExUnit.Case, async: true
use Cyclium.Test.StrategyCase
@episode build_test_episode(actor_id: "client_health", expectation_id: "health_check")
@trigger %Cyclium.Trigger.Manual{requested_by: "test"}
test "init returns valid state" do
assert_valid_init(MyApp.Strategies.ClientHealth, @episode, @trigger)
end
test "strategy terminates within budget" do
assert_strategy_terminates(MyApp.Strategies.ClientHealth, @episode, @trigger,
max_steps: 20
)
end
end

Synthesizer testing
use Cyclium.Test.SynthesizerCase
# Contract validation
assert_valid_synthesize(MySynthesizer, prompt_ctx, episode_ctx)
assert_valid_estimate_tokens(MySynthesizer, prompt_ctx)
# FakeSynthesizer for strategy tests
{:ok, _} = Cyclium.Test.FakeSynthesizer.start_link()
Cyclium.Test.FakeSynthesizer.set_response(%{"answer" => "42"})
# ... run strategy, then inspect calls:
Cyclium.Test.FakeSynthesizer.calls()

Output adapter testing
use Cyclium.Test.OutputCase
# Contract validation
assert_valid_deliver(MyApp.Adapters.Slack, :slack, payload, ctx)
# FakeOutputAdapter for integration tests
{:ok, _} = Cyclium.Test.FakeOutputAdapter.start_link()
# ... run episode, then inspect deliveries:
Cyclium.Test.FakeOutputAdapter.deliveries()

Workflow validation
defmodule MyApp.Workflows.VendorOnboardingTest do
use ExUnit.Case, async: true
use Cyclium.Test.WorkflowCase
test "workflow is valid" do
assert_valid_workflow(MyApp.Workflows.VendorOnboarding)
end
test "all steps have failure policies" do
assert_failure_policies_complete(MyApp.Workflows.VendorOnboarding)
end
test "step inputs don't crash" do
assert_step_inputs_safe(MyApp.Workflows.VendorOnboarding,
trigger: %{"vendor_id" => "v123"}
)
end
end

Checkpoint migration fuzzing
Property-based testing for migrate/2 chains using StreamData:
use Cyclium.Test.CheckpointMigration
# Fuzz test: generate random states, migrate through version chain, assert no crashes
assert_migration_safe(MyCheckpoint, iterations: 200)
# Specific version migration
assert_migration(MyCheckpoint, %{"old_field" => 1}, 1, 3)
# Idempotency: migrating an already-current state is a no-op
assert_migration_idempotent(MyCheckpoint, iterations: 100)

Demo application
See cyclium_ex_hapi for a complete Phoenix LiveView application demonstrating Cyclium:
- Client health monitoring with real-time evaluation
- LLM-powered AI advisor actor (Anthropic Claude integration)
- Simulation controls for testing different scenarios
- Episode detail view with step timeline and rendered logs
- Reactive UI via Bus event subscriptions
Development
# Install dependencies
mix deps.get
# Run tests
mix test
# Dialyzer (static analysis)
mix dialyzer
# Compile
mix compile --warnings-as-errors