In [ ]:
#!/usr/bin/python
# coding: utf-8
"""
File: demo.py
Author: zhangxu01 <zhangxu01@zhihu.com>
Date: 2017-08-28 22:59
Description: demo
"""
import random
import time
import urllib
import requests
import tornado
from tornado import gen, web
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
@gen.coroutine
def async_fetch(url, method="GET", data=None, timeout=2 * 30,):
request = url
if isinstance(url, HTTPRequest):
url = request.url
if not isinstance(url, HTTPRequest):
kwargs = {
"connect_timeout": timeout,
"request_timeout": timeout,
}
if data:
if method == "GET":
url += '?' + urllib.urlencode(data)
else:
kwargs["body"] = urllib.urlencode(data)
request = HTTPRequest(url, method, **kwargs)
http_client = AsyncHTTPClient()
response = yield http_client.fetch(request)
raise gen.Return(response)
@gen.coroutine
def service_method():
url = "http://127.0.0.1:2345/api/time"
data = {
"time": str(time.time())
}
response = yield async_fetch(url, data=data)
raise gen.Return(response)
class NoBlockHandler(tornado.web.RequestHandler):
@web.asynchronous
@gen.coroutine
def get(self):
result = yield service_method()
self.write(result.body)
self.finish()
class BlockHandler(tornado.web.RequestHandler):
def get(self):
begin_time = time.time()
response = requests.get("http://127.0.0.1:2345/api/time", data={"time": str(begin_time)})
self.write(response.content)
self.finish()
class TimeHandler(tornado.web.RequestHandler):
def get(self):
req_time = self.get_argument("time", "")
sleep_time = random.randint(10, 500) * 0.001
time.sleep(sleep_time)
self.write("b:{},s:{} => e:{}".format(req_time, sleep_time, time.time()))
self.finish()
class Application(tornado.web.Application):
def __init__(self):
settings = {
"xsrf_cookies": False,
}
handlers = [
(r"/api/noblock", NoBlockHandler), # 非阻塞 IO 请求
(r"/api/block", BlockHandler), # 阻塞 IO 请求
(r"/api/time", TimeHandler), # 被请求接口
]
tornado.web.Application.__init__(self, handlers, **settings)
def main():
""" main"""
tornado.httpserver.HTTPServer(Application()).listen(2345)
tornado.ioloop.IOLoop.instance().start()
if __name__ == "__main__":
main()
主要用来处理任务调度
def start(self):
# do something...
try:
while True:
# do something...
try:
event_pairs = self._impl.poll(poll_timeout)
except Exception as e:
# do something...
pass
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
except (OSError, IOError) as e:
# do something...
pass
except Exception:
# do something...
pass
finally:
# do something...
self._impl.poll(poll_timeout)
会接收请求,等待期间阻塞状态,超时会进入下一次轮训继续等待。
handler_func(fd_obj, events)
会去处理接收的请求,通过HTTPServer -> TCPServer -> gen.coroutine
主要用来将一个 IOLoop 任务拆分成多个异步子任务
def coroutine(func, replace_callback=True):
@functools.wraps(wrapped)
def wrapper(*args, **kwargs):
future = TracebackFuture()
# do something...
try:
result = func(*args, **kwargs)
except Exception:
# do something...
pass
else:
if isinstance(result, GeneratorType):
try:
yielded = next(result)
# do something...
except Exception:
future.set_exc_info(sys.exc_info())
else:
Runner(result, future, yielded)
yielded = None
try:
return future
finally:
future = None
future.set_result(result)
return future
return wrapper
def am():
abc = 1234
try:
print "try"
return abc
finally:
print "finally"
abc = None
print "out"
return abc
am()
try
finally
Out[19]: 1234
同步是指用户线程发起IO请求后需要等待或者轮询内核IO操作完成后才能继续执行;而异步是指用户线程发起IO请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户线程注册的回调函数。
阻塞是指IO操作需要彻底完成后才返回到用户空间;而非阻塞是指IO操作被调用后立即返回给用户一个状态值,无需等到IO操作彻底完成。
同步和异步关注的是消息通信机制;阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态。
同步,就是在发出一个『调用』时,在没有得到结果之前,该『调用』就不返回。但是一旦调用返回,就得到返回值了。换句话说,就是由『调用者』主动等待这个『调用』的结果。异步则是相反,『调用』在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在『调用』发出后,『被调用者』通过状态、通知来通知调用者,或通过回调函数处理这个调用。
阻塞调用是指调用结果返回之前,当前线程会被挂起,调用线程只有在得到结果之后才会返回。非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。
又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。
用户线程通过系统调用read发起IO读操作,由用户空间转到内核空间。内核等到数据包到达后,然后将接收的数据拷贝到用户空间,完成read操作。用户需要等待read将socket中的数据读取到buffer后,才继续处理接收的数据。整个IO请求的过程中,用户线程是被阻塞的,这导致用户在发起IO请求时,不能做任何事情,对CPU的资源利用率不够。
同步非阻塞IO是在同步阻塞IO的基础上,将socket设置为NONBLOCK。这样做用户线程可以在发起IO请求后可以立即返回。用户需要不断地调用read,尝试读取socket中的数据,直到读取成功后,才继续处理接收的数据。整个IO请求的过程中,虽然用户线程每次发起IO请求后可以立即返回,但是为了等到数据,仍需要不断地轮询、重复请求,消耗了大量的CPU的资源。一般很少直接使用这种模型,而是在其他IO模型中使用非阻塞IO这一特性。
异步IO模型中,用户线程直接使用内核提供的异步IO API发起read请求,且发起后立即返回,继续执行用户线程代码。不过此时用户线程已经将调用的AsynchronousOperation和CompletionHandler注册到内核,然后操作系统开启独立的内核线程去处理IO操作。当read请求的数据到达时,由内核负责读取socket中的数据,并写入用户指定的缓冲区中。最后内核将read的数据和用户线程注册的CompletionHandler分发给内部Proactor,Proactor将IO完成的信息通知给用户线程(一般通过调用用户线程注册的完成事件处理函数),完成异步IO。
IO多路复用模型是建立在内核提供的多路分离函数select基础之上的,使用select函数可以避免同步非阻塞IO模型中轮询等待的问题。通过Reactor的方式,可以将用户线程轮询IO操作状态的工作统一交给handle_events事件循环进行处理。用户线程注册事件处理器之后可以继续执行做其他的工作(异步),而Reactor线程负责调用内核的select函数检查socket状态。当有socket被激活时,则通知相应的用户线程(或执行用户线程的回调函数),执行handle_event进行数据读取、处理的工作。由于select函数是阻塞的,因此多路IO复用模型也被称为异步阻塞IO模型。注意,这里的所说的阻塞是指select函数执行时线程被阻塞,而不是指socket。一般在使用IO多路复用模型时,socket都是设置为NONBLOCK的,不过这并不会产生影响,因为用户发起IO请求时,数据已经到达了,用户线程一定不会被阻塞。
Linux支持IO多路复用的系统调用有select、poll、epoll,这些调用都是内核级别的,从 Reactor 和内核的交互方式来看它们都是同步I/O,先是block住等待就绪的socket,再是block住将数据从内核拷贝到用户内存。
epoll是在2.6内核中提出的,相对于select和poll来说,epoll更加灵活。epoll使用事件的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知。
用来创建一个 epoll 描述符( 就是创建了一个 epoll )
操作 epoll 中的 event;可用参数有:
EPOLL_CTL_ADD 添加一个新的epoll事件
EPOLL_CTL_DEL 删除一个epoll事件
EPOLL_CTL_MOD 改变一个事件的监听方式
就是让 epoll 开始工作,里面有个参数 timeout,当设置为非 0 正整数时,会监听(阻塞) timeout 秒;设置为 0 时立即返回,设置为 -1 时一直监听。
在监听时有数据活跃的连接时其返回活跃的文件句柄列表。
In [8]:
#!/usr/bin/python
# coding: utf-8
"""
File: demo.py
Author: zhangxu01 <zhangxu01@zhihu.com>
Date: 2017-08-28 22:59
Description: demo
"""
import random
import time
import urllib
import requests
import tornado
from tornado import gen, web
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
@gen.coroutine
def async_fetch(url, method="GET", data=None, timeout=2 * 30,):
request = url
if isinstance(url, HTTPRequest):
url = request.url
if not isinstance(url, HTTPRequest):
kwargs = {
"connect_timeout": timeout,
"request_timeout": timeout,
}
if data:
if method == "GET":
url += '?' + urllib.urlencode(data)
else:
kwargs["body"] = urllib.urlencode(data)
request = HTTPRequest(url, method, **kwargs)
http_client = AsyncHTTPClient()
response = yield http_client.fetch(request)
raise gen.Return(response)
@gen.coroutine
def service_method():
url = "http://127.0.0.1:2345/api/block2"
data = {
"time": str(time.time())
}
response = yield async_fetch(url, data=data)
raise gen.Return(response)
class NoBlockHandler(tornado.web.RequestHandler):
@web.asynchronous
@gen.coroutine
def get(self):
result = yield service_method()
self.write(result.body)
self.finish()
class BlockHandler(tornado.web.RequestHandler):
def get(self):
begin_time = time.time()
response = requests.get("http://127.0.0.1:2345/api/block2", data={"time": str(begin_time)})
self.write(response.content)
self.finish()
class BlockHandler2(tornado.web.RequestHandler):
def get(self):
req_time = self.get_argument("time", "")
sleep_time = random.randint(10, 500) * 0.001
time.sleep(sleep_time)
self.write("b:{},s:{} => e:{}".format(req_time, sleep_time, time.time()))
self.finish()
class Application(tornado.web.Application):
def __init__(self):
settings = {
"xsrf_cookies": False,
}
handlers = [
(r"/api/noblock", NoBlockHandler),
(r"/api/block", BlockHandler),
(r"/api/block2", BlockHandler2),
]
tornado.web.Application.__init__(self, handlers, **settings)
def main():
""" main"""
tornado.httpserver.HTTPServer(Application()).listen(2345)
tornado.ioloop.IOLoop.instance().start()
if __name__ == "__main__":
pass
# main()
epoll_ctl:这个三个方法分别对应 epoll_ctl 中的 add 、 modify 、 del 参数。 所以这三个方法实现了 epoll 的 epoll_ctl 。
epoll_create:然后 epoll 的生成在前文 EPollIOLoop 的初始化中就已经完成了:super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
。 这个相当于 epoll_create 。
epoll_wait:epoll_wait 操作则在 start() 中:event_pairs = self._impl.poll(poll_timeout)
epoll_close:而 epoll 的 close 则在 PollIOLoop 中的 close 方法内调用:self._impl.close()
完成。
start 方法中主要分三个部分:一个部分是对超时的相关处理;一部分是 epoll 事件通知阻塞、接收;一部分是对 epoll 返回I/O事件的处理。
In [9]:
class PollIOLoop(IOLoop):
def initialize(self, impl, time_func=None, **kwargs):
super(PollIOLoop, self).initialize(**kwargs)
self._impl = impl
if hasattr(self._impl, 'fileno'):
set_close_exec(self._impl.fileno())
self.time_func = time_func or time.time
self._handlers = {}
self._events = {}
self._callbacks = collections.deque() # 双端队列
self._timeouts = []
self._cancellations = 0
self._running = False
self._stopped = False
self._closing = False
self._thread_ident = None
self._pid = os.getpid()
self._blocking_signal_threshold = None
self._timeout_counter = itertools.count()
self._waker = Waker()
self.add_handler(self._waker.fileno(),
lambda fd, events: self._waker.consume(),
self.READ)
@classmethod
def configurable_base(cls):
return PollIOLoop
@classmethod
def configurable_default(cls):
"""根据系统环境选择"""
if hasattr(select, "epoll"):
from tornado.platform.epoll import EPollIOLoop
return EPollIOLoop
if hasattr(select, "kqueue"):
# Python 2.6+ on BSD or Mac
from tornado.platform.kqueue import KQueueIOLoop
return KQueueIOLoop
from tornado.platform.select import SelectIOLoop
return SelectIOLoop
def close(self, all_fds=False):
self._closing = True
self.remove_handler(self._waker.fileno())
if all_fds:
for fd, handler in list(self._handlers.values()):
self.close_fd(fd)
self._waker.close()
self._impl.close()
self._callbacks = None
self._timeouts = None
def add_handler(self, fd, handler, events):
"""注册事件"""
fd, obj = self.split_fd(fd)
self._handlers[fd] = (obj, stack_context.wrap(handler))
self._impl.register(fd, events | self.ERROR)
def update_handler(self, fd, events):
"""更新事件"""
fd, obj = self.split_fd(fd)
self._impl.modify(fd, events | self.ERROR)
def remove_handler(self, fd):
"""删除事件"""
fd, obj = self.split_fd(fd)
self._handlers.pop(fd, None)
self._events.pop(fd, None)
try:
self._impl.unregister(fd)
except Exception:
gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
def start(self):
# 判断运行状态
if self._running:
raise RuntimeError("IOLoop is already running")
if os.getpid() != self._pid:
raise RuntimeError("Cannot share PollIOLoops across processes")
self._setup_logging()
if self._stopped:
self._stopped = False
return
# 启动当前实例
old_current = getattr(IOLoop._current, "instance", None)
IOLoop._current.instance = self
self._thread_ident = thread.get_ident()
self._running = True
try:
while True:
ncallbacks = len(self._callbacks) # 活动队列的长度
due_timeouts = [] # 存放这个周期已过期且有回调的任务
# 处理超时的任务
if self._timeouts:
now = self.time()
while self._timeouts: # 最小堆
# 处理掉超时的
if self._timeouts[0].callback is None: # todo ?
heapq.heappop(self._timeouts)
self._cancellations -= 1
elif self._timeouts[0].deadline <= now:
due_timeouts.append(heapq.heappop(self._timeouts))
else:
break
# 当超时计数器超过 512 并且大于超时队列一半的长度时,计数器归零,并弹出所有超时无 callback 任务
if (self._cancellations > 512 and
self._cancellations > (len(self._timeouts) >> 1)):
self._cancellations = 0
self._timeouts = [x for x in self._timeouts
if x.callback is not None]
heapq.heapify(self._timeouts) # 最小堆化处理
# 运行所有的可回调任务
for i in range(ncallbacks):
self._run_callback(self._callbacks.popleft())
# 运行所有已过期的任务
for timeout in due_timeouts:
if timeout.callback is not None:
self._run_callback(timeout.callback)
due_timeouts = timeout = None
# 这里设置等待时间
if self._callbacks:
poll_timeout = 0.0
elif self._timeouts:
poll_timeout = self._timeouts[0].deadline - self.time()
poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
else:
poll_timeout = _POLL_TIMEOUT
# 检查中断状态
if not self._running:
break
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL, 0, 0)
try:
# 通过 self._impl.poll(poll_timeout) 进行事件阻塞,当有事件通知或超时时 poll 返回特定的 event_pairs。
event_pairs = self._impl.poll(poll_timeout) # 获取返回的活跃事件队列
except Exception as e:
if errno_from_exception(e) == errno.EINTR:
continue
else:
raise
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL,
self._blocking_signal_threshold, 0)
self._events.update(event_pairs) # 将活跃事件加入 _events
# 循环处理活跃事件
# epoll 返回通知事件后将新事件加入待处理队列,将就绪事件逐个弹出,通过stack_context.wrap(handler)保存的可执行对象调用事件处理。
while self._events:
fd, events = self._events.popitem()
try:
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
except (OSError, IOError) as e:
if errno_from_exception(e) == errno.EPIPE:
# Happens when the client closes the connection
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func = None
finally:
# reset the stopped flag so another start/stop pair can be issued
self._stopped = False
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL, 0, 0)
IOLoop._current.instance = old_current
if old_wakeup_fd is not None:
signal.set_wakeup_fd(old_wakeup_fd)
def stop(self):
self._running = False
self._stopped = True
self._waker.wake()
def time(self):
return self.time_func()
def call_at(self, deadline, callback, *args, **kwargs):
timeout = _Timeout(
deadline,
functools.partial(stack_context.wrap(callback), *args, **kwargs),
self)
heapq.heappush(self._timeouts, timeout)
return timeout
def remove_timeout(self, timeout):
# Removing from a heap is complicated, so just leave the defunct
# timeout object in the queue (see discussion in
# http://docs.python.org/library/heapq.html).
# If this turns out to be a problem, we could add a garbage
# collection pass whenever there are too many dead timeouts.
timeout.callback = None
self._cancellations += 1
def add_callback(self, callback, *args, **kwargs):
if self._closing:
return
# Blindly insert into self._callbacks. This is safe even
# from signal handlers because deque.append is atomic.
self._callbacks.append(functools.partial(
stack_context.wrap(callback), *args, **kwargs))
if thread.get_ident() != self._thread_ident:
# This will write one byte but Waker.consume() reads many
# at once, so it's ok to write even when not strictly
# necessary.
self._waker.wake()
else:
# If we're on the IOLoop's thread, we don't need to wake anyone.
pass
def add_callback_from_signal(self, callback, *args, **kwargs):
with stack_context.NullContext():
self.add_callback(callback, *args, **kwargs)
https://segmentfault.com/a/1190000005659237
http://www.cnblogs.com/fanzhidongyzby/p/4098546.html
http://blog.csdn.net/iter_zc/article/details/39291647
https://segmentfault.com/a/1190000003063859
http://blog.csdn.net/feitianxuxue/article/details/8936802
http://blog.csdn.net/lisonglisonglisong/article/details/51328062
http://blog.csdn.net/wyx819/article/details/45420017
In [ ]:
# todo future, reactor, promise, ioloop协作,callback,future,之间的关系;twisted,gevent关联区别,应用场景,优缺点。
沛然
陈思
张旭
In [10]:
%%latex
In [ ]: