Logo
热心市民王先生

关键代码验证

技术研究 AI 对冲基金 代码验证

AI 对冲基金核心代码模式 - 智能体定义、LangGraph 状态机、投票聚合机制实现

核心智能体模式 (Core Agent Pattern)

基础智能体类

所有智能体继承自统一的基类,确保接口一致性:

# src/agents/base_agent.py
from abc import ABC, abstractmethod
from typing import Dict, Any
from langchain_core.language_models import BaseChatModel

class BaseAgent(ABC):
    """所有投资智能体的基类"""
    
    def __init__(self, llm: BaseChatModel, system_prompt: str):
        self.llm = llm
        self.system_prompt = system_prompt
    
    @abstractmethod
    def analyze(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """分析当前状态并返回意见"""
        pass
    
    def _build_prompt(self, state: Dict[str, Any]) -> str:
        """构建完整的提示词"""
        return f"""{self.system_prompt}

当前分析标的:{state.get('ticker', 'Unknown')}
财务数据:{state.get('fundamentals', {})}
价格数据:{state.get('prices', {})}
市场情绪:{state.get('sentiment', {})}

请基于以上信息进行分析,并给出投资建议 (BUY/HOLD/SELL)。
"""
    
    def _parse_response(self, response: str) -> Dict[str, Any]:
        """解析 LLM 响应为结构化数据"""
        # 实际实现会提取:
        # - 投资理由
        # - 建议操作
        # - 置信度
        # - 关键指标
        return {
            "agent_name": self.__class__.__name__,
            "recommendation": "BUY",  # 示例
            "confidence": 0.75,
            "reasoning": response
        }

巴菲特智能体实现

# src/agents/investors/buffett_agent.py
from .base_agent import BaseAgent

BUFFETT_PROMPT = """
你是沃伦·巴菲特,伯克希尔哈撒韦公司董事长,价值投资传奇人物。

【投资哲学】
1. 寻找具有持久竞争优势 (护城河) 的优秀企业
2. 关注 ROE、自由现金流、管理层质量
3. 只在价格低于内在价值时买入
4. 长期持有,几乎不做波段

【分析框架】
- 业务是否简单易懂?(我能向 10 岁孩子解释这家公司做什么吗?)
- 是否有稳定的经营历史?(过去 10 年盈利是否稳定增长?)
- 管理层是否诚信且能干?(资本配置是否合理?)
- 价格是否有安全边际?(当前价格是否显著低于内在价值?)

【输出要求】
1. 用巴菲特的口吻和思维方式分析
2. 引用具体财务指标支撑观点
3. 给出明确的投资建议 (BUY/HOLD/SELL)
4. 说明置信度和关键风险
"""

class BuffettAgent(BaseAgent):
    """沃伦·巴菲特投资智能体"""
    
    def __init__(self, llm: BaseChatModel):
        super().__init__(llm, BUFFETT_PROMPT)
    
    def analyze(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """执行巴菲特风格的投资分析"""
        # 1. 从 state 提取关键指标
        fundamentals = state.get('fundamentals', {})
        roe = fundamentals.get('return_on_equity', 0)
        profit_margin = fundamentals.get('profit_margin', 0)
        debt_to_equity = fundamentals.get('debt_to_equity', 0)
        
        # 2. 构建提示词
        prompt = self._build_prompt(state)
        
        # 3. 添加巴菲特特定的分析维度
        prompt += f"""
特别关注:
- ROE: {roe:.2%} (巴菲特偏好>15%)
- 利润率:{profit_margin:.2%} (偏好稳定或增长)
- 负债权益比:{debt_to_equity:.2f} (偏好<1)
"""
        
        # 4. 调用 LLM
        response = self.llm.invoke(prompt)
        
        # 5. 解析并返回
        return self._parse_response(response.content)

LangGraph 状态定义 (StateGraph Definition)

State 结构

# src/graph/state.py
from typing import TypedDict, List, Dict, Any, Annotated
from typing_extensions import NotRequired
import operator

class HedgeFundState(TypedDict):
    """对冲基金分析的共享状态"""
    
    # 输入参数
    ticker: str
    start_date: str
    end_date: str
    
    # 原始数据 (数据层填充)
    fundamentals: Dict[str, Any]
    prices: Dict[str, Any]
    sentiment: List[Dict[str, Any]]
    
    # 智能体意见 (逐步累积)
    investor_opinions: Annotated[List[Dict], operator.add]
    analysis_scores: Dict[str, float]
    
    # 风险参数
    risk_metrics: Dict[str, Any]
    
    # 最终决策
    decision: str
    position_size: float
    reasoning: str

关键设计:使用 Annotated[List, operator.add] 实现意见的自动累积,每个智能体的输出会追加到列表中。

StateGraph 构建

# src/graph/workflow.py
from langgraph.graph import StateGraph, END
from .state import HedgeFundState

def create_hedge_fund_graph() -> StateGraph:
    """创建对冲基金分析的完整工作流"""
    
    # 1. 初始化状态图
    graph_builder = StateGraph(HedgeFundState)
    
    # 2. 添加数据获取节点
    graph_builder.add_node("fetch_data", fetch_data_node)
    
    # 3. 添加投资者智能体节点
    graph_builder.add_node("buffett_agent", create_agent_node(BuffettAgent))
    graph_builder.add_node("graham_agent", create_agent_node(GrahamAgent))
    graph_builder.add_node("wood_agent", create_agent_node(WoodAgent))
    # ... 其他投资者
    
    # 4. 添加专业分析节点
    graph_builder.add_node("fundamentals_agent", fundamentals_node)
    graph_builder.add_node("technicals_agent", technicals_node)
    graph_builder.add_node("valuation_agent", valuation_node)
    graph_builder.add_node("sentiment_agent", sentiment_node)
    
    # 5. 添加管理节点
    graph_builder.add_node("risk_manager", risk_manager_node)
    graph_builder.add_node("portfolio_manager", portfolio_manager_node)
    
    # 6. 定义边 (流程控制)
    graph_builder.set_entry_point("fetch_data")
    
    # 数据获取 → 投资者分析
    graph_builder.add_edge("fetch_data", "buffett_agent")
    graph_builder.add_edge("buffett_agent", "graham_agent")
    graph_builder.add_edge("graham_agent", "wood_agent")
    # ... 链式调用所有投资者
    
    # 投资者分析 → 专业分析
    graph_builder.add_edge("wood_agent", "fundamentals_agent")
    graph_builder.add_edge("fundamentals_agent", "technicals_agent")
    # ...
    
    # 专业分析 → 风险管理 → 投资决策
    graph_builder.add_edge("sentiment_agent", "risk_manager")
    graph_builder.add_edge("risk_manager", "portfolio_manager")
    graph_builder.add_edge("portfolio_manager", END)
    
    return graph_builder.compile()

工具集成 (Tool Integration)

财务数据获取

# src/tools/data_fetcher.py
import yfinance as yf
from financial_datasets import get_income_statements, get_balance_sheets

def fetch_fundamentals(ticker: str) -> Dict[str, Any]:
    """获取公司基本面数据"""
    stock = yf.Ticker(ticker)
    
    # 免费数据 (AAPL, GOOGL, MSFT, NVDA, TSLA)
    info = stock.info
    financials = stock.financials
    
    # 付费数据 (需要 Financial Datasets API key)
    # income = get_income_statements(ticker)
    # balance = get_balance_sheets(ticker)
    
    return {
        'market_cap': info.get('marketCap', 0),
        'pe_ratio': info.get('trailingPE', 0),
        'pb_ratio': info.get('priceToBook', 0),
        'roe': info.get('returnOnEquity', 0),
        'profit_margin': info.get('profitMargins', 0),
        'revenue_growth': info.get('revenueGrowth', 0),
        'debt_to_equity': info.get('debtToEquity', 0),
    }

def fetch_prices(ticker: str, start_date: str, end_date: str) -> Dict[str, Any]:
    """获取价格历史数据"""
    stock = yf.Ticker(ticker)
    hist = stock.history(start=start_date, end=end_date)
    
    return {
        'current_price': hist['Close'].iloc[-1],
        'price_change_1m': (hist['Close'].iloc[-1] / hist['Close'].iloc[-21] - 1) * 100,
        'price_change_3m': (hist['Close'].iloc[-1] / hist['Close'].iloc[-63] - 1) * 100,
        'price_change_1y': (hist['Close'].iloc[-1] / hist['Close'].iloc[-252] - 1) * 100,
        'volatility': hist['Close'].std(),
    }

新闻情绪分析

# src/tools/sentiment_analyzer.py
from langchain.tools import tool

@tool
def analyze_news_sentiment(ticker: str, days: int = 7) -> float:
    """分析最近 N 天的新闻情绪"""
    # 1. 获取新闻 (使用新闻 API)
    news_articles = fetch_news(ticker, days)
    
    # 2. 使用 LLM 分析每条新闻的情绪
    sentiment_scores = []
    for article in news_articles:
        prompt = f"""分析以下新闻的情绪 (0=极度负面, 1=中性, 2=极度正面):
{article['title']}
{article['summary']}
仅返回 0-2 之间的数字。"""
        score = llm.invoke(prompt)
        sentiment_scores.append(float(score.content))
    
    # 3. 计算平均情绪
    avg_sentiment = sum(sentiment_scores) / len(sentiment_scores)
    
    # 4. 归一化到 [-1, 1] 范围
    normalized = (avg_sentiment - 1) / 1
    
    return normalized

投票聚合机制 (Voting Aggregation)

加权投票算法

# src/graph/nodes.py
def portfolio_manager_node(state: HedgeFundState) -> Dict:
    """投资组合经理:聚合所有意见,做出最终决策"""
    
    # 1. 提取所有投资者意见
    opinions = state['investor_opinions']
    
    # 2. 定义投资者权重 (可配置)
    weights = {
        'BuffettAgent': 1.5,      # 巴菲特权重最高
        'GrahamAgent': 1.3,       # 格雷厄姆次之
        'WoodAgent': 1.0,
        'BurryAgent': 1.2,        # 逆向思维有价值
        # ... 其他投资者权重为 1.0
    }
    
    # 3. 计算加权投票
    vote_mapping = {'BUY': 1, 'HOLD': 0, 'SELL': -1}
    weighted_score = 0
    
    for opinion in opinions:
        agent_name = opinion['agent_name']
        vote = vote_mapping.get(opinion['recommendation'], 0)
        weight = weights.get(agent_name, 1.0)
        confidence = opinion.get('confidence', 0.5)
        
        weighted_score += vote * weight * confidence
    
    # 4. 根据阈值决定
    if weighted_score >= 5:
        decision = "STRONG_BUY"
    elif weighted_score >= 2:
        decision = "BUY"
    elif weighted_score <= -5:
        decision = "STRONG_SELL"
    elif weighted_score <= -2:
        decision = "SELL"
    else:
        decision = "HOLD"
    
    # 5. 风险管理覆盖
    risk_metrics = state.get('risk_metrics', {})
    max_position = risk_metrics.get('max_position_size', 1.0)
    
    return {
        'decision': decision,
        'position_size': calculate_position_size(decision, max_position),
        'reasoning': generate_reasoning(opinions, weighted_score)
    }

配置示例 (Configuration)

.env 文件

# .env.example

# LLM API Keys (至少配置一个)
OPENAI_API_KEY=sk-...           # 用于 GPT-4o, GPT-4o-mini
ANTHROPIC_API_KEY=sk-ant-...    # 用于 Claude 3.5
GROQ_API_KEY=gsk_...            # 用于 Groq (快速推理)
DEEPSEEK_API_KEY=sk-...         # 用于 DeepSeek

# 金融数据 API
FINANCIAL_DATASETS_API_KEY=...  # 获取完整财务数据 (可选)

# LLM 配置
LLM_PROVIDER=openai             # openai | anthropic | groq | ollama
LLM_MODEL=gpt-4o                # 模型名称
LLM_TEMPERATURE=0.3             # 0.0-1.0,越低越确定

# 运行配置
MAX_WORKERS=4                   # 并行智能体数量
TIMEOUT_PER_AGENT=30            # 每个智能体超时 (秒)
RETRY_COUNT=3                   # API 失败重试次数

运行命令

# 基础分析 (使用 OpenAI GPT-4o)
poetry run python src/main.py --ticker AAPL,MSFT,NVDA

# 指定日期范围
poetry run python src/main.py --ticker AAPL --start-date 2024-01-01 --end-date 2024-12-31

# 使用本地模型 (无 API 成本)
poetry run python src/main.py --ticker AAPL --ollama --model llama2

# 运行回测
poetry run python src/backtester.py --ticker AAPL,MSFT --start-date 2023-01-01 --end-date 2024-12-31

参考资料 (References)