Python43_多任务之线程

python的thread模块是比较底层的模块(可能不同的操作系统不一样),python的threading模块是对thread做了一些包装的,可以更加方便的被使用(跨平台)

ps:看程序代码的时候,切忌从上往下看,而是主要看程序的框架(比如C语言中,主要看main函数)

Thread对象基础

threading模块

threading模块中的对象列表

  1. Thread:表示一执行线程的对象
  2. Lock:锁原语对象(和thread模块中的锁一样)
  3. RLock:可重入锁对象(可读不可写)
  4. Condition:条件变量对象,使得一个线程等待另一个线程满足特等的“条件”,比如改变状态或某个数据值
  5. Event:条件变量的通用版本,任意数量的线程等待某个事件的发生,在该事件发生后所有线程将被激活
  6. Semaphore:为线程间共享的有限资源提供一个“计数器”,如果没有可用资源将会被阻塞
  7. BoundeSemaphore:与Semaphore相似,不过它不允许超过初始值
  8. Timer:与Thread相似,不过它要在运行前等待一段时间
  9. Barrier:创建一个“障碍”,必须达到指定数量的线程后才可以继续

ps:我们通过Python实现过线程编程,猪油用到的是threading.Thread对象

Thread对象常用的方法和属性

数据属性:

  1. name:线程名

  2. ident:线程的标识符

  3. daemon:布尔标志,表示这个线程是否是守护线程

    • 用法:线程对象.setDaemon(True)
    • 如果把子线程设置为守护线程,表示该线程不重要,主线程结束,子线程结束

对象方法:

  1. __init__(group=None, target=None, name=None, args=(), kwargs={}, verbose=None,daemon=None):实例化一个线程对象,需要有一个可调用的target(函数),以及其参数args或kwargs。还可以传递name或group参数,不过后者还未实现。此外,verbose标志也是可接受的,而daemon的值将会设定thread.daemon标志/属性
  2. start():开始执行该线程
  3. run():定义线程功能的方法,通常在子类中被开发者重写
  4. join(timeout=None):直至启动的线程终止之前一直:除非给出了timeout(秒),否则会一直阻塞,相当于在此处等待调用者线程完成

线程详解

多线程执行

法一:函数方式

import threading    #threading模块与Process很类似
import time

def saySorry():
    print("亲爱的,我错了,我能吃饭了吗?")
    time.sleep(1)   #睡一秒

if __name__ == "__main__":
    for _ in range(10):
        t = threading.Thread(target=saySorry)   #创建一个线程对象t
        t.start()
 #会发现其执行时间一共也就在1s左右(如果不用线程,应该在10s作用)
 #主线程要等待所有子线程结束后才能结束

进程、程序与线程:

  • 程序是死的,是代码的集合;

  • 程序的运行被称之为进程——是拥有资源的最小单位;

  • 线程是程序调度的最小单位

ps:如果多个线程执行的都是同一个函数的话,各自之间不会有影响,各是个的

方式二:类的方式

import threading,time

class MyThread(threading.Thread):
    #与Process相似,重写run方法
    def run(self):
        for i in range(10):
            time.sleep(1)
            msg = "I'm {name} @ {count}".format(name = self.name, count = i)
            print(msg)

if __name__ == "__main__":
    t = MyThread(name = "test_thread")  #创建线程并为线程指定名称。当函数结束的时候,t这个线程就结束了
    t.start()   #线程开始执行
    print("当前线程:" + str(threading.enumerate())) #enumerate()能够获得当前时刻 程序中的所有线程(包括自己)
    # ps:enumerate回忆:
    # names = ["aa","bb","cc"]
    # for temp in enumerate(names):
        # print(temp)  # 会输出序号几列表元素所组成的元组
    #对于线程,主线程一般也要等待子线程(为了收回子线程占有的一点点资源)

线程的执行顺序

线程的执行顺序不确定,与进程一样,取决于操作系统的调度算法

多线程对全局变量的共享

from threading import Thread
import time

g_num = 100

def work1():
    global  g_num   #注意:全局变量只要没有改指向,则在函数里面就不需要加global,如果可能改指向,就需要加global
    for _ in range(3):
        g_num += 1
    print("In work1, g_num is {}".format(g_num))

def work2():
    global g_num
    print("In work2, g_num is {}".format(g_num))

print("线程创建之前,g_num is {}".format(g_num))

t1 = Thread(target=work1)
t1.start()

time.sleep(1)   #睡眠1s,保证t1执行完毕

t2 = Thread(target=work2)
t2.start()

'''执行结果如下:
线程创建之前,g_num is 100
In work1, g_num is 103
In work2, g_num is 103
'''

对于进程,全局变量不共享,而对于多线程,全局变量是可以共享的,因为进程是拥有资源的最小单位,而线程是共享其所属进程的资源,线程自己字拥有执行所必不可少的一点点资源。也因此,线程之间的通信比进程之间的通信方便

但是线程对全局变量的共享也会出现问题,如下:

from threading import Thread
import time

g_num = 0

def work1():
    global  g_num
    for _ in range(1000000):
        g_num += 1
    print("g_num is {}".format(g_num))

print("线程创建之前,g_num is {}".format(g_num))

t1 = Thread(target=work1)
t1.start()

# time.sleep(1)   #睡眠1s,保证t1执行完毕

t2 = Thread(target=work1)
t2.start()
'''本次运行结果如下:(进行了2000000次加法,但是结果并不是2000000)
线程创建之前,g_num is 0
g_num is 1153259
g_num is 1247721
'''

ps:对于变量,线程除了可以以全局变量的形式共享,还可以以参数的形式共享(在Thread ()中以args = ()的形式传递

对于以上代码运行结果不是2000000的解析:
在线程执行g_num += 1的时候,实际上是g_num = g_num + 1,先取值加1,然后赋值,一共两步。而在多线程的时候,由于cpu的调度,一个线程中的这两步可能会被打断,所以运行结果不为2000000

原子性:一段代码,要么不执行,要么就直接执行完,不允许被打断

如何保证代码执行的原子性——互斥锁

threading模块中定义了Lock锁,可以方便的处理锁定:

import threading
mutex = threading.Lock()    #创建锁
mutex.acquire([blockign])   #获得锁:锁定
mutext.release()    #释放锁

互斥锁的应用:acquire/release

from threading import Thread,Lock
import time,threading
g_num = 0

def work1():
#   print("%s"%threading.current_thread().name) #此句可以输出当前线程的名称
    global  g_num
    mutex.acquire() #对g_num操作前上锁,如果一方获得了锁,另一方如果还要获得锁,就必须阻塞(一直等待),直到另一方释放这个锁
    for _ in range(1000000):
        g_num += 1
    print("g_num is {}".format(g_num))
    mutex.release() #对g_num的操作完毕后,释放锁,以让其他人可以获得锁从而对g_num进行操作
    #ps:把锁的释放放在for外面,相当于把多线程硬生生弄成了单线程,如果只是要最后的结果是2000000,则可以把锁的释放法在for里面,紧跟g_num += 1。其实通常是能不加的代码就不加(即加锁的地方尽可能小)



print("线程创建之前,g_num is {}".format(g_num))
mutex = Lock()
t1 = Thread(target=work1)
t1.start()

# time.sleep(1)   #睡眠1s,保证t1执行完毕

t2 = Thread(target=work1)
t2.start()

'''运行结果如下:
线程创建之前,g_num is 0
g_num is 1000000
g_num is 2000000
'''

`ps:等待解锁的方式:通知,而不是轮询

互斥锁的应用:with

import threading
import time
g_num = 0

lock = threading.Lock()

def work1(num):
    global g_num
    with lock:
        for i in range(num):
            time.sleep(0.01)
            g_num += 1
    print("work1, g_num is %d"%g_num)

def work2(num):
    global g_num
    try:
        lock.acquire()  #获得对数据的封锁
        for i in range(num):
            time.sleep(0.01)
            g_num += 1
    finally:
        lock.release()  #释放对数据的封锁。acquire和release与with语句效果相同
    print("work2, g_num is %d"%g_num)

print("---线程创建之前,g_num is %d---"%g_num)
t1 = threading.Thread(target=work1,args=(100,))
t1.start()
t2 = threading.Thread(target=work2,args=(100,))
t2.start()

while len(threading.enumerate())  != 1:
    time.sleep(2)
print("---线程操作之后,g_num is %d---"%g_num)

#加锁之后输出结果为:
# ---线程创建之前,g_num is 0---
# work1, g_num is 100
# work2, g_num is 200
# ---线程操作之后,g_num is 200---

对于互斥锁,通常是对值进行修改时才加锁,不修改的话不用加锁

多线程使用非共享变量(函数里面的变量)

即当多个线程所用的代码相同时,其中变量的情况如何

from threading import Thread
import threading,time

def test():
    name = threading.current_thread().name
    print("Thread name is :{}".format(name))
    num = 100
    if name == "Thread-1":
        num += 1
    else:
        time.sleep(2)
        
    print("Thread is {}, num is {}".format(name, num))

t1 = Thread(target=test)
t1.start()

t2 = Thread(target=test)
t2.start()
'''运行结果如下:
Thread name is :Thread-1
Thread is Thread-1, num is 101
Thread name is :Thread-2
Thread is Thread-2, num is 100
'''
#说明虽然两个线程都是到同一个函数里面执行,但是他们函数里面的数据“各人是各人的”,互不影响,所以不需要加锁。而全局变量是公用的

线程间使用Queue通信

from queue import Queue
import queue
import threading
import time

q = Queue(maxsize=10)   #队列的最大容量为10

def producer():
    for i in range(10):
        q.put(i)

def customer():
    for i in range(10):
        data = q.get()
        print(data, end=" ")

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=customer)


t1.start()
t2.start()

#输出结果如下:
# 0 1 2 3 4 5 6 7 8 9

# ps:stack = queue.LifoQueue()   #栈
# ps:队列的其他属性
# q.empty()
# q.full()
# q.maxsize
# q.qsize()

线程池

线程池中线程的创建、执行、销毁都由线程池自己执行

线程池的基类是 concurrent.futures 模块中d Executor,Executor 提供了两个子类,即ThreadPoolExecutor 和ProcessPoolExecutor,其中ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。

Exectuor 提供了如下常用方法:

  • submit(fn, *args, kwargs):将 fn 函数提交给线程池。args 代表传给 fn 函数的参数,kwargs 代表以关键字参数的形式为 fn 函数传入参数。
  • map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
  • shutdown(wait=True):关闭线程池。

在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。

  • 使用线程池的步骤
  1. 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
  2. 定义一个普通函数作为线程任务。
  3. 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
  4. 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。
import threading
import time

def add(n1, n2):
    v = n1 + n2
    time.sleep(n1)

    return v

import concurrent.futures as futures

ex = futures.ThreadPoolExecutor(max_workers = 3)    #创建线程池并设置线程池容量
#ps:futures的意义:结果要未来才能获得
f1 = ex.submit(add, 2, 3)   #创建线程并提交到线程池(ps:提交后线程即开始执行),返回值为一个Future对象
f2 = ex.submit(add, 2, 2)

print(f1.done())    #判断线程是否执行结束。输出False,因为线程还没有执行完
print(f1.result())  #获得线程的执行结果。输出5,(通常是程序执行完才有返回值,故这里可以用于阻塞线程,但是也可以对result指定timeout参数。
ex.shutdown()

ps:关于Future的简单理解:由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。

Future对象

Future 提供了如下方法:

  • cancel():取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。
  • cancelled():返回 Future 代表的线程任务是否被成功取消。
  • running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。
  • done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。
  • result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
  • exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None。
  • add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。

获取执行结果

  1. 用Future的result()方法:但是该方法会阻塞当前主线程,只有等到当前任务完成后,result()方法的阻塞才会被解除
  2. 通过Future的add_done_callback()方法来添加回调函数,该回调函数形如fn(future)。当线程任务完成后,程序会自动触发该回调函数,并将对应的Future对象作为参数传给该回调函数
from concurrent.futures import ThreadPoolExecutor
import threading
import time

# 定义一个准备作为线程任务的函数
def action(max):
    my_sum = 0
    for i in range(max):
        print(threading.current_thread().name + ' ' + str(i))
        my_sum += i
    return my_sum
# 创建一个包含2条线程的线程池
with ThreadPoolExecutor(max_workers=2) as pool:
    # 向线程池提交一个task, 50会作为action()函数的参数
    future1 = pool.submit(action, 50)
    # 向线程池再提交一个task, 100会作为action()函数的参数
    future2 = pool.submit(action, 100)
    def get_result(future):
        print(future.result())
    # 为future1添加线程完成的回调函数
    future1.add_done_callback(get_result)
    # 为future2添加线程完成的回调函数
    future2.add_done_callback(get_result)
    print('--------------')



多线程的应用:下载图片

import os
import random
import time
import requests
import concurrent.futures as futures


def download_img(url):
    resp = requests.get(url)    #获得链接指向文件的内容
    filename = os.path.split(url)[1]  # 把路径分为目录名和文件名,[1]为文件名
    with open(filename, "wb+") as f:
        f.write(resp.content)   #写入文件
    num = random.randint(0,5)
    time.sleep(num)
    print(num)
    return filename


# 链接的获得:网络上随便点击一张图片,右击:查看图片
urls = ["http://pic118.huitu.com/res/20190420/1480621_20190420132348580020_1.jpg",
        "https://img.ivsky.com/img/tupian/pre/201811/19/jiguang-008.jpg"]
ex = futures.ThreadPoolExecutor(max_workers=2)  #创建线程池
res_iter = ex.map(download_img, urls)   #以第二个参数为第一个参数(函数)的参数

"""
print(type(res_iter))   #发现其是一个生成器
for res in res_iter:
    print(type(res))
    help(res)
"""
fu_tasks = [ex.submit(download_img, url) for url in urls]

"""
print(type(fu_tasks[0]))
for future in futures.as_completed(fu_tasks):   #as_completed没有顺序,谁先完成就先返回谁
    print(future.result())
"""

ThreadLocal对象在线程中的应用

传参问题

前奏-其一

  • 函数
    • 传参
    • 全局变量
    • 返回值

前奏-其二

-使用全局字典
用一个全局字典dict存放所有的对象,然后以thread自身作为key获得线程对应的对象(以Studetn为例)

global_dict = {}
def std_thread(name):
    std = Student(name) #当多个线程来执行std_thread的时候,各自得到各自的Student对象
    #把std放到全局变量global_dict中;键值也可以使用
    global_dict[threading.current_thread()] = std   #当多个线程来执行这句话时,由于键值不同,能够取出不同的值
    do_task1()
    do_task2()
def do_task1():
    #不传入std,而是根据当前线程查找,故也可以取出自己想要的值。在没有传参的情况下保证了多个线程使用同一个全局变量来时候并没有出错
    std = global_dict[threading.current_thread()]   #键值也可以使用threading.current_thread().name
    ...
def do_task2():
    #不传入std,而是根据当前线程查找:
    std = global_dict[threading.current_thread()]
    ...

这种方式理论上是可行的,它最大的有点是消除了std对象在每层函数中的传递问题,但是,每个函数获得std的代码有点low

其三:使用ThreadLocal

import threading

local_school = threading.local()    #创建一个ThreadLocal对象

def process_student():
    #获取当前进程相关的student
    std = local_school.student  #取出在process_thread中赋予的属性

    print("Hlello, {} in {}".format(std, threading.current_thread().name))

def process_thread(name):
    #绑定ThreadLocal的studetn
    local_school.student = name #给对象添加属性student,对于ThreadLocal对象的同一个属性,在一个线程中设置的是哪个值,到时候取出的就是哪个值,不会因为线程不一样导致同一属性的值不一样(即:对于同一个属性,在不同线程中的操作是互不影响的)
    process_student()

t1 = threading.Thread(target=process_thread, args=("biubiu~",), name = "Thread_A")
t2 = threading.Thread(target=process_thread, args=("老王",), name = "Thread-B")

t1.start()
t2.start()

t1.join()
t2.join()
'''运行结果如下:
Hlello, biubiu~ in Thread_A
Hlello, 老王 in Thread-B

'''
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容

  • 线程 操作系统线程理论 线程概念的引入背景 进程 之前我们已经了解了操作系统中进程的概念,程序并不能单独运行,只有...
    go以恒阅读 1,641评论 0 6
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,096评论 1 32
  • 线程池ThreadPoolExecutor corepoolsize:核心池的大小,默认情况下,在创建了线程池之后...
    irckwk1阅读 724评论 0 0
  • Java-Review-Note——4.多线程 标签: JavaStudy PS:本来是分开三篇的,后来想想还是整...
    coder_pig阅读 1,646评论 2 17
  • 1.1 多线程介绍 学习多线程之前,我们先要了解几个关于多线程有关的概念。 进程:进程指正在运行的程序。确切的来说...
    Pecksniff1994阅读 1,553评论 0 2