热心市民王先生

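04 - Implementation Details and Design Patterns

AI Agent, Code Implementation, Design Patterns

Core code implementation, design patterns, key configuration, and best practices of the Suna platform

4.1 Analysis of Core Design Patterns

4.1.1 Agent Loop Pattern (ReAct Pattern)

Suna's core agent follows the ReAct (Reasoning + Acting) pattern, one of the most mature and widely used architectures for LLM agents today.

Pattern structure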

┌─────────────────────────────────────────────────────────┐
│                      ReAct Loop                         │
├─────────────────────────────────────────────────────────┤
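│  1. Thought: the LLM reasons about the current state    │
│           ↓                                             │
│  2. Action: choose the next action from the thought     │
│           ↓                                             │
│  3. Observation: execute the action, record the result  │
│           ↓                                             │
│  4. Repeat until the task is done or max steps reached  │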
└─────────────────────────────────────────────────────────┘

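Code example (conceptual)

# backend/core/agent_loop.py (a conceptual implementation based on the Suna architecture)

from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional

class StepType(Enum):
    THOUGHT = "thought"
    ACTION = "action"
    OBSERVATION = "observation"
    FINAL_ANSWER = "final_answer"

@dataclass
class AgentStep:
    step_type: StepType
    content: str
    metadata: Optional[dict] = None

@dataclass
class ToolCall:
    """A tool invocation requested by the LLM"""
    name: str
    parameters: Dict[str, Any] = field(default_factory=dict)

@dataclass
class ActionPlan:
    """Parsed action step: either a tool call or the final answer"""
    is_complete: bool
    content: str = ""
    tool_call: Optional[ToolCall] = None

class ReActAgent:
    def __init__(self, llm_client, tool_registry, max_steps=10):
        self.llm = llm_client          # any object with `async generate(prompt) -> str`
        self.tools = tool_registry     # e.g. the ToolRegistry from 4.1.2
        self.max_steps = max_steps
        self.history: List[AgentStep] = []
    
    async def run(self, task: str) -> str:
        """Run the ReAct loop until the task completes or max_steps is reached"""
        thought = ""  # defined up front so the fallback message works even if max_steps == 0
        
        for _ in range(self.max_steps):
            # 1. Thought: the LLM analyzes the current state
            thought = await self._think(task)
            self.history.append(AgentStep(StepType.THOUGHT, thought))
            
            # 2. Action: decide the next action
            action_plan = await self._plan_action(thought)
            
            if action_plan.is_complete:
                # Task finished: return the final answer
                final_answer = action_plan.content
                self.history.append(AgentStep(StepType.FINAL_ANSWER, final_answer))
                return final_answer
            
            self.history.append(AgentStep(StepType.ACTION, str(action_plan.tool_call)))
            
            # 3. Observation: execute the tool call
            observation = await self._execute_action(action_plan.tool_call)
            self.history.append(AgentStep(StepType.OBSERVATION, observation))
        
        # Step limit reached
        return f"Task exceeded maximum steps ({self.max_steps}). Last thought: {thought}"
    
    async def _think(self, task: str) -> str:
        """Ask the LLM to reason about the current state"""
        prompt = self._build_thought_prompt(task, self.history)
        return await self.llm.generate(prompt)
    
    async def _plan_action(self, thought: str) -> ActionPlan:
        """Plan an action based on the thought"""
        prompt = self._build_action_prompt(thought, self.tools.get_openai_schemas())
        response = await self.llm.generate(prompt)
        return self._parse_action_response(response)
    
    async def _execute_action(self, tool_call: ToolCall) -> str:
        """Execute the tool call and return the observation as text"""
        try:
            tool = self.tools.get_tool(tool_call.name)
            # Validate the raw arguments against the tool's parameter model (see 4.1.2)
            params = tool.parameters_schema(**tool_call.parameters)
        except Exception as exc:
            # Unknown tool or invalid arguments: report back to the LLM instead of crashing
            return f"Error: {exc}"
        result = await tool.execute(params)
        return str(result)
    
    # Prompt construction and response parsing depend on the chosen prompt format,
    # so they are left abstract in this conceptual version.
    def _build_thought_prompt(self, task: str, history: List[AgentStep]) -> str:
        raise NotImplementedError
    
    def _build_action_prompt(self, thought: str, tool_schemas: List[dict]) -> str:
        raise NotImplementedError
    
    def _parse_action_response(self, response: str) -> ActionPlan:
        raise NotImplementedError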
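ReActAgent does not specify the prompt format or _parse_action_response. Below is one possible parser. It assumes the action prompt asks the model to reply with a single JSON object, either {"tool": ..., "arguments": {...}} or {"final_answer": ...}. That convention is hypothetical and is not Suna's documented format. The parser tolerates surrounding prose and returns the ActionPlan and ToolCall objects that the loop consumes.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ToolCall:
    name: str
    parameters: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ActionPlan:
    is_complete: bool
    content: str = ""
    tool_call: Optional[ToolCall] = None


def parse_action_response(response: str) -> ActionPlan:
    """Parse a reply such as {"tool": ..., "arguments": {...}} or {"final_answer": ...}.

    The JSON format is a hypothetical convention the action prompt would ask for.
    """
    # Models often add prose or code fences around the JSON, so slice from the
    # first "{" to the last "}" before decoding.
    start, end = response.find("{"), response.rfind("}")
    if start == -1 or end < start:
        raise ValueError(f"no JSON object in model response: {response!r}")
    data = json.loads(response[start:end + 1])

    if "final_answer" in data:
        return ActionPlan(is_complete=True, content=str(data["final_answer"]))
    if "tool" in data:
        arguments = data.get("arguments") or {}
        if not isinstance(arguments, dict):
            raise ValueError("'arguments' must be a JSON object")
        return ActionPlan(is_complete=False, tool_call=ToolCall(data["tool"], arguments))
    raise ValueError(f"unrecognized action: {data!r}")


if __name__ == "__main__":
    reply = 'Next step: {"tool": "browser_navigate", "arguments": {"url": "https://example.com"}}'
    print(parse_action_response(reply))
```

A structured reply like this is easier to validate than free-form "Action:" lines. When the provider supports native tool calling, the structured call can be used directly and this parsing step is unnecessary.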

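Design highlights

  1. Explainability: every Thought is recorded in the history, which makes debugging and auditing easier
  2. Fault tolerance: errors come back as Observations, so the agent can adjust and recover
  3. Controllability: the max_steps limit prevents infinite loops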
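The three highlights are easiest to see in an actual run. The self-contained sketch below reduces the loop to plain functions and drives it with a scripted fake LLM and one fake search tool. Both are hypothetical stand-ins, and no real model or Suna code is involved. The first run shows a tool error becoming an observation that the next step reacts to. The second run shows max_steps stopping a loop that never converges.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple

# One LLM decision: (thought, kind, value). kind "tool" -> value is (tool_name, argument);
# kind "final" -> value is the final answer.
Decision = Tuple[str, str, object]
DecideFn = Callable[[List[str]], Awaitable[Decision]]
ToolFn = Callable[[str], Awaitable[str]]


async def react_loop(decide: DecideFn, tools: Dict[str, ToolFn],
                     max_steps: int = 5) -> Tuple[str, List[str]]:
    history: List[str] = []  # every step is recorded (explainability)
    for _ in range(max_steps):  # hard cap on iterations (controllability)
        thought, kind, value = await decide(history)
        history.append(f"THOUGHT: {thought}")
        if kind == "final":
            history.append(f"FINAL: {value}")
            return str(value), history
        name, arg = value  # type: ignore[misc]
        history.append(f"ACTION: {name}({arg!r})")
        try:
            observation = await tools[name](arg)
        except Exception as exc:
            # Errors become observations the next decision can react to (fault tolerance).
            observation = f"error: {exc}"
        history.append(f"OBSERVATION: {observation}")
    return f"stopped after {max_steps} steps", history


def scripted(decisions: List[Decision]) -> DecideFn:
    """Hypothetical fake LLM that replays a fixed list of decisions in order."""
    remaining = iter(decisions)

    async def decide(history: List[str]) -> Decision:
        return next(remaining)

    return decide


async def search(query: str) -> str:
    """Hypothetical tool that fails on empty input."""
    if not query:
        raise ValueError("empty query")
    return f"2 results for {query!r}"


async def main() -> None:
    llm = scripted([
        ("I need data first", "tool", ("search", "")),  # fails -> error observation
        ("Retry with a real query", "tool", ("search", "suna")),
        ("I have enough information", "final", "done"),
    ])
    answer, history = await react_loop(llm, {"search": search})
    print(answer)
    print("\n".join(history))

    async def never_finishes(history: List[str]) -> Decision:
        return ("Keep searching", "tool", ("search", "more"))

    answer, _ = await react_loop(never_finishes, {"search": search}, max_steps=3)
    print(answer)  # stopped after 3 steps


if __name__ == "__main__":
    asyncio.run(main())
```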

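4.1.2 Tool Registration Pattern (Plugin Registry)

Suna's tool system uses a registry pattern. Each tool registers itself under a unique name, so tools can be added or swapped without touching the agent loop.

Tool base class definition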

# backend/tools/base.py

from abc import ABC, abstractmethod
from typing import Any, Dict, Type
from pydantic import BaseModel

class ToolParameters(BaseModel):
    """Base class for tool parameters, validated by Pydantic"""
    pass

class ToolResult(BaseModel):
    """Result of a tool execution"""
    success: bool
    data: Any
    error: str = ""
    execution_time_ms: int = 0

class BaseTool(ABC):
    """Base class for all tools"""
    
    name: str
    description: str
    parameters_schema: Type[ToolParameters]
    
    @abstractmethod
    async def execute(self, params: ToolParameters) -> ToolResult:
        """Run the tool logic"""
        pass
    
    def get_schema(self) -> Dict:
        """Return the tool schema in OpenAI function-calling format"""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                # Pydantic v2 API; on Pydantic v1 use .schema() instead
                "parameters": self.parameters_schema.model_json_schema()
            }
        }

Browser tool example

# backend/tools/browser.py

import time
from typing import Literal

from .base import BaseTool, ToolParameters, ToolResult

class NavigateParams(ToolParameters):
    url: str
    # Values accepted by Playwright's page.goto(wait_until=...)
    wait_for: Literal["load", "domcontentloaded", "networkidle", "commit"] = "networkidle"

class BrowserNavigateTool(BaseTool):
    """Browser navigation tool; pages come from a Playwright-backed browser pool"""
    
    name = "browser_navigate"
    description = "Navigate to a specific URL and wait for page load"
    parameters_schema = NavigateParams
    
    def __init__(self, browser_pool):
        self.browser_pool = browser_pool
    
    async def execute(self, params: NavigateParams) -> ToolResult:
        start_time = time.perf_counter()  # monotonic clock, suited to measuring durations
        try:
            page = await self.browser_pool.get_page()
            await page.goto(params.url, wait_until=params.wait_for)
            
            # Collect page information
            title = await page.title()
            url = page.url  # final URL after any redirects
            
            return ToolResult(
                success=True,
                data={
                    "title": title,
                    "url": url,
                    "loaded": True
                },
                execution_time_ms=int((time.perf_counter() - start_time) * 1000)
            )
        except Exception as e:
            return ToolResult(
                success=False,
                data=None,
                error=str(e),
                execution_time_ms=int((time.perf_counter() - start_time) * 1000)
            )
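Every tool in this design has to repeat the timing and try/except scaffolding seen in BrowserNavigateTool.execute. One way to factor that out is a template method on the base class. This is offered as a suggestion, not as Suna's actual code. Subclasses implement only _run, and a shared execute handles timing and error wrapping. In the sketch, the ToolResult dataclass stands in for the Pydantic model above using only the standard library. TimedTool, EchoTool and FailingTool are hypothetical.

```python
import asyncio
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


@dataclass
class ToolResult:
    success: bool
    data: Any = None
    error: str = ""
    execution_time_ms: int = 0


class TimedTool(ABC):
    """Template method: execute() owns timing and error handling for every tool."""

    name: str = "tool"

    async def execute(self, **params: Any) -> ToolResult:
        start = time.perf_counter()
        try:
            data = await self._run(**params)
            ok, error = True, ""
        except Exception as exc:
            data, ok, error = None, False, f"{type(exc).__name__}: {exc}"
        elapsed_ms = int((time.perf_counter() - start) * 1000)
        return ToolResult(success=ok, data=data, error=error, execution_time_ms=elapsed_ms)

    @abstractmethod
    async def _run(self, **params: Any) -> Any:
        """Tool-specific logic only; raise an exception on failure."""


class EchoTool(TimedTool):
    name = "echo"

    async def _run(self, text: str) -> dict:
        await asyncio.sleep(0.01)  # simulated I/O
        return {"echo": text}


class FailingTool(TimedTool):
    name = "fail"

    async def _run(self) -> None:
        raise TimeoutError("page load timed out")


async def main() -> None:
    print(await EchoTool().execute(text="hi"))
    print(await FailingTool().execute())


if __name__ == "__main__":
    asyncio.run(main())
```

Because timing lives in one place, success and failure results are measured the same way, and a new tool only has to express its own logic.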

Tool registry implementation

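# backend/tools/registry.py

from typing import Dict, List

from .base import BaseTool
from .browser import BrowserNavigateTool

class ToolNotFoundError(LookupError):
    """Raised when no tool is registered under the requested name"""

class ToolRegistry:
    """Tool registry (singleton)"""
    
    _instance = None
    _tools: Dict[str, BaseTool]
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # Create the dict on the instance, not as a shared mutable class attribute
            cls._instance._tools = {}
        return cls._instance
    
    def register(self, tool: BaseTool) -> None:
        """Register a tool"""
        if tool.name in self._tools:
            raise ValueError(f"Tool '{tool.name}' already registered")
        self._tools[tool.name] = tool
    
    def get_tool(self, name: str) -> BaseTool:
        """Look up a tool instance by name"""
        if name not in self._tools:
            raise ToolNotFoundError(f"Tool '{name}' not found")
        return self._tools[name]
    
    def list_tools(self) -> List[str]:
        """List the names of all registered tools"""
        return list(self._tools.keys())
    
    def get_openai_schemas(self) -> List[Dict]:
        """Return the OpenAI function-calling schemas of all registered tools"""
        return [tool.get_schema() for tool in self._tools.values()]

# Register all built-in tools at startup.
# BrowserClickTool, BrowserExtractTool, FileReadTool, FileWriteTool, WebSearchTool and
# ExecuteCommandTool are not shown in this article; their imports are omitted here.
def init_default_tools(registry: ToolRegistry, browser_pool) -> None:
    """Initialize the default tool set; browser tools share one browser pool"""
    registry.register(BrowserNavigateTool(browser_pool))
    registry.register(BrowserClickTool(browser_pool))
    registry.register(BrowserExtractTool(browser_pool))
    registry.register(FileReadTool())
    registry.register(FileWriteTool())
    registry.register(WebSearchTool())
    registry.register(ExecuteCommandTool())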
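With OpenAI-style function calling, the model returns a tool name plus its arguments as a JSON-encoded string (tool_call.function.arguments), not as a dict. The sketch below shows how a registry like the one above might route such a call. It uses only the standard library: a dataclass plays the role of the Pydantic parameter model. NavigateParams, NavigateTool and dispatch are simplified, hypothetical stand-ins. The registry here is a plain instance rather than a singleton, which keeps it easy to test.

```python
import asyncio
import json
from dataclasses import dataclass, fields
from typing import Any, Dict


class ToolNotFoundError(LookupError):
    pass


@dataclass
class NavigateParams:  # stand-in for the Pydantic model in browser.py
    url: str
    wait_for: str = "networkidle"


class NavigateTool:  # hypothetical tool that does not touch a real browser
    name = "browser_navigate"
    parameters_schema = NavigateParams

    async def execute(self, params: NavigateParams) -> Dict[str, Any]:
        return {"visited": params.url, "wait_until": params.wait_for}


class ToolRegistry:
    def __init__(self) -> None:
        self._tools: Dict[str, Any] = {}

    def register(self, tool: Any) -> None:
        if tool.name in self._tools:
            raise ValueError(f"Tool '{tool.name}' already registered")
        self._tools[tool.name] = tool

    def get_tool(self, name: str) -> Any:
        if name not in self._tools:
            raise ToolNotFoundError(f"Tool '{name}' not found")
        return self._tools[name]

    async def dispatch(self, name: str, arguments_json: str) -> Any:
        """Route one function call: decode the JSON arguments, validate, execute."""
        tool = self.get_tool(name)
        raw = json.loads(arguments_json or "{}")
        allowed = {f.name for f in fields(tool.parameters_schema)}
        unknown = set(raw) - allowed
        if unknown:
            raise ValueError(f"unexpected arguments for {name}: {sorted(unknown)}")
        params = tool.parameters_schema(**raw)  # TypeError if a required field is missing
        return await tool.execute(params)


if __name__ == "__main__":
    registry = ToolRegistry()
    registry.register(NavigateTool())
    # The model's tool call carries its arguments as a JSON string.
    result = asyncio.run(registry.dispatch("browser_navigate", '{"url": "https://example.com"}'))
    print(result)  # {'visited': 'https://example.com', 'wait_until': 'networkidle'}
```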

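4.1.3 Event-Driven Architecture

Suna uses an event-driven architecture to decouple communication between components. Producers publish events to a bus, and any number of subscribers react to them without knowing about each other.

Event bus implementation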

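# backend/core/events.py

import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Awaitable, Callable, Dict, List

logger = logging.getLogger(__name__)

@dataclass
class Event:
    type: str
    payload: Dict[str, Any]
    timestamp: datetime
    source: str

# Handlers must be coroutine functions, because they are awaited via asyncio.gather
EventHandler = Callable[[Event], Awaitable[None]]

class EventBus:
    """Asynchronous event bus"""
    
    def __init__(self):
        self._handlers: Dict[str, List[EventHandler]] = {}
        # Create the bus inside a running event loop: before Python 3.10,
        # asyncio.Queue binds to the loop that exists when it is constructed
        self._event_queue: asyncio.Queue = asyncio.Queue()
        self._running = False
    
    def subscribe(self, event_type: str, handler: EventHandler):
        """Subscribe a handler to an event type"""
        self._handlers.setdefault(event_type, []).append(handler)
    
    async def publish(self, event: Event):
        """Put an event on the queue"""
        await self._event_queue.put(event)
    
    async def start(self):
        """Run the dispatch loop until stop() is called"""
        self._running = True
        while self._running:
            try:
                event = await asyncio.wait_for(
                    self._event_queue.get(), 
                    timeout=1.0  # wake up periodically to re-check _running
                )
            except asyncio.TimeoutError:
                continue
            await self._process_event(event)
    
    def stop(self):
        """Stop the loop; takes effect within about one second"""
        self._running = False
    
    async def _process_event(self, event: Event):
        """Dispatch one event to all of its handlers"""
        handlers = self._handlers.get(event.type, [])
        
        # Run all handlers concurrently; a failing handler does not cancel the others
        results = await asyncio.gather(
            *[handler(event) for handler in handlers],
            return_exceptions=True
        )
        # return_exceptions=True returns errors instead of raising them, so log them explicitly
        for handler, result in zip(handlers, results):
            if isinstance(result, Exception):
                logger.error("Handler %r failed for %s: %r", handler, event.type, result)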

# Usage example
class AgentExecutionListener:
    """Listener for agent execution events"""
    
    def __init__(self, event_bus: EventBus, supabase_client):
        self.event_bus = event_bus
        # Assumes the async Supabase client (supabase-py AsyncClient), whose .execute() is awaitable
        self.db = supabase_client
        self._register_handlers()
    
    def _register_handlers(self):
        self.event_bus.subscribe("agent.step.completed", self._on_step_completed)
        self.event_bus.subscribe("agent.tool.called", self._on_tool_called)
        self.event_bus.subscribe("agent.completed", self._on_agent_completed)
    
    async def _on_step_completed(self, event: Event):
        """Persist each completed step to the database"""
        await self.db.table("execution_logs").insert({
            "thread_id": event.payload["thread_id"],
            "step_number": event.payload["step_number"],
            "thought": event.payload["thought"],
            "timestamp": event.timestamp.isoformat()
        }).execute()
    
    async def _on_tool_called(self, event: Event):
        """Record a tool call"""
        # Can feed billing, auditing, and performance monitoring
        pass
    
    async def _on_agent_completed(self, event: Event):
        """Notify when the agent finishes"""
        # Push a WebSocket notification to the frontend
        pass
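A bus like this is easiest to understand by running one end to end. The self-contained sketch below is a condensed variant, not Suna's code. It replaces the 1-second polling with a None sentinel: stop() enqueues the sentinel, and run() returns only after every earlier event has been handled. It also logs handler failures that return_exceptions=True would otherwise hide. In the demo, record_step and flaky_db_writer are hypothetical handlers. The failing one is isolated and does not affect the other.

```python
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, Dict, List, Optional

log = logging.getLogger("eventbus")


@dataclass
class Event:
    type: str
    payload: Dict[str, Any]
    source: str = "agent"
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


Handler = Callable[[Event], Awaitable[None]]


class EventBus:
    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = {}
        self._queue: "asyncio.Queue[Optional[Event]]" = asyncio.Queue()

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event: Event) -> None:
        await self._queue.put(event)

    async def stop(self) -> None:
        # The None sentinel ends run() after all events published before it are handled.
        await self._queue.put(None)

    async def run(self) -> None:
        while (event := await self._queue.get()) is not None:
            handlers = self._handlers.get(event.type, [])
            results = await asyncio.gather(*(h(event) for h in handlers), return_exceptions=True)
            for handler, result in zip(handlers, results):
                if isinstance(result, BaseException):
                    log.error("%s failed on %s: %r", handler.__name__, event.type, result)


async def main() -> List[str]:
    bus = EventBus()  # created inside the running loop
    seen: List[str] = []

    async def record_step(event: Event) -> None:
        seen.append(f"{event.type}:{event.payload['step']}")

    async def flaky_db_writer(event: Event) -> None:
        raise RuntimeError("database unavailable")

    bus.subscribe("agent.step.completed", record_step)
    bus.subscribe("agent.step.completed", flaky_db_writer)
    worker = asyncio.create_task(bus.run())
    for step in (1, 2):
        await bus.publish(Event("agent.step.completed", {"step": step}))
    await bus.stop()
    await worker
    return seen


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(asyncio.run(main()))  # ['agent.step.completed:1', 'agent.step.completed:2']
```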

4.2 Key Configuration and Deployment

4.2.1 Docker Compose Configuration

# docker-compose.yaml (example configuration based on the Suna architecture)

version: '3.8'

services:
  # Frontend service
  frontend:
    build:
      context: ./apps/frontend
      dockerfile: Dockerfile
    ports:
      - "3000:3000"
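    environment:
      # Next.js inlines NEXT_PUBLIC_* values into the client bundle at build time (next build),
      # so browser-side code only sees them if they are also available during the image build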
      - NEXT_PUBLIC_API_URL=http://localhost:8000
      - NEXT_PUBLIC_SUPABASE_URL=${SUPABASE_URL}
      - NEXT_PUBLIC_SUPABASE_ANON_KEY=${SUPABASE_ANON_KEY}
    depends_on:
      - backend
    networks:
      - suna-network

  # Backend API service
  backend:
    build:
      context: ./backend
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      # Database configuration
      - DATABASE_URL=${DATABASE_URL}
      - SUPABASE_URL=${SUPABASE_URL}
      - SUPABASE_SERVICE_KEY=${SUPABASE_SERVICE_KEY}
      
      # LLM configuration
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - GROQ_API_KEY=${GROQ_API_KEY}
      
      # Other APIs
      - TAVILY_API_KEY=${TAVILY_API_KEY}
      - FIRECRAWL_API_KEY=${FIRECRAWL_API_KEY}
      
      # Agent runtime configuration (names match .env.example)
      - AGENT_TIMEOUT_SECONDS=300
      - MAX_AGENT_STEPS=50
      - DEFAULT_LLM_MODEL=claude-3-5-sonnet-20241022
    volumes:
      # Lets the backend start agent containers; access to the Docker socket is root-equivalent on the host
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - redis
    networks:
      - suna-network

  # Agent runtime pool (pre-created containers)
  agent-runtime-pool:
    build:
      context: ./backend/agent-runtime
      dockerfile: Dockerfile
    environment:
      - POOL_SIZE=5
      - BROWSER_HEADLESS=true
    networks:
      - suna-network
    deploy:
      replicas: 2
      resources:
        limits:
          cpus: '2'
          memory: 4G

  # Redis cache
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    networks:
      - suna-network

  # Optional: vector database (for RAG)
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
    volumes:
      - qdrant_storage:/qdrant/storage
    networks:
      - suna-network

volumes:
  redis_data:
  qdrant_storage:

networks:
  suna-network:
    driver: bridge

4.2.2 Environment Variable Configuration

# .env.example

# ===================
# Database configuration (Supabase)
# ===================
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_ANON_KEY=your-anon-key
SUPABASE_SERVICE_KEY=your-service-key
DATABASE_URL=postgresql://postgres:password@db.your-project.supabase.co:5432/postgres

# ===================
# LLM API keys
# ===================
# Anthropic (recommended)
ANTHROPIC_API_KEY=sk-ant-your-key

# OpenAI (optional)
OPENAI_API_KEY=sk-your-key

# Groq (fast inference, optional)
GROQ_API_KEY=gsk-your-key

# ===================
# Search APIs (optional but recommended)
# ===================
TAVILY_API_KEY=tvly-your-key
FIRECRAWL_API_KEY=fc-your-key

# ===================
# Agent configuration
# ===================
DEFAULT_LLM_PROVIDER=anthropic
DEFAULT_LLM_MODEL=claude-3-5-sonnet-20241022
MAX_AGENT_STEPS=50
AGENT_TIMEOUT_SECONDS=300

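# Security settings
ENABLE_CODE_EXECUTION=true
SANDBOX_MODE=docker  # docker | none
# python and node can run arbitrary code, so this whitelist only limits entry points; keep the sandbox on
ALLOWED_COMMANDS=ls,cat,grep,curl,wget,git,python,node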

# ===================
# Advanced configuration
# ===================
LOG_LEVEL=INFO
ENABLE_DEBUG_MODE=false
WEBSOCKET_MAX_CONNECTIONS=1000
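Every value in .env.example reaches the backend as a string. A minimal stdlib sketch of a typed loader follows. Settings and load_settings are hypothetical names, and the defaults simply mirror .env.example. Booleans are parsed explicitly because any non-empty string, including "false", is truthy in Python. The loader also rejects an unexpected SANDBOX_MODE, which would catch an env loader that kept the inline "# docker | none" comment as part of the value.

```python
import os
from dataclasses import dataclass
from typing import FrozenSet, Mapping, Optional


def _bool(value: str) -> bool:
    v = value.strip().lower()
    if v in {"1", "true", "yes", "on"}:
        return True
    if v in {"0", "false", "no", "off"}:
        return False
    raise ValueError(f"not a boolean: {value!r}")


@dataclass(frozen=True)
class Settings:
    default_llm_provider: str
    default_llm_model: str
    max_agent_steps: int
    agent_timeout_seconds: int
    enable_code_execution: bool
    sandbox_mode: str
    allowed_commands: FrozenSet[str]
    log_level: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    env = os.environ if env is None else env
    sandbox = env.get("SANDBOX_MODE", "docker").strip()
    if sandbox not in {"docker", "none"}:
        raise ValueError(f"SANDBOX_MODE must be 'docker' or 'none', got {sandbox!r}")
    commands = env.get("ALLOWED_COMMANDS", "")
    return Settings(
        default_llm_provider=env.get("DEFAULT_LLM_PROVIDER", "anthropic"),
        default_llm_model=env.get("DEFAULT_LLM_MODEL", "claude-3-5-sonnet-20241022"),
        max_agent_steps=int(env.get("MAX_AGENT_STEPS", "50")),
        agent_timeout_seconds=int(env.get("AGENT_TIMEOUT_SECONDS", "300")),
        enable_code_execution=_bool(env.get("ENABLE_CODE_EXECUTION", "false")),
        sandbox_mode=sandbox,
        # "ls, cat,grep" -> {"ls", "cat", "grep"}; empty entries are dropped
        allowed_commands=frozenset(c.strip() for c in commands.split(",") if c.strip()),
        log_level=env.get("LOG_LEVEL", "INFO").upper(),
    )


if __name__ == "__main__":
    s = load_settings({"ENABLE_CODE_EXECUTION": "false", "ALLOWED_COMMANDS": "ls, cat,grep"})
    print(s.enable_code_execution, sorted(s.allowed_commands))  # False ['cat', 'grep', 'ls']
```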

4.3 Frontend Implementation Highlights

4.3.1 Real-Time Message Stream Component

// apps/frontend/components/AgentChat.tsx

import React, { useEffect, useRef, useState } from 'react';
import { useWebSocket } from '@/hooks/useWebSocket';
import { MessageBubble } from './MessageBubble';
import { ToolCallCard } from './ToolCallCard';

interface AgentMessage {
  id: string;
  role: 'user' | 'assistant' | 'system';
  content: string;
  type: 'text' | 'thought' | 'tool_call' | 'tool_result';
  metadata?: {
    toolName?: string;
    toolParams?: Record<string, any>;
    executionTime?: number;
  };
  timestamp: Date;
}

export const AgentChat: React.FC<{ threadId: string }> = ({ threadId }) => {
  const [messages, setMessages] = useState<AgentMessage[]>([]);
  const [inputValue, setInputValue] = useState('');
  const [isExecuting, setIsExecuting] = useState(false);
  const messagesEndRef = useRef<HTMLDivElement>(null);
  
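  // WebSocket connection
  const { sendMessage, lastMessage, connectionStatus } = useWebSocket(
    `ws://localhost:8000/ws/threads/${threadId}`
  );
  
  // Handle incoming messages
  useEffect(() => {
    if (lastMessage) {
      let event: any;
      try {
        event = JSON.parse(lastMessage.data);
      } catch {
        // Ignore malformed frames instead of crashing the component
        return;
      }
      
      switch (event.type) {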
        case 'agent.thinking':
          // Show the thinking process
          setMessages(prev => [...prev, {
            id: event.id,
            role: 'assistant',
            content: event.payload.thought,
            type: 'thought',
            timestamp: new Date()
          }]);
          break;
          
        case 'agent.tool_call':
          // Show the tool call
          setMessages(prev => [...prev, {
            id: event.id,
            role: 'system',
            content: `Executing ${event.payload.tool_name}...`,
            type: 'tool_call',
            metadata: {
              toolName: event.payload.tool_name,
              toolParams: event.payload.parameters
            },
            timestamp: new Date()
          }]);
          break;
          
        case 'agent.message':
          // Show the final reply
          setMessages(prev => [...prev, {
            id: event.id,
            role: 'assistant',
            content: event.payload.content,
            type: 'text',
            timestamp: new Date()
          }]);
          setIsExecuting(false);
          break;
          
        case 'agent.error':
          // Show the error
          setMessages(prev => [...prev, {
            id: event.id,
            role: 'system',
            content: `Error: ${event.payload.error}`,
            type: 'text',
            timestamp: new Date()
          }]);
          setIsExecuting(false);
          break;
      }
    }
  }, [lastMessage]);
  
  // Auto-scroll to the bottom
  useEffect(() => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  }, [messages]);
  
  const handleSend = () => {
    if (!inputValue.trim() || isExecuting) return;
    
    // Append the user message
    setMessages(prev => [...prev, {
      id: Date.now().toString(),
      role: 'user',
      content: inputValue,
      type: 'text',
      timestamp: new Date()
    }]);
    
    setIsExecuting(true);
    
    // Send the execution request
    sendMessage(JSON.stringify({
      type: 'execute',
      payload: { message: inputValue }
    }));
    
    setInputValue('');
  };
  
  return (
    <div className="flex flex-col h-full">
      {/* Message list */}
      <div className="flex-1 overflow-y-auto p-4 space-y-4">
        {messages.map((msg) => (
          <MessageBubble
            key={msg.id}
            message={msg}
            renderCustomContent={() => {
              if (msg.type === 'tool_call') {
                return (
                  <ToolCallCard
                    toolName={msg.metadata?.toolName || ''}
                    params={msg.metadata?.toolParams || {}}
                  />
                );
              }
              if (msg.type === 'thought') {
                return (
                  <div className="text-sm text-gray-500 italic border-l-2 border-blue-300 pl-3">
                    💭 Thinking: {msg.content}
                  </div>
                );
              }
              return null;
            }}
          />
        ))}
        
        {isExecuting && (
          <div className="flex items-center space-x-2 text-gray-500">
            <div className="animate-spin rounded-full h-4 w-4 border-b-2 border-blue-500"></div>
            <span>Agent is working...</span>
          </div>
        )}
        
        <div ref={messagesEndRef} />
      </div>
      
      {/* Input box */}
      <div className="border-t p-4">
        <div className="flex space-x-2">
          <input
            type="text"
            value={inputValue}
            onChange={(e) => setInputValue(e.target.value)}
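            onKeyDown={(e) => {
              // onKeyPress is deprecated; also ignore Enter while an IME (e.g. Chinese input) is composing
              if (e.key === 'Enter' && !e.nativeEvent.isComposing) handleSend();
            }}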
            placeholder="Type your message..."
            disabled={isExecuting}
            className="flex-1 border rounded-lg px-4 py-2 focus:outline-none focus:ring-2 focus:ring-blue-500"
          />
          <button
            onClick={handleSend}
            disabled={isExecuting || !inputValue.trim()}
            className="bg-blue-500 text-white px-6 py-2 rounded-lg disabled:opacity-50"
          >
            Send
          </button>
        </div>
        <div className="text-xs text-gray-400 mt-2">
          Status: {connectionStatus} | Thread: {threadId}
        </div>
      </div>
    </div>
  );
};
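The component relies on an implicit contract. Every WebSocket frame is a JSON object with type, id and payload, and the payload keys differ by type: thought, tool_name/parameters, content, or error. A small server-side helper can keep that contract in one place. The sketch below is a hypothetical Python counterpart and not Suna's actual backend. make_event and REQUIRED_KEYS are illustrative names. It only builds and serializes frames; sending them over a WebSocket server is left out.

```python
import json
import uuid
from typing import Any, Dict

# Payload keys the AgentChat switch statement reads, per event type.
REQUIRED_KEYS = {
    "agent.thinking": {"thought"},
    "agent.tool_call": {"tool_name", "parameters"},
    "agent.message": {"content"},
    "agent.error": {"error"},
}


def make_event(event_type: str, **payload: Any) -> str:
    """Build one JSON frame; fail fast if the frontend would miss a key."""
    required = REQUIRED_KEYS.get(event_type)
    if required is None:
        raise ValueError(f"unknown event type: {event_type}")
    missing = required - set(payload)
    if missing:
        raise ValueError(f"{event_type} is missing {sorted(missing)}")
    frame: Dict[str, Any] = {"type": event_type, "id": uuid.uuid4().hex, "payload": payload}
    # ensure_ascii=False keeps non-ASCII text (e.g. Chinese) readable on the wire
    return json.dumps(frame, ensure_ascii=False)


if __name__ == "__main__":
    print(make_event("agent.tool_call", tool_name="browser_navigate",
                     parameters={"url": "https://example.com"}))
```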

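4.4 Best Practices and Optimization Tips

4.4.1 Performance Optimization

  1. LLM call optimization

    • Enable streaming responses to reduce time to first token
    • Batch requests to reduce the number of API calls
    • Cache results locally to avoid repeating identical queries
  2. Browser operation optimization

    • Reuse Browser instances through a browser connection pool
    • Run independent browser operations in parallel
    • Set sensible page-load timeouts
  3. Docker runtime optimization

    • Keep a pool of pre-created containers to cut cold-start time
    • Set resource limits so runaway tasks cannot exhaust the host
    • Use image layer caching to speed up builds and deployments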
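The browser connection pool and the pre-created container pool rest on the same idea. Pay the start-up cost once, then lend warm resources out and take them back. Below is a minimal asyncio sketch of that pattern. It assumes a generic async factory, which for a browser might launch a Playwright page and here is a fake. WarmPool and launch_fake_browser are hypothetical; this is not Suna's runtime pool.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Awaitable, Callable, Generic, List, Tuple, TypeVar

T = TypeVar("T")


class WarmPool(Generic[T]):
    """Pre-creates `size` resources up front and lends them out one at a time."""

    def __init__(self, factory: Callable[[], Awaitable[T]], size: int) -> None:
        self._factory = factory
        self._size = size
        self._idle: "asyncio.Queue[T]" = asyncio.Queue()  # build the pool inside a running loop

    async def start(self) -> None:
        # Pay the start-up cost once, concurrently, before any request arrives.
        items = await asyncio.gather(*(self._factory() for _ in range(self._size)))
        for item in items:
            self._idle.put_nowait(item)

    @asynccontextmanager
    async def acquire(self, timeout: float = 30.0) -> AsyncIterator[T]:
        # Waits (up to `timeout` seconds) when every resource is busy.
        item = await asyncio.wait_for(self._idle.get(), timeout)
        try:
            yield item
        finally:
            self._idle.put_nowait(item)  # hand it back even if the caller raised


async def main() -> Tuple[int, List[int]]:
    launched = 0

    async def launch_fake_browser() -> int:
        nonlocal launched
        launched += 1
        browser_id = launched  # capture before awaiting so concurrent launches get distinct ids
        await asyncio.sleep(0.01)  # simulated launch cost
        return browser_id

    pool: WarmPool[int] = WarmPool(launch_fake_browser, size=2)
    await pool.start()
    used: List[int] = []

    async def job() -> None:
        async with pool.acquire() as browser_id:
            used.append(browser_id)
            await asyncio.sleep(0.01)

    await asyncio.gather(*(job() for _ in range(5)))  # 5 jobs share 2 warm browsers
    return launched, used


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Five jobs run on only two launches. A production pool would also health-check resources and replace any that crashed instead of returning them unconditionally.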

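4.4.2 Security Best Practices

  1. Command execution safety

    # Dangerous: executing user input directly
    # os.system(user_input)  ❌
    
    # Safer: a command whitelist plus an argument list (no shell)
    import subprocess
    from typing import List

    class SecurityError(Exception):
        pass

    ALLOWED_COMMANDS = {'ls', 'cat', 'grep', 'curl'}
    
    def safe_execute(command: str, args: List[str]) -> subprocess.CompletedProcess:
        if command not in ALLOWED_COMMANDS:
            raise SecurityError(f"Command '{command}' not allowed")
        # A list argv (shell=False) means ;, | and $() in args are not interpreted by a shell.
        # The whitelist does not restrict arguments (curl can still send data out), so keep the sandbox too.
        return subprocess.run([command] + args, capture_output=True, text=True, timeout=30)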
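  2. Network isolation

    • Restrict containers' outbound connections with Docker network settings (e.g. an internal network) or firewall rules
    • Require additional authentication for sensitive operations
    • Log every tool call for auditing
  3. Data protection

    • Inject sensitive configuration via Docker Secrets or environment variables
    • Encrypt user data at rest
    • Enforce strict access control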
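For the first data-protection point: Docker secrets, in Swarm and in Compose via the top-level secrets key, are mounted inside the container as files under /run/secrets/<name>. A small helper can prefer that file and fall back to an environment variable for local development. It can also mask values before they reach audit logs. The sketch below is hypothetical: read_secret, redact and the fallback order are assumptions. The secrets directory is a parameter so the helper can be tested.

```python
import os
import tempfile
from pathlib import Path
from typing import Mapping, Optional, Union


def read_secret(
    name: str,
    secrets_dir: Union[str, Path] = "/run/secrets",
    env: Optional[Mapping[str, str]] = None,
) -> str:
    """Read a secret from <secrets_dir>/<name>, falling back to the env var NAME."""
    env = os.environ if env is None else env
    path = Path(secrets_dir) / name
    if path.is_file():
        # Secret files usually end with a newline; strip it.
        return path.read_text(encoding="utf-8").strip()
    value = env.get(name.upper())  # e.g. anthropic_api_key -> ANTHROPIC_API_KEY
    if value:
        return value
    raise KeyError(f"secret {name!r} not found in {secrets_dir} or the environment")


def redact(secret: str, keep: int = 4) -> str:
    """Mask a secret for logs and audit records, showing at most the last `keep` chars."""
    if len(secret) <= keep:
        return "*" * len(secret)  # never reveal a short secret in full
    return "*" * (len(secret) - keep) + secret[-keep:]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as secrets_dir:
        Path(secrets_dir, "anthropic_api_key").write_text("sk-ant-example\n", encoding="utf-8")
        key = read_secret("anthropic_api_key", secrets_dir, env={})
        print(redact(key))  # **********mple
```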

4.5 References