Skip to content

EventBus and Message Fabric

本文档定义 PowerX Integration Framework 的**事件总线与消息织网(EventBus & Message Fabric)**规范。 它是连接 Router / Orchestrator / AgentAdaptor / Gateway / Security / Metrics 的底层通信层。

核心目标: 让所有事件、日志、token、状态、告警、审计、指标都在一个统一的 Fabric 上流动。


1️⃣ 设计目标

目标说明
统一事件骨架所有模块事件使用统一 Envelope 结构
多源聚合支持来自 Agent、Workflow、Transport、Security、Metrics
多协议实现可基于 NATS / Redis Stream / Kafka 自适配
可订阅可回放支持持久化、历史回放与主题订阅
低延迟高并发毫秒级事件分发、十万级吞吐能力
可观测与安全每条事件可追踪、带 Trace、租户、Scope、签名

2️⃣ 架构总览

+-------------------------------------------------------------------+
|                        PowerX Message Fabric                      |
|-------------------------------------------------------------------|
|  Publisher Modules                                                |
|   ├── Router / Orchestrator (state events)                        |
|   ├── AgentAdaptor / Transport (token/log events)                 |
|   ├── Security / Audit (grant, auth)                              |
|   ├── Metrics / Runtime Monitor                                   |
|   └── System (alerts, ops)                                        |
|-------------------------------------------------------------------|
|  EventBus Core                                                    |
|   ├── TopicManager   (租户隔离 + 权限)                            |
|   ├── SubscriptionManager (Fan-out / QoS)                         |
|   ├── PersistenceLayer (Stream Storage)                           |
|   ├── Relay / Bridge (to Realtime Gateway)                        |
|   └── MetricsExporter (Prom / OTLP)                               |
|-------------------------------------------------------------------|
|  Consumers                                                        |
|   ├── Realtime Gateway (SSE/WS 推送)                              |
|   ├── Admin Console (Audit / Alert UI)                            |
|   ├── Plugin SDK (事件订阅)                                        |
|   ├── Analytics Pipeline (指标分析)                                |
|   └── External Webhook                                            |
+-------------------------------------------------------------------+

3️⃣ 事件 Envelope 规范

json
{
  "event_id": "evt_94ac",
  "seq": 128,
  "topic": "workflow:run_8f2d",
  "type": "token|state|log|error|audit|metric|alert",
  "tenant_id": "t001",
  "actor_id": "u102",
  "trace_id": "trc_9e8a2f",
  "data": { "text": "生成中..." },
  "meta": {
    "capability": "crm.lead.fetch",
    "agent": "sales_copilot",
    "provider": "crmplus"
  },
  "timestamp": "2025-10-12T16:52:21Z",
  "signature": "sha256:8aef..."
}

字段说明

字段描述
event_id唯一事件 ID(全局有序)
seq序号,用于乱序重排
topic事件主题(见 Topic 命名规范)
type事件类别
trace_id全链路追踪 ID
tenant_id租户隔离标识
actor_id操作人或 Agent ID
meta附加元信息(能力、插件、Provider 等)
signature签名校验字段
timestampISO8601 UTC 时间戳

4️⃣ Topic 命名规范

模式示例说明
workflow:<run_id>workflow:run_88fa工作流运行事件
agent:<agent_id>agent:sales_copilotAgent 事件流
plugin:<plugin_id>plugin:com.powerx.plugin.crmplus插件自定义事件
security:*security.grant_issued授权与审计事件
metrics:*metrics.agent指标上报
system:*system.alert系统广播/警报
tenant:<tid>:*tenant:t001:alert租户自定义主题

每个租户拥有独立命名空间与 ACL。


5️⃣ Channel 类型

类型特点典型实现
Transient (瞬态)仅实时消费,不持久化NATS / PubSub
Persistent (持久)顺序流存储,可回放Redis Stream / Kafka
Bridge (桥接)从 EventBus → Gateway 推送SSE / WS
DeadLetter (死信)存放投递失败消息内置 DLQ

6️⃣ 生产者接口(Publisher API)

go
type EventBus interface {
    Publish(ctx context.Context, event *Event) error
    PublishBatch(ctx context.Context, events []*Event) error
    Flush(ctx context.Context) error
}

发送示例

go
bus.Publish(ctx, &Event{
    Topic: "agent:sales_copilot",
    Type:  "token",
    Data:  map[string]any{"text": "处理中..."},
    TraceID: traceID,
})
  • SDK 自动填充 tenant_idactor_idtimestampsignature

7️⃣ 订阅者接口(Subscriber API)

go
type Subscription struct {
    Topic     string
    Group     string
    Cursor    string
    AutoAck   bool
    Callback  func(*Event) error
}

订阅示例

go
bus.Subscribe(Subscription{
    Topic: "workflow:run_8f2d",
    Group: "orchestrator",
    Callback: func(evt *Event) error {
        fmt.Println(evt.Type, evt.Data)
        return nil
    },
})

Group 表示消费者组,用于水平扩展。


8️⃣ QoS 与投递策略

策略说明
At-most-once默认;快速无重发
At-least-once需 Ack;确保送达
Exactly-onceKafka 模式;幂等 + 事务提交
Backpressure慢消费自动降采样或丢弃低优事件
Batch Flush聚合发送以提升吞吐
PriorityQueuetype 优先级:token > state > log

9️⃣ 权限与安全机制

安全层策略
认证JWT 校验(tenant, actor, scope)
授权仅允许访问本租户 topic;跨租户禁止
签名验证对关键事件(audit, security)做 HMAC 校验
加密传输使用 TLS(NATS+TLS / HTTPS)
防篡改签名 + hash 链机制(可选)

🔟 事件生命周期

[Producer]
   │  publish()

[EventBus Core]
   │  validate + sign

[Broker]  (NATS / Kafka)

   ├─► Persistent Store

   └─► Realtime Gateway (Bridge)


       [Consumers]
        ├─ Agent Console
        ├─ Frontend SSE
        └─ Admin Dashboard

11️⃣ Metrics 指标

指标标签含义
eventbus_publish_totaltopic,type发布次数
eventbus_delivery_latency_mstopic投递延迟
eventbus_dropped_totaltopic,reason丢弃消息数
eventbus_backpressure_activetenant背压激活标志
eventbus_storage_size_bytestopic持久化存储大小
eventbus_subscribers_activetopic活跃订阅者数

12️⃣ 可观测性与追踪

  • 每条事件自动注入:

    • trace_id
    • span_id
    • tenant_id
    • origin_module
  • 可通过 /api/v1/admin/trace/{trace_id} 回溯事件路径。

事件流示意:

AgentAdaptor → EventBus(topic=agent:a1)


 Orchestrator → topic=workflow:run_8f2d


 RealtimeGateway → topic=tenant:t001:ui

13️⃣ 故障与恢复机制

场景处理策略
Broker 掉线自动重连 + 本地缓存重放
消费者组失联转交给备用组
消息重复幂等判定 (event_id)
超时未 Ack重投递,计数器限次
存储积压触发 backpressure 降采样
无订阅者延迟删除或写入 DLQ

14️⃣ 推荐实现(默认配置)

功能实现方案
BrokerNATS JetStream(轻量快速)
PersistenceRedis Stream(内嵌)
Bridge内置 Gateway Bridge(SSE/WS)
DLQRedis List
MonitoringPrometheus + Tempo
SDKpkg/corex/integration/eventbus

15️⃣ 开发与调试接口

MethodPath功能
GET/api/v1/admin/eventbus/topics查询所有活跃 Topic
GET/api/v1/admin/eventbus/stats查看事件流量统计
POST/api/v1/admin/eventbus/publish手动发布事件(测试)
GET/api/v1/admin/eventbus/subscribe?topic=xxxSSE 模拟订阅
DELETE/api/v1/admin/eventbus/topics/{id}删除持久化 Topic

16️⃣ 与其他模块关系

模块交互说明
Workflow / Orchestrator发布 step state、token、done 事件
AgentAdaptor输出 token/log/state
Security Layer发布授权 / 审计 / 告警事件
Metrics Collector周期性发布指标快照
Realtime Gateway从 EventBus 订阅后向前端推送
Admin API通过 /eventbus/* 查询状态
Plugin SDK提供轻量级发布与订阅接口

17️⃣ 性能指标目标

指标目标值
单节点吞吐≥ 100,000 msg/s
平均延迟≤ 50 ms
最大订阅者≥ 10,000
可靠性≥ 99.99% 投递成功率
恢复时间≤ 5s 重连恢复

✅ 一句话总结

EventBus & Message Fabric = PowerX 的实时神经网络。 所有智能体、插件、工作流、治理与安全事件都通过统一总线流动, 形成一张低延迟、可追踪、可治理、可扩展的“事件织网”, 支撑 PowerX 的多智能体协作与企业级可观测体系。

基于 Apache 2.0 许可发布