84 lines
3.5 KiB
Python
84 lines
3.5 KiB
Python
class DataPipeline:
    """Simple data-cleaning pipeline for a price panel.

    Takes a raw price panel (one column per instrument) through to a
    cleaned return panel:

    1. Missing-value handling (drop sparse columns, then ffill + bfill)
    2. Outlier handling (MAD-based count, then winsorization)
    3. Return calculation (simple percentage change)
    4. Data-quality report

    Progress messages are printed to stdout while the pipeline runs.

    Attributes set by ``run()`` (``None`` until it has been called):
        clean_prices: filtered and filled price panel.
        clean_returns: winsorized return panel.
        quality_report: per-instrument summary statistics.
    """

    # Number of periods used to annualise the daily mean and volatility.
    TRADING_DAYS_PER_YEAR = 252

    def __init__(self, prices, max_missing_pct=0.1, winsorize_pct=(0.01, 0.99),
                 outlier_threshold=5.0):
        """Store a private copy of the input and the cleaning parameters.

        Args:
            prices: price DataFrame, one column per instrument. It is copied,
                so later changes to the caller's frame do not affect the
                pipeline.
            max_missing_pct: largest fraction of NaN values a column may have
                and still be kept.
            winsorize_pct: ``(lower, upper)`` pair forwarded to ``winsorize``.
            outlier_threshold: threshold forwarded to ``detect_outliers_mad``
                when counting outliers.
        """
        self.raw_prices = prices.copy()
        self.max_missing_pct = max_missing_pct
        self.winsorize_pct = winsorize_pct
        self.outlier_threshold = outlier_threshold
        self.report = {}
        # Results are defined up front so reading them before run() yields
        # None instead of raising AttributeError.
        self.clean_prices = None
        self.clean_returns = None
        self.quality_report = None

    def run(self):
        """Run the full cleaning pipeline.

        Fills ``self.report`` with per-step counts and stores the results on
        ``clean_prices``, ``clean_returns`` and ``quality_report``.

        Returns:
            The winsorized return DataFrame (same object as
            ``self.clean_returns``).
        """
        print("=" * 60)
        print("数据清洗管道运行中...")
        print("=" * 60)

        # Step 1: drop instruments with too many missing values.
        missing_pct = self.raw_prices.isna().mean()
        valid_cols = missing_pct[missing_pct <= self.max_missing_pct].index.tolist()
        dropped_cols = missing_pct[missing_pct > self.max_missing_pct].index.tolist()

        print(f"\n[Step 1] 过滤缺失值 > {self.max_missing_pct:.0%} 的标的")
        print(f" 保留: {len(valid_cols)}个标的, 删除: {len(dropped_cols)}个标的")
        if dropped_cols:
            print(f" 被删除的标的: {dropped_cols}")

        prices = self.raw_prices[valid_cols].copy()

        # Step 2: fill the remaining gaps. Forward fill first; the backward
        # fill only covers NaNs at the start of a column.
        print("\n[Step 2] 填充缺失值 (ffill + bfill)")
        n_missing_before = prices.isna().sum().sum()
        prices = prices.ffill().bfill()
        n_missing_after = prices.isna().sum().sum()
        print(f" 填充前: {n_missing_before}个NaN -> 填充后: {n_missing_after}个NaN")

        # Step 3: simple returns; the first row is NaN by construction.
        print("\n[Step 3] 计算收益率")
        returns = prices.pct_change().iloc[1:]
        print(f" 收益率面板: {returns.shape[0]}天 x {returns.shape[1]}标的")

        # Step 4: count outliers per column, then winsorize each column.
        print("\n[Step 4] 异常值检测与Winsorize")
        lower = self.winsorize_pct[0]
        upper = self.winsorize_pct[1]
        n_outliers_total = 0
        returns_clean = returns.copy()
        for col in returns_clean.columns:
            # The outlier mask is only used for counting; the clipping itself
            # is done by winsorize().
            mask, _ = detect_outliers_mad(returns_clean[col],
                                          threshold=self.outlier_threshold)
            n_outliers_total += mask.sum()
            returns_clean[col] = winsorize(returns_clean[col], lower, upper)

        print(f" 检测到 {n_outliers_total} 个异常值 (MAD, |Z|>{self.outlier_threshold:g})")
        print(f" 已进行Winsorize处理 ({lower:.0%}, {upper:.0%})")

        # Step 5: data-quality report.
        print("\n[Step 5] 数据质量报告")
        quality = self._quality_table(returns_clean, valid_cols)
        print(quality)

        self.clean_prices = prices
        self.clean_returns = returns_clean
        self.quality_report = quality
        # Machine-readable summary of what each step did.
        self.report = {
            'n_kept': len(valid_cols),
            'n_dropped': len(dropped_cols),
            'dropped_cols': dropped_cols,
            'n_missing_before': int(n_missing_before),
            'n_missing_after': int(n_missing_after),
            'returns_shape': returns.shape,
            'n_outliers': n_outliers_total,
        }

        print("\n管道运行完成!")
        return returns_clean

    def _quality_table(self, returns_clean, valid_cols):
        """Build the per-instrument summary statistics table."""
        # Compute mean and std once and reuse them for the annualised figures.
        mean_ret = returns_clean.mean()
        vol = returns_clean.std()
        periods = self.TRADING_DAYS_PER_YEAR
        return pd.DataFrame({
            '标的': valid_cols,
            '日均收益率(bp)': (mean_ret * 10000).round(2),
            '日波动率(%)': (vol * 100).round(3),
            '年化收益率(%)': (mean_ret * periods * 100).round(2),
            '年化波动率(%)': (vol * np.sqrt(periods) * 100).round(2),
            '偏度': returns_clean.skew().round(3),
            '峰度': returns_clean.kurtosis().round(3),
        }).set_index('标的')
|
|
|
|
# Run the pipeline on `price_panel` (expected to be defined earlier), dropping
# instruments whose share of missing values exceeds 15%.
pipeline = DataPipeline(price_panel, max_missing_pct=0.15)
clean_returns = pipeline.run()