CMDC Orchestrator

CMDC 多 Agent 编排引擎 — WorkflowSpec + Run API + DAG 驱动的 AgentOps 编排。

cmdc_orchestratorcmdc 之上提供「图驱动」的多 Agent 编排能力:把 工作流描述成 DAG(节点 + 边),由执行器按拓扑顺序调度,并把上游节点的输出 自动注入到下游节点的 prompt 中。v0.4 起新增可持久化 WorkflowSpec,用于企业 Workflow Designer、发布校验、dry run 和 Trace Viewer 接入。v0.5 新增异步 Run API、 RunStore 事件账本、condition/signal 分支和 output_key 上下文合并。v0.6 补齐 gen_statem 可恢复执行器、运行控制 API、retry/timeout/fallback policy、 :fork / :join 聚合、:human_task 暂停审批恢复和 AgentOps 企业接入契约。

理论基石参考 Agentic Design Patterns 第 2 / 3 / 7 / 12 / 15 章(路由、并行化、 多 Agent 协作、异常处理与恢复、Inter-Agent 通信)。

安装

Hex 依赖:

def deps do
[
{:cmdc, "~> 0.5"},
{:cmdc_orchestrator, "~> 0.6"}
]
end

monorepo 内部通过 path 依赖:

def deps do
[
{:cmdc_orchestrator, path: "../cmdc_orchestrator"}
]
end

核心概念

概念说明
Orchestration一个 DAG(有向无环图),定义节点和依赖
WorkflowSpecv0.4 新增的可序列化工作流规格,面向保存、校验、展示、发布
NodeDAG 节点,12 种内置类型::agent / :aggregator / :router / :condition / :human_task / :fork / :join / :gate / :tool / :eval_gate / :debate / :hierarchy
Edge节点间的数据流依赖 %{from:, to:, branch?:, signal?:}
Run一次编排执行,v0.5 起可通过 start_run/2 取得异步句柄
RuntimeDAG 全程共享的运行时容器(v0.3+),管理 Agent 会话池与 SubAgent 生命周期

WorkflowSpec(v0.4)

WorkflowSpec 是面向企业平台保存和发布的稳定配置层;旧 %DAG{} 仍可直接 execute/2

spec = %{
"workflow_id" => "wf.contract_review",
"version" => "2026.06.001",
"nodes" => [
%{"id" => "draft", "type" => "agent", "config" => %{"prompt" => "审阅合同"}},
%{"id" => "gate", "type" => "gate", "config" => %{"criteria" => ["完整"]}}
],
"edges" => [%{"from" => "draft", "to" => "gate"}]
}
{:ok, workflow, warnings} = CMDCOrchestrator.validate_workflow(spec)
{:ok, dry_report} = CMDCOrchestrator.dry_run(workflow)
{:ok, dag} = CMDCOrchestrator.to_dag(workflow)
{:ok, results} = CMDCOrchestrator.execute(workflow, agent_opts)

校验覆盖稳定 node id、edge 引用、DAG 环路、router branch、节点 preflight、 孤岛节点 warning,以及匿名函数 / pid / reference / tuple 等不可持久化配置。

Run API(v0.5+)

企业 Run Console / Trace Viewer 推荐使用异步 Run API:

{:ok, run_id} =
CMDCOrchestrator.start_run(workflow,
input: %{"amount" => 128},
trigger_source: :webhook,
metadata: %{tenant_id: "t_001"}
)
{:ok, %{run: run, node_runs: node_runs, events: events}} =
CMDCOrchestrator.status(run_id)
{:ok, final_run} = CMDCOrchestrator.await_run(run_id, timeout: 120_000)
{:ok, events} =
CMDCOrchestrator.events(run_id,
type: "orchestrator.node.completed",
limit: 100,
after_id: last_event_id
)
{:ok, _paused} = CMDCOrchestrator.pause_run(run_id, :operator_pause)
{:ok, _running} = CMDCOrchestrator.resume_run(run_id, idempotency_key: "resume-req-001")
{:ok, _cancelled} = CMDCOrchestrator.cancel(run_id, :user_cancelled)

start_run/2 支持 :idempotency_key,用于 HTTP 重试或重复点击时返回同一个 run:

{:ok, run_id} = CMDCOrchestrator.start_run(workflow, idempotency_key: "trigger-001")
{:ok, ^run_id} = CMDCOrchestrator.start_run(workflow, idempotency_key: "trigger-001")

v0.6 M1 开始,Run Console 可使用:

v0.6 M2 增加 human_task 决策 API:

{:ok, _running} =
CMDCOrchestrator.complete_human_task(run_id, "legal-review",
actor_ref: "role:legal",
idempotency_key: "approval-click-001"
)
{:ok, _waiting} =
CMDCOrchestrator.submit_human_task_decision(
run_id,
"legal-review",
%{action: :progress, comment: "需要补充附件"},
actor_ref: "role:legal"
)
{:ok, _running} = CMDCOrchestrator.expire_waiting_tasks(run_id)

