startGetRestData.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. from threading import Thread
  2. from time import sleep
  3. import asyncio
  4. from EmQuantAPI import *
  5. import traceback
  6. from StockRestData import StockRestData
  7. from BondRestData import BondRestData
  8. from TempIndexRestData import TempIndexRestData
  9. from LimitUpNewsData import LimitUpNewsData
  10. def async_call(fn):
  11. def wrapper(*args, **kwargs):
  12. Thread(target=fn, args=args, kwargs=kwargs).start()
  13. return wrapper
  14. def mainCallback(quantdata):
  15. print ("mainCallback",str(quantdata))
  16. #登录掉线或者 登陆数达到上线(即登录被踢下线) 这时所有的服务都会停止
  17. if str(quantdata.ErrorCode) == "10001011" or str(quantdata.ErrorCode) == "10001009":
  18. print ("Your account is disconnect. You can force login automatically here if you need.")
  19. #行情登录验证失败(每次连接行情服务器时需要登录验证)或者行情流量验证失败时,会取消所有订阅,用户需根据具体情况处理
  20. elif str(quantdata.ErrorCode) == "10001021" or str(quantdata.ErrorCode) == "10001022":
  21. print ("Your all csq subscribe have stopped.")
  22. #行情服务器断线自动重连连续6次失败(1分钟左右)不过重连尝试还会继续进行直到成功为止,遇到这种情况需要确认两边的网络状况
  23. elif str(quantdata.ErrorCode) == "10002009":
  24. print ("Your all csq subscribe have stopped, reconnect 6 times fail.")
  25. # 行情订阅遇到一些错误(这些错误会导致重连,错误原因通过日志输出,统一转换成EQERR_QUOTE_RECONNECT在这里通知),正自动重连并重新订阅,可以做个监控
  26. elif str(quantdata.ErrorCode) == "10002012":
  27. print ("csq subscribe break on some error, reconnect and request automatically.")
  28. # 资讯服务器断线自动重连连续6次失败(1分钟左右)不过重连尝试还会继续进行直到成功为止,遇到这种情况需要确认两边的网络状况
  29. elif str(quantdata.ErrorCode) == "10002014":
  30. print ("Your all cnq subscribe have stopped, reconnect 6 times fail.")
  31. # 资讯订阅遇到一些错误(这些错误会导致重连,错误原因通过日志输出,统一转换成EQERR_INFO_RECONNECT在这里通知),正自动重连并重新订阅,可以做个监控
  32. elif str(quantdata.ErrorCode) == "10002013":
  33. print ("cnq subscribe break on some error, reconnect and request automatically.")
  34. # 资讯登录验证失败(每次连接资讯服务器时需要登录验证)或者资讯流量验证失败时,会取消所有订阅,用户需根据具体情况处理
  35. elif str(quantdata.ErrorCode) == "10001024" or str(quantdata.ErrorCode) == "10001025":
  36. print("Your all cnq subscribe have stopped.")
  37. else:
  38. pass
  39. # 获取股票实时数据
  40. async def getStockRest():
  41. stockRestData = StockRestData()
  42. print("开始股票")
  43. stockRestData.toGet()
  44. # 获取债券实时数据
  45. async def getBondRest():
  46. bondRestData = BondRestData()
  47. print("开始债券")
  48. bondRestData.toGet()
  49. # 获取指数实时数据
  50. async def getIndexRest():
  51. tempIndexRestData = TempIndexRestData()
  52. print("开始指数")
  53. tempIndexRestData.toGet()
  54. # 获取订阅新闻
  55. async def getLimitUpNewsData():
  56. limitUpNewsData = LimitUpNewsData()
  57. print("开始新闻")
  58. limitUpNewsData.toGet()
  59. # 运行了就不会停了
  60. try:
  61. #调用登录函数(激活后使用,不需要用户名密码)
  62. # loginResult = c.start("LoginMode=SXDL,PhoneNumber=15387568630", '', mainCallback)
  63. loginResult = c.start("ForceLogin=1", '', mainCallback)
  64. if(loginResult.ErrorCode != 0):
  65. print("login in fail")
  66. exit()
  67. # data = c.csqcancel(0)
  68. # data = c.csqcancel(-1)
  69. loop = asyncio.events.get_event_loop()
  70. tasks= [
  71. getStockRest(),
  72. getBondRest(),
  73. getIndexRest(),
  74. getLimitUpNewsData()
  75. ]
  76. loop.run_until_complete((asyncio.wait(tasks)))
  77. loop.close()
  78. while True:
  79. sleep(1 * 60 * 60)
  80. except Exception as ee:
  81. print("error >>>",ee)
  82. #退出
  83. data = logoutResult = c.stop()
  84. traceback.print_exc()
  85. else:
  86. print("demo end")