Logo
热心市民王先生

关键代码验证

技术研究 GitHub API

为了验证大宗商品轮动规律,需要构建一个能够反映不同类别商品轮动状态的指数模型。该模型将:

核心模型:大宗商品轮动指数构建

模型设计思路

为了验证大宗商品轮动规律,需要构建一个能够反映不同类别商品轮动状态的指数模型。该模型将:

  1. 分类跟踪:跟踪贵金属、工业金属、能源、农产品四大类别的价格走势
  2. 计算相对强度:计算各类别相对于基准的相对强弱
  3. 识别轮动信号:根据相对强度变化识别轮动信号
  4. 回测验证:使用历史数据验证轮动规律的有效性

数据结构定义

商品分类

# 商品类别分类定义
COMMODITY_CATEGORIES = {
    "precious_metals": {
        "name": "贵金属",
        "symbols": ["GC=F", "SI=F", "PL=F"],  # 黄金、白银、铂金
        "weights": [0.6, 0.3, 0.1],
        "risk_profile": "low"
    },
    "base_metals": {
        "name": "工业金属",
        "symbols": ["HG=F", "LME-MCU3", "LME-MSN3"],  # 铜、铝、镍
        "weights": [0.5, 0.3, 0.2],
        "risk_profile": "medium"
    },
    "energy": {
        "name": "能源",
        "symbols": ["CL=F", "NG=F", "RB=F"],  # 原油、天然气、汽油
        "weights": [0.5, 0.2, 0.3],
        "risk_profile": "high"
    },
    "agriculture": {
        "name": "农产品",
        "symbols": ["ZC=F", "ZW=F", "ZR=F", "ZS=F"],  # 玉米、小麦、稻谷、大豆
        "weights": [0.3, 0.2, 0.1, 0.4],
        "risk_profile": "medium"
    }
}

市场环境指标

# 市场环境指标定义
MARKET_INDICATORS = {
    "macro": {
        "growth": ["PMI", "GDP_GROWTH", "INDUSTRIAL_PRODUCTION"],
        "inflation": ["CPI", "PPI", "CORE_CPI"],
        "monetary": ["FED_FUNDS_RATE", "REAL_RATE", "M2_GROWTH"]
    },
    "liquidity": {
        "risk_preference": ["VIX", "CREDIT_SPREAD"],
        "global_liquidity": ["FED_BALANCE_SHEET", "ECB_BALANCE_SHEET"],
        "currency": ["DXY", "EUR_USD", "USD_CNY"]
    },
    "commodity_specific": {
        "inventory": ["CRUDE_INVENTORY", "COPPER_INVENTORY"],
        "supply": ["OPEC_PRODUCTION", "COPPER_MINE_OUTPUT"],
        "demand": ["REFINERY_UTILIZATION", "CHINA_IMPORTS"]
    }
}

关键算法1:轮动信号识别

相对强度计算

概念说明

相对强度(Relative Strength)衡量某类商品相对于基准的表现。相对强度上升表示该类商品跑赢基准,可能是轮动开始或加强的信号。

算法伪代码

def calculate_relative_strength(category_price_series, benchmark_series, lookback_period=20):
    """
    计算类别的相对强度
    
    参数:
        category_price_series: 类别价格序列
        benchmark_series: 基准价格序列(如CRB指数)
        lookback_period: 回看周期,默认20天
    
    返回:
        相对强度序列
    """
    # 计算各类别和基准的收益率
    category_returns = calculate_returns(category_price_series)
    benchmark_returns = calculate_returns(benchmark_series)
    
    # 计算超额收益
    excess_returns = category_returns - benchmark_returns
    
    # 计算移动平均超额收益(平滑处理)
    smooth_excess_returns = moving_average(excess_returns, lookback_period)
    
    # 计算相对强度(标准化处理)
    relative_strength = (smooth_excess_returns / standard_deviation(excess_returns, lookback_period))
    
    return relative_strength

应用示例

# 计算贵金属相对强度
gold_prices = get_historical_prices("GC=F", start_date="2020-01-01", end_date="2024-12-31")
crb_index = get_historical_prices("^CRB", start_date="2020-01-01", end_date="2024-12-31")

gold_relative_strength = calculate_relative_strength(gold_prices, crb_index, lookback_period=60)

# 判断轮动信号
if gold_relative_strength[-1] > 1.0 and gold_relative_strength[-5] < 0.5:
    print("贵金属轮动信号:黄金相对强度显著上升")

轮动阶段识别

概念说明

根据各类别相对强度的排序,识别当前所处的轮动阶段。

算法伪代码

def identify_rotation_phase(relative_strength_dict):
    """
    识别当前轮动阶段
    
    参数:
        relative_strength_dict: 各类别相对强度字典
            {
                "precious_metals": 0.8,
                "base_metals": 0.5,
                "energy": 0.2,
                "agriculture": 0.1
            }
    
    返回:
        轮动阶段(字符串)
    """
    # 按相对强度排序
    sorted_categories = sorted(relative_strength_dict.items(), key=lambda x: x[1], reverse=True)
    
    # 判断轮动阶段
    top_category = sorted_categories[0][0]
    second_category = sorted_categories[1][0]
    
    if top_category == "precious_metals" and second_category == "base_metals":
        phase = "阶段1:贵金属先行"
    elif top_category == "base_metals" and second_category == "energy":
        phase = "阶段2:工业金属确认"
    elif top_category == "energy" and second_category == "agriculture":
        phase = "阶段3:能源放大"
    elif top_category == "agriculture":
        phase = "阶段4:农产品扩散"
    else:
        phase = "过渡期:轮动转换中"
    
    return phase

应用示例

# 计算所有类别的相对强度
relative_strength_dict = {
    "precious_metals": calculate_relative_strength(precious_prices, crb_index),
    "base_metals": calculate_relative_strength(base_prices, crb_index),
    "energy": calculate_relative_strength(energy_prices, crb_index),
    "agriculture": calculate_relative_strength(agri_prices, crb_index)
}

# 获取当前各类别的最新相对强度值
current_relative_strength = {
    "precious_metals": relative_strength_dict["precious_metals"][-1],
    "base_metals": relative_strength_dict["base_metals"][-1],
    "energy": relative_strength_dict["energy"][-1],
    "agriculture": relative_strength_dict["agriculture"][-1]
}

# 识别轮动阶段
current_phase = identify_rotation_phase(current_relative_strength)
print(f"当前轮动阶段:{current_phase}")

关键算法2:宏观因子与轮动相关性分析

因子-商品相关性模型

概念说明

分析宏观因子与各类商品收益率的相关性,识别驱动轮动的关键因子。

算法伪代码

def calculate_factor_commodity_correlation(factor_data, commodity_returns, correlation_method="pearson"):
    """
    计算因子与商品收益率的相关性
    
    参数:
        factor_data: 因子数据序列
        commodity_returns: 商品收益率序列
        correlation_method: 相关性计算方法(pearson或spearman)
    
    返回:
        相关系数和p值
    """
    # 确保数据长度一致
    min_length = min(len(factor_data), len(commodity_returns))
    factor_data = factor_data[-min_length:]
    commodity_returns = commodity_returns[-min_length:]
    
    # 计算相关性
    if correlation_method == "pearson":
        correlation, p_value = scipy.stats.pearsonr(factor_data, commodity_returns)
    elif correlation_method == "spearman":
        correlation, p_value = scipy.stats.spearmanr(factor_data, commodity_returns)
    
    return correlation, p_value

滚动窗口相关性

def rolling_correlation(factor_data, commodity_returns, window=60):
    """
    计算滚动窗口相关性
    
    参数:
        factor_data: 因子数据序列
        commodity_returns: 商品收益率序列
        window: 滚动窗口长度
    
    返回:
        滚动相关性序列
    """
    rolling_corrs = []
    
    for i in range(window, len(factor_data)):
        window_factor = factor_data[i-window:i]
        window_returns = commodity_returns[i-window:i]
        
        correlation, _ = calculate_factor_commodity_correlation(window_factor, window_returns)
        rolling_corrs.append(correlation)
    
    return rolling_corrs

应用示例

# 获取宏观因子数据
real_rate_data = get_historical_data("REAL_INTEREST_RATE", start_date="2020-01-01")
pmi_data = get_historical_data("PMI", start_date="2020-01-01")

# 计算商品收益率
gold_returns = calculate_returns(get_historical_prices("GC=F"))
copper_returns = calculate_returns(get_historical_prices("HG=F"))

# 计算相关性
gold_real_rate_corr, p_value = calculate_factor_commodity_correlation(real_rate_data, gold_returns)
copper_pmi_corr, p_value = calculate_factor_commodity_correlation(pmi_data, copper_returns)

print(f"黄金与实际利率相关性:{gold_real_rate_corr:.3f} (p值: {p_value:.3f})")
print(f"铜与PMI相关性:{copper_pmi_corr:.3f} (p值: {p_value:.3f})")

# 计算滚动相关性
gold_real_rate_rolling = rolling_correlation(real_rate_data, gold_returns, window=60)

关键算法3:有色金属传导顺序验证

传导速度计算

概念说明

计算有色金属之间的传导速度,验证用户提出的传导顺序。

算法伪代码

def calculate_transmission_speed(price_series_list, max_lag=10):
    """
    计算价格序列之间的传导速度(Granger因果关系检验)
    
    参数:
        price_series_list: 价格序列列表 [贵金属, 铜, 铝, 锂, 锡, 钨, 镍, 钢铁]
        max_lag: 最大滞后阶数
    
    返回:
        传导速度矩阵
    """
    n = len(price_series_list)
    transmission_matrix = np.zeros((n, n))
    
    for i in range(n):
        for j in range(n):
            if i != j:
                # 计算Granger因果关系
                causality, p_value = granger_causality_test(
                    price_series_list[i], 
                    price_series_list[j], 
                    max_lag=max_lag
                )
                
                # 如果p值显著,记录传导方向
                if p_value < 0.05:
                    # 找到最优滞后阶数
                    best_lag = find_optimal_lag(price_series_list[i], price_series_list[j])
                    transmission_matrix[i][j] = best_lag
    
    return transmission_matrix

Granger因果关系检验

def granger_causality_test(series1, series2, max_lag):
    """
    Granger因果关系检验
    
    参数:
        series1: 因变量序列
        series2: 自变量序列
        max_lag: 最大滞后阶数
    
    返回:
        因果关系F统计量和p值
    """
    from statsmodels.tsa.stattools import grangercausalitytests
    
    # 准备数据
    data = pd.DataFrame({'y': series1, 'x': series2})
    
    # 进行Granger因果检验
    test_result = grangercausalitytests(data, max_lag, verbose=False)
    
    # 提取F统计量和p值(使用max_lag的结果)
    f_statistic = test_result[max_lag][0]['ssr_ftest'][0]
    p_value = test_result[max_lag][0]['ssr_ftest'][1]
    
    return f_statistic, p_value

应用示例

# 获取有色金属价格数据
metal_prices = {
    "gold": get_historical_prices("GC=F"),
    "silver": get_historical_prices("SI=F"),
    "copper": get_historical_prices("HG=F"),
    "aluminum": get_historical_prices("LME-MAH3"),
    "lithium": get_lithium_price_data(),  # 锂的历史数据可能较短
    "tin": get_historical_prices("LME-MSN3"),
    "nickel": get_historical_prices("LME-MNI3"),
    "steel": get_historical_prices("LME-STL3")
}

# 按照用户提出的顺序排列
price_series_list = [
    metal_prices["gold"],
    metal_prices["silver"],
    metal_prices["copper"],
    metal_prices["aluminum"],
    metal_prices["lithium"],
    metal_prices["tin"],
    metal_prices["nickel"],
    metal_prices["steel"]
]

# 计算传导速度
transmission_matrix = calculate_transmission_speed(price_series_list, max_lag=10)

# 分析传导路径
print("有色金属传导路径分析:")
for i in range(len(transmission_matrix) - 1):
    current_metal = list(metal_prices.keys())[i]
    next_metal = list(metal_prices.keys())[i + 1]
    lag = transmission_matrix[i][i + 1]
    
    if lag > 0:
        print(f"{current_metal} -> {next_metal}: 滞后{lag}期")
    else:
        print(f"{current_metal} -> {next_metal}: 无显著传导")

配置与参数调优

关键参数配置

回看周期选择

# 不同商品类别的推荐回看周期
LOOKBACK_PERIODS = {
    "precious_metals": {
        "short": 20,    # 短期:1个月
        "medium": 60,   # 中期:3个月
        "long": 252     # 长期:1年
    },
    "base_metals": {
        "short": 20,
        "medium": 60,
        "long": 252
    },
    "energy": {
        "short": 20,
        "medium": 60,
        "long": 252
    },
    "agriculture": {
        "short": 60,    # 农产品周期较长
        "medium": 120,
        "long": 252
    }
}

相关性阈值设定

# 相关性阈值配置
CORRELATION_THRESHOLDS = {
    "strong": 0.7,     # 强相关
    "moderate": 0.4,   # 中等相关
    "weak": 0.2,       # 弱相关
    "negligible": 0.1  # 可忽略
}

# 统计显著性阈值
P_VALUE_THRESHOLD = 0.05  # p < 0.05表示统计显著

滚动窗口选择

# 滚动窗口配置
ROLLING_WINDOW_CONFIG = {
    "short_term": 20,      # 短期:1个月
    "medium_term": 60,     # 中期:3个月
    "long_term": 120,      # 长期:6个月
    "very_long_term": 252  # 超长期:1年
}

# 不同应用场景的推荐窗口
WINDOW_SELECTION = {
    "trading": "short_term",          # 交易:短期窗口
    "tactical": "medium_term",       # 战术:中期窗口
    "strategic": "long_term",         # 战略:长期窗口
    "structural": "very_long_term"    # 结构:超长期窗口
}

模型性能评估

准确性评估指标

def evaluate_rotation_model(actual_rotation, predicted_rotation):
    """
    评估轮动模型的准确性
    
    参数:
        actual_rotation: 实际轮动序列
        predicted_rotation: 预测轮动序列
    
    返回:
        评估指标字典
    """
    # 准确率
    accuracy = sum(actual_rotation == predicted_rotation) / len(actual_rotation)
    
    # 精确率
    true_positives = sum((actual_rotation == predicted_rotation) & (predicted_rotation != "transitional"))
    precision = true_positives / sum(predicted_rotation != "transitional")
    
    # 召回率
    recall = true_positives / sum(actual_rotation != "transitional")
    
    # F1分数
    f1_score = 2 * (precision * recall) / (precision + recall)
    
    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1_score": f1_score
    }

回测框架

def backtest_rotation_strategy(start_date, end_date, rebalance_frequency="monthly"):
    """
    回测轮动策略
    
    参数:
        start_date: 开始日期
        end_date: 结束日期
        rebalance_frequency: 再平衡频率
    
    返回:
        回测结果
    """
    # 生成交易信号
    signals = generate_rotation_signals(start_date, end_date)
    
    # 执行交易
    portfolio_value = execute_trades(signals, rebalance_frequency)
    
    # 计算收益
    returns = calculate_returns(portfolio_value)
    
    # 计算风险指标
    sharpe_ratio = calculate_sharpe_ratio(returns)
    max_drawdown = calculate_max_drawdown(portfolio_value)
    
    return {
        "total_return": (portfolio_value[-1] / portfolio_value[0] - 1),
        "annualized_return": annualize_returns(returns),
        "sharpe_ratio": sharpe_ratio,
        "max_drawdown": max_drawdown,
        "volatility": np.std(returns) * np.sqrt(252)
    }

参考资料