MySQLConnection.py 6.8 KB


  1. #!/usr/bin/env python
  2. # -*- encoding: utf-8 -*-
  3. #!@File : database.py
  4. #!@Time : 2019/9/23 21:57
  5. #!@Author : xiaoyumin
  6. #!@Version : 1.0
  7. #!@Contact : xiaoymin@foxmail.com
  8. #!@License : Copyright (C) 2018 Zhejiang xiaominfo Technology CO.,LTD.
  9. #!@Desc : 数据库连接池相关
  10. import pymysql
  11. # from DBUtils.PooledDB import PoolpipedDB
  12. from dbutils.pooled_db import PooledDB
  13. import logging
  14. import configparser
  15. # 读取数据库配置信息
  16. config=configparser.ConfigParser()
  17. config.read('db.conf',encoding='UTF-8')
  18. sections=config.sections()
  19. # 数据库工厂
  20. dbFactory={}
  21. for dbName in sections:
  22. # 读取相关属性
  23. maxconnections=config.get(dbName,"maxconnections")
  24. mincached=config.get(dbName,"mincached")
  25. maxcached=config.get(dbName,"maxcached")
  26. host=config.get(dbName,"host")
  27. port=config.get(dbName,"port")
  28. user=config.get(dbName,"user")
  29. password=config.get(dbName,"password")
  30. database=config.get(dbName,"database")
  31. databasePooled=PooledDB(creator=pymysql,
  32. maxconnections=int(maxconnections),
  33. mincached=int(mincached),
  34. maxcached=int(maxcached),
  35. blocking=True,
  36. cursorclass = pymysql.cursors.DictCursor,
  37. host=host,
  38. port=int(port),
  39. user=user,
  40. password=password,
  41. database=database)
  42. dbFactory[dbName]=databasePooled
  43. class MySQLConnection(object):
  44. """
  45. 数据库连接池代理对象
  46. 查询参数主要有两种类型
  47. 第一种:传入元祖类型,例如(12,13),这种方式主要是替代SQL语句中的%s展位符号
  48. 第二种: 传入字典类型,例如{"id":13},此时我们的SQL语句需要使用键来代替展位符,例如:%(name)s
  49. """
  50. def __init__(self,dbName="master"):
  51. self.connect = dbFactory[dbName].connection()
  52. self.cursor = self.connect.cursor()
  53. logging.debug("获取数据库连接对象成功,连接池对象:{}".format(str(self.connect)))
  54. def execute(self,sql,param=None):
  55. """
  56. 基础更新、插入、删除操作
  57. :param sql:
  58. :param param:
  59. :return: 受影响的行数
  60. """
  61. ret=None
  62. try:
  63. if param==None:
  64. ret=self.cursor.execute(sql)
  65. else:
  66. ret=self.cursor.execute(sql,param)
  67. except TypeError as te:
  68. logging.debug("类型错误")
  69. logging.exception(te)
  70. return ret
  71. def query(self,sql,param=None):
  72. """
  73. 查询数据库
  74. :param sql: 查询SQL语句
  75. :param param: 参数
  76. :return: 返回集合
  77. """
  78. self.cursor.execute(sql,param)
  79. result=self.cursor.fetchall()
  80. return result
  81. def queryOne(self,sql,param=None):
  82. """
  83. 查询数据返回第一条
  84. :param sql: 查询SQL语句
  85. :param param: 参数
  86. :return: 返回第一条数据的字典
  87. """
  88. result=self.query(sql,param)
  89. if result:
  90. return result[0]
  91. else:
  92. return None
  93. def listByPage(self,sql,current_page,page_size,param=None):
  94. """
  95. 分页查询当前表格数据
  96. :param sql: 查询SQL语句
  97. :param current_page: 当前页码
  98. :param page_size: 页码大小
  99. :param param:参数
  100. :return:
  101. """
  102. countSQL="select count(*) ct from ("+sql+") tmp "
  103. logging.debug("统计SQL:{}".format(sql))
  104. countNum=self.count(countSQL,param)
  105. offset=(current_page-1)*page_size
  106. totalPage=int(countNum/page_size)
  107. if countNum % page_size>0:
  108. totalPage = totalPage + 1
  109. pagination={"current_page":current_page,"page_size":page_size,"count":countNum,"total_page":totalPage}
  110. querySql="select * from ("+sql+") tmp limit %s,%s"
  111. logging.debug("查询SQL:{}".format(querySql))
  112. # 判断是否有参数
  113. if param==None:
  114. # 无参数
  115. pagination["data"]=self.query(querySql,(offset,page_size))
  116. else:
  117. # 有参数的情况,此时需要判断参数是元祖还是字典
  118. if isinstance(param,dict):
  119. # 字典的情况,因此需要添加字典
  120. querySql="select * from ("+sql+") tmp limit %(tmp_offset)s,%(tmp_pageSize)s"
  121. param["tmp_offset"]=offset
  122. param["tmp_pageSize"]=page_size
  123. pagination["data"]=self.query(querySql,param)
  124. elif isinstance(param,tuple):
  125. # 元祖的方式
  126. listtp=list(param)
  127. listtp.append(offset)
  128. listtp.append(page_size)
  129. pagination["data"]=self.query(querySql,tuple(listtp))
  130. else:
  131. # 基础类型
  132. listtp=[]
  133. listtp.append(param)
  134. listtp.append(offset)
  135. listtp.append(page_size)
  136. pagination["data"]=self.query(querySql,tuple(listtp))
  137. return pagination
  138. def count(self,sql,param=None):
  139. """
  140. 统计当前表记录行数
  141. :param sql: 统计SQL语句
  142. :param param: 参数
  143. :return: 当前记录行
  144. """
  145. ret=self.queryOne(sql,param)
  146. count=None
  147. if ret:
  148. for k,v in ret.items():
  149. count=v
  150. return count
  151. def insert(self,sql,param=None):
  152. """
  153. 数据库插入
  154. :param sql: SQL语句
  155. :param param: 参数
  156. :return: 受影响的行数
  157. """
  158. return self.execute(sql,param)
  159. def update(self,sql,param=None):
  160. """
  161. 更新操作
  162. :param sql: SQL语句
  163. :param param: 参数
  164. :return: 受影响的行数
  165. """
  166. return self.execute(sql,param)
  167. def delete(self,sql,param=None):
  168. """
  169. 删除操作
  170. :param sql: 删除SQL语句
  171. :param param: 参数
  172. :return: 受影响的行数
  173. """
  174. return self.execute(sql,param)
  175. def batch(self,sql,param=None):
  176. """
  177. 批量插入
  178. :param sql: 插入SQL语句
  179. :param param: 参数
  180. :return: 受影响的行数
  181. """
  182. return self.cursor.executemany(sql,param)
  183. def commit(self,param=None):
  184. """
  185. 提交数据库
  186. :param param:
  187. :return:
  188. """
  189. if param==None:
  190. self.connect.commit()
  191. else:
  192. self.connect.rollback()
  193. def close(self):
  194. """
  195. 关闭数据库连接
  196. :return:
  197. """
  198. if self.cursor:
  199. self.cursor.close()
  200. if self.connect:
  201. self.connect.close()
  202. logging.debug("释放数据库连接")
  203. return None