02 - ACP 消息传递机制
深入解析 ACP 协议的消息格式、通信模式、可靠性保证机制及其实现细节
1. 消息传递架构概览
1.1 通信模型
ACP 采用请求-响应模型为基础,同时支持流式传输和异步通信。这种设计使协议能够适应从简单查询到复杂多步骤任务的广泛场景[1]。
flowchart TB
subgraph "同步通信模式"
C1[Client] -->|Request| S1[Server]
S1 -->|Response| C1
end
subgraph "异步通信模式"
C2[Client] -->|Async Request| S2[Server]
S2 -->|Ack| C2
S2 -.->|Callback/Webhook| C2
end
subgraph "流式通信模式"
C3[Client] -->|Stream Request| S3[Server]
S3 -->|Chunk 1| C3
S3 -->|Chunk 2| C3
S3 -->|Chunk N| C3
S3 -->|End| C3
end
1.2 核心通信原语
| 原语 | 类型 | 说明 | 适用场景 |
|---|---|---|---|
run_sync | 同步 | 阻塞式等待 Agent 完成 | 快速查询、简单任务 |
run_async | 异步 | 非阻塞,通过回调获取结果 | 长时间任务、批处理 |
run_stream | 流式 | 实时返回增量结果 | 聊天、生成式任务 |
2. 消息格式与序列化
2.1 消息结构详解
ACP 消息采用分层结构,由 Message 和 MessagePart 两级组成[2]:
classDiagram
class Message {
+string role
+MessagePart[] parts
}
class MessagePart {
+string name
+string content_type
+string content
+string content_encoding
+string content_url
+Metadata metadata
}
class Metadata {
+TrajectoryMetadata trajectory
+CitationMetadata citation
}
Message *-- MessagePart
MessagePart *-- Metadata
2.2 MessagePart 内容类型
ACP 支持任意 MIME 类型,以下是常见类型映射[3]:
| MIME 类型 | 用途 | 示例内容 |
|---|---|---|
text/plain | 纯文本 | 用户查询、Agent 回复 |
text/markdown | 富文本 | 格式化文档 |
application/json | 结构化数据 | API 响应、配置 |
text/x-python | 代码 | Python 代码片段 |
image/png | 图像 | 截图、图表 |
image/jpeg | 照片 | 用户上传图片 |
audio/wav | 音频 | 语音输入 |
application/pdf | 文档 | PDF 附件 |
2.3 多模态消息示例
{
"role": "user",
"parts": [
{
"content_type": "text/plain",
"content": "分析这张图片中的数据趋势",
"content_encoding": "plain"
},
{
"name": "chart.png",
"content_type": "image/png",
"content": "iVBORw0KGgoAAAANSUhEUgAA...",
"content_encoding": "base64"
},
{
"content_type": "application/json",
"content": "{\"time_range\": \"2024-Q1\", \"metric\": \"revenue\"}",
"content_encoding": "plain"
}
]
}
2.4 序列化机制
ACP 使用 JSON 作为主要序列化格式,具有以下特点:
| 特性 | 实现 | 优势 |
|---|---|---|
| 文本友好 | UTF-8 编码 | 易于调试和日志记录 |
| 语言无关 | 标准 JSON | 跨语言互操作 |
| 可扩展 | 支持自定义字段 | 向后兼容 |
| 二进制支持 | Base64 编码 | 支持任意二进制数据 |
3. 通信模式详解
3.1 同步通信
同步模式是最简单的通信方式,客户端发送请求并阻塞等待响应[4]:
sequenceDiagram
participant C as Client
participant S as ACP Server
participant A as Agent
C->>S: POST /runs (sync)
S->>A: Forward Request
A->>A: Process Task
A->>S: Return Result
S->>C: HTTP Response (200 OK)
Python SDK 示例:
from acp_sdk.client import Client
from acp_sdk.models import Message, MessagePart
async def sync_example():
async with Client(base_url="http://localhost:8000") as client:
run = await client.run_sync(
agent="echo",
input=[
Message(
role="user",
parts=[MessagePart(content="Hello!", content_type="text/plain")]
)
],
)
print(run.output)
3.2 异步通信
异步模式适合长时间运行的任务,客户端可立即获得任务 ID,稍后查询结果[5]:
sequenceDiagram
participant C as Client
participant S as ACP Server
participant A as Agent
participant DB as Task Store
C->>S: POST /runs (async)
S->>DB: Create Task (status: pending)
S->>C: Return Run ID
par Background Processing
S->>A: Forward Request
A->>A: Process Task
A->>S: Progress Updates
S->>DB: Update Status
and Client Polling
loop Poll for Result
C->>S: GET /runs/{id}
S->>DB: Query Status
S->>C: Current Status
end
end
S->>C: Final Result (status: completed)
适用场景:
- 批量数据处理(>30 秒)
- 复杂多步骤工作流
- 资源密集型计算
- 需要人工介入的任务
3.3 流式通信
流式模式提供增量更新,适合实时交互场景[6]:
sequenceDiagram
participant C as Client
participant S as ACP Server
participant A as Agent
C->>S: POST /runs (streaming)
S->>C: SSE Connection Established
A->>S: Token 1
S->>C: data: {"chunk": "Hello"}
A->>S: Token 2
S->>C: data: {"chunk": " World"}
A->>S: Token N
S->>C: data: {"chunk": "!"}
S->>C: data: {"status": "completed"}
S->>C: Connection Closed
实现方式:ACP 使用 Server-Sent Events (SSE) 实现流式传输:
async def stream_example():
async with Client(base_url="http://localhost:8000") as client:
async for event in client.run_stream(
agent="chatbot",
input=[Message(role="user", parts=[...])]
):
if event.type == "chunk":
print(event.data.content, end="")
elif event.type == "status":
print(f"Status: {event.data.status}")
3.4 Await 机制:交互式通信
ACP 支持交互式通信,Agent 可以暂停执行并等待客户端输入[7]:
sequenceDiagram
participant C as Client
participant A as Agent
C->>A: 初始请求
A->>A: 处理中...
A->>C: await_request (需要确认)
Note over C,A: Agent 暂停执行,等待响应
C->>A: 提供确认信息
A->>A: 继续执行
A->>C: 最终结果
应用场景:
- 需要用户确认的操作(“是否删除文件?”)
- 多轮对话中的信息收集
- 工作流中的审批节点
4. 可靠性保证机制
4.1 状态管理
ACP 支持**有状态(Stateful)和无状态(Stateless)**两种模式[8]:
| 模式 | 特点 | 适用场景 |
|---|---|---|
| 无状态 | 每次请求独立,无上下文 | 简单查询、一次性任务 |
| 有状态 | 维护会话历史和上下文 | 多轮对话、复杂工作流 |
有状态实现:
flowchart LR
subgraph "Session"
S1[Message 1] --> S2[Message 2] --> S3[Message 3]
end
Session --> Agent
Agent --> Session
DB[(Session Store)] -.-> Session
4.2 错误处理与重试
ACP 定义了标准的错误响应格式:
{
"run_id": "uuid",
"status": "failed",
"error": {
"code": "AGENT_ERROR",
"message": "Agent execution failed",
"details": {
"agent_name": "echo",
"error_type": "TimeoutError"
}
}
}
常见错误码:
| 错误码 | HTTP 状态 | 说明 | 处理建议 |
|---|---|---|---|
INVALID_INPUT | 400 | 输入格式错误 | 检查消息结构 |
AGENT_NOT_FOUND | 404 | Agent 不存在 | 验证 Agent 名称 |
AGENT_ERROR | 500 | Agent 执行错误 | 查看错误详情 |
TIMEOUT | 504 | 执行超时 | 使用异步模式 |
4.3 分布式会话
ACP 支持跨服务器实例的会话连续性,使用基于 URI 的资源共享[9]:
flowchart TB
subgraph "负载均衡器"
LB[Nginx/ALB]
end
subgraph "ACP Server Cluster"
S1[Server 1]
S2[Server 2]
S3[Server 3]
end
subgraph "共享存储"
Redis[(Redis)]
Postgres[(PostgreSQL)]
end
LB --> S1
LB --> S2
LB --> S3
S1 <-->|Session Data| Redis
S2 <-->|Session Data| Redis
S3 <-->|Session Data| Redis
高可用配置:
from acp_sdk.server import Server
from acp_sdk.storage import RedisStorage
server = Server(
storage=RedisStorage(
host="redis.example.com",
port=6379,
db=0
)
)
5. 消息元数据扩展
5.1 Trajectory Metadata
ACP 支持轨迹元数据,用于跟踪多步推理和工具调用[10]:
{
"parts": [
{
"content_type": "text/plain",
"content": "计算结果: 42",
"metadata": {
"trajectory": {
"steps": [
{"tool": "calculator", "input": "20+22", "output": "42"}
],
"reasoning": "使用计算器工具进行加法运算"
}
}
}
]
}
5.2 Citation Metadata
引用元数据支持来源追踪和归属[11]:
{
"metadata": {
"citations": [
{
"source": "https://example.com/doc1",
"title": "技术文档",
"relevance_score": 0.95
}
]
}
}
6. 性能特征分析
6.1 延迟分析
xychart-beta
title "不同通信模式的延迟对比"
x-axis ["同步", "异步", "流式"]
y-axis "延迟 (ms)"
bar [50, 10, 30]
line [50, 10, 30]
| 模式 | 首字节延迟 | 总延迟 | 带宽占用 |
|---|---|---|---|
| 同步 | 中等 | 较高 | 低 |
| 异步 | 极低 | 可变 | 低 |
| 流式 | 极低 | 渐进 | 中等 |
6.2 吞吐量优化
最佳实践:
- 批量处理:合并多个小请求为单个批量请求
- 连接池复用:保持长连接减少握手开销
- 压缩传输:启用 gzip 压缩大消息体
- 流式分块:大响应使用流式传输避免内存压力
7. 安全考虑
7.1 传输安全
| 机制 | 实现 | 推荐级别 |
|---|---|---|
| TLS 1.3 | 强制 HTTPS | 必需 |
| mTLS | 双向证书认证 | 高安全场景 |
| API Key | 请求头认证 | 基础保护 |
| OAuth 2.0 | 令牌认证 | 企业场景 |
7.2 消息完整性
sequenceDiagram
participant C as Client
participant S as Server
C->>C: 计算消息签名
C->>S: 请求 + Signature Header
S->>S: 验证签名
alt 签名有效
S->>C: 正常响应
else 签名无效
S->>C: 401 Unauthorized
end
参考来源
- ACP 官方文档 - Communication Patterns
- ACP SDK Python 源码 - models.py
- ACP 规范文档 - MIME Type Support
- ACP 官方文档 - Synchronous Communication
- ACP 官方文档 - Asynchronous Communication
- ACP 官方文档 - Streaming
- ACP 官方文档 - Agent Run Await
- ACP 官方文档 - Stateful Agents
- ACP 官方文档 - Distributed Sessions
- ACP 官方文档 - Trajectory Metadata
- ACP 官方文档 - Citation Metadata
本文档详细分析了 ACP 的消息传递机制。这些机制已被整合进 A2A 协议,建议开发者参考 A2A 最新规范。