Logo
热心市民王先生

关键代码验证

代码示例 实现指南 配置方案

提供动态上下文载入的具体实现示例、配置方法和集成方案

1. Progressive Loading 实现示例

1.1 基础架构

# context_manager.py
from typing import List, Dict, Optional
from dataclasses import dataclass
import json

@dataclass
class ContextLayer:
    """上下文层级定义"""
    name: str
    priority: int  # 0-9, 0为最高优先级
    always_load: bool
    content: str
    token_count: int

class ProgressiveContextManager:
    """渐进式上下文管理器"""
    
    def __init__(self, max_tokens: int = 8000):
        self.max_tokens = max_tokens
        self.layers: Dict[str, ContextLayer] = {}
        self.loaded_cache: Dict[str, str] = {}
        
    def register_layer(self, layer_id: str, layer: ContextLayer):
        """注册一个上下文层级"""
        self.layers[layer_id] = layer
    
    def assemble_context(
        self, 
        user_intent: str,
        required_capabilities: List[str] = None
    ) -> str:
        """
        根据用户意图组装上下文
        
        Args:
            user_intent: 用户意图描述
            required_capabilities: 需要的特定能力ID列表
        """
        selected_layers = []
        total_tokens = 0
        
        # 第一步:加载必须层(P0 - 核心身份)
        for layer_id, layer in self.layers.items():
            if layer.always_load:
                selected_layers.append((layer.priority, layer_id, layer))
                total_tokens += layer.token_count
        
        # 第二步:根据意图识别需要的层级
        if required_capabilities:
            for cap_id in required_capabilities:
                if cap_id in self.layers:
                    layer = self.layers[cap_id]
                    if not layer.always_load:
                        selected_layers.append((layer.priority, cap_id, layer))
                        total_tokens += layer.token_count
        
        # 第三步:按优先级排序
        selected_layers.sort(key=lambda x: x[0])
        
        # 第四步:应用Token预算
        final_layers = self._apply_budget(selected_layers, total_tokens)
        
        # 组装最终上下文
        context_parts = [layer.content for _, _, layer in final_layers]
        return "\n\n".join(context_parts)
    
    def _apply_budget(
        self, 
        layers: List[tuple], 
        total_tokens: int
    ) -> List[tuple]:
        """应用Token预算,必要时摘要化或丢弃低优先级内容"""
        if total_tokens <= self.max_tokens:
            return layers
        
        # 从低优先级开始摘要或丢弃
        final_layers = []
        current_tokens = 0
        
        for priority, layer_id, layer in layers:
            if layer.always_load:
                # 必须层不能被丢弃,但可以摘要
                if current_tokens + layer.token_count > self.max_tokens:
                    # 对必须层进行摘要
                    summarized = self._summarize(layer.content)
                    layer = ContextLayer(
                        name=layer.name,
                        priority=layer.priority,
                        always_load=True,
                        content=summarized,
                        token_count=len(summarized.split())  # 简化估算
                    )
                final_layers.append((priority, layer_id, layer))
                current_tokens += layer.token_count
            else:
                # 非必须层:如果预算够则加载,否则跳过
                if current_tokens + layer.token_count <= self.max_tokens:
                    final_layers.append((priority, layer_id, layer))
                    current_tokens += layer.token_count
        
        return final_layers
    
    def _summarize(self, content: str, max_length: int = 500) -> str:
        """对内容进行摘要(实际实现可调用LLM)"""
        # 简化示例:截断+提示
        if len(content) <= max_length:
            return content
        return content[:max_length] + "... [内容已摘要]"


# ========== 使用示例 ==========

def create_context_manager() -> ProgressiveContextManager:
    """创建配置好的上下文管理器"""
    manager = ProgressiveContextManager(max_tokens=6000)
    
    # L1: 核心身份(始终加载)
    manager.register_layer("core_identity", ContextLayer(
        name="Agent核心身份",
        priority=0,
        always_load=True,
        content="""你是OpenCode助手,专注于帮助用户进行代码开发和研究。
核心原则:
- 提供准确、实用的技术建议
- 遵循最佳实践
- 在不确定时坦诚告知""",
        token_count=80
    ))
    
    # L2: 工具摘要(始终加载)
    manager.register_layer("tool_summary", ContextLayer(
        name="工具能力摘要",
        priority=1,
        always_load=True,
        content="""可用工具类别:
- file: 文件读写操作
- search: 代码搜索和grep
- bash: 命令行执行
- web: 网络请求和搜索

使用方式:告诉我你需要什么操作,我会调用相应工具。""",
        token_count=60
    ))
    
    # L3: Web工具详情(动态加载)
    manager.register_layer("web_tools", ContextLayer(
        name="Web工具详情",
        priority=3,
        always_load=False,
        content="""web_search工具参数:
- query: 搜索关键词
- num_results: 结果数量(默认10)
- filters: 可选过滤条件

示例:
{"tool": "web_search", "params": {"query": "Python asyncio"}}""",
        token_count=120
    ))
    
    # L4: 代码分析技能(动态加载)
    manager.register_layer("code_analysis_skill", ContextLayer(
        name="代码分析技能",
        priority=4,
        always_load=False,
        content="""代码分析最佳实践:
1. 先理解整体架构
2. 识别关键数据流
3. 检查潜在问题
4. 提供改进建议

支持语言:Python, JavaScript, TypeScript, Go, Rust""",
        token_count=150
    ))
    
    return manager


