源码: tests/rlock.py
# -.- coding:utf-8 -.-
import time
import threading
import unittest
from .utils import get_lock_count
class TestRLock(unittest.TestCase):
def test_rlock_can_acquire_multi_times(self):
lock = threading.RLock()
lock.acquire()
lock.acquire()
lock.acquire()
lock_count = get_lock_count(lock)
self.assertEqual(lock_count, 3)
def test_rlock_can_not_acquire_with_multi_threads(self):
lock = threading.RLock()
lock.acquire()
lifetime = {"status": "init"}
def acquire_by_other_thread(the_lock, the_lifetime):
the_lock.acquire()
the_lifetime.update({"status": "Done"})
t = threading.Thread(target=acquire_by_other_thread,
args=(lock, lifetime))
t.start()
count = 1
timeout = 5
while lifetime["status"] != "Done":
if count > timeout:
break
time.sleep(1)
count += 1
self.assertNotEqual(lifetime["status"], "Done")
# 这里加一个release是为了让 t 线程顺利退出.
# 因为 lock.release() 之后, t 线程中的acquire() 就不会一直堵着.
lock.release()
测试: tests/main.py
import unittest
# Dotted names of the test modules this runner loads.
TEST_MODULE = [
    "ln_threading.tests.rlock",
]

if __name__ == '__main__':
    # Local import: only needed when this file is executed as a script.
    import sys

    suite = unittest.defaultTestLoader.loadTestsFromNames(TEST_MODULE)
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(suite)
    # Propagate the outcome through the exit status so that callers (shell
    # scripts, CI) can detect failing tests instead of always seeing 0.
    sys.exit(0 if result.wasSuccessful() else 1)