Tornado应用笔记03-协程与异步示例

索引

本节内容以日常开发中常见的异步场景为基础, 给出Tornado定义的协程和异步示例, 其中的代码稍加修改就可以用到实际项目中. 另外, 本节内容不会对其中原理做进一步说明, 原理分析将放到下一节.

常用异步应用示例

  • 非阻塞 sleep
  • 用线程池处理阻塞操作
  • 异步HTTP请求
  • IOLoop事件(定时, 回调)
  • 长连接输出(RequestHandler.flush)
  • 后台定时任务
  • 循环
非阻塞 sleep
# 下面三种方法实现的功能都是, 异步sleep 2秒, 然后输出 "i sleep 2s"

# 推荐的写法, `.gen.sleep`是`tornado.gen`对`IOLoop`操作的封装
class NonBlockSleep(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        yield tornado.gen.sleep(2)
        self.finish("i sleep 2s")


# 本质上和第一个方法几乎没差别, 相当于上面的原始版
class NonBlockSleep(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        yield tornado.gen.Task(tornado.ioloop.IOLoop.current().add_timeout, time.time() + 2)
        self.finish("i sleep 2s")


# 采用异步回调
class NonBlockSleep(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 2, callback=self.awake)

    def awake(self):
        self.finish("i sleep 2s")
用线程池处理阻塞操作

这里需要用到一个新的包futures, 通过pip install futures安装即可.

单任务, 无回调, 需要用到阻塞操作结果

两种方式实现非阻塞计算, 完成计算后输出结果(不需要操作结果时, 把yield@coroutine去掉即可)

# 使用 submit, 较原始的方式, 未经过Tornado封装 
class CoroutineWithThreadPool(tornado.web.RequestHandler):
    @property
    def executor(self):
        # 下面两种实际上是一样的
        # return concurrent.futures.ThreadPoolExecutor(2)
        return tornado.concurrent.futures.ThreadPoolExecutor(2)

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        s = time.time()
        result = yield self.executor.submit(self._calculate, *(1,))
        used_time = time.time() - s
        self.finish('calculate completed, used %.3f s, result is %s' % (used_time, result))

    def _calculate(self, num=0):
        for i in xrange(100000000):
            num += 1
        return num


# 使用 run_on_executor , 更推荐这种做法
class CoroutineWithThreadPool(tornado.web.RequestHandler):
    executor = concurrent.futures.ThreadPoolExecutor(2)

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        s = time.time()
        result = yield self._calculate(1)
        used_time = time.time() - s
        self.finish('calculate completed, used %.3f s, result is %s' % (used_time, result))

    @tornado.concurrent.run_on_executor
    def _calculate(self, num=0):
        for i in xrange(100000000):
            num += 1
        return num
单任务, 带回调, 需要用到阻塞操作结果, 蹩脚原始实现
class CoroutineWithThreadPool(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self, *args, **kwargs):
        future = self.executor.submit(self._calculate, *(1,))
        tornado.ioloop.IOLoop.current().add_future(future, self.result_callback)

    # 阻塞操作的回调
    def block_callback(self):
        print 'after block func callback'

    # 获取阻塞操作的结果
    def result_callback(self, future):
        tornado.ioloop.IOLoop.current().add_callback(self.block_callback)
        self.finish('the calculate result is |%s|' % future.result())

    def _calculate(self, num=0):
        for i in xrange(100000000):
            num += 1
        return num

多任务, 带回调, 需要用到阻塞操作结果
class CoroutineWithThreadPool(tornado.web.RequestHandler):
    @property
    def executor(self):
        return concurrent.futures.ThreadPoolExecutor(2)

    @property
    def io_loop(self):
        '''
        使用run_on_executor并为future添加callback的时候, 需要设置`self.io_loop`属性
        实际上`run_on_executor`也提供了给`io_loop`和`executor`改名的功能, 使用方法:
            @property
            def my_io_loop(self):
                return tornado.ioloop.IOLoop.current()

            @property
            def my_executor(self):
                return self.application.executor

            @tornado.concurrent.run_on_executor(io_loop='my_io_loop', executor='my_executor')
            def block_func(*args, **kwargs):
                pass

        callback直接在调用需要执行的函数时, 当做普通参数传入即可,
        `run_on_executor`这个装饰器使用后会`pop`掉, 无须担心报错
        '''
        return tornado.ioloop.IOLoop.current()

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        s = time.time()

        calculate_result, sleep_result = yield [
            self._calculate(2, callback=self.executor_callback),
            self._sleep(3),
        ]
        '''
        使用字典实现
        multi_task_result = yield {
            'calculate': self._calculate(1),
            'sleep': self._sleep(3),
        }

        calculate_result, sleep_result = multi_task_result['calculate'], multi_task_result['sleep']
        '''
        print sleep_result
        used_time = time.time() - s
        self.finish('calculate and sleep completed used %.3f s, %s, the calculate result is %s' %
                    (used_time, sleep_result, calculate_result))

    def executor_callback(self, future_result):
        print 'future is done, and the result is |%s|.' % future_result

    @tornado.concurrent.run_on_executor
    def _calculate(self, num=0):
        for i in xrange(100000000):
            num += 1
        return num

    @tornado.concurrent.run_on_executor
    def _sleep(self, seconds=0):
        time.sleep(seconds)
        return 'sleep used %s seconds' % seconds

异步HTTP请求
# 异步回调
class AsyncFetch(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        http_client = tornado.httpclient.AsyncHTTPClient()
        http_client.fetch("http://www.baidu.com", callback=self.on_response)

    def on_response(self, response):
        r = response
        # body, 状态码, 请求耗时, headers
        print r.body, r.code, r.request_time
        print {k: v for k, v in r.headers.items()}
        self.finish('fetch completed')


# 协程
class AsyncFetch(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        http_client = tornado.httpclient.AsyncHTTPClient()
        response = yield http_client.fetch("http://example.com")
        self.on_response(response)
        self.finish('fetch completed')

    def on_response(self, response):
        print response


# 原始实现
class AsyncFetch(tornado.web.RequestHandler):
    def get(self, *args, **kwargs):
        self._auto_finish = False

        tornado.httpclient.AsyncHTTPClient.configure(
            None,
            defaults=dict(
                user_agent="MyUserAgent"
            ),
            max_clients=20,
        )
        client = tornado.httpclient.AsyncHTTPClient()

        fetch_future = client.fetch('http://www.baidu.com', request_timeout=2)
        # 下面两种方法均可以实现future done回调, 不过tornado更推荐`add_future`的做法
        tornado.ioloop.IOLoop.current().add_future(fetch_future, callback=self.on_response)
        # fetch_future.add_done_callback(self.on_response)

    def on_response(self, future):
        http_response = future.result()
        print http_response
        result = dict(http_response.headers)
        result.update({'content': http_response.body})
        # raise ValueError  # 异常情况下,
        self.finish(result)

IOLoop事件(定时, 回调)
class IOLoopCallback(tornado.web.RequestHandler):
    def get(self, *args, **kwargs):
        print time.time()

        io_loop = tornado.ioloop.IOLoop.current()

        # 定时任务, 将任务丢给IOLoop, 3秒后执行
        io_loop.add_timeout(io_loop.time() + 3, callback=functools.partial(self.callback_timeout))

        # 回调任务, 将任务丢给IOLoop, 由下一个Loop调用
        io_loop.add_callback(self.callback_next_loop, None)

        # sleep 会阻塞 IOLoop, 所以上面的 `IOLoop.add_timeout` 是相对的, 
        # 如果一直阻塞, 就不可能及时响应
        # time.sleep(4) # 阻塞实验

    def callback_timeout(self):
        print 'callback_timeout at the time %s' % time.time()

    def callback_next_loop(self, useless=None):
        print 'callback_next_loop at the time %s' % time.time()

长连接输出(RequestHandler.flush)
class Flush(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        self.write('<h1>sleeping...</h1>')
        self.flush()
        yield tornado.gen.sleep(2)
        self.finish('<h1>awake</h1>')
后台定时任务

方式1:

@tornado.gen.coroutine
def do_something(func_name):
    print 'from %s n do_something at %s' % (func_name, int(time.time()))


@tornado.gen.coroutine
def minute_loop1():
    """实际上循环周期是(60 + n)秒, n为`do_something`执行时间, 非严格60s"""
    while True:
        yield do_something(minute_loop1.__name__)
        yield tornado.gen.sleep(1)  # 开始计时, 并等待计时完成


@tornado.gen.coroutine
def minute_loop2():
    """比较严格的60s周期循环"""
    while True:
        sleep = tornado.gen.sleep(2)  # 开始计时
        yield do_something(minute_loop2.__name__)  # 执行间隔协程任务
        yield sleep  # "等待"计时结束


# 启动方法
tornado.ioloop.IOLoop.current().spawn_callback(minute_loop1)
tornado.ioloop.IOLoop.current().spawn_callback(minute_loop2)

方式2:

# tornado.ioloop.PeriodicCallback(callback, callback_time, io_loop=None)

# 需要注意`callback_time`的单位是`微秒`, 一般`PeriodicCallback`是不执行`协程`任务的,
# 另外如果执行的`callback`耗时比`callback_time`还要长, 那么
# 应该到点执行的下一次`callback`会被跳过,并放回到执行列表中, 在下一次到点的时候执行

COUNT = 0

def periodic_callback_print():
    global COUNT
    if COUNT < 3:
        COUNT += 1
        time.sleep(2)
        print 'i have been call back %s times and now is %s' % (COUNT, int(time.time()))

ms_loop_time = 1000

# 启动方法, 需要先创建任务, 然后才能启动
# 创建任务
periodic_schedules_one = tornado.ioloop.PeriodicCallback(periodic_callback_print, ms_loop_time)
# 启动
periodic_schedules_one.start()
# 确认状态
assert periodic_schedules_one.is_running()
# 停止
periodic_schedules_one.stop()

循环/迭代

Python 3.5之前, 在协程中实现迭代会比较麻烦, 你需要将循环的条件与yield结果分离. 例如下面这个使用Motor(异步MongoDB驱动)的例子. 不过在Python 3.5+里面, 新增的async for可以实现异步迭代.

import motor
db = motor.MotorClient().test

# Python 3.5- 实现
@gen.coroutine
def loop_example(collection):
    cursor = db.collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()
        ...

# Python 3.5+ 实现
async def loop_example(collection):
    cursor = db.collection.find()
    async for doc in cursor:
        ...

本节内容就是这些, 下节内容将分析Tornado协程和异步实现的部分源码.

NEXT ===> Tornado应用笔记04-浅析源码

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容