为了高效处理网络 I/O,需要使用并发,因为网络有很高的延时,为了不浪费 CPU 周期去等待,最好在收到网络相应之前做些其他的事

我们首先看依次从网络下载的代码:


In [6]:
import os 
import time
import sys

import requests

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'

def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)
        
def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

def show(text):
    print(text, end=' ') # 输出末尾的换行符变成了空格
    sys.stdout.flush() #Python 中正常情况下,遇到换行才会刷新 stdout 缓冲。所以这里手动刷新缓冲
    
def download_many(cc_list):
    for cc in sorted(cc_list):
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')
    return len(cc_list)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    
main(download_many)


BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 33.59s

注意上面代码细节,导入标准库 os time 和 sys 之后,使用一个空格分开了非标准库 requests

为了清楚起见,这里并没有处理异常。看到依次下载需要 33 秒

使用 concurrent.futures 模块下载

concurrent.futures 模块的主要特色是 ThreadPoolExecutor 和 ProcessPoolExecutor 类,这两个类实现的接口能分别在不同的线程或进程中执行可调用的对象,这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列。不过这个接口抽象层级很高,像下国旗这种简单的案例,不用关心任何实现细节

下面展示了如何使用 ThreadPoolExecutor.map 方法,以最简单的方式实现并发下载


In [10]:
from concurrent import futures

MAX_WORKERS = 20 

def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:
        # 与内置的 map 方法类似,不过 download_one 函数会在多个线程中并发调用
        # map 方法返回一个生成器,因此可以迭代,获取各个函数的返回值
        res = executor.map(download_one, sorted(cc_list))
    
    return len(list(res))

main(download_many)


CN FR EG ID RU TR BR IN CD NG JP PK PH MX US IR ET DE BD VN 
20 flags downloaded in 2.18s

这里为了方便引用了第一个代码块的一些函数,注意,上面的 download_one 函数其实是第一个例子的 for 循环结构体,编写并发代码经常这样重构,把依次执行的 for 循环改成函数,以便并发调用

期物在哪里

期物是 concurrent.futures 模块和 asyncio 包的重要组件,可是,作为这两个库的用户,我们有时候却见不到期物,在上面例子背后用到了期物,但是我们编写的代码并没有直接使用。这一节概述期物并举一个例子,展示用法

从 Python3.4 起,标准库中有两个名为 Future 的类:concurrent.futures.Future 和 asyncio.Future。这两个类的作用完全相同:两个 Future 类的实例都表示可能已经完成或者尚未完成的延时计算。这与 Twisted 引擎中的 Deferred 类,Tornado 框架中的 Future 类,以及多个 JavaScript 库中的 Promise 对象类似

期物封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果(或抛出异常)后可以获取结果(或异常)。

我们要记住一件事:通常情况自己不应该创建期物,而只能由并发框架(concurrent.futures 或 asyncio)实例化。原因很简单:期物表示终将发生的事,而确定某件事会发生的唯一方式是执行的时间已经排定。因此,只有排定把某件事交给 concurrent.futures.Executor 子类处理,才会创建 concurrent.futures.Future 实例。例如 Executor.submit() 方法的参数是一个可调用的对象,调用这个方法后会为传入的可调用对象排期,并返回一个期物。

客户端代码不应该改变期物的状态,并发框架在期物表示的延时计算结束后会改变期物状态,而我们无法控制计算何时结束

这两种期物都有 .done() 方法,这个方法不阻塞,返回值是 布尔值,指明期物链接的可调用对象是否已经执行。客户端代码通常不会询问期物是否运行结束,而会等待通知。因此,两个 Future 类都用 .add_done_callback() 方法;这个方法只有一个参数,类型是可调用的对象,期物运行结束后会调用指定的可调用对象

此外,还有 .result() 方法,在期物运行结束后调用的话,这个方法在两个 Future 类中的作用相同;返回可调用对象的结果,或者重新抛出执行的可调用的对象时抛出异常。可是,如果期物没有运行结束,result 方法在两个 Future 类中的行为相差很大。对 concurrency.futures.Future 实例来说,调用 f.result() 方法会阻塞调用方所在的线程,直到有结果可返回,此时, result 方法可以接收可选的 timeout 参数,如果在指定时间期物没有完毕,会抛出 TimeError 异常,第 18 章你会发现,asyncio.Future.result 方法不支持设定超时时间,在那个库中获取期物的结果最好使用 yield from 结构。不过,对 concurrency.futures.Future 实例不能这么做。

这两个库都有几个函数返回期物,其他函数则使用期物,以用户易于理解的方式实现自身,上面的例子中的 Executor.map 方法属于后者:返回值是一个迭代器,迭代器的 __next__ 方法调用各个期物的 result 方法,因此我们得到的是各个期物的结果,而非期物本身

为了从实用的角度理解期物,我们使用 concurrent.futures.as_complete 函数重写上面例子,这个函数是一个期物列表,返回值是一个迭代器,在期物运行结束后产出期物

为了使用 futures.as_completed 函数,只需修改 download_many 函数,把较抽象的 executor.map 调用换成两个 for 循环:一个用于创建并排定期物,另一个用于获取期物的结果。同时,我们会添加几个print 调用,显示运行结束前后的期物。修改后的 download_many 函数如下所示


In [13]:
def download_many(cc_list):
    cc_list = cc_list[:5]
    
    # max_workers 硬编码为 3,以便在输出中观察待完成的期物
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do = []
        # 按照字母表迭代国家代码,明确表明输出的顺序与输入一致
        for cc in sorted(cc_list):
            # submit 方法可以排定调用对象的执行时间,然后返回一个期物,表示这个待执行的操作
            future = executor.submit(download_one, cc)
            to_do.append(future)
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))
        
        results = []
        for future in futures.as_completed(to_do):
            res = future.result()
            msg = '{} result: {!r}'
            print(msg.format(future, res))
            results.append(res)
        return len(results)
    
main(download_many)


Scheduled for BR: <Future at 0x7f174c4bbcc0 state=running>
Scheduled for CN: <Future at 0x7f174c4bf630 state=running>
Scheduled for ID: <Future at 0x7f174c4b3dd8 state=running>
Scheduled for IN: <Future at 0x7f174c4b74e0 state=pending>
Scheduled for US: <Future at 0x7f174c4b7128 state=pending>
BR <Future at 0x7f174c4bbcc0 state=finished returned str> result: 'BR'
ID <Future at 0x7f174c4b3dd8 state=finished returned str> result: 'ID'
IN <Future at 0x7f174c4b74e0 state=finished returned str> result: 'IN'
US <Future at 0x7f174c4b7128 state=finished returned str> result: 'US'
CN <Future at 0x7f174c4bf630 state=finished returned str> result: 'CN'

5 flags downloaded in 4.58s

注意,这个示例中调用 future.result() 方法绝不会阻塞,因为 future 由 as_completed 函数产出。

上面输出可以看出,期物的 repr() 方法会显示期物的状态;前 3 个期物是 running,因为有 3 个工作的线程

后两个期物的状态是 pending,等待有线程可用

严格的来说,我们现在的脚本都不能并行下载,因为 GIL 锁的限制,他们都在单个线程中运行

阻塞型 I/O 和 GIL

CPython 解释器本身就不是线程安全的,因此有全局解释器锁(GIL),一次只允许使用一个线程执行 Python 字节码。因此,一个 Python 进程通常不能使用多个 CPU 核心

这是 CPython 解释器的局限,与 Python 语言无关,Jython 和 IronPython 没有这种限制,不过目前最快的 PyPy 也有 GIL

不过标准库中的阻塞型 I/O 操作函数在操作系统返回结果时都会释放 GIL,这意味着一个内置的函数或一个使用 C 语言编写的扩展释放 GIL。其实有个使用 C 语言编写的 Python 库可以管理 GIL,不过太麻烦,不建议使用

然而,标准库中所有执行阻塞型 I/O 操作的函数,在等待操作系统返回结果时都会释放 GIL。这意味着 Python 在这个层次可以使用多线程程,I/O 密集型 Python 程序能从中收益,一个 Python 程序等待网络响应,阻塞型 I/O 函数会释放 GIL,再运行一个进程

Python 标准库中所有的阻塞型 I/O 函数都会释放 GIL,允许其他线程运行。time.sleep() 函数也会释放 GIL。因此,尽管有 GIL,Python 还是能在 I/O 密集型应用发挥作用

使用 concurrent.futures 模块启动进程

concurrent.futures 模块文档副标题是执行并行任务。这个模块实现的是真正的并行计算,因为它使用 ProcessPoolExecutor 类把工作分配给多个进程处理,因此,如果需要做 CPU 密集型处理,使用这个模块能绕开 GIL,利用所有 CPU 核心

