Skip to content

Realtime Streaming Gateway Specification (CoreX/integration)

本文档定义 PowerX CoreX/integration 域Realtime Streaming Gateway(实时流式网关), 它是 PowerX 的统一流事件出口层,负责将来自 Agent、Workflow、Adaptor 的实时事件推送至前端、插件页面、或第三方客户端。

它构成 「运行时事件可视化与AI响应流」的统一出入口


1️⃣ 设计目标

目标描述
统一流式通道汇聚所有运行态事件(AI token、Agent log、Workflow 状态、插件事件)
多协议支持SSE / WebSocket / (预留 GRPC-Stream)
多源聚合聚合 MCP / gRPC / Agent Stream / EventBus 消息
低延迟传递毫秒级流转 + 优先级调度
安全隔离多租户 + Scope + Topic ACL 控制
全链观测trace_id 跨 Workflow/Agent/Gateway 贯穿

2️⃣ 系统架构总览

+---------------------------------------------------------------+
|                         PowerX Runtime                        |
|---------------------------------------------------------------|
|    Orchestrator / Workflow / Agent / Adaptor (producers)      |
|               │                                                |
|               ▼                                                |
|           EventBus (CoreX Integration Bus)                     |
|               │                                                |
|               ▼                                                |
|        Realtime Streaming Gateway (出口聚合层)                |
|          ├─ SSE Endpoint (/realtime/sse)                       |
|          ├─ WebSocket Endpoint (/realtime/ws)                  |
|          └─ Poll Fallback (/realtime/poll)                     |
|               │                                                |
|               ▼                                                |
|         Frontend / Plugin Page / External Client               |
+---------------------------------------------------------------+

3️⃣ 核心组件职责

模块职责
EventBus (Integration Layer)内部事件总线(NATS/Redis/Kafka);所有流式事件写入此处。
Realtime GatewaySSE/WS 推送层;对外暴露统一 /realtime/... 通道。
Stream Transformer将内部事件封装为标准 SSE/WS 格式。
Connection Manager管理客户端连接、Topic 订阅、心跳与限流。
Auth Filter鉴权与租户隔离,校验 JWT 与 Topic ACL。
Metrics & Trace推送延迟、丢包率、订阅数等指标采集。

4️⃣ 数据流路径示意

Adaptor / Agent


EventBus.Publish("ai:run_x123", {...})


Realtime Gateway (订阅 topic)


SSE/WS 推送 → Browser / Plugin 前端

示例:Agent Stream Token 推送

event: token
data: {"seq":42,"text":"生成中...","trace_id":"trc_a7b2f"}

5️⃣ 通信协议与端点

协议Path特点
SSE/realtime/sse?topic=<topic>单向推送,轻量低延迟
WebSocket/realtime/ws双向通信,可订阅多个主题
Poll Fallback/realtime/poll长轮询降级模式

Header 认证:

Authorization: Bearer <jwt>
X-Tenant-ID: t001
X-Actor-ID: u102

6️⃣ Topic 命名空间

类型示例说明
AI/Agentagent:run_<uuid>Agent 实时输出
Workflowwf:<flow_id>编排任务进度与状态
Plugin Eventplugin:<id>:event插件定义事件
Systemsys:notice系统广播
Tenant Customtenant:<tid>:<topic>租户自定义通道

Topic ACL 与 JWT scope 精确绑定。


7️⃣ 事件结构规范

7.1 通用事件

json
{
  "seq": 15,
  "type": "token|log|state|error|done",
  "topic": "agent:run_556",
  "data": { "text": "处理中...", "step": "summarize" },
  "trace_id": "trc_ef2b",
  "tenant_id": "t001",
  "timestamp": "2025-10-12T10:21:00Z"
}

7.2 SSE 封装

event: token
data: {"seq":15,"text":"处理中...","trace_id":"trc_ef2b"}

7.3 WS 帧格式

json
{
  "action": "publish",
  "topic": "agent:run_556",
  "payload": {...}
}

8️⃣ QoS 与流控制

策略内容
At-most-once默认传输语义(低延迟优先)。
Buffer 模式可缓存最近 N=50 条事件以支持断线恢复。
Backpressure当消费过慢时丢弃低优先级消息(log < state < token)。
Priority 调度token > status > log。
TimeoutSSE/WS 空闲连接超时 15min。

