经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Python » 查看文章
Python封装数据库连接池详解
来源:jb51  时间:2022/6/20 17:07:42  对本文有异议

前言:

线程安全问题:当2个线程同时用到线程池时,会同时创建2个线程池。如果多个线程,错开用到线程池,就只会创建一个线程池,会共用一个线程池。我用的注解方式的单例模式,感觉就是这个注解的单例方式,解决了多线程问题,但是没解决线程安全问题,需要优化这个单例模式。

主要通过 PooledDB 模块实现。

一、数据库封装

1.1数据库基本配置

db_config.py

  1. # -*- coding: UTF-8 -*-
  2. import pymysql
  3. # 数据库信息
  4. DB_TEST_HOST = "127.0.0.1"
  5. DB_TEST_PORT = 3308
  6. DB_TEST_DBNAME = "bt"
  7. DB_TEST_USER = "root"
  8. DB_TEST_PASSWORD = "123456"
  9. # 数据库连接编码
  10. DB_CHARSET = "utf8"
  11. # mincached : 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接)
  12. DB_MIN_CACHED = 5
  13. # maxcached : 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小)
  14. DB_MAX_CACHED = 0
  15. # maxshared : 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用
  16. DB_MAX_SHARED = 5
  17. # maxconnecyions : 创建连接池的最大数量(缺省值 0 代表不限制)
  18. DB_MAX_CONNECYIONS = 300
  19. # blocking : 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表返回一个错误<toMany......> 其他代表阻塞直到连接数减少,连接被分配)
  20. DB_BLOCKING = True
  21. # maxusage : 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
  22. DB_MAX_USAGE = 0
  23. # setsession : 一个可选的SQL命令列表用于准备每个会话,如["set datestyle to german", ...]
  24. DB_SET_SESSION = None
  25.  
  26. # creator : 使用连接数据库的模块
  27. DB_CREATOR = pymysql

设置连接池最大最小为5个。则启动连接池时,就会建立5个连接。

1.2 编写单例模式注解

singleton.py

  1. #单例模式函数,用来修饰类
  2. def singleton(cls,*args,**kw):
  3. instances = {}
  4. def _singleton():
  5. if cls not in instances:
  6. instances[cls] = cls(*args,**kw)
  7. return instances[cls]
  8. return _singleton

1.3 构建连接池

db_dbutils_init.py

  1. from dbutils.pooled_db import PooledDB
  2. import db_config as config
  3. # import random
  4.  
  5. from singleton import singleton
  6. """
  7. @功能:创建数据库连接池
  8. """
  9.  
  10. class MyConnectionPool(object):
  11. # 私有属性
  12. # 能通过对象直接访问,但是可以在本类内部访问;
  13. __pool = None
  14. # def __init__(self):
  15. # self.conn = self.__getConn()
  16. # self.cursor = self.conn.cursor()
  17. # 创建数据库连接conn和游标cursor
  18. def __enter__(self):
  19. self.conn = self.__getconn()
  20. self.cursor = self.conn.cursor()
  21. # 创建数据库连接池
  22. def __getconn(self):
  23. if self.__pool is None:
  24. # i = random.randint(1, 100)
  25. # print("创建线程池的数量"+str(i))
  26. self.__pool = PooledDB(
  27. creator=config.DB_CREATOR,
  28. mincached=config.DB_MIN_CACHED,
  29. maxcached=config.DB_MAX_CACHED,
  30. maxshared=config.DB_MAX_SHARED,
  31. maxconnections=config.DB_MAX_CONNECYIONS,
  32. blocking=config.DB_BLOCKING,
  33. maxusage=config.DB_MAX_USAGE,
  34. setsession=config.DB_SET_SESSION,
  35. host=config.DB_TEST_HOST,
  36. port=config.DB_TEST_PORT,
  37. user=config.DB_TEST_USER,
  38. passwd=config.DB_TEST_PASSWORD,
  39. db=config.DB_TEST_DBNAME,
  40. use_unicode=False,
  41. charset=config.DB_CHARSET
  42. )
  43. return self.__pool.connection()
  44. # 释放连接池资源
  45. def __exit__(self, exc_type, exc_val, exc_tb):
  46. self.cursor.close()
  47. self.conn.close()
  48. # 关闭连接归还给链接池
  49. # def close(self):
  50. # self.cursor.close()
  51. # self.conn.close()
  52. # 从连接池中取出一个连接
  53. def getconn(self):
  54. conn = self.__getconn()
  55. cursor = conn.cursor()
  56. return cursor, conn
  57. # 获取连接池,实例化
  58. @singleton
  59. def get_my_connection():
  60. return MyConnectionPool()

1.4 封装Python操作MYSQL的代码

mysqlhelper.py

  1. import time
  2. from db_dbutils_init import get_my_connection
  3. """执行语句查询有结果返回结果没有返回0;增/删/改返回变更数据条数,没有返回0"""
  4. class MySqLHelper(object):
  5. def __init__(self):
  6. self.db = get_my_connection() # 从数据池中获取连接
  7. #
  8. # def __new__(cls, *args, **kwargs):
  9. # if not hasattr(cls, 'inst'): # 单例
  10. # cls.inst = super(MySqLHelper, cls).__new__(cls, *args, **kwargs)
  11. # return cls.inst
  12. # 封装执行命令
  13. def execute(self, sql, param=None, autoclose=False):
  14. """
  15. 【主要判断是否有参数和是否执行完就释放连接】
  16. :param sql: 字符串类型,sql语句
  17. :param param: sql语句中要替换的参数"select %s from tab where id=%s" 其中的%s就是参数
  18. :param autoclose: 是否关闭连接
  19. :return: 返回连接conn和游标cursor
  20. """
  21. cursor, conn = self.db.getconn() # 从连接池获取连接
  22. count = 0
  23. try:
  24. # count : 为改变的数据条数
  25. if param:
  26. count = cursor.execute(sql, param)
  27. else:
  28. count = cursor.execute(sql)
  29. conn.commit()
  30. if autoclose:
  31. self.close(cursor, conn)
  32. except Exception as e:
  33. pass
  34. return cursor, conn, count
  35. # 释放连接
  36. def close(self, cursor, conn):
  37. """释放连接归还给连接池"""
  38. cursor.close()
  39. conn.close()
  40. # 查询所有
  41. def selectall(self, sql, param=None):
  42. cursor = None
  43. conn = None
  44. count = None
  45. try:
  46. cursor, conn, count = self.execute(sql, param)
  47. res = cursor.fetchall()
  48. return res
  49. except Exception as e:
  50. print(e)
  51. self.close(cursor, conn)
  52. return count
  53. # 查询单条
  54. def selectone(self, sql, param=None):
  55. cursor = None
  56. conn = None
  57. count = None
  58. try:
  59. cursor, conn, count = self.execute(sql, param)
  60. res = cursor.fetchone()
  61. self.close(cursor, conn)
  62. return res
  63. except Exception as e:
  64. print("error_msg:", e.args)
  65. self.close(cursor, conn)
  66. return count
  67. # 增加
  68. def insertone(self, sql, param):
  69. cursor = None
  70. conn = None
  71. count = None
  72. try:
  73. cursor, conn, count = self.execute(sql, param)
  74. # _id = cursor.lastrowid() # 获取当前插入数据的主键id,该id应该为自动生成为好
  75. conn.commit()
  76. self.close(cursor, conn)
  77. return count
  78. except Exception as e:
  79. print(e)
  80. conn.rollback()
  81. self.close(cursor, conn)
  82. return count
  83. # 增加多行
  84. def insertmany(self, sql, param):
  85. """
  86. :param sql:
  87. :param param: 必须是元组或列表[(),()]或((),())
  88. :return:
  89. """
  90. cursor, conn, count = self.db.getconn()
  91. try:
  92. cursor.executemany(sql, param)
  93. conn.commit()
  94. return count
  95. except Exception as e:
  96. print(e)
  97. conn.rollback()
  98. self.close(cursor, conn)
  99. return count
  100. # 删除
  101. def delete(self, sql, param=None):
  102. cursor = None
  103. conn = None
  104. count = None
  105. try:
  106. cursor, conn, count = self.execute(sql, param)
  107. self.close(cursor, conn)
  108. return count
  109. except Exception as e:
  110. print(e)
  111. conn.rollback()
  112. self.close(cursor, conn)
  113. return count
  114. # 更新
  115. def update(self, sql, param=None):
  116. cursor = None
  117. conn = None
  118. count = None
  119. try:
  120. cursor, conn, count = self.execute(sql, param)
  121. conn.commit()
  122. self.close(cursor, conn)
  123. return count
  124. except Exception as e:
  125. print(e)
  126. conn.rollback()
  127. self.close(cursor, conn)
  128. return count
  129. # if __name__ == '__main__':
  130. # db = MySqLHelper()
  131. # sql = "SELECT SLEEP(10)"
  132. # db.execute(sql)
  133. # time.sleep(20)
  134. # TODO 查询单条
  135. # sql1 = 'select * from userinfo where name=%s'
  136. # args = 'python'
  137. # ret = db.selectone(sql=sql1, param=args)
  138. # print(ret) # (None, b'python', b'123456', b'0')
  139. # TODO 增加单条
  140. # sql2 = 'insert into hotel_urls(cname,hname,cid,hid,url) values(%s,%s,%s,%s,%s)'
  141. # ret = db.insertone(sql2, ('1', '2', '1', '2', '2'))
  142. # print(ret)
  143. # TODO 增加多条
  144. # sql3 = 'insert into userinfo (name,password) VALUES (%s,%s)'
  145. # li = li = [
  146. # ('分省', '123'),
  147. # ('到达','456')
  148. # ]
  149. # ret = db.insertmany(sql3,li)
  150. # print(ret)
  151. # TODO 删除
  152. # sql4 = 'delete from userinfo WHERE name=%s'
  153. # args = 'xxxx'
  154. # ret = db.delete(sql4, args)
  155. # print(ret)
  156. # TODO 更新
  157. # sql5 = r'update userinfo set password=%s WHERE name LIKE %s'
  158. # args = ('993333993', '%old%')
  159. # ret = db.update(sql5, args)
  160. # print(ret)

