[python] 线程间同步之条件变量Condition

为什么需要条件变量

有了前面提到的互斥锁,为什么还需要条件变量呢,当然是由于有些复杂问题互斥锁搞不定了。Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquirerelease方法外,还提供了waitnotify方法。

先看一个互斥锁解决不了的场景,假设两个智能聊天机器人(小米的小爱和天猫的天猫精灵)对话,

天猫精灵:小爱同学

小爱:在

天猫精灵:我们来对古诗吧

小爱:好啊

天猫精灵:我住长江头

小爱:不聊了,再见

假设小爱和天猫精灵分别是两个线程,先使用互斥锁来实现一下:

import threading

class XiaoAi(threading.Thread):
    def __init__(self, lock):
        super().__init__(name="小爱")
        self.lock = lock

    def run(self):
        self.lock.acquire()
        print("{} : 在".format(self.name))
        self.lock.release()

        self.lock.acquire()
        print("{} : 好啊".format(self.name))
        self.lock.release()

class TianMao(threading.Thread):
    def __init__(self, lock):
        super().__init__(name="天猫精灵")
        self.lock = lock

    def run(self):
        self.lock.acquire()
        print("{} : 小爱同学".format(self.name))
        self.lock.release()

        self.lock.acquire()
        print("{} : 我们来对古诗吧".format(self.name))
        self.lock.release()

if __name__ == "__main__":
    lock = threading.Lock()
    xiaoai = XiaoAi(lock)
    tianmao = TianMao(lock)

    tianmao.start()
    xiaoai.start()
    
# 运行结果如下:
# 天猫精灵 : 小爱同学
# 天猫精灵 : 我们来对古诗吧
# 小爱 : 在
# 小爱 : 好啊

可以看到,输出结果并不是预期的对话顺序,这是因为天猫精灵的线程说完“小爱同学”之后,cpu的控制权还没有交出去,继续获取了互斥锁,又执行了“我们来对古诗吧”,所以不能得到预期结果。

先自己想一下解决办法,理论上应该A线程在等待中,B线程在干活,干活完毕之后通知A线程活干完了,B线程进入等待,而A线程得到了通知之后,不再继续等待,开始干活,看完之后通知B线程,如此循环,直到结束。

比较粗糙的想法:假设有一个全局变量active_user,为0表示该A线程执行,1表示B线程执行,对于A线程,先实现wait方法:就是while循环判断是否active_user == 0(必须保证这个变量在两个线程中使用的是同一个),notify方法:将active_user赋值为1。对于B线程,实现方式相反。代码如下:

import threading

class XiaoAi(threading.Thread):
    def __init__(self, lock, active_user):
        super().__init__(name="小爱")
        self.lock = lock
        self.active_user = active_user

    def wait(self):
        while(1):
            self.lock.acquire()
            user = self.active_user[0]
            self.lock.release()
            if user == 1:
                break

    def notify(self):
        self.lock.acquire()
        self.active_user[0] = 0
        self.lock.release()

    def run(self):
        self.wait()
        print("{} : 在".format(self.name))
        self.notify()

        self.wait()
        print("{} : 好啊".format(self.name))
        self.notify()

class TianMao(threading.Thread):
    def __init__(self, lock, active_user):
        super().__init__(name="天猫精灵")
        self.lock = lock
        self.active_user = active_user

    def wait(self):
        while(1):
            self.lock.acquire()
            user = self.active_user[0]
            self.lock.release()
            if user == 0:
                break

    def notify(self):
        self.lock.acquire()
        self.active_user[0] = 1
        self.lock.release()


    def run(self):
        self.wait()
        print("{} : 小爱同学".format(self.name))
        self.notify()

        self.wait()
        print("{} : 我们来对古诗吧".format(self.name))
        self.notify()

if __name__ == "__main__":
    # 0表示天猫执行, 1表示小爱
    # 为了保证两个线程修改active_user之后,互相是可见的,所以传了一个List,而不是整数
    active_user = [0] 
    lock = threading.Lock()
    xiaoai = XiaoAi(lock, active_user)
    tianmao = TianMao(lock, active_user)

    tianmao.start()
    xiaoai.start()
# 运行结果如下:可得到预期结果
# 天猫精灵 : 小爱同学
# 天猫精灵 : 我们来对古诗吧
# 小爱 : 在
# 小爱 : 好啊

由上面的例子可知,由互斥锁是可以实现互相通知的需求的。但是上面的代码效率不高,一直在while循环中判断,还要自己维护一个全局变量,很麻烦,在复杂场景下不能胜任。于是python就给我们封装好了Condition类。

条件变量Condition

构造方法:

import threading
# 可传入一个互斥锁或者可重入锁
cond = threading.Condition()

实例方法:

acquire([timeout])/release(): 调用关联的锁的相应方法。 
wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。
    使用前线程必须已获得锁定,否则将抛出异常。 
notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用
    acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会
    释放锁定。使用前线程必须已获得锁定,否则将抛出异常。 
notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池
    尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

主要使用方法和前面自己实现的差不多,主要调用waitnotify方法,将上面的方法改写为使用条件变量:

