tornado的协程是如何实现的

说明

因为tornado最新的版本主要推荐使用asyncio async await, 而这篇文章主要探讨早期tornado如何使用generator实现coroutine, 所以这里统一使用tornado 4.5版本

异步和非阻塞io

asynchronous 和 non-blocking这两个概念关系紧密,经常被当作一回事。实际上两者是有差异的,并不是同一个概念

阻塞

一个函数在等待某些执行而没有返回,称为阻塞。阻塞可能有很多原因:网络io,磁盘io,锁等。实际上,每个函数都会阻塞至少一点点,在它运行和使用cpu时(一个极端的例子可以说明cpu的阻塞一样要认真对待: 像bcrypt这样的hashing函数,需要几百毫秒的执行时间,比典型的网络和磁盘io都要慢的多)

一个函数可能在有些部分阻塞,有些不阻塞。比如tornado.httpclient默认在DNS解析时阻塞,在其他网络访问时不阻塞

异步

一个asynchronous函数会在它完成之前返回,一般会触发一些后台的任务,等后台任务完成之后再继续应用中的其他操作(作为对比,同步函数会完成它所有的操作之后才返回)。异步接口有多种风格:

  • Callback argument
  • Return a placeholder ([Future], Promise, Deferred)
  • Deliver to a queue
  • Callback registry (e.g. POSIX signals)
    无论使用哪种风格,异步函数和它的caller的交互从定义上就是不同的

同步函数

同步函数,直观容易理解

import requests
def synchronous_fetch(url):
    response = requests.get(url)
    return response.content

回调风格的异步函数及背后的ioloop

异步函数,回调方式

from tornado.httpclient import AsyncHTTPClient

def asynchronous_fetch(url, callback):
    http_client = AsyncHTTPClient()
    def handle_response(response):
        callback(response.body)
    http_client.fetch(url, callback=handle_response)

def request_callback(body):
    print("request success")
    print(body[:10])

if __name__  == '__main__':
    asynchronous_fetch("http://www.baidu.com",  request_callback)

执行上述代码,并没有print出预期的信息, 这是因为缺少ioloop
实际上asynchronous_fetch调用之后,发起了非阻塞的网络请求,然后就直接返回了。后续等待socket.send和socket.recv完成,及回调request_callback都需要ioloop来完成

上面的代码做一些补充:

if __name__ == '__main__':
   asynchronous_fetch("http://www.baidu.com",  request_callback)
   ioloop = tornado.ioloop.IOLoop.current().start()

重新执行就能看到预期的输出了:

request success
b'<!DOCTYPE '

什么是ioloop呢, 看如下的例子:

import errno
import functools
import tornado.ioloop
import socket

def connection_ready(sock, fd, events):
    while True:
        try:
            connection, address = sock.accept()
        except socket.error as e:
            if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                raise
            return
        connection.setblocking(0)
        handle_connection(connection, address)

def handle_connection(connection, address):
    .........

if __name__ == '__main__':
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setblocking(0)
    sock.bind(("", port))
    sock.listen(128)

    io_loop = tornado.ioloop.IOLoop.current()  # 单例模式,只创建一个IOLoop
    callback = functools.partial(connection_ready, sock)
    io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
    io_loop.start()

留意这里的socket,都通过sock.setblocking(0)设置为非阻塞
ioloop做的事情就是创建epoll(也可能是kqueue或select ...), 然后add_handler把socket的fd加入epoll,同时记录一个该fd关联的callback,再然后start循环调用epoll wait, 如果发现ready的fd,就调用对应的callback.

tornado的EPollIOLoop的add_handler如下:

  def add_handler(self, fd, handler, events):
        fd, obj = self.split_fd(fd)
        self._handlers[fd] = (obj, stack_context.wrap(handler))
        self._impl.register(fd, events | self.ERROR)

其中self._impl = select.epoll(), 这是python的epoll封装

epoll的c语言api如下:

int epoll_create(int size);
int epoll_create1(int flags);  #extends epoll_crate
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

int epoll_wait(int epfd, struct epoll_event *events,
               int maxevents, int timeout);

更多epoll的介绍: https://www.cnblogs.com/Hijack-you/p/13057792.html

接下来,我们尝试补充下上面例子中的handle_connection:

_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN)
_CHUNK_SIZE = 1024

class ConnReader(object):
    def __init__(self):
        self.socket = socket
        self.buffer = ""

    def _read_from_socket(self):
        try:
            chunk = self.socket.recv(_CHUNK_SIZE)
        except socket.error as e:
            if e.args[0] in _ERRNO_WOULDBLOCK:
                return None
            else:
                raise
        return chunk

    def handle_read(self, fd, events):
        while True:
            chunk = self.read_from_socket()
            self.buffer += chunk

            # read block or interupt
            if chunk is None:
                break

            # read finish, remote peer close the socket
            if chunk == "":
                tornado.ioloop.IOLoop.current().remove_handler(connection.fileno())
                print(self.buffer)

这里实现的功能很简单,做为一个server,收到client的连接,就从新建立的connection中读取数据,直到对端关闭socket,然后就print接收到的全部数据。可以看到实现这么一个简单的功能,代码还写的挺复杂,看起来理解起来都比较绕。

tonado对non-blocing的socket做了一层封装, 称作iostream, 目标是能方便的从非阻塞的文件或socket中读写数据.

使用iostream重新实现上面的handle_connection:

import tornado.iostream

def handle_connection(connection, address):
    stream = tornado.iostream.IOStream(connection)
    def handle_result(data):
        print(data)
    stream.read_until_close(handle_result)

可以看到简单了很多,iostream屏蔽了各种epoll/socket的细节,提供了统一read/write + callback的回调风格的接口

再看一个client发起http请求的例子:

import tornado.ioloop
import tornado.iostream
import socket

def send_request():
    stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
    stream.read_until(b"\r\n\r\n", on_headers)

def on_headers(data):
    headers = {}
    for line in data.split(b"\r\n"):
       parts = line.split(b":")
       if len(parts) == 2:
           headers[parts[0].strip()] = parts[1].strip()
    stream.read_bytes(int(headers[b"Content-Length"]), on_body)

def on_body(data):
    print(data)
    stream.close()
    tornado.ioloop.IOLoop.current().stop()

if __name__ == '__main__':
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    stream = tornado.iostream.IOStream(s)
    stream.connect(("friendfeed.com", 80), send_request)
    tornado.ioloop.IOLoop.current().start()

现在再回头看最开始的异步http请求的例子:

from tornado.httpclient import AsyncHTTPClient

def asynchronous_fetch(url, callback):
    http_client = AsyncHTTPClient()
    def handle_response(response):
        callback(response.body)
    http_client.fetch(url, callback=handle_response)

def request_callback(body):
    print("request success")
    print(body[:10])

if __name__  == '__main__':
    asynchronous_fetch("http://www.baidu.com",  request_callback)
    ioloop = tornado.ioloop.IOLoop.current().start()

这时应该能理解为什么需要有ioloop start, 才能真正执行请求得到结果,以及回调风格的api背后的原理。接下来我们看第二种风格的异步函数

第二种风格的异步函数 Return a placeholder

from tornado.concurrent import Future

def async_fetch_future(url):
    http_client = AsyncHTTPClient()
    my_future = Future()
    def handle_response(response):
        my_future.set_result(response.body)
      http_client.fetch(url, callback=handle_response)

这里async_fetch_future没有接受一个callback参数,而是返回了一个Future类型的placeholder, 这个函数可以和coroutine一起使用, 代码如下:

import tornado.ioloop
from tornado import gen
from tornado.httpclient import AsyncHTTPClient
from tornado.concurrent import Future

def async_fetch_future(url):
    http_client = AsyncHTTPClient()
    my_future = Future()
    def handle_response(response):
        my_future.set_result(response.body)
    http_client.fetch(url, callback=handle_response)
    return my_future

@gen.coroutine
def call():
    body = yield async_fetch_future('http://www.baidu.com')
    print(body[:10])

if __name__ == '__main__':
    call()
    ioloop = tornado.ioloop.IOLoop.current()
    ioloop.start()

留意call函数有2个比较特别的地方

  • 函数加了装饰器@gen.coroutine
  • 调用async_fetch_future时,前面加了yield

具体原因和原理稍后讲,这里先说明下这种风格的异步函数相比回调风格有很大的好处,因为写这种代码的方式几乎是和同步代码一样的,比如我们要链式调用多个http接口:

@gen.coroutine
def call():
    body = yield async_fetch_future('http://www.baidu.com')
    word = body[:10]
    body2 = yield async_fetch_future('http://www.baidu.com?search=word')
    print(body2[:10]) 

if __name__ == '__main__':
    call()
    ioloop = tornado.ioloop.IOLoop.current()
    ioloop.start()

