In [1]:
import threading
import itertools
import time
import sys

class Signal:
    go = True
  

def spin(msg, signal):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status)) # \x08 是退格符
        time.sleep(.1)
        if not signal.go:
            break
    write(' ' * len(status) + '\x08' * len(status)) # 最后用空格擦除屏幕内容,把光标移回开头
    
def slow_function():
    time.sleep(3) # 3 秒动画
    return 42

def supervisor():
    signal = Signal()
    spinner = threading.Thread(target=spin,
                               args=('thinking!', signal))
    print('spinner object:', spinner)
    spinner.start()
    result = slow_function()
    signal.go = False
    spinner.join()
    return result

def main():
    result = supervisor()
    print('Answer:', result)
    
main()


spinner object: <Thread(Thread-4, initial)>
| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking          Answer: 42

In [2]:
import asyncio
import itertools
import sys

@asyncio.coroutine # 打算交给 asyncio 处理的使用此装饰器,不是强制要求,但强烈建议这么做,原因后面讲
def spin(msg): # 这里不需要上面例子中的 signal 参数
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            yield from asyncio.sleep(.1) # 代替 time.sleep(.1),这样休眠不会阻塞事件循环
        except asyncio.CancelledError: # 如果 spin 函数苏醒后抛出此异常,其原因是发出了取消请求,因此退出循环
            break
    write(' ' * len(status) + '\x08' * len(status))
  
@asyncio.coroutine
def slow_function(): # 现在这个函数是协程,在用休眠假装进行 I/O 操作时,使用 yield from 继续执行事件循环
    yield from asyncio.sleep(3) # 将控制权交给主循环,3 秒后休眠结束恢复此线程
    return 42

@asyncio.coroutine
def supervisor(): # 这个函数是协程,因此可以使用 yield from 驱动 slow_function() 函数
    spinner = asyncio.async(spin('thinking!')) # 排定 spin 协程的运行时间,使用一个 Task 对象包装 spin 协程,并立即返回
    print('spinner object:', spinner)
    result = yield from slow_function() # 驱动 slow_function(), 结束后获取返回值,同时事件继续执行,因为此函数 sleep 将控制权交回主循环
    spinner.cancel() # Task 对象可以取消,取消后会在协程当前暂停的 yield 抛出 asyncio.CancelledError 异常,协程可以捕获这个异常,也可以延时取消,甚至可以不取消
    return result

def main():
    loop = asyncio.get_event_loop() # 获取事件循环的引用
    result = loop.run_until_complete(supervisor()) # 驱动 supervisor,让其运行完毕,这个协程返回的是这次调用的返回值
    loop.close()
    print('Answer:', result)
    
main()


spinner object: <Task pending coro=<spin() running at <ipython-input-2-b2636da91070>:5>>
| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking          Answer: 42

使用 @asyncio.coroutine 装饰器不是强制要求,但是强烈建议这么做,因为这样能在一众普通的函数中把协程凸现出来,也有助于调试,如果还没从中产出值,协程就被垃圾回收了(意味着有操作未完成,因此可能是个缺陷),那就可以发出警告。这个装饰器不会预激协程

注意这两个函数的不同


In [2]:
# def supervisor():
#     signal = Signal()
#     spinner = threading.Thread(target=spin,
#                                args=('thinking!', signal))
#     print('spinner object:', spinner)
#     spinner.start()
#     result = slow_function()
#     signal.go = False
#     spinner.join()
#     return result

# @asyncio.coroutine
# def supervisor(): # 这个函数是协程,因此可以使用 yield from 驱动 slow_function() 函数
#     spinner = asyncio.async(spin('thinking!')) # 排定 spin 协程的运行时间,使用一个 Task 对象包装 spin 协程,并立即返回
#     print('spinner object:', spinner)
#     result = yield from slow_function() # 驱动 slow_function(), 结束后获取返回值,同时事件继续执行,因为此函数 sleep 将控制权交回主循环
#     spinner.cancel() # Task 对象可以取消,取消后会在协程当前暂停的 yield 抛出 asyncio.CancelledError 异常,协程可以捕获这个异常,也可以延时取消,甚至可以不取消
#     return result

主要区别如下:

  • asyncio.Task 对象差不多与 threading.Thread 对象等效
  • Task 对象用于驱动协程,Thread 对象用于调用可调用对象
  • Task 对象不由自己手动实例化,而是通过把协程川贝 asyncio.async(...) 函数或 loop.create_task(...) 方法获取
  • 获取的 Task 对象已经排定了运行时间(例如,由 asyncio.async 函数排定);Thread 实例则必须调用 start 方法,明确告知让它运行
  • 在线程版 supervisor 函数中,slow_function() 函数是普通的函数,直接由线程调用。在异步版 supervisor 函数中,此函数是协程,由 yield from 驱动
  • 没有 API 能从外部终止线程,因为线程随时可能被中断,导致系统处于无效状态。如果想终止任务,可以使用 Task.cancel() 实例方法,在协程内部抛出 CancelledError 异常。协程可以在暂停的 yield 处捕获这个异常,处理终止请求
  • supervisor 协程必须在 main 函数中由 loop.run_until_complete 方法执行

线程与协程还有一点区别要说明:如果使用线程编程,调度程序任何时候都能中断线程,必须记住保留锁,去保护程序中的重要部分,防止多步操作在执行的过程中中断,防止数据处于无效状态

协程默认会做好全方位包含,防止中断,我们必须显式产出才能让程序的余下部分运行,对于协程来说,无需保留锁,在多个线程之间同步操作,协程自身就会同步,因此在任意时刻只有一个协程运行,想交出控制权时,可以使用 yield 或 yield from 把控制权交还给调度程序。这就是能够安全地取消协程的原因:按照定义,协程只能在暂停的 yield 处取消,因此可以处理 CancelledError 异常,执行清理操作

asyncio.Future 故意不阻塞

asyncio.Future 类和 concurrent.futures.Futer 类的接口基本一致,不过实现方式不同,不能互换。

期物只是调度执行某物的结果,在 asyncio 包中,BaseEventLoop.create_task(...) 方法来接收一个协程,排定它的运行时间,然后返回一个 asyncio.Task 实例 -- 也是 asyncio.Future 类的实例,因为 Task 是 Future 的子类,用于包装协程,这与调用 Executor.submit(...) 方法创建 concurrent.futures.Future 实例是一个道理

与 concurrent.futures.Future 类似,asyncio.Future 类也提供了 .done(), .add_done_callback(...) 和 .result() 等方法。前两个方法的用法与上一章例子一样,不过 .result() 方法差别很大

asyncio.Future 类的 .result() 方法没有参数,因此不能指定超时时间,此外,如果调用 .result() 方法时期物还没运行完毕,.result() 也不会阻塞等待结果,而是抛出 asyncio.InvalidStateError 异常

然而,获取 asyncio.Future 对象通常使用 yield from,从中产出结果

使用 yield from 处理期物,等待期物运行完毕这一步无需我们关心,而且不会阻塞事件循环,因为在 asyncio 包中,yield from 作用是把控制权交还给事件循环

注意,使用 yield from 处理期物与使用 add_done_callback 方法处理协程的作用一样:延时操作结束后,事件循环不会触发回调对象,而是设置期物的返回值,而 yield from 表达式则在暂停的协程中产生返回值,恢复执行协程

总之,因为 asyncio.Future 类的目的是与 yield from 一起使用,所以通常不需要使用以下方法

  • 无需调用 my_future.add_done_callback(...),因为可以直接把想在期物运行结束后执行的操作放到协程中的 yield from my_future 表达式后面。这是协程的一大优势:协程是可以暂停和恢复的函数
  • 无需调用 my_future.result(),因为 yield from 从期物中产出的值就是结果(例如 result = yield from my_future)

