123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- # -*- coding:utf-8 -*-
- __author__ = 'weijie'
- from EmQuantAPI import *
- import datetime
- import time
- import traceback
- from StockPojo import StockPojo
- from StockHistPojo import StockHistPojo
- from dbOperation import dbOperation
- import pandas as pd
- import json
- import array
- import db_config
- print("开始了")
- def mainCallback(quantdata):
- """
- mainCallback 是主回调函数,可捕捉如下错误
- 在start函数第三个参数位传入,该函数只有一个为c.EmQuantData类型的参数quantdata
- :param quantdata:c.EmQuantData
- :return:
- """
- print ("mainCallback",str(quantdata))
- print("开始了2222")
- #登录掉线或者 登陆数达到上线(即登录被踢下线) 这时所有的服务都会停止
- if str(quantdata.ErrorCode) == "10001011" or str(quantdata.ErrorCode) == "10001009":
- print ("Your account is disconnect. You can force login automatically here if you need.")
- #行情登录验证失败(每次连接行情服务器时需要登录验证)或者行情流量验证失败时,会取消所有订阅,用户需根据具体情况处理
- elif str(quantdata.ErrorCode) == "10001021" or str(quantdata.ErrorCode) == "10001022":
- print ("Your all csq subscribe have stopped.")
- #行情服务器断线自动重连连续6次失败(1分钟左右)不过重连尝试还会继续进行直到成功为止,遇到这种情况需要确认两边的网络状况
- elif str(quantdata.ErrorCode) == "10002009":
- print ("Your all csq subscribe have stopped, reconnect 6 times fail.")
- # 行情订阅遇到一些错误(这些错误会导致重连,错误原因通过日志输出,统一转换成EQERR_QUOTE_RECONNECT在这里通知),正自动重连并重新订阅,可以做个监控
- elif str(quantdata.ErrorCode) == "10002012":
- print ("csq subscribe break on some error, reconnect and request automatically.")
- # 资讯服务器断线自动重连连续6次失败(1分钟左右)不过重连尝试还会继续进行直到成功为止,遇到这种情况需要确认两边的网络状况
- elif str(quantdata.ErrorCode) == "10002014":
- print ("Your all cnq subscribe have stopped, reconnect 6 times fail.")
- # 资讯订阅遇到一些错误(这些错误会导致重连,错误原因通过日志输出,统一转换成EQERR_INFO_RECONNECT在这里通知),正自动重连并重新订阅,可以做个监控
- elif str(quantdata.ErrorCode) == "10002013":
- print ("cnq subscribe break on some error, reconnect and request automatically.")
- # 资讯登录验证失败(每次连接资讯服务器时需要登录验证)或者资讯流量验证失败时,会取消所有订阅,用户需根据具体情况处理
- elif str(quantdata.ErrorCode) == "10001024" or str(quantdata.ErrorCode) == "10001025":
- print("Your all cnq subscribe have stopped.")
- else:
- pass
- def startCallback(message):
- print("[EmQuantAPI Python]", message)
- return 1
- try:
- #调用登录函数(激活后使用,不需要用户名密码)
- loginResult = c.start("ForceLogin=1", '', mainCallback)
- if(loginResult.ErrorCode != 0):
- print("login in fail")
- exit()
- # csd使用范例
- db = dbOperation(db_config.db_zhaiquan)
- stockPojoList = db.query_list("select id, code, name, list_date listDate from t_stock_base_info where is_hist_add = 0 and list_date != '待上市'")
- getDataTime = time.strftime('%Y-%m-%d',time.localtime(time.time()))
- for stockPojo in stockPojoList:
- if stockPojo['listDate'] > getDataTime:
- continue
- isAdd = False
- data = c.csd(
- str(stockPojo['code']),
- "OPEN,CLOSE,HIGH,LOW,PRECLOSE,AVERAGE,CHANGE,PCTCHANGE,VOLUME,AMOUNT",
- str(stockPojo['listDate']),
- getDataTime,
- "Period=1,adjustflag=1,curtype=1,Ispandas=1")
- print("返回数据:" + str(data))
- data.reset_index(inplace=True)
- # print(data)
- jsonData = data.to_json()
- text = json.loads(jsonData)
- stockHistPojoList = []
- for i in range(len(text['CODES'])):
- isAdd = True
- stockHistPojo = StockHistPojo()
- stockHistPojo.codes = text['CODES'][str(i)].replace(".", "_")
- stockHistPojo.open = text['OPEN'][str(i)]
- stockHistPojo.close = text['CLOSE'][str(i)]
- stockHistPojo.high = text['HIGH'][str(i)]
- stockHistPojo.low = text['LOW'][str(i)]
- stockHistPojo.preclose = text['PRECLOSE'][str(i)]
- if "None".strip() == str(text['AVERAGE'][str(i)]).strip():
- continue
- stockHistPojo.average = text['AVERAGE'][str(i)]
- stockHistPojo.change = text['CHANGE'][str(i)]
- stockHistPojo.pctchange = text['PCTCHANGE'][str(i)]
- stockHistPojo.volume = text['VOLUME'][str(i)]
- stockHistPojo.highlimit = "0"
- stockHistPojo.amount = text['AMOUNT'][str(i)]
- stockHistPojo.turn = "0"
- stockHistPojo.tradestatus = "0"
- stockHistPojo.lowlimit = "0"
- stockHistPojo.amplitude = "0"
- stockHistPojo.tnum = "0"
- stockHistPojo.tafactor = "0"
- stockHistPojo.fronttafactor = "0"
- stockHistPojo.isststock = "0"
- stockHistPojo.isxststock = "0"
- stockHistPojo.dates = text['DATES'][str(i)]
- # datTime = datetime.datetime.strptime(stockHistPojo.dates, "%Y/%m/%d")
- # datTime = time.mktime(time.strptime(stockHistPojo.dates, "%Y/%m/%d"))
- datTime = time.mktime(time.strptime(stockHistPojo.dates, "%Y/%m/%d"))
- year = time.localtime(datTime).tm_year # 获取年份
- yearDatTime = time.mktime(time.strptime(str(year) + "/01/01", "%Y/%m/%d"))
- month = time.localtime(datTime).tm_mon # 获取月份
- monthDatTime = ""
- if 10 > month:
- datTime = time.mktime(time.strptime(str(year) + "/0" + str(month) + "/01", "%Y/%m/%d"))
- monthDatTime = int(datTime)
- else:
- datTime = time.mktime(time.strptime(str(year) + "/" + str(month) + "/01", "%Y/%m/%d"))
- monthDatTime = int(datTime)
- weekDatTime = time.mktime(time.strptime(stockHistPojo.dates, "%Y/%m/%d"))
- wday = time.localtime(weekDatTime).tm_wday # 获取周
- dt = datetime.datetime.strptime(stockHistPojo.dates, "%Y/%m/%d")
- out_date = (dt + datetime.timedelta(days=-wday)).strftime("%Y/%m/%d")
- weekDatTime1 = time.mktime(time.strptime(out_date, "%Y/%m/%d"))
- #获取日
- dayDatTime = time.mktime(time.strptime(stockHistPojo.dates, "%Y/%m/%d"))
- stockHistPojo.dateYear = str(int(yearDatTime))
- stockHistPojo.dateMonth = str(int(monthDatTime))
- stockHistPojo.dateWeek = str(int(weekDatTime1))
- stockHistPojo.dateDay = str(int(dayDatTime))
-
- stockHistPojoList.append(stockHistPojo)
- i = 0
- key = "`realTime`,`dates`,`open`,`close`,`high`,`low`,`preclose`,`average`,`change`,`pctchange`,`volume`,`highlimit`,`amount`,`turn`,`tradestatus`,`lowlimit`,`amplitude`,`tnum`,`tafactor`,`fronttafactor`,`isststock`,`isxststock`,`date_year`,`date_month`,`date_week`"
- while True :
- vlues = ""
- # print(i)
- # 如果数组数量大于500 截取前500个
- if len(stockHistPojoList) - i > 500 :
- # stockPojoList[i : 500] 等于java subList(i, 500)
- for stockHistPojo in stockHistPojoList[i : 500 + i]:
- vlues = vlues + "('"+str(stockHistPojo.dateDay)+"','"+str(stockHistPojo.dates)+"','" + str(stockHistPojo.open) + "','" + str(stockHistPojo.close) + "','" + str(stockHistPojo.high)+ "','" + str(stockHistPojo.low) + "','" + str(stockHistPojo.preclose) + "','" + str(stockHistPojo.average) + "','" + str(stockHistPojo.change) + "','" + str(stockHistPojo.pctchange) + "','" + str(stockHistPojo.volume) + "','" + str(stockHistPojo.highlimit) + "','" + str(stockHistPojo.amount) + "','" + str(stockHistPojo.turn) + "','" + str(stockHistPojo.tradestatus) + "','" + str(stockHistPojo.lowlimit) + "','" + str(stockHistPojo.amplitude) + "','" + str(stockHistPojo.tnum) + "','" + str(stockHistPojo.tafactor) + "','" + str(stockHistPojo.fronttafactor) + "','" + str(stockHistPojo.isststock) + "','" + str(stockHistPojo.isxststock) + "','" + str(stockHistPojo.dateYear) + "','" + str(stockHistPojo.dateMonth) + "','" + str(stockHistPojo.dateWeek) + "'),"
- vlues = vlues[:-1]
- db.batchInsert("data_hist_" + stockPojo['code'].replace(".", "_").lower(), key, vlues)
- i = i + 500
- else :
- # 如果数量小于500 并且不为数量不大于标记变量
- if len(stockHistPojoList) > i :
- for stockHistPojo in stockHistPojoList[i : len(stockHistPojoList)]:
- vlues = vlues + "('"+str(stockHistPojo.dateDay)+"','"+str(stockHistPojo.dates)+"','" + str(stockHistPojo.open) + "','" + str(stockHistPojo.close) + "','" + str(stockHistPojo.high)+ "','" + str(stockHistPojo.low) + "','" + str(stockHistPojo.preclose) + "','" + str(stockHistPojo.average) + "','" + str(stockHistPojo.change) + "','" + str(stockHistPojo.pctchange) + "','" + str(stockHistPojo.volume) + "','" + str(stockHistPojo.highlimit) + "','" + str(stockHistPojo.amount) + "','" + str(stockHistPojo.turn) + "','" + str(stockHistPojo.tradestatus) + "','" + str(stockHistPojo.lowlimit) + "','" + str(stockHistPojo.amplitude) + "','" + str(stockHistPojo.tnum) + "','" + str(stockHistPojo.tafactor) + "','" + str(stockHistPojo.fronttafactor) + "','" + str(stockHistPojo.isststock) + "','" + str(stockHistPojo.isxststock) + "','" + str(stockHistPojo.dateYear) + "','" + str(stockHistPojo.dateMonth) + "','" + str(stockHistPojo.dateWeek) + "'),"
- vlues = vlues[:-1]
- db.batchInsert("data_hist_" + stockPojo['code'].replace(".", "_").lower(), key, vlues)
- break
- if isAdd:
- db.update("update t_stock_base_info set is_hist_add = 1 where id = " + str(stockPojo['id']))
- db.close()
- #退出
- data = logoutResult = c.stop()
- except Exception as ee:
- print("error >>>",ee)
- traceback.print_exc()
- else:
- print("demo end")
|