1532 lines
71 KiB
Python
1532 lines
71 KiB
Python
# =============================================================================
|
||
# Quantitative Trading — Event-Driven Backtest Engine
|
||
# 量化交易 — 事件驱动回测引擎
|
||
# =============================================================================
|
||
#
|
||
# 本文件是策略开发演示 (quant_strategy_backtest_demo.py) 的续集。
|
||
# This file is the sequel to the strategy development demo.
|
||
#
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Why Event-Driven? 为什么要用事件驱动?
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# The vectorized backtest in the previous demo computed everything at once
|
||
# using array operations. This is fast, but it has a fundamental problem:
|
||
# 上一个演示中的向量化回测一次性用数组运算计算所有结果。速度很快,但有根本缺陷:
|
||
#
|
||
# It cannot easily model:
|
||
# 难以建模:
|
||
# • Different order types 不同订单类型 (limit / stop / market orders)
|
||
# • Partial fills 部分成交
|
||
# • Intraday price path 日内价格路径 (open → high/low → close sequence)
|
||
# • Real-time risk checks 实时风控检查 (e.g. position limits, margin calls)
|
||
# • Multiple strategies 多策略组合
|
||
# • Portfolio-level rules 组合层面规则 (e.g. max sector exposure)
|
||
#
|
||
# The event-driven approach treats the backtest as a simulation of the actual
|
||
# live trading system. The same strategy & portfolio code can run in both
|
||
# backtest mode AND live trading mode — just swap the data source.
|
||
#
|
||
# 事件驱动方法将回测视为真实交易系统的模拟。
|
||
# 同一套策略和组合代码可以在回测模式和实盘模式下运行——只需替换数据源。
|
||
#
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Architecture Overview 架构总览
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
#
|
||
# ┌─────────────────────────────────┐
|
||
# │ Event Queue 事件队列 │
|
||
# │ (FIFO deque — central bus) │
|
||
# └───────────────┬─────────────────┘
|
||
# │ events flow through
|
||
# ┌────────────────────┼────────────────────┐
|
||
# ▼ ▼ ▼
|
||
# ┌────────────────┐ ┌────────────────┐ ┌────────────────┐
|
||
# │ DataHandler │ │ Strategy │ │ Portfolio │
|
||
# │ 数据处理器 │ │ 策略引擎 │ │ 组合管理器 │
|
||
# │ │ │ │ │ │
|
||
# │ Streams bars │ │ Reads market │ │ Receives order │
|
||
# │ (OHLCV) into │ │ data, computes │ │ requests from │
|
||
# │ the queue as │ │ signals, emits │ │ strategy and │
|
||
# │ MarketEvents │ │ SignalEvents │ │ emits │
|
||
# └────────────────┘ └────────────────┘ │ OrderEvents │
|
||
# └───────┬────────┘
|
||
# │
|
||
# ┌───────▼────────┐
|
||
# │ ExecutionHandler│
|
||
# │ 执行处理器 │
|
||
# │ (模拟券商) │
|
||
# │ │
|
||
# │ Fills orders, │
|
||
# │ applies slippage│
|
||
# │ & commission, │
|
||
# │ emits │
|
||
# │ FillEvents │
|
||
# └────────────────┘
|
||
#
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Event flow for one trading day 单个交易日的事件流
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
#
|
||
# [Clock ticks to new day / 时钟滴答到新的一天]
|
||
# │
|
||
# ▼
|
||
# DataHandler emits MarketEvent (新K线数据到达)
|
||
# │
|
||
# ▼
|
||
# Strategy processes MarketEvent → emits SignalEvent (信号:买/卖/平)
|
||
# │
|
||
# ▼
|
||
# Portfolio processes SignalEvent → emits OrderEvent (下单:市价/限价/止损)
|
||
# │
|
||
# ▼
|
||
# ExecutionHandler processes OrderEvent → emits FillEvent (成交确认)
|
||
# │
|
||
# ▼
|
||
# Portfolio processes FillEvent → updates positions & P&L (更新持仓和盈亏)
|
||
# │
|
||
# ▼
|
||
# [repeat for next day / 重复下一天]
|
||
#
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Topics covered / 涵盖主题:
|
||
# 1. Event Classes 事件类 (Market, Signal, Order, Fill)
|
||
# 2. DataHandler 数据处理器
|
||
# 3. Strategy 策略 (MA Crossover & RSI)
|
||
# 4. Portfolio 组合管理器 (cash, positions, P&L)
|
||
# 5. ExecutionHandler 执行处理器 (slippage models, order types)
|
||
# 6. BacktestEngine 回测引擎 (main event loop)
|
||
# 7. Performance Analytics 绩效分析
|
||
# 8. Comparison: Event-Driven vs Vectorized
|
||
# =============================================================================
|
||
|
||
from __future__ import annotations
|
||
|
||
import queue # Python built-in FIFO queue
|
||
import enum # for clean event type constants
|
||
from dataclasses import dataclass, field
|
||
from abc import ABC, abstractmethod
|
||
from typing import Dict, List, Optional, Tuple
|
||
from copy import deepcopy
|
||
|
||
import numpy as np
|
||
import pandas as pd
|
||
import matplotlib.pyplot as plt
|
||
import matplotlib.gridspec as gridspec
|
||
import warnings
|
||
|
||
warnings.filterwarnings('ignore')
|
||
|
||
# ── Chinese font / 中文字体 ──────────────────────────────────────────────────
|
||
plt.rcParams['font.sans-serif'] = ['WenQuanYi Zen Hei', 'Arial Unicode MS',
|
||
'SimHei', 'DejaVu Sans']
|
||
plt.rcParams['axes.unicode_minus'] = False
|
||
|
||
np.random.seed(42)
|
||
|
||
print("=" * 72)
|
||
print(" 量化交易 — 事件驱动回测引擎")
|
||
print(" Quantitative Trading — Event-Driven Backtest Engine")
|
||
print("=" * 72)
|
||
|
||
|
||
# =============================================================================
|
||
# SECTION 1: Event Classes 事件类
|
||
# =============================================================================
|
||
#
|
||
# Every piece of information that flows through the system is wrapped in an
|
||
# Event object. This decouples the components — a Strategy doesn't call the
|
||
# Portfolio directly; it just puts a SignalEvent on the queue.
|
||
#
|
||
# 系统中流通的每一条信息都被包装成一个 Event 对象。
|
||
# 这使各组件解耦——策略不直接调用组合,只是把 SignalEvent 放入队列。
|
||
#
|
||
# Four event types:
|
||
# 四种事件类型:
|
||
#
|
||
# MarketEvent 市场事件 — new OHLCV bar has arrived from the data feed
|
||
# SignalEvent 信号事件 — strategy has decided to buy or sell
|
||
# OrderEvent 订单事件 — portfolio has sized the trade and placed an order
|
||
# FillEvent 成交事件 — broker has confirmed the order was executed
|
||
# =============================================================================
|
||
|
||
class EventType(enum.Enum):
|
||
"""
|
||
Enumeration of all event types in the system.
|
||
系统中所有事件类型的枚举。
|
||
"""
|
||
MARKET = "MARKET" # 市场事件 — new price bar
|
||
SIGNAL = "SIGNAL" # 信号事件 — trade signal from strategy
|
||
ORDER = "ORDER" # 订单事件 — order from portfolio
|
||
FILL = "FILL" # 成交事件 — filled order from broker
|
||
|
||
|
||
class Direction(enum.Enum):
|
||
"""
|
||
Trade direction. 交易方向。
|
||
"""
|
||
LONG = "LONG" # 做多 / buy
|
||
SHORT = "SHORT" # 做空 / sell short
|
||
EXIT = "EXIT" # 平仓 / close position
|
||
|
||
|
||
class OrderType(enum.Enum):
|
||
"""
|
||
Order types. 订单类型。
|
||
|
||
MARKET 市价单 — execute immediately at the best available price
|
||
立即以市场最优价格成交,确保成交但价格不确定
|
||
LIMIT 限价单 — execute only at a specified price or better
|
||
只在指定价格或更优价格成交,价格确定但不保证成交
|
||
STOP 止损单 — becomes a market order when price hits the stop level
|
||
价格触及止损价时转变为市价单,用于控制最大亏损
|
||
"""
|
||
MARKET = "MARKET"
|
||
LIMIT = "LIMIT"
|
||
STOP = "STOP"
|
||
|
||
|
||
@dataclass
|
||
class MarketEvent:
|
||
"""
|
||
Triggered when the DataHandler has a new bar of OHLCV data ready.
|
||
当数据处理器有新的 OHLCV K线数据就绪时触发。
|
||
|
||
This is the "heartbeat" of the system — it drives all other components.
|
||
这是系统的"心跳"——驱动所有其他组件。
|
||
|
||
Fields / 字段:
|
||
symbol 股票代码 — which ticker this bar belongs to
|
||
date 日期 — the date of this bar
|
||
open 开盘价 — first trade of the session
|
||
high 最高价 — highest trade during the session
|
||
low 最低价 — lowest trade during the session
|
||
close 收盘价 — last trade of the session
|
||
volume 成交量 — total shares traded
|
||
"""
|
||
type : EventType = field(default=EventType.MARKET, init=False)
|
||
symbol : str = ""
|
||
date : pd.Timestamp = None
|
||
open : float = 0.0
|
||
high : float = 0.0
|
||
low : float = 0.0
|
||
close : float = 0.0
|
||
volume : float = 0.0
|
||
|
||
def __repr__(self):
|
||
return (f"MarketEvent({self.symbol} {self.date.date()} "
|
||
f"O={self.open:.2f} H={self.high:.2f} "
|
||
f"L={self.low:.2f} C={self.close:.2f})")
|
||
|
||
|
||
@dataclass
|
||
class SignalEvent:
|
||
"""
|
||
Generated by a Strategy when it detects a trade opportunity.
|
||
策略检测到交易机会时生成。
|
||
|
||
A SignalEvent is a "suggestion" — the Portfolio decides whether and
|
||
how much to actually trade based on its own risk rules.
|
||
信号事件是一个"建议"——组合管理器根据自身风控规则决定是否以及交易多少。
|
||
|
||
Fields / 字段:
|
||
symbol 股票代码
|
||
date 信号产生日期
|
||
direction 方向: LONG(做多) / SHORT(做空) / EXIT(平仓)
|
||
strength 信号强度 [0, 1] — used for position sizing (仓位管理)
|
||
1.0 = full conviction, 0.5 = half conviction
|
||
strategy 策略名称 — useful when running multiple strategies
|
||
"""
|
||
type : EventType = field(default=EventType.SIGNAL, init=False)
|
||
symbol : str = ""
|
||
date : pd.Timestamp = None
|
||
direction : Direction = Direction.LONG
|
||
strength : float = 1.0 # 信号强度 / signal strength for position sizing
|
||
strategy : str = ""
|
||
|
||
def __repr__(self):
|
||
return (f"SignalEvent({self.symbol} {self.date.date()} "
|
||
f"{self.direction.value} strength={self.strength:.2f})")
|
||
|
||
|
||
@dataclass
|
||
class OrderEvent:
|
||
"""
|
||
Sent from Portfolio to ExecutionHandler to place a trade.
|
||
从组合管理器发送给执行处理器以下单。
|
||
|
||
This is a concrete order: it specifies WHAT to trade and HOW MUCH.
|
||
这是一个具体订单:指定交易什么以及交易多少。
|
||
|
||
Fields / 字段:
|
||
symbol 股票代码
|
||
date 下单日期
|
||
order_type 订单类型 MARKET / LIMIT / STOP
|
||
direction 方向 LONG / SHORT / EXIT
|
||
quantity 数量 number of shares (股数)
|
||
limit_price 限价 for LIMIT orders (限价单价格)
|
||
stop_price 止损价 for STOP orders (止损单价格)
|
||
"""
|
||
type : EventType = field(default=EventType.ORDER, init=False)
|
||
symbol : str = ""
|
||
date : pd.Timestamp = None
|
||
order_type : OrderType = OrderType.MARKET
|
||
direction : Direction = Direction.LONG
|
||
quantity : int = 0
|
||
limit_price : float = 0.0
|
||
stop_price : float = 0.0
|
||
|
||
def __repr__(self):
|
||
return (f"OrderEvent({self.symbol} {self.date.date()} "
|
||
f"{self.order_type.value} {self.direction.value} "
|
||
f"qty={self.quantity})")
|
||
|
||
|
||
@dataclass
|
||
class FillEvent:
|
||
"""
|
||
Confirms that an order has been executed by the broker.
|
||
确认订单已被券商执行(成交回报)。
|
||
|
||
A Fill is the ground truth — it records what ACTUALLY happened.
|
||
成交回报是最终事实——记录实际发生了什么。
|
||
|
||
Fields / 字段:
|
||
symbol 股票代码
|
||
date 成交日期
|
||
direction 成交方向 LONG / SHORT / EXIT
|
||
quantity 成交数量 shares actually filled
|
||
fill_price 成交价格 actual execution price (含滑点 including slippage)
|
||
commission 佣金 brokerage commission
|
||
slippage 滑点 price impact of the trade
|
||
"""
|
||
type : EventType = field(default=EventType.FILL, init=False)
|
||
symbol : str = ""
|
||
date : pd.Timestamp = None
|
||
direction : Direction = Direction.LONG
|
||
quantity : int = 0
|
||
fill_price : float = 0.0
|
||
commission : float = 0.0
|
||
slippage : float = 0.0
|
||
|
||
@property
|
||
def total_cost(self) -> float:
|
||
"""
|
||
Total cash impact of this fill.
|
||
本次成交的总现金影响。
|
||
|
||
For a BUY (LONG): negative (cash decreases) 现金减少
|
||
For a SELL (EXIT): positive (cash increases) 现金增加
|
||
"""
|
||
sign = -1 if self.direction in (Direction.LONG, Direction.SHORT) else +1
|
||
return sign * (self.fill_price * self.quantity + self.commission)
|
||
|
||
def __repr__(self):
|
||
return (f"FillEvent({self.symbol} {self.date.date()} "
|
||
f"{self.direction.value} qty={self.quantity} "
|
||
f"@{self.fill_price:.2f} comm={self.commission:.2f})")
|
||
|
||
|
||
# =============================================================================
|
||
# SECTION 2: DataHandler 数据处理器
|
||
# =============================================================================
|
||
#
|
||
# The DataHandler is responsible for feeding price data into the system
|
||
# one bar at a time. It simulates the reality of not knowing the future.
|
||
#
|
||
# 数据处理器负责逐根K线地向系统输入价格数据。
|
||
# 它模拟了"无法预知未来"的现实。
|
||
#
|
||
# The key design rule:
|
||
# 核心设计规则:
|
||
#
|
||
# ❌ WRONG: strategy sees bar[t] and trades on bar[t] (lookahead bias!)
|
||
# 策略看到 bar[t] 并在 bar[t] 交易(前视偏差!)
|
||
#
|
||
# ✅ RIGHT: strategy sees bar[t], generates signal based on bar[t],
|
||
# order fills at bar[t+1] open (next day open)
|
||
# 策略看到 bar[t],生成信号,订单在 bar[t+1] 的开盘价成交
|
||
#
|
||
# This is why DataHandler, Strategy, Portfolio, and ExecutionHandler are
|
||
# separate — the separation naturally enforces the time boundary.
|
||
# 这就是为什么这四个组件分开——分离自然地强制了时间边界。
|
||
# =============================================================================
|
||
|
||
class DataHandler:
|
||
"""
|
||
Streams historical OHLCV data bar by bar into the event queue.
|
||
将历史 OHLCV 数据逐根K线地流入事件队列。
|
||
|
||
In live trading, this would connect to a real-time market data API.
|
||
在实盘交易中,这里会连接实时行情 API。
|
||
"""
|
||
|
||
def __init__(self, data: pd.DataFrame, symbol: str, event_queue: queue.Queue):
|
||
"""
|
||
Parameters / 参数:
|
||
data — DataFrame with DatetimeIndex and columns:
|
||
带 DatetimeIndex 的 DataFrame,列名为:
|
||
open, high, low, close, volume
|
||
symbol — ticker symbol / 股票代码
|
||
event_queue — the shared event queue / 共享事件队列
|
||
"""
|
||
self.data = data.copy()
|
||
self.symbol = symbol
|
||
self.event_queue = event_queue
|
||
self._dates = list(data.index) # all trading dates
|
||
self._cursor = 0 # current position in history
|
||
self._history = [] # bars seen so far (已观测的K线)
|
||
|
||
@property
|
||
def current_date(self) -> Optional[pd.Timestamp]:
|
||
"""The date of the most recently published bar. 最新已发布K线的日期。"""
|
||
if self._cursor == 0:
|
||
return None
|
||
return self._dates[self._cursor - 1]
|
||
|
||
@property
|
||
def n_bars_seen(self) -> int:
|
||
"""How many bars have been streamed so far. 到目前为止已流出多少根K线。"""
|
||
return self._cursor
|
||
|
||
def has_more_data(self) -> bool:
|
||
"""Returns True if there is at least one more bar to stream."""
|
||
return self._cursor < len(self._dates)
|
||
|
||
def stream_next(self) -> bool:
|
||
"""
|
||
Emit the next bar as a MarketEvent onto the queue.
|
||
将下一根K线作为 MarketEvent 发送到队列。
|
||
|
||
Returns True if a bar was emitted, False if data is exhausted.
|
||
如果发出了K线则返回 True,如果数据已耗尽则返回 False。
|
||
"""
|
||
if not self.has_more_data():
|
||
return False
|
||
|
||
date = self._dates[self._cursor]
|
||
row = self.data.loc[date]
|
||
|
||
# Build the MarketEvent for this bar / 构建本根K线的 MarketEvent
|
||
bar = MarketEvent(
|
||
symbol = self.symbol,
|
||
date = date,
|
||
open = float(row.get("open", row.get("close", 0))),
|
||
high = float(row.get("high", row.get("close", 0))),
|
||
low = float(row.get("low", row.get("close", 0))),
|
||
close = float(row["close"]),
|
||
volume = float(row.get("volume", 0)),
|
||
)
|
||
|
||
# Add to internal history (the strategy can query this)
|
||
# 添加到内部历史记录(策略可以查询)
|
||
self._history.append(bar)
|
||
self._cursor += 1
|
||
|
||
# Push onto the central event queue / 推送到中央事件队列
|
||
self.event_queue.put(bar)
|
||
return True
|
||
|
||
def get_history(self, n: Optional[int] = None) -> List[MarketEvent]:
|
||
"""
|
||
Return the last n bars seen so far (default: all history).
|
||
返回迄今为止看到的最后 n 根K线(默认:全部历史)。
|
||
|
||
This is what the Strategy uses to compute indicators.
|
||
这是策略用于计算技术指标的数据。
|
||
"""
|
||
if n is None:
|
||
return self._history
|
||
return self._history[-n:]
|
||
|
||
def get_close_series(self) -> pd.Series:
|
||
"""
|
||
Return a pd.Series of all close prices seen so far.
|
||
返回到目前为止所有已观测收盘价的 pd.Series。
|
||
|
||
Useful for computing rolling indicators (移动平均等滚动指标).
|
||
"""
|
||
if not self._history:
|
||
return pd.Series(dtype=float)
|
||
return pd.Series(
|
||
[b.close for b in self._history],
|
||
index=[b.date for b in self._history],
|
||
)
|
||
|
||
|
||
# =============================================================================
|
||
# SECTION 3: Strategy (Abstract Base + Concrete Implementations)
|
||
# 策略(抽象基类 + 具体实现)
|
||
# =============================================================================
|
||
#
|
||
# A Strategy:
|
||
# 1. Listens for MarketEvents
|
||
# 2. Computes indicators on the historical data it has seen so far
|
||
# 3. Generates SignalEvents when its rules are triggered
|
||
#
|
||
# 策略:
|
||
# 1. 监听 MarketEvent
|
||
# 2. 根据迄今为止看到的历史数据计算指标
|
||
# 3. 当规则被触发时生成 SignalEvent
|
||
#
|
||
# The abstract base class (ABC) enforces a consistent interface so that
|
||
# we can swap strategies without changing any other component.
|
||
# 抽象基类强制统一接口,这样我们可以替换策略而不改变其他任何组件。
|
||
# =============================================================================
|
||
|
||
class Strategy(ABC):
|
||
"""
|
||
Abstract base class for all strategies.
|
||
所有策略的抽象基类。
|
||
"""
|
||
|
||
def __init__(self, data_handler: DataHandler, event_queue: queue.Queue):
|
||
self.data = data_handler
|
||
self.queue = event_queue
|
||
|
||
@abstractmethod
|
||
def on_market(self, event: MarketEvent) -> None:
|
||
"""
|
||
Called when a new MarketEvent arrives.
|
||
新的 MarketEvent 到达时调用。
|
||
Subclasses implement their signal logic here.
|
||
子类在这里实现信号逻辑。
|
||
"""
|
||
...
|
||
|
||
def send_signal(self, symbol: str, date: pd.Timestamp,
|
||
direction: Direction, strength: float = 1.0) -> None:
|
||
"""Helper to build and queue a SignalEvent. 构建并排队 SignalEvent 的辅助方法。"""
|
||
sig = SignalEvent(
|
||
symbol = symbol,
|
||
date = date,
|
||
direction = direction,
|
||
strength = strength,
|
||
strategy = self.__class__.__name__,
|
||
)
|
||
self.queue.put(sig)
|
||
|
||
|
||
class MACrossoverStrategy(Strategy):
|
||
"""
|
||
Dual Moving Average Crossover Strategy 双均线金叉/死叉策略
|
||
─────────────────────────────────────────────────────────────
|
||
Rules / 规则:
|
||
Golden Cross (金叉): short SMA crosses ABOVE long SMA → BUY (做多)
|
||
Death Cross (死叉): short SMA crosses BELOW long SMA → SELL (平仓)
|
||
|
||
Parameters / 参数:
|
||
short_window 短期均线窗口 (default 20 days)
|
||
long_window 长期均线窗口 (default 60 days)
|
||
"""
|
||
|
||
def __init__(self, data_handler: DataHandler, event_queue: queue.Queue,
|
||
short_window: int = 20, long_window: int = 60):
|
||
super().__init__(data_handler, event_queue)
|
||
self.short_window = short_window
|
||
self.long_window = long_window
|
||
self._in_position = False # are we currently holding a long position?
|
||
# 当前是否持有多仓?
|
||
|
||
def on_market(self, event: MarketEvent) -> None:
|
||
"""
|
||
Called on every new bar. 每根新K线调用一次。
|
||
|
||
We need at least long_window bars before we can compute the long SMA.
|
||
至少需要 long_window 根K线才能计算长期均线。
|
||
"""
|
||
closes = self.data.get_close_series()
|
||
n = len(closes)
|
||
|
||
# Not enough data yet — wait / 数据不足,等待
|
||
if n < self.long_window:
|
||
return
|
||
|
||
# Compute SMAs using only data available up to this bar
|
||
# 仅使用截至本K线的数据计算均线(无前视偏差)
|
||
sma_short = closes.iloc[-self.short_window:].mean()
|
||
sma_long = closes.iloc[-self.long_window:].mean()
|
||
|
||
# Previous bar's SMAs (to detect crossover / 检测交叉)
|
||
if n < self.long_window + 1:
|
||
return # need one extra bar to detect a CHANGE in relationship
|
||
|
||
sma_short_prev = closes.iloc[-(self.short_window + 1):-1].mean()
|
||
sma_long_prev = closes.iloc[-(self.long_window + 1):-1].mean()
|
||
|
||
was_above = sma_short_prev > sma_long_prev # previous relationship
|
||
is_above = sma_short > sma_long # current relationship
|
||
|
||
# ── Golden Cross 金叉 ─────────────────────────────────────────────
|
||
# Short MA just crossed ABOVE long MA → bullish signal
|
||
# 短期均线刚刚上穿长期均线 → 看涨信号
|
||
if is_above and not was_above and not self._in_position:
|
||
self.send_signal(event.symbol, event.date, Direction.LONG, strength=1.0)
|
||
self._in_position = True
|
||
|
||
# ── Death Cross 死叉 ─────────────────────────────────────────────
|
||
# Short MA just crossed BELOW long MA → exit signal
|
||
# 短期均线刚刚下穿长期均线 → 平仓信号
|
||
elif not is_above and was_above and self._in_position:
|
||
self.send_signal(event.symbol, event.date, Direction.EXIT, strength=1.0)
|
||
self._in_position = False
|
||
|
||
|
||
class RSIMeanReversionStrategy(Strategy):
|
||
"""
|
||
RSI Mean Reversion Strategy RSI 均值回归策略
|
||
─────────────────────────────────────────────
|
||
Rules / 规则:
|
||
RSI < oversold (超卖) → BUY (做多)
|
||
RSI > overbought(超买) → SELL EXIT (平多)
|
||
(Also supports going SHORT / 也支持做空)
|
||
|
||
Parameters / 参数:
|
||
window RSI计算窗口 (default 14)
|
||
oversold 超卖阈值 (default 30)
|
||
overbought 超买阈值 (default 70)
|
||
"""
|
||
|
||
def __init__(self, data_handler: DataHandler, event_queue: queue.Queue,
|
||
window: int = 14, oversold: float = 30, overbought: float = 70):
|
||
super().__init__(data_handler, event_queue)
|
||
self.window = window
|
||
self.oversold = oversold
|
||
self.overbought = overbought
|
||
self._position = 0 # 0=flat, 1=long, -1=short (0=空仓, 1=多仓, -1=空仓)
|
||
|
||
def _compute_rsi(self, closes: pd.Series) -> float:
|
||
"""
|
||
Compute the current RSI value using Wilder's smoothing.
|
||
使用 Wilder 平滑法计算当前 RSI 值。
|
||
"""
|
||
if len(closes) < self.window + 1:
|
||
return 50.0 # neutral / 中性,数据不足时返回中性值
|
||
|
||
delta = closes.diff()
|
||
gain = delta.clip(lower=0)
|
||
loss = -delta.clip(upper=0)
|
||
avg_gain = gain.ewm(alpha=1.0 / self.window, adjust=False).mean().iloc[-1]
|
||
avg_loss = loss.ewm(alpha=1.0 / self.window, adjust=False).mean().iloc[-1]
|
||
|
||
if avg_loss == 0:
|
||
return 100.0
|
||
rs = avg_gain / avg_loss
|
||
return 100.0 - (100.0 / (1.0 + rs))
|
||
|
||
def on_market(self, event: MarketEvent) -> None:
|
||
closes = self.data.get_close_series()
|
||
rsi = self._compute_rsi(closes)
|
||
|
||
# ── Entry: Oversold → Go Long 入场:超卖 → 做多 ──────────────────
|
||
if rsi < self.oversold and self._position == 0:
|
||
self.send_signal(event.symbol, event.date, Direction.LONG, strength=0.8)
|
||
self._position = 1
|
||
|
||
# ── Entry: Overbought → Go Short 入场:超买 → 做空 ───────────────
|
||
elif rsi > self.overbought and self._position == 0:
|
||
self.send_signal(event.symbol, event.date, Direction.SHORT, strength=0.8)
|
||
self._position = -1
|
||
|
||
# ── Exit Long when RSI recovers to neutral 多仓RSI回到中性时平仓 ──
|
||
elif self._position == 1 and rsi > 50:
|
||
self.send_signal(event.symbol, event.date, Direction.EXIT, strength=1.0)
|
||
self._position = 0
|
||
|
||
# ── Exit Short when RSI falls back to neutral 空仓RSI回到中性时平仓
|
||
elif self._position == -1 and rsi < 50:
|
||
self.send_signal(event.symbol, event.date, Direction.EXIT, strength=1.0)
|
||
self._position = 0
|
||
|
||
|
||
# =============================================================================
|
||
# SECTION 4: Portfolio 组合管理器
|
||
# =============================================================================
|
||
#
|
||
# The Portfolio is the "brain" that manages money. It:
|
||
# 1. Receives SignalEvents from the Strategy
|
||
# 2. Decides HOW MUCH to trade (position sizing / 仓位管理)
|
||
# 3. Creates OrderEvents for the ExecutionHandler
|
||
# 4. Receives FillEvents from the broker and updates:
|
||
# - Cash balance (现金余额)
|
||
# - Positions (持仓)
|
||
# - Realized P&L (已实现盈亏)
|
||
# - Unrealized P&L(未实现盈亏)
|
||
# 5. Records the daily equity (净值) for performance analysis
|
||
#
|
||
# 组合管理器是管理资金的"大脑"。它接收策略信号,决定交易多少,
|
||
# 向执行器发出订单,接收成交回报并更新现金/持仓/盈亏,记录每日净值。
|
||
#
|
||
# Position Sizing (仓位管理) methods / 常见方法:
|
||
# Fixed Fraction 固定比例 — always use X% of capital (简单,本文采用)
|
||
# Fixed Dollar 固定金额 — always trade $N worth
|
||
# Kelly Criterion 凯利公式 — optimal fraction based on edge & odds
|
||
# Volatility Scaling 波动率缩放 — scale size to target a fixed daily risk
|
||
# =============================================================================
|
||
|
||
class Portfolio:
|
||
"""
|
||
Tracks cash, positions, and P&L. Records equity curve.
|
||
追踪现金、持仓和盈亏,记录净值曲线。
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
data_handler : DataHandler,
|
||
event_queue : queue.Queue,
|
||
initial_capital: float = 1_000_000.0,
|
||
position_pct : float = 0.95, # fraction of capital to deploy per trade
|
||
# 每次交易使用资本的比例 (95%)
|
||
):
|
||
self.data = data_handler
|
||
self.queue = event_queue
|
||
self.initial_capital = initial_capital
|
||
self.position_pct = position_pct
|
||
|
||
# ── Account state 账户状态 ──────────────────────────────────────
|
||
self.cash = initial_capital # 现金余额 / available cash
|
||
self.positions : Dict[str, int] = {} # symbol → qty (持仓数量)
|
||
self.avg_cost : Dict[str, float] = {} # symbol → avg entry price (持仓均价)
|
||
|
||
# ── Trade log 交易记录 ───────────────────────────────────────────
|
||
# Every completed fill gets recorded here for analysis.
|
||
# 每笔完成的成交都记录在这里以供分析。
|
||
self.trade_log: List[dict] = []
|
||
|
||
# ── Equity curve 净值曲线 ────────────────────────────────────────
|
||
# Recorded once per day after market close.
|
||
# 每天收盘后记录一次。
|
||
self.equity_curve: List[dict] = []
|
||
|
||
# ── Computed properties 计算属性 ─────────────────────────────────────────
|
||
|
||
def market_value(self, current_prices: Dict[str, float]) -> float:
|
||
"""
|
||
Current market value of all open positions.
|
||
所有开放持仓的当前市值。
|
||
市值 = Σ(持仓数量 × 当前价格)
|
||
"""
|
||
return sum(
|
||
qty * current_prices.get(sym, 0.0)
|
||
for sym, qty in self.positions.items()
|
||
)
|
||
|
||
def total_equity(self, current_prices: Dict[str, float]) -> float:
|
||
"""
|
||
Total portfolio value: cash + market value of positions.
|
||
账户总价值:现金 + 持仓市值。
|
||
总资产 = 现金余额 + 持仓市值
|
||
"""
|
||
return self.cash + self.market_value(current_prices)
|
||
|
||
# ── Event handlers 事件处理器 ─────────────────────────────────────────────
|
||
|
||
def on_signal(self, event: SignalEvent) -> None:
|
||
"""
|
||
Convert a SignalEvent into an OrderEvent.
|
||
将 SignalEvent 转换为 OrderEvent(决定下多少单)。
|
||
|
||
This is where position sizing (仓位管理) happens.
|
||
这里进行仓位计算。
|
||
"""
|
||
symbol = event.symbol
|
||
direction = event.direction
|
||
|
||
# ── Determine quantity 计算交易数量 ─────────────────────────────
|
||
# We use Fixed Fraction sizing: deploy position_pct of current equity.
|
||
# 使用固定比例法:每次使用当前总资产的 position_pct。
|
||
#
|
||
# quantity = floor( equity × position_pct × strength / current_price )
|
||
# 数量 = floor( 总资产 × 仓位比例 × 信号强度 / 当前价格 )
|
||
|
||
current_price = self.data.get_close_series().iloc[-1]
|
||
|
||
if direction == Direction.EXIT:
|
||
# Close the entire existing position / 平掉全部现有持仓
|
||
qty = abs(self.positions.get(symbol, 0))
|
||
if qty == 0:
|
||
return # nothing to close / 没有持仓可平
|
||
else:
|
||
# New entry — compute share count / 新入场——计算股数
|
||
capital_to_deploy = self.cash * self.position_pct * event.strength
|
||
qty = int(capital_to_deploy / current_price)
|
||
if qty == 0:
|
||
return # not enough cash / 资金不足
|
||
|
||
order = OrderEvent(
|
||
symbol = symbol,
|
||
date = event.date,
|
||
order_type = OrderType.MARKET, # use market orders for simplicity
|
||
direction = direction,
|
||
quantity = qty,
|
||
)
|
||
self.queue.put(order)
|
||
|
||
def on_fill(self, event: FillEvent) -> None:
|
||
"""
|
||
Update cash and positions when a fill is confirmed.
|
||
成交确认后更新现金和持仓。
|
||
"""
|
||
symbol = event.symbol
|
||
qty = event.quantity
|
||
price = event.fill_price
|
||
comm = event.commission
|
||
|
||
if event.direction in (Direction.LONG, Direction.SHORT):
|
||
# ── Opening a position 建仓 ──────────────────────────────────
|
||
sign = 1 if event.direction == Direction.LONG else -1
|
||
|
||
# Update average cost 更新持仓均价
|
||
old_qty = self.positions.get(symbol, 0)
|
||
old_cost = self.avg_cost.get(symbol, 0.0)
|
||
new_qty = old_qty + sign * qty
|
||
if new_qty != 0:
|
||
# Weighted average cost 加权平均成本
|
||
self.avg_cost[symbol] = (
|
||
(old_cost * abs(old_qty) + price * qty) / abs(new_qty)
|
||
)
|
||
self.positions[symbol] = new_qty
|
||
|
||
# Deduct cash (buy) / add cash (short sell)
|
||
# 扣除现金(买入)/ 增加现金(卖空)
|
||
self.cash -= sign * (price * qty) + comm
|
||
|
||
elif event.direction == Direction.EXIT:
|
||
# ── Closing a position 平仓 ──────────────────────────────────
|
||
entry_price = self.avg_cost.get(symbol, price)
|
||
pos_sign = 1 if self.positions.get(symbol, 0) > 0 else -1
|
||
|
||
# Realized P&L 已实现盈亏
|
||
# P&L = (exit_price - entry_price) × qty × direction
|
||
pnl = pos_sign * (price - entry_price) * qty - comm
|
||
|
||
# Return cash from closing the position
|
||
# 平仓回收现金
|
||
self.cash += pos_sign * price * qty - comm
|
||
|
||
# Clear position 清除持仓
|
||
self.positions[symbol] = 0
|
||
self.avg_cost[symbol] = 0.0
|
||
|
||
# Log the trade 记录交易
|
||
self.trade_log.append({
|
||
"date" : event.date,
|
||
"symbol" : symbol,
|
||
"direction" : "LONG→EXIT" if pos_sign == 1 else "SHORT→EXIT",
|
||
"entry_price" : entry_price,
|
||
"exit_price" : price,
|
||
"quantity" : qty,
|
||
"pnl" : pnl,
|
||
"commission" : comm,
|
||
})
|
||
|
||
def record_equity(self, date: pd.Timestamp, current_price: float) -> None:
|
||
"""
|
||
Snapshot the portfolio value at end of day.
|
||
记录每日收盘时的组合价值快照。
|
||
"""
|
||
prices = {self.data.symbol: current_price}
|
||
self.equity_curve.append({
|
||
"date" : date,
|
||
"cash" : self.cash,
|
||
"market_value" : self.market_value(prices),
|
||
"total_equity" : self.total_equity(prices),
|
||
})
|
||
|
||
def get_equity_series(self) -> pd.Series:
|
||
"""Return equity curve as a pd.Series. 返回净值曲线为 pd.Series。"""
|
||
if not self.equity_curve:
|
||
return pd.Series(dtype=float)
|
||
df = pd.DataFrame(self.equity_curve).set_index("date")
|
||
return df["total_equity"]
|
||
|
||
|
||
# =============================================================================
|
||
# SECTION 5: ExecutionHandler (Simulated Broker)
|
||
# 执行处理器(模拟券商)
|
||
# =============================================================================
|
||
#
|
||
# In live trading, this connects to a real broker API (e.g. Interactive
|
||
# Brokers, Alpaca, or a Chinese broker via vnpy).
|
||
# 在实盘交易中,这里连接真实券商 API(如 Interactive Brokers、Alpaca
|
||
# 或通过 vnpy 连接国内券商)。
|
||
#
|
||
# In backtesting, we SIMULATE the broker. The key costs to model are:
|
||
# 在回测中,我们模拟券商。需要建模的主要成本:
|
||
#
|
||
# 1. Commission 佣金
|
||
# A fixed percentage of trade value charged by the broker.
|
||
# 券商按交易金额收取的固定比例费用。
|
||
# Typical in China A-shares: 万分之三 (0.03%) per side
|
||
# 中国A股典型佣金:单边万分之三
|
||
#
|
||
# 2. Slippage 滑点
|
||
# The difference between the price at signal time and actual fill price.
|
||
# 信号生成时的价格与实际成交价格之间的差异。
|
||
# Causes: bid-ask spread, market impact, order queue position.
|
||
# 原因:买卖价差、市场冲击、排队等待。
|
||
#
|
||
# Slippage models / 滑点模型:
|
||
# Fixed percentage 固定比例 — simple, conservative
|
||
# Volume-weighted 成交量加权 — larger orders → more slippage
|
||
# Fixed spread 固定点差 — realistic for liquid markets
|
||
#
|
||
# 3. Fill Assumption 成交假设
|
||
# We assume market orders fill at next bar's OPEN price.
|
||
# 我们假设市价单在下一根K线的开盘价成交。
|
||
# This is a conservative and common assumption.
|
||
# 这是保守且常用的假设。
|
||
# =============================================================================
|
||
|
||
class SimulatedBroker:
|
||
"""
|
||
Simulates a brokerage: fills market orders with realistic costs.
|
||
模拟券商:以真实成本成交市价单。
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
event_queue : queue.Queue,
|
||
commission_rate : float = 0.0003, # 0.03% per side / 单边万分之三
|
||
slippage_rate : float = 0.0001, # 0.01% slippage / 万分之一滑点
|
||
fill_on : str = "next_open", # when to fill / 何时成交
|
||
min_commission : float = 5.0, # minimum commission / 最低佣金
|
||
):
|
||
"""
|
||
Parameters / 参数:
|
||
commission_rate 佣金率 (per side / 单边)
|
||
slippage_rate 滑点率 (applied as adverse price move / 不利价格移动)
|
||
fill_on 成交时机 'next_open' or 'current_close'
|
||
min_commission 最低佣金
|
||
"""
|
||
self.queue = event_queue
|
||
self.commission_rate = commission_rate
|
||
self.slippage_rate = slippage_rate
|
||
self.fill_on = fill_on
|
||
self.min_commission = min_commission
|
||
|
||
# Store pending orders to fill on next bar's open
|
||
# 存储待处理订单,在下一根K线开盘价成交
|
||
self._pending_orders: List[OrderEvent] = []
|
||
|
||
def on_order(self, event: OrderEvent) -> None:
|
||
"""
|
||
Receive an order from Portfolio.
|
||
接收来自组合管理器的订单。
|
||
|
||
For market orders, we queue them to fill at next open.
|
||
对于市价单,我们将其排队,在下一根K线的开盘价成交。
|
||
"""
|
||
if event.order_type == OrderType.MARKET:
|
||
self._pending_orders.append(event)
|
||
# Limit and Stop orders would need price monitoring — not implemented here
|
||
# 限价单和止损单需要价格监控——此处未实现,留作扩展
|
||
|
||
def fill_pending(self, bar: MarketEvent) -> None:
|
||
"""
|
||
Fill all pending orders at the open of the provided bar.
|
||
以提供K线的开盘价成交所有待处理订单。
|
||
|
||
This is called at the start of each new bar, before the Strategy
|
||
gets to process it.
|
||
每根新K线开始时(策略处理之前)调用。
|
||
"""
|
||
for order in self._pending_orders:
|
||
fill_price = self._compute_fill_price(order, bar)
|
||
commission = self._compute_commission(order, fill_price)
|
||
slippage = fill_price - bar.open # adverse difference from open
|
||
|
||
fill = FillEvent(
|
||
symbol = order.symbol,
|
||
date = bar.date, # filled on THIS bar's date
|
||
direction = order.direction,
|
||
quantity = order.quantity,
|
||
fill_price = fill_price,
|
||
commission = commission,
|
||
slippage = slippage,
|
||
)
|
||
self.queue.put(fill)
|
||
|
||
self._pending_orders.clear()
|
||
|
||
def _compute_fill_price(self, order: OrderEvent, bar: MarketEvent) -> float:
|
||
"""
|
||
Apply slippage to the open price.
|
||
对开盘价施加滑点。
|
||
|
||
Slippage always works AGAINST the trader:
|
||
滑点总是对交易者不利:
|
||
BUY (LONG) : fill above open (买入时成交价高于开盘价)
|
||
SELL (EXIT) : fill below open (卖出时成交价低于开盘价)
|
||
"""
|
||
direction = order.direction
|
||
if direction in (Direction.LONG, Direction.SHORT):
|
||
# Buying: slippage pushes price UP 买入:滑点使价格上升
|
||
return bar.open * (1 + self.slippage_rate)
|
||
else:
|
||
# Selling: slippage pushes price DOWN 卖出:滑点使价格下降
|
||
return bar.open * (1 - self.slippage_rate)
|
||
|
||
def _compute_commission(self, order: OrderEvent, fill_price: float) -> float:
|
||
"""
|
||
Compute brokerage commission.
|
||
计算券商佣金。
|
||
|
||
commission = max(commission_rate × trade_value, min_commission)
|
||
佣金 = max(佣金率 × 交易金额, 最低佣金)
|
||
"""
|
||
trade_value = fill_price * order.quantity
|
||
commission = max(trade_value * self.commission_rate, self.min_commission)
|
||
return commission
|
||
|
||
|
||
# =============================================================================
|
||
# SECTION 6: BacktestEngine 回测引擎(主事件循环)
|
||
# =============================================================================
|
||
#
|
||
# The BacktestEngine is the conductor — it orchestrates all components by
|
||
# running the main event loop.
|
||
# 回测引擎是指挥者——通过运行主事件循环来协调所有组件。
|
||
#
|
||
# Main Loop Algorithm 主循环算法:
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# WHILE data feed has more bars:
|
||
# 1. DataHandler.stream_next() → puts MarketEvent on queue
|
||
# 2. Broker.fill_pending(bar) → fills any leftover orders FIRST
|
||
# (orders placed yesterday fill at today's open — no lookahead!)
|
||
# (昨天的订单今天开盘价成交 — 无前视偏差!)
|
||
# 3. WHILE queue is not empty:
|
||
# event = queue.get()
|
||
# IF MarketEvent → Strategy.on_market(event)
|
||
# ELIF SignalEvent → Portfolio.on_signal(event)
|
||
# ELIF OrderEvent → Broker.on_order(event)
|
||
# ELIF FillEvent → Portfolio.on_fill(event)
|
||
# 4. Portfolio.record_equity(today's close)
|
||
# END WHILE
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
#
|
||
# Why is step 2 (fill_pending) before step 3 (process queue)?
|
||
# 为什么步骤2(成交待处理)在步骤3(处理队列)之前?
|
||
#
|
||
# Orders generated on day T are pending. On day T+1 the broker fills them
|
||
# at the open BEFORE the strategy sees day T+1's data.
|
||
# T日生成的订单是待处理状态。T+1日的开盘价成交(在策略看到T+1数据之前)。
|
||
# This correctly models the one-day execution delay of real trading.
|
||
# 这正确模拟了真实交易的一天执行延迟。
|
||
# =============================================================================
|
||
|
||
class BacktestEngine:
|
||
"""
|
||
Main event-driven backtest engine. 主事件驱动回测引擎。
|
||
Wires together DataHandler, Strategy, Portfolio, and Broker.
|
||
将 DataHandler、Strategy、Portfolio 和 Broker 组合在一起。
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
data_handler : DataHandler,
|
||
strategy : Strategy,
|
||
portfolio : Portfolio,
|
||
broker : SimulatedBroker,
|
||
name : str = "Event-Driven Backtest",
|
||
):
|
||
self.data = data_handler
|
||
self.strategy = strategy
|
||
self.portfolio = portfolio
|
||
self.broker = broker
|
||
self.name = name
|
||
|
||
# All components share this single event queue / 所有组件共享这一事件队列
|
||
assert (data_handler.event_queue is strategy.queue is
|
||
portfolio.queue is broker.queue), \
|
||
"All components must share the same event queue!"
|
||
|
||
self._queue = data_handler.event_queue
|
||
|
||
def run(self, verbose: bool = False) -> None:
|
||
"""
|
||
Execute the backtest from start to finish.
|
||
从头到尾执行回测。
|
||
|
||
Parameters / 参数:
|
||
verbose — if True, print every event (useful for debugging)
|
||
如果为 True,打印每个事件(用于调试)
|
||
"""
|
||
bar_count = 0
|
||
event_count= 0
|
||
|
||
print(f"\n{'─' * 60}")
|
||
print(f" 回测开始: {self.name}")
|
||
print(f" Backtest Start: {self.data.data.index[0].date()} → "
|
||
f"{self.data.data.index[-1].date()}")
|
||
print(f"{'─' * 60}")
|
||
|
||
# ── Main loop 主循环 ─────────────────────────────────────────────
|
||
while self.data.has_more_data():
|
||
|
||
# ── Step 1: Stream the next bar 流出下一根K线 ─────────────────
|
||
# This puts a MarketEvent on the queue.
|
||
# 这会将一个 MarketEvent 放入队列。
|
||
self.data.stream_next()
|
||
bar_count += 1
|
||
|
||
# ── Step 2: Fill yesterday's pending orders at today's open ───
|
||
# ── 步骤2:以今天的开盘价成交昨天的待处理订单 ─────────────────
|
||
current_bar = self.data._history[-1]
|
||
self.broker.fill_pending(current_bar)
|
||
|
||
# ── Step 3: Process all events in the queue 处理队列中的所有事件
|
||
while not self._queue.empty():
|
||
event = self._queue.get()
|
||
event_count += 1
|
||
|
||
if verbose:
|
||
print(f" [{event.type.value}] {event}")
|
||
|
||
if event.type == EventType.MARKET:
|
||
# Strategy sees the new price bar and may emit a signal
|
||
# 策略看到新K线,可能发出信号
|
||
self.strategy.on_market(event)
|
||
|
||
elif event.type == EventType.SIGNAL:
|
||
# Portfolio receives signal and may emit an order
|
||
# 组合管理器接收信号,可能发出订单
|
||
self.portfolio.on_signal(event)
|
||
|
||
elif event.type == EventType.ORDER:
|
||
# Broker receives the order (will fill on next bar's open)
|
||
# 券商接收订单(将在下一根K线的开盘价成交)
|
||
self.broker.on_order(event)
|
||
|
||
elif event.type == EventType.FILL:
|
||
# Portfolio updates its cash & position records
|
||
# 组合管理器更新现金和持仓记录
|
||
self.portfolio.on_fill(event)
|
||
|
||
# ── Step 4: Record end-of-day equity 记录当日收盘净值 ───────
|
||
self.portfolio.record_equity(current_bar.date, current_bar.close)
|
||
|
||
n_trades = len(self.portfolio.trade_log)
|
||
print(f" 回测完成! Backtest Complete!")
|
||
print(f" 总K线数: {bar_count} 总事件数: {event_count} 总交易数: {n_trades}")
|
||
print(f"{'─' * 60}")
|
||
|
||
# ── Performance metrics 绩效指标 ─────────────────────────────────────────
|
||
|
||
def metrics(self) -> dict:
|
||
"""
|
||
Compute performance metrics from the equity curve.
|
||
从净值曲线计算绩效指标。
|
||
"""
|
||
eq = self.portfolio.get_equity_series()
|
||
ret = eq.pct_change().fillna(0)
|
||
n = len(ret)
|
||
years = n / 252.0
|
||
|
||
total_return = (eq.iloc[-1] / self.portfolio.initial_capital) - 1
|
||
cagr = (1 + total_return) ** (1 / years) - 1 if years > 0 else 0
|
||
ann_vol = ret.std() * np.sqrt(252)
|
||
sharpe = (ret.mean() / ret.std()) * np.sqrt(252) if ret.std() > 0 else 0
|
||
|
||
downside_std = ret[ret < 0].std() * np.sqrt(252)
|
||
sortino = cagr / downside_std if downside_std > 0 else 0
|
||
|
||
rolling_max = eq.cummax()
|
||
drawdown = (eq - rolling_max) / rolling_max
|
||
max_dd = drawdown.min()
|
||
calmar = cagr / abs(max_dd) if max_dd != 0 else 0
|
||
win_rate = (ret > 0).mean()
|
||
|
||
# Trade-level statistics 交易级别统计
|
||
trades = self.portfolio.trade_log
|
||
n_trades = len(trades)
|
||
if n_trades > 0:
|
||
pnls = [t["pnl"] for t in trades]
|
||
win_trades = [p for p in pnls if p > 0]
|
||
loss_trades= [p for p in pnls if p < 0]
|
||
trade_win_rate = len(win_trades) / n_trades
|
||
avg_win = np.mean(win_trades) if win_trades else 0
|
||
avg_loss = np.mean(loss_trades) if loss_trades else 0
|
||
profit_factor = (sum(win_trades) / abs(sum(loss_trades))
|
||
if loss_trades else np.inf)
|
||
else:
|
||
trade_win_rate = profit_factor = avg_win = avg_loss = 0
|
||
|
||
return {
|
||
"总收益率 Total Return" : f"{total_return:.2%}",
|
||
"年化收益率 CAGR" : f"{cagr:.2%}",
|
||
"年化波动率 Ann. Volatility" : f"{ann_vol:.2%}",
|
||
"夏普比率 Sharpe Ratio" : f"{sharpe:.3f}",
|
||
"索提诺比率 Sortino Ratio" : f"{sortino:.3f}",
|
||
"最大回撤 Max Drawdown" : f"{max_dd:.2%}",
|
||
"卡玛比率 Calmar Ratio" : f"{calmar:.3f}",
|
||
"日胜率 Daily Win Rate" : f"{win_rate:.2%}",
|
||
"交易次数 # Trades (round trip)": str(n_trades),
|
||
"交易胜率 Trade Win Rate" : f"{trade_win_rate:.2%}",
|
||
"平均盈利 Avg Win (per trade)" : f"¥{avg_win:,.0f}",
|
||
"平均亏损 Avg Loss (per trade)": f"¥{avg_loss:,.0f}",
|
||
"盈亏比 Profit Factor" : f"{profit_factor:.3f}",
|
||
}
|
||
|
||
def print_metrics(self):
|
||
"""Pretty-print the performance report. 格式化打印绩效报告。"""
|
||
print(f"\n{'=' * 60}")
|
||
print(f" 策略绩效报告 / Performance Report")
|
||
print(f" {self.name}")
|
||
print(f"{'=' * 60}")
|
||
for k, v in self.metrics().items():
|
||
print(f" {k:<40} {v}")
|
||
|
||
print(f"\n ── 最近 5 笔交易 Last 5 Trades ────────────────────────")
|
||
trades = self.portfolio.trade_log[-5:]
|
||
if trades:
|
||
for t in trades:
|
||
pnl_sign = "+" if t["pnl"] >= 0 else ""
|
||
print(f" {t['date'].date()} {t['direction']:<12} "
|
||
f"进价 {t['entry_price']:>8.2f} 出价 {t['exit_price']:>8.2f} "
|
||
f"股数 {t['quantity']:>5} "
|
||
f"盈亏 {pnl_sign}{t['pnl']:>10,.0f}¥")
|
||
else:
|
||
print(" (无完整交易 / No completed round-trip trades)")
|
||
print(f"{'=' * 60}")
|
||
|
||
|
||
# =============================================================================
|
||
# SECTION 7: Assemble & Run 组装并运行
|
||
# =============================================================================
|
||
|
||
# ── Generate synthetic price data (same seed as previous demo for consistency)
|
||
# ── 生成合成价格数据(与前一个演示相同的随机种子以保持一致性)
|
||
|
||
def generate_price_series(n_days=1500, mu=0.10, sigma=0.25, s0=100.0, seed=42):
|
||
"""Geometric Brownian Motion price series. 几何布朗运动价格序列。"""
|
||
np.random.seed(seed)
|
||
dt = 1.0 / 252
|
||
eps = np.random.randn(n_days)
|
||
log_ret = (mu - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * eps
|
||
prices = s0 * np.exp(np.cumsum(log_ret))
|
||
dates = pd.bdate_range(start="2019-01-02", periods=n_days)
|
||
|
||
# Build a realistic OHLCV DataFrame 构建真实的 OHLCV DataFrame
|
||
daily_range = prices * np.abs(np.random.randn(n_days)) * 0.015
|
||
opens = prices * (1 + np.random.randn(n_days) * 0.003)
|
||
highs = np.maximum(prices, opens) + np.abs(np.random.randn(n_days)) * daily_range * 0.5
|
||
lows = np.minimum(prices, opens) - np.abs(np.random.randn(n_days)) * daily_range * 0.5
|
||
vols = np.random.lognormal(mean=14.5, sigma=0.5, size=n_days).astype(int)
|
||
|
||
return pd.DataFrame({
|
||
"open": opens, "high": highs, "low": lows,
|
||
"close": prices, "volume": vols,
|
||
}, index=dates)
|
||
|
||
|
||
ohlcv = generate_price_series()
|
||
SYMBOL = "SIM_STOCK"
|
||
|
||
print(f"\n[数据] 生成模拟 OHLCV 数据:")
|
||
print(f" 交易日数: {len(ohlcv)}")
|
||
print(f" 日期范围: {ohlcv.index[0].date()} → {ohlcv.index[-1].date()}")
|
||
print(f" 价格区间: {ohlcv['close'].min():.2f} ~ {ohlcv['close'].max():.2f}")
|
||
|
||
|
||
def build_engine(strategy_class, strategy_kwargs: dict,
|
||
name: str, commission=0.0003, slippage=0.0001
|
||
) -> BacktestEngine:
|
||
"""
|
||
Factory function: creates a fresh, independent backtest engine.
|
||
工厂函数:创建一个全新的、独立的回测引擎。
|
||
每次回测共享同样的价格数据,但所有组件(事件队列、组合、策略)
|
||
都是独立实例,相互不干扰。
|
||
"""
|
||
q = queue.Queue() # fresh queue / 全新事件队列
|
||
data = DataHandler(ohlcv, SYMBOL, q) # streams OHLCV bar by bar
|
||
strategy = strategy_class(data, q, **strategy_kwargs)
|
||
portfolio = Portfolio(data, q, initial_capital=1_000_000.0)
|
||
broker = SimulatedBroker(q, commission_rate=commission,
|
||
slippage_rate=slippage)
|
||
return BacktestEngine(data, strategy, portfolio, broker, name=name)
|
||
|
||
|
||
# ── Run MA Crossover 运行双均线策略 ──────────────────────────────────────────
|
||
engine_ma = build_engine(
|
||
MACrossoverStrategy,
|
||
{"short_window": 20, "long_window": 60},
|
||
name="事件驱动 — 双均线策略 (MA Crossover 20/60)",
|
||
)
|
||
engine_ma.run(verbose=False)
|
||
engine_ma.print_metrics()
|
||
|
||
# ── Run RSI Mean Reversion 运行RSI均值回归策略 ──────────────────────────────
|
||
engine_rsi = build_engine(
|
||
RSIMeanReversionStrategy,
|
||
{"window": 14, "oversold": 30, "overbought": 70},
|
||
name="事件驱动 — RSI均值回归 (RSI Mean Reversion 14/30/70)",
|
||
)
|
||
engine_rsi.run(verbose=False)
|
||
engine_rsi.print_metrics()
|
||
|
||
|
||
# =============================================================================
|
||
# SECTION 8: Visualization 可视化
|
||
# =============================================================================
|
||
|
||
fig = plt.figure(figsize=(16, 20))
|
||
gs = gridspec.GridSpec(5, 2, figure=fig, hspace=0.5, wspace=0.32)
|
||
|
||
price_series = ohlcv["close"]
|
||
|
||
# ── Plot 1 (top, full width): Price + MA signals
|
||
# ── 图1(顶部,全宽): 价格 + 均线信号
|
||
ax1 = fig.add_subplot(gs[0, :])
|
||
ax1.plot(price_series, color="#1f77b4", linewidth=1, alpha=0.9, label="价格 Close")
|
||
sma20 = price_series.rolling(20).mean()
|
||
sma60 = price_series.rolling(60).mean()
|
||
ax1.plot(sma20, color="orange", linewidth=1.3, label="SMA20 (快线)")
|
||
ax1.plot(sma60, color="crimson", linewidth=1.3, label="SMA60 (慢线)")
|
||
|
||
# Mark trade entries / exits from the event-driven engine
|
||
# 标记事件驱动引擎的交易进出场点
|
||
for trade in engine_ma.portfolio.trade_log:
|
||
color = "green" if "LONG" in trade["direction"] else "red"
|
||
ax1.axvline(x=trade["date"], color=color, alpha=0.25, linewidth=0.8)
|
||
|
||
ax1.set_title("双均线策略 — 价格与信号 (MA Crossover: Price + Signals)",
|
||
fontsize=12, fontweight="bold")
|
||
ax1.legend(loc="upper left", fontsize=9)
|
||
ax1.set_ylabel("价格 Price")
|
||
ax1.grid(alpha=0.3)
|
||
|
||
# ── Plot 2 (full width): Event flow detail for one trade
|
||
# ── 图2(全宽): 一笔交易的事件流细节(放大展示)
|
||
ax2 = fig.add_subplot(gs[1, :])
|
||
# Show a 120-day window around the first trade to illustrate the event flow
|
||
# 展示第一笔交易前后120天的窗口,说明事件流
|
||
if engine_ma.portfolio.trade_log:
|
||
trade0 = engine_ma.portfolio.trade_log[0]
|
||
focus_date = trade0["date"]
|
||
window = pd.Timedelta(days=90)
|
||
mask = (price_series.index >= focus_date - window) & \
|
||
(price_series.index <= focus_date + window)
|
||
ax2.plot(price_series[mask], color="#1f77b4", linewidth=1.5)
|
||
ax2.plot(sma20[mask], color="orange", linewidth=1.2, linestyle="--", label="SMA20")
|
||
ax2.plot(sma60[mask], color="crimson", linewidth=1.2, linestyle="--", label="SMA60")
|
||
# Mark the exit point
|
||
ax2.axvline(x=focus_date, color="red", linewidth=1.5,
|
||
label=f"平仓 Exit @ {focus_date.date()}")
|
||
ax2.set_title(
|
||
f"放大: 第1笔交易前后 90 天 "
|
||
f"(Zoom: 90 days around Trade #1 Exit)\n"
|
||
f"进价 Entry ¥{trade0['entry_price']:.2f} "
|
||
f"出价 Exit ¥{trade0['exit_price']:.2f} "
|
||
f"盈亏 P&L ¥{trade0['pnl']:,.0f}",
|
||
fontsize=10, fontweight="bold",
|
||
)
|
||
ax2.legend(fontsize=9)
|
||
ax2.grid(alpha=0.3)
|
||
|
||
# ── Plot 3 (left): Equity curves 净值曲线
|
||
ax3 = fig.add_subplot(gs[2, 0])
|
||
eq_ma = engine_ma.portfolio.get_equity_series()
|
||
eq_rsi = engine_rsi.portfolio.get_equity_series()
|
||
eq_bh = 1_000_000 * (1 + price_series.pct_change().fillna(0)).cumprod()
|
||
|
||
ax3.plot(eq_ma, color="steelblue", linewidth=1.5, label="MA策略")
|
||
ax3.plot(eq_rsi, color="purple", linewidth=1.5, label="RSI策略")
|
||
ax3.plot(eq_bh, color="gray", linewidth=1.2, linestyle="--", label="Buy & Hold")
|
||
ax3.set_title("净值曲线对比 (Equity Curves)", fontsize=11, fontweight="bold")
|
||
ax3.set_ylabel("账户价值 Portfolio Value")
|
||
ax3.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: f"¥{x/1e4:.0f}万"))
|
||
ax3.legend(fontsize=9)
|
||
ax3.grid(alpha=0.3)
|
||
|
||
# ── Plot 4 (right): Drawdown 回撤曲线
|
||
ax4 = fig.add_subplot(gs[2, 1])
|
||
def compute_dd(eq):
|
||
return (eq - eq.cummax()) / eq.cummax()
|
||
|
||
dd_ma = compute_dd(eq_ma)
|
||
dd_rsi = compute_dd(eq_rsi)
|
||
dd_bh = compute_dd(eq_bh)
|
||
|
||
ax4.fill_between(dd_ma.index, dd_ma, 0, alpha=0.5, color="steelblue", label="MA策略")
|
||
ax4.fill_between(dd_rsi.index, dd_rsi, 0, alpha=0.4, color="purple", label="RSI策略")
|
||
ax4.fill_between(dd_bh.index, dd_bh, 0, alpha=0.3, color="gray", label="Buy & Hold")
|
||
ax4.set_title("回撤曲线 (Drawdowns)", fontsize=11, fontweight="bold")
|
||
ax4.set_ylabel("回撤 Drawdown")
|
||
ax4.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: f"{x:.0%}"))
|
||
ax4.legend(fontsize=9)
|
||
ax4.grid(alpha=0.3)
|
||
|
||
# ── Plot 5 (left): Trade P&L distribution 交易盈亏分布
|
||
ax5 = fig.add_subplot(gs[3, 0])
|
||
pnls = [t["pnl"] for t in engine_ma.portfolio.trade_log]
|
||
if pnls:
|
||
colors_bar = ["green" if p > 0 else "red" for p in pnls]
|
||
ax5.bar(range(len(pnls)), pnls, color=colors_bar, alpha=0.75, edgecolor="white")
|
||
ax5.axhline(0, color="black", linewidth=0.8)
|
||
ax5.set_title(f"MA策略 — 每笔交易盈亏 (Per-Trade P&L)\n"
|
||
f"共{len(pnls)}笔, 胜率 "
|
||
f"{sum(1 for p in pnls if p > 0)/len(pnls):.0%}",
|
||
fontsize=10, fontweight="bold")
|
||
ax5.set_xlabel("交易编号 Trade #")
|
||
ax5.set_ylabel("盈亏 P&L (¥)")
|
||
ax5.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: f"¥{x/1e4:.1f}万"))
|
||
ax5.grid(alpha=0.3, axis="y")
|
||
|
||
# ── Plot 6 (right): Cash vs Market Value over time
|
||
# ── 图6(右): 现金 vs 持仓市值随时间变化
|
||
ax6 = fig.add_subplot(gs[3, 1])
|
||
eq_df = pd.DataFrame(engine_ma.portfolio.equity_curve).set_index("date")
|
||
ax6.stackplot(eq_df.index,
|
||
eq_df["cash"], eq_df["market_value"],
|
||
labels=["现金 Cash", "持仓市值 Market Value"],
|
||
colors=["#2ca02c", "#ff7f0e"], alpha=0.7)
|
||
ax6.set_title("现金 vs 持仓市值 (Cash vs Position Value)", fontsize=11, fontweight="bold")
|
||
ax6.set_ylabel("金额 Amount (¥)")
|
||
ax6.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: f"¥{x/1e4:.0f}万"))
|
||
ax6.legend(loc="upper left", fontsize=9)
|
||
ax6.grid(alpha=0.3)
|
||
|
||
# ── Plot 7 (full width): Rolling metrics — Sharpe & Volatility
|
||
# ── 图7(全宽): 滚动指标 — 夏普比率 & 波动率
|
||
ax7a = fig.add_subplot(gs[4, 0])
|
||
ax7b = fig.add_subplot(gs[4, 1])
|
||
|
||
ret_ma = eq_ma.pct_change().fillna(0)
|
||
roll_sharpe = ret_ma.rolling(60).mean() / ret_ma.rolling(60).std() * np.sqrt(252)
|
||
roll_vol = ret_ma.rolling(60).std() * np.sqrt(252)
|
||
|
||
ax7a.plot(roll_sharpe, color="steelblue", linewidth=1)
|
||
ax7a.axhline(0, color="black", linewidth=0.8, linestyle="--")
|
||
ax7a.axhline(1, color="green", linewidth=0.8, linestyle=":", label="Sharpe=1 (目标线)")
|
||
ax7a.fill_between(roll_sharpe.index, roll_sharpe, 0,
|
||
where=(roll_sharpe > 0), alpha=0.2, color="green")
|
||
ax7a.fill_between(roll_sharpe.index, roll_sharpe, 0,
|
||
where=(roll_sharpe < 0), alpha=0.2, color="red")
|
||
ax7a.set_title("滚动60日夏普比率 (60-day Rolling Sharpe)", fontsize=11, fontweight="bold")
|
||
ax7a.set_ylabel("夏普比率 Sharpe")
|
||
ax7a.legend(fontsize=8)
|
||
ax7a.grid(alpha=0.3)
|
||
|
||
ax7b.plot(roll_vol, color="darkorange", linewidth=1)
|
||
ax7b.fill_between(roll_vol.index, roll_vol, alpha=0.2, color="orange")
|
||
ax7b.set_title("滚动60日年化波动率 (60-day Rolling Volatility)", fontsize=11, fontweight="bold")
|
||
ax7b.set_ylabel("年化波动率 Ann. Volatility")
|
||
ax7b.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: f"{x:.0%}"))
|
||
ax7b.grid(alpha=0.3)
|
||
|
||
plt.suptitle(
|
||
"量化交易 — 事件驱动回测引擎 Quantitative Trading: Event-Driven Backtest",
|
||
fontsize=15, fontweight="bold", y=1.003,
|
||
)
|
||
plt.savefig("event_driven_backtest.png", dpi=120, bbox_inches="tight")
|
||
plt.show()
|
||
print("\n[图表] 已保存至 event_driven_backtest.png")
|
||
|
||
|
||
# =============================================================================
|
||
# SECTION 9: Compare Event-Driven vs Vectorized
|
||
# 对比事件驱动 vs 向量化回测
|
||
# =============================================================================
|
||
#
|
||
# Run a quick vectorized version of the SAME MA strategy on the SAME data
|
||
# to show how results can differ due to:
|
||
# 对相同数据运行相同MA策略的快速向量化版本,展示结果差异的原因:
|
||
#
|
||
# 1. Fill timing 成交时机 — event-driven uses next-bar open;
|
||
# vectorized typically uses same-bar close
|
||
# 2. Cost modeling 成本建模 — event-driven has min commission floor;
|
||
# vectorized applies flat percentage only
|
||
# 3. Integer shares 整数股 — event-driven floors to whole shares;
|
||
# vectorized trades fractional shares
|
||
# =============================================================================
|
||
|
||
print(f"\n{'=' * 60}")
|
||
print(" 对比: 事件驱动 vs 向量化回测")
|
||
print(" Comparison: Event-Driven vs Vectorized Backtest")
|
||
print(f"{'=' * 60}")
|
||
print(" 相同策略: 双均线 MA Crossover (20/60)")
|
||
print(" 相同数据: 同一模拟价格序列")
|
||
print()
|
||
|
||
# Vectorized version (快速向量化版本)
|
||
closes = ohlcv["close"]
|
||
sma20v = closes.rolling(20).mean()
|
||
sma60v = closes.rolling(60).mean()
|
||
sig_vec = (sma20v > sma60v).astype(float).shift(1).fillna(0)
|
||
ret_vec = closes.pct_change().fillna(0)
|
||
strat_ret_gross = sig_vec * ret_vec
|
||
# Simple flat cost: 0.03% commission + 0.01% slippage = 0.04% per side
|
||
position_change = sig_vec.diff().abs().fillna(0)
|
||
cost_vec = position_change * (0.0003 + 0.0001)
|
||
strat_ret_net = strat_ret_gross - cost_vec
|
||
eq_vec = 1_000_000 * (1 + strat_ret_net).cumprod()
|
||
|
||
# Compare
|
||
m_ed = engine_ma.metrics()
|
||
total_ed = float(m_ed["总收益率 Total Return"].strip("%")) / 100
|
||
sharpe_ed = float(m_ed["夏普比率 Sharpe Ratio"])
|
||
maxdd_ed = float(m_ed["最大回撤 Max Drawdown"].strip("%")) / 100
|
||
|
||
total_vec = (eq_vec.iloc[-1] / 1_000_000) - 1
|
||
ret_v_s = strat_ret_net
|
||
sharpe_vec = (ret_v_s.mean() / ret_v_s.std()) * np.sqrt(252)
|
||
rolling_mx = eq_vec.cummax()
|
||
maxdd_vec = ((eq_vec - rolling_mx) / rolling_mx).min()
|
||
|
||
print(f" {'指标 Metric':<30} {'事件驱动 Event-Driven':>22} {'向量化 Vectorized':>20}")
|
||
print(f" {'─'*72}")
|
||
print(f" {'总收益率 Total Return':<30} {total_ed:>21.2%} {total_vec:>19.2%}")
|
||
print(f" {'夏普比率 Sharpe Ratio':<30} {sharpe_ed:>21.3f} {sharpe_vec:>19.3f}")
|
||
print(f" {'最大回撤 Max Drawdown':<30} {maxdd_ed:>21.2%} {maxdd_vec:>19.2%}")
|
||
print()
|
||
print(" 差异原因 / Why they differ:")
|
||
print(" ① 事件驱动以「下一日开盘价」成交 (Event-driven fills at next open)")
|
||
print(" 向量化以「当日收盘价」交易 (Vectorized trades at same-day close)")
|
||
print(" ② 事件驱动有最低佣金下限 ¥5 (Event-driven has min commission floor)")
|
||
print(" ③ 事件驱动按整数股买卖 (Event-driven uses integer share quantities)")
|
||
print(" ④ 资金是随时间变化的 (Capital base grows/shrinks with P&L)")
|
||
|
||
|
||
# =============================================================================
|
||
# SECTION 10: What's Next 下一步
|
||
# =============================================================================
|
||
print(f"""
|
||
{'=' * 72}
|
||
总结 Summary
|
||
{'=' * 72}
|
||
|
||
事件驱动回测引擎的核心组件 / Core Components of Event-Driven Engine:
|
||
|
||
┌─────────────────────┬────────────────────────────────────────────┐
|
||
│ 组件 Component │ 职责 Responsibility │
|
||
├─────────────────────┼────────────────────────────────────────────┤
|
||
│ Event (事件) │ 解耦组件间通信的消息对象 │
|
||
│ │ Message objects that decouple components │
|
||
├─────────────────────┼────────────────────────────────────────────┤
|
||
│ DataHandler │ 逐根K线流出历史数据(模拟无未来信息) │
|
||
│ 数据处理器 │ Streams bars one-by-one (no future info) │
|
||
├─────────────────────┼────────────────────────────────────────────┤
|
||
│ Strategy (策略) │ 观察市场 → 生成交易信号 │
|
||
│ │ Observes market → generates trade signals │
|
||
├─────────────────────┼────────────────────────────────────────────┤
|
||
│ Portfolio (组合) │ 仓位管理 + 现金/持仓/盈亏追踪 │
|
||
│ │ Position sizing + cash/position/P&L track │
|
||
├─────────────────────┼────────────────────────────────────────────┤
|
||
│ SimulatedBroker │ 模拟券商:滑点+佣金+成交时机 │
|
||
│ 模拟券商 │ Simulates broker: slippage+commission+fill │
|
||
├─────────────────────┼────────────────────────────────────────────┤
|
||
│ BacktestEngine │ 主事件循环,驱动所有组件 │
|
||
│ 回测引擎 │ Main event loop, orchestrates all parts │
|
||
└─────────────────────┴────────────────────────────────────────────┘
|
||
|
||
下一步学习方向 Next Steps:
|
||
──────────────────────────────────────────────────────────────────
|
||
• 限价单/止损单 Limit & Stop orders in ExecutionHandler
|
||
• 多标的组合 Multi-asset portfolio (stocks + bonds, sector rotation)
|
||
• 因子选股 Alpha factor models (Fama-French, Momentum, Quality)
|
||
• 机器学习信号 ML-based signals (XGBoost, LSTM)
|
||
• 风险管理 Risk management (VaR, CVaR, Kelly position sizing)
|
||
• 实盘对接 Live trading connection (vnpy, Alpaca API)
|
||
{'=' * 72}
|
||
""")
|