pi_bridge
BEAM runtime bridge for pi and the pi-elixir package. It provides the Elixir-side Pi.* modules used for Livebook-style stateful eval, ExAST-backed structural tools, stdio transport, executable Elixir skills, LLM calls through pi's active model, OTP-backed logical agents, and bidirectional plugin UI events.
pi_bridge is inspired by Vibe: keep the model-facing surface small, but let trusted Elixir code operate from inside the running BEAM.
Runtime model
pi_bridge is bundled and started by the pi-elixir extension as an extension-owned sidecar. Target projects do not need to add :pi_bridge to mix.exs; the sidecar loads the target Mix project context and communicates with pi over stdio.
Public API ergonomics
The public API intentionally separates single-call and orchestration shapes:
Pi.LLM.complete/2andPi.LLM.stream/2are low-level model calls over the active pi session.Pi.Session.start/1creates a server-owned BEAM session process for OTP-backed agent/subagent work.Pi.Agent.run/2returns a single%Pi.Agent.Result{}and is backed byPi.Sessionworkers.Pi.Agent.chain/2,Pi.Agent.parallel/2, andPi.Agent.fanout/2return%Pi.Agent.Run{}so partial results, kind, status, and errors are explicit.Pi.Pluginmodules expose optionalinit/1,handle_event/2,commands/0,handle_command/3,tool_call/3,tool_result/3,apis/0, andshutdown/1; plugin process lifecycle is handled byPi.Plugin.ManagerandPi.Plugin.Supervisor.Pi.Plugin.api/1registers API metadata at compile time and fills a default alias from the module name.Pi.Plugin.command/1registers BEAM plugin commands that the pi extension exposes as/elixir:<name>slash commands.Pi.Plugin.Manager.load/2andunload/1support dynamic plugin lifecycle changes.Pi.Plugin.Waitersprovides an ETS-backed waiter registry for interactive plugins.Pi.Plugin.Event.emit/2publishes BEAM events onto pi's TypeScript extension event bus.Pi.Host.info/1,active_tools/1,append_entry/3, andsend_message/3expose small host-session APIs back to BEAM code.Pi.Sessionremains the BEAM-owned runtime session API.
Boundary JSON examples are documented in docs/protocol.md.
Eval
Pi.Eval.run/2 is the trusted project introspection path. It evaluates inside the project BEAM with project modules, aliases, application config, OTP processes, Repo modules, and IEx helpers available.
Structured eval from the pi tool is stateful: bindings and Macro.Env are kept in a supervised evaluator and persisted as sidecar snapshots next to the pi session. That gives IEx/Livebook-like continuity across eval calls and resume/branch navigation without inlining large state into JSONL transcripts.
Useful eval helpers:
Pi.Eval.bindings()
Pi.Eval.forget(:large_result)
Pi.Eval.reset()
QuackDB mirror analytics are available through token-efficient aliases in eval:
# preloaded: import Ecto.Query; use QuackDB.Ecto
# preloaded: alias Pi.Self, as: Self
# preloaded: alias Pi.CodeMap, as: CodeMap
# preloaded: alias Pi.Quack, as: Q; require Q
# preloaded: alias Pi.Quack.Event, as: E; alias Pi.Quack.SessionFile, as: SF
Self.status()
Self.context("why did sync crash?", limit: 5)
# Reach-backed semantic reflection after edits.
CodeMap.reflect(changed: true)
CodeMap.hotspots(path: "lib/my_app/module.ex")
CodeMap.context("MyApp.Module.fun/2")
from(e in E,
group_by: e.tool_name,
order_by: [desc: count(e.id)],
select: %{tool: e.tool_name, n: count(e.id)}
)
|> Q.table()
Use Q.score/2, Q.matches/2, Q.json/2, and Q.json_text/2 inside normal QuackDB/Ecto queries for FTS and payload analysis.
For untrusted snippets, use the Dune-backed sandbox:
{:ok, %{inspected: "42"}} = Pi.Eval.sandbox("40 + 2")
# Negative example: restricted system access is blocked.
{:error, message} = Pi.Eval.sandbox(~s(System.cmd("ls", [])))
The sandbox applies timeout, reduction, heap, and allowlist limits. It returns {:error, :unavailable} if the optional :dune dependency is not present.
LLM
pi owns provider/model selection, credentials, streaming, cancellation, usage, and transcript UI. The BEAM side sends structured completion/stream requests over the active bridge; it does not create a separate provider stack.
{:ok, text} = Pi.LLM.complete("Explain this module")
stream = Pi.LLM.stream("Draft a migration plan")
Enum.each(stream.stream, &IO.write/1)
ReqLLM can route through the active pi session as an adapter on top of that pi-owned model path:
Pi.ReqLLM.install()
ReqLLM.generate_text(Pi.ReqLLM.current_model(), "Summarize the current project")
Pi.ReqLLM.current_model/0 returns ReqLLM's inline model struct for the active pi session. Use it instead of the string "pi:current" so ReqLLM does not try to verify the dynamic local route against its public model catalog.
Feature flag:
PI_ELIXIR_LLM=0disables BEAM-initiated LLM requests.
Sessions and agents
The bridge keeps one pi Node.js/TUI process and one embedded BEAM process. Subagents are not extra pi processes; they are lightweight OTP session workers supervised inside BEAM:
pi Node.js/TUI
└─ embedded BEAM
├─ Pi.LLM.Broker
└─ Pi.Session.Supervisor
├─ Pi.Session.Worker
└─ Pi.Session.Worker
Use Pi.Session when you need attachable, subscribable session state:
{:ok, root} = Pi.Session.start(name: :root)
{:ok, reviewer} = Pi.Session.child(root, name: :reviewer)
{:ok, "done"} = Pi.Session.run(reviewer, "Review this change")
{:ok, state} = Pi.Session.subscribe(reviewer)
Session snapshots are emitted as pi_session events. The extension renders active/running work as a compact live widget, then emits completed root session trees once as inline transcript entries (elixir-sessions). Active BEAM snapshots are reloaded directly from the bridge on session start. Private slash commands control active sessions without adding model-facing tools. The TUI accepts either id=session_123 or the raw session_123 as the command argument:
/elixir:sessions.cancel id=session_123
/elixir:sessions.rerun id=session_123
Snapshots carry structured fields such as prompt/response previews, current activity, recent streaming output, run_count, completed_at, and timing. Streaming session runs can emit :delta events before the final assistant message:
{:ok, text} = Pi.Session.run(session, "Draft notes", stream: true)
Feature flag:
PI_ELIXIR_SESSIONS=0disables session snapshot/control affordances.
Use Pi.Agent for convenience orchestration over those sessions. Agent helpers use canonical %Pi.Session.State{} values and runtime Pi.Session workers; there is no separate agent session registry:
{:ok, result} = Pi.Agent.run("Review this change", name: :reviewer)
{:ok, run} =
Pi.Agent.chain([
"Draft an implementation plan",
"Review the plan for risks"
])
{:ok, fanout} = Pi.Agent.fanout(["Review tests", "Review API", "Review docs"])
For supervised delegation, start jobs. A job owns lifecycle; its child Pi.Session owns the transcript:
{:ok, job} = Pi.Agent.start("Review this module", role: :reviewer)
job.status
#=> :running
{:ok, done} = Pi.Agent.await(job, 60_000)
done.status
#=> :done
{:ok, text} = Pi.Agent.result(done)
Pi.Session.state(done.child_session_id)
Run multiple jobs when the tasks are independent:
{:ok, jobs} =
Pi.Agent.run_many([
%{task: "Review tests", role: :reviewer},
%{task: "Review API", role: :reviewer},
"Review docs"
])
Enum.map(jobs, &Pi.Agent.await(&1, 60_000))
Attach jobs to a parent session when you want parent-visible lifecycle events in the session widget:
{:ok, parent} = Pi.Session.start(name: :review)
parent_id = Pi.Session.state(parent).id
{:ok, job} = Pi.Agent.start("Review tests", role: :reviewer, parent_session_id: parent_id)
{:ok, done} = Pi.Agent.await(job, 60_000)
Pi.Session.state(parent).events
# includes :agent_job_started and :agent_job_finished
Cancel long-running work through the job lifecycle handle:
{:ok, job} = Pi.Agent.start("Explore a risky option", role: :researcher)
:ok = Pi.Agent.cancel(job)
{:error, cancelled} = Pi.Agent.await(job, 100)
cancelled.status
#=> :cancelled
Pi.Agent.run/2 keeps the single-run shape {:ok, %Pi.Agent.Result{}} | {:error, %Pi.Agent.Result{}}. chain/2, parallel/2, and fanout/2 return {:ok, %Pi.Agent.Run{}} | {:error, %Pi.Agent.Run{}} so orchestration metadata and partial results are explicit. Job APIs return %Pi.Agent.Job{} lifecycle handles with status, result, error, parent_session_id, and child_session_id.
Plugin command/event/hook lifecycle
- On stdio startup, BEAM sends
readywith plugin command inventory. - The TypeScript extension registers each plugin command as
/elixir:<name>. - Running the slash command sends
pi_plugin_commandto BEAM and dispatcheshandle_command/3. Pi.Plugin.Event.emit/2sends{type: "event"}back to pi and is published onpi.events.- Before a pi tool executes, the extension calls
pi_plugin_tool_call; plugintool_call/3may block or return an input-only patch. - After a pi tool result, the extension calls
pi_plugin_tool_result; plugintool_result/3may patch resultcontentorisError. - Malformed hook payloads are rejected before plugin callbacks run.
Session bridge APIs
BEAM code can ask the pi extension for small session-state snapshots, persist branch-aware custom entries, or emit a visible custom transcript message:
{:ok, info} = Pi.Host.info()
{:ok, %{tools: tools}} = Pi.Host.active_tools()
{:ok, "ok"} = Pi.Host.append_entry("demo-state", count: 1)
{:ok, "ok"} = Pi.Host.send_message("demo-message", count: 1)
Plugins
Feature flags:
PI_ELIXIR_PLUGINS=0disables built-in/project-local plugins, hooks, UI events, and plugin commands.PI_ELIXIR_SKILLS=0disables executable skill discovery.
Built-in optional plugins are loaded before project-local plugins. The built-in DuckDB event mirror (Pi.Mirror.QuackDB) is enabled by default; set PI_ELIXIR_MIRROR=0 to disable it. By default it writes ~/.pi/elixir/session-mirror.duckdb; override with PI_ELIXIR_MIRROR_DB, or point at an existing Quack server with PI_ELIXIR_MIRROR_QUACKDB_URI and PI_ELIXIR_MIRROR_QUACKDB_TOKEN.
Project-local plugins live in priv/pi_plugins, .pi/plugins, or pi_plugins. Each plugin is isolated behind a Pi.Plugin.Worker process.
defmodule DemoPiPlugin do
use Pi.Plugin
def init(_opts), do: {:ok, %{events: 0}}
def handle_event(_event, state), do: {:noreply, Map.update(state, :events, 1, &(&1 + 1))}
command name: :demo, description: "Run the demo plugin command"
def handle_command(:demo, args, state), do: {{:ok, "demo #{args}"}, state}
# Negative example: block a tool call.
# Return {:block, reason} to prevent a tool call, or {:ok, patch} to merge into the tool input only.
def tool_call(%{"toolName" => "bash"}, _context, state), do: {{:block, "bash blocked"}, state}
def tool_call(_call, _context, state), do: {:ok, state}
# Return {:ok, patch} to patch a tool result. Supported TypeScript-side patches include
# string `content` and boolean `isError`.
def tool_result(%{"toolName" => "demo"}, _context, state) do
{{:ok, %{"content" => "patched by plugin"}}, state}
end
def tool_result(_result, _context, state), do: {:ok, state}
def apis do
[name: :demo_plugin, module: __MODULE__, alias: :DemoPlugin]
end
end
Examples
See examples/vibe_workflow.exs and examples/demo_plugin.exs.