From 31f8695d56e83b2de2d348518d972a91b3346672 Mon Sep 17 00:00:00 2001 From: tigerenwork Date: Sat, 30 May 2026 00:48:10 +0800 Subject: [PATCH] feat: Enhance data pipeline with circuit breakers and price limit flags --- .gitignore | 37 +++++++++++ quant_data_pipeline_demo.py | 126 ++++++++++++++++++++++++++++++++++-- 2 files changed, 158 insertions(+), 5 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a128611 --- /dev/null +++ b/.gitignore @@ -0,0 +1,37 @@ +# Python bytecode and caches +__pycache__/ +*.py[cod] +*$py.class + +# Virtual environments +.venv/ +venv/ +env/ +ENV/ + +# Packaging artifacts +build/ +dist/ +*.egg-info/ +.eggs/ + +# Test and tooling caches +.pytest_cache/ +.mypy_cache/ +.ruff_cache/ +.coverage +.coverage.* +htmlcov/ + +# Jupyter and IPython +.ipynb_checkpoints/ + +# Environment and local settings +.env +.env.* +*.local + +# Editor and OS files +.vscode/ +.idea/ +.DS_Store diff --git a/quant_data_pipeline_demo.py b/quant_data_pipeline_demo.py index 1404b14..1b0b8c8 100644 --- a/quant_data_pipeline_demo.py +++ b/quant_data_pipeline_demo.py @@ -6,6 +6,7 @@ # 2. Simple Returns vs Log Returns # 3. Multi-stock Panel & Missing Value Handling # 4. Outlier Detection & Treatment (Z-score, MAD, Winsorize) +# 4b. Circuit Breakers & Price Limit Flags (涨跌停) [A-shares] # 5. Trading Calendar & Cross-market Alignment # 6. End-to-End DataPipeline Class # ============================================================================= @@ -384,12 +385,20 @@ print("- 计算收益率前: 先填充缺失值, 再计算") # 构建干净的多标的数据面板 -# 步骤: ffill填充 -> 删除仍然有NaN的行 -> 计算收益率 +# Steps: ffill → drop stocks/rows still NaN → compute returns +# +# ⚠️ Look-Ahead Bias Warning: DO NOT use bfill() here. +# bfill() fills early NaN rows by looking at future prices, which means +# your "day 1" price would be sourced from a future observation. +# This is data leakage — it would inflate backtest performance. +# Safe rule: only ffill (past → present), then drop what remains NaN. -clean_prices = price_panel.ffill().bfill() # 先前填充,再后填充处理开头的NaN +clean_prices = price_panel.ffill() # only look backward +clean_prices = clean_prices.dropna(how='any') # drop rows where any stock still NaN print(f"清洗前缺失值: {price_panel.isna().sum().sum()}") print(f"清洗后缺失值: {clean_prices.isna().sum().sum()}") +print(f"(dropped {len(price_panel) - len(clean_prices)} leading rows to avoid look-ahead bias)") # 计算收益率面板 returns_panel = clean_prices.pct_change().dropna() @@ -573,6 +582,109 @@ print("- Winsorize: 保留数据量,适合构建因子时使用") print("- 实际中常用: Winsorize(1%, 99%) + MAD检测") +# ============================================================================= +# TOPIC 4b: Circuit Breakers & Price Limit Flags (涨跌停标记) +# ----------------------------------------------------------------------------- +# In Chinese A-shares, stocks have a ±10% daily price limit (±5% for ST stocks). +# When a stock hits the limit, it is NOT a data error — it is real market data. +# However, it signals a critical data quality issue for downstream use: +# +# - The price IS the true closing price +# - But the stock may have been UNTRADEABLE for most of the day (zero liquidity) +# - A return of exactly +10% or -10% means you likely CANNOT execute at that price +# +# What to do in data prep (we flag, not remove): +# - Add a boolean column: `is_limit_up`, `is_limit_down` +# - Downstream strategy code can then decide to skip, weight-down, or +# exclude limit-hit days from signal calculations +# +# ⚠️ Survivorship Bias Note (out of scope for data prep, but important to know): +# This demo only generates stocks that "survive" the full period. In reality, +# stocks get delisted (bankruptcy, M&A, regulatory removal). If you only +# include currently-alive stocks in your historical dataset, you are +# implicitly selecting winners — your backtest returns will be inflated. +# Fix: source historical data that includes ALL stocks, including delisted ones. +# This is a DATA SOURCING problem, handled before data ever enters this pipeline. +# ============================================================================= + +def flag_price_limits(prices, limit_pct=0.10): + """Flag days where a stock hit the daily price limit (涨跌停). + + Returns a DataFrame of the same shape with values: + +1 = limit up (涨停) + -1 = limit down (跌停) + 0 = normal day + + A small tolerance (0.5%) is used because real closing prices may be + rounded and not land exactly on the theoretical limit price. + """ + returns = prices.pct_change() + tolerance = 0.005 # 0.5% buffer for rounding + + limit_up = (returns >= limit_pct - tolerance).astype(int) + limit_down = (returns <= -limit_pct + tolerance).astype(int) * -1 + flags = limit_up + limit_down + return flags + + +# Inject realistic limit-hit events into the panel for demonstration +np.random.seed(99) +demo_prices = clean_prices.copy() +for col in demo_prices.columns: + # Randomly inject 3 limit-up and 3 limit-down days per stock + up_days = np.random.choice(range(1, len(demo_prices) - 1), size=3, replace=False) + down_days = np.random.choice(range(1, len(demo_prices) - 1), size=3, replace=False) + for d in up_days: + demo_prices.iloc[d][col] = demo_prices.iloc[d - 1][col] * 1.10 # exactly +10% + for d in down_days: + demo_prices.iloc[d][col] = demo_prices.iloc[d - 1][col] * 0.90 # exactly -10% + +limit_flags = flag_price_limits(demo_prices, limit_pct=0.10) + +# Summary +total_limit_up = (limit_flags == 1).sum().sum() +total_limit_down = (limit_flags == -1).sum().sum() +print(f"\n涨跌停统计:") +print(f" 涨停天数 (limit up): {total_limit_up}") +print(f" 跌停天数 (limit down): {total_limit_down}") +print(f"\n各标的涨停次数:") +print((limit_flags == 1).sum()) +print(f"\n各标的跌停次数:") +print((limit_flags == -1).sum()) + +# Visualize limit flags on Stock_A +fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True) + +ax = axes[0] +ax.plot(demo_prices['Stock_A'], color='steelblue', linewidth=1, label='Stock_A Price') +limit_up_dates = limit_flags.index[limit_flags['Stock_A'] == 1] +limit_down_dates = limit_flags.index[limit_flags['Stock_A'] == -1] +ax.scatter(limit_up_dates, demo_prices.loc[limit_up_dates, 'Stock_A'], + color='red', s=60, zorder=5, label='涨停 (limit up)', marker='^') +ax.scatter(limit_down_dates, demo_prices.loc[limit_down_dates, 'Stock_A'], + color='green', s=60, zorder=5, label='跌停 (limit down)', marker='v') +ax.set_title('价格序列与涨跌停标记 (Stock_A)', fontsize=13) +ax.legend() +ax.grid(alpha=0.3) + +ax = axes[1] +ax.bar(limit_flags.index, limit_flags['Stock_A'], + color=limit_flags['Stock_A'].map({1: 'red', -1: 'green', 0: 'lightgray'}), + width=1) +ax.set_title('涨跌停标志 (+1=涨停, -1=跌停, 0=正常)', fontsize=13) +ax.set_ylim(-1.5, 1.5) +ax.set_yticks([-1, 0, 1]) +ax.grid(alpha=0.3) + +plt.tight_layout() +plt.show() + +print("\n实践建议:") +print("- 涨跌停日: 保留价格数据,但添加 is_limit_up / is_limit_down 标记列") +print("- 策略层: 遇到涨跌停标记时跳过信号,因无法成交") +print("- 统计分析: 计算波动率/相关性时可选择排除涨跌停日,避免失真") + + # ============================================================================= # TOPIC 5: Trading Calendar & Cross-market Alignment (交易日历与跨市场对齐) # ----------------------------------------------------------------------------- @@ -708,7 +820,7 @@ print("- 重要: ffill会引入假的'零收益日',计算波动率时需注 # ----------------------------------------------------------------------------- # Combines all prior topics into a reusable, sequential pipeline: # Step 1 — Filter stocks with too many missing values -# Step 2 — Fill remaining missing values (ffill + bfill) +# Step 2 — Fill remaining missing values (ffill only — no bfill/look-ahead) # Step 3 — Compute return series # Step 4 — Detect outliers (MAD) and apply Winsorize # Step 5 — Generate a data quality report per stock @@ -753,9 +865,13 @@ class DataPipeline: prices = self.raw_prices[valid_cols].copy() # Step 2: 填充缺失值 - print(f"\n[Step 2] 填充缺失值 (ffill + bfill)") + # ⚠️ Only ffill (backward-looking). bfill would use future prices to + # fill early NaNs — that is look-ahead bias / data leakage. + # Any rows still NaN after ffill are dropped (typically only the + # very first row if the series starts with a missing value). + print(f"\n[Step 2] 填充缺失值 (ffill only, no bfill)") n_missing_before = prices.isna().sum().sum() - prices = prices.ffill().bfill() + prices = prices.ffill().dropna(how='any') n_missing_after = prices.isna().sum().sum() print(f" 填充前: {n_missing_before}个NaN -> 填充后: {n_missing_after}个NaN")