python标准库之队列

在python标准库中的queue模块实现了面向多生产线程、多消费线程的队列。

一、队列的三种形态

1、FIFO队列(先进先出)

1
2
3
4
5
6
7
8
9
import queue

q = queue.Queue()

for i in range(5):
q.put(i)

while not q.empty():
print(q.get())

2、LIFO队列(后进先出)

1
2
3
4
5
6
7
8
9
import queue

q = queue.LifoQueue()

for i in range(5):
q.put(i)

while not q.empty():
print(q.get())

3、Priority队列(优先级队列)

1
2
3
4
5
6
7
8
9
10
11
12
import queue

q = queue.PriorityQueue()

# 格式:q.put((数字,值))
# 特点:数字越小,优先级越高
q.put((1,'a'))
q.put((-1,'b'))
q.put((10,'c'))

while not q.empty():
print(q.get())

二、队列常用的方法

1、基础用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import queue

q = queue.Queue(maxsize=5)

for i in range(16):
while q.full(): # 判断队列满了
print(q.get()) # 取一条任务
q.put(i) # 队列为满时等待
# q.put(i, block=True) # 队列为满且 block 为 True, put()方法就使调用线程暂停, 直到空出一个数据单元
# q.put(i, block=False) # 队列为满且 block 为 False, 队列将引发 full 异常
# q.put_nowait(i) # 相当于q.put(i, block=False), 队列为满时抛异常

print('quene size:',q.qsize()) # 返回队列的大小

print('-----------')

for i in range(10):
while not q.empty(): # 判断对列不为空
print(q.get()) # 如果队列为空且 block 为 True, get()就使调用线程暂停, 直至有任务可取
# print(q.get(block=False)) # 如果队列为空且 block 为 False, 队列将引发 Empty 异常
# print(q.get_nowait()) # 相当于q.get(block=False), 队列为空时抛异常

print('quene size:',q.qsize()) # 返回队列的大小

print('-----------')

for i in range(5):
q.put(i)

print(q.queue.clear()) # 清空队列
print('quene size:',q.qsize()) # 返回队列的大小

2、task_done()+join()

所有数据被get并不会解除 join 阻塞,只有全部数据都被处理完(task_done调用次数等于get次数)了才会 解除 join 阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from threading import Thread
import time
import random
from queue import Queue

# 创建队列,设置队列最大数限制为3个
queue = Queue(3)

# 生产者线程
class Pro_Thread(Thread):
def run(self):
global queue
# 原材料准备,等待被生产
tasks = [1, 2, 3, 4, 5, 6, 7, 8]
while True:
try:
# 从原材料右边开始生产
task = tasks.pop()
queue.put(task)
print("生产", task, "现在队列数:", queue.qsize())
# 休眠随机时间
time.sleep(random.random())
# 如果原材料被生产完,生产线程跳出循环
except IndexError:
break
print("原材料已被生产完毕")

# 消费者线程
class Con_Thread(Thread):
def run(self):
global queue
while True:
if not queue.empty():
# 通过get(), 这里已经将队列减去了1
task = queue.get()
time.sleep(2)
# 发出完成的信号,不发的话,join会永远阻塞,程序不会停止
queue.task_done()
print("消费", task)
else:
break
print("消费完毕")

# 入口方法,主线程
def main():
Pro_1 = Pro_Thread()
# 启动线程
Pro_1.start()
# 这里休眠一秒钟,等到队列有值,否则队列创建时是空的,主线程直接就结束了,实验失败,造成误导
time.sleep(1)
for i in range(2):
Con_i = Con_Thread()
# 启动线程
Con_i.start()
global queue
# 接收信号,主线程在这里等待队列被处理完毕后再做下一步
queue.join()
# 给个标示,表示主线程已经结束
print("主线程结束")

if __name__ == '__main__':
main()