123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- # -*- coding:utf-8 -*-
- __author__ = 'weijie'
- from EmQuantAPI import *
- from datetime import timedelta, datetime
- import time
- import traceback
- import json
- from StockRestPojo import StockRestPojo
- from dbOperation import dbOperation
- from threading import Thread
- import numpy as np
- import db_config
print("开始了")

# Message printed for each error code that mainCallback recognises.
# Keys are the string form of ``quantdata.ErrorCode``.
_MAIN_CALLBACK_MESSAGES = {
    # Login dropped, or the login limit was reached (kicked offline);
    # all services stop at this point.
    "10001011": "Your account is disconnect. You can force login automatically here if you need.",
    "10001009": "Your account is disconnect. You can force login automatically here if you need.",
    # Quote login verification or quote traffic verification failed;
    # every subscription is cancelled and the user has to handle it.
    "10001021": "Your all csq subscribe have stopped.",
    "10001022": "Your all csq subscribe have stopped.",
    # Quote server auto-reconnect failed 6 times in a row (about a minute);
    # retries continue until success, so check the network on both sides.
    "10002009": "Your all csq subscribe have stopped, reconnect 6 times fail.",
    # Quote subscription hit an error that triggers a reconnect
    # (reported as EQERR_QUOTE_RECONNECT); it re-subscribes automatically.
    "10002012": "csq subscribe break on some error, reconnect and request automatically.",
    # News server auto-reconnect failed 6 times in a row (about a minute);
    # retries continue until success, so check the network on both sides.
    "10002014": "Your all cnq subscribe have stopped, reconnect 6 times fail.",
    # News subscription hit an error that triggers a reconnect
    # (reported as EQERR_INFO_RECONNECT); it re-subscribes automatically.
    "10002013": "cnq subscribe break on some error, reconnect and request automatically.",
    # News login verification or news traffic verification failed;
    # every subscription is cancelled and the user has to handle it.
    "10001024": "Your all cnq subscribe have stopped.",
    "10001025": "Your all cnq subscribe have stopped.",
}


def mainCallback(quantdata):
    """
    Main callback, passed as the third argument of ``c.start``.

    Prints the received object, then prints a diagnostic line when its
    error code is one of the codes listed in ``_MAIN_CALLBACK_MESSAGES``.
    Any other code is ignored.

    :param quantdata: c.EmQuantData; only its ``ErrorCode`` attribute and
        its string form are used here
    :return: None
    """
    print("mainCallback", str(quantdata))
    print("开始了2222")
    # Convert once; the code is compared as a string, as in the original chain.
    message = _MAIN_CALLBACK_MESSAGES.get(str(quantdata.ErrorCode))
    if message is not None:
        print(message)
# Pojo attributes in the order their indicators are laid out in the flat
# payload (same order as the indicator list passed to ``c.cst``).
_CST_INDICATOR_ATTRS = (
    "time", "now", "high", "low", "open", "preclose", "volume", "amount",
    "highlimit", "lowlimit",
    "buyprice1", "buyprice2", "buyprice3", "buyprice4", "buyprice5",
    "buyvolume1", "buyvolume2", "buyvolume3", "buyvolume4", "buyvolume5",
    "sellprice1", "sellprice2", "sellprice3", "sellprice4", "sellprice5",
    "sellvolume1", "sellvolume2", "sellvolume3", "sellvolume4", "sellvolume5",
    "closedtime", "closedvolume", "closedamount",
)

# Columns that exist in the table but are not requested; stored as "".
_CST_BLANK_ATTRS = (
    "roundlot", "volumeratio", "commissionratio", "commissiondiff",
    "tradestatus", "outvolume", "involume", "speed", "averageprice",
)

# (pojo attribute, bar length in minutes) for the bar-boundary columns.
_CST_BAR_ATTRS = (
    ("dateOne", 1),
    ("dateFive", 5),
    ("dateFifteen", 15),
    ("dateThirty", 30),
    ("dateSixty", 60),
)

# Column order of the INSERT statement; ``addTime`` is appended separately
# because its value is the SQL expression ``now()``, not a pojo attribute.
_CST_INSERT_COLUMNS = (
    "realTime", "date", "time", "now", "high", "low", "open", "preclose",
    "roundlot", "change", "pctchange", "volume", "amount", "volumeratio",
    "commissionratio", "commissiondiff", "tradestatus", "outvolume",
    "involume", "highlimit", "lowlimit", "speed", "averageprice",
    "buyprice1", "buyprice2", "buyprice3", "buyprice4", "buyprice5",
    "buyvolume1", "buyvolume2", "buyvolume3", "buyvolume4", "buyvolume5",
    "sellprice1", "sellprice2", "sellprice3", "sellprice4", "sellprice5",
    "sellvolume1", "sellvolume2", "sellvolume3", "sellvolume4", "sellvolume5",
    "closedtime", "closedvolume", "closedamount",
    "dateOne", "dateFive", "dateFifteen", "dateThirty", "dateSixty",
)


def _cst_bar_end(timestamp, minute, second, span):
    """Return ``timestamp`` rounded up to the next ``span``-minute boundary.

    A timestamp that already sits on a boundary is returned unchanged.
    """
    overshoot = (minute % span) * 60 + second
    if overshoot == 0:
        return timestamp
    return timestamp + span * 60 - overshoot


