# ============================================================================= # Real Data Acquisition Demo — AKShare # 真实数据获取演示 — AKShare 版 # ============================================================================= # # AKShare 是一个完全免费的开源金融数据接口,无需注册、无需 Token。 # 覆盖 A 股/港股/美股/期货/外汇/宏观经济等数据,是目前个人投资者 # 最实用的免费数据源。 # # Prerequisites / 安装: # pip install akshare pandas numpy matplotlib # # Topics covered / 涵盖主题: # §1 AKShare 简介与基本用法 # §2 获取 A 股行业 ETF 日线数据(轮动策略核心输入) # §3 获取宽基指数数据(沪深300 / 中证500) # §4 获取北向资金数据(情绪因子数据源) # §5 获取融资融券数据(杠杆情绪数据源) # §6 真实数据的清洗陷阱(与合成数据的关键差异) # §7 数据存储:构建本地研究数据库 # §8 数据质量报告生成 # ============================================================================= from __future__ import annotations import warnings warnings.filterwarnings("ignore") import numpy as np import pandas as pd from datetime import datetime, timedelta import time import os # ── 绕过系统代理,直连东方财富 ── # macOS 上的 Clash/小火箭等代理工具的出口 IP 容易被东方财富反爬封禁。 # 设置环境变量让 requests 库对 eastmoney.com 不走代理。 os.environ["NO_PROXY"] = os.environ.get("NO_PROXY", "") + ",eastmoney.com,eastmoneyfutures.com,10jqka.com.cn" os.environ["no_proxy"] = os.environ["NO_PROXY"] # ── 检查 akshare 是否安装 ── try: import akshare as ak print(f"✓ AKShare 版本: {ak.__version__}") except ImportError: print("❌ AKShare 未安装。请在 trading conda 环境中执行:") print(" conda activate trading") print(" pip install akshare") raise print("=" * 68) print(" 真实数据获取演示 — AKShare 版") print(" Real Data Acquisition Demo with AKShare") print("=" * 68) # ══════════════════════════════════════════════════════════════════════ # §1 AKShare 简介与基本用法 # ══════════════════════════════════════════════════════════════════════ # # AKShare 特点: # • 完全免费,无需注册 # • 数据来源于东方财富、新浪财经、交易所等公开接口 # • 函数名遵循 fund_xxx / stock_xxx / macro_xxx 等命名规范 # • 每次调用触发一次 HTTP 请求,批量获取需注意频率 # # 基本用法: # df = ak.function_name(param1=value1, param2=value2) # → 返回 pandas DataFrame # # 文档: https://akshare.akfamily.xyz/ print("\n[§1] AKShare 简介") print(f" 数据接口数量: 数千个 (覆盖股票/基金/期货/外汇/宏观/另类)") print(f" 数据来源: 东方财富、新浪财经、交易所公开接口等") print(f" 费用: 完全免费") # ══════════════════════════════════════════════════════════════════════ # §2 获取 A 股行业 ETF 日线数据 # ══════════════════════════════════════════════════════════════════════ # # 这是 demo_07 (ETF 轮动策略) 的真实数据替代方案。 # 下面获取的 ETF 代码与 demo_07 的 ETF_UNIVERSE 一一对应。 # # AKShare 接口: fund_etf_hist_em() # symbol — ETF 代码 (如 "159995") # period — "daily" / "weekly" / "monthly" # start_date / end_date — "YYYYMMDD" 格式 # adjust — "" (不复权) / "qfq" (前复权) / "hfq" (后复权) # ETF 通常用 "" 或 "qfq" # ============================================================================= print("\n[§2] 获取行业 ETF 日线数据") # A 股行业 ETF 池 — 与 demo_07 对应 ETF_UNIVERSE = { "159995": "芯片ETF", "159819": "人工智能ETF", "516160": "新能源ETF", "159928": "消费ETF", "512170": "医疗ETF", "512880": "证券ETF", # 宽基 ETF (新增) "510300": "沪深300ETF", "510500": "中证500ETF", # 国债 ETF (避险资产) "511010": "国债ETF", } START_DATE = "20190101" END_DATE = "20250601" # ── 安全获取单只 ETF 数据 (带重试和限速) ── def fetch_etf_safe(code: str, start: str, end: str, max_retries: int = 3): """ 安全获取 ETF 日线数据,失败时自动重试并控制请求频率。 网络抖动或东方财富接口偶发不稳定时非常必要。 """ for attempt in range(max_retries): try: df = ak.fund_etf_hist_em( symbol=code, period="daily", start_date=start, end_date=end, adjust="qfq", # 前复权 ) return df except Exception as e: err_msg = str(e) if "ProxyError" in err_msg or "RemoteDisconnected" in err_msg: wait = (attempt + 1) * 5 # 代理/连接问题,等更久 print(f"\n ⚠ {code} 连接被拒绝 (代理/反爬),{wait}s 后重试...") else: wait = 2.0 print(f"\n ⚠ {code} 第 {attempt+1} 次获取失败: {e}") if attempt < max_retries - 1: time.sleep(wait) print(f"\n ✗ {code} 全部 {max_retries} 次获取失败,跳过") return None etf_data = {} for i, (code, name) in enumerate(ETF_UNIVERSE.items()): if i > 0: # 请求间隔 + 随机抖动,避免触发东方财富反爬 time.sleep(2.0 + np.random.uniform(0, 1.5)) print(f" 获取 {code} ({name})...", end=" ") df = fetch_etf_safe(code, START_DATE, END_DATE) if df is not None: etf_data[code] = df print(f"{len(df)} 行, {df['日期'].iloc[0]} ~ {df['日期'].iloc[-1]}") else: print("失败 ✗") print(f"\n 成功获取: {len(etf_data)} / {len(ETF_UNIVERSE)} 只 ETF") # ── 构建价格面板 (Multi-Asset Price Panel) ── # 这是最关键的步骤:将多只 ETF 的收盘价对齐到统一的日期轴上。 # 真实数据中,不同 ETF 可能有不同的上市日期和停牌日期, # 对齐方式的选择直接影响后续所有分析。 if etf_data: price_panel = pd.DataFrame() for code, df in etf_data.items(): # 设置日期索引,提取收盘价 df_indexed = df.set_index("日期")["收盘"].copy() df_indexed.name = code price_panel = pd.concat([price_panel, df_indexed], axis=1) # 确保日期索引排序 price_panel.index = pd.to_datetime(price_panel.index) price_panel = price_panel.sort_index() print(f"\n 价格面板: {price_panel.shape[0]} 个交易日 × {price_panel.shape[1]} 只 ETF") print(f" 日期范围: {price_panel.index[0].date()} ~ {price_panel.index[-1].date()}") print(f" 各 ETF 起止日期:") for col in price_panel.columns: valid = price_panel[col].dropna() if len(valid) > 0: print(f" {col} ({ETF_UNIVERSE.get(col, '?')}): {valid.index[0].date()} ~ {valid.index[-1].date()}, {len(valid)} 天") # ══════════════════════════════════════════════════════════════════════ # §3 获取宽基指数数据 # ══════════════════════════════════════════════════════════════════════ # # AKShare 接口: stock_zh_index_daily() # symbol — 指数代码: # "sh000300" → 沪深300 # "sh000905" → 中证500 # "sz399006" → 创业板指 # "sh000016" → 上证50 # ============================================================================= print("\n[§3] 获取宽基指数数据") INDEX_CODES = { "sh000300": "沪深300", "sh000905": "中证500", "sz399006": "创业板指", "sh000016": "上证50", } index_data = {} for symbol, name in INDEX_CODES.items(): print(f" 获取 {symbol} ({name})...", end=" ") try: df = ak.stock_zh_index_daily(symbol=symbol) index_data[symbol] = df print(f"{len(df)} 行, {df['date'].iloc[0]} ~ {df['date'].iloc[-1]}") time.sleep(0.3) except Exception as e: print(f"失败: {e}") # ── 计算指数滚动估值参考 (PE/PB 分位数需另接接口,此处展示收益基座) ── if index_data: # 对齐为面板 index_panel = pd.DataFrame() for sym, df in index_data.items(): s = df.set_index("date")["close"].copy() s.name = sym index_panel = pd.concat([index_panel, s], axis=1) index_panel.index = pd.to_datetime(index_panel.index) index_panel = index_panel.sort_index() # 计算滚动年化收益 (252 日) rolling_ret = index_panel.pct_change(252).dropna() if len(rolling_ret) > 0: print(f"\n 最新滚动年化收益 ({rolling_ret.index[-1].date()}):") for col in rolling_ret.columns: name = INDEX_CODES.get(col, col) val = rolling_ret[col].iloc[-1] print(f" {name}: {val:+.2%}") # ══════════════════════════════════════════════════════════════════════ # §4 获取北向资金数据(关键情绪因子数据源) # ══════════════════════════════════════════════════════════════════════ # # 北向资金 (North-bound Capital Flow) 指通过沪深港通从香港流入 A 股 # 的外资。净买入额被视为外资对 A 股情绪的"投票机"。 # # AKShare 接口: stock_hsgt_hist_em() → 沪深港通历史资金流向 # stock_hsgt_north_net_flow_in_em() → 北向净流入 # ============================================================================= print("\n[§4] 获取北向资金数据") try: # 北向资金: stock_hsgt_hist_em 获取沪股通/深股通历史资金流向 for market, label in [("沪股通", "沪股通(北向)"), ("深股通", "深股通(北向)")]: try: hsgt = ak.stock_hsgt_hist_em(symbol=market) if hsgt is not None and len(hsgt) > 0: print(f" {label}: {hsgt.shape}") print(f" 列名: {list(hsgt.columns)}") print(f" 最新 3 行:\n{hsgt.tail(3)}") time.sleep(1.5) # 避免触发反爬 except Exception as e: print(f" {label} 获取失败: {e}") except Exception as e: print(f" 北向资金获取失败: {e}") print(f" 提示: AKShare 接口可能已更新,请查阅最新文档") # ══════════════════════════════════════════════════════════════════════ # §5 获取融资融券数据(杠杆情绪数据源) # ══════════════════════════════════════════════════════════════════════ # # 融资余额 (Margin Balance) 反映散户借钱买股的意愿。 # 融资余额大幅上升 → 散户看多情绪高涨(注意:极端值往往是反向指标) # 融资余额骤降 → 恐慌去杠杆,市场底部特征之一 # # AKShare 接口: stock_margin_sz() / stock_margin_sh() → 深圳/上海融资融券明细 # stock_margin_detail_sse() → 上交所融资融券汇总 # ============================================================================= print("\n[§5] 获取融资融券数据") try: # 上交所融资融券汇总 (日频) # stock_margin_sse: 历史序列 (start_date/end_date) # stock_margin_detail_sse: 单日快照 (date='YYYYMMDD') margin_sse = ak.stock_margin_sse(start_date="20240101", end_date="20250601") if margin_sse is not None and len(margin_sse) > 0: print(f" 上交所融资融券: {margin_sse.shape}") print(f" 列名: {list(margin_sse.columns)}") print(f" 最新数据:\n{margin_sse.tail(3)}") else: print(" 未获取到融资融券数据") except Exception as e: print(f" 融资融券获取失败: {e}") print(f" 说明: 此接口可能需要特定参数格式,请查阅文档") print(f" 备选: 可直接使用东方财富网页爬取") # ══════════════════════════════════════════════════════════════════════ # §6 真实数据的清洗陷阱 # ══════════════════════════════════════════════════════════════════════ # # 合成数据 (Demo 系列的 GBM/Factor Model) 没有这些问题: # ① 不同标的上市日期不同 → 面板前段大量 NaN # ② 停牌 (suspension) → 面板中间出现 NaN 缺口 # ③ 涨跌停日 → 价格"真实"但无法成交 # ④ ETF 分红/拆分 → 不复权价格存在跳跃 # ⑤ 数据源异常 → 极少数日期的 OHLC 数据错乱 # ============================================================================= print("\n[§6] 真实数据清洗演示") if etf_data: # ── 6-A: 缺失值分析 ── print(f"\n 6-A 缺失值分析:") missing_pct = price_panel.isna().mean().sort_values(ascending=False) for code in missing_pct.index[:5]: pct = missing_pct[code] name = ETF_UNIVERSE.get(code, code) print(f" {code} {name}: {pct:.1%} 缺失") # ── 6-B: 前向填充 (仅 ffill, 禁止 bfill!) ── # bfill 会用"未来还没发生的数据"填补今天的空值 → Look-Ahead Bias print(f"\n 6-B 缺失值处理 (ffill only):") missing_before = price_panel.isna().sum().sum() clean_prices = price_panel.ffill() # 步骤1: 前向填充 # 步骤2: 丢弃早期所有标的都 NaN 的行(通常是面板最前面) clean_prices = clean_prices.dropna(how='any') missing_after = clean_prices.isna().sum().sum() print(f" 填充前 NaN: {missing_before} → 填充后 NaN: {missing_after}") print(f" 丢弃了 {len(price_panel) - len(clean_prices)} 个早期交易日") # ── 6-C: 收益率计算与异常值标记 ── print(f"\n 6-C 收益率与涨跌停标记:") returns = clean_prices.pct_change() # 涨跌停检测: A 股 ETF 理论涨跌停 ±10% LIMIT_PCT = 0.10 limit_up = (returns >= LIMIT_PCT - 0.002).sum() limit_down = (returns <= -LIMIT_PCT + 0.002).sum() for code in returns.columns[:5]: up = limit_up[code] dn = limit_down[code] if up > 0 or dn > 0: name = ETF_UNIVERSE.get(code, code) print(f" {code} {name}: 涨停 {up} 天, 跌停 {dn} 天") # ── 6-D: 数据连续性检查 ── print(f"\n 6-D 数据连续性检查:") date_gaps = 0 for col in clean_prices.columns: ser = clean_prices[col].dropna() if len(ser) < 2: continue # 检查自然日连续性 → 非交易日(周末/假日)是正常的 trading_gaps = ser.index.to_series().diff().dt.days long_gaps = trading_gaps[trading_gaps > 7] # 连续停牌 > 7 天 if len(long_gaps) > 0: name = ETF_UNIVERSE.get(col, col) print(f" {col} {name}: {len(long_gaps)} 次连续停牌 > 7 天") date_gaps += len(long_gaps) if date_gaps == 0: print(f" 所有 ETF 无连续停牌 > 7 天 ✓") # ══════════════════════════════════════════════════════════════════════ # §7 数据存储:构建本地研究数据库 # ══════════════════════════════════════════════════════════════════════ # # 存储原则是"存原始数据 + 存清洗后数据",不要只存一个。 # 原因: 你以后可能想换一种清洗方式,而原始数据丢了就回不去了。 # ============================================================================= print("\n[§7] 数据存储") DATA_DIR = os.path.join(os.path.dirname(__file__), "data") os.makedirs(DATA_DIR, exist_ok=True) if etf_data: # 保存清洗后的价格面板 (研究级) clean_path = os.path.join(DATA_DIR, "etf_price_panel_clean.csv") clean_prices.to_csv(clean_path) print(f" ✓ 清洗后价格面板 → {clean_path}") # 保存收益率面板 returns_path = os.path.join(DATA_DIR, "etf_returns_panel.csv") returns.to_csv(returns_path) print(f" ✓ 收益率面板 → {returns_path}") if index_data and len(index_panel) > 0: index_path = os.path.join(DATA_DIR, "index_prices.csv") index_panel.to_csv(index_path) print(f" ✓ 指数价格 → {index_path}") print(f"\n 数据目录: {DATA_DIR}") print(f" 文件列表: {os.listdir(DATA_DIR) if os.path.exists(DATA_DIR) else '无'}") # ══════════════════════════════════════════════════════════════════════ # §8 数据质量报告 # ══════════════════════════════════════════════════════════════════════ # # 每次拉取数据后,生成一份质量报告,记录: # - 各标的覆盖日期范围 # - 缺失值比例 # - 极端收益率事件 # - 数据拉取时间戳 # 这对后期回溯调试非常有用——"我用的数据是哪天拉的?有什么已知缺陷?" # ============================================================================= print("\n[§8] 数据质量报告") if etf_data: report_lines = [] report_lines.append("=" * 60) report_lines.append("数据质量报告 / Data Quality Report") report_lines.append(f"生成时间: {datetime.now().isoformat()}") report_lines.append(f"数据源: AKShare (fund_etf_hist_em)") report_lines.append("=" * 60) for code in clean_prices.columns: name = ETF_UNIVERSE.get(code, code) ser = clean_prices[code].dropna() ret_ser = returns[code].dropna() n = len(ser) missing_input = price_panel[code].isna().sum() report_lines.append( f"\n{code} ({name}):" f"\n 日期范围: {ser.index[0].date()} ~ {ser.index[-1].date()} ({n} 天)" f"\n 原始缺失: {missing_input} 天 ({missing_input/len(price_panel):.1%})" f"\n 日均收益率: {ret_ser.mean():+.6f}" f"\n 日波动率: {ret_ser.std():.6f}" f"\n 最大日涨幅: {ret_ser.max():+.2%}" f"\n 最大日跌幅: {ret_ser.min():+.2%}" ) # 打印报告 report_text = "\n".join(report_lines) print(report_text) # 保存报告 report_path = os.path.join(DATA_DIR, "data_quality_report.txt") with open(report_path, "w", encoding="utf-8") as f: f.write(report_text) print(f"\n ✓ 质量报告已保存 → {report_path}") print("\n" + "=" * 68) print(" ✓ AKShare 数据获取 Demo 完成") print("=" * 68) print(f""" 关键收获: 1. AKShare 完全免费,覆盖 ETF / 指数 / 资金流向 / 融资融券 2. 真实数据需要处理: 上市日期不一致、停牌缺口、涨跌停标记 3. 填充缺失值只用 ffill() —— bfill() 会引入 Look-Ahead Bias 4. 数据存储时同时保留"原始"和"清洗后"两个版本 5. 每次拉取数据都应生成质量报告,记录已知缺陷 下一步: 将清洗后的 ETF 价格面板替换 demo_07 的合成数据, 重新运行 ETF 轮动策略回测,观察真实数据下的策略表现。 """)