Planck.Agent

planck_agent is an OTP-based agent runtime for Elixir built on top of planck_ai. It drives the full LLM loop — stream a response, collect tool calls, execute them concurrently, append results, re-stream — inside a supervised GenServer per agent, with Phoenix.PubSub broadcasting at every step.

Installation

# mix.exs
{:planck_agent, "~> 0.1"}

planck_agent is a pure library — it has no runtime config module. Callers pass paths (sessions dir, skills/tools dirs) explicitly. planck_headless owns config resolution for the full Planck stack.

Quick start

alias Planck.AI
alias Planck.Agent
alias Planck.Agent.Tool
# 1. Get a model from planck_ai
{:ok, model} = AI.get_model(:anthropic, "claude-sonnet-4-6")
# 2. Define a tool
read_file =
Tool.new(
name: "read_file",
description: "Read a file from disk",
parameters: %{
"type" => "object",
"properties" => %{"path" => %{"type" => "string"}},
"required" => ["path"]
},
execute_fn: fn _agent_id, _call_id, %{"path" => path} ->
case File.read(path) do
{:ok, content} -> {:ok, content}
{:error, reason} -> {:error, "could not read #{path}: #{reason}"}
end
end
)
# 3. Start an agent
{:ok, agent} =
DynamicSupervisor.start_child(
Planck.Agent.AgentSupervisor,
{Agent,
id: "agent-1",
model: model,
system_prompt: "You are a helpful coding assistant.",
tools: [read_file]}
)
# 4. Subscribe and prompt
Agent.subscribe(agent)
Agent.prompt(agent, "What does lib/app.ex do?")
# 5. Receive events
receive do
{:agent_event, :turn_end, %{message: msg, usage: usage}} ->
IO.inspect(msg.content)
IO.inspect(usage)
end

Pub/Sub events

Subscribe to {:agent_event, type, payload} messages via Planck.Agent.subscribe/1. Events are broadcast on two topics: "agent:#{id}" and, when a session_id is set, "session:#{session_id}".

EventPayload keysWhen
:turn_startindexNew LLM turn begins
:turn_endmessage, usageTurn complete, no pending tools
:text_deltatextStreaming text chunk
:thinking_deltatextStreaming thinking chunk
:usage_deltadelta, totalToken usage from each LLM response
:tool_startid, name, argsTool execution begins
:tool_endid, name, result, errorTool finished
:worker_exitpid, reasonWorker process exited (orchestrator only)
:errorreasonStream error; agent returns to :idle
Agent.subscribe("agent-1")
receive do
{:agent_event, :text_delta, %{text: chunk}} -> IO.write(chunk)
{:agent_event, :tool_start, %{name: name}} -> IO.puts("→ #{name}")
{:agent_event, :turn_end, %{usage: u}} -> IO.inspect(u)
end

Subscribe to the session topic to receive events from all agents in a session:

Phoenix.PubSub.subscribe(Planck.Agent.PubSub, "session:#{session_id}")

Agent lifecycle

prompt/2
idle ──────────────────► streaming ──► stream events
▲ │ (text, thinking, tool calls)
│ ▼
stream done
/ \
no tools tool calls pending
│ │ │
│◄──── turn_end ─┘ ▼
executing_tools
(Task.async_stream)
│ │
│◄─── append tool_result ────────────────┘
re-stream (loop)

abort/1 cancels in-flight streaming from any state and returns the agent to :idle. stop/1 shuts it down cleanly.

Roles

An agent's role is determined solely by its tool list at start time:

Teams

Agents with the same team_id form a team. The orchestrator owns the team; all workers are process-linked to it and exit when it does.

Built-in inter-agent tools

These tools are wired up by the caller — see Planck.Agent.OrchestratorTools and Planck.Agent.WorkerTools for the Tool structs to include.

Available to all agents:

ToolBehaviour
call_agentSync/blocking — sends a message and blocks until the target responds
send_agentAsync/fire-and-forget — sends a task and returns immediately
respond_agentNon-blocking — routes a result back to the caller
list_teamReturns all agents with id, type, name, and status

Both call_agent and send_agent accept agent_id (from list_team) and an optional reset_previous_context: true to archive the target's prior history before sending.

Orchestrator only:

ToolBehaviour
spawn_agentSpawns a new worker; returns its agent_id — save it for subsequent calls
destroy_agentTerminates a worker permanently
interrupt_agentAborts a worker's current turn; worker stays alive
list_modelsReturns the available_models list passed at start time

Built-in file and shell tools

Planck.Agent.BuiltinTools provides four ready-made Tool structs that cover file-system access and shell execution:

tools = [
Planck.Agent.BuiltinTools.read(),
Planck.Agent.BuiltinTools.write(),
Planck.Agent.BuiltinTools.edit(),
Planck.Agent.BuiltinTools.bash()
]
ToolDescription
readRead a file. Accepts optional offset (lines to skip) and limit (max lines).
writeWrite content to a file, creating missing parent directories.
editReplace an exact unique string in a file. Errors if not found or ambiguous.
bashRun a shell command. Optional cwd and timeout (ms) as runtime JSON args.