import threading
class XiaoAi(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="小爱")
        self.cond = cond

    def run(self):
        self.cond.acquire()

        self.cond.wait()
        print("{} : 在".format(self.name))
        self.cond.notify()

        self.cond.wait()
        print("{} : 好啊".format(self.name))
        self.cond.notify()

        self.cond.wait()
        print("{} : 不聊了,再见".format(self.name))
        self.cond.notify()

        self.cond.release()


class TianMao(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="天猫精灵")
        self.cond = cond

    def run(self):
        self.cond.acquire()

        print("{} : 小爱同学".format(self.name))
        self.cond.notify()
        self.cond.wait()

        print("{} : 我们来对古诗吧".format(self.name))
        self.cond.notify()
        self.cond.wait()

        print("{} : 我住长江头".format(self.name))
        self.cond.notify()
        self.cond.wait()

        self.cond.release()

if __name__ == "__main__":
    cond = threading.Condition()
    xiaoai = XiaoAi(cond)
    tianmao = TianMao(cond)
    
    tianmao.start()
    xiaoai.start()

# 执行结果
# 天猫精灵 : 小爱同学

运行之后会发现天猫精灵说出了“小爱同学”之后就没有了响应,这就是在使用条件变量的时候需要注意的点。仔细观察主函数中的线程启动顺序,tianmao先启动了,假设tianmao已经启动完成,并打印了“小爱同学”,执行notify之后,xiaoai才刚刚启动,成功执行完self.cond.acquire()之后,开始执行wait语句,但此时会陷入死循环!原因是 wait()只能被notify()唤醒,而notify()已经被另一个线程执行过了,注意:只能是一个线程执行过了wait(),在被阻塞过程中,另一个线程执行了notify()才可以。不然就像上面一下陷入死循环。因此,需要将上面的main方法改写:

if __name__ == "__main__":
    cond = threading.Condition()
    xiaoai = XiaoAi(cond)
    tianmao = TianMao(cond)
    
    # 启动顺序很重要
    xiaoai.start()
    tianmao.start()
    
# 执行结果
# 天猫精灵 : 小爱同学
# 小爱 : 在
# 天猫精灵 : 我们来对古诗吧
# 小爱 : 好啊
# 天猫精灵 : 我住长江头
# 小爱 : 不聊了,再见

可以看到,改完启动顺序运行结果对了,其实这样并不能完全保证xiaoai会先启动,如果xiaoairun方法中有个1s延时,就算先执行xiaoai.start()tianmao也会先执行notify(),具体这种情况下应该怎么办,暂时还不清楚。。。

源码分析

大致实现思路描述:Codition有两层锁,一把底层锁会在进入wait方法的时候释放,离开wait方法的时候再次获取,上层锁会在每次调用wait时分配一个新的锁,并放入condition的等待队列中,而notify负责释放这个锁。可能理解起来不是很直观,直接看源码:

init方法

init方法.png

先看源码Condition类的说明,这是一个实现了条件变量的类,允许一个或多个线程等待其他线程的通知。在__init__方法中,有一个参数lock,默认为None。有两种用法:

  1. 如果lock是非None,也就是说用户想自己设置参数,必须传递LockRLock对象。
  2. 如果lockNone__init__方法中默认使用可重入锁RLock

这个lock作为底层维护的锁 underlying lock,条件变量实现的关键。

__init__函数中另一个比较重要的步骤是,建立了一个双端队列,存储所有在等待中的锁,self._waiters = _deque()

wait方法

wait方法.png

这里有一个疑惑,第二次的waiter.acquire()没有找到对应的release方法?虽然感觉不会影响结果,一种可能是在从队列中移除这个锁的时候尝试了释放这个锁。

notify方法

notify方法.png

简单总结,A线程阻塞在wait方法时,只有B线程执行了notifywait的时候(也有可能B线程执行了notify,而C线程执行了wait),A线程的wait方法才能执行完毕,而此时B线程会阻塞在wait方法中。

总结

  • 条件变量提供了对复杂线程同步问题的支持。
  • 条件变量也是使用互斥锁实现的,主要是两层锁结构。

参考

  1. python 线程之 Condition
  2. Python3高级编程和异步IO并发编程
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,313评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,369评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,916评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,333评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,425评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,481评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,491评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,268评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,719评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,004评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,179评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,832评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,510评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,153评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,402评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,045评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,071评论 2 352

推荐阅读更多精彩内容

  • 接着上节 atomic,本节主要介绍condition_varible的内容,练习代码地址。本文参考http://...
    jorion阅读 8,479评论 0 7
  • 线程安全 当多个线程访问一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替执行,也不需要进行额外的同步,或...
    闽越布衣阅读 761评论 0 6
  • Java-Review-Note——4.多线程 标签: JavaStudy PS:本来是分开三篇的,后来想想还是整...
    coder_pig阅读 1,643评论 2 17
  • 【幸福女孩 糖糖 一年级 坚持原创分享第163天 2018.2.12 星期一】 今天我在家做了一个玩偶,用...
    何亚珂阅读 212评论 0 0
  • 在同一个班里,热热闹闹的,上课老师不管我们。却有一天,老师和我换了一个同桌,在那个时候我和他完全不熟。但是,时间一...
    萌杀阅读 290评论 0 0