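Key Code Verification
Technical Research / Code Verification / Rust Implementation
An analysis of OpenFang's key implementation details, configuration examples, and core code snippets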
Quick Start and Configuration
Installation and Initialization
OpenFang provides a one-line install script for macOS, Linux, and Windows:
# macOS/Linux
curl -fsSL https://openfang.sh/install | sh
# Windows (PowerShell)
irm https://openfang.sh/install.ps1 | iex
After installation, the initialization wizard guides the user through configuring an LLM provider:
openfang init
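# Interactive configuration wizard:
# 1. Choose the default LLM provider (Anthropic/OpenAI/Gemini/etc.)
# 2. Enter the API key (stored in ~/.openfang/credentials.enc)
# 3. Choose which Hands are active by default
# 4. Configure notification channels (optional)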
Configuration File Structure
OpenFang uses a TOML configuration file:
# ~/.openfang/openfang.toml
[server]
host = "0.0.0.0"
port = 4200
dashboard = true
[llm]
default_provider = "anthropic"
default_model = "claude-3-opus-20240229"
[llm.anthropic]
api_key = "${ANTHROPIC_API_KEY}" # environment variable references are supported
[llm.openai]
api_key = "${OPENAI_API_KEY}"
base_url = "https://api.openai.com/v1" # custom endpoints are supported
[llm.routing]
# Smart routing configuration
complexity_threshold = 0.7 # complexity score threshold
fallback_provider = "openai"
fallback_model = "gpt-4-turbo"
[security]
# Security configuration
sandbox_enabled = true
audit_trail = true
rate_limit_rpm = 60
[channels]
# Channel adapter configuration
default_channel = "telegram"
[channels.telegram]
bot_token = "${TELEGRAM_BOT_TOKEN}"
allowed_chat_ids = [123456789, 987654321]
[hands]
# Default state of each Hand
researcher = { enabled = true, schedule = "0 9 * * *" } # daily at 09:00
lead = { enabled = true, schedule = "0 0 * * *" } # daily at 00:00
collector = { enabled = false }
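The `${VAR}` references in the configuration above imply a substitution pass when the file is loaded. The following is a minimal sketch of such a pass, assuming placeholders are never nested and that a missing variable should be reported as an error. `expand_placeholders` is a hypothetical helper written for illustration, not OpenFang's own resolver.

```rust
use std::collections::HashMap;

/// Replace every `${NAME}` in `raw` with the matching value from `vars`.
/// On failure, returns the name of the first variable that is not defined.
/// Hypothetical helper: the source does not show OpenFang's real resolver.
fn expand_placeholders(raw: &str, vars: &HashMap<String, String>) -> Result<String, String> {
    let mut out = String::with_capacity(raw.len());
    let mut rest = raw;
    while let Some(start) = rest.find("${") {
        // Copy the literal text that precedes the placeholder.
        out.push_str(&rest[..start]);
        let after = &rest[start + 2..];
        match after.find('}') {
            Some(end) => {
                let name = &after[..end];
                let value = vars.get(name).ok_or_else(|| name.to_string())?;
                out.push_str(value);
                rest = &after[end + 1..];
            }
            None => {
                // No closing brace: keep the remainder verbatim.
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    Ok(out)
}
```

In practice the lookup table could be built from the process environment with `std::env::vars().collect()`.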
Hands System Implementation
Developing a Custom Hand
Creating a custom Hand takes the following steps:
1. Create the directory structure
my-hand/
├── HAND.toml # required: Hand manifest
├── system.md # required: system prompt (500+ words)
├── SKILL.md # optional: domain knowledge reference
└── guardrails.json # optional: approval gate configuration
2. Define HAND.toml
[hand]
name = "competitor-analyzer"
version = "1.0.0"
description = "Autonomous competitor analysis agent"
author = "your-name"
[dependencies]
# Declare the required tools (each must be on the tool allowlist)
tools = [
"web_search",
"web_fetch",
"document_parser",
"knowledge_graph",
"report_generator"
]
# Recommended LLM configuration
[llm]
recommended_provider = "anthropic"
recommended_model = "claude-3-opus-20240229"
temperature = 0.3 # low temperature for consistent output
[settings]
# Configurable parameters
competitor_list = [] # an empty array means the list is supplied at runtime
analysis_depth = "deep" # shallow/medium/deep
output_language = "zh-CN"
max_sources = 30
schedule = "0 8 * * 1" # every Monday at 08:00
[metrics]
# Metrics shown on the dashboard
companies_analyzed = { type = "counter", label = "Companies Analyzed" }
insights_generated = { type = "counter", label = "Insights Generated" }
report_quality = { type = "gauge", label = "Quality Score" }
[guardrails]
# Sensitive operations require approval
web_automation = "require_approval"
external_api_call = "require_approval"
file_write = "allow" # allow writing report files
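The `[guardrails]` table maps an action name to either `"allow"` or `"require_approval"`. One way a runtime might evaluate that table is sketched below. The `Policy` enum, `parse_policy`, and `may_run` are hypothetical names, and the rule that unlisted actions default to requiring approval is an assumption, not something the source states.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Policy {
    Allow,
    RequireApproval,
}

/// Parse the string values used in the `[guardrails]` table.
fn parse_policy(value: &str) -> Option<Policy> {
    match value {
        "allow" => Some(Policy::Allow),
        "require_approval" => Some(Policy::RequireApproval),
        _ => None,
    }
}

/// Decide whether `action` may run right now.
/// Assumption: actions missing from the table fall back to RequireApproval.
fn may_run(guardrails: &HashMap<&str, Policy>, action: &str, approved: bool) -> bool {
    match guardrails.get(action).copied().unwrap_or(Policy::RequireApproval) {
        Policy::Allow => true,
        Policy::RequireApproval => approved,
    }
}
```

Defaulting to the stricter policy keeps a typo in an action name from silently granting permission.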
3. Write the system prompt
# Competitor Analyzer Hand - System Prompt
## Role
You are an expert competitive intelligence analyst with 15+ years of experience
in market research and strategic analysis. Your role is to autonomously gather,
analyze, and synthesize information about specified competitors.
## Mission
Generate comprehensive competitor analysis reports that provide actionable insights
for strategic decision-making.
## Workflow (8 Phases)
### Phase 1: Target Definition
- Parse the competitor list from configuration or user input
- Validate target companies against known databases
- Create structured research agenda for each target
### Phase 2: Information Gathering
- Execute parallel web searches for each target
- Collect: company overview, products, pricing, recent news, leadership
- Prioritize official sources (website, press releases, SEC filings)
- Cross-reference with news articles and analyst reports
### Phase 3: Data Validation
- Apply CRAAP criteria (Currency, Relevance, Authority, Accuracy, Purpose)
- Score each source 0-1 for credibility
- Flag and exclude sources below threshold (default: 0.6)
### Phase 4: Knowledge Graph Construction
- Extract entities: products, people, events, metrics
- Build relationships: competitor-of, acquired-by, partnered-with
- Store in vector database for future retrieval
### Phase 5: Comparative Analysis
- Normalize metrics across competitors
- Identify: strengths, weaknesses, opportunities, threats (SWOT)
- Calculate competitive positioning scores
### Phase 6: Insight Generation
- Identify market trends and patterns
- Generate strategic recommendations
- Prioritize insights by impact and confidence
### Phase 7: Report Assembly
- Structure: Executive Summary → Company Profiles → Comparative Matrix → Strategic Insights
- Format: Markdown with embedded tables and charts
- Include: citations in APA format
### Phase 8: Delivery
- Save report to configured output directory
- Send notification via configured channel (Telegram/Email)
- Update dashboard metrics
## Quality Standards
- All claims must be cited
- Confidence intervals for predictions
- Acknowledge data gaps explicitly
- No speculation presented as fact
## Constraints
- Maximum execution time: 2 hours
- Maximum sources per target: 50
- Respect robots.txt and rate limits
- No scraping of paywalled content
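Phase 3 of the prompt scores each source from 0 to 1 against the CRAAP criteria and excludes anything below a threshold of 0.6. The prompt does not say how the five criteria combine into one score, so the sketch below assumes an unweighted mean. `CraapScores` and `filter_sources` are hypothetical names.

```rust
/// Per-criterion scores for one source, each expected in 0.0..=1.0.
struct CraapScores {
    currency: f64,
    relevance: f64,
    authority: f64,
    accuracy: f64,
    purpose: f64,
}

impl CraapScores {
    /// Assumption: the overall score is the unweighted mean of the five criteria.
    fn overall(&self) -> f64 {
        (self.currency + self.relevance + self.authority + self.accuracy + self.purpose) / 5.0
    }
}

/// Keep only the sources whose overall score reaches `threshold`.
fn filter_sources(sources: Vec<(String, CraapScores)>, threshold: f64) -> Vec<String> {
    sources
        .into_iter()
        .filter(|(_, scores)| scores.overall() >= threshold)
        .map(|(url, _)| url)
        .collect()
}
```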
Hand Lifecycle Management
# List all available Hands
openfang hand list
# Activate a Hand (starts autonomous operation)
openfang hand activate competitor-analyzer
# Check the Hand's status
openfang hand status competitor-analyzer
# Output:
# ┌─────────────────────────────────────────┐
# │ Hand: competitor-analyzer │
# │ Status: RUNNING │
# │ Last Run: 2026-03-02 08:00:00 UTC │
# │ Next Run: 2026-03-09 08:00:00 UTC │
# │ Companies Analyzed: 12 │
# │ Insights Generated: 47 │
# └─────────────────────────────────────────┘
# Pause the Hand (state is preserved)
openfang hand pause competitor-analyzer
# Resume execution
openfang hand resume competitor-analyzer
# Stop the Hand and clear its state
openfang hand stop competitor-analyzer
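The four commands above suggest a small state machine: `activate` starts a Hand, `pause` and `resume` toggle it, and `stop` returns it to a clean state. The sketch below models that reading. The state names and the set of legal transitions are assumptions inferred from the command descriptions, not taken from OpenFang's code.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HandState {
    Inactive,
    Running,
    Paused,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HandCommand {
    Activate,
    Pause,
    Resume,
    Stop,
}

/// Apply one lifecycle command, rejecting transitions that make no sense
/// (for example, pausing a Hand that was never activated).
fn transition(state: HandState, cmd: HandCommand) -> Result<HandState, String> {
    use HandCommand::*;
    use HandState::*;
    match (state, cmd) {
        (Inactive, Activate) => Ok(Running),
        (Running, Pause) => Ok(Paused),
        (Paused, Resume) => Ok(Running),
        // Stop clears state, so the Hand ends up where it started.
        (Running, Stop) | (Paused, Stop) => Ok(Inactive),
        (s, c) => Err(format!("cannot apply {:?} while {:?}", c, s)),
    }
}
```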
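Security Mechanisms
WASM Sandbox Configuration
OpenFang runs tool code in a dual-metered WASM sandbox that bounds both the number of instructions executed (fuel) and the wall-clock time (epochs):
// Conceptual code: WASM sandbox initialization
use std::thread::{sleep, spawn, JoinHandle};
use std::time::Duration;
pub struct WasmSandbox {
    // Fuel metering: bounds the number of executed instructions
    fuel_limit: u64, // default: 10,000,000
    fuel_consumed: u64,
    // Epoch metering: bounds execution time
    epoch_timeout: Duration, // default: 30 seconds
    // Watchdog thread
    watchdog: JoinHandle<()>,
}
impl WasmSandbox {
    pub fn execute_tool(&mut self, tool: Tool, input: Value) -> Result<Value> {
        // 1. Verify the tool signature
        self.verify_tool_signature(&tool)?;
        // 2. Compile to WASM (if not already compiled)
        let module = self.compile_to_wasm(&tool)?;
        // 3. Create a fuel-limited instance
        let mut instance = self.create_metered_instance(module)?;
        // 4. Start the watchdog (spawn takes a closure, not the result of a call)
        let timeout = self.epoch_timeout;
        let interrupt_handle = instance.get_interrupt_handle();
        let watchdog = spawn(move || Self::watchdog_thread(interrupt_handle, timeout));
        // 5. Run the tool
        let result = instance.call("execute", &[input])?;
        // 6. Clean up (as written, join waits out the watchdog's full sleep)
        watchdog.join().ok();
        Ok(result)
    }
    fn watchdog_thread(handle: InterruptHandle, timeout: Duration) {
        sleep(timeout);
        handle.interrupt(); // forcibly terminate an execution that exceeded the timeout
    }
}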
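The dual-metering idea can be shown without a WASM runtime: every unit of work must pass both a fuel check and a deadline check before it runs. The sketch below uses only the standard library. `Meter`, `MeterError`, and `metered_sum` are hypothetical names, and the one-unit-per-iteration cost model is an assumption chosen for illustration.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq, Eq)]
enum MeterError {
    OutOfFuel,
    DeadlineExceeded,
}

/// Tracks the two budgets: remaining fuel and a wall-clock deadline.
struct Meter {
    fuel_left: u64,
    deadline: Instant,
}

impl Meter {
    fn new(fuel_limit: u64, timeout: Duration) -> Self {
        Meter { fuel_left: fuel_limit, deadline: Instant::now() + timeout }
    }

    /// Charge `cost` fuel units; fails if either budget is exhausted.
    fn charge(&mut self, cost: u64) -> Result<(), MeterError> {
        if Instant::now() >= self.deadline {
            return Err(MeterError::DeadlineExceeded);
        }
        self.fuel_left = self.fuel_left.checked_sub(cost).ok_or(MeterError::OutOfFuel)?;
        Ok(())
    }
}

/// Toy workload: sum 0..n while paying one fuel unit per iteration.
fn metered_sum(n: u64, meter: &mut Meter) -> Result<u64, MeterError> {
    let mut total = 0u64;
    for i in 0..n {
        meter.charge(1)?;
        total += i;
    }
    Ok(total)
}
```

Fuel gives a deterministic bound that is independent of machine speed, while the deadline catches work that is cheap in instructions but slow in wall-clock terms, such as blocking host calls.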
Merkle Audit Chain
Every operation is recorded in a tamper-evident audit chain:
// Conceptual code: audit log structure
pub struct AuditEntry {
    pub sequence: u64,
    pub timestamp: DateTime<Utc>,
    pub agent_id: String,
    pub action: Action,
    pub input_hash: [u8; 32],
    pub output_hash: [u8; 32],
    pub previous_hash: [u8; 32], // links to the previous entry
    pub signature: Signature, // Ed25519 signature
}
impl AuditLog {
    pub fn append(&mut self, entry: AuditEntry) -> Result<()> {
        // Verify chain integrity
        if entry.previous_hash != self.last_hash {
            return Err(AuditError::ChainBroken);
        }
        // Verify the signature
        entry.verify_signature()?;
        // Append to storage
        self.store.append(entry.clone())?;
        // Update the latest hash
        self.last_hash = entry.compute_hash();
        Ok(())
    }
    pub fn verify_chain(&self) -> Result<bool> {
        // Carry the previous hash forward so the first entry is checked as well
        let mut prev_hash: Option<[u8; 32]> = None;
        for curr in self.store.iter()? {
            // Verify the hash link
            if let Some(expected) = prev_hash {
                if curr.previous_hash != expected {
                    return Ok(false);
                }
            }
            // Verify the signature
            curr.verify_signature()?;
            prev_hash = Some(curr.compute_hash());
        }
        Ok(true)
    }
}
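To see why a hash chain exposes tampering, consider a reduced version with no signatures and no storage layer. The sketch below substitutes `std::collections::hash_map::DefaultHasher` for a cryptographic hash so that it runs with the standard library alone. That hasher is not collision-resistant and would be unsuitable for a real audit log. `Entry` and `Chain` are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

#[derive(Debug, Clone)]
struct Entry {
    sequence: u64,
    action: String,
    previous_hash: u64,
}

impl Entry {
    /// Stand-in digest. DefaultHasher is NOT cryptographic; a real chain
    /// would use a hash such as SHA-256 over a canonical encoding.
    fn compute_hash(&self) -> u64 {
        let mut h = DefaultHasher::new();
        self.sequence.hash(&mut h);
        self.action.hash(&mut h);
        self.previous_hash.hash(&mut h);
        h.finish()
    }
}

#[derive(Default)]
struct Chain {
    entries: Vec<Entry>,
}

impl Chain {
    /// Append an action, linking it to the hash of the current last entry
    /// (or to 0 when the chain is empty).
    fn append(&mut self, action: &str) {
        let previous_hash = self.entries.last().map_or(0, Entry::compute_hash);
        let sequence = self.entries.len() as u64;
        self.entries.push(Entry { sequence, action: action.to_string(), previous_hash });
    }

    /// True when the first entry links to 0 and every later entry links
    /// to the hash of its predecessor.
    fn verify(&self) -> bool {
        let genesis_ok = self.entries.first().map_or(true, |e| e.previous_hash == 0);
        genesis_ok
            && self.entries.windows(2).all(|w| w[1].previous_hash == w[0].compute_hash())
    }
}
```

Editing any stored entry changes its hash, so the link held by the next entry no longer matches and `verify` returns false. Note that the final entry has no successor, which is one reason the conceptual code above also signs each entry.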
Taint Tracking
Sensitive data is labelled and tracked as it moves through the system:
// Conceptual code: taint label propagation
use std::collections::HashSet;
#[derive(Clone, Debug)]
pub struct TaintedString {
    content: String,
    taint_labels: HashSet<TaintLabel>,
}
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum TaintLabel {
    ApiKey,
    Password,
    PersonalData,
    Financial,
    Confidential,
}
// Taint propagation rules
impl TaintedString {
    pub fn concat(&self, other: &TaintedString) -> TaintedString {
        TaintedString {
            content: format!("{}{}", self.content, other.content),
            taint_labels: &self.taint_labels | &other.taint_labels, // union of both label sets
        }
    }
    // start and end are byte offsets; slicing panics if they are out of
    // range or fall inside a multi-byte UTF-8 character
    pub fn substring(&self, start: usize, end: usize) -> TaintedString {
        TaintedString {
            content: self.content[start..end].to_string(),
            taint_labels: self.taint_labels.clone(), // the taint is preserved
        }
    }
}
// Sink check: prevents sensitive data from leaking
pub fn check_sink(sink: &Sink, data: &TaintedString) -> Result<()> {
    match sink {
        Sink::Network(url) if is_external(url) => {
            if data.taint_labels.contains(&TaintLabel::ApiKey) {
                return Err(SecurityError::DataExfiltration);
            }
        }
        Sink::File(path) if is_world_readable(path) => {
            if !data.taint_labels.is_disjoint(&SENSITIVE_LABELS) {
                return Err(SecurityError::InsecureStorage);
            }
        }
        _ => {}
    }
    Ok(())
}
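A short runnable reduction of the propagation rule may make the behavior concrete: once a labelled value is concatenated into another string, the result carries the label and is stopped at the sink. The names below (`Tainted`, `Label`, `may_send_externally`) are hypothetical and trimmed down from the conceptual code. The single sink rule is an assumption that mirrors the `ApiKey` check on external network sinks.

```rust
use std::collections::HashSet;

#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
enum Label {
    ApiKey,
    PersonalData,
}

#[derive(Clone, Debug)]
struct Tainted {
    content: String,
    labels: HashSet<Label>,
}

impl Tainted {
    fn new(content: &str, labels: &[Label]) -> Self {
        Tainted { content: content.to_string(), labels: labels.iter().copied().collect() }
    }

    /// Concatenation yields the union of both operands' labels.
    fn concat(&self, other: &Tainted) -> Tainted {
        Tainted {
            content: format!("{}{}", self.content, other.content),
            labels: &self.labels | &other.labels,
        }
    }
}

/// Assumed sink rule: anything labelled ApiKey must not leave the host.
fn may_send_externally(data: &Tainted) -> bool {
    !data.labels.contains(&Label::ApiKey)
}
```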
Channel Adapter Integration
Telegram Configuration Example
// Conceptual code: Telegram adapter initialization
pub async fn init_telegram_channel(config: TelegramConfig) -> Result<TelegramAdapter> {
    let api = Api::new(config.bot_token);
    // Set up the command menu
    let commands = vec![
        BotCommand::new("status", "Check agent status"),
        BotCommand::new("pause", "Pause current hand"),
        BotCommand::new("resume", "Resume paused hand"),
        BotCommand::new("report", "Request immediate report"),
    ];
    api.set_my_commands(commands).await?;
    // Configure the allowed chats
    let allowed_chats: HashSet<ChatId> = config
        .allowed_chat_ids
        .into_iter()
        .map(ChatId)
        .collect();
    Ok(TelegramAdapter {
        api,
        allowed_chats,
        rate_limiter: GCRA::new(30, Duration::from_secs(60)), // 30 messages per minute
    })
}
impl ChannelAdapter for TelegramAdapter {
    async fn send_message(&self, chat_id: ChatId, message: Message) -> Result<()> {
        // Permission check
        if !self.allowed_chats.contains(&chat_id) {
            return Err(ChannelError::Unauthorized);
        }
        // Rate limiting
        self.rate_limiter.check()?;
        // Format the message
        let text = self.format_markdown(&message);
        // Send
        self.api.send_message(chat_id, text)
            .parse_mode(ParseMode::MarkdownV2)
            .await?;
        Ok(())
    }
}
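The adapter's rate limiter is named after GCRA, the generic cell rate algorithm. A compact version of that algorithm is sketched below with a caller-supplied clock in milliseconds, which keeps it deterministic. The `burst` parameter is an assumption (the conceptual constructor above takes only a limit and a period), and the struct layout is illustrative rather than OpenFang's own.

```rust
/// GCRA limiter driven by a caller-supplied clock (milliseconds since any origin).
struct Gcra {
    emission_interval_ms: u64, // T: ideal spacing between messages
    tolerance_ms: u64,         // tau: how far ahead of schedule a burst may run
    tat_ms: u64,               // theoretical arrival time of the next message
}

impl Gcra {
    /// `limit` messages per `period_ms`, allowing bursts of up to `burst`.
    /// `limit` must be non-zero.
    fn new(limit: u64, period_ms: u64, burst: u64) -> Self {
        let t = period_ms / limit;
        Gcra {
            emission_interval_ms: t,
            tolerance_ms: t * burst.saturating_sub(1),
            tat_ms: 0,
        }
    }

    /// Returns true and books the message if it conforms; false otherwise.
    fn check(&mut self, now_ms: u64) -> bool {
        let tat = self.tat_ms.max(now_ms);
        if tat - now_ms > self.tolerance_ms {
            return false; // arrived too far ahead of the allowed schedule
        }
        self.tat_ms = tat + self.emission_interval_ms;
        true
    }
}
```

With 30 messages per 60 seconds the emission interval is 2 seconds. A burst of 3 therefore lets three messages through at once, rejects a fourth, and admits one more for every 2 seconds that pass.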
MCP Protocol Integration
Starting the MCP Server
# Start in MCP server mode
openfang mcp serve --port 3000
# Output:
# [INFO] MCP Server listening on 0.0.0.0:3000
# [INFO] Exposed tools: web_search, web_fetch, document_parser, ...
# [INFO] Connected clients: 0
MCP Tool Definition
{
"name": "web_search",
"description": "Search the web for information",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
},
"num_results": {
"type": "integer",
"default": 10,
"description": "Number of results to return"
}
},
"required": ["query"]
},
"outputSchema": {
"type": "array",
"items": {
"type": "object",
"properties": {
"title": { "type": "string" },
"url": { "type": "string" },
"snippet": { "type": "string" }
}
}
}
}
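On the server side, the `inputSchema` above translates into two checks: `query` must be present and be a string, and `num_results` falls back to 10 when omitted. The sketch below performs those checks over an already-decoded argument map. The `Arg` enum stands in for a JSON value type so that the example needs no external crate, and `parse_web_search` is a hypothetical helper.

```rust
use std::collections::HashMap;

/// Minimal stand-in for a decoded JSON value.
#[derive(Debug, Clone, PartialEq)]
enum Arg {
    Str(String),
    Int(i64),
}

#[derive(Debug, PartialEq)]
struct WebSearchInput {
    query: String,
    num_results: i64,
}

/// Validate tool arguments against the web_search input schema.
fn parse_web_search(args: &HashMap<String, Arg>) -> Result<WebSearchInput, String> {
    let query = match args.get("query") {
        Some(Arg::Str(q)) => q.clone(),
        Some(_) => return Err("query must be a string".to_string()),
        None => return Err("missing required field: query".to_string()),
    };
    let num_results = match args.get("num_results") {
        Some(Arg::Int(n)) => *n,
        Some(_) => return Err("num_results must be an integer".to_string()),
        None => 10, // default declared in the schema
    };
    Ok(WebSearchInput { query, num_results })
}
```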
Connecting to External MCP Servers
# Configure external MCP servers
[mcp.servers.github]
command = "github-mcp-server"
args = ["--token", "${GITHUB_TOKEN}"]
[mcp.servers.postgres]
command = "postgres-mcp-server"
args = ["--connection-string", "${DATABASE_URL}"]
OpenAI-Compatible API
OpenFang provides a full OpenAI-compatible interface:
# Chat completion (streaming)
curl -X POST localhost:4200/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "researcher",
"messages": [
{"role": "user", "content": "Analyze AI agent market trends in 2024"}
],
"stream": true,
"temperature": 0.7
}'
# Response (SSE format)
data: {"id":"chat-xxx","choices":[{"delta":{"content":"Based on"},"index":0}]}
data: {"id":"chat-xxx","choices":[{"delta":{"content":" my"},"index":0}]}
...
data: [DONE]
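A client consuming this stream reads it line by line, keeps the payload of each `data:` line, and stops at the `[DONE]` sentinel. The sketch below covers only that framing step. Decoding each payload as JSON and extracting `choices[0].delta.content` is left to a JSON library such as `serde_json`. `sse_payloads` is a hypothetical helper.

```rust
/// Collect the payload of each `data:` line until the `[DONE]` sentinel.
/// Lines without the `data:` prefix (blank separators, comments) are skipped.
fn sse_payloads(stream: &str) -> Vec<&str> {
    let mut payloads = Vec::new();
    for line in stream.lines() {
        if let Some(rest) = line.strip_prefix("data:") {
            let payload = rest.trim();
            if payload == "[DONE]" {
                break;
            }
            payloads.push(payload);
        }
    }
    payloads
}
```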
Supported endpoints:
POST /v1/chat/completions - chat completion
POST /v1/embeddings - text embeddings
GET /v1/models - model list
POST /v1/agents/{id}/run - agent execution
GET /v1/hands - Hands list