# trading/pipeline.py
# (source-viewer metadata: 84 lines, 3.5 KiB, Python)

class DataPipeline:
    """Simple data-cleaning pipeline.

    Runs the complete flow from a raw price panel to a clean return panel:
    1. Drop instruments (columns) with too many missing values.
    2. Fill the remaining missing values (forward fill, then back fill).
    3. Compute simple returns.
    4. Count outliers (MAD) and winsorize each instrument's returns.
    5. Print a per-instrument data-quality report.

    Parameters
    ----------
    prices : pandas.DataFrame
        Raw price panel; each column is treated as one instrument and each
        row as one period (presumably trading days -- TODO confirm with caller).
    max_missing_pct : float, default 0.1
        Columns whose NaN fraction is strictly greater than this are dropped.
    winsorize_pct : tuple of (float, float), default (0.01, 0.99)
        Lower / upper bounds passed to ``winsorize`` for every column.
    mad_threshold : float, default 5.0
        Threshold passed to ``detect_outliers_mad``. It only affects the
        reported outlier count; the returned data is cleaned by winsorizing.

    After :meth:`run`, ``clean_prices``, ``clean_returns``, ``quality_report``
    and ``report`` hold the results of that run.
    """

    def __init__(self, prices, max_missing_pct=0.1, winsorize_pct=(0.01, 0.99),
                 mad_threshold=5.0):
        self.raw_prices = prices.copy()
        self.max_missing_pct = max_missing_pct
        self.winsorize_pct = winsorize_pct
        self.mad_threshold = mad_threshold
        # Summary counts of the most recent run(); filled in by run().
        self.report = {}
        # Outputs of run(); None until the pipeline has been executed.
        self.clean_prices = None
        self.clean_returns = None
        self.quality_report = None

    def run(self):
        """Run the complete cleaning pipeline.

        Returns
        -------
        pandas.DataFrame
            Winsorized returns (the leading NaN row of ``pct_change`` removed).
            The same frame is stored as ``self.clean_returns``. The filled
            prices go to ``self.clean_prices`` and the quality table to
            ``self.quality_report``.
        """
        print("=" * 60)
        print("数据清洗管道运行中...")
        print("=" * 60)
        self.report = {}

        prices, valid_cols = self._filter_missing()
        prices = self._fill_missing(prices)
        returns = self._compute_returns(prices)
        returns_clean = self._clean_outliers(returns)
        quality = self._quality_report(returns_clean, valid_cols)

        self.clean_prices = prices
        self.clean_returns = returns_clean
        self.quality_report = quality
        print(f"\n管道运行完成!")
        return returns_clean

    def _filter_missing(self):
        """Step 1: keep only columns whose NaN fraction is <= max_missing_pct."""
        missing_pct = self.raw_prices.isna().mean()
        valid_cols = missing_pct[missing_pct <= self.max_missing_pct].index.tolist()
        dropped_cols = missing_pct[missing_pct > self.max_missing_pct].index.tolist()
        print(f"\n[Step 1] 过滤缺失值 > {self.max_missing_pct:.0%} 的标的")
        print(f" 保留: {len(valid_cols)}个标的, 删除: {len(dropped_cols)}个标的")
        if dropped_cols:
            print(f" 被删除的标的: {dropped_cols}")
        self.report['n_kept'] = len(valid_cols)
        self.report['dropped_cols'] = dropped_cols
        return self.raw_prices[valid_cols].copy(), valid_cols

    def _fill_missing(self, prices):
        """Step 2: forward-fill gaps, then back-fill any leading NaNs."""
        print(f"\n[Step 2] 填充缺失值 (ffill + bfill)")
        n_missing_before = prices.isna().sum().sum()
        # NOTE(review): bfill copies a column's first observed price into the
        # periods before it, so those periods get 0 returns and use future
        # data. Confirm this look-ahead is acceptable for downstream use.
        prices = prices.ffill().bfill()
        n_missing_after = prices.isna().sum().sum()
        print(f" 填充前: {n_missing_before}个NaN -> 填充后: {n_missing_after}个NaN")
        self.report['n_missing_before'] = int(n_missing_before)
        self.report['n_missing_after'] = int(n_missing_after)
        return prices

    def _compute_returns(self, prices):
        """Step 3: simple period-over-period returns, first (NaN) row dropped."""
        print(f"\n[Step 3] 计算收益率")
        # Prices were already filled in step 2, so no extra fill is needed.
        # fill_method=None avoids pandas' deprecated implicit 'pad' fill.
        returns = prices.pct_change(fill_method=None).iloc[1:]  # drop leading NaN row
        print(f" 收益率面板: {returns.shape[0]}天 x {returns.shape[1]}标的")
        self.report['n_periods'], self.report['n_assets'] = returns.shape
        return returns

    def _clean_outliers(self, returns):
        """Step 4: count MAD outliers per column, then winsorize every column."""
        print(f"\n[Step 4] 异常值检测与Winsorize")
        lower, upper = self.winsorize_pct[0], self.winsorize_pct[1]
        n_outliers_total = 0
        returns_clean = returns.copy()
        for col in returns_clean.columns:
            # Detection is only counted here; it is computed on the column
            # before that column is winsorized.
            mask, _ = detect_outliers_mad(returns_clean[col], threshold=self.mad_threshold)
            n_outliers_total += mask.sum()
            returns_clean[col] = winsorize(returns_clean[col], lower, upper)
        print(f" 检测到 {n_outliers_total} 个异常值 (MAD, |Z|>{self.mad_threshold:g})")
        print(f" 已进行Winsorize处理 ({lower:.0%}, {upper:.0%})")
        self.report['n_outliers'] = int(n_outliers_total)
        return returns_clean

    def _quality_report(self, returns_clean, valid_cols):
        """Step 5: build and print per-instrument return statistics."""
        print(f"\n[Step 5] 数据质量报告")
        # Annualisation uses 252 periods per year, i.e. it assumes daily data.
        # pandas' kurtosis() is excess (Fisher) kurtosis: normal == 0.
        quality = pd.DataFrame({
            '标的': valid_cols,
            '日均收益率(bp)': (returns_clean.mean() * 10000).round(2),
            '日波动率(%)': (returns_clean.std() * 100).round(3),
            '年化收益率(%)': (returns_clean.mean() * 252 * 100).round(2),
            '年化波动率(%)': (returns_clean.std() * np.sqrt(252) * 100).round(2),
            '偏度': returns_clean.skew().round(3),
            '峰度': returns_clean.kurtosis().round(3),
        }).set_index('标的')
        print(quality)
        return quality
# Execute the cleaning pipeline on the raw price panel. Instruments with more
# than 15% missing observations are dropped. The pipeline object and the
# cleaned returns stay available at module level.
pipeline = DataPipeline(
    price_panel,
    max_missing_pct=0.15,
)
clean_returns = pipeline.run()