可以看到这里的代码几乎是和同步代码一样的,主要不同就是多了yield关键字
如果用callback的方式,写起来就非常麻烦,读起来也很不直观

这种异步函数之间也可以使用相同的方式相互调用:

@gen.coroutine
def call_inner():
    body = yield async_fetch_future('http://www.163.com')
    return body[:10]

@gen.coroutine
def call():
    body = yield async_fetch_future('http://www.baidu.com')
    word = body[:10]
    body2 = yield async_fetch_future('http://www.baidu.com?search=word')
    body_163 = yield call_inner()
    return body2[:10] + ":" + body_163[:10]

if __name__ == '__main__':
    call()
    ioloop = tornado.ioloop.IOLoop.current()
    ioloop.start()

留意上面的程序,我们先调用call函数,再开启ioloop.start, 实际上调用call函数的标准姿势是是用ioloop.spawn_callback(如果coroutine函数是用asyncio的async定义的,必须用这种方式调用, async定义的协程函数必须由await调用), 修改之后如下:

if __name__ == '__main__':
    ioloop = tornado.ioloop.IOLoop.current()
    ioloop.spawn_callback(call)
    ioloop.start()

这里的ioloop.start()启动了死循环,call函数全部执行,程序还是卡在死循环中不会退出,如果想执行完call自动退出,可以改为ioloop.run_sync, ioloop.run_sync会等待函数执行完成之后退出循环,并且会返回函数的结果,如下:

if __name__ == '__main__':
    ioloop = tornado.ioloop.IOLoop.current()
    call_result = ioloop.run_sync(call)
    print(call_result)

yield和generator

包含yield的函数就是generator,看例子:

def gen_example():
    print('A')
    yield 1
    print('B')
    yield 2
    print('C')
    return 'end'


g = gen_example() # 调用函数之后,得到一个generator对象,实际函数中的代码并未执行
print(type(g))
assert 1==next(g) #代码执行到第一个yield返回,并把yield后面的值返回作为next调用的返回值
assert 2 == next(g) #从上次yield离开

#执行最后一个yield之后的代码,抛出StopIteration, 并把函数中的返回值作为StopIteration的value
try:
    next(g)
except StopIteration as e:
    result = e.value
assert result == 'end'

上面代码的执行结果:

$ python example.py 
<class 'generator'>
A
B
C

包含yield关键字的函数,不再是一个顺序执行的函数。直接调用函数并不会执行函数内的任何代码,而是返回一个generator对象,之后在该对象上调用next就从函数开始处的代码执行,遇到yield就把yield后面的值作为结果返回,下次再调用next时从上次yield离开的地方进入,遇到下一个yield返回。 yield全部返回之后,再执行next会抛出StopIteration,并把函数中的return值作为StopIteration的value。

yield有点像return,把后面的值返回给调用值,只是函数中可以多次yield,每次调用next就执行其中的一段代码。

next也可以用generator.send代替,send方法还能向generator中传入值, 比如:

def gen_example():
    print('A')
    x = yield 1
    print('get value fron send: %s' % x)
    return 'end'

g = gen_example() 
print(type(g))
assert 1==next(g) next调用的返回值
try:
    g.send('SEND_X')
except StopIteration as e:
    result = e.value
assert result == 'end'

执行结果:

$ python example.py 
<class 'generator'>
A
get value fron send: SEND_X

留意上述函数的第二上代码中的x接收了caller通用send发来的值
caller还能通过generator.throw向generator中发送异常, eg:

def gen_example():
    print('A')
    try:
        x = yield 1
    except Exception as e:
        print('get exception from throw: %s' % e)
        raise
    print('get value fron send: %s' % x)
    return 'end'


g = gen_example()
print(type(g))
assert 1==next(g)
g.throw(Exception('test'))

执行结果:

$ python example.py
<class 'generator'>
A
get exception from throw: test
Traceback (most recent call last):
  File "t4.py", line 31, in <module>
    g.throw(Exception('test'))
  File "t4.py", line 20, in gen_example
    x = yield 1
Exception: test

yield和coroutine

@gen.coroutine decorator 通过yield expressions和generator沟通,通过返回一个Future和调用者沟通

下面是一个简化版本的decorator的内部循环:

def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)
    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)

decorator从generator中接收了一个Future, 然后等待这个Future完成,然后展开这个Future并把结果发回generator作为yield表达式的结果。

