Browse Source

修改抓数据逻辑

z 4 years ago
parent
commit
87fe5df0ad

+ 4 - 1
hist/BondHistData.py

@@ -20,7 +20,7 @@ class BondHistData:
     def toGet(self):
         try:
             db = dbOperation(db_config.db_zhaiquan)
-            stockPojoList = db.query_list("select id, code, name, list_date listDate from t_stock_base_info where is_hist_add = 0 and list_date != '待上市'")
+            stockPojoList = db.query_list("select id, code, name, list_date listDate from t_stock_base_info where list_date != '待上市'")
             getDataTime = time.strftime('%Y-%m-%d',time.localtime(time.time()))
             for stockPojo in stockPojoList:
                 if stockPojo['listDate'] > getDataTime:
@@ -31,6 +31,9 @@ class BondHistData:
                 startDate = str(stockPojo['listDate'])
                 if dates is not None :
                     startDate = str(dates['dates'].replace("/", "-"))
+                    dd = datetime.datetime.strptime(startDate, "%Y-%m-%d")
+                    dd = (dd + datetime.timedelta(days=1)).strftime("%Y-%m-%d")
+                    startDate = dd
                 data = c.csd(
                     str(stockPojo['code']), 
                     "OPEN,CLOSE,HIGH,LOW,PRECLOSE,AVERAGE,CHANGE,PCTCHANGE,VOLUME,AMOUNT", 

+ 1 - 1
Manualactivate-all.py

@@ -27,6 +27,6 @@ elif data1 == 'Darwin':
 else:
     pass
 
-data = c.manualactivate("dfcf4198", "an073937", "email=617590286@qq.com")
+data = c.manualactivate("dfcf4198", "an073937", "email=@qq.com")
 if data.ErrorCode != 0:
     print ("manualactivate failed, ", data.ErrorMsg)

+ 208 - 0
MySQLConnection.py

@@ -0,0 +1,208 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+#!@File    :   database.py
+#!@Time    :   2019/9/23 21:57
+#!@Author  :   xiaoyumin
+#!@Version :   1.0
+#!@Contact :   xiaoymin@foxmail.com
+#!@License :   Copyright (C) 2018 Zhejiang xiaominfo Technology CO.,LTD.
+#!@Desc    :   数据库连接池相关
+import pymysql
+# from DBUtils.PooledDB import PoolpipedDB
+from dbutils.pooled_db import PooledDB
+import logging
+import configparser
+
+# 读取数据库配置信息
+config=configparser.ConfigParser()
+config.read('db.conf',encoding='UTF-8')
+sections=config.sections()
+# 数据库工厂
+dbFactory={}
+for dbName in sections:
+    # 读取相关属性
+    maxconnections=config.get(dbName,"maxconnections")
+    mincached=config.get(dbName,"mincached")
+    maxcached=config.get(dbName,"maxcached")
+    host=config.get(dbName,"host")
+    port=config.get(dbName,"port")
+    user=config.get(dbName,"user")
+    password=config.get(dbName,"password")
+    database=config.get(dbName,"database")
+    databasePooled=PooledDB(creator=pymysql,
+                        maxconnections=int(maxconnections),
+                        mincached=int(mincached),
+                        maxcached=int(maxcached),
+                        blocking=True,
+                        cursorclass = pymysql.cursors.DictCursor,
+                        host=host,
+                        port=int(port),
+                        user=user,
+                        password=password,
+                        database=database)
+    dbFactory[dbName]=databasePooled
+
+class MySQLConnection(object):
+    """
+    数据库连接池代理对象
+    查询参数主要有两种类型
+    第一种:传入元祖类型,例如(12,13),这种方式主要是替代SQL语句中的%s展位符号
+    第二种: 传入字典类型,例如{"id":13},此时我们的SQL语句需要使用键来代替展位符,例如:%(name)s
+    """
+    def __init__(self,dbName="master"):
+        self.connect = dbFactory[dbName].connection()
+        self.cursor = self.connect.cursor()
+        logging.debug("获取数据库连接对象成功,连接池对象:{}".format(str(self.connect)))
+
+    def execute(self,sql,param=None):
+        """
+        基础更新、插入、删除操作
+        :param sql:
+        :param param:
+        :return: 受影响的行数
+        """
+        ret=None
+        try:
+            if param==None:
+                ret=self.cursor.execute(sql)
+            else:
+                ret=self.cursor.execute(sql,param)
+        except TypeError as te:
+            logging.debug("类型错误")
+            logging.exception(te)
+        return ret
+    def query(self,sql,param=None):
+        """
+        查询数据库
+        :param sql: 查询SQL语句
+        :param param: 参数
+        :return: 返回集合
+        """
+        self.cursor.execute(sql,param)
+        result=self.cursor.fetchall()
+        return result
+    def queryOne(self,sql,param=None):
+        """
+        查询数据返回第一条
+        :param sql: 查询SQL语句
+        :param param: 参数
+        :return: 返回第一条数据的字典
+        """
+        result=self.query(sql,param)
+        if result:
+            return result[0]
+        else:
+            return None
+    def listByPage(self,sql,current_page,page_size,param=None):
+        """
+        分页查询当前表格数据
+        :param sql: 查询SQL语句
+        :param current_page: 当前页码
+        :param page_size: 页码大小
+        :param param:参数
+        :return:
+        """
+        countSQL="select count(*) ct from ("+sql+") tmp "
+        logging.debug("统计SQL:{}".format(sql))
+        countNum=self.count(countSQL,param)
+        offset=(current_page-1)*page_size
+        totalPage=int(countNum/page_size)
+        if countNum % page_size>0:
+            totalPage = totalPage + 1
+        pagination={"current_page":current_page,"page_size":page_size,"count":countNum,"total_page":totalPage}
+        querySql="select * from ("+sql+") tmp limit %s,%s"
+        logging.debug("查询SQL:{}".format(querySql))
+        # 判断是否有参数
+        if param==None:
+            # 无参数
+            pagination["data"]=self.query(querySql,(offset,page_size))
+        else:
+            # 有参数的情况,此时需要判断参数是元祖还是字典
+            if isinstance(param,dict):
+                # 字典的情况,因此需要添加字典
+                querySql="select * from ("+sql+") tmp limit %(tmp_offset)s,%(tmp_pageSize)s"
+                param["tmp_offset"]=offset
+                param["tmp_pageSize"]=page_size
+                pagination["data"]=self.query(querySql,param)
+            elif isinstance(param,tuple):
+                # 元祖的方式
+                listtp=list(param)
+                listtp.append(offset)
+                listtp.append(page_size)
+                pagination["data"]=self.query(querySql,tuple(listtp))
+            else:
+                # 基础类型
+                listtp=[]
+                listtp.append(param)
+                listtp.append(offset)
+                listtp.append(page_size)
+                pagination["data"]=self.query(querySql,tuple(listtp))
+        return pagination
+    def count(self,sql,param=None):
+        """
+        统计当前表记录行数
+        :param sql: 统计SQL语句
+        :param param: 参数
+        :return: 当前记录行
+        """
+        ret=self.queryOne(sql,param)
+        count=None
+        if ret:
+            for k,v in ret.items():
+                count=v
+        return count
+
+    def insert(self,sql,param=None):
+        """
+        数据库插入
+        :param sql: SQL语句
+        :param param: 参数
+        :return: 受影响的行数
+        """
+        return self.execute(sql,param)
+    def update(self,sql,param=None):
+        """
+        更新操作
+        :param sql: SQL语句
+        :param param: 参数
+        :return: 受影响的行数
+        """
+        return self.execute(sql,param)
+    def delete(self,sql,param=None):
+        """
+        删除操作
+        :param sql: 删除SQL语句
+        :param param: 参数
+        :return: 受影响的行数
+        """
+        return self.execute(sql,param)
+    def batch(self,sql,param=None):
+        """
+        批量插入
+        :param sql: 插入SQL语句
+        :param param: 参数
+        :return: 受影响的行数
+        """
+        return self.cursor.executemany(sql,param)
+    def commit(self,param=None):
+        """
+        提交数据库
+        :param param:
+        :return:
+        """
+        if param==None:
+            self.connect.commit()
+        else:
+            self.connect.rollback()
+
+    def close(self):
+        """
+        关闭数据库连接
+        :return:
+        """
+        if self.cursor:
+            self.cursor.close()
+        if self.connect:
+            self.connect.close()
+        logging.debug("释放数据库连接")
+        return None

+ 3 - 0
hist/SotckHistData.py

@@ -31,6 +31,9 @@ class SotckHistData:
                 startDate = str(stockPojo['listDate'])
                 if dates is not None :
                     startDate = str(dates['dates'].replace("/", "-"))
+                    dd = datetime.datetime.strptime(startDate, "%Y-%m-%d")
+                    dd = (dd + datetime.timedelta(days=1)).strftime("%Y-%m-%d")
+                    startDate = dd
                 data = c.csd(
                     str(stockPojo['code']), 
                     "OPEN,CLOSE,HIGH,LOW,PRECLOSE,AVERAGE,CHANGE,PCTCHANGE,VOLUME,HIGHLIMIT,AMOUNT,TURN,TRADESTATUS,LOWLIMIT,AMPLITUDE,TNUM,TAFACTOR,FRONTTAFACTOR,ISSTSTOCK,ISXSTSTOCK", 

File diff suppressed because it is too large
+ 7 - 90
StockRestData.py


BIN
__pycache__/BondHistData.cpython-38.pyc


BIN
__pycache__/BondRestData.cpython-38.pyc


BIN
__pycache__/SotckHistData.cpython-38.pyc


BIN
__pycache__/StockRestData.cpython-38.pyc


BIN
__pycache__/dbOperationStock.cpython-38.pyc


BIN
__pycache__/db_config.cpython-38.pyc


File diff suppressed because it is too large
+ 9 - 88
bondRestData.py


File diff suppressed because it is too large
+ 0 - 143
clacRestGroupData.py


+ 35 - 0
db.conf

@@ -0,0 +1,35 @@
+[master]
+# 数据库连接主机
+host=hk-cdb-mw8z8p47.sql.tencentcdb.com
+# 数据库端口号
+port=63791
+# 用户名
+user=root
+# 密码
+password=TestBicon@123
+# 数据库名称
+database=stock_data
+# 数据库连接池最大连接数
+maxconnections=20
+# 数据库连接池最小缓存数
+mincached=5
+# 数据库连接池最大缓存数
+maxcached=10
+
+[stock]
+# 数据库连接主机
+host=47.96.39.5
+# 数据库端口号
+port=3306
+# 用户名
+user=root
+# 密码
+password=root
+# 数据库名称
+database=stock2chen
+# 数据库连接池最大连接数
+maxconnections=20
+# 数据库连接池最小缓存数
+mincached=5
+# 数据库连接池最大缓存数
+maxcached=10

File diff suppressed because it is too large
+ 172 - 0
getCmc.py


+ 7 - 14
getCst.py

@@ -15,14 +15,7 @@ import db_config
 
 print("开始了")
 def mainCallback(quantdata):
-    """
-    mainCallback 是主回调函数,可捕捉如下错误
-    在start函数第三个参数位传入,该函数只有一个为c.EmQuantData类型的参数quantdata
-    :param quantdata:c.EmQuantData
-    :return:
-    """
     print ("mainCallback",str(quantdata))
-    print("开始了2222")
     #登录掉线或者 登陆数达到上线(即登录被踢下线) 这时所有的服务都会停止
     if str(quantdata.ErrorCode) == "10001011" or str(quantdata.ErrorCode) == "10001009":
         print ("Your account is disconnect. You can force login automatically here if you need.")
@@ -62,7 +55,7 @@ def cstCallBack(quantdata):
             if "None" == str(result[2][j]):
                 continue
             stockRestPojo = StockRestPojo()
-            stockRestPojo.date = "20201105"
+            stockRestPojo.date = "20201106"
             stockRestPojo.time = str(result[0][j])
             stockRestPojo.now = str(result[1][j])
             stockRestPojo.high = str(result[2][j])
@@ -70,8 +63,8 @@ def cstCallBack(quantdata):
             stockRestPojo.open = str(result[4][j])
             stockRestPojo.preclose = str(result[5][j])
             stockRestPojo.roundlot = ""
-            stockRestPojo.change = str(value[8])
-            stockRestPojo.pctchange = str(value[9])
+            stockRestPojo.change = ""
+            stockRestPojo.pctchange = ""
             stockRestPojo.volume = str(result[6][j])
             stockRestPojo.amount = str(result[7][j])
             stockRestPojo.volumeratio = ""
@@ -110,10 +103,10 @@ def cstCallBack(quantdata):
 
             datTime = ""
             if len(str(result[0][j])) != 6:
-                datTime = time.mktime(time.strptime("202011050" + str(result[0][j]), "%Y%m%d%H%M%S"))
+                datTime = time.mktime(time.strptime("202011060" + str(result[0][j]), "%Y%m%d%H%M%S"))
                 stockRestPojo.realTime = str(int(datTime))
             else:
-                datTime = time.mktime(time.strptime("20201105" + str(result[0][j]), "%Y%m%d%H%M%S"))
+                datTime = time.mktime(time.strptime("20201106" + str(result[0][j]), "%Y%m%d%H%M%S"))
                 stockRestPojo.realTime = str(int(datTime))
 
             min_ = time.localtime(datTime).tm_min # 获取分钟
@@ -190,10 +183,10 @@ try:
     stockPojoList = db.query_list("select id, code, name, list_date listDate, seralid from t_stock_base_info where 1 = 1 order by id asc")
     # cst使用范例
     for stockPojo in stockPojoList:
-        sql = "SELECT count(1) count FROM data_rt_" + stockPojo['code'].replace(".", "_").lower() + " WHERE `date` = '20201105'"
+        sql = "SELECT count(1) count FROM data_rt_" + stockPojo['code'].replace(".", "_").lower() + " WHERE `date` = '20201106'"
         count = db.query_one(sql)
         if int(count['count']) <= 4000:
-            sql1 = "delete from data_rt_" + stockPojo['code'].replace(".", "_").lower() + " where date = '20201105'"
+            sql1 = "delete from data_rt_" + stockPojo['code'].replace(".", "_").lower() + " where date = '20201106'"
             db.dele(sql1)
             data = c.cst(stockPojo['code'], 'Time,Now,High,Low,Open,PreClose,Volume,Amount,HighLimit,LowLimit,BuyPrice1,BuyPrice2,BuyPrice3,BuyPrice4,BuyPrice5,BuyVolume1,BuyVolume2,BuyVolume3,BuyVolume4,BuyVolume5,SellPrice1,SellPrice2,SellPrice3,SellPrice4,SellPrice5,SellVolume1,SellVolume2,SellVolume3,SellVolume4,SellVolume5,ClosedTime,ClosedVolume,ClosedAmount', '093001', '150000','',cstCallBack)
     db.close()

File diff suppressed because it is too large
+ 200 - 0
getCstZhaiQuan.py


+ 61 - 16
getLimitUpNewsData.py

@@ -6,16 +6,14 @@ from EmQuantAPI import *
 from datetime import timedelta, datetime
 import time as _time
 import traceback
+import requests
+import re
+from dbOperationStock import dbOperationStock
+import db_config
+
 print("开始了")
 def mainCallback(quantdata):
-    """
-    mainCallback 是主回调函数,可捕捉如下错误
-    在start函数第三个参数位传入,该函数只有一个为c.EmQuantData类型的参数quantdata
-    :param quantdata:c.EmQuantData
-    :return:
-    """
     print ("mainCallback",str(quantdata))
-    print("开始了2222")
     #登录掉线或者 登陆数达到上线(即登录被踢下线) 这时所有的服务都会停止
     if str(quantdata.ErrorCode) == "10001011" or str(quantdata.ErrorCode) == "10001009":
         print ("Your account is disconnect. You can force login automatically here if you need.")
@@ -41,12 +39,29 @@ def mainCallback(quantdata):
         pass
 
 def cnqCallback(quantdata):
-    print ("cnqCallback,", str(quantdata))
-    print("cnqCallback,")
-    for code in quantdata.Data:
-        total = len(quantdata.Data[code])
-        for k in range(0, len(quantdata.Data[code])):
-            print(quantdata.Data[code][k])
+    db = dbOperationStock(db_config.db_stock)
+    for code in data.Data:
+        # total = len(data.Data[code])
+        for k in range(0, len(data.Data[code])):
+            # print(data.Data[code][k])
+            # print(data.Data[code][k][7])
+            se = requests.session()
+            url = "http://app.jg.eastmoney.com/NewsData/GetNewsText.do?cid=Admin&cid=Admin&id=" + data.Data[code][k][5]
+            dfNews = se.post(url).text.replace("'", '"').replace('/ ', '/')
+            dfNew = eval(dfNews)
+            # print(dfNew)
+            content = str(dfNew['text']).replace("<a","<span").replace("/a>","/span>")
+            pattern = 'https://z1.*?\.jpg'
+            p2 = re.findall(pattern, str(content))
+            # print(p2)
+            imgUrl = "http://47.96.39.5:8080/public/default.png"
+            if(0 != len(p2)) :
+                imgUrl = p2[0]
+            sql = "insert into `news` (`title`, `cover`, `content`, `source`, `hot`, `publish_time`, `create_time`, `update_time`) values('" + str(dfNew['Title']) + "', '" + str(imgUrl) + "', '" + str(content) +"', '" + str(dfNew['medianame']) + "',0,'" + str(dfNew['Date']) + "',now(),now());"
+            # print(sql)
+            db.insert(sql)
+    db.close()
+
 
 try:
     #调用登录函数(激活后使用,不需要用户名密码)
@@ -54,14 +69,44 @@ try:
     if(loginResult.ErrorCode != 0):
         print("login in fail")
         exit()
+    # 资讯查询使用范例
+    data = c.cfn("S888005002API", "sectornews", eCfnMode_EndCount, "starttime=20201106010000,endtime=20201107,count=10")
+    print("cfn输出结果======分隔线======")
+    if (not isinstance(data, c.EmQuantData)):
+        print (data)
+    else:
+        if (data.ErrorCode != 0):
+            print("request cfn Error, ", data.ErrorMsg)
+        else:
+            db = dbOperationStock(db_config.db_stock)
+            for code in data.Data:
+                # total = len(data.Data[code])
+                for k in range(0, len(data.Data[code])):
+                    # print(data.Data[code][k])
+                    # print(data.Data[code][k][7])
+                    se = requests.session()
+                    url = "http://app.jg.eastmoney.com/NewsData/GetNewsText.do?cid=Admin&cid=Admin&id=" + data.Data[code][k][5]
+                    dfNews = se.post(url).text.replace("'", '"').replace('/ ', '/')
+                    dfNew = eval(dfNews)
+                    # print(dfNew)
+                    content = str(dfNew['text']).replace("<a","<span").replace("/a>","/span>")
+                    pattern = 'https://z1.*?\.jpg'
+                    p2 = re.findall(pattern, str(content))
+                    # print(p2)
+                    imgUrl = "http://47.96.39.5:8080/public/default.png"
+                    if(0 != len(p2)) :
+                        imgUrl = p2[0]
+                    sql = "insert into `news` (`title`, `cover`, `content`, `source`, `hot`, `publish_time`, `create_time`, `update_time`) values('" + str(dfNew['Title']) + "', '" + str(imgUrl) + "', '" + str(content) +"', '" + str(dfNew['medianame']) + "',0,'" + str(dfNew['Date']) + "',now(),now());"
+                    # print(sql)
+                    db.insert(sql)
+            db.close()
 
-    # 资讯订阅使用范例 0072255 昨日涨停
-    data = c.cnq("0072255", "sectornews", "", cnqCallback)
+    # # 资讯订阅使用范例 S888005009API 昨日涨停
+    data = c.cnq("S888005002API","sectornews","",cnqCallback)
     if data.ErrorCode != 0:
         print("request cnq Error, ", data.ErrorMsg)
     else:
         print("cnq输出结果======分隔线======")
-        _time.sleep(60)
         text = input("press any key to cancel cnq \r\n")
         # 取消订阅
         data = c.cnqcancel(data.SerialID)

+ 12 - 12
startGetHistData.py

@@ -3,8 +3,8 @@ import asyncio
 from EmQuantAPI import *
 import traceback
 from SotckHistData import SotckHistData
-from StockHkHistData import StockHkHistData
-from StockUsaHistData import StockUsaHistData
+# from StockHkHistData import StockHkHistData
+# from StockUsaHistData import StockUsaHistData
 from BondHistData import BondHistData
 
 def mainCallback(quantdata):
@@ -38,15 +38,15 @@ async def getStockHist():
     sotckHistData = SotckHistData()
     sotckHistData.toGet()
     
-# 获取港股历史数据
-async def getStockHkHist():
-    stockHkHistData = StockHkHistData()
-    stockHkHistData.toGet()
+# # 获取港股历史数据
+# async def getStockHkHist():
+#     stockHkHistData = StockHkHistData()
+#     stockHkHistData.toGet()
 
-# 获取美股历史数据
-async def getStockUsaHist():
-    stockUsaHistData = StockUsaHistData()
-    stockUsaHistData.toGet()
+# # 获取美股历史数据
+# async def getStockUsaHist():
+#     stockUsaHistData = StockUsaHistData()
+#     stockUsaHistData.toGet()
 
 # 获取债券历史数据
 async def getBondHist():
@@ -60,8 +60,8 @@ async def getIndexHist():
 async def main():
     await asyncio.gather(
         getStockHist(),
-        getStockHkHist(),
-        getStockUsaHist(),
+        # getStockHkHist(),
+        # getStockUsaHist(),
         getBondHist(),
     )
 

+ 22 - 8
startGetRestData.py

@@ -4,6 +4,12 @@ import asyncio
 from EmQuantAPI import *
 import traceback
 from StockRestData import StockRestData
+from BondRestData import BondRestData
+
+def async_call(fn):
+    def wrapper(*args, **kwargs):
+        Thread(target=fn, args=args, kwargs=kwargs).start()
+    return wrapper
 
 def mainCallback(quantdata):
     print ("mainCallback",str(quantdata))
@@ -34,13 +40,14 @@ def mainCallback(quantdata):
 # 获取股票实时数据
 async def getStockRest():
     stockRestData = StockRestData()
+    print("开始股票")
     stockRestData.toGet()
     
-
-async def main():
-    await asyncio.gather(
-        getStockRest(),
-    )
+# 获取债券实时数据
+async def getBondRest():
+    bondRestData = BondRestData()
+    print("开始债券")
+    bondRestData.toGet()
 
 # 运行了就不会停了
 try:
@@ -49,8 +56,16 @@ try:
     if(loginResult.ErrorCode != 0):
         print("login in fail")
         exit()
-    asyncio.run(main())
-
+    # data = c.csqcancel(0)
+    loop = asyncio.events.get_event_loop()
+    tasks= [
+        getStockRest(),
+        getBondRest()
+    ]
+    loop.run_until_complete((asyncio.wait(tasks)))
+    loop.close()
+    while True:
+        sleep(1 * 60 * 60)
 except Exception as ee:
     print("error >>>",ee)
     #退出
@@ -58,4 +73,3 @@ except Exception as ee:
     traceback.print_exc()
 else:
     print("demo end")
-