并发池
假设有一个任务,拥有非常多的请求,伪代码如下:
def download(url):
# 请求网站 1mb
pass
url_list = [...] # 1千万个请求地址
# 伪代码
for url in url_list:
download(url)
直接运行程序,后果会是什么 ? 会将程序能够使用的内存全部占满
为了更好的解决这个问题,Python3.2 带来了 concurrent.futures
模块,这个模块具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能。
此模块由以下部分组成:
concurrent.futures.Executor
: 这是一个虚拟基类,提供了异步执行的方法。submit(function, argument)
: 调度函数(可调用的对象)的执行,将argument
作为参数传入。map(function, argument)
: 将argument
作为参数执行函数,以 异步 的方式。shutdown(Wait=True)
: 发出让执行者释放所有资源的信号。concurrent.futures.Future
: 其中包括函数的异步执行。Future 对象是 submit 任务(即带有参数的 functions)到 executor 的实例。
Executor
是抽象类,可以通过子类访问,即线程或进程的 ExecutorPools
。因为,线程或进程的实例是依赖于资源的任务,所以最好以“池”的形式将他们组织在一起,作为可以重用的 launcher
或 executor
。
其中比较重要是 current.Futures
模块提供了两种 Executor
的子类,各自独立操作一个线程池和一个进程池。这两个子类分别是:
concurrent.futures.ThreadPoolExecutor(max_workers)
concurrent.futures.ProcessPoolExecutor(max_workers)
max_workers
参数表示最多有多少个worker
并行执行任务。
并发池
线程池或进程池是用于在程序中优化和简化线程/进程的使用。通过池,你可以提交任务给 executor。池由两部分组成,一部分是内部的队列,存放着待执行的任务; 另一部分是一系列的进程或线程,用于执行这些任务。池的概念主要目的是为了重用:让线程或进程在生命周期内可以多次使用。它减少了创建创建线程和进程的开销,提高了程序性能。重用不是必须的规则,但它是程序员在应用中使用池的主要原因。
准备工作
下面的示例代码展示了线程池和进程池的功能。这里的任务是,给一个 list url_list
,然后用一个函数调用进行请求(这个任务只是为了消耗时间)。
下面的代码分别测试了:
- 顺序执行
- 通过有 5 个 worker 的线程池执行
- 通过有 5 个 worker 的进程池执行
顺序执行
我们创建了一个 urls 存放100000个待请求的网址,然后使用线程池去请求:
import concurrent.futures
import time
url_list = [f'https://maoyan.com/board/4?offset={page}' for page in range(100000)]
def download(url):
print(url)
# 延时从操作
time.sleep(1)
在主要程序中,我们先使用顺序执行跑了一次程序::
if __name__ == "__main__":
# 顺序执行
start_time = time.time()
for item in number_list:
evaluate_item(item)
print("计算机顺序执行:" + str(time.time() - start_time), "秒")
线程池运行
然后,我们使用了 futures.ThreadPoolExecutor
模块的线程池跑了一次::
if __name__ == "__main__":
start_time_1 = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for url in urls:
executor.submit(download, url)
print("线程池计算的时间:" + str(time.time() - start_time_1), "秒")
# executor1 = concurrent.futures.ThreadPoolExecutor(max_workers=5)
# for url in urls:
# executor.submit(download, url)
# executor1.shutdown()
# print("线程池计算的时间:" + str(time.time() - start_time_1), "秒")
ThreadPoolExecutor
使用线程池中的一个线程执行给定的任务。池中一共有5个线程,每一个线程从池中取得一个任务然后执行它。当任务执行完成,再从池中拿到另一个任务。
当所有的任务执行完成后,打印出执行用的时间::
print("线程池计算的时间:" + str(time.time() - start_time_1), "秒")
进程池运行
if __name__ == "__main__":
start_time_2 = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
for url in urls:
executor.submit(download, url)
print("进程池计算的时间:" + str(time.time() - start_time_2), "秒")
# executor2 = concurrent.futures.ProcessPoolExecutor(max_workers=5)
# for url in urls:
# executor.submit(download, url)
# executor2.shutdown()
# print("进程池计算的时间:" + str(time.time() - start_time_2), "秒")
如同 ThreadPoolExecutor
一样, ProcessPoolExecutor
是一个 executor,使用一个线程池来并行执行任务。然而,和 ThreadPoolExecutor
不同的是, ProcessPoolExecutor
使用了多核
并发速度对比
单线程、多线程、多进程效果对比
IO密集型对比
# -*- coding: utf-8 -*-
import concurrent.futures
import time
import random
urls = [
f'https://maoyan.com/board/4?offset={page}' for page in range(1000)
]
def download(url):
# print(url)
# 延时从操作
time.sleep(0.0000001)
if __name__ == '__main__':
"""单线程"""
start_time = time.time()
for url in urls:
download(url)
print("单线程执行:" + str(time.time() - start_time), "秒")
"""多线程"""
start_time_1 = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for url in urls:
executor.submit(download, url)
print("线程池计算的时间:" + str(time.time() - start_time_1), "秒")
"""多进程"""
start_time_1 = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
for url in urls:
executor.submit(download, url)
print("线程池计算的时间:" + str(time.time() - start_time_1), "秒")
运行这个代码,我们可以看到运行时间的输出:
单线程执行:1.064488172531128 秒
线程池计算的时间:0.4077413082122803 秒
线程池计算的时间:0.46396422386169434 秒
CPU 密集型
# -*- coding: utf-8 -*-
import concurrent.futures
import time
number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def evaluate_item(x):
"""计算总和,这里只是为了消耗时间"""
a = 0
for i in range(0, 100000):
# 重复计算 消耗时间 cpu计算能力
a = a + i
return x
if __name__ == '__main__':
"""单线程"""
start_time = time.time()
for item in number_list:
evaluate_item(item)
print("单线程执行:" + str(time.time() - start_time), "秒")
"""多线程"""
start_time_1 = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for item in number_list:
executor.submit(evaluate_item, item)
print("线程池计算的时间:" + str(time.time() - start_time_1), "秒")
"""多进程"""
start_time_2 = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
for item in number_list:
executor.submit(evaluate_item, item)
print("进程池计算的时间:" + str(time.time() - start_time_2), "秒")
"""
cpu 密集型任务
顺序执行 一个人做,从头到尾
多线程会有线程间的切换
多进程 有十个人
"""
"""
1. io 密集型任务: 爬虫、下载文件、网页服务器、请求数据库。只要有网络延迟的都是 io 密集型任务
2. cpu 密集型任务: 需要不断地进行 cpu 计算获取结果,图像处理、视频处理、人工智能 (Python 的 cpu 密集型工具都是用其他语言写的)
读取一个 txt 文件
写入 100Gb 的数据到硬盘
读取文件是 io,写入数据到文件是 cpu
"""
案例:多进程嵌套多线程
import time
import threading
import requests
import concurrent.futures
urls = ['http://www.baidu.com?page={}'.format(page) for page in range(1, 100)]
def download(url):
print(url)
response = requests.get(url)
print(response)
time.sleep(1)
def thread_poll_download(urls):
print(urls)
thread_poll = concurrent.futures.ThreadPoolExecutor(max_workers=5)
for url in urls:
thread_poll.submit(download, url)
# 100 个任务
# 5线程 每个线程 20 个任务
# 5进程 每个进程 20 个任务
# 5多进程(每个进程 20 个任务) + 5多线程(每个线程是四个任务)
if __name__ == '__main__':
# 5多进程 -> 等分五份任务
process_poll = concurrent.futures.ProcessPoolExecutor(max_workers=5)
for i in range(20, 101, 20):
# print(urls[i - 20:i])
process_poll.submit(thread_poll_download, urls[i - 20:i])
"""
多线程与多进程可以将普通任务变成并发任务
"""