使用连接池应对高并发场景

以SQLite数据库为例,介绍如何使用连接池来提升数据操作的效率。

自行编写连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import sqlite3
from queue import Queue

class SQLiteConnectionPool:
"""
SQLite 连接池类,用于管理数据库连接,避免频繁创建和关闭连接的开销。
"""
def __init__(self, db_path, max_connections=10):
"""
初始化连接池。

Args:
db_path (str): 数据库文件路径。
max_connections (int): 连接池允许的最大连接数,默认为 10。
"""
self.db_path = db_path # 数据库文件路径
self.max_connections = max_connections # 最大连接数
self.free_connections = Queue(maxsize=max_connections) # 存储空闲连接的队列

def get_connection(self):
"""
从连接池获取一个数据库连接。

Returns:
sqlite3.Connection: 一个 SQLite 数据库连接对象。
"""
if self.free_connections.empty(): # 如果队列为空,则创建新的连接
conn = sqlite3.connect(self.db_path)
else: # 否则从队列中获取一个空闲连接
conn = self.free_connections.get()
return conn

def put_connection(self, conn):
"""
将一个数据库连接放回连接池。

Args:
conn (sqlite3.Connection): 要放回的数据库连接对象。
"""
self.free_connections.put(conn) # 将连接放回队列

pool = SQLiteConnectionPool('yoursqlite.db')

def concurrent_access(pool):
conn = pool.get_connection()
cur = conn.cursor()
# 参数化查询
cur.execute("select * from sqlite_master where type = 'table';")
rows = cur.fetchall()
pool.put_connection(conn)
return rows

print(concurrent_access(pool))

函数 concurrent_access 可以在高频场景下调用,本质上,连接池相当于一个全局变量。

使用sqlalchemy创建连接池

1
2
3
4
5
6
7
8
9
10
from sqlalchemy import create_engine,text

engine = create_engine('sqlite:///yoursqlite.db', pool_size=10)
def concurrent_access():
with engine.connect() as conn:
result = conn.execute(text("select * from sqlite_master where type = 'table';"))
for row in result:
print(row)

concurrent_access()

函数 concurrent_access 可以在高频场景下调用,with 语句会自动处理连接的回收事宜。