bash captures both stdout and stderr; stderr is appended under a STDERR: header when non-empty. Shell execution is managed by erlexec, which cleans up process groups on timeout or termination.

Granting tools to spawned workers

When building the orchestrator's tools, pass a grantable_tools list to Planck.Agent.Tools.orchestrator_tools/6. The orchestrator can then grant any subset of those tools to workers it spawns by including a "tools" key in the spawn_agent call:

{
"type": "reviewer",
"name": "Reviewer",
"tools": ["read"]
}

Workers always receive the standard worker tools (call_agent, send_agent, respond_agent, list_team). Granted tools are added on top. Names not in the orchestrator's grantable_tools list are silently ignored — workers cannot escalate beyond what the orchestrator was given.

Spawning a team manually

alias Planck.Agent.{Agent, AgentSpec, Team}
{:ok, model} = Planck.AI.get_model(:anthropic, "claude-sonnet-4-6")
team_id = :crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower)
session_id = :crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower)
orchestrator_opts = [
id: "orch-#{team_id}",
type: "orchestrator",
model: model,
system_prompt: "You coordinate a team of agents.",
tools: orchestrator_tools,
team_id: team_id,
session_id: session_id,
compactor: nil,
available_models: Planck.AI.list_models(:anthropic)
]
DynamicSupervisor.start_child(Planck.Agent.AgentSupervisor, {Agent, orchestrator_opts})

Workers are typically spawned by the orchestrator via spawn_agent at runtime, or pre-spawned from a team template (see below).

Session persistence

Start a session before starting any agents that should persist messages:

alias Planck.Agent.Session
{:ok, _pid} = Session.start(session_id, name: "my-session", dir: sessions_dir)

Agents with a matching session_id append every message automatically. Each call to append/3 is synchronous and returns the SQLite row id, which becomes Message.id — unifying the in-memory id with the DB primary key. Messages are stored in a SQLite file at <sessions_dir>/<id>_<name>.db.

Retrieving messages

# All messages in insertion order
{:ok, rows} = Session.messages(session_id)
{:ok, rows} = Session.messages(session_id, agent_id: "agent-1")
# Each row: %{db_id: pos_integer(), agent_id: String.t(), message: Message.t(), inserted_at: integer()}

Checkpoint-based pagination

Summary messages (role {:custom, :summary}) are stored as checkpoints, enabling efficient "load recent / load more" pagination for long sessions:

# Initial load — latest summary checkpoint + all messages after it
{:ok, rows, checkpoint_id} = Session.messages_from_latest_checkpoint(session_id)
# Load more — previous chapter
{:ok, rows, prev_id} = Session.messages_before_checkpoint(session_id, checkpoint_id)
# prev_id == nil → no more history to load

Pass agent_id: to either function to filter to a specific agent.

Context compaction

Planck.Agent.Hooks.Compactor dispatches context compaction. When the estimated token count of the message history exceeds the threshold, it calls the LLM to produce a summary that preserves the active goal and recent context. Dispatch signature: Hooks.Compactor.compact(module, model, messages, sidecar_node).

When module is nil, the built-in LLM-based compactor runs locally. Pass a module atom to delegate to a sidecar compactor:

# Agent start opts — module atom or nil (use built-in)
Planck.Agent.start_link(
id: "agent-1",
model: model,
compactor: MySidecar.Compactors.Builder, # nil = built-in
sidecar_node: sidecar_node
)

When compaction triggers, the summary is inserted as a {:custom, :summary} message in the agent's history and persisted to the session. Future LLM calls are built from the latest summary onward — the full history remains in the session for audit and UI pagination.

Bring your own compaction strategy by implementing Planck.Agent.Hooks.Compactor in a sidecar module (see Sidecars below):

defmodule MySidecar.Compactors.Builder do
use Planck.Agent.Hooks.Compactor
@impl true
def compact(model, messages) do
summary = Planck.Agent.Message.new({:custom, :summary}, [{:text, summarise(messages)}])
kept = Enum.take(messages, -5)
{:compact, summary, kept}
end
@impl true
def compact_timeout, do: 60_000
end

Reference it by name in TEAM.json ("compactor": "MySidecar.Compactors.Builder"); planck_headless resolves the string to a module atom and passes compactor: module at agent start time. The sidecar fallbacks to the built-in compactor if unavailable.

The compactor receives messages since the last summary checkpoint and must return either {:compact, summary_msg, kept_messages} or :skip.

Sidecars

A sidecar is a separate OTP application that extends planck_headless with custom tools and compactors over distributed Erlang. The entry-point module implements the Planck.Agent.Sidecar behaviour:

defmodule MySidecar.Planck do
use Planck.Agent.Sidecar
@impl true
def tools do
[
Planck.Agent.Tool.new(
name: "run_tests",
description: "Run the test suite.",
parameters: %{"type" => "object", "properties" => %{}},
execute_fn: fn _agent_id, _id, _args ->
{output, 0} = System.cmd("mix", ["test"])
{:ok, output}
end
)
]
end
end

Planck.Agent.Sidecar itself provides the RPC entry points planck_headless calls on the sidecar node:

See specs/sidecar.md for the full design including startup sequence, compactor integration, and configuration.

Team templates

Define a team in JSON and load it at runtime:

[
{
"type": "planner",
"name": "Planner",
"description": "Breaks tasks into steps",
"provider": "anthropic",
"model_id": "claude-sonnet-4-6",
"system_prompt": "You are an expert planner.",
"opts": { "temperature": 0.5 }
},
{
"type": "coder",
"name": "Coder",
"description": "Writes and edits code",
"provider": "openai",
"base_url": "http://localhost:11434",
"model_id": "llama3.2",
"system_prompt": "prompts/coder.md"
}
]

system_prompt accepts an inline string or a .md/.txt path resolved relative to the template file. Valid providers are "anthropic", "openai", "google". OpenAI-compatible servers (e.g. Ollama) use "openai" + "base_url".

The optional "tools" array lists tool names the agent should receive. Names are resolved at start time from the tool_pool: keyword passed to AgentSpec.to_start_opts/2:

{ "type": "coder", "tools": ["read", "write", "bash"], ... }
pool = Planck.Agent.BuiltinTools.all() ++ Planck.Agent.ExternalTool.load_all(dirs)
start_opts = AgentSpec.to_start_opts(spec,
tool_pool: pool,
team_id: team_id,
session_id: session_id
)

Unknown names are silently ignored. When spec.tools is empty, to_start_opts/2 falls back to the tools: keyword — the behaviour before this feature was added.

alias Planck.Agent.{Agent, AgentSpec, Team}
{:ok, team} = Team.load(".planck/teams/my-team")
tools_by_type = %{
"planner" => [list_team_tool, send_agent_tool, spawn_agent_tool],
"coder" => [read_file_tool, write_file_tool, bash_tool, respond_agent_tool]
}
team_id = :crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower)
session_id = :crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower)
Enum.each(team.members, fn spec ->
{:ok, _model} = Planck.AI.get_model(spec.provider, spec.model_id)
tools = Map.get(tools_by_type, spec.type, [])
start_opts =
AgentSpec.to_start_opts(spec,
tools: tools,
team_id: team_id,
session_id: session_id
)
DynamicSupervisor.start_child(Planck.Agent.AgentSupervisor, {Agent, start_opts})
end)

Skills

Skills are reusable agent capabilities stored on the filesystem. Each skill is a directory containing a SKILL.md with YAML-style frontmatter:

.planck/skills/
code_review/
SKILL.md
resources/
rubric.md
---
name: code_review
description: Reviews code for correctness, style, and performance.
---
Review the provided code...

Load skills and pass them to AgentSpec.to_start_opts/2 via skill_pool:. Skill descriptions are resolved fresh from the pool before each LLM turn — not baked into the system prompt at start time:

alias Planck.Agent.Skill
skills = Skill.load_all(["~/.planck/skills"])
# Per-agent skill scoping is driven by spec.skills and skill_pool: in
# AgentSpec.to_start_opts/2 — see the Teams section.

Each skill entry includes the path to its SKILL.md file and its resources directory.

Dynamic tool management

Add and remove tools without restarting the agent:

Agent.add_tool(agent, new_tool)
Agent.remove_tool(agent, "tool_name")

Editing history

Truncate both the session and in-memory history to strictly before a given message. The message id is the SQLite row id (Message.id == db_id for persisted messages). A no-op for ephemeral agents.

Agent.rewind_to_message(agent, message_id)

Typically called via Planck.Headless.rewind_to_message/3 which also re-prompts the orchestrator with the edited text.

Configuration

planck_agent has no runtime configuration module. Every function that reads from disk (Session.start/2, Skill.load_all/1, ExternalTool.load_all/1, Compactor.load/1) takes its path(s) as an explicit argument. Applications using this library should resolve those paths themselves — or depend on planck_headless, which exposes Planck.Headless.Config for the full Planck stack.

Supervision tree

Planck.Agent.Supervisor (strategy: :one_for_all)
├── Phoenix.PubSub (name: Planck.Agent.PubSub)
├── Registry (keys: :duplicate, name: Planck.Agent.Registry)
├── Task.Supervisor (name: Planck.Agent.TaskSupervisor)
├── DynamicSupervisor (name: Planck.Agent.SessionSupervisor)
│ └── Planck.Agent.Session (restart: :temporary)
└── DynamicSupervisor (name: Planck.Agent.AgentSupervisor)
├── Planck.Agent (role: :orchestrator)
└── Planck.Agent (role: :worker)

:one_for_all on the top-level supervisor ensures the Registry and PubSub always restart together — a stale Registry after a crash would leave agents unable to find each other.