AgentSessionManager

Unified Elixir infrastructure for orchestrating AI agent sessions across Claude, Codex, and Amp, with lifecycle management, event streaming, persistence, and provider-agnostic observability.

Why AgentSessionManager?

When you call an AI provider API directly, you get back a response. When you build a product on top of AI agents, you need much more: session state, event history, cost tracking, cancellation, persistence, and the ability to swap providers without rewriting your application.

AgentSessionManager is the infrastructure layer that gives you all of this through a clean ports-and-adapters architecture.

| Concern | Without ASM | With ASM |
|---|---|---|
| Session state | Hand-roll per provider | Unified lifecycle with state machines |
| Event formats | Parse each provider separately | 20+ normalized event types |
| Persistence | Build your own | Pluggable: InMemory, Ecto (Postgres/SQLite), Ash, S3 |
| Cost visibility | None | Model-aware cost tracking per run |
| Provider lock-in | Rewrite when switching | Swap adapters, keep application code |
| Observability | Ad-hoc logging | Telemetry, rendering pipeline, audit trail |

See Architecture for the full design.

Quick Start

Add the dependency and at least one provider SDK:

```elixir
def deps do
  [
    {:agent_session_manager, "~> 0.8.0"},
    {:claude_agent_sdk, "~> 0.14.0"}  # or {:codex_sdk, "~> 0.10.0"} / {:amp_sdk, "~> 0.4.0"}
  ]
end
```

Run a one-shot session:

```elixir
alias AgentSessionManager.SessionManager
alias AgentSessionManager.Adapters.{ClaudeAdapter, InMemorySessionStore}

{:ok, store} = InMemorySessionStore.start_link()
{:ok, adapter} = ClaudeAdapter.start_link()

{:ok, result} =
  SessionManager.run_once(store, adapter,
    %{messages: [%{role: "user", content: "What is the BEAM?"}]},
    event_callback: fn
      # Print streamed text chunks as they arrive; ignore every other event
      %{type: :message_streamed, data: %{delta: text}} -> IO.write(text)
      _ -> :ok
    end
  )

IO.inspect(result.token_usage)
```

This creates a session, executes one run, streams events, and cleans up. For provider-specific setup, see Provider Adapters.

Choose Your Abstraction Level

Start with the simplest API that meets your needs:

| API | Use when you need | Complexity |
|---|---|---|
| SessionManager.run_once/4 | Single request/response, scripts, testing | Lowest |
| StreamSession.start/1 | Lazy event stream from a one-shot call | Low |
| SessionManager full lifecycle | Multi-run sessions, explicit state control | Medium |
| SessionServer | Per-session queuing, concurrent runs, subscriptions | Highest |

StreamSession

Collapses boilerplate into a single call that returns a lazy stream:

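```elixir
# Assumes StreamSession and ClaudeAdapter are aliased
{:ok, stream, close, _meta} =
  StreamSession.start(
    adapter: {ClaudeAdapter, []},
    input: %{messages: [%{role: "user", content: "Hello!"}]}
  )

# Consume the lazy stream, printing each event
stream |> Stream.each(&IO.inspect/1) |> Stream.run()
# Clean up once you are done with the stream
close.()
```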

See StreamSession.

Full Lifecycle

Explicit control over session creation, activation, runs, and completion:

```elixir
messages = [%{role: "user", content: "What is the BEAM?"}]

{:ok, session} = SessionManager.start_session(store, adapter, %{agent_id: "my-agent"})
{:ok, session} = SessionManager.activate_session(store, session.id)
{:ok, run} = SessionManager.start_run(store, adapter, session.id, %{messages: messages})
{:ok, result} = SessionManager.execute_run(store, adapter, run.id)
{:ok, _} = SessionManager.complete_session(store, session.id)
```

See Sessions and Runs.

SessionServer

Per-session GenServer with FIFO queuing and multi-slot concurrency:

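```elixir
messages = [%{role: "user", content: "What is the BEAM?"}]

{:ok, server} =
  SessionServer.start_link(
    store: store,
    adapter: adapter,
    session_opts: %{agent_id: "runtime-agent"},
    # At most two runs execute at once; later submissions wait in FIFO order
    max_concurrent_runs: 2
  )

{:ok, run_id} = SessionServer.submit_run(server, %{messages: messages})
# Wait (up to 120_000 ms) for this run's result
{:ok, result} = SessionServer.await_run(server, run_id, 120_000)
# Drain the server with a 30_000 ms timeout
:ok = SessionServer.drain(server, 30_000)
```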

See Session Server Runtime.
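To make the FIFO-plus-slots idea concrete, here is a purely functional sketch of the scheduling rule: runs enter a queue, and whenever fewer than the allowed number are running, the oldest queued run takes the free slot. SlotQueueSketch and its functions are hypothetical names for illustration; the real SessionServer is a GenServer and this is not its code.

```elixir
defmodule SlotQueueSketch do
  # Hypothetical sketch of FIFO queuing with a fixed number of execution slots.
  # It only tracks state; it does not spawn or supervise anything.

  def new(max_slots), do: %{max: max_slots, running: MapSet.new(), queue: :queue.new()}

  # Enqueue a run at the tail, then start it right away if a slot is free.
  def submit(state, run_id) do
    state
    |> Map.update!(:queue, &:queue.in(run_id, &1))
    |> fill_slots()
  end

  # Mark a run finished, freeing its slot for the oldest queued run.
  def finish(state, run_id) do
    state
    |> Map.update!(:running, &MapSet.delete(&1, run_id))
    |> fill_slots()
  end

  # Move runs from the head of the queue into free slots, oldest first.
  defp fill_slots(%{max: max_slots, running: running, queue: queue} = state) do
    if MapSet.size(running) < max_slots do
      case :queue.out(queue) do
        {{:value, run_id}, rest} ->
          fill_slots(%{state | running: MapSet.put(running, run_id), queue: rest})

        {:empty, _} ->
          state
      end
    else
      state
    end
  end
end
```

Erlang's :queue gives amortized constant-time enqueue and dequeue, and refilling slots only on submit and finish means no polling is needed.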

Core Concepts

```
Your Application
       |
  SessionManager         -- orchestrates lifecycle, events, capability checks
       |
  +----+----+
  |         |
Store    Adapter          -- ports (interfaces / behaviours)
  |         |
ETS/DB   Claude/Codex/Amp -- adapters (implementations)
```

Sessions are containers for a series of agent interactions. State machine: pending -> active -> completed / failed / cancelled.

Runs represent one execution within a session -- sending input and receiving output. State machine: pending -> running -> completed / failed / cancelled / timeout. Runs track token usage and output.
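The two state machines can be written down as transition tables. The sketch below encodes only the arrows given above (so, for example, it does not model cancelling a pending session); LifecycleSketch is a hypothetical module, not part of the library.

```elixir
defmodule LifecycleSketch do
  # Hypothetical sketch: transition tables copied from the arrows in the text.
  @session_transitions %{
    pending: [:active],
    active: [:completed, :failed, :cancelled]
  }

  @run_transitions %{
    pending: [:running],
    running: [:completed, :failed, :cancelled, :timeout]
  }

  # Returns {:ok, to} when `from -> to` is an allowed edge, otherwise an error tuple.
  def transition(kind, from, to) do
    if to in Map.get(table(kind), from, []) do
      {:ok, to}
    else
      {:error, {:invalid_transition, kind, from, to}}
    end
  end

  # A state with no outgoing edges is terminal.
  def terminal?(kind, state), do: Map.get(table(kind), state, []) == []

  defp table(:session), do: @session_transitions
  defp table(:run), do: @run_transitions
end
```

Keeping the transitions as data turns an invalid state change into a failed lookup instead of logic scattered across conditionals.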

Events are immutable records emitted during execution. They provide an audit trail, power streaming, and enable cursor-based replay.

Ports

The core depends only on behaviours. Swap implementations without touching application code.

| Port | Purpose |
|---|---|
| ProviderAdapter | AI provider integration (execute, cancel, capabilities) |
| SessionStore | Session, run, and event persistence |
| ArtifactStore | Binary artifact storage (patches, files) |
| QueryAPI | Cross-session queries and analytics |
| Maintenance | Retention, cleanup, health checks |
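A minimal illustration of the ports-and-adapters idea, assuming a drastically simplified store port with only put/3 and get/2: the core function is handed the adapter module and never names a concrete implementation, so swapping storage is a one-argument change. All PortsSketch modules are hypothetical; the real SessionStore behaviour has a much larger surface.

```elixir
defmodule PortsSketch.Store do
  # Hypothetical port: a behaviour with two callbacks.
  @callback put(state :: term(), id :: String.t(), record :: map()) :: term()
  @callback get(state :: term(), id :: String.t()) :: {:ok, map()} | {:error, :not_found}
end

defmodule PortsSketch.MapStore do
  # Adapter 1: records live in a plain map keyed by id.
  @behaviour PortsSketch.Store

  @impl true
  def put(state, id, record), do: Map.put(state, id, record)

  @impl true
  def get(state, id) do
    case Map.fetch(state, id) do
      {:ok, record} -> {:ok, record}
      :error -> {:error, :not_found}
    end
  end
end

defmodule PortsSketch.ListStore do
  # Adapter 2: records live in a list of {id, record} tuples.
  @behaviour PortsSketch.Store

  @impl true
  def put(rows, id, record), do: List.keystore(rows, id, 0, {id, record})

  @impl true
  def get(rows, id) do
    case List.keyfind(rows, id, 0) do
      {^id, record} -> {:ok, record}
      nil -> {:error, :not_found}
    end
  end
end

defmodule PortsSketch.Core do
  # Core logic only calls the behaviour's functions on whatever module it is given.
  def save_and_load(store_mod, state, id, record) do
    state = store_mod.put(state, id, record)
    store_mod.get(state, id)
  end
end
```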

Providers

| Adapter | Provider | Streaming | Tool Use | Cancel |
|---|---|---|---|---|
| ClaudeAdapter | Anthropic Claude | Yes | Yes | Yes |
| CodexAdapter | Codex CLI | Yes | Yes | Yes |
| AmpAdapter | Amp (Sourcegraph) | Yes | Yes | Yes |
| ShellAdapter | Shell commands | Yes | No | Yes |

All adapters accept :permission_mode, :max_turns, and :system_prompt, plus :sdk_opts for provider-specific passthrough. For multi-provider setups, ProviderRouter acts as a meta-adapter with capability-based selection, failover, and a circuit breaker. See Provider Routing.

Normalized Events

Each provider emits events in its own format. Adapters normalize them into a canonical set:

| Event | Description |
|---|---|
| run_started | Execution began |
| message_streamed | Streaming content chunk |
| message_received | Complete message ready |
| tool_call_started | Tool invocation begins |
| tool_call_completed | Tool finished |
| token_usage_updated | Usage stats updated |
| run_completed | Execution finished successfully |
| run_failed | Execution failed |
| run_cancelled | Execution cancelled |
| policy_violation | Policy limit exceeded |
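Normalization amounts to pattern matching on each provider's raw payload and emitting a canonical map. In the sketch below, the raw event shapes ("content_delta", "tool_begin", "done") and the tool_name field are invented for illustration and do not reflect the actual Claude, Codex, or Amp SDK formats; only the canonical type names come from the table above. NormalizeSketch is hypothetical.

```elixir
defmodule NormalizeSketch do
  # Hypothetical sketch: one clause per (provider, raw shape) pair.
  # Raw payloads here are made up; real SDK events look different.

  def normalize(:claude, %{"type" => "content_delta", "text" => text}),
    do: {:ok, %{type: :message_streamed, data: %{delta: text}}}

  def normalize(:codex, %{"event" => "tool_begin", "name" => name}),
    do: {:ok, %{type: :tool_call_started, data: %{tool_name: name}}}

  # A shape shared by several providers can be handled once.
  def normalize(_provider, %{"type" => "done"}),
    do: {:ok, %{type: :run_completed, data: %{}}}

  # Unknown payloads are reported rather than silently dropped.
  def normalize(provider, raw),
    do: {:error, {:unrecognized_event, provider, raw}}
end
```

Downstream code (renderers, stores, policies) then matches only on the canonical type atoms, which is what lets adapters be swapped without touching it.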

Failure events keep the backward-compatible error_message field and may also include a provider_error map with this shape:

```elixir
# Type-style notation: each value shows the allowed type, not a literal
%{
  provider: :codex | :amp | :claude | :gemini | :unknown,
  kind: atom(),
  message: String.t(),
  exit_code: integer() | nil,
  stderr: String.t() | nil,
  truncated?: boolean() | nil
}
```

provider_error.stderr is truncated before the event is emitted or persisted, bounded by the AgentSessionManager.Config keys :error_text_max_bytes and :error_text_max_lines. When persistence redaction is enabled, nested provider_error fields are scanned as well.
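Truncating by both lines and bytes needs some care so that a multi-byte UTF-8 character is not cut in half. Here is a sketch under that assumption, taking the two limits as plain arguments; TruncateSketch is hypothetical and may not match how the library actually applies :error_text_max_bytes and :error_text_max_lines.

```elixir
defmodule TruncateSketch do
  # Hypothetical sketch: cap lines first, then bytes.
  # Returns {text, truncated} where truncated is true if anything was cut.
  def truncate(text, max_bytes, max_lines) when is_binary(text) do
    lines = String.split(text, "\n")

    {kept, lines_cut} =
      if length(lines) > max_lines do
        {Enum.take(lines, max_lines), true}
      else
        {lines, false}
      end

    joined = Enum.join(kept, "\n")

    if byte_size(joined) > max_bytes do
      {utf8_prefix(joined, max_bytes), true}
    else
      {joined, lines_cut}
    end
  end

  # Cut at max_bytes, then back off one byte at a time until the prefix is
  # valid UTF-8, so a multi-byte character is never split.
  defp utf8_prefix(binary, max_bytes) do
    prefix = binary_part(binary, 0, max_bytes)
    if String.valid?(prefix), do: prefix, else: utf8_prefix(binary, max_bytes - 1)
  end
end
```

Capping lines before bytes means the byte limit is always honored, even when a single line is very long.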

See Events and Streaming for the full event type reference.

Features

Persistence and Storage

Pluggable storage backends for sessions, runs, events, and artifacts. InMemorySessionStore for development and testing; EctoSessionStore for PostgreSQL or SQLite in production; AshSessionStore as an Ash Framework alternative; S3ArtifactStore for large binary artifacts; CompositeSessionStore to unify session and artifact backends.

```elixir
# Production setup with Ecto + SQLite
{:ok, store} = EctoSessionStore.start_link(repo: MyApp.Repo)
```

Guides: Persistence Overview | Ecto | SQLite | Ash | S3 | Custom

Event Streaming and Observability

Cursor-backed event streaming with monotonic per-session sequence numbers, durable pagination (after/before/limit), and optional long-poll support. The rendering pipeline separates formatting (renderers) from output (sinks) -- compose StudioRenderer, CompactRenderer, or VerboseRenderer with TTYSink, FileSink, JSONLSink, PubSubSink, or CallbackSink.

```elixir
Rendering.stream(event_stream,
  renderer: {StudioRenderer, verbosity: :summary},
  sinks: [{TTYSink, []}, {JSONLSink, path: "events.jsonl"}]
)
```

Guides: Events and Streaming | Cursor Streaming | Rendering | Telemetry
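Cursor-based reads rely on per-session sequence numbers that only increase. The sketch below keeps events in a plain list and filters by after/before/limit; CursorSketch and its option names are hypothetical stand-ins rather than the library's store API, and a real store would query an index instead of scanning a list.

```elixir
defmodule CursorSketch do
  # Hypothetical sketch of monotonic per-session sequencing and cursor reads.

  # Append an event with the next sequence number for its session.
  # (Appending with ++ is O(n); fine for a sketch.)
  def append(log, session_id, payload) do
    last_seq =
      Enum.reduce(log, 0, fn event, acc ->
        if event.session_id == session_id, do: max(event.seq, acc), else: acc
      end)

    log ++ [%{session_id: session_id, seq: last_seq + 1, payload: payload}]
  end

  # Return one session's events with after < seq < before, oldest first,
  # capped at limit. An omitted :before means no upper bound.
  def read(log, session_id, opts \\ []) do
    after_seq = Keyword.get(opts, :after, 0)
    before_seq = Keyword.get(opts, :before)
    limit = Keyword.get(opts, :limit, 100)

    log
    |> Enum.filter(fn event ->
      event.session_id == session_id and event.seq > after_seq and
        (before_seq == nil or event.seq < before_seq)
    end)
    |> Enum.sort_by(& &1.seq)
    |> Enum.take(limit)
  end
end
```

A consumer that remembers the last seq it processed can resume by passing it as the :after option, which is what makes replay after a disconnect straightforward.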

Session Continuity and Workspace

Continuity: Provider-agnostic transcript reconstruction from persisted events. Continuation modes (:auto, :replay, :native) with token-aware truncation keep conversations within budget across runs.
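As a rough illustration of token-aware truncation, the sketch below keeps the newest contiguous run of messages whose estimated token total fits a budget. The chars/4 estimate is a crude heuristic chosen for the example, and TranscriptSketch is hypothetical; this is not how the :auto, :replay, or :native modes are implemented.

```elixir
defmodule TranscriptSketch do
  # Hypothetical sketch. Rough estimate: about 4 characters per token, rounded up.
  def estimate_tokens(%{content: content}), do: div(String.length(content) + 3, 4)

  # Keep the newest messages that fit within budget, preserving original order.
  def fit(messages, budget) do
    {kept, _used} =
      messages
      |> Enum.reverse()
      |> Enum.reduce_while({[], 0}, fn msg, {acc, used} ->
        cost = estimate_tokens(msg)

        if used + cost <= budget do
          # Prepending while walking newest-to-oldest restores chronological order
          {:cont, {[msg | acc], used + cost}}
        else
          {:halt, {acc, used}}
        end
      end)

    kept
  end
end
```

Stopping at the first message that does not fit, rather than skipping it and continuing, keeps the retained transcript contiguous, so the model never sees a reply without the turn that prompted it.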

Workspace: Optional pre/post snapshots with git or hash backends. Computes diffs, captures patches as artifacts, and supports rollback on failure (git backend only).
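The hash backend's core idea can be sketched as: fingerprint every file before and after a run, then compare the two fingerprint maps. Here the files are passed in as a path => contents map instead of being read from disk; SnapshotSketch is hypothetical and uses SHA-256 from Erlang's :crypto as an assumed choice of hash.

```elixir
defmodule SnapshotSketch do
  # Hypothetical sketch: a snapshot maps each path to the SHA-256 of its contents.
  def snapshot(files) do
    Map.new(files, fn {path, contents} -> {path, :crypto.hash(:sha256, contents)} end)
  end

  # Classify every path seen in either snapshot as added, removed, or modified.
  def diff(before, after_snap) do
    paths = MapSet.union(MapSet.new(Map.keys(before)), MapSet.new(Map.keys(after_snap)))

    changes =
      Enum.reduce(paths, %{added: [], removed: [], modified: []}, fn path, acc ->
        case {Map.fetch(before, path), Map.fetch(after_snap, path)} do
          {:error, {:ok, _}} -> Map.update!(acc, :added, &[path | &1])
          {{:ok, _}, :error} -> Map.update!(acc, :removed, &[path | &1])
          # Same hash on both sides: unchanged
          {{:ok, hash}, {:ok, hash}} -> acc
          {{:ok, _}, {:ok, _}} -> Map.update!(acc, :modified, &[path | &1])
        end
      end)

    # Sort each list so the result does not depend on set iteration order
    Map.new(changes, fn {kind, list} -> {kind, Enum.sort(list)} end)
  end
end
```

Hashes record that a file changed but not what it used to contain, which is consistent with rollback being limited to the git backend.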

Guides: Session Continuity | Workspace Snapshots

Routing, Policy, and Cost

Routing: ProviderRouter selects providers by capability matching, with health tracking, failover, weighted scoring, session stickiness, and a circuit breaker.
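Capability matching with a circuit breaker can be sketched in a few functions: pick the first provider, in preference order, that offers every required capability and whose consecutive-failure count is below a threshold. RouterSketch, the capability atoms, and the threshold of 3 are assumptions for illustration, not ProviderRouter's API or defaults; weighted scoring and stickiness are left out.

```elixir
defmodule RouterSketch do
  # Hypothetical sketch. Failures needed to open a provider's circuit:
  @threshold 3

  # providers: [{name, MapSet of capabilities}] in preference order
  def new(providers), do: %{providers: providers, failures: %{}}

  # First provider with all required capabilities and a closed circuit.
  def select(router, required) do
    Enum.find_value(router.providers, {:error, :no_provider}, fn {name, caps} ->
      if MapSet.subset?(MapSet.new(required), caps) and not open?(router, name),
        do: {:ok, name}
    end)
  end

  # Each failure increments the counter; success resets it (closes the circuit).
  def record_failure(router, name),
    do: update_in(router, [:failures, name], &((&1 || 0) + 1))

  def record_success(router, name),
    do: %{router | failures: Map.delete(router.failures, name)}

  defp open?(router, name), do: Map.get(router.failures, name, 0) >= @threshold
end
```

A production breaker would usually add a half-open state that lets a trial request through after a cool-down; that is omitted here, so a tripped provider only recovers when record_success/2 is called explicitly.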

Policy: Real-time budget and tool governance. Define token, duration, and cost limits with tool allow/deny rules. Violations trigger :cancel or :warn actions. Policies stack with deterministic merge.

Cost: Model-aware cost calculation using configurable pricing tables. Integrates with policy enforcement for budget limits.

```elixir
{:ok, policy} = Policy.new(
  name: "production",
  limits: [{:max_total_tokens, 8_000}, {:max_cost_usd, 0.50}],
  tool_rules: [{:deny, ["bash"]}],
  on_violation: :cancel
)
```

Guides: Provider Routing | Policy Enforcement | Cost Tracking
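The cost and policy paragraphs above combine naturally: compute a run's cost from a pricing table, then check it together with token totals and tool rules. This sketch assumes a flattened policy map rather than the Policy.new/1 keyword options, uses placeholder prices that are not real provider rates, and implements a "strictest wins" merge that is one plausible reading of deterministic stacking, not the library's documented rule. PolicySketch is hypothetical.

```elixir
defmodule PolicySketch do
  # Placeholder USD prices per million tokens as {input, output}; not real rates.
  @pricing %{"example-model" => {3.0, 15.0}}

  # Cost of one run from its token usage and the model's pricing entry.
  def cost_usd(model, %{input_tokens: input, output_tokens: output}) do
    {in_price, out_price} = Map.fetch!(@pricing, model)
    (input * in_price + output * out_price) / 1_000_000
  end

  # Returns :ok, or {:violation, reason, action} for the first rule that fails.
  def check(policy, usage, cost, tool \\ nil) do
    total = usage.input_tokens + usage.output_tokens

    cond do
      tool in policy.denied_tools ->
        {:violation, {:tool_denied, tool}, policy.on_violation}

      total > policy.max_total_tokens ->
        {:violation, {:max_total_tokens, total}, policy.on_violation}

      cost > policy.max_cost_usd ->
        {:violation, {:max_cost_usd, cost}, policy.on_violation}

      true ->
        :ok
    end
  end

  # Assumed "strictest wins" merge for stacked policies.
  def merge(a, b) do
    %{
      max_total_tokens: min(a.max_total_tokens, b.max_total_tokens),
      max_cost_usd: min(a.max_cost_usd, b.max_cost_usd),
      denied_tools: Enum.uniq(a.denied_tools ++ b.denied_tools),
      on_violation: if(:cancel in [a.on_violation, b.on_violation], do: :cancel, else: :warn)
    }
  end
end
```

Because taking the minimum, taking the union, and "cancel beats warn" are all order-independent, merging the same policies in either order yields the same limits, the same action, and the same set of denied tools.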

Concurrency, PubSub, and Integration

Concurrency: ConcurrencyLimiter enforces max parallel sessions/runs. SessionServer provides per-session FIFO queuing with multi-slot execution and durable subscriptions.

PubSub: Phoenix.PubSub integration for real-time event broadcasting to LiveView, WebSocket, or other subscribers.

Workflow Bridge: Thin integration layer for DAG/workflow engines with step execution, error classification (retry/failover/abort), and multi-run session lifecycle helpers.
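Error classification for a workflow step might look like the following: each error code maps to :retry, :failover, or :abort, and a small driver acts on that decision. The specific codes in the retry and failover lists are invented for the example, and ClassifySketch is hypothetical; the Workflow Bridge's own classification may group errors differently.

```elixir
defmodule ClassifySketch do
  # Illustrative, invented code groupings.
  @retry_codes [:provider_timeout, :provider_rate_limited]
  @failover_codes [:provider_unavailable]

  def classify(%{code: code}) when code in @retry_codes, do: :retry
  def classify(%{code: code}) when code in @failover_codes, do: :failover
  def classify(_error), do: :abort

  # Run step_fun against providers in order. :retry retries the same provider
  # up to max_retries times; :failover (or exhausted retries) moves to the
  # next provider; :abort stops immediately.
  def run_step(step_fun, providers, max_retries \\ 2)
  def run_step(_step_fun, [], _max_retries), do: {:error, :no_providers_left}

  def run_step(step_fun, [provider | rest], max_retries),
    do: attempt(step_fun, provider, rest, max_retries, max_retries)

  defp attempt(step_fun, provider, rest, retries_left, max_retries) do
    case step_fun.(provider) do
      {:ok, result} ->
        {:ok, result}

      {:error, error} ->
        case classify(error) do
          :retry when retries_left > 0 ->
            attempt(step_fun, provider, rest, retries_left - 1, max_retries)

          :abort ->
            {:error, error}

          _failover_or_exhausted ->
            run_step(step_fun, rest, max_retries)
        end
    end
  end
end
```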

Secrets Redaction: EventRedactor strips sensitive data from events before persistence.
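A recursive redaction pass over an event map can be sketched as below: values under sensitive keys are replaced outright, and free-text strings are scanned for token-like patterns. The key list and the sk- regex are illustrative assumptions, not EventRedactor's actual rules; RedactSketch is hypothetical.

```elixir
defmodule RedactSketch do
  # Illustrative sensitive key names (compared as strings, so atom and
  # string keys are both covered).
  @sensitive_keys ~w(api_key password token authorization)

  # Structs (e.g. DateTime) are left untouched in this sketch.
  def redact(%_{} = struct), do: struct

  def redact(%{} = map) do
    Map.new(map, fn {key, value} ->
      if to_string(key) in @sensitive_keys do
        {key, "[REDACTED]"}
      else
        {key, redact(value)}
      end
    end)
  end

  def redact(list) when is_list(list), do: Enum.map(list, &redact/1)

  # Mask anything that looks like an "sk-..." API key inside free text.
  def redact(text) when is_binary(text),
    do: Regex.replace(~r/sk-[A-Za-z0-9_-]{8,}/, text, "[REDACTED]")

  def redact(other), do: other
end
```

Running the pass on the whole event, rather than on known fields only, is what lets it catch secrets nested inside structures like provider_error.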

Guides: Concurrency | PubSub | Workflow Bridge | Secrets Redaction | Shell Runner

Guides

The guides are organized into five sections: Introduction, Core Concepts, Persistence, Integration, and Reference.

Production Checklist

A SessionStore is always required, but InMemorySessionStore is sufficient if your data does not need to survive restarts. Durable storage (Ecto, Ash, S3) is opt-in.

Required:

If you need durable storage:

Recommended:

Optional:

Error Handling

All operations return tagged tuples. Errors use a structured taxonomy with machine-readable codes:

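```elixir
require Logger

case SessionManager.start_session(store, adapter, attrs) do
  {:ok, session} ->
    {:ok, session}

  {:error, %Error{code: :validation_error, message: msg} = error} ->
    Logger.error("Validation failed: #{msg}")
    {:error, error}

  # Any other structured error (provider, storage, runtime, ...)
  {:error, %Error{} = error} ->
    {:error, error}
end
```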

Error codes are grouped into categories: validation, resource, provider, storage, runtime, concurrency, and tool errors. Some errors are marked retryable via Error.retryable?/1. See Error Handling.

Examples

The examples/ directory contains 40+ runnable scripts. Run them all:

```shell
bash examples/run_all.sh
# Or for a single provider:
bash examples/run_all.sh --provider claude
```

Getting started: oneshot.exs, live_session.exs, stream_session.exs, common_surface.exs

Provider-specific: claude_direct.exs, codex_direct.exs, amp_direct.exs

Features: session_continuity.exs, workspace_snapshot.exs, provider_routing.exs, policy_enforcement.exs, cost_tracking.exs, rendering_studio.exs, approval_gates.exs, secrets_redaction.exs

Persistence: persistence_live.exs, sqlite_session_store_live.exs, ecto_session_store_live.exs, ash_session_store.exs, composite_store_live.exs

Advanced: session_concurrency.exs, interactive_interrupt.exs, workflow_bridge.exs, shell_exec.exs, pubsub_sink.exs

See examples/README.md for full documentation.

Documentation

Full API documentation is available at HexDocs.

License

AgentSessionManager is released under the MIT License.