# 实际使用
if __name__ == "__main__":
    manager = create_context_manager()
    
    # 场景1:简单问候 - 只需基础上下文
    context_v1 = manager.assemble_context("你好")
    print(f"简单场景Token数: ~{len(context_v1.split())}")
    
    # 场景2:需要Web搜索 - 动态加载Web工具
    context_v2 = manager.assemble_context(
        "搜索最新的Python异步编程最佳实践",
        required_capabilities=["web_tools"]
    )
    print(f"搜索场景Token数: ~{len(context_v2.split())}")
    
    # 场景3:代码分析 - 动态加载代码技能
    context_v3 = manager.assemble_context(
        "分析这段代码的性能问题",
        required_capabilities=["code_analysis_skill"]
    )
    print(f"代码分析场景Token数: ~{len(context_v3.split())}")

1.2 意图识别与能力映射

# intent_classifier.py
from typing import List, Dict
import re

class IntentClassifier:
    """简单的意图分类器,用于决定加载哪些能力"""
    
    def __init__(self):
        self.patterns: Dict[str, List[str]] = {
            "web_tools": [
                r"搜索|查找|search|find|look up",
                r"网上|网络|web|internet|online",
                r"最新|最近|latest|recent"
            ],
            "code_analysis_skill": [
                r"分析| review|analyze|分析",
                r"代码|code|program|script",
                r"性能|优化|performance|optimize",
                r"bug|问题|issue|problem"
            ],
            "file_tools": [
                r"文件|file|读取|read|写入|write",
                r"目录|folder|directory|path"
            ],
            "git_tools": [
                r"git|commit|push|pull|branch|merge",
                r"版本控制|version control"
            ]
        }
    
    def classify(self, user_input: str) -> List[str]:
        """
        识别用户输入中隐含的能力需求
        
        Returns:
            List[str]: 需要加载的能力ID列表
        """
        user_input = user_input.lower()
        required_capabilities = []
        
        for capability, patterns in self.patterns.items():
            for pattern in patterns:
                if re.search(pattern, user_input, re.IGNORECASE):
                    required_capabilities.append(capability)
                    break
        
        return required_capabilities


# 集成到Agent
class SmartAgent:
    def __init__(self):
        self.context_manager = create_context_manager()
        self.intent_classifier = IntentClassifier()
    
    async def process(self, user_input: str) -> str:
        # 1. 识别意图
        capabilities = self.intent_classifier.classify(user_input)
        
        # 2. 组装上下文
        context = self.context_manager.assemble_context(
            user_input,
            required_capabilities=capabilities
        )
        
        # 3. 调用LLM
        response = await self.call_llm(
            system_prompt=context,
            user_message=user_input
        )
        
        return response

2. OpenClaw风格的ContextEngine实现

2.1 插件化架构

// context-engine.ts

// 核心接口定义
interface ContextEngine {
  name: string;
  version: string;
  
  // 组装上下文
  assemble(request: ContextRequest): Promise<ContextAssembly>;
  
  // 处理历史消息
  compactHistory(history: Message[]): Promise<Message[]>;
  
  // 管理记忆
  saveMemory?(key: string, value: any): Promise<void>;
  loadMemory?(key: string): Promise<any>;
}

interface ContextRequest {
  userMessage: string;
  conversationHistory: Message[];
  workspaceFiles?: string[];
  maxTokens: number;
  preferences?: UserPreferences;
}

interface ContextAssembly {
  systemPrompt: string;
  messages: Message[];
  tokenEstimate: number;
  loadedModules: string[];
}

// ===== Legacy Engine(滑动窗口)=====
class LegacyContextEngine implements ContextEngine {
  name = "legacy";
  version = "1.0.0";
  
  async assemble(request: ContextRequest): Promise<ContextAssembly> {
    const modules = [
      this.loadCoreIdentity(),
      this.loadToolDefinitions(),
      this.loadSkills(),
    ];
    
    // 应用滑动窗口压缩
    const compactedHistory = await this.compactHistory(
      request.conversationHistory
    );
    
    const systemPrompt = modules.join("\n\n");
    
    return {
      systemPrompt,
      messages: compactedHistory,
      tokenEstimate: this.estimateTokens(systemPrompt),
      loadedModules: ["identity", "tools", "skills"]
    };
  }
  
  async compactHistory(history: Message[]): Promise<Message[]> {
    const MAX_HISTORY = 10;
    
    if (history.length <= MAX_HISTORY) {
      return history;
    }
    
    // 保留最新的消息,对旧消息进行摘要
    const recent = history.slice(-MAX_HISTORY);
    const older = history.slice(0, -MAX_HISTORY);
    
    // 对旧消息生成摘要
    const summary = await this.summarizeMessages(older);
    
    return [
      { role: "system", content: `[Previous conversation summary: ${summary}]` },
      ...recent
    ];
  }
  
  private loadCoreIdentity(): string {
    return `You are OpenCode Assistant...`;
  }
  
  private loadToolDefinitions(): string {
    // 加载所有工具定义
    return `Available tools: ...`;
  }
  
  private loadSkills(): string {
    // 加载所有技能
    return `Available skills: ...`;
  }
  
  private async summarizeMessages(messages: Message[]): Promise<string> {
    // 调用LLM或使用启发式规则生成摘要
    return "Summary of previous discussion";
  }
  
  private estimateTokens(text: string): number {
    // 简化的token估算
    return Math.ceil(text.length / 4);
  }
}

// ===== Progressive Engine(渐进加载)=====
class ProgressiveContextEngine implements ContextEngine {
  name = "progressive";
  version = "2.0.0";
  
  private layerRegistry: Map<string, ContextLayer> = new Map();
  
  constructor() {
    this.registerLayers();
  }
  
  async assemble(request: ContextRequest): Promise<ContextAssembly> {
    const loadedModules: string[] = [];
    let systemPrompt = "";
    
    // L1: 核心层(始终加载)
    const core = this.getLayer("core");
    systemPrompt += core.content;
    loadedModules.push("core");
    
    // L2: 根据意图识别需要的模块
    const intent = await this.analyzeIntent(request.userMessage);
    
    for (const moduleId of intent.requiredModules) {
      const layer = this.getLayer(moduleId);
      if (layer) {
        systemPrompt += "\n\n" + layer.content;
        loadedModules.push(moduleId);
      }
    }
    
    // L3: 检索相关记忆(如果配置了RAG)
    if (this.hasRAGCapability()) {
      const relevantMemories = await this.retrieveMemories(request.userMessage);
      if (relevantMemories) {
        systemPrompt += "\n\n[Relevant context: " + relevantMemories + "]";
        loadedModules.push("memories");
      }
    }
    
    // 应用token预算
    const finalPrompt = this.enforceBudget(systemPrompt, request.maxTokens);
    
    return {
      systemPrompt: finalPrompt,
      messages: request.conversationHistory,
      tokenEstimate: this.estimateTokens(finalPrompt),
      loadedModules
    };
  }
  
  private registerLayers() {
    this.layerRegistry.set("core", {
      id: "core",
      priority: 0,
      content: `You are OpenCode Assistant...`,
      tokenCount: 100
    });
    
    this.layerRegistry.set("web_tools", {
      id: "web_tools",
      priority: 3,
      content: `Web search tool: ...`,
      tokenCount: 200
    });
    
    // ... 注册其他层
  }
  
  private getLayer(id: string): ContextLayer | undefined {
    return this.layerRegistry.get(id);
  }
  
  private async analyzeIntent(message: string): Promise<Intent> {
    // 使用轻量级分类器或LLM分析意图
    const requiredModules: string[] = [];
    
    if (message.includes("搜索") || message.includes("search")) {
      requiredModules.push("web_tools");
    }
    if (message.includes("代码") || message.includes("code")) {
      requiredModules.push("code_tools");
    }
    // ...
    
    return { requiredModules };
  }
  
  private enforceBudget(prompt: string, maxTokens: number): string {
    // 如果超出预算,进行摘要或截断
    const estimated = this.estimateTokens(prompt);
    if (estimated <= maxTokens) {
      return prompt;
    }
    
    // 简单的截断策略(实际应用中应使用更智能的摘要)
    const maxChars = maxTokens * 4;
    return prompt.slice(0, maxChars) + "\n\n[Content truncated due to token limit]";
  }
  
