trading/hello.py

685 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
import warnings
warnings.filterwarnings('ignore')
# 使用我们在 Docker 中配置的字体
plt.rcParams['font.sans-serif'] = ['WenQuanYi Zen Hei', 'Arial Unicode MS', 'SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['figure.dpi'] = 100
np.random.seed(42)
print("环境准备完成!")
def generate_raw_stock_data(n_days=1000, seed=42):
"""生成包含拆股和分红事件的原始股票数据"""
np.random.seed(seed)
dates = pd.bdate_range(start='2020-01-02', periods=n_days)
# 生成"真实"价格序列 (几何布朗运动)
mu, sigma = 0.12, 0.25
dt = 1 / 252
log_ret = (mu - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * np.random.randn(n_days)
true_prices = 100 * np.exp(np.cumsum(log_ret))
# 模拟成交量
volume = np.random.lognormal(mean=15, sigma=0.5, size=n_days).astype(int)
# 构建DataFrame
df = pd.DataFrame({
'date': dates,
'close': true_prices,
'volume': volume
}).set_index('date')
# 添加开高低价
daily_range = df['close'] * np.abs(np.random.randn(n_days)) * 0.02
df['open'] = df['close'] + np.random.randn(n_days) * daily_range * 0.5
df['high'] = np.maximum(df['close'], df['open']) + np.abs(np.random.randn(n_days)) * daily_range * 0.3
df['low'] = np.minimum(df['close'], df['open']) - np.abs(np.random.randn(n_days)) * daily_range * 0.3
# 定义公司行为事件
events = []
# 1:2拆股, 在第300个交易日
split_idx = 300
split_date = dates[split_idx]
events.append({'date': split_date, 'type': 'split', 'ratio': 2})
# 现金分红, 每季度一次
for q in [60, 185, 310, 435, 560, 685, 810, 935]:
if q < n_days:
events.append({'date': dates[q], 'type': 'dividend', 'amount': 0.5})
# 应用公司行为到"未复权"价格
unadj_close = df['close'].copy()
adj_factors = pd.Series(1.0, index=dates)
for event in sorted(events, key=lambda x: x['date']):
idx = dates.get_loc(event['date'])
if event['type'] == 'split':
# 拆股
unadj_close.iloc[idx:] = unadj_close.iloc[idx:] / event['ratio']
adj_factors.iloc[idx:] = adj_factors.iloc[idx:] * event['ratio']
elif event['type'] == 'dividend':
# 分红
price_before = unadj_close.iloc[idx - 1] if idx > 0 else unadj_close.iloc[idx]
div_ratio = 1 - event['amount'] / price_before
unadj_close.iloc[idx:] = unadj_close.iloc[idx:] * div_ratio
adj_factors.iloc[:idx] = adj_factors.iloc[:idx] * div_ratio
df['unadj_close'] = unadj_close
df['adj_factor'] = adj_factors
events_df = pd.DataFrame(events)
return df, events_df
def forward_adjust(unadj_prices, adj_factor):
"""前复权:以最新价格为基准,向前调整历史价格"""
normalized_factor = adj_factor / adj_factor.iloc[-1]
return unadj_prices * normalized_factor
def backward_adjust(unadj_prices, adj_factor):
"""后复权:以最早价格为基准,向后调整"""
normalized_factor = adj_factor / adj_factor.iloc[0]
return unadj_prices * normalized_factor
# 生成数据
stock_data, events = generate_raw_stock_data()
print("公司行为事件:")
print(events.to_string(index=False))
print(f"\n数据形状: {stock_data.shape}")
# 计算复权价格
stock_data['fwd_adj_close'] = forward_adjust(stock_data['unadj_close'], stock_data['adj_factor'])
stock_data['bwd_adj_close'] = backward_adjust(stock_data['unadj_close'], stock_data['adj_factor'])
# 可视化对比
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 1. 未复权价格
ax = axes[0, 0]
ax.plot(stock_data['unadj_close'], color='blue', linewidth=1)
ax.set_title('未复权价格 (可见拆股/分红跳跃)', fontsize=13)
ax.grid(alpha=0.3)
for _, event in events.iterrows():
ax.axvline(x=event['date'], color='red', linestyle='--', alpha=0.5)
# 2. 前复权价格
ax = axes[0, 1]
ax.plot(stock_data['fwd_adj_close'], color='green', linewidth=1)
ax.set_title('前复权价格 (当前价格真实)', fontsize=13)
ax.grid(alpha=0.3)
# 3. 后复权价格
ax = axes[1, 0]
ax.plot(stock_data['bwd_adj_close'], color='orange', linewidth=1)
ax.set_title('后复权价格 (历史价格真实)', fontsize=13)
ax.grid(alpha=0.3)
# 4. "真实"价格对比
ax = axes[1, 1]
ax.plot(stock_data['close'], label='真实连续价格', color='black', linewidth=1.5)
ax.plot(stock_data['fwd_adj_close'], label='前复权价格', color='green', linewidth=1, alpha=0.7)
ax.set_title('真实价格 vs 前复权价格', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)
plt.suptitle('复权价格对比', fontsize=15, y=1.02)
plt.tight_layout()
plt.show()
# 验证收益率
print("\n收益率验证 (拆股日前后):")
split_date = events[events['type'] == 'split']['date'].iloc[0]
split_idx = stock_data.index.get_loc(split_date)
print(f"拆股日期: {split_date.strftime('%Y-%m-%d')}")
print(f"未复权收益率: {stock_data['unadj_close'].iloc[split_idx] / stock_data['unadj_close'].iloc[split_idx-1] - 1:.4f}")
print(f"前复权收益率: {stock_data['fwd_adj_close'].iloc[split_idx] / stock_data['fwd_adj_close'].iloc[split_idx-1] - 1:.4f}")
print(f"真实收益率: {stock_data['close'].iloc[split_idx] / stock_data['close'].iloc[split_idx-1] - 1:.4f}")
# 使用前复权价格计算收益率
prices = stock_data['fwd_adj_close']
# 简单收益率
simple_returns = prices.pct_change().dropna()
# 对数收益率
log_returns = np.log(prices / prices.shift(1)).dropna()
# 对比
comparison = pd.DataFrame({
'简单收益率': simple_returns,
'对数收益率': log_returns,
'差异': simple_returns - log_returns,
'差异(bp)': (simple_returns - log_returns) * 10000 # 基点
})
print("简单收益率 vs 对数收益率统计:")
print(f"平均绝对差异: {comparison['差异'].abs().mean():.6f} ({comparison['差异(bp)'].abs().mean():.6f})")
print(f"最大差异: {comparison['差异'].abs().max():.6f} ({comparison['差异(bp)'].abs().max():.6f})")
print(f"\n日频数据下两者差异很小,但月频或年频时差异会显著增大。")
# 验证时间可加性
print("\n=== 时间可加性验证 ===")
# 对数收益率:时间上直接求和
total_log_return = log_returns.sum()
actual_total = np.log(prices.iloc[-1] / prices.iloc[0])
print(f"对数收益率求和: {total_log_return:.6f}")
print(f"实际总对数收益: {actual_total:.6f}")
print(f"差异: {abs(total_log_return - actual_total):.10f} (几乎为零)")
print("\n简单收益率需要连乘:")
total_simple_product = (1 + simple_returns).prod() - 1
actual_simple = prices.iloc[-1] / prices.iloc[0] - 1
print(f"简单收益率连乘: {total_simple_product:.6f}")
print(f"实际总简单收益: {actual_simple:.6f}")
print(f"差异: {abs(total_simple_product - actual_simple):.10f}")
# 可视化
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
ax = axes[0]
ax.scatter(simple_returns, log_returns, alpha=0.3, s=5)
ax.plot([-0.1, 0.1], [-0.1, 0.1], 'r--', linewidth=1, label='y=x')
ax.set_xlabel('简单收益率')
ax.set_ylabel('对数收益率')
ax.set_title('简单收益率 vs 对数收益率 (日频) ', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)
ax = axes[1]
# 月频对比
monthly_simple = prices.resample('ME').last().pct_change().dropna()
monthly_log = np.log(prices.resample('ME').last() / prices.resample('ME').last().shift(1)).dropna()
ax.scatter(monthly_simple, monthly_log, alpha=0.5, s=20)
ax.plot([-0.2, 0.2], [-0.2, 0.2], 'r--', linewidth=1, label='y=x')
ax.set_xlabel('简单收益率')
ax.set_ylabel('对数收益率')
ax.set_title('简单收益率 vs 对数收益率 (月频, 差异更明显) ', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
def generate_multi_stock_panel(n_stocks=5, n_days=500, seed=42):
"""生成多标的日线数据面板,包含停牌和缺失值"""
np.random.seed(seed)
dates = pd.bdate_range(start='2022-01-03', periods=n_days)
stock_names = [f'Stock_{chr(65+i)}' for i in range(n_stocks)] # Stock_A, Stock_B, ...
all_data = {}
for i, name in enumerate(stock_names):
mu = np.random.uniform(0.05, 0.20)
sigma = np.random.uniform(0.15, 0.35)
S0 = np.random.uniform(20, 200)
dt = 1 / 252
log_ret = (mu - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * np.random.randn(n_days)
prices = S0 * np.exp(np.cumsum(log_ret))
volume = np.random.lognormal(mean=14, sigma=0.8, size=n_days).astype(int)
# 模拟停牌 (随机选择一些连续日期设为NaN)
suspension_starts = np.random.choice(range(50, n_days - 30), size=2 + i % 3, replace=False)
for start in suspension_starts:
duration = np.random.randint(1, 10)
end = min(start + duration, n_days)
prices[start:end] = np.nan
volume[start:end] = 0
# 模拟随机缺失
random_missing = np.random.choice(range(n_days), size=5, replace=False)
prices[random_missing] = np.nan
all_data[name] = pd.DataFrame({
'close': prices,
'volume': volume
}, index=dates)
# 构建面板
price_panel = pd.DataFrame({name: data['close'] for name, data in all_data.items()})
volume_panel = pd.DataFrame({name: data['volume'] for name, data in all_data.items()})
return price_panel, volume_panel
# 生成多标的面板
price_panel, volume_panel = generate_multi_stock_panel(n_stocks=5, n_days=500)
# 缺失值统计
missing_stats = pd.DataFrame({
'总天数': len(price_panel),
'缺失天数': price_panel.isna().sum(),
'缺失比例': price_panel.isna().mean(),
'最长连续缺失': [price_panel[col].isna().astype(int).groupby(
price_panel[col].notna().astype(int).cumsum()).sum().max()
for col in price_panel.columns]
})
print("缺失值统计:")
print(missing_stats)
print(f"\n面板形状: {price_panel.shape}")
# 缺失值处理方法对比
def handle_missing_values(prices, method='ffill'):
"""处理缺失值的不同方法"""
if method == 'ffill':
# ffill = forward fill (前向填充): 用前一个有效值填补缺失值
# 最常用于停牌场景: 假设停牌期间价格保持不变
return prices.ffill()
elif method == 'linear':
# 线性插值
return prices.interpolate(method='linear')
elif method == 'drop':
# 直接删除
return prices.dropna()
elif method == 'ffill_limit':
# 有限制的向前填充 (最多填充5天)
return prices.ffill(limit=5)
else:
raise ValueError(f"Unknown method: {method}")
# 对Stock_A展示不同方法的效果
stock_a = price_panel['Stock_A'].copy()
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
methods = {
'原始数据 (含缺失值) ': stock_a,
'向前填充 (ffill)': handle_missing_values(stock_a, 'ffill'),
'线性插值 (interpolate)': handle_missing_values(stock_a, 'linear'),
'有限填充 (ffill, limit=5)': handle_missing_values(stock_a, 'ffill_limit')
}
for ax, (title, data) in zip(axes.flat, methods.items()):
ax.plot(data, linewidth=1)
# 标记原始缺失位置
missing_mask = stock_a.isna()
if not missing_mask.all() and title != '原始数据 (含缺失值) ':
filled_values = data[missing_mask].dropna()
if len(filled_values) > 0:
ax.scatter(filled_values.index, filled_values.values,
color='red', s=15, zorder=5, label='填充值')
ax.legend(fontsize=9)
ax.set_title(title, fontsize=12)
ax.grid(alpha=0.3)
plt.suptitle('缺失值处理方法对比 (Stock_A)', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
print("\n实践建议:")
print("- 停牌: 使用ffill (假设停牌期间价格不变) ")
print("- 短期缺失(<3天): 可用线性插值")
print("- 长期缺失(>10天): 考虑从分析中排除该标的")
print("- 计算收益率前: 先填充缺失值, 再计算")
# 构建干净的多标的数据面板
# 步骤: ffill填充 -> 删除仍然有NaN的行 -> 计算收益率
clean_prices = price_panel.ffill().bfill() # 先前填充再后填充处理开头的NaN
print(f"清洗前缺失值: {price_panel.isna().sum().sum()}")
print(f"清洗后缺失值: {clean_prices.isna().sum().sum()}")
# 计算收益率面板
returns_panel = clean_prices.pct_change().dropna()
# 可视化归一化价格
fig, ax = plt.subplots(figsize=(14, 6))
normalized = clean_prices / clean_prices.iloc[0] * 100
for col in normalized.columns:
ax.plot(normalized[col], label=col, linewidth=1.2)
ax.set_title('多标的归一化价格走势 (基期=100) ', fontsize=14)
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
# 相关性矩阵
corr_matrix = returns_panel.corr()
fig, ax = plt.subplots(figsize=(8, 6))
im = ax.imshow(corr_matrix.values, cmap='RdBu_r', vmin=-1, vmax=1)
ax.set_xticks(range(len(corr_matrix)))
ax.set_xticklabels(corr_matrix.columns, rotation=45)
ax.set_yticks(range(len(corr_matrix)))
ax.set_yticklabels(corr_matrix.columns)
for i in range(len(corr_matrix)):
for j in range(len(corr_matrix)):
ax.text(j, i, f'{corr_matrix.iloc[i,j]:.2f}', ha='center', va='center', fontsize=11)
plt.colorbar(im, ax=ax)
ax.set_title('收益率相关性矩阵', fontsize=14)
plt.tight_layout()
plt.show()
def detect_outliers_zscore(series, threshold=3.0):
"""Z-score (标准分数) 方法检测异常值
Z = (x - mean) / std
即:这个值偏离平均值多少个标准差?
|Z| > threshold (默认3) 视为异常值
通俗理解如果大家平均考70分标准差10分你考了100分
Z = (100-70)/10 = 3说明你偏离平均值3个标准差属于异常值。
缺点:均值和标准差本身受异常值影响
"""
z_scores = (series - series.mean()) / series.std()
outlier_mask = z_scores.abs() > threshold
return outlier_mask, z_scores
def detect_outliers_mad(series, threshold=3.0):
"""MAD (中位数绝对偏差, Median Absolute Deviation) 方法检测异常值
MAD = median(|x - median(x)|) 即所有值与中位数的距离的中位数
Modified Z = 0.6745 * (x - median) / MAD
优点用中位数替代均值比Z-score更稳健不受极端值影响
通俗理解Z-score用"平均值"做参考,容易被极端值带偏;
MAD用"中位数"做参考,即使有极端值也不受影响。
"""
median = series.median()
mad = (series - median).abs().median()
if mad == 0:
mad = 1e-10 # 避免除零
modified_z = 0.6745 * (series - median) / mad
outlier_mask = modified_z.abs() > threshold
return outlier_mask, modified_z
def winsorize(series, lower_pct=0.01, upper_pct=0.99):
"""Winsorize (缩尾处理): 将超出分位数的值截断到分位数边界
不同于删除异常值Winsorize保留了所有数据点只是限制了极端值。
比如设 lower_pct=0.01, upper_pct=0.99:
- 低于1%分位数的值 -> 拉回到1%分位数
- 高于99%分位数的值 -> 拉回到99%分位数
"""
lower = series.quantile(lower_pct)
upper = series.quantile(upper_pct)
return series.clip(lower=lower, upper=upper)
# 使用Stock_A的收益率进行演示
ret_a = returns_panel['Stock_A'].copy()
# 人为注入一些异常值以演示效果
np.random.seed(123)
outlier_indices = np.random.choice(ret_a.index, size=5, replace=False)
ret_a_with_outliers = ret_a.copy()
for idx in outlier_indices:
ret_a_with_outliers[idx] = np.random.choice([-1, 1]) * np.random.uniform(0.15, 0.30)
# 检测异常值
zscore_mask, z_scores = detect_outliers_zscore(ret_a_with_outliers, threshold=3.0)
mad_mask, mad_scores = detect_outliers_mad(ret_a_with_outliers, threshold=3.0)
print("异常值检测结果:")
print(f"Z-score方法 (|Z|>3): 检测到 {zscore_mask.sum()} 个异常值")
print(f"MAD方法 (|Modified Z|>3): 检测到 {mad_mask.sum()} 个异常值")
# Winsorize处理
ret_winsorized = winsorize(ret_a_with_outliers, lower_pct=0.01, upper_pct=0.99)
print(f"\nWinsorize前 - 范围: [{ret_a_with_outliers.min():.4f}, {ret_a_with_outliers.max():.4f}]")
print(f"Winsorize后 - 范围: [{ret_winsorized.min():.4f}, {ret_winsorized.max():.4f}]")
# 可视化异常值检测
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 1. 收益率时序与异常值标记
ax = axes[0, 0]
ax.plot(ret_a_with_outliers, color='blue', linewidth=0.8, alpha=0.7)
outliers = ret_a_with_outliers[zscore_mask]
ax.scatter(outliers.index, outliers.values, color='red', s=40, zorder=5, label=f'异常值 ({len(outliers)}个)')
ax.set_title('Z-score异常值检测 (|Z|>3)', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)
# 2. MAD方法
ax = axes[0, 1]
ax.plot(ret_a_with_outliers, color='blue', linewidth=0.8, alpha=0.7)
outliers_mad = ret_a_with_outliers[mad_mask]
ax.scatter(outliers_mad.index, outliers_mad.values, color='orange', s=40, zorder=5, label=f'异常值 ({len(outliers_mad)}个)')
ax.set_title('MAD异常值检测 (|Modified Z|>3)', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)
# 3. Winsorize前后分布对比
ax = axes[1, 0]
ax.hist(ret_a_with_outliers, bins=60, alpha=0.5, label='原始', color='blue', density=True)
ax.hist(ret_winsorized, bins=60, alpha=0.5, label='Winsorize后', color='green', density=True)
ax.set_title('Winsorize前后收益率分布', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)
# 4. Q-Q图
ax = axes[1, 1]
sorted_data = np.sort(ret_a_with_outliers.dropna())
n = len(sorted_data)
theoretical = stats.norm.ppf(np.arange(1, n + 1) / (n + 1))
ax.scatter(theoretical, sorted_data, alpha=0.5, s=10)
ax.plot([theoretical.min(), theoretical.max()],
[theoretical.min() * ret_a_with_outliers.std() + ret_a_with_outliers.mean(),
theoretical.max() * ret_a_with_outliers.std() + ret_a_with_outliers.mean()],
'r--', linewidth=1, label='正态分布参考线')
ax.set_xlabel('理论分位数')
ax.set_ylabel('样本分位数')
ax.set_title('Q-Q图 (检验正态性) ', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
print("\n方法选择建议:")
print("- Z-score: 简单快速,但受极端值影响 (不够稳健) ")
print("- MAD: 更稳健,推荐用于金融数据")
print("- Winsorize: 保留数据量,适合构建因子时使用")
print("- 实际中常用: Winsorize(1%, 99%) + MAD检测")
def create_trading_calendars():
"""创建不同市场的交易日历示例
不同市场有不同的假日和交易时间,跨市场策略必须正确处理。
"""
# 生成2024年的工作日
all_bdays = pd.bdate_range(start='2024-01-01', end='2024-12-31')
# 美国市场假日 (简化版)
us_holidays = pd.to_datetime([
'2024-01-01', # 新年
'2024-01-15', # MLK Day
'2024-02-19', # Presidents Day
'2024-03-29', # Good Friday
'2024-05-27', # Memorial Day
'2024-06-19', # Juneteenth
'2024-07-04', # Independence Day
'2024-09-02', # Labor Day
'2024-11-28', # Thanksgiving
'2024-12-25', # Christmas
])
# 中国A股假日 (简化版)
cn_holidays = pd.to_datetime([
'2024-01-01', # 元旦
'2024-02-09', '2024-02-12', '2024-02-13', '2024-02-14',
'2024-02-15', '2024-02-16', # 春节
'2024-04-04', '2024-04-05', # 清明
'2024-05-01', '2024-05-02', '2024-05-03', # 劳动节
'2024-06-10', # 端午
'2024-09-16', '2024-09-17', # 中秋
'2024-10-01', '2024-10-02', '2024-10-03', '2024-10-04',
'2024-10-07', # 国庆
])
us_trading_days = all_bdays[~all_bdays.isin(us_holidays)]
cn_trading_days = all_bdays[~all_bdays.isin(cn_holidays)]
return us_trading_days, cn_trading_days
us_calendar, cn_calendar = create_trading_calendars()
print(f"2024年美股交易日: {len(us_calendar)}")
print(f"2024年A股交易日: {len(cn_calendar)}")
print(f"共同交易日: {len(us_calendar.intersection(cn_calendar))}")
print(f"仅美股交易日: {len(us_calendar.difference(cn_calendar))}")
print(f"仅A股交易日: {len(cn_calendar.difference(us_calendar))}")
# 模拟跨市场数据对齐
np.random.seed(42)
# 生成美股和A股数据
us_prices = pd.Series(
100 * np.exp(np.cumsum(np.random.randn(len(us_calendar)) * 0.01)),
index=us_calendar, name='US_ETF'
)
cn_prices = pd.Series(
50 * np.exp(np.cumsum(np.random.randn(len(cn_calendar)) * 0.015)),
index=cn_calendar, name='CN_ETF'
)
# 直接合并会有大量NaN
combined_raw = pd.DataFrame({'US_ETF': us_prices, 'CN_ETF': cn_prices})
print(f"直接合并后:")
print(f" 总行数: {len(combined_raw)}")
print(f" US_ETF缺失: {combined_raw['US_ETF'].isna().sum()}")
print(f" CN_ETF缺失: {combined_raw['CN_ETF'].isna().sum()}")
# 方法1: 取交集 (只保留双方都交易的日期)
common_dates = us_calendar.intersection(cn_calendar)
combined_inner = combined_raw.loc[common_dates].dropna()
# 方法2: 向前填充 (用最近的收盘价填充)
combined_ffill = combined_raw.ffill().dropna()
print(f"\n方法1 (取交集): {len(combined_inner)}")
print(f"方法2 (ffill): {len(combined_ffill)}")
# 可视化
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
ax = axes[0]
ax.plot(combined_inner.index, combined_inner['US_ETF'] / combined_inner['US_ETF'].iloc[0],
label='US_ETF', linewidth=1.2)
ax.plot(combined_inner.index, combined_inner['CN_ETF'] / combined_inner['CN_ETF'].iloc[0],
label='CN_ETF', linewidth=1.2)
ax.set_title('方法1: 取交集日期', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)
ax = axes[1]
ax.plot(combined_ffill.index, combined_ffill['US_ETF'] / combined_ffill['US_ETF'].iloc[0],
label='US_ETF', linewidth=1.2)
ax.plot(combined_ffill.index, combined_ffill['CN_ETF'] / combined_ffill['CN_ETF'].iloc[0],
label='CN_ETF', linewidth=1.2)
ax.set_title('方法2: 向前填充', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)
plt.suptitle('跨市场数据对齐方法对比', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
print("\n实践建议:")
print("- 计算相关性、回归: 使用交集日期,确保数据同步")
print("- 构建组合/计算市值: 使用ffill避免数据断裂")
print("- 重要: ffill会引入假的'零收益日',计算波动率时需注意")
class DataPipeline:
"""简单的数据清洗管道
实现了从原始数据到干净数据面板的完整流程:
1. 缺失值处理
2. 异常值处理
3. 收益率计算
4. 数据质量报告
"""
def __init__(self, prices, max_missing_pct=0.1, winsorize_pct=(0.01, 0.99)):
self.raw_prices = prices.copy()
self.max_missing_pct = max_missing_pct
self.winsorize_pct = winsorize_pct
self.report = {}
def run(self):
"""运行完整的数据清洗管道"""
print("=" * 60)
print("数据清洗管道运行中...")
print("=" * 60)
# Step 1: 过滤缺失值过多的标的
missing_pct = self.raw_prices.isna().mean()
valid_cols = missing_pct[missing_pct <= self.max_missing_pct].index.tolist()
dropped_cols = missing_pct[missing_pct > self.max_missing_pct].index.tolist()
print(f"\n[Step 1] 过滤缺失值 > {self.max_missing_pct:.0%} 的标的")
print(f" 保留: {len(valid_cols)}个标的, 删除: {len(dropped_cols)}个标的")
if dropped_cols:
print(f" 被删除的标的: {dropped_cols}")
prices = self.raw_prices[valid_cols].copy()
# Step 2: 填充缺失值
print(f"\n[Step 2] 填充缺失值 (ffill + bfill)")
n_missing_before = prices.isna().sum().sum()
prices = prices.ffill().bfill()
n_missing_after = prices.isna().sum().sum()
print(f" 填充前: {n_missing_before}个NaN -> 填充后: {n_missing_after}个NaN")
# Step 3: 计算收益率
print(f"\n[Step 3] 计算收益率")
returns = prices.pct_change().iloc[1:] # 删除第一行NaN
print(f" 收益率面板: {returns.shape[0]}天 x {returns.shape[1]}标的")
# Step 4: 检测和处理异常值
print(f"\n[Step 4] 异常值检测与Winsorize")
n_outliers_total = 0
returns_clean = returns.copy()
for col in returns_clean.columns:
mask, _ = detect_outliers_mad(returns_clean[col], threshold=5.0)
n_outliers = mask.sum()
n_outliers_total += n_outliers
# Winsorize
returns_clean[col] = winsorize(returns_clean[col],
self.winsorize_pct[0], self.winsorize_pct[1])
print(f" 检测到 {n_outliers_total} 个异常值 (MAD, |Z|>5)")
print(f" 已进行Winsorize处理 ({self.winsorize_pct[0]:.0%}, {self.winsorize_pct[1]:.0%})")
# Step 5: 数据质量报告
print(f"\n[Step 5] 数据质量报告")
quality = pd.DataFrame({
'标的': valid_cols,
'日均收益率(bp)': (returns_clean.mean() * 10000).round(2),
'日波动率(%)': (returns_clean.std() * 100).round(3),
'年化收益率(%)': (returns_clean.mean() * 252 * 100).round(2),
'年化波动率(%)': (returns_clean.std() * np.sqrt(252) * 100).round(2),
'偏度': returns_clean.skew().round(3),
'峰度': returns_clean.kurtosis().round(3),
}).set_index('标的')
print(quality)
self.clean_prices = prices
self.clean_returns = returns_clean
self.quality_report = quality
print(f"\n管道运行完成!")
return returns_clean
# 运行管道
pipeline = DataPipeline(price_panel, max_missing_pct=0.15)
clean_returns = pipeline.run()