当然,有时也是用这些方法,但是一般情况下, asyncio.Future 对象是由 yield from 驱动,而不是靠调用这些方法驱动

从期物、任务和协程中产出

在 asyncio 包中,期物与协程关系紧密,因为可以使用 yield from 从 asyncio.Future 对象中产出结果。这意味着,如果 foo 是协程函数(调用后返回协程对象),抑或是返回 Future 或 Task 实例的普通函数,那么可以这样写: res = yield from foo() 。这是 asyncio 包的 API中很多地方可以互换协程与期物的原因之一

为了执行这些操作,必须排定协程运行时间,然后使用 asyncio.Task 对象包装协程。对协程来说,获取 Task 对象有两种主要方式

asyncio.async(coro_or_future, *, loop=None)

  • 这个函数统一了协程与期物:第一个参数可以是二者中的任意一个。如果是 Future 或 Task对象,那就原封不动的返回,如果是协程,那么 async 函数会调用 loop.create_task(...) 方法创建 Task 对象,loop 关键字是可选的用于传入事件循环;如果没有传入,那么 async 函数会通过调用 asyncio.get_event_loop() 函数获取循环对象

BaseEventLoop.create_task(coro)

  • 这个方法排定协程的执行时间,返回一个 asyncio.Task 对象。如果在自定义的 BaseEventLoop 子类上调用,返回的对象可能是外部库(如 Tornado)中与 Task 类兼容的某个类的实例

BaseEventLoop.create_task(...) 只能在 Python 3.4.2 及以上版本使用,Python 3.3 或 Python 3.4 要使用 asyncio.async(...) 函数或从 PyPI 中安装新的 asyncio 版本

asyncio 包中有多个函数会自动(内部使用的是 asyncio.async 函数)把参数指定的协程包装在 asyncio.Task 对象中,例如 BaseEventLoop.run_until_complete(...) 方法

如果想在控制台或小型测试脚本中试验期物和协程,可以使用下面代码:

import asyncio
def run_sync(coro_or_future):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(coro_or_future)

a = run_sync(some_coroutine())

在 asyncio 文档中有个注解说道:这份文档把一些方法说成协程,即使它们其实是返回 Future 对象的普通 Python 函数,这是故意的,为的是给以后修改这些函数的实现留下余地

使用 asyncio 和 aiohttp 包下载


In [2]:
import asyncio
import aiohttp # 不是标准库中的,需要 pip 安装
import time

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'

def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)
        
def show(text):
    print(text, end=' ') # 输出末尾的换行符变成了空格
    sys.stdout.flush() # Python 中正常情况下,遇到换行才会刷新 stdout 缓冲。所以这里手动刷新缓冲
    
def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC) 
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

@asyncio.coroutine
def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc = cc.lower())
    # 阻塞的操作通过协程实现,客户代码通过 yield from 把职责委托给协程,以便异步运行协程
    resp = yield from aiohttp.request('GET', url) # 读取响应内容是一项单独的异步操作
    image = yield from resp.read()
    return image

@asyncio.coroutine
def download_one(cc): # 因为这里用到了 yield from,也必须是协程
    image = yield from get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

def download_many(cc_list):
    loop = asyncio.get_event_loop() # 获取事件循环底层实现的引用
    # 获取各个国旗,构建一个生成器对象列表
    to_do = [download_one(cc) for cc in sorted(cc_list)]
    # 虽然函数名是 wait,但它不是阻塞型函数,而是一个协程,等传给它的所有协程运行完毕后结束
    wait_coro = asyncio.wait(to_do)
    # 执行事件循环,直到 wait_coro 运行结束。事件循环运行过程中,脚本会在这里阻塞
    res, _ = loop.run_until_complete(wait_coro)
    loop.close()
    return len(res)

main(download_many)


Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa55e72b0>
JP 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa454d6a0>
VN 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa55e7e10>
TR 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa4553da0>
IN 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa4569a90>
EG 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa55e7860>
RU 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa55fab00>
BD 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa454d0f0>
NG 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa454dc50>
ID 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa455d390>
DE 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa45537f0>
CN 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa55fa550>
FR 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa45694e0>
BR 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa55f1400>
ET 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa455d940>
PK 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa55c3e10>
CD 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa55f19b0>
MX 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa4553240>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa455def0>
PH IR 
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f4fa55f1f60>
US 
20 flags downloaded in 0.70s

如果事件循环是上下文管理器就好了,我们就可以使用 with 块保证循环会被关闭,然而实际情况是复杂的,客户代码绝不会直接创建事件循环,而是调用 asyncio.get_event_loop() 函数,获取事件循环的引用。而且有时我们代码不“拥有”事件循环,因此关闭事件循环会出错

asyncio.wait(...) 协程的参数是由一个期物或协程构成的可迭代对象,wait 会分别把各个协程包装进一个 Task 对象,最终结果是,wait 处理的所有对象都通过某种方式变成 Future 类的实例。wait 是协程函数,因此返回的是一个协程或生成器对象,wait_coro 变量中存储的正是这种对象,为了驱动协程,我们把协程传给 loop.run_until_complete(...) 方法

loop.run_until_complete 方法的参数是一个期物或协程,如果是协程,run_until_complete 一样,把协程包装进一个 Task 对象中。协程,期物都能由 yield from 驱动,这正是 run_until_complete 方法对 wait 函数返回的 wait_coro 对象所做的事。waitcoro 运行结束后返回一个元组,第一个元素是一系列结束的期物,第二个元素是一系列未结束的期物。上面的例子,第二个元素永远是空,所以我们将其赋给 ``,忽略。但是 wait 函数有两个关键字参数,如果设定了可能会返回未结束的期物,这两个参数是 timeout 和 return_when

注意,我们为了使用 asyncio 包,必须把每个访问网络的函数改成异步版,使用 yield from 处理网络,这样才能把控制权交还给事件循环。在 get_flag 函数中使用 yield from 意味着它必须像协程那样驱动

asyncio 有许多新概念要掌握,不过有个小技巧,那就是假装没有 yield from 关键字,你会发现代码会容易阅读很多。

比如说,以这个协程为例:

@asyncio.coroutine
def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc = cc.lower())
    resp = yield from aiohttp.request('GET', url) 
    image = yield from resp.read()
    return image

我们可以假设它与下面这个函数作用相同,只是协程版从不阻塞

@asyncio.coroutine
def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc = cc.lower())
    resp = aiohttp.request('GET', url) 
    image = resp.read()
    return image

yield from foo 语法可以防止阻塞,是因为当前协程(即包含 yield from 代码的委派生成器)暂停后,控制权回到事件循环手里,再去驱动其他协程;foo 期物或协程运行完毕后,把结果返回给暂停的协程,将其恢复

我们在 16 章说过:

  • 使用 yield from 链接的多个协程最终必须由不是协程的调用方驱动,调用方显式或隐式(例如在 for 循环中)在最外层委派生成器上调用 next(...) 函数或 send(...) 函数

  • 链条中最内层的子生成器必须是简单的生成器(只使用 yield)或者是可迭代对象

在 asyncio 包的 API 中使用 yield from 时,这两点都成立,不过要注意下面细节

  • 我们编写的协程链条始终通过把最外层委派生成器传给 asyncio 包 API 中的某个函数(如 loop.run_until_complete(...)) 驱动,也就是说,使用 asyncio 包时,我们编写的代码不通过调用 next(...) 或 send(...) 方法驱动协程 -- 这一点由 asyncio 包实现的事件循环去做

  • 我们编写的协程链条最终通过 yield from 把职责委托给 asyncio 包中的某个协程函数或协程方法(例如 yield from asyncio.sleep()),或者其他库中实现的高层协程(例如 resp = yield from aiohttp.request('GET', url))。也就是说,最内层的自生成器是真正执行 I/O 操作的函数,而不是我们自己编写的函数

