# Listing metadata (from the file viewer): 466 lines, 20 KiB, Python
# =============================================================================
# Real Data Acquisition Demo — AKShare
# 真实数据获取演示 — AKShare 版
# =============================================================================
#
# AKShare 是一个完全免费的开源金融数据接口,无需注册、无需 Token。
# 覆盖 A 股/港股/美股/期货/外汇/宏观经济等数据,是目前个人投资者
# 最实用的免费数据源。
#
# Prerequisites / 安装:
#   pip install akshare pandas numpy matplotlib
#
# Topics covered / 涵盖主题:
#   §1 AKShare 简介与基本用法
#   §2 获取 A 股行业 ETF 日线数据(轮动策略核心输入)
#   §3 获取宽基指数数据(沪深300 / 中证500)
#   §4 获取北向资金数据(情绪因子数据源)
#   §5 获取融资融券数据(杠杆情绪数据源)
#   §6 真实数据的清洗陷阱(与合成数据的关键差异)
#   §7 数据存储:构建本地研究数据库
#   §8 数据质量报告生成
# =============================================================================

from __future__ import annotations

import os
import time
import warnings
from datetime import datetime, timedelta
from typing import Callable

# Installed before the third-party imports, exactly as in the original order.
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd

# ── Bypass the system proxy and connect to the data hosts directly ──
# Exit IPs of local proxy tools on macOS (Clash, Shadowrocket, ...) are easily
# banned by Eastmoney's anti-scraping, so these hosts are listed in NO_PROXY,
# which the `requests` library honours.
_DIRECT_HOSTS = ("eastmoney.com", "eastmoneyfutures.com", "10jqka.com.cn")

# Merge whatever is already configured under either spelling of the variable.
# Blank entries and duplicates are dropped so an unset variable does not yield
# a leading comma and repeated runs do not keep growing the value.
_configured = f'{os.environ.get("NO_PROXY", "")},{os.environ.get("no_proxy", "")}'
_no_proxy_hosts = dict.fromkeys(
    host.strip()
    for host in (*_configured.split(","), *_DIRECT_HOSTS)
    if host.strip()
)
os.environ["NO_PROXY"] = ",".join(_no_proxy_hosts)
os.environ["no_proxy"] = os.environ["NO_PROXY"]

# ── Verify that akshare is installed; print install hints and re-raise if not ──
try:
    import akshare as ak
except ImportError:
    for _hint in (
        "❌ AKShare 未安装。请在 trading conda 环境中执行:",
        " conda activate trading",
        " pip install akshare",
    ):
        print(_hint)
    raise
else:
    print(f"✓ AKShare 版本: {ak.__version__}")

# Title banner: a 68-character rule above and below the bilingual title.
_RULE = "=" * 68
print(
    _RULE,
    " 真实数据获取演示 — AKShare 版",
    " Real Data Acquisition Demo with AKShare",
    _RULE,
    sep="\n",
)


# ══════════════════════════════════════════════════════════════════════
# §1 AKShare overview and basic usage
# ══════════════════════════════════════════════════════════════════════
#
# AKShare at a glance:
#   • Completely free, no registration required
#   • Data comes from public endpoints (Eastmoney, Sina Finance, exchanges, ...)
#   • Function names follow the fund_xxx / stock_xxx / macro_xxx convention
#   • Each call triggers one HTTP request, so mind the rate when fetching in bulk
#
# Basic usage:
#   df = ak.function_name(param1=value1, param2=value2)
#   → returns a pandas DataFrame
#
# Docs: https://akshare.akfamily.xyz/

for _intro_line in (
    "\n[§1] AKShare 简介",
    " 数据接口数量: 数千个 (覆盖股票/基金/期货/外汇/宏观/另类)",
    " 数据来源: 东方财富、新浪财经、交易所公开接口等",
    " 费用: 完全免费",
):
    print(_intro_line)


# ══════════════════════════════════════════════════════════════════════
# §2 Fetch daily bars for A-share sector ETFs
# ══════════════════════════════════════════════════════════════════════
#
# Real-data replacement for demo_07 (ETF rotation strategy); the ETF codes
# below correspond one-to-one to demo_07's ETF_UNIVERSE.
#
# AKShare interface: fund_etf_hist_em()
#   symbol                 — ETF code (e.g. "159995")
#   period                 — "daily" / "weekly" / "monthly"
#   start_date / end_date  — "YYYYMMDD" format
#   adjust                 — "" (unadjusted) / "qfq" (forward-adjusted) /
#                            "hfq" (backward-adjusted); ETFs usually use "" or "qfq"
# =============================================================================

print("\n[§2] 获取行业 ETF 日线数据")

# Sector ETFs — matching demo_07.
_SECTOR_ETFS = {
    "159995": "芯片ETF",
    "159819": "人工智能ETF",
    "516160": "新能源ETF",
    "159928": "消费ETF",
    "512170": "医疗ETF",
    "512880": "证券ETF",
}
# Broad-market ETFs (added on top of the sector pool).
_BROAD_ETFS = {
    "510300": "沪深300ETF",
    "510500": "中证500ETF",
}
# Government-bond ETF (defensive asset).
_BOND_ETFS = {
    "511010": "国债ETF",
}

# Full A-share ETF pool: code → display name, in sector / broad / bond order.
ETF_UNIVERSE = {**_SECTOR_ETFS, **_BROAD_ETFS, **_BOND_ETFS}

# Fetch window, in the "YYYYMMDD" format the interface expects.
START_DATE, END_DATE = "20190101", "20250601"

# ── Fetch a single ETF safely (retries with back-off) ──
_REFUSAL_MARKERS = ("ProxyError", "RemoteDisconnected")


def _is_connection_refusal(exc: BaseException) -> bool:
    """Return True if *exc* or anything in its cause chain looks like a proxy/disconnect error."""
    seen = set()
    current = exc
    # Walk __cause__/__context__ because HTTP libraries often wrap the
    # low-level error; `seen` guards against a cyclic chain.
    while current is not None and id(current) not in seen:
        seen.add(id(current))
        text = f"{type(current).__name__}: {current}"
        if any(marker in text for marker in _REFUSAL_MARKERS):
            return True
        current = current.__cause__ or current.__context__
    return False


def fetch_etf_safe(
    code: str,
    start: str,
    end: str,
    max_retries: int = 3,
    *,
    fetcher: "Callable[..., pd.DataFrame] | None" = None,
) -> "pd.DataFrame | None":
    """
    Fetch forward-adjusted daily bars for one ETF, retrying on failure.

    Args:
        code: ETF code, e.g. "159995".
        start: First date, "YYYYMMDD".
        end: Last date, "YYYYMMDD".
        max_retries: Total number of attempts.
        fetcher: Callable invoked with the keyword arguments ``symbol``,
            ``period``, ``start_date``, ``end_date`` and ``adjust``.
            Defaults to ``ak.fund_etf_hist_em``.

    Returns:
        Whatever the fetcher returned on the first successful attempt, or
        ``None`` once every attempt has failed.

    Between attempts the function sleeps 5 s × attempt number for
    proxy/disconnect errors and 2 s for anything else. No sleep follows the
    final attempt, and no "retrying" message is printed for it.
    """
    if fetcher is None:
        # Resolved lazily so an injected fetcher never touches akshare.
        fetcher = ak.fund_etf_hist_em

    for attempt in range(max_retries):
        try:
            return fetcher(
                symbol=code,
                period="daily",
                start_date=start,
                end_date=end,
                adjust="qfq",  # forward-adjusted prices
            )
        except Exception as e:  # deliberate best-effort boundary: any failure is retried
            will_retry = attempt < max_retries - 1
            if _is_connection_refusal(e):
                wait = (attempt + 1) * 5  # proxy/connection trouble: back off longer
                if will_retry:
                    print(f"\n ⚠ {code} 连接被拒绝 (代理/反爬),{wait}s 后重试...")
                else:
                    print(f"\n ⚠ {code} 连接被拒绝 (代理/反爬)")
            else:
                wait = 2.0
                print(f"\n ⚠ {code} 第 {attempt+1} 次获取失败: {e}")
            if will_retry:
                time.sleep(wait)

    print(f"\n ✗ {code} 全部 {max_retries} 次获取失败,跳过")
    return None


# Download every ETF in the pool; successful frames are kept by code.
etf_data = {}
for i, (code, name) in enumerate(ETF_UNIVERSE.items()):
    if i > 0:
        # Pause between requests, with random jitter, to avoid tripping
        # Eastmoney's anti-scraping.
        time.sleep(2.0 + np.random.uniform(0, 1.5))
    # flush so the progress text is visible while the request is in flight
    print(f" 获取 {code} ({name})...", end=" ", flush=True)
    df = fetch_etf_safe(code, START_DATE, END_DATE)
    if df is None:
        print("失败 ✗")
        continue
    if len(df) == 0:
        # An empty frame is not None, but the `.iloc[0]` below would raise on it.
        print("无数据 ✗")
        continue
    etf_data[code] = df
    print(f"{len(df)} 行, {df['日期'].iloc[0]} ~ {df['日期'].iloc[-1]}")

print(f"\n 成功获取: {len(etf_data)} / {len(ETF_UNIVERSE)} 只 ETF")

# ── Build the multi-asset price panel ──
# Key step: align every ETF's closing price on one shared date axis.
# In real data ETFs can have different listing and suspension dates, and the
# alignment choice affects all downstream analysis.

if etf_data:
    close_by_code = {}
    for code, df in etf_data.items():
        # Parse dates before aligning so every series shares a datetime index.
        closes = pd.Series(
            df["收盘"].to_numpy(),
            index=pd.DatetimeIndex(pd.to_datetime(df["日期"])),
            name=code,
        )
        # A repeated date would make the column-wise concat fail; keep the last row.
        close_by_code[code] = closes[~closes.index.duplicated(keep="last")]

    # One concat over all series instead of growing a frame inside the loop.
    price_panel = pd.concat(close_by_code, axis=1).sort_index()
    price_panel.index.name = None

    print(f"\n 价格面板: {price_panel.shape[0]} 个交易日 × {price_panel.shape[1]} 只 ETF")
    print(f" 日期范围: {price_panel.index[0].date()} ~ {price_panel.index[-1].date()}")
    print(f" 各 ETF 起止日期:")
    for col in price_panel.columns:
        valid = price_panel[col].dropna()
        if len(valid) > 0:
            print(f" {col} ({ETF_UNIVERSE.get(col, '?')}): {valid.index[0].date()} ~ {valid.index[-1].date()}, {len(valid)} 天")


# ══════════════════════════════════════════════════════════════════════
# §3 Fetch broad-market index data
# ══════════════════════════════════════════════════════════════════════
#
# AKShare interface: stock_zh_index_daily()
#   symbol — index code:
#     "sh000300" → CSI 300
#     "sh000905" → CSI 500
#     "sz399006" → ChiNext Index
#     "sh000016" → SSE 50
# =============================================================================

print("\n[§3] 获取宽基指数数据")

INDEX_CODES = {
    "sh000300": "沪深300",
    "sh000905": "中证500",
    "sz399006": "创业板指",
    "sh000016": "上证50",
}

index_data = {}
for symbol, name in INDEX_CODES.items():
    print(f" 获取 {symbol} ({name})...", end=" ", flush=True)
    try:
        df = ak.stock_zh_index_daily(symbol=symbol)
    except Exception as e:  # best-effort: one failed index must not stop the demo
        print(f"失败: {e}")
    else:
        if df is None or len(df) == 0:
            # Validate before storing so an empty frame never reaches index_data.
            print("失败: 返回空数据")
        else:
            index_data[symbol] = df
            print(f"{len(df)} 行, {df['date'].iloc[0]} ~ {df['date'].iloc[-1]}")
    finally:
        # Throttle after failures as well as successes.
        time.sleep(0.3)


# ── Trailing index returns (PE/PB percentiles need another interface; this
#    only shows the return baseline) ──
if index_data:
    # Align all index closes into one panel with a single concat.
    index_closes = {}
    for sym, df in index_data.items():
        s = pd.Series(
            df["close"].to_numpy(),
            index=pd.DatetimeIndex(pd.to_datetime(df["date"])),
            name=sym,
        )
        # A repeated date would make the column-wise concat fail; keep the last row.
        index_closes[sym] = s[~s.index.duplicated(keep="last")]
    index_panel = pd.concat(index_closes, axis=1).sort_index()
    index_panel.index.name = None

    # Return over a 252-row lookback. fill_method=None avoids the deprecated
    # implicit forward-fill; rows with any missing value are dropped right after.
    rolling_ret = index_panel.pct_change(252, fill_method=None).dropna()
    if len(rolling_ret) > 0:
        print(f"\n 最新滚动年化收益 ({rolling_ret.index[-1].date()}):")
        for col in rolling_ret.columns:
            name = INDEX_CODES.get(col, col)
            val = rolling_ret[col].iloc[-1]
            print(f" {name}: {val:+.2%}")


# ══════════════════════════════════════════════════════════════════════
# §4 Fetch north-bound capital flow (key sentiment-factor data source)
# ══════════════════════════════════════════════════════════════════════
#
# North-bound capital is foreign money flowing from Hong Kong into A-shares
# through the Shanghai/Shenzhen-Hong Kong Stock Connect. Net buying is treated
# as a "voting machine" for foreign sentiment towards A-shares.
#
# AKShare interfaces named by this demo:
#   stock_hsgt_hist_em()               → Stock Connect historical fund flow
#   stock_hsgt_north_net_flow_in_em()  → north-bound net inflow
# =============================================================================

print("\n[§4] 获取北向资金数据")

# Fetched frames are kept by channel so they can be used after this section.
north_flow = {}
for market, label in [("沪股通", "沪股通(北向)"), ("深股通", "深股通(北向)")]:
    try:
        hsgt = ak.stock_hsgt_hist_em(symbol=market)
        if hsgt is not None and len(hsgt) > 0:
            north_flow[market] = hsgt
            print(f" {label}: {hsgt.shape}")
            print(f" 列名: {list(hsgt.columns)}")
            print(f" 最新 3 行:\n{hsgt.tail(3)}")
    except Exception as e:  # best-effort: report and move on to the next channel
        print(f" {label} 获取失败: {e}")
        print(f" 提示: AKShare 接口可能已更新,请查阅最新文档")
    finally:
        time.sleep(1.5)  # throttle after failures too, to avoid anti-scraping


# ══════════════════════════════════════════════════════════════════════
# §5 Fetch margin-trading data (leverage sentiment data source)
# ══════════════════════════════════════════════════════════════════════
#
# The margin balance reflects retail investors' willingness to borrow to buy.
#   Sharp rise  → bullish retail sentiment (extremes are often contrarian signals)
#   Sudden drop → panic deleveraging, one of the hallmarks of a market bottom
#
# AKShare interfaces named by this demo:
#   stock_margin_sz() / stock_margin_sh()  → Shenzhen / Shanghai margin detail
#   stock_margin_sse()                     → history series (start_date / end_date)
#   stock_margin_detail_sse()              → single-day snapshot (date='YYYYMMDD')
# =============================================================================

print("\n[§5] 获取融资融券数据")

try:
    # Daily SSE margin-trading summary over a fixed window.
    margin_sse = ak.stock_margin_sse(start_date="20240101", end_date="20250601")
    got_rows = margin_sse is not None and len(margin_sse) > 0
    if not got_rows:
        print(" 未获取到融资融券数据")
    else:
        print(f" 上交所融资融券: {margin_sse.shape}")
        print(f" 列名: {list(margin_sse.columns)}")
        print(f" 最新数据:\n{margin_sse.tail(3)}")
except Exception as e:
    # Best-effort section: report the failure and continue with the demo.
    for _msg in (
        f" 融资融券获取失败: {e}",
        " 说明: 此接口可能需要特定参数格式,请查阅文档",
        " 备选: 可直接使用东方财富网页爬取",
    ):
        print(_msg)


# ══════════════════════════════════════════════════════════════════════
# §6 Cleaning pitfalls of real data
# ══════════════════════════════════════════════════════════════════════
#
# Synthetic data (the GBM / factor models of the demo series) has none of these:
#   ① Different listing dates          → many NaNs at the start of the panel
#   ② Suspensions                      → NaN gaps in the middle of the panel
#   ③ Limit-up / limit-down days       → the price is "real" but untradeable
#   ④ ETF dividends / splits           → jumps in unadjusted prices
#   ⑤ Data-source glitches             → garbled OHLC on a few dates
# =============================================================================

print("\n[§6] 真实数据清洗演示")

