队列
队列有点像列表:从一头添加事物,从另一头取出事物。这种队列被称为 FIFO(先进先出)。
假设你正在洗盘子,如果需要完成全部工作,需要洗每一个盘子、烘干并放好。你有很多种方法来完成这个任务。或许你会先洗第一个盘子,烘干并把它放好,之后用同样的方法来处理第二个盘子,以此类推。此外,你也可以执行批量操作,先洗完所有的盘子,再烘干所有的盘子,最后把它们都放好。这样做需要你有足够大的水池和烘干机来放置每一步积累的所有盘子。这些都是同步方法:一个工人,一次做一件事。
还有一种方法是再找一个或者两个帮手。如果你是洗盘子的人,可以把洗好的盘子递给烘干盘子的人,他再把烘干的盘子递给放置盘子的人。所有人都在自己的位置工作,这样会比你一个人要快很多。
然而,如果你洗盘子的速度比下一个人烘干的速度快怎么办?要么把湿盘子扔在地上,要么把它们堆在你和下一个人之间,或者一直闲着直到下一个人处理完之前的盘子。如果最后一个人比第二个人还慢,那第二个人要么把盘子扔在地上,要么把它们堆在两个人之间,要么就闲着。你有很多个工人,但总体来说,任务仍然是同步完成的,处理速度和最慢的工人速度是一样的。
俗话说:人多好办事 。增加工人可以更快地搭建粮仓或者洗盘子,前提是使用队列。
通常来说,队列用来传递消息,消息可以是任意类型的信息。在本例中,我们用队列来管理分布式任务,这种队列也称为工作队列或者任务队列 。水池中的每个盘子都会发给一个闲置的洗盘子的人,他会洗盘子并把盘子传给第一个闲置的烘干盘子的人,他会烘干盘子并把盘子传给第一个闲置的放盘子的人。这个过程可以是同步的(工人等着处理盘子,处理完等着把盘子给下一个人),也可以是异步的(盘子堆在两个工人中间)。只要你有足够多的工人并且他们都能认真工作,完成速度会很快。
队列(Queue)
Queue 叫队列,是数据结构中的一种,基本上所有成熟的编程语言都内置了对 Queue 的支持。
Python 中的 Queue 模块实现了多生产者和多消费者模型,当需要在多线程编程中非常实用。而且该模块中的 Queue 类实现了锁原语,不需要再考虑多线程安全问题。
该模块内置了三种类型的 Queue,分别是
class queue.Queue(maxsize=0)
,class queue.LifoQueue(maxsize=0)
class queue.PriorityQueue(maxsize=0)
它们三个的区别仅仅是取出时的顺序不一致而已。
队列方式 | 特点 |
---|---|
queue.Queue | 先进先出队列 |
queue.LifoQueue | 后进先出队列 |
queue.PriorityQueue | 优先级队列 |
queue.deque | 双线队列 |
常用操作
class queue.Queue(maxsize=0)
上面所说的内置队列,其中 maxsize 是个整数,用于设置可以放入队列中的任务数的上限。当达到这个大小的时候,插入操作将阻塞至队列中的任务被消费掉。如果 maxsize 小于等于零,则队列尺寸为无限大。
方法 | 用法说明 |
---|---|
put | 放数据,Queue.put( )默认有block=True和timeout两个参数。当block=True时,写入是阻塞式的,阻塞时间由timeout确定。当队列q被(其他线程)写满后,这段代码就会阻塞,直至其他线程取走数据。Queue.put()方法加上 block=False 的参数,即可解决这个隐蔽的问题。但要注意,非阻塞方式写队列,当队列满时会抛出 exception Queue.Full 的异常 |
get | 取数据(默认阻塞),Queue.get([block[, timeout]])获取队列,timeout等待时间 |
empty | 如果队列为空,返回True,反之False |
qsize | 显示队列中真实存在的元素长度 |
maxsize | 最大支持的队列长度,使用时无括号 |
join | 实际上意味着等到队列为空,再执行别的操作 |
task_done | 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号 |
full | 如果队列满了,返回True,反之False |
添加任务
向队列中添加任务,直接调用 put()
函数即可
>>> import queue
>>> q = queue.Queue(maxsize=1)
>>> q.put(100)
put()
函数完整的函数签名如下Queue.put(item, block=True, timeout=None)
,如你所见,该函数有两个可选参数。- 默认情况下,在队列满时,该函数会一直阻塞,直到队列中有空余的位置可以添加任务为止。如果 timeout 是正数,则最多阻塞 timeout 秒,如果这段时间内还没有空余的位置出来,则会引发
Full
异常。 - 当 block 为 false 时,timeout 参数将失效。同时如果队列中没有空余的位置可添加任务则会引发
Full
异常,否则会直接把任务放入队列并返回,不会阻塞。 - 另外,还可以通过
Queue.put_nowait(item)
来添加任务,相当于Queue.put(item, False)
,不再赘述。同样,在队列满时,该操作会引发Full
异常。
获取任务
>>> import queue
>>> q = queue.Queue()
>>> q.put(100)
>>> q.get()
100
- 与
put()
函数一样,get()
函数也有两个可选参数,完整签名如下Queue.get(block=True, timeout=None)
。 - 默认情况下,当队列空时调用该函数会一直阻塞,直到队列中有任务可获取为止。如果 timeout 是正数,则最多阻塞 timeout 秒,如果这段时间内还没有任务可获取,则会引发
Empty
异常。 - 当 block 为 false 时,timeout 参数将失效。同时如果队列中没有任务可获取则会立刻引发
Empty
异常,否则会直接获取一个任务并返回,不会阻塞。 - 另外,还可以通过
Queue.get_nowait()
来获取任务,相当于Queue.get(False)
,不再赘述。同样,在队列为空时,该操作会引发Empty
异常。
获取队列大小
>>> import queue
>>> q = queue.Queue()
>>> q.put(100)
>>> q.put(200)
>>> q.qsize()
2
判断队列是否为空
如果队列为空,返回 True
,否则返回 False
。如果 empty() 返回 True
,不保证后续调用的 put() 不被阻塞。类似的,如果 empty() 返回 False
,也不保证后续调用的 get() 不被阻塞。
判断队列是否满
如果队列是满的返回 True
,否则返回 False
。如果 full() 返回 True
不保证后续调用的 get() 不被阻塞。类似的,如果 full() 返回 False
也不保证后续调用的 put() 不被阻塞。
>>> import queue
>>> q = queue.Queue(maxsize=1)
>>> q.empty()
True
>>> q.full()
False
>>> q.put(100)
>>> q.empty()
False
>>> q.full()
True
生产者-消费者模型
在生产者-消费者问题 是用来看看线程或进程同步的问题一个标准的计算机科学问题。您将看一下它的变体,以了解Python threading
模块提供的原语。
比如一个包子铺中的顾客吃包子,和厨师做包子,不可能是将包子一块做出来,在给顾客吃,但是单线程只能这么做
多线程来执行,厨师一边做包子,顾客一边吃包子,当顾客少时,厨师做的包子就放在一个容器中,等着顾客来吃,当顾客多的时候,就从容器中先取出来给顾客吃,厨师继续做包子用队列来模拟这个容器
如果希望一次能够在管道中处理多个值,那么您将需要一个管道数据结构,允许数字随着数据的备份而增长和缩小 producer
。
producer
生产者:
def producer(pipeline):
"""生产者 厨师做包子"""
for index in range(1, 100000):
time.sleep(0.1)
print("做出一个包子: %s" % index)
# 将包子放到蒸笼里面
pipeline.put(f'第{index}个包子')
它现在将循环,直到它看到事件在第3行设置。它也不再将SENTINEL
值放入pipeline
。
consumer
不得不改变一点:
def consumer(pipeline):
"""消费者 客户吃包子"""
while True:
message = pipeline.get()
print(message)
print(f'消费者吃了{message}')
在消费者完成之前确保队列为空可防止另一个有趣的问题。如果consumer
确实在其中pipeline
包含消息时退出,则可能发生两件坏事。首先是你丢失了那些最终的消息,但更严重的是,producer
可以抓住尝试将消息添加到完整队列并且永远不会返回。
Queue
初始化时有一个可选参数,用于指定队列的最大大小。
如果给出一个正数maxsize
,它会将队列限制为该元素数,导致.put()
阻塞直到少于maxsize
元素。如果未指定maxsize
,则队列将增长到计算机内存的限制。
.get_message()
并且 .set_message()
变小了。他们基本上包裹 .get()
和 .put()
上 Queue
。您可能想知道阻止线程引起竞争条件的所有锁定代码的位置。
编写标准库的核心开发人员知道a Queue
经常在多线程环境中使用,并将所有锁定代码合并到Queue
自身内部。Queue
是线程安全的。
以下是代码queue.Queue
直接使用的内容:
import queue
import random
import threading
import concurrent.futures
import time
def producer(pipeline):
"""生产者 厨师做包子"""
for index in range(1, 100000):
time.sleep(0.1)
print("做出一个包子: %s" % index)
# 将包子放到蒸笼里面
pipeline.put(f'第{index}个包子')
def consumer(pipeline):
"""消费者 客户吃包子"""
while True:
message = pipeline.get()
print(message)
print(f'消费者吃了{message}')
if __name__ == "__main__":
pipeline = queue.Queue(maxsize=10)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, pipeline)
executor.submit(consumer, pipeline)
time.sleep(0.1)
这更容易阅读,并展示了如何使用 Python 的内置基元来简化复杂问题。
Lock
并且 Queue
是解决并发问题的方便类,但标准库还提供了其他类。在结束本教程之前,让我们快速调查一下这些教程。