def _cst_build_pojo(tick, value, trade_date):
    """Fill a ``StockRestPojo`` from one tick (a dict of attr -> raw value)."""
    pojo = StockRestPojo()
    pojo.date = trade_date
    for attr in _CST_INDICATOR_ATTRS:
        setattr(pojo, attr, str(tick[attr]))
    for attr in _CST_BLANK_ATTRS:
        setattr(pojo, attr, "")
    # NOTE(review): these read fixed positions of the *flat* payload, which
    # fall inside the Time run for any payload with more than 9 ticks; the
    # request does not include Change/PctChange at all. Behaviour is kept
    # as-is -- confirm what these two columns are supposed to contain.
    pojo.change = str(value[8])
    pojo.pctchange = str(value[9])

    # The feed drops the leading zero of times before 10:00:00 (e.g. 93001),
    # so pad back to HHMMSS before parsing.
    tick_time = str(tick["time"]).zfill(6)
    parsed = time.strptime(trade_date + tick_time, "%Y%m%d%H%M%S")
    timestamp = time.mktime(parsed)
    pojo.realTime = str(int(timestamp))
    for attr, span in _CST_BAR_ATTRS:
        bar_end = _cst_bar_end(timestamp, parsed.tm_min, parsed.tm_sec, span)
        setattr(pojo, attr, str(int(bar_end)))
    return pojo


def _cst_values_row(pojo):
    """Render one pojo as a quoted SQL VALUES tuple ending in ``now()``."""
    fields = "','".join(str(getattr(pojo, name)) for name in _CST_INSERT_COLUMNS)
    return "('" + fields + "', now())"


def cstCallBack(quantdata, trade_date="20201105", batch_size=500):
    """
    Callback for ``c.cst``: store the returned intraday snapshots.

    For every security in ``quantdata.Data`` the flat value list is split
    into one run per requested indicator, each tick is converted into a
    ``StockRestPojo`` and the ticks are written to the table
    ``data_rt_<code>`` (dots replaced by underscores, lower-cased) in
    batches through ``db.batchInsert``. Ticks whose High value is ``None``
    are skipped.

    :param quantdata: object whose ``Data`` attribute maps a security code
        to a flat list of values, indicator after indicator
    :param trade_date: trading day as ``YYYYMMDD``; stored in the ``date``
        column and combined with each tick time to build the timestamps
    :param batch_size: maximum number of rows sent per ``batchInsert`` call
    :return: None
    :raises ValueError: if a value list is not a whole number of indicator
        runs, or a tick time cannot be parsed
    """
    restData = quantdata.Data
    # NOTE(review): this handle is never closed (the original had
    # ``db.close()`` commented out); confirm whether dbOperation needs an
    # explicit close per callback.
    db = dbOperation(db_config.db_gupiao)
    insert_key = ",".join("`%s`" % name for name in _CST_INSERT_COLUMNS + ("addTime",))
    indicator_count = len(_CST_INDICATOR_ATTRS)

    for key, value in restData.items():
        print(str(key))
        tick_count, leftover = divmod(len(value), indicator_count)
        if leftover:
            raise ValueError(
                "cannot split %d values into %d indicator runs"
                % (len(value), indicator_count)
            )
        # Plain slicing keeps every element's own type, so integer times
        # are not turned into floats the way a numeric array would.
        columns = [
            value[row * tick_count:(row + 1) * tick_count]
            for row in range(indicator_count)
        ]

        pojos = []
        for j in range(tick_count):
            tick = {attr: column[j] for attr, column in zip(_CST_INDICATOR_ATTRS, columns)}
            if str(tick["high"]) == "None":
                continue
            pojos.append(_cst_build_pojo(tick, value, trade_date))

        # The VALUES text is assembled by hand because batchInsert takes a
        # ready-made string; the fields come from the quote feed.
        table = "data_rt_" + key.replace(".", "_").lower()
        for start in range(0, len(pojos), batch_size):
            rows = ",".join(
                _cst_values_row(pojo) for pojo in pojos[start:start + batch_size]
            )
            db.batchInsert(table, insert_key, rows)
# Trading day whose intraday snapshots are (re)downloaded.
TRADE_DATE = "20201105"

# Indicators requested from c.cst, in the order cstCallBack unpacks them.
CST_INDICATORS = 'Time,Now,High,Low,Open,PreClose,Volume,Amount,HighLimit,LowLimit,BuyPrice1,BuyPrice2,BuyPrice3,BuyPrice4,BuyPrice5,BuyVolume1,BuyVolume2,BuyVolume3,BuyVolume4,BuyVolume5,SellPrice1,SellPrice2,SellPrice3,SellPrice4,SellPrice5,SellVolume1,SellVolume2,SellVolume3,SellVolume4,SellVolume5,ClosedTime,ClosedVolume,ClosedAmount'

try:
    # Log in (used after activation, so no user name / password is needed).
    loginResult = c.start("ForceLogin=1", '', mainCallback)
    if loginResult.ErrorCode != 0:
        print("login in fail")
        exit()

    db = dbOperation(db_config.db_gupiao)
    try:
        stockPojoList = db.query_list("select id, code, name, list_date listDate, seralid from t_stock_base_info where 1 = 1 order by id asc")
        # cst usage example
        for stockPojo in stockPojoList:
            table = "data_rt_" + stockPojo['code'].replace(".", "_").lower()
            sql = "SELECT count(1) count FROM " + table + " WHERE `date` = '" + TRADE_DATE + "'"
            count = db.query_one(sql)
            # A table with at most 4000 rows for the day is wiped and the
            # day is requested again; cstCallBack stores the result.
            if int(count['count']) <= 4000:
                sql1 = "delete from " + table + " where date = '" + TRADE_DATE + "'"
                db.dele(sql1)
                data = c.cst(stockPojo['code'], CST_INDICATORS, '093001', '150000', '', cstCallBack)
    finally:
        # Release the handle even when a query or request raises.
        db.close()

    input("press any key to quit cst \r\n")
    # Log out.
    data = logoutResult = c.stop()
except Exception as ee:
    print("error >>>", ee)
    traceback.print_exc()
else:
    print("demo end")
|