Logo
热心市民王先生

02 - ACP 消息传递机制

深入解析 ACP 协议的消息格式、通信模式、可靠性保证机制及其实现细节

1. 消息传递架构概览

1.1 通信模型

ACP 采用请求-响应模型为基础,同时支持流式传输异步通信。这种设计使协议能够适应从简单查询到复杂多步骤任务的广泛场景[1]。

flowchart TB
    subgraph "同步通信模式"
        C1[Client] -->|Request| S1[Server]
        S1 -->|Response| C1
    end
    
    subgraph "异步通信模式"
        C2[Client] -->|Async Request| S2[Server]
        S2 -->|Ack| C2
        S2 -.->|Callback/Webhook| C2
    end
    
    subgraph "流式通信模式"
        C3[Client] -->|Stream Request| S3[Server]
        S3 -->|Chunk 1| C3
        S3 -->|Chunk 2| C3
        S3 -->|Chunk N| C3
        S3 -->|End| C3
    end

1.2 核心通信原语

原语类型说明适用场景
run_sync同步阻塞式等待 Agent 完成快速查询、简单任务
run_async异步非阻塞,通过回调获取结果长时间任务、批处理
run_stream流式实时返回增量结果聊天、生成式任务

2. 消息格式与序列化

2.1 消息结构详解

ACP 消息采用分层结构,由 Message 和 MessagePart 两级组成[2]:

classDiagram
    class Message {
        +string role
        +MessagePart[] parts
    }
    
    class MessagePart {
        +string name
        +string content_type
        +string content
        +string content_encoding
        +string content_url
        +Metadata metadata
    }
    
    class Metadata {
        +TrajectoryMetadata trajectory
        +CitationMetadata citation
    }
    
    Message *-- MessagePart
    MessagePart *-- Metadata

2.2 MessagePart 内容类型

ACP 支持任意 MIME 类型,以下是常见类型映射[3]:

MIME 类型用途示例内容
text/plain纯文本用户查询、Agent 回复
text/markdown富文本格式化文档
application/json结构化数据API 响应、配置
text/x-python代码Python 代码片段
image/png图像截图、图表
image/jpeg照片用户上传图片
audio/wav音频语音输入
application/pdf文档PDF 附件

2.3 多模态消息示例

{
  "role": "user",
  "parts": [
    {
      "content_type": "text/plain",
      "content": "分析这张图片中的数据趋势",
      "content_encoding": "plain"
    },
    {
      "name": "chart.png",
      "content_type": "image/png",
      "content": "iVBORw0KGgoAAAANSUhEUgAA...",
      "content_encoding": "base64"
    },
    {
      "content_type": "application/json",
      "content": "{\"time_range\": \"2024-Q1\", \"metric\": \"revenue\"}",
      "content_encoding": "plain"
    }
  ]
}

2.4 序列化机制

ACP 使用 JSON 作为主要序列化格式,具有以下特点:

特性实现优势
文本友好UTF-8 编码易于调试和日志记录
语言无关标准 JSON跨语言互操作
可扩展支持自定义字段向后兼容
二进制支持Base64 编码支持任意二进制数据

3. 通信模式详解

3.1 同步通信

同步模式是最简单的通信方式,客户端发送请求并阻塞等待响应[4]:

sequenceDiagram
    participant C as Client
    participant S as ACP Server
    participant A as Agent
    
    C->>S: POST /runs (sync)
    S->>A: Forward Request
    A->>A: Process Task
    A->>S: Return Result
    S->>C: HTTP Response (200 OK)

Python SDK 示例

from acp_sdk.client import Client
from acp_sdk.models import Message, MessagePart

async def sync_example():
    async with Client(base_url="http://localhost:8000") as client:
        run = await client.run_sync(
            agent="echo",
            input=[
                Message(
                    role="user",
                    parts=[MessagePart(content="Hello!", content_type="text/plain")]
                )
            ],
        )
        print(run.output)

3.2 异步通信

异步模式适合长时间运行的任务,客户端可立即获得任务 ID,稍后查询结果[5]:

sequenceDiagram
    participant C as Client
    participant S as ACP Server
    participant A as Agent
    participant DB as Task Store
    
    C->>S: POST /runs (async)
    S->>DB: Create Task (status: pending)
    S->>C: Return Run ID
    
    par Background Processing
        S->>A: Forward Request
        A->>A: Process Task
        A->>S: Progress Updates
        S->>DB: Update Status
    and Client Polling
        loop Poll for Result
            C->>S: GET /runs/{id}
            S->>DB: Query Status
            S->>C: Current Status
        end
    end
    
    S->>C: Final Result (status: completed)

适用场景

  • 批量数据处理(>30 秒)
  • 复杂多步骤工作流
  • 资源密集型计算
  • 需要人工介入的任务

3.3 流式通信

流式模式提供增量更新,适合实时交互场景[6]:

sequenceDiagram
    participant C as Client
    participant S as ACP Server
    participant A as Agent
    
    C->>S: POST /runs (streaming)
    S->>C: SSE Connection Established
    
    A->>S: Token 1
    S->>C: data: {"chunk": "Hello"}
    
    A->>S: Token 2
    S->>C: data: {"chunk": " World"}
    
    A->>S: Token N
    S->>C: data: {"chunk": "!"}
    
    S->>C: data: {"status": "completed"}
    S->>C: Connection Closed

实现方式:ACP 使用 Server-Sent Events (SSE) 实现流式传输:

async def stream_example():
    async with Client(base_url="http://localhost:8000") as client:
        async for event in client.run_stream(
            agent="chatbot",
            input=[Message(role="user", parts=[...])]
        ):
            if event.type == "chunk":
                print(event.data.content, end="")
            elif event.type == "status":
                print(f"Status: {event.data.status}")

3.4 Await 机制:交互式通信

ACP 支持交互式通信,Agent 可以暂停执行并等待客户端输入[7]:

sequenceDiagram
    participant C as Client
    participant A as Agent
    
    C->>A: 初始请求
    A->>A: 处理中...
    A->>C: await_request (需要确认)
    
    Note over C,A: Agent 暂停执行,等待响应
    
    C->>A: 提供确认信息
    A->>A: 继续执行
    A->>C: 最终结果

应用场景

  • 需要用户确认的操作(“是否删除文件?”)
  • 多轮对话中的信息收集
  • 工作流中的审批节点

4. 可靠性保证机制

4.1 状态管理

ACP 支持**有状态(Stateful)无状态(Stateless)**两种模式[8]:

模式特点适用场景
无状态每次请求独立,无上下文简单查询、一次性任务
有状态维护会话历史和上下文多轮对话、复杂工作流

有状态实现

flowchart LR
    subgraph "Session"
        S1[Message 1] --> S2[Message 2] --> S3[Message 3]
    end
    
    Session --> Agent
    Agent --> Session
    
    DB[(Session Store)] -.-> Session

4.2 错误处理与重试

ACP 定义了标准的错误响应格式:

{
  "run_id": "uuid",
  "status": "failed",
  "error": {
    "code": "AGENT_ERROR",
    "message": "Agent execution failed",
    "details": {
      "agent_name": "echo",
      "error_type": "TimeoutError"
    }
  }
}

常见错误码

错误码HTTP 状态说明处理建议
INVALID_INPUT400输入格式错误检查消息结构
AGENT_NOT_FOUND404Agent 不存在验证 Agent 名称
AGENT_ERROR500Agent 执行错误查看错误详情
TIMEOUT504执行超时使用异步模式

4.3 分布式会话

ACP 支持跨服务器实例的会话连续性,使用基于 URI 的资源共享[9]:

flowchart TB
    subgraph "负载均衡器"
        LB[Nginx/ALB]
    end
    
    subgraph "ACP Server Cluster"
        S1[Server 1]
        S2[Server 2]
        S3[Server 3]
    end
    
    subgraph "共享存储"
        Redis[(Redis)]
        Postgres[(PostgreSQL)]
    end
    
    LB --> S1
    LB --> S2
    LB --> S3
    
    S1 <-->|Session Data| Redis
    S2 <-->|Session Data| Redis
    S3 <-->|Session Data| Redis

高可用配置

from acp_sdk.server import Server
from acp_sdk.storage import RedisStorage

server = Server(
    storage=RedisStorage(
        host="redis.example.com",
        port=6379,
        db=0
    )
)

5. 消息元数据扩展

5.1 Trajectory Metadata

ACP 支持轨迹元数据,用于跟踪多步推理和工具调用[10]:

{
  "parts": [
    {
      "content_type": "text/plain",
      "content": "计算结果: 42",
      "metadata": {
        "trajectory": {
          "steps": [
            {"tool": "calculator", "input": "20+22", "output": "42"}
          ],
          "reasoning": "使用计算器工具进行加法运算"
        }
      }
    }
  ]
}

5.2 Citation Metadata

引用元数据支持来源追踪和归属[11]:

{
  "metadata": {
    "citations": [
      {
        "source": "https://example.com/doc1",
        "title": "技术文档",
        "relevance_score": 0.95
      }
    ]
  }
}

6. 性能特征分析

6.1 延迟分析

xychart-beta
    title "不同通信模式的延迟对比"
    x-axis ["同步", "异步", "流式"]
    y-axis "延迟 (ms)"
    bar [50, 10, 30]
    line [50, 10, 30]
模式首字节延迟总延迟带宽占用
同步中等较高
异步极低可变
流式极低渐进中等

6.2 吞吐量优化

最佳实践

  1. 批量处理:合并多个小请求为单个批量请求
  2. 连接池复用:保持长连接减少握手开销
  3. 压缩传输:启用 gzip 压缩大消息体
  4. 流式分块:大响应使用流式传输避免内存压力

7. 安全考虑

7.1 传输安全

机制实现推荐级别
TLS 1.3强制 HTTPS必需
mTLS双向证书认证高安全场景
API Key请求头认证基础保护
OAuth 2.0令牌认证企业场景

7.2 消息完整性

sequenceDiagram
    participant C as Client
    participant S as Server
    
    C->>C: 计算消息签名
    C->>S: 请求 + Signature Header
    S->>S: 验证签名
    alt 签名有效
        S->>C: 正常响应
    else 签名无效
        S->>C: 401 Unauthorized
    end

参考来源

  1. ACP 官方文档 - Communication Patterns
  2. ACP SDK Python 源码 - models.py
  3. ACP 规范文档 - MIME Type Support
  4. ACP 官方文档 - Synchronous Communication
  5. ACP 官方文档 - Asynchronous Communication
  6. ACP 官方文档 - Streaming
  7. ACP 官方文档 - Agent Run Await
  8. ACP 官方文档 - Stateful Agents
  9. ACP 官方文档 - Distributed Sessions
  10. ACP 官方文档 - Trajectory Metadata
  11. ACP 官方文档 - Citation Metadata

本文档详细分析了 ACP 的消息传递机制。这些机制已被整合进 A2A 协议,建议开发者参考 A2A 最新规范。