"""Aggregate or update historical financial-report data (汇总或更新历史财报数据)."""

from pytdx.crawler.history_financial_crawler import HistoryFinancialCrawler,HistoryFinancialListCrawler
from pytdx.hq import TdxHq_API
from pytdx.reader import HistoryFinancialReader
from pytdx.crawler.base_crawler import demo_reporthook

import pandas as pd
from datetime import date, datetime, timedelta
import os,re,random
from chinese_calendar import is_workday

# Module-level pytdx market-quote (hq) API client, created with multithreading,
# heartbeat and auto-retry enabled.
# NOTE(review): not referenced by the functions visible in this section —
# confirm it is still needed before keeping it at import time.
api = TdxHq_API(multithread=True, heartbeat=True, auto_retry=True)

# Helpers that build and refresh the aggregated financial-report CSV.

def updateallfns(file='D:\\stock_data\\allfns.csv', fnsfold=r"D:\stock_data\fdata"):
    """Append newly downloaded financial reports to the aggregated CSV.

    The latest value of the ``date`` column in the aggregated file is used as
    a cut-off: every ``.zip`` report in ``fnsfold`` whose file-name date is
    strictly later than the cut-off is read and appended to ``file``.

    Args:
        file: Path of the aggregated CSV. It must already exist and contain
            ``code`` and ``date`` columns.
        fnsfold: Directory holding the downloaded ``.zip`` report files.

    Returns:
        None. The CSV at ``file`` is extended in place when updates exist and
        is left untouched otherwise.
    """
    # Latest report date already present in the aggregated file.
    allfns = pd.read_csv(file, index_col='date', dtype={'code': str}, parse_dates=['date'])
    newdate_allfns = f"{max(allfns.index):%Y%m%d}"

    # Pick the zip files whose embedded date is newer than the cut-off.
    # Dates are compared as strings, which orders correctly as long as the
    # file names carry an 8-digit YYYYMMDD run (assumed naming — TODO confirm).
    updatefiles = []
    for name in os.listdir(fnsfold):
        if not re.search(r'\.zip$', name):
            continue
        digits = re.search(r'\d+', name)
        # Zip files without any digits carry no date and are skipped instead
        # of raising IndexError.
        if digits and digits.group() > newdate_allfns:
            updatefiles.append(name)

    if not updatefiles:
        return

    cols = ['report_date', 'col1', 'col4', 'col6']
    # Sorted so that reports are appended in chronological order regardless
    # of the order os.listdir happens to return.
    for filename in sorted(updatefiles):
        zip_path = os.path.join(fnsfold, filename)
        single_fns = HistoryFinancialReader().get_df(zip_path).filter(cols)
        # No header: the aggregated file already has one.
        single_fns.reset_index().to_csv(
            file, mode='a', header=False, index=False, encoding='utf-8-sig'
        )


def total_fnsfile(file='D:\\stock_data\\allfns.csv', fnsfold=r"D:\stock_data\fdata"):
    """Build the aggregated financial-report CSV from all downloaded reports.

    Every ``.zip`` report in ``fnsfold`` whose file-name date is later than
    ``20131231`` is read, reduced to four columns, concatenated and written
    to ``file`` with the columns renamed to ``date``, ``EPS``, ``BPS`` and
    ``ROE``.

    Args:
        file: Destination path of the aggregated CSV.
        fnsfold: Directory holding the downloaded ``.zip`` report files.

    Returns:
        None. Nothing is written when no matching report file is found.
    """
    # Collect the downloaded report files dated after 2013.
    filenames = []
    for name in os.listdir(fnsfold):
        if not re.search(r'\.zip$', name):
            continue
        digits = re.search(r'\d+', name)
        # Zip files without any digits carry no date and are skipped instead
        # of raising IndexError.
        if digits and digits.group() > '20131231':
            filenames.append(name)

    if not filenames:
        # Nothing to aggregate; avoid writing a header-less, empty file.
        return

    # Columns to keep from each report.
    cols = ['report_date', 'col1', 'col4', 'col6']
    # Read every report first and concatenate once, rather than growing the
    # frame inside the loop.
    frames = [
        HistoryFinancialReader().get_df(os.path.join(fnsfold, name)).filter(cols)
        for name in sorted(filenames)
    ]
    total_fns = pd.concat(frames)

    # Positional rename: this fails loudly if a report lacks one of `cols`.
    total_fns.columns = ["date", "EPS", "BPS", "ROE"]
    total_fns.to_csv(file, encoding='utf-8-sig')

if __name__ == '__main__':
    # Entry point: build the aggregated CSV from scratch on the first run;
    # once it exists, only append the reports that are newer than its content.
    file = 'D:\\stock_data\\allfns.csv'
    if not os.path.exists(file):
        total_fnsfile()
    else:
        updateallfns()