说明
因为tornado最新的版本主要推荐使用asyncio async await, 而这篇文章主要探讨早期tornado如何使用generator实现coroutine, 所以这里统一使用tornado 4.5版本
异步和非阻塞io
asynchronous 和 non-blocking这两个概念关系紧密,经常被当作一回事。实际上两者是有差异的,并不是同一个概念
阻塞
一个函数在等待某些执行而没有返回,称为阻塞。阻塞可能有很多原因:网络io,磁盘io,锁等。实际上,每个函数都会阻塞至少一点点,在它运行和使用cpu时(一个极端的例子可以说明cpu的阻塞一样要认真对待: 像bcrypt这样的hashing函数,需要几百毫秒的执行时间,比典型的网络和磁盘io都要慢的多)
一个函数可能在有些部分阻塞,有些不阻塞。比如tornado.httpclient默认在DNS解析时阻塞,在其他网络访问时不阻塞
异步
一个asynchronous函数会在它完成之前返回,一般会触发一些后台的任务,等后台任务完成之后再继续应用中的其他操作(作为对比,同步函数会完成它所有的操作之后才返回)。异步接口有多种风格:
- Callback argument
- Return a placeholder ([
Future
],Promise
,Deferred
) - Deliver to a queue
- Callback registry (e.g. POSIX signals)
无论使用哪种风格,异步函数和它的caller的交互从定义上就是不同的
同步函数
同步函数,直观容易理解
import requests
def synchronous_fetch(url):
response = requests.get(url)
return response.content
回调风格的异步函数及背后的ioloop
异步函数,回调方式
from tornado.httpclient import AsyncHTTPClient
def asynchronous_fetch(url, callback):
http_client = AsyncHTTPClient()
def handle_response(response):
callback(response.body)
http_client.fetch(url, callback=handle_response)
def request_callback(body):
print("request success")
print(body[:10])
if __name__ == '__main__':
asynchronous_fetch("http://www.baidu.com", request_callback)
执行上述代码,并没有print出预期的信息, 这是因为缺少ioloop
实际上asynchronous_fetch调用之后,发起了非阻塞的网络请求,然后就直接返回了。后续等待socket.send和socket.recv完成,及回调request_callback都需要ioloop来完成
上面的代码做一些补充:
if __name__ == '__main__':
asynchronous_fetch("http://www.baidu.com", request_callback)
ioloop = tornado.ioloop.IOLoop.current().start()
重新执行就能看到预期的输出了:
request success
b'<!DOCTYPE '
什么是ioloop呢, 看如下的例子:
import errno
import functools
import tornado.ioloop
import socket
def connection_ready(sock, fd, events):
while True:
try:
connection, address = sock.accept()
except socket.error as e:
if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
return
connection.setblocking(0)
handle_connection(connection, address)
def handle_connection(connection, address):
.........
if __name__ == '__main__':
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind(("", port))
sock.listen(128)
io_loop = tornado.ioloop.IOLoop.current() # 单例模式,只创建一个IOLoop
callback = functools.partial(connection_ready, sock)
io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
io_loop.start()
留意这里的socket,都通过sock.setblocking(0)设置为非阻塞
ioloop做的事情就是创建epoll(也可能是kqueue或select ...), 然后add_handler把socket的fd加入epoll,同时记录一个该fd关联的callback,再然后start循环调用epoll wait, 如果发现ready的fd,就调用对应的callback.
tornado的EPollIOLoop的add_handler如下:
def add_handler(self, fd, handler, events):
fd, obj = self.split_fd(fd)
self._handlers[fd] = (obj, stack_context.wrap(handler))
self._impl.register(fd, events | self.ERROR)
其中self._impl = select.epoll()
, 这是python的epoll封装
epoll的c语言api如下:
int epoll_create(int size);
int epoll_create1(int flags); #extends epoll_crate
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
更多epoll的介绍: https://www.cnblogs.com/Hijack-you/p/13057792.html
接下来,我们尝试补充下上面例子中的handle_connection:
_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN)
_CHUNK_SIZE = 1024
class ConnReader(object):
def __init__(self):
self.socket = socket
self.buffer = ""
def _read_from_socket(self):
try:
chunk = self.socket.recv(_CHUNK_SIZE)
except socket.error as e:
if e.args[0] in _ERRNO_WOULDBLOCK:
return None
else:
raise
return chunk
def handle_read(self, fd, events):
while True:
chunk = self.read_from_socket()
self.buffer += chunk
# read block or interupt
if chunk is None:
break
# read finish, remote peer close the socket
if chunk == "":
tornado.ioloop.IOLoop.current().remove_handler(connection.fileno())
print(self.buffer)
这里实现的功能很简单,做为一个server,收到client的连接,就从新建立的connection中读取数据,直到对端关闭socket,然后就print接收到的全部数据。可以看到实现这么一个简单的功能,代码还写的挺复杂,看起来理解起来都比较绕。
tonado对non-blocing的socket做了一层封装, 称作iostream, 目标是能方便的从非阻塞的文件或socket中读写数据.
使用iostream重新实现上面的handle_connection:
import tornado.iostream
def handle_connection(connection, address):
stream = tornado.iostream.IOStream(connection)
def handle_result(data):
print(data)
stream.read_until_close(handle_result)
可以看到简单了很多,iostream屏蔽了各种epoll/socket的细节,提供了统一read/write + callback的回调风格的接口
再看一个client发起http请求的例子:
import tornado.ioloop
import tornado.iostream
import socket
def send_request():
stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
stream.read_until(b"\r\n\r\n", on_headers)
def on_headers(data):
headers = {}
for line in data.split(b"\r\n"):
parts = line.split(b":")
if len(parts) == 2:
headers[parts[0].strip()] = parts[1].strip()
stream.read_bytes(int(headers[b"Content-Length"]), on_body)
def on_body(data):
print(data)
stream.close()
tornado.ioloop.IOLoop.current().stop()
if __name__ == '__main__':
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
stream = tornado.iostream.IOStream(s)
stream.connect(("friendfeed.com", 80), send_request)
tornado.ioloop.IOLoop.current().start()
现在再回头看最开始的异步http请求的例子:
from tornado.httpclient import AsyncHTTPClient
def asynchronous_fetch(url, callback):
http_client = AsyncHTTPClient()
def handle_response(response):
callback(response.body)
http_client.fetch(url, callback=handle_response)
def request_callback(body):
print("request success")
print(body[:10])
if __name__ == '__main__':
asynchronous_fetch("http://www.baidu.com", request_callback)
ioloop = tornado.ioloop.IOLoop.current().start()
这时应该能理解为什么需要有ioloop start, 才能真正执行请求得到结果,以及回调风格的api背后的原理。接下来我们看第二种风格的异步函数
第二种风格的异步函数 Return a placeholder
from tornado.concurrent import Future
def async_fetch_future(url):
http_client = AsyncHTTPClient()
my_future = Future()
def handle_response(response):
my_future.set_result(response.body)
http_client.fetch(url, callback=handle_response)
这里async_fetch_future没有接受一个callback参数,而是返回了一个Future类型的placeholder, 这个函数可以和coroutine一起使用, 代码如下:
import tornado.ioloop
from tornado import gen
from tornado.httpclient import AsyncHTTPClient
from tornado.concurrent import Future
def async_fetch_future(url):
http_client = AsyncHTTPClient()
my_future = Future()
def handle_response(response):
my_future.set_result(response.body)
http_client.fetch(url, callback=handle_response)
return my_future
@gen.coroutine
def call():
body = yield async_fetch_future('http://www.baidu.com')
print(body[:10])
if __name__ == '__main__':
call()
ioloop = tornado.ioloop.IOLoop.current()
ioloop.start()
留意call函数有2个比较特别的地方
- 函数加了装饰器@gen.coroutine
- 调用async_fetch_future时,前面加了yield
具体原因和原理稍后讲,这里先说明下这种风格的异步函数相比回调风格有很大的好处,因为写这种代码的方式几乎是和同步代码一样的,比如我们要链式调用多个http接口:
@gen.coroutine
def call():
body = yield async_fetch_future('http://www.baidu.com')
word = body[:10]
body2 = yield async_fetch_future('http://www.baidu.com?search=word')
print(body2[:10])
if __name__ == '__main__':
call()
ioloop = tornado.ioloop.IOLoop.current()
ioloop.start()
可以看到这里的代码几乎是和同步代码一样的,主要不同就是多了yield关键字
如果用callback的方式,写起来就非常麻烦,读起来也很不直观
这种异步函数之间也可以使用相同的方式相互调用:
@gen.coroutine
def call_inner():
body = yield async_fetch_future('http://www.163.com')
return body[:10]
@gen.coroutine
def call():
body = yield async_fetch_future('http://www.baidu.com')
word = body[:10]
body2 = yield async_fetch_future('http://www.baidu.com?search=word')
body_163 = yield call_inner()
return body2[:10] + ":" + body_163[:10]
if __name__ == '__main__':
call()
ioloop = tornado.ioloop.IOLoop.current()
ioloop.start()
留意上面的程序,我们先调用call函数,再开启ioloop.start, 实际上调用call函数的标准姿势是是用ioloop.spawn_callback(如果coroutine函数是用asyncio的async定义的,必须用这种方式调用, async定义的协程函数必须由await调用), 修改之后如下:
if __name__ == '__main__':
ioloop = tornado.ioloop.IOLoop.current()
ioloop.spawn_callback(call)
ioloop.start()
这里的ioloop.start()启动了死循环,call函数全部执行,程序还是卡在死循环中不会退出,如果想执行完call自动退出,可以改为ioloop.run_sync, ioloop.run_sync会等待函数执行完成之后退出循环,并且会返回函数的结果,如下:
if __name__ == '__main__':
ioloop = tornado.ioloop.IOLoop.current()
call_result = ioloop.run_sync(call)
print(call_result)
yield和generator
包含yield的函数就是generator,看例子:
def gen_example():
print('A')
yield 1
print('B')
yield 2
print('C')
return 'end'
g = gen_example() # 调用函数之后,得到一个generator对象,实际函数中的代码并未执行
print(type(g))
assert 1==next(g) #代码执行到第一个yield返回,并把yield后面的值返回作为next调用的返回值
assert 2 == next(g) #从上次yield离开
#执行最后一个yield之后的代码,抛出StopIteration, 并把函数中的返回值作为StopIteration的value
try:
next(g)
except StopIteration as e:
result = e.value
assert result == 'end'
上面代码的执行结果:
$ python example.py
<class 'generator'>
A
B
C
包含yield关键字的函数,不再是一个顺序执行的函数。直接调用函数并不会执行函数内的任何代码,而是返回一个generator对象,之后在该对象上调用next就从函数开始处的代码执行,遇到yield就把yield后面的值作为结果返回,下次再调用next时从上次yield离开的地方进入,遇到下一个yield返回。 yield全部返回之后,再执行next会抛出StopIteration,并把函数中的return值作为StopIteration的value。
yield有点像return,把后面的值返回给调用值,只是函数中可以多次yield,每次调用next就执行其中的一段代码。
next也可以用generator.send代替,send方法还能向generator中传入值, 比如:
def gen_example():
print('A')
x = yield 1
print('get value fron send: %s' % x)
return 'end'
g = gen_example()
print(type(g))
assert 1==next(g) next调用的返回值
try:
g.send('SEND_X')
except StopIteration as e:
result = e.value
assert result == 'end'
执行结果:
$ python example.py
<class 'generator'>
A
get value fron send: SEND_X
留意上述函数的第二上代码中的x接收了caller通用send发来的值
caller还能通过generator.throw向generator中发送异常, eg:
def gen_example():
print('A')
try:
x = yield 1
except Exception as e:
print('get exception from throw: %s' % e)
raise
print('get value fron send: %s' % x)
return 'end'
g = gen_example()
print(type(g))
assert 1==next(g)
g.throw(Exception('test'))
执行结果:
$ python example.py
<class 'generator'>
A
get exception from throw: test
Traceback (most recent call last):
File "t4.py", line 31, in <module>
g.throw(Exception('test'))
File "t4.py", line 20, in gen_example
x = yield 1
Exception: test
yield和coroutine
@gen.coroutine
decorator 通过yield
expressions和generator沟通,通过返回一个Future
和调用者沟通
下面是一个简化版本的decorator的内部循环:
def run(self):
# send(x) makes the current yield return x.
# It returns when the next yield is reached
future = self.gen.send(self.next)
def callback(f):
self.next = f.result()
self.run()
future.add_done_callback(callback)
decorator从generator中接收了一个Future
, 然后等待这个Future
完成,然后展开这个Future
并把结果发回generator作为yield表达式的结果。
概括来说@gen.coroutine的作用就是执行/展开generator(通过next或send), 并向调用者返回一个Future
我们来看下tornado gen.coroutine的具体实现:
gen.coroutine 的关键代码
def _make_coroutine_wrapper(func, replace_callback):
"""The inner workings of ``@gen.coroutine`` and ``@gen.engine``.
The two decorators differ in their treatment of the ``callback``
argument, so we cannot simply implement ``@engine`` in terms of
``@coroutine``.
"""
# On Python 3.5, set the coroutine flag on our generator, to allow it
# to be used with 'await'.
wrapped = func
if hasattr(types, 'coroutine'):
func = types.coroutine(func)
@functools.wraps(wrapped)
def wrapper(*args, **kwargs):
future = TracebackFuture()
if replace_callback and 'callback' in kwargs:
callback = kwargs.pop('callback')
IOLoop.current().add_future(
future, lambda future: callback(future.result()))
try:
result = func(*args, **kwargs)
except (Return, StopIteration) as e:
result = _value_from_stopiteration(e)
except Exception:
future.set_exc_info(sys.exc_info())
return future
else:
if isinstance(result, GeneratorType):
# Inline the first iteration of Runner.run. This lets us
# avoid the cost of creating a Runner when the coroutine
# never actually yields, which in turn allows us to
# use "optional" coroutines in critical path code without
# performance penalty for the synchronous case.
try:
orig_stack_contexts = stack_context._state.contexts
yielded = next(result)
if stack_context._state.contexts is not orig_stack_contexts:
yielded = TracebackFuture()
yielded.set_exception(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
future.set_result(_value_from_stopiteration(e))
except Exception:
future.set_exc_info(sys.exc_info())
else:
_futures_to_runners[future] = Runner(result, future, yielded)
yielded = None
try:
return future
finally:
# Subtle memory optimization: if next() raised an exception,
# the future's exc_info contains a traceback which
# includes this stack frame. This creates a cycle,
# which will be collected at the next full GC but has
# been shown to greatly increase memory usage of
# benchmarks (relative to the refcount-based scheme
# used in the absence of cycles). We can avoid the
# cycle by clearing the local variable after we return it.
future = None
future.set_result(result)
return future
我们看下上面代码的一些关键部分:
- 第16行,创建了一个
Future
对象,作为整个整个协程函数调用返回值的placeholder - 第24行,表示调用原始的函数,如果函数中有yield,这里的result就是一个generator对象
- 如果没有异常,接下里会执行第31行及后面的分支代码
- 接下来到第39行,使用next方法,首次对generator执行迭代,即执行原始函数开始部分到第一个yield之间的语句,返回到yielded就是函数中第一个yield后面的表达式的值。注意yield后面的expresston的值必须是一个可等待的对象,典型的值是
Future
对象,否则会抛异常 - 接下来如果代码没有异常,会执行到第51行,调用Runner(下面贴出了runner的代码)
- runner init中的几个参数明确下, gen: 原始函数调用之后返回的generator对象, result_future: 整个协程函数调用返回值的placeholder, first_yield: 就是上面首次调用next的返回值,典型值是
Future
对象 - 继续看runner中第14行中调用了handle_yield, 该函数做了很多兼容处理(处理YieldPoint, list, dict等,支持多个可等待对象),最终会把yielded统一转为
Future
对象,赋给self.future, 典型代码第89行:self.future = convert_yielded(yielded)
。 接下里的关键逻辑是94行到102行,这里先判断self.future是否已经完成,典型的异步调用场景,这里会是未完成。 - 先看self.future未完成(还没有通过future.set_result设置结果), 会调用self.io_loop.add_future给self.future设置回调,并返回Flase,而第14行这里检查到False,就不会执行下面的self.run, 而是直接返回。而runner的调用者gen.coroutine代码的第51行也会返回,再到第54行,返回整个协程函数的future。至此,整个协程函数的调用返回了一个空的
Future
对象,而协程函数中的代码执行到第一个yield expression之后中断了 - 总结下,调用gen.coroutine装饰的协程函数的典型执行流程: 执行到函数中的第一个yield, 返回一个空
Future
placeholder. - 之后的执行由ioloop和Runner驱动,再回到runner中94行到102行,self.io_loop.add_future这里是关键,这里future对象后面的异步调用完成之后,就会调用self.future.set_result, 并触发这里的回调函数inner, 然后再次执行self.run
- self.run的关键代码看36行和第50行, 低36行把yield后面返回的Future对象展开,获取其中的result,然后第50行调用self.gen.send把result发给generator, generator拿到异步调用的结果,并继续执行直到下一个yield, 如此循环,直到函数执行结束,抛出StopIteration, 第57行捕获该异常, 取StopIteration的value给整个协程函数的
Future
占位符set_result(第68行). 至此协程函数的调用和执行全部完成 - 再看runner代码中第94行到102行,这里判断self.future是否已经完成,如果已经完成直接返回True,可以认为是一些特殊情况,比如yield后面调用的异步函数执行更快,在检查之前完成了,不用再等异步函数的回调, 继续执行self.run就可以了
- 关于异常处理: 协程函数执行过程中的Exception不会直接抛出,而是统一捕获,设置到
Future
中(从gen.coroutine的第28行可以看到),调用某个协程函数过程中展开Future
对象,如果结果是Exception,则通过gen.throw发送caller的generator中(可以从runner代码的第44行看到)
runner的关键代码
class Runner(object):
"""Internal implementation of `tornado.gen.engine`.
Maintains information about pending callbacks and their results.
The results of the generator are stored in ``result_future`` (a
`.TracebackFuture`)
"""
def __init__(self, gen, result_future, first_yielded):
self.gen = gen
self.result_future = result_future
self.future = _null_future
#.......... 省略 ............ #
if self.handle_yield(first_yielded):
gen = result_future = first_yielded = None
self.run()
def run(self):
"""Starts or resumes the generator, running until it reaches a
yield point that is not ready.
"""
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future
if not future.done():
return
self.future = None
try:
orig_stack_contexts = stack_context._state.contexts
exc_info = None
try:
value = future.result()
except Exception:
self.had_exception = True
exc_info = sys.exc_info()
future = None
if exc_info is not None:
try:
yielded = self.gen.throw(*exc_info)
finally:
# Break up a reference to itself
# for faster GC on CPython.
exc_info = None
else:
yielded = self.gen.send(value)
if stack_context._state.contexts is not orig_stack_contexts:
self.gen.throw(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
self.finished = True
self.future = _null_future
if self.pending_callbacks and not self.had_exception:
# If we ran cleanly without waiting on all callbacks
# raise an error (really more of a warning). If we
# had an exception then some callbacks may have been
# orphaned, so skip the check in that case.
raise LeakedCallbackError(
"finished without waiting for callbacks %r" %
self.pending_callbacks)
self.result_future.set_result(_value_from_stopiteration(e))
self.result_future = None
self._deactivate_stack_context()
return
# .......... 省略 ........... #
if not self.handle_yield(yielded):
return
yielded = None
finally:
self.running = False
def handle_yield(self, yielded):
# Lists containing YieldPoints require stack contexts;
# other lists are handled in convert_yielded.
if _contains_yieldpoint(yielded):
yielded = multi(yielded)
if isinstance(yielded, YieldPoint):
#.......... 省略 ..........
else:
try:
self.future = convert_yielded(yielded)
except BadYieldError:
self.future = TracebackFuture()
self.future.set_exc_info(sys.exc_info())
if not self.future.done() or self.future is moment:
def inner(f):
# Break a reference cycle to speed GC.
f = None # noqa
self.run()
self.io_loop.add_future(
self.future, inner)
return False
return True
coroutine总结
结合一个例子描述coroutine的调用过程,算是总结
import tornado.ioloop
from tornado.httpclient import AsyncHTTPClient
from tornado.concurrent import Future
from tornado import gen
def async_fetch_future(url):
http_client = AsyncHTTPClient()
my_future = Future()
def handle_response(response):
my_future.set_result(response.body)
http_client.fetch(url, callback=handle_response)
return my_future
@gen.coroutine
def call_inner():
body = yield async_fetch_future('http://www.163.com')
return body[:10]
@gen.coroutine
def call():
body = yield async_fetch_future('http://www.baidu.com')
word = body[:10]
body2 = yield async_fetch_future('http://www.baidu.com?search=word')
body_163 = yield call_inner()
return body2[:10] + b":" + body_163[:10]
if __name__ == '__main__':
ret = tornado.ioloop.IOLoop.current().run_sync(call)
print(ret)
- 从ioloop的run_sync开始,call函数交给ioloop在后台执行
- ioloop.start()开始事件循环
- 调用call会返回一个
Future
对象作为整个协程函数调用的返回值placeholder, run_sync通过闭包变量持有该Future
的引用 - 协程函数首次执行到第一个yield处async_fetch_future调用之后中断(第21行)
- async_fetch_future是个异步网络请求函数,在背后的ioloop, iostream等帮助下,在后台完成各种网络往返之后,得到请求的最终结果,触发结果回调函数handle_response(第9行)
- 第9行的handle_response会执行my_future.set_result(response.body),再次触发回调, 执行call协程的gen.send,并且会把my_future.result()作为send的参数。 call函数基础执行,这时第21行的body就是第一个网络请求响应的body了
- call函数继续执行到第2个yield处中断(第23行), 等待异步函数完成,通过回调继续执行call函数,这时body2拿到了第二个网络请求的响应body
- 第三个yield后边是调用了一个协程函数call_inner, 执行逻辑是一样的,call_inner也会返回一个
Future
对象,协程函数按层级由外到内调用,每一层都是执行到yield处中断,返回Future
对象。之后通过Future
从内向外逐层回调恢复外层函数的执行流,并传递返回值
通过上面的分析,可以明确知道所有协程函数都要加上@gen.coroutine逐步,并且在协程函数中调用其他协程函数或可等待对象时都要加上yield关键字
补充
阻塞函数
因为协程的方式在一个线程上通过非阻塞io + ioloop + 回调的方式实现并发, 所有的代码执行都是在同一个线程上执行的。所以中间不能有任何阻塞函数的调用,比如time.sleep(10)
会导致整个线程卡住10s,无法执行其他任何代码,作为一个server就意味着无法响应任何请求。 time.sleep
可以用gen.sleep
代替,避免阻塞, eg:
from tornado import gen
@gen.coroutine
def call_sleep():
gen.sleep(10)
print('sleep end')
如果没有可用的异步函数,调用阻塞io的函数,可以在让阻塞函数在另外一个线程中执行,使用线程池的例子如下:
thread_pool = ThreadPoolExecutor(4)
@gen.coroutine
def call_blocking():
yield thread_pool.submit(blocking_func, args)
并发
gen.coroutine支持值是Future
的list和dict,可以并行等待所有的Future
对象:
@gen.coroutine
def parallel_fetch(url1, url2):
resp1, resp2 = yield [http_client.fetch(url1),
http_client.fetch(url2)]
@gen.coroutine
def parallel_fetch_many(urls):
responses = yield [http_client.fetch(url) for url in urls]
# responses is a list of HTTPResponses in the same order
@gen.coroutine
def parallel_fetch_dict(urls):
responses = yield {url: http_client.fetch(url)
for url in urls}
# responses is a dict {url: HTTPResponse}
调用callback风格的接口
调用callback风格的异步接口,可以用Task包装一下,eg:
@gen.coroutine
def call_task():
# Note that there are no parens on some_function.
# This will be translated by Task into
# some_function(other_args, callback=callback)
yield gen.Task(some_function, other_args)
留意这里要求some_function的回调参数名必须是callback, 并且Task调用返回的Future
对象的result是callback函数接收的所有参数, 如果有多个参数的话,包装位一个Arguments对象:
Arguments = collections.namedtuple('Arguments', ['args', 'kwargs'])