  private estimateTokens(text: string): number {
    return Math.ceil(text.length / 4);
  }
  
  async compactHistory(history: Message[]): Promise<Message[]> {
    // 使用更智能的摘要策略
    return history;
  }
  
  private hasRAGCapability(): boolean {
    return false; // 或检查配置
  }
  
  private async retrieveMemories(query: string): Promise<string | null> {
    return null;
  }
}

// ===== 引擎注册表 =====
class ContextEngineRegistry {
  private engines: Map<string, ContextEngine> = new Map();
  
  register(engine: ContextEngine) {
    this.engines.set(engine.name, engine);
  }
  
  get(name: string): ContextEngine {
    const engine = this.engines.get(name);
    if (!engine) {
      throw new Error(`Context engine "${name}" not registered`);
    }
    return engine;
  }
  
  list(): string[] {
    return Array.from(this.engines.keys());
  }
}

// 全局注册表
export const contextEngineRegistry = new ContextEngineRegistry();

// 注册内置引擎
contextEngineRegistry.register(new LegacyContextEngine());
contextEngineRegistry.register(new ProgressiveContextEngine());

2.2 配置文件示例

{
  "context": {
    "engine": "progressive",
    "maxTokens": 8000,
    "layers": {
      "core": {
        "alwaysLoad": true,
        "content": "file://./prompts/core-identity.md"
      },
      "tools": {
        "strategy": "summary",
        "detailOnDemand": true
      },
      "skills": {
        "strategy": "progressive",
        "loadTrigger": "intent-match"
      },
      "memory": {
        "strategy": "rag",
        "retrievalCount": 5
      }
    }
  },
  "fallback": {
    "engine": "legacy",
    "trigger": "intent-unclear"
  }
}

3. 集成到现有框架

3.1 OpenCode风格集成

# opencode_integration.py

from typing import List, Dict, Any
import json

class OpenCodeContextAdapter:
    """适配OpenCode风格的上下文管理"""
    
    def __init__(self, config_path: str = None):
        self.manager = ProgressiveContextManager(max_tokens=6000)
        self._load_default_layers()
        
        if config_path:
            self._load_custom_layers(config_path)
    
    def _load_default_layers(self):
        """加载OpenCode默认的上下文层级"""
        
        # Layer 1: 核心身份
        self.manager.register_layer("opencode_identity", ContextLayer(
            name="OpenCode助手身份",
            priority=0,
            always_load=True,
            content="""你是OpenCode,一个专业的AI编程助手。

核心能力:
- 代码分析与重构
- 技术方案设计
- 自动化工具调用

行为准则:
- 遵循用户项目的既定模式
- 不确定时主动询问
- 提供可验证的建议""",
            token_count=100
        ))
        
        # Layer 2: 工具摘要
        self.manager.register_layer("tool_catalog", ContextLayer(
            name="工具目录",
            priority=1,
            always_load=True,
            content="""可用工具:
- file: 文件读写
- search: 代码搜索 (grep, ast-grep)
- bash: 命令行执行
- web: 网络搜索和获取
- lsp: 代码分析 (goto definition, find references)

工具详情将在需要时动态加载。""",
            token_count=80
        ))
        
        # Layer 3: 技能系统
        self.manager.register_layer("skill_system", ContextLayer(
            name="技能系统",
            priority=2,
            always_load=True,
            content="""技能(Skills)是可复用的专业指令集。

可用技能类型:
- openspec-*: OpenSpec变更管理
- research/*: 研究工作流
- git-*: Git操作

具体技能定义按需加载。""",
            token_count=70
        ))
    
    def prepare_agent_context(
        self,
        task_type: str,
        workspace_files: List[str] = None,
        required_skills: List[str] = None
    ) -> Dict[str, Any]:
        """
        为Agent准备上下文
        
        Args:
            task_type: 任务类型 (e.g., "code-review", "refactor", "research")
            workspace_files: 相关工作区文件
            required_skills: 明确需要的技能ID
        """
        capabilities = self._map_task_to_capabilities(task_type)
        if required_skills:
            capabilities.extend(required_skills)
        
        # 去重
        capabilities = list(set(capabilities))
        
        # 组装上下文
        system_prompt = self.manager.assemble_context(
            user_intent=task_type,
            required_capabilities=capabilities
        )
        
        return {
            "system_prompt": system_prompt,
            "available_tools": self._get_tools_for_capabilities(capabilities),
            "estimated_tokens": len(system_prompt.split()),
            "loaded_capabilities": capabilities
        }
    
    def _map_task_to_capabilities(self, task_type: str) -> List[str]:
        """将任务类型映射到所需能力"""
        mapping = {
            "code-review": ["code_analysis", "lsp_tools"],
            "refactor": ["code_analysis", "lsp_tools", "file_tools"],
            "research": ["web_tools", "file_tools"],
            "debug": ["bash_tools", "search_tools", "code_analysis"],
            "docs": ["file_tools", "web_tools"],
        }
        return mapping.get(task_type, [])
    
    def _get_tools_for_capabilities(self, capabilities: List[str]) -> List[str]:
        """根据能力获取相关工具"""
        tool_mapping = {
            "code_analysis": ["lsp_diagnostics", "lsp_goto_definition", "lsp_find_references"],
            "file_tools": ["read", "write", "edit", "glob"],
            "search_tools": ["grep", "ast_grep_search"],
            "bash_tools": ["bash"],
            "web_tools": ["webfetch", "websearch_web_search_exa"],
            "lsp_tools": ["lsp_symbols", "lsp_diagnostics"]
        }
        
        tools = []
        for cap in capabilities:
            tools.extend(tool_mapping.get(cap, []))
        return list(set(tools))


# 使用示例
def example_usage():
    adapter = OpenCodeContextAdapter()
    
    # 准备代码审查任务的上下文
    context = adapter.prepare_agent_context(
        task_type="code-review",
        workspace_files=["src/main.py", "tests/test_main.py"],
        required_skills=["vercel-react-best-practices"]
    )
    
    print(f"System Prompt长度: {len(context['system_prompt'])} 字符")
    print(f"估算Token数: {context['estimated_tokens']}")
    print(f"可用工具: {context['available_tools']}")
    print(f"加载的能力: {context['loaded_capabilities']}")
    
    return context

4. 性能基准测试

# benchmark.py
import time
from typing import List, Dict
import statistics

class ContextLoadingBenchmark:
    """上下文加载性能基准测试"""
    
    def __init__(self):
        self.results: List[Dict] = []
    
    def benchmark_strategy(
        self,
        name: str,
        context_manager,
        test_cases: List[Dict],
        iterations: int = 10
    ):
        """测试特定策略的性能"""
        
        print(f"\n{'='*50}")
        print(f"测试策略: {name}")
        print(f"{'='*50}")
        
        times = []
        token_counts = []
        
        for case in test_cases:
            case_times = []
            
            for _ in range(iterations):
                start = time.time()
                context = context_manager.assemble_context(
                    user_intent=case["intent"],
                    required_capabilities=case.get("capabilities", [])
                )
                elapsed = time.time() - start
                case_times.append(elapsed)
            
            avg_time = statistics.mean(case_times)
            token_count = len(context.split())
            
            times.append(avg_time)
            token_counts.append(token_count)
            
            print(f"  场景: {case['name']}")
            print(f"    平均耗时: {avg_time*1000:.2f}ms")
            print(f"    Token数: {token_count}")
        
        self.results.append({
            "strategy": name,
            "avg_time_ms": statistics.mean(times) * 1000,
            "avg_tokens": statistics.mean(token_counts),
            "total_time_ms": sum(times) * 1000
        })
    
    def print_summary(self):
        """打印测试总结"""
        print(f"\n{'='*60}")
        print("性能测试总结")
        print(f"{'='*60}")
        
        for result in self.results:
            print(f"\n策略: {result['strategy']}")
            print(f"  平均加载时间: {result['avg_time_ms']:.2f}ms")
            print(f"  平均Token数: {result['avg_tokens']:.0f}")
            print(f"  总测试时间: {result['total_time_ms']:.2f}ms")


# 测试用例
test_cases = [
    {
        "name": "简单问候",
        "intent": "你好,今天天气怎么样?",
        "capabilities": []
    },
    {
        "name": "代码搜索",
        "intent": "帮我找到项目中所有使用async/await的地方",
        "capabilities": ["search_tools"]
    },
    {
        "name": "代码分析",
        "intent": "分析这个函数的时间复杂度",
        "capabilities": ["code_analysis", "lsp_tools"]
    },
    {
        "name": "网络搜索",
        "intent": "搜索Python 3.12的新特性",
        "capabilities": ["web_tools"]
    },
    {
        "name": "复杂任务",
        "intent": "重构这个React组件并添加测试",
        "capabilities": ["code_analysis", "file_tools", "search_tools"]
    }
]

if __name__ == "__main__":
    benchmark = ContextLoadingBenchmark()
    
    # 测试Progressive Loading
    progressive_manager = create_context_manager()
    benchmark.benchmark_strategy(
        "Progressive Loading",
        progressive_manager,
        test_cases
    )
    
    # 打印总结
    benchmark.print_summary()

参考资料