getHistData.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. # -*- coding:utf-8 -*-
  2. __author__ = 'weijie'
  3. from EmQuantAPI import *
  4. import datetime
  5. import time
  6. import traceback
  7. from StockPojo import StockPojo
  8. from StockHistPojo import StockHistPojo
  9. from dbOperation import dbOperation
  10. import pandas as pd
  11. import json
  12. import array
  13. import db_config
# Startup banner printed as soon as the script is launched, before the
# API login is attempted (the literal text means "started").
print("开始了")
  15. def mainCallback(quantdata):
  16. print ("mainCallback",str(quantdata))
  17. #登录掉线或者 登陆数达到上线(即登录被踢下线) 这时所有的服务都会停止
  18. if str(quantdata.ErrorCode) == "10001011" or str(quantdata.ErrorCode) == "10001009":
  19. print ("Your account is disconnect. You can force login automatically here if you need.")
  20. #行情登录验证失败(每次连接行情服务器时需要登录验证)或者行情流量验证失败时,会取消所有订阅,用户需根据具体情况处理
  21. elif str(quantdata.ErrorCode) == "10001021" or str(quantdata.ErrorCode) == "10001022":
  22. print ("Your all csq subscribe have stopped.")
  23. #行情服务器断线自动重连连续6次失败(1分钟左右)不过重连尝试还会继续进行直到成功为止,遇到这种情况需要确认两边的网络状况
  24. elif str(quantdata.ErrorCode) == "10002009":
  25. print ("Your all csq subscribe have stopped, reconnect 6 times fail.")
  26. # 行情订阅遇到一些错误(这些错误会导致重连,错误原因通过日志输出,统一转换成EQERR_QUOTE_RECONNECT在这里通知),正自动重连并重新订阅,可以做个监控
  27. elif str(quantdata.ErrorCode) == "10002012":
  28. print ("csq subscribe break on some error, reconnect and request automatically.")
  29. # 资讯服务器断线自动重连连续6次失败(1分钟左右)不过重连尝试还会继续进行直到成功为止,遇到这种情况需要确认两边的网络状况
  30. elif str(quantdata.ErrorCode) == "10002014":
  31. print ("Your all cnq subscribe have stopped, reconnect 6 times fail.")
  32. # 资讯订阅遇到一些错误(这些错误会导致重连,错误原因通过日志输出,统一转换成EQERR_INFO_RECONNECT在这里通知),正自动重连并重新订阅,可以做个监控
  33. elif str(quantdata.ErrorCode) == "10002013":
  34. print ("cnq subscribe break on some error, reconnect and request automatically.")
  35. # 资讯登录验证失败(每次连接资讯服务器时需要登录验证)或者资讯流量验证失败时,会取消所有订阅,用户需根据具体情况处理
  36. elif str(quantdata.ErrorCode) == "10001024" or str(quantdata.ErrorCode) == "10001025":
  37. print("Your all cnq subscribe have stopped.")
  38. else:
  39. pass
  40. try:
  41. #调用登录函数(激活后使用,不需要用户名密码)
  42. loginResult = c.start("ForceLogin=1", '', mainCallback)
  43. if(loginResult.ErrorCode != 0):
  44. print("login in fail")
  45. exit()
  46. # csd使用范例
  47. db = dbOperation(db_config.db_gupiao)
  48. stockPojoList = db.query_list("select id, code, name, list_date listDate from t_stock_base_info where 1 = 1")
  49. getDataTime = time.strftime('%Y-%m-%d',time.localtime(time.time()))
  50. for stockPojo in stockPojoList:
  51. # print(str(stockPojo['code']))
  52. # 先去获取历史表中最后一条 如果有就使用历史表中的最后一条时间当作起始时间去获取数据
  53. sql = "select dates dates from data_hist_" + stockPojo['code'].replace(".", "_").lower() + " order by realTime desc limit 1"
  54. dates = db.query_one(sql)
  55. isAdd = False
  56. startDate = str(stockPojo['listDate'])
  57. if dates is not None :
  58. startDate = str(dates['dates'])
  59. data = c.csd(
  60. str(stockPojo['code']),
  61. "OPEN,CLOSE,HIGH,LOW,PRECLOSE,AVERAGE,CHANGE,PCTCHANGE,VOLUME,HIGHLIMIT,AMOUNT,TURN,TRADESTATUS,LOWLIMIT,AMPLITUDE,TNUM,TAFACTOR,FRONTTAFACTOR,ISSTSTOCK,ISXSTSTOCK",
  62. startDate,
  63. getDataTime,
  64. "Period=1,adjustflag=1,curtype=1,Ispandas=1")
  65. print(data)
  66. data.reset_index(inplace=True)
  67. jsonData = data.to_json()
  68. text = json.loads(jsonData)
  69. stockHistPojoList = []
  70. for i in range(len(text['CODES'])):
  71. isAdd = True
  72. stockHistPojo = StockHistPojo()
  73. stockHistPojo.codes = text['CODES'][str(i)].replace(".", "_")
  74. stockHistPojo.open = text['OPEN'][str(i)]
  75. stockHistPojo.close = text['CLOSE'][str(i)]
  76. stockHistPojo.high = text['HIGH'][str(i)]
  77. stockHistPojo.low = text['LOW'][str(i)]
  78. stockHistPojo.preclose = text['PRECLOSE'][str(i)]
  79. if "None".strip() == str(text['AVERAGE'][str(i)]).strip():
  80. continue
  81. stockHistPojo.average = text['AVERAGE'][str(i)]
  82. stockHistPojo.change = text['CHANGE'][str(i)]
  83. stockHistPojo.pctchange = text['PCTCHANGE'][str(i)]
  84. stockHistPojo.volume = text['VOLUME'][str(i)]
  85. stockHistPojo.highlimit = text['HIGHLIMIT'][str(i)]
  86. stockHistPojo.amount = text['AMOUNT'][str(i)]
  87. stockHistPojo.turn = text['TURN'][str(i)]
  88. stockHistPojo.tradestatus = text['TRADESTATUS'][str(i)]
  89. stockHistPojo.lowlimit = text['LOWLIMIT'][str(i)]
  90. stockHistPojo.amplitude = text['AMPLITUDE'][str(i)]
  91. stockHistPojo.tnum = text['TNUM'][str(i)]
  92. stockHistPojo.tafactor = text['TAFACTOR'][str(i)]
  93. stockHistPojo.fronttafactor = text['FRONTTAFACTOR'][str(i)]
  94. stockHistPojo.isststock = text['ISSTSTOCK'][str(i)]
  95. stockHistPojo.isxststock = text['ISXSTSTOCK'][str(i)]
  96. stockHistPojo.dates = text['DATES'][str(i)]
  97. # datTime = datetime.datetime.strptime(stockHistPojo.dates, "%Y/%m/%d")
  98. # datTime = time.mktime(time.strptime(stockHistPojo.dates, "%Y/%m/%d"))
  99. datTime = time.mktime(time.strptime(stockHistPojo.dates, "%Y/%m/%d"))
  100. year = time.localtime(datTime).tm_year # 获取年份
  101. yearDatTime = time.mktime(time.strptime(str(year) + "/01/01", "%Y/%m/%d"))
  102. month = time.localtime(datTime).tm_mon # 获取月份
  103. monthDatTime = ""
  104. if 10 > month:
  105. datTime = time.mktime(time.strptime(str(year) + "/0" + str(month) + "/01", "%Y/%m/%d"))
  106. monthDatTime = int(datTime)
  107. else:
  108. datTime = time.mktime(time.strptime(str(year) + "/" + str(month) + "/01", "%Y/%m/%d"))
  109. monthDatTime = int(datTime)
  110. weekDatTime = time.mktime(time.strptime(stockHistPojo.dates, "%Y/%m/%d"))
  111. wday = time.localtime(weekDatTime).tm_wday # 获取周
  112. dt = datetime.datetime.strptime(stockHistPojo.dates, "%Y/%m/%d")
  113. out_date = (dt + datetime.timedelta(days=-wday)).strftime("%Y/%m/%d")
  114. weekDatTime1 = time.mktime(time.strptime(out_date, "%Y/%m/%d"))
  115. #获取日
  116. dayDatTime = time.mktime(time.strptime(stockHistPojo.dates, "%Y/%m/%d"))
  117. stockHistPojo.dateYear = str(int(yearDatTime))
  118. stockHistPojo.dateMonth = str(int(monthDatTime))
  119. stockHistPojo.dateWeek = str(int(weekDatTime1))
  120. stockHistPojo.dateDay = str(int(dayDatTime))
  121. stockHistPojoList.append(stockHistPojo)
  122. i = 0
  123. key = "`realTime`,`dates`,`open`,`close`,`high`,`low`,`preclose`,`average`,`change`,`pctchange`,`volume`,`highlimit`,`amount`,`turn`,`tradestatus`,`lowlimit`,`amplitude`,`tnum`,`tafactor`,`fronttafactor`,`isststock`,`isxststock`,`date_year`,`date_month`,`date_week`"
  124. # 批量插入 每次插入500
  125. while True :
  126. vlues = ""
  127. # print(i)
  128. # 如果数组数量大于500 截取前500个
  129. if len(stockHistPojoList) - i > 500 :
  130. # stockPojoList[i : 500] 等于java subList(i, 500)
  131. for stockHistPojo in stockHistPojoList[i : 500 + i]:
  132. vlues = vlues + "('"+str(stockHistPojo.dateDay)+"','"+str(stockHistPojo.dates)+"','" + str(stockHistPojo.open) + "','" + str(stockHistPojo.close) + "','" + str(stockHistPojo.high)+ "','" + str(stockHistPojo.low) + "','" + str(stockHistPojo.preclose) + "','" + str(stockHistPojo.average) + "','" + str(stockHistPojo.change) + "','" + str(stockHistPojo.pctchange) + "','" + str(stockHistPojo.volume) + "','" + str(stockHistPojo.highlimit) + "','" + str(stockHistPojo.amount) + "','" + str(stockHistPojo.turn) + "','" + str(stockHistPojo.tradestatus) + "','" + str(stockHistPojo.lowlimit) + "','" + str(stockHistPojo.amplitude) + "','" + str(stockHistPojo.tnum) + "','" + str(stockHistPojo.tafactor) + "','" + str(stockHistPojo.fronttafactor) + "','" + str(stockHistPojo.isststock) + "','" + str(stockHistPojo.isxststock) + "','" + str(stockHistPojo.dateYear) + "','" + str(stockHistPojo.dateMonth) + "','" + str(stockHistPojo.dateWeek) + "'),"
  133. vlues = vlues[:-1]
  134. db.batchInsert("data_hist_" + stockPojo['code'].replace(".", "_").lower(), key, vlues)
  135. i = i + 500
  136. else :
  137. # 如果数量小于500 并且不为数量不大于标记变量
  138. if len(stockHistPojoList) > i :
  139. for stockHistPojo in stockHistPojoList[i : len(stockHistPojoList)]:
  140. vlues = vlues + "('"+str(stockHistPojo.dateDay)+"','"+str(stockHistPojo.dates)+"','" + str(stockHistPojo.open) + "','" + str(stockHistPojo.close) + "','" + str(stockHistPojo.high)+ "','" + str(stockHistPojo.low) + "','" + str(stockHistPojo.preclose) + "','" + str(stockHistPojo.average) + "','" + str(stockHistPojo.change) + "','" + str(stockHistPojo.pctchange) + "','" + str(stockHistPojo.volume) + "','" + str(stockHistPojo.highlimit) + "','" + str(stockHistPojo.amount) + "','" + str(stockHistPojo.turn) + "','" + str(stockHistPojo.tradestatus) + "','" + str(stockHistPojo.lowlimit) + "','" + str(stockHistPojo.amplitude) + "','" + str(stockHistPojo.tnum) + "','" + str(stockHistPojo.tafactor) + "','" + str(stockHistPojo.fronttafactor) + "','" + str(stockHistPojo.isststock) + "','" + str(stockHistPojo.isxststock) + "','" + str(stockHistPojo.dateYear) + "','" + str(stockHistPojo.dateMonth) + "','" + str(stockHistPojo.dateWeek) + "'),"
  141. vlues = vlues[:-1]
  142. db.batchInsert("data_hist_" + stockPojo['code'].replace(".", "_").lower(), key, vlues)
  143. break
  144. if isAdd:
  145. db.update("update t_stock_base_info set is_hist_add = 1 where id = " + str(stockPojo['id']))
  146. db.close()
  147. #退出
  148. data = logoutResult = c.stop()
  149. except Exception as ee:
  150. print("error >>>",ee)
  151. traceback.print_exc()
  152. else:
  153. print("demo end")