if etf_data:
    # ── 6-A: missing-value analysis (the five columns with the most gaps) ──
    print(f"\n 6-A 缺失值分析:")
    missing_pct = price_panel.isna().mean().sort_values(ascending=False)
    for code in missing_pct.index[:5]:
        pct = missing_pct[code]
        name = ETF_UNIVERSE.get(code, code)
        print(f" {code} {name}: {pct:.1%} 缺失")

    # ── 6-B: forward fill only (never bfill!) ──
    # bfill would patch today's gap with data that has not happened yet
    # → look-ahead bias.
    print(f"\n 6-B 缺失值处理 (ffill only):")
    missing_before = price_panel.isna().sum().sum()
    clean_prices = price_panel.ffill()  # step 1: forward fill
    # Step 2: drop every row where ANY column is still NaN. After ffill those
    # are the leading rows before the latest-listed ETF has its first price.
    clean_prices = clean_prices.dropna(how='any')
    missing_after = clean_prices.isna().sum().sum()
    print(f" 填充前 NaN: {missing_before} → 填充后 NaN: {missing_after}")
    print(f" 丢弃了 {len(price_panel) - len(clean_prices)} 个早期交易日")

    # ── 6-C: returns and limit-hit flags ──
    print(f"\n 6-C 收益率与涨跌停标记:")
    # clean_prices has no NaN left, so disabling the implicit fill changes nothing.
    returns = clean_prices.pct_change(fill_method=None)

    # Limit detection against a ±10% daily limit, with a 0.2% tolerance.
    LIMIT_PCT = 0.10
    limit_up = (returns >= LIMIT_PCT - 0.002).sum()
    limit_down = (returns <= -LIMIT_PCT + 0.002).sum()
    # Scan every column: only ETFs with hits are printed, so slicing the list
    # would hide hits rather than shorten the output.
    for code in returns.columns:
        up = limit_up[code]
        dn = limit_down[code]
        if up > 0 or dn > 0:
            name = ETF_UNIVERSE.get(code, code)
            print(f" {code} {name}: 涨停 {up} 天, 跌停 {dn} 天")

    # ── 6-D: continuity check ──
    # Runs on the RAW panel: after ffill no gap is left to find. Gaps are
    # counted in rows of the panel's own date axis (the union of dates across
    # the fetched ETFs), so weekends and holidays never count. A gap is only
    # visible if at least one other ETF has a row on those dates.
    print(f"\n 6-D 数据连续性检查:")
    MAX_GAP_TRADING_DAYS = 5
    date_gaps = 0
    for col in price_panel.columns:
        first_day = price_panel[col].first_valid_index()
        if first_day is None:
            continue
        last_day = price_panel[col].last_valid_index()
        # Only look inside the ETF's own lifetime; leading NaNs are pre-listing.
        is_missing = price_panel[col].loc[first_day:last_day].isna()
        # Label each run of equal values, then sum the True flags per run:
        # present runs sum to 0, missing runs sum to their length.
        run_id = (is_missing != is_missing.shift()).cumsum()
        run_lengths = is_missing.groupby(run_id).sum()
        long_gaps = int((run_lengths > MAX_GAP_TRADING_DAYS).sum())
        if long_gaps > 0:
            name = ETF_UNIVERSE.get(col, col)
            print(f" {col} {name}: {long_gaps} 次连续停牌 > {MAX_GAP_TRADING_DAYS} 个交易日")
            date_gaps += long_gaps
    if date_gaps == 0:
        print(f" 所有 ETF 无连续停牌 > {MAX_GAP_TRADING_DAYS} 个交易日 ✓")


# ══════════════════════════════════════════════════════════════════════
# §7 Storage: building a local research database
# ══════════════════════════════════════════════════════════════════════
#
# Rule: store the raw data AND the cleaned data, never just one of them.
# If you later want a different cleaning method, lost raw data cannot be
# recovered.
# =============================================================================

print("\n[§7] 数据存储")

# `__file__` is undefined in interactive sessions; fall back to the working directory.
try:
    _base_dir = os.path.dirname(__file__)
except NameError:
    _base_dir = os.getcwd()
DATA_DIR = os.path.join(_base_dir, "data")
os.makedirs(DATA_DIR, exist_ok=True)

if etf_data:
    # Raw aligned panel, saved before any fill or drop, per the rule above.
    raw_path = os.path.join(DATA_DIR, "etf_price_panel_raw.csv")
    price_panel.to_csv(raw_path)
    print(f" ✓ 原始价格面板 → {raw_path}")

    # Cleaned price panel (research grade).
    clean_path = os.path.join(DATA_DIR, "etf_price_panel_clean.csv")
    clean_prices.to_csv(clean_path)
    print(f" ✓ 清洗后价格面板 → {clean_path}")

    # Returns panel.
    returns_path = os.path.join(DATA_DIR, "etf_returns_panel.csv")
    returns.to_csv(returns_path)
    print(f" ✓ 收益率面板 → {returns_path}")

# index_panel only exists when index_data is non-empty; `and` short-circuits.
if index_data and len(index_panel) > 0:
    index_path = os.path.join(DATA_DIR, "index_prices.csv")
    index_panel.to_csv(index_path)
    print(f" ✓ 指数价格 → {index_path}")

print(f"\n 数据目录: {DATA_DIR}")
print(f" 文件列表: {os.listdir(DATA_DIR) if os.path.exists(DATA_DIR) else '无'}")


# ══════════════════════════════════════════════════════════════════════
# §8 Data quality report
# ══════════════════════════════════════════════════════════════════════
#
# After each pull, generate a quality report recording:
#   - date coverage of every instrument
#   - share of missing values
#   - extreme return events
#   - timestamp of the pull
# Useful for later debugging: "which day's data did I use, and what were its
# known defects?"
# =============================================================================

print("\n[§8] 数据质量报告")

if etf_data:
    report_lines = [
        "=" * 60,
        "数据质量报告 / Data Quality Report",
        f"生成时间: {datetime.now().isoformat()}",
        f"数据源: AKShare (fund_etf_hist_em)",
        "=" * 60,
    ]

    panel_rows = len(price_panel)
    for code in clean_prices.columns:
        name = ETF_UNIVERSE.get(code, code)
        ser = clean_prices[code].dropna()
        ret_ser = returns[code].dropna()
        n = len(ser)
        missing_input = price_panel[code].isna().sum()

        if n == 0:
            # The cleaned panel can end up with no rows (dropna removed them
            # all); say so instead of failing on ser.index[0].
            report_lines.append(f"\n{code} ({name}):\n 清洗后无有效数据")
            continue

        missing_share = missing_input / panel_rows if panel_rows else 0.0
        report_lines.append(
            f"\n{code} ({name}):"
            f"\n 日期范围: {ser.index[0].date()} ~ {ser.index[-1].date()} ({n} 天)"
            f"\n 原始缺失: {missing_input} 天 ({missing_share:.1%})"
            f"\n 日均收益率: {ret_ser.mean():+.6f}"
            f"\n 日波动率: {ret_ser.std():.6f}"
            f"\n 最大日涨幅: {ret_ser.max():+.2%}"
            f"\n 最大日跌幅: {ret_ser.min():+.2%}"
        )

    # Print the report.
    report_text = "\n".join(report_lines)
    print(report_text)

    # Save the report next to the data files.
    report_path = os.path.join(DATA_DIR, "data_quality_report.txt")
    with open(report_path, "w", encoding="utf-8") as f:
        f.write(report_text)
    print(f"\n ✓ 质量报告已保存 → {report_path}")


# Closing banner followed by the key takeaways and the suggested next step.
_closing_rule = "=" * 68
print("\n" + _closing_rule)
print(" ✓ AKShare 数据获取 Demo 完成")
print(_closing_rule)

# The empty first and last entries reproduce the blank line the original
# triple-quoted block had on each side of the text.
_summary_lines = [
    "",
    "关键收获:",
    "1. AKShare 完全免费,覆盖 ETF / 指数 / 资金流向 / 融资融券",
    "2. 真实数据需要处理: 上市日期不一致、停牌缺口、涨跌停标记",
    "3. 填充缺失值只用 ffill() —— bfill() 会引入 Look-Ahead Bias",
    "4. 数据存储时同时保留\"原始\"和\"清洗后\"两个版本",
    "5. 每次拉取数据都应生成质量报告,记录已知缺陷",
    "",
    "下一步: 将清洗后的 ETF 价格面板替换 demo_07 的合成数据,",
    "重新运行 ETF 轮动策略回测,观察真实数据下的策略表现。",
    "",
]
print("\n".join(_summary_lines))