Num01-->多线程threading
Python中建议使用threading模块,而不要使用thread模块。原因如下:
1,Python中threading模块对thread进行了一些包装,可以更加方便的使用。
2,Python中threading模块能确保重要的子线程在进程退出前结束。
3,Python中thread模块,当主线程技术,同一主线程下的其他所有子线程都被强制退出。
4,Python中thread模块,不支持守护(daemon)线程。
Test01-->多线程执行
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2017/4/28 14:05
# @Author : xiaoke
import threading
import time
def say():
print("我是子线程")
time.sleep(1)
if __name__ == "__main__":
print("我是主线程")
# 创建三个子线程
for i in range(3):
t = threading.Thread(target=say)
# 启动线程
t.start()
# 结果如下:
# 我是主线程
# 我是子线程
# 我是子线程
# 我是子线程
Test02-->主线程会等待所有子线程结束后,再结束
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2017/4/28 14:05
# @Author : xiaoke
import threading
from time import sleep, ctime
def sing():
for i in range(3):
print("正在唱歌...%d" % i)
sleep(1)
print("唱歌子线程结束")
def dance():
for i in range(3):
print("正在跳舞...%d" % i)
sleep(1)
print("跳舞子线程结束")
if __name__ == '__main__':
print('主线程---开始时间---:%s' % ctime())
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
t1.start()
t2.start()
# 屏蔽以下代码,试试看,程序是否会立马结束?
# t1.join()
# t2.join()
print('主线程---结束时间---:%s' % ctime())
# 没有屏蔽join()函数的结果如下:
# 主线程---开始时间---:Fri Apr 28 14:19:29 2017
# 正在唱歌...0
# 正在跳舞...0
# 正在唱歌...1
# 正在跳舞...1
# 正在跳舞...2
# 正在唱歌...2
# 跳舞子线程结束
# 唱歌子线程结束
# 主线程---结束时间---:Fri Apr 28 14:19:32 2017
# 屏蔽join()函数,的结果如下:
# 主线程---开始时间---:Fri Apr 28 14:14:00 2017
# 正在唱歌...0
# 正在跳舞...0
# 主线程---结束时间---:Fri Apr 28 14:14:00 2017
# 正在跳舞...1
# 正在唱歌...1
# 正在跳舞...2
# 正在唱歌...2
# 唱歌子线程结束
# 跳舞子线程结束
以上代码说明,子线程在启动后,调用join()函数,是等待所有的子线程结束后,主线程才结束。否则主线程,很快就结束了。
Test03-->查看线程数量
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
from time import sleep, ctime
import gc
def sing():
for i in range(3):
print("正在唱歌...%d" % i)
sleep(1)
print("唱歌子线程结束")
def dance():
for i in range(3):
print("正在跳舞...%d" % i)
sleep(1)
print("跳舞子线程结束")
if __name__ == '__main__':
print('---开始时间---:%s' % ctime())
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
t1.start()
t2.start()
while True:
# enumerate()函数的意思:返回一个列表,存放当前活着的线程
length = len(threading.enumerate())
print('当前运行的线程数为:%d' % length)
if length <= 1:
break
sleep(1)
# t1.join()
# t2.join()
print('---结束时间---:%s' % ctime())
# 手动垃圾回收
#gc.collect()
print('当前还活着的线程是:%s' % threading.current_thread().name)
# 結果如下:
# ---开始时间---:Fri Apr 28 14:44:50 2017
# 正在唱歌...0
# 正在跳舞...0
# 当前运行的线程数为:3
# 正在唱歌...1
# 正在跳舞...1
# 当前运行的线程数为:3
# 正在唱歌...2
# 当前运行的线程数为:3
# 正在跳舞...2
# 唱歌子线程结束
# 当前运行的线程数为:2
# 跳舞子线程结束
# 当前运行的线程数为:1
# ---结束时间---:Fri Apr 28 14:44:54 2017
# 当前还活着的线程是:MainThread
以上代码加以说明:在程序的最后,还剩下主线程还在存活,是因为一个程序至少要有一个主线程,一直存活着。当然Python垃圾回收器也会不定时的回收垃圾(引用计数和分代清除两种机制),也可以手动回收垃圾。
Num02-->第二种方式创建多线程
定义一个新的子类,继承threading.Thread就可以,然后重写run()方法。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
import time
class MyThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
msg = "I'm " + self.name + ' @ ' + str(i) # name属性中保存的是当前线程的名字
print(msg)
def main():
for i in range(3):
t = MyThread()
t.start()
if __name__ == '__main__':
main()
# 结果如下:
# I'm Thread-2 @ 0
# I'm Thread-1 @ 0
# I'm Thread-3 @ 0
# I'm Thread-1 @ 1
# I'm Thread-3 @ 1
# I'm Thread-2 @ 1
# I'm Thread-2 @ 2
# I'm Thread-1 @ 2
# I'm Thread-3 @ 2
以上代码加以说明:
1,Python中threading.Thread类中有一个run()方法,用于定义线程的功能函数,可以在自己定义的线程类中重写该方法。而创建自己的线程实例后,通过Thread类的start()方法,可以启动该线程,交个Python虚拟机进行调度,当该线程获得执行的机会时,就会调用run()方法执行线程。
2,多线程程序的执行顺序是不确定的。当执行到sleep()语句时,线程将被阻塞(Blocked),到sleep()结束后,线程进入就绪状态(Runnable),等待调度。而线程的调度将会随机选择一个线程执行。上面的代码只能保证每个线程都运行完,整个run()函数。但是线程的启动顺序,run()函数中每次循环的执行顺序不能确定。
Num03-->线程的几种状态
Num04-->多线程共享全局变量
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
from threading import Thread
import time
g_num = 100
def work1():
global g_num
for i in range(3):
g_num += 1
print("----in work1, g_num is %d---" % g_num)
def work2():
global g_num
print("----in work2, g_num is %d---" % g_num)
print("---线程创建之前g_num is %d---" % g_num)
t1 = Thread(target=work1)
t1.start()
t1.join()
t2 = Thread(target=work2)
t2.start()
t2.join()
以上代码加以说明:
1,优点:不管是可变类型或者不可变类型的数据,在一个进程内的所有线程共享全局变量,能够在不使用其他方式的前提下完成多线程之间的数据共享。
2,缺点:多线程对全局变量的随意改变,会造成对全局变量值的混乱。也就是多线程非安全的原因。
Num05-->多线程中的互斥锁
当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制
线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。
互斥锁为资源引入一个状态:锁定/非锁定。
某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
Test01-->互斥锁的创建
#创建锁
mutex = threading.Lock()
#锁定
mutex.acquire([blocking])
#释放
mutex.release()
加以说明:
其中,锁定方法acquire可以有一个blocking参数。
如果设定blocking为True,则当前线程会堵塞,直到获取到这个锁为止。(默认为True)
如果设定blocking为False,则当前线程不会堵塞
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
from threading import Thread, Lock
import time
g_num = 0
def test1():
global g_num
for i in range(1000000):
# True表示堵塞 即如果这个锁在上锁之前已经被上锁了,那么这个线程会在这里一直等待到解锁为止
# False表示非堵塞,即不管本次调用能够成功上锁,都不会卡在这,而是继续执行下面的代码
mutexFlag = mutex.acquire(True)
if mutexFlag:
g_num += 1
mutex.release()
print("---test1---g_num=%d" % g_num)
def test2():
global g_num
for i in range(1000000):
mutexFlag = mutex.acquire(True) # True表示堵塞
if mutexFlag:
g_num += 1
mutex.release()
print("---test2---g_num=%d" % g_num)
def main():
p1 = Thread(target=test1)
p1.start()
p2 = Thread(target=test2)
p2.start()
print("---g_num=%d---" % g_num)
p1.join()
p2.join()
if __name__ == '__main__':
# 创建一个互斥锁
# 这个所默认是未上锁的状态
mutex = Lock()
main()
# 运行结果如下:
# ---g_num=44416---
# ---test1---g_num=1990567
# ---test2---g_num=2000000
Test02-->上锁解锁过程
当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”锁住状态。
每次只有一个线程可以获得锁。
如果此时另一个线程试图获得锁,该线程就会变成“blocked”状态,称为阻塞。直到拥有锁的线程调用release()函数释放锁之后,这时这个锁就进入“unlocked”解锁的状态。
线程调度程序,从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入(running)运行状态。
Test03-->互斥锁的优缺点
优点:确保了某段关键代码只能有一个线程从头到尾的执行。
缺点:
1,阻止了多线程并发执行,包含锁的代码实际上是以单线程的模式执行,效率大大降低了。
2,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁。
Num06-->线程死锁和递归锁
Test01--> 死锁的情况
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
import time
class MyThread1(threading.Thread):
def run(self):
# 对mutexA上锁
if mutexA.acquire():
print(self.name + '----mutexA上锁----')
time.sleep(1)
if mutexB.acquire():
print(self.name + '----mutexA中mutexB上锁----')
mutexB.release()
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
# 对mutexB上锁
if mutexB.acquire():
print(self.name + '----mutexB上锁----')
time.sleep(1)
if mutexA.acquire():
print(self.name + '----mutexB中mutexA上锁----')
mutexA.release()
mutexB.release()
mutexA = threading.Lock()
mutexB = threading.Lock()
if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
t1.join()
t2.join()
# 死锁的结果如下:
# Thread-1----mutexA上锁----
# Thread-2----mutexB上锁----
# 我手写的:一直停留在这里不动
Test02--> 递归锁,用于解决死锁的问题,对以上死锁代码加以修改
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
import time
class MyThread1(threading.Thread):
def run(self):
# 对mutexA上锁
if mutex.acquire():
print(self.name + '----mutexA上锁----')
time.sleep(1)
if mutex.acquire():
print(self.name + '----mutexA中mutexB上锁----')
mutex.release()
mutex.release()
class MyThread2(threading.Thread):
def run(self):
# 对mutexB上锁
if mutex.acquire():
print(self.name + '----mutexB上锁----')
time.sleep(1)
if mutex.acquire():
print(self.name + '----mutexB中mutexA上锁----')
mutex.release()
mutex.release()
mutex = threading.RLock()
if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
t1.join()
t2.join()
# RLock递归锁的结果如下:
# Thread-1----mutexA上锁----
# Thread-1----mutexA中mutexB上锁----
# Thread-2----mutexB上锁----
# Thread-2----mutexB中mutexA上锁----
Num07-->利用互斥锁实现同步
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
# 利用互斥锁实现同步
from threading import Thread, Lock
import time
# 先创建A、B、C三个锁
lockA = Lock()
lockB = Lock()
lockC = Lock()
def task_a():
# 申请lockA,获得锁才开始任务
while True:
lockA.acquire()
print("-----A--------")
time.sleep(1)
lockB.release()
def task_b():
# 申请lockB,获得锁才开始任务
while True:
lockB.acquire()
print("-----B--------")
time.sleep(1)
lockC.release()
def task_c():
# 申请lockC,获得锁才开始任务
while True:
lockC.acquire()
print("-----C--------")
time.sleep(1)
lockA.release()
def main():
# lockB和lockC在主线程获得锁
lockB.acquire()
lockC.acquire()
thread_a = Thread(target=task_a)
thread_b = Thread(target=task_b)
thread_c = Thread(target=task_c)
thread_a.start()
thread_b.start()
thread_c.start()
thread_a.join()
thread_b.join()
thread_c.join()
if __name__ == '__main__':
main()
# 结果如下:
# -----A--------
# -----B--------
# -----C--------
# -----A--------
# -----B--------
# -----C--------
# -----A--------
# -----B--------
# -----C--------
# 此处省略......一直在同步的打印A、B、C
Num08-->生产者和消费者模式--Queue
Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么就做完),能够在多线程中直接使用。可以使用队列来实现线程间的同步。
为什么要使用生产者和消费者模式???
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式???
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。
Test01-->FIFO先进先出队列Queue
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import queue # 队列,解决多线程问题
q = queue.Queue()
q.put("我的第一个进来的")
q.put("我是第二个进来的")
q.put({"name": "我是第三个进来的"})
while True:
if not q.empty():
data = q.get(block=False)
print(data)
# 结果如下:
# 我的第一个进来的
# 我是第二个进来的
# {'name': '我是第三个进来的'}
Test02-->LIFO后入先出队列LifoQueue
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import queue
# 后进先出
q = queue.LifoQueue()
q.put("我的第一个进来的")
q.put("我是第二个进来的")
q.put({"name": "我是第三个进来的"})
while True:
if not q.empty():
data = q.get(block=False)
print(data)
# 结果如下:
# {'name': '我是第三个进来的'}
# 我是第二个进来的
# 我的第一个进来的
Test03-->优先级队列PriorityQueue
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import queue
# 按优先级,按照2,3,4,5这样从小到大的优先级
q = queue.PriorityQueue()
q.put([5, "我的第一个进来的"])
q.put([2, "我是第二个进来的"])
q.put([4, {"name": "我是第三个进来的"}])
q.put([3, (2, 3, 4, 5)])
while True:
if not q.empty():
data = q.get(block=False)
print(data)
# 结果如下:
# [2, '我是第二个进来的']
# [3, (2, 3, 4, 5)]
# [4, {'name': '我是第三个进来的'}]
# [5, '我的第一个进来的']
Test04-->Queue队列的方法:
创建一个“队列”对象
from queue import Queue
q = queue.Queue(maxsize = 100)
Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。
将一个值放入队列中
q.put(100)
调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。
将一个值从队列中取出
q.get()
调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,
get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
此包中的常用方法(q = queue.Queue()):
q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(False)
非阻塞 q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作
Test05-->生产者消费者案例如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
from threading import Thread
import time
from queue import Queue
# 3个生产者,当产品数量小于100就生产8个
# 5个消费者,当产品数量大于10就消费3
storageQueue = Queue() # 创建用于保存产品的队列
# 保存产品编号
serial_num = 0
# 生产者
class Producer(Thread):
def __init__(self, name):
Thread.__init__(self)
self.name = name
def run(self):
global serial_num
while True:
if storageQueue.qsize() < 100:
# 向队列中添加消息模拟生产
for i in range(8):
msg = self.name + ":" + str(serial_num)
storageQueue.put(msg)
print("%s 生产了:%s" % (self.name, msg))
serial_num += 1
time.sleep(1)
# 消费者
class Consumer(Thread):
def __init__(self, name):
Thread.__init__(self)
self.name = name
def run(self):
while True:
if storageQueue.qsize() > 10:
# 从消息队列中取消息,代表消费产品
for i in range(3):
msg = storageQueue.get()
print("%s消费了:%s" % (self.name, msg))
time.sleep(1)
def main():
# 3个生产者
for i in range(3):
t = Producer("Producter--" + str(i))
t.start()
# 5个消费者
for i in range(5):
t = Consumer("Consumer--" + str(i))
t.start()
if __name__ == '__main__':
main()
# 结果如下:
# Producter--0 生产了:Producter--0:0
# Producter--0 生产了:Producter--0:1
# Producter--0 生产了:Producter--0:2
# Producter--0 生产了:Producter--0:3
# Producter--0 生产了:Producter--0:4
# Producter--0 生产了:Producter--0:5
# Producter--0 生产了:Producter--0:6
# Producter--0 生产了:Producter--0:7
# Producter--1 生产了:Producter--1:8
# Producter--1 生产了:Producter--1:9
# Producter--1 生产了:Producter--1:10
# Producter--1 生产了:Producter--1:11
# Producter--1 生产了:Producter--1:12
# Producter--1 生产了:Producter--1:13
# Producter--2 生产了:Producter--2:13
# Producter--2 生产了:Producter--2:14
# Producter--2 生产了:Producter--2:15
# Producter--2 生产了:Producter--2:16
# Producter--2 生产了:Producter--2:17
# Producter--2 生产了:Producter--2:18
# Producter--2 生产了:Producter--2:19
# Producter--2 生产了:Producter--2:20
# Producter--1 生产了:Producter--1:22
# Producter--1 生产了:Producter--1:23
# Consumer--0消费了:Producter--0:0
# Consumer--0消费了:Producter--0:1
# Consumer--0消费了:Producter--0:2
# Consumer--1消费了:Producter--0:3
# Consumer--1消费了:Producter--0:4
# Consumer--1消费了:Producter--0:5
# Consumer--2消费了:Producter--0:6
# Consumer--2消费了:Producter--0:7
# Consumer--2消费了:Producter--1:8
# Consumer--3消费了:Producter--1:9
# Consumer--3消费了:Producter--1:10
# Consumer--3消费了:Producter--1:11
# Consumer--4消费了:Producter--1:12
# Consumer--4消费了:Producter--1:13
# Consumer--4消费了:Producter--2:13
# Producter--0 生产了:Producter--0:24
# Producter--0 生产了:Producter--0:25
# Producter--0 生产了:Producter--0:26
# Producter--0 生产了:Producter--0:27
# Producter--0 生产了:Producter--0:28
# Producter--0 生产了:Producter--0:29
# 我手动写的:此处省略很多......
Num09-->守护线程
只要非守护线程结束了,不管守护线程结束没结束,程序都结束.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
import time
def run(n):
print("我是守护线程--", n)
time.sleep(1)
start_time = time.time()
thread_list = []
for i in range(3):
t = threading.Thread(target=run, args=('thread_list-%s' % i,))
# 设置线程为守护状态,非守护状态线程都退出,程序就退出,不等待守护状态线程
t.daemon = True
t.start() # t.daemon=True 必须在 调用start()函数 前面
thread_list.append(t)
print("当前还活着的线程数量是:", threading.active_count())
print("当前还活着的线程有:", thread_list)
print("当前还剩线程是:", threading.current_thread().name)
print("耗时:", time.time() - start_time)
# 结果是:
# 我是守护线程-- thread_list-0
# 我是守护线程-- thread_list-1
# 我是守护线程-- thread_list-2
# 当前还活着的线程数量是: 4
# 当前还活着的线程有: [<Thread(Thread-1, started daemon 12852)>, <Thread(Thread-2, started daemon 6696)>, <Thread(Thread-3, started daemon 11940)>]
# 当前还剩线程是: MainThread
# 耗时: 0.0
Num10-->多线程--非共享数据
在多线程开发中,全局变量是多个线程都共享的数据,而局部变量则是各个线程的,是非共享的。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
import time
class MyThread(threading.Thread):
def __init__(self, num, sleepTime):
threading.Thread.__init__(self)
self.num = num
self.sleepTime = sleepTime
# 重写run()方法
def run(self):
self.num += 1
time.sleep(self.sleepTime)
print('线程(%s),num=%d' % (self.name, self.num))
if __name__ == '__main__':
t1 = MyThread(99, 5)
t1.start()
t2 = MyThread(199, 1)
t2.start()
t1.join()
t2.join()
# 结果如下:
# 线程(Thread-2),num=200
# 线程(Thread-1),num=100
Num11-->线程中ThreadLocal
Test01-->使用函数传参的方法
def process_student(name):
std = Student(name)
# std是局部变量,但是每个函数都要用它,因此必须传进去:
do_task_1(std)
do_task_2(std)
def do_task_1(std):
do_subtask_1(std)
do_subtask_2(std)
def do_task_2(std):
do_subtask_2(std)
do_subtask_2(std)
每个函数一层一层调用都这么传参数那还得了?用全局变量?也不行,因为每个线程处理不同的Student对象,不能共享。
Test02-->使用全局字典的方法
global_dict = {}
def std_thread(name):
std = Student(name)
# 把std放到全局变量global_dict中:
global_dict[threading.current_thread()] = std
do_task_1()
do_task_2()
def do_task_1():
# 不传入std,而是根据当前线程查找:
std = global_dict[threading.current_thread()]
...
def do_task_2():
# 任何函数都可以查找出当前线程的std变量:
std = global_dict[threading.current_thread()]
...
这种方式理论上是可行的,它最大的优点是消除了std对象在每层函数中的传递问题,但是,每个函数获取std的代码有点low。
Test03-->使用ThreadLocal的方法
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
from threading import Thread
import threading
# 创建ThreadLocal对象
local_school = threading.local()
class Student(object):
def __init__(self, name):
self.name = name
# 线程中调用的函数
def do_task():
# 相当于以当前线程对象为key取出stu对象
stu = local_school.student
print("%s中学生名字为:%s" % (threading.current_thread().name, stu.name))
# 线程的功能函数
def task_thread(name):
# 创建类的实例对象
stu = Student(name)
# 相当于以当前的线程对象为key,放入全局的字典local_school
local_school.student = stu
do_task()
def main():
t1 = Thread(target=task_thread, args=("xiaoke",))
t2 = Thread(target=task_thread, args=("lili",))
t1.start()
t2.start()
t1.join()
t2.join()
if __name__ == '__main__':
main()
# 结果如下:
# Thread-1中学生名字为:xiaoke
# Thread-2中学生名字为:lili
全局变量local_school就是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。你可以把local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。
可以理解为全局变量local_school是一个dict。
ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等。这样一个线程所有调用到的处理函数,都可以非常方便地访问这些资源。
Test04-->小总结
一个ThreadLocal变量虽然是“全局变量”,但每个线程都只能读写自己线程的独立副本,互不干扰。
ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。
Num12-->GIL全局解释器锁
作用:保证同一时刻,无论你有多少个线程,只有一个线程被CPU执行。
因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。
不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。
Num13-->信号量Semaphore
信号量:是指同时开几个线程并发。比如厕所有5个坑,那最多只允许5个人上厕所,后面的人只能等里面的5个人都出来了,才能再进去。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading, time
class myThread(threading.Thread):
def run(self):
# 加把锁,可以放进去多个(相当于5把锁,同时有5个线程)
if semaphore.acquire():
print(self.name)
time.sleep(3)
semaphore.release()
if __name__ == "__main__":
# 同时能有几个线程进去(设置为5就是一次5个线程进去)
semaphore = threading.Semaphore(5)
thread_list = [] # 空列表
for i in range(200): # 200个线程
thread_list.append(myThread()) # 加线程对象
for t in thread_list:
t.start() # 分别启动
# 结果如下:每隔3秒就会同时显示5个进程
# Thread-1
# Thread-2
# Thread-3
# Thread-4
# Thread-5
# Thread-6
# Thread-7
# Thread-8
# Thread-9
# Thread-10
# .......
Num14-->Event线程间通信
Python提供了Event对象用于线程间通信。它是由线程设置的信号标志,如果信号标志为真,则其他线程等待,直到信号拿到。
Event对象实现了简单的线程通信机制。它提供了设置信号,清除信号,等待信号等,用于实现线程间的通信。
Events的使用
event = threading.Event()
event.wait()
Event对象wait()方法只有在内部信号为真的时候,才会很快的执行并完成返回。当Event对象的内部信号标志为假时,则wait()方法一直等待到其为真时才返回。
event.set()
使用Event的set()方法可以设置Event对象内部的信号标志为真。Event对象提供了isSet()方法来判断其内部信号标志的状态。当使用Event对象的set()方法后,isSet()方法返回真。
event.clear()
使用Event对象的clear()方法可以清除Event对象内部的信号标志。即将其设为假,当使用Event的clear方法后,isSet()方法返回假。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading, time
class Boss(threading.Thread):
def run(self):
print("大老板说:今晚大家都要加班到24:00。")
print(event.isSet())
event.set()
print(event.isSet())
print("辛苦大家了......加油干吧!!!")
time.sleep(10)
print("大老板说:24:00到了,可以下班了。")
print(event.isSet())
event.set()
print(event.isSet())
class Worker(threading.Thread):
def run(self):
# 等待信号为真,取数据
event.wait()
print("加油工作,多赚钱……!")
time.sleep(5)
event.clear()
# 等待信号为真,取数据
event.wait()
print("下班回家陪老婆,孩子,哈哈哈!")
if __name__ == "__main__":
event = threading.Event()
threads = []
# 三个员工,一个老板
for i in range(3):
threads.append(Worker())
threads.append(Boss())
# 开启老板和员工
for t in threads:
t.start()
# 等待老板和员工工作都结束
for t in threads:
t.join()
# 结果如下:
# 大老板说:今晚大家都要加班到24:00。
# False
# True
# 辛苦大家了......加油干吧!!!
# 加油工作,多赚钱……!
# 加油工作,多赚钱……!
# 加油工作,多赚钱……!
# 大老板说:24:00到了,可以下班了。
# False
# True
# 下班回家陪老婆,孩子,哈哈哈!
# 下班回家陪老婆,孩子,哈哈哈!
# 下班回家陪老婆,孩子,哈哈哈!