python之gevent(3)

在之前的文章中已经讲过了gevent的使用、gevent的底层greenlet的使用以及gevent调度的源码分析,可以阅读文章回顾一下:python之gevent(1)python之greenletpython之gevent(2)。本文将带大家一起学习几个gevent比较重要的模块,包括Timeout,Event/AsynResult,Semphore,socket patch,这些模块都涉及当前协程与hub的切换。

Timeout

这个类在gevent.timeout模块,其作用是超时后在当前协程抛出异常,这样执行流程也强制回到了当前协程。看一个简单的例子:

import time
import gevent
from gevent.timeout import Timeout


SLEEP = 3
TIMEOUT = 2

timeout = Timeout(TIMEOUT)
timeout.start()


def wait():
    gevent.sleep(SLEEP)
    print('log in wait')


begin = time.time()
try:
    gevent.spawn(wait).join()
except Timeout:
    print('after %s catch Timeout Exception' % (time.time() - begin))
finally:
    timeout.cancel()

运行这段代码,输出如下:


image.png

可以看出,在2s之后在main协程抛出了Timeout异常(继承自BaseException)。Timeout的实现的核心在start函数:

    def start(self):
        """Schedule the timeout."""
        if self.pending:
            raise AssertionError('%r is already started; to restart it, cancel it first' % self)

        if self.seconds is None:
            # "fake" timeout (never expires)
            return

        if self.exception is None or self.exception is False or isinstance(self.exception, string_types):
            # timeout that raises self
            throws = self
        else:
            # regular timeout with user-provided exception
            throws = self.exception

        # Make sure the timer updates the current time so that we don't
        # expire prematurely.
        self.timer.start(getcurrent().throw, throws, update=True)

从代码中可以看到,在超时之后调用了getcurrent().throw(),throw方法会切换协程,并抛出异常(在上面的代码中默认抛出Timeout异常)。使用Timeout有几点需要注意:

第一:一定要记得在finally调用cancel,否则如果协程先于TIMEOUT时间恢复,之后还会抛出异常,例如下面的代码:

import gevent
from gevent import Timeout

SLEEP = 4
TIMEOUT = 5

timeout = Timeout(TIMEOUT)
timeout.start()

def wait():
    gevent.sleep(SLEEP)
    print('log in wait')

begin = time.time()
try:
    gevent.spawn(wait).join()
except Timeout:
    print('after %s catch Timeout Exception'  % (time.time() - begin))
# finally:    
#     timeout.cancel()

gevent.sleep(2)
print 'program will finish'

#协程先于超时恢复

上述的代码运行会抛出Timeout异常,在这个例子中,协程先于超时恢复(SLEEP < TIMEOUT),且没有在finally中调用Timeout.cancel。最后的两行保证程序不要过早结束退出,那么在hub调度的时候会重新抛出异常。

由于Timeout实现了with协议(enterexit方法),更好的写法是将TImeout写在with语句中,如下面的代码:

import gevent
from gevent import Timeout

SLEEP = 4
TIMEOUT = 5


def wait():
    gevent.sleep(SLEEP)
    print('log in wait')

with Timeout(TIMEOUT):
    begin = time.time()
    try:
        gevent.spawn(wait).join()
    except Timeout:
        print('after %s catch Timeout Exception'  % (time.time() - begin))

gevent.sleep(2)
print 'program will finish'

#Timeout with

第二:Timeout只是切换到当前协程,并不会取消已经注册的协程(上面通过spawn发起的协程),我们改改代码:

import gevent
from gevent import Timeout

SLEEP = 6
TIMEOUT = 5

timeout = Timeout(TIMEOUT)
timeout.start()

def wait():
    gevent.sleep(SLEEP)
    print('log in wait')

begin = time.time()
try:
    gevent.spawn(wait).join()
except Timeout:
    print('after %s catch Timeout Exception'  % (time.time() - begin))
finally:    
    timeout.cancel()

gevent.sleep(2)
print 'program will finish'
# output:
# after 5.00100016594 catch Timeout Exception
# log in wait
# program will finish

#Timeout不影响发起的协程

从输出可以看到,即使因为超时切回了main greenlet,但spawn发起的协程并不受影响。如果希望超时取消之前发起的协程,那么可以在捕获到异常之后调用 Greenlet.kill

第三:gevent对可能导致当前协程挂起的函数都提供了timeout参数,用于在指定时间到达之后恢复被挂起的协程。在函数内部会捕获Timeout异常,并不会抛出。例如:

SLEEP = 6
TIMEOUT = 5


def wait():
    gevent.sleep(SLEEP)
    print('log in wait')


begin = time.time()
try:
    gevent.spawn(wait).join(TIMEOUT)
except Timeout:
    print('after %s catch Timeout Exception' % (time.time() - begin))

print 'program will exit', time.time() - begin

#函数的timeout参数

Event & AsyncResult

Event用来在Greenlet之间同步,tutorial上的例子简单明了:

import gevent
from gevent.event import Event

'''
Illustrates the use of events
'''


evt = Event()

def setter():
    '''After 3 seconds, wake all threads waiting on the value of evt'''
    print('A: Hey wait for me, I have to do something')
    gevent.sleep(3)
    print("Ok, I'm done")
    evt.set()


def waiter():
    '''After 3 seconds the get call will unblock'''
    print("I'll wait for you")
    evt.wait()  # blocking
    print("It's about time")

def main():
    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),

    ])

if __name__ == '__main__': main()

Event Example

Event主要的两个方法是set和wait:wait等待事件发生,如果事件未发生那么挂起该协程;set通知事件发生,然后hub会唤醒所有wait在该事件的协程。从输出可知, 一次event触发可以唤醒所有在该event上等待的协程。AsyncResult同Event类似,只不过可以在协程唤醒的时候传值(有点类似generator的next send的区别)。接下来大致看看Event的set和wait方法。

Event.wait的核心代码在gevent.event._AbstractLinkable._wait_core,其中_AbstractLinkable是Event的基类。_wait_core源码如下:

def _wait_core(self, timeout, catch=Timeout):
        # The core of the wait implementation, handling
        # switching and linking. If *catch* is set to (),
        # a timeout that elapses will be allowed to be raised.
        # Returns a true value if the wait succeeded without timing out.
        switch = getcurrent().switch
        self.rawlink(switch)
        try:
            timer = Timeout._start_new_or_dummy(timeout)
            try:
                try:
                    result = self.hub.switch()
                    if result is not self: # pragma: no cover
                        raise InvalidSwitchError('Invalid switch into Event.wait(): %r' % (result, ))
                    return True
                except catch as ex:
                    if ex is not timer:
                        raise
                    # test_set_and_clear and test_timeout in test_threading
                    # rely on the exact return values, not just truthish-ness
                    return False
            finally:
                timer.cancel()
        finally:
            self.unlink(switch)

首先是将当前协程的switch加入到Event的callback列表,然后切换到hub。

接下来是set函数:

    def set(self):
        self._flag = True # make event ready
        self._check_and_notify()
    def _check_and_notify(self):
        # If this object is ready to be notified, begin the process.
        if self.ready():
            if self._links and not self._notifier:
                self._notifier = self.hub.loop.run_callback(self._notify_links)

_check_and_notify函数通知hub调用_notify_links, 在这个函数中将调用Event的callback列表(记录的是之前各个协程的switch函数),这样就恢复了所有wait的协程。

Semaphore & Lock

Semaphore是gevent提供的信号量,实例化为Semaphore(value), value代表了可以并发的量。当value为1,就变成了互斥锁(Lock)。Semaphore提供了两个函数,acquire(P操作)和release(V操作)。当acquire操作导致资源数量将为0之后,就会在当前协程wait,源代码如下(gevent._semaphore.Semaphore.acquire):

def acquire(self, blocking=True, timeout=None):
        
        if self.counter > 0:
            self.counter -= 1
            return True

        if not blocking:
            return False

        timeout = self._do_wait(timeout)
        if timeout is not None:
            # Our timer expired.
            return False

        # Neither our timer no another one expired, so we blocked until
        # awoke. Therefore, the counter is ours
        self.counter -= 1
        assert self.counter >= 0
        return True

逻辑比较简单,如果counter数量大于0,那么表示可并发。否则进入wait,_do_wait的实现与Event.wait十分类似,都是记录当前协程的switch,并切换到hub。当资源足够切换回到当前协程,此时counter一定是大于0的。由于协程的并发并不等同于线程的并发,在任意时刻,一个线程内只可能有一个协程在调度,所以上面对counter的操作也不用加锁。

Monkey-Patch

对于python这种动态语言,在运行时替换模块、类、实例的属性都是非常容易的。我们以patch_socket为例:

image.png

可见在patch前后,同一个名字(socket)所指向的对象是不一样的。在python2.x环境下,patch后的socket源码在gevent._socket2.py,如果是python3.x,那么对应的源码在gevent._socket3.py.。至于为什么patch之后就让原生的socket操作可以在协程之间协作,看两个函数socket.init 和 socket.recv就明白了。

init函数(gevent._socket2.socket.init):

def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, _sock=None):
        if _sock is None:
            self._sock = _realsocket(family, type, proto) # 原生的socket
            self.timeout = _socket.getdefaulttimeout()
        else:
            if hasattr(_sock, '_sock'):
                self._sock = _sock._sock
                self.timeout = getattr(_sock, 'timeout', False)
                if self.timeout is False:
                    self.timeout = _socket.getdefaulttimeout()
            else:
                self._sock = _sock
                self.timeout = _socket.getdefaulttimeout()
            if PYPY:
                self._sock._reuse()
        self._sock.setblocking(0) #设置成非阻塞
        fileno = self._sock.fileno()
        self.hub = get_hub()    # hub
        io = self.hub.loop.io
        self._read_event = io(fileno, 1) # 监听事件
        self._write_event = io(fileno, 2)

从init函数可以看到,patch后的socket还是会维护原生的socket对象,并且将原生的socket设置成非阻塞,当一个socket是非阻塞时,如果读写数据没有准备好,那么会抛出EWOULDBLOCK\EAGIN异常。最后两行注册socket的可读和可写事件。再来看看recv函数(gevent._socket2.socket.recv):

def recv(self, *args):
        sock = self._sock  # keeping the reference so that fd is not closed during waiting
        while True:
            try:
                return sock.recv(*args) # 如果数据准备好了,直接返回
            except error as ex:
                if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
                    raise
                # QQQ without clearing exc_info test__refcount.test_clean_exit fails
                sys.exc_clear()
            self._wait(self._read_event) # 等待数据可读的watcher

如果在while循环中读到了数据,那么直接返回。但实际很大概率数据并没有准备好,对于非阻塞socket,抛出EWOULDBLOCK异常。在最后一行,调用wait,注册当前协程switch,并切换到hub,当read_event触发时,表示socket可读,这个时候就会切回当前协程,进入下一次while循环。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容

  • 在之前,我已经在两篇文章中分别介绍了gevent的使用以及gevent的底层greenlet的使用,可以阅读文章回...
    WolfLC阅读 3,265评论 0 7
  • 一. 操作系统概念 操作系统位于底层硬件与应用软件之间的一层.工作方式: 向下管理硬件,向上提供接口.操作系统进行...
    月亮是我踢弯得阅读 5,962评论 3 28
  • 进程 线程 协程 异步 并发编程(不是并行)目前有四种方式:多进程、多线程、协程和异步。 多进程编程在python...
    hugoren阅读 4,956评论 1 4
  • 前述 进程 线程 协程 异步 并发编程(不是并行)目前有四种方式:多进程、多线程、协程和异步。 多进程编程在pyt...
    softlns阅读 6,330评论 2 24
  • 必备的理论基础 1.操作系统作用: 隐藏丑陋复杂的硬件接口,提供良好的抽象接口。 管理调度进程,并将多个进程对硬件...
    drfung阅读 3,531评论 0 5