9️⃣ EventBus 集成模型

内部 EventBus 支持 NATS JetStream / Redis Stream / Kafka 等实现。

go
sub := eventbus.Subscribe(topic)
for msg := range sub.Chan() {
    gateway.Publish(topic, msg)
}
  • 所有事件带 seq → 保序;
  • 同 Topic 多源写入时由 trace_id 聚合;
  • status 事件按周期聚合(减少噪音)。

🔟 多协议源事件映射

来源转换为内部事件类型说明
MCP Streamtoken/log/state直接映射。
gRPC Stream拆分 Recv() → EventBus消息分片。
Agent A2A Streamagent.outputAgent Channel 映射。
HTTP SSE转换后写入 EventBus。
Local FunctionChannel→EventBus。

Realtime Gateway 是聚合出口层,不关心来源,只做统一封装。


11️⃣ 安全与隔离策略

层级策略
认证JWT 验证(tenant_id, actor_id, scopes)。
授权Topic ACL(scope-topic 映射)。
租户隔离每租户独立命名空间 tenant:<tid>:
限流每用户最大连接数 & 每 topic 每秒事件速率。
加密TLS + WSS 强制。
签名校验可选:系统级事件签名验证。

12️⃣ Metrics 与 Observability

指标说明
realtime_connections_active当前活跃连接数
realtime_publish_rate每秒事件发布数
realtime_delivery_latency_ms推送延迟
realtime_dropped_messages_total丢包数
realtime_topic_subscribers每 topic 订阅数
realtime_buffer_recoveries_total断线重连恢复次数

Trace 链路:

Workflow/Agent → EventBus → Gateway → Client
(trace_id 贯穿全链)

13️⃣ 故障恢复与降级

场景行为
客户端断线自动重连,恢复最近 N 条缓存。
Gateway 重启从 EventBus 恢复状态。
EventBus 异常内存缓存重试 3 次。
网络不稳定自动降级 /poll 模式。
多节点部署Gateway 通过 Redis/NATS 分布式广播同步。

14️⃣ 插件与 Gateway 交互模式

插件作为事件源

go
eventbus.Publish("plugin:crm:event", data)

插件前端作为消费者

GET /_p/com.powerx.crmplus/realtime/sse?topic=plugin:crm:event
  • PluginManager 反代 Gateway;
  • JWT 自动注入;
  • 可展示 AI 输出、日志、工作流进度等。

流程:Plugin Runtime → EventBus → Gateway → Plugin Frontend


15️⃣ 性能目标

指标目标值
平均推送延迟≤ 100 ms
并发连接数/节点≥ 10,000
吞吐量≥ 50,000 msg/s
丢包率≤ 0.01%
恢复率100%(断线自动恢复)

16️⃣ 管理与监控 API

方法路径说明
GET/api/v1/integration/realtime/topics查询活跃 topic
GET/api/v1/integration/realtime/connections当前连接
POST/api/v1/integration/realtime/broadcast系统广播
DELETE/api/v1/integration/realtime/connections/{id}断开连接

17️⃣ 技术选型建议(Go 实现)

模块推荐实现
EventBusNATS JetStream(或 Redis Stream)
SSEnet/http + FlushWriter
WSgorilla/websocket
JWTgithub.com/golang-jwt/jwt/v5
MetricsPrometheus + OTEL
State SyncRedis pub/sub 或 NATS 分布式事件

18️⃣ 与其他模块关系

模块交互
Transport Adapter写入流式事件至 EventBus
Workflow / Agent产生 AI/任务事件
RuntimeResolver提供租户上下文与 trace_id
Router提供调用上下文(trace/tenant)
Observability Layer采集推送与丢包指标
PluginManager提供反代入口 /realtime/...

✅ 一句话总结

Realtime Streaming Gateway = PowerX 的运行时事件出口与前端实时桥。 它聚合所有流式事件、统一封装、鉴权推送, 支撑 AI Token、Workflow 状态、Agent Log 的毫秒级实时可视化, 是 PowerX Integration Framework 的「流式中枢与可观测出口」。

基于 Apache 2.0 许可发布