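def concurrent_access(pool):
    conn = pool.get_connection()
    try:
        cur = conn.cursor()
        # Query the schema table (no user input here, so no bound parameters are needed)
        cur.execute("select * from sqlite_master where type = 'table';")
        rows = cur.fetchall()
    finally:
        # Always hand the connection back to the pool, even if the query fails
        pool.put_connection(conn)
    return rows

print(concurrent_access(pool))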
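The concurrent_access function can be called in high-frequency scenarios. In essence, the connection pool behaves like a global variable: it is created once and shared by every caller.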
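To make the "pool as a global variable" idea concrete, here is a minimal sketch using only the standard library. It is an illustration, not the pool implementation used above: the names SimplePool, list_tables and run_workers are hypothetical, and the sketch assumes a pool only needs get_connection and put_connection on top of a thread-safe queue.

```python
import os
import queue
import sqlite3
import tempfile
import threading


class SimplePool:
    """Hypothetical minimal pool: a fixed set of connections in a thread-safe queue."""

    def __init__(self, database, size=4):
        self._idle = queue.Queue(maxsize=size)
        for _ in range(size):
            # check_same_thread=False lets whichever thread borrows the
            # connection use it; the queue guarantees one borrower at a time.
            self._idle.put(sqlite3.connect(database, check_same_thread=False))

    def get_connection(self):
        # Blocks until some other caller returns a connection
        return self._idle.get()

    def put_connection(self, conn):
        self._idle.put(conn)

    def close(self):
        # Close every idle connection; call this once all workers are done
        while not self._idle.empty():
            self._idle.get_nowait().close()


def list_tables(pool):
    """Borrow a connection, read the table names, and always return the connection."""
    conn = pool.get_connection()
    try:
        cur = conn.cursor()
        cur.execute("select name from sqlite_master where type = 'table';")
        return [row[0] for row in cur.fetchall()]
    finally:
        pool.put_connection(conn)


def run_workers(pool, n_workers=8):
    """Call list_tables from several threads that all share the same pool."""
    results = []
    lock = threading.Lock()

    def worker():
        tables = list_tables(pool)
        with lock:
            results.append(tables)

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        setup = sqlite3.connect(path)
        setup.execute("create table users (id integer)")
        setup.commit()
        setup.close()

        pool = SimplePool(path, size=2)  # created once, shared by all workers
        print(run_workers(pool, n_workers=5))
        pool.close()
```

With more workers than connections, the extra threads simply wait inside get_connection until a connection is handed back, which is why returning it in a finally block matters.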
Creating a connection pool with SQLAlchemy
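from sqlalchemy import create_engine, text

# The engine owns the connection pool; pool_size=10 keeps up to 10 connections open in it
engine = create_engine('sqlite:///yoursqlite.db', pool_size=10)

def concurrent_access():
    # The connection is checked out here and returned to the pool when the block exits
    with engine.connect() as conn:
        result = conn.execute(text("select * from sqlite_master where type = 'table';"))
        for row in result:
            print(row)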