trading/quant_data_pipeline_demo.py

920 lines
37 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# =============================================================================
# Quantitative Trading — Data Pipeline Demo
# =============================================================================
# Topics covered:
# 1. Price Adjustment for Corporate Actions (splits & dividends)
# 2. Simple Returns vs Log Returns
# 3. Multi-stock Panel & Missing Value Handling
# 4. Outlier Detection & Treatment (Z-score, MAD, Winsorize)
# 4b. Circuit Breakers & Price Limit Flags (涨跌停) [A-shares]
# 5. Trading Calendar & Cross-market Alignment
# 6. End-to-End DataPipeline Class
# =============================================================================
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
import warnings
# Suppress every warning category so library noise does not clutter the demo output.
warnings.filterwarnings('ignore')

# Plot defaults, applied in one call. The font list starts with the font
# configured in our Docker image and then lists fallbacks, so that the Chinese
# chart labels used throughout this script can be rendered.
plt.rcParams.update({
    'font.sans-serif': ['WenQuanYi Zen Hei', 'Arial Unicode MS', 'SimHei', 'DejaVu Sans'],
    'axes.unicode_minus': False,
    'figure.figsize': (12, 6),
    'figure.dpi': 100,
})

# Seed NumPy's global RNG so the synthetic data is reproducible between runs.
np.random.seed(42)
print("环境准备完成!")
# =============================================================================
# TOPIC 1: Price Adjustment for Corporate Actions (价格复权)
# -----------------------------------------------------------------------------
# Raw stock prices have artificial jumps caused by stock splits and cash
# dividends. These must be adjusted before computing returns or signals,
# otherwise your algorithm will see fake crashes/spikes.
#
# Key concepts:
# - Unadjusted price (未复权): raw market price, has visible jumps
# - Forward-adjusted (前复权): history scaled to today's price level
# - Backward-adjusted (后复权): history kept real, recent prices scaled up
# - Adjustment factor (复权因子): multiplier that accounts for all events
# =============================================================================
def generate_raw_stock_data(n_days=1000, seed=42):
    """Generate synthetic daily stock data containing a split and cash dividends.

    A "true" continuous price path is simulated with geometric Brownian
    motion. Corporate actions are then applied to derive the unadjusted
    (raw market) close and the matching adjustment factor, such that
    ``unadj_close * adj_factor`` is proportional to the true ``close``.

    Args:
        n_days: Number of business days to simulate, starting 2020-01-02.
        seed: Seed for NumPy's global RNG (reseeded on every call).

    Returns:
        A ``(df, events_df)`` tuple.
        ``df`` is indexed by date with columns ``close`` (true continuous
        price), ``volume``, ``open``, ``high``, ``low``, ``unadj_close`` and
        ``adj_factor``.
        ``events_df`` has one row per corporate action with columns
        ``date``, ``type``, ``ratio`` (splits only) and ``amount``
        (dividends only). Events whose scheduled index falls outside the
        simulated window are skipped.
    """
    np.random.seed(seed)
    dates = pd.bdate_range(start='2020-01-02', periods=n_days)

    # "True" price path: geometric Brownian motion.
    mu, sigma = 0.12, 0.25
    dt = 1 / 252
    log_ret = (mu - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * np.random.randn(n_days)
    true_prices = 100 * np.exp(np.cumsum(log_ret))

    # Simulated traded volume.
    volume = np.random.lognormal(mean=15, sigma=0.5, size=n_days).astype(int)

    df = pd.DataFrame({
        'date': dates,
        'close': true_prices,
        'volume': volume
    }).set_index('date')

    # Open / high / low derived from the close plus random intraday noise.
    daily_range = df['close'] * np.abs(np.random.randn(n_days)) * 0.02
    df['open'] = df['close'] + np.random.randn(n_days) * daily_range * 0.5
    df['high'] = np.maximum(df['close'], df['open']) + np.abs(np.random.randn(n_days)) * daily_range * 0.3
    df['low'] = np.minimum(df['close'], df['open']) - np.abs(np.random.randn(n_days)) * daily_range * 0.3

    # Corporate action schedule.
    events = []

    # One split with ratio 2 on the 300th trading day. Guarded like the
    # dividends below so that short simulations (n_days <= 300) do not raise
    # IndexError on dates[split_idx].
    split_idx = 300
    if split_idx < n_days:
        events.append({'date': dates[split_idx], 'type': 'split', 'ratio': 2})

    # Cash dividend of 0.5 every 125 trading days, starting at index 60.
    for q in [60, 185, 310, 435, 560, 685, 810, 935]:
        if q < n_days:
            events.append({'date': dates[q], 'type': 'dividend', 'amount': 0.5})

    # Apply the events, in date order, to build the unadjusted close.
    unadj_close = df['close'].copy()
    adj_factors = pd.Series(1.0, index=dates)
    for event in sorted(events, key=lambda x: x['date']):
        idx = dates.get_loc(event['date'])
        if event['type'] == 'split':
            # Split: the raw price drops by the ratio from the event day on;
            # the factor on those days rises by the same ratio to compensate.
            unadj_close.iloc[idx:] = unadj_close.iloc[idx:] / event['ratio']
            adj_factors.iloc[idx:] = adj_factors.iloc[idx:] * event['ratio']
        elif event['type'] == 'dividend':
            # Dividend: the raw price from the ex-date on is scaled by
            # (1 - amount / previous close); the factor of all EARLIER days
            # is scaled by the same ratio so the adjusted series stays
            # continuous across the ex-date.
            price_before = unadj_close.iloc[idx - 1] if idx > 0 else unadj_close.iloc[idx]
            div_ratio = 1 - event['amount'] / price_before
            unadj_close.iloc[idx:] = unadj_close.iloc[idx:] * div_ratio
            adj_factors.iloc[:idx] = adj_factors.iloc[:idx] * div_ratio

    df['unadj_close'] = unadj_close
    df['adj_factor'] = adj_factors

    # Explicit columns keep the schema stable even when some (or all) event
    # types are absent because n_days is small.
    events_df = pd.DataFrame(events, columns=['date', 'type', 'ratio', 'amount'])
    return df, events_df
def forward_adjust(unadj_prices, adj_factor):
    """Forward-adjust prices: anchor on the latest bar and rescale history.

    The factor series is divided by its last value, so the most recent
    adjusted price equals the most recent unadjusted price.

    Args:
        unadj_prices: Series of unadjusted prices.
        adj_factor: Series of cumulative adjustment factors on the same index.

    Returns:
        Series of forward-adjusted prices.
    """
    latest_factor = adj_factor.iloc[-1]
    scale = adj_factor / latest_factor
    return unadj_prices * scale
def backward_adjust(unadj_prices, adj_factor):
    """Backward-adjust prices: anchor on the earliest bar and rescale later ones.

    The factor series is divided by its first value, so the earliest
    adjusted price equals the earliest unadjusted price.

    Args:
        unadj_prices: Series of unadjusted prices.
        adj_factor: Series of cumulative adjustment factors on the same index.

    Returns:
        Series of backward-adjusted prices.
    """
    earliest_factor = adj_factor.iloc[0]
    scale = adj_factor / earliest_factor
    return unadj_prices * scale
# Generate the synthetic stock and list its corporate actions.
stock_data, events = generate_raw_stock_data()
print("公司行为事件:")
print(events.to_string(index=False))
print(f"\n数据形状: {stock_data.shape}")

# Derive both adjusted close series from the unadjusted close and the factor.
for column, adjust in (('fwd_adj_close', forward_adjust),
                       ('bwd_adj_close', backward_adjust)):
    stock_data[column] = adjust(stock_data['unadj_close'], stock_data['adj_factor'])

# 2x2 comparison figure. The first three panels each show a single series.
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
single_series_panels = [
    (axes[0, 0], 'unadj_close', 'blue', '未复权价格 (可见拆股/分红跳跃)'),
    (axes[0, 1], 'fwd_adj_close', 'green', '前复权价格 (当前价格真实)'),
    (axes[1, 0], 'bwd_adj_close', 'orange', '后复权价格 (历史价格真实)'),
]
for ax, column, colour, title in single_series_panels:
    ax.plot(stock_data[column], color=colour, linewidth=1)
    ax.set_title(title, fontsize=13)
    ax.grid(alpha=0.3)

# Mark every corporate action date on the unadjusted panel.
for event_date in events['date']:
    axes[0, 0].axvline(x=event_date, color='red', linestyle='--', alpha=0.5)

# Fourth panel: true continuous price against the forward-adjusted price.
ax = axes[1, 1]
ax.plot(stock_data['close'], label='真实连续价格', color='black', linewidth=1.5)
ax.plot(stock_data['fwd_adj_close'], label='前复权价格', color='green', linewidth=1, alpha=0.7)
ax.set_title('真实价格 vs 前复权价格', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

plt.suptitle('复权价格对比', fontsize=15, y=1.02)
plt.tight_layout()
plt.show()

# Sanity check: one-day return across the split date for each price series.
print("\n收益率验证 (拆股日前后):")
split_date = events.loc[events['type'] == 'split', 'date'].iloc[0]
split_idx = stock_data.index.get_loc(split_date)
print(f"拆股日期: {split_date.strftime('%Y-%m-%d')}")
for label, column in (('未复权收益率', 'unadj_close'),
                      ('前复权收益率', 'fwd_adj_close'),
                      ('真实收益率', 'close')):
    series = stock_data[column]
    split_day_return = series.iloc[split_idx] / series.iloc[split_idx - 1] - 1
    print(f"{label}: {split_day_return:.4f}")
# =============================================================================
# TOPIC 2: Simple Returns vs Log Returns (简单收益率 vs 对数收益率)
# -----------------------------------------------------------------------------
# Two ways to measure how much a price changed:
# - Simple return: r = (P_t / P_{t-1}) - 1
# - Log return: r = ln(P_t / P_{t-1})
#
# Why log returns are preferred in quant research:
# - Time-additive: sum daily log returns to get total log return
# - Better statistical properties (closer to normal distribution)
# - Numerically stable for long compounding periods
#
# At daily frequency the difference is tiny (~1bp), but it grows at
# monthly/annual frequency — never mix the two in the same calculation.
# =============================================================================
# Returns are computed from the forward-adjusted close.
prices = stock_data['fwd_adj_close']

# Simple return: P_t / P_{t-1} - 1.
simple_returns = prices.pct_change().dropna()
# Log return: ln(P_t / P_{t-1}).
log_returns = np.log(prices / prices.shift(1)).dropna()

# Side-by-side comparison, with the gap also expressed in basis points.
return_gap = simple_returns - log_returns
comparison = pd.DataFrame({
    '简单收益率': simple_returns,
    '对数收益率': log_returns,
    '差异': return_gap,
    '差异(bp)': return_gap * 10000
})
abs_gap = comparison['差异'].abs()
abs_gap_bp = comparison['差异(bp)'].abs()
print("简单收益率 vs 对数收益率统计:")
print(f"平均绝对差异: {abs_gap.mean():.6f} ({abs_gap_bp.mean():.6f})")
print(f"最大差异: {abs_gap.max():.6f} ({abs_gap_bp.max():.6f})")
print(f"\n日频数据下两者差异很小,但月频或年频时差异会显著增大。")

# Time additivity: log returns add up, simple returns must be compounded.
print("\n=== 时间可加性验证 ===")
first_price = prices.iloc[0]
last_price = prices.iloc[-1]

total_log_return = log_returns.sum()
actual_total = np.log(last_price / first_price)
print(f"对数收益率求和: {total_log_return:.6f}")
print(f"实际总对数收益: {actual_total:.6f}")
print(f"差异: {abs(total_log_return - actual_total):.10f} (几乎为零)")

print("\n简单收益率需要连乘:")
total_simple_product = (1 + simple_returns).prod() - 1
actual_simple = last_price / first_price - 1
print(f"简单收益率连乘: {total_simple_product:.6f}")
print(f"实际总简单收益: {actual_simple:.6f}")
print(f"差异: {abs(total_simple_product - actual_simple):.10f}")

# Month-end prices for the monthly-frequency comparison.
month_end_prices = prices.resample('ME').last()
monthly_simple = month_end_prices.pct_change().dropna()
monthly_log = np.log(month_end_prices / month_end_prices.shift(1)).dropna()

# Scatter of simple vs log returns at daily (left) and monthly (right)
# frequency, each with a y=x reference line spanning [-span, span].
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
scatter_panels = [
    (axes[0], simple_returns, log_returns, {'alpha': 0.3, 's': 5}, 0.1,
     '简单收益率 vs 对数收益率 (日频) '),
    (axes[1], monthly_simple, monthly_log, {'alpha': 0.5, 's': 20}, 0.2,
     '简单收益率 vs 对数收益率 (月频, 差异更明显) '),
]
for ax, x_vals, y_vals, marker_kwargs, span, title in scatter_panels:
    ax.scatter(x_vals, y_vals, **marker_kwargs)
    ax.plot([-span, span], [-span, span], 'r--', linewidth=1, label='y=x')
    ax.set_xlabel('简单收益率')
    ax.set_ylabel('对数收益率')
    ax.set_title(title, fontsize=13)
    ax.legend()
    ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
# =============================================================================
# TOPIC 3: Multi-stock Panel & Missing Value Handling (多标的面板与缺失值处理)
# -----------------------------------------------------------------------------
# Real market data panels have two main sources of missing values:
# - Suspensions (停牌): a stock halts trading for days/weeks
# - Data feed gaps: vendor errors, holidays, late reporting
#
# Strategies (choice depends on context):
# - ffill (forward fill): use last known price — standard for suspensions
# - Linear interpolation: smooth gaps — OK for short random gaps
# - ffill with limit: ffill but cap at N days — conservative choice
# - Drop: remove rows/columns — loses cross-sectional data
#
# Rule of thumb: if missing% > 10%, consider excluding the stock entirely.
# =============================================================================
def generate_multi_stock_panel(n_stocks=5, n_days=500, seed=42):
    """Generate a synthetic multi-stock daily panel with suspensions and gaps.

    Each stock follows geometric Brownian motion with randomly drawn drift,
    volatility and starting price. Two kinds of missing data are injected:
    suspensions (consecutive NaN prices with zero volume) and five isolated
    missing prices (volume left untouched).

    Args:
        n_stocks: Number of stocks; columns are named Stock_A, Stock_B, ...
        n_days: Number of business days, starting 2022-01-03.
        seed: Seed for NumPy's global RNG (reseeded on every call).

    Returns:
        ``(price_panel, volume_panel)``: two DataFrames indexed by date with
        one column per stock.
    """
    np.random.seed(seed)
    dates = pd.bdate_range(start='2022-01-03', periods=n_days)
    dt = 1 / 252

    close_by_stock = {}
    volume_by_stock = {}
    for k in range(n_stocks):
        name = f'Stock_{chr(65 + k)}'  # Stock_A, Stock_B, ...

        # Per-stock GBM parameters. The order of the RNG draws matters for
        # reproducibility, so it is kept fixed.
        mu = np.random.uniform(0.05, 0.20)
        sigma = np.random.uniform(0.15, 0.35)
        S0 = np.random.uniform(20, 200)
        shocks = np.random.randn(n_days)
        log_ret = (mu - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * shocks
        prices = S0 * np.exp(np.cumsum(log_ret))
        volume = np.random.lognormal(mean=14, sigma=0.8, size=n_days).astype(int)

        # Suspensions: 2-4 windows (depending on the stock's position), each
        # lasting 1-9 days, with NaN price and zero volume.
        n_halts = 2 + k % 3
        halt_starts = np.random.choice(range(50, n_days - 30), size=n_halts, replace=False)
        for start in halt_starts:
            stop = min(start + np.random.randint(1, 10), n_days)
            prices[start:stop] = np.nan
            volume[start:stop] = 0

        # Isolated missing prices on five distinct random days.
        gap_days = np.random.choice(range(n_days), size=5, replace=False)
        prices[gap_days] = np.nan

        close_by_stock[name] = pd.Series(prices, index=dates)
        volume_by_stock[name] = pd.Series(volume, index=dates)

    return pd.DataFrame(close_by_stock), pd.DataFrame(volume_by_stock)
# Build the multi-stock panel.
price_panel, volume_panel = generate_multi_stock_panel(n_stocks=5, n_days=500)

# Longest run of consecutive missing prices per stock: every valid
# observation starts a new run id, the NaNs that follow share that id, so
# summing the NaN indicator per id gives the length of each gap.
is_missing = price_panel.isna()
longest_gaps = []
for col in price_panel.columns:
    col_missing = is_missing[col]
    run_id = (~col_missing).astype(int).cumsum()
    longest_gaps.append(col_missing.astype(int).groupby(run_id).sum().max())

# Missing-value summary per stock.
missing_stats = pd.DataFrame({
    '总天数': len(price_panel),
    '缺失天数': is_missing.sum(),
    '缺失比例': is_missing.mean(),
    '最长连续缺失': longest_gaps
})
print("缺失值统计:")
print(missing_stats)
print(f"\n面板形状: {price_panel.shape}")
# 缺失值处理方法对比
def handle_missing_values(prices, method='ffill'):
    """Fill or drop missing values using the selected strategy.

    Args:
        prices: Series or DataFrame of prices that may contain NaN.
        method: One of
            'ffill'       - carry the last valid value forward (the usual
                            choice for suspensions: price assumed unchanged);
            'linear'      - linear interpolation between valid points;
            'drop'        - remove missing entries;
            'ffill_limit' - forward fill, but at most 5 consecutive values.

    Returns:
        The processed object of the same type as ``prices``.

    Raises:
        ValueError: If ``method`` is not one of the names above.
    """
    if method == 'ffill':
        return prices.ffill()
    if method == 'linear':
        return prices.interpolate(method='linear')
    if method == 'drop':
        return prices.dropna()
    if method == 'ffill_limit':
        return prices.ffill(limit=5)
    raise ValueError(f"Unknown method: {method}")
# Show the effect of each fill method on Stock_A.
stock_a = price_panel['Stock_A'].copy()
missing_mask = stock_a.isna()          # positions that were NaN in the raw data
raw_title = '原始数据 (含缺失值) '
methods = {
    raw_title: stock_a,
    '向前填充 (ffill)': handle_missing_values(stock_a, 'ffill'),
    '线性插值 (interpolate)': handle_missing_values(stock_a, 'linear'),
    '有限填充 (ffill, limit=5)': handle_missing_values(stock_a, 'ffill_limit')
}

fig, axes = plt.subplots(2, 2, figsize=(14, 10))
can_highlight = not missing_mask.all()
for ax, title in zip(axes.flat, methods):
    data = methods[title]
    ax.plot(data, linewidth=1)
    # On the filled panels, highlight values that replaced an original NaN.
    if can_highlight and title != raw_title:
        filled_values = data[missing_mask].dropna()
        if len(filled_values) > 0:
            ax.scatter(filled_values.index, filled_values.values,
                       color='red', s=15, zorder=5, label='填充值')
            ax.legend(fontsize=9)
    ax.set_title(title, fontsize=12)
    ax.grid(alpha=0.3)
plt.suptitle('缺失值处理方法对比 (Stock_A)', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()

for advice_line in (
    "\n实践建议:",
    "- 停牌: 使用ffill (假设停牌期间价格不变) ",
    "- 短期缺失(<3天): 可用线性插值",
    "- 长期缺失(>10天): 考虑从分析中排除该标的",
    "- 计算收益率前: 先填充缺失值, 再计算",
):
    print(advice_line)

# Build a clean multi-stock panel.
# Steps: ffill -> drop rows still NaN -> compute returns.
#
# Look-ahead bias warning: DO NOT use bfill() here.
# bfill() fills early NaN rows from future prices, so a "day 1" price would
# come from a later observation. That is data leakage and would inflate
# backtest performance.
# Safe rule: only ffill (past -> present), then drop what remains NaN.
clean_prices = price_panel.ffill().dropna(how='any')
n_rows_dropped = len(price_panel) - len(clean_prices)
print(f"清洗前缺失值: {price_panel.isna().sum().sum()}")
print(f"清洗后缺失值: {clean_prices.isna().sum().sum()}")
print(f"(dropped {n_rows_dropped} leading rows to avoid look-ahead bias)")

# Return panel from the cleaned prices.
returns_panel = clean_prices.pct_change().dropna()

# Price paths rebased so every stock starts at 100.
fig, ax = plt.subplots(figsize=(14, 6))
normalized = clean_prices / clean_prices.iloc[0] * 100
for stock_name, rebased in normalized.items():
    ax.plot(rebased, label=stock_name, linewidth=1.2)
ax.set_title('多标的归一化价格走势 (基期=100) ', fontsize=14)
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()

# Correlation matrix of returns, drawn as an annotated heatmap.
corr_matrix = returns_panel.corr()
tick_positions = range(len(corr_matrix))
fig, ax = plt.subplots(figsize=(8, 6))
im = ax.imshow(corr_matrix.values, cmap='RdBu_r', vmin=-1, vmax=1)
ax.set_xticks(tick_positions)
ax.set_xticklabels(corr_matrix.columns, rotation=45)
ax.set_yticks(tick_positions)
ax.set_yticklabels(corr_matrix.columns)
# Write each coefficient into its cell (x is the column, y is the row).
for (row, col), rho in np.ndenumerate(corr_matrix.values):
    ax.text(col, row, f'{rho:.2f}', ha='center', va='center', fontsize=11)
plt.colorbar(im, ax=ax)
ax.set_title('收益率相关性矩阵', fontsize=14)
plt.tight_layout()
plt.show()
# =============================================================================
# TOPIC 4: Outlier Detection & Treatment (异常值检测与处理)
# -----------------------------------------------------------------------------
# Financial return series contain extreme values from real events (crashes,
# halts, data errors). How you handle them affects every downstream signal.
#
# Detection methods:
# - Z-score: |z| = |(x - mean) / std| > 3 — simple, but mean/std are
# themselves pulled by outliers (not robust)
# - MAD: uses median instead of mean — robust to extreme values,
# preferred for financial data
#
# Treatment methods:
# - Remove: delete the outlier row — reduces sample size
# - Winsorize: clip to [p1, p99] boundary — keeps all data, limits impact
# standard practice before factor construction
#
# Q-Q plot: visualizes how far returns deviate from a normal distribution.
# Fat tails (leptokurtosis) are a universal property of financial returns.
# =============================================================================
def detect_outliers_zscore(series, threshold=3.0):
    """Detect outliers with the classic Z-score.

    Z = (x - mean) / std, i.e. how many standard deviations a value lies
    from the mean. Values with |Z| > ``threshold`` are flagged.

    Intuition: if exam scores average 70 with a standard deviation of 10,
    a score of 100 has Z = (100 - 70) / 10 = 3.

    Weakness: the mean and standard deviation are themselves pulled by the
    outliers being searched for.

    Args:
        series: Series of observations.
        threshold: Absolute Z-score above which a value is an outlier.

    Returns:
        ``(outlier_mask, z_scores)``: a boolean Series and the Z-scores.
    """
    center = series.mean()
    spread = series.std()
    z_scores = (series - center) / spread
    outlier_mask = z_scores.abs() > threshold
    return outlier_mask, z_scores
def detect_outliers_mad(series, threshold=3.0):
    """Detect outliers with the MAD-based modified Z-score.

    MAD = median(|x - median(x)|), the median distance to the median.
    Modified Z = 0.6745 * (x - median) / MAD.
    Values with |Modified Z| > ``threshold`` are flagged.

    Because it is built on medians rather than the mean, the score is far
    less affected by the extreme values themselves than the plain Z-score.

    Args:
        series: Series of observations.
        threshold: Absolute modified Z-score above which a value is an outlier.

    Returns:
        ``(outlier_mask, modified_z)``: a boolean Series and the scores.
    """
    center = series.median()
    mad = (series - center).abs().median()
    # A MAD of exactly zero is replaced by a tiny constant to avoid dividing by zero.
    scale = 1e-10 if mad == 0 else mad
    modified_z = 0.6745 * (series - center) / scale
    return modified_z.abs() > threshold, modified_z
def winsorize(series, lower_pct=0.01, upper_pct=0.99):
    """Winsorize a series: clip values beyond the given quantiles.

    Unlike removing outliers, every data point is kept; extreme values are
    only pulled back to the boundary. With the defaults:
      - values below the 1% quantile are raised to the 1% quantile;
      - values above the 99% quantile are lowered to the 99% quantile.

    Args:
        series: Series of observations.
        lower_pct: Lower quantile, in [0, 1].
        upper_pct: Upper quantile, in [0, 1].

    Returns:
        The clipped Series.
    """
    bounds = series.quantile([lower_pct, upper_pct])
    return series.clip(lower=bounds.iloc[0], upper=bounds.iloc[1])
# Demonstrate on Stock_A's returns.
ret_a = returns_panel['Stock_A'].copy()

# Inject five artificial outliers (random sign, magnitude 15%-30%) so the
# detectors have something to find.
np.random.seed(123)
outlier_indices = np.random.choice(ret_a.index, size=5, replace=False)
ret_a_with_outliers = ret_a.copy()
for idx in outlier_indices:
    sign = np.random.choice([-1, 1])
    magnitude = np.random.uniform(0.15, 0.30)
    ret_a_with_outliers[idx] = sign * magnitude

# Run both detectors.
zscore_mask, z_scores = detect_outliers_zscore(ret_a_with_outliers, threshold=3.0)
mad_mask, mad_scores = detect_outliers_mad(ret_a_with_outliers, threshold=3.0)
print("异常值检测结果:")
print(f"Z-score方法 (|Z|>3): 检测到 {zscore_mask.sum()} 个异常值")
print(f"MAD方法 (|Modified Z|>3): 检测到 {mad_mask.sum()} 个异常值")

# Winsorize and compare the value range before and after.
ret_winsorized = winsorize(ret_a_with_outliers, lower_pct=0.01, upper_pct=0.99)
print(f"\nWinsorize前 - 范围: [{ret_a_with_outliers.min():.4f}, {ret_a_with_outliers.max():.4f}]")
print(f"Winsorize后 - 范围: [{ret_winsorized.min():.4f}, {ret_winsorized.max():.4f}]")

fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Panels 1 and 2: return series with the points flagged by each detector.
detector_panels = [
    (axes[0, 0], zscore_mask, 'red', 'Z-score异常值检测 (|Z|>3)'),
    (axes[0, 1], mad_mask, 'orange', 'MAD异常值检测 (|Modified Z|>3)'),
]
for ax, mask, colour, title in detector_panels:
    ax.plot(ret_a_with_outliers, color='blue', linewidth=0.8, alpha=0.7)
    flagged = ret_a_with_outliers[mask]
    ax.scatter(flagged.index, flagged.values, color=colour, s=40, zorder=5,
               label=f'异常值 ({len(flagged)}个)')
    ax.set_title(title, fontsize=13)
    ax.legend()
    ax.grid(alpha=0.3)

# Panel 3: return distribution before and after winsorizing.
ax = axes[1, 0]
for sample, label, colour in ((ret_a_with_outliers, '原始', 'blue'),
                              (ret_winsorized, 'Winsorize后', 'green')):
    ax.hist(sample, bins=60, alpha=0.5, label=label, color=colour, density=True)
ax.set_title('Winsorize前后收益率分布', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

# Panel 4: Q-Q plot against the normal distribution.
ax = axes[1, 1]
sorted_data = np.sort(ret_a_with_outliers.dropna())
n_obs = len(sorted_data)
# Plotting positions i / (n + 1) mapped through the standard normal inverse CDF.
theoretical = stats.norm.ppf(np.arange(1, n_obs + 1) / (n_obs + 1))
ax.scatter(theoretical, sorted_data, alpha=0.5, s=10)
# Reference line: quantiles of a normal with the sample's mean and std.
sample_mean = ret_a_with_outliers.mean()
sample_std = ret_a_with_outliers.std()
q_low, q_high = theoretical.min(), theoretical.max()
ax.plot([q_low, q_high],
        [q_low * sample_std + sample_mean, q_high * sample_std + sample_mean],
        'r--', linewidth=1, label='正态分布参考线')
ax.set_xlabel('理论分位数')
ax.set_ylabel('样本分位数')
ax.set_title('Q-Q图 (检验正态性) ', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

plt.tight_layout()
plt.show()

for advice_line in (
    "\n方法选择建议:",
    "- Z-score: 简单快速,但受极端值影响 (不够稳健) ",
    "- MAD: 更稳健,推荐用于金融数据",
    "- Winsorize: 保留数据量,适合构建因子时使用",
    "- 实际中常用: Winsorize(1%, 99%) + MAD检测",
):
    print(advice_line)
# =============================================================================
# TOPIC 4b: Circuit Breakers & Price Limit Flags (涨跌停标记)
# -----------------------------------------------------------------------------
# In Chinese A-shares, stocks have a ±10% daily price limit (±5% for ST stocks).
# When a stock hits the limit, it is NOT a data error — it is real market data.
# However, it signals a critical data quality issue for downstream use:
#
# - The price IS the true closing price
# - But the stock may have been UNTRADEABLE for most of the day (zero liquidity)
# - A return of exactly +10% or -10% means you likely CANNOT execute at that price
#
# What to do in data prep (we flag, not remove):
# - Add a boolean column: `is_limit_up`, `is_limit_down`
# - Downstream strategy code can then decide to skip, weight-down, or
# exclude limit-hit days from signal calculations
#
# ⚠️ Survivorship Bias Note (out of scope for data prep, but important to know):
# This demo only generates stocks that "survive" the full period. In reality,
# stocks get delisted (bankruptcy, M&A, regulatory removal). If you only
# include currently-alive stocks in your historical dataset, you are
# implicitly selecting winners — your backtest returns will be inflated.
# Fix: source historical data that includes ALL stocks, including delisted ones.
# This is a DATA SOURCING problem, handled before data ever enters this pipeline.
# =============================================================================
def flag_price_limits(prices, limit_pct=0.10):
    """Flag days on which a price series hit the daily price limit.

    The flag is derived from the one-period percentage change of ``prices``.
    A tolerance of 0.5 percentage points is applied because real closing
    prices may be rounded and not land exactly on the theoretical limit,
    so with the default a return >= +9.5% counts as limit up and a return
    <= -9.5% counts as limit down.

    Args:
        prices: Price Series or DataFrame (one column per stock).
        limit_pct: Daily limit as a fraction (0.10 means +/-10%).

    Returns:
        Object of the same shape as ``prices`` holding
        +1 (limit up), -1 (limit down) or 0 (normal day). The first row is
        always 0 because it has no previous price.
    """
    tolerance = 0.005  # 0.5% buffer for rounding
    daily_returns = prices.pct_change()
    hit_upper = daily_returns >= limit_pct - tolerance
    hit_lower = daily_returns <= -limit_pct + tolerance
    return hit_upper.astype(int) - hit_lower.astype(int)
# Inject limit-hit events into a copy of the clean panel for demonstration.
np.random.seed(99)
demo_prices = clean_prices.copy()
candidate_days = range(1, len(demo_prices) - 1)
for col_pos, col in enumerate(demo_prices.columns):
    # Randomly pick 3 limit-up and 3 limit-down days per stock.
    up_days = np.random.choice(candidate_days, size=3, replace=False)
    down_days = np.random.choice(candidate_days, size=3, replace=False)
    # Write with a single positional .iloc[row, col] assignment. The chained
    # form `demo_prices.iloc[d][col] = ...` assigns into a temporary row
    # object and, under pandas Copy-on-Write, never reaches demo_prices
    # (the warning for it is hidden by the global warnings filter).
    for d in up_days:
        demo_prices.iloc[d, col_pos] = demo_prices.iloc[d - 1, col_pos] * 1.10  # exactly +10%
    for d in down_days:
        demo_prices.iloc[d, col_pos] = demo_prices.iloc[d - 1, col_pos] * 0.90  # exactly -10%

# NOTE: only the price on day d is overwritten, so the return on day d+1 is
# distorted as well and can occasionally be flagged too; per-stock counts
# may therefore differ from the 3 + 3 injected events.
limit_flags = flag_price_limits(demo_prices, limit_pct=0.10)

# Summary of flagged days, in total and per stock.
is_limit_up = limit_flags == 1
is_limit_down = limit_flags == -1
total_limit_up = is_limit_up.sum().sum()
total_limit_down = is_limit_down.sum().sum()
print(f"\n涨跌停统计:")
print(f" 涨停天数 (limit up): {total_limit_up}")
print(f" 跌停天数 (limit down): {total_limit_down}")
print(f"\n各标的涨停次数:")
print(is_limit_up.sum())
print(f"\n各标的跌停次数:")
print(is_limit_down.sum())

# Visualize the flags on Stock_A: price with markers (top), flag bars (bottom).
fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True)

ax = axes[0]
ax.plot(demo_prices['Stock_A'], color='steelblue', linewidth=1, label='Stock_A Price')
limit_up_dates = limit_flags.index[limit_flags['Stock_A'] == 1]
limit_down_dates = limit_flags.index[limit_flags['Stock_A'] == -1]
ax.scatter(limit_up_dates, demo_prices.loc[limit_up_dates, 'Stock_A'],
           color='red', s=60, zorder=5, label='涨停 (limit up)', marker='^')
ax.scatter(limit_down_dates, demo_prices.loc[limit_down_dates, 'Stock_A'],
           color='green', s=60, zorder=5, label='跌停 (limit down)', marker='v')
ax.set_title('价格序列与涨跌停标记 (Stock_A)', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

ax = axes[1]
# Bars are coloured by flag value: red = limit up, green = limit down.
ax.bar(limit_flags.index, limit_flags['Stock_A'],
       color=limit_flags['Stock_A'].map({1: 'red', -1: 'green', 0: 'lightgray'}),
       width=1)
ax.set_title('涨跌停标志 (+1=涨停, -1=跌停, 0=正常)', fontsize=13)
ax.set_ylim(-1.5, 1.5)
ax.set_yticks([-1, 0, 1])
ax.grid(alpha=0.3)

plt.tight_layout()
plt.show()

print("\n实践建议:")
print("- 涨跌停日: 保留价格数据,但添加 is_limit_up / is_limit_down 标记列")
print("- 策略层: 遇到涨跌停标记时跳过信号,因无法成交")
print("- 统计分析: 计算波动率/相关性时可选择排除涨跌停日,避免失真")
# =============================================================================
# TOPIC 5: Trading Calendar & Cross-market Alignment (交易日历与跨市场对齐)
# -----------------------------------------------------------------------------
# Different markets trade on different days (national holidays, weekends).
# When combining data from multiple markets, you must decide how to align
# the time axis — the wrong choice distorts correlation calculations.
#
# Two alignment strategies:
# - Intersection (取交集): keep only dates both markets are open.
# Best for: correlation, regression, signal comparison.
# Drawback: loses data on single-market trading days.
#
# - Forward fill (向前填充): fill non-trading days with last known price.
# Best for: portfolio construction, continuous NAV calculation.
# Drawback: introduces artificial zero-return days — inflates Sharpe,
# understates volatility. Must be accounted for in analysis.
#
# Note: this demo uses simplified hardcoded holiday lists. Production systems
# should use exchange_calendars or pandas_market_calendars libraries.
# =============================================================================
def create_trading_calendars():
    """Build example 2024 trading calendars for the US and China A-share markets.

    Each calendar is the set of 2024 business days (Mon-Fri) minus a
    simplified, hard-coded list of that market's holidays.

    Returns:
        ``(us_trading_days, cn_trading_days)``: two DatetimeIndex objects.
    """
    # All weekdays of 2024.
    all_bdays = pd.bdate_range(start='2024-01-01', end='2024-12-31')

    holiday_lists = {
        # US market holidays (simplified).
        'us': [
            '2024-01-01',  # New Year's Day
            '2024-01-15',  # MLK Day
            '2024-02-19',  # Presidents Day
            '2024-03-29',  # Good Friday
            '2024-05-27',  # Memorial Day
            '2024-06-19',  # Juneteenth
            '2024-07-04',  # Independence Day
            '2024-09-02',  # Labor Day
            '2024-11-28',  # Thanksgiving
            '2024-12-25',  # Christmas
        ],
        # China A-share holidays (simplified).
        'cn': [
            '2024-01-01',  # New Year's Day
            '2024-02-09', '2024-02-12', '2024-02-13', '2024-02-14',
            '2024-02-15', '2024-02-16',  # Spring Festival
            '2024-04-04', '2024-04-05',  # Qingming Festival
            '2024-05-01', '2024-05-02', '2024-05-03',  # Labour Day
            '2024-06-10',  # Dragon Boat Festival
            '2024-09-16', '2024-09-17',  # Mid-Autumn Festival
            '2024-10-01', '2024-10-02', '2024-10-03', '2024-10-04',
            '2024-10-07',  # National Day
        ],
    }

    # Keep only the weekdays that are not in the market's holiday list.
    calendars = {}
    for market, day_strings in holiday_lists.items():
        holidays = pd.to_datetime(day_strings)
        calendars[market] = all_bdays[~all_bdays.isin(holidays)]
    return calendars['us'], calendars['cn']
us_calendar, cn_calendar = create_trading_calendars()
common_dates = us_calendar.intersection(cn_calendar)   # days both markets are open
print(f"2024年美股交易日: {len(us_calendar)}")
print(f"2024年A股交易日: {len(cn_calendar)}")
print(f"共同交易日: {len(common_dates)}")
print(f"仅美股交易日: {len(us_calendar.difference(cn_calendar))}")
print(f"仅A股交易日: {len(cn_calendar.difference(us_calendar))}")

# Simulate one price series per market, each on its own calendar.
# The US path is drawn first, then the CN path, from the same seeded RNG.
np.random.seed(42)
us_shocks = np.random.randn(len(us_calendar)) * 0.01
us_prices = pd.Series(100 * np.exp(np.cumsum(us_shocks)),
                      index=us_calendar, name='US_ETF')
cn_shocks = np.random.randn(len(cn_calendar)) * 0.015
cn_prices = pd.Series(50 * np.exp(np.cumsum(cn_shocks)),
                      index=cn_calendar, name='CN_ETF')

# Naive merge on the union of dates: each market is NaN on the other's
# single-market trading days.
combined_raw = pd.DataFrame({'US_ETF': us_prices, 'CN_ETF': cn_prices})
print(f"直接合并后:")
print(f" 总行数: {len(combined_raw)}")
for ticker in ('US_ETF', 'CN_ETF'):
    print(f" {ticker}缺失: {combined_raw[ticker].isna().sum()}")

# Method 1: intersection - keep only dates on which both markets traded.
combined_inner = combined_raw.loc[common_dates].dropna()
# Method 2: forward fill - carry the latest close over non-trading days.
combined_ffill = combined_raw.ffill().dropna()
print(f"\n方法1 (取交集): {len(combined_inner)}")
print(f"方法2 (ffill): {len(combined_ffill)}")

# Plot both alignments, each series rebased to its first value.
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
alignment_panels = (
    (axes[0], combined_inner, '方法1: 取交集日期'),
    (axes[1], combined_ffill, '方法2: 向前填充'),
)
for ax, aligned, title in alignment_panels:
    for ticker in ('US_ETF', 'CN_ETF'):
        ax.plot(aligned.index, aligned[ticker] / aligned[ticker].iloc[0],
                label=ticker, linewidth=1.2)
    ax.set_title(title, fontsize=13)
    ax.legend()
    ax.grid(alpha=0.3)
plt.suptitle('跨市场数据对齐方法对比', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()

for advice_line in (
    "\n实践建议:",
    "- 计算相关性、回归: 使用交集日期,确保数据同步",
    "- 构建组合/计算市值: 使用ffill避免数据断裂",
    "- 重要: ffill会引入假的'零收益日',计算波动率时需注意",
):
    print(advice_line)
# =============================================================================
# TOPIC 6: End-to-End DataPipeline Class (完整数据清洗管道)
# -----------------------------------------------------------------------------
# Combines all prior topics into a reusable, sequential pipeline:
# Step 1 — Filter stocks with too many missing values
# Step 2 — Fill remaining missing values (ffill only — no bfill/look-ahead)
# Step 3 — Compute return series
# Step 4 — Detect outliers (MAD) and apply Winsorize
# Step 5 — Generate a data quality report per stock
#
# This pattern (class-based pipeline with a .run() method) mirrors how
# production quant systems structure data ingestion — each step is auditable,
# configurable, and produces diagnostic output.
# =============================================================================
class DataPipeline:
    """Minimal end-to-end cleaning pipeline for a price panel.

    ``run()`` executes the following steps in order:
        1. Drop stocks whose share of missing prices exceeds ``max_missing_pct``.
        2. Forward-fill remaining gaps, then drop rows that are still NaN.
        3. Compute simple returns.
        4. Count MAD outliers and winsorize every return series.
        5. Build a per-stock data quality report.

    Attributes populated by ``run()`` (all ``None`` until it has been called):
        clean_prices: Filled price panel from step 2.
        clean_returns: Winsorized return panel from step 4.
        quality_report: Per-stock summary statistics from step 5.
    """

    def __init__(self, prices, max_missing_pct=0.1, winsorize_pct=(0.01, 0.99),
                 mad_threshold=5.0):
        """Store a copy of the raw panel and the pipeline configuration.

        Args:
            prices: DataFrame of prices, one column per stock.
            max_missing_pct: Maximum tolerated fraction of NaN per stock.
            winsorize_pct: ``(lower, upper)`` quantiles used for winsorizing.
            mad_threshold: Absolute modified Z-score above which a return is
                counted as an outlier in step 4.
        """
        self.raw_prices = prices.copy()
        self.max_missing_pct = max_missing_pct
        self.winsorize_pct = winsorize_pct
        self.mad_threshold = mad_threshold
        self.report = {}
        # Results; defined up front so they can be inspected before run().
        self.clean_prices = None
        self.clean_returns = None
        self.quality_report = None

    def run(self):
        """Run the full pipeline and return the cleaned return panel.

        Progress and diagnostics are printed for every step.

        Returns:
            DataFrame of winsorized simple returns (also stored as
            ``self.clean_returns``).
        """
        print("=" * 60)
        print("数据清洗管道运行中...")
        print("=" * 60)

        # Step 1: filter out stocks with too many missing values.
        missing_pct = self.raw_prices.isna().mean()
        valid_cols = missing_pct[missing_pct <= self.max_missing_pct].index.tolist()
        dropped_cols = missing_pct[missing_pct > self.max_missing_pct].index.tolist()
        print(f"\n[Step 1] 过滤缺失值 > {self.max_missing_pct:.0%} 的标的")
        print(f" 保留: {len(valid_cols)}个标的, 删除: {len(dropped_cols)}个标的")
        if dropped_cols:
            print(f" 被删除的标的: {dropped_cols}")
        prices = self.raw_prices[valid_cols].copy()

        # Step 2: fill missing values.
        # Only ffill (backward-looking). bfill would use future prices to
        # fill early NaNs - that is look-ahead bias / data leakage.
        # Rows still NaN after ffill (leading rows before a stock's first
        # valid price) are dropped.
        print(f"\n[Step 2] 填充缺失值 (ffill only, no bfill)")
        n_missing_before = prices.isna().sum().sum()
        prices = prices.ffill().dropna(how='any')
        n_missing_after = prices.isna().sum().sum()
        print(f" 填充前: {n_missing_before}个NaN -> 填充后: {n_missing_after}个NaN")

        # Step 3: compute simple returns; the first row is NaN by construction.
        print(f"\n[Step 3] 计算收益率")
        returns = prices.pct_change().iloc[1:]
        print(f" 收益率面板: {returns.shape[0]}天 x {returns.shape[1]}标的")

        # Step 4: count outliers (diagnostic only), then winsorize every
        # column regardless of how many outliers were found.
        print(f"\n[Step 4] 异常值检测与Winsorize")
        lower_pct, upper_pct = self.winsorize_pct[0], self.winsorize_pct[1]
        n_outliers_total = 0
        returns_clean = returns.copy()
        for col in returns_clean.columns:
            mask, _ = detect_outliers_mad(returns_clean[col], threshold=self.mad_threshold)
            n_outliers_total += mask.sum()
            returns_clean[col] = winsorize(returns_clean[col], lower_pct, upper_pct)
        print(f" 检测到 {n_outliers_total} 个异常值 (MAD, |Z|>{self.mad_threshold:g})")
        print(f" 已进行Winsorize处理 ({lower_pct:.0%}, {upper_pct:.0%})")

        # Step 5: per-stock quality report, indexed by stock name.
        # Annualization assumes 252 trading days per year.
        print(f"\n[Step 5] 数据质量报告")
        mean_ret = returns_clean.mean()
        vol = returns_clean.std()
        quality = pd.DataFrame({
            '日均收益率(bp)': (mean_ret * 10000).round(2),
            '日波动率(%)': (vol * 100).round(3),
            '年化收益率(%)': (mean_ret * 252 * 100).round(2),
            '年化波动率(%)': (vol * np.sqrt(252) * 100).round(2),
            '偏度': returns_clean.skew().round(3),
            '峰度': returns_clean.kurtosis().round(3),
        }).rename_axis('标的')
        print(quality)

        self.clean_prices = prices
        self.clean_returns = returns_clean
        self.quality_report = quality
        print(f"\n管道运行完成!")
        return returns_clean
# Run the pipeline on the raw price panel, keeping stocks with at most 15%
# missing prices; the returned panel holds the cleaned returns.
pipeline = DataPipeline(prices=price_panel, max_missing_pct=0.15)
clean_returns = pipeline.run()