In [1]:
import threading
import itertools
import time
import sys
class Signal:
go = True
def spin(msg, signal):
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'):
status = char + ' ' + msg
write(status)
flush()
write('\x08' * len(status)) # \x08 是退格符
time.sleep(.1)
if not signal.go:
break
write(' ' * len(status) + '\x08' * len(status)) # 最后用空格擦除屏幕内容,把光标移回开头
def slow_function():
time.sleep(3) # 3 秒动画
return 42
def supervisor():
signal = Signal()
spinner = threading.Thread(target=spin,
args=('thinking!', signal))
print('spinner object:', spinner)
spinner.start()
result = slow_function()
signal.go = False
spinner.join()
return result
def main():
result = supervisor()
print('Answer:', result)
main()
In [2]:
import asyncio
import itertools
import sys
@asyncio.coroutine # 打算交给 asyncio 处理的使用此装饰器,不是强制要求,但强烈建议这么做,原因后面讲
def spin(msg): # 这里不需要上面例子中的 signal 参数
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'):
status = char + ' ' + msg
write(status)
flush()
write('\x08' * len(status))
try:
yield from asyncio.sleep(.1) # 代替 time.sleep(.1),这样休眠不会阻塞事件循环
except asyncio.CancelledError: # 如果 spin 函数苏醒后抛出此异常,其原因是发出了取消请求,因此退出循环
break
write(' ' * len(status) + '\x08' * len(status))
@asyncio.coroutine
def slow_function(): # 现在这个函数是协程,在用休眠假装进行 I/O 操作时,使用 yield from 继续执行事件循环
yield from asyncio.sleep(3) # 将控制权交给主循环,3 秒后休眠结束恢复此线程
return 42
@asyncio.coroutine
def supervisor(): # 这个函数是协程,因此可以使用 yield from 驱动 slow_function() 函数
spinner = asyncio.async(spin('thinking!')) # 排定 spin 协程的运行时间,使用一个 Task 对象包装 spin 协程,并立即返回
print('spinner object:', spinner)
result = yield from slow_function() # 驱动 slow_function(), 结束后获取返回值,同时事件继续执行,因为此函数 sleep 将控制权交回主循环
spinner.cancel() # Task 对象可以取消,取消后会在协程当前暂停的 yield 抛出 asyncio.CancelledError 异常,协程可以捕获这个异常,也可以延时取消,甚至可以不取消
return result
def main():
loop = asyncio.get_event_loop() # 获取事件循环的引用
result = loop.run_until_complete(supervisor()) # 驱动 supervisor,让其运行完毕,这个协程返回的是这次调用的返回值
loop.close()
print('Answer:', result)
main()
使用 @asyncio.coroutine 装饰器不是强制要求,但是强烈建议这么做,因为这样能在一众普通的函数中把协程凸现出来,也有助于调试,如果还没从中产出值,协程就被垃圾回收了(意味着有操作未完成,因此可能是个缺陷),那就可以发出警告。这个装饰器不会预激协程
注意这两个函数的不同
In [2]:
# def supervisor():
# signal = Signal()
# spinner = threading.Thread(target=spin,
# args=('thinking!', signal))
# print('spinner object:', spinner)
# spinner.start()
# result = slow_function()
# signal.go = False
# spinner.join()
# return result
# @asyncio.coroutine
# def supervisor(): # 这个函数是协程,因此可以使用 yield from 驱动 slow_function() 函数
# spinner = asyncio.async(spin('thinking!')) # 排定 spin 协程的运行时间,使用一个 Task 对象包装 spin 协程,并立即返回
# print('spinner object:', spinner)
# result = yield from slow_function() # 驱动 slow_function(), 结束后获取返回值,同时事件继续执行,因为此函数 sleep 将控制权交回主循环
# spinner.cancel() # Task 对象可以取消,取消后会在协程当前暂停的 yield 抛出 asyncio.CancelledError 异常,协程可以捕获这个异常,也可以延时取消,甚至可以不取消
# return result
主要区别如下:
线程与协程还有一点区别要说明:如果使用线程编程,调度程序任何时候都能中断线程,必须记住保留锁,去保护程序中的重要部分,防止多步操作在执行的过程中中断,防止数据处于无效状态
协程默认会做好全方位包含,防止中断,我们必须显式产出才能让程序的余下部分运行,对于协程来说,无需保留锁,在多个线程之间同步操作,协程自身就会同步,因此在任意时刻只有一个协程运行,想交出控制权时,可以使用 yield 或 yield from 把控制权交还给调度程序。这就是能够安全地取消协程的原因:按照定义,协程只能在暂停的 yield 处取消,因此可以处理 CancelledError 异常,执行清理操作
asyncio.Future 类和 concurrent.futures.Futer 类的接口基本一致,不过实现方式不同,不能互换。
期物只是调度执行某物的结果,在 asyncio 包中,BaseEventLoop.create_task(...) 方法来接收一个协程,排定它的运行时间,然后返回一个 asyncio.Task 实例 -- 也是 asyncio.Future 类的实例,因为 Task 是 Future 的子类,用于包装协程,这与调用 Executor.submit(...) 方法创建 concurrent.futures.Future 实例是一个道理
与 concurrent.futures.Future 类似,asyncio.Future 类也提供了 .done(), .add_done_callback(...) 和 .result() 等方法。前两个方法的用法与上一章例子一样,不过 .result() 方法差别很大
asyncio.Future 类的 .result() 方法没有参数,因此不能指定超时时间,此外,如果调用 .result() 方法时期物还没运行完毕,.result() 也不会阻塞等待结果,而是抛出 asyncio.InvalidStateError 异常
然而,获取 asyncio.Future 对象通常使用 yield from,从中产出结果
使用 yield from 处理期物,等待期物运行完毕这一步无需我们关心,而且不会阻塞事件循环,因为在 asyncio 包中,yield from 作用是把控制权交还给事件循环
注意,使用 yield from 处理期物与使用 add_done_callback 方法处理协程的作用一样:延时操作结束后,事件循环不会触发回调对象,而是设置期物的返回值,而 yield from 表达式则在暂停的协程中产生返回值,恢复执行协程
总之,因为 asyncio.Future 类的目的是与 yield from 一起使用,所以通常不需要使用以下方法
当然,有时也是用这些方法,但是一般情况下, asyncio.Future 对象是由 yield from 驱动,而不是靠调用这些方法驱动
在 asyncio 包中,期物与协程关系紧密,因为可以使用 yield from 从 asyncio.Future 对象中产出结果。这意味着,如果 foo 是协程函数(调用后返回协程对象),抑或是返回 Future 或 Task 实例的普通函数,那么可以这样写: res = yield from foo() 。这是 asyncio 包的 API中很多地方可以互换协程与期物的原因之一
为了执行这些操作,必须排定协程运行时间,然后使用 asyncio.Task 对象包装协程。对协程来说,获取 Task 对象有两种主要方式
asyncio.async(coro_or_future, *, loop=None)
BaseEventLoop.create_task(coro)
BaseEventLoop.create_task(...) 只能在 Python 3.4.2 及以上版本使用,Python 3.3 或 Python 3.4 要使用 asyncio.async(...) 函数或从 PyPI 中安装新的 asyncio 版本
asyncio 包中有多个函数会自动(内部使用的是 asyncio.async 函数)把参数指定的协程包装在 asyncio.Task 对象中,例如 BaseEventLoop.run_until_complete(...) 方法
如果想在控制台或小型测试脚本中试验期物和协程,可以使用下面代码:
import asyncio
def run_sync(coro_or_future):
loop = asyncio.get_event_loop()
return loop.run_until_complete(coro_or_future)
a = run_sync(some_coroutine())
在 asyncio 文档中有个注解说道:这份文档把一些方法说成协程,即使它们其实是返回 Future 对象的普通 Python 函数,这是故意的,为的是给以后修改这些函数的实现留下余地
In [2]:
import asyncio
import aiohttp # 不是标准库中的,需要 pip 安装
import time
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'
def save_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, 'wb') as fp:
fp.write(img)
def show(text):
print(text, end=' ') # 输出末尾的换行符变成了空格
sys.stdout.flush() # Python 中正常情况下,遇到换行才会刷新 stdout 缓冲。所以这里手动刷新缓冲
def main(download_many):
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time() - t0
msg = '\n{} flags downloaded in {:.2f}s'
print(msg.format(count, elapsed))
@asyncio.coroutine
def get_flag(cc):
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc = cc.lower())
# 阻塞的操作通过协程实现,客户代码通过 yield from 把职责委托给协程,以便异步运行协程
resp = yield from aiohttp.request('GET', url) # 读取响应内容是一项单独的异步操作
image = yield from resp.read()
return image
@asyncio.coroutine
def download_one(cc): # 因为这里用到了 yield from,也必须是协程
image = yield from get_flag(cc)
show(cc)
save_flag(image, cc.lower() + '.gif')
return cc
def download_many(cc_list):
loop = asyncio.get_event_loop() # 获取事件循环底层实现的引用
# 获取各个国旗,构建一个生成器对象列表
to_do = [download_one(cc) for cc in sorted(cc_list)]
# 虽然函数名是 wait,但它不是阻塞型函数,而是一个协程,等传给它的所有协程运行完毕后结束
wait_coro = asyncio.wait(to_do)
# 执行事件循环,直到 wait_coro 运行结束。事件循环运行过程中,脚本会在这里阻塞
res, _ = loop.run_until_complete(wait_coro)
loop.close()
return len(res)
main(download_many)
如果事件循环是上下文管理器就好了,我们就可以使用 with 块保证循环会被关闭,然而实际情况是复杂的,客户代码绝不会直接创建事件循环,而是调用 asyncio.get_event_loop() 函数,获取事件循环的引用。而且有时我们代码不“拥有”事件循环,因此关闭事件循环会出错
asyncio.wait(...) 协程的参数是由一个期物或协程构成的可迭代对象,wait 会分别把各个协程包装进一个 Task 对象,最终结果是,wait 处理的所有对象都通过某种方式变成 Future 类的实例。wait 是协程函数,因此返回的是一个协程或生成器对象,wait_coro 变量中存储的正是这种对象,为了驱动协程,我们把协程传给 loop.run_until_complete(...) 方法
loop.run_until_complete 方法的参数是一个期物或协程,如果是协程,run_until_complete 一样,把协程包装进一个 Task 对象中。协程,期物都能由 yield from 驱动,这正是 run_until_complete 方法对 wait 函数返回的 wait_coro 对象所做的事。waitcoro 运行结束后返回一个元组,第一个元素是一系列结束的期物,第二个元素是一系列未结束的期物。上面的例子,第二个元素永远是空,所以我们将其赋给 ``,忽略。但是 wait 函数有两个关键字参数,如果设定了可能会返回未结束的期物,这两个参数是 timeout 和 return_when
注意,我们为了使用 asyncio 包,必须把每个访问网络的函数改成异步版,使用 yield from 处理网络,这样才能把控制权交还给事件循环。在 get_flag 函数中使用 yield from 意味着它必须像协程那样驱动
asyncio 有许多新概念要掌握,不过有个小技巧,那就是假装没有 yield from 关键字,你会发现代码会容易阅读很多。
比如说,以这个协程为例:
@asyncio.coroutine
def get_flag(cc):
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc = cc.lower())
resp = yield from aiohttp.request('GET', url)
image = yield from resp.read()
return image
我们可以假设它与下面这个函数作用相同,只是协程版从不阻塞
@asyncio.coroutine
def get_flag(cc):
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc = cc.lower())
resp = aiohttp.request('GET', url)
image = resp.read()
return image
yield from foo 语法可以防止阻塞,是因为当前协程(即包含 yield from 代码的委派生成器)暂停后,控制权回到事件循环手里,再去驱动其他协程;foo 期物或协程运行完毕后,把结果返回给暂停的协程,将其恢复
我们在 16 章说过:
使用 yield from 链接的多个协程最终必须由不是协程的调用方驱动,调用方显式或隐式(例如在 for 循环中)在最外层委派生成器上调用 next(...) 函数或 send(...) 函数
链条中最内层的子生成器必须是简单的生成器(只使用 yield)或者是可迭代对象
在 asyncio 包的 API 中使用 yield from 时,这两点都成立,不过要注意下面细节
我们编写的协程链条始终通过把最外层委派生成器传给 asyncio 包 API 中的某个函数(如 loop.run_until_complete(...)) 驱动,也就是说,使用 asyncio 包时,我们编写的代码不通过调用 next(...) 或 send(...) 方法驱动协程 -- 这一点由 asyncio 包实现的事件循环去做
我们编写的协程链条最终通过 yield from 把职责委托给 asyncio 包中的某个协程函数或协程方法(例如 yield from asyncio.sleep()),或者其他库中实现的高层协程(例如 resp = yield from aiohttp.request('GET', url))。也就是说,最内层的自生成器是真正执行 I/O 操作的函数,而不是我们自己编写的函数
概括起来就是:使用 asyncio 包时,我们编写的异步代码中包含由 asyncio 本身驱动的协程(即委派生成器),而生成器最终把职责委托给 asyncio 包或第三方库(如 aiohttp)中的协程。这种处理方式相当于架起了管道,让 asyncio 事件循环(通过我们编写的协程)驱动执行底层异步 I/O 操作的库函数
我们的 CPU 运行计算很快,处理网络请求很慢,有两种方法可以避免阻塞型调用终止整个应用程序的进程
使用多个线程是可以的,但是各个操作系统线程(Python 使用的是这种线程)消耗的内存很多,要处理好几千个连接,每个连接都用线程的话负担不起
为了降低内存消耗,通常使用回调来实现异步调用。这是一种低层的概念,类似于所有并发机制中最古老,最原始的那种--硬件中断。使用回调时,我们不等待响应,而是注册一个函数,在某件事时调用。这样,所有的调用都是非阻塞的。因为回调简单,而且消耗低,所以比较推荐这种方式
当然,只有异步应用程序低层的事件循环能依靠基础设置的中断、线程、轮询和后台进程蛋等,确保多个并发请求能取得进展并最终完成,这样才能使用回调。事件循环获得响应后,会回过头来调用我们指定的回调。不过,如果做法正确,时间循环和应用代码公用的主线程绝不会阻塞
把生成器当做协程使用是异步编程的另一种方式。对事件循环来说,调用回调与在暂停的协程上调用 .send() 方法效果差不多,各个暂停的协程是要消耗内存,但是比起线程来说,消耗的极小。
asyncio 的基础设施获得第一个响应后,事件循环把响应发给等待结果的 get_flag 协程,得到响应后,get_flag 向前执行到下一个 yield from 表达式处,调用 resp.read() 方法,然后把主动权还给主循环。其他响应会陆续返回(因为请求几乎同时发出)。所有 get_flag 协程都获得结果后,委派器 download_one 恢复,保存图像文件
为了提高性能,save_flag 应该执行异步操作,可是 asyncio 包目前没有提供异步文件系统 API(None有)。如果这是应用的瓶颈,可以使用 loop.run_in_exector 方法。后面会说明如何使用
因为异步操作和执行是交叉执行的,所以并发下载多张图像所需的总时间比依序下载少得多,我们使用 asyncio 包发起了 600 个 HTTP 请求,获得所有结果的时间比依序下载快 70 倍
需要引用 17 章的 flags2_common.py 中的函数,flags2_common.py:
In [5]:
import os
import time
import sys
import string
import argparse
from collections import namedtuple
from enum import Enum
Result = namedtuple('Result', 'status data')
HTTPStatus = Enum('Status', 'ok not_found error')
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
'MX PH VN ET EG DE IR TR CD FR').split()
DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1
SERVERS = {
'REMOTE': 'http://flupy.org/data/flags',
'LOCAL': 'http://localhost:8001/flags',
'DELAY': 'http://localhost:8002/flags',
'ERROR': 'http://localhost:8003/flags',
}
DEFAULT_SERVER = 'LOCAL'
DEST_DIR = 'downloads/'
COUNTRY_CODES_FILE = 'country_codes.txt'
def save_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, 'wb') as fp:
fp.write(img)
def initial_report(cc_list, actual_req, server_label):
if len(cc_list) <= 10:
cc_msg = ', '.join(cc_list)
else:
cc_msg = 'from {} to {}'.format(cc_list[0], cc_list[-1])
print('{} site: {}'.format(server_label, SERVERS[server_label]))
msg = 'Searching for {} flag{}: {}'
plural = 's' if len(cc_list) != 1 else ''
print(msg.format(len(cc_list), plural, cc_msg))
plural = 's' if actual_req != 1 else ''
msg = '{} concurrent connection{} will be used.'
print(msg.format(actual_req, plural))
def final_report(cc_list, counter, start_time):
elapsed = time.time() - start_time
print('-' * 20)
msg = '{} flag{} downloaded.'
plural = 's' if counter[HTTPStatus.ok] != 1 else ''
print(msg.format(counter[HTTPStatus.ok], plural))
if counter[HTTPStatus.not_found]:
print(counter[HTTPStatus.not_found], 'not found.')
if counter[HTTPStatus.error]:
plural = 's' if counter[HTTPStatus.error] != 1 else ''
print('{} error{}.'.format(counter[HTTPStatus.error], plural))
print('Elapsed time: {:.2f}s'.format(elapsed))
def expand_cc_args(every_cc, all_cc, cc_args, limit):
codes = set()
A_Z = string.ascii_uppercase
if every_cc:
codes.update(a+b for a in A_Z for b in A_Z)
elif all_cc:
with open(COUNTRY_CODES_FILE) as fp:
text = fp.read()
codes.update(text.split())
else:
for cc in (c.upper() for c in cc_args):
if len(cc) == 1 and cc in A_Z:
codes.update(cc+c for c in A_Z)
elif len(cc) == 2 and all(c in A_Z for c in cc):
codes.add(cc)
else:
msg = 'each CC argument must be A to Z or AA to ZZ.'
raise ValueError('*** Usage error: '+msg)
return sorted(codes)[:limit]
def process_args(default_concur_req):
server_options = ', '.join(sorted(SERVERS))
parser = argparse.ArgumentParser(
description='Download flags for country codes. '
'Default: top 20 countries by population.')
parser.add_argument('cc', metavar='CC', nargs='*',
help='country code or 1st letter (eg. B for BA...BZ)')
parser.add_argument('-a', '--all', action='store_true',
help='get all available flags (AD to ZW)')
parser.add_argument('-e', '--every', action='store_true',
help='get flags for every possible code (AA...ZZ)')
parser.add_argument('-l', '--limit', metavar='N', type=int,
help='limit to N first codes', default=sys.maxsize)
parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int,
default=default_concur_req,
help='maximum concurrent requests (default={})'
.format(default_concur_req))
parser.add_argument('-s', '--server', metavar='LABEL',
default=DEFAULT_SERVER,
help='Server to hit; one of {} (default={})'
.format(server_options, DEFAULT_SERVER))
parser.add_argument('-v', '--verbose', action='store_true',
help='output detailed progress info')
args = parser.parse_args()
if args.max_req < 1:
print('*** Usage error: --max_req CONCURRENT must be >= 1')
parser.print_usage()
sys.exit(1)
if args.limit < 1:
print('*** Usage error: --limit N must be >= 1')
parser.print_usage()
sys.exit(1)
args.server = args.server.upper()
if args.server not in SERVERS:
print('*** Usage error: --server LABEL must be one of',
server_options)
parser.print_usage()
sys.exit(1)
try:
cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
except ValueError as exc:
print(exc.args[0])
parser.print_usage()
sys.exit(1)
if not cc_list:
cc_list = sorted(POP20_CC)
return args, cc_list
def main(download_many, default_concur_req, max_concur_req):
args, cc_list = process_args(default_concur_req)
actual_req = min(args.max_req, max_concur_req, len(cc_list))
initial_report(cc_list, actual_req, args.server)
base_url = SERVERS[args.server]
t0 = time.time()
counter = download_many(cc_list, base_url, args.verbose, actual_req)
assert sum(counter.values()) == len(cc_list), \
'some downloads are unaccounted for'
final_report(cc_list, counter, t0)
下面是我们的改进内容:flags2_asyncio.py:
In [ ]:
import asyncio
import collections
import contextlib
import aiohttp
from aiohttp import web
import tqdm
from flags2_common import main, HTTPStatus, Result, save_flag
# default set low to avoid errors from remote site, such as
# 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
class FetchError(Exception): # 这个自定义的异常用于包装其他 HTTP 或网络异常,并获取 country_code 返回错误
def __init__(self, country_code):
self.country_code = country_code
@asyncio.coroutine
def get_flag(base_url, cc):
# 返回 3 种结果:返回下载得到的图像,HTTP 代码为 404 时,抛出 web.HTTPNotFound 异常,
# 返回其他 HTTP 状态码时,抛出 aiohttp.HttpProcessingError 异常
url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
resp = yield from aiohttp.request('GET', url)
with contextlib.closing(resp):
if resp.status == 200:
image = yield from resp.read()
return image
elif resp.status == 404:
raise web.HTTPNotFound()
else:
raise aiohttp.HttpProcessingError(
code=resp.status, message=resp.reason,
headers=resp.headers)
@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
# semaphore 参数是 asyncio.Semaphore 类的实例,用于限制并发请求数量
try:
# 把 semaphore 当成上下文管理器用,防止阻塞整个系统,如果 semaphore 计数器的值是允许的最大值,这个协程会阻塞
with (yield from semaphore):
# 退出 with 语句后,semaphore 计数器会递减,解除阻塞可能在等待同一个 semaphore 对象的其他协程实例
image = yield from get_flag(base_url, cc)
except web.HTTPNotFound: # 没找到国旗,相应的设置 Resutl 状态
status = HTTPStatus.not_found
msg = 'not found'
except Exception as exc:
raise FetchError(cc) from exc # 其他的异常当做 FetchError 抛出
else:
save_flag(image, cc.lower() + '.gif') # 国旗保存到硬盘
status = HTTPStatus.ok
msg = 'OK'
if verbose and msg:
print(cc, msg)
return Result(status, cc)
# END FLAGS2_ASYNCIO_TOP
# BEGIN FLAGS2_ASYNCIO_DOWNLOAD_MANY
@asyncio.coroutine
def downloader_coro(cc_list, base_url, verbose, concur_req):
counter = collections.Counter()
# 创建一个 asyncio.Semaphore 实例,最多允许激活 concur_req 个使用这个计数器的协程
semaphore = asyncio.Semaphore(concur_req)
to_do = [download_one(cc, base_url, semaphore, verbose)
for cc in sorted(cc_list)] # 创建一个协程列表
to_do_iter = asyncio.as_completed(to_do) # 获取一个迭代器,这个迭代器会在期物运行结束后返回期物
if not verbose:
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # 显示进度
for future in to_do_iter: # 迭代运行结束的期物
try:
res = yield from future # 获取 asyncio.Future 对象结果
except FetchError as exc: # download_one 函数抛出的各个异常都包装在 Fetch 对象里
country_code = exc.country_code # 从 FetchError 异常中获取错误发生时的国家代码
try:
error_msg = exc.__cause__.args[0] # 尝试从原来的异常(__cause__)中获取错误消息
except IndexError:
error_msg = exc.__cause__.__class__.__name__ # 如果在原来的异常中找不到错误消息,使用所链接异常的类名作为错误消息。
if verbose and error_msg:
msg = '*** Error for {}: {}'
print(msg.format(country_code, error_msg))
status = HTTPStatus.error
else:
status = res.status
counter[status] += 1 # 记录结果
return counter # 返回计数器
def download_many(cc_list, base_url, verbose, concur_req):
loop = asyncio.get_event_loop()
coro = downloader_coro(cc_list, base_url, verbose, concur_req)
counts = loop.run_until_complete(coro) # download_many 函数只是实例化 downloader_coro 协程,然后通过 run_until_complete 方法把它传给事件循环。
loop.close()
return counts
if __name__ == '__main__':
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# END FLAGS2_ASYNCIO_DOWNLOAD_MANY
执行效果如下:
python3 flags2_asyncio.py -s ERROR -al 100 -m 100
ERROR site: http://localhost:8003/flags
Searching for 100 flags: from AD to LK
100 concurrent connections will be used.
--------------------
100 flags downloaded.
Elapsed time: 27.33s
对于这种网络客户端代码来说,一定要使用某种限流机制,防止向服务器发起太多并发请求,因为如果服务器过载,可能整体性能会下降
In [ ]: