Python 从同花顺软件中抓取历史数据

之前写过一篇《从同花顺软件中抓取上证指数成交明细》的博客,其实这一篇也大同小异,只不过是用Python实现的,而且是专门用来抓取分钟级别涨跌值的。

以抓上证指数为例,说一下大致的工作流程。在同花顺中,切换到上证指数的日K线图之后,双击一个蜡烛,就会弹出当天的分时走势:

此时,如果把焦点锁定到分时图所在的窗口上,然后按键盘'right'键,就会出现一个小Bar显示09:30的数据:

接下来每按一次'right',都会显示下一分钟的数据,直到15:00为止。根据《从同花顺软件中抓取上证指数成交明细》中的实验我们也知道,同花顺里面显示这些数据的字体都是一样的。于是可以是用最最原始的屏幕字符识别(OCR)来把每一个Bar里面的数据“抠”出来。这样,一天的分时数据就有了。

那么采完一天的数据之后,怎么切换到前一天的数据呢?《从同花顺软件中抓取上证指数成交明细》中,我是每一天成交明细采完以后手工切换到上一天。如今我越来越懒了,非得自动不行。方法也是一样的——让后面K线图的窗口获得焦点,然后按一下'left'键,那么成交明细的窗口就切换到上一个交易日了。

======================阶段一:OCR====================

先来解决最核心的屏幕识别吧。屏幕识别的原理可以参考《从同花顺软件中抓取上证指数成交明细》的第一节。本文中我就直接上代码了:

import os
from PIL import Image,ImageFile

class HexinOCR:

    def __init__(self):
        #图片的名称
        filenames=(*[str(n) for n in range(10)],'dot','colon','add','min')
        dirname=os.path.dirname(__file__)
        #打开图片,并且转换为0-1二维数组
        self.__images=tuple([self.__to_binary_map(Image.open(dirname+'\\'+d+'.png')) for d in filenames])
        #每个图片对应的字符
        self.__digits=(*[str(n) for n in range(10)],'.',':','+','-')

    #传入一张图片,识别所有的字符
    def recognize(self,im):
        if not isinstance(im,(Image.Image,ImageFile.ImageFile)):
            raise TypeError('Param <im> is not an image')
        text_lst=[]
        bm=self.__to_binary_map(im)
        w,h=self.__get_binary_map_size(bm)
        #逐行遍历
        y=0
        while y<h:
            for x in range(w):
                #如果某行存在1,则试图识别
                if bm[y][x]!=0:
                    text,line_h=self.__recognize_line(bm,y)
                    #确实有文字,则跳过一个行高
                    if text!='':
                        y+=line_h
                        text_lst.append(text)
                    break
            y+=1
        return text_lst

    #给定一张0-1二维数组和行号,试图识别改行上的字符串
    def __recognize_line(self,bm,start_y):
        w,h=self.__get_binary_map_size(bm)
        text=''
        line_h=0
        x=0
        #遍历每一个x
        while x<w:
            #穷举每一个字符
            for i in range(len(self.__digits)):
                sub=self.__images[i]
                #如果能够重合,则识别,加上一个字宽
                if self.__has_sub_image(bm,x,start_y,sub):
                    text+=self.__digits[i]
                    sw,sh=self.__get_binary_map_size(sub)
                    line_h=max(line_h,sh)
                    x+=sw
                    break
            x+=1
        return text,line_h

    #判断在bm的start_x,start_y位置是否存在一张子图和sub重合
    def __has_sub_image(self,bm,start_x,start_y,sub):
        w,h=self.__get_binary_map_size(bm)
        sw,sh=self.__get_binary_map_size(sub)
        if start_x+sw>w or start_y+sh>h:
            return False
        for y in range(sh):
            if sub[y]!=bm[start_y+y][start_x:start_x+sw]:
                return False
        return True

    #把图片转换成0-1二维数组
    def __to_binary_map(self,im):
        w,h=im.size
        bm=[[0]*w for i in range(h)]
        raw=im.getdata()
        for y in range(h):
            for x in range(w):
                p=raw[y*w+x][0:3]
                bm[y][x]=(0 if p==(0,0,0) else 1)
        return bm

    def __get_binary_map_size(self,bm):
        return len(bm[0]),len(bm)

另外14张图片也需要和该py文件放在同一个目录:

比如这样一张图片,存放到D:/test.png:

那么编写这么一个测试用例:

ocr=HexinOCR()
img=Image.open('D:/test.png')
texts=ocr.recognize(img)
print(texts)

可以看到输出:

=======================阶段二:自动操作===================

最核心的OCR解决之后,就只需要能够让程序自动操作就行了。然而说着简单,其实很复杂,因为要考虑的因素太多了。

import os,time,re
import win32api,win32gui,win32con
from PIL import Image,ImageFile,ImageGrab

class HexinFetch:

    def __init__(self,bbox_min,bbox_vals):
        #获取K线图窗口
        kwin_lst=self.__find_wins_by_title('^同花顺\(v\\d+\\.\\d+\\.\\d+\\)')
        if len(kwin_lst)==0:
            raise IOError('No K line window found')
        elif len(kwin_lst)>1:
            raise IOError('More than one K line window found: '+str(kwin_lst))
        self.__kwin=kwin_lst[0][0]
        #获取分时图窗口
        twin_lst=self.__find_wins_by_title('.+\(\\d{6}\) \\d{4}-\\d{2}-\\d{2}')
        if len(twin_lst)==0:
            raise IOError('No tick window found')
        elif len(twin_lst)>1:
            raise IOError('More than one tick window found: '+str(twin_lst))
        self.__twin=twin_lst[0][0]
        #获取K线图的Panel
        kpanel=self.__get_child_by_ids(self.__kwin,[0xE900])
        if kpanel==0:
            raise IOError('Cannot get k line panel')
        #获取分时图的Panel
        tpanel=self.__get_child_by_ids(self.__twin,[0xE801,0])
        if tpanel==0:
            raise IOError('Cannot get tick panel')
        #获取分时图Panel的屏幕坐标
        (left,top,right,bottom)=win32gui.GetWindowRect(tpanel)
        #计算要截图的矩形的屏幕坐标
        self.__bbox_load=(left+35,top+15,left+65,top+30)
        self.__bbox_ready=(left,top+15,left+15,top+30)
        if not isinstance(bbox_min,(tuple,list)) or len(bbox_min)!=4:
            raise TypeError('Param <bbox_min> is not a 4-tuple or 4-list')
        self.__bbox_min=(left+bbox_min[0],top+bbox_min[1],left+bbox_min[2],top+bbox_min[3])
        if not isinstance(bbox_vals,(tuple,list)):
            raise TypeError('Param <bbox_vals> is not a tuple or list')
        self.__bbox_vals=[]
        for bbox_val in bbox_vals:
            if not isinstance(bbox_val,(tuple,list)) or len(bbox_val)!=4:
                raise TypeError('Param <bbox_vals> is not a tuple of 4-tuple or 4-list')
            self.__bbox_vals.append((left+bbox_val[0],top+bbox_val[1],left+bbox_val[2],top+bbox_val[3]))
        self.__ocr=HexinOCR()

    #抓取一天的数据,data_lim表示至少在哪天之前,half表示是否只抓取半天
    def fetch_day(self,date_lim,half=False):
        #挪动到指定的日期
        date,last_date=None,None
        while True:
            date=self.__get_date()
            if date<=date_lim:
                break
            self.__press_key(self.__kwin,'left',0.2,0.5)
            date=self.__get_date()
            if date==last_date:
                time.sleep(5)
            last_date=date
        #等到分时图加载完毕
        while self.__is_bbox_empty(self.__bbox_load):
            time.sleep(1)
        #等到Bar显示完毕
        while True:
            self.__press_key(self.__twin,'right',0.5,0.5)
            if not self.__is_bbox_empty(self.__bbox_ready):
                break
            time.sleep(1)
        #要抓取的分钟
        morning=['%02d:%02d' % (int((m+30)/60)+9,(m+30)%60) for m in range(2*60)]
        afternoon=['%02d:%02d' % (int((m+1)/60)+13,(m+1)%60) for m in range(2*60)]
        minutes=(*morning,'11:30') if half else (*morning,'11:30',*afternoon)
        table=[]
        #依次抓取每一分钟
        for m in minutes:
            while True:
                minute=self.__recognize_grab(ImageGrab.grab(self.__bbox_min))
                if minute<m:
                    self.__press_key(self.__twin,'right',0,0.3)
                elif minute>m:
                    self.__press_key(self.__twin,'left',0,0.1)
                else:
                    grabs=[ImageGrab.grab(bbox) for bbox in self.__bbox_vals]
                    self.__press_key(self.__twin,'right',0,0.05)
                    values=[self.__recognize_grab(grab) for grab in grabs]
                    checks=[float(v) for v in values]
                    break
            if self.__get_date()!=date:
                return None
            row=(minute,*values)
            print(row)
            table.append(row)
        return date,table

    #遍历顶层窗口,得到名称符合正则表达式的窗口
    def __find_wins_by_title(self,tregx):
        hwnd_lst=[]
        regx=re.compile(tregx)
        def on_enum_window(hwnd,extra):
            title=win32gui.GetWindowText(hwnd)
            if regx.match(title):
                hwnd_lst.append((hwnd,title))
        win32gui.EnumWindows(on_enum_window,0)
        return hwnd_lst

    #根据id在指定窗口中逐层找到控件
    def __get_child_by_ids(self,hwnd,path):
        for id in path:
            hwnd=win32gui.GetDlgItem(hwnd,id)
        return hwnd

    #按下按键'left'或'right'
    def __press_key(self,win,key,delay_before,delay_after):
        win32gui.SetForegroundWindow(win)
        time.sleep(delay_before)
        m={'left':37,'right':39}
        key=m.get(key)
        win32api.keybd_event(key,0,0,0)
        win32api.keybd_event(key,0,win32con.KEYEVENTF_KEYUP,0)
        time.sleep(delay_after)

    #从分时图窗口的标题中提取出日期
    def __get_date(self):
        title=win32gui.GetWindowText(self.__twin)
        regx=re.compile(' (\\d{4}-\\d{2}-\\d{2}) ')
        date=regx.findall(title)
        if len(date)!=1:
            raise IOError('Wrong date '+str(date))
        return date[0]

    #判断一个矩形区域内是否为空
    def __is_bbox_empty(self,bbox):
        im=ImageGrab.grab(bbox)
        raw=im.getdata()
        for p in raw:
            if p[0:3]!=(0,0,0):
                return False
        return True

    #识别一个截图中的一行文字
    def __recognize_grab(self,im):
        text=self.__ocr.recognize(im)
        if len(text)!=1:
            raise IOError('Count is not 1, bbox may be wrong')
        return text[0]

此时写一个测试用例:

fetch=HexinFetch((10,35,55,55),((0,70,70,85),(0,130,70,145)))
result=fetch.fetch_day('2017-03-01')

运行后就能看的惊喜——程序先把分时图的日期调到2017-03-01,然后开始采集数据!可以查看视频

======================阶段三:最后一层封装=================

最后一层封装的作用是实现连续地采集数据,而且能够把采集的数据到放到一个文件夹中。

import sys,time,datetime,os,winsound,hexin

#给定一个'YYYY-mm-dd'形式的日期,得到上一天的日期
def get_previous_date(date):
    date=datetime.datetime.strptime(date,'%Y-%m-%d')
    date-=datetime.timedelta(days=1)
    return date.strftime('%Y-%m-%d')

#文件目录
output_dir='D:/szzs_min_k_p'

if len(sys.argv)==1:
    files=os.listdir(output_dir)
    if len(files)>0:
        files.sort()
        date=get_previous_date(os.path.splitext(os.path.split(files[0])[1])[0])
    else:
        date=datetime.datetime.now().strftime('%Y-%m-%d')
else:
    date=sys.argv[1]
    try:
        datetime.datetime.strptime(date,'%Y-%m-%d')
    except ValueError:
        raise IOError('<date> must in format of YYYY-mm-dd')

fetch=hexin.HexinFetch((10,35,55,55),((0,70,70,85),(0,130,70,145)))

while True:
    try:
        print(date)
        result=fetch.fetch_day(date,False)
        if result!=None:
            date,table=result
            file=open('%s/%s.txt' % (output_dir,date),'w')
            for row in table:
                file.write('\t'.join(row)+'\n')
            file.close()
            date=get_previous_date(date)
        else:
            raise IOError('Wrong date %s' % date)
    except KeyboardInterrupt:
        break
    except:
        winsound.Beep(1000,1000)

在D盘建一个目录szzs_min_k_p,然后运行程序,就会看到程序一天天把历史分钟数值记录下来,一个交易日对应一个文件。

完整的代码可以从这里下载。