关键代码验证
技术研究 AI 对冲基金 代码验证
AI 对冲基金核心代码模式 - 智能体定义、LangGraph 状态机、投票聚合机制实现
核心智能体模式 (Core Agent Pattern)
基础智能体类
所有智能体继承自统一的基类,确保接口一致性:
# src/agents/base_agent.py
from abc import ABC, abstractmethod
from typing import Dict, Any
from langchain_core.language_models import BaseChatModel
class BaseAgent(ABC):
"""所有投资智能体的基类"""
def __init__(self, llm: BaseChatModel, system_prompt: str):
self.llm = llm
self.system_prompt = system_prompt
@abstractmethod
def analyze(self, state: Dict[str, Any]) -> Dict[str, Any]:
"""分析当前状态并返回意见"""
pass
def _build_prompt(self, state: Dict[str, Any]) -> str:
"""构建完整的提示词"""
return f"""{self.system_prompt}
当前分析标的:{state.get('ticker', 'Unknown')}
财务数据:{state.get('fundamentals', {})}
价格数据:{state.get('prices', {})}
市场情绪:{state.get('sentiment', {})}
请基于以上信息进行分析,并给出投资建议 (BUY/HOLD/SELL)。
"""
def _parse_response(self, response: str) -> Dict[str, Any]:
"""解析 LLM 响应为结构化数据"""
# 实际实现会提取:
# - 投资理由
# - 建议操作
# - 置信度
# - 关键指标
return {
"agent_name": self.__class__.__name__,
"recommendation": "BUY", # 示例
"confidence": 0.75,
"reasoning": response
}
巴菲特智能体实现
# src/agents/investors/buffett_agent.py
from .base_agent import BaseAgent
BUFFETT_PROMPT = """
你是沃伦·巴菲特,伯克希尔哈撒韦公司董事长,价值投资传奇人物。
【投资哲学】
1. 寻找具有持久竞争优势 (护城河) 的优秀企业
2. 关注 ROE、自由现金流、管理层质量
3. 只在价格低于内在价值时买入
4. 长期持有,几乎不做波段
【分析框架】
- 业务是否简单易懂?(我能向 10 岁孩子解释这家公司做什么吗?)
- 是否有稳定的经营历史?(过去 10 年盈利是否稳定增长?)
- 管理层是否诚信且能干?(资本配置是否合理?)
- 价格是否有安全边际?(当前价格是否显著低于内在价值?)
【输出要求】
1. 用巴菲特的口吻和思维方式分析
2. 引用具体财务指标支撑观点
3. 给出明确的投资建议 (BUY/HOLD/SELL)
4. 说明置信度和关键风险
"""
class BuffettAgent(BaseAgent):
"""沃伦·巴菲特投资智能体"""
def __init__(self, llm: BaseChatModel):
super().__init__(llm, BUFFETT_PROMPT)
def analyze(self, state: Dict[str, Any]) -> Dict[str, Any]:
"""执行巴菲特风格的投资分析"""
# 1. 从 state 提取关键指标
fundamentals = state.get('fundamentals', {})
roe = fundamentals.get('return_on_equity', 0)
profit_margin = fundamentals.get('profit_margin', 0)
debt_to_equity = fundamentals.get('debt_to_equity', 0)
# 2. 构建提示词
prompt = self._build_prompt(state)
# 3. 添加巴菲特特定的分析维度
prompt += f"""
特别关注:
- ROE: {roe:.2%} (巴菲特偏好>15%)
- 利润率:{profit_margin:.2%} (偏好稳定或增长)
- 负债权益比:{debt_to_equity:.2f} (偏好<1)
"""
# 4. 调用 LLM
response = self.llm.invoke(prompt)
# 5. 解析并返回
return self._parse_response(response.content)
LangGraph 状态定义 (StateGraph Definition)
State 结构
# src/graph/state.py
from typing import TypedDict, List, Dict, Any, Annotated
from typing_extensions import NotRequired
import operator
class HedgeFundState(TypedDict):
"""对冲基金分析的共享状态"""
# 输入参数
ticker: str
start_date: str
end_date: str
# 原始数据 (数据层填充)
fundamentals: Dict[str, Any]
prices: Dict[str, Any]
sentiment: List[Dict[str, Any]]
# 智能体意见 (逐步累积)
investor_opinions: Annotated[List[Dict], operator.add]
analysis_scores: Dict[str, float]
# 风险参数
risk_metrics: Dict[str, Any]
# 最终决策
decision: str
position_size: float
reasoning: str
关键设计:使用 Annotated[List, operator.add] 实现意见的自动累积,每个智能体的输出会追加到列表中。
StateGraph 构建
# src/graph/workflow.py
from langgraph.graph import StateGraph, END
from .state import HedgeFundState
def create_hedge_fund_graph() -> StateGraph:
"""创建对冲基金分析的完整工作流"""
# 1. 初始化状态图
graph_builder = StateGraph(HedgeFundState)
# 2. 添加数据获取节点
graph_builder.add_node("fetch_data", fetch_data_node)
# 3. 添加投资者智能体节点
graph_builder.add_node("buffett_agent", create_agent_node(BuffettAgent))
graph_builder.add_node("graham_agent", create_agent_node(GrahamAgent))
graph_builder.add_node("wood_agent", create_agent_node(WoodAgent))
# ... 其他投资者
# 4. 添加专业分析节点
graph_builder.add_node("fundamentals_agent", fundamentals_node)
graph_builder.add_node("technicals_agent", technicals_node)
graph_builder.add_node("valuation_agent", valuation_node)
graph_builder.add_node("sentiment_agent", sentiment_node)
# 5. 添加管理节点
graph_builder.add_node("risk_manager", risk_manager_node)
graph_builder.add_node("portfolio_manager", portfolio_manager_node)
# 6. 定义边 (流程控制)
graph_builder.set_entry_point("fetch_data")
# 数据获取 → 投资者分析
graph_builder.add_edge("fetch_data", "buffett_agent")
graph_builder.add_edge("buffett_agent", "graham_agent")
graph_builder.add_edge("graham_agent", "wood_agent")
# ... 链式调用所有投资者
# 投资者分析 → 专业分析
graph_builder.add_edge("wood_agent", "fundamentals_agent")
graph_builder.add_edge("fundamentals_agent", "technicals_agent")
# ...
# 专业分析 → 风险管理 → 投资决策
graph_builder.add_edge("sentiment_agent", "risk_manager")
graph_builder.add_edge("risk_manager", "portfolio_manager")
graph_builder.add_edge("portfolio_manager", END)
return graph_builder.compile()
工具集成 (Tool Integration)
财务数据获取
# src/tools/data_fetcher.py
import yfinance as yf
from financial_datasets import get_income_statements, get_balance_sheets
def fetch_fundamentals(ticker: str) -> Dict[str, Any]:
"""获取公司基本面数据"""
stock = yf.Ticker(ticker)
# 免费数据 (AAPL, GOOGL, MSFT, NVDA, TSLA)
info = stock.info
financials = stock.financials
# 付费数据 (需要 Financial Datasets API key)
# income = get_income_statements(ticker)
# balance = get_balance_sheets(ticker)
return {
'market_cap': info.get('marketCap', 0),
'pe_ratio': info.get('trailingPE', 0),
'pb_ratio': info.get('priceToBook', 0),
'roe': info.get('returnOnEquity', 0),
'profit_margin': info.get('profitMargins', 0),
'revenue_growth': info.get('revenueGrowth', 0),
'debt_to_equity': info.get('debtToEquity', 0),
}
def fetch_prices(ticker: str, start_date: str, end_date: str) -> Dict[str, Any]:
"""获取价格历史数据"""
stock = yf.Ticker(ticker)
hist = stock.history(start=start_date, end=end_date)
return {
'current_price': hist['Close'].iloc[-1],
'price_change_1m': (hist['Close'].iloc[-1] / hist['Close'].iloc[-21] - 1) * 100,
'price_change_3m': (hist['Close'].iloc[-1] / hist['Close'].iloc[-63] - 1) * 100,
'price_change_1y': (hist['Close'].iloc[-1] / hist['Close'].iloc[-252] - 1) * 100,
'volatility': hist['Close'].std(),
}
新闻情绪分析
# src/tools/sentiment_analyzer.py
from langchain.tools import tool
@tool
def analyze_news_sentiment(ticker: str, days: int = 7) -> float:
"""分析最近 N 天的新闻情绪"""
# 1. 获取新闻 (使用新闻 API)
news_articles = fetch_news(ticker, days)
# 2. 使用 LLM 分析每条新闻的情绪
sentiment_scores = []
for article in news_articles:
prompt = f"""分析以下新闻的情绪 (0=极度负面, 1=中性, 2=极度正面):
{article['title']}
{article['summary']}
仅返回 0-2 之间的数字。"""
score = llm.invoke(prompt)
sentiment_scores.append(float(score.content))
# 3. 计算平均情绪
avg_sentiment = sum(sentiment_scores) / len(sentiment_scores)
# 4. 归一化到 [-1, 1] 范围
normalized = (avg_sentiment - 1) / 1
return normalized
投票聚合机制 (Voting Aggregation)
加权投票算法
# src/graph/nodes.py
def portfolio_manager_node(state: HedgeFundState) -> Dict:
"""投资组合经理:聚合所有意见,做出最终决策"""
# 1. 提取所有投资者意见
opinions = state['investor_opinions']
# 2. 定义投资者权重 (可配置)
weights = {
'BuffettAgent': 1.5, # 巴菲特权重最高
'GrahamAgent': 1.3, # 格雷厄姆次之
'WoodAgent': 1.0,
'BurryAgent': 1.2, # 逆向思维有价值
# ... 其他投资者权重为 1.0
}
# 3. 计算加权投票
vote_mapping = {'BUY': 1, 'HOLD': 0, 'SELL': -1}
weighted_score = 0
for opinion in opinions:
agent_name = opinion['agent_name']
vote = vote_mapping.get(opinion['recommendation'], 0)
weight = weights.get(agent_name, 1.0)
confidence = opinion.get('confidence', 0.5)
weighted_score += vote * weight * confidence
# 4. 根据阈值决定
if weighted_score >= 5:
decision = "STRONG_BUY"
elif weighted_score >= 2:
decision = "BUY"
elif weighted_score <= -5:
decision = "STRONG_SELL"
elif weighted_score <= -2:
decision = "SELL"
else:
decision = "HOLD"
# 5. 风险管理覆盖
risk_metrics = state.get('risk_metrics', {})
max_position = risk_metrics.get('max_position_size', 1.0)
return {
'decision': decision,
'position_size': calculate_position_size(decision, max_position),
'reasoning': generate_reasoning(opinions, weighted_score)
}
配置示例 (Configuration)
.env 文件
# .env.example
# LLM API Keys (至少配置一个)
OPENAI_API_KEY=sk-... # 用于 GPT-4o, GPT-4o-mini
ANTHROPIC_API_KEY=sk-ant-... # 用于 Claude 3.5
GROQ_API_KEY=gsk_... # 用于 Groq (快速推理)
DEEPSEEK_API_KEY=sk-... # 用于 DeepSeek
# 金融数据 API
FINANCIAL_DATASETS_API_KEY=... # 获取完整财务数据 (可选)
# LLM 配置
LLM_PROVIDER=openai # openai | anthropic | groq | ollama
LLM_MODEL=gpt-4o # 模型名称
LLM_TEMPERATURE=0.3 # 0.0-1.0,越低越确定
# 运行配置
MAX_WORKERS=4 # 并行智能体数量
TIMEOUT_PER_AGENT=30 # 每个智能体超时 (秒)
RETRY_COUNT=3 # API 失败重试次数
运行命令
# 基础分析 (使用 OpenAI GPT-4o)
poetry run python src/main.py --ticker AAPL,MSFT,NVDA
# 指定日期范围
poetry run python src/main.py --ticker AAPL --start-date 2024-01-01 --end-date 2024-12-31
# 使用本地模型 (无 API 成本)
poetry run python src/main.py --ticker AAPL --ollama --model llama2
# 运行回测
poetry run python src/backtester.py --ticker AAPL,MSFT --start-date 2023-01-01 --end-date 2024-12-31
参考资料 (References)
- LangGraph StateGraph API - 状态图定义
- LangChain Tool Calling - 工具集成
- virattt/ai-hedge-fund src/graph - 完整实现
- Financial Datasets API - 金融数据服务