1. 多线程-threading
Python的标准库提供了两个模块:_thread和threading,_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。
启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行:
- 单线程执行
import time
def saySorry():
print("亲爱的,我错了,我能吃饭了吗?%s"%time.ctime())
time.sleep(1)
if __name__ == "__main__":
for i in range(5):
saySorry()
- 运行结果
>>>
亲爱的,我错了,我能吃饭了吗?Mon Jun 12 17:08:33 2017
亲爱的,我错了,我能吃饭了吗?Mon Jun 12 17:08:34 2017
亲爱的,我错了,我能吃饭了吗?Mon Jun 12 17:08:35 2017
亲爱的,我错了,我能吃饭了吗?Mon Jun 12 17:08:36 2017
亲爱的,我错了,我能吃饭了吗?Mon Jun 12 17:08:37 2017
- 多线程执行
import time
import threading
def saySorry():
print("亲爱的,我错了,我能吃饭了吗? name=%s, %s"%(threading.current_thread().name,time.ctime()))
time.sleep(1)
if __name__ == "__main__":
print(threading.current_thread().name)
for i in range(5):
t1 = threading.Thread(target=saySorry)
t1.start()
- 运行结果
>>>
MainThread
亲爱的,我错了,我能吃饭了吗? name=Thread-1, Mon Jun 12 19:15:02 2017
亲爱的,我错了,我能吃饭了吗? name=Thread-2, Mon Jun 12 19:15:02 2017
亲爱的,我错了,我能吃饭了吗? name=Thread-3, Mon Jun 12 19:15:02 2017
亲爱的,我错了,我能吃饭了吗? name=Thread-4, Mon Jun 12 19:15:02 2017
亲爱的,我错了,我能吃饭了吗? name=Thread-5, Mon Jun 12 19:15:02 2017
#多线程的时候可以看出来是同时执行的
说明
- 可以明显看出使用了多线程并发的操作,花费时间要短很多
- 创建好的线程,需要调用start()方法来启动
由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,Python的threading模块有个current_thread()函数,它永远返回当前线程的实例。主线程实例的名字叫MainThread,子线程的名字在创建时指定,这里我们没有指定。名字仅仅在打印时用来显示,完全没有其他意义,如果不起名字Python就自动给线程命名为Thread-1,Thread-2……
主线程会等待所有的子线程结束后才结束
查看线程数量
length = len(threading.enumerate())
print('当前运行的线程数为:%d' % length)
2. threading注意点
1.2.2.1线程执行代码的封装
通过上一小节,能够看出,通过使用threading模块能完成多任务的程序开发,为了让每个线程的封装性更完美,所以使用threading模块时,往往会定义一个新的子类class,只要继承threading.Thread就可以了,然后重写run方法
示例如下:
import threading
import time
class MyThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
msg = "I'm " + self.name + ' @ ' + str(i) # name属性中保存的是当前线程的名字
print(msg)
if __name__ == '__main__':
t = MyThread()
t.start()
说明
python的threading.Thread类有一个run方法,用于定义线程的功能函数,可以在自己的线程类中覆盖该方法。而创建自己的线程实例后,通过Thread类的start方法,可以启动该线程,交给python虚拟机进行调度,当该线程获得执行的机会时,就会调用run方法执行线程。
- 线程的执行顺序
import threading
import time
class MyThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
msg = "I'm " + self.name + ' @ ' + str(i)
print(msg)
def test():
for i in range(5):
t = MyThread()
t.start()
if __name__ == '__main__':
test()
- 运行结果(运行的结果可能不一样,但是大体是一致的)
I'm Thread-2 @ 0
I'm Thread-1 @ 0
I'm Thread-3 @ 0
I'm Thread-4 @ 0
I'm Thread-5 @ 0
I'm Thread-1 @ 1
I'm Thread-2 @ 1
I'm Thread-3 @ 1
I'm Thread-4 @ 1
I'm Thread-5 @ 1
I'm Thread-2 @ 2
I'm Thread-1 @ 2
I'm Thread-3 @ 2
I'm Thread-4 @ 2
I'm Thread-5 @ 2
说明
从代码和执行结果我们可以看出,多线程程序的执行顺序是不确定的。当执行到sleep语句时,线程将被阻塞(Blocked),到sleep结束后,线程进入就绪(Runnable)状态,等待调度。而线程调度将自行选择一个线程执行。上面的代码中只能保证每个线程都运行完整个run函数,但是线程的启动顺序、run函数中每次循环的执行顺序都不能确定。
总结
- 每个线程一定会有一个名字,尽管上面的例子中没有指定线程对象的name,但是python会自动为线程指定一个名字。
- 当线程的run()方法结束时该线程完成。
- 无法控制线程调度程序,但可以通过别的方式来影响线程调度的方式。
- 线程的几种状态
3. 多线程-共享全局变量
- 在一个进程内的所有线程共享全局变量,能够在不适用其他方式的前提下完成多线程之间的数据共享(这点要比多进程要好)
- 缺点就是,线程是对全局变量随意遂改可能造成多线程之间对全局变量的混乱(即线程非安全)
4. 进程VS线程
定义
- 进程是系统进行资源分配和调度的一个独立单位.
- 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.
区别
- 一个程序至少有一个进程,一个进程至少有一个线程.
- 线程的划分尺度小于进程(资源比进程少),使得多线程程序的并发性高。
- 进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率
- 线程不能够独立执行,必须依存在进程中
优缺点
- 线程和进程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护;而进程正相反。
5. 同步的概念
- 什么是同步
同步就是协同步调,按预定的先后次序进行运行。如:你说完,我再说。
"同"字从字面上容易理解为一起动作
其实不是,"同"字应是指协同、协助、互相配合。
如进程、线程同步,可理解为进程或线程A和B一块配合,A执行到一定程度时要依靠B的某个结果,于是停下来,示意B运行;B依言执行,再将结果给A;A再继续操作。 - 多线程的优势在于可以同时运行多个任务(至少感觉起来是这样)。但是当线程需要共享数据时,可能存在数据不同步的问题。
考虑这样一种情况:一个列表里所有元素都是0,线程"set"从后向前把所有元素改成1,而线程"print"负责从前往后读取列表并打印。
那么,可能线程"set"开始改的时候,线程"print"便来打印列表了,输出就成了一半0一半1,这就是数据的不同步。为了避免这种情况,引入了锁的概念。
6. 互斥锁
threading模块中定义了Lock类,可以方便的处理锁定:
import threading
#创建锁
myLock = threading.Lock()
print(myLock)
print('1...')
#锁住,如果此锁,已经锁了,如果再锁,会阻塞,直到开锁了,才能再锁
#myLock.acquire()
myLock.acquire()
print('2...')
#开锁,如果锁住了,可以开锁。如果没锁,直接开,报错
myLock.release()
print('3...')
myLock.acquire()
print('4...')
- 运行结果
>>>
<unlocked _thread.lock object at 0x0000000000AEBC60>
1...
2...
3...
4...
其中,锁定方法acquire可以有一个blocking参数。
- 如果设定blocking为True,则当前线程会堵塞,直到获取到这个锁为止(如果没有指定,那么默认为True)
- 如果设定blocking为False,则当前线程不会堵塞
例子
import threading
import time
num = 0
myLock=threading.Lock()
def fun1():
global num
for i in range(1000000):
myLock.acquire()
num +=1
myLock.release()
print('fun1...num=%d,%s'%(num,time.ctime()))
def fun2():
global num
for i in range(10000000):
myLock.acquire()
num += 1
myLock.release()
#time.sleep(2)
print('fun2...num=%d,%s'%(num,time.ctime()))
def main():
t1 = threading.Thread(target=fun1)
t2 = threading.Thread(target=fun2)
t1.start()
#time.sleep(2)
t2.start()
print('num=%d,%s'%(num,time.ctime()))
if __name__ == '__main__':
main()
- 运行结果
num=22642,Mon Jun 12 20:29:23 2017
fun1...num=2060762,Mon Jun 12 20:29:25 2017
fun2...num=11000000,Mon Jun 12 20:29:29 2017
在打印fun1的时候,cpu权限切换到fun2,此时num已经在疯狂运算了,所以打印的时候不是1000000.如果没有互斥锁的话,最后这个fun2打印出来的不会是这个值。
例子2:简单的售票,同步锁完成售票
import threading
import time
import os
def doChore(): # 作为间隔 每次调用间隔0.5s
time.sleep(0.5)
def booth(tid):
global i
global lock
while True:
lock.acquire() # 得到一个锁,锁定
if i != 0:
i = i - 1 # 售票 售出一张减少一张
print(tid, ':now left:', i) # 剩下的票数
doChore()
else:
print("Thread_id", tid, " No more tickets")
os._exit(0) # 票售完 退出程序
lock.release() # 释放锁
doChore()
#全局变量
i = 15 # 初始化票数
lock = threading.Lock() # 创建锁
def main():
# 总共设置了3个线程
for k in range(3):
# 创建线程; Python使用threading.Thread对象来代表线程
new_thread = threading.Thread(target=booth, args=(k,))
# 调用start()方法启动线程
new_thread.start()
if __name__ == '__main__':
main()
- 运行结果
0 :now left: 14
1 :now left: 13
2 :now left: 12
0 :now left: 11
1 :now left: 10
2 :now left: 9
0 :now left: 8
1 :now left: 7
0 :now left: 6
2 :now left: 5
1 :now left: 4
0 :now left: 3
2 :now left: 2
1 :now left: 1
0 :now left: 0
Thread_id 2 No more tickets
总结
锁的好处:
- 确保了某段关键代码只能由一个线程从头到尾完整地执行
锁的坏处: - 阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了
- 由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁
7. 多线程-非共享数据
对于全局变量,在多线程中要格外小心,否则容易造成数据错乱的情况发生
在多线程开发中,全局变量是多个线程都共享的数据,而局部变量等是各自线程的,是非共享的
8. 死锁
- 在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。
- 尽管死锁很少发生,但一旦发生就会造成应用的停止响应。
避免死锁
- 程序设计时要尽量避免(银行家算法)
- 添加超时时间等
9. 同步应用
from threading import Thread, Lock
from time import sleep
'''
等待锁的打开:阻塞-唤醒 机制
还有一种实现形式:轮循,效率低
'''
class Task1(Thread):
def run(self):
while True:
if lock1.acquire():
print("------Task 1 -----")
sleep(0.5)
lock2.release()
class Task2(Thread):
def run(self):
while True:
if lock2.acquire():
print("------Task 2 -----")
sleep(0.5)
lock3.release()
class Task3(Thread):
def run(self):
while True:
if lock3.acquire():
print("------Task 3 -----")
sleep(0.5)
lock1.release()
# 使用Lock创建出的锁默认没有“锁上”
lock1 = Lock()
# 创建另外一把锁,并且“锁上”
lock2 = Lock()
lock2.acquire()
# 创建另外一把锁,并且“锁上”
lock3 = Lock()
lock3.acquire()
if __name__ == '__main__':
t1 = Task1()
t2 = Task2()
t3 = Task3()
t1.start()
t2.start()
t3.start()
- 运行结果
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
...省略...
总结
- 可以使用互斥锁完成多个任务,有序的进程工作,这就是线程的同步
10. 生产者与消费者模式
- 队列
先进先出 - 栈
先进后出
Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么就做完),能够在多线程中直接使用。可以使用队列来实现线程间的同步。
用FIFO队列实现上述生产者与消费者问题的代码如下:
'''
队列:
1、进程之间的通信: q = multiprocessing.Queue()
2、进程池之间的通信: q = multiprocessing.Manager().Queue()
3、线程之间的通信: q = queue.Queue()
'''
import threading
import time
# python2中
# from Queue import Queue
# python3中
from queue import Queue
class Producer(threading.Thread):
def run(self):
global queue
count = 0
while True:
if queue.qsize() < 1000:
for i in range(100):
count = count + 1
msg = '生成产品' + str(count)
queue.put(msg)
print(msg)
time.sleep(1)
class Consumer(threading.Thread):
def run(self):
global queue
while True:
if queue.qsize() > 100:
for i in range(3):
msg = self.name + '消费了 ' + queue.get()
print(msg)
time.sleep(1)
#全局变量
queue = Queue()
if __name__ == '__main__':
for i in range(500):
queue.put('初始产品' + str(i))
for i in range(2):
p = Producer()
p.start()
for i in range(5):
c = Consumer()
c.start()
Queue的说明
1.对于Queue,在多线程通信之间扮演重要的角色
2.添加数据到队列中,使用put()方法
3.从队列中取数据,使用get()方法
4.判断队列中是否还有数据,使用qsize()方法
11. ThreadLocal
在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。
- 使用ThreadLocal的方法
ThreadLocal应运而生,不用查找dict,ThreadLocal帮你自动做这件事:
'''
threadlocal就有俩功能:
1、将各自的局部变量绑定到各自的线程中
2、局部变量可以传递了,而且并没有变成形参
'''
import threading
# 创建全局ThreadLocal对象:
local_school = threading.local()
def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target=process_thread, args=('yongGe',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('老王',), name='Thread-B')
t1.start()
t2.start()
- 运行结果
Hello, yongGe (in Thread-A)
Hello, 老王 (in Thread-B)
小结
一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题
12. 异步
- 同步调用就是你 喊 你朋友吃饭 ,你朋友在忙 ,你就一直在那等,等你朋友忙完了 ,你们一起去
- 异步调用就是你 喊 你朋友吃饭 ,你朋友说知道了 ,待会忙完去找你 ,你就去做别的了。
- 代码
from multiprocessing import Pool
import time
import os
def test():
print("---进程池中的进程---pid=%d,ppid=%d--" % (os.getpid(), os.getppid()))
for i in range(3):
print("----%d---" % i)
time.sleep(1)
return "老王"
def test2(args):
print('1...')
time.sleep(10)
print("---callback func--pid=%d" % os.getpid())
print("---callback func--args=%s" % args)
print('2...')
if __name__ == '__main__':
pool = Pool(3)
#callback表示前面的func方法执行完,再执行callback,并且可以获取func的返回值作为callback的参数
pool.apply_async(func=test, callback=test2)
#pool.apply_async(func=test)
#模拟主进程在做任务
time.sleep(5)
print("----主进程-pid=%d.....服务器是不关闭的----" % os.getpid())
- 运行结果
---进程池中的进程---pid=8808,ppid=8072--
----0---
----1---
----2---
1...
----主进程-pid=8072.....服务器是不关闭的----
---callback func--pid=8072
---callback func--args=老王
2...