# =============================================================================
# Quantitative Trading — Data Pipeline Demo
# =============================================================================
# Topics covered:
#   1. Price Adjustment for Corporate Actions (splits & dividends)
#   2. Simple Returns vs Log Returns
#   3. Multi-stock Panel & Missing Value Handling
#   4. Outlier Detection & Treatment (Z-score, MAD, Winsorize)
#   5. Trading Calendar & Cross-market Alignment
#   6. End-to-End DataPipeline Class
# =============================================================================

import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import stats

# Demo-only: silence every warning so the printed walkthrough stays readable.
warnings.filterwarnings('ignore')

# Use the CJK-capable fonts configured in our Docker image so that the
# Chinese plot titles/labels used throughout this script render correctly.
plt.rcParams['font.sans-serif'] = ['WenQuanYi Zen Hei', 'Arial Unicode MS', 'SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['figure.dpi'] = 100

# Seed NumPy's global RNG so the whole demo is reproducible.
np.random.seed(42)
print("环境准备完成!")

# =============================================================================
# TOPIC 1: Price Adjustment for Corporate Actions (价格复权)
# -----------------------------------------------------------------------------
# Raw stock prices have artificial jumps caused by stock splits and cash
# dividends. These must be adjusted before computing returns or signals,
# otherwise your algorithm will see fake crashes/spikes.
#
# Key concepts:
#   - Unadjusted price (未复权): raw market price, has visible jumps
#   - Forward-adjusted (前复权): history scaled to today's price level
#   - Backward-adjusted (后复权): history kept real, recent prices scaled up
#   - Adjustment factor (复权因子): multiplier that accounts for all events
# =============================================================================


def generate_raw_stock_data(n_days=1000, seed=42):
    """Generate synthetic daily stock data containing a split and dividends.

    A "true" continuous close series is simulated as geometric Brownian
    motion. Corporate actions are then applied to a copy of it to obtain the
    unadjusted close, together with an adjustment factor such that
    ``unadj_close * adj_factor`` is proportional to the true close.

    Args:
        n_days: Number of business days to simulate, starting 2020-01-02.
        seed: Value passed to ``np.random.seed`` (resets NumPy's global RNG).

    Returns:
        Tuple ``(df, events_df)``. ``df`` is indexed by date with columns
        ``close``, ``volume``, ``open``, ``high``, ``low``, ``unadj_close``
        and ``adj_factor``. ``events_df`` has one row per corporate action.

    Raises:
        ValueError: If a dividend is not smaller than the close preceding it,
            which would produce a non-positive price series.
    """
    np.random.seed(seed)
    dates = pd.bdate_range(start='2020-01-02', periods=n_days)

    # "True" price path (geometric Brownian motion)
    mu, sigma = 0.12, 0.25
    dt = 1 / 252
    log_ret = (mu - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * np.random.randn(n_days)
    true_prices = 100 * np.exp(np.cumsum(log_ret))

    # Simulated volume
    volume = np.random.lognormal(mean=15, sigma=0.5, size=n_days).astype(int)

    df = pd.DataFrame({
        'date': dates,
        'close': true_prices,
        'volume': volume,
    }).set_index('date')

    # Open / high / low derived from the close plus random intraday range
    daily_range = df['close'] * np.abs(np.random.randn(n_days)) * 0.02
    df['open'] = df['close'] + np.random.randn(n_days) * daily_range * 0.5
    df['high'] = (np.maximum(df['close'], df['open'])
                  + np.abs(np.random.randn(n_days)) * daily_range * 0.3)
    df['low'] = (np.minimum(df['close'], df['open'])
                 - np.abs(np.random.randn(n_days)) * daily_range * 0.3)

    # Corporate-action events
    events = []

    # 1:2 split on trading day #300; skipped for short histories, exactly
    # like the dividends below (previously this raised IndexError).
    split_idx = 300
    if split_idx < n_days:
        events.append({'date': dates[split_idx], 'type': 'split', 'ratio': 2})

    # Cash dividend roughly once per quarter
    for q in [60, 185, 310, 435, 560, 685, 810, 935]:
        if q < n_days:
            events.append({'date': dates[q], 'type': 'dividend', 'amount': 0.5})

    # Apply the events, in date order, to obtain the unadjusted close
    unadj_close = df['close'].copy()
    adj_factors = pd.Series(1.0, index=dates)

    for event in sorted(events, key=lambda e: e['date']):
        idx = dates.get_loc(event['date'])

        if event['type'] == 'split':
            # Prices from the split day on are divided by the ratio; the
            # factor on the same range grows so the product is unchanged.
            unadj_close.iloc[idx:] = unadj_close.iloc[idx:] / event['ratio']
            adj_factors.iloc[idx:] = adj_factors.iloc[idx:] * event['ratio']

        elif event['type'] == 'dividend':
            price_before = unadj_close.iloc[idx - 1] if idx > 0 else unadj_close.iloc[idx]
            div_ratio = 1 - event['amount'] / price_before
            if div_ratio <= 0:
                raise ValueError(
                    f"Dividend {event['amount']} is not smaller than the "
                    f"preceding close {price_before}"
                )
            # Ex-dividend prices drop by div_ratio; earlier history gets the
            # same scaling through the factor so the product stays continuous.
            unadj_close.iloc[idx:] = unadj_close.iloc[idx:] * div_ratio
            adj_factors.iloc[:idx] = adj_factors.iloc[:idx] * div_ratio

    df['unadj_close'] = unadj_close
    df['adj_factor'] = adj_factors

    events_df = pd.DataFrame(events)
    return df, events_df


def forward_adjust(unadj_prices, adj_factor):
    """Forward-adjust prices: the latest price stays real, history is rescaled.

    Args:
        unadj_prices: Series of unadjusted prices.
        adj_factor: Series of cumulative adjustment factors on the same index.

    Returns:
        Series ``unadj_prices * adj_factor / adj_factor.iloc[-1]``.
    """
    normalized_factor = adj_factor / adj_factor.iloc[-1]
    return unadj_prices * normalized_factor


def backward_adjust(unadj_prices, adj_factor):
    """Backward-adjust prices: the earliest price stays real, later ones scale.

    Args:
        unadj_prices: Series of unadjusted prices.
        adj_factor: Series of cumulative adjustment factors on the same index.

    Returns:
        Series ``unadj_prices * adj_factor / adj_factor.iloc[0]``.
    """
    normalized_factor = adj_factor / adj_factor.iloc[0]
    return unadj_prices * normalized_factor


# Generate the data
stock_data, events = generate_raw_stock_data()
print("公司行为事件:")
print(events.to_string(index=False))
print(f"\n数据形状: {stock_data.shape}")

# Compute adjusted prices
stock_data['fwd_adj_close'] = forward_adjust(stock_data['unadj_close'], stock_data['adj_factor'])
stock_data['bwd_adj_close'] = backward_adjust(stock_data['unadj_close'], stock_data['adj_factor'])

# Visual comparison
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# 1. Unadjusted price, with every corporate action marked
ax = axes[0, 0]
ax.plot(stock_data['unadj_close'], color='blue', linewidth=1)
ax.set_title('未复权价格 (可见拆股/分红跳跃)', fontsize=13)
ax.grid(alpha=0.3)
for _, event in events.iterrows():
    ax.axvline(x=event['date'], color='red', linestyle='--', alpha=0.5)

# 2. Forward-adjusted price
ax = axes[0, 1]
ax.plot(stock_data['fwd_adj_close'], color='green', linewidth=1)
ax.set_title('前复权价格 (当前价格真实)', fontsize=13)
ax.grid(alpha=0.3)

# 3. Backward-adjusted price
ax = axes[1, 0]
ax.plot(stock_data['bwd_adj_close'], color='orange', linewidth=1)
ax.set_title('后复权价格 (历史价格真实)', fontsize=13)
ax.grid(alpha=0.3)

# 4. "True" price vs forward-adjusted price
ax = axes[1, 1]
ax.plot(stock_data['close'], label='真实连续价格', color='black', linewidth=1.5)
ax.plot(stock_data['fwd_adj_close'], label='前复权价格', color='green', linewidth=1, alpha=0.7)
ax.set_title('真实价格 vs 前复权价格', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

plt.suptitle('复权价格对比', fontsize=15, y=1.02)
plt.tight_layout()
plt.show()

# Sanity check: one-day return across the split date for each price series
print("\n收益率验证 (拆股日前后):")
split_date = events[events['type'] == 'split']['date'].iloc[0]
split_idx = stock_data.index.get_loc(split_date)
print(f"拆股日期: {split_date.strftime('%Y-%m-%d')}")
for label, column in [
    ("未复权收益率", 'unadj_close'),
    ("前复权收益率", 'fwd_adj_close'),
    ("真实收益率", 'close'),
]:
    split_day_return = (stock_data[column].iloc[split_idx]
                        / stock_data[column].iloc[split_idx - 1] - 1)
    print(f"{label}: {split_day_return:.4f}")

# =============================================================================
# TOPIC 2: Simple Returns vs Log Returns (简单收益率 vs 对数收益率)
# -----------------------------------------------------------------------------
# Two ways to measure how much a price changed:
#   - Simple return: r = (P_t / P_{t-1}) - 1
#   - Log return:    r = ln(P_t / P_{t-1})
#
# Why log returns are preferred in quant research:
#   - Time-additive: sum daily log returns to get total log return
#   - Better statistical properties (closer to normal distribution)
#   - Numerically stable for long compounding periods
#
# At daily frequency the difference is tiny (~1bp), but it grows at
# monthly/annual frequency — never mix the two in the same calculation.
# =============================================================================

# Returns are computed from the forward-adjusted price series
prices = stock_data['fwd_adj_close']

# Simple returns
simple_returns = prices.pct_change().dropna()

# Log returns
log_returns = np.log(prices / prices.shift(1)).dropna()

# Side-by-side comparison (difference computed once and reused)
return_diff = simple_returns - log_returns
comparison = pd.DataFrame({
    '简单收益率': simple_returns,
    '对数收益率': log_returns,
    '差异': return_diff,
    '差异(bp)': return_diff * 10000,  # basis points
})

print("简单收益率 vs 对数收益率统计:")
print(f"平均绝对差异: {comparison['差异'].abs().mean():.6f} ({comparison['差异(bp)'].abs().mean():.6f})")
print(f"最大差异: {comparison['差异'].abs().max():.6f} ({comparison['差异(bp)'].abs().max():.6f})")
print("\n日频数据下两者差异很小,但月频或年频时差异会显著增大。")

# Time-additivity check
print("\n=== 时间可加性验证 ===")

# Log returns aggregate over time by plain summation
total_log_return = log_returns.sum()
actual_total = np.log(prices.iloc[-1] / prices.iloc[0])
print(f"对数收益率求和: {total_log_return:.6f}")
print(f"实际总对数收益: {actual_total:.6f}")
print(f"差异: {abs(total_log_return - actual_total):.10f} (几乎为零)")

# Simple returns must be compounded (product of 1 + r)
print("\n简单收益率需要连乘:")
total_simple_product = (1 + simple_returns).prod() - 1
actual_simple = prices.iloc[-1] / prices.iloc[0] - 1
print(f"简单收益率连乘: {total_simple_product:.6f}")
print(f"实际总简单收益: {actual_simple:.6f}")
print(f"差异: {abs(total_simple_product - actual_simple):.10f}")

# Visualisation
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Daily frequency
ax = axes[0]
ax.scatter(simple_returns, log_returns, alpha=0.3, s=5)
ax.plot([-0.1, 0.1], [-0.1, 0.1], 'r--', linewidth=1, label='y=x')
ax.set_xlabel('简单收益率')
ax.set_ylabel('对数收益率')
ax.set_title('简单收益率 vs 对数收益率 (日频) ', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

# Monthly frequency: resample once (month-end close) and reuse the result
ax = axes[1]
monthly_prices = prices.resample('ME').last()
monthly_simple = monthly_prices.pct_change().dropna()
monthly_log = np.log(monthly_prices / monthly_prices.shift(1)).dropna()
ax.scatter(monthly_simple, monthly_log, alpha=0.5, s=20)
ax.plot([-0.2, 0.2], [-0.2, 0.2], 'r--', linewidth=1, label='y=x')
ax.set_xlabel('简单收益率')
ax.set_ylabel('对数收益率')
ax.set_title('简单收益率 vs 对数收益率 (月频, 差异更明显) ', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

plt.tight_layout()
plt.show()

# =============================================================================
# TOPIC 3: Multi-stock Panel & Missing Value Handling (多标的面板与缺失值处理)
# -----------------------------------------------------------------------------
# Real market data panels have two main sources of missing values:
#   - Suspensions (停牌): a stock halts trading for days/weeks
#   - Data feed gaps: vendor errors, holidays, late reporting
#
# Strategies (choice depends on context):
#   - ffill (forward fill): use last known price — standard for suspensions
#   - Linear interpolation: smooth gaps — OK for short random gaps
#   - ffill with limit: ffill but cap at N days — conservative choice
#   - Drop: remove rows/columns — loses cross-sectional data
#
# Rule of thumb: if missing% > 10%, consider excluding the stock entirely.
# =============================================================================


def generate_multi_stock_panel(n_stocks=5, n_days=500, seed=42):
    """Generate a multi-stock daily panel with suspensions and random gaps.

    Each stock follows its own geometric Brownian motion. A few runs of
    consecutive days (suspensions) and five isolated days are then set to NaN
    in the price series; volume is zeroed during suspensions only.

    Args:
        n_stocks: Number of stocks, named ``Stock_A``, ``Stock_B``, ...
        n_days: Number of business days, starting 2022-01-03.
        seed: Value passed to ``np.random.seed`` (resets NumPy's global RNG).

    Returns:
        Tuple ``(price_panel, volume_panel)`` of DataFrames indexed by date
        with one column per stock.
    """
    np.random.seed(seed)
    dates = pd.bdate_range(start='2022-01-03', periods=n_days)
    stock_names = [f'Stock_{chr(65 + i)}' for i in range(n_stocks)]  # Stock_A, Stock_B, ...

    # Suspensions may only start away from both ends of the sample. For short
    # histories this range is small or empty, so sample sizes are capped below
    # instead of letting np.random.choice(replace=False) raise.
    suspension_candidates = range(50, n_days - 30)

    all_data = {}
    for i, name in enumerate(stock_names):
        mu = np.random.uniform(0.05, 0.20)
        sigma = np.random.uniform(0.15, 0.35)
        start_price = np.random.uniform(20, 200)
        dt = 1 / 252

        log_ret = (mu - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * np.random.randn(n_days)
        prices = start_price * np.exp(np.cumsum(log_ret))
        volume = np.random.lognormal(mean=14, sigma=0.8, size=n_days).astype(int)

        # Suspensions: runs of 1-9 consecutive days with no price and no volume
        n_suspensions = min(2 + i % 3, len(suspension_candidates))
        if n_suspensions > 0:
            suspension_starts = np.random.choice(
                suspension_candidates, size=n_suspensions, replace=False)
        else:
            suspension_starts = []
        for start in suspension_starts:
            duration = np.random.randint(1, 10)
            end = min(start + duration, n_days)
            prices[start:end] = np.nan
            volume[start:end] = 0

        # Isolated missing prices (volume is left untouched here)
        random_missing = np.random.choice(range(n_days), size=min(5, n_days), replace=False)
        prices[random_missing] = np.nan

        all_data[name] = pd.DataFrame({'close': prices, 'volume': volume}, index=dates)

    # Assemble the wide panels
    price_panel = pd.DataFrame({name: data['close'] for name, data in all_data.items()})
    volume_panel = pd.DataFrame({name: data['volume'] for name, data in all_data.items()})
    return price_panel, volume_panel


def _longest_nan_run(series):
    """Return the length of the longest run of consecutive NaNs in *series*."""
    # The cumulative count of valid values is constant within a NaN run, so
    # it serves as a run id; summing the NaN flags per id gives run lengths.
    run_id = series.notna().astype(int).cumsum()
    return series.isna().astype(int).groupby(run_id).sum().max()


# Build the multi-stock panel
price_panel, volume_panel = generate_multi_stock_panel(n_stocks=5, n_days=500)

# Missing-value statistics per stock
missing_stats = pd.DataFrame({
    '总天数': len(price_panel),
    '缺失天数': price_panel.isna().sum(),
    '缺失比例': price_panel.isna().mean(),
    '最长连续缺失': [_longest_nan_run(price_panel[col]) for col in price_panel.columns],
})
print("缺失值统计:")
print(missing_stats)
print(f"\n面板形状: {price_panel.shape}")


# Comparison of missing-value treatments
def handle_missing_values(prices, method='ffill'):
    """Fill or drop missing values using one of several strategies.

    Args:
        prices: Series or DataFrame of prices containing NaNs.
        method: ``'ffill'`` (carry the last valid value forward; the usual
            choice for suspensions), ``'linear'`` (linear interpolation),
            ``'drop'`` (remove NaNs) or ``'ffill_limit'`` (forward fill at
            most 5 consecutive values).

    Returns:
        A new object of the same type with the chosen treatment applied.

    Raises:
        ValueError: If *method* is not one of the names above.
    """
    if method == 'ffill':
        return prices.ffill()
    if method == 'linear':
        return prices.interpolate(method='linear')
    if method == 'drop':
        return prices.dropna()
    if method == 'ffill_limit':
        return prices.ffill(limit=5)
    raise ValueError(f"Unknown method: {method}")


# Show the effect of each method on Stock_A
stock_a = price_panel['Stock_A'].copy()

fig, axes = plt.subplots(2, 2, figsize=(14, 10))

raw_title = '原始数据 (含缺失值) '
methods = {
    raw_title: stock_a,
    '向前填充 (ffill)': handle_missing_values(stock_a, 'ffill'),
    '线性插值 (interpolate)': handle_missing_values(stock_a, 'linear'),
    '有限填充 (ffill, limit=5)': handle_missing_values(stock_a, 'ffill_limit'),
}

# Positions that were missing in the raw data (same for every subplot)
missing_mask = stock_a.isna()

for ax, (title, data) in zip(axes.flat, methods.items()):
    ax.plot(data, linewidth=1)
    # Highlight the values each method put where the raw data had NaN
    if not missing_mask.all() and title != raw_title:
        filled_values = data[missing_mask].dropna()
        if len(filled_values) > 0:
            ax.scatter(filled_values.index, filled_values.values,
                       color='red', s=15, zorder=5, label='填充值')
            ax.legend(fontsize=9)
    ax.set_title(title, fontsize=12)
    ax.grid(alpha=0.3)

plt.suptitle('缺失值处理方法对比 (Stock_A)', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()

print("\n实践建议:")
print("- 停牌: 使用ffill (假设停牌期间价格不变) ")
print("- 短期缺失(<3天): 可用线性插值")
print("- 长期缺失(>10天): 考虑从分析中排除该标的")
print("- 计算收益率前: 先填充缺失值, 再计算")

# Build a clean multi-stock panel.
# Steps: forward fill -> backward fill any NaNs left at the start -> returns.
# NOTE: bfill copies a *later* price into earlier rows (look-ahead); that is
# acceptable for this demo but should be avoided in a real backtest.
clean_prices = price_panel.ffill().bfill()
print(f"清洗前缺失值: {price_panel.isna().sum().sum()}")
print(f"清洗后缺失值: {clean_prices.isna().sum().sum()}")

# Return panel
returns_panel = clean_prices.pct_change().dropna()

# Normalised price chart
fig, ax = plt.subplots(figsize=(14, 6))
normalized = clean_prices / clean_prices.iloc[0] * 100
for col in normalized.columns:
    ax.plot(normalized[col], label=col, linewidth=1.2)
ax.set_title('多标的归一化价格走势 (基期=100) ', fontsize=14)
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()

# Correlation matrix heat map with the coefficient printed in every cell
corr_matrix = returns_panel.corr()
fig, ax = plt.subplots(figsize=(8, 6))
im = ax.imshow(corr_matrix.values, cmap='RdBu_r', vmin=-1, vmax=1)
ax.set_xticks(range(len(corr_matrix)))
ax.set_xticklabels(corr_matrix.columns, rotation=45)
ax.set_yticks(range(len(corr_matrix)))
ax.set_yticklabels(corr_matrix.columns)
for (i, j), coefficient in np.ndenumerate(corr_matrix.values):
    ax.text(j, i, f'{coefficient:.2f}', ha='center', va='center', fontsize=11)
plt.colorbar(im, ax=ax)
ax.set_title('收益率相关性矩阵', fontsize=14)
plt.tight_layout()
plt.show()

# =============================================================================
# TOPIC 4: Outlier Detection & Treatment (异常值检测与处理)
# -----------------------------------------------------------------------------
# Financial return series contain extreme values from real events (crashes,
# halts, data errors). How you handle them affects every downstream signal.
#
# Detection methods:
#   - Z-score: |z| = |(x - mean) / std| > 3 — simple, but mean/std are
#     themselves pulled by outliers (not robust)
#   - MAD: uses median instead of mean — robust to extreme values,
#     preferred for financial data
#
# Treatment methods:
#   - Remove: delete the outlier row — reduces sample size
#   - Winsorize: clip to [p1, p99] boundary — keeps all data, limits impact
#     standard practice before factor construction
#
# Q-Q plot: visualizes how far returns deviate from a normal distribution.
# Fat tails (leptokurtosis) are a universal property of financial returns.
# =============================================================================


def detect_outliers_zscore(series, threshold=3.0):
    """Flag outliers with the classic Z-score.

    ``Z = (x - mean) / std`` measures how many standard deviations a value
    lies from the mean; values with ``|Z| > threshold`` are flagged.
    Example: with a mean score of 70 and a standard deviation of 10, a score
    of 100 has Z = 3.

    Weakness: the mean and standard deviation are themselves pulled by the
    outliers being searched for. For a constant series the standard
    deviation is 0, the scores are NaN and nothing is flagged.

    Args:
        series: Numeric pandas Series.
        threshold: Absolute Z-score above which a value is an outlier.

    Returns:
        Tuple ``(outlier_mask, z_scores)`` of Series aligned with *series*.
    """
    z_scores = (series - series.mean()) / series.std()
    outlier_mask = z_scores.abs() > threshold
    return outlier_mask, z_scores


def detect_outliers_mad(series, threshold=3.0):
    """Flag outliers with the median absolute deviation (MAD).

    ``MAD = median(|x - median(x)|)`` and the modified Z-score is
    ``0.6745 * (x - median) / MAD``. Using the median as reference makes the
    score robust: a few extreme values barely move it, unlike mean and std.

    When MAD is exactly 0 it is replaced by 1e-10 to avoid dividing by zero,
    so every value different from the median is then flagged.

    Args:
        series: Numeric pandas Series.
        threshold: Absolute modified Z-score above which a value is flagged.

    Returns:
        Tuple ``(outlier_mask, modified_z)`` of Series aligned with *series*.
    """
    median = series.median()
    mad = (series - median).abs().median()
    if mad == 0:
        mad = 1e-10  # avoid division by zero
    modified_z = 0.6745 * (series - median) / mad
    outlier_mask = modified_z.abs() > threshold
    return outlier_mask, modified_z


def winsorize(series, lower_pct=0.01, upper_pct=0.99):
    """Clip values beyond the given quantiles to the quantile boundaries.

    Unlike removing outliers, winsorizing keeps every observation and only
    limits the extremes: with the defaults, values below the 1% quantile are
    raised to it and values above the 99% quantile are lowered to it.

    Args:
        series: Numeric pandas Series.
        lower_pct: Lower quantile in [0, 1].
        upper_pct: Upper quantile in [0, 1], not smaller than *lower_pct*.

    Returns:
        A clipped copy of *series*.

    Raises:
        ValueError: If the quantiles are outside [0, 1] or inverted.
    """
    if not 0 <= lower_pct <= upper_pct <= 1:
        raise ValueError(
            f"Expected 0 <= lower_pct <= upper_pct <= 1, got {lower_pct} and {upper_pct}"
        )
    lower = series.quantile(lower_pct)
    upper = series.quantile(upper_pct)
    return series.clip(lower=lower, upper=upper)


# Demonstrate on Stock_A returns
ret_a = returns_panel['Stock_A'].copy()

# Inject a few artificial outliers (±15%..30%) so the effect is visible
np.random.seed(123)
outlier_indices = np.random.choice(ret_a.index, size=5, replace=False)
ret_a_with_outliers = ret_a.copy()
for idx in outlier_indices:
    # .loc makes the label-based (date) assignment explicit
    ret_a_with_outliers.loc[idx] = np.random.choice([-1, 1]) * np.random.uniform(0.15, 0.30)

# Detect outliers with both methods
zscore_mask, z_scores = detect_outliers_zscore(ret_a_with_outliers, threshold=3.0)
mad_mask, mad_scores = detect_outliers_mad(ret_a_with_outliers, threshold=3.0)

print("异常值检测结果:")
print(f"Z-score方法 (|Z|>3): 检测到 {zscore_mask.sum()} 个异常值")
print(f"MAD方法 (|Modified Z|>3): 检测到 {mad_mask.sum()} 个异常值")

# Winsorize
ret_winsorized = winsorize(ret_a_with_outliers, lower_pct=0.01, upper_pct=0.99)
print(f"\nWinsorize前 - 范围: [{ret_a_with_outliers.min():.4f}, {ret_a_with_outliers.max():.4f}]")
print(f"Winsorize后 - 范围: [{ret_winsorized.min():.4f}, {ret_winsorized.max():.4f}]")

# Visualise the detection results
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# 1. Return series with Z-score outliers marked
ax = axes[0, 0]
ax.plot(ret_a_with_outliers, color='blue', linewidth=0.8, alpha=0.7)
outliers = ret_a_with_outliers[zscore_mask]
ax.scatter(outliers.index, outliers.values, color='red', s=40, zorder=5,
           label=f'异常值 ({len(outliers)}个)')
ax.set_title('Z-score异常值检测 (|Z|>3)', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

# 2. Return series with MAD outliers marked
ax = axes[0, 1]
ax.plot(ret_a_with_outliers, color='blue', linewidth=0.8, alpha=0.7)
outliers_mad = ret_a_with_outliers[mad_mask]
ax.scatter(outliers_mad.index, outliers_mad.values, color='orange', s=40, zorder=5,
           label=f'异常值 ({len(outliers_mad)}个)')
ax.set_title('MAD异常值检测 (|Modified Z|>3)', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

# 3. Distribution before vs after winsorizing
ax = axes[1, 0]
ax.hist(ret_a_with_outliers, bins=60, alpha=0.5, label='原始', color='blue', density=True)
ax.hist(ret_winsorized, bins=60, alpha=0.5, label='Winsorize后', color='green', density=True)
ax.set_title('Winsorize前后收益率分布', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

# 4. Q-Q plot against the normal distribution
ax = axes[1, 1]
sorted_data = np.sort(ret_a_with_outliers.dropna())
n = len(sorted_data)
theoretical = stats.norm.ppf(np.arange(1, n + 1) / (n + 1))
ax.scatter(theoretical, sorted_data, alpha=0.5, s=10)
# Reference line: where the points would fall for a normal distribution
# with the sample's own mean and standard deviation.
sample_mean = ret_a_with_outliers.mean()
sample_std = ret_a_with_outliers.std()
ax.plot([theoretical.min(), theoretical.max()],
        [theoretical.min() * sample_std + sample_mean,
         theoretical.max() * sample_std + sample_mean],
        'r--', linewidth=1, label='正态分布参考线')
ax.set_xlabel('理论分位数')
ax.set_ylabel('样本分位数')
ax.set_title('Q-Q图 (检验正态性) ', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

plt.tight_layout()
plt.show()

print("\n方法选择建议:")
print("- Z-score: 简单快速,但受极端值影响 (不够稳健) ")
print("- MAD: 更稳健,推荐用于金融数据")
print("- Winsorize: 保留数据量,适合构建因子时使用")
print("- 实际中常用: Winsorize(1%, 99%) + MAD检测")

# =============================================================================
# TOPIC 5: Trading Calendar & Cross-market Alignment (交易日历与跨市场对齐)
# -----------------------------------------------------------------------------
# Different markets trade on different days (national holidays, weekends).
# When combining data from multiple markets, you must decide how to align
# the time axis — the wrong choice distorts correlation calculations.
#
# Two alignment strategies:
#   - Intersection (取交集): keep only dates both markets are open.
#     Best for: correlation, regression, signal comparison.
#     Drawback: loses data on single-market trading days.
#
#   - Forward fill (向前填充): fill non-trading days with last known price.
#     Best for: portfolio construction, continuous NAV calculation.
#     Drawback: introduces artificial zero-return days — inflates Sharpe,
#     understates volatility. Must be accounted for in analysis.
#
# Note: this demo uses simplified hardcoded holiday lists. Production systems
# should use exchange_calendars or pandas_market_calendars libraries.
# =============================================================================


def create_trading_calendars():
    """Build example 2024 trading calendars for the US and China A-share markets.

    Both calendars start from all 2024 business days (Mon-Fri) and remove a
    simplified, hard-coded holiday list for the respective market.

    Returns:
        Tuple ``(us_trading_days, cn_trading_days)`` of ``DatetimeIndex``.
    """
    # All business days of 2024
    all_bdays = pd.bdate_range(start='2024-01-01', end='2024-12-31')

    # US market holidays (simplified)
    us_holidays = pd.to_datetime([
        '2024-01-01',  # New Year's Day
        '2024-01-15',  # MLK Day
        '2024-02-19',  # Presidents Day
        '2024-03-29',  # Good Friday
        '2024-05-27',  # Memorial Day
        '2024-06-19',  # Juneteenth
        '2024-07-04',  # Independence Day
        '2024-09-02',  # Labor Day
        '2024-11-28',  # Thanksgiving
        '2024-12-25',  # Christmas
    ])

    # China A-share holidays (simplified)
    cn_holidays = pd.to_datetime([
        '2024-01-01',  # New Year's Day
        '2024-02-09', '2024-02-12', '2024-02-13',
        '2024-02-14', '2024-02-15', '2024-02-16',  # Spring Festival
        '2024-04-04', '2024-04-05',  # Qingming Festival
        '2024-05-01', '2024-05-02', '2024-05-03',  # Labour Day
        '2024-06-10',  # Dragon Boat Festival
        '2024-09-16', '2024-09-17',  # Mid-Autumn Festival
        '2024-10-01', '2024-10-02', '2024-10-03',
        '2024-10-04', '2024-10-07',  # National Day
    ])

    us_trading_days = all_bdays[~all_bdays.isin(us_holidays)]
    cn_trading_days = all_bdays[~all_bdays.isin(cn_holidays)]
    return us_trading_days, cn_trading_days


us_calendar, cn_calendar = create_trading_calendars()

print(f"2024年美股交易日: {len(us_calendar)}天")
print(f"2024年A股交易日: {len(cn_calendar)}天")
print(f"共同交易日: {len(us_calendar.intersection(cn_calendar))}天")
print(f"仅美股交易日: {len(us_calendar.difference(cn_calendar))}天")
print(f"仅A股交易日: {len(cn_calendar.difference(us_calendar))}天")

# Simulate cross-market alignment
np.random.seed(42)

# Random-walk prices, each on its own market's calendar
us_prices = pd.Series(
    100 * np.exp(np.cumsum(np.random.randn(len(us_calendar)) * 0.01)),
    index=us_calendar,
    name='US_ETF',
)
cn_prices = pd.Series(
    50 * np.exp(np.cumsum(np.random.randn(len(cn_calendar)) * 0.015)),
    index=cn_calendar,
    name='CN_ETF',
)

# A naive merge uses the union of both calendars, leaving NaN wherever only
# one market was open.
combined_raw = pd.DataFrame({'US_ETF': us_prices, 'CN_ETF': cn_prices})
print("直接合并后:")
print(f" 总行数: {len(combined_raw)}")
print(f" US_ETF缺失: {combined_raw['US_ETF'].isna().sum()}")
print(f" CN_ETF缺失: {combined_raw['CN_ETF'].isna().sum()}")

# Method 1: intersection (keep only dates on which both markets traded)
common_dates = us_calendar.intersection(cn_calendar)
combined_inner = combined_raw.loc[common_dates].dropna()

# Method 2: forward fill (carry the most recent close forward)
combined_ffill = combined_raw.ffill().dropna()

print(f"\n方法1 (取交集): {len(combined_inner)}行")
print(f"方法2 (ffill): {len(combined_ffill)}行")

# Visualisation: both series normalised to 1 at the first row
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

ax = axes[0]
for column in ('US_ETF', 'CN_ETF'):
    ax.plot(combined_inner.index,
            combined_inner[column] / combined_inner[column].iloc[0],
            label=column, linewidth=1.2)
ax.set_title('方法1: 取交集日期', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

ax = axes[1]
for column in ('US_ETF', 'CN_ETF'):
    ax.plot(combined_ffill.index,
            combined_ffill[column] / combined_ffill[column].iloc[0],
            label=column, linewidth=1.2)
ax.set_title('方法2: 向前填充', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)

plt.suptitle('跨市场数据对齐方法对比', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()

print("\n实践建议:")
print("- 计算相关性、回归: 使用交集日期,确保数据同步")
print("- 构建组合/计算市值: 使用ffill,避免数据断裂")
print("- 重要: ffill会引入假的'零收益日',计算波动率时需注意")

# =============================================================================
# TOPIC 6: End-to-End DataPipeline Class (完整数据清洗管道)
# -----------------------------------------------------------------------------
# Combines all prior topics into a reusable, sequential pipeline:
#   Step 1 — Filter stocks with too many missing values
#   Step 2 — Fill remaining missing values (ffill + bfill)
#   Step 3 — Compute return series
#   Step 4 — Detect outliers (MAD) and apply Winsorize
#   Step 5 — Generate a data quality report per stock
#
# This pattern (class-based pipeline with a .run() method) mirrors how
# production quant systems structure data ingestion — each step is auditable,
# configurable, and produces diagnostic output.
# =============================================================================


class DataPipeline:
    """Simple data-cleaning pipeline from a raw price panel to clean returns.

    ``run()`` performs, in order:
        1. Drop stocks whose missing ratio exceeds ``max_missing_pct``.
        2. Fill remaining gaps (forward fill, then backward fill).
        3. Compute simple returns.
        4. Count MAD outliers and winsorize every return column.
        5. Build and print a per-stock data-quality table.

    Relies on the module-level helpers ``detect_outliers_mad`` and
    ``winsorize``.

    Attributes set by ``run()`` (``None`` before it has been called):
        clean_prices: Filled price panel of the retained stocks.
        clean_returns: Winsorized return panel.
        quality_report: Per-stock summary statistics.
    ``report`` is a dict of step diagnostics, empty until ``run()``.
    """

    def __init__(self, prices, max_missing_pct=0.1, winsorize_pct=(0.01, 0.99),
                 mad_threshold=5.0):
        """Store a copy of the raw panel and the pipeline configuration.

        Args:
            prices: DataFrame of prices, one column per stock.
            max_missing_pct: Maximum tolerated share of NaNs per stock.
            winsorize_pct: ``(lower, upper)`` quantiles used for winsorizing.
            mad_threshold: Modified Z-score above which a return is counted
                as an outlier in the diagnostics.
        """
        self.raw_prices = prices.copy()
        self.max_missing_pct = max_missing_pct
        self.winsorize_pct = winsorize_pct
        self.mad_threshold = mad_threshold
        self.report = {}
        # Results are defined up front so reading them before run() yields
        # None instead of raising AttributeError.
        self.clean_prices = None
        self.clean_returns = None
        self.quality_report = None

    def run(self):
        """Run the full cleaning pipeline, printing diagnostics per step.

        Returns:
            The winsorized return panel (also stored as ``clean_returns``).
        """
        print("=" * 60)
        print("数据清洗管道运行中...")
        print("=" * 60)

        # Step 1: drop stocks with too many missing values
        missing_pct = self.raw_prices.isna().mean()
        keep_mask = missing_pct <= self.max_missing_pct
        valid_cols = missing_pct[keep_mask].index.tolist()
        dropped_cols = missing_pct[missing_pct > self.max_missing_pct].index.tolist()
        print(f"\n[Step 1] 过滤缺失值 > {self.max_missing_pct:.0%} 的标的")
        print(f" 保留: {len(valid_cols)}个标的, 删除: {len(dropped_cols)}个标的")
        if dropped_cols:
            print(f" 被删除的标的: {dropped_cols}")

        prices = self.raw_prices[valid_cols].copy()

        # Step 2: fill missing values.
        # NOTE: bfill only affects leading NaNs but uses later prices to do so.
        print("\n[Step 2] 填充缺失值 (ffill + bfill)")
        n_missing_before = prices.isna().sum().sum()
        prices = prices.ffill().bfill()
        n_missing_after = prices.isna().sum().sum()
        print(f" 填充前: {n_missing_before}个NaN -> 填充后: {n_missing_after}个NaN")

        # Step 3: simple returns (the first row is NaN by construction)
        print("\n[Step 3] 计算收益率")
        returns = prices.pct_change().iloc[1:]
        print(f" 收益率面板: {returns.shape[0]}天 x {returns.shape[1]}标的")

        # Step 4: count outliers (diagnostic only) and winsorize each column
        print("\n[Step 4] 异常值检测与Winsorize")
        lower_pct, upper_pct = self.winsorize_pct[0], self.winsorize_pct[1]
        n_outliers_total = 0
        returns_clean = returns.copy()
        for col in returns_clean.columns:
            mask, _ = detect_outliers_mad(returns_clean[col], threshold=self.mad_threshold)
            n_outliers_total += int(mask.sum())
            returns_clean[col] = winsorize(returns_clean[col], lower_pct, upper_pct)
        print(f" 检测到 {n_outliers_total} 个异常值 (MAD, |Z|>{self.mad_threshold:g})")
        print(f" 已进行Winsorize处理 ({lower_pct:.0%}, {upper_pct:.0%})")

        # Step 5: data-quality report. Every column is a Series indexed by
        # stock name, so rows are aligned by label, not by position.
        print("\n[Step 5] 数据质量报告")
        mean_ret = returns_clean.mean()
        std_ret = returns_clean.std()
        quality = pd.DataFrame({
            '日均收益率(bp)': (mean_ret * 10000).round(2),
            '日波动率(%)': (std_ret * 100).round(3),
            '年化收益率(%)': (mean_ret * 252 * 100).round(2),
            '年化波动率(%)': (std_ret * np.sqrt(252) * 100).round(2),
            '偏度': returns_clean.skew().round(3),
            '峰度': returns_clean.kurtosis().round(3),
        }).rename_axis('标的')
        print(quality)

        self.clean_prices = prices
        self.clean_returns = returns_clean
        self.quality_report = quality
        # Machine-readable counterpart of the printed diagnostics
        self.report = {
            'kept': valid_cols,
            'dropped': dropped_cols,
            'n_missing_before': int(n_missing_before),
            'n_missing_after': int(n_missing_after),
            'n_outliers': n_outliers_total,
        }

        print("\n管道运行完成!")
        return returns_clean


# Run the pipeline
pipeline = DataPipeline(price_panel, max_missing_pct=0.15)
clean_returns = pipeline.run()