python多线程及线程池

1.什么是单线程和多线程

  • 1.单线程就是一个人处理大量的事情,耗时又耗力。
  • 2.多线程就是多个人一块处理这大量的事情,虽然不一定省时(多线程需要去竞争cpu资源)但一定省力。
  • 3.如果需要专业性表述请自行查阅相关资料(一千位读者就有一千位哈姆雷特)

2.线程相关属性

  • 1.代码如下
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading

# 线程的相关属性
def about_main_thread():
    print("当前活跃线程的数量: ", threading.active_count())
    print("当前所有线程的具体信息: ", threading.enumerate())
    print("当前线程的信息: ", threading.current_thread())
    print("线程名字: ", threading.current_thread().name)
    print("线程是否存活: ", threading.current_thread().is_alive())
    print("线程是否是随从线程: ", threading.current_thread().daemon)  # 随从线程(守护线程): 守护线程(设置daemon为true时)随着所有非守护线程的结束而结束
    print("线程的本地线程id: ", threading.current_thread().native_id)
    ......


if __name__ == '__main__':
    # 其他线程相关属性类似
    about_main_thread()
  • 2.运行截图如下


    image.png

3.单线程和多线程各模拟做100万件事

打印0-1000000数据观察所耗时间

  • 1.单线程处理100万数据
import threading
import time
# 单线程打印100万数据
def single_thread_print1000000():
    start = int(round(time.time() * 1000))
    for i in range(1000000):
        print("线程%s处理%d" % (threading.current_thread().name, i))
    end = int(round(time.time() * 1000))
    print("打印1000000耗时%d毫秒" % (end-start))

if __name__ == '__main__':
    single_thread_print1000000()

执行结果如下:


image.png
  • 2.多线程处理100万数据
# 开启5个线程,每个线程处理20万数据
def mul_thread_print1000000():
    start = int(round(time.time() * 1000))
    threadlist=[]
    for i in range(5):
        threadlist.append(threading.Thread(name='T'+str(i), target=mul_target_print1000000))
    for i in range(5):
        threadlist[i].start()
        threadlist[i].join()
    end = int(round(time.time() * 1000))
    print("打印1000000耗时%d毫秒: " % (end - start))

# 需要做的事
def mul_target_print1000000():
    for i in range(200000):
        print("线程%s处理%d" % (threading.current_thread().name, i))


if __name__ == '__main__':
    mul_thread_print1000000()

执行结果如下:


image.png
  • 3.经对比发现,此例中多线程比单线程要快200多毫秒,可能200多毫秒感觉不出什么,可是当我们将打印的数字换成一个个复杂的业务逻辑,你会发现,二者的差距越来越大,这个时候就能体会到多线程的好处了。
  • 4.多线程使用场景
    • 1.当你在学习爬虫的过程中,爬取1000本小说,用单线程怕不是得爬到明年去,使用多线程你会发现爬取1000本小说六的起飞。
    • 2.当工作的时候需要处理大几千万的数据量,多线程会让你少花一些功夫和时间。
    • 3.相关类似的场景很多很多,多线程可以视自己的业务情况而定。

4.多线程模块的使用及如何处理线程执行结果

  • 1.继承threading.Thread类来创建自己的线程
import threading
import time
def use_mul_thread_class():
    thread1 = DetailData(name="T1", mydata=[1, 3, 5, 7, 9])
    thread2 = DetailData(name="T3", mydata=[2, 4, 6, 8, 10])
    thread1.start()
    thread2.start()
    time.sleep(5)
    thread1.stop()
    thread2.stop()


class DetailData(threading.Thread):
    def __init__(self, name, mydata):
        threading.Thread.__init__(self)
        self.name = name  # 线程名字
        self.mydata = mydata # 处理的数据
        self.thread_stop = False  # 手动停止线程标识

    # run方法中处理需要的任务
    def run(self):
        while not self.thread_stop:
            print('Thread Object(%s), Time:%s\n' % (self.name, time.ctime()))
            sum = 0
            for item in self.mydata:
                sum += item
            time.sleep(1)  # 睡眠3秒
            print("线程%s的传入参数求和为%d" % (self.name, sum))
            print("线程%s正常执行结束" % self.name)

    def stop(self):
        print("线程%s已手动停止" % self.name)
        self.thread_stop = True


if __name__ == '__main__':
    use_mul_thread_class()
  • 2.将需要做的事情单独封装一个方法,直接调用threading.Thread()方法去创建自己的线程(前面其实已经使用过这种方法了,下面介绍Thread()的参数)
    源码如下,具体是如何实现可以自己去看:
def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
# 1.group参数应当为 None,为将来实现Python Thread类的扩展而保留。
# 2.target参数是被 run()方法调用的回调对象. 默认应为None, 意味着没有对象被调用,需要注意的是target参数需要为回调对象(就是一个需要做什么事的方法)的名字,不能加()。
# 3.name参数是线程的名字,默认形式为'Thread-N'的唯一的名字被创建,其中N 是比较小的十进制数。
# 4.args参数是一个元组tuple,这个是回调对象需要的参数,就是target赋值的方法对应的参数列表,当只有一个参数的时候应为(one,)这种形式,不能直接写one。
# 5.kwargs参数是目标调用的参数的关键字dictionary,默认为{}。
# 6.daemon参数是是否是守护线程,默认为false

注: 这个方法的参数args和kwargs,args传的元组是自动直接对应方法参数位置的,kwargs是传递一个字典,可以根据方法参数的名字去手动对应。

  • 3.处理线程执行结果
    1.使用全局的列表或者队列来保存返回值
import threading
import time
def deal_thread_return():
    returnlist = []
    threadlist = []
    threadlist.append(threading.Thread(target=deal_thread_return_target, args=([1, 3, 5, 7, 9], returnlist)))
    threadlist.append(threading.Thread(target=deal_thread_return_target, args=([2, 4, 6, 8, 10], returnlist)))
    threadlist.append(threading.Thread(target=deal_thread_return_target, args=([21, 14, 36, 18, 100], returnlist)))
    threadlist.append(threading.Thread(target=deal_thread_return_target, args=([2, 14, 6, 18, 200], returnlist)))
    for i in threadlist:
        i.start()
        i.join()    # 此处的join方法是等待当前线程结束后程序才接着执行
    print(returnlist)


def deal_thread_return_target(data, returnlist):
    sum = 0
    for item in data:
        sum += item
    returnlist.append(sum)


if __name__ == '__main__':
    deal_thread_return()

注:选择列表的一个原因是:列表的 append() 方法是线程安全的,选择队列的原因是其先进先出的特性。

2.重写 Thread 的 join 方法,返回线程函数的返回值


class ThreadReturn(threading.Thread):
    def __init__(self, target=None, args=()):
        threading.Thread.__init__(self, target=target, args=args)  # 传入需要的参数

    def run(self):
        if self._target is not None:  # 这种写法可以看源码
            self._return = self._target(*self._args, **self._kwargs)

    def join(self):  # 重新join方法
        super().join() 
        return self._return  # 这返回对应线程的结果


def rewrite_join_target(name):
    return "你好!"+str(name)


def rewrite_join():
    threadlist = []
    returnlist = []
    for i in range(10):
        threadlist.append(ThreadReturn(target=rewrite_join_target, args=("李白"+str(i),)))
    for i in threadlist:
        i.start()
        returnlist.append(i.join())
    print(returnlist)


if __name__ == '__main__':
    rewrite_join()

3.使用标准库的concurrent.futures

import concurrent.futures

def rewrite_join_target(name):
    return "你好!"+str(name)


def stand_futures():
    # 这是线程池的方式
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futurelist = []
        for i in range(10):  # 模拟多个任务
            future = executor.submit(rewrite_join_target, "李白"+str(i))
            futurelist.append(future)

        for future in concurrent.futures.as_completed(futurelist):  # 并发执行
            print(future.result())    # 拿到结果


if __name__ == '__main__':
    stand_futures()

注:这段代码中的submit方法,回调函数不能显示赋值(即fn=target报错),且需要的参数列表不需要传元组,直接参数1,参数2,参数3...这样传进去就可以了。

5.线程池的使用

线程池的引入是为了更好的利用计算机资源,上文中建立10个20个线程放到列表里面,然后启动,这样虽然可以实现功能,但是并没有把计算机资源充分利用起来,由此线程池就诞生了

  • 1.ThreadPoolExecutor 方式
    ThreadPoolExecutor 是 python3 新引入的库,通过 submit 和 as_completed 来提交和获取任务返回结果。
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

def rewrite_join_target(name):
    print("线程%s正在向%s问好" % (threading.current_thread().name, name))
    return threading.current_thread().name + ": 你好!"+str(name)