默认 RunStore 是 ETS 后端,适合开发和测试。v0.6 M1 还提供 CMDCOrchestrator.RunStore.Checkpoint,可复用 CMDC.Checkpoint.Backend.ETS/DETScmdc_memory_pg 的 checkpoint backend 做轻持久化。企业 Phoenix 平台可以实现 CMDCOrchestrator.RunStore behaviour,把 run、node run、event ledger 映射到 自己的 Ecto schema、租户、RBAC 和审计系统。

企业 Phoenix AgentOps 接入建议见 guides/agentops_integration.md,覆盖 Workflow Designer JSON DSL、RunService API、Approval Center、Gateway SSE 事件映射、 RunStore Ecto 注意事项、Oban/Temporal 边界和 Hive 迁移对照。

run_sync/2 是新 Run API 的同步 wrapper,成功时仍返回旧 execute/2 风格 {:ok, results}

{:ok, results} = CMDCOrchestrator.run_sync(workflow, timeout: 120_000)

节点类型

:agent

启动一个 CMDC Agent 执行配置好的 prompt,自动把 dep_results 拼到 prompt 末尾作为「上游节点输出」上下文。

%{
id: "research",
type: :agent,
config: %{
prompt: "调研 AI Agent 最新进展",
system_prompt: "你是研究员",
max_turns: 5,
mode: :standalone # v0.3+::standalone | :pool | :subagent
}
}

v0.3 新增:三种执行模式

mode说明适用场景
:standalone(默认)每次 ephemeral 会话,不复用单点任务,无需历史
:poolpool_key 共享会话,对话累积Debater / 多轮审稿等需要"角色记忆"
:subagent通过 Runtime 注册独立长生命周期会话需要 SubAgent 监督和生命周期跟踪

:pool 模式可指定 config[:pool_key](默认 node.id);:subagent 模式 由 Runtime 在 DAG 结束时统一回收。

:aggregator

合并多个上游结果。三种策略:"concat" 拼接 / "merge" 合并 map / "vote" 多数 投票。

%{id: "merge", type: :aggregator, config: %{strategy: "concat"}}

:router

按策略分发到不同分支,配合边的 :branch 标签由 Executor 自动剪枝 (ADP 第 2 章 Routing)。三种策略:

# 1. rule —— 顺序匹配 pattern
%{
id: "classify",
type: :router,
config: %{
strategy: "rule",
rules: [
%{pattern: "技术", branch: "tech_branch"},
%{pattern: "商业", branch: "biz_branch"}
]
}
}
# 2. random —— 在 branches 中均匀随机
%{config: %{strategy: "random", branches: ["a", "b", "c"]}}
# 3. llm(v0.3+)—— LLM 选择 + fallback
%{
id: "intent_router",
type: :router,
config: %{
strategy: "llm",
branches: ["weather", "news", "search"],
fallback: "search",
router_agent: %{name: "Router", model: "openai:gpt-4o-mini"}
}
}

:condition(v0.5 新增)

确定性条件节点,返回 "true" / "false" signal,下游边按 :signal:branch 匹配:

%{
id: "risk_check",
type: :condition,
config: %{
left: "{{amount}}",
operator: "gte",
right: 100_000,
output_key: "risk_check"
}
}

支持算子:eq / neq / gt / gte / lt / lte / contains / not_contains / is_truthy / is_falsy

边示例:

[
%{from: "risk_check", to: "legal_review", signal: "true"},
%{from: "risk_check", to: "auto_report", signal: "false"}
]

:fork / :join(v0.6 M1)

fork 把多个分支放入同一个可恢复 run 内并发执行;join 聚合分支结果。分支使用 branch-local context,完成后一次性写回 run snapshot:

%{id: "split", type: :fork, config: %{output_key: "parallel_review"}}
%{id: "legal", type: :agent, config: %{prompt: "法务审阅"}}
%{id: "finance", type: :agent, config: %{prompt: "财务审阅"}}
%{id: "join", type: :join, config: %{mode: :all, fail_strategy: :fail_fast}}

join.config[:mode] 支持 :all / :any / :n_of_mfail_strategy 支持 :fail_fast / :wait_all / :tolerateWorkflowSpec.validate/1 会拒绝 dangling join、join 入边不足、嵌套 fork 和多 join 歧义。

:human_task(v0.6 M2)

human_task 把 workflow 挂起到人工审批或补录等待点。库内只保存任务描述、 assignee_refs、决策聚合状态、resume signal 与事件账本;审批 UI、RBAC/ABAC、 通知和企业审计表由 Phoenix 平台实现。导入 :human_approval / :approval 会归一化为 :human_task

%{
id: "legal_review",
type: :human_task,
config: %{
task_id: "legal-review",
title: "法务审批:{{risk_review.summary}}",
assignee_refs: ["role:legal"],
approval_mode: :quorum,
required_count: 2,
timeout_ms: 86_400_000,
on_timeout: :proceed_with_default,
default_signal: "timeout",
default_output: %{timed_out: true},
output_key: "legal_review"
}
}

决策支持 :approve / :reject / :request_changes / :progress。终局决策会把 human_task 输出写入 Run.completedRun.context_data,再按 "approved" / "rejected" / "request_changes" / timeout signal 继续下游;progress 只更新 任务和事件账本,不恢复 run。CMDCOrchestrator.AssigneeResolver 只定义可选 解析 behaviour,通用包不查询 Accounts/Roles。

统一 Policy(v0.6 M1)

节点可通过 policy 配置执行策略:

%{
id: "risk_check",
type: :tool,
config: %{tool_name: "risk_score"},
policy: %{
retries: 2,
timeout_ms: 5_000,
backoff: :exponential,
on_error: :emit_signal,
signal: "needs_review"
}
}

on_error 支持 :fail / :continue / :skip / :fallback / :emit_signal:fallback 可配 fallback_output 返回确定性输出。

:gate

质量检查点,criteria 全部通过才继续,否则中止整个 DAG。

%{id: "review", type: :gate, config: %{criteria: ["accurate", "concise"]}}

:tool(v0.4 新增)

直接调用已注册的 CMDC.Tool 模块,输入可从上游节点结果模板渲染。

%{
id: "rag_search",
type: :tool,
config: %{
tool_name: "enterprise_rag_search",
args: %{"query" => "{{classify.query}}"},
output_key: "evidence"
}
}

运行时通过 agent_opts[:tool_registry] 注入:

CMDCOrchestrator.execute(dag,
tool_registry: %{"enterprise_rag_search" => MyApp.Tools.RAGSearch}
)

:eval_gate(v0.4 新增)

离线评测门禁,适合 AgentSpec / RAG preset / Workflow 发布前阻断。

%{
id: "release_gate",
type: :eval_gate,
config: %{
metrics: %{groundedness: 0.91, unauthorized_source_count: 0},
thresholds: %{groundedness: 0.85, unauthorized_source_count: 0}
}
}

也可以配置 gate_module 委托给企业侧 CMDCEvalCMDCRAGArcana.Eval.Gate 风格模块的 check/2

:debate(v0.3 新增)

多 Agent 辩论 + Judge 模式,对应 ADP 第 7 章 Multi-Agent Collaboration。

%{
id: "debate",
type: :debate,
config: %{
topic: "Elixir vs Python on Agents?",
debaters: [%{name: "ProElixir"}, %{name: "ProPython"}],
judge: %{name: "Judge"},
max_rounds: 3,
consensus_fn: fn rounds -> length(rounds) >= 2 end # 可选提前终止
}
}

:hierarchy(v0.3 新增)

Manager → Workers → Synthesizer 三段式协作。

%{
id: "hier",
type: :hierarchy,
config: %{
goal: "为新 SaaS 制定市场调研报告",
manager: %{name: "PM"}, # 可选
workers: [%{name: "Researcher"}, %{name: "Analyst"}],
synthesizer: %{name: "Writer"}, # 可选
worker_assign: :round_robin, # 或 :pairwise
max_parallel: 4
}
}

可选 :tasks / :split_fn 完全跳过 LLM 拆解,直接用静态子任务列表跑离线测试。

使用示例

dag = %CMDCOrchestrator.DAG{
nodes: [
%{id: "research", type: :agent, config: %{prompt: "调研 AI Agent"}},
%{id: "write", type: :agent, config: %{prompt: "写一篇 800 字博客"}},
%{id: "review", type: :gate, config: %{criteria: ["完整"]}}
],
edges: [
%{from: "research", to: "write"},
%{from: "write", to: "review"}
]
}
{:ok, results} = CMDCOrchestrator.execute(dag,
model: "openai:gpt-4o-mini",
api_key: System.fetch_env!("OPENAI_API_KEY")
)
results["write"]

执行失败时返回结构化错误,方便上层 UI 聚焦失败点:

{:error, %{node_id: "review", reason: "门禁未通过: 完整", completed: %{...}}}

与 CMDC v0.2 的集成

:agent 节点支持透传 cmdc 0.2 的全部 Agent 选项::user_data / :prompt_mode / :plugins / :tools / :provider / :model 等。

v0.3 通过 Runtime 提供原生的会话池和 SubAgent 支持,无需用户手工管理 CMDC.SubAgent.Supervisor

内置模板(v0.4)

CMDCOrchestrator.Templates.names()
#=> ["contract_review", "order_delay_diagnosis", "ticket_triage", "rag_release_gate", "debate_review"]
spec = CMDCOrchestrator.Templates.get!("contract_review")
{:ok, _report} = CMDCOrchestrator.dry_run(spec)

模板是 JSON-ready 的 WorkflowSpec 草稿,可作为 Workflow Designer 初始化数据。

Telemetry(v0.4)

执行器发出稳定事件,metadata 只包含结构化摘要,不放 prompt、chunk 或完整工具输出:

这些事件可由 cmdc_gateway 或企业 AgentOps Trace Viewer 映射为 SSE / timeline。

human_task 会额外写入 human_task.createdhuman_task.decision_recordedhuman_task.progresshuman_task.completedhuman_task.timeout 等事件,供 Approval Center、Run Console 和 Trace Viewer 关联展示。

开发

mix deps.get
mix test
mix format --check-formatted
mix compile --warnings-as-errors
mix credo --strict

路线图

详见 CHANGELOG.mdexample/multi_agent_debate_demo.exs

许可

Apache-2.0