Skip to content

Orchestrator Service Interface Specification

本文档定义 CoreX/integration/orchestrator 模块的接口设计与运行规范。 Orchestrator 是 PowerX 编排内核的 统一服务门面(Service Facade), 它负责接收上层(Agent / Workflow)的执行请求,协调 Router / Transport / Flow / EventBus 等子系统, 并输出完整的可追踪执行链(TraceID)。


1️⃣ 定位与职责

职责说明
统一入口负责接收一切 “能力调用” 或 “Flow 执行” 请求
调用协调调用 Router 选路、通过 Transport 执行实际 Provider 能力
上下文封装统一构造 ExecutionContext,注入租户、用户、追踪信息
事件分发将状态、日志、token 等事件写入 EventBus
回调网关对接 Realtime Gateway,为前端推送流式执行结果
A2A 调度桥transport=agent,通过 AgentAdaptor 实现 Agent-to-Agent 通信

2️⃣ 模块架构

+-----------------------------------------------------------+
|                   Orchestrator Service                    |
|-----------------------------------------------------------|
|  API Layer  →  Controller →  Executor →  Flow Engine      |
|                    ↓              ↓             ↓         |
|               Router (capability) → Transport → Provider  |
|                    │                                    |
|                    └── EventBus → Realtime Gateway      |
+-----------------------------------------------------------+

模块职责划分

子模块功能路径
controller/对外接口层(HTTP/gRPC)/api/v1/orchestrator/...
executor/调度器,负责执行节点或能力调用ExecuteStep()
context/执行上下文构建器租户 / 追踪 / 鉴权
flow/Flow Engine 集成接口连接 DAG 状态机
event/事件上报与聚合写入 EventBus
metrics/指标收集与统计Prometheus Exporter

3️⃣ 接口定义总览

类型协议路径 / 方法说明
RESTPOST/api/v1/orchestrator/execute执行单个能力(capability)
RESTPOST/api/v1/orchestrator/workflow/run启动工作流执行
RESTPOST/api/v1/orchestrator/flow/run直接运行 Flow 图
RESTGET/api/v1/orchestrator/runs/{id}查询运行状态
gRPCExecutepowerx.orchestrator.v1等价 RPC 调用
gRPCStreamExecute双向流式接口(token 逐步输出)

4️⃣ 数据结构定义

4.1 ExecutionContext(核心)

json
{
  "trace_id": "trc_39d8a",
  "tenant_id": "t001",
  "actor_id": "u108",
  "scopes": ["crm.*", "ai.*"],
  "source": "agent:sales_copilot",
  "workflow_run_id": "wf_998",
  "options": {
    "timeout_ms": 15000,
    "retry": 2,
    "stream": true
  }
}

4.2 ExecuteRequest(能力调用)

json
{
  "capability": "crm.lead.fetch",
  "inputs": {"id": "L102"},
  "prefer": "grpc",
  "context": { "tenant_id": "t001", "actor_id": "u108" }
}

4.3 ExecuteResponse(流式)

json
{
  "trace_id": "trc_39d8a",
  "status": "running",
  "type": "token|log|state|done",
  "data": {"text": "处理中..."},
  "timestamp": "2025-10-12T15:40:21Z"
}

5️⃣ 编排执行主流程

Client/Agent

   ├──▶ POST /orchestrator/execute

   ├──▶ [1] 构建 ExecutionContext
   ├──▶ [2] Router.Select(capability)
   ├──▶ [3] Transport.Invoke(endpoint)
   ├──▶ [4] 收集结果/流式输出 → EventBus
   ├──▶ [5] 更新 StateStore(Flow/Workflow)
   └──▶ [6] Gateway 推送结果 → 前端 / Agent 回调

6️⃣ gRPC Service 定义(原型)

proto
service Orchestrator {
  rpc Execute (ExecuteRequest) returns (ExecuteResponse);
  rpc StreamExecute (ExecuteRequest) returns (stream ExecuteResponse);
}

ExecuteRequest

proto
message ExecuteRequest {
  string capability = 1;
  map<string, google.protobuf.Value> inputs = 2;
  string prefer = 3; // mcp|grpc|http|agent
  ExecutionContext context = 4;
}

7️⃣ 执行模式

模式描述示例场景
Sync单能力调用,等待返回CRM 创建客户
Async流式事件推送AI Token 生成
Workflow执行 DAG工作流定义的多步骤调用
AgentPlanAgent 自动生成计划并调用A2A 智能体协作

8️⃣ A2A 执行模式(AgentAdaptor 集成)

Orchestrator 内置对 Agent-to-Agent 调度的原生支持。

步骤行为
1️⃣ Router 选到 transport=agent 端点
2️⃣ Orchestrator 调用 AgentAdaptor.Invoke()
3️⃣ 对端 Agent 执行计划(Plan)并产生 Tool 调用
4️⃣ Tool 调用回流至本 Orchestrator 再执行
5️⃣ 所有 token/log/status 事件统一走 EventBus → Gateway

即:Orchestrator 是 A2A 会话控制中心。 每个 Agent 调用链仍共享同一 trace_id,可跨 Agent 追踪。


9️⃣ 内部模块接口(Golang 伪代码)

go
type OrchestratorService interface {
    Execute(ctx context.Context, req *ExecuteRequest) (*ExecuteResponse, error)
    StreamExecute(req *ExecuteRequest, stream Orchestrator_StreamExecuteServer) error
    RunFlow(ctx context.Context, flow *FlowRunRequest) (*FlowRunResponse, error)
}

type Executor interface {
    RunStep(ctx ExecutionContext, step StepSpec) (StepResult, error)
    StreamStep(ctx ExecutionContext, step StepSpec, writer EventWriter) error
}

🔟 Metrics & Logging

指标说明
orchestrator_requests_total执行请求总数
orchestrator_latency_seconds平均执行耗时
orchestrator_active_flows当前活跃流程数
orchestrator_errors_total异常数(按 transport 分类)
a2a_invocations_totalAgent-to-Agent 调用次数

日志示例:

json
{
  "ts": "2025-10-12T15:55:43Z",
  "trace_id": "trc_94a7e",
  "component": "Orchestrator",
  "capability": "crm.lead.fetch",
  "transport": "grpc",
  "duration_ms": 321,
  "status": "success"
}

11️⃣ 控制接口(Admin API)

MethodPath功能
GET/api/v1/orchestrator/runs查询所有运行实例
GET/api/v1/orchestrator/runs/{id}查看执行详情
POST/api/v1/orchestrator/runs/{id}/pause暂停执行
POST/api/v1/orchestrator/runs/{id}/resume恢复执行
POST/api/v1/orchestrator/runs/{id}/cancel取消执行
GET/api/v1/orchestrator/stats返回实时指标摘要

12️⃣ 安全与隔离

层级策略
认证JWT 或服务令牌验证
授权按 scope 验证 capability 调用权限
租户隔离ExecutionContext 限定 tenant_id
调用深度限制防止递归调用(A2A)
敏感日志脱敏Mask input/output 字段

13️⃣ 故障与恢复

场景处理
连接中断断点重试(基于 trace_id 快照)
Orchestrator 重启从 StateStore 恢复 Flow 状态
EventBus 阻塞缓存队列写入本地 WAL
Router 错误重选 Endpoint
AgentAdaptor 超时降级到 HTTP fallback

14️⃣ 与其他模块关系

模块交互说明
Flow Engine提供节点依赖解析与状态更新
Router / Registry提供能力发现与端点选择
Transport Adapter执行实际协议调用
Agent Manager提供上层决策计划输入
EventBus / Gateway推送执行流事件
Security Layer执行权限校验与审计
Metrics Layer输出运行指标与日志

✅ 一句话总结

Orchestrator 是 PowerX 编排系统的“统一执行门面”, 负责调度所有能力调用、Flow 执行与 Agent 协作。 它封装 Router / Transport / EventBus / FlowEngine 的内部细节, 并以可观测、可追踪、可审计的方式提供统一执行入口。

基于 Apache 2.0 许可发布