920 lines
37 KiB
Python
920 lines
37 KiB
Python
# =============================================================================
|
||
# Quantitative Trading — Data Pipeline Demo
|
||
# =============================================================================
|
||
# Topics covered:
|
||
# 1. Price Adjustment for Corporate Actions (splits & dividends)
|
||
# 2. Simple Returns vs Log Returns
|
||
# 3. Multi-stock Panel & Missing Value Handling
|
||
# 4. Outlier Detection & Treatment (Z-score, MAD, Winsorize)
|
||
# 4b. Circuit Breakers & Price Limit Flags (涨跌停) [A-shares]
|
||
# 5. Trading Calendar & Cross-market Alignment
|
||
# 6. End-to-End DataPipeline Class
|
||
# =============================================================================
|
||
|
||
import numpy as np
|
||
import pandas as pd
|
||
import matplotlib.pyplot as plt
|
||
from scipy import stats
|
||
import warnings
|
||
# Silence every warning category so the demo output stays readable.
warnings.filterwarnings('ignore')

# Matplotlib defaults: the fonts configured in our Docker image (needed for
# the Chinese labels), ASCII hyphen for minus signs, and the default figure
# size / resolution.
plt.rcParams.update({
    'font.sans-serif': ['WenQuanYi Zen Hei', 'Arial Unicode MS', 'SimHei', 'DejaVu Sans'],
    'axes.unicode_minus': False,
    'figure.figsize': (12, 6),
    'figure.dpi': 100,
})

# Seed NumPy's global generator so repeated runs produce the same data.
np.random.seed(42)
print("环境准备完成!")
|
||
|
||
# =============================================================================
|
||
# TOPIC 1: Price Adjustment for Corporate Actions (价格复权)
|
||
# -----------------------------------------------------------------------------
|
||
# Raw stock prices have artificial jumps caused by stock splits and cash
|
||
# dividends. These must be adjusted before computing returns or signals,
|
||
# otherwise your algorithm will see fake crashes/spikes.
|
||
#
|
||
# Key concepts:
|
||
# - Unadjusted price (未复权): raw market price, has visible jumps
|
||
# - Forward-adjusted (前复权): history scaled to today's price level
|
||
# - Backward-adjusted (后复权): history kept real, recent prices scaled up
|
||
# - Adjustment factor (复权因子): multiplier that accounts for all events
|
||
# =============================================================================
|
||
|
||
def generate_raw_stock_data(n_days=1000, seed=42):
    """Generate synthetic daily stock data containing a split and cash dividends.

    A geometric-Brownian-motion path serves as the "true" continuous price.
    Corporate actions are then applied on top of it to produce the
    unadjusted close that a raw market feed would show, together with the
    adjustment factor that undoes them.

    Args:
        n_days: Number of business days to simulate, starting 2020-01-02.
        seed: Seed for NumPy's global random generator (it is reseeded here).

    Returns:
        A ``(df, events_df)`` tuple. ``df`` is indexed by date with columns
        ``close`` (true price), ``volume``, ``open``, ``high``, ``low``,
        ``unadj_close`` and ``adj_factor``. ``events_df`` has one row per
        corporate action with columns ``date``, ``type``, ``ratio`` and
        ``amount``; it is empty when ``n_days`` is too short for any event.
    """
    np.random.seed(seed)
    dates = pd.bdate_range(start='2020-01-02', periods=n_days)

    # "True" continuous price path (geometric Brownian motion).
    mu, sigma = 0.12, 0.25
    dt = 1 / 252
    log_ret = (mu - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * np.random.randn(n_days)
    true_prices = 100 * np.exp(np.cumsum(log_ret))

    # Simulated traded volume.
    volume = np.random.lognormal(mean=15, sigma=0.5, size=n_days).astype(int)

    df = pd.DataFrame({
        'date': dates,
        'close': true_prices,
        'volume': volume,
    }).set_index('date')

    # Open / high / low derived from the close plus a random intraday range.
    daily_range = df['close'] * np.abs(np.random.randn(n_days)) * 0.02
    df['open'] = df['close'] + np.random.randn(n_days) * daily_range * 0.5
    df['high'] = np.maximum(df['close'], df['open']) + np.abs(np.random.randn(n_days)) * daily_range * 0.3
    df['low'] = np.minimum(df['close'], df['open']) - np.abs(np.random.randn(n_days)) * daily_range * 0.3

    # Corporate actions. Every event is only scheduled when its day index
    # exists, so short simulations (n_days <= 300) no longer fail on the split.
    events = []

    # 1:2 stock split on the 300th trading day.
    split_idx = 300
    if split_idx < n_days:
        events.append({'date': dates[split_idx], 'type': 'split', 'ratio': 2})

    # Cash dividend of 0.5 roughly once per quarter.
    for q in [60, 185, 310, 435, 560, 685, 810, 935]:
        if q < n_days:
            events.append({'date': dates[q], 'type': 'dividend', 'amount': 0.5})

    # Apply the events, in date order, to obtain the unadjusted close.
    # Invariant: unadj_close * adj_factor stays a constant multiple of the
    # true close, which is what forward/backward adjustment relies on.
    unadj_close = df['close'].copy()
    adj_factors = pd.Series(1.0, index=dates)

    for event in sorted(events, key=lambda x: x['date']):
        idx = dates.get_loc(event['date'])
        if event['type'] == 'split':
            # Split: quoted price drops by the ratio from the event day on;
            # the factor for those days grows by the same ratio.
            unadj_close.iloc[idx:] = unadj_close.iloc[idx:] / event['ratio']
            adj_factors.iloc[idx:] = adj_factors.iloc[idx:] * event['ratio']
        elif event['type'] == 'dividend':
            # Dividend: price drops proportionally to amount / previous close
            # from the ex-date on; earlier days get the factor scaled instead.
            price_before = unadj_close.iloc[idx - 1] if idx > 0 else unadj_close.iloc[idx]
            div_ratio = 1 - event['amount'] / price_before
            unadj_close.iloc[idx:] = unadj_close.iloc[idx:] * div_ratio
            adj_factors.iloc[:idx] = adj_factors.iloc[:idx] * div_ratio

    df['unadj_close'] = unadj_close
    df['adj_factor'] = adj_factors

    # Fixed column set so the frame keeps its schema even with no events.
    events_df = pd.DataFrame(events, columns=['date', 'type', 'ratio', 'amount'])

    return df, events_df
|
||
|
||
def forward_adjust(unadj_prices, adj_factor):
    """Forward-adjust prices: rescale history to the latest price level.

    The factor series is divided by its last value, so the most recent
    price is left untouched and earlier prices are scaled relative to it.
    """
    latest_factor = adj_factor.iloc[-1]
    return unadj_prices * (adj_factor / latest_factor)
|
||
|
||
def backward_adjust(unadj_prices, adj_factor):
    """Backward-adjust prices: keep the earliest price, rescale later ones.

    The factor series is divided by its first value, so the first price is
    left untouched and later prices are scaled relative to it.
    """
    earliest_factor = adj_factor.iloc[0]
    return unadj_prices * (adj_factor / earliest_factor)
|
||
|
||
# Build the synthetic single-stock dataset and its corporate-action table.
stock_data, events = generate_raw_stock_data()

print("公司行为事件:")
print(events.to_string(index=False))
print(f"\n数据形状: {stock_data.shape}")

# Derive both adjusted close series from the raw close and the factor.
_raw_close = stock_data['unadj_close']
_factor = stock_data['adj_factor']
stock_data['fwd_adj_close'] = forward_adjust(_raw_close, _factor)
stock_data['bwd_adj_close'] = backward_adjust(_raw_close, _factor)
|
||
# Visual comparison of the price series (2 x 2 grid).
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Panel 1: unadjusted price, with a dashed marker on every event date.
ax = axes[0, 0]
ax.plot(stock_data['unadj_close'], color='blue', linewidth=1)
ax.set_title('未复权价格 (可见拆股/分红跳跃)', fontsize=13)
ax.grid(alpha=0.3)
for event_date in events['date']:
    ax.axvline(x=event_date, color='red', linestyle='--', alpha=0.5)

# Panels 2 and 3: forward- and backward-adjusted prices.
for ax, column, colour, title in (
    (axes[0, 1], 'fwd_adj_close', 'green', '前复权价格 (当前价格真实)'),
    (axes[1, 0], 'bwd_adj_close', 'orange', '后复权价格 (历史价格真实)'),
):
    ax.plot(stock_data[column], color=colour, linewidth=1)
    ax.set_title(title, fontsize=13)
    ax.grid(alpha=0.3)

# Panel 4: the "true" continuous price against the forward-adjusted one.
ax = axes[1, 1]
ax.plot(stock_data['close'], label='真实连续价格', color='black', linewidth=1.5)
ax.plot(stock_data['fwd_adj_close'], label='前复权价格', color='green', linewidth=1, alpha=0.7)
ax.set_title('真实价格 vs 前复权价格', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

plt.suptitle('复权价格对比', fontsize=15, y=1.02)
plt.tight_layout()
plt.show()
|
||
|
||
# Sanity check: one-day returns across the split date for each price series.
print("\n收益率验证 (拆股日前后):")
split_date = events.loc[events['type'] == 'split', 'date'].iloc[0]
split_idx = stock_data.index.get_loc(split_date)


def _split_day_return(column):
    """Return the simple return of ``stock_data[column]`` on the split day."""
    series = stock_data[column]
    return series.iloc[split_idx] / series.iloc[split_idx - 1] - 1


print(f"拆股日期: {split_date.strftime('%Y-%m-%d')}")
print(f"未复权收益率: {_split_day_return('unadj_close'):.4f}")
print(f"前复权收益率: {_split_day_return('fwd_adj_close'):.4f}")
print(f"真实收益率: {_split_day_return('close'):.4f}")
|
||
|
||
|
||
# =============================================================================
|
||
# TOPIC 2: Simple Returns vs Log Returns (简单收益率 vs 对数收益率)
|
||
# -----------------------------------------------------------------------------
|
||
# Two ways to measure how much a price changed:
|
||
# - Simple return: r = (P_t / P_{t-1}) - 1
|
||
# - Log return: r = ln(P_t / P_{t-1})
|
||
#
|
||
# Why log returns are preferred in quant research:
|
||
# - Time-additive: sum daily log returns to get total log return
|
||
# - Better statistical properties (closer to normal distribution)
|
||
# - Numerically stable for long compounding periods
|
||
#
|
||
# At daily frequency the difference is tiny (~1bp), but it grows at
|
||
# monthly/annual frequency — never mix the two in the same calculation.
|
||
# =============================================================================
|
||
|
||
# Returns are computed from the forward-adjusted close.
prices = stock_data['fwd_adj_close']

# Simple returns: P_t / P_{t-1} - 1
simple_returns = prices.pct_change().dropna()

# Log returns: ln(P_t / P_{t-1})
log_returns = np.log(prices / prices.shift(1)).dropna()

# Side-by-side comparison, with the gap also expressed in basis points.
_gap = simple_returns - log_returns
comparison = pd.DataFrame({
    '简单收益率': simple_returns,
    '对数收益率': log_returns,
    '差异': _gap,
    '差异(bp)': _gap * 10000,
})

_abs_gap = comparison['差异'].abs()
_abs_gap_bp = comparison['差异(bp)'].abs()
print("简单收益率 vs 对数收益率统计:")
print(f"平均绝对差异: {_abs_gap.mean():.6f} ({_abs_gap_bp.mean():.6f})")
print(f"最大差异: {_abs_gap.max():.6f} ({_abs_gap_bp.max():.6f})")
print(f"\n日频数据下两者差异很小,但月频或年频时差异会显著增大。")

# Time additivity: log returns sum to the total log return.
print("\n=== 时间可加性验证 ===")
total_log_return = log_returns.sum()
actual_total = np.log(prices.iloc[-1] / prices.iloc[0])
print(f"对数收益率求和: {total_log_return:.6f}")
print(f"实际总对数收益: {actual_total:.6f}")
print(f"差异: {abs(total_log_return - actual_total):.10f} (几乎为零)")

# Simple returns have to be compounded (multiplied) instead.
print("\n简单收益率需要连乘:")
total_simple_product = (1 + simple_returns).prod() - 1
actual_simple = prices.iloc[-1] / prices.iloc[0] - 1
print(f"简单收益率连乘: {total_simple_product:.6f}")
print(f"实际总简单收益: {actual_simple:.6f}")
print(f"差异: {abs(total_simple_product - actual_simple):.10f}")
|
||
|
||
# Scatter simple vs log returns at daily and monthly frequency.
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Monthly series are built from the last price of each calendar month.
_month_end = prices.resample('ME').last()
monthly_simple = _month_end.pct_change().dropna()
monthly_log = np.log(_month_end / _month_end.shift(1)).dropna()

for ax, x_vals, y_vals, lim, alpha, size, title in (
    (axes[0], simple_returns, log_returns, 0.1, 0.3, 5,
     '简单收益率 vs 对数收益率 (日频) '),
    (axes[1], monthly_simple, monthly_log, 0.2, 0.5, 20,
     '简单收益率 vs 对数收益率 (月频, 差异更明显) '),
):
    ax.scatter(x_vals, y_vals, alpha=alpha, s=size)
    # Dashed y = x reference line.
    ax.plot([-lim, lim], [-lim, lim], 'r--', linewidth=1, label='y=x')
    ax.set_xlabel('简单收益率')
    ax.set_ylabel('对数收益率')
    ax.set_title(title, fontsize=13)
    ax.legend()
    ax.grid(alpha=0.3)

plt.tight_layout()
plt.show()
|
||
|
||
|
||
# =============================================================================
|
||
# TOPIC 3: Multi-stock Panel & Missing Value Handling (多标的面板与缺失值处理)
|
||
# -----------------------------------------------------------------------------
|
||
# Real market data panels have two main sources of missing values:
|
||
# - Suspensions (停牌): a stock halts trading for days/weeks
|
||
# - Data feed gaps: vendor errors, holidays, late reporting
|
||
#
|
||
# Strategies (choice depends on context):
|
||
# - ffill (forward fill): use last known price — standard for suspensions
|
||
# - Linear interpolation: smooth gaps — OK for short random gaps
|
||
# - ffill with limit: ffill but cap at N days — conservative choice
|
||
# - Drop: remove rows/columns — loses cross-sectional data
|
||
#
|
||
# Rule of thumb: if missing% > 10%, consider excluding the stock entirely.
|
||
# =============================================================================
|
||
|
||
def generate_multi_stock_panel(n_stocks=5, n_days=500, seed=42):
    """Simulate a daily multi-stock panel with suspensions and random gaps.

    Each stock follows its own geometric Brownian motion. Two to four
    suspension windows (1-9 days each) blank out the price and zero the
    volume; five additional random days have only the price blanked.

    Args:
        n_stocks: Number of stocks; columns are named Stock_A, Stock_B, ...
        n_days: Number of business days, starting 2022-01-03.
        seed: Seed for NumPy's global random generator (it is reseeded here).

    Returns:
        ``(price_panel, volume_panel)``: two DataFrames indexed by date with
        one column per stock.
    """
    np.random.seed(seed)
    calendar = pd.bdate_range(start='2022-01-03', periods=n_days)
    step = 1 / 252

    close_by_stock = {}
    volume_by_stock = {}
    for k in range(n_stocks):
        ticker = f'Stock_{chr(65 + k)}'  # Stock_A, Stock_B, ...

        # Per-stock drift, volatility and starting price.
        drift = np.random.uniform(0.05, 0.20)
        vol = np.random.uniform(0.15, 0.35)
        start_price = np.random.uniform(20, 200)

        shocks = np.random.randn(n_days)
        increments = (drift - 0.5 * vol**2) * step + vol * np.sqrt(step) * shocks
        path = start_price * np.exp(np.cumsum(increments))
        traded = np.random.lognormal(mean=14, sigma=0.8, size=n_days).astype(int)

        # Suspensions: consecutive days with no price and zero volume.
        halt_starts = np.random.choice(range(50, n_days - 30), size=2 + k % 3, replace=False)
        for first in halt_starts:
            length = np.random.randint(1, 10)
            last = min(first + length, n_days)
            path[first:last] = np.nan
            traded[first:last] = 0

        # Isolated random gaps: price missing, volume left as is.
        gap_days = np.random.choice(range(n_days), size=5, replace=False)
        path[gap_days] = np.nan

        close_by_stock[ticker] = pd.Series(path, index=calendar)
        volume_by_stock[ticker] = pd.Series(traded, index=calendar)

    # Assemble the wide panels (dates x stocks).
    price_panel = pd.DataFrame(close_by_stock)
    volume_panel = pd.DataFrame(volume_by_stock)
    return price_panel, volume_panel
|
||
|
||
# Generate the multi-stock panel.
price_panel, volume_panel = generate_multi_stock_panel(n_stocks=5, n_days=500)


def _longest_missing_run(column):
    """Return the length of the longest run of consecutive NaNs in a Series."""
    # The cumulative count of valid values is constant across a NaN run, so
    # it labels each run; summing the NaN indicator per label gives its length.
    run_label = column.notna().astype(int).cumsum()
    return column.isna().astype(int).groupby(run_label).sum().max()


# Per-stock missing-value statistics.
_is_missing = price_panel.isna()
missing_stats = pd.DataFrame({
    '总天数': len(price_panel),
    '缺失天数': _is_missing.sum(),
    '缺失比例': _is_missing.mean(),
    '最长连续缺失': [_longest_missing_run(price_panel[col]) for col in price_panel.columns],
})

print("缺失值统计:")
print(missing_stats)
print(f"\n面板形状: {price_panel.shape}")
|
||
|
||
# 缺失值处理方法对比
|
||
def handle_missing_values(prices, method='ffill'):
    """Fill or drop missing values using the named strategy.

    Args:
        prices: pandas Series or DataFrame of prices.
        method: One of
            ``'ffill'``        carry the last valid value forward (the usual
                               choice for suspensions: price assumed unchanged),
            ``'linear'``       linear interpolation,
            ``'drop'``         remove missing entries,
            ``'ffill_limit'``  forward fill, at most 5 consecutive values.

    Returns:
        A new object with the strategy applied.

    Raises:
        ValueError: If ``method`` is not one of the names above.
    """
    strategies = {
        'ffill': lambda data: data.ffill(),
        'linear': lambda data: data.interpolate(method='linear'),
        'drop': lambda data: data.dropna(),
        'ffill_limit': lambda data: data.ffill(limit=5),
    }
    try:
        apply_strategy = strategies[method]
    except (KeyError, TypeError):
        # TypeError covers unhashable values, which are unknown methods too.
        raise ValueError(f"Unknown method: {method}") from None
    return apply_strategy(prices)
|
||
|
||
# Show the effect of each strategy on Stock_A.
stock_a = price_panel['Stock_A'].copy()

fig, axes = plt.subplots(2, 2, figsize=(14, 10))

_raw_title = '原始数据 (含缺失值) '
methods = {
    _raw_title: stock_a,
    '向前填充 (ffill)': handle_missing_values(stock_a, 'ffill'),
    '线性插值 (interpolate)': handle_missing_values(stock_a, 'linear'),
    '有限填充 (ffill, limit=5)': handle_missing_values(stock_a, 'ffill_limit'),
}

# Positions that were missing in the raw series (same for every panel).
missing_mask = stock_a.isna()
_has_observed_data = not missing_mask.all()

for ax, (title, data) in zip(axes.flat, methods.items()):
    ax.plot(data, linewidth=1)
    # Highlight values that a strategy filled in (not on the raw panel).
    if _has_observed_data and title != _raw_title:
        filled_values = data[missing_mask].dropna()
        if len(filled_values) > 0:
            ax.scatter(filled_values.index, filled_values.values,
                       color='red', s=15, zorder=5, label='填充值')
            ax.legend(fontsize=9)
    ax.set_title(title, fontsize=12)
    ax.grid(alpha=0.3)

plt.suptitle('缺失值处理方法对比 (Stock_A)', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()

# Practical advice.
for _line in (
    "\n实践建议:",
    "- 停牌: 使用ffill (假设停牌期间价格不变) ",
    "- 短期缺失(<3天): 可用线性插值",
    "- 长期缺失(>10天): 考虑从分析中排除该标的",
    "- 计算收益率前: 先填充缺失值, 再计算",
):
    print(_line)
|
||
|
||
|
||
# 构建干净的多标的数据面板
|
||
# Steps: ffill → drop stocks/rows still NaN → compute returns
|
||
#
|
||
# ⚠️ Look-Ahead Bias Warning: DO NOT use bfill() here.
|
||
# bfill() fills early NaN rows by looking at future prices, which means
|
||
# your "day 1" price would be sourced from a future observation.
|
||
# This is data leakage — it would inflate backtest performance.
|
||
# Safe rule: only ffill (past → present), then drop what remains NaN.
|
||
|
||
# Forward fill only (past -> present), then drop every row in which any
# stock is still NaN.
clean_prices = price_panel.ffill().dropna(how='any')

_nan_before = price_panel.isna().sum().sum()
_nan_after = clean_prices.isna().sum().sum()
_rows_dropped = len(price_panel) - len(clean_prices)
print(f"清洗前缺失值: {_nan_before}")
print(f"清洗后缺失值: {_nan_after}")
print(f"(dropped {_rows_dropped} leading rows to avoid look-ahead bias)")

# Simple-return panel computed from the cleaned prices.
returns_panel = clean_prices.pct_change().dropna()
|
||
|
||
# Price paths rebased to 100 at the first clean date.
fig, ax = plt.subplots(figsize=(14, 6))
normalized = clean_prices / clean_prices.iloc[0] * 100
for col, rebased in normalized.items():
    ax.plot(rebased, label=col, linewidth=1.2)

ax.set_title('多标的归一化价格走势 (基期=100) ', fontsize=14)
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()

# Correlation matrix of returns, drawn as an annotated heatmap.
corr_matrix = returns_panel.corr()
fig, ax = plt.subplots(figsize=(8, 6))
im = ax.imshow(corr_matrix.values, cmap='RdBu_r', vmin=-1, vmax=1)

_ticks = range(len(corr_matrix))
ax.set_xticks(_ticks)
ax.set_xticklabels(corr_matrix.columns, rotation=45)
ax.set_yticks(_ticks)
ax.set_yticklabels(corr_matrix.columns)

# Write each coefficient into its cell (x = column j, y = row i).
for (i, j), coefficient in np.ndenumerate(corr_matrix.values):
    ax.text(j, i, f'{coefficient:.2f}', ha='center', va='center', fontsize=11)

plt.colorbar(im, ax=ax)
ax.set_title('收益率相关性矩阵', fontsize=14)
plt.tight_layout()
plt.show()
|
||
|
||
|
||
# =============================================================================
|
||
# TOPIC 4: Outlier Detection & Treatment (异常值检测与处理)
|
||
# -----------------------------------------------------------------------------
|
||
# Financial return series contain extreme values from real events (crashes,
|
||
# halts, data errors). How you handle them affects every downstream signal.
|
||
#
|
||
# Detection methods:
|
||
# - Z-score: |z| = |(x - mean) / std| > 3 — simple, but mean/std are
|
||
# themselves pulled by outliers (not robust)
|
||
# - MAD: uses median instead of mean — robust to extreme values,
|
||
# preferred for financial data
|
||
#
|
||
# Treatment methods:
|
||
# - Remove: delete the outlier row — reduces sample size
|
||
# - Winsorize: clip to [p1, p99] boundary — keeps all data, limits impact
|
||
# standard practice before factor construction
|
||
#
|
||
# Q-Q plot: visualizes how far returns deviate from a normal distribution.
|
||
# Fat tails (leptokurtosis) are a universal property of financial returns.
|
||
# =============================================================================
|
||
|
||
def detect_outliers_zscore(series, threshold=3.0):
    """Detect outliers with the Z-score (standard score) method.

    Z = (x - mean) / std, i.e. how many standard deviations a value lies
    from the mean. Values with |Z| > threshold are flagged.

    Example: if the average exam score is 70 with a standard deviation of
    10, a score of 100 has Z = (100 - 70) / 10 = 3.

    Weakness: the mean and standard deviation are themselves pulled by the
    outliers, so the method is not robust.

    Args:
        series: pandas Series of observations.
        threshold: Flag values whose absolute Z-score exceeds this.

    Returns:
        ``(outlier_mask, z_scores)``: a boolean Series and the Z-scores.
        When the standard deviation is zero or undefined (constant or
        single-element input) there is no dispersion to measure against, so
        the Z-scores are 0 and nothing is flagged, instead of the NaN/inf
        values a plain division would produce.
    """
    deviation = series - series.mean()
    spread = series.std()
    if spread > 0:
        z_scores = deviation / spread
    else:
        # Zero or NaN std: keep the shape (and NaNs of missing inputs) but
        # report zero distance from the mean.
        z_scores = deviation * 0.0
    outlier_mask = z_scores.abs() > threshold
    return outlier_mask, z_scores
|
||
|
||
|
||
def detect_outliers_mad(series, threshold=3.0):
    """Detect outliers with the MAD (Median Absolute Deviation) method.

    MAD = median(|x - median(x)|): the median distance from the median.
    Modified Z = 0.6745 * (x - median) / MAD; values with
    |Modified Z| > threshold are flagged.

    Because the reference point is the median rather than the mean, the
    score is far less affected by extreme values than the plain Z-score.

    Args:
        series: pandas Series of observations.
        threshold: Flag values whose absolute modified Z-score exceeds this.

    Returns:
        ``(outlier_mask, modified_z)``: a boolean Series and the scores.
    """
    center = series.median()
    centered = series - center
    raw_mad = centered.abs().median()
    # A MAD of exactly zero is replaced by a tiny constant to avoid 0-division.
    scale = 1e-10 if raw_mad == 0 else raw_mad
    modified_z = 0.6745 * centered / scale
    return modified_z.abs() > threshold, modified_z
|
||
|
||
|
||
def winsorize(series, lower_pct=0.01, upper_pct=0.99):
    """Winsorize: clip values outside the given quantiles to the quantiles.

    Unlike dropping outliers, every data point is kept; only the extremes
    are limited. With the defaults, values below the 1% quantile are pulled
    up to it and values above the 99% quantile are pulled down to it.

    Args:
        series: pandas Series of observations.
        lower_pct: Lower quantile (fraction in [0, 1]).
        upper_pct: Upper quantile (fraction in [0, 1]).

    Returns:
        A Series of the same length with the tails clipped.
    """
    floor, cap = (series.quantile(q) for q in (lower_pct, upper_pct))
    return series.clip(lower=floor, upper=cap)
|
||
|
||
|
||
# Demonstrate on Stock_A's returns.
ret_a = returns_panel['Stock_A'].copy()

# Inject five artificial outliers (random sign, magnitude 15%-30%).
np.random.seed(123)
outlier_indices = np.random.choice(ret_a.index, size=5, replace=False)
ret_a_with_outliers = ret_a.copy()
for idx in outlier_indices:
    _sign = np.random.choice([-1, 1])
    _size = np.random.uniform(0.15, 0.30)
    ret_a_with_outliers[idx] = _sign * _size

# Run both detectors.
zscore_mask, z_scores = detect_outliers_zscore(ret_a_with_outliers, threshold=3.0)
mad_mask, mad_scores = detect_outliers_mad(ret_a_with_outliers, threshold=3.0)

print("异常值检测结果:")
print(f"Z-score方法 (|Z|>3): 检测到 {zscore_mask.sum()} 个异常值")
print(f"MAD方法 (|Modified Z|>3): 检测到 {mad_mask.sum()} 个异常值")

# Winsorize at the 1% / 99% quantiles and compare the value ranges.
ret_winsorized = winsorize(ret_a_with_outliers, lower_pct=0.01, upper_pct=0.99)

_lo, _hi = ret_a_with_outliers.min(), ret_a_with_outliers.max()
print(f"\nWinsorize前 - 范围: [{_lo:.4f}, {_hi:.4f}]")
_lo, _hi = ret_winsorized.min(), ret_winsorized.max()
print(f"Winsorize后 - 范围: [{_lo:.4f}, {_hi:.4f}]")
|
||
|
||
|
||
# Visualise the outlier detection results (2 x 2 grid).
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Panels 1-2: return series with the points flagged by each detector.
outliers = ret_a_with_outliers[zscore_mask]
outliers_mad = ret_a_with_outliers[mad_mask]
for ax, flagged, colour, title in (
    (axes[0, 0], outliers, 'red', 'Z-score异常值检测 (|Z|>3)'),
    (axes[0, 1], outliers_mad, 'orange', 'MAD异常值检测 (|Modified Z|>3)'),
):
    ax.plot(ret_a_with_outliers, color='blue', linewidth=0.8, alpha=0.7)
    ax.scatter(flagged.index, flagged.values, color=colour, s=40, zorder=5,
               label=f'异常值 ({len(flagged)}个)')
    ax.set_title(title, fontsize=13)
    ax.legend()
    ax.grid(alpha=0.3)

# Panel 3: return distribution before and after winsorizing.
ax = axes[1, 0]
for _sample, _label, _colour in (
    (ret_a_with_outliers, '原始', 'blue'),
    (ret_winsorized, 'Winsorize后', 'green'),
):
    ax.hist(_sample, bins=60, alpha=0.5, label=_label, color=_colour, density=True)
ax.set_title('Winsorize前后收益率分布', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

# Panel 4: Q-Q plot of sample quantiles against standard-normal quantiles.
ax = axes[1, 1]
sorted_data = np.sort(ret_a_with_outliers.dropna())
n = len(sorted_data)
theoretical = stats.norm.ppf(np.arange(1, n + 1) / (n + 1))
ax.scatter(theoretical, sorted_data, alpha=0.5, s=10)
# Reference line: a normal distribution with the sample's mean and std.
_sample_mean = ret_a_with_outliers.mean()
_sample_std = ret_a_with_outliers.std()
_q_ends = [theoretical.min(), theoretical.max()]
ax.plot(_q_ends, [q * _sample_std + _sample_mean for q in _q_ends],
        'r--', linewidth=1, label='正态分布参考线')
ax.set_xlabel('理论分位数')
ax.set_ylabel('样本分位数')
ax.set_title('Q-Q图 (检验正态性) ', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

plt.tight_layout()
plt.show()

# Guidance on choosing a method.
for _line in (
    "\n方法选择建议:",
    "- Z-score: 简单快速,但受极端值影响 (不够稳健) ",
    "- MAD: 更稳健,推荐用于金融数据",
    "- Winsorize: 保留数据量,适合构建因子时使用",
    "- 实际中常用: Winsorize(1%, 99%) + MAD检测",
):
    print(_line)
|
||
|
||
|
||
# =============================================================================
|
||
# TOPIC 4b: Circuit Breakers & Price Limit Flags (涨跌停标记)
|
||
# -----------------------------------------------------------------------------
|
||
# In Chinese A-shares, main-board stocks have a ±10% daily price limit (±5% for ST stocks; ChiNext and STAR Market stocks use ±20%).
|
||
# When a stock hits the limit, it is NOT a data error — it is real market data.
|
||
# However, it signals a critical data quality issue for downstream use:
|
||
#
|
||
# - The price IS the true closing price
|
||
# - But the stock may have been UNTRADEABLE for most of the day (zero liquidity)
|
||
# - A return of exactly +10% or -10% means you likely CANNOT execute at that price
|
||
#
|
||
# What to do in data prep (we flag, not remove):
|
||
# - Add a boolean column: `is_limit_up`, `is_limit_down`
|
||
# - Downstream strategy code can then decide to skip, weight-down, or
|
||
# exclude limit-hit days from signal calculations
|
||
#
|
||
# ⚠️ Survivorship Bias Note (out of scope for data prep, but important to know):
|
||
# This demo only generates stocks that "survive" the full period. In reality,
|
||
# stocks get delisted (bankruptcy, M&A, regulatory removal). If you only
|
||
# include currently-alive stocks in your historical dataset, you are
|
||
# implicitly selecting winners — your backtest returns will be inflated.
|
||
# Fix: source historical data that includes ALL stocks, including delisted ones.
|
||
# This is a DATA SOURCING problem, handled before data ever enters this pipeline.
|
||
# =============================================================================
|
||
|
||
def flag_price_limits(prices, limit_pct=0.10, tolerance=0.005):
    """Flag days where a stock hit the daily price limit (涨跌停).

    Returns are computed with ``prices.pct_change()`` and compared with the
    limit, loosened by ``tolerance``. The first row (no previous price) and
    any NaN return are reported as normal days.

    Args:
        prices: Price Series or DataFrame (dates x stocks).
        limit_pct: Daily limit as a fraction, e.g. 0.10 for ±10%.
        tolerance: Buffer subtracted from the limit before comparing. Real
            closing prices may be rounded and not land exactly on the
            theoretical limit price; the default keeps the original 0.5%.

    Returns:
        An integer object of the same shape as ``prices`` with values:
            +1 = limit up (涨停)
            -1 = limit down (跌停)
             0 = normal day
    """
    returns = prices.pct_change()
    threshold = limit_pct - tolerance

    hit_up = returns >= threshold
    hit_down = returns <= -threshold
    return hit_up.astype(int) - hit_down.astype(int)
|
||
|
||
|
||
# Inject realistic limit-hit events into the panel for demonstration
np.random.seed(99)
demo_prices = clean_prices.copy()
for _col_pos, col in enumerate(demo_prices.columns):
    # Randomly inject 3 limit-up and 3 limit-down days per stock
    up_days = np.random.choice(range(1, len(demo_prices) - 1), size=3, replace=False)
    down_days = np.random.choice(range(1, len(demo_prices) - 1), size=3, replace=False)
    # Write through a single .iloc[row, col] call. The chained form
    # `demo_prices.iloc[d][col] = ...` assigns into a temporary row object,
    # which is not guaranteed to update demo_prices (and never does under
    # pandas Copy-on-Write), so no limit event would actually be injected.
    for d in up_days:
        demo_prices.iloc[d, _col_pos] = demo_prices.iloc[d - 1, _col_pos] * 1.10  # exactly +10%
    for d in down_days:
        demo_prices.iloc[d, _col_pos] = demo_prices.iloc[d - 1, _col_pos] * 0.90  # exactly -10%
|
||
|
||
limit_flags = flag_price_limits(demo_prices, limit_pct=0.10)

# Summary: total and per-stock counts of limit-up / limit-down days.
_up_per_stock = (limit_flags == 1).sum()
_down_per_stock = (limit_flags == -1).sum()
total_limit_up = _up_per_stock.sum()
total_limit_down = _down_per_stock.sum()

print(f"\n涨跌停统计:")
print(f" 涨停天数 (limit up): {total_limit_up}")
print(f" 跌停天数 (limit down): {total_limit_down}")
print(f"\n各标的涨停次数:")
print(_up_per_stock)
print(f"\n各标的跌停次数:")
print(_down_per_stock)
|
||
|
||
# Visualize limit flags on Stock_A: price with markers (top), flags (bottom).
fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True)

_flags_a = limit_flags['Stock_A']
limit_up_dates = limit_flags.index[_flags_a == 1]
limit_down_dates = limit_flags.index[_flags_a == -1]

ax = axes[0]
ax.plot(demo_prices['Stock_A'], color='steelblue', linewidth=1, label='Stock_A Price')
for _dates, _colour, _label, _marker in (
    (limit_up_dates, 'red', '涨停 (limit up)', '^'),
    (limit_down_dates, 'green', '跌停 (limit down)', 'v'),
):
    ax.scatter(_dates, demo_prices.loc[_dates, 'Stock_A'],
               color=_colour, s=60, zorder=5, label=_label, marker=_marker)
ax.set_title('价格序列与涨跌停标记 (Stock_A)', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

# Bar per day, coloured by flag value.
ax = axes[1]
_bar_colours = _flags_a.map({1: 'red', -1: 'green', 0: 'lightgray'})
ax.bar(limit_flags.index, _flags_a, color=_bar_colours, width=1)
ax.set_title('涨跌停标志 (+1=涨停, -1=跌停, 0=正常)', fontsize=13)
ax.set_ylim(-1.5, 1.5)
ax.set_yticks([-1, 0, 1])
ax.grid(alpha=0.3)

plt.tight_layout()
plt.show()

# Practical advice.
for _line in (
    "\n实践建议:",
    "- 涨跌停日: 保留价格数据,但添加 is_limit_up / is_limit_down 标记列",
    "- 策略层: 遇到涨跌停标记时跳过信号,因无法成交",
    "- 统计分析: 计算波动率/相关性时可选择排除涨跌停日,避免失真",
):
    print(_line)
|
||
|
||
|
||
# =============================================================================
|
||
# TOPIC 5: Trading Calendar & Cross-market Alignment (交易日历与跨市场对齐)
|
||
# -----------------------------------------------------------------------------
|
||
# Different markets trade on different days (national holidays, weekends).
|
||
# When combining data from multiple markets, you must decide how to align
|
||
# the time axis — the wrong choice distorts correlation calculations.
|
||
#
|
||
# Two alignment strategies:
|
||
# - Intersection (取交集): keep only dates both markets are open.
|
||
# Best for: correlation, regression, signal comparison.
|
||
# Drawback: loses data on single-market trading days.
|
||
#
|
||
# - Forward fill (向前填充): fill non-trading days with last known price.
|
||
# Best for: portfolio construction, continuous NAV calculation.
|
||
# Drawback: introduces artificial zero-return days — inflates Sharpe,
|
||
# understates volatility. Must be accounted for in analysis.
|
||
#
|
||
# Note: this demo uses simplified hardcoded holiday lists. Production systems
|
||
# should use exchange_calendars or pandas_market_calendars libraries.
|
||
# =============================================================================
|
||
|
||
def create_trading_calendars():
    """Build example 2024 trading calendars for the US and China A-share markets.

    Each calendar is the set of 2024 weekdays minus a simplified, hard-coded
    list of that market's holidays.

    Returns:
        ``(us_trading_days, cn_trading_days)``: two DatetimeIndex objects.
    """
    # Every Monday-Friday of 2024.
    weekdays_2024 = pd.bdate_range(start='2024-01-01', end='2024-12-31')

    # US market holidays (simplified).
    us_closed = pd.to_datetime([
        '2024-01-01',  # New Year's Day
        '2024-01-15',  # MLK Day
        '2024-02-19',  # Presidents Day
        '2024-03-29',  # Good Friday
        '2024-05-27',  # Memorial Day
        '2024-06-19',  # Juneteenth
        '2024-07-04',  # Independence Day
        '2024-09-02',  # Labor Day
        '2024-11-28',  # Thanksgiving
        '2024-12-25',  # Christmas
    ])

    # China A-share holidays (simplified).
    cn_closed = pd.to_datetime([
        '2024-01-01',                                            # New Year's Day
        '2024-02-09', '2024-02-12', '2024-02-13', '2024-02-14',
        '2024-02-15', '2024-02-16',                              # Spring Festival
        '2024-04-04', '2024-04-05',                              # Qingming
        '2024-05-01', '2024-05-02', '2024-05-03',                # Labour Day
        '2024-06-10',                                            # Dragon Boat
        '2024-09-16', '2024-09-17',                              # Mid-Autumn
        '2024-10-01', '2024-10-02', '2024-10-03', '2024-10-04',
        '2024-10-07',                                            # National Day
    ])

    us_open = ~weekdays_2024.isin(us_closed)
    cn_open = ~weekdays_2024.isin(cn_closed)
    return weekdays_2024[us_open], weekdays_2024[cn_open]
|
||
|
||
us_calendar, cn_calendar = create_trading_calendars()

# Calendar sizes and how the two markets overlap.
_shared_days = us_calendar.intersection(cn_calendar)
_us_only_days = us_calendar.difference(cn_calendar)
_cn_only_days = cn_calendar.difference(us_calendar)

print(f"2024年美股交易日: {len(us_calendar)}天")
print(f"2024年A股交易日: {len(cn_calendar)}天")
print(f"共同交易日: {len(_shared_days)}天")
print(f"仅美股交易日: {len(_us_only_days)}天")
print(f"仅A股交易日: {len(_cn_only_days)}天")
|
||
|
||
# Simulate cross-market data alignment.
np.random.seed(42)


def _simulate_etf(start_price, daily_vol, calendar, name):
    """Return a random-walk (in logs) price Series on the given calendar."""
    log_steps = np.random.randn(len(calendar)) * daily_vol
    return pd.Series(start_price * np.exp(np.cumsum(log_steps)), index=calendar, name=name)


# US series is drawn first, then the A-share series.
us_prices = _simulate_etf(100, 0.01, us_calendar, 'US_ETF')
cn_prices = _simulate_etf(50, 0.015, cn_calendar, 'CN_ETF')

# A naive merge leaves NaN wherever only one market was open.
combined_raw = pd.DataFrame({'US_ETF': us_prices, 'CN_ETF': cn_prices})
print(f"直接合并后:")
print(f" 总行数: {len(combined_raw)}")
print(f" US_ETF缺失: {combined_raw['US_ETF'].isna().sum()}")
print(f" CN_ETF缺失: {combined_raw['CN_ETF'].isna().sum()}")

# Method 1: intersection - keep only dates on which both markets traded.
common_dates = us_calendar.intersection(cn_calendar)
combined_inner = combined_raw.loc[common_dates].dropna()

# Method 2: forward fill - carry the latest close over non-trading days.
combined_ffill = combined_raw.ffill().dropna()

print(f"\n方法1 (取交集): {len(combined_inner)}行")
print(f"方法2 (ffill): {len(combined_ffill)}行")
|
||
|
||
# Plot both alignment methods side by side, each series rebased to 1.
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

for ax, _frame, _title in (
    (axes[0], combined_inner, '方法1: 取交集日期'),
    (axes[1], combined_ffill, '方法2: 向前填充'),
):
    for _ticker in ('US_ETF', 'CN_ETF'):
        ax.plot(_frame.index, _frame[_ticker] / _frame[_ticker].iloc[0],
                label=_ticker, linewidth=1.2)
    ax.set_title(_title, fontsize=13)
    ax.legend()
    ax.grid(alpha=0.3)

plt.suptitle('跨市场数据对齐方法对比', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()

# Practical advice.
for _line in (
    "\n实践建议:",
    "- 计算相关性、回归: 使用交集日期,确保数据同步",
    "- 构建组合/计算市值: 使用ffill,避免数据断裂",
    "- 重要: ffill会引入假的'零收益日',计算波动率时需注意",
):
    print(_line)
|
||
|
||
|
||
# =============================================================================
|
||
# TOPIC 6: End-to-End DataPipeline Class (完整数据清洗管道)
|
||
# -----------------------------------------------------------------------------
|
||
# Combines all prior topics into a reusable, sequential pipeline:
|
||
# Step 1 — Filter stocks with too many missing values
|
||
# Step 2 — Fill remaining missing values (ffill only — no bfill/look-ahead)
|
||
# Step 3 — Compute return series
|
||
# Step 4 — Detect outliers (MAD) and apply Winsorize
|
||
# Step 5 — Generate a data quality report per stock
|
||
#
|
||
# This pattern (class-based pipeline with a .run() method) mirrors how
|
||
# production quant systems structure data ingestion — each step is auditable,
|
||
# configurable, and produces diagnostic output.
|
||
# =============================================================================
|
||
|
||
class DataPipeline:
    """Sequential cleaning pipeline from a raw price panel to clean returns.

    Steps performed by :meth:`run`, in execution order:
        1. Drop instruments whose fraction of missing prices exceeds
           ``max_missing_pct``.
        2. Forward-fill the remaining gaps and drop rows that are still
           incomplete (no back-fill, to avoid look-ahead bias).
        3. Compute simple returns from the filled prices.
        4. Count MAD outliers and winsorize every return series.
        5. Build a per-instrument data quality report.

    Attributes:
        raw_prices: Copy of the price panel passed to the constructor
            (one column per instrument).
        max_missing_pct: Largest tolerated fraction of missing prices per
            instrument.
        winsorize_pct: ``(lower, upper)`` pair forwarded to ``winsorize``.
        report: Dict of step diagnostics; empty until :meth:`run` is called.
        clean_prices: Filled price panel, ``None`` until :meth:`run`.
        clean_returns: Winsorized returns, ``None`` until :meth:`run`.
        quality_report: Per-instrument statistics, ``None`` until
            :meth:`run`.
    """

    def __init__(self, prices, max_missing_pct=0.1, winsorize_pct=(0.01, 0.99)):
        """Store a copy of ``prices`` together with the cleaning settings.

        Args:
            prices: Price panel (DataFrame, one column per instrument).
                It is copied, so later changes to the caller's object do
                not affect the pipeline.
            max_missing_pct: Instruments with a larger fraction of missing
                prices are removed in step 1.
            winsorize_pct: ``(lower, upper)`` pair passed to ``winsorize``
                in step 4.
        """
        self.raw_prices = prices.copy()
        self.max_missing_pct = max_missing_pct
        self.winsorize_pct = winsorize_pct
        self.report = {}
        # Outputs of run(). Defined here so that reading them before run()
        # yields None instead of raising AttributeError.
        self.clean_prices = None
        self.clean_returns = None
        self.quality_report = None

    def run(self):
        """Run the full cleaning pipeline and return the cleaned returns.

        Progress and the quality report are printed to stdout. As side
        effects, ``clean_prices``, ``clean_returns``, ``quality_report``
        and ``report`` are set on the instance.

        Returns:
            DataFrame of winsorized simple returns, one column per
            retained instrument.
        """
        print("=" * 60)
        print("数据清洗管道运行中...")
        print("=" * 60)

        # Step 1: drop instruments with too many missing prices.
        missing_pct = self.raw_prices.isna().mean()
        valid_cols = missing_pct[missing_pct <= self.max_missing_pct].index.tolist()
        dropped_cols = missing_pct[missing_pct > self.max_missing_pct].index.tolist()

        print(f"\n[Step 1] 过滤缺失值 > {self.max_missing_pct:.0%} 的标的")
        print(f"  保留: {len(valid_cols)}个标的, 删除: {len(dropped_cols)}个标的")
        if dropped_cols:
            print(f"  被删除的标的: {dropped_cols}")

        prices = self.raw_prices[valid_cols].copy()

        # Step 2: fill the remaining gaps.
        # Only ffill (backward-looking) is used. bfill would fill early NaNs
        # with future prices, which is look-ahead bias / data leakage.
        # Rows still incomplete after ffill are dropped for ALL instruments,
        # so the NaN count afterwards is zero by construction.
        print("\n[Step 2] 填充缺失值 (ffill only, no bfill)")
        n_missing_before = prices.isna().sum().sum()
        n_rows_before = len(prices)
        prices = prices.ffill().dropna(how='any')
        n_missing_after = prices.isna().sum().sum()
        print(f"  填充前: {n_missing_before}个NaN -> 填充后: {n_missing_after}个NaN")

        # Step 3: simple returns; the first row is NaN by definition.
        print("\n[Step 3] 计算收益率")
        returns = prices.pct_change().iloc[1:]
        print(f"  收益率面板: {returns.shape[0]}天 x {returns.shape[1]}标的")

        # Step 4: outlier detection and winsorization.
        # detect_outliers_mad and winsorize are helpers defined elsewhere in
        # this file. The first element returned by detect_outliers_mad is
        # only summed for the diagnostic count; it does not restrict which
        # values are winsorized.
        print("\n[Step 4] 异常值检测与Winsorize")
        lower_q = self.winsorize_pct[0]
        upper_q = self.winsorize_pct[1]
        n_outliers_total = 0
        returns_clean = returns.copy()
        for col in returns_clean.columns:
            mask, _ = detect_outliers_mad(returns_clean[col], threshold=5.0)
            n_outliers_total += mask.sum()
            returns_clean[col] = winsorize(returns_clean[col], lower_q, upper_q)

        print(f"  检测到 {n_outliers_total} 个异常值 (MAD, |Z|>5)")
        print(f"  已进行Winsorize处理 ({lower_q:.0%}, {upper_q:.0%})")

        # Step 5: per-instrument quality statistics.
        # Every statistic is a Series indexed by instrument, so the frame is
        # aligned by label; the index is then named instead of attaching a
        # separate label list by position.
        print("\n[Step 5] 数据质量报告")
        mean_ret = returns_clean.mean()
        vol = returns_clean.std()
        quality = pd.DataFrame({
            '日均收益率(bp)': (mean_ret * 10000).round(2),
            '日波动率(%)': (vol * 100).round(3),
            # Annualized with 252 periods per year.
            '年化收益率(%)': (mean_ret * 252 * 100).round(2),
            '年化波动率(%)': (vol * np.sqrt(252) * 100).round(2),
            '偏度': returns_clean.skew().round(3),
            '峰度': returns_clean.kurtosis().round(3),
        }).rename_axis('标的')
        print(quality)

        self.clean_prices = prices
        self.clean_returns = returns_clean
        self.quality_report = quality
        # Machine-readable summary of the numbers printed above.
        self.report = {
            'valid_cols': valid_cols,
            'dropped_cols': dropped_cols,
            'n_missing_before': n_missing_before,
            'n_missing_after': n_missing_after,
            'n_rows_dropped': n_rows_before - len(prices),
            'n_outliers': n_outliers_total,
        }

        print("\n管道运行完成!")
        return returns_clean


# Run the pipeline on the multi-stock price panel built earlier in the file,
# tolerating up to 15% missing prices per instrument.
pipeline = DataPipeline(price_panel, max_missing_pct=0.15)
clean_returns = pipeline.run()