线程中的同步#
关于同步异步的介绍,在异步那篇文章里我有自己认识的一个大体介绍,现在在这里主要说的是对于多个线程如何做到同步,即控制线程执行的先后顺序。
threading.Condition
在threading模块中Condition(),它起到的作用呢类似于判断条件。
示例代码如下:
#coding=utf-8
import threading
import time
class A(threading.Thread):
def run(self):
while True:
if con.acquire():
print('--------A-1----')
con.wait()
print('--------A-2----')
con.release()
time.sleep(1)
class B(threading.Thread):
def run(self):
while True:
if con.acquire():
print('hello world')
con.notify()
con.release()
time.sleep(1)
con = threading.Condition()
if __name__ == '__main__':
a = A()
a.start()
b = B()
b.start()
其中threading.Condition()创建的对象con,具有acquire()、wait()、notify()
、release()方法。类似的我们好像在threading的Lock中用到过,其实在这里,我觉得Condition()就是一个高级的Lock()。我们在这里读一下代码,首先在执行a、b两个线程的时候,由操作系统来决定执行哪个线程(操作系统对于线程的执行顺序有自己的一套算法,如时间片轮转之类的算法),假设先执行a,con.acquire()即上了一把锁,b就无法进行执行,当a执行到con.wait()的时候会卡住,这时候b会忽略之前的那把锁,然后继续执行b的内容,直到执行到con.notify()的时会告诉所有con对象中的wait()的随即一个,可以继续往下执行了,这样在wait的地方就可以继续往下执行了。另外Condition()种还有一个notifyALL()方法,即通知所有con对象中的wait()
消费者与生产者的两种解决思路#
1)条件判断
根据上文中的条件判断方法,我们可以实现消费者与生产者这种模式的代码
#coding=utf-8
import threading
import time
class Producer(threading.Thread):
def run(self):
global count
while True:
if con.acquire():
if count > 1000:
con.wait()
else:
count += 100
print('生产者生产了100个产品,现在还剩%d个产品'%count)
con.notify()
con.release()
time.sleep(0.5)
class Consumer(threading.Thread):
def run(self):
global count
while True:
if con.acquire():
if count < 100:
con.wait()
else:
count -= 3
print('消费者消费了3个产品,现在还剩%d个产品'%count)
con.notify()
con.release()
time.sleep(0.1)
if __name__ == '__main__':
count = 500
con = threading.Condition()
# for i in range(5):
# p = Producer()
# p.start()
# for i in range(3):
# c = Consumer()
# c.start()
p = Producer()
p.start()
c = Consumer()
c.start()
2)队列
除了之前学过的multiprocessing中的Queue、multiprocessing中的Manager().Queue()来处理Pool中的Queue。接下来介绍的这种事在Queue模块中的Queue(),它用来解决的是线程间同步的问题,而且他更能够能好的记录执行内容的记录(他也有put、get方法,而且这两种方法在使用的时候,是也有阻塞功能的)。
示例代码如下:
#coding=utf-8
import threading,time
from Queue import Queue
class Producer(threading.Thread):
def run(self):
global queue
count = 0
while True:
if queue.qsize() < 1000:
for i in range(100):
count += 1
msg = '生成产品'+str(count)
queue.put(msg)
print(msg)
time.sleep(1)
class Consumer(threading.Thread):
def run(self):
global queue
while True:
if queue.qsize() > 100:
for i in range(3):
msg = self.name + '消费了 '+queue.get()
print(msg)
time.sleep(1)
if __name__ == '__main__':
queue = Queue()
for i in range(500):
queue.put('初始产品'+str(i))
for i in range(2):
p = Producer()
p.start()
for i in range(5):
c = Consumer()
c.start()