feat: add demo script for Tushare Pro data acquisition

- Implemented a comprehensive demo for acquiring financial data using Tushare Pro.
- Included sections for registration, token configuration, and data fetching for A-shares, indices, and fundamental data.
- Added data cleaning and alignment processes, along with local database construction.
- Provided a comparison between Tushare and AKShare, highlighting key differences and use cases.
This commit is contained in:
tigerenwork 2026-06-07 15:33:37 +08:00
parent a9e1399cdc
commit b03a8f7dda
3 changed files with 6998 additions and 0 deletions

5923
data/index_prices.csv Normal file

File diff suppressed because it is too large Load Diff

458
demo_akshare_data.py Normal file
View File

@ -0,0 +1,458 @@
# =============================================================================
# Real Data Acquisition Demo — AKShare
# 真实数据获取演示 — AKShare 版
# =============================================================================
#
# AKShare 是一个完全免费的开源金融数据接口,无需注册、无需 Token。
# 覆盖 A 股/港股/美股/期货/外汇/宏观经济等数据,是目前个人投资者
# 最实用的免费数据源。
#
# Prerequisites / 安装:
# pip install akshare pandas numpy matplotlib
#
# Topics covered / 涵盖主题:
# §1 AKShare 简介与基本用法
# §2 获取 A 股行业 ETF 日线数据(轮动策略核心输入)
# §3 获取宽基指数数据沪深300 / 中证500
# §4 获取北向资金数据(情绪因子数据源)
# §5 获取融资融券数据(杠杆情绪数据源)
# §6 真实数据的清洗陷阱(与合成数据的关键差异)
# §7 数据存储:构建本地研究数据库
# §8 数据质量报告生成
# =============================================================================
from __future__ import annotations
import warnings
warnings.filterwarnings("ignore")
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import time
import os
# ── 检查 akshare 是否安装 ──
try:
import akshare as ak
print(f"✓ AKShare 版本: {ak.__version__}")
except ImportError:
print("请先安装 AKShare: pip install akshare")
raise
print("=" * 68)
print(" 真实数据获取演示 — AKShare 版")
print(" Real Data Acquisition Demo with AKShare")
print("=" * 68)
# ══════════════════════════════════════════════════════════════════════
# §1 AKShare 简介与基本用法
# ══════════════════════════════════════════════════════════════════════
#
# AKShare 特点:
# • 完全免费,无需注册
# • 数据来源于东方财富、新浪财经、交易所等公开接口
# • 函数名遵循 fund_xxx / stock_xxx / macro_xxx 等命名规范
# • 每次调用触发一次 HTTP 请求,批量获取需注意频率
#
# 基本用法:
# df = ak.function_name(param1=value1, param2=value2)
# → 返回 pandas DataFrame
#
# 文档: https://akshare.akfamily.xyz/
print("\n[§1] AKShare 简介")
print(f" 数据接口数量: 数千个 (覆盖股票/基金/期货/外汇/宏观/另类)")
print(f" 数据来源: 东方财富、新浪财经、交易所公开接口等")
print(f" 费用: 完全免费")
# ══════════════════════════════════════════════════════════════════════
# §2 获取 A 股行业 ETF 日线数据
# ══════════════════════════════════════════════════════════════════════
#
# 这是 demo_07 (ETF 轮动策略) 的真实数据替代方案。
# 下面获取的 ETF 代码与 demo_07 的 ETF_UNIVERSE 一一对应。
#
# AKShare 接口: fund_etf_hist_em()
# symbol — ETF 代码 (如 "159995")
# period — "daily" / "weekly" / "monthly"
# start_date / end_date — "YYYYMMDD" 格式
# adjust — "" (不复权) / "qfq" (前复权) / "hfq" (后复权)
# ETF 通常用 "" 或 "qfq"
# =============================================================================
print("\n[§2] 获取行业 ETF 日线数据")
# A 股行业 ETF 池 — 与 demo_07 对应
ETF_UNIVERSE = {
"159995": "芯片ETF",
"159819": "人工智能ETF",
"516160": "新能源ETF",
"159928": "消费ETF",
"512170": "医疗ETF",
"512880": "证券ETF",
# 宽基 ETF (新增)
"510300": "沪深300ETF",
"510500": "中证500ETF",
# 国债 ETF (避险资产)
"511010": "国债ETF",
}
START_DATE = "20190101"
END_DATE = "20250601"
# ── 安全获取单只 ETF 数据 (带重试和限速) ──
def fetch_etf_safe(code: str, start: str, end: str, max_retries: int = 3):
"""
安全获取 ETF 日线数据失败时自动重试并控制请求频率
网络抖动或东方财富接口偶发不稳定时非常必要
"""
for attempt in range(max_retries):
try:
df = ak.fund_etf_hist_em(
symbol=code,
period="daily",
start_date=start,
end_date=end,
adjust="qfq", # 前复权
)
time.sleep(0.5) # 礼貌限速,避免被封 IP
return df
except Exception as e:
print(f"{code}{attempt+1} 次获取失败: {e}")
if attempt < max_retries - 1:
time.sleep(2.0)
print(f"{code} 全部 {max_retries} 次获取失败,跳过")
return None
etf_data = {}
for code, name in ETF_UNIVERSE.items():
print(f" 获取 {code} ({name})...", end=" ")
df = fetch_etf_safe(code, START_DATE, END_DATE)
if df is not None:
etf_data[code] = df
print(f"{len(df)} 行, {df['日期'].iloc[0]} ~ {df['日期'].iloc[-1]}")
else:
print("失败 ✗")
print(f"\n 成功获取: {len(etf_data)} / {len(ETF_UNIVERSE)} 只 ETF")
# ── 构建价格面板 (Multi-Asset Price Panel) ──
# 这是最关键的步骤:将多只 ETF 的收盘价对齐到统一的日期轴上。
# 真实数据中,不同 ETF 可能有不同的上市日期和停牌日期,
# 对齐方式的选择直接影响后续所有分析。
if etf_data:
price_panel = pd.DataFrame()
for code, df in etf_data.items():
# 设置日期索引,提取收盘价
df_indexed = df.set_index("日期")["收盘"].copy()
df_indexed.name = code
price_panel = pd.concat([price_panel, df_indexed], axis=1)
# 确保日期索引排序
price_panel.index = pd.to_datetime(price_panel.index)
price_panel = price_panel.sort_index()
print(f"\n 价格面板: {price_panel.shape[0]} 个交易日 × {price_panel.shape[1]} 只 ETF")
print(f" 日期范围: {price_panel.index[0].date()} ~ {price_panel.index[-1].date()}")
print(f" 各 ETF 起止日期:")
for col in price_panel.columns:
valid = price_panel[col].dropna()
if len(valid) > 0:
print(f" {col} ({ETF_UNIVERSE.get(col, '?')}): {valid.index[0].date()} ~ {valid.index[-1].date()}, {len(valid)}")
# ══════════════════════════════════════════════════════════════════════
# §3 获取宽基指数数据
# ══════════════════════════════════════════════════════════════════════
#
# AKShare 接口: stock_zh_index_daily()
# symbol — 指数代码:
# "sh000300" → 沪深300
# "sh000905" → 中证500
# "sz399006" → 创业板指
# "sh000016" → 上证50
# =============================================================================
print("\n[§3] 获取宽基指数数据")
INDEX_CODES = {
"sh000300": "沪深300",
"sh000905": "中证500",
"sz399006": "创业板指",
"sh000016": "上证50",
}
index_data = {}
for symbol, name in INDEX_CODES.items():
print(f" 获取 {symbol} ({name})...", end=" ")
try:
df = ak.stock_zh_index_daily(symbol=symbol)
index_data[symbol] = df
print(f"{len(df)} 行, {df['date'].iloc[0]} ~ {df['date'].iloc[-1]}")
time.sleep(0.3)
except Exception as e:
print(f"失败: {e}")
# ── 计算指数滚动估值参考 (PE/PB 分位数需另接接口,此处展示收益基座) ──
if index_data:
# 对齐为面板
index_panel = pd.DataFrame()
for sym, df in index_data.items():
s = df.set_index("date")["close"].copy()
s.name = sym
index_panel = pd.concat([index_panel, s], axis=1)
index_panel.index = pd.to_datetime(index_panel.index)
index_panel = index_panel.sort_index()
# 计算滚动年化收益 (252 日)
rolling_ret = index_panel.pct_change(252).dropna()
if len(rolling_ret) > 0:
print(f"\n 最新滚动年化收益 ({rolling_ret.index[-1].date()}):")
for col in rolling_ret.columns:
name = INDEX_CODES.get(col, col)
val = rolling_ret[col].iloc[-1]
print(f" {name}: {val:+.2%}")
# ══════════════════════════════════════════════════════════════════════
# §4 获取北向资金数据(关键情绪因子数据源)
# ══════════════════════════════════════════════════════════════════════
#
# 北向资金 (North-bound Capital Flow) 指通过沪深港通从香港流入 A 股
# 的外资。净买入额被视为外资对 A 股情绪的"投票机"。
#
# AKShare 接口: stock_hsgt_hist_em() → 沪深港通历史资金流向
# stock_hsgt_north_net_flow_in_em() → 北向净流入
# =============================================================================
print("\n[§4] 获取北向资金数据")
try:
# 北向资金日频净流入
north_flow = ak.stock_hsgt_north_net_flow_in_em(symbol="北上")
if north_flow is not None and len(north_flow) > 0:
print(f" 北向资金净流入: {north_flow.shape}")
print(f" 列名: {list(north_flow.columns)}")
# 标准化为通用格式
if "date" in north_flow.columns and "value" in north_flow.columns:
north_flow = north_flow.rename(columns={"date": "日期", "value": "净流入_亿"})
print(f" 最新数据:\n{north_flow.tail(3)}")
else:
print(" 未获取到北向资金数据 (接口可能已变更)")
time.sleep(0.5)
# 沪深港通历史资金流向 (更详细)
hsgt = ak.stock_hsgt_hist_em(symbol="沪股通")
if hsgt is not None and len(hsgt) > 0:
print(f"\n 沪深港通历史资金: {hsgt.shape}")
print(f" 列名: {list(hsgt.columns)}")
print(f" 最新 3 行:\n{hsgt.tail(3)}")
except Exception as e:
print(f" 北向资金获取失败: {e}")
print(f" 提示: AKShare 接口可能已更新,请查阅最新文档")
# ══════════════════════════════════════════════════════════════════════
# §5 获取融资融券数据(杠杆情绪数据源)
# ══════════════════════════════════════════════════════════════════════
#
# 融资余额 (Margin Balance) 反映散户借钱买股的意愿。
# 融资余额大幅上升 → 散户看多情绪高涨(注意:极端值往往是反向指标)
# 融资余额骤降 → 恐慌去杠杆,市场底部特征之一
#
# AKShare 接口: stock_margin_sz() / stock_margin_sh() → 深圳/上海融资融券明细
# stock_margin_detail_sse() → 上交所融资融券汇总
# =============================================================================
print("\n[§5] 获取融资融券数据")
try:
# 上交所融资融券汇总 (日频)
# stock_margin_sse: 历史序列 (start_date/end_date)
# stock_margin_detail_sse: 单日快照 (date='YYYYMMDD')
margin_sse = ak.stock_margin_sse(start_date="20240101", end_date="20250601")
if margin_sse is not None and len(margin_sse) > 0:
print(f" 上交所融资融券: {margin_sse.shape}")
print(f" 列名: {list(margin_sse.columns)}")
print(f" 最新数据:\n{margin_sse.tail(3)}")
else:
print(" 未获取到融资融券数据")
except Exception as e:
print(f" 融资融券获取失败: {e}")
print(f" 说明: 此接口可能需要特定参数格式,请查阅文档")
print(f" 备选: 可直接使用东方财富网页爬取")
# ══════════════════════════════════════════════════════════════════════
# §6 真实数据的清洗陷阱
# ══════════════════════════════════════════════════════════════════════
#
# 合成数据 (Demo 系列的 GBM/Factor Model) 没有这些问题:
# ① 不同标的上市日期不同 → 面板前段大量 NaN
# ② 停牌 (suspension) → 面板中间出现 NaN 缺口
# ③ 涨跌停日 → 价格"真实"但无法成交
# ④ ETF 分红/拆分 → 不复权价格存在跳跃
# ⑤ 数据源异常 → 极少数日期的 OHLC 数据错乱
# =============================================================================
print("\n[§6] 真实数据清洗演示")
if etf_data:
# ── 6-A: 缺失值分析 ──
print(f"\n 6-A 缺失值分析:")
missing_pct = price_panel.isna().mean().sort_values(ascending=False)
for code in missing_pct.index[:5]:
pct = missing_pct[code]
name = ETF_UNIVERSE.get(code, code)
print(f" {code} {name}: {pct:.1%} 缺失")
# ── 6-B: 前向填充 (仅 ffill, 禁止 bfill!) ──
# bfill 会用"未来还没发生的数据"填补今天的空值 → Look-Ahead Bias
print(f"\n 6-B 缺失值处理 (ffill only):")
missing_before = price_panel.isna().sum().sum()
clean_prices = price_panel.ffill() # 步骤1: 前向填充
# 步骤2: 丢弃早期所有标的都 NaN 的行(通常是面板最前面)
clean_prices = clean_prices.dropna(how='any')
missing_after = clean_prices.isna().sum().sum()
print(f" 填充前 NaN: {missing_before} → 填充后 NaN: {missing_after}")
print(f" 丢弃了 {len(price_panel) - len(clean_prices)} 个早期交易日")
# ── 6-C: 收益率计算与异常值标记 ──
print(f"\n 6-C 收益率与涨跌停标记:")
returns = clean_prices.pct_change()
# 涨跌停检测: A 股 ETF 理论涨跌停 ±10%
LIMIT_PCT = 0.10
limit_up = (returns >= LIMIT_PCT - 0.002).sum()
limit_down = (returns <= -LIMIT_PCT + 0.002).sum()
for code in returns.columns[:5]:
up = limit_up[code]
dn = limit_down[code]
if up > 0 or dn > 0:
name = ETF_UNIVERSE.get(code, code)
print(f" {code} {name}: 涨停 {up} 天, 跌停 {dn}")
# ── 6-D: 数据连续性检查 ──
print(f"\n 6-D 数据连续性检查:")
date_gaps = 0
for col in clean_prices.columns:
ser = clean_prices[col].dropna()
if len(ser) < 2:
continue
# 检查自然日连续性 → 非交易日(周末/假日)是正常的
trading_gaps = ser.index.to_series().diff().dt.days
long_gaps = trading_gaps[trading_gaps > 7] # 连续停牌 > 7 天
if len(long_gaps) > 0:
name = ETF_UNIVERSE.get(col, col)
print(f" {col} {name}: {len(long_gaps)} 次连续停牌 > 7 天")
date_gaps += len(long_gaps)
if date_gaps == 0:
print(f" 所有 ETF 无连续停牌 > 7 天 ✓")
# ══════════════════════════════════════════════════════════════════════
# §7 数据存储:构建本地研究数据库
# ══════════════════════════════════════════════════════════════════════
#
# 存储原则是"存原始数据 + 存清洗后数据",不要只存一个。
# 原因: 你以后可能想换一种清洗方式,而原始数据丢了就回不去了。
# =============================================================================
print("\n[§7] 数据存储")
DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
os.makedirs(DATA_DIR, exist_ok=True)
if etf_data:
# 保存清洗后的价格面板 (研究级)
clean_path = os.path.join(DATA_DIR, "etf_price_panel_clean.csv")
clean_prices.to_csv(clean_path)
print(f" ✓ 清洗后价格面板 → {clean_path}")
# 保存收益率面板
returns_path = os.path.join(DATA_DIR, "etf_returns_panel.csv")
returns.to_csv(returns_path)
print(f" ✓ 收益率面板 → {returns_path}")
if index_data and len(index_panel) > 0:
index_path = os.path.join(DATA_DIR, "index_prices.csv")
index_panel.to_csv(index_path)
print(f" ✓ 指数价格 → {index_path}")
print(f"\n 数据目录: {DATA_DIR}")
print(f" 文件列表: {os.listdir(DATA_DIR) if os.path.exists(DATA_DIR) else ''}")
# ══════════════════════════════════════════════════════════════════════
# §8 数据质量报告
# ══════════════════════════════════════════════════════════════════════
#
# 每次拉取数据后,生成一份质量报告,记录:
# - 各标的覆盖日期范围
# - 缺失值比例
# - 极端收益率事件
# - 数据拉取时间戳
# 这对后期回溯调试非常有用——"我用的数据是哪天拉的?有什么已知缺陷?"
# =============================================================================
print("\n[§8] 数据质量报告")
if etf_data:
report_lines = []
report_lines.append("=" * 60)
report_lines.append("数据质量报告 / Data Quality Report")
report_lines.append(f"生成时间: {datetime.now().isoformat()}")
report_lines.append(f"数据源: AKShare (fund_etf_hist_em)")
report_lines.append("=" * 60)
for code in clean_prices.columns:
name = ETF_UNIVERSE.get(code, code)
ser = clean_prices[code].dropna()
ret_ser = returns[code].dropna()
n = len(ser)
missing_input = price_panel[code].isna().sum()
report_lines.append(
f"\n{code} ({name}):"
f"\n 日期范围: {ser.index[0].date()} ~ {ser.index[-1].date()} ({n} 天)"
f"\n 原始缺失: {missing_input} 天 ({missing_input/len(price_panel):.1%})"
f"\n 日均收益率: {ret_ser.mean():+.6f}"
f"\n 日波动率: {ret_ser.std():.6f}"
f"\n 最大日涨幅: {ret_ser.max():+.2%}"
f"\n 最大日跌幅: {ret_ser.min():+.2%}"
)
# 打印报告
report_text = "\n".join(report_lines)
print(report_text)
# 保存报告
report_path = os.path.join(DATA_DIR, "data_quality_report.txt")
with open(report_path, "w", encoding="utf-8") as f:
f.write(report_text)
print(f"\n ✓ 质量报告已保存 → {report_path}")
print("\n" + "=" * 68)
print(" ✓ AKShare 数据获取 Demo 完成")
print("=" * 68)
print(f"""
关键收获:
1. AKShare 完全免费覆盖 ETF / 指数 / 资金流向 / 融资融券
2. 真实数据需要处理: 上市日期不一致停牌缺口涨跌停标记
3. 填充缺失值只用 ffill() bfill() 会引入 Look-Ahead Bias
4. 数据存储时同时保留"原始""清洗后"两个版本
5. 每次拉取数据都应生成质量报告记录已知缺陷
下一步: 将清洗后的 ETF 价格面板替换 demo_07 的合成数据
重新运行 ETF 轮动策略回测观察真实数据下的策略表现
""")

617
demo_tushare_data.py Normal file
View File

@ -0,0 +1,617 @@
# =============================================================================
# Real Data Acquisition Demo — Tushare Pro
# 真实数据获取演示 — Tushare Pro 版
# =============================================================================
#
# Tushare Pro 是国内最成熟的金融数据 API 之一,数据质量和稳定性
# 优于 AKShare适合策略验证通过后切换到生产级数据。
#
# 与 AKShare 的核心差异:
# • 需要注册获取 Token (免费注册,基础接口免费)
# • 积分系统: 注册送 120 分,部分接口需要更高积分
# • 股票代码格式: "000001.SZ" (而非纯数字)
# • 数据质量更高、接口更稳定
# • 支持基本面数据 (PE/PB/ROE) 和因子数据
#
# Prerequisites / 前置准备:
# 1. 注册 Tushare: https://tushare.pro/register
# 2. 获取 Token: 登录后 → 个人主页 → 接口 Token
# 3. 安装: pip install tushare pandas numpy
#
# Topics covered / 涵盖主题:
# §1 Tushare 注册与 Token 配置
# §2 获取 A 股个股日线数据
# §3 获取指数日线数据
# §4 获取股票基础信息 (上市日期/行业/市值)
# §5 获取财务数据 (PE/PB/ROE — 价值因子的数据源)
# §6 获取指数成分股权重 (组合优化的输入)
# §7 获取行业分类 (申万行业)
# §8 数据清洗与对齐
# §9 构建本地数据库
# §10 AKShare vs Tushare 对比总结
# =============================================================================
from __future__ import annotations
import warnings
warnings.filterwarnings("ignore")
import numpy as np
import pandas as pd
import os
import time
from datetime import datetime
# ── 检查 tushare 是否安装 ──
try:
import tushare as ts
print(f"✓ Tushare 版本: {ts.__version__}")
except ImportError:
print("请先安装 Tushare: pip install tushare")
raise
print("=" * 68)
print(" 真实数据获取演示 — Tushare Pro 版")
print(" Real Data Acquisition Demo with Tushare Pro")
print("=" * 68)
# ══════════════════════════════════════════════════════════════════════
# §1 Tushare 注册与 Token 配置
# ══════════════════════════════════════════════════════════════════════
#
# 使用步骤:
# 1. 访问 https://tushare.pro/register 注册 (用手机号即可)
# 2. 登录后 → 个人主页 → 接口 Token → 复制那一长串字符
# 3. 把 Token 粘贴到下面 (或设置环境变量 TUSHARE_TOKEN)
#
# 积分说明:
# 注册: 120 分 (基础接口可用)
# 完善个人信息: +20 分
# 推荐他人注册: 各 +50 分
# 捐赠: 200 RMB = 3000 分 (解锁全部接口)
#
# 日常使用建议:
# - 将 Token 存为环境变量而非硬编码在代码中
# - 每个接口都标注所需最低积分,避免超限调用
# =============================================================================
print("\n[§1] Tushare Token 配置")
TUSHARE_TOKEN = os.environ.get("TUSHARE_TOKEN", "")
if not TUSHARE_TOKEN:
print(" ⚠ 未检测到 TUSHARE_TOKEN 环境变量")
print(" ── 获取 Token 的步骤 ──")
print(" 1. 访问 https://tushare.pro/register 注册")
print(" 2. 登录后进入个人主页 → 接口 Token")
print(" 3. 复制 Token 后,设置环境变量:")
print(" export TUSHARE_TOKEN='你的token'")
print()
print(" 或者直接在下方代码中填入 (仅本地测试用):")
print(' TUSHARE_TOKEN = "你的token"')
print()
print(" ⚠ 本 Demo 将继续运行,但数据获取部分会跳过或使用模拟数据")
print(" 您仍然可以看到完整的代码结构和数据清洗逻辑。")
print()
# 尝试初始化 Pro API
PRO = None
TOKEN_VALID = False
if TUSHARE_TOKEN:
try:
PRO = ts.pro_api(TUSHARE_TOKEN)
# 用最简单接口测试 Token 是否有效
test = PRO.trade_cal(exchange='SSE', start_date='20250601', end_date='20250605')
TOKEN_VALID = len(test) > 0
print(f" ✓ Token 验证成功")
print(f" ✓ Pro API 就绪")
except Exception as e:
print(f" ✗ Token 验证失败: {e}")
else:
print(f" - Token 未设置,跳过远程数据获取")
print(f" - 代码结构完整,可阅读学习流程逻辑")
# ══════════════════════════════════════════════════════════════════════
# §2 获取 A 股个股日线数据
# ══════════════════════════════════════════════════════════════════════
#
# 接口: pro.daily()
# 所需积分: 120 分 (注册即送)
# 每日调用上限: 200 次
# 单次最多返回: 约 5000 条记录
#
# 参数:
# ts_code — 股票代码 "000001.SZ" / "600000.SH"
# trade_date— 交易日期 "YYYYMMDD"
# start_date/end_date — 日期范围
#
# 注意: Tushare 的日期格式是 "YYYYMMDD" (无连字符)
# =============================================================================
print("\n[§2] 获取个股日线数据")
# ── 构建 A 股代表性标的池 ──
STOCK_POOL = {
"000001.SZ": "平安银行",
"000002.SZ": "万科A",
"000858.SZ": "五粮液",
"600519.SH": "贵州茅台",
"600036.SH": "招商银行",
"600276.SH": "恒瑞医药",
"300750.SZ": "宁德时代",
"601318.SH": "中国平安",
}
# ── 日期范围转换 ──
# Demo 系列用 "YYYY-MM-DD"Tushare 用 "YYYYMMDD"
START_DATE = "20190101"
END_DATE = "20250601"
def fetch_daily_safe(pro, ts_code, start, end, max_retries=3):
"""
安全获取个股日线数据
Tushare 每次调用返回约 5000 单只股票 6 年约 1500 个交易日
一次调用即可覆盖如果有更多股票/更长区间需要分多次调用并拼接
"""
for attempt in range(max_retries):
try:
df = pro.daily(
ts_code=ts_code,
start_date=start,
end_date=end,
fields='ts_code,trade_date,open,high,low,close,pre_close,'
'change,pct_chg,vol,amount'
)
time.sleep(0.3) # 频率控制: Tushare 每分钟上限约 200 次
return df
except Exception as e:
print(f"{ts_code}{attempt+1} 次失败: {e}")
if attempt < max_retries - 1:
time.sleep(2.0)
return None
if TOKEN_VALID and PRO:
stock_daily_data = {}
for code, name in STOCK_POOL.items():
print(f" 获取 {code} ({name})...", end=" ")
df = fetch_daily_safe(PRO, code, START_DATE, END_DATE)
if df is not None and len(df) > 0:
stock_daily_data[code] = df
print(f"{len(df)} 行, {df['trade_date'].iloc[-1]} ~ {df['trade_date'].iloc[0]}")
else:
print("无数据")
if stock_daily_data:
# ── 构建价格面板 ──
stock_price_panel = pd.DataFrame()
for code, df in stock_daily_data.items():
# Tushare 返回的 trade_date 是 YYYYMMDD 格式
df = df.copy()
df['trade_date'] = pd.to_datetime(df['trade_date'], format='%Y%m%d')
df = df.sort_values('trade_date')
s = df.set_index('trade_date')['close'].copy()
s.name = code
stock_price_panel = pd.concat([stock_price_panel, s], axis=1)
print(f"\n 个股价格面板: {stock_price_panel.shape}")
# 展示最新 3 行
print(f" 最新 3 个交易日:")
print(stock_price_panel.tail(3).to_string())
else:
print(" (跳过 — Token 未配置)")
print(f" 演示: 如果 Token 有效,会拉取以下 {len(STOCK_POOL)} 只股票的数据")
for code, name in STOCK_POOL.items():
print(f" {code} ({name})")
# ══════════════════════════════════════════════════════════════════════
# §3 获取指数日线数据
# ══════════════════════════════════════════════════════════════════════
#
# 接口: pro.index_daily()
# 所需积分: 120 分
#
# 指数代码:
# 000300.SH — 沪深300
# 000905.SH — 中证500
# 000016.SH — 上证50
# 399006.SZ — 创业板指
# =============================================================================
print("\n[§3] 获取指数日线数据")
INDEX_CODES = {
"000300.SH": "沪深300",
"000905.SH": "中证500",
"000016.SH": "上证50",
"399006.SZ": "创业板指",
}
if TOKEN_VALID and PRO:
index_data = {}
for code, name in INDEX_CODES.items():
print(f" 获取 {code} ({name})...", end=" ")
try:
df = PRO.index_daily(
ts_code=code,
start_date=START_DATE,
end_date=END_DATE,
fields='ts_code,trade_date,close,open,high,low,vol,amount,pct_chg'
)
time.sleep(0.3)
if df is not None and len(df) > 0:
index_data[code] = df
print(f"{len(df)}")
else:
print("无数据")
except Exception as e:
print(f"失败: {e}")
if index_data:
index_panel = pd.DataFrame()
for code, df in index_data.items():
df = df.copy()
df['trade_date'] = pd.to_datetime(df['trade_date'], format='%Y%m%d')
df = df.sort_values('trade_date')
s = df.set_index('trade_date')['close']
s.name = code
index_panel = pd.concat([index_panel, s], axis=1)
print(f"\n 指数面板: {index_panel.shape}")
# 年化收益对比
ann_rets = (index_panel.iloc[-1] / index_panel.iloc[0]) ** (252.0 / len(index_panel)) - 1
print(" 年化收益率 (CAGR):")
for code, val in ann_rets.items():
name = INDEX_CODES.get(code, code)
print(f" {name}: {val:+.2%}")
else:
print(" (跳过 — Token 未配置)")
# ══════════════════════════════════════════════════════════════════════
# §4 获取股票基础信息
# ══════════════════════════════════════════════════════════════════════
#
# 接口: pro.stock_basic()
# 所需积分: 120 分
# 功能: 获取股票列表,包含股票代码、名称、上市日期、退市日期、行业、地区等
# =============================================================================
print("\n[§4] 获取股票基础信息")
if TOKEN_VALID and PRO:
try:
# 获取沪深两市全部 A 股列表
stock_basic = PRO.stock_basic(
exchange='',
list_status='L', # L=上市, D=退市, P=暂停上市
fields='ts_code,symbol,name,area,industry,market,list_date,'
'delist_date,curr_type,list_status,enname'
)
if stock_basic is not None and len(stock_basic) > 0:
print(f" 当前上市股票: {len(stock_basic)}")
# 退市股票 (用于幸存者偏差检查!)
# 如果只拉当前上市股票,回测会漏掉已退市的垃圾公司
# → 幸存者偏差 (Survivorship Bias) ← 这是实盘最大的坑之一
delisted = PRO.stock_basic(
exchange='',
list_status='D', # 退市
fields='ts_code,symbol,name,list_date,delist_date'
)
if delisted is not None and len(delisted) > 0:
print(f" 历史退市股票: {len(delisted)} 只 ← 回测必须包含!")
print(f" 退市样本 (前 5 只):")
for _, row in delisted.head(5).iterrows():
print(f" {row['ts_code']} {row['name']} "
f"{row['list_date']} ~ {row['delist_date']}")
# 行业分布
if 'industry' in stock_basic.columns:
industry_counts = stock_basic['industry'].value_counts().head(10)
print(f"\n 行业分布 Top 10:")
for ind, cnt in industry_counts.items():
print(f" {ind}: {cnt}")
except Exception as e:
print(f" 获取失败: {e}")
else:
print(" (跳过 — Token 未配置)")
print(" 提示: stock_basic() 可获取全市场股票列表和退市记录")
print(" 退市数据对消除幸存者偏差至关重要")
# ══════════════════════════════════════════════════════════════════════
# §5 获取财务数据 (价值因子的数据源)
# ══════════════════════════════════════════════════════════════════════
#
# 这是 Tushare 相比 AKShare 最大的优势领域 ——
# AKShare 也有部分财务接口,但覆盖度和稳定性不如 Tushare。
#
# 接口: pro.daily_basic() — 每日指标
# 所需积分: 300 分 (需升级)
# 字段: pe, pe_ttm, pb, ps, ps_ttm, dv_ratio, total_share, float_share,
# total_mv, circ_mv
#
# 接口: pro.fina_indicator() — 财务指标
# 所需积分: 600 分 (需进一步升级)
# 字段: roe, roa, grossprofit_margin, netprofit_margin, debt_to_assets 等
# =============================================================================
print("\n[§5] 获取财务数据")
if TOKEN_VALID and PRO:
try:
# daily_basic: 日频估值指标 (PE/PB/市值)
daily_basic = PRO.daily_basic(
ts_code='000001.SZ',
start_date='20240101',
end_date='20250601',
fields='ts_code,trade_date,close,pe,pe_ttm,pb,ps,total_mv,circ_mv'
)
if daily_basic is not None and len(daily_basic) > 0:
print(f" 平安银行 (000001.SZ) 日频估值: {len(daily_basic)}")
print(f" 字段: {list(daily_basic.columns)}")
print(f" 最新 PE_TTM / PB / 市值:")
latest = daily_basic.iloc[0]
print(f" PE(TTM): {latest.get('pe_ttm', 'N/A')}")
print(f" PB: {latest.get('pb', 'N/A')}")
print(f" 总市值: {latest.get('total_mv', 'N/A')} 万元")
print(f"\n ⚠ 注意: 如果返回的 pe_ttm 都是 NaN, 说明积分不足 (需 300 分)")
else:
print(" 未获取到数据 (积分可能不足)")
except Exception as e:
print(f" 获取失败: {e}")
print(f" 提示: daily_basic 需要 300 积分,注册仅送 120 分")
else:
print(" (跳过 — Token 未配置)")
print(" 提示: Tushare 的 daily_basic() 和 fina_indicator() 可获取")
print(" PE/PB/ROE/ROA/毛利率/资产负债率等基本面数据")
print(" 这些都是价值因子 (Value Factor) 的核心输入")
# ══════════════════════════════════════════════════════════════════════
# §6 获取指数成分股权重
# ══════════════════════════════════════════════════════════════════════
#
# 接口: pro.index_weight() — 指数成分股权重
# 所需积分: 120 分
# 用途: 组合优化 (Black-Litterman 的市场均衡权重)
# =============================================================================
print("\n[§6] 获取指数成分股权重")
if TOKEN_VALID and PRO:
try:
# 沪深300 成分股权重 (每月更新)
index_weights = PRO.index_weight(
index_code='000300.SH',
start_date='20240101',
end_date='20250601',
)
if index_weights is not None and len(index_weights) > 0:
print(f" 沪深300 成分股权重: {len(index_weights)} 条记录")
print(f" 字段: {list(index_weights.columns)}")
# 最新一期 Top 10
latest_date = index_weights['trade_date'].max() if 'trade_date' in index_weights.columns else None
if latest_date is not None:
latest_weights = index_weights[
index_weights['trade_date'] == latest_date
].nlargest(10, 'weight') if 'weight' in index_weights.columns else None
if latest_weights is not None and len(latest_weights) > 0:
print(f"\n {latest_date} 前 10 大权重股:")
for _, row in latest_weights.iterrows():
print(f" {row['con_code']} {row.get('con_name', '')}: "
f"{row['weight']:.2%}")
else:
print(" 未获取到数据")
except Exception as e:
print(f" 获取失败: {e}")
else:
print(" (跳过 — Token 未配置)")
print(" 提示: index_weight() 可获取沪深300/中证500等指数的成分股权重")
print(" 这是 Black-Litterman 模型市场均衡组合的必需输入")
# ══════════════════════════════════════════════════════════════════════
# §7 获取行业分类 (申万行业)
# ══════════════════════════════════════════════════════════════════════
#
# 申万行业分类是中国 A 股投研最常用的行业分类标准。
# 在组合优化中 (demo_05), 行业约束需要知道每只股票属于哪个行业。
#
# 接口: pro.index_classify() — 申万行业分类
# 所需积分: 120 分
# =============================================================================
print("\n[§7] 获取行业分类")
if TOKEN_VALID and PRO:
try:
# 获取申万一级行业分类 (level='L1')
sw_classify = PRO.index_classify(
level='L1',
src='SW2021' # 申万2021版行业分类
)
if sw_classify is not None and len(sw_classify) > 0:
print(f" 申万一级行业: {sw_classify['industry_name'].nunique()}")
print(f" 各行业成分股数量 Top 10:")
industry_counts = sw_classify.groupby('industry_name').size().sort_values(ascending=False)
for name, cnt in industry_counts.head(10).items():
print(f" {name}: {cnt}")
else:
print(" 未获取到数据 (接口可能已变更)")
except Exception as e:
print(f" 获取失败: {e}")
print(f" 提示: 申万行业分类接口可能已更新,建议查阅最新文档")
else:
print(" (跳过 — Token 未配置)")
# ══════════════════════════════════════════════════════════════════════
# §8 数据清洗与对齐 (Tushare 特有)
# ══════════════════════════════════════════════════════════════════════
# =============================================================================
print("\n[§8] 数据清洗与对齐")
if TOKEN_VALID and PRO and 'stock_daily_data' in dir() and stock_daily_data:
# ── 缺失值分析 ──
missing = stock_price_panel.isna().sum()
print(f" 各标的缺失天数:")
for code, cnt in missing.items():
name = STOCK_POOL.get(code, code)
print(f" {code} ({name}): {cnt} 天 ({cnt/len(stock_price_panel):.1%})")
# ── ffill + 丢弃行首 ──
clean_prices = stock_price_panel.ffill().dropna(how='any')
print(f"\n 清洗前: {stock_price_panel.shape[0]}")
print(f" 清洗后: {clean_prices.shape[0]} 天 (丢弃 {stock_price_panel.shape[0] - clean_prices.shape[0]} 天)")
# ── 收益率面板 ──
returns = clean_prices.pct_change().dropna()
print(f" 收益率面板: {returns.shape}")
# ── 相关性矩阵 ──
corr = returns.corr()
print(f"\n 收益率相关性 (仅展示对角外最高相关的 3 对):")
corr_unstack = corr.where(
~np.eye(len(corr), dtype=bool)
).unstack().dropna()
top_pairs = corr_unstack.abs().nlargest(6)
for (s1, s2), val in top_pairs.items():
print(f" {s1}{s2}: {val:+.3f}")
else:
print(" (跳过 — 无数据可清洗)")
print(" 提示: 清洗流程与 demo_akshare_data.py 相同,核心原则:")
print(" 1. 只用 ffill(), 禁止 bfill()")
print(" 2. 填充后 dropna(how='any') 丢弃仍有 NaN 的行")
print(" 3. pct_change() 后再 dropna() 得到收益率面板")
# ══════════════════════════════════════════════════════════════════════
# §9 构建本地数据库
# ══════════════════════════════════════════════════════════════════════
# =============================================================================
print("\n[§9] 构建本地数据库")
DATA_DIR = os.path.join(os.path.dirname(__file__), "data", "tushare")
os.makedirs(DATA_DIR, exist_ok=True)
if TOKEN_VALID and PRO:
saved_files = []
if 'stock_daily_data' in dir() and stock_daily_data:
# 保存清洗后价格面板
clean_path = os.path.join(DATA_DIR, "stock_price_clean.csv")
clean_prices.to_csv(clean_path)
saved_files.append(clean_path)
# 保存收益率
ret_path = os.path.join(DATA_DIR, "stock_returns.csv")
returns.to_csv(ret_path)
saved_files.append(ret_path)
if 'index_panel' in dir() and len(index_panel) > 0:
idx_path = os.path.join(DATA_DIR, "index_prices.csv")
index_panel.to_csv(idx_path)
saved_files.append(idx_path)
# 保存拉取元信息
meta_path = os.path.join(DATA_DIR, "fetch_metadata.txt")
with open(meta_path, "w", encoding="utf-8") as f:
f.write(f"数据源: Tushare Pro\n")
f.write(f"拉取时间: {datetime.now().isoformat()}\n")
f.write(f"数据范围: {START_DATE} ~ {END_DATE}\n")
f.write(f"标的数量: {len(STOCK_POOL)} 只个股, {len(INDEX_CODES)} 个指数\n")
saved_files.append(meta_path)
for f in saved_files:
print(f"{f}")
print(f"\n 数据目录: {DATA_DIR}")
print(f" 文件列表: {os.listdir(DATA_DIR) if os.path.exists(DATA_DIR) else ''}")
else:
print(" (跳过 — 无数据可保存)")
# ══════════════════════════════════════════════════════════════════════
# §10 AKShare vs Tushare 对比总结
# ══════════════════════════════════════════════════════════════════════
# =============================================================================
print("\n" + "=" * 68)
print(" §10 AKShare vs Tushare 对比总结")
print("=" * 68)
print("""
维度 AKShare Tushare Pro
费用 完全免费 基础免费(120)
高级需积分/捐赠
注册 不需要 需要手机号注册
数据覆盖 极广 (//// 聚焦 A /指数/
/另类/舆情等) 基金/财务/因子
数据稳定性 中等 (依赖爬虫, (独立数据源,
接口偶发变更) 专业维护)
基本面数据 有限 (PE/PB 基本) 丰富 (PE/PB/ROE/
财务报表等)
退市股票数据 获取较难 stock_basic() 直接
包含退市记录
因子数据 (Barra 风格因子)
股票代码格式 "000001" (纯数字) "000001.SZ" (含市场)
调用频率限制 无硬限制 (建议礼貌) (按积分等级)
最佳使用场景 研究探索快速验证 策略验证通过后的
数据覆盖第一选择 生产级数据源
推荐路径:
研究阶段 AKShare (零成本快速验证想法)
策略定型 Tushare (数据质量更高接口更稳定)
实盘运行 Tushare + 本地缓存数据库 (减少 API 依赖)
关键提醒:
AKShare 接口偶尔变更升级版本前记得检查 changelog
Tushare 积分用完会拒绝请求注意剩余调用次数
两个数据源的收盘价可能存在微小差异 (取整/复权方式)
回测结果因此会有细微不同这是正常的
长期存储请用 Parquet 格式 ( CSV 更快更省空间):
df.to_parquet("data.parquet")
df = pd.read_parquet("data.parquet")
""")
print("=" * 68)
print(" ✓ Tushare Pro 数据获取 Demo 完成")
print("=" * 68)
print(f"""
关键收获:
1. Tushare 需要 Token (免费注册)积分系统决定可用接口范围
2. 股票代码格式为 "000001.SZ" / "600000.SH" (含交易所后缀)
3. Tushare 最大优势: 财务数据 (PE/PB/ROE) + 退市数据 + 稳定性
4. daily_basic() fina_indicator() 是价值因子的核心数据源
5. index_weight() 提供 Black-Litterman 所需的市场权重
6. stock_basic(list_status='D') 是消除幸存者偏差的关键
7. 研究阶段用 AKShare定型后用 Tushare 生产化
下一步:
- Tushare 拉取的数据写入 demo_04 (Alpha 因子)
用真实 PE/PB/ROE 替代合成因子
- 用真实退市记录修正回测的幸存者偏差
""")