def use_threadpool():
    pool = ThreadPoolExecutor(max_workers=10, thread_name_prefix="T")
    future_list = [pool.submit(rewrite_join_target, i) for i in range(10)]
    for future in concurrent.futures.as_completed(future_list):
        result = future.result()
        print("线程%s拿到的返回值%s" % (threading.current_thread().name, result))
    pool.shutdown()

if __name__ == '__main__':
    use_threadpool()

运行结果如下:


image.png
  • 2.threadpool 方式(第三方库)
    使用方法:通过 pool.putRequest() 将任务丢到线程池执行, pool.wait() 等待所有线程结束。同时还可以定义回调函数,拿到任务的返回结果。
import threadpool
import threading

def use_threadpool3():
    pool = threadpool.ThreadPool(num_workers=3)
    # arg_list1, arg_list2 , arg_list3 是3种传参数方式
    arg_list1 = ["lily", "marry", "李白", "哇哈哈"]  # 列表中是字符串类型, 每一项对应target方法的参数列表
    arg_list2 = [(["lily1"], None), (["李白"], None), (["杜甫"], None)]   # 列表中是元组, 元组的第一项对应target的参数列表
    arg_list3 = [(None, {"name": "lily3"}), (None, {"name": "王羲之"}), (None, {"name": "伏羲"})]    # 列表中是元组, 元组的第二项对应target的参数列表(显示对应)
    req_list1 = threadpool.makeRequests(threadpool3_target, arg_list1, threadpool3_callback)
    req_list2 = threadpool.makeRequests(threadpool3_target, arg_list2, threadpool3_callback)
    req_list3 = threadpool.makeRequests(threadpool3_target, arg_list3, threadpool3_callback)
    # thread_list = [pool.putRequest(req) for req in req_list1]
    # thread_list = [pool.putRequest(req) for req in req_list2]
    thread_list = [pool.putRequest(req) for req in req_list3]
    pool.wait()
    print(thread_list)  # 观察结果


def threadpool3_target(name):
    print("线程%s 向 %s 问好!" % (threading.current_thread().name, name));
    time.sleep(1)
    return name


def threadpool3_callback(request, result):  # 回调函数,用于取回结果
    print("线程%s拿到结果:%s!" % (threading.current_thread().name, result))

if __name__ == '__main__':
    use_threadpool3()

arg_list3的运行结果:


image.png
  • 3.俩种线程池对比

    • 1.线程名字问题:第一种可以为线程池中的线程起名字(较为简单),第二种起名字要稍微复杂点。
    • 2.返回值问题:第一种的返回拿到以后可以继续做相应处理,第二种是将返回封装成一个方法,在方法中做相应处理。
    • 3.线程池关闭问题:第一种需要去手动关闭,第二种是任务执行完成以后自动关闭,这个俩者都是可以设置的。
    • 4.个人推荐使用第一种,但是第二种感觉灵活一点,看自己需求吧。
  • 4.线程锁(处理某些资源需要线程独占,从而引入了锁)
    各个线程之间需要共享统一资源,在线程并发执行时,保证每个线程执行的原子性。
    在python的threading模块中提供了5种最常见的锁,下面是按照功能进行划分:

    • 1.同步锁:lock(一次只能放行一个)
    • 2.递归锁:rlock(一次只能放行一个)
    • 3.条件锁:condition(一次可以放行任意个)
    • 4.事件锁:event(一次全部放行)
    • 5.信号量锁:semaphore(一次可以放行特定个)

下方我使用了lock锁(案例为多线程打印0-100),其它相关锁可以自行学习使用。

import threading
import concurrent.futures

def mul_thread_print100():
    # 如果不知道with语法可以查阅一下资料,其实with..as语法就是可以关闭相关流 
    with concurrent.futures.ThreadPoolExecutor(max_workers=5, thread_name_prefix="T") as ex:
        i = 0
        while True:
            if i == 11:
                break
            ex.submit(mul_thread_print100_target1)  # 提交11个任务到线程池
            i += 1


def mul_thread_print100_target1():
    global lock
    global num
    lock.acquire()
    for i in [j for j in range(101)][num:]:
        # time.sleep(1)
        print("当前线程%s正在打印%d" % (threading.current_thread().name, num))
        num += 1
        if num % 10 == 0:
            lock.release()
            return


if __name__ == '__main__':
    lock = threading.Lock()
    num = 1
    mul_thread_print100()

6.结语:

多线程的使用情况还是很多的,如果遇到需要一次性使用的多线程,首推python,简单易上手,直接引入相关模块干就完了。看python这么顶,真想入坑人工智能,哎,可惜了,没那个头脑!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容