二、连接池测试

修改 db_dbutils_init.py 文件,在创建连接池def __getconn(self):方法下,加一个打印随机数,方便将来我们定位是否时单例的线程池。

 修改后的db_dbutils_init.py 文件:

  1. from dbutils.pooled_db import PooledDB
  2. import db_config as config
  3. import random
  4. from singleton import singleton
  5. """
  6. @功能:创建数据库连接池
  7. """
  8. class MyConnectionPool(object):
  9. # 私有属性
  10. # 能通过对象直接访问,但是可以在本类内部访问;
  11. __pool = None
  12. # def __init__(self):
  13. # self.conn = self.__getConn()
  14. # self.cursor = self.conn.cursor()
  15. # 创建数据库连接conn和游标cursor
  16. def __enter__(self):
  17. self.conn = self.__getconn()
  18. self.cursor = self.conn.cursor()
  19. # 创建数据库连接池
  20. def __getconn(self):
  21. if self.__pool is None:
  22. i = random.randint(1, 100)
  23. print("线程池的随机数"+str(i))
  24. self.__pool = PooledDB(
  25. creator=config.DB_CREATOR,
  26. mincached=config.DB_MIN_CACHED,
  27. maxcached=config.DB_MAX_CACHED,
  28. maxshared=config.DB_MAX_SHARED,
  29. maxconnections=config.DB_MAX_CONNECYIONS,
  30. blocking=config.DB_BLOCKING,
  31. maxusage=config.DB_MAX_USAGE,
  32. setsession=config.DB_SET_SESSION,
  33. host=config.DB_TEST_HOST,
  34. port=config.DB_TEST_PORT,
  35. user=config.DB_TEST_USER,
  36. passwd=config.DB_TEST_PASSWORD,
  37. db=config.DB_TEST_DBNAME,
  38. use_unicode=False,
  39. charset=config.DB_CHARSET
  40. )
  41. return self.__pool.connection()
  42. # 释放连接池资源
  43. def __exit__(self, exc_type, exc_val, exc_tb):
  44. self.cursor.close()
  45. self.conn.close()
  46. # 关闭连接归还给链接池
  47. # def close(self):
  48. # self.cursor.close()
  49. # self.conn.close()
  50. # 从连接池中取出一个连接
  51. def getconn(self):
  52. conn = self.__getconn()
  53. cursor = conn.cursor()
  54. return cursor, conn
  55. # 获取连接池,实例化
  56. @singleton
  57. def get_my_connection():
  58. return MyConnectionPool()

开始测试:

场景一:同一个实例,执行2次sql

  1. from mysqlhelper import MySqLHelper
  2. import time
  3. if __name__ == '__main__':
  4. sql = "SELECT SLEEP(10)"
  5. sql1 = "SELECT SLEEP(15)"
  6. db = MySqLHelper()
  7. db.execute(sql)
  8. db.execute(sql1)
  9. time.sleep(20)

在数据库中,使用 show processlist;

  1. show processlist;

当执行第一个sql时。数据库连接显示。

当执行第二个sql时。数据库连接显示:

 当执行完sql,程序sleep时。数据库连接显示:

程序打印结果:

线程池的随机数43

由以上可以得出结论:

线程池启动后,生成了5个连接。执行第一个sql时,使用了1个连接。执行完第一个sql后,使用了另外1个连接。 这是一个线性的,线程池中一共5个连接,但是每次执行,只使用了其中一个。

有个疑问,连接池如果不支持并发是不是就毫无意义?

如上,虽然开了线程池5个连接,但是每次执行sql,只用到了一个连接。那为何不设置线程池大小为1呢?设置线程池大小的意义何在呢?(如果在非并发的场景下,是不是设置大小无意义?)

相比于不用线程池的优点:

如果不用线程池,则每次执行一个sql都要创建、断开连接。 像我们这样使用连接池,不用反复创建、断开连接,拿现成的连接直接用就好了。

场景二:依次创建2个实例,各自执行sql

  1. from mysqlhelper import MySqLHelper
  2. import time
  3. if __name__ == '__main__':
  4. db = MySqLHelper()
  5. db1 = MySqLHelper()
  6. sql = "SELECT SLEEP(10)"
  7. sql1 = "SELECT SLEEP(15)"
  8. db.execute(sql)
  9. db1.execute(sql1)
  10. time.sleep(20)

第一个实例db,执行sql。线程池启动了5个连接

第二个实例db1,执行sql:

 程序睡眠时,一共5个线程池:

 打印结果:

结果证明:

虽然我们依次创建了2个实例,但是(1)创建线程池的打印结果,只打印1次,且从始至终,线程池一共只启动了5个连接,且连接的id没有发生改变,说明一直是这5个连接。

证明,我们虽然创建了2个实例,但是这2个实例其实是一个实例。(单例模式是生效的)

场景三:启动2个线程,但是线程在创建连接池实例时,有时间间隔

  1. import threading
  2. from mysqlhelper import MySqLHelper
  3. import time
  4. def sl1():
  5. time.sleep(2)
  6. db = MySqLHelper()
  7. sql = "SELECT SLEEP(6)"
  8. db.execute(sql)
  9. def sl2():
  10. time.sleep(4)
  11. db = MySqLHelper()
  12. sql = "SELECT SLEEP(15)"
  13. db.execute(sql)
  14. if __name__ == '__main__':
  15. threads = []
  16. t1 = threading.Thread(target=sl1)
  17. threads.append(t1)
  18. t2 = threading.Thread(target=sl2)
  19. threads.append(t2)
  20. for t in threads:
  21. t.setDaemon(True)
  22. t.start()
  23. time.sleep(20)

2个线程间隔了2秒。

观察数据库的连接数量:

打印结果:

在并发执行2个sql时,共用了这5个连接,且打印结果只打印了一次,说明虽然并发创建了2次实例,但真正只创建了一个连接池。

场景四:启动2个线程,线程在创建连接池实例时,没有时间间隔

  1. import threading
  2. from mysqlhelper import MySqLHelper
  3. import time
  4.  
  5. if __name__ == '__main__':
  6. db = MySqLHelper()
  7. sql = "SELECT SLEEP(6)"
  8. sql1 = "SELECT SLEEP(15)"
  9. threads = []
  10. t1 = threading.Thread(target=db.execute, args=(sql,))
  11. threads.append(t1)
  12. t2 = threading.Thread(target=db.execute, args=(sql1,))
  13. threads.append(t2)
  14. for t in threads:
  15. t.setDaemon(True)
  16. t.start()
  17. time.sleep(20)

观察数据库连接 :

 打印结果:

结果表明:

终端打印了2次,数据库建立了10个连接,说明创建了2个线程池。这样的单例模式,存在线程安全问题。

到此这篇关于Python封装数据库连接池详解的文章就介绍到这了,更多相关Python连接池内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号