ProcessPoolExecutor 和 ThreadPoolExecutor 类都实现了通用的Executor 接口,因此使用 concurrent.futures 模块能特别轻松地把基于线程的方案转成基于进程的方案。

只需要把第二个例子的

def download_many(cc_list):
     workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:

改成:

def download_many(cc_list):
    with futures.ProcessPoolExecutor() as executor:

In [15]:
from concurrent import futures

MAX_WORKERS = 20 

def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ProcessPoolExecutor() as executor:
        res = executor.map(download_one, sorted(cc_list))
    
    return len(list(res))

main(download_many)


CN BD BR ET CD EG ID FR IN JP IR DE NG MX RU PH VN PK TR US 
20 flags downloaded in 4.89s

对于简单用途来说,这两个实现 Executor 接口的唯一指的注意的区别是,ThreadPoolExecutor.__init__ 方法需要 max_workers 参数,指定线程池中的线程数量,在 ProcessPoolExecutor 类中,那个参数是可选的,大多数时候不适用,默认值是 os.cpu_count() 函数返回的 CPU 数量,这样处理说得通,因为对 CPU 密集型的处理来说,不可能要求使用超过 CPU 数量的职程。二队 I/O 密集型处理来说,可以在一个 ThreadPoolExecutor 实例中使用 10 和 100 个或 1000 个线程,最佳线程数量取决于线程做的事,以及可用内存等

实验 Executor.map 方法

如果想并发运行多个可调用对象,最简单方式是使用上面 Executor.map 方法,下面展示了这个方法的某些运作细节


In [19]:
from time import sleep, strftime
from concurrent import futures

# 将参数打印,前面加上时间戳
def display(*args):
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)

# 开始时候显示一个消息,休眠 n 秒,休眠 n 秒,结束时候显示一个消息
# 消息使用制表符缩进,缩进量由 n 确定
def loiter(n):
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done'
    display(msg.format('\t'*n, n))
    return n * 10

def main():
    display('Script starting')
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(5))
    display('results:', results)
    display('Waiting for individual results:')
    # for 循环中的 enumerate 函数会隐式调用 next(results),
    # 这个函数又会在(内部)表示第一个任务(loiter(0))的 _f 期物上调用_f.result() 方法。
    # result 方法会阻塞,直到期物运行结束,因此这个循环每次迭代时都要等待下一个结果做好准备。
    for i, result in enumerate(results):
        display('result {}: {}'.format(i, result))
        
main()


[14:43:36] Script starting
[14:43:36] loiter(0): doing nothing for 0s...[14:43:36] 
[14:43:36][14:43:36]	loiter(1): doing nothing for 1s...[14:43:36] 
 		loiter(2): doing nothing for 2s... results:loiter(0): done
 
<generator object Executor.map.<locals>.result_iterator at 0x7f174c457eb8>[14:43:36]
 [14:43:36]			loiter(3): doing nothing for 3s... 
Waiting for individual results:
[14:43:36] result 0: 0
[14:43:37] 	loiter(1): done
[14:43:37] 				loiter(4): doing nothing for 4s...
[14:43:37] result 1: 10
[14:43:38] [14:43:38]		loiter(2): done 
result 2: 20
[14:43:39] 			loiter(3): done
[14:43:39] result 3: 30
[14:43:41] 				loiter(4): done
[14:43:41] result 4: 40

Executor.map 函数易于使用,不过有个特性可能有用,也可能没用,具体情况取决于需求:这个函数返回结果的顺序与调用开始的顺序一 致。如果第一个调用生成结果用时 10 秒,而其他调用只用 1 秒,代码会阻塞 10 秒,获取 map 方法返回的生成器产出的第一个结果。在此之 后,获取后续结果时不会阻塞,因为后续的调用已经结束。如果必须等到获取所有结果后再处理,这种行为没问题;不过,通常更可取的方式 是,不管提交的顺序,只要有结果就获取。为此,要把Executor.submit 方法和 futures.as_completed 函数结合起来使 用。

executor.submit 和 futures.as_completed 这个组合比executor.map 更灵活,因为 submit 方法能处理不同的可调用对象和参数,而executor.map 只能处理参数不同的同一个可调用对象。此外,传给futures.as_completed 函数的期物集合可以来自多个 Executor 实例,例如一些由 ThreadPoolExecutor 实例创建,另一些由 ProcessPoolExecutor 实例创建。


In [ ]: