Tornado 异步非阻塞浅析

先上代码演示


In [ ]:
#!/usr/bin/python
# coding: utf-8
"""
File: demo.py
Author: zhangxu01 <zhangxu01@zhihu.com>
Date: 2017-08-28 22:59
Description: demo
"""
import random
import time
import urllib

import requests
import tornado

from tornado import gen, web
from tornado.httpclient import AsyncHTTPClient, HTTPRequest


@gen.coroutine
def async_fetch(url, method="GET", data=None, timeout=2 * 30,):
    request = url
    if isinstance(url, HTTPRequest):
        url = request.url
    if not isinstance(url, HTTPRequest):
        kwargs = {
            "connect_timeout": timeout,
            "request_timeout": timeout,
        }
        if data:
            if method == "GET":
                url += '?' + urllib.urlencode(data)
            else:
                kwargs["body"] = urllib.urlencode(data)
        request = HTTPRequest(url, method, **kwargs)

    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(request)
    raise gen.Return(response)


@gen.coroutine
def service_method():
    url = "http://127.0.0.1:2345/api/time"
    data = {
        "time": str(time.time())
    }
    response = yield async_fetch(url, data=data)
    raise gen.Return(response)


class NoBlockHandler(tornado.web.RequestHandler):

    @web.asynchronous
    @gen.coroutine
    def get(self):
        result = yield service_method()
        self.write(result.body)
        self.finish()


class BlockHandler(tornado.web.RequestHandler):

    def get(self):
        begin_time = time.time()
        response = requests.get("http://127.0.0.1:2345/api/time", data={"time": str(begin_time)})
        self.write(response.content)
        self.finish()


class TimeHandler(tornado.web.RequestHandler):

    def get(self):
        req_time = self.get_argument("time", "")
        sleep_time = random.randint(10, 500) * 0.001
        time.sleep(sleep_time)
        self.write("b:{},s:{} => e:{}".format(req_time, sleep_time, time.time()))
        self.finish()


class Application(tornado.web.Application):

    def __init__(self):
        settings = {
            "xsrf_cookies": False,
        }
        handlers = [
            (r"/api/noblock", NoBlockHandler),  # 非阻塞 IO 请求
            (r"/api/block", BlockHandler),  # 阻塞 IO 请求
            (r"/api/time", TimeHandler),  # 被请求接口
        ]
        tornado.web.Application.__init__(self, handlers, **settings)


def main():
    """ main"""
    tornado.httpserver.HTTPServer(Application()).listen(2345)
    tornado.ioloop.IOLoop.instance().start()


if __name__ == "__main__":
    main()

IOLoop 模块

主要用来处理任务调度

def start(self):

    # do something...
    try:
        while True:
            # do something...
            try:
                event_pairs = self._impl.poll(poll_timeout)
            except Exception as e:
                # do something...
                pass
            self._events.update(event_pairs)
            while self._events:
                fd, events = self._events.popitem()
                try:
                    fd_obj, handler_func = self._handlers[fd]
                    handler_func(fd_obj, events)
                except (OSError, IOError) as e:
                    # do something...
                    pass
                except Exception:
                    # do something...
                    pass
    finally:
        # do something...

self._impl.poll(poll_timeout) 会接收请求,等待期间阻塞状态,超时会进入下一次轮训继续等待。

handler_func(fd_obj, events) 会去处理接收的请求,通过HTTPServer -> TCPServer -> gen.coroutine

gen.coroutine 装饰器

主要用来将一个 IOLoop 任务拆分成多个异步子任务

def coroutine(func, replace_callback=True):
    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        future = TracebackFuture()
        # do something...
        try:
            result = func(*args, **kwargs)
        except Exception:
            # do something...
            pass
        else:
            if isinstance(result, GeneratorType):
                try:
                    yielded = next(result)
                    # do something...
                except Exception:
                    future.set_exc_info(sys.exc_info())
                else:
                    Runner(result, future, yielded)
                yielded = None
                try:
                    return future
                finally:
                    future = None
        future.set_result(result)
        return future
    return wrapper
def am():
    abc = 1234
    try:
        print "try"
        return abc
    finally:
        print "finally"
        abc = None
    print "out"
    return abc
am()
try
finally
Out[19]: 1234

高性能 IO 模型在 Tornado 中的应用

大纲

1. 基本概念

  • 同步与异步
  • 阻塞与非阻塞
  • 用户空间和内核空间
  • 缓存 I/O

2. 四种常见 IO 模型

  • 同步阻塞 IO
  • 同步非阻塞 IO
  • 异步 IO
  • IO 多路复用

3. Tornado 异步非阻塞 IO

  • Tornado AsyncHTTPClient() 实例演示
  • Tornado IOLoop 的调度方式

1. 基本概念

同步和异步

描述的是用户线程与内核的交互方式

同步是指用户线程发起IO请求后需要等待或者轮询内核IO操作完成后才能继续执行;而异步是指用户线程发起IO请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户线程注册的回调函数。

阻塞和非阻塞

描述的是用户线程调用内核IO的操作方式

阻塞是指IO操作需要彻底完成后才返回到用户空间;而非阻塞是指IO操作被调用后立即返回给用户一个状态值,无需等到IO操作彻底完成。

同步异步 阻塞非阻塞的区别

同步和异步关注的是消息通信机制;阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态。

同步,就是在发出一个『调用』时,在没有得到结果之前,该『调用』就不返回。但是一旦调用返回,就得到返回值了。换句话说,就是由『调用者』主动等待这个『调用』的结果。异步则是相反,『调用』在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在『调用』发出后,『被调用者』通过状态、通知来通知调用者,或通过回调函数处理这个调用。

阻塞调用是指调用结果返回之前,当前线程会被挂起,调用线程只有在得到结果之后才会返回。非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

用户空间与内核空间

Linux 操作系统和驱动程序运行在内核空间,应用程序运行在用户空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

缓存 I/O

又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

2. 四种常见 IO 模型

同步阻塞 IO

用户线程通过系统调用read发起IO读操作,由用户空间转到内核空间。内核等到数据包到达后,然后将接收的数据拷贝到用户空间,完成read操作。用户需要等待read将socket中的数据读取到buffer后,才继续处理接收的数据。整个IO请求的过程中,用户线程是被阻塞的,这导致用户在发起IO请求时,不能做任何事情,对CPU的资源利用率不够。

同步非阻塞 IO

同步非阻塞IO是在同步阻塞IO的基础上,将socket设置为NONBLOCK。这样做用户线程可以在发起IO请求后可以立即返回。用户需要不断地调用read,尝试读取socket中的数据,直到读取成功后,才继续处理接收的数据。整个IO请求的过程中,虽然用户线程每次发起IO请求后可以立即返回,但是为了等到数据,仍需要不断地轮询、重复请求,消耗了大量的CPU的资源。一般很少直接使用这种模型,而是在其他IO模型中使用非阻塞IO这一特性。

异步 IO

异步IO模型中,用户线程直接使用内核提供的异步IO API发起read请求,且发起后立即返回,继续执行用户线程代码。不过此时用户线程已经将调用的AsynchronousOperation和CompletionHandler注册到内核,然后操作系统开启独立的内核线程去处理IO操作。当read请求的数据到达时,由内核负责读取socket中的数据,并写入用户指定的缓冲区中。最后内核将read的数据和用户线程注册的CompletionHandler分发给内部Proactor,Proactor将IO完成的信息通知给用户线程(一般通过调用用户线程注册的完成事件处理函数),完成异步IO。

IO 多路复用

IO多路复用模型是建立在内核提供的多路分离函数select基础之上的,使用select函数可以避免同步非阻塞IO模型中轮询等待的问题。通过Reactor的方式,可以将用户线程轮询IO操作状态的工作统一交给handle_events事件循环进行处理。用户线程注册事件处理器之后可以继续执行做其他的工作(异步),而Reactor线程负责调用内核的select函数检查socket状态。当有socket被激活时,则通知相应的用户线程(或执行用户线程的回调函数),执行handle_event进行数据读取、处理的工作。由于select函数是阻塞的,因此多路IO复用模型也被称为异步阻塞IO模型。注意,这里的所说的阻塞是指select函数执行时线程被阻塞,而不是指socket。一般在使用IO多路复用模型时,socket都是设置为NONBLOCK的,不过这并不会产生影响,因为用户发起IO请求时,数据已经到达了,用户线程一定不会被阻塞。

Linux支持IO多路复用的系统调用有select、poll、epoll,这些调用都是内核级别的,从 Reactor 和内核的交互方式来看它们都是同步I/O,先是block住等待就绪的socket,再是block住将数据从内核拷贝到用户内存。

Epoll

epoll是在2.6内核中提出的,相对于select和poll来说,epoll更加灵活。epoll使用事件的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知。

epoll_create

用来创建一个 epoll 描述符( 就是创建了一个 epoll )

epoll_ctl

操作 epoll 中的 event;可用参数有:

EPOLL_CTL_ADD 添加一个新的epoll事件

EPOLL_CTL_DEL 删除一个epoll事件

EPOLL_CTL_MOD 改变一个事件的监听方式

epoll_wait

就是让 epoll 开始工作,里面有个参数 timeout,当设置为非 0 正整数时,会监听(阻塞) timeout 秒;设置为 0 时立即返回,设置为 -1 时一直监听。

在监听时有数据活跃的连接时其返回活跃的文件句柄列表。

3. Tornado 异步非阻塞 IO

Tornado AsyncHTTPClient() 实例演示


In [8]:
#!/usr/bin/python
# coding: utf-8
"""
File: demo.py
Author: zhangxu01 <zhangxu01@zhihu.com>
Date: 2017-08-28 22:59
Description: demo
"""
import random
import time
import urllib

import requests
import tornado

from tornado import gen, web
from tornado.httpclient import AsyncHTTPClient, HTTPRequest


@gen.coroutine
def async_fetch(url, method="GET", data=None, timeout=2 * 30,):
    request = url
    if isinstance(url, HTTPRequest):
        url = request.url
    if not isinstance(url, HTTPRequest):
        kwargs = {
            "connect_timeout": timeout,
            "request_timeout": timeout,
        }
        if data:
            if method == "GET":
                url += '?' + urllib.urlencode(data)
            else:
                kwargs["body"] = urllib.urlencode(data)
        request = HTTPRequest(url, method, **kwargs)

    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(request)
    raise gen.Return(response)


@gen.coroutine
def service_method():
    url = "http://127.0.0.1:2345/api/block2"
    data = {
        "time": str(time.time())
    }
    response = yield async_fetch(url, data=data)
    raise gen.Return(response)


class NoBlockHandler(tornado.web.RequestHandler):

    @web.asynchronous
    @gen.coroutine
    def get(self):
        result = yield service_method()
        self.write(result.body)
        self.finish()


class BlockHandler(tornado.web.RequestHandler):

    def get(self):
        begin_time = time.time()
        response = requests.get("http://127.0.0.1:2345/api/block2", data={"time": str(begin_time)})
        self.write(response.content)
        self.finish()


class BlockHandler2(tornado.web.RequestHandler):

    def get(self):
        req_time = self.get_argument("time", "")
        sleep_time = random.randint(10, 500) * 0.001
        time.sleep(sleep_time)
        self.write("b:{},s:{} => e:{}".format(req_time, sleep_time, time.time()))
        self.finish()


class Application(tornado.web.Application):

    def __init__(self):
        settings = {
            "xsrf_cookies": False,
        }
        handlers = [
            (r"/api/noblock", NoBlockHandler),
            (r"/api/block", BlockHandler),
            (r"/api/block2", BlockHandler2),
        ]
        tornado.web.Application.__init__(self, handlers, **settings)


def main():
    """ main"""
    tornado.httpserver.HTTPServer(Application()).listen(2345)
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    pass
    # main()

Tornado IOLoop 的调度方式

epoll 操作

epoll_ctl:这个三个方法分别对应 epoll_ctl 中的 add 、 modify 、 del 参数。 所以这三个方法实现了 epoll 的 epoll_ctl 。

epoll_create:然后 epoll 的生成在前文 EPollIOLoop 的初始化中就已经完成了:super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)。 这个相当于 epoll_create 。

epoll_wait:epoll_wait 操作则在 start() 中:event_pairs = self._impl.poll(poll_timeout)

epoll_close:而 epoll 的 close 则在 PollIOLoop 中的 close 方法内调用:self._impl.close()完成。

IOLoop 的核心调度集中在 start() 方法中。

start 方法中主要分三个部分:一个部分是对超时的相关处理;一部分是 epoll 事件通知阻塞、接收;一部分是对 epoll 返回I/O事件的处理。


In [9]:
class PollIOLoop(IOLoop):

    def initialize(self, impl, time_func=None, **kwargs):
        super(PollIOLoop, self).initialize(**kwargs)
        self._impl = impl
        if hasattr(self._impl, 'fileno'):
            set_close_exec(self._impl.fileno())
        self.time_func = time_func or time.time
        self._handlers = {}
        self._events = {}
        self._callbacks = collections.deque()  # 双端队列
        self._timeouts = []
        self._cancellations = 0
        self._running = False
        self._stopped = False
        self._closing = False
        self._thread_ident = None
        self._pid = os.getpid()
        self._blocking_signal_threshold = None
        self._timeout_counter = itertools.count()
        self._waker = Waker()
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)

    @classmethod
    def configurable_base(cls):
        return PollIOLoop

    @classmethod
    def configurable_default(cls):
        """根据系统环境选择"""
        if hasattr(select, "epoll"):
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop
        if hasattr(select, "kqueue"):
            # Python 2.6+ on BSD or Mac
            from tornado.platform.kqueue import KQueueIOLoop
            return KQueueIOLoop
        from tornado.platform.select import SelectIOLoop
        return SelectIOLoop

    def close(self, all_fds=False):
        self._closing = True
        self.remove_handler(self._waker.fileno())
        if all_fds:
            for fd, handler in list(self._handlers.values()):
                self.close_fd(fd)
        self._waker.close()
        self._impl.close()
        self._callbacks = None
        self._timeouts = None

    def add_handler(self, fd, handler, events):
        """注册事件"""
        fd, obj = self.split_fd(fd)
        self._handlers[fd] = (obj, stack_context.wrap(handler))
        self._impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, events):
        """更新事件"""
        fd, obj = self.split_fd(fd)
        self._impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        """删除事件"""
        fd, obj = self.split_fd(fd)
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except Exception:
            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

    def start(self):

        # 判断运行状态
        if self._running:
            raise RuntimeError("IOLoop is already running")
        if os.getpid() != self._pid:
            raise RuntimeError("Cannot share PollIOLoops across processes")
        self._setup_logging()
        if self._stopped:
            self._stopped = False
            return

        # 启动当前实例
        old_current = getattr(IOLoop._current, "instance", None)
        IOLoop._current.instance = self
        self._thread_ident = thread.get_ident()
        self._running = True

        try:
            while True:
                ncallbacks = len(self._callbacks)  # 活动队列的长度
                due_timeouts = []  # 存放这个周期已过期且有回调的任务

                # 处理超时的任务
                if self._timeouts:
                    now = self.time()
                    while self._timeouts:  # 最小堆
                        # 处理掉超时的
                        if self._timeouts[0].callback is None: # todo ?
                            heapq.heappop(self._timeouts)
                            self._cancellations -= 1
                        elif self._timeouts[0].deadline <= now:
                            due_timeouts.append(heapq.heappop(self._timeouts))
                        else:
                            break
                    # 当超时计数器超过 512 并且大于超时队列一半的长度时,计数器归零,并弹出所有超时无 callback 任务
                    if (self._cancellations > 512 and
                            self._cancellations > (len(self._timeouts) >> 1)):
                        self._cancellations = 0
                        self._timeouts = [x for x in self._timeouts
                                          if x.callback is not None]
                        heapq.heapify(self._timeouts)  # 最小堆化处理

                # 运行所有的可回调任务
                for i in range(ncallbacks):
                    self._run_callback(self._callbacks.popleft())
                # 运行所有已过期的任务
                for timeout in due_timeouts:
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback)
                due_timeouts = timeout = None

                # 这里设置等待时间
                if self._callbacks:
                    poll_timeout = 0.0
                elif self._timeouts:
                    poll_timeout = self._timeouts[0].deadline - self.time()
                    poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
                else:
                    poll_timeout = _POLL_TIMEOUT

                # 检查中断状态
                if not self._running:
                    break

                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL, 0, 0)

                try:
                    # 通过 self._impl.poll(poll_timeout) 进行事件阻塞,当有事件通知或超时时 poll 返回特定的 event_pairs。
                    event_pairs = self._impl.poll(poll_timeout)  # 获取返回的活跃事件队列
                except Exception as e:
                    if errno_from_exception(e) == errno.EINTR:
                        continue
                    else:
                        raise

                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL,
                                     self._blocking_signal_threshold, 0)

                self._events.update(event_pairs)  # 将活跃事件加入 _events
                # 循环处理活跃事件
                # epoll 返回通知事件后将新事件加入待处理队列,将就绪事件逐个弹出,通过stack_context.wrap(handler)保存的可执行对象调用事件处理。
                while self._events:
                    fd, events = self._events.popitem()
                    try:
                        fd_obj, handler_func = self._handlers[fd]
                        handler_func(fd_obj, events)
                    except (OSError, IOError) as e:
                        if errno_from_exception(e) == errno.EPIPE:
                            # Happens when the client closes the connection
                            pass
                        else:
                            self.handle_callback_exception(self._handlers.get(fd))
                    except Exception:
                        self.handle_callback_exception(self._handlers.get(fd))
                fd_obj = handler_func = None

        finally:
            # reset the stopped flag so another start/stop pair can be issued
            self._stopped = False
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL, 0, 0)
            IOLoop._current.instance = old_current
            if old_wakeup_fd is not None:
                signal.set_wakeup_fd(old_wakeup_fd)

    def stop(self):
        self._running = False
        self._stopped = True
        self._waker.wake()

    def time(self):
        return self.time_func()

    def call_at(self, deadline, callback, *args, **kwargs):
        timeout = _Timeout(
            deadline,
            functools.partial(stack_context.wrap(callback), *args, **kwargs),
            self)
        heapq.heappush(self._timeouts, timeout)
        return timeout

    def remove_timeout(self, timeout):
        # Removing from a heap is complicated, so just leave the defunct
        # timeout object in the queue (see discussion in
        # http://docs.python.org/library/heapq.html).
        # If this turns out to be a problem, we could add a garbage
        # collection pass whenever there are too many dead timeouts.
        timeout.callback = None
        self._cancellations += 1

    def add_callback(self, callback, *args, **kwargs):
        if self._closing:
            return
        # Blindly insert into self._callbacks. This is safe even
        # from signal handlers because deque.append is atomic.
        self._callbacks.append(functools.partial(
            stack_context.wrap(callback), *args, **kwargs))
        if thread.get_ident() != self._thread_ident:
            # This will write one byte but Waker.consume() reads many
            # at once, so it's ok to write even when not strictly
            # necessary.
            self._waker.wake()
        else:
            # If we're on the IOLoop's thread, we don't need to wake anyone.
            pass

    def add_callback_from_signal(self, callback, *args, **kwargs):
        with stack_context.NullContext():
            self.add_callback(callback, *args, **kwargs)



NameErrorTraceback (most recent call last)
<ipython-input-9-776d0962cf59> in <module>()
----> 1 class PollIOLoop(IOLoop):
      2 
      3     def initialize(self, impl, time_func=None, **kwargs):
      4         super(PollIOLoop, self).initialize(**kwargs)
      5         self._impl = impl

NameError: name 'IOLoop' is not defined

In [ ]:
# todo future, reactor, promise, ioloop协作,callback,future,之间的关系;twisted,gevent关联区别,应用场景,优缺点。

沛然

陈思

张旭


In [10]:
%%latex


UsageError: %%latex is a cell magic, but the cell body is empty.

In [ ]: