04-实现细节与设计模式
AI Agent 代码实现 设计模式
Suna平台的核心代码实现、设计模式、关键配置及最佳实践
4.1 核心设计模式分析
4.1.1 Agent循环模式(ReAct Pattern)
Suna的核心Agent实现采用ReAct(Reasoning + Acting)模式,这是当前LLM Agent领域最成熟的架构模式之一。
模式结构
┌─────────────────────────────────────────────────────────┐
│ ReAct Loop │
├─────────────────────────────────────────────────────────┤
│ 1. Thought: LLM分析当前状态,形成推理 │
│ ↓ │
│ 2. Action: 根据推理结果,决定执行的动作 │
│ ↓ │
│ 3. Observation: 执行动作,获取观察结果 │
│ ↓ │
│ 4. 循环直到任务完成或达到最大步数 │
└─────────────────────────────────────────────────────────┘
代码实现示例(概念性)
# backend/core/agent_loop.py(基于Suna架构的概念实现)
from typing import List, Optional
from dataclasses import dataclass
from enum import Enum
class StepType(Enum):
    """Kind of entry recorded in a ReActAgent's history."""

    # Reasoning text produced by the LLM.
    THOUGHT = "thought"
    # Tool call the agent decided to make.
    ACTION = "action"
    # Result returned by the executed tool.
    OBSERVATION = "observation"
    # Answer returned to the caller when the task is complete.
    FINAL_ANSWER = "final_answer"
@dataclass
class AgentStep:
    """A single entry in a ReActAgent's execution history."""

    step_type: StepType  # loop phase that produced this entry
    content: str  # thought text, tool-call description, observation or final answer
    metadata: Optional[dict] = None  # optional extra data; None when not supplied
class ReActAgent:
    """Minimal ReAct (Reason + Act) agent loop.

    Each iteration asks the LLM for a thought, turns that thought into an
    action plan, and either returns the final answer or executes the planned
    tool call and records the observation. Every step is appended to
    ``self.history``.
    """

    def __init__(self, llm_client, tool_registry, max_steps=10):
        """
        Args:
            llm_client: object exposing ``async generate(prompt) -> str``.
            tool_registry: registry exposing ``get_tool(name)`` and
                ``get_openai_schemas()``.
            max_steps: maximum number of think/act/observe iterations.
        """
        self.llm = llm_client
        self.tools = tool_registry
        self.max_steps = max_steps
        self.history: List[AgentStep] = []

    async def run(self, task: str) -> str:
        """Run the ReAct loop until the task completes or ``max_steps`` is hit.

        Returns:
            The final answer, or a message explaining that the step budget was
            exhausted (including the last thought, if any).
        """
        # Initialised up front so the fallback message below is valid even
        # when max_steps <= 0 and the loop body never runs.
        thought = ""
        for _ in range(self.max_steps):
            # 1. Thought: let the LLM reason about the current state.
            thought = await self._think(task)
            self.history.append(AgentStep(StepType.THOUGHT, thought))

            # 2. Action: decide what to do next.
            action_plan = await self._plan_action(thought)
            if action_plan.is_complete:
                final_answer = action_plan.content
                self.history.append(AgentStep(StepType.FINAL_ANSWER, final_answer))
                return final_answer
            self.history.append(AgentStep(StepType.ACTION, str(action_plan.tool_call)))

            # 3. Observation: run the tool and record what it returned.
            observation = await self._execute_action(action_plan.tool_call)
            self.history.append(AgentStep(StepType.OBSERVATION, observation))

        return f"Task exceeded maximum steps ({self.max_steps}). Last thought: {thought}"

    async def _think(self, task: str) -> str:
        """Ask the LLM to reason about the task given the history so far."""
        prompt = self._build_thought_prompt(task, self.history)
        return await self.llm.generate(prompt)

    async def _plan_action(self, thought: str) -> "ActionPlan":
        """Turn a thought into an action plan (tool call or final answer)."""
        # Give the planner the function-calling schemas of every registered tool.
        prompt = self._build_action_prompt(thought, self.tools.get_openai_schemas())
        response = await self.llm.generate(prompt)
        return self._parse_action_response(response)

    async def _execute_action(self, tool_call: "ToolCall") -> str:
        """Execute ``tool_call`` and return its result as observation text.

        Failures (unknown tool, invalid arguments, tool errors) are returned as
        an error observation instead of raised. The LLM sees the error on the
        next step and can correct course instead of the whole run aborting.
        """
        try:
            tool = self.tools.get_tool(tool_call.name)
            # Tools take a single validated parameters model, not keyword arguments.
            params = tool.parameters_schema(**tool_call.parameters)
            result = await tool.execute(params)
        except Exception as exc:  # tool boundary: report, don't abort the loop
            return f"Error executing tool '{tool_call.name}': {exc}"
        return str(result)
设计亮点
- 可解释性:每个步骤的Thought都被记录,便于调试和审计
- 容错性:通过Observation反馈,Agent可以从错误中恢复
- 可控性:max_steps限制防止无限循环
4.1.2 工具注册模式(Plugin Registry)
Suna的工具系统采用注册表模式:各工具通过统一接口注册到中央注册表,运行时按名称查找,新增工具无需修改Agent核心逻辑。
工具基类定义
# backend/tools/base.py
from abc import ABC, abstractmethod
from typing import Any, Dict, Type
from pydantic import BaseModel
class ToolParameters(BaseModel):
    """Base class for tool argument models; Pydantic validates the fields that subclasses declare."""
class ToolResult(BaseModel):
    """Outcome of a single tool execution."""

    success: bool  # whether the tool call succeeded
    data: Any  # tool-specific payload
    error: str = ""  # error message; empty string when there is none
    execution_time_ms: int = 0  # elapsed time in milliseconds; 0 when not measured
class BaseTool(ABC):
    """Abstract base class for agent tools.

    Subclasses set the class attributes ``name``, ``description`` and
    ``parameters_schema`` and implement :meth:`execute`.
    """

    name: str  # unique tool name, used as the registry key and function name
    description: str  # description shown to the LLM in the function schema
    parameters_schema: Type[ToolParameters]  # Pydantic model for the tool's arguments

    @abstractmethod
    async def execute(self, params: ToolParameters) -> ToolResult:
        """Run the tool with validated ``params`` and return a ToolResult."""

    def get_schema(self) -> Dict:
        """Return the tool description in OpenAI function-calling format."""
        schema_model = self.parameters_schema
        # Pydantic v2 renamed ``schema()`` to ``model_json_schema()`` and
        # deprecated the old name; fall back to ``schema()`` on Pydantic v1.
        to_json_schema = getattr(schema_model, "model_json_schema", None) or schema_model.schema
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": to_json_schema(),
            },
        }
浏览器工具示例
# backend/tools/browser.py
from playwright.async_api import async_playwright
from .base import BaseTool, ToolParameters, ToolResult
class NavigateParams(ToolParameters):
    """Arguments accepted by BrowserNavigateTool."""

    url: str  # URL to open
    # Forwarded to Playwright's page.goto(wait_until=...).
    wait_for: str = "networkidle"
class BrowserNavigateTool(BaseTool):
    """Browser navigation tool: open a URL in a pooled page and report its title and final URL."""

    name = "browser_navigate"
    description = "Navigate to a specific URL and wait for page load"
    parameters_schema = NavigateParams

    def __init__(self, browser_pool):
        """
        Args:
            browser_pool: object whose ``async get_page()`` returns a Playwright page.
                NOTE(review): no call that returns the page to the pool is visible
                here. Confirm whether the pool expects pages to be released.
        """
        self.browser_pool = browser_pool

    async def execute(self, params: NavigateParams) -> ToolResult:
        """Navigate to ``params.url`` and return the page title and final URL.

        Any exception is converted into ``ToolResult(success=False, error=...)``
        so that the agent receives the failure as data.
        """
        # Imported locally so this method does not depend on a module-level
        # ``time`` import.
        import time

        try:
            page = await self.browser_pool.get_page()
            # perf_counter is monotonic, so the measured duration cannot be
            # distorted by wall-clock adjustments.
            start_time = time.perf_counter()
            await page.goto(params.url, wait_until=params.wait_for)
            execution_time = int((time.perf_counter() - start_time) * 1000)

            # Collect page information after navigation (page.url reflects redirects).
            title = await page.title()
            url = page.url
            return ToolResult(
                success=True,
                data={
                    "title": title,
                    "url": url,
                    "loaded": True,
                },
                execution_time_ms=execution_time,
            )
        except Exception as e:  # tool boundary: report the failure to the agent
            return ToolResult(
                success=False,
                data=None,
                error=str(e),
            )
工具注册表实现
# backend/tools/registry.py
from typing import Dict, Type, List
from .base import BaseTool
class ToolNotFoundError(LookupError):
    """Raised by ToolRegistry.get_tool when no tool is registered under a name."""


class ToolRegistry:
    """Tool registry implemented as a singleton.

    Every ``ToolRegistry()`` call returns the same instance. Tools are stored
    in the class-level ``_tools`` mapping, keyed by ``tool.name``.
    """

    _instance = None
    _tools: Dict[str, "BaseTool"] = {}

    def __new__(cls):
        # Create the single shared instance lazily on first construction.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def register(self, tool: "BaseTool") -> None:
        """Register ``tool`` under ``tool.name``.

        Raises:
            ValueError: if a tool with the same name is already registered.
        """
        if tool.name in self._tools:
            raise ValueError(f"Tool '{tool.name}' already registered")
        self._tools[tool.name] = tool

    def get_tool(self, name: str) -> "BaseTool":
        """Return the tool registered under ``name``.

        Raises:
            ToolNotFoundError: if no tool with that name is registered.
        """
        try:
            return self._tools[name]
        except KeyError:
            raise ToolNotFoundError(f"Tool '{name}' not found") from None

    def list_tools(self) -> List[str]:
        """Return the names of all registered tools, in registration order."""
        return list(self._tools.keys())

    def get_openai_schemas(self) -> List[Dict]:
        """Return the OpenAI function-calling schema of every registered tool."""
        return [tool.get_schema() for tool in self._tools.values()]
# Register all built-in tools at initialisation time.
def init_default_tools(registry: ToolRegistry):
    """Register the default (built-in) tool set on ``registry``.

    NOTE(review): ``browser_pool`` and the tool classes used below are not
    defined or imported in this module as shown. They are presumably provided
    elsewhere; confirm before use.
    """
    add = registry.register
    # Browser tools share the same page pool.
    add(BrowserNavigateTool(browser_pool))
    add(BrowserClickTool(browser_pool))
    add(BrowserExtractTool(browser_pool))
    # File, search and command tools are constructed without arguments.
    add(FileReadTool())
    add(FileWriteTool())
    add(WebSearchTool())
    add(ExecuteCommandTool())
4.1.3 事件驱动架构(Event-Driven)
Suna采用事件驱动架构实现组件间的解耦通信。
事件总线实现
# backend/core/events.py
from typing import Callable, Dict, List, Any
from dataclasses import dataclass
from datetime import datetime
import asyncio
@dataclass
class Event:
    """Message published on the EventBus.

    Handlers subscribed to ``type`` receive the event.
    """

    type: str  # routing key, e.g. "agent.step.completed"
    payload: Dict[str, Any]  # event-specific data
    timestamp: datetime  # event time, supplied by the publisher
    source: str  # identifier of the publishing component
class EventBus:
    """Asynchronous in-process publish/subscribe event bus.

    ``publish`` enqueues events. The ``start`` loop dequeues them one at a
    time and dispatches each to every handler subscribed to ``event.type``.
    Handlers may be coroutine functions or plain callables.
    """

    # Timeout (seconds) for each queue wait, so ``start`` re-checks
    # ``_running`` periodically even when no events arrive.
    _POLL_INTERVAL = 1.0
    # Private marker that ``stop`` enqueues to wake the loop immediately.
    _WAKEUP = object()

    def __init__(self):
        self._handlers: Dict[str, List[Callable]] = {}
        self._event_queue: asyncio.Queue = asyncio.Queue()
        self._running = False

    def subscribe(self, event_type: str, handler: Callable):
        """Register ``handler`` to be called with every event of ``event_type``."""
        self._handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event: "Event"):
        """Enqueue ``event`` for dispatch by the ``start`` loop."""
        await self._event_queue.put(event)

    def stop(self):
        """Ask a running ``start`` loop to exit after its current event."""
        self._running = False
        self._event_queue.put_nowait(self._WAKEUP)

    async def start(self):
        """Dispatch queued events until ``stop`` is called."""
        self._running = True
        while self._running:
            try:
                event = await asyncio.wait_for(
                    self._event_queue.get(),
                    timeout=self._POLL_INTERVAL,
                )
            except asyncio.TimeoutError:
                continue
            # Keep dispatch outside the try above, so a TimeoutError raised
            # by dispatch is not mistaken for an empty queue.
            try:
                if event is not self._WAKEUP:
                    await self._process_event(event)
            finally:
                # Balance every get() so that awaiting queue.join() can complete.
                self._event_queue.task_done()

    async def _process_event(self, event: "Event"):
        """Run all handlers for ``event`` concurrently and log any failures."""
        import logging

        handlers = list(self._handlers.get(event.type, []))
        if not handlers:
            return
        results = await asyncio.gather(
            *(self._call_handler(handler, event) for handler in handlers),
            return_exceptions=True,
        )
        logger = logging.getLogger(__name__)
        for handler, result in zip(handlers, results):
            if isinstance(result, BaseException):
                # A failing handler must not affect the others, but its failure
                # must not vanish silently either.
                logger.error(
                    "Event handler %r failed for event %r",
                    handler,
                    event.type,
                    exc_info=result,
                )

    @staticmethod
    async def _call_handler(handler: Callable, event: "Event") -> Any:
        """Call ``handler`` and await its result if it returned an awaitable."""
        import inspect

        result = handler(event)
        if inspect.isawaitable(result):
            result = await result
        return result
# Usage example
class AgentExecutionListener:
    """Subscribes to agent lifecycle events on an EventBus and reacts to them."""

    def __init__(self, event_bus: EventBus, supabase_client):
        self.event_bus = event_bus
        self.db = supabase_client
        self._register_handlers()

    def _register_handlers(self):
        """Attach this listener's handlers to the event bus."""
        routes = {
            "agent.step.completed": self._on_step_completed,
            "agent.tool.called": self._on_tool_called,
            "agent.completed": self._on_agent_completed,
        }
        for event_type, handler in routes.items():
            self.event_bus.subscribe(event_type, handler)

    async def _on_step_completed(self, event: Event):
        """Persist a completed step as a row in the ``execution_logs`` table."""
        logs_table = self.db.table("execution_logs")
        data = event.payload
        row = {
            "thread_id": data["thread_id"],
            "step_number": data["step_number"],
            "thought": data["thought"],
            "timestamp": event.timestamp.isoformat(),
        }
        # NOTE(review): awaiting .execute() assumes an async Supabase client.
        # Confirm, since a synchronous client's result is not awaitable.
        await logs_table.insert(row).execute()

    async def _on_tool_called(self, event: Event):
        """Hook for tool-call events (placeholder: e.g. billing, auditing, performance monitoring)."""

    async def _on_agent_completed(self, event: Event):
        """Hook for agent completion (placeholder: e.g. push a WebSocket notification to the frontend)."""
4.2 关键配置与部署
4.2.1 Docker Compose配置
# docker-compose.yaml (example configuration based on the Suna architecture)
# NOTE: Docker Compose v2 ignores the top-level `version` key and warns about it;
# it is kept only for the legacy docker-compose v1 CLI.
version: '3.8'
services:
  # Frontend service
  frontend:
    build:
      context: ./apps/frontend
      dockerfile: Dockerfile
    ports:
      - "3000:3000"
    environment:
      # NOTE(review): Next.js inlines NEXT_PUBLIC_* variables at build time. If the
      # Dockerfile runs `next build`, also pass these as build args — confirm.
      - NEXT_PUBLIC_API_URL=http://localhost:8000
      - NEXT_PUBLIC_SUPABASE_URL=${SUPABASE_URL}
      - NEXT_PUBLIC_SUPABASE_ANON_KEY=${SUPABASE_ANON_KEY}
    depends_on:
      - backend
    networks:
      - suna-network

  # Backend API service
  backend:
    build:
      context: ./backend
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      # Database
      - DATABASE_URL=${DATABASE_URL}
      - SUPABASE_URL=${SUPABASE_URL}
      - SUPABASE_SERVICE_KEY=${SUPABASE_SERVICE_KEY}
      # LLM providers
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - GROQ_API_KEY=${GROQ_API_KEY}
      # Other APIs
      - TAVILY_API_KEY=${TAVILY_API_KEY}
      - FIRECRAWL_API_KEY=${FIRECRAWL_API_KEY}
      # Agent runtime settings: read from the same variable names as .env.example,
      # with the previous hard-coded values as defaults.
      - AGENT_RUNTIME_TIMEOUT=${AGENT_TIMEOUT_SECONDS:-300}
      - MAX_AGENT_STEPS=${MAX_AGENT_STEPS:-50}
      - DEFAULT_LLM_PROVIDER=${DEFAULT_LLM_PROVIDER:-anthropic}
      - DEFAULT_LLM_MODEL=${DEFAULT_LLM_MODEL:-claude-3-5-sonnet-20241022}
    volumes:
      # Used to launch agent containers.
      # SECURITY: mounting the Docker socket gives this container root-equivalent
      # control of the host; only do this on a trusted, single-tenant host.
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - redis
    networks:
      - suna-network

  # Agent runtime pool (pre-created containers)
  agent-runtime-pool:
    build:
      context: ./backend/agent-runtime
      dockerfile: Dockerfile
    environment:
      - POOL_SIZE=5
      - BROWSER_HEADLESS=true
    networks:
      - suna-network
    deploy:
      replicas: 2
      resources:
        limits:
          cpus: '2'
          memory: 4G

  # Redis cache
  redis:
    image: redis:7-alpine
    ports:
      # Bound to localhost only: Redis runs without authentication here, and other
      # services reach it over suna-network rather than the host port.
      - "127.0.0.1:6379:6379"
    volumes:
      - redis_data:/data
    networks:
      - suna-network

  # Optional: vector database (for RAG)
  qdrant:
    # Consider pinning a specific version instead of `latest` for reproducible deploys.
    image: qdrant/qdrant:latest
    ports:
      # Bound to localhost only; no API key is configured in this example.
      - "127.0.0.1:6333:6333"
    volumes:
      - qdrant_storage:/qdrant/storage
    networks:
      - suna-network

volumes:
  redis_data:
  qdrant_storage:

networks:
  suna-network:
    driver: bridge
4.2.2 环境变量配置
# .env.example
# ===================
# Database (Supabase)
# ===================
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_ANON_KEY=your-anon-key
# Service-role key (bypasses row-level security): backend only, never expose it to the frontend.
SUPABASE_SERVICE_KEY=your-service-key
# Direct Postgres connection; the host follows the pattern db.<project-ref>.supabase.co
DATABASE_URL=postgresql://postgres:password@db.your-project.supabase.co:5432/postgres
# ===================
# LLM API keys
# ===================
# Anthropic (recommended)
ANTHROPIC_API_KEY=sk-ant-your-key
# OpenAI (optional)
OPENAI_API_KEY=sk-your-key
# Groq (fast inference, optional)
GROQ_API_KEY=gsk_your-key
# ===================
# Search APIs (optional but recommended)
# ===================
TAVILY_API_KEY=tvly-your-key
FIRECRAWL_API_KEY=fc-your-key
# ===================
# Agent settings
# ===================
DEFAULT_LLM_PROVIDER=anthropic
DEFAULT_LLM_MODEL=claude-3-5-sonnet-20241022
MAX_AGENT_STEPS=50
AGENT_TIMEOUT_SECONDS=300
# Security settings
# WARNING: enabling code execution lets the agent run commands; keep SANDBOX_MODE=docker when it is on.
ENABLE_CODE_EXECUTION=true
# Allowed values: docker | none
# (Kept on its own line: not every .env parser strips inline comments after a value.)
SANDBOX_MODE=docker
# NOTE: python and node can run arbitrary code, and curl/wget give network access,
# so including them largely bypasses a command allowlist — trim to what you need.
ALLOWED_COMMANDS=ls,cat,grep,curl,wget,git,python,node
# ===================
# Advanced settings
# ===================
LOG_LEVEL=INFO
ENABLE_DEBUG_MODE=false
WEBSOCKET_MAX_CONNECTIONS=1000
4.3 前端实现亮点
4.3.1 实时消息流组件
// apps/frontend/components/AgentChat.tsx
import React, { useEffect, useRef, useState } from 'react';
import { useWebSocket } from '@/hooks/useWebSocket';
import { MessageBubble } from './MessageBubble';
import { ToolCallCard } from './ToolCallCard';
/** One entry rendered in the agent chat timeline. */
interface AgentMessage {
  /** Entry key; also used as the React list key. */
  id: string;
  /** Who produced the entry. */
  role: 'user' | 'assistant' | 'system';
  /** Rendering variant for the entry. */
  type: 'text' | 'thought' | 'tool_call' | 'tool_result';
  /** Text shown for the entry. */
  content: string;
  /** Extra data, currently populated for tool-call entries. */
  metadata?: {
    toolName?: string;
    toolParams?: Record<string, any>;
    executionTime?: number;
  };
  /** Client-side time at which the entry was added. */
  timestamp: Date;
}
/**
 * Real-time chat view for one agent thread.
 *
 * Connects to the backend WebSocket for `threadId`, renders incoming agent
 * events (thoughts, tool calls, final messages, errors) and sends user
 * prompts as `{ type: 'execute', payload: { message } }` requests.
 */
export const AgentChat: React.FC<{ threadId: string }> = ({ threadId }) => {
  const [messages, setMessages] = useState<AgentMessage[]>([]);
  const [inputValue, setInputValue] = useState('');
  const [isExecuting, setIsExecuting] = useState(false);
  const messagesEndRef = useRef<HTMLDivElement>(null);

  // Derive the WebSocket base from the configured API URL (http -> ws,
  // https -> wss) instead of hard-coding localhost. The fallback keeps the
  // previous local-development address.
  const wsBaseUrl = (process.env.NEXT_PUBLIC_API_URL || 'http://localhost:8000')
    .replace(/\/+$/, '')
    .replace(/^http/, 'ws');
  const { sendMessage, lastMessage, connectionStatus } = useWebSocket(
    `${wsBaseUrl}/ws/threads/${encodeURIComponent(threadId)}`
  );

  // Switching threads must not keep showing the previous thread's timeline.
  useEffect(() => {
    setMessages([]);
    setIsExecuting(false);
  }, [threadId]);

  // Translate each incoming WebSocket frame into a timeline entry.
  useEffect(() => {
    if (!lastMessage) return;

    let event: any;
    try {
      event = JSON.parse(lastMessage.data);
    } catch (err) {
      // A single malformed frame must not break the whole chat view.
      console.error('Ignoring malformed agent event', err);
      return;
    }
    if (!event || typeof event !== 'object') return;

    const payload = event.payload ?? {};
    // Fall back to a client-side id so that React list keys stay unique.
    const id: string = event.id ?? `${Date.now()}-${Math.random().toString(36).slice(2)}`;

    switch (event.type) {
      case 'agent.thinking':
        // Show the agent's reasoning.
        setMessages(prev => [...prev, {
          id,
          role: 'assistant',
          content: payload.thought,
          type: 'thought',
          timestamp: new Date()
        }]);
        break;
      case 'agent.tool_call':
        // Show the tool invocation.
        setMessages(prev => [...prev, {
          id,
          role: 'system',
          content: `Executing ${payload.tool_name}...`,
          type: 'tool_call',
          metadata: {
            toolName: payload.tool_name,
            toolParams: payload.parameters
          },
          timestamp: new Date()
        }]);
        break;
      case 'agent.message':
        // Show the final reply; the run is over.
        setMessages(prev => [...prev, {
          id,
          role: 'assistant',
          content: payload.content,
          type: 'text',
          timestamp: new Date()
        }]);
        setIsExecuting(false);
        break;
      case 'agent.error':
        // Show the error; the run is over.
        setMessages(prev => [...prev, {
          id,
          role: 'system',
          content: `Error: ${payload.error}`,
          type: 'text',
          timestamp: new Date()
        }]);
        setIsExecuting(false);
        break;
    }
  }, [lastMessage]);

  // Keep the newest entry in view.
  useEffect(() => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  }, [messages]);

  /** Append the user's prompt to the timeline and send an execute request. */
  const handleSend = () => {
    if (!inputValue.trim() || isExecuting) return;

    setMessages(prev => [...prev, {
      id: Date.now().toString(),
      role: 'user',
      content: inputValue,
      type: 'text',
      timestamp: new Date()
    }]);
    setIsExecuting(true);

    sendMessage(JSON.stringify({
      type: 'execute',
      payload: { message: inputValue }
    }));
    setInputValue('');
  };

  return (
    <div className="flex flex-col h-full">
      {/* Message list */}
      <div className="flex-1 overflow-y-auto p-4 space-y-4">
        {messages.map((msg) => (
          <MessageBubble
            key={msg.id}
            message={msg}
            renderCustomContent={() => {
              if (msg.type === 'tool_call') {
                return (
                  <ToolCallCard
                    toolName={msg.metadata?.toolName || ''}
                    params={msg.metadata?.toolParams || {}}
                  />
                );
              }
              if (msg.type === 'thought') {
                return (
                  <div className="text-sm text-gray-500 italic border-l-2 border-blue-300 pl-3">
                    💭 Thinking: {msg.content}
                  </div>
                );
              }
              return null;
            }}
          />
        ))}
        {isExecuting && (
          <div className="flex items-center space-x-2 text-gray-500">
            <div className="animate-spin rounded-full h-4 w-4 border-b-2 border-blue-500"></div>
            <span>Agent is working...</span>
          </div>
        )}
        <div ref={messagesEndRef} />
      </div>

      {/* Input box */}
      <div className="border-t p-4">
        <div className="flex space-x-2">
          <input
            type="text"
            value={inputValue}
            onChange={(e) => setInputValue(e.target.value)}
            // onKeyPress is deprecated. onKeyDown plus the isComposing guard also
            // prevents sending half-composed IME input (e.g. Chinese) on Enter.
            onKeyDown={(e) => {
              if (e.key === 'Enter' && !e.nativeEvent.isComposing) handleSend();
            }}
            placeholder="Type your message..."
            disabled={isExecuting}
            className="flex-1 border rounded-lg px-4 py-2 focus:outline-none focus:ring-2 focus:ring-blue-500"
          />
          <button
            onClick={handleSend}
            disabled={isExecuting || !inputValue.trim()}
            className="bg-blue-500 text-white px-6 py-2 rounded-lg disabled:opacity-50"
          >
            Send
          </button>
        </div>
        <div className="text-xs text-gray-400 mt-2">
          Status: {connectionStatus} | Thread: {threadId}
        </div>
      </div>
    </div>
  );
};
4.4 最佳实践与优化建议
4.4.1 性能优化
- LLM调用优化
  - 启用流式响应(Streaming),缩短首字节时间(TTFB)
  - 实现请求批处理,减少API调用次数
  - 使用本地缓存减少重复查询
