trading/quant_portfolio_optimizatio...

950 lines
42 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
量化交易演示系列 第5篇组合优化
Quantitative Trading Demo Series - Part 5: Portfolio Optimization
涵盖内容 / Topics Covered:
§0 合成股票池与因子模型 (Synthetic Universe & Factor Model)
§1 协方差矩阵估计 (Covariance Matrix Estimation)
§2 马科维兹均值-方差优化 (Markowitz Mean-Variance Optimization)
§3 有效前沿 (Efficient Frontier)
§4 特殊组合:等权/最小方差/最大夏普 (Special Portfolios: EW / MinVar / MaxSharpe)
§5 风险平价 (Risk Parity)
§6 Black-Litterman 模型 (Black-Litterman Model)
§7 带约束的组合优化 (Constrained Portfolio Optimization)
§8 滚动回测:五策略对比 (Rolling Backtest: 5-Strategy Comparison)
§9 可视化 (9-Panel Visualization)
依赖库 / Dependencies:
numpy, pandas, scipy, matplotlib, sklearn
作者 / Author: Quant Demo Series
"""
# ── 无界面绘图模式,必须在 import pyplot 之前设置 ──
# Headless plotting mode — MUST be set before importing pyplot
import matplotlib
matplotlib.use('Agg')
import warnings
warnings.filterwarnings('ignore')
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from matplotlib.ticker import FuncFormatter
from scipy.optimize import minimize
# ── 随机种子(确保结果可复现)/ Random seed for reproducibility ──
np.random.seed(42)
# ── 中文字体配置 / Chinese font configuration ──
plt.rcParams['font.sans-serif'] = [
'WenQuanYi Zen Hei', 'Arial Unicode MS', 'SimHei', 'DejaVu Sans'
]
plt.rcParams['axes.unicode_minus'] = False
print("=" * 70)
print(" 量化交易 组合优化演示")
print(" Quantitative Trading: Portfolio Optimization Demo")
print("=" * 70)
# ══════════════════════════════════════════════════════════════════════
# §0 合成股票池与行业因子模型
# Synthetic Universe & Sector Factor Model
# ══════════════════════════════════════════════════════════════════════
#
# 使用 Barra 风格的因子模型生成收益率:
# Returns are generated using a Barra-style factor model:
#
# r_{i,t} = β_{mkt,i} × r_{mkt,t} ← 市场因子 (Market factor)
# + Σ_k β_{sec,i,k} × f_{k,t} ← 行业因子 (Sector factors)
# + α_i ← 个股 Alpha (Stock-specific alpha)
# + ε_{i,t} ← 特质噪音 (Idiosyncratic noise)
N_STOCKS = 20 # 股票数量 (Number of stocks)
N_YEARS = 3 # 历史年数 (History in years)
FREQ = 252 # 年化因子 / 每年交易日数 (Annualization factor)
RF = 0.02 # 无风险利率 (Risk-free rate), 年化
START_DATE = "2021-01-04"
dates = pd.bdate_range(START_DATE, periods=N_YEARS * FREQ) # 工作日序列
N_DAYS = len(dates)
# ── 行业分组 / Sector assignments ──
SECTORS = {
"科技 Tech": list(range(0, 5)), # S00S04
"金融 Finance": list(range(5, 9)), # S05S08
"消费 Consumer": list(range(9, 13)), # S09S12
"工业 Industrial": list(range(13, 17)), # S13S16
"医疗 Healthcare": list(range(17, 20)), # S17S19
}
SECTOR_NAMES = list(SECTORS.keys())
SECTOR_SHORT = ["科技", "金融", "消费", "工业", "医疗"]
N_SECTORS = len(SECTORS)
stock_names = [f"S{i:02d}" for i in range(N_STOCKS)]
# ── 行业公共因子收益率 / Sector factor daily returns ──
sector_ann_ret = np.array([0.16, 0.10, 0.12, 0.08, 0.13]) # 各行业年化期望收益
sector_ann_vol = np.array([0.25, 0.18, 0.20, 0.15, 0.22]) # 各行业年化波动率
sector_factor_rets = np.zeros((N_DAYS, N_SECTORS))
for k in range(N_SECTORS):
mu_d = sector_ann_ret[k] / FREQ
sig_d = sector_ann_vol[k] / np.sqrt(FREQ)
sector_factor_rets[:, k] = np.random.normal(mu_d, sig_d, N_DAYS)
# ── 市场公共因子 / Market common factor ──
mkt_daily = np.random.normal(0.10 / FREQ, 0.18 / np.sqrt(FREQ), N_DAYS)
# ── 各股的因子载荷Beta 暴露)/ Factor loadings (Beta exposures) ──
betas_mkt = np.random.uniform(0.6, 1.4, N_STOCKS) # 市场 Beta
betas_sec = np.zeros((N_STOCKS, N_SECTORS)) # 行业 Beta
for k, (sname, sidx) in enumerate(SECTORS.items()):
for i in sidx:
betas_sec[i, k] = np.random.uniform(0.5, 1.0) # 本行业:强载荷
for k2 in range(N_SECTORS):
if k2 != k:
betas_sec[i, k2] = np.random.uniform(0.0, 0.15) # 其他行业:弱载荷
# ── 个股特质波动率 / Idiosyncratic volatility ──
idio_vols = np.random.uniform(0.12, 0.30, N_STOCKS) / np.sqrt(FREQ)
idio_rets = np.random.normal(0, idio_vols, (N_DAYS, N_STOCKS))
# ── 个股特质 Alpha / Stock-specific alpha ──
# 科技股注入正 Alpha让最大夏普组合有明显的倾向性
idio_alpha = np.zeros(N_STOCKS)
idio_alpha[:5] = np.random.uniform(0.04, 0.09, 5) / FREQ # 科技股:正 Alpha
idio_alpha[5:9] = np.random.uniform(-0.02, 0.02, 4) / FREQ # 金融股:中性
# ── 合成日收益率矩阵 / Composite daily return matrix ──
daily_ret_mat = np.zeros((N_DAYS, N_STOCKS))
for i in range(N_STOCKS):
daily_ret_mat[:, i] = (
betas_mkt[i] * mkt_daily # 市场贡献
+ betas_sec[i] @ sector_factor_rets.T # 行业贡献5 行业因子)
+ idio_alpha[i] # 个股 Alpha
+ idio_rets[:, i] # 特质噪音
)
ret_df = pd.DataFrame(daily_ret_mat, index=dates, columns=stock_names)
prices_df = (1 + ret_df).cumprod() * 100.0 # 从 100 元开始的价格序列
# ── 市值权重Black-Litterman 需要作为先验市场组合)──
# Market-cap weights as Black-Litterman equilibrium prior
mktcap_weights = pd.Series(
np.random.dirichlet(np.ones(N_STOCKS) * 2.0),
index=stock_names
)
ann_rets = (1 + ret_df).prod() ** (FREQ / N_DAYS) - 1
print(f"\n[§0] 股票池生成完成 / Universe Generated")
print(f" 股票数量 (Stocks): {N_STOCKS}")
print(f" 交易日数 (Days): {N_DAYS}{dates[0].date()} ~ {dates[-1].date()}")
print(f" 行业数量 (Sectors): {N_SECTORS}")
print(f" 个股年化收益区间: {ann_rets.min():.1%} ~ {ann_rets.max():.1%}")
# ══════════════════════════════════════════════════════════════════════
# §1 协方差矩阵估计:样本 vs Ledoit-Wolf 收缩
# Covariance Matrix Estimation: Sample vs Ledoit-Wolf Shrinkage
# ══════════════════════════════════════════════════════════════════════
#
# 协方差矩阵 (Covariance Matrix) Σ 是组合优化的核心输入。
# 样本协方差矩阵 (Sample Covariance Matrix) 存在两个问题:
# 1. 估计误差大小样本情形T/N 较小时)/ Large estimation error when T/N is small
# 2. 条件数 (Condition Number) 高,矩阵"病态",优化对估计误差非常敏感
#
# Ledoit-Wolf 收缩 (Ledoit-Wolf Shrinkage):
# Σ_shrunk = (1-α) × Σ_sample + α × F
# 其中 F 是结构化目标矩阵(如对角矩阵),α 是最优收缩系数(数据驱动自动选择)
# F is a structured target matrix; α is the optimal shrinkage coefficient.
# 样本协方差(年化) / Sample covariance (annualized)
cov_sample = ret_df.cov() * FREQ
mu_sample = ret_df.mean() * FREQ # 年化预期收益向量 (Annualized expected return vector)
# Ledoit-Wolf 收缩估计 / Ledoit-Wolf shrinkage estimation
try:
from sklearn.covariance import LedoitWolf
lw = LedoitWolf()
lw.fit(ret_df.values)
cov_shrink = pd.DataFrame(
lw.covariance_ * FREQ,
index=stock_names, columns=stock_names
)
shrink_alpha = lw.shrinkage_ # 最优收缩系数 α
USE_SHRINK = True
except ImportError:
cov_shrink = cov_sample.copy()
shrink_alpha = 0.0
USE_SHRINK = False
print(f"\n[§1] 协方差估计 / Covariance Estimation")
print(f" 样本协方差条件数 (Sample Cov Cond#): {np.linalg.cond(cov_sample.values):.1f}")
if USE_SHRINK:
print(f" Ledoit-Wolf 收缩系数 (Shrinkage α): {shrink_alpha:.4f}")
print(f" 收缩后条件数 (Shrunk Cov Cond#): {np.linalg.cond(cov_shrink.values):.1f}")
print(f" → 条件数降低意味着优化更稳健 / Lower cond# = more robust optimization")
# 正式使用的协方差矩阵(收缩版更稳健)/ Use shrunk covariance for optimization
cov_opt = cov_shrink if USE_SHRINK else cov_sample
mu_opt = mu_sample.copy()
# ══════════════════════════════════════════════════════════════════════
# §2 核心优化工具函数
# Core Optimization Utility Functions
# ══════════════════════════════════════════════════════════════════════
def portfolio_stats(weights, mu, cov, rf=RF):
"""
计算组合的三个核心绩效指标(年化)
Compute 3 key portfolio statistics (annualized).
参数 / Args:
weights : np.ndarray, 权重向量 (weight vector), shape (N,)
mu : pd.Series, 年化预期收益向量 (annualized expected returns)
cov : pd.DataFrame, 年化协方差矩阵 (annualized covariance matrix)
rf : float, 无风险利率 (risk-free rate)
返回 / Returns:
ret : 组合年化收益率 (portfolio annualized return)
vol : 组合年化波动率 (portfolio annualized volatility)
sharpe : 夏普比率 (Sharpe ratio) = (ret - rf) / vol
"""
w = np.asarray(weights)
ret = float(w @ mu) # w'μ
var = float(w @ cov.values @ w) # w'Σw组合方差
vol = np.sqrt(max(var, 1e-12)) # 组合波动率(年化)
sharpe = (ret - rf) / vol
return ret, vol, sharpe
def _base_optimize(objective, n, bounds, extra_constraints=None):
"""
通用 SLSQP 优化框架(内部使用)
Generic SLSQP optimization scaffold (internal use).
SLSQP = Sequential Least Squares Programming / 序列二次规划法
"""
w0 = np.ones(n) / n # 初始猜测:等权 (Initial guess: equal weight)
constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}] # 全投资约束
if extra_constraints:
constraints.extend(extra_constraints)
return minimize(
objective, w0,
method='SLSQP',
bounds=bounds,
constraints=constraints,
options={'ftol': 1e-12, 'maxiter': 1500}
)
def min_variance(mu, cov, allow_short=False):
"""
最小方差组合 (Minimum Variance Portfolio, MinVar)
这是组合优化中最稳健的组合,因为它完全不依赖对预期收益的估计。
Most robust portfolio: it does NOT require any return estimates.
问题形式 / Optimization problem:
minimize w'Σw
subject to Σw_i = 1 (全投资约束 / full investment)
w_i ≥ 0 (多头约束 / long-only, when allow_short=False)
"""
n = len(mu)
bnd = (-0.3, 1.0) if allow_short else (0.0, 1.0)
bounds = [bnd] * n
result = _base_optimize(lambda w: w @ cov.values @ w, n, bounds)
return pd.Series(result.x, index=cov.index)
def max_sharpe(mu, cov, rf=RF, allow_short=False):
"""
最大夏普比率组合 / 切线组合 (Maximum Sharpe / Tangency Portfolio)
在均值-方差空间中,从无风险资产出发到有效前沿的切线点。
Tangency point from the risk-free asset to the efficient frontier (Capital Market Line).
技巧 / Trick:
max Sharpe ≡ min -Sharpe (把最大化转为最小化 / flip to minimization
"""
n = len(mu)
bnd = (-0.3, 1.0) if allow_short else (0.0, 1.0)
bounds = [bnd] * n
def neg_sharpe(w):
r, v, sr = portfolio_stats(w, mu, cov, rf)
return -sr
result = _base_optimize(neg_sharpe, n, bounds)
return pd.Series(result.x, index=cov.index)
def efficient_portfolio(target_return, mu, cov):
"""
目标收益下的最小方差组合(有效前沿上的一点)
Minimum-variance portfolio for a given target return (a point on efficient frontier).
问题形式 / Optimization problem:
minimize w'Σw
subject to w'μ = target_return (收益约束 / return constraint)
Σw_i = 1
w_i ≥ 0
"""
n = len(mu)
bnd = (0.0, 1.0)
result = _base_optimize(
lambda w: w @ cov.values @ w, n,
bounds=[bnd] * n,
extra_constraints=[
{'type': 'eq', 'fun': lambda w: float(w @ mu.values) - target_return}
]
)
return pd.Series(result.x, index=cov.index) if result.success else None
# ══════════════════════════════════════════════════════════════════════
# §3 有效前沿
# Efficient Frontier
# ══════════════════════════════════════════════════════════════════════
#
# 有效前沿 (Efficient Frontier) 是所有"有效组合"的集合:
# → 在相同风险(波动率)下,收益最高
# → 在相同收益下,风险最低
#
# 有效前沿由两点确定:
# 左端点:最小方差组合 (Global Minimum Variance Portfolio, GMV)
# 右端点:预期收益最高的单只股票(极端集中)
print(f"\n[§3] 计算有效前沿 / Computing Efficient Frontier...")
# ── 计算关键节点组合 / Compute key special portfolios ──
w_minvar = min_variance(mu_opt, cov_opt)
w_maxsharpe = max_sharpe(mu_opt, cov_opt, rf=RF)
w_eq = pd.Series(np.ones(N_STOCKS) / N_STOCKS, index=stock_names)
ret_min, vol_min, sr_min = portfolio_stats(w_minvar, mu_opt, cov_opt)
ret_msr, vol_msr, sr_msr = portfolio_stats(w_maxsharpe, mu_opt, cov_opt)
ret_eq, vol_eq, sr_eq = portfolio_stats(w_eq, mu_opt, cov_opt)
# ── 扫描有效前沿 / Trace the efficient frontier ──
n_pts = 60
ret_lo = ret_min * 0.99
ret_hi = mu_opt.max() * 0.88
frontier_rets = []
frontier_vols = []
for tgt in np.linspace(ret_lo, ret_hi, n_pts):
w_eff = efficient_portfolio(tgt, mu_opt, cov_opt)
if w_eff is not None:
r, v, _ = portfolio_stats(w_eff, mu_opt, cov_opt)
frontier_rets.append(r)
frontier_vols.append(v)
# ── Monte Carlo 随机组合云(背景参照)/ Random portfolio cloud (background reference) ──
N_RANDOM = 3000
rand_rets = []
rand_vols = []
rand_sharpes = []
for _ in range(N_RANDOM):
w_r = np.random.dirichlet(np.ones(N_STOCKS)) # 满足 w≥0, Σw=1 的随机权重
r, v, sr = portfolio_stats(w_r, mu_opt, cov_opt)
rand_rets.append(r)
rand_vols.append(v)
rand_sharpes.append(sr)
print(f" 有效前沿点数 (Frontier points): {len(frontier_rets)}")
print(f" 最小方差组合 (MinVar): 收益={ret_min:.1%}, 波动={vol_min:.1%}, 夏普={sr_min:.2f}")
print(f" 最大夏普组合 (MaxSharpe): 收益={ret_msr:.1%}, 波动={vol_msr:.1%}, 夏普={sr_msr:.2f}")
# ══════════════════════════════════════════════════════════════════════
# §4 风险平价
# Risk Parity
# ══════════════════════════════════════════════════════════════════════
#
# 核心思想:每只资产对组合总风险的贡献相等
# Core idea: each asset contributes equally to total portfolio risk.
#
# 风险贡献 (Risk Contribution, RC) 的定义:
#
# RC_i = w_i ×σ_p/∂w_i = w_i × (Σw)_i / σ_p
# ↑ 边际风险贡献 (Marginal Risk Contribution, MRC)
#
# 等风险贡献条件 (Equal Risk Contribution):
# RC_i = σ_p / N ←→ w_i × (Σw)_i = w_j × (Σw)_j for all i, j
#
# 优化形式(最小化各股风险贡献比例与目标 1/N 的偏差平方和):
# minimize Σ_i (RC_i/σ_p 1/N)²
print(f"\n[§4] 计算风险平价组合 / Risk Parity Portfolio...")
def risk_contributions(weights, cov):
"""
计算每只资产的绝对风险贡献和风险贡献占比
Compute absolute risk contribution and risk contribution fraction for each asset.
Returns:
rc : 绝对风险贡献向量 (Absolute Risk Contribution vector), shape (N,)
sigma_p : 组合总波动率 (Total portfolio volatility)
"""
w = np.asarray(weights)
sigma_p = np.sqrt(float(w @ cov.values @ w)) # 组合波动率 (Portfolio vol)
mrc = cov.values @ w / sigma_p # 边际风险贡献 (MRC = ∂σ/∂w)
rc = w * mrc # 绝对风险贡献 (RC = w × MRC)
return rc, sigma_p
def risk_parity(cov):
"""
等风险贡献组合 (Equal Risk Contribution Portfolio)
约束w_i ≥ 0 (严格多头,做空会有负的风险贡献,没有意义)
Strictly long-only: short positions would give negative RC (meaningless).
"""
N = cov.shape[0]
tgt = 1.0 / N # 目标风险贡献占比 = 1/N
def objective(w):
rc, sigma_p = risk_contributions(w, cov)
rc_pct = rc / sigma_p # 风险贡献占比 (RC fraction)
return np.sum((rc_pct - tgt) ** 2)
result = _base_optimize(
objective, N,
bounds=[(1e-6, 1.0)] * N # 严格正数(不允许做空)
)
return pd.Series(result.x, index=cov.index)
w_rp = risk_parity(cov_opt)
rc_rp, sigma_rp = risk_contributions(w_rp, cov_opt)
ret_rp, vol_rp, sr_rp = portfolio_stats(w_rp, mu_opt, cov_opt)
rc_pct_rp = rc_rp / sigma_rp
max_rc_dev = np.max(np.abs(rc_pct_rp - 1.0 / N_STOCKS)) # 与目标的最大偏差
print(f" 风险平价组合: 收益={ret_rp:.1%}, 波动={vol_rp:.1%}, 夏普={sr_rp:.2f}")
print(f" 风险贡献最大偏差 (Max RC deviation from 1/N): {max_rc_dev:.5f}")
print(f" → 接近 0 表示各股风险贡献几乎相等 / ≈0 means near-perfect equal RC")
# ══════════════════════════════════════════════════════════════════════
# §5 Black-Litterman 模型
# Black-Litterman Model
# ══════════════════════════════════════════════════════════════════════
#
# 马科维兹优化的两大痛点 / Two pain points of Markowitz optimization:
# ① 预期收益 μ 难以估计,哪怕小幅误差也会导致权重大幅波动
# Expected returns are noisy; small errors cause wild weight swings.
# ② 结果过度集中("角点解"),实际不可用
# Results are over-concentrated "corner solutions".
#
# Black-Litterman (1990) 的贝叶斯框架 / Bayesian framework:
# ① 先验 (Prior): 从市场均衡推导隐含收益CAPM 反向优化)
# Market-implied returns from reverse-optimizing CAPM.
# ② 观点 (Views): 投资者对某些股票/组合的主观预期
# Investor's subjective views on specific stocks/portfolios.
# ③ 后验 (Posterior): 贝叶斯更新,先验与观点的加权混合
# Bayesian posterior blending prior and views.
#
# 后验均值公式 / Posterior mean formula (He & Litterman, 1999):
#
# μ_BL = [(τΣ)⁻¹ + P'Ω⁻¹P]⁻¹ × [(τΣ)⁻¹Π + P'Ω⁻¹Q]
# ───────────────────── ──────────────────────
# 精度矩阵之和 加权均值向量
# Sum of precision Weighted mean vector
#
# Π = δ × Σ × w_mkt ← 市场均衡隐含超额收益 (Market equilibrium implied returns)
# δ = 风险厌恶系数 (Risk aversion coefficient)
# τ = 先验收缩参数 (Prior scaling factor), 通常 0.025~0.10
# P = 观点矩阵 K×N (View matrix, K views, N assets)
# Q = 观点收益向量 K×1 (View expected excess returns)
# Ω = 观点不确定性矩阵 K×K (View uncertainty / confidence matrix)
print(f"\n[§5] Black-Litterman 模型 / Black-Litterman Model...")
DELTA = 2.5 # 风险厌恶系数 (Risk aversion), 典型值 2.0~3.0
TAU = 0.05 # 先验收缩参数 (Prior scaling), 典型值 0.025~0.10
# ─── Step 1: 计算市场均衡隐含超额收益 ───
# Market equilibrium implied excess returns (Reverse Optimization of CAPM)
# Π = δ × Σ × w_mkt
w_mkt = mktcap_weights.values
Pi = DELTA * cov_opt.values @ w_mkt
Pi_s = pd.Series(Pi, index=stock_names)
# ─── Step 2: 设定投资者观点 (Investor Views) ───
#
# 观点1 (View 1): "S01 的年化超额收益率将达到 +5%"(单资产绝对观点)
# "S01 will earn +5% excess return per year" (absolute view)
#
# 观点2 (View 2): "S00 将比 S04 多赚 3%"(两资产相对观点)
# "S00 will outperform S04 by 3%" (relative view)
K = 2 # 观点数量 (Number of views)
# P 矩阵K × NP[k, i] = 资产 i 在观点 k 中的权重(正=多头,负=空头)
P = np.zeros((K, N_STOCKS))
P[0, 1] = 1.0 # 观点1仅 S01
P[1, 0] = 1.0 # 观点2S00 多头
P[1, 4] = -1.0 # S04 空头(相对观点)
# Q 向量:观点的预期超额收益 (View expected excess returns)
Q = np.array([0.05, 0.03])
# Ω 矩阵观点的不确定性He & Litterman 建议的设置方式)
# Ω = τ × diag(P Σ P'),对角元素越大表示该观点越不确定
Omega = np.diag(TAU * np.diag(P @ cov_opt.values @ P.T))
Omega_inv = np.linalg.inv(Omega)
# ─── Step 3: 贝叶斯更新,计算后验预期超额收益 ───
# Bayesian posterior expected excess returns
tau_cov_inv = np.linalg.inv(TAU * cov_opt.values)
A = tau_cov_inv + P.T @ Omega_inv @ P # 精度矩阵之和
b = tau_cov_inv @ Pi + P.T @ Omega_inv @ Q
mu_bl = np.linalg.solve(A, b) + RF # 后验 = 超额 + 无风险利率
mu_bl_s = pd.Series(mu_bl, index=stock_names)
# ─── Step 4: 用后验收益做最大夏普优化 ───
w_bl = max_sharpe(mu_bl_s, cov_opt, rf=RF)
ret_bl, vol_bl, sr_bl = portfolio_stats(w_bl, mu_bl_s, cov_opt)
print(f" 市场均衡隐含收益范围: {Pi_s.min():.1%} ~ {Pi_s.max():.1%}")
print(f" BL 后验收益范围: {mu_bl_s.min():.1%} ~ {mu_bl_s.max():.1%}")
print(f" BL 组合: 收益={ret_bl:.1%}, 波动={vol_bl:.1%}, 夏普={sr_bl:.2f}")
print(f" 观点受益股S01, S00权重 BL: {w_bl['S01']:.1%}, {w_bl['S00']:.1%}")
print(f" 观点受益股S01, S00权重 等权: {w_eq['S01']:.1%}, {w_eq['S00']:.1%}")
# ══════════════════════════════════════════════════════════════════════
# §6 带约束的组合优化
# Constrained Portfolio Optimization
# ══════════════════════════════════════════════════════════════════════
#
# 实际组合管理中必须满足多重约束 / Real portfolios need multiple constraints:
# ① 单只股票权重上限(集中度管理)/ Max individual weight (concentration limit)
# ② 行业权重区间(风格/行业暴露管理)/ Sector weight bounds (style control)
# ③ 换手率约束(控制交易成本)/ Turnover constraint (transaction cost control)
#
# 换手率 (Turnover) 的定义:
# Turnover = Σ_i |w_new,i - w_old,i| / 2 (双边换手率,除以 2 避免重复计算)
# 或简化为 L1 距离Σ_i |w_new,i - w_old,i|
print(f"\n[§6] 带约束的最大夏普组合 / Constrained Max-Sharpe Portfolio...")
MAX_STOCK_WT = 0.15 # 单只股票最大 15% / Max 15% per stock
MAX_SECTOR_WT = 0.35 # 单行业最大 35% / Max 35% per sector
MIN_SECTOR_WT = 0.10 # 单行业最小 10% / Min 10% per sector
MAX_TURNOVER = 0.50 # 最大换手率 50% / Max 50% one-way turnover
w_prev = np.ones(N_STOCKS) / N_STOCKS # 上期持仓(假设为等权)
n = N_STOCKS
w0 = np.ones(n) / n
extra_cons = []
# 约束1: 行业权重上下限
for sname, sidx in SECTORS.items():
def make_lb(idx): return lambda w: np.sum(w[idx]) - MIN_SECTOR_WT
def make_ub(idx): return lambda w: MAX_SECTOR_WT - np.sum(w[idx])
extra_cons.append({'type': 'ineq', 'fun': make_lb(sidx)}) # ≥ 10%
extra_cons.append({'type': 'ineq', 'fun': make_ub(sidx)}) # ≤ 35%
# 约束2: 换手率约束L1 距离 ≤ 50%
extra_cons.append({'type': 'ineq',
'fun': lambda w: MAX_TURNOVER - np.sum(np.abs(w - w_prev))})
result_c = _base_optimize(
lambda w: -portfolio_stats(w, mu_opt, cov_opt, RF)[2], # 最小化负夏普
n,
bounds=[(0.0, MAX_STOCK_WT)] * n,
extra_constraints=extra_cons
)
w_constrained = pd.Series(result_c.x, index=stock_names)
ret_c, vol_c, sr_c = portfolio_stats(w_constrained, mu_opt, cov_opt)
print(f" 带约束最大夏普: 收益={ret_c:.1%}, 波动={vol_c:.1%}, 夏普={sr_c:.2f}")
print(f" 最大单股权重: {w_constrained.max():.1%} (约束 ≤{MAX_STOCK_WT:.0%})")
for sname, sidx in SECTORS.items():
sw = w_constrained.iloc[sidx].sum()
ok = "" if MIN_SECTOR_WT <= sw <= MAX_SECTOR_WT else ""
print(f" {ok} {sname}: {sw:.1%}")
# ══════════════════════════════════════════════════════════════════════
# §7 滚动回测:五策略对比
# Rolling Backtest: 5-Strategy Comparison
# ══════════════════════════════════════════════════════════════════════
#
# 流程 / Workflow:
# 每个调仓日 T每月一次:
# ① 用过去 LOOKBACK 天估计 μ 和 Σ
# ② 用当前估计值重新计算各策略权重
# ③ 用新权重持有到下次调仓日,每日记录组合收益
#
# 五种策略 / 5 Strategies:
# EW : 等权(不需要优化)
# MinVar : 最小方差(只需 Σ,不需要 μ)
# MaxSharpe : 最大夏普(需要 μ 和 Σ)
# RiskParity : 风险平价(只需 Σ)
# BL : Black-Litterman贝叶斯混合
print(f"\n[§7] 滚动回测 / Rolling Backtest...")
LOOKBACK = 126 # 协方差回看期(约 6 个月)/ 6-month estimation window
REBAL_FREQ = 21 # 调仓频率(约 1 个月)/ Monthly rebalancing
STRATEGY_NAMES = [
"等权 EW",
"最小方差 MinVar",
"最大夏普 MaxSharpe",
"风险平价 RP",
"Black-Litterman BL",
]
# 初始化:回看期内用 0 填充(尚未开始实际持仓)
daily_pnl = {name: [0.0] * LOOKBACK for name in STRATEGY_NAMES}
# 当前持仓权重(初始为等权)
cur_w = {name: np.ones(N_STOCKS) / N_STOCKS for name in STRATEGY_NAMES}
rebal_idx = list(range(LOOKBACK, N_DAYS, REBAL_FREQ))
for t_pos, t in enumerate(rebal_idx):
# 取过去 LOOKBACK 天的历史收益率
hist = ret_df.iloc[t - LOOKBACK: t]
# 估计滚动协方差矩阵(加微小正则化保证正定性)
# Rolling covariance (add tiny ridge term to ensure positive definiteness)
cov_r_arr = hist.cov().values * FREQ + 1e-7 * np.eye(N_STOCKS)
mu_r = hist.mean() * FREQ
cov_r = pd.DataFrame(cov_r_arr, index=stock_names, columns=stock_names)
try:
new_w = {}
new_w["等权 EW"] = np.ones(N_STOCKS) / N_STOCKS
new_w["最小方差 MinVar"] = min_variance(mu_r, cov_r).values
new_w["最大夏普 MaxSharpe"] = max_sharpe(mu_r, cov_r, rf=RF).values
new_w["风险平价 RP"] = risk_parity(cov_r).values
# Black-Litterman 滚动版本:使用滚动先验 + 固定观点
Pi_r = DELTA * cov_r_arr @ mktcap_weights.values
try:
tci = np.linalg.inv(TAU * cov_r_arr)
A_r = tci + P.T @ Omega_inv @ P
b_r = tci @ Pi_r + P.T @ Omega_inv @ Q
mu_bl_r = np.linalg.solve(A_r, b_r) + RF
except np.linalg.LinAlgError:
mu_bl_r = mu_r.values
new_w["Black-Litterman BL"] = max_sharpe(
pd.Series(mu_bl_r, index=stock_names), cov_r, rf=RF
).values
cur_w = new_w
except Exception:
pass # 优化失败时保持上期权重 / Keep previous weights if optimization fails
# 计算持有期内每日组合收益
next_t = rebal_idx[t_pos + 1] if t_pos + 1 < len(rebal_idx) else N_DAYS
for name in STRATEGY_NAMES:
w = cur_w[name]
for d in range(t, next_t):
daily_pnl[name].append(float(w @ ret_df.iloc[d].values))
# 截齐到 N_DAYS防止因最后一段超出
for name in STRATEGY_NAMES:
daily_pnl[name] = daily_pnl[name][:N_DAYS]
# ── 计算净值曲线 (NAV Curves) ──
nav_df = pd.DataFrame(
{name: (1 + pd.Series(daily_pnl[name], index=dates)).cumprod()
for name in STRATEGY_NAMES}
)
# ── 计算绩效指标 (Performance Metrics) ──
perf_rows = []
for name in STRATEGY_NAMES:
r_s = pd.Series(daily_pnl[name][LOOKBACK:], index=dates[LOOKBACK:])
ann_ret = (1 + r_s).prod() ** (FREQ / len(r_s)) - 1 # 年化收益率 (Ann. return)
ann_vol = r_s.std() * np.sqrt(FREQ) # 年化波动率 (Ann. vol)
sharpe = (ann_ret - RF) / ann_vol if ann_vol > 0 else 0
nav_s = (1 + r_s).cumprod()
max_dd = (nav_s / nav_s.cummax() - 1).min() # 最大回撤 (Max drawdown)
calmar = ann_ret / (-max_dd) if max_dd < 0 else 99.0 # Calmar 比率
perf_rows.append({
"策略": name, "年化收益": ann_ret, "年化波动": ann_vol,
"夏普比率": sharpe, "最大回撤": max_dd, "Calmar": calmar
})
perf_df = pd.DataFrame(perf_rows).set_index("策略")
print(f"\n 策略对比 / Strategy Comparison:")
print(f" {'策略':<28} {'年化收益':>8} {'年化波动':>8} {'夏普':>7} {'最大回撤':>9} {'Calmar':>8}")
print(f" {'' * 65}")
for name, row in perf_df.iterrows():
print(f" {name:<28} {row['年化收益']:>8.1%} {row['年化波动']:>8.1%} "
f"{row['夏普比率']:>7.3f} {row['最大回撤']:>9.1%} {min(row['Calmar'], 99.0):>8.2f}")
# ══════════════════════════════════════════════════════════════════════
# §8 可视化9 图综合面板
# 9-Panel Visualization Dashboard
# ══════════════════════════════════════════════════════════════════════
print(f"\n[§8] 生成可视化图表 / Generating visualization...")
# 颜色配置 / Color palette
COLORS = {
"等权 EW": "#888888",
"最小方差 MinVar": "#2196F3",
"最大夏普 MaxSharpe": "#FF5722",
"风险平价 RP": "#4CAF50",
"Black-Litterman BL": "#9C27B0",
}
PCT_FMT = FuncFormatter(lambda x, _: f"{x:.0%}")
PCT1_FMT = FuncFormatter(lambda x, _: f"{x:.1%}")
DARK_BG = '#1A1D27'
LT = '#E0E0E0' # 浅色文字 (Light text)
GRID_C = '#2A2D3A' # 网格颜色 (Grid color)
fig = plt.figure(figsize=(22, 22), facecolor='#0F1117')
fig.suptitle(
"量化组合优化演示 / Portfolio Optimization Demo",
fontsize=18, color='white', y=0.98, fontweight='bold'
)
gs = gridspec.GridSpec(3, 3, figure=fig, hspace=0.42, wspace=0.32)
ax1 = fig.add_subplot(gs[0, 0]) # 有效前沿 Efficient Frontier
ax2 = fig.add_subplot(gs[0, 1]) # 组合权重堆叠 Portfolio Weights
ax3 = fig.add_subplot(gs[0, 2]) # 行业权重对比 Sector Weights
ax4 = fig.add_subplot(gs[1, 0]) # 净值曲线 NAV Curves
ax5 = fig.add_subplot(gs[1, 1]) # 风险贡献 Risk Contribution (RP)
ax6 = fig.add_subplot(gs[1, 2]) # 滚动夏普 Rolling Sharpe
ax7 = fig.add_subplot(gs[2, 0]) # 月度收益箱形图 Monthly Return Box
ax8 = fig.add_subplot(gs[2, 1]) # 相关系数热力图 Correlation Heatmap
ax9 = fig.add_subplot(gs[2, 2]) # 绩效汇总表 Performance Table
for ax in [ax1, ax2, ax3, ax4, ax5, ax6, ax7, ax8, ax9]:
ax.set_facecolor(DARK_BG)
ax.tick_params(colors=LT, labelsize=8)
for sp in ax.spines.values():
sp.set_edgecolor(GRID_C)
# ── 图1有效前沿 + 随机组合云 + 关键节点 ──
# Efficient Frontier + Monte Carlo cloud + key portfolios
ax1.set_title("有效前沿 Efficient Frontier", color=LT, fontsize=10, pad=6)
ax1.scatter(rand_vols, rand_rets,
c=rand_sharpes, cmap='RdYlGn', vmin=0, vmax=2.5,
alpha=0.25, s=6, zorder=1)
ax1.plot(frontier_vols, frontier_rets, '-', color='#FFD700', lw=2.2, zorder=4,
label='有效前沿')
# 资本市场线 (Capital Market Line, CML) — 从无风险利率到最大夏普组合并延伸
cml_x = np.linspace(0, vol_msr * 1.4, 50)
cml_y = RF + sr_msr * cml_x
ax1.plot(cml_x, cml_y, '--', color='#FFEB3B', lw=1.0, alpha=0.7,
zorder=3, label='资本市场线 CML')
# 关键组合点
specials = [
("最小方差\nMinVar", vol_min, ret_min, COLORS["最小方差 MinVar"]),
("最大夏普\nMaxSharpe",vol_msr, ret_msr, COLORS["最大夏普 MaxSharpe"]),
("等权\nEW", vol_eq, ret_eq, COLORS["等权 EW"]),
("风险平价\nRP", vol_rp, ret_rp, COLORS["风险平价 RP"]),
("BL", vol_bl, ret_bl, COLORS["Black-Litterman BL"]),
]
for label, v, r, c in specials:
ax1.scatter(v, r, s=100, color=c, zorder=6, edgecolors='white', lw=0.8)
ax1.annotate(label, (v, r), color=LT, fontsize=6,
xytext=(5, 3), textcoords='offset points')
# 无风险利率点 (Risk-free rate point)
ax1.scatter(0, RF, s=80, color='yellow', zorder=6, marker='*')
ax1.annotate(f"无风险\nRf={RF:.0%}", (0, RF), color='yellow', fontsize=6,
xytext=(4, 2), textcoords='offset points')
ax1.xaxis.set_major_formatter(PCT_FMT)
ax1.yaxis.set_major_formatter(PCT_FMT)
ax1.set_xlabel("年化波动率 Volatility", color=LT, fontsize=8)
ax1.set_ylabel("年化收益率 Return", color=LT, fontsize=8)
ax1.legend(fontsize=7, labelcolor=LT, facecolor=DARK_BG, edgecolor=GRID_C)
ax1.grid(color=GRID_C, alpha=0.5)
# ── 图2各策略权重堆叠柱状图 ──
# Stacked bar chart of portfolio weights
ax2.set_title("组合权重 Portfolio Weights", color=LT, fontsize=10, pad=6)
weight_order = ["EW", "MinVar", "MaxSharpe", "RP", "BL"]
weight_arrays = [w_eq.values, w_minvar.values, w_maxsharpe.values, w_rp.values, w_bl.values]
bar_colors = plt.cm.tab20(np.linspace(0, 1, N_STOCKS))
bottoms = np.zeros(len(weight_order))
x_pos = np.arange(len(weight_order))
for i in range(N_STOCKS):
heights = [wa[i] for wa in weight_arrays]
ax2.bar(x_pos, heights, bottom=bottoms, color=bar_colors[i], width=0.72)
bottoms += heights
ax2.set_xticks(x_pos)
ax2.set_xticklabels(weight_order, color=LT, fontsize=9)
ax2.yaxis.set_major_formatter(PCT_FMT)
ax2.set_ylabel("权重 Weight", color=LT, fontsize=8)
ax2.grid(axis='y', color=GRID_C, alpha=0.5)
# ── 图3行业权重对比分组柱状图──
# Grouped bar chart: sector weights per strategy
ax3.set_title("行业权重对比 Sector Weights", color=LT, fontsize=10, pad=6)
x_sec = np.arange(N_SECTORS)
bw = 0.15
strat_colors_list = list(COLORS.values())
for k, (wname, warr) in enumerate(zip(weight_order, weight_arrays)):
sec_wts = [np.sum(warr[SECTORS[s]]) for s in SECTOR_NAMES]
off = (k - len(weight_order) / 2 + 0.5) * bw
ax3.bar(x_sec + off, sec_wts, width=bw,
color=strat_colors_list[k], label=wname, alpha=0.85)
ax3.set_xticks(x_sec)
ax3.set_xticklabels(SECTOR_SHORT, color=LT, fontsize=8)
ax3.yaxis.set_major_formatter(PCT_FMT)
ax3.axhline(MAX_SECTOR_WT, ls='--', color='#FF7043', lw=0.8, alpha=0.6,
label=f'上限 {MAX_SECTOR_WT:.0%}')
ax3.legend(fontsize=6, labelcolor=LT, facecolor=DARK_BG, edgecolor=GRID_C,
ncol=2, loc='upper right')
ax3.grid(axis='y', color=GRID_C, alpha=0.5)
# ── 图4净值曲线 ──
ax4.set_title("净值曲线 NAV Curves", color=LT, fontsize=10, pad=6)
for name in STRATEGY_NAMES:
is_key = "MaxSharpe" in name or "BL" in name
ax4.plot(nav_df.index, nav_df[name],
color=COLORS[name], lw=2.0 if is_key else 1.2,
alpha=1.0 if is_key else 0.7,
label=name.split(" ")[0])
ax4.set_ylabel("净值 NAV", color=LT, fontsize=8)
ax4.legend(fontsize=7, labelcolor=LT, facecolor=DARK_BG,
edgecolor=GRID_C, ncol=2)
ax4.grid(color=GRID_C, alpha=0.5)
ax4.xaxis.set_tick_params(rotation=20)
# ── 图5风险平价组合的风险贡献分解 ──
# Risk contribution breakdown for Risk Parity portfolio
ax5.set_title("风险贡献分布 Risk Contribution (RP)", color=LT, fontsize=10, pad=6)
rc_sorted = pd.Series(rc_pct_rp * 100, index=stock_names).sort_values(ascending=False)
bar_c = ['#4CAF50' if abs(v / 100 - 1.0 / N_STOCKS) < 5e-3
else '#FF7043' for v in rc_sorted]
ax5.bar(range(N_STOCKS), rc_sorted.values, color=bar_c, alpha=0.85)
ax5.axhline(100.0 / N_STOCKS, color='white', lw=1.2, ls='--',
label=f'目标 1/N={100/N_STOCKS:.1f}%')
ax5.set_xticks(range(N_STOCKS))
ax5.set_xticklabels(rc_sorted.index, rotation=45, fontsize=6, color=LT)
ax5.set_ylabel("风险贡献 RC%", color=LT, fontsize=8)
ax5.legend(fontsize=7, labelcolor=LT, facecolor=DARK_BG, edgecolor=GRID_C)
ax5.grid(axis='y', color=GRID_C, alpha=0.5)
# ── 图6滚动夏普比率63 日窗口)──
# Rolling Sharpe ratio (63-day window)
ax6.set_title("滚动夏普比率 Rolling Sharpe (63D)", color=LT, fontsize=10, pad=6)
ROLL_WIN = 63
for name in STRATEGY_NAMES:
r_s = pd.Series(daily_pnl[name], index=dates)
roll_sr = r_s.rolling(ROLL_WIN).apply(
lambda x: (x.mean() * FREQ - RF) / (x.std() * np.sqrt(FREQ) + 1e-10)
)
ax6.plot(dates, roll_sr, color=COLORS[name], lw=1.0,
alpha=0.85, label=name.split(" ")[0])
ax6.axhline(0, color='white', lw=0.7, ls='--')
ax6.set_ylabel("夏普比率 Sharpe", color=LT, fontsize=8)
ax6.legend(fontsize=7, labelcolor=LT, facecolor=DARK_BG,
edgecolor=GRID_C, ncol=2)
ax6.grid(color=GRID_C, alpha=0.5)
ax6.xaxis.set_tick_params(rotation=20)
# ── 图7月度收益分布箱形图 ──
# Monthly return distribution boxplot
ax7.set_title("月度收益分布 Monthly Return Distribution", color=LT, fontsize=10, pad=6)
monthly_rets = {}
for name in STRATEGY_NAMES:
r_s = pd.Series(daily_pnl[name], index=dates)
monthly_rets[name.split(" ")[0]] = r_s.resample('M').apply(
lambda x: (1 + x).prod() - 1
).values
bp = ax7.boxplot(
list(monthly_rets.values()),
patch_artist=True,
labels=list(monthly_rets.keys()),
medianprops=dict(color='white', lw=1.5),
whiskerprops=dict(color=LT),
capprops=dict(color=LT),
flierprops=dict(marker='.', color='#888888', ms=3)
)
for patch, c in zip(bp['boxes'], list(COLORS.values())):
patch.set_facecolor(c)
patch.set_alpha(0.72)
ax7.yaxis.set_major_formatter(PCT1_FMT)
ax7.axhline(0, color='white', lw=0.6, ls='--')
ax7.set_xticklabels(list(monthly_rets.keys()), color=LT, fontsize=8, rotation=15)
ax7.grid(axis='y', color=GRID_C, alpha=0.5)
# ── 图8收益相关系数热力图含行业分割线──
# Return correlation heatmap with sector dividers
ax8.set_title("收益相关系数 Return Correlation Matrix", color=LT, fontsize=10, pad=6)
corr = ret_df.corr().values
im = ax8.imshow(corr, cmap='RdYlGn', vmin=-0.2, vmax=1.0, aspect='auto')
ax8.set_xticks(range(N_STOCKS))
ax8.set_yticks(range(N_STOCKS))
ax8.set_xticklabels(stock_names, rotation=90, fontsize=5.5, color=LT)
ax8.set_yticklabels(stock_names, fontsize=5.5, color=LT)
# 行业分割线 / Sector boundary lines
sector_bounds = [0, 5, 9, 13, 17, 20]
for b in sector_bounds[1:-1]:
ax8.axhline(b - 0.5, color='white', lw=0.9, alpha=0.8)
ax8.axvline(b - 0.5, color='white', lw=0.9, alpha=0.8)
cb = plt.colorbar(im, ax=ax8, fraction=0.046, pad=0.04)
cb.ax.tick_params(colors=LT, labelsize=7)
# ── 图9绩效汇总表 ──
# Performance summary table
ax9.axis('off')
ax9.set_title("绩效汇总 Performance Summary", color=LT, fontsize=10, pad=6)
col_labels = ["年化收益\nAnn.Ret", "年化波动\nAnn.Vol",
"夏普比率\nSharpe", "最大回撤\nMax DD", "Calmar\n比率"]
row_labels = [n.split(" ")[0] for n in STRATEGY_NAMES]
cell_data = []
for name in STRATEGY_NAMES:
r = perf_df.loc[name]
cell_data.append([
f"{r['年化收益']:+.1%}",
f"{r['年化波动']:.1%}",
f"{r['夏普比率']:.2f}",
f"{r['最大回撤']:.1%}",
f"{min(r['Calmar'], 99.0):.2f}",
])
tbl = ax9.table(cellText=cell_data, rowLabels=row_labels,
colLabels=col_labels, loc='center', cellLoc='center')
tbl.auto_set_font_size(False)
tbl.set_fontsize(8)
tbl.scale(1.12, 2.0)
for (row, col), cell in tbl.get_celld().items():
cell.set_facecolor(DARK_BG)
cell.set_edgecolor(GRID_C)
cell.set_text_props(color=LT)
if row == 0 or col == -1:
cell.set_facecolor('#2A2D3A')
cell.set_text_props(color='#FFD700', fontweight='bold')
# 保存图表 / Save figure
OUTPUT_FILE = "portfolio_optimization_demo.png"
plt.savefig(OUTPUT_FILE, dpi=150, bbox_inches='tight',
facecolor=fig.get_facecolor())
print(f" 图表已保存: {OUTPUT_FILE} / Chart saved: {OUTPUT_FILE}")
print("\n" + "=" * 70)
print(" 演示完成! Demo Complete!")
print(f" 输出: {OUTPUT_FILE}")
print("=" * 70)