950 lines
42 KiB
Python
950 lines
42 KiB
Python
"""
|
||
量化交易演示系列 第5篇:组合优化
|
||
Quantitative Trading Demo Series - Part 5: Portfolio Optimization
|
||
|
||
涵盖内容 / Topics Covered:
|
||
§0 合成股票池与因子模型 (Synthetic Universe & Factor Model)
|
||
§1 协方差矩阵估计 (Covariance Matrix Estimation)
|
||
§2 马科维兹均值-方差优化 (Markowitz Mean-Variance Optimization)
|
||
§3 有效前沿 (Efficient Frontier)
|
||
§4 特殊组合:等权/最小方差/最大夏普 (Special Portfolios: EW / MinVar / MaxSharpe)
|
||
§5 风险平价 (Risk Parity)
|
||
§6 Black-Litterman 模型 (Black-Litterman Model)
|
||
§7 带约束的组合优化 (Constrained Portfolio Optimization)
|
||
§8 滚动回测:五策略对比 (Rolling Backtest: 5-Strategy Comparison)
|
||
§9 可视化 (9-Panel Visualization)
|
||
|
||
依赖库 / Dependencies:
|
||
numpy, pandas, scipy, matplotlib, sklearn
|
||
|
||
作者 / Author: Quant Demo Series
|
||
"""
|
||
|
||
# ── 无界面绘图模式,必须在 import pyplot 之前设置 ──
|
||
# Headless plotting mode — MUST be set before importing pyplot
|
||
import matplotlib
|
||
matplotlib.use('Agg')
|
||
|
||
import warnings
|
||
warnings.filterwarnings('ignore')
|
||
|
||
import numpy as np
|
||
import pandas as pd
|
||
import matplotlib.pyplot as plt
|
||
import matplotlib.gridspec as gridspec
|
||
from matplotlib.ticker import FuncFormatter
|
||
from scipy.optimize import minimize
|
||
|
||
# ── 随机种子(确保结果可复现)/ Random seed for reproducibility ──
|
||
np.random.seed(42)
|
||
|
||
# ── 中文字体配置 / Chinese font configuration ──
|
||
plt.rcParams['font.sans-serif'] = [
|
||
'WenQuanYi Zen Hei', 'Arial Unicode MS', 'SimHei', 'DejaVu Sans'
|
||
]
|
||
plt.rcParams['axes.unicode_minus'] = False
|
||
|
||
print("=" * 70)
|
||
print(" 量化交易 组合优化演示")
|
||
print(" Quantitative Trading: Portfolio Optimization Demo")
|
||
print("=" * 70)
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
# §0 合成股票池与行业因子模型
|
||
# Synthetic Universe & Sector Factor Model
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
#
|
||
# 使用 Barra 风格的因子模型生成收益率:
|
||
# Returns are generated using a Barra-style factor model:
|
||
#
|
||
# r_{i,t} = β_{mkt,i} × r_{mkt,t} ← 市场因子 (Market factor)
|
||
# + Σ_k β_{sec,i,k} × f_{k,t} ← 行业因子 (Sector factors)
|
||
# + α_i ← 个股 Alpha (Stock-specific alpha)
|
||
# + ε_{i,t} ← 特质噪音 (Idiosyncratic noise)
|
||
|
||
N_STOCKS = 20 # 股票数量 (Number of stocks)
|
||
N_YEARS = 3 # 历史年数 (History in years)
|
||
FREQ = 252 # 年化因子 / 每年交易日数 (Annualization factor)
|
||
RF = 0.02 # 无风险利率 (Risk-free rate), 年化
|
||
|
||
START_DATE = "2021-01-04"
|
||
dates = pd.bdate_range(START_DATE, periods=N_YEARS * FREQ) # 工作日序列
|
||
N_DAYS = len(dates)
|
||
|
||
# ── 行业分组 / Sector assignments ──
|
||
SECTORS = {
|
||
"科技 Tech": list(range(0, 5)), # S00–S04
|
||
"金融 Finance": list(range(5, 9)), # S05–S08
|
||
"消费 Consumer": list(range(9, 13)), # S09–S12
|
||
"工业 Industrial": list(range(13, 17)), # S13–S16
|
||
"医疗 Healthcare": list(range(17, 20)), # S17–S19
|
||
}
|
||
SECTOR_NAMES = list(SECTORS.keys())
|
||
SECTOR_SHORT = ["科技", "金融", "消费", "工业", "医疗"]
|
||
N_SECTORS = len(SECTORS)
|
||
stock_names = [f"S{i:02d}" for i in range(N_STOCKS)]
|
||
|
||
# ── 行业公共因子收益率 / Sector factor daily returns ──
|
||
sector_ann_ret = np.array([0.16, 0.10, 0.12, 0.08, 0.13]) # 各行业年化期望收益
|
||
sector_ann_vol = np.array([0.25, 0.18, 0.20, 0.15, 0.22]) # 各行业年化波动率
|
||
|
||
sector_factor_rets = np.zeros((N_DAYS, N_SECTORS))
|
||
for k in range(N_SECTORS):
|
||
mu_d = sector_ann_ret[k] / FREQ
|
||
sig_d = sector_ann_vol[k] / np.sqrt(FREQ)
|
||
sector_factor_rets[:, k] = np.random.normal(mu_d, sig_d, N_DAYS)
|
||
|
||
# ── 市场公共因子 / Market common factor ──
|
||
mkt_daily = np.random.normal(0.10 / FREQ, 0.18 / np.sqrt(FREQ), N_DAYS)
|
||
|
||
# ── 各股的因子载荷(Beta 暴露)/ Factor loadings (Beta exposures) ──
|
||
betas_mkt = np.random.uniform(0.6, 1.4, N_STOCKS) # 市场 Beta
|
||
betas_sec = np.zeros((N_STOCKS, N_SECTORS)) # 行业 Beta
|
||
|
||
for k, (sname, sidx) in enumerate(SECTORS.items()):
|
||
for i in sidx:
|
||
betas_sec[i, k] = np.random.uniform(0.5, 1.0) # 本行业:强载荷
|
||
for k2 in range(N_SECTORS):
|
||
if k2 != k:
|
||
betas_sec[i, k2] = np.random.uniform(0.0, 0.15) # 其他行业:弱载荷
|
||
|
||
# ── 个股特质波动率 / Idiosyncratic volatility ──
|
||
idio_vols = np.random.uniform(0.12, 0.30, N_STOCKS) / np.sqrt(FREQ)
|
||
idio_rets = np.random.normal(0, idio_vols, (N_DAYS, N_STOCKS))
|
||
|
||
# ── 个股特质 Alpha / Stock-specific alpha ──
|
||
# 科技股注入正 Alpha,让最大夏普组合有明显的倾向性
|
||
idio_alpha = np.zeros(N_STOCKS)
|
||
idio_alpha[:5] = np.random.uniform(0.04, 0.09, 5) / FREQ # 科技股:正 Alpha
|
||
idio_alpha[5:9] = np.random.uniform(-0.02, 0.02, 4) / FREQ # 金融股:中性
|
||
|
||
# ── 合成日收益率矩阵 / Composite daily return matrix ──
|
||
daily_ret_mat = np.zeros((N_DAYS, N_STOCKS))
|
||
for i in range(N_STOCKS):
|
||
daily_ret_mat[:, i] = (
|
||
betas_mkt[i] * mkt_daily # 市场贡献
|
||
+ betas_sec[i] @ sector_factor_rets.T # 行业贡献(5 行业因子)
|
||
+ idio_alpha[i] # 个股 Alpha
|
||
+ idio_rets[:, i] # 特质噪音
|
||
)
|
||
|
||
ret_df = pd.DataFrame(daily_ret_mat, index=dates, columns=stock_names)
|
||
prices_df = (1 + ret_df).cumprod() * 100.0 # 从 100 元开始的价格序列
|
||
|
||
# ── 市值权重(Black-Litterman 需要作为先验市场组合)──
|
||
# Market-cap weights as Black-Litterman equilibrium prior
|
||
mktcap_weights = pd.Series(
|
||
np.random.dirichlet(np.ones(N_STOCKS) * 2.0),
|
||
index=stock_names
|
||
)
|
||
|
||
ann_rets = (1 + ret_df).prod() ** (FREQ / N_DAYS) - 1
|
||
print(f"\n[§0] 股票池生成完成 / Universe Generated")
|
||
print(f" 股票数量 (Stocks): {N_STOCKS} 只")
|
||
print(f" 交易日数 (Days): {N_DAYS} 天 {dates[0].date()} ~ {dates[-1].date()}")
|
||
print(f" 行业数量 (Sectors): {N_SECTORS} 个")
|
||
print(f" 个股年化收益区间: {ann_rets.min():.1%} ~ {ann_rets.max():.1%}")
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
# §1 协方差矩阵估计:样本 vs Ledoit-Wolf 收缩
|
||
# Covariance Matrix Estimation: Sample vs Ledoit-Wolf Shrinkage
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
#
|
||
# 协方差矩阵 (Covariance Matrix) Σ 是组合优化的核心输入。
|
||
# 样本协方差矩阵 (Sample Covariance Matrix) 存在两个问题:
|
||
# 1. 估计误差大(小样本情形,T/N 较小时)/ Large estimation error when T/N is small
|
||
# 2. 条件数 (Condition Number) 高,矩阵"病态",优化对估计误差非常敏感
|
||
#
|
||
# Ledoit-Wolf 收缩 (Ledoit-Wolf Shrinkage):
|
||
# Σ_shrunk = (1-α) × Σ_sample + α × F
|
||
# 其中 F 是结构化目标矩阵(如对角矩阵),α 是最优收缩系数(数据驱动自动选择)
|
||
# F is a structured target matrix; α is the optimal shrinkage coefficient.
|
||
|
||
# 样本协方差(年化) / Sample covariance (annualized)
|
||
cov_sample = ret_df.cov() * FREQ
|
||
mu_sample = ret_df.mean() * FREQ # 年化预期收益向量 (Annualized expected return vector)
|
||
|
||
# Ledoit-Wolf 收缩估计 / Ledoit-Wolf shrinkage estimation
|
||
try:
|
||
from sklearn.covariance import LedoitWolf
|
||
lw = LedoitWolf()
|
||
lw.fit(ret_df.values)
|
||
cov_shrink = pd.DataFrame(
|
||
lw.covariance_ * FREQ,
|
||
index=stock_names, columns=stock_names
|
||
)
|
||
shrink_alpha = lw.shrinkage_ # 最优收缩系数 α
|
||
USE_SHRINK = True
|
||
except ImportError:
|
||
cov_shrink = cov_sample.copy()
|
||
shrink_alpha = 0.0
|
||
USE_SHRINK = False
|
||
|
||
print(f"\n[§1] 协方差估计 / Covariance Estimation")
|
||
print(f" 样本协方差条件数 (Sample Cov Cond#): {np.linalg.cond(cov_sample.values):.1f}")
|
||
if USE_SHRINK:
|
||
print(f" Ledoit-Wolf 收缩系数 (Shrinkage α): {shrink_alpha:.4f}")
|
||
print(f" 收缩后条件数 (Shrunk Cov Cond#): {np.linalg.cond(cov_shrink.values):.1f}")
|
||
print(f" → 条件数降低意味着优化更稳健 / Lower cond# = more robust optimization")
|
||
|
||
# 正式使用的协方差矩阵(收缩版更稳健)/ Use shrunk covariance for optimization
|
||
cov_opt = cov_shrink if USE_SHRINK else cov_sample
|
||
mu_opt = mu_sample.copy()
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
# §2 核心优化工具函数
|
||
# Core Optimization Utility Functions
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
|
||
def portfolio_stats(weights, mu, cov, rf=RF):
|
||
"""
|
||
计算组合的三个核心绩效指标(年化)
|
||
Compute 3 key portfolio statistics (annualized).
|
||
|
||
参数 / Args:
|
||
weights : np.ndarray, 权重向量 (weight vector), shape (N,)
|
||
mu : pd.Series, 年化预期收益向量 (annualized expected returns)
|
||
cov : pd.DataFrame, 年化协方差矩阵 (annualized covariance matrix)
|
||
rf : float, 无风险利率 (risk-free rate)
|
||
|
||
返回 / Returns:
|
||
ret : 组合年化收益率 (portfolio annualized return)
|
||
vol : 组合年化波动率 (portfolio annualized volatility)
|
||
sharpe : 夏普比率 (Sharpe ratio) = (ret - rf) / vol
|
||
"""
|
||
w = np.asarray(weights)
|
||
ret = float(w @ mu) # w'μ
|
||
var = float(w @ cov.values @ w) # w'Σw(组合方差)
|
||
vol = np.sqrt(max(var, 1e-12)) # 组合波动率(年化)
|
||
sharpe = (ret - rf) / vol
|
||
return ret, vol, sharpe
|
||
|
||
|
||
def _base_optimize(objective, n, bounds, extra_constraints=None):
|
||
"""
|
||
通用 SLSQP 优化框架(内部使用)
|
||
Generic SLSQP optimization scaffold (internal use).
|
||
|
||
SLSQP = Sequential Least Squares Programming / 序列二次规划法
|
||
"""
|
||
w0 = np.ones(n) / n # 初始猜测:等权 (Initial guess: equal weight)
|
||
constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}] # 全投资约束
|
||
if extra_constraints:
|
||
constraints.extend(extra_constraints)
|
||
return minimize(
|
||
objective, w0,
|
||
method='SLSQP',
|
||
bounds=bounds,
|
||
constraints=constraints,
|
||
options={'ftol': 1e-12, 'maxiter': 1500}
|
||
)
|
||
|
||
|
||
def min_variance(mu, cov, allow_short=False):
|
||
"""
|
||
最小方差组合 (Minimum Variance Portfolio, MinVar)
|
||
|
||
这是组合优化中最稳健的组合,因为它完全不依赖对预期收益的估计。
|
||
Most robust portfolio: it does NOT require any return estimates.
|
||
|
||
问题形式 / Optimization problem:
|
||
minimize w'Σw
|
||
subject to Σw_i = 1 (全投资约束 / full investment)
|
||
w_i ≥ 0 (多头约束 / long-only, when allow_short=False)
|
||
"""
|
||
n = len(mu)
|
||
bnd = (-0.3, 1.0) if allow_short else (0.0, 1.0)
|
||
bounds = [bnd] * n
|
||
result = _base_optimize(lambda w: w @ cov.values @ w, n, bounds)
|
||
return pd.Series(result.x, index=cov.index)
|
||
|
||
|
||
def max_sharpe(mu, cov, rf=RF, allow_short=False):
|
||
"""
|
||
最大夏普比率组合 / 切线组合 (Maximum Sharpe / Tangency Portfolio)
|
||
|
||
在均值-方差空间中,从无风险资产出发到有效前沿的切线点。
|
||
Tangency point from the risk-free asset to the efficient frontier (Capital Market Line).
|
||
|
||
技巧 / Trick:
|
||
max Sharpe ≡ min -Sharpe (把最大化转为最小化 / flip to minimization)
|
||
"""
|
||
n = len(mu)
|
||
bnd = (-0.3, 1.0) if allow_short else (0.0, 1.0)
|
||
bounds = [bnd] * n
|
||
|
||
def neg_sharpe(w):
|
||
r, v, sr = portfolio_stats(w, mu, cov, rf)
|
||
return -sr
|
||
|
||
result = _base_optimize(neg_sharpe, n, bounds)
|
||
return pd.Series(result.x, index=cov.index)
|
||
|
||
|
||
def efficient_portfolio(target_return, mu, cov):
|
||
"""
|
||
目标收益下的最小方差组合(有效前沿上的一点)
|
||
Minimum-variance portfolio for a given target return (a point on efficient frontier).
|
||
|
||
问题形式 / Optimization problem:
|
||
minimize w'Σw
|
||
subject to w'μ = target_return (收益约束 / return constraint)
|
||
Σw_i = 1
|
||
w_i ≥ 0
|
||
"""
|
||
n = len(mu)
|
||
bnd = (0.0, 1.0)
|
||
result = _base_optimize(
|
||
lambda w: w @ cov.values @ w, n,
|
||
bounds=[bnd] * n,
|
||
extra_constraints=[
|
||
{'type': 'eq', 'fun': lambda w: float(w @ mu.values) - target_return}
|
||
]
|
||
)
|
||
return pd.Series(result.x, index=cov.index) if result.success else None
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
# §3 有效前沿
|
||
# Efficient Frontier
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
#
|
||
# 有效前沿 (Efficient Frontier) 是所有"有效组合"的集合:
|
||
# → 在相同风险(波动率)下,收益最高
|
||
# → 在相同收益下,风险最低
|
||
#
|
||
# 有效前沿由两点确定:
|
||
# 左端点:最小方差组合 (Global Minimum Variance Portfolio, GMV)
|
||
# 右端点:预期收益最高的单只股票(极端集中)
|
||
|
||
print(f"\n[§3] 计算有效前沿 / Computing Efficient Frontier...")
|
||
|
||
# ── 计算关键节点组合 / Compute key special portfolios ──
|
||
w_minvar = min_variance(mu_opt, cov_opt)
|
||
w_maxsharpe = max_sharpe(mu_opt, cov_opt, rf=RF)
|
||
w_eq = pd.Series(np.ones(N_STOCKS) / N_STOCKS, index=stock_names)
|
||
|
||
ret_min, vol_min, sr_min = portfolio_stats(w_minvar, mu_opt, cov_opt)
|
||
ret_msr, vol_msr, sr_msr = portfolio_stats(w_maxsharpe, mu_opt, cov_opt)
|
||
ret_eq, vol_eq, sr_eq = portfolio_stats(w_eq, mu_opt, cov_opt)
|
||
|
||
# ── 扫描有效前沿 / Trace the efficient frontier ──
|
||
n_pts = 60
|
||
ret_lo = ret_min * 0.99
|
||
ret_hi = mu_opt.max() * 0.88
|
||
frontier_rets = []
|
||
frontier_vols = []
|
||
|
||
for tgt in np.linspace(ret_lo, ret_hi, n_pts):
|
||
w_eff = efficient_portfolio(tgt, mu_opt, cov_opt)
|
||
if w_eff is not None:
|
||
r, v, _ = portfolio_stats(w_eff, mu_opt, cov_opt)
|
||
frontier_rets.append(r)
|
||
frontier_vols.append(v)
|
||
|
||
# ── Monte Carlo 随机组合云(背景参照)/ Random portfolio cloud (background reference) ──
|
||
N_RANDOM = 3000
|
||
rand_rets = []
|
||
rand_vols = []
|
||
rand_sharpes = []
|
||
|
||
for _ in range(N_RANDOM):
|
||
w_r = np.random.dirichlet(np.ones(N_STOCKS)) # 满足 w≥0, Σw=1 的随机权重
|
||
r, v, sr = portfolio_stats(w_r, mu_opt, cov_opt)
|
||
rand_rets.append(r)
|
||
rand_vols.append(v)
|
||
rand_sharpes.append(sr)
|
||
|
||
print(f" 有效前沿点数 (Frontier points): {len(frontier_rets)}")
|
||
print(f" 最小方差组合 (MinVar): 收益={ret_min:.1%}, 波动={vol_min:.1%}, 夏普={sr_min:.2f}")
|
||
print(f" 最大夏普组合 (MaxSharpe): 收益={ret_msr:.1%}, 波动={vol_msr:.1%}, 夏普={sr_msr:.2f}")
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
# §4 风险平价
|
||
# Risk Parity
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
#
|
||
# 核心思想:每只资产对组合总风险的贡献相等
|
||
# Core idea: each asset contributes equally to total portfolio risk.
|
||
#
|
||
# 风险贡献 (Risk Contribution, RC) 的定义:
|
||
#
|
||
# RC_i = w_i × ∂σ_p/∂w_i = w_i × (Σw)_i / σ_p
|
||
# ↑ 边际风险贡献 (Marginal Risk Contribution, MRC)
|
||
#
|
||
# 等风险贡献条件 (Equal Risk Contribution):
|
||
# RC_i = σ_p / N ←→ w_i × (Σw)_i = w_j × (Σw)_j for all i, j
|
||
#
|
||
# 优化形式(最小化各股风险贡献比例与目标 1/N 的偏差平方和):
|
||
# minimize Σ_i (RC_i/σ_p − 1/N)²
|
||
|
||
print(f"\n[§4] 计算风险平价组合 / Risk Parity Portfolio...")
|
||
|
||
|
||
def risk_contributions(weights, cov):
|
||
"""
|
||
计算每只资产的绝对风险贡献和风险贡献占比
|
||
Compute absolute risk contribution and risk contribution fraction for each asset.
|
||
|
||
Returns:
|
||
rc : 绝对风险贡献向量 (Absolute Risk Contribution vector), shape (N,)
|
||
sigma_p : 组合总波动率 (Total portfolio volatility)
|
||
"""
|
||
w = np.asarray(weights)
|
||
sigma_p = np.sqrt(float(w @ cov.values @ w)) # 组合波动率 (Portfolio vol)
|
||
mrc = cov.values @ w / sigma_p # 边际风险贡献 (MRC = ∂σ/∂w)
|
||
rc = w * mrc # 绝对风险贡献 (RC = w × MRC)
|
||
return rc, sigma_p
|
||
|
||
|
||
def risk_parity(cov):
|
||
"""
|
||
等风险贡献组合 (Equal Risk Contribution Portfolio)
|
||
|
||
约束:w_i ≥ 0 (严格多头,做空会有负的风险贡献,没有意义)
|
||
Strictly long-only: short positions would give negative RC (meaningless).
|
||
"""
|
||
N = cov.shape[0]
|
||
tgt = 1.0 / N # 目标风险贡献占比 = 1/N
|
||
|
||
def objective(w):
|
||
rc, sigma_p = risk_contributions(w, cov)
|
||
rc_pct = rc / sigma_p # 风险贡献占比 (RC fraction)
|
||
return np.sum((rc_pct - tgt) ** 2)
|
||
|
||
result = _base_optimize(
|
||
objective, N,
|
||
bounds=[(1e-6, 1.0)] * N # 严格正数(不允许做空)
|
||
)
|
||
return pd.Series(result.x, index=cov.index)
|
||
|
||
|
||
w_rp = risk_parity(cov_opt)
|
||
rc_rp, sigma_rp = risk_contributions(w_rp, cov_opt)
|
||
ret_rp, vol_rp, sr_rp = portfolio_stats(w_rp, mu_opt, cov_opt)
|
||
|
||
rc_pct_rp = rc_rp / sigma_rp
|
||
max_rc_dev = np.max(np.abs(rc_pct_rp - 1.0 / N_STOCKS)) # 与目标的最大偏差
|
||
|
||
print(f" 风险平价组合: 收益={ret_rp:.1%}, 波动={vol_rp:.1%}, 夏普={sr_rp:.2f}")
|
||
print(f" 风险贡献最大偏差 (Max RC deviation from 1/N): {max_rc_dev:.5f}")
|
||
print(f" → 接近 0 表示各股风险贡献几乎相等 / ≈0 means near-perfect equal RC")
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
# §5 Black-Litterman 模型
|
||
# Black-Litterman Model
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
#
|
||
# 马科维兹优化的两大痛点 / Two pain points of Markowitz optimization:
|
||
# ① 预期收益 μ 难以估计,哪怕小幅误差也会导致权重大幅波动
|
||
# Expected returns are noisy; small errors cause wild weight swings.
|
||
# ② 结果过度集中("角点解"),实际不可用
|
||
# Results are over-concentrated "corner solutions".
|
||
#
|
||
# Black-Litterman (1990) 的贝叶斯框架 / Bayesian framework:
|
||
# ① 先验 (Prior): 从市场均衡推导隐含收益(CAPM 反向优化)
|
||
# Market-implied returns from reverse-optimizing CAPM.
|
||
# ② 观点 (Views): 投资者对某些股票/组合的主观预期
|
||
# Investor's subjective views on specific stocks/portfolios.
|
||
# ③ 后验 (Posterior): 贝叶斯更新,先验与观点的加权混合
|
||
# Bayesian posterior blending prior and views.
|
||
#
|
||
# 后验均值公式 / Posterior mean formula (He & Litterman, 1999):
|
||
#
|
||
# μ_BL = [(τΣ)⁻¹ + P'Ω⁻¹P]⁻¹ × [(τΣ)⁻¹Π + P'Ω⁻¹Q]
|
||
# ───────────────────── ──────────────────────
|
||
# 精度矩阵之和 加权均值向量
|
||
# Sum of precision Weighted mean vector
|
||
#
|
||
# Π = δ × Σ × w_mkt ← 市场均衡隐含超额收益 (Market equilibrium implied returns)
|
||
# δ = 风险厌恶系数 (Risk aversion coefficient)
|
||
# τ = 先验收缩参数 (Prior scaling factor), 通常 0.025~0.10
|
||
# P = 观点矩阵 K×N (View matrix, K views, N assets)
|
||
# Q = 观点收益向量 K×1 (View expected excess returns)
|
||
# Ω = 观点不确定性矩阵 K×K (View uncertainty / confidence matrix)
|
||
|
||
print(f"\n[§5] Black-Litterman 模型 / Black-Litterman Model...")
|
||
|
||
DELTA = 2.5 # 风险厌恶系数 (Risk aversion), 典型值 2.0~3.0
|
||
TAU = 0.05 # 先验收缩参数 (Prior scaling), 典型值 0.025~0.10
|
||
|
||
# ─── Step 1: 计算市场均衡隐含超额收益 ───
|
||
# Market equilibrium implied excess returns (Reverse Optimization of CAPM)
|
||
# Π = δ × Σ × w_mkt
|
||
w_mkt = mktcap_weights.values
|
||
Pi = DELTA * cov_opt.values @ w_mkt
|
||
Pi_s = pd.Series(Pi, index=stock_names)
|
||
|
||
# ─── Step 2: 设定投资者观点 (Investor Views) ───
|
||
#
|
||
# 观点1 (View 1): "S01 的年化超额收益率将达到 +5%"(单资产绝对观点)
|
||
# "S01 will earn +5% excess return per year" (absolute view)
|
||
#
|
||
# 观点2 (View 2): "S00 将比 S04 多赚 3%"(两资产相对观点)
|
||
# "S00 will outperform S04 by 3%" (relative view)
|
||
|
||
K = 2 # 观点数量 (Number of views)
|
||
|
||
# P 矩阵:K × N;P[k, i] = 资产 i 在观点 k 中的权重(正=多头,负=空头)
|
||
P = np.zeros((K, N_STOCKS))
|
||
P[0, 1] = 1.0 # 观点1:仅 S01
|
||
P[1, 0] = 1.0 # 观点2:S00 多头
|
||
P[1, 4] = -1.0 # S04 空头(相对观点)
|
||
|
||
# Q 向量:观点的预期超额收益 (View expected excess returns)
|
||
Q = np.array([0.05, 0.03])
|
||
|
||
# Ω 矩阵:观点的不确定性(He & Litterman 建议的设置方式)
|
||
# Ω = τ × diag(P Σ P'),对角元素越大表示该观点越不确定
|
||
Omega = np.diag(TAU * np.diag(P @ cov_opt.values @ P.T))
|
||
Omega_inv = np.linalg.inv(Omega)
|
||
|
||
# ─── Step 3: 贝叶斯更新,计算后验预期超额收益 ───
|
||
# Bayesian posterior expected excess returns
|
||
tau_cov_inv = np.linalg.inv(TAU * cov_opt.values)
|
||
A = tau_cov_inv + P.T @ Omega_inv @ P # 精度矩阵之和
|
||
b = tau_cov_inv @ Pi + P.T @ Omega_inv @ Q
|
||
|
||
mu_bl = np.linalg.solve(A, b) + RF # 后验 = 超额 + 无风险利率
|
||
mu_bl_s = pd.Series(mu_bl, index=stock_names)
|
||
|
||
# ─── Step 4: 用后验收益做最大夏普优化 ───
|
||
w_bl = max_sharpe(mu_bl_s, cov_opt, rf=RF)
|
||
ret_bl, vol_bl, sr_bl = portfolio_stats(w_bl, mu_bl_s, cov_opt)
|
||
|
||
print(f" 市场均衡隐含收益范围: {Pi_s.min():.1%} ~ {Pi_s.max():.1%}")
|
||
print(f" BL 后验收益范围: {mu_bl_s.min():.1%} ~ {mu_bl_s.max():.1%}")
|
||
print(f" BL 组合: 收益={ret_bl:.1%}, 波动={vol_bl:.1%}, 夏普={sr_bl:.2f}")
|
||
print(f" 观点受益股(S01, S00)权重 BL: {w_bl['S01']:.1%}, {w_bl['S00']:.1%}")
|
||
print(f" 观点受益股(S01, S00)权重 等权: {w_eq['S01']:.1%}, {w_eq['S00']:.1%}")
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
# §6 带约束的组合优化
|
||
# Constrained Portfolio Optimization
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
#
|
||
# 实际组合管理中必须满足多重约束 / Real portfolios need multiple constraints:
|
||
# ① 单只股票权重上限(集中度管理)/ Max individual weight (concentration limit)
|
||
# ② 行业权重区间(风格/行业暴露管理)/ Sector weight bounds (style control)
|
||
# ③ 换手率约束(控制交易成本)/ Turnover constraint (transaction cost control)
|
||
#
|
||
# 换手率 (Turnover) 的定义:
|
||
# Turnover = Σ_i |w_new,i - w_old,i| / 2 (双边换手率,除以 2 避免重复计算)
|
||
# 或简化为 L1 距离:Σ_i |w_new,i - w_old,i|
|
||
|
||
print(f"\n[§6] 带约束的最大夏普组合 / Constrained Max-Sharpe Portfolio...")
|
||
|
||
MAX_STOCK_WT = 0.15 # 单只股票最大 15% / Max 15% per stock
|
||
MAX_SECTOR_WT = 0.35 # 单行业最大 35% / Max 35% per sector
|
||
MIN_SECTOR_WT = 0.10 # 单行业最小 10% / Min 10% per sector
|
||
MAX_TURNOVER = 0.50 # 最大换手率 50% / Max 50% one-way turnover
|
||
|
||
w_prev = np.ones(N_STOCKS) / N_STOCKS # 上期持仓(假设为等权)
|
||
|
||
n = N_STOCKS
|
||
w0 = np.ones(n) / n
|
||
|
||
extra_cons = []
|
||
|
||
# 约束1: 行业权重上下限
|
||
for sname, sidx in SECTORS.items():
|
||
def make_lb(idx): return lambda w: np.sum(w[idx]) - MIN_SECTOR_WT
|
||
def make_ub(idx): return lambda w: MAX_SECTOR_WT - np.sum(w[idx])
|
||
extra_cons.append({'type': 'ineq', 'fun': make_lb(sidx)}) # ≥ 10%
|
||
extra_cons.append({'type': 'ineq', 'fun': make_ub(sidx)}) # ≤ 35%
|
||
|
||
# 约束2: 换手率约束(L1 距离 ≤ 50%)
|
||
extra_cons.append({'type': 'ineq',
|
||
'fun': lambda w: MAX_TURNOVER - np.sum(np.abs(w - w_prev))})
|
||
|
||
result_c = _base_optimize(
|
||
lambda w: -portfolio_stats(w, mu_opt, cov_opt, RF)[2], # 最小化负夏普
|
||
n,
|
||
bounds=[(0.0, MAX_STOCK_WT)] * n,
|
||
extra_constraints=extra_cons
|
||
)
|
||
w_constrained = pd.Series(result_c.x, index=stock_names)
|
||
ret_c, vol_c, sr_c = portfolio_stats(w_constrained, mu_opt, cov_opt)
|
||
|
||
print(f" 带约束最大夏普: 收益={ret_c:.1%}, 波动={vol_c:.1%}, 夏普={sr_c:.2f}")
|
||
print(f" 最大单股权重: {w_constrained.max():.1%} (约束 ≤{MAX_STOCK_WT:.0%})")
|
||
for sname, sidx in SECTORS.items():
|
||
sw = w_constrained.iloc[sidx].sum()
|
||
ok = "✓" if MIN_SECTOR_WT <= sw <= MAX_SECTOR_WT else "✗"
|
||
print(f" {ok} {sname}: {sw:.1%}")
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
# §7 滚动回测:五策略对比
|
||
# Rolling Backtest: 5-Strategy Comparison
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
#
|
||
# 流程 / Workflow:
|
||
# 每个调仓日 T(每月一次):
|
||
# ① 用过去 LOOKBACK 天估计 μ 和 Σ
|
||
# ② 用当前估计值重新计算各策略权重
|
||
# ③ 用新权重持有到下次调仓日,每日记录组合收益
|
||
#
|
||
# 五种策略 / 5 Strategies:
|
||
# EW : 等权(不需要优化)
|
||
# MinVar : 最小方差(只需 Σ,不需要 μ)
|
||
# MaxSharpe : 最大夏普(需要 μ 和 Σ)
|
||
# RiskParity : 风险平价(只需 Σ)
|
||
# BL : Black-Litterman(贝叶斯混合)
|
||
|
||
print(f"\n[§7] 滚动回测 / Rolling Backtest...")
|
||
|
||
LOOKBACK = 126 # 协方差回看期(约 6 个月)/ 6-month estimation window
|
||
REBAL_FREQ = 21 # 调仓频率(约 1 个月)/ Monthly rebalancing
|
||
|
||
STRATEGY_NAMES = [
|
||
"等权 EW",
|
||
"最小方差 MinVar",
|
||
"最大夏普 MaxSharpe",
|
||
"风险平价 RP",
|
||
"Black-Litterman BL",
|
||
]
|
||
|
||
# 初始化:回看期内用 0 填充(尚未开始实际持仓)
|
||
daily_pnl = {name: [0.0] * LOOKBACK for name in STRATEGY_NAMES}
|
||
|
||
# 当前持仓权重(初始为等权)
|
||
cur_w = {name: np.ones(N_STOCKS) / N_STOCKS for name in STRATEGY_NAMES}
|
||
|
||
rebal_idx = list(range(LOOKBACK, N_DAYS, REBAL_FREQ))
|
||
|
||
for t_pos, t in enumerate(rebal_idx):
|
||
# 取过去 LOOKBACK 天的历史收益率
|
||
hist = ret_df.iloc[t - LOOKBACK: t]
|
||
|
||
# 估计滚动协方差矩阵(加微小正则化保证正定性)
|
||
# Rolling covariance (add tiny ridge term to ensure positive definiteness)
|
||
cov_r_arr = hist.cov().values * FREQ + 1e-7 * np.eye(N_STOCKS)
|
||
mu_r = hist.mean() * FREQ
|
||
cov_r = pd.DataFrame(cov_r_arr, index=stock_names, columns=stock_names)
|
||
|
||
try:
|
||
new_w = {}
|
||
new_w["等权 EW"] = np.ones(N_STOCKS) / N_STOCKS
|
||
new_w["最小方差 MinVar"] = min_variance(mu_r, cov_r).values
|
||
new_w["最大夏普 MaxSharpe"] = max_sharpe(mu_r, cov_r, rf=RF).values
|
||
new_w["风险平价 RP"] = risk_parity(cov_r).values
|
||
|
||
# Black-Litterman 滚动版本:使用滚动先验 + 固定观点
|
||
Pi_r = DELTA * cov_r_arr @ mktcap_weights.values
|
||
try:
|
||
tci = np.linalg.inv(TAU * cov_r_arr)
|
||
A_r = tci + P.T @ Omega_inv @ P
|
||
b_r = tci @ Pi_r + P.T @ Omega_inv @ Q
|
||
mu_bl_r = np.linalg.solve(A_r, b_r) + RF
|
||
except np.linalg.LinAlgError:
|
||
mu_bl_r = mu_r.values
|
||
new_w["Black-Litterman BL"] = max_sharpe(
|
||
pd.Series(mu_bl_r, index=stock_names), cov_r, rf=RF
|
||
).values
|
||
|
||
cur_w = new_w
|
||
|
||
except Exception:
|
||
pass # 优化失败时保持上期权重 / Keep previous weights if optimization fails
|
||
|
||
# 计算持有期内每日组合收益
|
||
next_t = rebal_idx[t_pos + 1] if t_pos + 1 < len(rebal_idx) else N_DAYS
|
||
for name in STRATEGY_NAMES:
|
||
w = cur_w[name]
|
||
for d in range(t, next_t):
|
||
daily_pnl[name].append(float(w @ ret_df.iloc[d].values))
|
||
|
||
# 截齐到 N_DAYS(防止因最后一段超出)
|
||
for name in STRATEGY_NAMES:
|
||
daily_pnl[name] = daily_pnl[name][:N_DAYS]
|
||
|
||
# ── 计算净值曲线 (NAV Curves) ──
|
||
nav_df = pd.DataFrame(
|
||
{name: (1 + pd.Series(daily_pnl[name], index=dates)).cumprod()
|
||
for name in STRATEGY_NAMES}
|
||
)
|
||
|
||
# ── 计算绩效指标 (Performance Metrics) ──
|
||
perf_rows = []
|
||
for name in STRATEGY_NAMES:
|
||
r_s = pd.Series(daily_pnl[name][LOOKBACK:], index=dates[LOOKBACK:])
|
||
ann_ret = (1 + r_s).prod() ** (FREQ / len(r_s)) - 1 # 年化收益率 (Ann. return)
|
||
ann_vol = r_s.std() * np.sqrt(FREQ) # 年化波动率 (Ann. vol)
|
||
sharpe = (ann_ret - RF) / ann_vol if ann_vol > 0 else 0
|
||
nav_s = (1 + r_s).cumprod()
|
||
max_dd = (nav_s / nav_s.cummax() - 1).min() # 最大回撤 (Max drawdown)
|
||
calmar = ann_ret / (-max_dd) if max_dd < 0 else 99.0 # Calmar 比率
|
||
perf_rows.append({
|
||
"策略": name, "年化收益": ann_ret, "年化波动": ann_vol,
|
||
"夏普比率": sharpe, "最大回撤": max_dd, "Calmar": calmar
|
||
})
|
||
|
||
perf_df = pd.DataFrame(perf_rows).set_index("策略")
|
||
|
||
print(f"\n 策略对比 / Strategy Comparison:")
|
||
print(f" {'策略':<28} {'年化收益':>8} {'年化波动':>8} {'夏普':>7} {'最大回撤':>9} {'Calmar':>8}")
|
||
print(f" {'─' * 65}")
|
||
for name, row in perf_df.iterrows():
|
||
print(f" {name:<28} {row['年化收益']:>8.1%} {row['年化波动']:>8.1%} "
|
||
f"{row['夏普比率']:>7.3f} {row['最大回撤']:>9.1%} {min(row['Calmar'], 99.0):>8.2f}")
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
# §8 可视化:9 图综合面板
|
||
# 9-Panel Visualization Dashboard
|
||
# ══════════════════════════════════════════════════════════════════════
|
||
|
||
print(f"\n[§8] 生成可视化图表 / Generating visualization...")
|
||
|
||
# 颜色配置 / Color palette
|
||
COLORS = {
|
||
"等权 EW": "#888888",
|
||
"最小方差 MinVar": "#2196F3",
|
||
"最大夏普 MaxSharpe": "#FF5722",
|
||
"风险平价 RP": "#4CAF50",
|
||
"Black-Litterman BL": "#9C27B0",
|
||
}
|
||
PCT_FMT = FuncFormatter(lambda x, _: f"{x:.0%}")
|
||
PCT1_FMT = FuncFormatter(lambda x, _: f"{x:.1%}")
|
||
DARK_BG = '#1A1D27'
|
||
LT = '#E0E0E0' # 浅色文字 (Light text)
|
||
GRID_C = '#2A2D3A' # 网格颜色 (Grid color)
|
||
|
||
fig = plt.figure(figsize=(22, 22), facecolor='#0F1117')
|
||
fig.suptitle(
|
||
"量化组合优化演示 / Portfolio Optimization Demo",
|
||
fontsize=18, color='white', y=0.98, fontweight='bold'
|
||
)
|
||
gs = gridspec.GridSpec(3, 3, figure=fig, hspace=0.42, wspace=0.32)
|
||
|
||
ax1 = fig.add_subplot(gs[0, 0]) # 有效前沿 Efficient Frontier
|
||
ax2 = fig.add_subplot(gs[0, 1]) # 组合权重堆叠 Portfolio Weights
|
||
ax3 = fig.add_subplot(gs[0, 2]) # 行业权重对比 Sector Weights
|
||
ax4 = fig.add_subplot(gs[1, 0]) # 净值曲线 NAV Curves
|
||
ax5 = fig.add_subplot(gs[1, 1]) # 风险贡献 Risk Contribution (RP)
|
||
ax6 = fig.add_subplot(gs[1, 2]) # 滚动夏普 Rolling Sharpe
|
||
ax7 = fig.add_subplot(gs[2, 0]) # 月度收益箱形图 Monthly Return Box
|
||
ax8 = fig.add_subplot(gs[2, 1]) # 相关系数热力图 Correlation Heatmap
|
||
ax9 = fig.add_subplot(gs[2, 2]) # 绩效汇总表 Performance Table
|
||
|
||
for ax in [ax1, ax2, ax3, ax4, ax5, ax6, ax7, ax8, ax9]:
|
||
ax.set_facecolor(DARK_BG)
|
||
ax.tick_params(colors=LT, labelsize=8)
|
||
for sp in ax.spines.values():
|
||
sp.set_edgecolor(GRID_C)
|
||
|
||
# ── 图1:有效前沿 + 随机组合云 + 关键节点 ──
|
||
# Efficient Frontier + Monte Carlo cloud + key portfolios
|
||
ax1.set_title("有效前沿 Efficient Frontier", color=LT, fontsize=10, pad=6)
|
||
ax1.scatter(rand_vols, rand_rets,
|
||
c=rand_sharpes, cmap='RdYlGn', vmin=0, vmax=2.5,
|
||
alpha=0.25, s=6, zorder=1)
|
||
ax1.plot(frontier_vols, frontier_rets, '-', color='#FFD700', lw=2.2, zorder=4,
|
||
label='有效前沿')
|
||
# 资本市场线 (Capital Market Line, CML) — 从无风险利率到最大夏普组合并延伸
|
||
cml_x = np.linspace(0, vol_msr * 1.4, 50)
|
||
cml_y = RF + sr_msr * cml_x
|
||
ax1.plot(cml_x, cml_y, '--', color='#FFEB3B', lw=1.0, alpha=0.7,
|
||
zorder=3, label='资本市场线 CML')
|
||
# 关键组合点
|
||
specials = [
|
||
("最小方差\nMinVar", vol_min, ret_min, COLORS["最小方差 MinVar"]),
|
||
("最大夏普\nMaxSharpe",vol_msr, ret_msr, COLORS["最大夏普 MaxSharpe"]),
|
||
("等权\nEW", vol_eq, ret_eq, COLORS["等权 EW"]),
|
||
("风险平价\nRP", vol_rp, ret_rp, COLORS["风险平价 RP"]),
|
||
("BL", vol_bl, ret_bl, COLORS["Black-Litterman BL"]),
|
||
]
|
||
for label, v, r, c in specials:
|
||
ax1.scatter(v, r, s=100, color=c, zorder=6, edgecolors='white', lw=0.8)
|
||
ax1.annotate(label, (v, r), color=LT, fontsize=6,
|
||
xytext=(5, 3), textcoords='offset points')
|
||
# 无风险利率点 (Risk-free rate point)
|
||
ax1.scatter(0, RF, s=80, color='yellow', zorder=6, marker='*')
|
||
ax1.annotate(f"无风险\nRf={RF:.0%}", (0, RF), color='yellow', fontsize=6,
|
||
xytext=(4, 2), textcoords='offset points')
|
||
ax1.xaxis.set_major_formatter(PCT_FMT)
|
||
ax1.yaxis.set_major_formatter(PCT_FMT)
|
||
ax1.set_xlabel("年化波动率 Volatility", color=LT, fontsize=8)
|
||
ax1.set_ylabel("年化收益率 Return", color=LT, fontsize=8)
|
||
ax1.legend(fontsize=7, labelcolor=LT, facecolor=DARK_BG, edgecolor=GRID_C)
|
||
ax1.grid(color=GRID_C, alpha=0.5)
|
||
|
||
# ── 图2:各策略权重堆叠柱状图 ──
|
||
# Stacked bar chart of portfolio weights
|
||
ax2.set_title("组合权重 Portfolio Weights", color=LT, fontsize=10, pad=6)
|
||
weight_order = ["EW", "MinVar", "MaxSharpe", "RP", "BL"]
|
||
weight_arrays = [w_eq.values, w_minvar.values, w_maxsharpe.values, w_rp.values, w_bl.values]
|
||
bar_colors = plt.cm.tab20(np.linspace(0, 1, N_STOCKS))
|
||
bottoms = np.zeros(len(weight_order))
|
||
x_pos = np.arange(len(weight_order))
|
||
for i in range(N_STOCKS):
|
||
heights = [wa[i] for wa in weight_arrays]
|
||
ax2.bar(x_pos, heights, bottom=bottoms, color=bar_colors[i], width=0.72)
|
||
bottoms += heights
|
||
ax2.set_xticks(x_pos)
|
||
ax2.set_xticklabels(weight_order, color=LT, fontsize=9)
|
||
ax2.yaxis.set_major_formatter(PCT_FMT)
|
||
ax2.set_ylabel("权重 Weight", color=LT, fontsize=8)
|
||
ax2.grid(axis='y', color=GRID_C, alpha=0.5)
|
||
|
||
# ── 图3:行业权重对比(分组柱状图)──
|
||
# Grouped bar chart: sector weights per strategy
|
||
ax3.set_title("行业权重对比 Sector Weights", color=LT, fontsize=10, pad=6)
|
||
x_sec = np.arange(N_SECTORS)
|
||
bw = 0.15
|
||
strat_colors_list = list(COLORS.values())
|
||
for k, (wname, warr) in enumerate(zip(weight_order, weight_arrays)):
|
||
sec_wts = [np.sum(warr[SECTORS[s]]) for s in SECTOR_NAMES]
|
||
off = (k - len(weight_order) / 2 + 0.5) * bw
|
||
ax3.bar(x_sec + off, sec_wts, width=bw,
|
||
color=strat_colors_list[k], label=wname, alpha=0.85)
|
||
ax3.set_xticks(x_sec)
|
||
ax3.set_xticklabels(SECTOR_SHORT, color=LT, fontsize=8)
|
||
ax3.yaxis.set_major_formatter(PCT_FMT)
|
||
ax3.axhline(MAX_SECTOR_WT, ls='--', color='#FF7043', lw=0.8, alpha=0.6,
|
||
label=f'上限 {MAX_SECTOR_WT:.0%}')
|
||
ax3.legend(fontsize=6, labelcolor=LT, facecolor=DARK_BG, edgecolor=GRID_C,
|
||
ncol=2, loc='upper right')
|
||
ax3.grid(axis='y', color=GRID_C, alpha=0.5)
|
||
|
||
# ── 图4:净值曲线 ──
|
||
ax4.set_title("净值曲线 NAV Curves", color=LT, fontsize=10, pad=6)
|
||
for name in STRATEGY_NAMES:
|
||
is_key = "MaxSharpe" in name or "BL" in name
|
||
ax4.plot(nav_df.index, nav_df[name],
|
||
color=COLORS[name], lw=2.0 if is_key else 1.2,
|
||
alpha=1.0 if is_key else 0.7,
|
||
label=name.split(" ")[0])
|
||
ax4.set_ylabel("净值 NAV", color=LT, fontsize=8)
|
||
ax4.legend(fontsize=7, labelcolor=LT, facecolor=DARK_BG,
|
||
edgecolor=GRID_C, ncol=2)
|
||
ax4.grid(color=GRID_C, alpha=0.5)
|
||
ax4.xaxis.set_tick_params(rotation=20)
|
||
|
||
# ── 图5:风险平价组合的风险贡献分解 ──
|
||
# Risk contribution breakdown for Risk Parity portfolio
|
||
ax5.set_title("风险贡献分布 Risk Contribution (RP)", color=LT, fontsize=10, pad=6)
|
||
rc_sorted = pd.Series(rc_pct_rp * 100, index=stock_names).sort_values(ascending=False)
|
||
bar_c = ['#4CAF50' if abs(v / 100 - 1.0 / N_STOCKS) < 5e-3
|
||
else '#FF7043' for v in rc_sorted]
|
||
ax5.bar(range(N_STOCKS), rc_sorted.values, color=bar_c, alpha=0.85)
|
||
ax5.axhline(100.0 / N_STOCKS, color='white', lw=1.2, ls='--',
|
||
label=f'目标 1/N={100/N_STOCKS:.1f}%')
|
||
ax5.set_xticks(range(N_STOCKS))
|
||
ax5.set_xticklabels(rc_sorted.index, rotation=45, fontsize=6, color=LT)
|
||
ax5.set_ylabel("风险贡献 RC%", color=LT, fontsize=8)
|
||
ax5.legend(fontsize=7, labelcolor=LT, facecolor=DARK_BG, edgecolor=GRID_C)
|
||
ax5.grid(axis='y', color=GRID_C, alpha=0.5)
|
||
|
||
# ── 图6:滚动夏普比率(63 日窗口)──
|
||
# Rolling Sharpe ratio (63-day window)
|
||
ax6.set_title("滚动夏普比率 Rolling Sharpe (63D)", color=LT, fontsize=10, pad=6)
|
||
ROLL_WIN = 63
|
||
for name in STRATEGY_NAMES:
|
||
r_s = pd.Series(daily_pnl[name], index=dates)
|
||
roll_sr = r_s.rolling(ROLL_WIN).apply(
|
||
lambda x: (x.mean() * FREQ - RF) / (x.std() * np.sqrt(FREQ) + 1e-10)
|
||
)
|
||
ax6.plot(dates, roll_sr, color=COLORS[name], lw=1.0,
|
||
alpha=0.85, label=name.split(" ")[0])
|
||
ax6.axhline(0, color='white', lw=0.7, ls='--')
|
||
ax6.set_ylabel("夏普比率 Sharpe", color=LT, fontsize=8)
|
||
ax6.legend(fontsize=7, labelcolor=LT, facecolor=DARK_BG,
|
||
edgecolor=GRID_C, ncol=2)
|
||
ax6.grid(color=GRID_C, alpha=0.5)
|
||
ax6.xaxis.set_tick_params(rotation=20)
|
||
|
||
# ── 图7:月度收益分布箱形图 ──
|
||
# Monthly return distribution boxplot
|
||
ax7.set_title("月度收益分布 Monthly Return Distribution", color=LT, fontsize=10, pad=6)
|
||
monthly_rets = {}
|
||
for name in STRATEGY_NAMES:
|
||
r_s = pd.Series(daily_pnl[name], index=dates)
|
||
monthly_rets[name.split(" ")[0]] = r_s.resample('M').apply(
|
||
lambda x: (1 + x).prod() - 1
|
||
).values
|
||
|
||
bp = ax7.boxplot(
|
||
list(monthly_rets.values()),
|
||
patch_artist=True,
|
||
labels=list(monthly_rets.keys()),
|
||
medianprops=dict(color='white', lw=1.5),
|
||
whiskerprops=dict(color=LT),
|
||
capprops=dict(color=LT),
|
||
flierprops=dict(marker='.', color='#888888', ms=3)
|
||
)
|
||
for patch, c in zip(bp['boxes'], list(COLORS.values())):
|
||
patch.set_facecolor(c)
|
||
patch.set_alpha(0.72)
|
||
ax7.yaxis.set_major_formatter(PCT1_FMT)
|
||
ax7.axhline(0, color='white', lw=0.6, ls='--')
|
||
ax7.set_xticklabels(list(monthly_rets.keys()), color=LT, fontsize=8, rotation=15)
|
||
ax7.grid(axis='y', color=GRID_C, alpha=0.5)
|
||
|
||
# ── 图8:收益相关系数热力图(含行业分割线)──
|
||
# Return correlation heatmap with sector dividers
|
||
ax8.set_title("收益相关系数 Return Correlation Matrix", color=LT, fontsize=10, pad=6)
|
||
corr = ret_df.corr().values
|
||
im = ax8.imshow(corr, cmap='RdYlGn', vmin=-0.2, vmax=1.0, aspect='auto')
|
||
ax8.set_xticks(range(N_STOCKS))
|
||
ax8.set_yticks(range(N_STOCKS))
|
||
ax8.set_xticklabels(stock_names, rotation=90, fontsize=5.5, color=LT)
|
||
ax8.set_yticklabels(stock_names, fontsize=5.5, color=LT)
|
||
# 行业分割线 / Sector boundary lines
|
||
sector_bounds = [0, 5, 9, 13, 17, 20]
|
||
for b in sector_bounds[1:-1]:
|
||
ax8.axhline(b - 0.5, color='white', lw=0.9, alpha=0.8)
|
||
ax8.axvline(b - 0.5, color='white', lw=0.9, alpha=0.8)
|
||
cb = plt.colorbar(im, ax=ax8, fraction=0.046, pad=0.04)
|
||
cb.ax.tick_params(colors=LT, labelsize=7)
|
||
|
||
# ── 图9:绩效汇总表 ──
|
||
# Performance summary table
|
||
ax9.axis('off')
|
||
ax9.set_title("绩效汇总 Performance Summary", color=LT, fontsize=10, pad=6)
|
||
col_labels = ["年化收益\nAnn.Ret", "年化波动\nAnn.Vol",
|
||
"夏普比率\nSharpe", "最大回撤\nMax DD", "Calmar\n比率"]
|
||
row_labels = [n.split(" ")[0] for n in STRATEGY_NAMES]
|
||
cell_data = []
|
||
for name in STRATEGY_NAMES:
|
||
r = perf_df.loc[name]
|
||
cell_data.append([
|
||
f"{r['年化收益']:+.1%}",
|
||
f"{r['年化波动']:.1%}",
|
||
f"{r['夏普比率']:.2f}",
|
||
f"{r['最大回撤']:.1%}",
|
||
f"{min(r['Calmar'], 99.0):.2f}",
|
||
])
|
||
|
||
tbl = ax9.table(cellText=cell_data, rowLabels=row_labels,
|
||
colLabels=col_labels, loc='center', cellLoc='center')
|
||
tbl.auto_set_font_size(False)
|
||
tbl.set_fontsize(8)
|
||
tbl.scale(1.12, 2.0)
|
||
for (row, col), cell in tbl.get_celld().items():
|
||
cell.set_facecolor(DARK_BG)
|
||
cell.set_edgecolor(GRID_C)
|
||
cell.set_text_props(color=LT)
|
||
if row == 0 or col == -1:
|
||
cell.set_facecolor('#2A2D3A')
|
||
cell.set_text_props(color='#FFD700', fontweight='bold')
|
||
|
||
# 保存图表 / Save figure
|
||
OUTPUT_FILE = "portfolio_optimization_demo.png"
|
||
plt.savefig(OUTPUT_FILE, dpi=150, bbox_inches='tight',
|
||
facecolor=fig.get_facecolor())
|
||
|
||
print(f" 图表已保存: {OUTPUT_FILE} / Chart saved: {OUTPUT_FILE}")
|
||
|
||
print("\n" + "=" * 70)
|
||
print(" 演示完成! Demo Complete!")
|
||
print(f" 输出: {OUTPUT_FILE}")
|
||
print("=" * 70)
|