@ -1,3 +1,16 @@
# =============================================================================
# Quantitative Trading — Data Pipeline Demo
# =============================================================================
# Topics covered:
# 1. Price Adjustment for Corporate Actions (splits & dividends)
# 2. Simple Returns vs Log Returns
# 3. Multi-stock Panel & Missing Value Handling
# 4. Outlier Detection & Treatment (Z-score, MAD, Winsorize)
# 4b. Circuit Breakers & Price Limit Flags (涨跌停) [A-shares]
# 5. Trading Calendar & Cross-market Alignment
# 6. End-to-End DataPipeline Class
# =============================================================================
import numpy as np
import pandas as pd
import matplotlib . pyplot as plt
@ -14,6 +27,20 @@ plt.rcParams['figure.dpi'] = 100
np . random . seed ( 42 )
print ( " 环境准备完成! " )
# =============================================================================
# TOPIC 1: Price Adjustment for Corporate Actions (价格复权)
# -----------------------------------------------------------------------------
# Raw stock prices have artificial jumps caused by stock splits and cash
# dividends. These must be adjusted before computing returns or signals,
# otherwise your algorithm will see fake crashes/spikes.
#
# Key concepts:
# - Unadjusted price (未复权): raw market price, has visible jumps
# - Forward-adjusted (前复权): history scaled to today's price level
# - Backward-adjusted (后复权): history kept real, recent prices scaled up
# - Adjustment factor (复权因子): multiplier that accounts for all events
# =============================================================================
def generate_raw_stock_data ( n_days = 1000 , seed = 42 ) :
""" 生成包含拆股和分红事件的原始股票数据 """
np . random . seed ( seed )
@ -145,6 +172,23 @@ print(f"未复权收益率: {stock_data['unadj_close'].iloc[split_idx] / stock_d
print ( f " 前复权收益率: { stock_data [ ' fwd_adj_close ' ] . iloc [ split_idx ] / stock_data [ ' fwd_adj_close ' ] . iloc [ split_idx - 1 ] - 1 : .4f } " )
print ( f " 真实收益率: { stock_data [ ' close ' ] . iloc [ split_idx ] / stock_data [ ' close ' ] . iloc [ split_idx - 1 ] - 1 : .4f } " )
# =============================================================================
# TOPIC 2: Simple Returns vs Log Returns (简单收益率 vs 对数收益率)
# -----------------------------------------------------------------------------
# Two ways to measure how much a price changed:
# - Simple return: r = (P_t / P_{t-1}) - 1
# - Log return: r = ln(P_t / P_{t-1})
#
# Why log returns are preferred in quant research:
# - Time-additive: sum daily log returns to get total log return
# - Better statistical properties (closer to normal distribution)
# - Numerically stable for long compounding periods
#
# At daily frequency the difference is tiny (~1bp), but it grows at
# monthly/annual frequency — never mix the two in the same calculation.
# =============================================================================
# 使用前复权价格计算收益率
prices = stock_data [ ' fwd_adj_close ' ]
@ -211,6 +255,22 @@ plt.tight_layout()
plt . show ( )
# =============================================================================
# TOPIC 3: Multi-stock Panel & Missing Value Handling (多标的面板与缺失值处理)
# -----------------------------------------------------------------------------
# Real market data panels have two main sources of missing values:
# - Suspensions (停牌): a stock halts trading for days/weeks
# - Data feed gaps: vendor errors, holidays, late reporting
#
# Strategies (choice depends on context):
# - ffill (forward fill): use last known price — standard for suspensions
# - Linear interpolation: smooth gaps — OK for short random gaps
# - ffill with limit: ffill but cap at N days — conservative choice
# - Drop: remove rows/columns — loses cross-sectional data
#
# Rule of thumb: if missing% > 10%, consider excluding the stock entirely.
# =============================================================================
def generate_multi_stock_panel ( n_stocks = 5 , n_days = 500 , seed = 42 ) :
""" 生成多标的日线数据面板,包含停牌和缺失值 """
np . random . seed ( seed )
@ -325,12 +385,20 @@ print("- 计算收益率前: 先填充缺失值, 再计算")
# Build a clean multi-stock price panel.
# Steps: ffill -> drop rows that still contain NaN -> compute returns
#
# ⚠️ Look-ahead bias warning: do NOT use bfill() here.
# bfill() fills leading NaN rows from later prices, so the "day 1" price
# would be sourced from a future observation. That is data leakage and it
# inflates backtest performance.
# Safe rule: only ffill (past -> present), then drop whatever is still NaN.
clean_prices = price_panel.ffill()  # only look backward
# After an unlimited ffill, NaN can only remain in leading rows (before a
# stock's first valid price); drop every row where any stock is still NaN.
clean_prices = clean_prices.dropna(how="any")

print(f" 清洗前缺失值: {price_panel.isna().sum().sum()} ")
print(f" 清洗后缺失值: {clean_prices.isna().sum().sum()} ")
print(f" (dropped {len(price_panel) - len(clean_prices)} leading rows to avoid look-ahead bias) ")

# Return panel: pct_change() leaves the first row NaN, so it is dropped.
returns_panel = clean_prices.pct_change().dropna()
@ -363,6 +431,28 @@ ax.set_title('收益率相关性矩阵', fontsize=14)
plt . tight_layout ( )
plt . show ( )
# =============================================================================
# TOPIC 4: Outlier Detection & Treatment (异常值检测与处理)
# -----------------------------------------------------------------------------
# Financial return series contain extreme values from real events (crashes,
# halts, data errors). How you handle them affects every downstream signal.
#
# Detection methods:
# - Z-score: |z| = |(x - mean) / std| > 3 — simple, but mean/std are
# themselves pulled by outliers (not robust)
# - MAD: uses median instead of mean — robust to extreme values,
# preferred for financial data
#
# Treatment methods:
# - Remove: delete the outlier row — reduces sample size
# - Winsorize: clip to [p1, p99] boundary — keeps all data, limits impact
# standard practice before factor construction
#
# Q-Q plot: visualizes how far returns deviate from a normal distribution.
# Fat tails (leptokurtosis) are a universal property of financial returns.
# =============================================================================
def detect_outliers_zscore ( series , threshold = 3.0 ) :
""" Z-score (标准分数) 方法检测异常值
@ -491,6 +581,131 @@ print("- MAD: 更稳健,推荐用于金融数据")
print ( " - Winsorize: 保留数据量,适合构建因子时使用 " )
print ( " - 实际中常用: Winsorize(1 % , 99 % ) + MAD检测 " )
# =============================================================================
# TOPIC 4b: Circuit Breakers & Price Limit Flags (涨跌停标记)
# -----------------------------------------------------------------------------
# In Chinese A-shares, main-board stocks have a ±10% daily price limit (±5% for ST stocks; ±20% on ChiNext/STAR Market).
# When a stock hits the limit, it is NOT a data error — it is real market data.
# However, it signals a critical data quality issue for downstream use:
#
# - The price IS the true closing price
# - But the stock may have been UNTRADEABLE for most of the day (zero liquidity)
# - A return of exactly +10% or -10% means you likely CANNOT execute at that price
#
# What to do in data prep (we flag, not remove):
# - Add a boolean column: `is_limit_up`, `is_limit_down`
# - Downstream strategy code can then decide to skip, weight-down, or
# exclude limit-hit days from signal calculations
#
# ⚠️ Survivorship Bias Note (out of scope for data prep, but important to know):
# This demo only generates stocks that "survive" the full period. In reality,
# stocks get delisted (bankruptcy, M&A, regulatory removal). If you only
# include currently-alive stocks in your historical dataset, you are
# implicitly selecting winners — your backtest returns will be inflated.
# Fix: source historical data that includes ALL stocks, including delisted ones.
# This is a DATA SOURCING problem, handled before data ever enters this pipeline.
# =============================================================================
def flag_price_limits(prices, limit_pct=0.10, *, tolerance=0.005):
    """Flag days on which a stock closed at its daily price limit (涨跌停).

    Args:
        prices: Price DataFrame, one column per stock and one row per
            consecutive observation (a Series also works).
        limit_pct: Daily price limit as a fraction, e.g. 0.10 for ±10%.
        tolerance: Buffer subtracted from ``limit_pct`` before comparing,
            because real closing prices are rounded and may not land exactly
            on the theoretical limit price. Defaults to 0.005 (0.5%).

    Returns:
        An integer object of the same shape as ``prices`` with values:
            +1 = limit up (涨停)
            -1 = limit down (跌停)
             0 = normal day
        The first row is always 0 because it has no previous price.

    Raises:
        ValueError: If ``tolerance`` is negative or not smaller than
            ``limit_pct`` (the threshold would no longer be positive, so
            ordinary days would be flagged).
    """
    if not 0 <= tolerance < limit_pct:
        raise ValueError(
            f"tolerance must satisfy 0 <= tolerance < limit_pct, "
            f"got tolerance={tolerance}, limit_pct={limit_pct}"
        )

    returns = prices.pct_change()
    threshold = limit_pct - tolerance

    # NaN returns compare as False on both sides, so they end up as 0.
    hit_limit_up = returns >= threshold
    hit_limit_down = returns <= -threshold
    return hit_limit_up.astype(int) - hit_limit_down.astype(int)
# Inject synthetic limit-hit events into a copy of the panel for demonstration.
np.random.seed(99)
demo_prices = clean_prices.copy()

# Candidate positional rows: row 0 is excluded because it has no previous price.
candidate_days = range(1, len(demo_prices) - 1)

for col_pos in range(demo_prices.shape[1]):
    # Randomly pick 3 limit-up and 3 limit-down days per stock.
    up_days = np.random.choice(candidate_days, size=3, replace=False)
    down_days = np.random.choice(candidate_days, size=3, replace=False)

    # Map day -> price multiplier. If a day was drawn for both directions the
    # limit-down entry wins, matching the original "down applied last" order.
    multipliers = {int(d): 1.10 for d in up_days}
    multipliers.update({int(d): 0.90 for d in down_days})

    # Apply in chronological order so every injected day is computed from the
    # final previous-day price and its return is exactly +10% / -10%.
    for d in sorted(multipliers):
        # Single positional .iloc[row, col] write: chained indexing
        # (.iloc[d][col] = ...) may assign to a temporary copy and is never
        # written back under pandas Copy-on-Write.
        demo_prices.iloc[d, col_pos] = demo_prices.iloc[d - 1, col_pos] * multipliers[d]

limit_flags = flag_price_limits(demo_prices, limit_pct=0.10)
# Summary: total and per-stock counts of limit-up / limit-down days.
total_limit_up = limit_flags.eq(1).sum().sum()
total_limit_down = limit_flags.eq(-1).sum().sum()

print(" \n 涨跌停统计: ")
print(f" 涨停天数 (limit up): {total_limit_up} ")
print(f" 跌停天数 (limit down): {total_limit_down} ")

# Per-stock counts (one value per column of limit_flags).
print(" \n 各标的涨停次数: ")
print(limit_flags.eq(1).sum())
print(" \n 各标的跌停次数: ")
print(limit_flags.eq(-1).sum())
# Visualize limit flags on Stock_A.
# Two stacked panels sharing the x-axis: price with limit markers on top,
# the raw flag series below.
# NOTE(review): the string literals in this section carry padding spaces
# (e.g. ' Stock_A ', ' red ', ' ^ ') — confirm they match the real column
# names and matplotlib option values.
fig , axes = plt . subplots ( 2 , 1 , figsize = ( 14 , 8 ) , sharex = True )
# Top panel: price line for the stock.
ax = axes [ 0 ]
ax . plot ( demo_prices [ ' Stock_A ' ] , color = ' steelblue ' , linewidth = 1 , label = ' Stock_A Price ' )
# Index labels of the rows flagged +1 (limit up) and -1 (limit down).
limit_up_dates = limit_flags . index [ limit_flags [ ' Stock_A ' ] == 1 ]
limit_down_dates = limit_flags . index [ limit_flags [ ' Stock_A ' ] == - 1 ]
# Overlay markers at the flagged dates; zorder=5 draws them above the line.
ax . scatter ( limit_up_dates , demo_prices . loc [ limit_up_dates , ' Stock_A ' ] ,
color = ' red ' , s = 60 , zorder = 5 , label = ' 涨停 (limit up) ' , marker = ' ^ ' )
ax . scatter ( limit_down_dates , demo_prices . loc [ limit_down_dates , ' Stock_A ' ] ,
color = ' green ' , s = 60 , zorder = 5 , label = ' 跌停 (limit down) ' , marker = ' v ' )
ax . set_title ( ' 价格序列与涨跌停标记 (Stock_A) ' , fontsize = 13 )
ax . legend ( )
ax . grid ( alpha = 0.3 )
# Bottom panel: one bar per row, colored by flag value via the mapping below.
ax = axes [ 1 ]
ax . bar ( limit_flags . index , limit_flags [ ' Stock_A ' ] ,
color = limit_flags [ ' Stock_A ' ] . map ( { 1 : ' red ' , - 1 : ' green ' , 0 : ' lightgray ' } ) ,
width = 1 )
ax . set_title ( ' 涨跌停标志 (+1=涨停, -1=跌停, 0=正常) ' , fontsize = 13 )
# Fix the y-range and ticks to the three possible flag values.
ax . set_ylim ( - 1.5 , 1.5 )
ax . set_yticks ( [ - 1 , 0 , 1 ] )
ax . grid ( alpha = 0.3 )
plt . tight_layout ( )
plt . show ( )
# Practical takeaways (printed in Chinese):
#   - keep limit-day prices but add is_limit_up / is_limit_down flag columns
#   - strategy layer: skip signals on flagged days because orders cannot fill
#   - statistics: optionally exclude flagged days from volatility/correlation
print ( " \n 实践建议: " )
print ( " - 涨跌停日: 保留价格数据,但添加 is_limit_up / is_limit_down 标记列 " )
print ( " - 策略层: 遇到涨跌停标记时跳过信号,因无法成交 " )
print ( " - 统计分析: 计算波动率/相关性时可选择排除涨跌停日,避免失真 " )
# =============================================================================
# TOPIC 5: Trading Calendar & Cross-market Alignment (交易日历与跨市场对齐)
# -----------------------------------------------------------------------------
# Different markets trade on different days (national holidays, weekends).
# When combining data from multiple markets, you must decide how to align
# the time axis — the wrong choice distorts correlation calculations.
#
# Two alignment strategies:
# - Intersection (取交集): keep only dates both markets are open.
# Best for: correlation, regression, signal comparison.
# Drawback: loses data on single-market trading days.
#
# - Forward fill (向前填充): fill non-trading days with last known price.
# Best for: portfolio construction, continuous NAV calculation.
# Drawback: introduces artificial zero-return days — inflates Sharpe,
# understates volatility. Must be accounted for in analysis.
#
# Note: this demo uses simplified hardcoded holiday lists. Production systems
# should use exchange_calendars or pandas_market_calendars libraries.
# =============================================================================
def create_trading_calendars ( ) :
""" 创建不同市场的交易日历示例
@ -599,6 +814,22 @@ print("- 计算相关性、回归: 使用交集日期,确保数据同步")
print ( " - 构建组合/计算市值: 使用ffill, 避免数据断裂 " )
print ( " - 重要: ffill会引入假的 ' 零收益日 ' ,计算波动率时需注意 " )
# =============================================================================
# TOPIC 6: End-to-End DataPipeline Class (完整数据清洗管道)
# -----------------------------------------------------------------------------
# Combines all prior topics into a reusable, sequential pipeline:
# Step 1 — Filter stocks with too many missing values
# Step 2 — Fill remaining missing values (ffill only — no bfill/look-ahead)
# Step 3 — Compute return series
# Step 4 — Detect outliers (MAD) and apply Winsorize
# Step 5 — Generate a data quality report per stock
#
# This pattern (class-based pipeline with a .run() method) mirrors how
# production quant systems structure data ingestion — each step is auditable,
# configurable, and produces diagnostic output.
# =============================================================================
class DataPipeline :
""" 简单的数据清洗管道
@ -634,9 +865,13 @@ class DataPipeline:
prices = self . raw_prices [ valid_cols ] . copy ( )
# Step 2: fill missing values.
# ⚠️ Only ffill (backward-looking). bfill would use future prices to fill
# early NaNs — that is look-ahead bias / data leakage.
# Rows still NaN after ffill are dropped; with an unlimited ffill these can
# only be leading rows before a stock's first valid price.
print(f" \n [Step 2] 填充缺失值 (ffill only, no bfill) ")
n_missing_before = prices.isna().sum().sum()
prices = prices.ffill().dropna(how="any")
n_missing_after = prices.isna().sum().sum()
print(f" 填充前: {n_missing_before} 个NaN -> 填充后: {n_missing_after} 个NaN ")