热心市民王先生

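Key Code Verification

Technical research, code verification, Rust implementation

An analysis of OpenFang's key implementation details, configuration examples, and core code snippets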

Quick Start and Configuration

Installation and Initialization

OpenFang provides a one-line install script for macOS, Linux, and Windows:

# macOS/Linux
curl -fsSL https://openfang.sh/install | sh

# Windows (PowerShell)
irm https://openfang.sh/install.ps1 | iex

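After installation, the initialization flow guides the user through configuring an LLM provider:

openfang init
# Interactive configuration wizard:
# 1. Choose the default LLM provider (Anthropic/OpenAI/Gemini/etc.)
# 2. Enter the API key (stored in ~/.openfang/credentials.enc)
# 3. Choose which Hands are active by default
# 4. Configure notification channels (optional)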

Configuration File Structure

OpenFang uses a TOML configuration file:

# ~/.openfang/openfang.toml

[server]
host = "0.0.0.0"
port = 4200
dashboard = true

[llm]
default_provider = "anthropic"
default_model = "claude-3-opus-20240229"

[llm.anthropic]
api_key = "${ANTHROPIC_API_KEY}"  # environment variable references are supported

[llm.openai]
api_key = "${OPENAI_API_KEY}"
base_url = "https://api.openai.com/v1"  # custom endpoints are supported

[llm.routing]
# Smart routing configuration
complexity_threshold = 0.7  # complexity score threshold
fallback_provider = "openai"
fallback_model = "gpt-4-turbo"

[security]
# Security configuration
sandbox_enabled = true
audit_trail = true
rate_limit_rpm = 60

[channels]
# Channel adapter configuration
default_channel = "telegram"

[channels.telegram]
bot_token = "${TELEGRAM_BOT_TOKEN}"
allowed_chat_ids = [123456789, 987654321]

[hands]
# Default state of each Hand
researcher = { enabled = true, schedule = "0 9 * * *" }  # daily at 09:00
lead = { enabled = true, schedule = "0 0 * * *" }        # daily at 00:00
collector = { enabled = false }
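
The configuration above refers to secrets through `${NAME}` placeholders. How OpenFang resolves them is not shown in the source, so the following is only a minimal sketch of one plausible approach. The function names `expand_env_refs` and `expand_from_process_env` are hypothetical, and the decision to fail on an unset variable is an assumption.

```rust
/// Expands `${NAME}` references in `value` using `lookup`.
/// Returns `Err(name)` if a referenced variable cannot be resolved.
/// Hypothetical helper: OpenFang's real resolution rules are not documented here.
pub fn expand_env_refs<F>(value: &str, lookup: F) -> Result<String, String>
where
    F: Fn(&str) -> Option<String>,
{
    let mut out = String::with_capacity(value.len());
    let mut rest = value;

    while let Some(start) = rest.find("${") {
        // Copy the literal text that precedes the reference.
        out.push_str(&rest[..start]);
        let after = &rest[start + 2..];
        match after.find('}') {
            Some(end) => {
                let name = &after[..end];
                let resolved = lookup(name).ok_or_else(|| name.to_string())?;
                out.push_str(&resolved);
                rest = &after[end + 1..];
            }
            None => {
                // No closing brace: keep the remainder verbatim.
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    Ok(out)
}

/// Convenience wrapper that reads from the process environment.
pub fn expand_from_process_env(value: &str) -> Result<String, String> {
    expand_env_refs(value, |name| std::env::var(name).ok())
}
```

Passing the lookup as a closure keeps the expansion logic testable without touching the real environment. Failing on an unresolved name, rather than substituting an empty string, avoids silently sending an empty API key.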

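Hands System Implementation

Developing a Custom Hand

Creating a custom Hand involves the following steps:

1. Create the directory structure

my-hand/
├── HAND.toml       # required: Hand manifest
├── system.md       # required: system prompt (500+ words)
├── SKILL.md        # optional: domain knowledge reference
└── guardrails.json # optional: approval gate configuration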

2. Define HAND.toml

[hand]
name = "competitor-analyzer"
version = "1.0.0"
description = "Autonomous competitor analysis agent"
author = "your-name"

[dependencies]
# Declare the required tools (each must be on the tool allowlist)
tools = [
    "web_search",
    "web_fetch", 
    "document_parser",
    "knowledge_graph",
    "report_generator"
]

# Recommended LLM configuration
[llm]
recommended_provider = "anthropic"
recommended_model = "claude-3-opus-20240229"
temperature = 0.3  # low temperature for consistent output

[settings]
# Configurable parameters
competitor_list = []  # an empty array means the list is supplied at runtime
analysis_depth = "deep"  # shallow/medium/deep
output_language = "zh-CN"
max_sources = 30
schedule = "0 8 * * 1"  # every Monday at 08:00

[metrics]
# Metrics shown on the dashboard
companies_analyzed = { type = "counter", label = "Companies Analyzed" }
insights_generated = { type = "counter", label = "Insights Generated" }
report_quality = { type = "gauge", label = "Quality Score" }

[guardrails]
# Sensitive operations require approval
web_automation = "require_approval"
external_api_call = "require_approval"
file_write = "allow"  # allow writing report files
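
The manifest requires every declared tool to be on the tool allowlist. The check itself is not shown in the source; a minimal sketch of such a validation step might look like the following, where the function name `disallowed_tools` and the idea of reporting all rejected tools at once are assumptions made for illustration.

```rust
use std::collections::BTreeSet;

/// Returns the declared tools that are NOT on the allowlist, sorted and deduplicated.
/// An empty result means the manifest's `tools` array is acceptable.
/// Hypothetical helper, not OpenFang's actual validation code.
pub fn disallowed_tools<'a>(declared: &[&'a str], allowlist: &[&str]) -> Vec<&'a str> {
    let allowed: BTreeSet<&str> = allowlist.iter().copied().collect();
    let rejected: BTreeSet<&'a str> = declared
        .iter()
        .copied()
        .filter(|tool| !allowed.contains(*tool))
        .collect();
    rejected.into_iter().collect()
}
```

Collecting every rejected name, instead of stopping at the first one, lets a loader print a single complete error message when a manifest is refused.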

3. Write the system prompt

# Competitor Analyzer Hand - System Prompt

## Role
You are an expert competitive intelligence analyst with 15+ years of experience 
in market research and strategic analysis. Your role is to autonomously gather, 
analyze, and synthesize information about specified competitors.

## Mission
Generate comprehensive competitor analysis reports that provide actionable insights 
for strategic decision-making.

## Workflow (8 Phases)

### Phase 1: Target Definition
- Parse the competitor list from configuration or user input
- Validate target companies against known databases
- Create structured research agenda for each target

### Phase 2: Information Gathering
- Execute parallel web searches for each target
- Collect: company overview, products, pricing, recent news, leadership
- Prioritize official sources (website, press releases, SEC filings)
- Cross-reference with news articles and analyst reports

### Phase 3: Data Validation
- Apply CRAAP criteria (Currency, Relevance, Authority, Accuracy, Purpose)
- Score each source 0-1 for credibility
- Flag and exclude sources below threshold (default: 0.6)

### Phase 4: Knowledge Graph Construction
- Extract entities: products, people, events, metrics
- Build relationships: competitor-of, acquired-by, partnered-with
- Store in vector database for future retrieval

### Phase 5: Comparative Analysis
- Normalize metrics across competitors
- Identify: strengths, weaknesses, opportunities, threats (SWOT)
- Calculate competitive positioning scores

### Phase 6: Insight Generation
- Identify market trends and patterns
- Generate strategic recommendations
- Prioritize insights by impact and confidence

### Phase 7: Report Assembly
- Structure: Executive Summary → Company Profiles → Comparative Matrix → Strategic Insights
- Format: Markdown with embedded tables and charts
- Include: citations in APA format

### Phase 8: Delivery
- Save report to configured output directory
- Send notification via configured channel (Telegram/Email)
- Update dashboard metrics

## Quality Standards
- All claims must be cited
- Confidence intervals for predictions
- Acknowledge data gaps explicitly
- No speculation presented as fact

## Constraints
- Maximum execution time: 2 hours
- Maximum sources per target: 50
- Respect robots.txt and rate limits
- No scraping of paywalled content
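
Phase 3 of the prompt asks the agent to score each source from 0 to 1 on the CRAAP criteria and to exclude sources below a threshold of 0.6. The prompt does not say how the five criteria are combined, so the sketch below assumes an unweighted mean. The type `CraapScores` and the function `filter_sources` are hypothetical names.

```rust
/// Per-criterion scores, each expected to lie in 0.0..=1.0.
#[derive(Debug, Clone, Copy)]
pub struct CraapScores {
    pub currency: f64,
    pub relevance: f64,
    pub authority: f64,
    pub accuracy: f64,
    pub purpose: f64,
}

impl CraapScores {
    /// Unweighted mean of the five criteria, clamped to 0.0..=1.0.
    /// Equal weighting is an assumption; the prompt does not specify weights.
    pub fn credibility(&self) -> f64 {
        let sum = self.currency + self.relevance + self.authority + self.accuracy + self.purpose;
        (sum / 5.0).clamp(0.0, 1.0)
    }
}

/// Keeps only the sources whose credibility reaches `threshold`.
pub fn filter_sources<'a>(sources: &[(&'a str, CraapScores)], threshold: f64) -> Vec<&'a str> {
    sources
        .iter()
        .filter(|(_, scores)| scores.credibility() >= threshold)
        .map(|(url, _)| *url)
        .collect()
}
```

In practice the scores would come from the model's own judgment of each source, so a filter like this is only as reliable as that upstream scoring.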

Hand Lifecycle Management

# List all available Hands
openfang hand list

# Activate a Hand (starts autonomous operation)
openfang hand activate competitor-analyzer

# Check Hand status
openfang hand status competitor-analyzer
# Output:
# ┌─────────────────────────────────────────┐
# │ Hand: competitor-analyzer               │
# │ Status: RUNNING                         │
# │ Last Run: 2026-03-02 08:00:00 UTC       │
# │ Next Run: 2026-03-09 08:00:00 UTC       │
# │ Companies Analyzed: 12                  │
# │ Insights Generated: 47                  │
# └─────────────────────────────────────────┘

# Pause a Hand (state is preserved)
openfang hand pause competitor-analyzer

# Resume operation
openfang hand resume competitor-analyzer

# Stop and clear state
openfang hand stop competitor-analyzer
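
The four lifecycle commands imply a small state machine: pausing preserves state, while stopping clears it. The sketch below models that behaviour under stated assumptions. The state names, the `HandCommand` enum, and the rejection of out-of-order commands are all illustrative and are not taken from OpenFang's source.

```rust
/// Lifecycle states implied by the CLI commands (names are illustrative).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandState {
    Inactive,
    Running,
    Paused,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandCommand {
    Activate,
    Pause,
    Resume,
    Stop,
}

/// A Hand whose persisted progress is reduced to a single counter.
#[derive(Debug, PartialEq, Eq)]
pub struct Hand {
    pub state: HandState,
    pub companies_analyzed: u32,
}

impl Hand {
    pub fn new() -> Self {
        Hand { state: HandState::Inactive, companies_analyzed: 0 }
    }

    /// Applies a command, returning an error for transitions that are not allowed.
    pub fn apply(&mut self, cmd: HandCommand) -> Result<(), String> {
        use HandState::*;
        self.state = match (self.state, cmd) {
            (Inactive, HandCommand::Activate) => Running,
            (Running, HandCommand::Pause) => Paused, // progress is kept
            (Paused, HandCommand::Resume) => Running,
            (Running, HandCommand::Stop) | (Paused, HandCommand::Stop) => {
                self.companies_analyzed = 0; // stop clears state
                Inactive
            }
            (state, cmd) => return Err(format!("cannot apply {:?} while {:?}", cmd, state)),
        };
        Ok(())
    }
}
```

Encoding the transitions in one `match` makes the difference between `pause` and `stop` explicit: only the `Stop` arm resets the counter.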

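Security Mechanism Implementation

WASM Sandbox Configuration

OpenFang executes tool code in a dual-metered WASM sandbox, combining fuel metering (instruction count) with epoch metering (wall-clock time):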

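// Conceptual code: WASM sandbox initialization
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread::{self, JoinHandle};
use std::time::Duration;

pub struct WasmSandbox {
    // Fuel metering: bounds the number of executed instructions
    fuel_limit: u64,      // default: 10,000,000
    fuel_consumed: u64,
    
    // Epoch metering: bounds wall-clock execution time
    epoch_timeout: Duration,  // default: 30 seconds
    
    // Watchdog thread
    watchdog: JoinHandle<()>,
}

impl WasmSandbox {
    pub fn execute_tool(&mut self, tool: Tool, input: Value) -> Result<Value> {
        // 1. Verify the tool signature
        self.verify_tool_signature(&tool)?;
        
        // 2. Compile to WASM (if not already compiled)
        let module = self.compile_to_wasm(&tool)?;
        
        // 3. Create a fuel-limited instance
        let mut instance = self.create_metered_instance(module)?;
        
        // 4. Start the watchdog; `done_tx` lets it exit as soon as the tool finishes
        let timeout = self.epoch_timeout;
        let interrupt_handle = instance.get_interrupt_handle();
        let (done_tx, done_rx) = mpsc::channel();
        let watchdog = thread::spawn(move || watchdog_thread(interrupt_handle, timeout, done_rx));
        
        // 5. Execute the tool (no early return here, so cleanup always runs)
        let result = instance.call("execute", &[input]);
        
        // 6. Clean up: signal completion, then wait for the watchdog to exit
        done_tx.send(()).ok();
        watchdog.join().ok();
        
        result
    }
}

// Interrupts the instance only if no completion signal arrives within `timeout`.
fn watchdog_thread(handle: InterruptHandle, timeout: Duration, done: Receiver<()>) {
    if let Err(RecvTimeoutError::Timeout) = done.recv_timeout(timeout) {
        handle.interrupt();  // forcibly terminate the timed-out execution
    }
}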
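
To make the two meters concrete, here is a toy model that runs without any WASM runtime. It is not OpenFang's sandbox. `MeteredRun`, `Trap`, and the "guest" loop are hypothetical stand-ins that show only how a fuel counter and an externally advanced epoch counter can each stop execution independently.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

#[derive(Debug, PartialEq, Eq)]
pub enum Trap {
    OutOfFuel,
    EpochDeadline,
}

/// Toy stand-in for a metered instance; no real WASM is executed.
pub struct MeteredRun {
    fuel_left: u64,
    epoch: Arc<AtomicU64>, // advanced by a timer or watchdog elsewhere
    epoch_deadline: u64,   // trap once the epoch counter reaches this value
}

impl MeteredRun {
    pub fn new(fuel_limit: u64, epoch: Arc<AtomicU64>, ticks_allowed: u64) -> Self {
        let epoch_deadline = epoch.load(Ordering::Relaxed) + ticks_allowed;
        Self { fuel_left: fuel_limit, epoch, epoch_deadline }
    }

    /// Charges `cost` units: checks the time budget first, then the fuel budget.
    fn charge(&mut self, cost: u64) -> Result<(), Trap> {
        if self.epoch.load(Ordering::Relaxed) >= self.epoch_deadline {
            return Err(Trap::EpochDeadline);
        }
        self.fuel_left = self.fuel_left.checked_sub(cost).ok_or(Trap::OutOfFuel)?;
        Ok(())
    }

    /// A "guest" that sums 1..=n, charging one unit of fuel per iteration.
    pub fn run_sum(&mut self, n: u64) -> Result<u64, Trap> {
        let mut total = 0u64;
        for i in 1..=n {
            self.charge(1)?;
            total += i;
        }
        Ok(total)
    }
}
```

Fuel catches code that does too much work, and the epoch counter catches code that takes too long regardless of how much fuel it burns, for example while blocked in a host call. That is why the two meters complement each other.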

Merkle Audit Chain

Every operation is recorded in a tamper-evident audit chain:

// Conceptual code: audit log structure
pub struct AuditEntry {
    pub sequence: u64,
    pub timestamp: DateTime<Utc>,
    pub agent_id: String,
    pub action: Action,
    pub input_hash: [u8; 32],
    pub output_hash: [u8; 32],
    pub previous_hash: [u8; 32],  // links to the previous entry
    pub signature: Signature,      // Ed25519 signature
}

impl AuditLog {
    pub fn append(&mut self, entry: AuditEntry) -> Result<()> {
        // Verify chain integrity
        if entry.previous_hash != self.last_hash {
            return Err(AuditError::ChainBroken);
        }
        
        // Verify the signature
        entry.verify_signature()?;
        
        // Append to storage
        self.store.append(entry.clone())?;
        
        // Update the latest hash
        self.last_hash = entry.compute_hash();
        
        Ok(())
    }
    
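    pub fn verify_chain(&self) -> Result<bool> {
        let mut expected_prev: Option<[u8; 32]> = None;
        
        for entry in self.store.iter()? {
            // Verify the hash link (the first entry has no predecessor to check)
            if let Some(prev_hash) = expected_prev {
                if entry.previous_hash != prev_hash {
                    return Ok(false);
                }
            }
            // Verify the signature of every entry, including the first
            entry.verify_signature()?;
            expected_prev = Some(entry.compute_hash());
        }
        
        Ok(true)
    }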
}
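
The chaining idea can be exercised with the standard library alone. The sketch below is deliberately simplified and makes several assumptions: a 64-bit non-cryptographic `DefaultHasher` stands in for a real 32-byte digest, signatures are omitted, and `Entry`, `Chain`, and `first_broken_link` are hypothetical names. It shows only why editing an old entry breaks the link held by its successor.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Simplified entry. NOT suitable for real audit logs: the hash is not cryptographic.
#[derive(Debug, Clone, Hash)]
pub struct Entry {
    pub sequence: u64,
    pub action: String,
    pub previous_hash: u64,
}

impl Entry {
    pub fn compute_hash(&self) -> u64 {
        let mut hasher = DefaultHasher::new();
        self.hash(&mut hasher);
        hasher.finish()
    }
}

#[derive(Default)]
pub struct Chain {
    pub entries: Vec<Entry>,
}

impl Chain {
    /// Appends an action, linking it to the hash of the current tail (0 for the first entry).
    pub fn append(&mut self, action: &str) {
        let previous_hash = self.entries.last().map_or(0, Entry::compute_hash);
        let sequence = self.entries.len() as u64;
        self.entries.push(Entry { sequence, action: action.to_string(), previous_hash });
    }

    /// Returns the index of the first entry whose link to its predecessor is broken.
    pub fn first_broken_link(&self) -> Option<usize> {
        self.entries
            .windows(2)
            .position(|pair| pair[1].previous_hash != pair[0].compute_hash())
            .map(|i| i + 1)
    }
}
```

A chain like this cannot detect tampering with the final entry, because nothing links to it yet. Real designs typically address that by signing entries, as the `signature` field above suggests, or by anchoring the latest hash somewhere outside the log.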

Taint Tracking

Sensitive data is labeled and tracked as it moves through the system:

// Conceptual code: taint label propagation
#[derive(Clone, Debug)]
pub struct TaintedString {
    content: String,
    taint_labels: HashSet<TaintLabel>,
}

#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum TaintLabel {
    ApiKey,
    Password,
    PersonalData,
    Financial,
    Confidential,
}

// Taint propagation rules
impl TaintedString {
    pub fn concat(&self, other: &TaintedString) -> TaintedString {
        TaintedString {
            content: format!("{}{}", self.content, other.content),
            taint_labels: &self.taint_labels | &other.taint_labels,  // union of both label sets
        }
    }
    
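    // `start` and `end` are byte offsets; slicing panics if they are out of
    // bounds or do not fall on a UTF-8 character boundary.
    pub fn substring(&self, start: usize, end: usize) -> TaintedString {
        TaintedString {
            content: self.content[start..end].to_string(),
            taint_labels: self.taint_labels.clone(),  // the substring keeps every label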
        }
    }
}

// Sink check: prevents sensitive data from leaking
pub fn check_sink(sink: &Sink, data: &TaintedString) -> Result<()> {
    match sink {
        Sink::Network(url) if is_external(url) => {
            if data.taint_labels.contains(&TaintLabel::ApiKey) {
                return Err(SecurityError::DataExfiltration);
            }
        }
        Sink::File(path) if is_world_readable(path) => {
            if !data.taint_labels.is_disjoint(&SENSITIVE_LABELS) {
                return Err(SecurityError::InsecureStorage);
            }
        }
        _ => {}
    }
    Ok(())
}
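
A compact, runnable variant of the same idea shows propagation end to end: a secret concatenated into a URL carries its label along and is refused at the sink. The names `Tainted`, `Label`, and `check_external_send` are illustrative simplifications of the conceptual types above, not OpenFang APIs.

```rust
use std::collections::HashSet;

#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum Label {
    ApiKey,
    PersonalData,
}

#[derive(Clone, Debug)]
pub struct Tainted {
    pub content: String,
    pub labels: HashSet<Label>,
}

impl Tainted {
    /// Untainted data, such as a constant URL prefix.
    pub fn clean(content: &str) -> Self {
        Self { content: content.to_string(), labels: HashSet::new() }
    }

    /// Data carrying a single taint label.
    pub fn labeled(content: &str, label: Label) -> Self {
        Self { content: content.to_string(), labels: HashSet::from([label]) }
    }

    /// Concatenation yields the union of both operands' labels.
    pub fn concat(&self, other: &Tainted) -> Tainted {
        Tainted {
            content: format!("{}{}", self.content, other.content),
            labels: &self.labels | &other.labels,
        }
    }
}

/// Rejects API-key-tainted data headed for an external network sink.
pub fn check_external_send(data: &Tainted) -> Result<(), &'static str> {
    if data.labels.contains(&Label::ApiKey) {
        Err("blocked: API key would leave the system")
    } else {
        Ok(())
    }
}
```

Because labels only ever accumulate under `concat`, the check is conservative: it may block a string whose secret portion was later removed, but it does not miss a secret that is still present.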

Channel Adapter Integration

Telegram Configuration Example

// Conceptual code: Telegram adapter initialization
pub async fn init_telegram_channel(config: TelegramConfig) -> Result<TelegramAdapter> {
    let api = Api::new(config.bot_token);
    
    // Register the command menu
    let commands = vec![
        BotCommand::new("status", "Check agent status"),
        BotCommand::new("pause", "Pause current hand"),
        BotCommand::new("resume", "Resume paused hand"),
        BotCommand::new("report", "Request immediate report"),
    ];
    api.set_my_commands(commands).await?;
    
    // Restrict the adapter to the allowed chats
    let allowed_chats: HashSet<ChatId> = config
        .allowed_chat_ids
        .into_iter()
        .map(ChatId)
        .collect();
    
    Ok(TelegramAdapter {
        api,
        allowed_chats,
        rate_limiter: GCRA::new(30, Duration::from_secs(60)),  // 30 messages per minute
    })
}

impl ChannelAdapter for TelegramAdapter {
    async fn send_message(&self, chat_id: ChatId, message: Message) -> Result<()> {
        // Authorization check
        if !self.allowed_chats.contains(&chat_id) {
            return Err(ChannelError::Unauthorized);
        }
        
        // Rate limiting
        self.rate_limiter.check()?;
        
        // Format the message
        let text = self.format_markdown(&message);
        
        // Send
        self.api.send_message(chat_id, text)
            .parse_mode(ParseMode::MarkdownV2)
            .await?;
        
        Ok(())
    }
}
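
The adapter above relies on a `GCRA` rate limiter whose internals are not shown. The following is a minimal sketch of the generic cell rate algorithm under stated assumptions: the type `Gcra` is hypothetical, time is passed in explicitly as "elapsed since creation" so that the logic is deterministic, and the burst size is assumed to equal the per-period limit.

```rust
use std::time::Duration;

pub struct Gcra {
    emission_interval: Duration, // T: ideal spacing between messages
    tolerance: Duration,         // tau: how far ahead of schedule a message may arrive
    tat: Duration,               // theoretical arrival time of the next message
}

impl Gcra {
    /// Allows `limit` messages per `period`, with bursts of up to `limit` messages.
    pub fn new(limit: u32, period: Duration) -> Self {
        assert!(limit > 0, "limit must be positive");
        let emission_interval = period / limit;
        Self {
            emission_interval,
            tolerance: period - emission_interval,
            tat: Duration::ZERO,
        }
    }

    /// `now` is the time elapsed since the limiter was created.
    /// On rejection, returns how long the caller should wait before retrying.
    pub fn check(&mut self, now: Duration) -> Result<(), Duration> {
        let earliest = self.tat.saturating_sub(self.tolerance);
        if now < earliest {
            return Err(earliest - now);
        }
        // Accept: push the theoretical arrival time forward by one interval.
        self.tat = self.tat.max(now) + self.emission_interval;
        Ok(())
    }
}
```

With `Gcra::new(30, Duration::from_secs(60))`, the first 30 messages pass immediately and later ones are admitted at one per two seconds. The conceptual adapter calls `check()` through `&self`, so a real implementation would need interior mutability, for example a mutex or an atomic, which this sketch leaves out.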

MCP Protocol Integration

Starting the MCP Server

# Start in MCP server mode
openfang mcp serve --port 3000

# Output:
# [INFO] MCP Server listening on 0.0.0.0:3000
# [INFO] Exposed tools: web_search, web_fetch, document_parser, ...
# [INFO] Connected clients: 0

MCP Tool Definition

{
  "name": "web_search",
  "description": "Search the web for information",
  "inputSchema": {
    "type": "object",
    "properties": {
      "query": {
        "type": "string",
        "description": "Search query"
      },
      "num_results": {
        "type": "integer",
        "default": 10,
        "description": "Number of results to return"
      }
    },
    "required": ["query"]
  },
  "outputSchema": {
    "type": "array",
    "items": {
      "type": "object",
      "properties": {
        "title": { "type": "string" },
        "url": { "type": "string" },
        "snippet": { "type": "string" }
      }
    }
  }
}
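
MCP clients invoke a tool by sending a JSON-RPC 2.0 request with the method `tools/call`. As a rough illustration of what a call to the `web_search` tool defined above could look like on the wire, the sketch below builds the request body by hand. The helper names `json_escape` and `web_search_call` are hypothetical, and a real client would normally use a JSON library rather than string formatting.

```rust
/// Minimal JSON string escaping for quotes, backslashes, and control characters.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Builds a `tools/call` request for `web_search`.
/// `num_results` falls back to 10, the default declared in the input schema.
pub fn web_search_call(id: u64, query: &str, num_results: Option<u32>) -> String {
    let num_results = num_results.unwrap_or(10);
    format!(
        r#"{{"jsonrpc":"2.0","id":{},"method":"tools/call","params":{{"name":"web_search","arguments":{{"query":"{}","num_results":{}}}}}}}"#,
        id,
        json_escape(query),
        num_results
    )
}
```

Only `query` is listed under `required` in the schema, which is why `num_results` is modeled as an `Option` here.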

Connecting to External MCP Servers

# Configure external MCP servers
[mcp.servers.github]
command = "github-mcp-server"
args = ["--token", "${GITHUB_TOKEN}"]

[mcp.servers.postgres]
command = "postgres-mcp-server"
args = ["--connection-string", "${DATABASE_URL}"]

OpenAI-Compatible API

OpenFang exposes a full OpenAI-compatible interface:

# Chat completion (streaming)
curl -X POST localhost:4200/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "researcher",
    "messages": [
      {"role": "user", "content": "Analyze AI agent market trends in 2024"}
    ],
    "stream": true,
    "temperature": 0.7
  }'

# Response (SSE format)
data: {"id":"chat-xxx","choices":[{"delta":{"content":"Based on"},"index":0}]}
data: {"id":"chat-xxx","choices":[{"delta":{"content":" my"},"index":0}]}
...
data: [DONE]
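
On the client side, a streamed response like the one above is consumed line by line until the `[DONE]` sentinel arrives. The sketch below shows that loop with the standard library only. The function names are hypothetical, and `extract_content` is a deliberately naive string scan that handles only simple escapes, so a real client should parse each chunk with a proper JSON library.

```rust
/// One parsed `data:` line from a streaming chat completion.
#[derive(Debug, PartialEq, Eq)]
pub enum SseEvent<'a> {
    Data(&'a str), // raw JSON payload of a chunk
    Done,          // the `[DONE]` sentinel
}

/// Parses a single line; blank lines, comments, and other fields yield `None`.
pub fn parse_sse_line(line: &str) -> Option<SseEvent<'_>> {
    let payload = line.strip_prefix("data:")?.trim();
    match payload {
        "" => None,
        "[DONE]" => Some(SseEvent::Done),
        json => Some(SseEvent::Data(json)),
    }
}

/// Naively extracts the first `"content":"..."` string value from a chunk.
pub fn extract_content(json: &str) -> Option<String> {
    const KEY: &str = "\"content\":\"";
    let start = json.find(KEY)? + KEY.len();
    let mut out = String::new();
    let mut chars = json[start..].chars();
    while let Some(c) = chars.next() {
        match c {
            '"' => return Some(out),
            '\\' => match chars.next()? {
                'n' => out.push('\n'),
                other => out.push(other), // handles \" and \\; other escapes are not decoded
            },
            c => out.push(c),
        }
    }
    None // unterminated string
}

/// Concatenates all content deltas up to the `[DONE]` sentinel.
pub fn collect_stream(body: &str) -> String {
    let mut text = String::new();
    for line in body.lines() {
        match parse_sse_line(line) {
            Some(SseEvent::Done) => break,
            Some(SseEvent::Data(json)) => {
                if let Some(piece) = extract_content(json) {
                    text.push_str(&piece);
                }
            }
            None => {}
        }
    }
    text
}
```

Stopping at `[DONE]` rather than at end of input matters when the connection is kept open, since the server may not close the stream right after the final chunk.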

Supported endpoints:

  • POST /v1/chat/completions - chat completions
  • POST /v1/embeddings - text embeddings
  • GET /v1/models - model list
  • POST /v1/agents/{id}/run - agent execution
  • GET /v1/hands - Hands list