startGetRestData.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. from threading import Thread
  2. from time import sleep
  3. import asyncio
  4. from EmQuantAPI import *
  5. import traceback
  6. from StockRestData import StockRestData
  7. from BondRestData import BondRestData
  8. def async_call(fn):
  9. def wrapper(*args, **kwargs):
  10. Thread(target=fn, args=args, kwargs=kwargs).start()
  11. return wrapper
  12. def mainCallback(quantdata):
  13. print ("mainCallback",str(quantdata))
  14. #登录掉线或者 登陆数达到上线(即登录被踢下线) 这时所有的服务都会停止
  15. if str(quantdata.ErrorCode) == "10001011" or str(quantdata.ErrorCode) == "10001009":
  16. print ("Your account is disconnect. You can force login automatically here if you need.")
  17. #行情登录验证失败(每次连接行情服务器时需要登录验证)或者行情流量验证失败时,会取消所有订阅,用户需根据具体情况处理
  18. elif str(quantdata.ErrorCode) == "10001021" or str(quantdata.ErrorCode) == "10001022":
  19. print ("Your all csq subscribe have stopped.")
  20. #行情服务器断线自动重连连续6次失败(1分钟左右)不过重连尝试还会继续进行直到成功为止,遇到这种情况需要确认两边的网络状况
  21. elif str(quantdata.ErrorCode) == "10002009":
  22. print ("Your all csq subscribe have stopped, reconnect 6 times fail.")
  23. # 行情订阅遇到一些错误(这些错误会导致重连,错误原因通过日志输出,统一转换成EQERR_QUOTE_RECONNECT在这里通知),正自动重连并重新订阅,可以做个监控
  24. elif str(quantdata.ErrorCode) == "10002012":
  25. print ("csq subscribe break on some error, reconnect and request automatically.")
  26. # 资讯服务器断线自动重连连续6次失败(1分钟左右)不过重连尝试还会继续进行直到成功为止,遇到这种情况需要确认两边的网络状况
  27. elif str(quantdata.ErrorCode) == "10002014":
  28. print ("Your all cnq subscribe have stopped, reconnect 6 times fail.")
  29. # 资讯订阅遇到一些错误(这些错误会导致重连,错误原因通过日志输出,统一转换成EQERR_INFO_RECONNECT在这里通知),正自动重连并重新订阅,可以做个监控
  30. elif str(quantdata.ErrorCode) == "10002013":
  31. print ("cnq subscribe break on some error, reconnect and request automatically.")
  32. # 资讯登录验证失败(每次连接资讯服务器时需要登录验证)或者资讯流量验证失败时,会取消所有订阅,用户需根据具体情况处理
  33. elif str(quantdata.ErrorCode) == "10001024" or str(quantdata.ErrorCode) == "10001025":
  34. print("Your all cnq subscribe have stopped.")
  35. else:
  36. pass
  37. # 获取股票实时数据
  38. async def getStockRest():
  39. stockRestData = StockRestData()
  40. print("开始股票")
  41. stockRestData.toGet()
  42. # 获取债券实时数据
  43. async def getBondRest():
  44. bondRestData = BondRestData()
  45. print("开始债券")
  46. bondRestData.toGet()
  47. # 运行了就不会停了
  48. try:
  49. #调用登录函数(激活后使用,不需要用户名密码)
  50. loginResult = c.start("ForceLogin=1", '', mainCallback)
  51. if(loginResult.ErrorCode != 0):
  52. print("login in fail")
  53. exit()
  54. # data = c.csqcancel(0)
  55. loop = asyncio.events.get_event_loop()
  56. tasks= [
  57. getStockRest(),
  58. getBondRest()
  59. ]
  60. loop.run_until_complete((asyncio.wait(tasks)))
  61. loop.close()
  62. while True:
  63. sleep(1 * 60 * 60)
  64. except Exception as ee:
  65. print("error >>>",ee)
  66. #退出
  67. data = logoutResult = c.stop()
  68. traceback.print_exc()
  69. else:
  70. print("demo end")