概括来说@gen.coroutine的作用就是执行/展开generator(通过next或send), 并向调用者返回一个Future

我们来看下tornado gen.coroutine的具体实现:
gen.coroutine 的关键代码

def _make_coroutine_wrapper(func, replace_callback):
    """The inner workings of ``@gen.coroutine`` and ``@gen.engine``.

    The two decorators differ in their treatment of the ``callback``
    argument, so we cannot simply implement ``@engine`` in terms of
    ``@coroutine``.
    """
    # On Python 3.5, set the coroutine flag on our generator, to allow it
    # to be used with 'await'.
    wrapped = func
    if hasattr(types, 'coroutine'):
        func = types.coroutine(func)

    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        future = TracebackFuture()

        if replace_callback and 'callback' in kwargs:
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))

        try:
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            result = _value_from_stopiteration(e)
        except Exception:
            future.set_exc_info(sys.exc_info())
            return future
        else:
            if isinstance(result, GeneratorType):
                # Inline the first iteration of Runner.run.  This lets us
                # avoid the cost of creating a Runner when the coroutine
                # never actually yields, which in turn allows us to
                # use "optional" coroutines in critical path code without
                # performance penalty for the synchronous case.
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)
                    if stack_context._state.contexts is not orig_stack_contexts:
                        yielded = TracebackFuture()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    future.set_result(_value_from_stopiteration(e))
                except Exception:
                    future.set_exc_info(sys.exc_info())
                else:
                    _futures_to_runners[future] = Runner(result, future, yielded)
                yielded = None
                try:
                    return future
                finally:
                    # Subtle memory optimization: if next() raised an exception,
                    # the future's exc_info contains a traceback which
                    # includes this stack frame.  This creates a cycle,
                    # which will be collected at the next full GC but has
                    # been shown to greatly increase memory usage of
                    # benchmarks (relative to the refcount-based scheme
                    # used in the absence of cycles).  We can avoid the
                    # cycle by clearing the local variable after we return it.
                    future = None
        future.set_result(result)
        return future

我们看下上面代码的一些关键部分:

  • 第16行,创建了一个Future对象,作为整个整个协程函数调用返回值的placeholder
  • 第24行,表示调用原始的函数,如果函数中有yield,这里的result就是一个generator对象
  • 如果没有异常,接下里会执行第31行及后面的分支代码
  • 接下来到第39行,使用next方法,首次对generator执行迭代,即执行原始函数开始部分到第一个yield之间的语句,返回到yielded就是函数中第一个yield后面的表达式的值。注意yield后面的expresston的值必须是一个可等待的对象,典型的值是Future对象,否则会抛异常
  • 接下来如果代码没有异常,会执行到第51行,调用Runner(下面贴出了runner的代码)
  • runner init中的几个参数明确下, gen: 原始函数调用之后返回的generator对象, result_future: 整个协程函数调用返回值的placeholder, first_yield: 就是上面首次调用next的返回值,典型值是Future对象
  • 继续看runner中第14行中调用了handle_yield, 该函数做了很多兼容处理(处理YieldPoint, list, dict等,支持多个可等待对象),最终会把yielded统一转为Future对象,赋给self.future, 典型代码第89行: self.future = convert_yielded(yielded)。 接下里的关键逻辑是94行到102行,这里先判断self.future是否已经完成,典型的异步调用场景,这里会是未完成。
  • 先看self.future未完成(还没有通过future.set_result设置结果), 会调用self.io_loop.add_future给self.future设置回调,并返回Flase,而第14行这里检查到False,就不会执行下面的self.run, 而是直接返回。而runner的调用者gen.coroutine代码的第51行也会返回,再到第54行,返回整个协程函数的future。至此,整个协程函数的调用返回了一个空的Future对象,而协程函数中的代码执行到第一个yield expression之后中断了
  • 总结下,调用gen.coroutine装饰的协程函数的典型执行流程: 执行到函数中的第一个yield, 返回一个空Future placeholder.
  • 之后的执行由ioloop和Runner驱动,再回到runner中94行到102行,self.io_loop.add_future这里是关键,这里future对象后面的异步调用完成之后,就会调用self.future.set_result, 并触发这里的回调函数inner, 然后再次执行self.run
  • self.run的关键代码看36行和第50行, 低36行把yield后面返回的Future对象展开,获取其中的result,然后第50行调用self.gen.send把result发给generator, generator拿到异步调用的结果,并继续执行直到下一个yield, 如此循环,直到函数执行结束,抛出StopIteration, 第57行捕获该异常, 取StopIteration的value给整个协程函数的Future占位符set_result(第68行). 至此协程函数的调用和执行全部完成
  • 再看runner代码中第94行到102行,这里判断self.future是否已经完成,如果已经完成直接返回True,可以认为是一些特殊情况,比如yield后面调用的异步函数执行更快,在检查之前完成了,不用再等异步函数的回调, 继续执行self.run就可以了
  • 关于异常处理: 协程函数执行过程中的Exception不会直接抛出,而是统一捕获,设置到Future中(从gen.coroutine的第28行可以看到),调用某个协程函数过程中展开Future对象,如果结果是Exception,则通过gen.throw发送caller的generator中(可以从runner代码的第44行看到)

