Compare commits

...

2 Commits

Author SHA1 Message Date
tigerenwork 31f8695d56 feat: Enhance data pipeline with circuit breakers and price limit flags 2026-05-30 00:48:10 +08:00
tigerenwork 9023c8eb76 feat: Add comprehensive quantitative trading data pipeline demo
- Implemented a data pipeline for quantitative trading covering:
  - Price adjustment for corporate actions (splits & dividends)
  - Calculation of simple and log returns
  - Handling of multi-stock panels and missing values
  - Outlier detection and treatment methods (Z-score, MAD, Winsorize)
  - Trading calendar creation and cross-market alignment
  - End-to-end DataPipeline class for data cleaning and analysis
- Included visualizations for price adjustments, return comparisons, and missing value handling
- Added detailed comments and documentation in Chinese for clarity
2026-05-30 00:24:41 +08:00
2 changed files with 276 additions and 4 deletions

37
.gitignore vendored Normal file
View File

@ -0,0 +1,37 @@
# Python bytecode and caches
__pycache__/
*.py[cod]
*$py.class
# Virtual environments
.venv/
venv/
env/
ENV/
# Packaging artifacts
build/
dist/
*.egg-info/
.eggs/
# Test and tooling caches
.pytest_cache/
.mypy_cache/
.ruff_cache/
.coverage
.coverage.*
htmlcov/
# Jupyter and IPython
.ipynb_checkpoints/
# Environment and local settings
.env
.env.*
*.local
# Editor and OS files
.vscode/
.idea/
.DS_Store

View File

@ -1,3 +1,16 @@
# =============================================================================
# Quantitative Trading — Data Pipeline Demo
# =============================================================================
# Topics covered:
# 1. Price Adjustment for Corporate Actions (splits & dividends)
# 2. Simple Returns vs Log Returns
# 3. Multi-stock Panel & Missing Value Handling
# 4. Outlier Detection & Treatment (Z-score, MAD, Winsorize)
# 4b. Circuit Breakers & Price Limit Flags (涨跌停) [A-shares]
# 5. Trading Calendar & Cross-market Alignment
# 6. End-to-End DataPipeline Class
# =============================================================================
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
@ -14,6 +27,20 @@ plt.rcParams['figure.dpi'] = 100
np.random.seed(42) np.random.seed(42)
print("环境准备完成!") print("环境准备完成!")
# =============================================================================
# TOPIC 1: Price Adjustment for Corporate Actions (价格复权)
# -----------------------------------------------------------------------------
# Raw stock prices have artificial jumps caused by stock splits and cash
# dividends. These must be adjusted before computing returns or signals,
# otherwise your algorithm will see fake crashes/spikes.
#
# Key concepts:
# - Unadjusted price (未复权): raw market price, has visible jumps
# - Forward-adjusted (前复权): history scaled to today's price level
# - Backward-adjusted (后复权): history kept real, recent prices scaled up
# - Adjustment factor (复权因子): multiplier that accounts for all events
# =============================================================================
def generate_raw_stock_data(n_days=1000, seed=42): def generate_raw_stock_data(n_days=1000, seed=42):
"""生成包含拆股和分红事件的原始股票数据""" """生成包含拆股和分红事件的原始股票数据"""
np.random.seed(seed) np.random.seed(seed)
@ -145,6 +172,23 @@ print(f"未复权收益率: {stock_data['unadj_close'].iloc[split_idx] / stock_d
print(f"前复权收益率: {stock_data['fwd_adj_close'].iloc[split_idx] / stock_data['fwd_adj_close'].iloc[split_idx-1] - 1:.4f}") print(f"前复权收益率: {stock_data['fwd_adj_close'].iloc[split_idx] / stock_data['fwd_adj_close'].iloc[split_idx-1] - 1:.4f}")
print(f"真实收益率: {stock_data['close'].iloc[split_idx] / stock_data['close'].iloc[split_idx-1] - 1:.4f}") print(f"真实收益率: {stock_data['close'].iloc[split_idx] / stock_data['close'].iloc[split_idx-1] - 1:.4f}")
# =============================================================================
# TOPIC 2: Simple Returns vs Log Returns (简单收益率 vs 对数收益率)
# -----------------------------------------------------------------------------
# Two ways to measure how much a price changed:
# - Simple return: r = (P_t / P_{t-1}) - 1
# - Log return: r = ln(P_t / P_{t-1})
#
# Why log returns are preferred in quant research:
# - Time-additive: sum daily log returns to get total log return
# - Better statistical properties (closer to normal distribution)
# - Numerically stable for long compounding periods
#
# At daily frequency the difference is tiny (~1bp), but it grows at
# monthly/annual frequency — never mix the two in the same calculation.
# =============================================================================
# 使用前复权价格计算收益率 # 使用前复权价格计算收益率
prices = stock_data['fwd_adj_close'] prices = stock_data['fwd_adj_close']
@ -211,6 +255,22 @@ plt.tight_layout()
plt.show() plt.show()
# =============================================================================
# TOPIC 3: Multi-stock Panel & Missing Value Handling (多标的面板与缺失值处理)
# -----------------------------------------------------------------------------
# Real market data panels have two main sources of missing values:
# - Suspensions (停牌): a stock halts trading for days/weeks
# - Data feed gaps: vendor errors, holidays, late reporting
#
# Strategies (choice depends on context):
# - ffill (forward fill): use last known price — standard for suspensions
# - Linear interpolation: smooth gaps — OK for short random gaps
# - ffill with limit: ffill but cap at N days — conservative choice
# - Drop: remove rows/columns — loses cross-sectional data
#
# Rule of thumb: if missing% > 10%, consider excluding the stock entirely.
# =============================================================================
def generate_multi_stock_panel(n_stocks=5, n_days=500, seed=42): def generate_multi_stock_panel(n_stocks=5, n_days=500, seed=42):
"""生成多标的日线数据面板,包含停牌和缺失值""" """生成多标的日线数据面板,包含停牌和缺失值"""
np.random.seed(seed) np.random.seed(seed)
@ -325,12 +385,20 @@ print("- 计算收益率前: 先填充缺失值, 再计算")
# 构建干净的多标的数据面板 # 构建干净的多标的数据面板
# 步骤: ffill填充 -> 删除仍然有NaN的行 -> 计算收益率 # Steps: ffill → drop stocks/rows still NaN → compute returns
#
# ⚠️ Look-Ahead Bias Warning: DO NOT use bfill() here.
# bfill() fills early NaN rows by looking at future prices, which means
# your "day 1" price would be sourced from a future observation.
# This is data leakage — it would inflate backtest performance.
# Safe rule: only ffill (past → present), then drop what remains NaN.
clean_prices = price_panel.ffill().bfill() # 先前填充再后填充处理开头的NaN clean_prices = price_panel.ffill() # only look backward
clean_prices = clean_prices.dropna(how='any') # drop rows where any stock still NaN
print(f"清洗前缺失值: {price_panel.isna().sum().sum()}") print(f"清洗前缺失值: {price_panel.isna().sum().sum()}")
print(f"清洗后缺失值: {clean_prices.isna().sum().sum()}") print(f"清洗后缺失值: {clean_prices.isna().sum().sum()}")
print(f"(dropped {len(price_panel) - len(clean_prices)} leading rows to avoid look-ahead bias)")
# 计算收益率面板 # 计算收益率面板
returns_panel = clean_prices.pct_change().dropna() returns_panel = clean_prices.pct_change().dropna()
@ -363,6 +431,28 @@ ax.set_title('收益率相关性矩阵', fontsize=14)
plt.tight_layout() plt.tight_layout()
plt.show() plt.show()
# =============================================================================
# TOPIC 4: Outlier Detection & Treatment (异常值检测与处理)
# -----------------------------------------------------------------------------
# Financial return series contain extreme values from real events (crashes,
# halts, data errors). How you handle them affects every downstream signal.
#
# Detection methods:
# - Z-score: |z| = |(x - mean) / std| > 3 — simple, but mean/std are
# themselves pulled by outliers (not robust)
# - MAD: uses median instead of mean — robust to extreme values,
# preferred for financial data
#
# Treatment methods:
# - Remove: delete the outlier row — reduces sample size
# - Winsorize: clip to [p1, p99] boundary — keeps all data, limits impact
# standard practice before factor construction
#
# Q-Q plot: visualizes how far returns deviate from a normal distribution.
# Fat tails (leptokurtosis) are a universal property of financial returns.
# =============================================================================
def detect_outliers_zscore(series, threshold=3.0): def detect_outliers_zscore(series, threshold=3.0):
"""Z-score (标准分数) 方法检测异常值 """Z-score (标准分数) 方法检测异常值
@ -491,6 +581,131 @@ print("- MAD: 更稳健,推荐用于金融数据")
print("- Winsorize: 保留数据量,适合构建因子时使用") print("- Winsorize: 保留数据量,适合构建因子时使用")
print("- 实际中常用: Winsorize(1%, 99%) + MAD检测") print("- 实际中常用: Winsorize(1%, 99%) + MAD检测")
# =============================================================================
# TOPIC 4b: Circuit Breakers & Price Limit Flags (涨跌停标记)
# -----------------------------------------------------------------------------
# In Chinese A-shares, stocks have a ±10% daily price limit (±5% for ST stocks).
# When a stock hits the limit, it is NOT a data error — it is real market data.
# However, it signals a critical data quality issue for downstream use:
#
# - The price IS the true closing price
# - But the stock may have been UNTRADEABLE for most of the day (zero liquidity)
# - A return of exactly +10% or -10% means you likely CANNOT execute at that price
#
# What to do in data prep (we flag, not remove):
# - Add a boolean column: `is_limit_up`, `is_limit_down`
# - Downstream strategy code can then decide to skip, weight-down, or
# exclude limit-hit days from signal calculations
#
# ⚠️ Survivorship Bias Note (out of scope for data prep, but important to know):
# This demo only generates stocks that "survive" the full period. In reality,
# stocks get delisted (bankruptcy, M&A, regulatory removal). If you only
# include currently-alive stocks in your historical dataset, you are
# implicitly selecting winners — your backtest returns will be inflated.
# Fix: source historical data that includes ALL stocks, including delisted ones.
# This is a DATA SOURCING problem, handled before data ever enters this pipeline.
# =============================================================================
def flag_price_limits(prices, limit_pct=0.10):
"""Flag days where a stock hit the daily price limit (涨跌停).
Returns a DataFrame of the same shape with values:
+1 = limit up (涨停)
-1 = limit down (跌停)
0 = normal day
A small tolerance (0.5%) is used because real closing prices may be
rounded and not land exactly on the theoretical limit price.
"""
returns = prices.pct_change()
tolerance = 0.005 # 0.5% buffer for rounding
limit_up = (returns >= limit_pct - tolerance).astype(int)
limit_down = (returns <= -limit_pct + tolerance).astype(int) * -1
flags = limit_up + limit_down
return flags
# Inject realistic limit-hit events into the panel for demonstration
np.random.seed(99)
demo_prices = clean_prices.copy()
for col in demo_prices.columns:
# Randomly inject 3 limit-up and 3 limit-down days per stock
up_days = np.random.choice(range(1, len(demo_prices) - 1), size=3, replace=False)
down_days = np.random.choice(range(1, len(demo_prices) - 1), size=3, replace=False)
for d in up_days:
demo_prices.iloc[d][col] = demo_prices.iloc[d - 1][col] * 1.10 # exactly +10%
for d in down_days:
demo_prices.iloc[d][col] = demo_prices.iloc[d - 1][col] * 0.90 # exactly -10%
limit_flags = flag_price_limits(demo_prices, limit_pct=0.10)
# Summary
total_limit_up = (limit_flags == 1).sum().sum()
total_limit_down = (limit_flags == -1).sum().sum()
print(f"\n涨跌停统计:")
print(f" 涨停天数 (limit up): {total_limit_up}")
print(f" 跌停天数 (limit down): {total_limit_down}")
print(f"\n各标的涨停次数:")
print((limit_flags == 1).sum())
print(f"\n各标的跌停次数:")
print((limit_flags == -1).sum())
# Visualize limit flags on Stock_A
fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True)
ax = axes[0]
ax.plot(demo_prices['Stock_A'], color='steelblue', linewidth=1, label='Stock_A Price')
limit_up_dates = limit_flags.index[limit_flags['Stock_A'] == 1]
limit_down_dates = limit_flags.index[limit_flags['Stock_A'] == -1]
ax.scatter(limit_up_dates, demo_prices.loc[limit_up_dates, 'Stock_A'],
color='red', s=60, zorder=5, label='涨停 (limit up)', marker='^')
ax.scatter(limit_down_dates, demo_prices.loc[limit_down_dates, 'Stock_A'],
color='green', s=60, zorder=5, label='跌停 (limit down)', marker='v')
ax.set_title('价格序列与涨跌停标记 (Stock_A)', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)
ax = axes[1]
ax.bar(limit_flags.index, limit_flags['Stock_A'],
color=limit_flags['Stock_A'].map({1: 'red', -1: 'green', 0: 'lightgray'}),
width=1)
ax.set_title('涨跌停标志 (+1=涨停, -1=跌停, 0=正常)', fontsize=13)
ax.set_ylim(-1.5, 1.5)
ax.set_yticks([-1, 0, 1])
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
print("\n实践建议:")
print("- 涨跌停日: 保留价格数据,但添加 is_limit_up / is_limit_down 标记列")
print("- 策略层: 遇到涨跌停标记时跳过信号,因无法成交")
print("- 统计分析: 计算波动率/相关性时可选择排除涨跌停日,避免失真")
# =============================================================================
# TOPIC 5: Trading Calendar & Cross-market Alignment (交易日历与跨市场对齐)
# -----------------------------------------------------------------------------
# Different markets trade on different days (national holidays, weekends).
# When combining data from multiple markets, you must decide how to align
# the time axis — the wrong choice distorts correlation calculations.
#
# Two alignment strategies:
# - Intersection (取交集): keep only dates both markets are open.
# Best for: correlation, regression, signal comparison.
# Drawback: loses data on single-market trading days.
#
# - Forward fill (向前填充): fill non-trading days with last known price.
# Best for: portfolio construction, continuous NAV calculation.
# Drawback: introduces artificial zero-return days — inflates Sharpe,
# understates volatility. Must be accounted for in analysis.
#
# Note: this demo uses simplified hardcoded holiday lists. Production systems
# should use exchange_calendars or pandas_market_calendars libraries.
# =============================================================================
def create_trading_calendars(): def create_trading_calendars():
"""创建不同市场的交易日历示例 """创建不同市场的交易日历示例
@ -599,6 +814,22 @@ print("- 计算相关性、回归: 使用交集日期,确保数据同步")
print("- 构建组合/计算市值: 使用ffill避免数据断裂") print("- 构建组合/计算市值: 使用ffill避免数据断裂")
print("- 重要: ffill会引入假的'零收益日',计算波动率时需注意") print("- 重要: ffill会引入假的'零收益日',计算波动率时需注意")
# =============================================================================
# TOPIC 6: End-to-End DataPipeline Class (完整数据清洗管道)
# -----------------------------------------------------------------------------
# Combines all prior topics into a reusable, sequential pipeline:
# Step 1 — Filter stocks with too many missing values
# Step 2 — Fill remaining missing values (ffill only — no bfill/look-ahead)
# Step 3 — Compute return series
# Step 4 — Detect outliers (MAD) and apply Winsorize
# Step 5 — Generate a data quality report per stock
#
# This pattern (class-based pipeline with a .run() method) mirrors how
# production quant systems structure data ingestion — each step is auditable,
# configurable, and produces diagnostic output.
# =============================================================================
class DataPipeline: class DataPipeline:
"""简单的数据清洗管道 """简单的数据清洗管道
@ -634,9 +865,13 @@ class DataPipeline:
prices = self.raw_prices[valid_cols].copy() prices = self.raw_prices[valid_cols].copy()
# Step 2: 填充缺失值 # Step 2: 填充缺失值
print(f"\n[Step 2] 填充缺失值 (ffill + bfill)") # ⚠️ Only ffill (backward-looking). bfill would use future prices to
# fill early NaNs — that is look-ahead bias / data leakage.
# Any rows still NaN after ffill are dropped (typically only the
# very first row if the series starts with a missing value).
print(f"\n[Step 2] 填充缺失值 (ffill only, no bfill)")
n_missing_before = prices.isna().sum().sum() n_missing_before = prices.isna().sum().sum()
prices = prices.ffill().bfill() prices = prices.ffill().dropna(how='any')
n_missing_after = prices.isna().sum().sum() n_missing_after = prices.isna().sum().sum()
print(f" 填充前: {n_missing_before}个NaN -> 填充后: {n_missing_after}个NaN") print(f" 填充前: {n_missing_before}个NaN -> 填充后: {n_missing_after}个NaN")