python--线程rlock

源码: tests/rlock.py

# -.- coding:utf-8 -.-
import time
import threading
import unittest
from .utils import get_lock_count


class TestRLock(unittest.TestCase):

    def test_rlock_can_acquire_multi_times(self):
        lock = threading.RLock()
        lock.acquire()
        lock.acquire()
        lock.acquire()
        lock_count = get_lock_count(lock)
        self.assertEqual(lock_count, 3)

    def test_rlock_can_not_acquire_with_multi_threads(self):
        lock = threading.RLock()
        lock.acquire()
        lifetime = {"status": "init"}

        def acquire_by_other_thread(the_lock, the_lifetime):
            the_lock.acquire()
            the_lifetime.update({"status": "Done"})

        t = threading.Thread(target=acquire_by_other_thread,
                             args=(lock, lifetime))
        t.start()

        count = 1
        timeout = 5
        while lifetime["status"] != "Done":
            if count > timeout:
                break
            time.sleep(1)
            count += 1

        self.assertNotEqual(lifetime["status"], "Done")

        # 这里加一个release是为了让 t 线程顺利退出.
        # 因为 lock.release() 之后, t 线程中的acquire() 就不会一直堵着.
        lock.release()

 
 

测试: tests/main.py

import unittest


TEST_MODULE = [
    "ln_threading.tests.rlock",
]


if __name__ == '__main__':
    suite = unittest.defaultTestLoader.loadTestsFromNames(TEST_MODULE)
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite)

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容