1.什么是单线程和多线程
- 1.单线程就是一个人处理大量的事情,耗时又耗力。
- 2.多线程就是多个人一块处理这大量的事情,虽然不一定省时(多线程需要去竞争cpu资源)但一定省力。
- 3.如果需要专业性表述请自行查阅相关资料(一千位读者就有一千位哈姆雷特)
2.线程相关属性
- 1.代码如下
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
# 线程的相关属性
def about_main_thread():
print("当前活跃线程的数量: ", threading.active_count())
print("当前所有线程的具体信息: ", threading.enumerate())
print("当前线程的信息: ", threading.current_thread())
print("线程名字: ", threading.current_thread().name)
print("线程是否存活: ", threading.current_thread().is_alive())
print("线程是否是随从线程: ", threading.current_thread().daemon) # 随从线程(守护线程): 守护线程(设置daemon为true时)随着所有非守护线程的结束而结束
print("线程的本地线程id: ", threading.current_thread().native_id)
......
if __name__ == '__main__':
# 其他线程相关属性类似
about_main_thread()
-
2.运行截图如下
3.单线程和多线程各模拟做100万件事
打印0-1000000数据观察所耗时间
- 1.单线程处理100万数据
import threading
import time
# 单线程打印100万数据
def single_thread_print1000000():
start = int(round(time.time() * 1000))
for i in range(1000000):
print("线程%s处理%d" % (threading.current_thread().name, i))
end = int(round(time.time() * 1000))
print("打印1000000耗时%d毫秒" % (end-start))
if __name__ == '__main__':
single_thread_print1000000()
执行结果如下:
- 2.多线程处理100万数据
# 开启5个线程,每个线程处理20万数据
def mul_thread_print1000000():
start = int(round(time.time() * 1000))
threadlist=[]
for i in range(5):
threadlist.append(threading.Thread(name='T'+str(i), target=mul_target_print1000000))
for i in range(5):
threadlist[i].start()
threadlist[i].join()
end = int(round(time.time() * 1000))
print("打印1000000耗时%d毫秒: " % (end - start))
# 需要做的事
def mul_target_print1000000():
for i in range(200000):
print("线程%s处理%d" % (threading.current_thread().name, i))
if __name__ == '__main__':
mul_thread_print1000000()
执行结果如下:
- 3.经对比发现,此例中多线程比单线程要快200多毫秒,可能200多毫秒感觉不出什么,可是当我们将打印的数字换成一个个复杂的业务逻辑,你会发现,二者的差距越来越大,这个时候就能体会到多线程的好处了。
- 4.多线程使用场景
- 1.当你在学习爬虫的过程中,爬取1000本小说,用单线程怕不是得爬到明年去,使用多线程你会发现爬取1000本小说六的起飞。
- 2.当工作的时候需要处理大几千万的数据量,多线程会让你少花一些功夫和时间。
- 3.相关类似的场景很多很多,多线程可以视自己的业务情况而定。
4.多线程模块的使用及如何处理线程执行结果
- 1.继承threading.Thread类来创建自己的线程
import threading
import time
def use_mul_thread_class():
thread1 = DetailData(name="T1", mydata=[1, 3, 5, 7, 9])
thread2 = DetailData(name="T3", mydata=[2, 4, 6, 8, 10])
thread1.start()
thread2.start()
time.sleep(5)
thread1.stop()
thread2.stop()
class DetailData(threading.Thread):
def __init__(self, name, mydata):
threading.Thread.__init__(self)
self.name = name # 线程名字
self.mydata = mydata # 处理的数据
self.thread_stop = False # 手动停止线程标识
# run方法中处理需要的任务
def run(self):
while not self.thread_stop:
print('Thread Object(%s), Time:%s\n' % (self.name, time.ctime()))
sum = 0
for item in self.mydata:
sum += item
time.sleep(1) # 睡眠3秒
print("线程%s的传入参数求和为%d" % (self.name, sum))
print("线程%s正常执行结束" % self.name)
def stop(self):
print("线程%s已手动停止" % self.name)
self.thread_stop = True
if __name__ == '__main__':
use_mul_thread_class()
- 2.将需要做的事情单独封装一个方法,直接调用threading.Thread()方法去创建自己的线程(前面其实已经使用过这种方法了,下面介绍Thread()的参数)
源码如下,具体是如何实现可以自己去看:
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
# 1.group参数应当为 None,为将来实现Python Thread类的扩展而保留。
# 2.target参数是被 run()方法调用的回调对象. 默认应为None, 意味着没有对象被调用,需要注意的是target参数需要为回调对象(就是一个需要做什么事的方法)的名字,不能加()。
# 3.name参数是线程的名字,默认形式为'Thread-N'的唯一的名字被创建,其中N 是比较小的十进制数。
# 4.args参数是一个元组tuple,这个是回调对象需要的参数,就是target赋值的方法对应的参数列表,当只有一个参数的时候应为(one,)这种形式,不能直接写one。
# 5.kwargs参数是目标调用的参数的关键字dictionary,默认为{}。
# 6.daemon参数是是否是守护线程,默认为false
注: 这个方法的参数args和kwargs,args传的元组是自动直接对应方法参数位置的,kwargs是传递一个字典,可以根据方法参数的名字去手动对应。
- 3.处理线程执行结果
1.使用全局的列表或者队列来保存返回值
import threading
import time
def deal_thread_return():
returnlist = []
threadlist = []
threadlist.append(threading.Thread(target=deal_thread_return_target, args=([1, 3, 5, 7, 9], returnlist)))
threadlist.append(threading.Thread(target=deal_thread_return_target, args=([2, 4, 6, 8, 10], returnlist)))
threadlist.append(threading.Thread(target=deal_thread_return_target, args=([21, 14, 36, 18, 100], returnlist)))
threadlist.append(threading.Thread(target=deal_thread_return_target, args=([2, 14, 6, 18, 200], returnlist)))
for i in threadlist:
i.start()
i.join() # 此处的join方法是等待当前线程结束后程序才接着执行
print(returnlist)
def deal_thread_return_target(data, returnlist):
sum = 0
for item in data:
sum += item
returnlist.append(sum)
if __name__ == '__main__':
deal_thread_return()
注:选择列表的一个原因是:列表的 append() 方法是线程安全的,选择队列的原因是其先进先出的特性。
2.重写 Thread 的 join 方法,返回线程函数的返回值
class ThreadReturn(threading.Thread):
def __init__(self, target=None, args=()):
threading.Thread.__init__(self, target=target, args=args) # 传入需要的参数
def run(self):
if self._target is not None: # 这种写法可以看源码
self._return = self._target(*self._args, **self._kwargs)
def join(self): # 重新join方法
super().join()
return self._return # 这返回对应线程的结果
def rewrite_join_target(name):
return "你好!"+str(name)
def rewrite_join():
threadlist = []
returnlist = []
for i in range(10):
threadlist.append(ThreadReturn(target=rewrite_join_target, args=("李白"+str(i),)))
for i in threadlist:
i.start()
returnlist.append(i.join())
print(returnlist)
if __name__ == '__main__':
rewrite_join()
3.使用标准库的concurrent.futures
import concurrent.futures
def rewrite_join_target(name):
return "你好!"+str(name)
def stand_futures():
# 这是线程池的方式
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futurelist = []
for i in range(10): # 模拟多个任务
future = executor.submit(rewrite_join_target, "李白"+str(i))
futurelist.append(future)
for future in concurrent.futures.as_completed(futurelist): # 并发执行
print(future.result()) # 拿到结果
if __name__ == '__main__':
stand_futures()
注:这段代码中的submit方法,回调函数不能显示赋值(即fn=target报错),且需要的参数列表不需要传元组,直接参数1,参数2,参数3...这样传进去就可以了。
5.线程池的使用
线程池的引入是为了更好的利用计算机资源,上文中建立10个20个线程放到列表里面,然后启动,这样虽然可以实现功能,但是并没有把计算机资源充分利用起来,由此线程池就诞生了
- 1.ThreadPoolExecutor 方式
ThreadPoolExecutor 是 python3 新引入的库,通过 submit 和 as_completed 来提交和获取任务返回结果。
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
def rewrite_join_target(name):
print("线程%s正在向%s问好" % (threading.current_thread().name, name))
return threading.current_thread().name + ": 你好!"+str(name)
def use_threadpool():
pool = ThreadPoolExecutor(max_workers=10, thread_name_prefix="T")
future_list = [pool.submit(rewrite_join_target, i) for i in range(10)]
for future in concurrent.futures.as_completed(future_list):
result = future.result()
print("线程%s拿到的返回值%s" % (threading.current_thread().name, result))
pool.shutdown()
if __name__ == '__main__':
use_threadpool()
运行结果如下:
- 2.threadpool 方式(第三方库)
使用方法:通过 pool.putRequest() 将任务丢到线程池执行, pool.wait() 等待所有线程结束。同时还可以定义回调函数,拿到任务的返回结果。
import threadpool
import threading
def use_threadpool3():
pool = threadpool.ThreadPool(num_workers=3)
# arg_list1, arg_list2 , arg_list3 是3种传参数方式
arg_list1 = ["lily", "marry", "李白", "哇哈哈"] # 列表中是字符串类型, 每一项对应target方法的参数列表
arg_list2 = [(["lily1"], None), (["李白"], None), (["杜甫"], None)] # 列表中是元组, 元组的第一项对应target的参数列表
arg_list3 = [(None, {"name": "lily3"}), (None, {"name": "王羲之"}), (None, {"name": "伏羲"})] # 列表中是元组, 元组的第二项对应target的参数列表(显示对应)
req_list1 = threadpool.makeRequests(threadpool3_target, arg_list1, threadpool3_callback)
req_list2 = threadpool.makeRequests(threadpool3_target, arg_list2, threadpool3_callback)
req_list3 = threadpool.makeRequests(threadpool3_target, arg_list3, threadpool3_callback)
# thread_list = [pool.putRequest(req) for req in req_list1]
# thread_list = [pool.putRequest(req) for req in req_list2]
thread_list = [pool.putRequest(req) for req in req_list3]
pool.wait()
print(thread_list) # 观察结果
def threadpool3_target(name):
print("线程%s 向 %s 问好!" % (threading.current_thread().name, name));
time.sleep(1)
return name
def threadpool3_callback(request, result): # 回调函数,用于取回结果
print("线程%s拿到结果:%s!" % (threading.current_thread().name, result))
if __name__ == '__main__':
use_threadpool3()
arg_list3的运行结果:
-
3.俩种线程池对比
- 1.线程名字问题:第一种可以为线程池中的线程起名字(较为简单),第二种起名字要稍微复杂点。
- 2.返回值问题:第一种的返回拿到以后可以继续做相应处理,第二种是将返回封装成一个方法,在方法中做相应处理。
- 3.线程池关闭问题:第一种需要去手动关闭,第二种是任务执行完成以后自动关闭,这个俩者都是可以设置的。
- 4.个人推荐使用第一种,但是第二种感觉灵活一点,看自己需求吧。
-
4.线程锁(处理某些资源需要线程独占,从而引入了锁)
各个线程之间需要共享统一资源,在线程并发执行时,保证每个线程执行的原子性。
在python的threading模块中提供了5种最常见的锁,下面是按照功能进行划分:- 1.同步锁:lock(一次只能放行一个)
- 2.递归锁:rlock(一次只能放行一个)
- 3.条件锁:condition(一次可以放行任意个)
- 4.事件锁:event(一次全部放行)
- 5.信号量锁:semaphore(一次可以放行特定个)
下方我使用了lock锁(案例为多线程打印0-100),其它相关锁可以自行学习使用。
import threading
import concurrent.futures
def mul_thread_print100():
# 如果不知道with语法可以查阅一下资料,其实with..as语法就是可以关闭相关流
with concurrent.futures.ThreadPoolExecutor(max_workers=5, thread_name_prefix="T") as ex:
i = 0
while True:
if i == 11:
break
ex.submit(mul_thread_print100_target1) # 提交11个任务到线程池
i += 1
def mul_thread_print100_target1():
global lock
global num
lock.acquire()
for i in [j for j in range(101)][num:]:
# time.sleep(1)
print("当前线程%s正在打印%d" % (threading.current_thread().name, num))
num += 1
if num % 10 == 0:
lock.release()
return
if __name__ == '__main__':
lock = threading.Lock()
num = 1
mul_thread_print100()
6.结语:
多线程的使用情况还是很多的,如果遇到需要一次性使用的多线程,首推python,简单易上手,直接引入相关模块干就完了。看python这么顶,真想入坑人工智能,哎,可惜了,没那个头脑!