概括起来就是:使用 asyncio 包时,我们编写的异步代码中包含由 asyncio 本身驱动的协程(即委派生成器),而生成器最终把职责委托给 asyncio 包或第三方库(如 aiohttp)中的协程。这种处理方式相当于架起了管道,让 asyncio 事件循环(通过我们编写的协程)驱动执行底层异步 I/O 操作的库函数

避免阻塞型调用

我们的 CPU 运行计算很快,处理网络请求很慢,有两种方法可以避免阻塞型调用终止整个应用程序的进程

  • 在单独的线程中运行各个阻塞型的操作
  • 把每个阻塞型的操作转换成非阻塞的异步调用

使用多个线程是可以的,但是各个操作系统线程(Python 使用的是这种线程)消耗的内存很多,要处理好几千个连接,每个连接都用线程的话负担不起

为了降低内存消耗,通常使用回调来实现异步调用。这是一种低层的概念,类似于所有并发机制中最古老,最原始的那种--硬件中断。使用回调时,我们不等待响应,而是注册一个函数,在某件事时调用。这样,所有的调用都是非阻塞的。因为回调简单,而且消耗低,所以比较推荐这种方式

当然,只有异步应用程序低层的事件循环能依靠基础设置的中断、线程、轮询和后台进程蛋等,确保多个并发请求能取得进展并最终完成,这样才能使用回调。事件循环获得响应后,会回过头来调用我们指定的回调。不过,如果做法正确,时间循环和应用代码公用的主线程绝不会阻塞

把生成器当做协程使用是异步编程的另一种方式。对事件循环来说,调用回调与在暂停的协程上调用 .send() 方法效果差不多,各个暂停的协程是要消耗内存,但是比起线程来说,消耗的极小。

asyncio 的基础设施获得第一个响应后,事件循环把响应发给等待结果的 get_flag 协程,得到响应后,get_flag 向前执行到下一个 yield from 表达式处,调用 resp.read() 方法,然后把主动权还给主循环。其他响应会陆续返回(因为请求几乎同时发出)。所有 get_flag 协程都获得结果后,委派器 download_one 恢复,保存图像文件

为了提高性能,save_flag 应该执行异步操作,可是 asyncio 包目前没有提供异步文件系统 API(None有)。如果这是应用的瓶颈,可以使用 loop.run_in_exector 方法。后面会说明如何使用

因为异步操作和执行是交叉执行的,所以并发下载多张图像所需的总时间比依序下载少得多,我们使用 asyncio 包发起了 600 个 HTTP 请求,获得所有结果的时间比依序下载快 70 倍

改进 asyncio 下载脚本

需要引用 17 章的 flags2_common.py 中的函数,flags2_common.py:


In [5]:
import os
import time
import sys
import string
import argparse
from collections import namedtuple
from enum import Enum


Result = namedtuple('Result', 'status data')

HTTPStatus = Enum('Status', 'ok not_found error')

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1

SERVERS = {
    'REMOTE': 'http://flupy.org/data/flags',
    'LOCAL':  'http://localhost:8001/flags',
    'DELAY':  'http://localhost:8002/flags',
    'ERROR':  'http://localhost:8003/flags',
}
DEFAULT_SERVER = 'LOCAL'

DEST_DIR = 'downloads/'
COUNTRY_CODES_FILE = 'country_codes.txt'


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def initial_report(cc_list, actual_req, server_label):
    if len(cc_list) <= 10:
        cc_msg = ', '.join(cc_list)
    else:
        cc_msg = 'from {} to {}'.format(cc_list[0], cc_list[-1])
    print('{} site: {}'.format(server_label, SERVERS[server_label]))
    msg = 'Searching for {} flag{}: {}'
    plural = 's' if len(cc_list) != 1 else ''
    print(msg.format(len(cc_list), plural, cc_msg))
    plural = 's' if actual_req != 1 else ''
    msg = '{} concurrent connection{} will be used.'
    print(msg.format(actual_req, plural))


def final_report(cc_list, counter, start_time):
    elapsed = time.time() - start_time
    print('-' * 20)
    msg = '{} flag{} downloaded.'
    plural = 's' if counter[HTTPStatus.ok] != 1 else ''
    print(msg.format(counter[HTTPStatus.ok], plural))
    if counter[HTTPStatus.not_found]:
        print(counter[HTTPStatus.not_found], 'not found.')
    if counter[HTTPStatus.error]:
        plural = 's' if counter[HTTPStatus.error] != 1 else ''
        print('{} error{}.'.format(counter[HTTPStatus.error], plural))
    print('Elapsed time: {:.2f}s'.format(elapsed))


def expand_cc_args(every_cc, all_cc, cc_args, limit):
    codes = set()
    A_Z = string.ascii_uppercase
    if every_cc:
        codes.update(a+b for a in A_Z for b in A_Z)
    elif all_cc:
        with open(COUNTRY_CODES_FILE) as fp:
            text = fp.read()
        codes.update(text.split())
    else:
        for cc in (c.upper() for c in cc_args):
            if len(cc) == 1 and cc in A_Z:
                codes.update(cc+c for c in A_Z)
            elif len(cc) == 2 and all(c in A_Z for c in cc):
                codes.add(cc)
            else:
                msg = 'each CC argument must be A to Z or AA to ZZ.'
                raise ValueError('*** Usage error: '+msg)
    return sorted(codes)[:limit]


def process_args(default_concur_req):
    server_options = ', '.join(sorted(SERVERS))
    parser = argparse.ArgumentParser(
                description='Download flags for country codes. '
                'Default: top 20 countries by population.')
    parser.add_argument('cc', metavar='CC', nargs='*',
                help='country code or 1st letter (eg. B for BA...BZ)')
    parser.add_argument('-a', '--all', action='store_true',
                help='get all available flags (AD to ZW)')
    parser.add_argument('-e', '--every', action='store_true',
                help='get flags for every possible code (AA...ZZ)')
    parser.add_argument('-l', '--limit', metavar='N', type=int,
                help='limit to N first codes', default=sys.maxsize)
    parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int,
                default=default_concur_req,
                help='maximum concurrent requests (default={})'
                      .format(default_concur_req))
    parser.add_argument('-s', '--server', metavar='LABEL',
                default=DEFAULT_SERVER,
                help='Server to hit; one of {} (default={})'
                      .format(server_options, DEFAULT_SERVER))
    parser.add_argument('-v', '--verbose', action='store_true',
                help='output detailed progress info')
    args = parser.parse_args()
    if args.max_req < 1:
        print('*** Usage error: --max_req CONCURRENT must be >= 1')
        parser.print_usage()
        sys.exit(1)
    if args.limit < 1:
        print('*** Usage error: --limit N must be >= 1')
        parser.print_usage()
        sys.exit(1)
    args.server = args.server.upper()
    if args.server not in SERVERS:
        print('*** Usage error: --server LABEL must be one of',
              server_options)
        parser.print_usage()
        sys.exit(1)
    try:
        cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
    except ValueError as exc:
        print(exc.args[0])
        parser.print_usage()
        sys.exit(1)

    if not cc_list:
        cc_list = sorted(POP20_CC)
    return args, cc_list


def main(download_many, default_concur_req, max_concur_req):
    args, cc_list = process_args(default_concur_req)
    actual_req = min(args.max_req, max_concur_req, len(cc_list))
    initial_report(cc_list, actual_req, args.server)
    base_url = SERVERS[args.server]
    t0 = time.time()
    counter = download_many(cc_list, base_url, args.verbose, actual_req)
    assert sum(counter.values()) == len(cc_list), \
        'some downloads are unaccounted for'
    final_report(cc_list, counter, t0)

下面是我们的改进内容:flags2_asyncio.py:


In [ ]:
import asyncio
import collections
import contextlib

import aiohttp
from aiohttp import web
import tqdm

from flags2_common import main, HTTPStatus, Result, save_flag

