Cyclium

Autonomous agent framework for Elixir — actors monitor domains, run multi-turn episodes with budget enforcement, and produce persistent findings and typed outputs.

Cyclium is an Elixir library for building agentic systems that monitor domains, run multi-turn episodes, classify situations, and produce typed outputs. Actors declare expectations about how things should be; when triggers fire, episodes execute strategies that can gather data, call tools, synthesize with LLMs, and converge into findings and outputs. Think of it as an OTP-native agent framework where the episode — not the request — is the unit of work.

Key features

Who is this for?

Cyclium is designed for Elixir teams building autonomous agent systems where:

If you need a simple cron job or a one-shot script, Cyclium is overkill. Cyclium shines when you have ongoing, stateful processes that produce findings and outputs — and need the lifecycle, audit trail, and coordination to go with them.

How Cyclium differs

vs. Oban — Oban is a job queue: enqueue work, run it, done. Cyclium is an agent framework that manages stateful, multi-turn episodes with budgets, findings, outputs, and workflows. Episodes happen to run as OTP Tasks, so you don't need Oban — but the two solve different problems. You could use both: Oban for fire-and-forget jobs, Cyclium for ongoing autonomous processes.

vs. Sagents — Sagents is built for interactive AI conversations where users chat with LLM-powered agents in real time. Cyclium is built for autonomous operational agents that monitor domains, classify situations, and act — with or without an LLM in the loop. Cyclium's strategies can call LLMs via :synthesize, but they can also run purely deterministic logic. The execution model (expectations → episodes → findings → outputs) is designed for operational workflows, not chat.

vs. GenServers / custom OTP — You could build all of this with raw GenServers, but Cyclium gives you the episode lifecycle (budgets, journaling, checkpoints, crash recovery), the findings system (upsert-by-key, severity, evidence), the output router (deduplication, adapters, approval gates), the workflow engine (dependency graphs, failure policies), and the event bus — all wired together with telemetry and audit trails.

Strategy-driven vs. LLM-routed

Most agent frameworks put the LLM in the driver's seat — it decides which tool to call, when to stop, and how to recover from errors. Cyclium inverts this. The developer is the router: your next_step/2 function is a deterministic state machine that decides what happens next. The LLM is a powerful tool you call at specific points via :synthesize, but it never controls the flow.

This means you can mix deterministic and AI-powered steps in a single episode — gather data with a tool call, classify it with an LLM, then act on the result with another tool call — all under explicit developer control with budget enforcement at every turn:

def next_step(%{phase: :gather}, _ctx) do
  {:tool_call, :erp_read, :search_pos, %{status: "STALLED"}}
end
def next_step(%{phase: :classify, po_data: data}, _ctx) do
  {:synthesize, %{task: :classify_po, data: data}}
end
def next_step(%{phase: :act, classification: "vendor_delay"} = state, _ctx) do
  {:tool_call, :email_write, :send_followup, build_email(state)}
end

The LLM is powerful, but it's not the control plane. You get repeatability, testability, and full visibility into exactly which steps ran and why — without sacrificing the ability to use AI where it adds value.

Architecture

The examples throughout this README use a client health monitoring system: a ClientHealthActor that evaluates client metrics (MRR, active users, support tickets) on each change and classifies health status, plus a ClientAdvisorActor that synthesizes an LLM-powered summary. See the demo application for the full working implementation.

Supervision tree

YourApp.Supervisor
├── YourApp.Repo
├── Phoenix.PubSub
├── YourApp.Actors.ClientHealthActor (GenServer)
├── YourApp.Actors.ClientAdvisorActor (GenServer)
├── Cyclium.Supervisor
│   ├── Cyclium.ActorSupervisor (DynamicSupervisor)
│   ├── Cyclium.EpisodeSupervisor (DynamicSupervisor)
│   │   └── Cyclium.EpisodeTask (one per running episode)
│   ├── Cyclium.TaskSupervisor (Task.Supervisor)
│   ├── Cyclium.Reconciler (optional — spec change detection)
│   ├── Cyclium.WorkflowEngine (optional — multi-actor workflows)
│   └── Cyclium.Findings.FindingSweep (optional — expiration + escalation)
└── YourAppWeb.Endpoint

Actors are GenServers that subscribe to Bus events and manage episode lifecycle. They're started by the consuming app's supervision tree, above Cyclium's supervisors. When a trigger fires, the actor creates an Episode row and starts an EpisodeTask under the EpisodeSupervisor. Each task resolves a strategy from the registry and runs the episode loop.

Key design principles:

Execution model

Bus event arrives
  → Actor.handle_info matches expectation trigger
  → Check debounce/cooldown → circuit breaker → sample_rate
  → Check concurrency (active < max?)
    → yes: create Episode row, start EpisodeTask under DynamicSupervisor
    → no:  apply overflow policy (queue / drop / shed_oldest)

EpisodeTask starts
  → Resolve strategy (persistent_term registration from actor boot → registry override)
  → strategy.init(episode, trigger)
  → EpisodeRunner.execute_loop:

    ┌─────────────────────────────────────────────┐
    │  check_budget → check_loop → increment_turn │
    │  strategy.next_step(state, ctx)             │
    │    :done         → journal, set done        │
    │    :converge     → run converge pipeline    │
    │    {:tool_call}  → exec tool, handle_result │
    │    {:observe}    → journal, handle_result   │
    │    {:synthesize} → journal, handle_result   │
    │    {:checkpoint} → save state, loop         │
    │    {:approval}   → block, wait for human    │
    │    {:wait}       → block, wait for external │
    │    ...           → loop                     │
    └─────────────────────────────────────────────┘

  Converge pipeline (post_converge):
    1. Persist findings (raise/update/clear) → enrich → Bus events per finding
    2. Route outputs through adapters → dedup by dedupe_key, deliver
    3. Compute final episode status from delivery outcomes
    4. Journal completion/failure step
    5. Project log via LogProjector
    6. Record service levels + check for breach
    7. Record adaptive budget sample (if enabled)
    8. Broadcast episode.completed/failed on Bus
    9. Emit telemetry
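The concurrency/overflow check at the top of this flow can be sketched as a pure decision function. The module and return atoms below are illustrative only — Cyclium's actual admission logic lives inside the actor GenServer:

```elixir
defmodule OverflowSketch do
  # Decide what to do with a new trigger given current load.
  # `active` is the number of running episodes; `policy` mirrors the
  # episode_overflow DSL option (:queue | :drop | :shed_oldest).
  def admit(active, max, _policy) when active < max, do: :start
  def admit(_active, _max, :queue), do: :enqueue
  def admit(_active, _max, :drop), do: :drop
  def admit(_active, _max, :shed_oldest), do: :shed_oldest_then_start
end
```

Under capacity, the episode starts immediately; at capacity, the declared overflow policy decides.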

Core concepts

Actors

An actor is a GenServer that owns one or more expectations. Each actor watches a domain (e.g., :procurement, :client_health) and fires episodes when triggers match.

defmodule MyApp.Actors.ClientHealthActor do
  use Cyclium.Actor

  actor do
    domain(:client_health)
    spec_rev("v0.1.0")
    max_concurrent_episodes(5)
    episode_overflow(:queue)

    expectation(:client_should_be_healthy,
      strategy: MyApp.Strategies.ClientHealth,
      trigger: {:event, "client.health_check_requested"},
      subject_key: :client_id,
      debounce_ms: :timer.seconds(3),
      budget: %{max_turns: 3, max_tokens: 1_000, max_wall_ms: 10_000}
    )

    expectation(:contract_review,
      strategy: MyApp.Strategies.ContractReview,
      trigger: {:schedule, :timer.hours(24)},
      recovery_policy: :restart,
      budget: %{max_turns: 12, max_tokens: 25_000, max_wall_ms: 120_000}
    )
  end
end

Trigger types:

List triggers allow an expectation to fire from multiple sources. Event subscriptions and schedule timers are extracted from the list automatically. This is the recommended pattern when an expectation participates in a workflow but should also be independently triggerable:

expectation(:client_should_be_healthy,
  strategy: MyApp.Strategies.ClientHealth,
  trigger: [{:event, "client.health_check_requested"}, :workflow],
  subject_key: :client_id,
  debounce_ms: :timer.seconds(3),
  budget: %{max_turns: 3, max_tokens: 1_000, max_wall_ms: 10_000}
)

The actor subscribes to "client.health_check_requested" for standalone use (with debounce), while the :workflow marker documents that workflows can also invoke this expectation. Workflow-triggered episodes bypass the actor GenServer entirely — the workflow engine creates episodes directly — so actor-level debounce/cooldown only applies to event triggers.

Backpressure options (episode_overflow):

Expectation options:

strategy (required): Strategy module for this expectation. Declared inline — Cyclium registers it when the actor boots
trigger (required): What fires the episode. Single trigger or list (e.g. [{:event, "..."}, :workflow])
synthesizer (default: nil): Synthesizer module override for this expectation. Overrides the actor-level synthesizer(...) declaration
filter (default: %{}): Payload predicates — only fire when all match
debounce_ms (default: nil): Coalesce rapid events into one firing
cooldown_ms (default: nil): Minimum gap between firings
subject_key (default: nil): Payload key to scope debounce/cooldown per subject (e.g., :client_id)
budget (default: %{max_turns: 12, max_tokens: 25_000, max_wall_ms: 120_000}): Resource limits
log_strategy (default: :timeline): Controls materialized log verbosity AND step journal detail (see below)
outputs (default: []): Declared output types (informational)
resources (default: []): Declared capability dependencies (informational)
audit_level (default: :standard): Audit verbosity
retention_days (default: 90): How long to keep episode data. Set higher for audit-sensitive workflows (e.g., 365). Retention is declarative — enforcement requires a scheduled cleanup job (not yet built)
sample_rate (default: nil): Float 0.0–1.0. When set, episodes fire probabilistically. nil or 1.0 = always fire. Force-fire bypasses sampling
circuit_breaker (default: nil): Circuit breaker config: %{threshold: 5, half_open_after_ms: 60_000, cancel_in_flight: false}. See Circuit Breaker
adaptive_budget (default: false): When true, records episode resource usage for advisory budget recommendations
service_levels (default: nil): Performance objectives: %{max_duration_ms: n, success_rate: 0.95, window_episodes: 20}. See Service Level Tracking
finding_enrichment (default: nil): Post-raise enrichment callback: fn finding, episode -> {:ok, %{summary: ...}} end or {Mod, :fun}
escalation_rules (default: nil): Time-based severity escalation: %{"class" => [%{after_minutes: 60, escalate_to: :high}]}
finding_ttl_seconds (default: nil): Default TTL for findings raised by this expectation. Individual findings can override with explicit ttl_seconds
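The sample_rate semantics (nil or 1.0 always fires, force-fire bypasses sampling) can be sketched as a small gate function. The module name and shape are illustrative, not Cyclium's internals:

```elixir
defmodule SampleGate do
  # Force-fire bypasses sampling entirely.
  def fire?(_rate, true), do: true
  # nil or a rate >= 1.0 always fires.
  def fire?(nil, _force), do: true
  def fire?(rate, _force) when rate >= 1.0, do: true
  # Otherwise fire probabilistically: :rand.uniform/0 returns a float
  # in [0.0, 1.0), so a rate of 0.0 never fires.
  def fire?(rate, _force), do: :rand.uniform() < rate
end
```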

Actor ID convention: Actor IDs are atoms in-process and strings in the database. Use identifier/1 in the DSL to set a stable actor ID that survives module renames:

actor do
  identifier(:client_health)   # explicit, rename-proof
  domain(:health)
  # ...
end

If omitted, the ID is derived from the module name (MyApp.Actors.ClientHealthActor becomes :client_health_actor). The boundary is at episode creation — Cyclium.Actor.Server calls to_string(state.actor_id) when building the episode params. Everything upstream is atoms, everything downstream (DB, strategies, findings) is strings. Actors with persisted episodes should always declare an explicit identifier.
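The module-name derivation convention can be approximated with standard library calls — this is a sketch of the naming rule, not the library's code:

```elixir
defmodule ActorIdSketch do
  # MyApp.Actors.ClientHealthActor -> :client_health_actor
  def derive(module) do
    module
    |> Module.split()       # ["MyApp", "Actors", "ClientHealthActor"]
    |> List.last()
    |> Macro.underscore()   # "client_health_actor"
    |> String.to_atom()
  end
end
```

Because the derived atom tracks the module name, a rename silently changes the ID — which is exactly why identifier/1 is recommended for actors with persisted episodes.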

Strategies

A strategy implements the investigation logic for an expectation. It's the brain of an episode — a stateless module that receives state and returns actions.

defmodule MyApp.Strategies.ClientHealth do
  @behaviour Cyclium.EpisodeRunner.Strategy

  @impl true
  def init(_episode, trigger) do
    client_id = trigger.payload["client_id"]
    {:ok, %{client_id: client_id}}
  end

  @impl true
  def next_step(_state, _episode_ctx) do
    :converge  # go straight to classification
  end

  @impl true
  def handle_result(state, _step, _result) do
    {:ok, state}
  end

  @impl true
  def converge(state, _episode_ctx) do
    client = MyApp.Clients.get!(state.client_id)
    {class, severity, summary} = classify(client)

    {:ok, %Cyclium.ConvergeResult{
      classification: %{"primary" => class, "severity" => to_string(severity)},
      confidence: 1.0,
      summary: summary,
      findings: [
        {:raise, %{
          actor_id: "client_health_actor",
          finding_key: "client:health:#{client.id}",
          class: class,
          severity: severity,
          confidence: 1.0,
          subject: %{kind: "client", id: client.id},
          subject_kind: "client",
          subject_id: client.id,
          summary: summary,
          evidence_refs: %{"active_users" => client.active_users}
        }}
      ],
      outputs: []
    }}
  end
end

Strategy callbacks:

init(episode, trigger): Initialize state from trigger data. Return {:ok, state}
next_step(state, episode_ctx): Decide the next action (see table below)
handle_result(state, step, result): Process a step's outcome. Return {:ok, state}, {:retry, state}, or {:abort, reason}
converge(state, episode_ctx): Produce findings, outputs, and classification. Return {:ok, ConvergeResult}
workflow_result(state, converge_result) (optional): Extract data to pass to downstream workflow steps

next_step return values:

:done: Episode complete (skip converge phase)
:converge: Run the converge pipeline
{:tool_call, capability, action, args}: Call a registered tool capability, pass result to handle_result
{:observe, data}: Journal data as an observation step, then pass {:ok, data} to handle_result. This is a synchronous in-process action — no external system is called. Use it to feed data you've already gathered into the strategy's result-handling flow
{:synthesize, prompt_ctx}: Request LLM synthesis via app-provided Cyclium.Synthesizer. The synthesizer calls the LLM, and the response flows to handle_result
{:checkpoint, phase_name}: Save strategy state for crash recovery
{:output, type, payload}: Propose an output inline (outside converge)
{:approval, request}: Block episode until human approval
{:wait, external_ref}: Block episode until external event resolves

handle_result step kinds:

The step argument passed to handle_result/3 is an %EpisodeStep{} struct. Pattern-match on kind to distinguish which action produced the result:

:tool_call (from {:tool_call, capability, action, args}): success {:ok, tool_return_value}, failure {:error, {error_class, detail}}
:synthesis (from {:synthesize, prompt_ctx}): success {:ok, llm_response}, failure {:error, {error_class, detail}}
:observation (from {:observe, data}): success {:ok, data}; never fails (data is passed through as-is)

The step struct also carries tool_name (for :tool_call steps) which is useful for per-tool retry tracking:

def handle_result(state, %{kind: :synthesis}, {:ok, result}), do: ...
def handle_result(state, %{kind: :tool_call, tool_name: "erp_read"}, {:ok, data}), do: ...
def handle_result(state, %{kind: :observation}, {:ok, data}), do: ...

Multi-turn strategies

Strategies can run multiple turns before converging. next_step decides actions, handle_result absorbs outcomes — expensive work like LLM calls should be delegated to actions (:synthesize, :tool_call), not done inside handle_result.

Use a :phase field in state to drive progression. This is the recommended pattern — it makes flow explicit, prevents ambiguous matching in handle_result, and avoids accidental loops:

defmodule MyApp.Strategies.ClientAdvisor do
  @behaviour Cyclium.EpisodeRunner.Strategy

  @system_prompt "You are a customer success analyst. Assess the client's health."

  @impl true
  def init(_episode, trigger) do
    # Load data in init — next_step should be pure routing, not queries
    client_id = trigger.payload["client_id"]
    client = MyApp.Clients.get!(client_id)
    client_data = Map.take(client, [:name, :status, :mrr])
    {:ok, %{phase: :synthesize, client_id: client_id, client_data: client_data, ai_summary: nil}}
  end

  # --- next_step: pure routing based on phase ---

  @impl true
  def next_step(%{phase: :synthesize} = state, _episode_ctx) do
    {:synthesize, %{
      system_prompt: @system_prompt,
      user_message: "Client: #{state.client_data.name}, MRR: $#{state.client_data.mrr}"
    }}
  end

  def next_step(%{phase: :done}, _episode_ctx), do: :converge

  # --- handle_result: ALWAYS guard on phase ---

  @impl true
  def handle_result(%{phase: :synthesize} = state, _step, {:ok, result}) do
    summary = if is_map(result), do: result[:text] || result["text"] || inspect(result), else: inspect(result)
    {:ok, %{state | phase: :done, ai_summary: summary}}
  end

  def handle_result(%{phase: :synthesize} = state, %{kind: :synthesis}, {:error, {_class, _detail}}) do
    # Transient failure — retry the same step (next_step will re-emit :synthesize)
    {:retry, state}
  end

  def handle_result(_state, _step, {:error, reason}) do
    {:abort, reason}
  end

  @impl true
  def converge(state, _episode_ctx) do
    {:ok, %Cyclium.ConvergeResult{
      classification: %{"primary" => "ai_summary", "severity" => "low"},
      confidence: 0.9,
      summary: state.ai_summary,
      findings: [
        {:raise, %{
          actor_id: "client_advisor_actor",
          finding_key: "client:advisor:#{state.client_id}",
          class: "ai_summary",
          severity: :low,
          confidence: 0.9,
          subject_kind: "client",
          subject_id: state.client_id,
          summary: state.ai_summary
        }}
      ],
      outputs: []
    }}
  end
end

Key points:

Multi-step patterns and pitfalls

Always guard handle_result on :phase. The most common multi-step bug is an unguarded handle_result clause that matches too broadly, causing the strategy to cycle between phases instead of progressing:

# BAD — matches ANY success result during any phase
def handle_result(state, _step, {:ok, result}) do
  {:ok, %{state | phase: :done, ai_summary: inspect(result)}}
end

# GOOD — each clause is scoped to its phase
def handle_result(%{phase: :synthesize} = state, _step, {:ok, result}) do
  {:ok, %{state | phase: :done, ai_summary: extract_text(result)}}
end

def handle_result(%{phase: :tool_result} = state, _step, {:ok, data}) do
  {:ok, %{state | phase: :synthesize, gathered: data}}
end

Without phase guards, handle_result matches the wrong clause → phase doesn't advance → next_step re-emits the same action → loop. The budget will eventually kill it, but you'll burn turns for no reason.

Handle the no-synthesizer passthrough. When no Cyclium.Synthesizer is configured (or the registry returns nil for your actor), the runner passes prompt_ctx through as-is to handle_result with {:ok, prompt_ctx}. Your :synthesize phase handler must handle both the LLM response shape AND the raw passthrough:

def handle_result(%{phase: :synthesize} = state, _step, {:ok, result}) do
  summary =
    cond do
      is_binary(result) -> result
      is_map(result) && Map.has_key?(result, :text) -> result.text
      is_map(result) && Map.has_key?(result, "text") -> result["text"]
      true -> inspect(result)  # passthrough — no synthesizer configured
    end

  {:ok, %{state | phase: :done, ai_summary: summary}}
end

Handle both trigger types if your actor can be triggered by workflows. Workflow-created episodes use %Cyclium.Trigger.Workflow{input: ...}, not %Cyclium.Trigger.Event{payload: ...}:

def init(_episode, trigger) do
  client_id =
    case trigger do
      %Cyclium.Trigger.Event{payload: %{client_id: id}} -> id
      %Cyclium.Trigger.Event{payload: payload} -> payload["client_id"]
      %Cyclium.Trigger.Workflow{input: %{client_id: id}} -> id
      %Cyclium.Trigger.Workflow{input: input} when is_map(input) -> input["client_id"]
      _ -> nil
    end

  {:ok, %{phase: :gather, client_id: client_id}}
end

Loop detection

The episode runner automatically detects repeating step cycles. It fingerprints each next_step return value and watches for repeated patterns — a 1-step cycle (A, A, A), a 2-step cycle (A, B, A, B), or longer. When a cycle is detected, the episode fails immediately with error_class: "loop_detected".

This is a safety net, not a substitute for correct phase guards. If loop detection fires, it means your strategy has a bug — fix the root cause (usually a missing phase guard on handle_result).

Note that consecutive steps of the same kind but different data are fine — the fingerprint includes the full action payload via :erlang.phash2/1. A dispatch strategy that emits multiple :observe steps with different entity data won't trigger loop detection. Only identical actions repeated in a cycle will.
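A minimal sketch of the fingerprint-and-cycle idea — illustrative only, Cyclium's actual detector may differ in window sizes and thresholds:

```elixir
defmodule LoopSketch do
  # Fingerprint includes the full action payload, so two :observe steps
  # with different data produce different fingerprints.
  def fingerprint(action), do: :erlang.phash2(action)

  # `fingerprints` is newest-first. Declare a loop only when a cycle of
  # length 1..max_len repeats three times in a row at the head.
  def cycle?(fingerprints, max_len \\ 3) do
    Enum.any?(1..max_len, fn len ->
      chunks =
        fingerprints
        |> Enum.take(len * 3)
        |> Enum.chunk_every(len)

      length(chunks) == 3 and chunks |> Enum.uniq() |> length() == 1
    end)
  end
end
```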

Episodes

An episode is one execution of a strategy. It tracks:

Episodes run as Tasks under a DynamicSupervisor — no Oban required. The cyclium_episodes table serves as a durable work queue.

Querying episodes:

Cyclium.Episodes.get!(episode_id)
Cyclium.Episodes.list_by_status([:running, :done, :failed])
Cyclium.Episodes.list_steps(episode_id)   # step journal
Cyclium.Episodes.get_log(episode_id)      # materialized log
Cyclium.Episodes.cancel(episode_id)       # cancellation sequence

# List episodes by actor(s)
Cyclium.Episodes.list_by_actors(["my_actor"], limit: 20, order: :desc)

# Filter by subject — DB-level JSON filtering across trigger types
# Checks trigger_ref.payload.<key> (event-triggered) and
# trigger_ref.input.<key> (workflow-triggered) in a single query.
Cyclium.Episodes.list_by_actors_and_subject(
  ["client_health_actor", "client_advisor_actor"],
  :client_id,
  client.id,
  limit: 20, order: :desc
)

list_by_actors_and_subject/4 detects the repo adapter at runtime and uses the appropriate JSON text extraction — Postgres #>> or SQL Server JSON_VALUE. This is the recommended way to fetch episodes for a specific entity — avoids pulling all episodes and filtering in memory.

Findings

A finding is a persistent observation about an entity. Findings have a lifecycle:

# In your converge/2 callback:
findings: [
  {:raise, %{
    actor_id: "client_health_actor",
    finding_key: "client:health:123",
    class: "churned",
    severity: :high,          # :low | :medium | :high | :critical
    confidence: 1.0,
    subject: %{kind: "client", id: "123"},
    subject_kind: "client",   # denormalized for SQL Server compat
    subject_id: "123",
    summary: "Client has churned",
    evidence_refs: %{"status" => "churned"}
  }},
  {:update, "client:health:123", %{confidence: 0.8}},
  {:clear, "client:health:123"},
  {:clear, "client:health:123", "customer reactivated"}
]

Finding key scoping — deduplicated vs. distinct:

The finding_key controls deduplication. An active finding with the same key is updated in place (last-writer-wins on mutable fields). Choose your key strategy based on intent:

The episode_ctx map passed to converge/2 contains episode_id, so you can reference it directly:

# Deduplicated: one active finding per client, updated each run
def converge(state, _episode_ctx) do
  {:ok, %Cyclium.ConvergeResult{
    findings: [{:raise, %{finding_key: "client:health:#{state.client_id}", ...}}]
  }}
end

# Distinct: one finding per episode run
def converge(state, episode_ctx) do
  {:ok, %Cyclium.ConvergeResult{
    findings: [{:raise, %{finding_key: "po_review:#{state.po_id}:#{episode_ctx.episode_id}", ...}}]
  }}
end

Findings are queried via Cyclium.Findings.active_for/1:

Cyclium.Findings.active_for(actor: "client_health_actor")
Cyclium.Findings.active_for(subject: %{kind: "client", id: "123"})
Cyclium.Findings.active_for(finding_key: "client:health:123")
Cyclium.Findings.active_for(class: "churned")
Cyclium.Findings.active_for(caused_by: "parent:finding:key")

Causality chains: Findings can reference a parent finding via caused_by_key. This enables tracing root causes through chains of related findings:

# Raise a child finding linked to a parent
{:raise, %{finding_key: "vendor:delay:PO-123", caused_by_key: "vendor:health:acme", ...}}

# Query helpers
Cyclium.Findings.caused_by("parent:key")        # direct children
Cyclium.Findings.causal_chain("child:key", 10)   # walk chain upward (max depth)
Cyclium.Findings.root_cause("child:key")         # find root (no caused_by_key)
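Conceptually, walking the chain is just following caused_by_key until it is nil. This sketch operates on an in-memory map rather than the DB-backed query helpers above:

```elixir
defmodule ChainSketch do
  # findings: %{finding_key => caused_by_key | nil}
  def causal_chain(findings, key, max_depth) do
    do_walk(findings, key, max_depth, [])
  end

  defp do_walk(_findings, nil, _depth, acc), do: Enum.reverse(acc)
  defp do_walk(_findings, _key, 0, acc), do: Enum.reverse(acc)

  defp do_walk(findings, key, depth, acc) do
    do_walk(findings, Map.get(findings, key), depth - 1, [key | acc])
  end

  # The root is the last link in the upward walk (no caused_by_key).
  def root_cause(findings, key) do
    findings |> causal_chain(key, 100) |> List.last()
  end
end
```

The max-depth argument matters in practice: it bounds the walk even if bad data introduces a cycle of caused_by_key references.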

TTL / Expiration: Findings can auto-expire after a duration. Declare a default TTL on the expectation, or pass ttl_seconds / expires_at per finding:

# Default TTL for all findings raised by this expectation
expectation(:check_temp,
  strategy: MyApp.Strategies.TempCheck,
  trigger: {:event, "sensor.updated"},
  finding_ttl_seconds: 3600
)

# Or override per finding in converge:
{:raise, %{finding_key: "temp:alert:123", ttl_seconds: 7200, ...}}

Expired findings are cleared and active findings are escalated by Cyclium.Findings.FindingSweep, an optional GenServer that runs on a configurable interval:

# config/config.exs
config :cyclium, :finding_sweep, true
config :cyclium, :finding_sweep_interval_ms, 300_000   # 5 minutes (default)
config :cyclium, :finding_sweep_batch_size, 100         # per sweep (default)

Severity escalation: Time-based rules automatically escalate finding severity based on how long a finding has been active. Declare rules on the expectation:

expectation(:check_vendor,
  strategy: MyApp.Strategies.VendorHealth,
  trigger: {:event, "vendor.updated"},
  escalation_rules: %{
    "vendor_delay" => [
      %{after_minutes: 60, escalate_to: :high},
      %{after_minutes: 1440, escalate_to: :critical}
    ]
  }
)

Escalation runs as part of the finding sweep cycle. Each sweep interval, every active finding matching a registered escalation pair (actor + expectation + classes) is loaded from the database and checked against the time-based rules. This means the sweep interval (finding_sweep_interval_ms) controls how often escalation is evaluated — rules with after_minutes granularity finer than the sweep interval won't fire any faster. For expectations with many active findings, keep this in mind when tuning the sweep interval and batch size. Application config (config :cyclium, :escalation_rules) is supported as a fallback.
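The rule evaluation itself reduces to "apply the highest rule whose after_minutes has elapsed." A sketch, assuming rules are ordered ascending by after_minutes (the module and function names are illustrative):

```elixir
defmodule EscalationSketch do
  # rules: [%{after_minutes: n, escalate_to: severity}], ascending.
  # Returns the severity the finding should now have.
  def escalate(current, active_minutes, rules) do
    rules
    |> Enum.filter(fn %{after_minutes: m} -> active_minutes >= m end)
    |> List.last()
    |> case do
      nil -> current
      %{escalate_to: sev} -> sev
    end
  end
end
```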

Post-raise enrichment: An optional callback enriches findings immediately after they're raised. Declare it on the expectation:

expectation(:check_health,
  strategy: MyApp.Strategies.ClientHealth,
  trigger: {:event, "client.updated"},
  finding_enrichment: fn finding, _episode ->
    {:ok, %{summary: "Enriched: #{finding.summary}", confidence: 0.95}}
  end
)

# Or use a module/function tuple:
expectation(:check_health,
  strategy: MyApp.Strategies.ClientHealth,
  trigger: {:event, "client.updated"},
  finding_enrichment: {MyApp.FindingEnricher, :enrich}
)

The callback receives (finding, episode) and returns {:ok, %{...}} or :skip. Only safe fields are applied: evidence_refs, summary, confidence. Errors in the callback are logged — the finding persists unchanged. Application config (config :cyclium, :finding_enrichment) is supported as a fallback.

Outputs

Outputs are typed proposals produced during converge. They flow through the Output Router, which handles deduplication (via dedupe_key) and delivery through app-provided adapters.

# In converge result:
outputs: [
  %Cyclium.OutputProposal{
    type: :email,
    dedupe_key: "alert:client:123:#{Cyclium.Window.bucket(:h4, DateTime.utc_now())}",
    payload: %{to: "team@co.com", subject: "Client 123 churned"},
    requires_approval: false
  }
]
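The Cyclium.Window.bucket(:h4, ...) call above folds the timestamp into a 4-hour bucket, so every proposal within the same window shares a dedupe_key and only the first is delivered. The bucketing idea can be sketched like this (an approximation, not the library's implementation):

```elixir
defmodule WindowSketch do
  # Truncate a timestamp to the start of its N-hour bucket, so all
  # timestamps in the same window produce the same dedupe_key suffix.
  def bucket(hours, %DateTime{} = dt) do
    seconds = hours * 3600
    ts = DateTime.to_unix(dt)
    DateTime.from_unix!(ts - rem(ts, seconds))
  end
end
```

Embedding the bucket in the key means deduplication needs no extra state: the key itself changes when the window rolls over.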

Register adapters in config:

config :cyclium, :output_adapters, %{
  email: MyApp.Adapters.Email,
  slack: MyApp.Adapters.Slack
}

Adapters implement Cyclium.Output.Adapter:

defmodule MyApp.Adapters.Email do
  @behaviour Cyclium.Output.Adapter

  @impl true
  def deliver(:email, payload, _ctx) do
    case MyApp.Mailer.send(payload) do
      :ok -> {:ok, %{message_id: "abc123"}}
      {:error, reason} -> {:error, reason}
    end
  end
end

The adapter registry provides programmatic access:

Cyclium.Output.Adapter.resolve(:email)    # => MyApp.Adapters.Email
Cyclium.Output.Adapter.resolve("slack")   # => MyApp.Adapters.Slack
Cyclium.Output.Adapter.all()              # => [:email, :slack]

Bus

The event bus connects actors, LiveViews, and workflows without coupling. It wraps Phoenix.PubSub.

# Publish a domain event (from your app code):
Cyclium.Bus.broadcast("client.updated", %{client_id: "123"})

# Subscribe to all events (actors do this automatically):
Cyclium.Bus.subscribe()

# Subscribe to a specific event:
Cyclium.Bus.subscribe("episode.completed")

# In a LiveView or GenServer:
def handle_info({:bus, "episode.completed", payload}, socket) do
  # payload contains: episode_id, actor_id, status, workflow_instance_id
end

Runtime events emitted by Cyclium:

Episode lifecycle: episode.completed, episode.failed, episode.canceled, episode.queued, episode.dropped
Expectations: expectation.triggered
Findings: finding.raised, finding.updated, finding.cleared
Outputs: output.delivered
Workflows: workflow.started, workflow.completed, workflow.failed
System: spec.updated

Setup

1. Add dependency

# mix.exs
def deps do
  [{:cyclium, path: "../cyclium_ex"}]
end

Dependencies pulled in: ecto, ecto_sql, jason, phoenix_pubsub.

2. Run migrations

# In a migration file:
def up do
  Cyclium.Migrations.V1.up()   # episodes, steps, checkpoints, findings, outputs
  Cyclium.Migrations.V2.up()   # episode_logs
  Cyclium.Migrations.V3.up()   # workflow_instances
  Cyclium.Migrations.V4.up()   # archived_at on episodes and findings
  Cyclium.Migrations.V5.up()   # unique index on episode dedupe_key
  Cyclium.Migrations.V6.up()   # work_claims table for lease-based coordination
  # ...V7 through V13...
  Cyclium.Migrations.V14.up()  # trigger_requests table for deferred execution
  # ...V15 through V18...
  Cyclium.Migrations.V19.up()  # SQL Server: convert legacy TEXT columns to nvarchar(max)
end

def down do
  Cyclium.Migrations.V19.down()
  Cyclium.Migrations.V14.down()
  # ...V13 through V7...
  Cyclium.Migrations.V6.down()
  Cyclium.Migrations.V5.down()
  Cyclium.Migrations.V4.down()
  Cyclium.Migrations.V3.down()
  Cyclium.Migrations.V2.down()
  Cyclium.Migrations.V1.down()
end

Authoring migrations: do not use bare :text. On Ecto.Adapters.Tds it emits SQL Server's legacy non-Unicode TEXT type, which silently replaces emoji and other non-CP1252 characters with ?. Use {:string, size: :max} (which becomes nvarchar(max) on Tds and TEXT on Postgres/SQLite), or branch on repo().__adapter__() for finer control. V19 is the one-shot repair migration for columns that were already declared :text.

3. Configure

# config.exs
config :cyclium, :repo, MyApp.Repo

# Optional: registry for strategy/synthesizer overrides (see "Strategy registry" section)
# config :cyclium, :strategy_registry, MyApp.StrategyRegistry

# Optional: episode runner (default: Cyclium.Runner.OTP)
# Use Cyclium.Runner.Deferred for trigger-only mode (see "Trigger-Only Mode" section)
config :cyclium, :runner, Cyclium.Runner.OTP

# Optional: node identity override for shared-name environments (see "Node Identity")
# config :cyclium, :node_identity, "my-unique-node-name"

# Optional: tool capabilities
config :cyclium, :capability_registry, %{
  erp_read: MyApp.Tools.ERP,
  vendor_api: MyApp.Tools.VendorAPI
}

# Optional: output adapters
config :cyclium, :output_adapters, %{
  email: MyApp.Adapters.Email,
  slack: MyApp.Adapters.Slack
}

# Optional: checkpoint schemas for versioned state migration
config :cyclium, :checkpoint_schemas, %{
  {"client_health_actor", "client_should_be_healthy"} => MyApp.Checkpoints.HealthCheck
}

# Optional: enable reconciler for hot spec changes
config :cyclium, :reconciler, true

# Optional: register workflows
config :cyclium, :workflows, [MyApp.Workflows.ClientReview]

4. Declare strategies on expectations

The preferred approach is declaring the strategy module directly on each expectation in the actor DSL. Cyclium registers the mapping automatically when the actor GenServer boots — no separate registry needed:

defmodule MyApp.Actors.ClientHealthActor do
  use Cyclium.Actor

  actor do
    domain(:client_health)
    spec_rev("v0.1.0")
    synthesizer(MyApp.Synthesizers.ClientHealth)  # actor-level default
    max_concurrent_episodes(5)
    episode_overflow(:queue)

    expectation(:client_should_be_healthy,
      strategy: MyApp.Strategies.ClientHealth,
      trigger: {:event, "client.health_check_requested"},
      subject_key: :client_id,
      debounce_ms: :timer.seconds(3),
      budget: %{max_turns: 3, max_tokens: 1_000, max_wall_ms: 10_000}
    )

    expectation(:client_ai_summary,
      strategy: MyApp.Strategies.ClientAdvisor,
      synthesizer: MyApp.Synthesizers.FastSummary,  # override for this expectation
      trigger: {:event, "client.summary_requested"},
      budget: %{max_turns: 5, max_tokens: 5_000, max_wall_ms: 60_000}
    )
  end
end

Optional: strategy registry for overrides

If you need to override a strategy or synthesizer without changing the actor code — for example, in a staging environment or during an A/B test — you can configure a registry:

# config.exs (optional)
config :cyclium, :strategy_registry, MyApp.StrategyRegistry

defmodule MyApp.StrategyRegistry do
  # Only add clauses for explicit overrides — anything not matched here
  # falls through to the actor's declared strategy.
  def strategy_for("client_health_actor", _exp), do: MyApp.Strategies.ClientHealthV2
end

5. Supervision tree

# application.ex
children = [
  MyApp.Repo,
  {Phoenix.PubSub, name: MyApp.PubSub},
  {Cyclium.Supervisor, pubsub: MyApp.PubSub},
  MyApp.Actors.ClientHealthActor,
  MyApp.Actors.ClientAdvisorActor,
  MyAppWeb.Endpoint
]

Cyclium.Supervisor starts the DynamicSupervisors, TaskSupervisor, and optionally the Reconciler and WorkflowEngine.

Budgets

Every expectation declares a budget. The runner enforces all three dimensions:

budget: %{
  max_turns: 12,        # loop iterations (incremented every next_step call)
  max_tokens: 25_000,   # LLM token cost (incremented by tool_call results)
  max_wall_ms: 120_000  # wall-clock deadline (enforced via Process.send_after)
}

When any limit is hit, the episode fails with error_class: "budget_exceeded". Wall time is enforced asynchronously — a :budget_wall_exceeded message interrupts the loop even if the strategy is blocked on a tool call.
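A minimal sketch of the three-dimension check (names are illustrative, not Cyclium internals; the real runner enforces wall time asynchronously, but this sketch folds it into one synchronous check for clarity):

```elixir
defmodule BudgetSketch do
  # Illustrative only: compare accumulated usage against the declared budget.
  def check(%{turns: t, tokens: tok, wall_ms: ms}, budget) do
    cond do
      t >= budget.max_turns -> {:error, "budget_exceeded"}
      tok >= budget.max_tokens -> {:error, "budget_exceeded"}
      ms >= budget.max_wall_ms -> {:error, "budget_exceeded"}
      true -> :ok
    end
  end
end

budget = %{max_turns: 12, max_tokens: 25_000, max_wall_ms: 120_000}
BudgetSketch.check(%{turns: 3, tokens: 10_000, wall_ms: 5_000}, budget)
# => :ok
BudgetSketch.check(%{turns: 12, tokens: 10_000, wall_ms: 5_000}, budget)
# => {:error, "budget_exceeded"}
```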

Deduplication: actor-level vs. strategy-level

Cyclium provides three layers of temporal dedup:

Actor-level global (cooldown_ms) — enforced by the actor GenServer before an episode starts. Simple and zero-cost, but applies globally to the expectation — all subjects are blocked during the window.

expectation(:client_ai_summary,
  strategy: MyApp.Strategies.ClientAdvisor,
  trigger: {:event, "client.summary_requested"},
  cooldown_ms: :timer.minutes(5)  # no advisor episodes for ANY client for 5 min
)

Actor-level per-subject (subject_key + debounce_ms/cooldown_ms) — when subject_key is set, debounce and cooldown are scoped per subject value. Each unique subject gets its own independent trailing-edge timer and cooldown window. Client A and client B are debounced independently.

expectation(:client_ai_summary,
  strategy: MyApp.Strategies.ClientAdvisor,
  trigger: {:event, "client.summary_requested"},
  subject_key: :client_id,
  debounce_ms: :timer.seconds(10),   # trailing-edge, per-client
  cooldown_ms: :timer.minutes(5)     # minimum gap, per-client
)

When subject_key is set but the payload doesn't contain that key, the subject value is nil and the key becomes {expectation_id, nil} — still isolated from real subjects, never crashing.
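That key derivation can be sketched as a small function (hypothetical helper, not Cyclium's actual API):

```elixir
# Hypothetical sketch: derive the per-subject dedup key from the payload.
# A missing subject key yields {expectation_id, nil}, which is still a valid,
# isolated key that never crashes.
dedup_key = fn expectation_id, subject_key, payload ->
  {expectation_id, payload[Atom.to_string(subject_key)]}
end

dedup_key.(:client_ai_summary, :client_id, %{"client_id" => "A"})
# => {:client_ai_summary, "A"}
dedup_key.(:client_ai_summary, :client_id, %{})
# => {:client_ai_summary, nil}
```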

Strategy-level (Findings.recent?/2) — checked in init/2 using the existing finding for a specific subject. This is DB-backed, so it survives actor restarts unlike the in-memory actor-level dedup.

@summary_cooldown_ms :timer.minutes(5)

def init(_episode, trigger) do
  client_id = trigger.payload["client_id"]
  skip = Cyclium.Findings.recent?("client:advisor:#{client_id}", @summary_cooldown_ms)
  {:ok, %{client_id: client_id, skip: skip}}
end

def next_step(%{skip: true}, _ctx), do: :done

When to use which:

Scenario Use
Rate-limit a high-frequency trigger globally cooldown_ms on the expectation
Coalesce rapid events per subject before firing subject_key + debounce_ms
Minimum gap between runs per subject subject_key + cooldown_ms
Dedup that survives actor restarts (DB-backed) Findings.recent?/2 in init/2
Belt-and-suspenders subject_key + debounce_ms as fast path, Findings.recent?/2 for persistence across restarts

Workflows

Workflows coordinate multiple actors in a dependency graph. Data flows between steps via the workflow_result/2 strategy callback and input: functions on downstream steps.

Defining a workflow

defmodule MyApp.Workflows.ClientReview do
  use Cyclium.Workflow

  workflow do
    trigger {:event, "client.updated"}
    debounce_ms :timer.seconds(3)
    subject_key :client_id

    step :health_check,
      actor: :client_health_actor,
      expectation: :client_should_be_healthy

    step :ai_summary,
      actor: :client_advisor_actor,
      expectation: :client_ai_summary,
      depends_on: [:health_check],
      input: fn _trigger, prior ->
        # prior[:health_check] contains the map returned by
        # the health strategy's workflow_result/2 callback
        %{client_id: prior[:health_check].client_id}
      end

    on_failure :health_check, policy: :retry, max_step_attempts: 3, backoff_ms: 5_000
    on_failure :ai_summary, policy: :abort
  end
end

Workflow debounce: debounce_ms and subject_key coalesce rapid events before starting the workflow. When subject_key is set, each unique subject value gets its own debounce window — client A and client B debounce independently. Without subject_key, all events for the workflow share a single timer. Each new event resets the debounce window (trailing-edge). This is useful for workflows that trigger on high-frequency events like "entity.updated".
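The per-subject trailing-edge behavior amounts to one resettable timer per subject, which can be sketched like this (illustrative, not the engine's code):

```elixir
defmodule DebounceSketch do
  # timers: %{subject => timer_ref}. Each new event cancels the subject's
  # pending timer and re-arms it (the classic trailing edge).
  def event(timers, subject, debounce_ms) do
    if ref = timers[subject], do: Process.cancel_timer(ref)
    ref = Process.send_after(self(), {:debounce_fired, subject}, debounce_ms)
    Map.put(timers, subject, ref)
  end
end

timers = %{}
timers = DebounceSketch.event(timers, "client_a", 50)
timers = DebounceSketch.event(timers, "client_a", 50)  # resets only client_a
_timers = DebounceSketch.event(timers, "client_b", 50) # independent window
# Each subject fires exactly once, ~50ms after its last event.
```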

Episode reuse (cross-workflow dedup): By default, workflows reuse recent completed episodes when two workflows trigger the same actor + expectation + input within a 5-minute window. To disable this:

workflow do
  disable_episode_reuse

  trigger {:event, "client.review_requested"}
  step :health_check, actor: :client_health_actor, expectation: :client_should_be_healthy
end

Cancellation cascade: When a workflow fails, pending and retrying steps are automatically canceled. To also clear active findings raised by the workflow's episodes, set clear_findings_on_cancel in the workflow instance metadata.

Passing data between steps

When a workflow step completes, the engine calls the strategy's optional workflow_result/2 callback to extract the data that downstream steps receive via prior. If workflow_result/2 is not implemented, downstream steps receive nil for that step's prior.

defmodule MyApp.Strategies.ClientHealth do
  @behaviour Cyclium.EpisodeRunner.Strategy

  # ... init, next_step, handle_result, converge as usual ...

  # Optional: extract data for downstream workflow steps
  @impl true
  def workflow_result(state, _converge_result) do
    # This map becomes prior[:health_check] in downstream input functions
    %{client_id: state.client_id, classification: state.classification}
  end
end

Configuration and usage

Register workflows in config:

config :cyclium, :workflows, [MyApp.Workflows.ClientReview]

The WorkflowEngine GenServer is event-driven: it starts instances when trigger events arrive on the Bus and advances steps as episode.completed / episode.failed events come in.

Workflows can also be started manually:

Cyclium.WorkflowEngine.start_workflow(
  MyApp.Workflows.ClientReview,
  %{client_id: "123"},
  []
)

Dynamic workflows

Dynamic workflows can be defined in the database and registered at runtime — no compiled modules required. They follow the same step-dependency model as compiled workflows but use declarative input mappings instead of Elixir functions.

Defining a workflow in the database

Insert a row into cyclium_workflow_definitions:

%Cyclium.Schemas.WorkflowDefinition{
  workflow_id: "vendor_onboarding",
  trigger_type: "event",
  trigger_event: "vendor.registration_submitted",
  steps: Jason.encode!([
    %{
      "id" => "compliance_check",
      "actor_id" => "compliance_monitor",
      "expectation" => "vendor_risk",
      "depends_on" => [],
      "input_map" => %{"vendor_id" => "trigger.vendor_id"},
      "failure_policy" => "abort"
    },
    %{
      "id" => "connector_setup",
      "actor_id" => "integration_actor",
      "expectation" => "setup_connector",
      "depends_on" => ["compliance_check"],
      "input_map" => %{
        "vendor_id" => "trigger.vendor_id",
        "risk_level" => "prior.compliance_check.classification.primary"
      },
      "failure_policy" => "retry",
      "max_step_attempts" => 2,
      "backoff_ms" => 30000
    }
  ]),
  enabled: true
}

Input mapping syntax

Dynamic workflows use dot-notation paths instead of Elixir functions:

Path Resolves to
"trigger.order_id"trigger_ref["order_id"]
"prior.validate.classification.primary"prior[:validate][:classification]["primary"]
"fast" (no prefix) Static value "fast"

Loading dynamic workflows

# At startup — loads all enabled definitions
Cyclium.DynamicWorkflow.Loader.load_all()

# Load a single workflow
Cyclium.DynamicWorkflow.Loader.load("vendor_onboarding")

# Reload after updating the definition in DB
Cyclium.DynamicWorkflow.Loader.reload("vendor_onboarding")

# Unregister a workflow
Cyclium.DynamicWorkflow.Loader.unload("vendor_onboarding")

Starting dynamic workflows manually

Cyclium.WorkflowEngine.start_dynamic_workflow(
  "vendor_onboarding",
  %{"vendor_id" => "v-123"}
)

Dynamic workflows are event-triggered (via Bus) like compiled workflows. The Watcher also listens for workflow_definition.created/updated/disabled events for automatic refresh.

Circuit breaker

Per-expectation circuit breaker prevents cascading failures when a tool or external service is down. When consecutive episode failures exceed a threshold, the circuit opens and rejects new episodes. After a cooldown period, one probe episode is allowed through (half-open state) — if it succeeds, the circuit closes.

expectation(:check_vendor_api,
  strategy: MyApp.Strategies.VendorCheck,
  trigger: {:event, "vendor.updated"},
  circuit_breaker: %{
    threshold: 5,               # consecutive failures to trip
    half_open_after_ms: 60_000, # cooldown before probe
    cancel_in_flight: false     # cancel running episodes when circuit trips
  }
)

States: :closed (normal) → :open (rejecting) → :half_open (probe) → :closed

In-flight cancellation: When cancel_in_flight: true, tripping the circuit also cancels any running or blocked episodes for that actor + expectation, preventing wasted work against a known-broken dependency.

Scope: Circuit breakers are node-local (ETS-backed). In a cluster, each node tracks failures independently. This is intentional — a service might be unreachable from one node but fine from another. For cluster-wide coordination, combine with the Bus (circuit breaker events are broadcast).

Force-fired episodes bypass the circuit breaker check.

Query state: Cyclium.CircuitBreaker.get_state(actor_id, expectation_id)

Episode sampling

Probabilistic episode firing for high-frequency triggers. Set sample_rate on an expectation to control what fraction of triggers actually fire episodes:

expectation(:health_check,
  strategy: MyApp.Strategies.MetricsCheck,
  trigger: {:event, "metrics.updated"},
  sample_rate: 0.1  # fire ~10% of triggers
)
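Conceptually the gate is a single uniform draw per trigger (sketch, not Cyclium's internals):

```elixir
# Sketch: fire the episode only when a uniform draw falls under the rate.
fire? = fn sample_rate -> :rand.uniform() <= sample_rate end

fired = Enum.count(1..10_000, fn _ -> fire?.(0.1) end)
# fired is roughly 1_000 (about 10% of triggers)
```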

Service level tracking

Declarative performance objectives with automatic breach detection. Define success rate and duration thresholds per expectation:

expectation(:process_order,
  strategy: MyApp.Strategies.OrderProcessor,
  trigger: {:event, "order.created"},
  service_levels: %{
    max_duration_ms: 30_000,  # p95 target
    success_rate: 0.95,       # 95% success target
    window_episodes: 20       # rolling window size
  }
)

Breaches emit [:cyclium, :service_levels, :breach] telemetry and a "service_levels.breach" Bus event with details:

%{type: :success_rate, current: 0.85, threshold: 0.95}
%{type: :duration, current: 45_000, threshold: 30_000}

Query metrics: Cyclium.ServiceLevels.metrics(actor_id, expectation_id) returns %{success_rate: f, p95_duration_ms: n, sample_count: n}.
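A sketch of how those rolling-window numbers fall out (illustrative; assumes a nearest-rank p95, which may differ from Cyclium's exact method):

```elixir
defmodule ServiceLevelSketch do
  # episodes: most-recent-first list of %{ok?: boolean, duration_ms: integer}.
  def metrics(episodes, window) do
    sample = Enum.take(episodes, window)
    sorted = sample |> Enum.map(& &1.duration_ms) |> Enum.sort()

    %{
      success_rate: Enum.count(sample, & &1.ok?) / length(sample),
      # nearest-rank p95 over the window
      p95_duration_ms: Enum.at(sorted, max(0, ceil(length(sorted) * 0.95) - 1)),
      sample_count: length(sample)
    }
  end
end

episodes = for i <- 1..20, do: %{ok?: i != 1, duration_ms: i * 1_000}
ServiceLevelSketch.metrics(episodes, 20)
# => %{success_rate: 0.95, p95_duration_ms: 19000, sample_count: 20}
```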

Adaptive budgets

Advisory budget tracking based on historical episode resource usage. When enabled, Cyclium records turns, tokens, and wall time for each completed episode and recommends budgets based on p95 values with 25% headroom.

expectation(:classify_ticket,
  strategy: MyApp.Strategies.TicketClassifier,
  trigger: {:event, "ticket.created"},
  adaptive_budget: true
)

Query recommendations:

# After enough samples (minimum 5):
Cyclium.AdaptiveBudget.recommend(actor_id, expectation_id)
# => %{max_turns: 8, max_tokens: 15_000, max_wall_ms: 25_000}

# Detailed stats:
Cyclium.AdaptiveBudget.stats(actor_id, expectation_id)
# => %{samples: 47, p50: %{...}, p95: %{...}, max: %{...}}

Adaptive budgets are advisory only — they do not automatically adjust episode budgets. Use the recommendations to tune your expectation configs over time.
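The p95-plus-25%-headroom rule is easy to sketch (illustrative; Cyclium's exact percentile method may differ):

```elixir
defmodule AdaptiveSketch do
  # samples: list of %{turns: n, tokens: n, wall_ms: n} from completed episodes.
  def recommend(samples) do
    for dim <- [:turns, :tokens, :wall_ms], into: %{} do
      p95 = samples |> Enum.map(& &1[dim]) |> percentile(0.95)
      # p95 usage plus 25% headroom
      {:"max_#{dim}", round(p95 * 1.25)}
    end
  end

  defp percentile(values, p) do
    sorted = Enum.sort(values)
    Enum.at(sorted, max(0, ceil(length(sorted) * p) - 1))
  end
end

samples = for t <- 1..20, do: %{turns: t, tokens: t * 500, wall_ms: t * 1_000}
AdaptiveSketch.recommend(samples)
# => %{max_turns: 24, max_tokens: 11875, max_wall_ms: 23750}
```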

Logging and observability

Log strategies

Set per-expectation via log_strategy. Controls both what gets stored in step journal columns (args_redacted, result_ref) and what the materialized log renders:

Strategy Step journal args_redacted Step journal result_ref Materialized log
:none omitted omitted none
:summary_only omitted omitted one-line status summary
:timeline tool name + action only summary/IDs only step-by-step with timestamps
:full_debug surviving context after earlier layers full result payload timeline + args, results, errors

Use :full_debug for audit-sensitive workflows where you need to reconstruct exactly what context an LLM had (EOX predictions, SKU classifications). Use :timeline for high-frequency episodes where you want the flow visible without storing full payloads.

Important: log_strategy is the last layer in a pipeline. By the time it runs, earlier layers have already trimmed the data. "Full" in :full_debug means "everything that survived the earlier layers", not necessarily everything the strategy originally passed. See the pipeline below.

The materialized log is built by LogProjector reading back from the already-stored step rows — it never re-processes the original payload. Whatever log_strategy stored is exactly what appears in the log.

Storage pipeline for synthesis steps

Every synthesis step passes through three filtering layers in order before anything reaches the database:

synthesis payload from next_step
        │
        ▼
1. __transient__ stripping   — keys listed in :__transient__ are removed from storage
        │                       but the synthesizer receives the full payload
        ▼
2. tool redact callbacks     — redact/1 and redact_result/1 on tool steps
        │                       (not applicable to synthesis, but runs for tool_call steps)
        ▼
3. log_strategy filtering    — controls final shape of args_redacted / result_ref
        │
        ▼
   cyclium_episode_steps (args_redacted, result_ref)
        │
        ▼
   cyclium_episode_logs (rendered by LogProjector from stored step rows)

Each layer has a different scope:

Layer Controlled by Purpose
__transient__ Strategy (per synthesis call) Pass bulk data to LLM without persisting it
redact/1, redact_result/1 Tool module Trim domain-specific bulky fields from tool steps
log_strategy Expectation Set overall verbosity for the whole episode

Transient synthesis data

Sometimes a strategy needs to pass large data to the synthesizer (full record lists, raw API payloads) that the LLM needs for context but that you don't want persisted in the step journal or materialized log. Mark those keys under :__transient__ in the synthesis payload:

def next_step(state, _ctx) do
  orders = load_orders(state.project_id)

  {:synthesize, %{
    project_id: state.project_id,
    project_name: state.project_name,
    order_count: length(orders),           # small scalar — kept in storage
    orders: serialize_orders(orders),      # full list — synthesizer needs it, storage does not
    evidence: build_evidence(orders),      # small structured summary — kept in storage
    __transient__: [:orders]               # strip :orders before writing args_redacted
  }}
end

The runner passes the full map (minus :__transient__ itself) to synthesizer.synthesize/2, then drops the listed keys before handing off to log_strategy filtering. The synthesizer receives orders; the stored step and rendered log do not, regardless of log_strategy.
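The split can be sketched in a few lines (illustrative, not the runner's code):

```elixir
# Sketch: what the synthesizer sees vs. what storage sees.
strip_transient = fn payload ->
  {transient_keys, for_synth} = Map.pop(payload, :__transient__, [])
  {for_synth, Map.drop(for_synth, transient_keys)}
end

payload = %{project_id: "p1", order_count: 2, orders: [%{}, %{}], __transient__: [:orders]}
{for_synth, for_storage} = strip_transient.(payload)
Map.has_key?(for_synth, :orders)    # => true  (synthesizer gets the bulk data)
Map.has_key?(for_storage, :orders)  # => false (storage never sees it)
```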

This is the right tool when the synthesizer genuinely needs the bulk data for context, but the step journal and materialized log should not retain it.

It is not a substitute for log_strategy — if you want no storage at all for an episode type, use :none or :summary_only. __transient__ is surgical; log_strategy is wholesale.

Materialized logs are stored in cyclium_episode_logs by Cyclium.LogProjector and can be queried via Cyclium.Episodes.get_log(episode_id).

Telemetry

Cyclium emits 36 structured telemetry events under the [:cyclium, ...] prefix. Attach a handler for development:

Cyclium.Telemetry.attach_default_logger()

Key events:

Event Metadata
[:cyclium, :episode, :completed] episode_id, actor_id, output_count, finding_count
[:cyclium, :episode, :failed] episode_id, actor_id
[:cyclium, :episode, :sampled_out] actor_id, expectation_id
[:cyclium, :step, :tool_call] tool, action, episode_id
[:cyclium, :step, :synthesis] episode_id
[:cyclium, :finding, :raised] finding_key, actor_id, class
[:cyclium, :finding, :cleared] finding_key, actor_id, class
[:cyclium, :finding, :expired] count
[:cyclium, :finding, :escalated] finding_key, actor_id, class
[:cyclium, :finding_sweep, :completed] duration_ms, expired_count, escalated_count, node
[:cyclium, :finding_sweep, :failed] duration_ms, node, reason
[:cyclium, :output, :delivered] type, episode_id
[:cyclium, :actor, :event_received] actor_id, event_type
[:cyclium, :actor, :overflow] actor_id, policy
[:cyclium, :circuit_breaker, :opened] actor_id, expectation_id, consecutive_failures
[:cyclium, :circuit_breaker, :closed] actor_id, expectation_id
[:cyclium, :circuit_breaker, :rejected] actor_id, expectation_id
[:cyclium, :service_levels, :breach] actor_id, expectation_id, type, current, threshold
[:cyclium, :workflow, :step_reused] workflow_id, instance_id, step_id, reused_episode_id

Full list: Cyclium.Telemetry.events/0

Step journal

Every episode action is recorded as an EpisodeStep with one of 16 kinds:

tool_call, synthesis, observation, checkpoint, output_proposed, output_delivered, output_failed, approval_requested, approval_resolved, wait_started, wait_resolved, finding_raised, finding_updated, finding_cleared, episode_completed, episode_failed

Each step records: step_no, kind, tool_name, args_redacted, result_ref, error_class, error_detail, cost_tokens, cost_ms, created_at.

Query steps: Cyclium.Episodes.list_steps(episode_id)

Checkpointing

Strategies can save state mid-episode for crash recovery. Return {:checkpoint, phase_name} from next_step/2 to persist the current state:

def next_step(state, _ctx) do
  if state.phase == :data_collected do
    {:checkpoint, "data_collected"}
  else
    {:tool_call, :erp_read, :read_po, %{"po_id" => state.po_id}}
  end
end

The checkpoint saves the full strategy state map to the cyclium_episode_checkpoints table. On resume, EpisodeTask loads the latest checkpoint by checkpoint_no and passes it to the strategy — execution continues from where it left off.

Checkpoint schema versioning

If your strategy's state shape changes between deploys, register a checkpoint schema to migrate old checkpoints forward:

defmodule MyApp.Checkpoints.HealthCheck do
  use Cyclium.CheckpointSchema, version: 2

  # Migrate from version 1 -> 2
  def migrate(1, state), do: {:ok, Map.put(state, :new_field, nil)}
  def migrate(2, state), do: {:ok, state}
end

Register in config:

config :cyclium, :checkpoint_schemas, %{
  {"client_health_actor", "client_should_be_healthy"} => MyApp.Checkpoints.HealthCheck
}

If migration fails, EpisodeTask falls back to a fresh strategy.init/2 — the episode restarts from scratch.

When to checkpoint vs. restart

Use checkpoints when episodes perform expensive, multi-phase work that would be costly to redo from scratch after a crash.

Use restart (no checkpoint) when episodes are cheap to re-run and the strategy is idempotent.

Most strategies don't need checkpoints — recovery_policy: :restart on the expectation handles recovery by re-running the episode from scratch. See the Recovery section below.

Recovery

Cyclium provides built-in recovery for orphaned episodes after server restarts or deploys.

The problem

When a node shuts down (deploy, crash, scaling event), in-flight episodes are killed and left as :running in the database. Without recovery, these episodes stay orphaned forever.

Distributed episode claiming

In a multi-node cluster, all nodes run the same actors and receive the same Bus events via PG2. Without coordination, every node would independently create and run an episode for every trigger, multiplying the work by the number of nodes. Cyclium uses DB-based coordination with no Redis or leader election required.

Episode creation: dedupe_key

When a trigger fires, every node's actor calls maybe_fire_episode. Before inserting, the actor generates a deterministic dedupe_key.

A filtered unique index on dedupe_key (WHERE dedupe_key IS NOT NULL AND archived_at IS NULL) ensures only one node's insert succeeds. The other nodes receive a constraint violation and silently skip — no episode created, no work duplicated. Archived episodes are excluded from the constraint so that re-triggered work isn't blocked by old runs.

A random jitter (0-200ms) before insert spreads winners evenly across nodes, preventing one faster node from consistently claiming all episodes:

# In Actor.Server.maybe_fire_episode/3
Process.sleep(:rand.uniform(200))
enqueue_episode(state, episode_params)

The constraint violation is caught in enqueue_episode using the same pattern as the Output router:

{:error, %Ecto.Changeset{} = cs} ->
  if has_dedupe_violation?(cs) do
    Logger.debug("[#{state.actor_id}] Dedupe skip: #{params.dedupe_key}")
  end
  state

Recovery claims: optimistic update

After a restart, all surviving nodes run Cyclium.Recovery.sweep/1. Each node sees the same list of stale episodes, but only one node can claim each episode:

# Episodes.claim_for_recovery/1
from(e in Episode,
  where: e.id == ^episode_id and e.status == :running and is_nil(e.archived_at)
)
|> repo().update_all(set: [phase: "recovering"])

This is an atomic UPDATE ... WHERE — the first node to execute it sets phase: "recovering" and gets {1, _} back. All other nodes get {0, _} (no rows affected) and skip. No locks, no races, no distributed coordination protocol needed.
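The first-writer-wins semantics can be simulated without a database (an Agent stands in for the episode row; illustrative only):

```elixir
# Simulation of the atomic UPDATE ... WHERE: only the first caller to find the
# row still :running with no phase gets {1, _}; everyone else gets {0, _}.
{:ok, row} = Agent.start_link(fn -> %{status: :running, phase: nil} end)

claim = fn ->
  Agent.get_and_update(row, fn
    %{status: :running, phase: nil} = s -> {{1, :claimed}, %{s | phase: "recovering"}}
    s -> {{0, :skipped}, s}
  end)
end

claim.()  # first node  => {1, :claimed}
claim.()  # other nodes => {0, :skipped}
```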

Summary of coordination guarantees

Scenario Mechanism Guarantee
Same trigger on N nodes dedupe_key unique index Exactly one episode created
Same orphan on N nodes Optimistic UPDATE ... WHERE Exactly one node recovers it
Archived episodes Filtered unique index excludes archived_at IS NOT NULL Re-triggering is not blocked by old runs
Node with lower latency Random jitter (0-200ms) Winners distributed evenly over time

Recovery sweep

Cyclium.Recovery.sweep/1 finds stale :running episodes and recovers them:

  1. Query episodes where the most recent step journal entry is older than stale_after_ms (default: 2 minutes)
  2. Attempt optimistic claim on each stale episode
  3. If claimed, apply the expectation's recovery_policy:
    • :restart — enqueue fresh (re-runs strategy.init/2)
    • :fail (default) — mark :failed with error_class: "orphaned"
  4. Emit [:cyclium, :recovery, :sweep] telemetry with counts

Policy resolution checks the compiled :actor_registry map first, then falls back to the cyclium_agent_definitions table for dynamic actors. Unknown actors default to :fail.

Workflow reconciliation

Cyclium.Recovery.reconcile_workflows/0 handles a related problem: workflow instances stuck in :running because the WorkflowEngine missed a Bus event during a restart.

The WorkflowEngine is purely event-driven — it advances workflows when it receives episode.completed or episode.failed Bus events. If the engine wasn't running when those events fired (e.g. during a deploy), the workflow step_states become stale and the workflow hangs.

Reconciliation fixes this by:

  1. Finding all :running / :blocked workflow instances
  2. For each step marked "running" in step_states, loading the actual episode
  3. If the episode has already reached a terminal state, re-broadcasting the appropriate Bus event
  4. The WorkflowEngine handles the replayed event through its normal path — no special logic needed

Call reconcile_workflows/0 after sweep/1 and after workflow configs are registered (compiled modules booted, dynamic workflows loaded).

Setting recovery policy

Add recovery_policy to the expectation DSL:

actor do
  expectation(:evaluate_project,
    strategy: MyApp.Strategies.ProjectHealth,
    trigger: {:event, "project_health.check_requested"},
    recovery_policy: :restart,
    budget: %{max_turns: 5, max_tokens: 25_000, max_wall_ms: 120_000}
  )
end

Use :restart for idempotent strategies that re-query data from the DB. Use :fail (default) for strategies with non-idempotent side effects where automatic recovery could cause harm.

Wiring up recovery in your app

Add a delayed recovery task to your Cyclium supervisor with an :actor_registry map that maps actor identifier() strings to their modules. Cyclium looks up the recovery_policy from each actor's compiled expectations automatically. Dynamic actors not in the registry are resolved from the DB.

# Maps identifier (as DB string) → actor module for recovery sweep.
# Must match the identifier() declared in each actor's DSL block.
@actor_registry %{
  "project_health_actor" => MyApp.Actors.ProjectHealthActor,
  "client_health_actor" => MyApp.Actors.ClientHealthActor
}

children = [
  {Cyclium.Supervisor, pubsub: MyApp.PubSub},
  MyApp.Actors.ProjectHealthActor,
  MyApp.Actors.ClientHealthActor,
  {Task, fn ->
    # Wait for cluster to settle after deploy
    Process.sleep(:timer.minutes(2))
    Cyclium.Recovery.sweep(actor_registry: @actor_registry)
    Cyclium.Recovery.reconcile_workflows()
  end}
]

For custom policy logic, pass :resolve_policy instead:

Cyclium.Recovery.sweep(
  resolve_policy: fn episode ->
    if episode.actor_id == "critical_actor", do: :restart, else: :fail
  end
)

Deploy sequence

  1. Node gets SIGTERM -> 30s graceful shutdown timeout
  2. Episodes trap exits, try to finish current step within remaining time
  3. Episodes that don't finish stay :running in DB
  4. After boot, each node waits 2 minutes before running recovery sweep
  5. Sweep finds stale episodes via step journal recency
  6. First node to claim each orphan via optimistic update handles recovery
  7. Workflow reconciliation replays missed Bus events for stale workflow steps

Step Retry Helper

Cyclium.Strategy.Retry provides a lightweight helper for retrying failed steps within an episode. This is distinct from workflow-level retry (on_failure with policy: :retry), which retries entire episodes; the step retry helper retries individual steps (e.g. a synthesis call) within a single episode run.

The problem

When a strategy calls :synthesize and the LLM provider returns a transient error (timeout, rate limit, 503), the strategy needs to retry. Without a helper, you'd manually track attempt counts in the state map:

# Without helper — manual retry tracking
def handle_result(state, %{kind: :synthesis}, {:error, _}) do
  if Map.get(state, :synthesis_retries, 0) < 3 do
    {:retry, Map.update(state, :synthesis_retries, 1, &(&1 + 1))}
  else
    {:abort, "synthesis_failed"}
  end
end

This is error-prone (forgetting to reset counters, tracking multiple step types, off-by-one errors).

Using Cyclium.Strategy.Retry

alias Cyclium.Strategy.Retry

# On success — reset the counter so future failures get fresh attempts
def handle_result(state, %{kind: :synthesis}, {:ok, result}) do
  {:ok, state |> Retry.reset(:synthesis) |> Map.put(:assessment, result)}
end

# On failure — retry up to 3 times with 2-second backoff
def handle_result(state, %{kind: :synthesis} = step, {:error, _}) do
  case Retry.check(state, step, max_attempts: 3, backoff_ms: 2_000) do
    {:retry, new_state}              -> {:retry, new_state}
    {:give_up, _attempts, new_state} -> {:abort, "synthesis_failed_after_retries"}
  end
end

When handle_result returns {:retry, state}, the runner re-enters do_loop and calls next_step again. The strategy should naturally re-emit the same step type (e.g. :synthesize) since its phase/state hasn't changed — only the internal __retries counter was updated.

Options

Option Default Description
:max_attempts 3 Total attempts including the original
:backoff_ms 0 Milliseconds to sleep before retry
:step_key step.kind Key for tracking — use custom keys to track retries per tool name or phase
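A minimal sketch of how such attempt tracking can work (illustrative; the state key and internals are not Cyclium's actual implementation):

```elixir
defmodule RetrySketch do
  # Track attempts under a per-step key in the strategy state.
  # max_attempts counts the original attempt plus retries.
  def check(state, step_key, max_attempts) do
    attempts = get_in(state, [:__retries, step_key]) || 1

    if attempts < max_attempts do
      {:retry, put_in(state, [:__retries, step_key], attempts + 1)}
    else
      {:give_up, attempts, state}
    end
  end
end

state = %{__retries: %{}}
{:retry, state} = RetrySketch.check(state, :synthesis, 3)      # attempt 2
{:retry, state} = RetrySketch.check(state, :synthesis, 3)      # attempt 3
{:give_up, 3, _state} = RetrySketch.check(state, :synthesis, 3)
```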

Custom step keys

Track retries separately for different tool calls within the same episode:

def handle_result(state, %{kind: :tool_call, tool_name: tool} = step, {:error, _}) do
  case Retry.check(state, step, step_key: {:tool, tool}, max_attempts: 2) do
    {:retry, new_state}              -> {:retry, new_state}
    {:give_up, _attempts, new_state} -> {:ok, %{new_state | phase: :skip_tool}}
  end
end

API reference

Retry layers summary

Layer Scope Mechanism Backoff Limit
Step retry (Strategy.Retry) Within one episode handle_result returns {:retry, state} Optional (backoff_ms) max_attempts per step key
Episode budget Within one episode Runner checks max_turns, max_tokens, max_wall_ms N/A Budget exhaustion
Workflow retry (on_failure) Across episodes WorkflowEngine creates new episode backoff_ms (default 5s) max_step_attempts (default 3)
Crash recovery After restart Recovery.sweep re-enqueues or fails N/A One attempt per recovery_policy

Work Claims (Distributed Lease Coordination)

For clusters where multiple applications share the same database and actor definitions, work claims provide lease-based coordination to ensure at-most-once execution.

How it works

  1. Before executing an episode, EpisodeTask calls WorkClaims.gate_acquire/3 with the episode's dedupe_key
  2. If claimed successfully, a Heartbeat GenServer renews the lease periodically (every third of the lease duration)
  3. On completion, the claim is marked :done; on crash, :failed
  4. If a node dies, the lease expires and another node can steal it

Work claims also coordinate trigger request dispatch — TriggerRequests.Poller acquires a claim per request before dispatching to prevent multiple full-mode nodes from processing the same deferred episode.

Configuration

# Use the built-in Ecto-based implementation:
config :cyclium, work_claims: Cyclium.WorkClaims.EctoClaims

# Or a SQL Server-optimized adapter in consuming apps:
config :cyclium, work_claims: MyApp.WorkClaims.SqlServer

# Lease duration (default: 120 seconds)
config :cyclium, work_claims_lease_seconds: 180

# Or omit work_claims entirely — no claiming, fully backwards compatible

When unconfigured, all gate_* functions return passthrough values with zero overhead.

Writing a custom adapter

Implement the Cyclium.WorkClaims behaviour with 5 callbacks:

defmodule MyApp.WorkClaims.SqlServer do
  @behaviour Cyclium.WorkClaims

  @impl true
  def acquire(dedupe_key, owner_node, opts) do
    # Use hints: ["UPDLOCK"] for SQL Server lock acquisition
    # Transaction-based: read with lock, then insert or update
  end

  @impl true
  def renew(dedupe_key, owner_node, lease_seconds), do: # ...
  def complete(dedupe_key, owner_node), do: # ...
  def fail(dedupe_key, owner_node, error_detail), do: # ...
  def reclaim_expired(limit), do: # ...
end

The default EctoClaims implementation uses plain transactions (no lock hints) and works with any Ecto adapter. For SQL Server, a custom adapter can use hints: ["UPDLOCK"] on the read query inside the transaction for stronger concurrency guarantees.

Database table

The cyclium_work_claims table is created by V6 migration:

Column Type Notes
dedupe_key string(512) Unique — matches the episode's dedupe_key
state string(32) claimed, done, failed, expired
owner_node string(255) Node holding the lease
lease_until utc_datetime When the lease expires
attempt integer Incremented on each steal/reclaim

Integration with recovery

When work claims are configured, Recovery.sweep/1 uses gate_acquire to coordinate across nodes before claiming orphaned episodes. This provides two layers of coordination: the work claim lease prevents concurrent execution, and the optimistic claim_for_recovery update prevents duplicate recovery actions.

Lease tuning

The lease duration (work_claims_lease_seconds, default: 120s) controls the trade-off between availability and the "zombie window" — the time between a node crash and another node stealing the work.

Setting Zombie window Heartbeat interval Good for
60s ~60s ~20s Short tasks, fast failover
120s (default) ~120s ~40s Most workloads
300s ~5min ~100s Long-running tasks, flaky networks

Guidelines:

Heartbeat failure modes

The heartbeat GenServer is linked to the EpisodeTask process: if either process crashes, the link takes the other down with it, renewals stop, and the lease expires so another node can eventually steal the work.

Idempotency guidance: Since lease expiry can cause a second node to start the same work, strategies that perform side effects should use idempotency keys. For DB writes, use unique constraints keyed by the episode's dedupe_key or step number. For external API calls, include an idempotency header derived from the episode ID + step number.
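A sketch of the API-call variant inside a strategy module (the HTTP client Req and the episode_ctx fields are assumptions):

```elixir
# Hypothetical helper: the idempotency key is derived from episode ID +
# step number, so a stolen-and-restarted episode replays the same key
# and the remote side can deduplicate the call.
defp post_with_idempotency(url, body, episode_ctx, step_number) do
  key = "#{episode_ctx.episode_id}:#{step_number}"
  Req.post(url, json: body, headers: [{"idempotency-key", key}])
end
```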

Telemetry events

All events are prefixed with [:cyclium, :work_claims, ...]:

Event Measurements Metadata Meaning
:acquired count, duration_ms dedupe_key, owner_node Fresh claim acquired
:steal count, duration_ms dedupe_key, owner_node Expired claim reclaimed (attempt > 1)
:busy count, duration_ms dedupe_key, owner_node Claim denied — another node holds it
:renewed count dedupe_key, owner_node Heartbeat renewal succeeded
:renew_failed count dedupe_key, owner_node Heartbeat renewal failed (lost ownership)
:completed count dedupe_key, owner_node Work finished, claim released
:failed count dedupe_key, owner_node Work failed, claim released
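These events can be consumed with the standard :telemetry API; a minimal sketch wiring the two alert-worthy events to the Logger:

```elixir
require Logger

:telemetry.attach_many(
  "cyclium-claim-alerts",
  [
    [:cyclium, :work_claims, :steal],
    [:cyclium, :work_claims, :renew_failed]
  ],
  fn [_, _, kind], _measurements, meta, _config ->
    # Both signal a lost or contested lease, which is rare in a healthy cluster
    Logger.warning("work claim #{kind}: #{meta.dedupe_key} on #{meta.owner_node}")
  end,
  nil
)
```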

Key metrics to alert on: :steal (should appear only after a node failure, never during normal operation) and :renew_failed (the node lost claim ownership mid-work).

Testing work claims

Unit tests: Use Cyclium.WorkClaims.FakeClaims — an Agent-backed in-memory implementation:

setup do
  {:ok, _} = Cyclium.WorkClaims.FakeClaims.start_link()
  Application.put_env(:cyclium, :work_claims, Cyclium.WorkClaims.FakeClaims)
  on_exit(fn -> Application.delete_env(:cyclium, :work_claims) end)
end

test "acquire on a busy key is denied" do
  assert {:ok, _} = Cyclium.WorkClaims.gate_acquire("key:1", "node-a")
  Cyclium.WorkClaims.FakeClaims.set_busy("key:2")
  assert {:error, :busy} = Cyclium.WorkClaims.gate_acquire("key:2", "node-b")
end

Integration tests (single node): Configure EctoClaims with your test repo. Verify:

  1. Two concurrent acquire calls on the same key — one succeeds, one gets :busy
  2. After complete, a new acquire on the same key succeeds (reclaim)
  3. After lease expiry (set a short lease), acquire steals from the previous owner

Multi-node tests: Deploy to a staging cluster with 2+ nodes. Use a test actor with a short schedule:

  1. Verify only one node's episode runs (check owner_node in cyclium_work_claims)
  2. Kill a node mid-episode, wait for lease expiry, verify another node steals and completes
  3. Monitor telemetry — steal events should only appear after the kill, never during normal operation

Node Identity

By default, Cyclium uses node() to identify the current BEAM instance for work claims, trigger requests, and recovery coordination. In environments where multiple instances share the same Erlang node name (e.g., dev containers all starting as app@app), this breaks lease semantics — every node looks like the same owner.

Cyclium.NodeIdentity provides a pluggable identity layer:

# Static override — set per instance via config or env var
config :cyclium, :node_identity, "dev-jane"

# MFA callback for dynamic resolution (hostname, env var, etc.)
config :cyclium, :node_identity, {MyApp.NodeIdentity, :resolve, []}

When unconfigured and running in non-distributed mode (:nonode@nohost), a random stable identity is generated per BEAM instance and stored in :persistent_term — unique for the process lifetime but not across restarts.

All work claim operations (EpisodeTask, Heartbeat, Runner.Deferred, TriggerRequests.Poller) use Cyclium.NodeIdentity.name() instead of raw node().
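A resolver for the MFA form might look like this (the env-var name and hostname fallback are assumptions):

```elixir
defmodule MyApp.NodeIdentity do
  # Prefer an explicit per-instance env var; fall back to the OS hostname.
  def resolve do
    case System.get_env("INSTANCE_ID") do
      nil ->
        {:ok, host} = :inet.gethostname()
        List.to_string(host)

      id ->
        id
    end
  end
end
```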

Multi-Stack Deployments

A stack is one logical cyclium cluster that shares a database with other clusters but runs its own Elixir nodes (its own Phoenix.PubSub, its own in-memory persistent_term / ETS registries). Typical reasons to run multiple stacks against one schema: independent release cadences, blast-radius isolation, or partitioning actors across regions.

The library contract is cluster-level

Cyclium itself exposes no per-actor DSL option for stacks. It reads :stack_slug once per cluster, stamps every row its actors produce with that value, and scopes Recovery to the matching slug. Which actors actually run on a given cluster is the consumer's decision, implemented in the host app's supervisor.

Episodes, workflow instances, and deferred trigger requests are stamped with the current source_stack at insert time. Cyclium.Recovery.sweep/1 and Cyclium.Recovery.reconcile_workflows/1 read :stack_slug by default and only scan rows from their own stack — this prevents a crashed cluster's work from being re-driven on a cluster whose persistent_term / PubSub state doesn't know about it.

Partitioning actors across stacks

The simplest approach is one actor list per deployment:

# On the stack_a cluster's host app:
config :cyclium, :stack_slug, System.get_env("CYCLIUM_STACK_SLUG")   # "stack_a"
config :my_app, :cyclium_actors, [StackAOnlyActor, SharedActor]

# On the stack_b cluster's host app:
config :cyclium, :stack_slug, System.get_env("CYCLIUM_STACK_SLUG")   # "stack_b"
config :my_app, :cyclium_actors, [StackBOnlyActor, SharedActor]

A richer approach declares the allowed stacks on each actor's child-spec and has the supervisor filter the list at init time — useful when the same release is deployed to every cluster and you want a single source of truth for which actor runs where:

config :my_app, :cyclium_actors, [
  {SharedActor, []},
  {StackAOnlyActor, stacks: [:stack_a]},
  {CrossStackActor, stacks: [:stack_a, :stack_b]}
]

# In the supervisor's init/1:
stack = Application.get_env(:cyclium, :stack_slug)
children = Enum.filter(configured_actors, &actor_runs_on_stack?(&1, stack))

Either pattern is fine — cyclium doesn't care how the list was built, only what actually gets supervised. Because each cluster has its own node processes, PubSub, and persistent_term cache, a stack-local actor's strategy / budget / log-strategy lookups only exist on the cluster that supervises it — which is exactly why Recovery must be stack-scoped.
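A minimal actor_runs_on_stack?/2 for that child-spec shape (illustrative; names are normalized to strings, since :stack_slug is often set from an env var while the stacks: lists use atoms):

```elixir
# Bare module name => no restriction, runs on every stack.
defp actor_runs_on_stack?(mod, _stack) when is_atom(mod), do: true

defp actor_runs_on_stack?({_mod, opts}, stack) when is_list(opts) do
  case Keyword.get(opts, :stacks) do
    nil -> true
    allowed -> to_string(stack) in Enum.map(allowed, &to_string/1)
  end
end
```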

Runtime configuration

For a single release that can be deployed into multiple stacks, drive the slug from an env var in runtime.exs:

# runtime.exs
config :cyclium, :stack_slug, System.get_env("CYCLIUM_STACK_SLUG")

Leaving :stack_slug unset (or nil) is the single-stack default: rows are stamped NULL and Recovery scans without a stack filter. Pre-migration rows with a NULL source_stack are swept by any stack for one release so legacy episodes aren't orphaned — once all rows have been stamped, you can tighten Recovery by stamping a real slug on every cluster.

Related config

Trigger-Only Mode (Deferred Execution)

In shared environments — dev machines on a common test DB, QC/sandbox instances, CI — running cyclium actors on every node leads to competing work claims and unpredictable execution. Conversely, disabling cyclium entirely on non-processing nodes leaves the UI hobbled: episodes never fire, statuses never update.

Trigger-only mode solves both problems by decoupling event processing from episode execution.

Three operating modes

Mode Actors start? Events flow? Episodes execute locally? Trigger requests written?
:full yes yes yes no (direct)
:trigger_only yes yes no yes (to DB)
:disabled no no no no

How it works

In :trigger_only mode, the actor supervision tree starts normally — Bus subscriptions, schedule timers, debounce, circuit breakers all work. But the runner is swapped to Cyclium.Runner.Deferred, which writes a row to cyclium_trigger_requests instead of spawning a Task. The episode record is still created in the DB so the UI can display it.

On :full mode nodes, a Cyclium.TriggerRequests.Poller watches the trigger requests table and dispatches deferred episodes to Runner.OTP for local execution. The poller uses WorkClaims.gate_acquire/3 (with dedupe key "trigger_request:<id>") to coordinate dispatch across nodes — only one full-mode node will pick up a given request. If work claims are not configured, the poller falls through to passthrough mode. The poller can be scoped by source_stack to only pick up requests from specific stacks.

Configuration

# Host app config — set per environment
config :my_app, :cyclium_mode, :full          # :full | :trigger_only | :disabled

# On full-mode nodes: enable the poller
config :cyclium, :trigger_poller, true
config :cyclium, :trigger_poll_interval_ms, 5_000      # default
config :cyclium, :trigger_poll_source_stack, "stack_a"   # nil = pick up all

# On trigger-only nodes: runner is set automatically
# config :cyclium, :runner, Cyclium.Runner.Deferred
# config :cyclium, :stack_slug, :stack_a

Deployment scenarios

Dev machines + shared test DB:

Sandbox / feature-branch testing:

Production (unchanged):

Database table

The cyclium_trigger_requests table (V14 migration):

Column Type Notes
episode_id binary_id FK to cyclium_episodes
actor_id string Actor that created the trigger
expectation_id string Expectation that fired
source_node string Node identity of the trigger-only instance
source_stack string Stack slug for scoped polling
status string pending, claimed, completed, expired
opts map Runner options (e.g., resume flag)
claimed_by string Node identity of the full-mode instance

Indexed on (status, inserted_at) for efficient polling.

Runtime mode switching

Cyclium.Mode supports live mode changes without restart — both node-wide and per-actor:

# Switch the whole node (via remote console, admin endpoint, etc.)
Cyclium.Mode.set(:trigger_only)   # stop local execution, defer to DB
Cyclium.Mode.set(:full)           # resume local execution + polling

# Per-actor override — yield one actor to another node while keeping the rest
Cyclium.Mode.set_actor_override(:client_health, :trigger_only)
Cyclium.Mode.clear_actor_override(:client_health)
Cyclium.Mode.clear_all_overrides()

# Inspect current state
Cyclium.Mode.status()
# %{node_mode: :full, overrides: %{client_health: :trigger_only}, node_identity: "..."}

Mode reads are ETS-backed (read_concurrency: true) for zero overhead in hot paths. The trigger request poller self-gates on each cycle — it only polls when the node-wide mode is :full.

Dynamic Actors

Dynamic actors allow agent definitions to be stored in the database and hydrated into running supervised processes at runtime — without requiring compiled Elixir modules.

When to use dynamic actors

How it works

A single Cyclium.DynamicActor GenServer module serves all DB-defined actors. Each instance is started with different config/expectations args under Cyclium.ActorSupervisor. This avoids runtime module pollution — no Module.create/3 needed.

Cyclium.ActorSupervisor (DynamicSupervisor)
├── MyApp.Agents.CompiledActor    (compiled, use Cyclium.Actor)
├── Cyclium.DynamicActor          (from DB: "user_monitor_1")
└── Cyclium.DynamicActor          (from DB: "user_monitor_2")

Defining an agent in the database

Insert a row into cyclium_agent_definitions:

%Cyclium.Schemas.AgentDefinition{
  actor_id: "custom_health_check",
  domain: "monitoring",
  strategy_template: "observe_classify_converge",   # built-in template
  config: Jason.encode!(%{max_concurrent_episodes: 3, episode_overflow: "queue"}),
  expectations: Jason.encode!([
    %{
      id: "check_target",
      trigger: %{type: "schedule", interval_ms: 300_000},
      budget: %{max_turns: 5, max_tokens: 10_000, max_wall_ms: 60_000},
      log_strategy: "timeline"
    }
  ]),
  enabled: true
}

Loading dynamic actors

# At application startup — loads all enabled definitions
Cyclium.DynamicActor.Loader.load_all()

# Load a single actor
Cyclium.DynamicActor.Loader.load("custom_health_check")

# Reload after updating the definition in DB
Cyclium.DynamicActor.Loader.reload("custom_health_check")

# Stop a dynamic actor
Cyclium.DynamicActor.Loader.stop("custom_health_check")

Database table

cyclium_agent_definitions (V7 migration):

Column Type Notes
id uuid PK
actor_id string(255) Unique identifier
domain string(255) Grouping domain
config text (JSON) max_concurrent_episodes, episode_overflow, etc.
expectations text (JSON) Array of expectation definitions
strategy_ref string(255) Strategy module or registry lookup key
strategy_template string(255) Template name (e.g. "observe_synthesize_converge")
strategy_config text (JSON) Template parameters (gatherer, system_prompt, finding_config, etc.)
enabled boolean Soft toggle
created_by string(255) User/tenant

Strategy templates (data-driven strategies)

Dynamic actors use strategy templates — built-in parameterized strategy modules that are configured via strategy_config JSON in the DB. The compiled app defines what data sources (gatherers) and outputs are available; the DB definition composes them.

Template Pattern Use Case
"observe_synthesize_converge" Gather → LLM → Finding Health checks, advisors, analysis
"observe_classify_converge" Gather → Rules → Finding Threshold/rule-based monitoring
"dispatch" Load entities → Broadcast events Fan-out triggers

Gatherers

Gatherers are compiled modules that know how to collect domain-specific data. The compiled app implements and registers them:

defmodule MyApp.Gatherers.ProjectData do
  @behaviour Cyclium.Gatherer

  @impl true
  def gather(trigger_payload, _opts) do
    project_id = trigger_payload["project_id"]
    project = Repo.get!(Project, project_id)
    orders = load_orders(project_id)
    {:ok, %{project: project, orders: orders, order_count: length(orders)}}
  end
end

Register in app config:

config :cyclium, :gatherer_registry, %{
  "project_data" => MyApp.Gatherers.ProjectData,
  "client_metrics" => MyApp.Gatherers.ClientMetrics
}

Observe → Synthesize → Converge

The main workhorse. Gathers data, sends to LLM, maps result to findings:

%AgentDefinition{
  actor_id: "project_health_dynamic",
  strategy_template: "observe_synthesize_converge",
  strategy_config: Jason.encode!(%{
    "gatherer" => "project_data",
    "system_prompt" => "You are a project health analyst. Evaluate the project data and classify its health status.",
    "finding_config" => %{
      "actor_id_field" => "project_health_dynamic",
      "finding_key_template" => "project:health:${subject_id}",
      "class_field" => "class",
      "severity_field" => "severity",
      "summary_field" => "summary",
      "subject_kind" => "project",
      "subject_id_key" => "project_id"
    },
    "outputs" => ["email"]
  }),
  expectations: Jason.encode!([
    %{id: "evaluate", trigger: %{type: "event", event_type: "project.check_requested"}}
  ])
}

Observe → Classify → Converge

Rule-based classification without LLM. Rules are evaluated in order, first match wins:

%AgentDefinition{
  actor_id: "client_risk_monitor",
  strategy_template: "observe_classify_converge",
  strategy_config: Jason.encode!(%{
    "gatherer" => "client_metrics",
    "classify_rules" => [
      %{"field" => "mrr", "op" => "lt", "value" => 500, "class" => "at_risk", "severity" => "high"},
      %{"field" => "last_login_days_ago", "op" => "gt", "value" => 30, "class" => "inactive", "severity" => "medium"}
    ],
    "default_class" => "healthy",
    "default_severity" => "low",
    "finding_config" => %{
      "finding_key_template" => "client:risk:${subject_id}",
      "subject_kind" => "client",
      "subject_id_key" => "client_id"
    }
  })
}

Rule operators: lt, gt, eq, neq, in, not_in.
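For intuition, first-match-wins evaluation over classify_rules can be sketched as follows (illustrative only, not the library's internal code):

```elixir
defmodule RuleSketch do
  # Walk the rules in order; the first matching rule decides class/severity.
  def classify(data, rules, default_class, default_severity) do
    Enum.find_value(rules, {default_class, default_severity}, fn rule ->
      value = Map.get(data, rule["field"])

      if matches?(rule["op"], value, rule["value"]) do
        {rule["class"], rule["severity"]}
      end
    end)
  end

  defp matches?("lt", a, b), do: a < b
  defp matches?("gt", a, b), do: a > b
  defp matches?("eq", a, b), do: a == b
  defp matches?("neq", a, b), do: a != b
  defp matches?("in", a, list), do: a in list
  defp matches?("not_in", a, list), do: a not in list
end
```

With the client_risk_monitor rules above, data like %{"mrr" => 400} would classify as at_risk/high.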

Dispatch

Fan-out pattern. Calls a gatherer that returns a list of entities, broadcasts an event for each:

%AgentDefinition{
  actor_id: "project_dispatch",
  strategy_template: "dispatch",
  strategy_config: Jason.encode!(%{
    "gatherer" => "active_projects",
    "event_type" => "project.check_requested",
    "entity_id_field" => "id",
    "entity_payload_fields" => ["id", "name"]
  })
}

Strategy resolution for dynamic actors

Dynamic actors use the same :persistent_term registration path as compiled actors. When a dynamic actor boots, Loader resolves the strategy from strategy_template and injects it into each expectation — init_state_from_config then registers it automatically. No strategy registry entries needed.

If you need to override a strategy without updating the DB record, add a strategy_for/2 clause to your registry as usual.

Custom templates can be registered in app config:

config :cyclium, :strategy_templates, %{
  "my_custom_template" => MyApp.Strategies.CustomTemplate
}

Lifecycle and draining

Safe lifecycle operations for updating dynamic actors without losing in-flight episodes.

Drain and reload

# Graceful: waits for active episodes to finish, then reloads from DB
Cyclium.DynamicActor.Lifecycle.drain_and_reload("my_monitor")

# Graceful stop (waits for episodes)
Cyclium.DynamicActor.Lifecycle.drain_and_stop("my_monitor")

# Instant stop/reload (existing behavior, may lose in-flight episodes)
Cyclium.DynamicActor.Loader.stop("my_monitor")
Cyclium.DynamicActor.Loader.reload("my_monitor")

Event-driven refresh

Start the optional Watcher in your supervision tree for automatic refresh:

children = [
  # ... your app ...
  Cyclium.DynamicActor.Watcher
]

Then broadcast events when definitions change:

# Agent definitions:
Cyclium.Bus.broadcast("agent_definition.created", %{actor_id: "my_monitor"})
Cyclium.Bus.broadcast("agent_definition.updated", %{actor_id: "my_monitor"})
Cyclium.Bus.broadcast("agent_definition.disabled", %{actor_id: "my_monitor"})

# Workflow definitions:
Cyclium.Bus.broadcast("workflow_definition.created", %{workflow_id: "onboarding"})
Cyclium.Bus.broadcast("workflow_definition.updated", %{workflow_id: "onboarding"})
Cyclium.Bus.broadcast("workflow_definition.disabled", %{workflow_id: "onboarding"})

The Watcher handles each event appropriately — created loads, updated reloads, disabled stops/unloads. For actors, updates use drain-and-reload to preserve in-flight episodes.

Deploy patterns

Rolling deploy:

# In application stop callback or shutdown hook:
Cyclium.DynamicActor.Lifecycle.stop_all(drain: true, timeout: 30_000)

Blue-green: The new instance calls Loader.load_all() on startup. Global name registration ensures only one instance runs per actor across the cluster.

Dry Runs / Simulations

Dry runs let you test what an actor would do without producing real findings, outputs, or side effects. Useful for validating agent configurations and "what if" testing.

How dry runs work

An episode with mode: "dry_run":

Force-firing a dry run

# Simplest: real tool calls and synthesis, skip persist
Cyclium.Episodes.force_fire("project_health_actor", "evaluate_project",
  mode: :dry_run,
  trigger_payload: %{project_id: 123}
)

# With mock overrides — skip real API calls
Cyclium.Episodes.force_fire("project_health_actor", "evaluate_project",
  mode: :dry_run,
  trigger_payload: %{project_id: 123},
  overrides: %{
    tool_overrides: %{"erp.get_orders" => [%{id: 1, status: "complete"}]},
    synthesis_override: %{"class" => "healthy", "severity" => "low"}
  }
)

Override resolution (layered)

Three sources of overrides, checked in priority order:

fire-time overrides > expectation-level DSL > real execution
  1. Fire-time overrides — passed to force_fire/3 as :overrides option
  2. Expectation-level DSL — defined in the actor definition:
expectation(:evaluate_project,
  strategy: MyApp.Strategies.ProjectHealth,
  trigger: {:event, "project_health.check_requested"},
  dry_run: [
    tool_overrides: %{
      {"erp", "get_orders"} => {:ok, %{orders: []}}
    },
    synthesis_override: {:ok, %{"class" => "healthy"}}
  ]
)
  3. No overrides — real tool calls and synthesis execute normally, only findings and outputs are skipped

Persisting findings in dry runs

By default, dry run findings are journaled but not persisted to the DB. You can opt in to persistence with prefixed keys so dry run findings don't collide with live ones:

# Persist with default "dry_run" prefix (finding_key becomes "dry_run:po_stalled:PO-123")
Cyclium.Episodes.force_fire("po_monitor", "check_pos",
  mode: :dry_run,
  overrides: %{persist_findings: true}
)

# Persist with custom prefix (finding_key becomes "experiment1:po_stalled:PO-123")
Cyclium.Episodes.force_fire("po_monitor", "check_pos",
  mode: :dry_run,
  overrides: %{persist_findings: "experiment1"}
)

The prefix is also applied to actor_id on persisted findings, so Findings.active_for(actor: "po_monitor") won't return dry run findings — use Findings.active_for(actor: "dry_run:po_monitor") instead, or use the mode-aware helper:

# Automatically prefixes filters when episode is a dry run with persist_findings enabled:
Cyclium.Findings.active_for_mode([actor: "po_monitor"], episode)

This can also be set at the expectation level in the actor DSL:

expectation(:check_pos,
  strategy: MyApp.Strategies.PoCheck,
  trigger: {:schedule, 300_000},
  dry_run: [persist_findings: true]
)

Via the Actor GenServer

You can also fire dry runs through the actor's message interface:

GenServer.cast(MyApp.Agents.ProjectHealth, {:force_fire, :evaluate_project, mode: :dry_run})

Dry run results

The episode completes with full step journal. In the UI:

Workflow dry runs

Workflows support dry run mode — every step episode inherits the mode and opts from the workflow instance:

# Compiled workflow
WorkflowEngine.start_workflow(MyWorkflow, trigger_data, mode: :dry_run)

# Dynamic workflow with finding persistence
WorkflowEngine.start_dynamic_workflow("order_flow", trigger_data,
  mode: :dry_run,
  dry_run_opts: %{persist_findings: true}
)

All step episodes will run in dry run mode: findings are journaled (and optionally persisted with prefix), outputs are skipped. The workflow instance itself stores mode and dry_run_opts (V9 migration), so retries and subsequent steps also inherit the mode.

Per-step overrides allow targeting different mocks and options to individual steps:

WorkflowEngine.start_dynamic_workflow("vendor_onboarding", trigger_data,
  mode: :dry_run,
  dry_run_opts: %{
    persist_findings: true,
    steps: %{
      "compliance_check" => %{
        "synthesis_override" => %{"class" => "high_risk", "severity" => "high"}
      },
      "connector_setup" => %{
        "tool_overrides" => %{"erp.create_vendor" => %{"id" => "mock-v-001"}},
        "persist_findings" => "experiment1"
      }
    }
  }
)

Global keys (like persist_findings: true) apply to all steps. Step-specific keys override globals for that step. The "steps" key itself is stripped from each episode's opts.

Strategies in workflow steps can use Findings.active_for_mode/3 to transparently query their own dry run findings when persist_findings is enabled.

Tools

External capabilities are registered as tools implementing Cyclium.Tool. Use use Cyclium.Tool for sensible defaults — the only required callback is call/3:

defmodule MyApp.Tools.ERP do
  use Cyclium.Tool

  @impl true
  def call(:read_po, args, _ctx) do
    case MyApp.ERP.get_po(args["po_id"]) do
      {:ok, po} -> {:ok, po}
      {:error, reason} -> {:error, reason}
    end
  end
end

Override optional callbacks as needed:

defmodule MyApp.Tools.VendorAPI do
  use Cyclium.Tool

  @impl true
  @impl true
  def call(:send_notification, args, _ctx) do
    # ... deliver via the vendor API using `args` ...
  end

  # Strip credentials before journaling
  @impl true
  def redact(args), do: Map.drop(args, ["api_key"])

  # Strip large payloads from results before journaling
  @impl true
  def redact_result(result) when is_list(result) do
    %{count: length(result), ids: Enum.map(result, & &1.id)}
  end
  def redact_result(result), do: result

  # Mark as having side effects (affects caching/retry behavior)
  @impl true
  def side_effect?, do: true

  # Cache results for 5 minutes
  @impl true
  def cache_ttl, do: :timer.minutes(5)

  # Cache key scope — same PO ID returns cached result
  @impl true
  def cache_scope(args), do: args["po_id"]
end

Callback Default Description
call(action, args, ctx) required Execute the tool action
redact(args) passthrough Strip sensitive/bulky data from args before journaling
redact_result(result) passthrough Strip bulky data from results before journaling
side_effect?() false Whether the action mutates external state
cache_ttl() :no_cache How long to cache results (ms)
cache_scope(args) "" Cache key discriminator

Register tools in config:

config :cyclium, :capability_registry, %{
  erp_read: MyApp.Tools.ERP,
  vendor_api: MyApp.Tools.VendorAPI
}

Strategies invoke tools via {:tool_call, :erp_read, :read_po, %{"po_id" => "PO-123"}}. The ToolExec wrapper handles capability resolution, caching, redaction, and error classification.

Reconciler

The optional Cyclium.Reconciler watches for spec.updated Bus events and reconciles running actors when their configuration changes at runtime:

Enable via config:

config :cyclium, :reconciler, true

Or trigger manually:

Cyclium.Reconciler.reconcile_actor(actor_pid, new_module)

LiveView integration

Cyclium integrates with Phoenix LiveView via the Bus. Subscribe in your LiveView's mount and handle events:

defmodule MyAppWeb.DashboardLive do
  use MyAppWeb, :live_view

  def mount(_params, _session, socket) do
    if connected?(socket), do: Cyclium.Bus.subscribe()
    {:ok, assign(socket, findings: load_findings())}
  end

  def handle_info({:bus, event, _payload}, socket)
      when event in ["finding.raised", "finding.updated", "finding.cleared"] do
    {:noreply, assign(socket, findings: load_findings())}
  end

  def handle_info({:bus, _event, _payload}, socket) do
    {:noreply, socket}
  end
end

Database tables

All tables use binary_id primary keys and are SQL Server 2017 compatible (no JSON operators in DDL, application-layer upserts, denormalized columns for indexed queries).

Table Migration Purpose
cyclium_episodes V1 Episode lifecycle, budget tracking, classification
cyclium_episode_steps V1 Step-by-step journal (16 step kinds)
cyclium_episode_checkpoints V1 Versioned strategy state snapshots
cyclium_findings V1 Persistent observations with raise/update/clear lifecycle
cyclium_outputs V1 Output proposals, delivery status, deduplication
cyclium_episode_logs V2 Materialized human-readable logs
cyclium_workflow_instances V3, V9 Workflow execution tracking, step states, dry run mode
cyclium_work_claims V6 Lease-based distributed work coordination
cyclium_agent_definitions V7 DB-stored actor definitions for dynamic actors
cyclium_workflow_definitions V8 DB-stored workflow definitions for dynamic workflows
cyclium_trigger_requests V14 Deferred episode execution for trigger-only nodes

V4 adds archived_at to episodes and findings. V5 replaces the non-unique dedupe_key index on episodes with a filtered unique index (WHERE dedupe_key IS NOT NULL AND archived_at IS NULL) for multi-node coordination. V6 adds the cyclium_work_claims table for lease-based distributed coordination across clustered nodes. V7 adds cyclium_agent_definitions for dynamic actors and mode/dry_run_opts columns to episodes for simulation support. V8 adds cyclium_workflow_definitions for dynamic workflows. V10 adds caused_by_key (finding causality chains) and expires_at (TTL-based expiration) to cyclium_findings.

Window helpers

Cyclium.Window provides clock-aligned deduplication buckets for output dedupe_key construction:

Cyclium.Window.bucket(:h4, DateTime.utc_now())   # "2026-02-24T08"  (4-hour windows)
Cyclium.Window.bucket(:h24, DateTime.utc_now())   # "2026-02-24"     (daily)
Cyclium.Window.bucket(:h48, DateTime.utc_now())   # "2026-02-24"     (every-other-day)
Cyclium.Window.bucket(:w1, DateTime.utc_now())    # "2026-W09"       (ISO week)

Use these in dedupe_key to prevent duplicate outputs within a time window:

dedupe_key: "alert:client:#{id}:#{Cyclium.Window.bucket(:h4, DateTime.utc_now())}"

Batch helpers

Cyclium.Batch provides a lightweight struct for strategies that process data in grouped batches across multiple :synthesize calls. No new step types — strategies continue using :tool_call and :synthesize as normal.

# Group items semantically (e.g., by base item so variants are compared together)
groups = Cyclium.Batch.group_by(items, & &1.base_item_id)
batch = Cyclium.Batch.init(groups)

# Or chunk by fixed size
batch = items |> Cyclium.Batch.chunk(10) |> Cyclium.Batch.init()

In next_step, drive the loop:

case Cyclium.Batch.current_group(state.batch) do
  nil -> :converge  # all groups processed
  {group_key, items} -> {:synthesize, build_prompt(group_key, items)}
end

In handle_result, advance:

batch = Cyclium.Batch.advance(state.batch, parsed_result)
{:ok, %{state | batch: batch}}

Progress tracking via Batch.group_count/1, Batch.processed_count/1, and Batch.done?/1.
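Putting the pieces together, a batch-driven strategy skeleton might look like this (the trigger payload shape, prompt format, and converge classification are assumptions):

```elixir
defmodule MyApp.Strategies.NightlySweep do
  @behaviour Cyclium.EpisodeRunner.Strategy

  @impl true
  def init(_episode, trigger) do
    # Payload shape is an assumption: a flat list of items to sweep
    items = trigger.payload["items"] || []
    batch = items |> Cyclium.Batch.chunk(10) |> Cyclium.Batch.init()
    {:ok, %{batch: batch}}
  end

  @impl true
  def next_step(state, _episode_ctx) do
    case Cyclium.Batch.current_group(state.batch) do
      nil -> :converge
      {group_key, items} -> {:synthesize, build_prompt(group_key, items)}
    end
  end

  @impl true
  def handle_result(state, _step, result) do
    {:ok, %{state | batch: Cyclium.Batch.advance(state.batch, result)}}
  end

  @impl true
  def converge(state, _episode_ctx) do
    {:ok, %Cyclium.ConvergeResult{
      classification: %{"primary" => "swept"},
      confidence: 1.0,
      summary: "Processed #{Cyclium.Batch.processed_count(state.batch)} groups",
      findings: [],
      outputs: []
    }}
  end

  defp build_prompt(group_key, items) do
    %{system: "Classify these items.", user: "Group #{group_key}: #{inspect(items)}"}
  end
end
```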

Per-item episodes vs. batch processing

Batch helpers are useful when a single episode needs to process many items in groups — but they're not the only pattern. An alternative is to fire one episode per item, driven by domain events. This is the pattern used by the project health actor.

The difference:

Approach When to use Episode count
Batch (single episode) Scheduled sweep over a large dataset; items need cross-comparison One episode, many :synthesize turns
Per-item (many episodes) Event-driven re-evaluation; each item is independent One episode per item

Per-item pattern — ProjectHealthActor:

The actor listens for "project.updated" events. Each event carries a project_id, and each project gets its own independent episode. There's no need to load the full dataset — the trigger tells you which item changed.

defmodule MyApp.Actors.ProjectHealthActor do
  use Cyclium.Actor

  actor do
    domain(:project_health)
    spec_rev("v0.1.0")
    max_concurrent_episodes(5)
    episode_overflow(:queue)

    expectation(:project_should_be_healthy,
      strategy: MyApp.Strategies.ProjectHealth,
      trigger: {:event, "project.updated"},
      subject_key: :project_id,
      debounce_ms: :timer.seconds(2),
      budget: %{max_turns: 3, max_tokens: 1_000, max_wall_ms: 10_000}
    )
  end
end

The strategy is single-turn — load the item, classify it deterministically, emit findings:

defmodule MyApp.Strategies.ProjectHealth do
  @behaviour Cyclium.EpisodeRunner.Strategy

  @impl true
  def init(_episode, trigger) do
    {:ok, %{project_id: trigger.payload["project_id"]}}
  end

  @impl true
  def next_step(_state, _episode_ctx), do: :converge

  @impl true
  def handle_result(state, _step, _result), do: {:ok, state}

  @impl true
  def converge(state, episode_ctx) do
    project = MyApp.Projects.get!(state.project_id)
    percent_spent = project.spent / max(project.budget, 1)
    days_remaining = Date.diff(project.due_date, Date.utc_today())

    {class, severity, summary} = classify(project.status, percent_spent, days_remaining)

    {:ok, %Cyclium.ConvergeResult{
      classification: %{"primary" => class, "severity" => to_string(severity)},
      confidence: 1.0,
      summary: summary,
      findings: [
        {:raise, %{
          actor_id: "project_health_actor",
          finding_key: "project:health:#{project.id}",
          class: class,
          severity: severity,
          confidence: 1.0,
          subject_kind: "project",
          subject_id: project.id,
          summary: summary,
          evidence_refs: %{
            "percent_spent" => percent_spent,
            "days_remaining" => days_remaining
          }
        }}
      ],
      outputs: []
    }}
  end

  defp classify(:completed, _pct, _days), do: {"complete", :low, "Project completed"}
  defp classify(_s, pct, _d) when pct > 1.0, do: {"over_budget", :high, "Over budget"}
  defp classify(_s, pct, _d) when pct > 0.85, do: {"budget_risk", :medium, "Budget at risk"}
  defp classify(_s, _pct, d) when d < 0, do: {"overdue", :high, "Past due date"}
  defp classify(_s, _pct, d) when d < 3, do: {"schedule_risk", :medium, "Due date approaching"}
  defp classify(_s, _pct, _d), do: {"healthy", :low, "On track"}
end

Why this works well: each item gets its own small, cheap episode. The classification is deterministic (no LLM tokens spent), the tight budget caps runaway work, and every run produces an auditable, per-project finding.

When to reach for Batch instead: If you need to process 500 items in one pass (e.g., a nightly SKU classification sweep), a single episode with Cyclium.Batch is more efficient than 500 separate episodes — fewer rows, one journal, and the ability to compare items within groups.
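The within-group comparison a single batch pass enables can be sketched in plain Elixir. This is illustration only, not the Cyclium.Batch API (which isn't shown here) — the point is that judging each item relative to its group requires the whole group in scope at once, which 500 independent per-item episodes can't do:

```elixir
defmodule BatchComparisonSketch do
  # Hypothetical illustration only — not Cyclium.Batch. Flags items whose
  # price deviates from their category's median by more than the threshold;
  # the comparison needs every item in the group visible in one pass.
  def price_outliers(items, threshold \\ 0.5) do
    items
    |> Enum.group_by(& &1.category)
    |> Enum.flat_map(fn {_category, group} ->
      med = median(Enum.map(group, & &1.price))

      Enum.filter(group, fn item ->
        abs(item.price - med) / max(med, 1) > threshold
      end)
    end)
  end

  defp median(values) do
    sorted = Enum.sort(values)
    Enum.at(sorted, div(length(sorted), 2))
  end
end
```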

Synthesizers

A synthesizer is the bridge between strategies and LLM infrastructure. It implements Cyclium.Synthesizer and is called when a strategy returns {:synthesize, prompt_ctx}. The synthesizer handles the actual LLM call, and its response flows back through handle_result/3.

defmodule MyApp.Synthesizers.ProjectAnalysis do
  @behaviour Cyclium.Synthesizer

  @impl true
  def synthesize(prompt_ctx, _episode_ctx) do
    case MyApp.LLM.chat(prompt_ctx.system, prompt_ctx.user) do
      {:ok, text} -> {:ok, %{text: text}}
      {:error, reason} -> {:error, :llm_error, reason}
    end
  end

  @impl true
  def estimate_tokens(prompt_ctx) do
    # Rough estimate for budget tracking
    String.length(prompt_ctx.user || "") |> div(4)
  end
end

Attaching a synthesizer to an actor

Synthesizers can be declared at two levels:

Actor-level — inherited by all expectations in the actor that use :synthesize. Declare it when most or all expectations share the same synthesizer:

defmodule MyApp.Actors.ProjectAdvisorActor do
  use Cyclium.Actor

  actor do
    domain(:project_advisory)
    spec_rev("v0.1.0")
    synthesizer(MyApp.Synthesizers.ProjectAnalysis)
    max_concurrent_episodes(3)
    episode_overflow(:queue)

    expectation(:project_ai_summary,
      strategy: MyApp.Strategies.ProjectAdvisor,
      trigger: {:event, "project.summary_requested"},
      budget: %{max_turns: 5, max_tokens: 10_000, max_wall_ms: 30_000}
    )

    expectation(:project_risk_assessment,
      strategy: MyApp.Strategies.ProjectRisk,
      trigger: {:event, "project.risk_review_requested"},
      budget: %{max_turns: 8, max_tokens: 15_000, max_wall_ms: 60_000}
    )
  end
end

Both expectations above use MyApp.Synthesizers.ProjectAnalysis.

Expectation-level — overrides the actor-level synthesizer for a specific expectation. Use this when one expectation needs a different model or configuration:

actor do
  domain(:project_advisory)
  synthesizer(MyApp.Synthesizers.ProjectAnalysis)   # default for this actor

  expectation(:project_ai_summary,
    strategy: MyApp.Strategies.ProjectAdvisor,
    trigger: {:event, "project.summary_requested"},
    synthesizer: MyApp.Synthesizers.FastSummary      # override for this expectation
  )

  expectation(:project_risk_assessment,
    strategy: MyApp.Strategies.ProjectRisk,
    trigger: {:event, "project.risk_review_requested"}
    # uses ProjectAnalysis (inherited from actor)
  )
end

The strategy itself doesn't know or care which synthesizer is wired in — it just returns {:synthesize, prompt_ctx} and receives the result in handle_result:

def next_step(%{project_data: data, ai_summary: nil}, _ctx) do
  {:synthesize, %{
    system: "You are a project risk analyst.",
    user: "Project: #{data.name}, #{data.percent_spent * 100}% spent, #{data.days_remaining} days left"
  }}
end

def handle_result(state, %{kind: :synthesis}, {:ok, %{text: text}}) do
  {:ok, %{state | ai_summary: text}}
end

When you don't need a synthesizer: If your strategy is purely deterministic (like the ProjectHealth strategy above), you don't need to declare a synthesizer at all. The synthesizer is only invoked when a strategy returns {:synthesize, prompt_ctx} — if your strategy never does, the synthesizer configuration is ignored.

LLM-provided confidence

Findings have a confidence field (0.0–1.0). For deterministic strategies, hardcoding 1.0 is fine. But when an LLM produces the assessment, you can ask it to self-report confidence and pass that through to the finding.

Step 1 — Add confidence to the tool schema in your synthesizer:

@tool_definition %{
  type: "function",
  function: %{
    name: "project_health_assessment",
    parameters: %{
      type: "object",
      properties: %{
        class: %{type: "string", enum: ["healthy", "at_risk", "critical"]},
        severity: %{type: "string", enum: ["low", "medium", "high", "critical"]},
        summary: %{type: "string", description: "One sentence health summary"},
        confidence: %{
          type: "number",
          minimum: 0.0,
          maximum: 1.0,
          description:
            "How confident you are in this assessment (0.0–1.0). " <>
            "Use lower values when data is sparse or ambiguous, " <>
            "higher values when the evidence clearly supports the classification."
        }
        # ... other fields
      },
      required: ["class", "severity", "summary", "confidence"]
    }
  }
}

The description matters — it tells the LLM what the scale means, which produces more calibrated values than a bare "confidence" field.
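When the model calls the tool, the decoded arguments arrive as a string-keyed map. A hypothetical validation helper (AssessmentParser is not part of Cyclium; the field names follow the schema above) might look like:

```elixir
defmodule AssessmentParser do
  # Hypothetical helper, not part of Cyclium: validates decoded tool-call
  # arguments against the schema above and clamps confidence defensively,
  # since JSON-schema constraints are a request to the model, not a guarantee.
  @classes ~w(healthy at_risk critical)
  @severities ~w(low medium high critical)

  def parse(%{"class" => c, "severity" => s, "summary" => sum, "confidence" => conf})
      when c in @classes and s in @severities and is_binary(sum) and is_number(conf) do
    {:ok, %{class: c, severity: s, summary: sum, confidence: conf |> max(0.0) |> min(1.0)}}
  end

  def parse(other), do: {:error, {:invalid_assessment, other}}
end
```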

Step 2 — Read it in converge and pass it to the finding:

def converge(state, episode_ctx) do
  result = state.assessment
  confidence = parse_confidence(result["confidence"])

  {:ok, %ConvergeResult{
    classification: %{"primary" => result["class"]},
    confidence: confidence,
    summary: result["summary"],
    findings: [
      {:raise, %{
        finding_key: "project:health:#{state.project_id}:#{episode_ctx.episode_id}",
        confidence: confidence,
        # ... other finding fields
      }}
    ]
  }}
end

defp parse_confidence(val) when is_number(val), do: max(0.0, min(val, 1.0))
defp parse_confidence(_), do: 0.5

The parse_confidence/1 helper clamps to [0.0, 1.0] and falls back to 0.5 if the LLM returns something unexpected. The same confidence flows into both the ConvergeResult (episode-level) and the finding (queryable).

When to hardcode instead: If the classification is deterministic (rule-based, no LLM), use confidence: 1.0. The LLM confidence pattern is for cases where the assessment involves judgment — ambiguous data, sparse evidence, or nuanced classification where the LLM's certainty is genuinely informative.

Test kit

Cyclium ships a test kit in Cyclium.Test.* that host apps can use to smoke-test their definitions without running full episodes. Import the helpers with use:

Actor validation

defmodule MyApp.Actors.ClientHealthActorTest do
  use ExUnit.Case, async: true
  use Cyclium.Test.ActorCase

  test "actor definition is valid" do
    assert_valid_actor(MyApp.Actors.ClientHealthActor)
  end

  test "all expectations have strategies" do
    assert_strategies_defined(MyApp.Actors.ClientHealthActor)
  end

  test "budgets are well-formed" do
    assert_budgets_valid(MyApp.Actors.ClientHealthActor)
  end

  test "spec_rev is set" do
    assert_spec_rev_set(MyApp.Actors.ClientHealthActor)
  end
end

Strategy contract verification

defmodule MyApp.Strategies.ClientHealthTest do
  use ExUnit.Case, async: true
  use Cyclium.Test.StrategyCase

  @episode build_test_episode(actor_id: "client_health", expectation_id: "health_check")
  @trigger %Cyclium.Trigger.Manual{requested_by: "test"}

  test "init returns valid state" do
    assert_valid_init(MyApp.Strategies.ClientHealth, @episode, @trigger)
  end

  test "strategy terminates within budget" do
    assert_strategy_terminates(MyApp.Strategies.ClientHealth, @episode, @trigger,
      max_steps: 20
    )
  end
end

Synthesizer testing

use Cyclium.Test.SynthesizerCase

# Contract validation
assert_valid_synthesize(MySynthesizer, prompt_ctx, episode_ctx)
assert_valid_estimate_tokens(MySynthesizer, prompt_ctx)

# FakeSynthesizer for strategy tests
{:ok, _} = Cyclium.Test.FakeSynthesizer.start_link()
Cyclium.Test.FakeSynthesizer.set_response(%{"answer" => "42"})
# ... run strategy, then inspect calls:
Cyclium.Test.FakeSynthesizer.calls()

Output adapter testing

use Cyclium.Test.OutputCase

# Contract validation
assert_valid_deliver(MyApp.Adapters.Slack, :slack, payload, ctx)

# FakeOutputAdapter for integration tests
{:ok, _} = Cyclium.Test.FakeOutputAdapter.start_link()
# ... run episode, then inspect deliveries:
Cyclium.Test.FakeOutputAdapter.deliveries()

Workflow validation

defmodule MyApp.Workflows.VendorOnboardingTest do
  use ExUnit.Case, async: true
  use Cyclium.Test.WorkflowCase

  test "workflow is valid" do
    assert_valid_workflow(MyApp.Workflows.VendorOnboarding)
  end

  test "all steps have failure policies" do
    assert_failure_policies_complete(MyApp.Workflows.VendorOnboarding)
  end

  test "step inputs don't crash" do
    assert_step_inputs_safe(MyApp.Workflows.VendorOnboarding,
      trigger: %{"vendor_id" => "v123"}
    )
  end
end

Checkpoint migration fuzzing

Property-based testing for migrate/2 chains using StreamData:

use Cyclium.Test.CheckpointMigration

# Fuzz test: generate random states, migrate through version chain, assert no crashes
assert_migration_safe(MyCheckpoint, iterations: 200)

# Specific version migration
assert_migration(MyCheckpoint, %{"old_field" => 1}, 1, 3)

# Idempotency: migrating an already-current state is a no-op
assert_migration_idempotent(MyCheckpoint, iterations: 100)
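A migration chain these helpers could exercise might look like the following sketch. The migrate/2 shape — current state map plus the version that state is at — is an assumption inferred from the assert_migration calls above, not a confirmed Cyclium contract:

```elixir
defmodule MyApp.Checkpoints.SweepState do
  # Hypothetical checkpoint module. The migrate/2 signature is assumed from
  # the test-kit calls above; field names and versions are invented for
  # illustration.
  @current_version 3

  def current_version, do: @current_version

  # Already at the current version: a no-op, which keeps migration idempotent.
  def migrate(state, @current_version), do: {:ok, state}

  # v1 -> v2: "old_field" was renamed to "items".
  def migrate(state, 1) do
    state
    |> Map.put("items", Map.get(state, "old_field", []))
    |> Map.delete("old_field")
    |> migrate(2)
  end

  # v2 -> v3: a "cursor" field was introduced, defaulting to nil.
  def migrate(state, 2) do
    state
    |> Map.put_new("cursor", nil)
    |> migrate(3)
  end
end
```

Each clause hands off to the next version's clause, so migrating a v1 state walks the whole chain — which is exactly what the fuzzing helper stresses with random inputs.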

Demo application

See cyclium_ex_hapi for a complete Phoenix LiveView application demonstrating Cyclium.

Development

# Install dependencies
mix deps.get

# Run tests
mix test

# Dialyzer (static analysis)
mix dialyzer

# Compile
mix compile --warnings-as-errors