源码: tests/lock.py
# -.- coding:utf-8 -.-
import time
import threading
import unittest
py3 = hasattr(threading, "main_thread") is True
py2 = not py3
class TestLock(unittest.TestCase):
def test_init_lock_status_is_false(self):
lock = threading.Lock()
self.assertFalse(lock.locked())
@unittest.skipIf(py2, u"锁上锁python2不支持timeout功能!")
def test_locked_can_not_lock_again_py3(self):
lock = threading.Lock()
lock.acquire()
self.assertRaises(excClass=threading.ThreadError,
callableObj=lock.acquire, **{"timeout": 5})
def test_a_thread_lock_another_thread_release(self):
"""
A 线程加锁后, B线程可以强行解锁.
:return:
"""
lock = threading.Lock()
lock.acquire()
lifetime = {"status": "init"}
def release_lock(the_lock, the_lifetime):
the_lock.release()
the_lifetime.update({"status": "Done"})
t = threading.Thread(target=release_lock, args=(lock, lifetime))
t.start()
while lifetime.get("status") != "Done":
print("debug:")
time.sleep(1)
self.assertFalse(lock.locked()) # 无锁状态
测试: tests/main.py
import unittest
TEST_MODULE = [
"ln_threading.tests.lock",
]
if __name__ == '__main__':
suite = unittest.defaultTestLoader.loadTestsFromNames(TEST_MODULE)
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite)