feat: Enhance data pipeline with circuit breakers and price limit flags

This commit is contained in:
tigerenwork 2026-05-30 00:48:10 +08:00
parent 9023c8eb76
commit 31f8695d56
2 changed files with 158 additions and 5 deletions

37
.gitignore vendored Normal file
View File

@ -0,0 +1,37 @@
# Python bytecode and caches
__pycache__/
*.py[cod]
*$py.class
# Virtual environments
.venv/
venv/
env/
ENV/
# Packaging artifacts
build/
dist/
*.egg-info/
.eggs/
# Test and tooling caches
.pytest_cache/
.mypy_cache/
.ruff_cache/
.coverage
.coverage.*
htmlcov/
# Jupyter and IPython
.ipynb_checkpoints/
# Environment and local settings
.env
.env.*
*.local
# Editor and OS files
.vscode/
.idea/
.DS_Store

View File

@ -6,6 +6,7 @@
# 2. Simple Returns vs Log Returns
# 3. Multi-stock Panel & Missing Value Handling
# 4. Outlier Detection & Treatment (Z-score, MAD, Winsorize)
# 4b. Circuit Breakers & Price Limit Flags (涨跌停) [A-shares]
# 5. Trading Calendar & Cross-market Alignment
# 6. End-to-End DataPipeline Class
# =============================================================================
@ -384,12 +385,20 @@ print("- 计算收益率前: 先填充缺失值, 再计算")
# 构建干净的多标的数据面板
# 步骤: ffill填充 -> 删除仍然有NaN的行 -> 计算收益率
# Steps: ffill → drop stocks/rows still NaN → compute returns
#
# ⚠️ Look-Ahead Bias Warning: DO NOT use bfill() here.
# bfill() fills early NaN rows by looking at future prices, which means
# your "day 1" price would be sourced from a future observation.
# This is data leakage — it would inflate backtest performance.
# Safe rule: only ffill (past → present), then drop what remains NaN.
clean_prices = price_panel.ffill().bfill() # 先前填充再后填充处理开头的NaN
clean_prices = price_panel.ffill() # only look backward
clean_prices = clean_prices.dropna(how='any') # drop rows where any stock still NaN
print(f"清洗前缺失值: {price_panel.isna().sum().sum()}")
print(f"清洗后缺失值: {clean_prices.isna().sum().sum()}")
print(f"(dropped {len(price_panel) - len(clean_prices)} leading rows to avoid look-ahead bias)")
# 计算收益率面板
returns_panel = clean_prices.pct_change().dropna()
@ -573,6 +582,109 @@ print("- Winsorize: 保留数据量,适合构建因子时使用")
print("- 实际中常用: Winsorize(1%, 99%) + MAD检测")
# =============================================================================
# TOPIC 4b: Circuit Breakers & Price Limit Flags (涨跌停标记)
# -----------------------------------------------------------------------------
# In Chinese A-shares, stocks have a ±10% daily price limit (±5% for ST stocks).
# When a stock hits the limit, it is NOT a data error — it is real market data.
# However, it signals a critical data quality issue for downstream use:
#
# - The price IS the true closing price
# - But the stock may have been UNTRADEABLE for most of the day (zero liquidity)
# - A return of exactly +10% or -10% means you likely CANNOT execute at that price
#
# What to do in data prep (we flag, not remove):
# - Add a boolean column: `is_limit_up`, `is_limit_down`
# - Downstream strategy code can then decide to skip, weight-down, or
# exclude limit-hit days from signal calculations
#
# ⚠️ Survivorship Bias Note (out of scope for data prep, but important to know):
# This demo only generates stocks that "survive" the full period. In reality,
# stocks get delisted (bankruptcy, M&A, regulatory removal). If you only
# include currently-alive stocks in your historical dataset, you are
# implicitly selecting winners — your backtest returns will be inflated.
# Fix: source historical data that includes ALL stocks, including delisted ones.
# This is a DATA SOURCING problem, handled before data ever enters this pipeline.
# =============================================================================
def flag_price_limits(prices, limit_pct=0.10):
"""Flag days where a stock hit the daily price limit (涨跌停).
Returns a DataFrame of the same shape with values:
+1 = limit up (涨停)
-1 = limit down (跌停)
0 = normal day
A small tolerance (0.5%) is used because real closing prices may be
rounded and not land exactly on the theoretical limit price.
"""
returns = prices.pct_change()
tolerance = 0.005 # 0.5% buffer for rounding
limit_up = (returns >= limit_pct - tolerance).astype(int)
limit_down = (returns <= -limit_pct + tolerance).astype(int) * -1
flags = limit_up + limit_down
return flags
# Inject realistic limit-hit events into the panel for demonstration
np.random.seed(99)
demo_prices = clean_prices.copy()
for col in demo_prices.columns:
# Randomly inject 3 limit-up and 3 limit-down days per stock
up_days = np.random.choice(range(1, len(demo_prices) - 1), size=3, replace=False)
down_days = np.random.choice(range(1, len(demo_prices) - 1), size=3, replace=False)
for d in up_days:
demo_prices.iloc[d][col] = demo_prices.iloc[d - 1][col] * 1.10 # exactly +10%
for d in down_days:
demo_prices.iloc[d][col] = demo_prices.iloc[d - 1][col] * 0.90 # exactly -10%
limit_flags = flag_price_limits(demo_prices, limit_pct=0.10)
# Summary
total_limit_up = (limit_flags == 1).sum().sum()
total_limit_down = (limit_flags == -1).sum().sum()
print(f"\n涨跌停统计:")
print(f" 涨停天数 (limit up): {total_limit_up}")
print(f" 跌停天数 (limit down): {total_limit_down}")
print(f"\n各标的涨停次数:")
print((limit_flags == 1).sum())
print(f"\n各标的跌停次数:")
print((limit_flags == -1).sum())
# Visualize limit flags on Stock_A
fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True)
ax = axes[0]
ax.plot(demo_prices['Stock_A'], color='steelblue', linewidth=1, label='Stock_A Price')
limit_up_dates = limit_flags.index[limit_flags['Stock_A'] == 1]
limit_down_dates = limit_flags.index[limit_flags['Stock_A'] == -1]
ax.scatter(limit_up_dates, demo_prices.loc[limit_up_dates, 'Stock_A'],
color='red', s=60, zorder=5, label='涨停 (limit up)', marker='^')
ax.scatter(limit_down_dates, demo_prices.loc[limit_down_dates, 'Stock_A'],
color='green', s=60, zorder=5, label='跌停 (limit down)', marker='v')
ax.set_title('价格序列与涨跌停标记 (Stock_A)', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)
ax = axes[1]
ax.bar(limit_flags.index, limit_flags['Stock_A'],
color=limit_flags['Stock_A'].map({1: 'red', -1: 'green', 0: 'lightgray'}),
width=1)
ax.set_title('涨跌停标志 (+1=涨停, -1=跌停, 0=正常)', fontsize=13)
ax.set_ylim(-1.5, 1.5)
ax.set_yticks([-1, 0, 1])
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
print("\n实践建议:")
print("- 涨跌停日: 保留价格数据,但添加 is_limit_up / is_limit_down 标记列")
print("- 策略层: 遇到涨跌停标记时跳过信号,因无法成交")
print("- 统计分析: 计算波动率/相关性时可选择排除涨跌停日,避免失真")
# =============================================================================
# TOPIC 5: Trading Calendar & Cross-market Alignment (交易日历与跨市场对齐)
# -----------------------------------------------------------------------------
@ -708,7 +820,7 @@ print("- 重要: ffill会引入假的'零收益日',计算波动率时需注
# -----------------------------------------------------------------------------
# Combines all prior topics into a reusable, sequential pipeline:
# Step 1 — Filter stocks with too many missing values
# Step 2 — Fill remaining missing values (ffill + bfill)
# Step 2 — Fill remaining missing values (ffill only — no bfill/look-ahead)
# Step 3 — Compute return series
# Step 4 — Detect outliers (MAD) and apply Winsorize
# Step 5 — Generate a data quality report per stock
@ -753,9 +865,13 @@ class DataPipeline:
prices = self.raw_prices[valid_cols].copy()
# Step 2: 填充缺失值
print(f"\n[Step 2] 填充缺失值 (ffill + bfill)")
# ⚠️ Only ffill (backward-looking). bfill would use future prices to
# fill early NaNs — that is look-ahead bias / data leakage.
# Any rows still NaN after ffill are dropped (typically only the
# very first row if the series starts with a missing value).
print(f"\n[Step 2] 填充缺失值 (ffill only, no bfill)")
n_missing_before = prices.isna().sum().sum()
prices = prices.ffill().bfill()
prices = prices.ffill().dropna(how='any')
n_missing_after = prices.isna().sum().sum()
print(f" 填充前: {n_missing_before}个NaN -> 填充后: {n_missing_after}个NaN")