下载更新通达信行情数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# !/usr/bin/python3
# coding=utf-8
# date=20231117
# name=下载更新通达信行情数据

from pytdx.hq import TdxHq_API
from pytdx.hq import TDXParams
from pytdx.crawler.history_financial_crawler import HistoryFinancialListCrawler
from pytdx.crawler.history_financial_crawler import HistoryFinancialCrawler
from pytdx.reader import HistoryFinancialReader
from pytdx.crawler.base_crawler import demo_reporthook
import pandas as pd
from datetime import date, datetime, timedelta
import os
import re
from chinese_calendar import is_workday


# Shared pytdx quote client: the helper functions below call api.get_k_data(),
# and the download loop opens the connection with api.connect(...).
api = TdxHq_API(multithread=True, heartbeat=True, auto_retry=True)

def downkdata(code, startdate, enddate, file):
    """Fetch K-line rows for ``code`` and write them to ``file`` as a fresh CSV.

    Calls ``api.get_k_data(code, startdate, enddate)``. An empty result is
    ignored silently (no file is written, nothing is printed); otherwise the
    frame is saved with ``utf-8-sig`` encoding and a row count is printed.
    """
    kline = api.get_k_data(code, startdate, enddate)
    if kline.empty:
        return
    kline.to_csv(file, encoding="utf-8-sig")
    print(f"{code} 下载数据 {len(kline)} 行")

def updatekdate(code, startdate, enddate, file):
    """Fetch K-line rows for ``code`` and append them to the CSV at ``file``.

    Calls ``api.get_k_data(code, startdate, enddate)``. When rows come back
    they are appended without a header row and a row count is printed; an
    empty result leaves the file untouched.
    """
    new_rows = api.get_k_data(code, startdate, enddate)
    if not new_rows.empty:
        # Append mode with no header so the new rows continue the existing CSV.
        new_rows.to_csv(file, mode="a", header=False, encoding="utf-8-sig")
        print(f"{code} 更新数据 {len(new_rows)} 行")

def get_workdate(olddate):
    """Return the first date after ``olddate`` for which ``is_workday`` is true.

    ``olddate`` must support ``+ timedelta`` and ``%Y-%m-%d`` formatting
    (the caller passes the date parsed from the last CSV row).
    """
    step = timedelta(days=1)
    candidate = olddate + step
    while True:
        # The candidate is re-parsed from its date string before the check.
        if is_workday(pd.to_datetime(f"{candidate:%Y-%m-%d}")):
            return candidate
        candidate += step

下载更新K线数据

  1. 行情数据存放目录:"D:\Stock_data\kdata"
  2. 所有的股票代码及名称等信息文件:"D:\stock_data\allstock.csv"
  3. 读取股票代码名称文件,获取全部股票代码sr
1
2
3
# Directory holding one K-line CSV per stock code ("<code>.csv").
fd = r"D:\Stock_data\kdata"
# Stock list; the "代码" column is read as str so codes such as 000001 stay intact.
df_code = pd.read_csv(r"D:\stock_data\allstock.csv", dtype={"代码": str})
# Preview the first rows (only displays output in an interactive/notebook session).
df_code.head()
代码 名称 细分行业 地区
0 000001 平安银行 股份制银行 深圳
1 000002 万 科A 住宅开发 深圳
2 000004 国华网安 基础软件 深圳
3 000005 ST星源 水治理 深圳
4 000006 深振业A 住宅开发 深圳
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Download or incrementally update the K-line CSV of every stock in df_code.
with api.connect('124.71.9.153', 7709):
    # Computed once so every stock in this run uses the same end date.
    enddate = f"{date.today():%Y-%m-%d}"
    for code in df_code.代码:
        file = os.path.join(fd, f"{code}.csv")
        # A file smaller than 100 bytes is treated like a missing file and
        # is downloaded again from scratch.
        if os.path.exists(file) and os.path.getsize(file) >= 100:
            # The first column of the last row is the most recent stored date.
            olddate = pd.read_csv(
                file, dtype={"code": str}, parse_dates=['date']
            ).iloc[-1, 0]
            startdate = f"{get_workdate(olddate):%Y-%m-%d}"
            # ISO-formatted dates compare correctly as plain strings.
            # NOTE(review): when the next workday is today, data is fetched
            # only before 16:00 -- confirm this cutoff is intended (an
            # "after market close" check would be `>= 16`).
            if startdate < enddate or (
                startdate == enddate and datetime.now().hour < 16
            ):
                updatekdate(code=code, startdate=startdate, enddate=enddate, file=file)
        else:
            try:
                downkdata(code, "2020-01-01", enddate, file)
            except Exception:
                # Best effort: report the failing code and continue with the
                # next one. KeyboardInterrupt/SystemExit are no longer
                # swallowed, so the run can be aborted with Ctrl-C.
                print(code, '无数据可下载!')
001326 下载数据 7 行
001376 下载数据 11 行
001378 下载数据 15 行
301489 下载数据 19 行
301517 下载数据 24 行
301555 下载数据 14 行
001306 下载数据 2 行
301568 无数据可下载!
603107 下载数据 13 行
603107 下载数据 13 行
603193 下载数据 24 行
603273 下载数据 20 行
603361 无数据可下载!
688648 下载数据 5 行
688653 下载数据 1 行

转化成.py 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# !/usr/bin/python3
# coding=utf-8
# date=20231117
# name=下载更新通达信行情数据

from pytdx.hq import TdxHq_API
from pytdx.hq import TDXParams
from pytdx.crawler.history_financial_crawler import HistoryFinancialListCrawler
from pytdx.crawler.history_financial_crawler import HistoryFinancialCrawler
from pytdx.reader import HistoryFinancialReader
from pytdx.crawler.base_crawler import demo_reporthook
import pandas as pd
from datetime import date, datetime, timedelta
import os
import re
from chinese_calendar import is_workday


# Shared pytdx quote client used by the helper functions and the download loop below.
api = TdxHq_API(multithread=True, heartbeat=True, auto_retry=True)

def downkdata(code, startdate, enddate, file):
    """Fetch K-line rows for ``code`` and write them to ``file`` as a fresh CSV.

    Calls ``api.get_k_data(code, startdate, enddate)``. An empty result is
    ignored silently; otherwise the frame is saved with ``utf-8-sig``
    encoding and a row count is printed.
    """
    data = api.get_k_data(code, startdate, enddate)
    if data.empty:
        return
    data.to_csv(file, encoding="utf-8-sig")
    # Report a download ("下载"), not an update ("更新"): this function creates
    # the file, and the message must stay distinguishable from updatekdate's.
    print(f"{code} 下载数据 {len(data)} 行")

def updatekdate(code, startdate, enddate, file):
    """Append newly fetched K-line rows for ``code`` to the CSV at ``file``.

    Rows come from ``api.get_k_data(code, startdate, enddate)``. Nothing is
    written or printed when the result is empty.
    """
    fetched = api.get_k_data(code, startdate, enddate)
    if fetched.empty:
        return
    # Append without a header row so the existing CSV simply grows.
    fetched.to_csv(file, mode="a", header=False, encoding="utf-8-sig")
    print(f"{code} 更新数据 {len(fetched)} 行")

def get_workdate(olddate):
    """Return the earliest date later than ``olddate`` accepted by ``is_workday``."""
    next_day = olddate + timedelta(days=1)
    # Keep stepping one day at a time until is_workday accepts the date.
    while not is_workday(pd.to_datetime(f"{next_day:%Y-%m-%d}")):
        next_day = next_day + timedelta(days=1)
    return next_day

# The standalone script must define its own inputs; the loop below reads both.
# Directory holding one K-line CSV per stock code.
fd = r"D:\Stock_data\kdata"
# Stock list; the "代码" column is read as str so codes keep their full form.
df_code = pd.read_csv(r"D:\stock_data\allstock.csv", dtype={"代码": str})

# Download or incrementally update the K-line CSV of every listed stock.
with api.connect('124.71.9.153', 7709):
    # Computed once so every stock in this run uses the same end date.
    enddate = f"{date.today():%Y-%m-%d}"
    for code in df_code.代码:
        file = os.path.join(fd, f"{code}.csv")
        # A file smaller than 100 bytes is treated like a missing file.
        if os.path.exists(file) and os.path.getsize(file) >= 100:
            # The first column of the last row is the most recent stored date.
            olddate = pd.read_csv(
                file, dtype={"code": str}, parse_dates=['date']
            ).iloc[-1, 0]
            startdate = f"{get_workdate(olddate):%Y-%m-%d}"
            # ISO-formatted dates compare correctly as plain strings.
            # NOTE(review): when the next workday is today, data is fetched
            # only before 16:00 -- confirm this cutoff is intended.
            if startdate < enddate or (
                startdate == enddate and datetime.now().hour < 16
            ):
                updatekdate(code=code, startdate=startdate, enddate=enddate, file=file)
        else:
            try:
                downkdata(code, "2020-01-01", enddate, file)
            except Exception:
                # Best effort: report the failing code and move on, while
                # letting KeyboardInterrupt/SystemExit stop the run.
                print(code, '无数据可下载!')
301568 无数据可下载!
603361 无数据可下载!

读取本地通达信软件下载的行情数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from pytdx.reader import TdxDailyBarReader, TdxFileNotFoundException
import re,os
# Reader used below to load a TDX daily-bar (.day) file into a DataFrame.
reader = TdxDailyBarReader()

# Example stock code for the demo read below.
code = '600004'
def get_filepath(code):
    """Return the full path of the local daily K-line file for a stock code.

    A code starting with ``6`` is prefixed with ``sh``; one starting with
    ``0`` or ``3`` is prefixed with ``sz``. A code that already begins with
    lowercase letters is used as given, and those letters name the market
    sub-directory.

    Args:
        code: Stock code string, e.g. ``'600004'``.

    Returns:
        ``D:\\new_tdx\\vipdoc`` joined with ``<market>\\lday\\<market+code>.day``.

    Raises:
        ValueError: If no market prefix can be derived from ``code``.
    """
    if code.startswith('6'):
        code = 'sh' + code
    elif code.startswith(('0', '3')):
        code = 'sz' + code

    market = re.match(r'[a-z]+', code)
    if market is None:
        # Previously this surfaced as an AttributeError on ``None.group()``.
        raise ValueError(f"cannot determine market prefix for stock code {code!r}")

    fd1 = r'D:\new_tdx\vipdoc'
    # Backslashes are escaped explicitly; the original relied on the invalid
    # escape sequences "\l" and "\{" being passed through unchanged.
    fd2 = f"{market.group()}\\lday\\{code}.day"
    return os.path.join(fd1, fd2)

# Load the daily bars for `code` from the local file and preview the last rows
# (the preview only displays output in an interactive/notebook session).
df = reader.get_df(get_filepath(code))
df.tail()
open high low close amount volume
date
2023-11-13 10.84 10.89 10.72 10.80 83254888.0 77213.47
2023-11-14 10.81 10.93 10.78 10.91 104742512.0 96441.44
2023-11-15 11.01 11.04 10.87 10.91 96345712.0 88116.11
2023-11-16 10.92 10.93 10.81 10.86 70977744.0 65407.95
2023-11-17 10.86 10.95 10.83 10.91 79382128.0 72868.78