trading/demo_akshare_data.py

466 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# =============================================================================
# Real Data Acquisition Demo — AKShare
# 真实数据获取演示 — AKShare 版
# =============================================================================
#
# AKShare 是一个完全免费的开源金融数据接口,无需注册、无需 Token。
# 覆盖 A 股/港股/美股/期货/外汇/宏观经济等数据,是目前个人投资者
# 最实用的免费数据源。
#
# Prerequisites / 安装:
# pip install akshare pandas numpy matplotlib
#
# Topics covered / 涵盖主题:
# §1 AKShare 简介与基本用法
# §2 获取 A 股行业 ETF 日线数据(轮动策略核心输入)
# §3 获取宽基指数数据沪深300 / 中证500
# §4 获取北向资金数据(情绪因子数据源)
# §5 获取融资融券数据(杠杆情绪数据源)
# §6 真实数据的清洗陷阱(与合成数据的关键差异)
# §7 数据存储:构建本地研究数据库
# §8 数据质量报告生成
# =============================================================================
from __future__ import annotations
import warnings
warnings.filterwarnings("ignore")
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import time
import os
# ── 绕过系统代理,直连东方财富 ──
# macOS 上的 Clash/小火箭等代理工具的出口 IP 容易被东方财富反爬封禁。
# 设置环境变量让 requests 库对 eastmoney.com 不走代理。
os.environ["NO_PROXY"] = os.environ.get("NO_PROXY", "") + ",eastmoney.com,eastmoneyfutures.com,10jqka.com.cn"
os.environ["no_proxy"] = os.environ["NO_PROXY"]
# ── 检查 akshare 是否安装 ──
try:
import akshare as ak
print(f"✓ AKShare 版本: {ak.__version__}")
except ImportError:
print("❌ AKShare 未安装。请在 trading conda 环境中执行:")
print(" conda activate trading")
print(" pip install akshare")
raise
print("=" * 68)
print(" 真实数据获取演示 — AKShare 版")
print(" Real Data Acquisition Demo with AKShare")
print("=" * 68)
# ══════════════════════════════════════════════════════════════════════
# §1 AKShare 简介与基本用法
# ══════════════════════════════════════════════════════════════════════
#
# AKShare 特点:
# • 完全免费,无需注册
# • 数据来源于东方财富、新浪财经、交易所等公开接口
# • 函数名遵循 fund_xxx / stock_xxx / macro_xxx 等命名规范
# • 每次调用触发一次 HTTP 请求,批量获取需注意频率
#
# 基本用法:
# df = ak.function_name(param1=value1, param2=value2)
# → 返回 pandas DataFrame
#
# 文档: https://akshare.akfamily.xyz/
print("\n[§1] AKShare 简介")
print(f" 数据接口数量: 数千个 (覆盖股票/基金/期货/外汇/宏观/另类)")
print(f" 数据来源: 东方财富、新浪财经、交易所公开接口等")
print(f" 费用: 完全免费")
# ══════════════════════════════════════════════════════════════════════
# §2 获取 A 股行业 ETF 日线数据
# ══════════════════════════════════════════════════════════════════════
#
# 这是 demo_07 (ETF 轮动策略) 的真实数据替代方案。
# 下面获取的 ETF 代码与 demo_07 的 ETF_UNIVERSE 一一对应。
#
# AKShare 接口: fund_etf_hist_em()
# symbol — ETF 代码 (如 "159995")
# period — "daily" / "weekly" / "monthly"
# start_date / end_date — "YYYYMMDD" 格式
# adjust — "" (不复权) / "qfq" (前复权) / "hfq" (后复权)
# ETF 通常用 "" 或 "qfq"
# =============================================================================
print("\n[§2] 获取行业 ETF 日线数据")
# A 股行业 ETF 池 — 与 demo_07 对应
ETF_UNIVERSE = {
"159995": "芯片ETF",
"159819": "人工智能ETF",
"516160": "新能源ETF",
"159928": "消费ETF",
"512170": "医疗ETF",
"512880": "证券ETF",
# 宽基 ETF (新增)
"510300": "沪深300ETF",
"510500": "中证500ETF",
# 国债 ETF (避险资产)
"511010": "国债ETF",
}
START_DATE = "20190101"
END_DATE = "20250601"
# ── 安全获取单只 ETF 数据 (带重试和限速) ──
def fetch_etf_safe(code: str, start: str, end: str, max_retries: int = 3):
"""
安全获取 ETF 日线数据,失败时自动重试并控制请求频率。
网络抖动或东方财富接口偶发不稳定时非常必要。
"""
for attempt in range(max_retries):
try:
df = ak.fund_etf_hist_em(
symbol=code,
period="daily",
start_date=start,
end_date=end,
adjust="qfq", # 前复权
)
return df
except Exception as e:
err_msg = str(e)
if "ProxyError" in err_msg or "RemoteDisconnected" in err_msg:
wait = (attempt + 1) * 5 # 代理/连接问题,等更久
print(f"\n{code} 连接被拒绝 (代理/反爬){wait}s 后重试...")
else:
wait = 2.0
print(f"\n{code}{attempt+1} 次获取失败: {e}")
if attempt < max_retries - 1:
time.sleep(wait)
print(f"\n{code} 全部 {max_retries} 次获取失败,跳过")
return None
etf_data = {}
for i, (code, name) in enumerate(ETF_UNIVERSE.items()):
if i > 0:
# 请求间隔 + 随机抖动,避免触发东方财富反爬
time.sleep(2.0 + np.random.uniform(0, 1.5))
print(f" 获取 {code} ({name})...", end=" ")
df = fetch_etf_safe(code, START_DATE, END_DATE)
if df is not None:
etf_data[code] = df
print(f"{len(df)} 行, {df['日期'].iloc[0]} ~ {df['日期'].iloc[-1]}")
else:
print("失败 ✗")
print(f"\n 成功获取: {len(etf_data)} / {len(ETF_UNIVERSE)} 只 ETF")
# ── 构建价格面板 (Multi-Asset Price Panel) ──
# 这是最关键的步骤:将多只 ETF 的收盘价对齐到统一的日期轴上。
# 真实数据中,不同 ETF 可能有不同的上市日期和停牌日期,
# 对齐方式的选择直接影响后续所有分析。
if etf_data:
price_panel = pd.DataFrame()
for code, df in etf_data.items():
# 设置日期索引,提取收盘价
df_indexed = df.set_index("日期")["收盘"].copy()
df_indexed.name = code
price_panel = pd.concat([price_panel, df_indexed], axis=1)
# 确保日期索引排序
price_panel.index = pd.to_datetime(price_panel.index)
price_panel = price_panel.sort_index()
print(f"\n 价格面板: {price_panel.shape[0]} 个交易日 × {price_panel.shape[1]} 只 ETF")
print(f" 日期范围: {price_panel.index[0].date()} ~ {price_panel.index[-1].date()}")
print(f" 各 ETF 起止日期:")
for col in price_panel.columns:
valid = price_panel[col].dropna()
if len(valid) > 0:
print(f" {col} ({ETF_UNIVERSE.get(col, '?')}): {valid.index[0].date()} ~ {valid.index[-1].date()}, {len(valid)}")
# ══════════════════════════════════════════════════════════════════════
# §3 获取宽基指数数据
# ══════════════════════════════════════════════════════════════════════
#
# AKShare 接口: stock_zh_index_daily()
# symbol — 指数代码:
# "sh000300" → 沪深300
# "sh000905" → 中证500
# "sz399006" → 创业板指
# "sh000016" → 上证50
# =============================================================================
print("\n[§3] 获取宽基指数数据")
INDEX_CODES = {
"sh000300": "沪深300",
"sh000905": "中证500",
"sz399006": "创业板指",
"sh000016": "上证50",
}
index_data = {}
for symbol, name in INDEX_CODES.items():
print(f" 获取 {symbol} ({name})...", end=" ")
try:
df = ak.stock_zh_index_daily(symbol=symbol)
index_data[symbol] = df
print(f"{len(df)} 行, {df['date'].iloc[0]} ~ {df['date'].iloc[-1]}")
time.sleep(0.3)
except Exception as e:
print(f"失败: {e}")
# ── 计算指数滚动估值参考 (PE/PB 分位数需另接接口,此处展示收益基座) ──
if index_data:
# 对齐为面板
index_panel = pd.DataFrame()
for sym, df in index_data.items():
s = df.set_index("date")["close"].copy()
s.name = sym
index_panel = pd.concat([index_panel, s], axis=1)
index_panel.index = pd.to_datetime(index_panel.index)
index_panel = index_panel.sort_index()
# 计算滚动年化收益 (252 日)
rolling_ret = index_panel.pct_change(252).dropna()
if len(rolling_ret) > 0:
print(f"\n 最新滚动年化收益 ({rolling_ret.index[-1].date()}):")
for col in rolling_ret.columns:
name = INDEX_CODES.get(col, col)
val = rolling_ret[col].iloc[-1]
print(f" {name}: {val:+.2%}")
# ══════════════════════════════════════════════════════════════════════
# §4 获取北向资金数据(关键情绪因子数据源)
# ══════════════════════════════════════════════════════════════════════
#
# 北向资金 (North-bound Capital Flow) 指通过沪深港通从香港流入 A 股
# 的外资。净买入额被视为外资对 A 股情绪的"投票机"。
#
# AKShare 接口: stock_hsgt_hist_em() → 沪深港通历史资金流向
# stock_hsgt_north_net_flow_in_em() → 北向净流入
# =============================================================================
print("\n[§4] 获取北向资金数据")
try:
# 北向资金: stock_hsgt_hist_em 获取沪股通/深股通历史资金流向
for market, label in [("沪股通", "沪股通(北向)"), ("深股通", "深股通(北向)")]:
try:
hsgt = ak.stock_hsgt_hist_em(symbol=market)
if hsgt is not None and len(hsgt) > 0:
print(f" {label}: {hsgt.shape}")
print(f" 列名: {list(hsgt.columns)}")
print(f" 最新 3 行:\n{hsgt.tail(3)}")
time.sleep(1.5) # 避免触发反爬
except Exception as e:
print(f" {label} 获取失败: {e}")
except Exception as e:
print(f" 北向资金获取失败: {e}")
print(f" 提示: AKShare 接口可能已更新,请查阅最新文档")
# ══════════════════════════════════════════════════════════════════════
# §5 获取融资融券数据(杠杆情绪数据源)
# ══════════════════════════════════════════════════════════════════════
#
# 融资余额 (Margin Balance) 反映散户借钱买股的意愿。
# 融资余额大幅上升 → 散户看多情绪高涨(注意:极端值往往是反向指标)
# 融资余额骤降 → 恐慌去杠杆,市场底部特征之一
#
# AKShare 接口: stock_margin_sz() / stock_margin_sh() → 深圳/上海融资融券明细
# stock_margin_detail_sse() → 上交所融资融券汇总
# =============================================================================
print("\n[§5] 获取融资融券数据")
try:
# 上交所融资融券汇总 (日频)
# stock_margin_sse: 历史序列 (start_date/end_date)
# stock_margin_detail_sse: 单日快照 (date='YYYYMMDD')
margin_sse = ak.stock_margin_sse(start_date="20240101", end_date="20250601")
if margin_sse is not None and len(margin_sse) > 0:
print(f" 上交所融资融券: {margin_sse.shape}")
print(f" 列名: {list(margin_sse.columns)}")
print(f" 最新数据:\n{margin_sse.tail(3)}")
else:
print(" 未获取到融资融券数据")
except Exception as e:
print(f" 融资融券获取失败: {e}")
print(f" 说明: 此接口可能需要特定参数格式,请查阅文档")
print(f" 备选: 可直接使用东方财富网页爬取")
# ══════════════════════════════════════════════════════════════════════
# §6 真实数据的清洗陷阱
# ══════════════════════════════════════════════════════════════════════
#
# 合成数据 (Demo 系列的 GBM/Factor Model) 没有这些问题:
# ① 不同标的上市日期不同 → 面板前段大量 NaN
# ② 停牌 (suspension) → 面板中间出现 NaN 缺口
# ③ 涨跌停日 → 价格"真实"但无法成交
# ④ ETF 分红/拆分 → 不复权价格存在跳跃
# ⑤ 数据源异常 → 极少数日期的 OHLC 数据错乱
# =============================================================================
print("\n[§6] 真实数据清洗演示")
if etf_data:
# ── 6-A: 缺失值分析 ──
print(f"\n 6-A 缺失值分析:")
missing_pct = price_panel.isna().mean().sort_values(ascending=False)
for code in missing_pct.index[:5]:
pct = missing_pct[code]
name = ETF_UNIVERSE.get(code, code)
print(f" {code} {name}: {pct:.1%} 缺失")
# ── 6-B: 前向填充 (仅 ffill, 禁止 bfill!) ──
# bfill 会用"未来还没发生的数据"填补今天的空值 → Look-Ahead Bias
print(f"\n 6-B 缺失值处理 (ffill only):")
missing_before = price_panel.isna().sum().sum()
clean_prices = price_panel.ffill() # 步骤1: 前向填充
# 步骤2: 丢弃早期所有标的都 NaN 的行(通常是面板最前面)
clean_prices = clean_prices.dropna(how='any')
missing_after = clean_prices.isna().sum().sum()
print(f" 填充前 NaN: {missing_before} → 填充后 NaN: {missing_after}")
print(f" 丢弃了 {len(price_panel) - len(clean_prices)} 个早期交易日")
# ── 6-C: 收益率计算与异常值标记 ──
print(f"\n 6-C 收益率与涨跌停标记:")
returns = clean_prices.pct_change()
# 涨跌停检测: A 股 ETF 理论涨跌停 ±10%
LIMIT_PCT = 0.10
limit_up = (returns >= LIMIT_PCT - 0.002).sum()
limit_down = (returns <= -LIMIT_PCT + 0.002).sum()
for code in returns.columns[:5]:
up = limit_up[code]
dn = limit_down[code]
if up > 0 or dn > 0:
name = ETF_UNIVERSE.get(code, code)
print(f" {code} {name}: 涨停 {up} 天, 跌停 {dn}")
# ── 6-D: 数据连续性检查 ──
print(f"\n 6-D 数据连续性检查:")
date_gaps = 0
for col in clean_prices.columns:
ser = clean_prices[col].dropna()
if len(ser) < 2:
continue
# 检查自然日连续性 → 非交易日(周末/假日)是正常的
trading_gaps = ser.index.to_series().diff().dt.days
long_gaps = trading_gaps[trading_gaps > 7] # 连续停牌 > 7 天
if len(long_gaps) > 0:
name = ETF_UNIVERSE.get(col, col)
print(f" {col} {name}: {len(long_gaps)} 次连续停牌 > 7 天")
date_gaps += len(long_gaps)
if date_gaps == 0:
print(f" 所有 ETF 无连续停牌 > 7 天 ✓")
# ══════════════════════════════════════════════════════════════════════
# §7 数据存储:构建本地研究数据库
# ══════════════════════════════════════════════════════════════════════
#
# 存储原则是"存原始数据 + 存清洗后数据",不要只存一个。
# 原因: 你以后可能想换一种清洗方式,而原始数据丢了就回不去了。
# =============================================================================
print("\n[§7] 数据存储")
DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
os.makedirs(DATA_DIR, exist_ok=True)
if etf_data:
# 保存清洗后的价格面板 (研究级)
clean_path = os.path.join(DATA_DIR, "etf_price_panel_clean.csv")
clean_prices.to_csv(clean_path)
print(f" ✓ 清洗后价格面板 → {clean_path}")
# 保存收益率面板
returns_path = os.path.join(DATA_DIR, "etf_returns_panel.csv")
returns.to_csv(returns_path)
print(f" ✓ 收益率面板 → {returns_path}")
if index_data and len(index_panel) > 0:
index_path = os.path.join(DATA_DIR, "index_prices.csv")
index_panel.to_csv(index_path)
print(f" ✓ 指数价格 → {index_path}")
print(f"\n 数据目录: {DATA_DIR}")
print(f" 文件列表: {os.listdir(DATA_DIR) if os.path.exists(DATA_DIR) else ''}")
# ══════════════════════════════════════════════════════════════════════
# §8 数据质量报告
# ══════════════════════════════════════════════════════════════════════
#
# 每次拉取数据后,生成一份质量报告,记录:
# - 各标的覆盖日期范围
# - 缺失值比例
# - 极端收益率事件
# - 数据拉取时间戳
# 这对后期回溯调试非常有用——"我用的数据是哪天拉的?有什么已知缺陷?"
# =============================================================================
print("\n[§8] 数据质量报告")
if etf_data:
report_lines = []
report_lines.append("=" * 60)
report_lines.append("数据质量报告 / Data Quality Report")
report_lines.append(f"生成时间: {datetime.now().isoformat()}")
report_lines.append(f"数据源: AKShare (fund_etf_hist_em)")
report_lines.append("=" * 60)
for code in clean_prices.columns:
name = ETF_UNIVERSE.get(code, code)
ser = clean_prices[code].dropna()
ret_ser = returns[code].dropna()
n = len(ser)
missing_input = price_panel[code].isna().sum()
report_lines.append(
f"\n{code} ({name}):"
f"\n 日期范围: {ser.index[0].date()} ~ {ser.index[-1].date()} ({n} 天)"
f"\n 原始缺失: {missing_input} 天 ({missing_input/len(price_panel):.1%})"
f"\n 日均收益率: {ret_ser.mean():+.6f}"
f"\n 日波动率: {ret_ser.std():.6f}"
f"\n 最大日涨幅: {ret_ser.max():+.2%}"
f"\n 最大日跌幅: {ret_ser.min():+.2%}"
)
# 打印报告
report_text = "\n".join(report_lines)
print(report_text)
# 保存报告
report_path = os.path.join(DATA_DIR, "data_quality_report.txt")
with open(report_path, "w", encoding="utf-8") as f:
f.write(report_text)
print(f"\n ✓ 质量报告已保存 → {report_path}")
print("\n" + "=" * 68)
print(" ✓ AKShare 数据获取 Demo 完成")
print("=" * 68)
print(f"""
关键收获:
1. AKShare 完全免费,覆盖 ETF / 指数 / 资金流向 / 融资融券
2. 真实数据需要处理: 上市日期不一致、停牌缺口、涨跌停标记
3. 填充缺失值只用 ffill() —— bfill() 会引入 Look-Ahead Bias
4. 数据存储时同时保留"原始""清洗后"两个版本
5. 每次拉取数据都应生成质量报告,记录已知缺陷
下一步: 将清洗后的 ETF 价格面板替换 demo_07 的合成数据,
重新运行 ETF 轮动策略回测,观察真实数据下的策略表现。
""")