- Browser操作优化
  - 使用浏览器连接池复用Browser实例
  - 并行执行相互独立的浏览器操作
  - 合理设置页面加载超时
- Docker运行时优化
  - 预创建容器池,减少冷启动时间
  - 设置合理的资源限制,防止资源耗尽
  - 使用镜像缓存加速部署
4.4.2 安全最佳实践
- 命令执行安全
import subprocess
from typing import List, Optional

# Dangerous: passing raw user input to a shell, e.g.
#   os.system(user_input)  # ❌ allows arbitrary command injection

# Safer: an explicit allowlist of executables, invoked with an argument vector
# (no shell), so arguments are never interpreted as shell syntax.
# NOTE: the allowlist limits *which* program runs, not what it can do. For
# example, `curl` can still reach the network and `cat` can read any file the
# process can access, so combine this with sandboxing (container, seccomp, ...).
ALLOWED_COMMANDS = {'ls', 'cat', 'grep', 'curl'}


class SecurityError(Exception):
    """Raised when a command is rejected by the allowlist."""


def safe_execute(command: str, args: List[str], *, timeout: Optional[float] = 30.0):
    """Run an allowlisted command without a shell.

    Args:
        command: executable name; must be a member of ALLOWED_COMMANDS.
        args: argument strings passed verbatim to the executable.
        timeout: seconds before the child is killed and
            subprocess.TimeoutExpired is raised; None disables the limit.

    Returns:
        subprocess.CompletedProcess with captured (bytes) stdout and stderr.

    Raises:
        SecurityError: if ``command`` is not in the allowlist.
    """
    if command not in ALLOWED_COMMANDS:
        raise SecurityError(f"Command '{command}' not allowed")
    # shell=False is the default; it is spelled out because it is the
    # security property this function relies on.
    return subprocess.run([command, *args], capture_output=True, shell=False, timeout=timeout)
- 网络隔离
  - 使用Docker网络隔离(如设置 `internal: true` 的内部网络)或主机防火墙规则限制容器出站连接
  - 对敏感操作实施额外的认证
  - 记录所有工具调用,用于审计
- 数据保护
  - 敏感配置通过Docker Secrets或环境变量注入
  - 用户数据加密存储
  - 实施严格的访问控制
4.5 参考资料
- Suna Backend Code - 后端源码结构
- Suna Frontend Code - 前端源码结构
- FastAPI Best Practices - FastAPI最佳实践
- ReAct Pattern Paper - ReAct模式原始论文(Yao et al., "ReAct: Synergizing Reasoning and Acting in Language Models", ICLR 2023)
- Docker Security Best Practices - Docker安全最佳实践