python 线程安全queue、自定义dict实现线程安全类型以及协程安全的讨论

线程安全的实现

线程之间共享数据要注意数据是否是线程安全的,使用锁或者Queue。内置数据类型包括list、dict等等是线程安全的(对其的操作是原子性的),但是在访问时由于可能别的线程对其进行了操作导致得不到预期的结果。

线程1:

size=1

print(size)

线程2:

size=2

print(size)

最后打印的size结果不一定是该线程内所设置的,有可能线程2会打印出1。
即子操作是没有问题的,但是混合操作就不行了。

通过继承Queue来实现线程安全

使用Queue来解决线程之间的数据共享问题。
可以MyQueue(Queue),重构_init()、_get()、_put()等方法,这些方法是线程安全的,即里面可以混合写和读等多个操作作为一个原子操作,源码中所标注的那样:These will only be called with appropriate locks held。


clipboard.png

由于_put和_get是保护属性,按约定应该访问get和put方法,但Queue源码中的get和put方法本身包含了对empty和full的阻塞操作,即empty就阻塞等待put中的notify,full也阻塞等待get中的notify,如果实际操作中不需要这些,那么也需要重写get和put方法来确保代码不阻塞等待。

在Myqueue中可以定义其他数据结构和类型,其本意就是在操作时候加锁。但是要注意可能真正线程安全的是_put和_get方法而不是put和get方法。

自定义dict实现线程安全类型

通过研究Queue源码发现可以使用锁,使其他数据结构和操作变成线程安全的。

class DictWithLock():
    def __init__(self):
        self.mutex = threading.Lock()
        self.__dic = {}

    def setdata(self, name, value):
        with self.mutex:
            self.__dic[name] = value
            return self.getdata(name)

    def getdata(self, name):
        with self.mutex:
            if name in self.__dic and self.__dic[name] != None:
                return self.__dic[name]
            else:
                return None

    def setdefault(self, key, value):
        with self.mutex:
            return self.__dic.setdefault(key, value)

    def append(self, key, value):
        with self.mutex:
            return self.__dic.setdefault(key, []).append(value)

    def remove(self, key, value):
        with self.mutex:
            if value in self.__dic[key]:
                self.__dic[key].remove(value)
            return self.__dic


    def __getitem__(self, name):
        return self.getdata(name)

    def __setitem__(self, name, value):
        return self.setdata(name, value)

    def __str__(self):
        return str(self.__dic)

在使用时,每个实例都拥有一个锁threading.Lock(),在多线程对一个实例(注意不是一个类的多个实例)操作时会去获取锁来进行操作。with的作用是上下文管理器,相当于获取资源-》执行-》释放。如果想要实现其他操作,自定义函数加入 with self.mutex:即可,在这个语句下的所有操作都是原子性的,其实如果是dict的单操作不加锁也可以,因为他本身就是原子性的,如pop。

死锁问题

使用锁的时候,如果一个协程没有释放锁而切换到其他协程,后个协程阻塞等待请求锁就会造成死锁问题。如下:

class A:

    def __init__(self):
        self.a = 1
        self.mutex = threading.Lock()

    def run(self, index):
        while True:
            with self.mutex:
                self.a += 1
                gevent.sleep(0.1)
                self.a -= 1
                print("{}:{}".format(index, self.a))
a = A()

gevent.spawn(a.run, 0)
gevent.spawn(a.run, 1)
gevent.spawn(a.run, 2)
gevent.spawn(a.run, 3)

这种情况下使用monkey.patch_all()可以避免后一个协程一直在阻塞请求锁,但是也只能切换到可以运行的协程0,相当于单个协程在运行。如输出:

0:1
0:1
0:1
...

在while中加入手动切换协程的gevent.sleep(0.1),则可以切换到其他协程:

        while True:
            with self.mutex:
                self.a += 1
                gevent.sleep(0.1)
                self.a -= 1
                print("{}:{}".format(index, self.a))
            gevent.sleep(0.1)

但输出是这样的,协程2和3没有输出,猜测是因为协程0在执行之后休息0.1s释放锁,切换到协程1,协程1在锁内用了0.1s然后切换时候又切换到了0而不是123,这个应该跟系统切换顺序有关。

0:1
1:1
0:1
1:1
0:1
...

将锁内外的时间调整不一致试试:

    def run(self, index):
        while True:
            with self.mutex:
                self.a += 1
                gevent.sleep(0.1)
                self.a -= 1
                # gevent.sleep(0.1)
                print("{}:{}".format(index, self.a))
            gevent.sleep(1)

这样程序就可以按照想要的形式输出了。

0:1
1:1
2:1
3:1
0:1
...

最后我们来看一下不加锁的效果:

        while True:
            # with self.mutex:
            self.a += 1
            gevent.sleep(0.1)
            self.a -= 1
            print("{}:{}".format(index, self.a))
            gevent.sleep(1)

输出:

3:4
2:3
1:2
0:1
2:4
0:3
1:2
3:1
0:4

有意思的是如果将两个sleep都换成0.1,那么可以看到切换就是有顺序的了。

3:4
2:3
1:2
0:1
3:4
2:3
1:2
0:1
3:4

系统具体的切换顺序可以后续研究一下,但是实际编码中代码的精确执行时间并不像sleep直接设定这样简单,所以不建议依赖sleep来确定顺序,更不应该在锁内执行一些可能切换协程的操作。最好的做法就是不在协程中使用锁,尽量不把协程和线程混合用,毕竟这两种都不算真正意义上的并行,有时候协程的效率会高于线程。
只是目前尚不知道如果有IO操作当前线程被系统切换再回来之后还是不是当前协程了,就假设是吧,毕竟不是当前协程的话,协程也必须加锁了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容