CMDC Orchestrator

CMDC 多 Agent 编排引擎 — WorkflowSpec + DAG 驱动的 AgentOps 编排。

cmdc_orchestratorcmdc 之上提供「图驱动」的多 Agent 编排能力:把 工作流描述成 DAG(节点 + 边),由执行器按拓扑顺序调度,并把上游节点的输出 自动注入到下游节点的 prompt 中。v0.4 起新增可持久化 WorkflowSpec,用于企业 Workflow Designer、发布校验、dry run 和 Trace Viewer 接入。

理论基石参考 Agentic Design Patterns 第 2 / 3 / 7 / 12 / 15 章(路由、并行化、 多 Agent 协作、异常处理与恢复、Inter-Agent 通信)。

安装

monorepo 内部通过 path 依赖:

def deps do
[
{:cmdc_orchestrator, path: "../cmdc_orchestrator"}
]
end

核心概念

概念说明
Orchestration一个 DAG(有向无环图),定义节点和依赖
WorkflowSpecv0.4 新增的可序列化工作流规格,面向保存、校验、展示、发布
NodeDAG 节点,8 种内置类型::agent / :aggregator / :router / :gate / :tool / :eval_gate / :debate / :hierarchy
Edge节点间的数据流依赖 %{from:, to:, branch?:}
Run一次编排执行,按拓扑序分层并行调度
RuntimeDAG 全程共享的运行时容器(v0.3+),管理 Agent 会话池与 SubAgent 生命周期

WorkflowSpec(v0.4)

WorkflowSpec 是面向企业平台保存和发布的稳定配置层;旧 %DAG{} 仍可直接 execute/2

spec = %{
"workflow_id" => "wf.contract_review",
"version" => "2026.06.001",
"nodes" => [
%{"id" => "draft", "type" => "agent", "config" => %{"prompt" => "审阅合同"}},
%{"id" => "gate", "type" => "gate", "config" => %{"criteria" => ["完整"]}}
],
"edges" => [%{"from" => "draft", "to" => "gate"}]
}
{:ok, workflow, warnings} = CMDCOrchestrator.validate_workflow(spec)
{:ok, dry_report} = CMDCOrchestrator.dry_run(workflow)
{:ok, dag} = CMDCOrchestrator.to_dag(workflow)
{:ok, results} = CMDCOrchestrator.execute(workflow, agent_opts)

校验覆盖稳定 node id、edge 引用、DAG 环路、router branch、节点 preflight、 孤岛节点 warning,以及匿名函数 / pid / reference / tuple 等不可持久化配置。

节点类型

:agent

启动一个 CMDC Agent 执行配置好的 prompt,自动把 dep_results 拼到 prompt 末尾作为「上游节点输出」上下文。

%{
id: "research",
type: :agent,
config: %{
prompt: "调研 AI Agent 最新进展",
system_prompt: "你是研究员",
max_turns: 5,
mode: :standalone # v0.3+::standalone | :pool | :subagent
}
}

v0.3 新增:三种执行模式

mode说明适用场景
:standalone(默认)每次 ephemeral 会话,不复用单点任务,无需历史
:poolpool_key 共享会话,对话累积Debater / 多轮审稿等需要"角色记忆"
:subagent通过 Runtime 注册独立长生命周期会话需要 SubAgent 监督和生命周期跟踪

:pool 模式可指定 config[:pool_key](默认 node.id);:subagent 模式 由 Runtime 在 DAG 结束时统一回收。

:aggregator

合并多个上游结果。三种策略:"concat" 拼接 / "merge" 合并 map / "vote" 多数 投票。

%{id: "merge", type: :aggregator, config: %{strategy: "concat"}}

:router

按策略分发到不同分支,配合边的 :branch 标签由 Executor 自动剪枝 (ADP 第 2 章 Routing)。三种策略:

# 1. rule —— 顺序匹配 pattern
%{
id: "classify",
type: :router,
config: %{
strategy: "rule",
rules: [
%{pattern: "技术", branch: "tech_branch"},
%{pattern: "商业", branch: "biz_branch"}
]
}
}
# 2. random —— 在 branches 中均匀随机
%{config: %{strategy: "random", branches: ["a", "b", "c"]}}
# 3. llm(v0.3+)—— LLM 选择 + fallback
%{
id: "intent_router",
type: :router,
config: %{
strategy: "llm",
branches: ["weather", "news", "search"],
fallback: "search",
router_agent: %{name: "Router", model: "openai:gpt-4o-mini"}
}
}

:gate

质量检查点,criteria 全部通过才继续,否则中止整个 DAG。

%{id: "review", type: :gate, config: %{criteria: ["accurate", "concise"]}}

:tool(v0.4 新增)

直接调用已注册的 CMDC.Tool 模块,输入可从上游节点结果模板渲染。

%{
id: "rag_search",
type: :tool,
config: %{
tool_name: "enterprise_rag_search",
args: %{"query" => "{{classify.query}}"},
output_key: "evidence"
}
}

运行时通过 agent_opts[:tool_registry] 注入:

CMDCOrchestrator.execute(dag,
tool_registry: %{"enterprise_rag_search" => MyApp.Tools.RAGSearch}
)

:eval_gate(v0.4 新增)

离线评测门禁,适合 AgentSpec / RAG preset / Workflow 发布前阻断。

%{
id: "release_gate",
type: :eval_gate,
config: %{
metrics: %{groundedness: 0.91, unauthorized_source_count: 0},
thresholds: %{groundedness: 0.85, unauthorized_source_count: 0}
}
}

也可以配置 gate_module 委托给企业侧 CMDCEvalCMDCRAGArcana.Eval.Gate 风格模块的 check/2

:debate(v0.3 新增)

多 Agent 辩论 + Judge 模式,对应 ADP 第 7 章 Multi-Agent Collaboration。

%{
id: "debate",
type: :debate,
config: %{
topic: "Elixir vs Python on Agents?",
debaters: [%{name: "ProElixir"}, %{name: "ProPython"}],
judge: %{name: "Judge"},
max_rounds: 3,
consensus_fn: fn rounds -> length(rounds) >= 2 end # 可选提前终止
}
}

:hierarchy(v0.3 新增)

Manager → Workers → Synthesizer 三段式协作。

%{
id: "hier",
type: :hierarchy,
config: %{
goal: "为新 SaaS 制定市场调研报告",
manager: %{name: "PM"}, # 可选
workers: [%{name: "Researcher"}, %{name: "Analyst"}],
synthesizer: %{name: "Writer"}, # 可选
worker_assign: :round_robin, # 或 :pairwise
max_parallel: 4
}
}

可选 :tasks / :split_fn 完全跳过 LLM 拆解,直接用静态子任务列表跑离线测试。

使用示例

dag = %CMDCOrchestrator.DAG{
nodes: [
%{id: "research", type: :agent, config: %{prompt: "调研 AI Agent"}},
%{id: "write", type: :agent, config: %{prompt: "写一篇 800 字博客"}},
%{id: "review", type: :gate, config: %{criteria: ["完整"]}}
],
edges: [
%{from: "research", to: "write"},
%{from: "write", to: "review"}
]
}
{:ok, results} = CMDCOrchestrator.execute(dag,
model: "openai:gpt-4o-mini",
api_key: System.fetch_env!("OPENAI_API_KEY")
)
results["write"]

执行失败时返回结构化错误,方便上层 UI 聚焦失败点:

{:error, %{node_id: "review", reason: "门禁未通过: 完整", completed: %{...}}}

与 CMDC v0.2 的集成

:agent 节点支持透传 cmdc 0.2 的全部 Agent 选项::user_data / :prompt_mode / :plugins / :tools / :provider / :model 等。

v0.3 通过 Runtime 提供原生的会话池和 SubAgent 支持,无需用户手工管理 CMDC.SubAgent.Supervisor

内置模板(v0.4)

CMDCOrchestrator.Templates.names()
#=> ["contract_review", "order_delay_diagnosis", "ticket_triage", "rag_release_gate", "debate_review"]
spec = CMDCOrchestrator.Templates.get!("contract_review")
{:ok, _report} = CMDCOrchestrator.dry_run(spec)

模板是 JSON-ready 的 WorkflowSpec 草稿,可作为 Workflow Designer 初始化数据。

Telemetry(v0.4)

执行器发出稳定事件,metadata 只包含结构化摘要,不放 prompt、chunk 或完整工具输出:

这些事件可由 cmdc_gateway 或企业 AgentOps Trace Viewer 映射为 SSE / timeline。

计划中

开发

mix deps.get
mix test
mix format --check-formatted
mix compile --warnings-as-errors
mix credo --strict

路线图

详见 CHANGELOG.mdexample/multi_agent_debate_demo.exs

许可

Apache-2.0