Skip to content

并发池

假设有一个任务,拥有非常多的请求,伪代码如下:

python
def download(url):
    # 请求网站 1mb
    pass


url_list = [...]  # 1千万个请求地址

# 伪代码


for url in url_list:
    download(url)

直接运行程序,后果会是什么 ? 会将程序能够使用的内存全部占满


为了更好的解决这个问题,Python3.2 带来了 concurrent.futures 模块,这个模块具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能。

此模块由以下部分组成:

  • concurrent.futures.Executor: 这是一个虚拟基类,提供了异步执行的方法。
  • submit(function, argument): 调度函数(可调用的对象)的执行,将 argument 作为参数传入。
  • map(function, argument): 将 argument 作为参数执行函数,以 异步 的方式。
  • shutdown(Wait=True): 发出让执行者释放所有资源的信号。
  • concurrent.futures.Future: 其中包括函数的异步执行。Future 对象是 submit 任务(即带有参数的 functions)到 executor 的实例。

Executor 是抽象类,可以通过子类访问,即线程或进程的 ExecutorPools 。因为,线程或进程的实例是依赖于资源的任务,所以最好以“池”的形式将他们组织在一起,作为可以重用的 launcherexecutor


其中比较重要是 current.Futures 模块提供了两种 Executor 的子类,各自独立操作一个线程池和一个进程池。这两个子类分别是:

  • concurrent.futures.ThreadPoolExecutor(max_workers)
  • concurrent.futures.ProcessPoolExecutor(max_workers)
  • max_workers 参数表示最多有多少个 worker 并行执行任务。

并发池

线程池或进程池是用于在程序中优化和简化线程/进程的使用。通过池,你可以提交任务给 executor。池由两部分组成,一部分是内部的队列,存放着待执行的任务; 另一部分是一系列的进程或线程,用于执行这些任务。池的概念主要目的是为了重用:让线程或进程在生命周期内可以多次使用。它减少了创建创建线程和进程的开销,提高了程序性能。重用不是必须的规则,但它是程序员在应用中使用池的主要原因。

准备工作

下面的示例代码展示了线程池和进程池的功能。这里的任务是,给一个 list url_list,然后用一个函数调用进行请求(这个任务只是为了消耗时间)。

下面的代码分别测试了:

  • 顺序执行
  • 通过有 5 个 worker 的线程池执行
  • 通过有 5 个 worker 的进程池执行

顺序执行

我们创建了一个 urls 存放100000个待请求的网址,然后使用线程池去请求:

python
import concurrent.futures
import time

url_list = [f'https://maoyan.com/board/4?offset={page}' for page in range(100000)]


def download(url):
    print(url)
    # 延时从操作
    time.sleep(1)

在主要程序中,我们先使用顺序执行跑了一次程序::

python
if __name__ == "__main__":
    # 顺序执行
    start_time = time.time()
    for item in number_list:
        evaluate_item(item)
    print("计算机顺序执行:" + str(time.time() - start_time), "秒")

线程池运行

然后,我们使用了 futures.ThreadPoolExecutor 模块的线程池跑了一次::

python

if __name__ == "__main__":

    start_time_1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for url in urls:
            executor.submit(download, url)
    print("线程池计算的时间:" + str(time.time() - start_time_1), "秒")

    # executor1 = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    # for url in urls:
    #     executor.submit(download, url)
    # executor1.shutdown()
    # print("线程池计算的时间:" + str(time.time() - start_time_1), "秒")

ThreadPoolExecutor 使用线程池中的一个线程执行给定的任务。池中一共有5个线程,每一个线程从池中取得一个任务然后执行它。当任务执行完成,再从池中拿到另一个任务。

当所有的任务执行完成后,打印出执行用的时间::

python

print("线程池计算的时间:" + str(time.time() - start_time_1), "秒")

进程池运行

python

if __name__ == "__main__":
    start_time_2 = time.time()

    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        for url in urls:
            executor.submit(download, url)

    print("进程池计算的时间:" + str(time.time() - start_time_2), "秒")

    # executor2 = concurrent.futures.ProcessPoolExecutor(max_workers=5)
    # for url in urls:
    #     executor.submit(download, url)
    # executor2.shutdown()
    # print("进程池计算的时间:" + str(time.time() - start_time_2), "秒")

如同 ThreadPoolExecutor 一样, ProcessPoolExecutor 是一个 executor,使用一个线程池来并行执行任务。然而,和 ThreadPoolExecutor 不同的是, ProcessPoolExecutor 使用了多核

并发速度对比

单线程、多线程、多进程效果对比

IO密集型对比

python
# -*- coding: utf-8 -*-
import concurrent.futures
import time
import random

urls = [
    f'https://maoyan.com/board/4?offset={page}' for page in range(1000)
]


def download(url):
    # print(url)
    # 延时从操作
    time.sleep(0.0000001)


if __name__ == '__main__':
    """单线程"""
    start_time = time.time()
    for url in urls:
        download(url)
    print("单线程执行:" + str(time.time() - start_time), "秒")

    """多线程"""
    start_time_1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for url in urls:
            executor.submit(download, url)
    print("线程池计算的时间:" + str(time.time() - start_time_1), "秒")

    """多进程"""
    start_time_1 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        for url in urls:
            executor.submit(download, url)
    print("线程池计算的时间:" + str(time.time() - start_time_1), "秒")

运行这个代码,我们可以看到运行时间的输出:

单线程执行:1.064488172531128 秒
线程池计算的时间:0.4077413082122803 秒
线程池计算的时间:0.46396422386169434 秒

CPU 密集型

python
# -*- coding: utf-8 -*-
import concurrent.futures
import time

number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


def evaluate_item(x):
    """计算总和,这里只是为了消耗时间"""
    a = 0
    for i in range(0, 100000):
        # 重复计算 消耗时间 cpu计算能力
        a = a + i
    return x


if __name__ == '__main__':
    """单线程"""
    start_time = time.time()
    for item in number_list:
        evaluate_item(item)
    print("单线程执行:" + str(time.time() - start_time), "秒")

    """多线程"""
    start_time_1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for item in number_list:
            executor.submit(evaluate_item, item)
    print("线程池计算的时间:" + str(time.time() - start_time_1), "秒")

    """多进程"""
    start_time_2 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        for item in number_list:
            executor.submit(evaluate_item, item)
    print("进程池计算的时间:" + str(time.time() - start_time_2), "秒")
python
"""
    cpu 密集型任务
        顺序执行 一个人做,从头到尾
        多线程会有线程间的切换
        多进程 有十个人
"""

"""
    1. io 密集型任务: 爬虫、下载文件、网页服务器、请求数据库。只要有网络延迟的都是 io 密集型任务
    2. cpu 密集型任务: 需要不断地进行 cpu 计算获取结果,图像处理、视频处理、人工智能 (Python 的 cpu 密集型工具都是用其他语言写的)

    读取一个 txt 文件
    写入 100Gb 的数据到硬盘

    读取文件是 io,写入数据到文件是 cpu
"""

案例:多进程嵌套多线程

python
import time
import threading
import requests
import concurrent.futures

urls = ['http://www.baidu.com?page={}'.format(page) for page in range(1, 100)]


def download(url):
    print(url)
    response = requests.get(url)
    print(response)
    time.sleep(1)


def thread_poll_download(urls):
    print(urls)
    thread_poll = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    for url in urls:
        thread_poll.submit(download, url)


# 100 个任务

# 5线程 每个线程 20 个任务
# 5进程 每个进程 20 个任务

# 5多进程(每个进程 20 个任务) + 5多线程(每个线程是四个任务)


if __name__ == '__main__':
    # 5多进程 -> 等分五份任务
    process_poll = concurrent.futures.ProcessPoolExecutor(max_workers=5)
    for i in range(20, 101, 20):
        # print(urls[i - 20:i])
        process_poll.submit(thread_poll_download, urls[i - 20:i])

"""
    多线程与多进程可以将普通任务变成并发任务
"""