runner的关键代码

class Runner(object):
    """Internal implementation of `tornado.gen.engine`.

    Maintains information about pending callbacks and their results.

    The results of the generator are stored in ``result_future`` (a
    `.TracebackFuture`)
    """
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        #.......... 省略 ............ #
        if self.handle_yield(first_yielded):
            gen = result_future = first_yielded = None
            self.run()

    def run(self):
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready.
        """
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if not future.done():
                    return
                self.future = None
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    exc_info = None

                    try:
                        value = future.result()
                    except Exception:
                        self.had_exception = True
                        exc_info = sys.exc_info()
                    future = None

                    if exc_info is not None:
                        try:
                            yielded = self.gen.throw(*exc_info)
                        finally:
                            # Break up a reference to itself
                            # for faster GC on CPython.
                            exc_info = None
                    else:
                        yielded = self.gen.send(value)

                    if stack_context._state.contexts is not orig_stack_contexts:
                        self.gen.throw(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    self.finished = True
                    self.future = _null_future
                    if self.pending_callbacks and not self.had_exception:
                        # If we ran cleanly without waiting on all callbacks
                        # raise an error (really more of a warning).  If we
                        # had an exception then some callbacks may have been
                        # orphaned, so skip the check in that case.
                        raise LeakedCallbackError(
                            "finished without waiting for callbacks %r" %
                            self.pending_callbacks)
                    self.result_future.set_result(_value_from_stopiteration(e))
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                # .......... 省略 ........... #
                if not self.handle_yield(yielded):
                    return
                yielded = None
        finally:
            self.running = False

    def handle_yield(self, yielded):
        # Lists containing YieldPoints require stack contexts;
        # other lists are handled in convert_yielded.
        if _contains_yieldpoint(yielded):
            yielded = multi(yielded)

        if isinstance(yielded, YieldPoint):
            #.......... 省略 ..........
        else:
            try:
                self.future = convert_yielded(yielded)
            except BadYieldError:
                self.future = TracebackFuture()
                self.future.set_exc_info(sys.exc_info())

        if not self.future.done() or self.future is moment:
            def inner(f):
                # Break a reference cycle to speed GC.
                f = None # noqa
                self.run()
            self.io_loop.add_future(
                self.future, inner)
            return False
        return True

coroutine总结

结合一个例子描述coroutine的调用过程,算是总结

import tornado.ioloop
from tornado.httpclient import AsyncHTTPClient
from tornado.concurrent import Future
from tornado import gen

def async_fetch_future(url):
    http_client = AsyncHTTPClient()
    my_future = Future()
    def handle_response(response):
        my_future.set_result(response.body)
    http_client.fetch(url, callback=handle_response)
    return my_future

@gen.coroutine
def call_inner():
    body = yield async_fetch_future('http://www.163.com')
    return body[:10]

@gen.coroutine
def call():
    body = yield async_fetch_future('http://www.baidu.com')
    word = body[:10]
    body2 = yield async_fetch_future('http://www.baidu.com?search=word')
    body_163 = yield call_inner()
    return body2[:10] + b":" + body_163[:10]

if __name__ == '__main__':
    ret = tornado.ioloop.IOLoop.current().run_sync(call)
    print(ret)
  • 从ioloop的run_sync开始,call函数交给ioloop在后台执行
  • ioloop.start()开始事件循环
  • 调用call会返回一个Future对象作为整个协程函数调用的返回值placeholder, run_sync通过闭包变量持有该Future的引用
  • 协程函数首次执行到第一个yield处async_fetch_future调用之后中断(第21行)
  • async_fetch_future是个异步网络请求函数,在背后的ioloop, iostream等帮助下,在后台完成各种网络往返之后,得到请求的最终结果,触发结果回调函数handle_response(第9行)
  • 第9行的handle_response会执行my_future.set_result(response.body),再次触发回调, 执行call协程的gen.send,并且会把my_future.result()作为send的参数。 call函数基础执行,这时第21行的body就是第一个网络请求响应的body了
  • call函数继续执行到第2个yield处中断(第23行), 等待异步函数完成,通过回调继续执行call函数,这时body2拿到了第二个网络请求的响应body
  • 第三个yield后边是调用了一个协程函数call_inner, 执行逻辑是一样的,call_inner也会返回一个Future对象,协程函数按层级由外到内调用,每一层都是执行到yield处中断,返回Future对象。之后通过Future从内向外逐层回调恢复外层函数的执行流,并传递返回值

通过上面的分析,可以明确知道所有协程函数都要加上@gen.coroutine逐步,并且在协程函数中调用其他协程函数或可等待对象时都要加上yield关键字

补充

阻塞函数

因为协程的方式在一个线程上通过非阻塞io + ioloop + 回调的方式实现并发, 所有的代码执行都是在同一个线程上执行的。所以中间不能有任何阻塞函数的调用,比如time.sleep(10)会导致整个线程卡住10s,无法执行其他任何代码,作为一个server就意味着无法响应任何请求。 time.sleep可以用gen.sleep代替,避免阻塞, eg:

from tornado import gen

@gen.coroutine
def call_sleep():
    gen.sleep(10)
    print('sleep end')

如果没有可用的异步函数,调用阻塞io的函数,可以在让阻塞函数在另外一个线程中执行,使用线程池的例子如下:

thread_pool = ThreadPoolExecutor(4)

@gen.coroutine
def call_blocking():
    yield thread_pool.submit(blocking_func, args)

并发

gen.coroutine支持值是Future的list和dict,可以并行等待所有的Future对象:

@gen.coroutine
def parallel_fetch(url1, url2):
    resp1, resp2 = yield [http_client.fetch(url1),
                          http_client.fetch(url2)]

@gen.coroutine
def parallel_fetch_many(urls):
    responses = yield [http_client.fetch(url) for url in urls]
    # responses is a list of HTTPResponses in the same order

@gen.coroutine
def parallel_fetch_dict(urls):
    responses = yield {url: http_client.fetch(url)
                        for url in urls}
    # responses is a dict {url: HTTPResponse}

调用callback风格的接口

调用callback风格的异步接口,可以用Task包装一下,eg:

@gen.coroutine
def call_task():
    # Note that there are no parens on some_function.
    # This will be translated by Task into
    #   some_function(other_args, callback=callback)
    yield gen.Task(some_function, other_args)

留意这里要求some_function的回调参数名必须是callback, 并且Task调用返回的Future对象的result是callback函数接收的所有参数, 如果有多个参数的话,包装位一个Arguments对象:

Arguments = collections.namedtuple('Arguments', ['args', 'kwargs'])
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 222,252评论 6 516
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,886评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,814评论 0 361
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,869评论 1 299
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,888评论 6 398
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,475评论 1 312
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,010评论 3 422
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,924评论 0 277
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,469评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,552评论 3 342
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,680评论 1 353
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,362评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,037评论 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,519评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,621评论 1 274
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,099评论 3 378
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,691评论 2 361

推荐阅读更多精彩内容

  • 原创文章出自公众号:「码农富哥」,如需转载请请注明出处!文章如果对你有收获,可以收藏转发,这会给我一个大大鼓励哟!...
    大富帅阅读 9,927评论 3 21
  • 参考资料 https://www.cnblogs.com/becker/p/9335136.html Tornad...
    JunChow520阅读 2,277评论 0 5
  • 协程的定义 函数入口:有且只有一个入口出口:有且只有一个出口 协程入口:多个入口出口:多个出口特点:暂定,保留执行...
    龙猫六六阅读 740评论 0 50
  • Tornado 所谓的异步:就是你调用我之后,我发现数据没准备好,那我就不处理,而是跳到程序的其他地方继续执行,等...
    OMSobliga阅读 6,117评论 3 11
  • 参考:tornado中的协程是如何工作的 写在之前 基础依旧很弱,欠缺理论知识(实践就更不要说了),很多东西都只知...
    蒋狗阅读 3,684评论 0 0