class DataPipeline:
    """Simple data-cleaning pipeline.

    Runs the full flow from a raw price panel to a clean return panel:

    1. Drop instruments whose fraction of missing values exceeds
       ``max_missing_pct``.
    2. Fill the remaining gaps (forward fill, then backward fill).
    3. Compute simple (percentage) returns.
    4. Count MAD outliers and winsorize every return column.
    5. Print a per-instrument data-quality report.

    Parameters
    ----------
    prices : pandas.DataFrame
        Raw price panel. Rows are assumed to be trading days and columns
        instruments (the step-3 message and the x252 annualization assume
        this) -- TODO confirm with callers.
    max_missing_pct : float
        Maximum allowed NaN fraction per column; columns above it are dropped.
    winsorize_pct : tuple of float
        ``(lower, upper)`` arguments passed to ``winsorize`` for each column.
    mad_threshold : float
        ``threshold`` passed to ``detect_outliers_mad``. The resulting mask is
        only counted for reporting; it does not change the data.
    """

    def __init__(self, prices, max_missing_pct=0.1,
                 winsorize_pct=(0.01, 0.99), mad_threshold=5.0):
        self.raw_prices = prices.copy()
        self.max_missing_pct = max_missing_pct
        self.winsorize_pct = winsorize_pct
        self.mad_threshold = mad_threshold
        # Per-step summary statistics, filled in by run().
        self.report = {}
        # Pipeline outputs; None until run() has been called.
        self.clean_prices = None
        self.clean_returns = None
        self.quality_report = None

    def run(self):
        """Run the full cleaning pipeline and return the cleaned returns.

        Prints progress for every step and stores ``clean_prices``,
        ``clean_returns``, ``quality_report`` and the per-step summary in
        ``report`` on the instance.

        Returns
        -------
        pandas.DataFrame
            Winsorized simple returns, with the undefined first row removed.
        """
        print("=" * 60)
        print("数据清洗管道运行中...")
        print("=" * 60)

        prices = self._filter_missing()
        prices = self._fill_missing(prices)
        returns = self._compute_returns(prices)
        returns_clean = self._treat_outliers(returns)
        quality = self._build_quality_report(returns_clean)

        self.clean_prices = prices
        self.clean_returns = returns_clean
        self.quality_report = quality
        print("\n管道运行完成!")
        return returns_clean

    def _filter_missing(self):
        """Step 1: keep only the columns whose NaN fraction is within the limit."""
        missing_pct = self.raw_prices.isna().mean()
        valid_cols = missing_pct[missing_pct <= self.max_missing_pct].index.tolist()
        dropped_cols = missing_pct[missing_pct > self.max_missing_pct].index.tolist()

        print(f"\n[Step 1] 过滤缺失值 > {self.max_missing_pct:.0%} 的标的")
        print(f" 保留: {len(valid_cols)}个标的, 删除: {len(dropped_cols)}个标的")
        if dropped_cols:
            print(f" 被删除的标的: {dropped_cols}")

        self.report['kept_cols'] = valid_cols
        self.report['dropped_cols'] = dropped_cols
        return self.raw_prices[valid_cols].copy()

    def _fill_missing(self, prices):
        """Step 2: fill the remaining gaps with ffill followed by bfill."""
        print("\n[Step 2] 填充缺失值 (ffill + bfill)")
        n_missing_before = prices.isna().sum().sum()
        # NOTE: after ffill only leading NaNs remain. bfill copies a later
        # price backwards into them, which is a mild look-ahead; this is
        # kept to preserve the existing behaviour.
        prices = prices.ffill().bfill()
        n_missing_after = prices.isna().sum().sum()
        print(f" 填充前: {n_missing_before}个NaN -> 填充后: {n_missing_after}个NaN")

        self.report['n_missing_before'] = int(n_missing_before)
        self.report['n_missing_after'] = int(n_missing_after)
        return prices

    def _compute_returns(self, prices):
        """Step 3: compute simple returns and drop the undefined first row."""
        print("\n[Step 3] 计算收益率")
        # Gaps were already filled in step 2. Passing fill_method=None gives
        # the same result while avoiding pandas' deprecated implicit pad-fill.
        returns = prices.pct_change(fill_method=None).iloc[1:]
        print(f" 收益率面板: {returns.shape[0]}天 x {returns.shape[1]}标的")

        self.report['n_days'] = int(returns.shape[0])
        self.report['n_assets'] = int(returns.shape[1])
        return returns

    def _treat_outliers(self, returns):
        """Step 4: count MAD outliers per column, then winsorize every column."""
        print("\n[Step 4] 异常值检测与Winsorize")
        lower, upper = self.winsorize_pct[0], self.winsorize_pct[1]
        returns_clean = returns.copy()
        n_outliers_total = 0
        for col in returns_clean.columns:
            # The MAD mask is only counted for the report; every column is
            # winsorized whether or not any outliers were flagged.
            mask, _ = detect_outliers_mad(returns_clean[col], threshold=self.mad_threshold)
            n_outliers_total += mask.sum()
            returns_clean[col] = winsorize(returns_clean[col], lower, upper)

        print(f" 检测到 {n_outliers_total} 个异常值 (MAD, |Z|>{self.mad_threshold:g})")
        print(f" 已进行Winsorize处理 ({lower:.0%}, {upper:.0%})")

        self.report['n_outliers'] = int(n_outliers_total)
        return returns_clean

    def _build_quality_report(self, returns_clean):
        """Step 5: build, print and return per-instrument return statistics."""
        print("\n[Step 5] 数据质量报告")
        mean = returns_clean.mean()
        std = returns_clean.std()
        # All columns are Series aligned on the instrument labels, so the index
        # is built directly from them and only needs its name set.
        quality = pd.DataFrame({
            '日均收益率(bp)': (mean * 10000).round(2),
            '日波动率(%)': (std * 100).round(3),
            '年化收益率(%)': (mean * 252 * 100).round(2),
            '年化波动率(%)': (std * np.sqrt(252) * 100).round(2),
            '偏度': returns_clean.skew().round(3),
            '峰度': returns_clean.kurtosis().round(3),
        }).rename_axis('标的')
        print(quality)
        return quality


# Run the pipeline on the price panel defined earlier in the notebook.
pipeline = DataPipeline(price_panel, max_missing_pct=0.15)
clean_returns = pipeline.run()