getRestData.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. # -*- coding:utf-8 -*-
  2. __author__ = 'weijie'
  3. from EmQuantAPI import *
  4. from datetime import timedelta, datetime
  5. import time
  6. import traceback
  7. import json
  8. from StockRestPojo import StockRestPojo
  9. from dbOperation import dbOperation
  10. from threading import Thread
  11. import db_config
  12. print("开始了")
  13. def mainCallback(quantdata):
  14. """
  15. mainCallback 是主回调函数,可捕捉如下错误
  16. 在start函数第三个参数位传入,该函数只有一个为c.EmQuantData类型的参数quantdata
  17. :param quantdata:c.EmQuantData
  18. :return:
  19. """
  20. print ("mainCallback",str(quantdata))
  21. print("开始了2222")
  22. #登录掉线或者 登陆数达到上线(即登录被踢下线) 这时所有的服务都会停止
  23. if str(quantdata.ErrorCode) == "10001011" or str(quantdata.ErrorCode) == "10001009":
  24. print ("Your account is disconnect. You can force login automatically here if you need.")
  25. #行情登录验证失败(每次连接行情服务器时需要登录验证)或者行情流量验证失败时,会取消所有订阅,用户需根据具体情况处理
  26. elif str(quantdata.ErrorCode) == "10001021" or str(quantdata.ErrorCode) == "10001022":
  27. print ("Your all csq subscribe have stopped.")
  28. #行情服务器断线自动重连连续6次失败(1分钟左右)不过重连尝试还会继续进行直到成功为止,遇到这种情况需要确认两边的网络状况
  29. elif str(quantdata.ErrorCode) == "10002009":
  30. print ("Your all csq subscribe have stopped, reconnect 6 times fail.")
  31. # 行情订阅遇到一些错误(这些错误会导致重连,错误原因通过日志输出,统一转换成EQERR_QUOTE_RECONNECT在这里通知),正自动重连并重新订阅,可以做个监控
  32. elif str(quantdata.ErrorCode) == "10002012":
  33. print ("csq subscribe break on some error, reconnect and request automatically.")
  34. # 资讯服务器断线自动重连连续6次失败(1分钟左右)不过重连尝试还会继续进行直到成功为止,遇到这种情况需要确认两边的网络状况
  35. elif str(quantdata.ErrorCode) == "10002014":
  36. print ("Your all cnq subscribe have stopped, reconnect 6 times fail.")
  37. # 资讯订阅遇到一些错误(这些错误会导致重连,错误原因通过日志输出,统一转换成EQERR_INFO_RECONNECT在这里通知),正自动重连并重新订阅,可以做个监控
  38. elif str(quantdata.ErrorCode) == "10002013":
  39. print ("cnq subscribe break on some error, reconnect and request automatically.")
  40. # 资讯登录验证失败(每次连接资讯服务器时需要登录验证)或者资讯流量验证失败时,会取消所有订阅,用户需根据具体情况处理
  41. elif str(quantdata.ErrorCode) == "10001024" or str(quantdata.ErrorCode) == "10001025":
  42. print("Your all cnq subscribe have stopped.")
  43. else:
  44. pass
  45. def work(code):
  46. if(code != ""):
  47. #实时行情订阅使用范例
  48. data = c.csq(code, 'date,time,now,high,low,open,preclose,roundlot,change,pctchange,volume,amount,volumeratio,commissionratio,commissiondiff,tradestatus,outvolume,involume,highlimit,lowlimit,speed,averageprice,buyprice1,buyprice2,buyprice3,buyprice4,buyprice5,buyvolume1,buyvolume2,buyvolume3,buyvolume4,buyvolume5,sellprice1,sellprice2,sellprice3,sellprice4,sellprice5,sellvolume1,sellvolume2,sellvolume3,sellvolume4,sellvolume5,closedtime,closedvolume,closedamount','Pushtype=2',csqCallback)
  49. # print("csq返回" + str(data))
  50. if(data.ErrorCode != 0):
  51. print("request csq Error, ", data.ErrorMsg)
  52. else:
  53. print("csq输出结果======分隔线======")
  54. time.sleep(2)
  55. # text = input("press any key to cancel csq \r\n")
  56. #取消订阅
  57. # data = c.csqcancel(data.SerialID)
  58. def startCallback(message):
  59. print("[EmQuantAPI Python]", message)
  60. return 1
  61. def csqCallback(quantdata):
  62. """
  63. csqCallback 是csq订阅时提供的回调函数模板。该函数只有一个为c.EmQuantData类型的参数quantdata
  64. :param quantdata:c.EmQuantData
  65. :return:
  66. """
  67. serialID = quantdata.SerialID
  68. # 设置线程序列号,标识正在抓取数据中
  69. db = dbOperation(db_config.db_gupiao)
  70. restData = quantdata.Data
  71. # print(restData)
  72. for key,value in restData.items():
  73. if "None".strip() == str(value[0]).strip():
  74. print("无效数据跳过")
  75. break
  76. else:
  77. db.update("update t_stock_base_info set seralid = '"+str(serialID)+"' where code = '" + str(key) + "'")
  78. stockRestPojo = StockRestPojo()
  79. stockRestPojo.date = str(value[0])
  80. stockRestPojo.time = str(value[1])
  81. stockRestPojo.now = str(value[2])
  82. stockRestPojo.high = str(value[3])
  83. stockRestPojo.low = str(value[4])
  84. stockRestPojo.open = str(value[5])
  85. stockRestPojo.preclose = str(value[6])
  86. stockRestPojo.roundlot = str(value[7])
  87. stockRestPojo.change = str(value[8])
  88. stockRestPojo.pctchange = str(value[9])
  89. stockRestPojo.volume = str(value[10])
  90. stockRestPojo.amount = str(value[11])
  91. stockRestPojo.volumeratio = str(value[12])
  92. stockRestPojo.commissionratio = str(value[13])
  93. stockRestPojo.commissiondiff = str(value[14])
  94. stockRestPojo.tradestatus = str(value[15])
  95. stockRestPojo.outvolume = str(value[16])
  96. stockRestPojo.involume = str(value[17])
  97. stockRestPojo.highlimit = str(value[18])
  98. stockRestPojo.lowlimit = str(value[19])
  99. stockRestPojo.speed = str(value[20])
  100. stockRestPojo.averageprice = str(value[21])
  101. stockRestPojo.buyprice1 = str(value[22])
  102. stockRestPojo.buyprice2 = str(value[23])
  103. stockRestPojo.buyprice3 = str(value[24])
  104. stockRestPojo.buyprice4 = str(value[25])
  105. stockRestPojo.buyprice5 = str(value[26])
  106. stockRestPojo.buyvolume1 = str(value[27])
  107. stockRestPojo.buyvolume2 = str(value[28])
  108. stockRestPojo.buyvolume3 = str(value[29])
  109. stockRestPojo.buyvolume4 = str(value[30])
  110. stockRestPojo.buyvolume5 = str(value[31])
  111. stockRestPojo.sellprice1 = str(value[32])
  112. stockRestPojo.sellprice2 = str(value[33])
  113. stockRestPojo.sellprice3 = str(value[34])
  114. stockRestPojo.sellprice4 = str(value[35])
  115. stockRestPojo.sellprice5 = str(value[36])
  116. stockRestPojo.sellvolume1 = str(value[37])
  117. stockRestPojo.sellvolume2 = str(value[38])
  118. stockRestPojo.sellvolume3 = str(value[39])
  119. stockRestPojo.sellvolume4 = str(value[40])
  120. stockRestPojo.sellvolume5 = str(value[41])
  121. stockRestPojo.closedtime = str(value[42])
  122. stockRestPojo.closedvolume = str(value[43])
  123. stockRestPojo.closedamount = str(value[44])
  124. datTime = time.mktime(time.strptime(str(value[0]) + str(value[1]), "%Y%m%d%H%M%S"))
  125. stockRestPojo.realTime = str(int(datTime))
  126. stockRestPojo.dateOne = "0"
  127. stockRestPojo.dateFive = "0"
  128. stockRestPojo.dateFifteen = "0"
  129. stockRestPojo.dateThirty = "0"
  130. stockRestPojo.dateSixty = "0"
  131. min_ = time.localtime(datTime).tm_min # 获取分钟
  132. sec_ = time.localtime(datTime).tm_sec # 获取秒数
  133. # 1分钟数据
  134. if sec_ != 0:
  135. tempDatTime = datTime + 60 - sec_
  136. stockRestPojo.dateOne = str(int(tempDatTime))
  137. else:
  138. stockRestPojo.dateOne = str(int(datTime))
  139. # 5分钟数据
  140. if sec_ != 0 or min_%5 != 0:
  141. tempDatTime = datTime + (5 * 60) - sec_ - (min_%5 * 60)
  142. stockRestPojo.dateFive = str(int(tempDatTime))
  143. else:
  144. stockRestPojo.dateFive = str(int(datTime))
  145. # 15分钟数据
  146. if sec_ != 0 or min_%15 != 0:
  147. tempDatTime = datTime + (15 * 60) - sec_ - (min_%15 * 60)
  148. stockRestPojo.dateFifteen = str(int(tempDatTime))
  149. else:
  150. stockRestPojo.dateFifteen = str(int(datTime))
  151. # 30分钟数据
  152. if sec_ != 0 or min_%30 != 0:
  153. tempDatTime = datTime + (30 * 60) - sec_ - (min_%30 * 60)
  154. stockRestPojo.dateThirty = str(int(tempDatTime))
  155. else:
  156. stockRestPojo.dateThirty = str(int(datTime))
  157. # 60分钟数据
  158. if sec_ != 0 or min_%60 != 0:
  159. tempDatTime = datTime + (60 * 60) - sec_ - (min_%60 * 60)
  160. stockRestPojo.dateSixty = str(int(tempDatTime))
  161. else:
  162. stockRestPojo.dateSixty = str(int(datTime))
  163. sql = "insert into data_rt_" + key.replace(".", "_").lower() + "(`realTime`,`date`,`time`,`now`,`high`,`low`,`open`,`preclose`,`roundlot`,`change`,`pctchange`,`volume`,`amount`,`volumeratio`,`commissionratio`,`commissiondiff`,`tradestatus`,`outvolume`,`involume`,`highlimit`,`lowlimit`,`speed`,`averageprice`,`buyprice1`,`buyprice2`,`buyprice3`,`buyprice4`,`buyprice5`,`buyvolume1`,`buyvolume2`,`buyvolume3`,`buyvolume4`,`buyvolume5`,`sellprice1`,`sellprice2`,`sellprice3`,`sellprice4`,`sellprice5`,`sellvolume1`,`sellvolume2`,`sellvolume3`,`sellvolume4`,`sellvolume5`,`closedtime`,`closedvolume`,`closedamount`,`dateOne`,`dateFive`,`dateFifteen`,`dateThirty`,`dateSixty`,`addTime`) values('" + str(stockRestPojo.realTime) + "','" + str(stockRestPojo.date) + "','" + str(stockRestPojo.time) + "','" + str(stockRestPojo.now) + "','" + str(stockRestPojo.high) + "','" + str(stockRestPojo.low) + "','" + str(stockRestPojo.open) + "','" + str(stockRestPojo.preclose) + "','" + str(stockRestPojo.roundlot) + "','" + str(stockRestPojo.change) + "','" + str(stockRestPojo.pctchange) + "','" + str(stockRestPojo.volume) + "','" + str(stockRestPojo.amount) + "','" + str(stockRestPojo.volumeratio) + "','" + str(stockRestPojo.commissionratio) + "','" + str(stockRestPojo.commissiondiff) + "','" + str(stockRestPojo.tradestatus) + "','" + str(stockRestPojo.outvolume) + "','" + str(stockRestPojo.involume) + "','" + str(stockRestPojo.highlimit) + "','" + str(stockRestPojo.lowlimit) + "','" + str(stockRestPojo.speed) + "','" + str(stockRestPojo.averageprice) + "','" + str(stockRestPojo.buyprice1) + "','" + str(stockRestPojo.buyprice2) + "','" + str(stockRestPojo.buyprice3) + "','" + str(stockRestPojo.buyprice4) + "','" + str(stockRestPojo.buyprice5) + "','" + str(stockRestPojo.buyvolume1) + "','" + str(stockRestPojo.buyvolume2) + "','" + str(stockRestPojo.buyvolume3) + "','" + str(stockRestPojo.buyvolume4) + "','" + str(stockRestPojo.buyvolume5) + "','" + str(stockRestPojo.sellprice1) + "','" + str(stockRestPojo.sellprice2) + "','" + str(stockRestPojo.sellprice3) + "','" + str(stockRestPojo.sellprice4) + "','" + str(stockRestPojo.sellprice5) + "','" + str(stockRestPojo.sellvolume1) + "','" + str(stockRestPojo.sellvolume2) + "','" + str(stockRestPojo.sellvolume3) + "','" + str(stockRestPojo.sellvolume4) + "','" + str(stockRestPojo.sellvolume5) + "','" + str(stockRestPojo.closedtime) + "','" + str(stockRestPojo.closedvolume) + "','" + str(stockRestPojo.closedamount) + "','" + str(stockRestPojo.dateOne) + "','" + str(stockRestPojo.dateFive) + "','" + str(stockRestPojo.dateFifteen) + "','" + str(stockRestPojo.dateThirty) + "','" + str(stockRestPojo.dateSixty) +"', now());"
  164. db.insert(sql)
  165. db.close()
  166. try:
  167. #调用登录函数(激活后使用,不需要用户名密码)
  168. loginResult = c.start("ForceLogin=1", '', mainCallback)
  169. if(loginResult.ErrorCode != 0):
  170. print("login in fail")
  171. exit()
  172. db = dbOperation(db_config.db_gupiao)
  173. stockPojoList = db.query_list("select id, code, name, list_date listDate, seralid from t_stock_base_info where is_rest_add = 0 order by id asc")
  174. # 拼接证券代码
  175. codesStr = ""
  176. # 单线程
  177. i = 1
  178. for stockPojo in stockPojoList:
  179. if 0 != i%500 :
  180. codesStr = codesStr + stockPojo['code'] + ','
  181. if 0 == i%500 or str(i) == str(len(stockPojoList)):
  182. if(codesStr != ""):
  183. #实时行情订阅使用范例
  184. codesStr = codesStr[:-1]
  185. data = c.csq(codesStr, 'date,time,now,high,low,open,preclose,roundlot,change,pctchange,volume,amount,volumeratio,commissionratio,commissiondiff,tradestatus,outvolume,involume,highlimit,lowlimit,speed,averageprice,buyprice1,buyprice2,buyprice3,buyprice4,buyprice5,buyvolume1,buyvolume2,buyvolume3,buyvolume4,buyvolume5,sellprice1,sellprice2,sellprice3,sellprice4,sellprice5,sellvolume1,sellvolume2,sellvolume3,sellvolume4,sellvolume5,closedtime,closedvolume,closedamount','Pushtype=2',csqCallback)
  186. codesStr = ""
  187. i = i + 1
  188. db.close()
  189. text = input("press any key to cancel csq \r\n")
  190. #退出
  191. data = logoutResult = c.stop()
  192. except Exception as ee:
  193. print("error >>>",ee)
  194. traceback.print_exc()
  195. else:
  196. print("demo end")