如何利用并发性加速你的python程序(二):I/O 绑定程序加速
雷锋网 (公众号:雷锋网) AI 科技评论按,本文是工程师 Jim Anderson 分享的关于「通过并发性加快 python 程序的速度」的文章的第二部分,主要内容是 I/O 绑定程序加速相关。
在 上一篇 中,我们已经讲过了相关的概念:什么是并发?什么是并行? I/O 绑定和 CPU 绑定等。在这里,我们将对一些 python 并发方法进行比较,包括线程、异步和多进程,在程序中何时使用并发性以及使用哪个模块。
当然,本文假设读者对 python 有一个基本的了解,并且使用 python3.6 及以上版来运行示例。你可以从 Real python GitHub repo 下载示例。
如何加速 I/O 绑定程序
让我们从关注 I/O 绑定程序和一个常见问题开始:通过网络下载内容。在我们的例子中,你将从一些站点下载网页,但这个过程可能会产生任何故障。它只是更容易可视化。
同步版本
我们将从这个任务的非并发版本开始。注意,这个程序需要请求模块。在运行这个程序之前,你需要运行 pip 安装请求,这可能需要使用 virtualenv 命令。此版本根本不使用并发:
import requests
import time
def download_site(url, session):
with session.get(url) as response:
print (f "Read {len(response.content)} from {url}" )
def download_all_sites(sites):
with requests.Session() as session:
for url in sites:
download_site(url, session)
if __name__ == "__main__":
sites = [
"http://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f "Downloaded {len(sites)} in {duration} seconds" )
如你所见,这是一个相当短的程序。download_site()可以从 URL 下载内容并打印它的大小。要指出的一个小问题是,我们正在使用来自 Session 的会话对象。
直接从 requests 中使用 get(),但创建一个 Session 对象允许 requests 执行一些花哨的网络技巧从而真正加快速度是可能的。
download_all_sites()创建 Session,然后浏览站点列表,依次下载每个站点。最后,它打印出这个过程花费了多长时间,这样你就可以满意地看到在下面的示例中并发性对我们有多大帮助。
这个程序的处理图看起来很像上一节中的 I/O 绑定图。
注意:网络流量取决于许多因素,这些因素可能在每秒都在变化。我已经看到由于网络问题,这些测试案例从一次运行跳转到另一次的时间加倍了。
为什么同步版本很重要
这个版本的代码最棒的特点是,它很简单,编写和调试相对容易。代码的思路更加直接,所以你可以预测它将如何运作。
同步版本的问题
和我们提供的其他解决方案相比,同步版本最大的问题是,它的速度相对较慢。以下是我的机器上的最终输出示例:
注意 :你得到的结果可能会和上面有很大差异。运行这个脚本时,需要的时间从 14.2 秒到 21.9 秒不等。在本文中,时间取三次运行中最快的一次所花的时间,在这种情况下,两种方法之间的差异仍然很明显。
然而,运行速度变慢并不总是一个大问题。如果你正在运行的程序使用同步版本运行只需要 2 秒,并且很少运行,那么可能不需要添加并发性。
如果你的程序经常运行怎么办?如果运行程序需要几个小时怎么办?让我们继续使用线程重写这个程序以实现并发性。
线程版本
正如你可能猜测的那样,编写线程程序需要付出更多的努力。然而,对于简单的案例,你可能会惊讶于它所花费的额外努力是如此之少。下面是同一个程序的线程版本:
import concurrent.futures
import requests
import threading
import time
thread_local = threading . local()
def get_session():
if not getattr(thread_local, "session", None ):
thread_local.session = requests.Session()
return thread_local.session
def download_site(url):
session = get_session()
with session.get(url) as response:
print (f "Read {len(response.content)} from {url}" )
def download_all_sites(sites):
with concurrent.futures.ThreadPoolExecutor(max_workers = 5 ) as executor:
executor.map(download_site, sites)
if __name__ == "__main__":
sites = [
"http://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f "Downloaded {len(sites)} in {duration} seconds" )
当你添加线程时,整体结构是相同的,因此你只需要做一些更改。download_all_sites()从在每个站点调用一次函数改为更复杂的结构。
在这个版本中,你正在创建一个 ThreadPoolExecutor,这看起来很复杂。我们可以把它分解为:ThreadPoolExecutor=thread+pool+executor。
这个对象将创建一个线程池,每个线程都可以并发运行。最后,执行器会控制池中每个线程的运行方式和运行时间。请求将在池中执行。
标准库将 ThreadPoolExecutor 实现为上下文管理器,这样你就可以使用 with 语法来管理线程池的创建和释放。
一旦有了 ThreadPoolExecutor,就可以很方便地使用它的.map()方法。此方法在列表中的每个站点上运行传入函数。最重要的是,它使用所管理的线程池自动并发地运行它们。
那些学习其他语言,甚至是 python 2 的用户可能想知道,在处理线程时,通常用来管理细节的对象和函数在哪里,比如 thread.start()、thread.join()和 queue。
这些仍然存在,你可以使用它们来实现对线程运行方式的细粒度控制。但是,从 python3.2 开始,标准库添加了一个执行器,如果不需要细粒度的控制,它可以为你管理许多细节。
我们的示例中另一个有趣的变化是,每个线程都需要创建自己的 requests.session()对象。当你查看请求文档时,不一定很容易分辨出来,但是读到 这个问题 时,你似乎很清楚每个线程需要单独的 Session。
这是线程处理的一个有趣又困难的问题之一。因为操作系统可以控制一个任务何时被中断及另一个任务何时开始,所以在线程之间共享的任何数据都需要受到保护,保证线程安全。很遗憾,requests.session()不是线程安全的。
根据数据是什么以及如何使用它,有几种策略可以使数据访问线程安全。其中之一是使用线程安全的数据结构,如 python 队列模块中的 queue。
另一种策略是线程本地存储。Threading.local() 创建一个看起来像全局的对象,但它对于每个线程来说是不一样的。在你的示例中,这是通过 threadLocal 和 get_session()完成的:
threadLocal = threading . local()def get_session():
if getattr (threadLocal, "session", None ) is None :
threadLocal.session = requests.Session()
return threadLocal.session
ThreadLocal 是在线程模块中专门解决这个问题的。看起来有点奇怪,但你只想创建这些对象中的一个,而不是为每个线程创建一个对象。对象本身负责分离不同线程对不同数据的访问过程。
当调用 get_session()时,它查找的 session 和它运行的特定线程是对应的。因此,每个线程在第一次调用 get_session()时将创建一个会话,然后后续在其整个生命周期内简单地调用该会话。
最后,一个关于选择线程数的简短说明。你可以看到示例代码使用了 5 个线程。你可以随意调整这个数字的大小,看看总的时间是如何变化的。你可能认为每次下载只有一个线程是最快的,但实际上不是这样,至少在我的系统中不是这样。我发现,线程数目在 5 到 10 个之间时,速度是最快的。如果超过这个值,那么创建和销毁线程所产生的额外开销将抵消任何节省时间所带来的好处。
这里的难点在于,正确的线程数不是从一个任务到另一个任务中的常量。需要进行一些实验才能得到结果。
为什么线程版本很重要
它很快!这里是我测试中最快的一次。记住,非并发版本需要 14 秒以上的时间:
它的执行时序图如下所示:
它使用多个线程同时向网站发出多个打开的请求,允许你的程序重叠等待时间并更快地获得最终结果!
线程版本的问题
正如你从示例中看到的,要实现这一点需要更多的代码,而且你真的需要考虑在线程之间需要共享哪些数据。
线程可以以巧妙且难以检测的方式进行交互。这些交互可能导致随机的、间歇性的错误,且这些错误很难找到。
异步(asyncio)版本
在你开始检查异步版本示例代码之前,让我们详细讨论一下异步的工作原理。
异步基础
这将是 asycio 的简化版本。这里有许多细节被掩盖了,但它仍然说明了它是如何工作的。
asyncio 的一般概念是,一个被称为事件循环的 python 对象控制每个任务的运行方式和时间。这个对象清楚地知道每个任务处于什么状态。实际上,任务可以处于许多状态,但现在让我们设想一个简化的事件循环,它只有两个状态。
就绪状态指的是任务有工作要做并且准备运行,而等待状态意味着任务正在等待一些外部事情完成,例如网络操作。简化的事件循环维护两个任务列表,分别对应这两个状态。它选择一个已经就绪的任务,然后重新开始运行。该任务处于完全控制状态,直到它将控件送回事件循环。
当正在运行的任务将控制权交还给事件循环时,事件循环将该任务放入就绪或等待列表,然后遍历等待列表中的每个任务,以查看完成 I/O 操作后该任务是否已就绪。它知道就绪列表中的任务仍然是就绪状态,因为它们尚未运行。
一旦所有的任务都被重新排序到正确的列表中,事件循环就会选择下一个要运行的任务。简化的事件循环选择等待时间最长的任务并运行该任务。此过程重复,直到事件循环完成。
asyncio 的一个重要点是,如果不是有意为之,任务永远不会放弃控制。任务在执行的过程中从不会被打断。这使得我们在异步中比在线程中更容易进行资源共享。你不需要担心线程安全问题。
async 和 await
现在让我们来谈谈添加到 python 中的两个新关键字:async 和 await。根据上面的讨论,你可以将 await 视为允许任务将控制权交回事件循环的一种魔力。当你的代码等待函数调用时,await 是一个信号,表明调用可能需要花费一段时间,并且任务应该放弃控制。
最简单的方法是将 async 看作是 python 的标志,告诉它将使用 await 定义函数。在有些情况下,这不是完全正确的,比如异步生成器,但它适用于许多情况,并在开始时为你提供一个简单的模型。
你将在下一个代码中看到的一个例外是 async with 语句,它通常从你的等待的对象创建一个上下文管理器。虽然语义有点不同,但其思想是相同的:将这个上下文管理器标记为可以替换的东西。
我确信你可以想象到,在管理事件循环和任务之间的交互时有一些复杂性。对于以 asyncio 开始的开发人员来说,这些细节并不重要,但是你需要记住,任何调用 await 的函数都需要标记为 async。否则将出现语法错误。 雷锋网
回到代码
既然你已经基本了解了什么是 asyncio,那么让我们浏览一下示例代码的 asyncio 版本,并了解它是如何工作的。请注意,此版本添加了 aiohtp。在运行它之前,应该先运行 pip install aiohtp:
import asyncio
import time
import aiohttp
async def download_site(session, url):
async with session.get(url) as response:
print ( "Read {0} from {1}" .format(response.content_length, url))
async def download_all_sites(sites):
async with aiohttp.ClientSession() as session:
tasks = []
for url in sites:
task = asyncio.ensure_future(download_site(session, url))
tasks.append(task)
await asyncio.gather( * tasks, return_exceptions=True)
if __name__ == "__main__" :
sites = [
"http://www.jython.org",
"http://olympus.realpython.org/dice" ,
] * 80
start_time = time.time()
asyncio.get_event_loop().run_until_complete(download_all_sites(sites))
duration = time.time() - start_time
print(f "Downloaded {len(sites)} sites in {duration} seconds ")
这个版本比前两个版本要复杂一些。它有一个类似的结构,但是启动任务的工作量比创建线程池执行器的工作量要多一些。让我们从示例的顶部开始。
-
download_site()
顶部的 download_site()与线程版本几乎相同,但函数定义行上的 async 关键字和实际调用 session.get()时的 async with 关键字除外。稍后你将看到为什么可以在这里传递 session,而不是使用线程本地存储。
-
download_all_sites()
download_all_sites() 中可以看到线程示例中最大的变化。
你可以在所有任务之间共享会话,因此该会话在此处创建为上下文管理器。任务可以共享会话,因为它们都在同一线程上运行。会话处于错误状态时,一个任务无法中断另一个任务。
在该上下文管理器中,它使用 asyncio.secure_future()创建一个任务列表,该列表还负责启动它们。创建所有任务后,此函数使用 asyncio.gather()完成会话内容的变动,直到所有任务完成。
线程代码的作用与此类似,但在 ThreadPoolExecutor 中可以方便地处理细节。当前没有 asyncioPoolExecutor 类。
然而,这里的细节中隐藏着一个小而重要的变化。还记得之前我们讨论过要创建的线程数吗?在线程示例中,线程的最佳数量并不明显。
asyncio 的一个很酷的优点是它的规模远远优于线程。与线程相比,每项任务创建所需的资源和时间要少得多,因此创建和运行更多的资源和时间能很好地工作。这个例子只是为每个要下载的站点创建一个单独的任务,这个任务运行得很好。 雷锋网
-
__main__
最后,异步的本质意味着你必须启动事件循环,并告诉它要运行哪些任务。文件底部的__main__部分包含 get_event_loop() 的代码,然后运行 run_until_complete()。如果没有别的,他们在命名这些函数方面做得很好。
如果你已经更新到 python 3.7,那么 python 核心开发人员会为你简化这种语法。不需要分辨那种情况下使用 asyncio.get_event_loop(),那种情况下使用 run_until_complete(),你只需使用 asyncio.run()。
为什么 asyncio 版本很重要
它真的很快!在我的机器上进行的所有测试中,这是代码运行最快的版本:
执行时序图与线程示例中所发生的情况非常相似。只是 I/O 请求都是由同一线程完成的:
缺少线程池执行器,使得这段代码比线程示例要复杂一些。在这种情况下,你需要做一些额外的工作来获得更好的性能。
还有一个常见的论点是,在合适的位置添加 async 和 await 是一个复杂的问题。在某种程度上,这是事实。这个论点的另一个方面是,它迫使你思考何时交换给定的任务,这可以帮助你设计出一份更好、更快的代码。
规模问题在这里也很突出。为每个站点运行上面的线程示例明显比用少量线程运行它慢。运行带有数百个任务的 asyncio 示例并没有减慢速度。
asyncio 版本的问题
现在 asyncio 有几个问题。为了充分利用 asyncio,你需要特殊的 asyncio 版本的库。如果你只是使用下载站点的请求,那么速度会慢得多,因为请求不是用来通知事件循环它被阻塞了。随着时间的推移,这个问题越来越少,因为越来越多的库采用 asyncio。
另一个更微妙的问题是,如果其中一个任务不合作,那么协作多任务的所有优势都会消失。代码中的一个小错误会导致一个任务运行,并长时间占用处理器,从而使其他需要运行的任务处于等待状态。如果任务没有将控制权交还给事件循环,则无法中断事件循环。考虑到这一点,让我们来看看一种完全不同的并发、多处理方法。
多处理版本
与前面的方法不同,多处理版本的代码充分利用了新计算机的多个 CPU。让我们从代码开始:
import requests
import multiprocessing
import time
session = None
def set_global_session():
global session
if not session:
session = requests.Session()
def download_site(url):
with session.get(url) as response:
name = multiprocessing.current_process().name
print (f "{name}:Read {len(response.content)} from {url}" )
def download_all_sites(sites):
with multiprocessing.Pool(initializer=set_global_session) as pool:
pool.map(download_site, sites)
if __name__ == "__main__":
sites = [
"http://www.jython.org",
"http://olympus.realpython.org/dice" ,
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print (f "Downloaded {len(sites)} in {duration} seconds" )
这比 asyncio 示例短得多,实际上,它看起来与线程示例非常相似,但是在我们深入研究代码之前,让我们快速了解一下多处理对你会有什么帮助。
简述多处理
到目前为止,本文中的所有并发示例都只在计算机的单个 CPU 或核上运行。其原因与当前的 cpython 的设计以及所谓的全局解释器锁(globalinterpretorlock,简称 gil)有关。
标准库中的多处理设计正是为了改变这种状态而设计的,它使你能在多个 CPU 上运行代码。在高层,它是通过创建一个新的 python 解释器实例在每个 CPU 上运行,然后释放出程序的一部分来实现的。
在当前的 python 解释器中启动一个新线程的速度不如单独启动一个 python 解释器的速度快。这是一个重要的操作,存在一些限制和困难,但对某些问题来说,它可以产生巨大的差异。
多处理代码
代码与我们的同步版本相比有一些小的变化。第一个区别位于 download_all_sites()中。它不是简单地重复调用 download_site(),而是创建一个 multiprocessing.pool 对象,并让它将 download_site 映射到不可访问的站点。和线程示例相比,这点比较相似。
这里所发生的是,池(pool)创建了许多单独的 python 解释器进程,并让每个进程在某些项上运行指定的函数,在我们的例子中是在站点列表上运行指定的函数。主进程和其他进程之间的通信由多处理模块为你处理。
创造池的那条线值得你注意。首先,它不指定要在池中创建多少进程,尽管这是一个可选参数。默认情况下,multiprocessing.pool()将确定计算机中的 CPU 数量并与之匹配。这通常是最好的答案,在我们的例子中也是如此。
对于这个问题,增加进程的数量并不能提高速度。相反,它实际上会降低速度,因为启动和删除所有这些进程的成本大于并行执行 I/O 请求的好处。
接下来,我们得到该调用的 initializer=set_global_session 部分。请记住,池中的每个进程都有自己的内存空间,这意味着它们不能共享会话对象之类的东西。你不会希望每次调用函数时都创建新会话,而是希望为每个进程创建一个会话。
初始化功能参数就是为这种情况而生成的。无法将返回值从初始值设定项传递回由进程 download_site()调用的函数,但可以初始化全局会话变量以保存每个进程的单个会话。因为每个进程都有自己的内存空间,所以每个进程的全局空间都不同。
这就是所有要说的啦,其余的代码与你以前看到的非常相似。
为什么多处理版本很重要
这个例子的多处理版本非常好,因为它相对容易启动,并且只需要很少的额外代码。它还充分利用了计算机中的 CPU 资源。此代码的执行时序图如下所示:
多处理版本的问题
这个版本的示例确实需要一些额外的设置,而且全局会话对象很奇怪。你必须花费一些时间来考虑在每个流程中访问哪些变量。
最后,它明显比本例中的异步和线程版本慢:
这并不奇怪,因为 I/O 绑定问题并不是多处理存在的真正原因。在进入下一节并查看 CPU 绑定示例时,你将看到更多内容。
本文之前还有相关概念介绍: 如何利用并发性加速你的python程序(一):相关概念
以及接下来的一篇是: 如何利用并发性加速你的python程序(三):CPU 绑定程序加速
via: https://www.leiphone.com/news/201901/JfoLltRClm3bZzuB.html?type=preview
雷锋网版权文章,未经授权禁止转载。详情见。