trading/quant_event_driven_backtest...

1532 lines
71 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# =============================================================================
# Quantitative Trading — Event-Driven Backtest Engine
# 量化交易 — 事件驱动回测引擎
# =============================================================================
#
# 本文件是策略开发演示 (quant_strategy_backtest_demo.py) 的续集。
# This file is the sequel to the strategy development demo.
#
# ─────────────────────────────────────────────────────────────────────────────
# Why Event-Driven? 为什么要用事件驱动?
# ─────────────────────────────────────────────────────────────────────────────
# The vectorized backtest in the previous demo computed everything at once
# using array operations. This is fast, but it has a fundamental problem:
# 上一个演示中的向量化回测一次性用数组运算计算所有结果。速度很快,但有根本缺陷:
#
# It cannot easily model:
# 难以建模:
# • Different order types 不同订单类型 (limit / stop / market orders)
# • Partial fills 部分成交
# • Intraday price path 日内价格路径 (open → high/low → close sequence)
# • Real-time risk checks 实时风控检查 (e.g. position limits, margin calls)
# • Multiple strategies 多策略组合
# • Portfolio-level rules 组合层面规则 (e.g. max sector exposure)
#
# The event-driven approach treats the backtest as a simulation of the actual
# live trading system. The same strategy & portfolio code can run in both
# backtest mode AND live trading mode — just swap the data source.
#
# 事件驱动方法将回测视为真实交易系统的模拟。
# 同一套策略和组合代码可以在回测模式和实盘模式下运行——只需替换数据源。
#
# ─────────────────────────────────────────────────────────────────────────────
# Architecture Overview 架构总览
# ─────────────────────────────────────────────────────────────────────────────
#
# ┌─────────────────────────────────┐
# │ Event Queue 事件队列 │
# │ (FIFO deque — central bus) │
# └───────────────┬─────────────────┘
# │ events flow through
# ┌────────────────────┼────────────────────┐
# ▼ ▼ ▼
# ┌────────────────┐ ┌────────────────┐ ┌────────────────┐
# │ DataHandler │ │ Strategy │ │ Portfolio │
# │ 数据处理器 │ │ 策略引擎 │ │ 组合管理器 │
# │ │ │ │ │ │
# │ Streams bars │ │ Reads market │ │ Receives order │
# │ (OHLCV) into │ │ data, computes │ │ requests from │
# │ the queue as │ │ signals, emits │ │ strategy and │
# │ MarketEvents │ │ SignalEvents │ │ emits │
# └────────────────┘ └────────────────┘ │ OrderEvents │
# └───────┬────────┘
# │
# ┌───────▼────────┐
# │ ExecutionHandler│
# │ 执行处理器 │
# │ (模拟券商) │
# │ │
# │ Fills orders, │
# │ applies slippage│
# │ & commission, │
# │ emits │
# │ FillEvents │
# └────────────────┘
#
# ─────────────────────────────────────────────────────────────────────────────
# Event flow for one trading day 单个交易日的事件流
# ─────────────────────────────────────────────────────────────────────────────
#
# [Clock ticks to new day / 时钟滴答到新的一天]
# │
# ▼
# DataHandler emits MarketEvent (新K线数据到达)
# │
# ▼
# Strategy processes MarketEvent → emits SignalEvent (信号:买/卖/平)
# │
# ▼
# Portfolio processes SignalEvent → emits OrderEvent (下单:市价/限价/止损)
# │
# ▼
# ExecutionHandler processes OrderEvent → emits FillEvent (成交确认)
# │
# ▼
# Portfolio processes FillEvent → updates positions & P&L (更新持仓和盈亏)
# │
# ▼
# [repeat for next day / 重复下一天]
#
# ─────────────────────────────────────────────────────────────────────────────
# Topics covered / 涵盖主题:
# 1. Event Classes 事件类 (Market, Signal, Order, Fill)
# 2. DataHandler 数据处理器
# 3. Strategy 策略 (MA Crossover & RSI)
# 4. Portfolio 组合管理器 (cash, positions, P&L)
# 5. ExecutionHandler 执行处理器 (slippage models, order types)
# 6. BacktestEngine 回测引擎 (main event loop)
# 7. Performance Analytics 绩效分析
# 8. Comparison: Event-Driven vs Vectorized
# =============================================================================
from __future__ import annotations
import queue # Python built-in FIFO queue
import enum # for clean event type constants
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple
from copy import deepcopy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import warnings
warnings.filterwarnings('ignore')
# ── Chinese font / 中文字体 ──────────────────────────────────────────────────
plt.rcParams['font.sans-serif'] = ['WenQuanYi Zen Hei', 'Arial Unicode MS',
'SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
np.random.seed(42)
print("=" * 72)
print(" 量化交易 — 事件驱动回测引擎")
print(" Quantitative Trading — Event-Driven Backtest Engine")
print("=" * 72)
# =============================================================================
# SECTION 1: Event Classes 事件类
# =============================================================================
#
# Every piece of information that flows through the system is wrapped in an
# Event object. This decouples the components — a Strategy doesn't call the
# Portfolio directly; it just puts a SignalEvent on the queue.
#
# 系统中流通的每一条信息都被包装成一个 Event 对象。
# 这使各组件解耦——策略不直接调用组合,只是把 SignalEvent 放入队列。
#
# Four event types:
# 四种事件类型:
#
# MarketEvent 市场事件 — new OHLCV bar has arrived from the data feed
# SignalEvent 信号事件 — strategy has decided to buy or sell
# OrderEvent 订单事件 — portfolio has sized the trade and placed an order
# FillEvent 成交事件 — broker has confirmed the order was executed
# =============================================================================
class EventType(enum.Enum):
"""
Enumeration of all event types in the system.
系统中所有事件类型的枚举。
"""
MARKET = "MARKET" # 市场事件 — new price bar
SIGNAL = "SIGNAL" # 信号事件 — trade signal from strategy
ORDER = "ORDER" # 订单事件 — order from portfolio
FILL = "FILL" # 成交事件 — filled order from broker
class Direction(enum.Enum):
"""
Trade direction. 交易方向。
"""
LONG = "LONG" # 做多 / buy
SHORT = "SHORT" # 做空 / sell short
EXIT = "EXIT" # 平仓 / close position
class OrderType(enum.Enum):
"""
Order types. 订单类型。
MARKET 市价单 — execute immediately at the best available price
立即以市场最优价格成交,确保成交但价格不确定
LIMIT 限价单 — execute only at a specified price or better
只在指定价格或更优价格成交,价格确定但不保证成交
STOP 止损单 — becomes a market order when price hits the stop level
价格触及止损价时转变为市价单,用于控制最大亏损
"""
MARKET = "MARKET"
LIMIT = "LIMIT"
STOP = "STOP"
@dataclass
class MarketEvent:
"""
Triggered when the DataHandler has a new bar of OHLCV data ready.
当数据处理器有新的 OHLCV K线数据就绪时触发。
This is the "heartbeat" of the system — it drives all other components.
这是系统的"心跳"——驱动所有其他组件。
Fields / 字段:
symbol 股票代码 — which ticker this bar belongs to
date 日期 — the date of this bar
open 开盘价 — first trade of the session
high 最高价 — highest trade during the session
low 最低价 — lowest trade during the session
close 收盘价 — last trade of the session
volume 成交量 — total shares traded
"""
type : EventType = field(default=EventType.MARKET, init=False)
symbol : str = ""
date : pd.Timestamp = None
open : float = 0.0
high : float = 0.0
low : float = 0.0
close : float = 0.0
volume : float = 0.0
def __repr__(self):
return (f"MarketEvent({self.symbol} {self.date.date()} "
f"O={self.open:.2f} H={self.high:.2f} "
f"L={self.low:.2f} C={self.close:.2f})")
@dataclass
class SignalEvent:
"""
Generated by a Strategy when it detects a trade opportunity.
策略检测到交易机会时生成。
A SignalEvent is a "suggestion" — the Portfolio decides whether and
how much to actually trade based on its own risk rules.
信号事件是一个"建议"——组合管理器根据自身风控规则决定是否以及交易多少。
Fields / 字段:
symbol 股票代码
date 信号产生日期
direction 方向: LONG(做多) / SHORT(做空) / EXIT(平仓)
strength 信号强度 [0, 1] — used for position sizing (仓位管理)
1.0 = full conviction, 0.5 = half conviction
strategy 策略名称 — useful when running multiple strategies
"""
type : EventType = field(default=EventType.SIGNAL, init=False)
symbol : str = ""
date : pd.Timestamp = None
direction : Direction = Direction.LONG
strength : float = 1.0 # 信号强度 / signal strength for position sizing
strategy : str = ""
def __repr__(self):
return (f"SignalEvent({self.symbol} {self.date.date()} "
f"{self.direction.value} strength={self.strength:.2f})")
@dataclass
class OrderEvent:
"""
Sent from Portfolio to ExecutionHandler to place a trade.
从组合管理器发送给执行处理器以下单。
This is a concrete order: it specifies WHAT to trade and HOW MUCH.
这是一个具体订单:指定交易什么以及交易多少。
Fields / 字段:
symbol 股票代码
date 下单日期
order_type 订单类型 MARKET / LIMIT / STOP
direction 方向 LONG / SHORT / EXIT
quantity 数量 number of shares (股数)
limit_price 限价 for LIMIT orders (限价单价格)
stop_price 止损价 for STOP orders (止损单价格)
"""
type : EventType = field(default=EventType.ORDER, init=False)
symbol : str = ""
date : pd.Timestamp = None
order_type : OrderType = OrderType.MARKET
direction : Direction = Direction.LONG
quantity : int = 0
limit_price : float = 0.0
stop_price : float = 0.0
def __repr__(self):
return (f"OrderEvent({self.symbol} {self.date.date()} "
f"{self.order_type.value} {self.direction.value} "
f"qty={self.quantity})")
@dataclass
class FillEvent:
"""
Confirms that an order has been executed by the broker.
确认订单已被券商执行(成交回报)。
A Fill is the ground truth — it records what ACTUALLY happened.
成交回报是最终事实——记录实际发生了什么。
Fields / 字段:
symbol 股票代码
date 成交日期
direction 成交方向 LONG / SHORT / EXIT
quantity 成交数量 shares actually filled
fill_price 成交价格 actual execution price (含滑点 including slippage)
commission 佣金 brokerage commission
slippage 滑点 price impact of the trade
"""
type : EventType = field(default=EventType.FILL, init=False)
symbol : str = ""
date : pd.Timestamp = None
direction : Direction = Direction.LONG
quantity : int = 0
fill_price : float = 0.0
commission : float = 0.0
slippage : float = 0.0
@property
def total_cost(self) -> float:
"""
Total cash impact of this fill.
本次成交的总现金影响。
For a BUY (LONG): negative (cash decreases) 现金减少
For a SELL (EXIT): positive (cash increases) 现金增加
"""
sign = -1 if self.direction in (Direction.LONG, Direction.SHORT) else +1
return sign * (self.fill_price * self.quantity + self.commission)
def __repr__(self):
return (f"FillEvent({self.symbol} {self.date.date()} "
f"{self.direction.value} qty={self.quantity} "
f"@{self.fill_price:.2f} comm={self.commission:.2f})")
# =============================================================================
# SECTION 2: DataHandler 数据处理器
# =============================================================================
#
# The DataHandler is responsible for feeding price data into the system
# one bar at a time. It simulates the reality of not knowing the future.
#
# 数据处理器负责逐根K线地向系统输入价格数据。
# 它模拟了"无法预知未来"的现实。
#
# The key design rule:
# 核心设计规则:
#
# ❌ WRONG: strategy sees bar[t] and trades on bar[t] (lookahead bias!)
# 策略看到 bar[t] 并在 bar[t] 交易(前视偏差!)
#
# ✅ RIGHT: strategy sees bar[t], generates signal based on bar[t],
# order fills at bar[t+1] open (next day open)
# 策略看到 bar[t],生成信号,订单在 bar[t+1] 的开盘价成交
#
# This is why DataHandler, Strategy, Portfolio, and ExecutionHandler are
# separate — the separation naturally enforces the time boundary.
# 这就是为什么这四个组件分开——分离自然地强制了时间边界。
# =============================================================================
class DataHandler:
"""
Streams historical OHLCV data bar by bar into the event queue.
将历史 OHLCV 数据逐根K线地流入事件队列。
In live trading, this would connect to a real-time market data API.
在实盘交易中,这里会连接实时行情 API。
"""
def __init__(self, data: pd.DataFrame, symbol: str, event_queue: queue.Queue):
"""
Parameters / 参数:
data — DataFrame with DatetimeIndex and columns:
带 DatetimeIndex 的 DataFrame列名为:
open, high, low, close, volume
symbol — ticker symbol / 股票代码
event_queue — the shared event queue / 共享事件队列
"""
self.data = data.copy()
self.symbol = symbol
self.event_queue = event_queue
self._dates = list(data.index) # all trading dates
self._cursor = 0 # current position in history
self._history = [] # bars seen so far (已观测的K线)
@property
def current_date(self) -> Optional[pd.Timestamp]:
"""The date of the most recently published bar. 最新已发布K线的日期。"""
if self._cursor == 0:
return None
return self._dates[self._cursor - 1]
@property
def n_bars_seen(self) -> int:
"""How many bars have been streamed so far. 到目前为止已流出多少根K线。"""
return self._cursor
def has_more_data(self) -> bool:
"""Returns True if there is at least one more bar to stream."""
return self._cursor < len(self._dates)
def stream_next(self) -> bool:
"""
Emit the next bar as a MarketEvent onto the queue.
将下一根K线作为 MarketEvent 发送到队列。
Returns True if a bar was emitted, False if data is exhausted.
如果发出了K线则返回 True如果数据已耗尽则返回 False。
"""
if not self.has_more_data():
return False
date = self._dates[self._cursor]
row = self.data.loc[date]
# Build the MarketEvent for this bar / 构建本根K线的 MarketEvent
bar = MarketEvent(
symbol = self.symbol,
date = date,
open = float(row.get("open", row.get("close", 0))),
high = float(row.get("high", row.get("close", 0))),
low = float(row.get("low", row.get("close", 0))),
close = float(row["close"]),
volume = float(row.get("volume", 0)),
)
# Add to internal history (the strategy can query this)
# 添加到内部历史记录(策略可以查询)
self._history.append(bar)
self._cursor += 1
# Push onto the central event queue / 推送到中央事件队列
self.event_queue.put(bar)
return True
def get_history(self, n: Optional[int] = None) -> List[MarketEvent]:
"""
Return the last n bars seen so far (default: all history).
返回迄今为止看到的最后 n 根K线默认全部历史
This is what the Strategy uses to compute indicators.
这是策略用于计算技术指标的数据。
"""
if n is None:
return self._history
return self._history[-n:]
def get_close_series(self) -> pd.Series:
"""
Return a pd.Series of all close prices seen so far.
返回到目前为止所有已观测收盘价的 pd.Series。
Useful for computing rolling indicators (移动平均等滚动指标).
"""
if not self._history:
return pd.Series(dtype=float)
return pd.Series(
[b.close for b in self._history],
index=[b.date for b in self._history],
)
# =============================================================================
# SECTION 3: Strategy (Abstract Base + Concrete Implementations)
# 策略(抽象基类 + 具体实现)
# =============================================================================
#
# A Strategy:
# 1. Listens for MarketEvents
# 2. Computes indicators on the historical data it has seen so far
# 3. Generates SignalEvents when its rules are triggered
#
# 策略:
# 1. 监听 MarketEvent
# 2. 根据迄今为止看到的历史数据计算指标
# 3. 当规则被触发时生成 SignalEvent
#
# The abstract base class (ABC) enforces a consistent interface so that
# we can swap strategies without changing any other component.
# 抽象基类强制统一接口,这样我们可以替换策略而不改变其他任何组件。
# =============================================================================
class Strategy(ABC):
"""
Abstract base class for all strategies.
所有策略的抽象基类。
"""
def __init__(self, data_handler: DataHandler, event_queue: queue.Queue):
self.data = data_handler
self.queue = event_queue
@abstractmethod
def on_market(self, event: MarketEvent) -> None:
"""
Called when a new MarketEvent arrives.
新的 MarketEvent 到达时调用。
Subclasses implement their signal logic here.
子类在这里实现信号逻辑。
"""
...
def send_signal(self, symbol: str, date: pd.Timestamp,
direction: Direction, strength: float = 1.0) -> None:
"""Helper to build and queue a SignalEvent. 构建并排队 SignalEvent 的辅助方法。"""
sig = SignalEvent(
symbol = symbol,
date = date,
direction = direction,
strength = strength,
strategy = self.__class__.__name__,
)
self.queue.put(sig)
class MACrossoverStrategy(Strategy):
"""
Dual Moving Average Crossover Strategy 双均线金叉/死叉策略
─────────────────────────────────────────────────────────────
Rules / 规则:
Golden Cross (金叉): short SMA crosses ABOVE long SMA → BUY (做多)
Death Cross (死叉): short SMA crosses BELOW long SMA → SELL (平仓)
Parameters / 参数:
short_window 短期均线窗口 (default 20 days)
long_window 长期均线窗口 (default 60 days)
"""
def __init__(self, data_handler: DataHandler, event_queue: queue.Queue,
short_window: int = 20, long_window: int = 60):
super().__init__(data_handler, event_queue)
self.short_window = short_window
self.long_window = long_window
self._in_position = False # are we currently holding a long position?
# 当前是否持有多仓?
def on_market(self, event: MarketEvent) -> None:
"""
Called on every new bar. 每根新K线调用一次。
We need at least long_window bars before we can compute the long SMA.
至少需要 long_window 根K线才能计算长期均线。
"""
closes = self.data.get_close_series()
n = len(closes)
# Not enough data yet — wait / 数据不足,等待
if n < self.long_window:
return
# Compute SMAs using only data available up to this bar
# 仅使用截至本K线的数据计算均线无前视偏差
sma_short = closes.iloc[-self.short_window:].mean()
sma_long = closes.iloc[-self.long_window:].mean()
# Previous bar's SMAs (to detect crossover / 检测交叉)
if n < self.long_window + 1:
return # need one extra bar to detect a CHANGE in relationship
sma_short_prev = closes.iloc[-(self.short_window + 1):-1].mean()
sma_long_prev = closes.iloc[-(self.long_window + 1):-1].mean()
was_above = sma_short_prev > sma_long_prev # previous relationship
is_above = sma_short > sma_long # current relationship
# ── Golden Cross 金叉 ─────────────────────────────────────────────
# Short MA just crossed ABOVE long MA → bullish signal
# 短期均线刚刚上穿长期均线 → 看涨信号
if is_above and not was_above and not self._in_position:
self.send_signal(event.symbol, event.date, Direction.LONG, strength=1.0)
self._in_position = True
# ── Death Cross 死叉 ─────────────────────────────────────────────
# Short MA just crossed BELOW long MA → exit signal
# 短期均线刚刚下穿长期均线 → 平仓信号
elif not is_above and was_above and self._in_position:
self.send_signal(event.symbol, event.date, Direction.EXIT, strength=1.0)
self._in_position = False
class RSIMeanReversionStrategy(Strategy):
"""
RSI Mean Reversion Strategy RSI 均值回归策略
─────────────────────────────────────────────
Rules / 规则:
RSI < oversold (超卖) → BUY (做多)
RSI > overbought(超买) → SELL EXIT (平多)
(Also supports going SHORT / 也支持做空)
Parameters / 参数:
window RSI计算窗口 (default 14)
oversold 超卖阈值 (default 30)
overbought 超买阈值 (default 70)
"""
def __init__(self, data_handler: DataHandler, event_queue: queue.Queue,
window: int = 14, oversold: float = 30, overbought: float = 70):
super().__init__(data_handler, event_queue)
self.window = window
self.oversold = oversold
self.overbought = overbought
self._position = 0 # 0=flat, 1=long, -1=short (0=空仓, 1=多仓, -1=空仓)
def _compute_rsi(self, closes: pd.Series) -> float:
"""
Compute the current RSI value using Wilder's smoothing.
使用 Wilder 平滑法计算当前 RSI 值。
"""
if len(closes) < self.window + 1:
return 50.0 # neutral / 中性,数据不足时返回中性值
delta = closes.diff()
gain = delta.clip(lower=0)
loss = -delta.clip(upper=0)
avg_gain = gain.ewm(alpha=1.0 / self.window, adjust=False).mean().iloc[-1]
avg_loss = loss.ewm(alpha=1.0 / self.window, adjust=False).mean().iloc[-1]
if avg_loss == 0:
return 100.0
rs = avg_gain / avg_loss
return 100.0 - (100.0 / (1.0 + rs))
def on_market(self, event: MarketEvent) -> None:
closes = self.data.get_close_series()
rsi = self._compute_rsi(closes)
# ── Entry: Oversold → Go Long 入场:超卖 → 做多 ──────────────────
if rsi < self.oversold and self._position == 0:
self.send_signal(event.symbol, event.date, Direction.LONG, strength=0.8)
self._position = 1
# ── Entry: Overbought → Go Short 入场:超买 → 做空 ───────────────
elif rsi > self.overbought and self._position == 0:
self.send_signal(event.symbol, event.date, Direction.SHORT, strength=0.8)
self._position = -1
# ── Exit Long when RSI recovers to neutral 多仓RSI回到中性时平仓 ──
elif self._position == 1 and rsi > 50:
self.send_signal(event.symbol, event.date, Direction.EXIT, strength=1.0)
self._position = 0
# ── Exit Short when RSI falls back to neutral 空仓RSI回到中性时平仓
elif self._position == -1 and rsi < 50:
self.send_signal(event.symbol, event.date, Direction.EXIT, strength=1.0)
self._position = 0
# =============================================================================
# SECTION 4: Portfolio 组合管理器
# =============================================================================
#
# The Portfolio is the "brain" that manages money. It:
# 1. Receives SignalEvents from the Strategy
# 2. Decides HOW MUCH to trade (position sizing / 仓位管理)
# 3. Creates OrderEvents for the ExecutionHandler
# 4. Receives FillEvents from the broker and updates:
# - Cash balance (现金余额)
# - Positions (持仓)
# - Realized P&L (已实现盈亏)
# - Unrealized P&L(未实现盈亏)
# 5. Records the daily equity (净值) for performance analysis
#
# 组合管理器是管理资金的"大脑"。它接收策略信号,决定交易多少,
# 向执行器发出订单,接收成交回报并更新现金/持仓/盈亏,记录每日净值。
#
# Position Sizing (仓位管理) methods / 常见方法:
# Fixed Fraction 固定比例 — always use X% of capital (简单,本文采用)
# Fixed Dollar 固定金额 — always trade $N worth
# Kelly Criterion 凯利公式 — optimal fraction based on edge & odds
# Volatility Scaling 波动率缩放 — scale size to target a fixed daily risk
# =============================================================================
class Portfolio:
"""
Tracks cash, positions, and P&L. Records equity curve.
追踪现金、持仓和盈亏,记录净值曲线。
"""
def __init__(
self,
data_handler : DataHandler,
event_queue : queue.Queue,
initial_capital: float = 1_000_000.0,
position_pct : float = 0.95, # fraction of capital to deploy per trade
# 每次交易使用资本的比例 (95%)
):
self.data = data_handler
self.queue = event_queue
self.initial_capital = initial_capital
self.position_pct = position_pct
# ── Account state 账户状态 ──────────────────────────────────────
self.cash = initial_capital # 现金余额 / available cash
self.positions : Dict[str, int] = {} # symbol → qty (持仓数量)
self.avg_cost : Dict[str, float] = {} # symbol → avg entry price (持仓均价)
# ── Trade log 交易记录 ───────────────────────────────────────────
# Every completed fill gets recorded here for analysis.
# 每笔完成的成交都记录在这里以供分析。
self.trade_log: List[dict] = []
# ── Equity curve 净值曲线 ────────────────────────────────────────
# Recorded once per day after market close.
# 每天收盘后记录一次。
self.equity_curve: List[dict] = []
# ── Computed properties 计算属性 ─────────────────────────────────────────
def market_value(self, current_prices: Dict[str, float]) -> float:
"""
Current market value of all open positions.
所有开放持仓的当前市值。
市值 = Σ(持仓数量 × 当前价格)
"""
return sum(
qty * current_prices.get(sym, 0.0)
for sym, qty in self.positions.items()
)
def total_equity(self, current_prices: Dict[str, float]) -> float:
"""
Total portfolio value: cash + market value of positions.
账户总价值:现金 + 持仓市值。
总资产 = 现金余额 + 持仓市值
"""
return self.cash + self.market_value(current_prices)
# ── Event handlers 事件处理器 ─────────────────────────────────────────────
def on_signal(self, event: SignalEvent) -> None:
"""
Convert a SignalEvent into an OrderEvent.
将 SignalEvent 转换为 OrderEvent决定下多少单
This is where position sizing (仓位管理) happens.
这里进行仓位计算。
"""
symbol = event.symbol
direction = event.direction
# ── Determine quantity 计算交易数量 ─────────────────────────────
# We use Fixed Fraction sizing: deploy position_pct of current equity.
# 使用固定比例法:每次使用当前总资产的 position_pct。
#
# quantity = floor( equity × position_pct × strength / current_price )
# 数量 = floor( 总资产 × 仓位比例 × 信号强度 / 当前价格 )
current_price = self.data.get_close_series().iloc[-1]
if direction == Direction.EXIT:
# Close the entire existing position / 平掉全部现有持仓
qty = abs(self.positions.get(symbol, 0))
if qty == 0:
return # nothing to close / 没有持仓可平
else:
# New entry — compute share count / 新入场——计算股数
capital_to_deploy = self.cash * self.position_pct * event.strength
qty = int(capital_to_deploy / current_price)
if qty == 0:
return # not enough cash / 资金不足
order = OrderEvent(
symbol = symbol,
date = event.date,
order_type = OrderType.MARKET, # use market orders for simplicity
direction = direction,
quantity = qty,
)
self.queue.put(order)
def on_fill(self, event: FillEvent) -> None:
"""
Update cash and positions when a fill is confirmed.
成交确认后更新现金和持仓。
"""
symbol = event.symbol
qty = event.quantity
price = event.fill_price
comm = event.commission
if event.direction in (Direction.LONG, Direction.SHORT):
# ── Opening a position 建仓 ──────────────────────────────────
sign = 1 if event.direction == Direction.LONG else -1
# Update average cost 更新持仓均价
old_qty = self.positions.get(symbol, 0)
old_cost = self.avg_cost.get(symbol, 0.0)
new_qty = old_qty + sign * qty
if new_qty != 0:
# Weighted average cost 加权平均成本
self.avg_cost[symbol] = (
(old_cost * abs(old_qty) + price * qty) / abs(new_qty)
)
self.positions[symbol] = new_qty
# Deduct cash (buy) / add cash (short sell)
# 扣除现金(买入)/ 增加现金(卖空)
self.cash -= sign * (price * qty) + comm
elif event.direction == Direction.EXIT:
# ── Closing a position 平仓 ──────────────────────────────────
entry_price = self.avg_cost.get(symbol, price)
pos_sign = 1 if self.positions.get(symbol, 0) > 0 else -1
# Realized P&L 已实现盈亏
# P&L = (exit_price - entry_price) × qty × direction
pnl = pos_sign * (price - entry_price) * qty - comm
# Return cash from closing the position
# 平仓回收现金
self.cash += pos_sign * price * qty - comm
# Clear position 清除持仓
self.positions[symbol] = 0
self.avg_cost[symbol] = 0.0
# Log the trade 记录交易
self.trade_log.append({
"date" : event.date,
"symbol" : symbol,
"direction" : "LONG→EXIT" if pos_sign == 1 else "SHORT→EXIT",
"entry_price" : entry_price,
"exit_price" : price,
"quantity" : qty,
"pnl" : pnl,
"commission" : comm,
})
def record_equity(self, date: pd.Timestamp, current_price: float) -> None:
"""
Snapshot the portfolio value at end of day.
记录每日收盘时的组合价值快照。
"""
prices = {self.data.symbol: current_price}
self.equity_curve.append({
"date" : date,
"cash" : self.cash,
"market_value" : self.market_value(prices),
"total_equity" : self.total_equity(prices),
})
def get_equity_series(self) -> pd.Series:
"""Return equity curve as a pd.Series. 返回净值曲线为 pd.Series。"""
if not self.equity_curve:
return pd.Series(dtype=float)
df = pd.DataFrame(self.equity_curve).set_index("date")
return df["total_equity"]
# =============================================================================
# SECTION 5: ExecutionHandler (Simulated Broker)
# 执行处理器(模拟券商)
# =============================================================================
#
# In live trading, this connects to a real broker API (e.g. Interactive
# Brokers, Alpaca, or a Chinese broker via vnpy).
# 在实盘交易中,这里连接真实券商 API如 Interactive Brokers、Alpaca
# 或通过 vnpy 连接国内券商)。
#
# In backtesting, we SIMULATE the broker. The key costs to model are:
# 在回测中,我们模拟券商。需要建模的主要成本:
#
# 1. Commission 佣金
# A fixed percentage of trade value charged by the broker.
# 券商按交易金额收取的固定比例费用。
# Typical in China A-shares: 万分之三 (0.03%) per side
# 中国A股典型佣金单边万分之三
#
# 2. Slippage 滑点
# The difference between the price at signal time and actual fill price.
# 信号生成时的价格与实际成交价格之间的差异。
# Causes: bid-ask spread, market impact, order queue position.
# 原因:买卖价差、市场冲击、排队等待。
#
# Slippage models / 滑点模型:
# Fixed percentage 固定比例 — simple, conservative
# Volume-weighted 成交量加权 — larger orders → more slippage
# Fixed spread 固定点差 — realistic for liquid markets
#
# 3. Fill Assumption 成交假设
# We assume market orders fill at next bar's OPEN price.
# 我们假设市价单在下一根K线的开盘价成交。
# This is a conservative and common assumption.
# 这是保守且常用的假设。
# =============================================================================
class SimulatedBroker:
"""
Simulates a brokerage: fills market orders with realistic costs.
模拟券商:以真实成本成交市价单。
"""
def __init__(
self,
event_queue : queue.Queue,
commission_rate : float = 0.0003, # 0.03% per side / 单边万分之三
slippage_rate : float = 0.0001, # 0.01% slippage / 万分之一滑点
fill_on : str = "next_open", # when to fill / 何时成交
min_commission : float = 5.0, # minimum commission / 最低佣金
):
"""
Parameters / 参数:
commission_rate 佣金率 (per side / 单边)
slippage_rate 滑点率 (applied as adverse price move / 不利价格移动)
fill_on 成交时机 'next_open' or 'current_close'
min_commission 最低佣金
"""
self.queue = event_queue
self.commission_rate = commission_rate
self.slippage_rate = slippage_rate
self.fill_on = fill_on
self.min_commission = min_commission
# Store pending orders to fill on next bar's open
# 存储待处理订单在下一根K线开盘价成交
self._pending_orders: List[OrderEvent] = []
def on_order(self, event: OrderEvent) -> None:
"""
Receive an order from Portfolio.
接收来自组合管理器的订单。
For market orders, we queue them to fill at next open.
对于市价单我们将其排队在下一根K线的开盘价成交。
"""
if event.order_type == OrderType.MARKET:
self._pending_orders.append(event)
# Limit and Stop orders would need price monitoring — not implemented here
# 限价单和止损单需要价格监控——此处未实现,留作扩展
def fill_pending(self, bar: MarketEvent) -> None:
"""
Fill all pending orders at the open of the provided bar.
以提供K线的开盘价成交所有待处理订单。
This is called at the start of each new bar, before the Strategy
gets to process it.
每根新K线开始时策略处理之前调用。
"""
for order in self._pending_orders:
fill_price = self._compute_fill_price(order, bar)
commission = self._compute_commission(order, fill_price)
slippage = fill_price - bar.open # adverse difference from open
fill = FillEvent(
symbol = order.symbol,
date = bar.date, # filled on THIS bar's date
direction = order.direction,
quantity = order.quantity,
fill_price = fill_price,
commission = commission,
slippage = slippage,
)
self.queue.put(fill)
self._pending_orders.clear()
def _compute_fill_price(self, order: OrderEvent, bar: MarketEvent) -> float:
"""
Apply slippage to the open price.
对开盘价施加滑点。
Slippage always works AGAINST the trader:
滑点总是对交易者不利:
BUY (LONG) : fill above open (买入时成交价高于开盘价)
SELL (EXIT) : fill below open (卖出时成交价低于开盘价)
"""
direction = order.direction
if direction in (Direction.LONG, Direction.SHORT):
# Buying: slippage pushes price UP 买入:滑点使价格上升
return bar.open * (1 + self.slippage_rate)
else:
# Selling: slippage pushes price DOWN 卖出:滑点使价格下降
return bar.open * (1 - self.slippage_rate)
def _compute_commission(self, order: OrderEvent, fill_price: float) -> float:
"""
Compute brokerage commission.
计算券商佣金。
commission = max(commission_rate × trade_value, min_commission)
佣金 = max(佣金率 × 交易金额, 最低佣金)
"""
trade_value = fill_price * order.quantity
commission = max(trade_value * self.commission_rate, self.min_commission)
return commission
# =============================================================================
# SECTION 6: BacktestEngine 回测引擎(主事件循环)
# =============================================================================
#
# The BacktestEngine is the conductor — it orchestrates all components by
# running the main event loop.
# 回测引擎是指挥者——通过运行主事件循环来协调所有组件。
#
# Main Loop Algorithm 主循环算法:
# ──────────────────────────────────────────────────────────────────────────
# WHILE data feed has more bars:
# 1. DataHandler.stream_next() → puts MarketEvent on queue
# 2. Broker.fill_pending(bar) → fills any leftover orders FIRST
# (orders placed yesterday fill at today's open — no lookahead!)
# (昨天的订单今天开盘价成交 — 无前视偏差!)
# 3. WHILE queue is not empty:
# event = queue.get()
# IF MarketEvent → Strategy.on_market(event)
# ELIF SignalEvent → Portfolio.on_signal(event)
# ELIF OrderEvent → Broker.on_order(event)
# ELIF FillEvent → Portfolio.on_fill(event)
# 4. Portfolio.record_equity(today's close)
# END WHILE
# ──────────────────────────────────────────────────────────────────────────
#
# Why is step 2 (fill_pending) before step 3 (process queue)?
# 为什么步骤2成交待处理在步骤3处理队列之前
#
# Orders generated on day T are pending. On day T+1 the broker fills them
# at the open BEFORE the strategy sees day T+1's data.
# T日生成的订单是待处理状态。T+1日的开盘价成交在策略看到T+1数据之前
# This correctly models the one-day execution delay of real trading.
# 这正确模拟了真实交易的一天执行延迟。
# =============================================================================
class BacktestEngine:
"""
Main event-driven backtest engine. 主事件驱动回测引擎。
Wires together DataHandler, Strategy, Portfolio, and Broker.
将 DataHandler、Strategy、Portfolio 和 Broker 组合在一起。
"""
def __init__(
self,
data_handler : DataHandler,
strategy : Strategy,
portfolio : Portfolio,
broker : SimulatedBroker,
name : str = "Event-Driven Backtest",
):
self.data = data_handler
self.strategy = strategy
self.portfolio = portfolio
self.broker = broker
self.name = name
# All components share this single event queue / 所有组件共享这一事件队列
assert (data_handler.event_queue is strategy.queue is
portfolio.queue is broker.queue), \
"All components must share the same event queue!"
self._queue = data_handler.event_queue
def run(self, verbose: bool = False) -> None:
"""
Execute the backtest from start to finish.
从头到尾执行回测。
Parameters / 参数:
verbose — if True, print every event (useful for debugging)
如果为 True打印每个事件用于调试
"""
bar_count = 0
event_count= 0
print(f"\n{'' * 60}")
print(f" 回测开始: {self.name}")
print(f" Backtest Start: {self.data.data.index[0].date()}"
f"{self.data.data.index[-1].date()}")
print(f"{'' * 60}")
# ── Main loop 主循环 ─────────────────────────────────────────────
while self.data.has_more_data():
# ── Step 1: Stream the next bar 流出下一根K线 ─────────────────
# This puts a MarketEvent on the queue.
# 这会将一个 MarketEvent 放入队列。
self.data.stream_next()
bar_count += 1
# ── Step 2: Fill yesterday's pending orders at today's open ───
# ── 步骤2以今天的开盘价成交昨天的待处理订单 ─────────────────
current_bar = self.data._history[-1]
self.broker.fill_pending(current_bar)
# ── Step 3: Process all events in the queue 处理队列中的所有事件
while not self._queue.empty():
event = self._queue.get()
event_count += 1
if verbose:
print(f" [{event.type.value}] {event}")
if event.type == EventType.MARKET:
# Strategy sees the new price bar and may emit a signal
# 策略看到新K线可能发出信号
self.strategy.on_market(event)
elif event.type == EventType.SIGNAL:
# Portfolio receives signal and may emit an order
# 组合管理器接收信号,可能发出订单
self.portfolio.on_signal(event)
elif event.type == EventType.ORDER:
# Broker receives the order (will fill on next bar's open)
# 券商接收订单将在下一根K线的开盘价成交
self.broker.on_order(event)
elif event.type == EventType.FILL:
# Portfolio updates its cash & position records
# 组合管理器更新现金和持仓记录
self.portfolio.on_fill(event)
# ── Step 4: Record end-of-day equity 记录当日收盘净值 ───────
self.portfolio.record_equity(current_bar.date, current_bar.close)
n_trades = len(self.portfolio.trade_log)
print(f" 回测完成! Backtest Complete!")
print(f" 总K线数: {bar_count} 总事件数: {event_count} 总交易数: {n_trades}")
print(f"{'' * 60}")
# ── Performance metrics 绩效指标 ─────────────────────────────────────────
def metrics(self) -> dict:
"""
Compute performance metrics from the equity curve.
从净值曲线计算绩效指标。
"""
eq = self.portfolio.get_equity_series()
ret = eq.pct_change().fillna(0)
n = len(ret)
years = n / 252.0
total_return = (eq.iloc[-1] / self.portfolio.initial_capital) - 1
cagr = (1 + total_return) ** (1 / years) - 1 if years > 0 else 0
ann_vol = ret.std() * np.sqrt(252)
sharpe = (ret.mean() / ret.std()) * np.sqrt(252) if ret.std() > 0 else 0
downside_std = ret[ret < 0].std() * np.sqrt(252)
sortino = cagr / downside_std if downside_std > 0 else 0
rolling_max = eq.cummax()
drawdown = (eq - rolling_max) / rolling_max
max_dd = drawdown.min()
calmar = cagr / abs(max_dd) if max_dd != 0 else 0
win_rate = (ret > 0).mean()
# Trade-level statistics 交易级别统计
trades = self.portfolio.trade_log
n_trades = len(trades)
if n_trades > 0:
pnls = [t["pnl"] for t in trades]
win_trades = [p for p in pnls if p > 0]
loss_trades= [p for p in pnls if p < 0]
trade_win_rate = len(win_trades) / n_trades
avg_win = np.mean(win_trades) if win_trades else 0
avg_loss = np.mean(loss_trades) if loss_trades else 0
profit_factor = (sum(win_trades) / abs(sum(loss_trades))
if loss_trades else np.inf)
else:
trade_win_rate = profit_factor = avg_win = avg_loss = 0
return {
"总收益率 Total Return" : f"{total_return:.2%}",
"年化收益率 CAGR" : f"{cagr:.2%}",
"年化波动率 Ann. Volatility" : f"{ann_vol:.2%}",
"夏普比率 Sharpe Ratio" : f"{sharpe:.3f}",
"索提诺比率 Sortino Ratio" : f"{sortino:.3f}",
"最大回撤 Max Drawdown" : f"{max_dd:.2%}",
"卡玛比率 Calmar Ratio" : f"{calmar:.3f}",
"日胜率 Daily Win Rate" : f"{win_rate:.2%}",
"交易次数 # Trades (round trip)": str(n_trades),
"交易胜率 Trade Win Rate" : f"{trade_win_rate:.2%}",
"平均盈利 Avg Win (per trade)" : f"¥{avg_win:,.0f}",
"平均亏损 Avg Loss (per trade)": f"¥{avg_loss:,.0f}",
"盈亏比 Profit Factor" : f"{profit_factor:.3f}",
}
def print_metrics(self):
"""Pretty-print the performance report. 格式化打印绩效报告。"""
print(f"\n{'=' * 60}")
print(f" 策略绩效报告 / Performance Report")
print(f" {self.name}")
print(f"{'=' * 60}")
for k, v in self.metrics().items():
print(f" {k:<40} {v}")
print(f"\n ── 最近 5 笔交易 Last 5 Trades ────────────────────────")
trades = self.portfolio.trade_log[-5:]
if trades:
for t in trades:
pnl_sign = "+" if t["pnl"] >= 0 else ""
print(f" {t['date'].date()} {t['direction']:<12} "
f"进价 {t['entry_price']:>8.2f} 出价 {t['exit_price']:>8.2f} "
f"股数 {t['quantity']:>5} "
f"盈亏 {pnl_sign}{t['pnl']:>10,.0f}¥")
else:
print(" (无完整交易 / No completed round-trip trades)")
print(f"{'=' * 60}")
# =============================================================================
# SECTION 7: Assemble & Run 组装并运行
# =============================================================================
# ── Generate synthetic price data (same seed as previous demo for consistency)
# ── 生成合成价格数据(与前一个演示相同的随机种子以保持一致性)
def generate_price_series(n_days=1500, mu=0.10, sigma=0.25, s0=100.0, seed=42):
"""Geometric Brownian Motion price series. 几何布朗运动价格序列。"""
np.random.seed(seed)
dt = 1.0 / 252
eps = np.random.randn(n_days)
log_ret = (mu - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * eps
prices = s0 * np.exp(np.cumsum(log_ret))
dates = pd.bdate_range(start="2019-01-02", periods=n_days)
# Build a realistic OHLCV DataFrame 构建真实的 OHLCV DataFrame
daily_range = prices * np.abs(np.random.randn(n_days)) * 0.015
opens = prices * (1 + np.random.randn(n_days) * 0.003)
highs = np.maximum(prices, opens) + np.abs(np.random.randn(n_days)) * daily_range * 0.5
lows = np.minimum(prices, opens) - np.abs(np.random.randn(n_days)) * daily_range * 0.5
vols = np.random.lognormal(mean=14.5, sigma=0.5, size=n_days).astype(int)
return pd.DataFrame({
"open": opens, "high": highs, "low": lows,
"close": prices, "volume": vols,
}, index=dates)
ohlcv = generate_price_series()
SYMBOL = "SIM_STOCK"
print(f"\n[数据] 生成模拟 OHLCV 数据:")
print(f" 交易日数: {len(ohlcv)}")
print(f" 日期范围: {ohlcv.index[0].date()}{ohlcv.index[-1].date()}")
print(f" 价格区间: {ohlcv['close'].min():.2f} ~ {ohlcv['close'].max():.2f}")
def build_engine(strategy_class, strategy_kwargs: dict,
name: str, commission=0.0003, slippage=0.0001
) -> BacktestEngine:
"""
Factory function: creates a fresh, independent backtest engine.
工厂函数:创建一个全新的、独立的回测引擎。
每次回测共享同样的价格数据,但所有组件(事件队列、组合、策略)
都是独立实例,相互不干扰。
"""
q = queue.Queue() # fresh queue / 全新事件队列
data = DataHandler(ohlcv, SYMBOL, q) # streams OHLCV bar by bar
strategy = strategy_class(data, q, **strategy_kwargs)
portfolio = Portfolio(data, q, initial_capital=1_000_000.0)
broker = SimulatedBroker(q, commission_rate=commission,
slippage_rate=slippage)
return BacktestEngine(data, strategy, portfolio, broker, name=name)
# ── Run MA Crossover 运行双均线策略 ──────────────────────────────────────────
engine_ma = build_engine(
MACrossoverStrategy,
{"short_window": 20, "long_window": 60},
name="事件驱动 — 双均线策略 (MA Crossover 20/60)",
)
engine_ma.run(verbose=False)
engine_ma.print_metrics()
# ── Run RSI Mean Reversion 运行RSI均值回归策略 ──────────────────────────────
engine_rsi = build_engine(
RSIMeanReversionStrategy,
{"window": 14, "oversold": 30, "overbought": 70},
name="事件驱动 — RSI均值回归 (RSI Mean Reversion 14/30/70)",
)
engine_rsi.run(verbose=False)
engine_rsi.print_metrics()
# =============================================================================
# SECTION 8: Visualization 可视化
# =============================================================================
fig = plt.figure(figsize=(16, 20))
gs = gridspec.GridSpec(5, 2, figure=fig, hspace=0.5, wspace=0.32)
price_series = ohlcv["close"]
# ── Plot 1 (top, full width): Price + MA signals
# ── 图1顶部全宽: 价格 + 均线信号
ax1 = fig.add_subplot(gs[0, :])
ax1.plot(price_series, color="#1f77b4", linewidth=1, alpha=0.9, label="价格 Close")
sma20 = price_series.rolling(20).mean()
sma60 = price_series.rolling(60).mean()
ax1.plot(sma20, color="orange", linewidth=1.3, label="SMA20 (快线)")
ax1.plot(sma60, color="crimson", linewidth=1.3, label="SMA60 (慢线)")
# Mark trade entries / exits from the event-driven engine
# 标记事件驱动引擎的交易进出场点
for trade in engine_ma.portfolio.trade_log:
color = "green" if "LONG" in trade["direction"] else "red"
ax1.axvline(x=trade["date"], color=color, alpha=0.25, linewidth=0.8)
ax1.set_title("双均线策略 — 价格与信号 (MA Crossover: Price + Signals)",
fontsize=12, fontweight="bold")
ax1.legend(loc="upper left", fontsize=9)
ax1.set_ylabel("价格 Price")
ax1.grid(alpha=0.3)
# ── Plot 2 (full width): Event flow detail for one trade
# ── 图2全宽: 一笔交易的事件流细节(放大展示)
ax2 = fig.add_subplot(gs[1, :])
# Show a 120-day window around the first trade to illustrate the event flow
# 展示第一笔交易前后120天的窗口说明事件流
if engine_ma.portfolio.trade_log:
trade0 = engine_ma.portfolio.trade_log[0]
focus_date = trade0["date"]
window = pd.Timedelta(days=90)
mask = (price_series.index >= focus_date - window) & \
(price_series.index <= focus_date + window)
ax2.plot(price_series[mask], color="#1f77b4", linewidth=1.5)
ax2.plot(sma20[mask], color="orange", linewidth=1.2, linestyle="--", label="SMA20")
ax2.plot(sma60[mask], color="crimson", linewidth=1.2, linestyle="--", label="SMA60")
# Mark the exit point
ax2.axvline(x=focus_date, color="red", linewidth=1.5,
label=f"平仓 Exit @ {focus_date.date()}")
ax2.set_title(
f"放大: 第1笔交易前后 90 天 "
f"(Zoom: 90 days around Trade #1 Exit)\n"
f"进价 Entry ¥{trade0['entry_price']:.2f} "
f"出价 Exit ¥{trade0['exit_price']:.2f} "
f"盈亏 P&L ¥{trade0['pnl']:,.0f}",
fontsize=10, fontweight="bold",
)
ax2.legend(fontsize=9)
ax2.grid(alpha=0.3)
# ── Plot 3 (left): Equity curves 净值曲线
ax3 = fig.add_subplot(gs[2, 0])
eq_ma = engine_ma.portfolio.get_equity_series()
eq_rsi = engine_rsi.portfolio.get_equity_series()
eq_bh = 1_000_000 * (1 + price_series.pct_change().fillna(0)).cumprod()
ax3.plot(eq_ma, color="steelblue", linewidth=1.5, label="MA策略")
ax3.plot(eq_rsi, color="purple", linewidth=1.5, label="RSI策略")
ax3.plot(eq_bh, color="gray", linewidth=1.2, linestyle="--", label="Buy & Hold")
ax3.set_title("净值曲线对比 (Equity Curves)", fontsize=11, fontweight="bold")
ax3.set_ylabel("账户价值 Portfolio Value")
ax3.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: f"¥{x/1e4:.0f}"))
ax3.legend(fontsize=9)
ax3.grid(alpha=0.3)
# ── Plot 4 (right): Drawdown 回撤曲线
ax4 = fig.add_subplot(gs[2, 1])
def compute_dd(eq):
return (eq - eq.cummax()) / eq.cummax()
dd_ma = compute_dd(eq_ma)
dd_rsi = compute_dd(eq_rsi)
dd_bh = compute_dd(eq_bh)
ax4.fill_between(dd_ma.index, dd_ma, 0, alpha=0.5, color="steelblue", label="MA策略")
ax4.fill_between(dd_rsi.index, dd_rsi, 0, alpha=0.4, color="purple", label="RSI策略")
ax4.fill_between(dd_bh.index, dd_bh, 0, alpha=0.3, color="gray", label="Buy & Hold")
ax4.set_title("回撤曲线 (Drawdowns)", fontsize=11, fontweight="bold")
ax4.set_ylabel("回撤 Drawdown")
ax4.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: f"{x:.0%}"))
ax4.legend(fontsize=9)
ax4.grid(alpha=0.3)
# ── Plot 5 (left): Trade P&L distribution 交易盈亏分布
ax5 = fig.add_subplot(gs[3, 0])
pnls = [t["pnl"] for t in engine_ma.portfolio.trade_log]
if pnls:
colors_bar = ["green" if p > 0 else "red" for p in pnls]
ax5.bar(range(len(pnls)), pnls, color=colors_bar, alpha=0.75, edgecolor="white")
ax5.axhline(0, color="black", linewidth=0.8)
ax5.set_title(f"MA策略 — 每笔交易盈亏 (Per-Trade P&L)\n"
f"{len(pnls)}笔, 胜率 "
f"{sum(1 for p in pnls if p > 0)/len(pnls):.0%}",
fontsize=10, fontweight="bold")
ax5.set_xlabel("交易编号 Trade #")
ax5.set_ylabel("盈亏 P&L (¥)")
ax5.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: f"¥{x/1e4:.1f}"))
ax5.grid(alpha=0.3, axis="y")
# ── Plot 6 (right): Cash vs Market Value over time
# ── 图6: 现金 vs 持仓市值随时间变化
ax6 = fig.add_subplot(gs[3, 1])
eq_df = pd.DataFrame(engine_ma.portfolio.equity_curve).set_index("date")
ax6.stackplot(eq_df.index,
eq_df["cash"], eq_df["market_value"],
labels=["现金 Cash", "持仓市值 Market Value"],
colors=["#2ca02c", "#ff7f0e"], alpha=0.7)
ax6.set_title("现金 vs 持仓市值 (Cash vs Position Value)", fontsize=11, fontweight="bold")
ax6.set_ylabel("金额 Amount (¥)")
ax6.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: f"¥{x/1e4:.0f}"))
ax6.legend(loc="upper left", fontsize=9)
ax6.grid(alpha=0.3)
# ── Plot 7 (full width): Rolling metrics — Sharpe & Volatility
# ── 图7全宽: 滚动指标 — 夏普比率 & 波动率
ax7a = fig.add_subplot(gs[4, 0])
ax7b = fig.add_subplot(gs[4, 1])
ret_ma = eq_ma.pct_change().fillna(0)
roll_sharpe = ret_ma.rolling(60).mean() / ret_ma.rolling(60).std() * np.sqrt(252)
roll_vol = ret_ma.rolling(60).std() * np.sqrt(252)
ax7a.plot(roll_sharpe, color="steelblue", linewidth=1)
ax7a.axhline(0, color="black", linewidth=0.8, linestyle="--")
ax7a.axhline(1, color="green", linewidth=0.8, linestyle=":", label="Sharpe=1 (目标线)")
ax7a.fill_between(roll_sharpe.index, roll_sharpe, 0,
where=(roll_sharpe > 0), alpha=0.2, color="green")
ax7a.fill_between(roll_sharpe.index, roll_sharpe, 0,
where=(roll_sharpe < 0), alpha=0.2, color="red")
ax7a.set_title("滚动60日夏普比率 (60-day Rolling Sharpe)", fontsize=11, fontweight="bold")
ax7a.set_ylabel("夏普比率 Sharpe")
ax7a.legend(fontsize=8)
ax7a.grid(alpha=0.3)
ax7b.plot(roll_vol, color="darkorange", linewidth=1)
ax7b.fill_between(roll_vol.index, roll_vol, alpha=0.2, color="orange")
ax7b.set_title("滚动60日年化波动率 (60-day Rolling Volatility)", fontsize=11, fontweight="bold")
ax7b.set_ylabel("年化波动率 Ann. Volatility")
ax7b.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: f"{x:.0%}"))
ax7b.grid(alpha=0.3)
plt.suptitle(
"量化交易 — 事件驱动回测引擎 Quantitative Trading: Event-Driven Backtest",
fontsize=15, fontweight="bold", y=1.003,
)
plt.savefig("event_driven_backtest.png", dpi=120, bbox_inches="tight")
plt.show()
print("\n[图表] 已保存至 event_driven_backtest.png")
# =============================================================================
# SECTION 9: Compare Event-Driven vs Vectorized
# 对比事件驱动 vs 向量化回测
# =============================================================================
#
# Run a quick vectorized version of the SAME MA strategy on the SAME data
# to show how results can differ due to:
# 对相同数据运行相同MA策略的快速向量化版本展示结果差异的原因
#
# 1. Fill timing 成交时机 — event-driven uses next-bar open;
# vectorized typically uses same-bar close
# 2. Cost modeling 成本建模 — event-driven has min commission floor;
# vectorized applies flat percentage only
# 3. Integer shares 整数股 — event-driven floors to whole shares;
# vectorized trades fractional shares
# =============================================================================
print(f"\n{'=' * 60}")
print(" 对比: 事件驱动 vs 向量化回测")
print(" Comparison: Event-Driven vs Vectorized Backtest")
print(f"{'=' * 60}")
print(" 相同策略: 双均线 MA Crossover (20/60)")
print(" 相同数据: 同一模拟价格序列")
print()
# Vectorized version (快速向量化版本)
closes = ohlcv["close"]
sma20v = closes.rolling(20).mean()
sma60v = closes.rolling(60).mean()
sig_vec = (sma20v > sma60v).astype(float).shift(1).fillna(0)
ret_vec = closes.pct_change().fillna(0)
strat_ret_gross = sig_vec * ret_vec
# Simple flat cost: 0.03% commission + 0.01% slippage = 0.04% per side
position_change = sig_vec.diff().abs().fillna(0)
cost_vec = position_change * (0.0003 + 0.0001)
strat_ret_net = strat_ret_gross - cost_vec
eq_vec = 1_000_000 * (1 + strat_ret_net).cumprod()
# Compare
m_ed = engine_ma.metrics()
total_ed = float(m_ed["总收益率 Total Return"].strip("%")) / 100
sharpe_ed = float(m_ed["夏普比率 Sharpe Ratio"])
maxdd_ed = float(m_ed["最大回撤 Max Drawdown"].strip("%")) / 100
total_vec = (eq_vec.iloc[-1] / 1_000_000) - 1
ret_v_s = strat_ret_net
sharpe_vec = (ret_v_s.mean() / ret_v_s.std()) * np.sqrt(252)
rolling_mx = eq_vec.cummax()
maxdd_vec = ((eq_vec - rolling_mx) / rolling_mx).min()
print(f" {'指标 Metric':<30} {'事件驱动 Event-Driven':>22} {'向量化 Vectorized':>20}")
print(f" {''*72}")
print(f" {'总收益率 Total Return':<30} {total_ed:>21.2%} {total_vec:>19.2%}")
print(f" {'夏普比率 Sharpe Ratio':<30} {sharpe_ed:>21.3f} {sharpe_vec:>19.3f}")
print(f" {'最大回撤 Max Drawdown':<30} {maxdd_ed:>21.2%} {maxdd_vec:>19.2%}")
print()
print(" 差异原因 / Why they differ:")
print(" ① 事件驱动以「下一日开盘价」成交 (Event-driven fills at next open)")
print(" 向量化以「当日收盘价」交易 (Vectorized trades at same-day close)")
print(" ② 事件驱动有最低佣金下限 ¥5 (Event-driven has min commission floor)")
print(" ③ 事件驱动按整数股买卖 (Event-driven uses integer share quantities)")
print(" ④ 资金是随时间变化的 (Capital base grows/shrinks with P&L)")
# =============================================================================
# SECTION 10: What's Next 下一步
# =============================================================================
print(f"""
{'=' * 72}
总结 Summary
{'=' * 72}
事件驱动回测引擎的核心组件 / Core Components of Event-Driven Engine:
┌─────────────────────┬────────────────────────────────────────────┐
│ 组件 Component │ 职责 Responsibility │
├─────────────────────┼────────────────────────────────────────────┤
│ Event (事件) │ 解耦组件间通信的消息对象 │
│ │ Message objects that decouple components │
├─────────────────────┼────────────────────────────────────────────┤
│ DataHandler │ 逐根K线流出历史数据模拟无未来信息
│ 数据处理器 │ Streams bars one-by-one (no future info) │
├─────────────────────┼────────────────────────────────────────────┤
│ Strategy (策略) │ 观察市场 → 生成交易信号 │
│ │ Observes market → generates trade signals │
├─────────────────────┼────────────────────────────────────────────┤
│ Portfolio (组合) │ 仓位管理 + 现金/持仓/盈亏追踪 │
│ │ Position sizing + cash/position/P&L track │
├─────────────────────┼────────────────────────────────────────────┤
│ SimulatedBroker │ 模拟券商:滑点+佣金+成交时机 │
│ 模拟券商 │ Simulates broker: slippage+commission+fill │
├─────────────────────┼────────────────────────────────────────────┤
│ BacktestEngine │ 主事件循环,驱动所有组件 │
│ 回测引擎 │ Main event loop, orchestrates all parts │
└─────────────────────┴────────────────────────────────────────────┘
下一步学习方向 Next Steps:
──────────────────────────────────────────────────────────────────
• 限价单/止损单 Limit & Stop orders in ExecutionHandler
• 多标的组合 Multi-asset portfolio (stocks + bonds, sector rotation)
• 因子选股 Alpha factor models (Fama-French, Momentum, Quality)
• 机器学习信号 ML-based signals (XGBoost, LSTM)
• 风险管理 Risk management (VaR, CVaR, Kelly position sizing)
• 实盘对接 Live trading connection (vnpy, Alpaca API)
{'=' * 72}
""")