# default set low to avoid errors from remote site, such as
# 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000


class FetchError(Exception):  # 这个自定义的异常用于包装其他 HTTP 或网络异常,并获取 country_code 返回错误
    def __init__(self, country_code):
        self.country_code = country_code


@asyncio.coroutine
def get_flag(base_url, cc):
    # 返回 3 种结果:返回下载得到的图像,HTTP 代码为 404 时,抛出 web.HTTPNotFound 异常,
    # 返回其他 HTTP 状态码时,抛出 aiohttp.HttpProcessingError 异常
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)
    with contextlib.closing(resp):
        if resp.status == 200:
            image = yield from resp.read()
            return image
        elif resp.status == 404:
            raise web.HTTPNotFound()
        else:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)


@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose): 
    # semaphore 参数是 asyncio.Semaphore 类的实例,用于限制并发请求数量
    try:
        # 把 semaphore 当成上下文管理器用,防止阻塞整个系统,如果 semaphore 计数器的值是允许的最大值,这个协程会阻塞
        with (yield from semaphore):  
            # 退出 with 语句后,semaphore 计数器会递减,解除阻塞可能在等待同一个 semaphore 对象的其他协程实例
            image = yield from get_flag(base_url, cc)  
    except web.HTTPNotFound:  # 没找到国旗,相应的设置 Resutl 状态
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc  # 其他的异常当做 FetchError 抛出
    else:
        save_flag(image, cc.lower() + '.gif')  # 国旗保存到硬盘
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose and msg:
        print(cc, msg)

    return Result(status, cc)
# END FLAGS2_ASYNCIO_TOP

# BEGIN FLAGS2_ASYNCIO_DOWNLOAD_MANY
@asyncio.coroutine
def downloader_coro(cc_list, base_url, verbose, concur_req):
    counter = collections.Counter()
    # 创建一个 asyncio.Semaphore 实例,最多允许激活 concur_req 个使用这个计数器的协程
    semaphore = asyncio.Semaphore(concur_req)  
    to_do = [download_one(cc, base_url, semaphore, verbose)
             for cc in sorted(cc_list)]  # 创建一个协程列表

    to_do_iter = asyncio.as_completed(to_do)  # 获取一个迭代器,这个迭代器会在期物运行结束后返回期物
    if not verbose:
        to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))  # 显示进度
    for future in to_do_iter:  # 迭代运行结束的期物
        try:
            res = yield from future  # 获取 asyncio.Future 对象结果
        except FetchError as exc:  # download_one 函数抛出的各个异常都包装在 Fetch 对象里
            country_code = exc.country_code  # 从 FetchError 异常中获取错误发生时的国家代码
            try:
                error_msg = exc.__cause__.args[0]  # 尝试从原来的异常(__cause__)中获取错误消息
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__  # 如果在原来的异常中找不到错误消息,使用所链接异常的类名作为错误消息。
            if verbose and error_msg:
                msg = '*** Error for {}: {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status

        counter[status] += 1  # 记录结果

    return counter  # 返回计数器

def download_many(cc_list, base_url, verbose, concur_req):
    loop = asyncio.get_event_loop()
    coro = downloader_coro(cc_list, base_url, verbose, concur_req)
    counts = loop.run_until_complete(coro)  # download_many 函数只是实例化 downloader_coro 协程,然后通过 run_until_complete 方法把它传给事件循环。
    loop.close()

    return counts


if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# END FLAGS2_ASYNCIO_DOWNLOAD_MANY

执行效果如下:

python3 flags2_asyncio.py -s ERROR -al 100 -m 100
ERROR site: http://localhost:8003/flags
Searching for 100 flags: from AD to LK
100 concurrent connections will be used.
--------------------
100 flags downloaded.
Elapsed time: 27.33s

对于这种网络客户端代码来说,一定要使用某种限流机制,防止向服务器发起太多并发请求,因为如果服务器过载,可能整体性能会下降


In [ ]: