并发编程 — — 多线程

# -*- coding: utf-8 -*-
# @Time    : 2019/11/28 16:42
# @Author  : John
# @Email   : 2398344557@qq.com
# @File    : 111.py
# @Software: PyCharm

当使用单线程时,耗费时间特长

【例】单线程

import time
def run(n):
    print("task ", n)
    time.sleep(2)

start_time = time.time()
run("1")
run("2")
print(f'执行所消耗的时间:{time.time() - start_time}秒')

# task  1
# task  2
# 执行所消耗的时间:4.00098729133606秒

执行上面这个程序要4秒,如果用多线程的话,函数“同时执行”,只需一半的时间即可!!!因此我们要引入多线程概念 ==>

【例】启动多个线程(函数方式)

import threading
import time

def run(n):
    print(f'任务{n}')
    time.sleep(2)

# run('1')
# run('2')

start = time.time()

# t1和t2同时执行
t1 = threading.Thread(target=run, args=('1',))
t2 = threading.Thread(target=run, args=('2',))
t1.start()
t2.start()

# 阻塞作用,等t1和t2执行完毕后再执行下面的内容
t1.join()
t2.join()

print('finished')
end = time.time()
print(f'此程序执行时间为:{end-start}秒')

# 任务1
# 任务2
# finished
# 此程序执行时间为:2.001213788986206秒

【例】启动多个线程(类方式)

import threading
import time


class MyThread(threading.Thread):
    def __init__(self, n, sleep_time):
        super().__init__()
        self.n = n
        self.sleep_time = sleep_time

    def run(self):
        print(f'任务{self.n}开始')
        time.sleep(self.sleep_time)
        print(f'任务{self.n}结束了')


t1 = MyThread(1, 1)    # 传入n=1,sleep_time=1
t2 = MyThread(2, 2)    # 传入n=2,sleep_time=2

t1.start()
t2.start()

t1.join()
t2.join()

print('finished')

# 任务1开始
# 任务2开始
# 任务1结束了
# 任务2结束了
# finished

【例】同时启动20个线程

import threading
import time
def run(n):
    print(f'任务{n}开始')
    time.sleep(2)
    print(f'任务{n}结束了')

start_time = time.time()
for i in range(10):
    t = threading.Thread(target=run, args=(i,))
    t.start()

print("----------all threads has finished...")
print("消耗的时间:", time.time() - start_time)

# 结果略

# 分析:
# 主线程直接结束,没有等子线程,2s后子线程分别task done
# 代码共有51个线程,一个主线程与50个子线程,主线程无法计算子线程执行时间
# 因此,我们需要设置主线程等待子线程执行结束,通过一个临时列表,在线程启动后分别join等待,子线程分别结束后,结束主进程,计算耗时约2.011415958s

【例】计算所有线程执行时间

import threading
import time

def run(n):
    print(f'任务{n}')
    time.sleep(2)
    print(f'任务{n}已完成')

start_time = time.time()
t_objs = []  # 存线程实例
for i in range(10):
    t = threading.Thread(target=run, args=(i,))
    t.start()
    t_objs.append(t)  # 为了不阻塞后面线程的启动,不在这里join,先放到一个列表里

for t in t_objs:
    print(t.is_alive())

for t in t_objs:  # 循环线程实例列表,等待所有线程执行完毕
    t.join()

for t in t_objs:
    print(t.is_alive())    # is_alive()方法可以用来判断一个线程是否结束,返回True或False

print("----------all threads has finished...")
print("消耗的时间:", time.time() - start_time)

结果如下:

任务0
任务1
任务2
任务3
任务4
任务5
任务6
任务7
任务8
任务9True
True
True

True
True
True
True
True
True
True
任务1已完成任务2已完成任务3已完成

任务0已完成

任务7已完成任务6已完成
任务5已完成
任务9已完成
任务4已完成

任务8已完成
False
False
False
False
False
False
False
False
False
False
----------all threads has finished...
消耗的时间: 2.12958025932312

【例】根据当前线程(Thread)活着的数量来查看线程生命周期

import threading
import time

def sing():
    for i in range(3):
        print('正在唱歌。。。。%d' % i)
        # time.sleep(random.random()*3)
        time.sleep(1)

def dance():
    for i in range(3):
        print('正在跳舞。。。。%d' % i)
        # time.sleep(random.random()*5)
        time.sleep(1)

if __name__=='__main__':
    # print('晚会开始:%s'%time.time())返回的是长串的时间戳
    print('晚会开始:%s' % time.ctime())
    t1 = threading.Thread(target=sing)
    t2 = threading.Thread(target=dance)
    t1.start()
    t2.start()

    while True:
        length = len(threading.enumerate())    # 枚举返回个列表
        print('当前运行的线程数为:%d' % length)
        time.sleep(0.7)
        if length <= 1:
            break

结果如下:

晚会开始:Sun Dec  8 17:12:54 2019
正在唱歌。。。。0
正在跳舞。。。。0
当前运行的线程数为:3
当前运行的线程数为:3
正在唱歌。。。。1
正在跳舞。。。。1
当前运行的线程数为:3
正在唱歌。。。。2
正在跳舞。。。。2
当前运行的线程数为:3
当前运行的线程数为:3
当前运行的线程数为:1

多线程互斥锁

作用:为了防止不同的线程访问同一共享资源造成混乱

import threading
# 生成锁对象,全局唯一

lock = threading.Lock()
# 获取锁。未获取到会阻塞程序,直到获取到锁才会往下执行
lock.acquire()
# 释放锁,归回倘,其他人可以拿去用了
lock.release()

# 注:lock.acquire() 和 lock.release()必须成对出现,否则就有可能造成死锁

# 可以使用使用上下文管理器来加锁(常用)
# with 语句会在这个代码块执行前自动获取锁,在执行结束后自动释放锁
import threading
lock = threading.Lock()
with lock:
    pass

不上锁的时候,对少量数据的修改有一定的作用;
但对大量数据的修改,不上锁的话会出现资源竞争问题,从而数据结果会不正确

【例】测试1:未上锁前,用多线程对全局变量进程修改

import threading
import time

g_num = 0

def work1(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("----in work1, g_num is %d---" % g_num)

def work2(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("----in work2, g_num is %d---" % g_num)

print("---线程创建之前g_num is %d---" % g_num)

t1 = threading.Thread(target=work1, args=(100,))
t1.start()

t2 = threading.Thread(target=work2, args=(100,))
t2.start()

while len(threading.enumerate()) != 1:
    time.sleep(1)

print("2个线程对同一个全局变量操作之后的最终结果是:%s" % g_num)
# ---线程创建之前g_num is 0---
# ----in work1, g_num is 100---
# ----in work2, g_num is 200---
# 2个线程对同一个全局变量操作之后的最终结果是:200

【例】测试2:未上锁前,用多线程对全局变量进程修改

import threading
import time

g_num = 0

def work1(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("----in work1, g_num is %d---" % g_num)

def work2(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("----in work2, g_num is %d---" % g_num)

print("---线程创建之前g_num is %d---"%g_num)

t1 = threading.Thread(target=work1, args=(1000000,))
t1.start()

t2 = threading.Thread(target=work2, args=(1000000,))
t2.start()

while len(threading.enumerate()) != 1:
    time.sleep(1)

print("2个线程对同一个全局变量操作之后的最终结果是:%s" % g_num)
# ---线程创建之前g_num is 0---
# ----in work1, g_num is 1208926---
# ----in work2, g_num is 1264177---
# 2个线程对同一个全局变量操作之后的最终结果是:1264177

死锁

在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,一旦发生就会造成应用的停止响应。

【例】死锁

#coding=utf-8
import threading
import time

class MyThread1(threading.Thread):
    def run(self):
        # 对mutexA上锁
        mutexA.acquire()

        # mutexA上锁后,延时1秒,等待另外那个线程 把mutexB上锁
        print(self.name+'----do1---up----')
        time.sleep(1)

        # 此时会堵塞,因为这个mutexB已经被另外的线程抢先上锁了
        mutexB.acquire()
        print(self.name+'----do1---down----')
        mutexB.release()

        # 对mutexA解锁
        mutexA.release()

class MyThread2(threading.Thread):
    def run(self):
        # 对mutexB上锁
        mutexB.acquire()

        # mutexB上锁后,延时1秒,等待另外那个线程 把mutexA上锁
        print(self.name+'----do2---up----')
        time.sleep(1)

        # 此时会堵塞,因为这个mutexA已经被另外的线程抢先上锁了
        mutexA.acquire()
        print(self.name+'----do2---down----')
        mutexA.release()

        # 对mutexB解锁
        mutexB.release()

mutexA = threading.Lock()
mutexB = threading.Lock()

if __name__ == '__main__':
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('done')
# Thread-1----do1---up----
# Thread-2----do2---up----
# (程序没有结束)

# 分析:
# 对mutexA和mutexB同时上锁,但没有释放他们时,那就不能对他们继续访问,他们现在是阻塞状态。

GIL(全局锁)

  • 区分多进程和多线程:
    (1)多进程是真正意义上的并行,
    (2)而多线程只是伪并行(交替执行)

  • 多线程伪并行(交替执行)的原因:
    GIL(Global Interpreter Lock,全局解释器锁)

  • 分析:
    任何Python线程执行前,必须先获得GIL锁,
    然后每执行100条字节码时,解释器就会自动释放GIL锁,让别的线程有机会执行。
    而这个GIL全局锁实际上把所有线程的执行代码都上锁了。
    因此,多线程在Python中只能交替执行,
    即使100个线程抛在100核CPU上,也只能用到1个核。

  • 注:
    GIL并不是Python的特性,它是实现Python解释器(CPython)时所引入的一个概念。
    而Python解释器并不只有CPython,还有PyPy,Psyco,JPython,IronPython等

  • 我们通常认为Python == CPython,所以也就默许了Python具有GIL锁

  • GIL影响性能,如何避免受到GIL影响?
    (1)使用多线程代替多线程(常用)
    (2)更换Python解释器,不使用CPython(不怎么用,因为CPthon挺好用的,真香~~)

Queue队列

控制线程的触发执行 — — 用Queue队列

格式:

from queue import  Queue
q = Queue(maxsize=0)    # maxsize默认为0,此时队列长度不受限

q.get()    # 等待队列信息
q.get(timeout=5)    # 设置超时时间,时间到之后执行其他
q.put()    # 发送信息
q.join()    # 等待所有的消息被消费完

【例】生产者消费者

import random
import threading
import time
import queue

q = queue.Queue(maxsize=10)    # 设置队列的最大长度为10


# 生产者
def producer(name):
    count = 1
    while True:
        q.put("雪碧%s" % count)
        print("[Timmy]生产了雪碧", count)
        count += 1
        time.sleep(random.randrange(3))


# 消费者
def consumer(name):
    while True:
        print("[%s]取到[%s]并且喝了它..." % (name, q.get()))
        time.sleep(random.randrange(5))


p = threading.Thread(target=producer, args=("Timmy",))
c1 = threading.Thread(target=consumer, args=("King",))
c2 = threading.Thread(target=consumer, args=("Wang",))

p.start()
c1.start()
c2.start()

结果如下:

[Timmy]生产了雪碧 1
[Timmy]生产了雪碧 2
[King]取到[雪碧1]并且喝了它...
[Wang]取到[雪碧2]并且喝了它...
[Timmy]生产了雪碧 3
[Wang]取到[雪碧3]并且喝了它...
[Timmy]生产了雪碧 4
[King]取到[雪碧4]并且喝了它...
[Timmy]生产了雪碧 5
[Wang]取到[雪碧5]并且喝了它...
[Timmy]生产了雪碧 6
[Timmy]生产了雪碧 7
[Wang]取到[雪碧6]并且喝了它...
[Wang]取到[雪碧7]并且喝了它...
# (程序没有结束)

线程池

存储线程,需要用的时候调出线程
跑完任务之后,这些线程不会被销毁,而是返回到线程池等待下一次任务

  • submit():返回一个future对象
  • future对象:在未来的某一时刻完成操作的对象

【例】创建线程池

import time
from concurrent.futures.thread import ThreadPoolExecutor

# 线程执行的函数
def add(n1,n2):
    v = n1 + n2
    print('add :', v, ', tid:', threading.currentThread().ident)
    time.sleep(n1)
    return v

# 通过submit把需要执行的函数扔进线程池中.
# submit 直接返回一个future对象
ex = ThreadPoolExecutor(max_workers=3)    # 制定最多运行N个线程
f1 = ex.submit(add, 2, 3)
f2 = ex.submit(add, 2, 2)
print('main thread running')
print(f1.done())    # future对象名.done():看看任务结束了没
print(f2.done())
print(f1.result())    # future对象名.result():获取结果 ,阻塞方法
print(f2.result())
print(f2.done())
print(f1.done())
# add : 5 , tid: 6324
# add : 4 , tid: 6712
# main thread running
# False
# False
# 5
# 4
# True
# True
  • map():返回是跟你提交序列是一致的,是有序的

【例】map()的使用

import requests
from concurrent.futures.thread import ThreadPoolExecutor

URLS = ['http://www.sina.com.cn', 'http://www.baidu.com', 'http://www.qq.com']

def get_html(url):
    print('thread id:', threading.currentThread().ident, ' 访问了:', url)
    return requests.get(url)    # 这里使用了requests 模块

ex = ThreadPoolExecutor(max_workers=3)    # 制定最多运行3个线程
# thread id: 5596  访问了: http://www.sina.com.cn
# thread id: 8968  访问了: http://www.baidu.com
# thread id: 4316  访问了: http://www.qq.com

# sumbit()函数:
lst = []
for i in range(3):
    f = ex.submit(get_html, URLS[i])    # f为future对象,提交一个任务,放入线程池中,准备执行
    lst.append(f)
print(lst)
# [<Future at 0x1330990 state=running>, <Future at 0x3015370 state=running>, <Future at 0x3015870 state=running>]

# map()函数:
res_iter = ex.map(get_html, URLS)    # 返回生成器res_iter
for res in res_iter:
    print(res.url)
# 分别获取sina、baidu、qq的网页网址
# https://www.sina.com.cn/
# http://www.baidu.com/
# https://www.qq.com/
for res in res_iter:
    print(res.text)
# 分别获取sina、baidu、qq的网页源代码
# 略

# - as_completed()函数:
from concurrent.futures import as_completed
    # 下面用到的f,是上面第一个循环里面的futrue对象
    for future in as_completed([f]):    # as_completed()接受一个可迭代的Future序列,返回一个生成器,在完成或异常时返回这个Future对象
        print('一个任务完成。')
        print(future.result())
# thread id: 5484  访问了: http://www.sina.com.cn
# 一个任务完成。
# <Response [200]>
# thread id: 5484  访问了: http://www.baidu.com
# 一个任务完成。
# <Response [200]>
# thread id: 10280  访问了: http://www.qq.com
# 一个任务完成。
# <Response [200]>

【例】as_completed 完整的例子

# as_completed 返回一个生成器,用于迭代, 一旦一个线程完成(或失败) 就返回
import time
import requests
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures import as_completed

URLS = ['http://www.baidu.com', 'http://www.qq.com', 'http://www.sina.com.cn']
def get_html(url):
    time.sleep(1)
    print('thread id:', threading.currentThread().ident,' 访问了:', url)
    return requests.get(url)    # 这里使用了requests 模块
ex = ThreadPoolExecutor(max_workers=3)    # 最多3个线程
future_tasks = [ex.submit(get_html, url) for url in URLS]    #创建3个future对象
for future in as_completed(future_tasks):    # 迭代生成器
    try:
        resp = future.result()
    except Exception as e:
        print('%s' % e)
    else:
        print('%s has %d bytes!' % (resp.url, len(resp.text)))

# thread id: 4912  访问了: http://www.sina.com.cn
# thread id: 9060  访问了: http://www.qq.com
# thread id: 5644  访问了: http://www.baidu.com
# http://www.baidu.com/ has 2381 bytes!
# https://www.qq.com/ has 223356 bytes!
# https://www.sina.com.cn/ has 543959 bytes!

wait:阻塞函数

第一个参数和as_completed一样,一个可迭代的future序列,返回一个元组,包含2个set,一个完成的,一个未完成的

【例】wait()函数的使用

import time
import requests
from concurrent.futures import wait, as_completed, FIRST_COMPLETED, ALL_COMPLETED
from concurrent.futures.thread import ThreadPoolExecutor
import threading
from concurrent import futures

URLS = ['http://www.baidu.com', 'http://www.qq.com', 'http://www.sina.com.cn']
def get_html(url):
    """
    FIRST_COMPLETED    当任何未来完成或被取消时,该函数将返回。
    FIRST_EXCEPTION    当任何未来通过提出异常完成时,函数将返回。如果没有未来引发异常,那么它等同于 ALL_COMPLETED。
    ALL_COMPLETED(默认)      当所有期货完成或被取消时,函数将返回。
    :param url:
    :return:
    """
    time.sleep(1)
    print('thread id:', threading.currentThread().ident, ' 访问了:', url)
    return requests.get(url)    # 这里使用了requests 模块
ex = ThreadPoolExecutor(max_workers=3)    # 最多3个线程
future_tasks = [ex.submit(get_html, url) for url in URLS]    # 创建3个future对象
try:
    result = wait(future_tasks, return_when=futures.ALL_COMPLETED)
    done_set = result[0]
    for future in done_set:
        resp = future.result()
        print('第一个网页任务完成 url:%s , len:%d bytes! ' % (resp.url, len(resp.text)))
except Exception as e:
    print('exception :', e)

# return_when=futures.FIRST_COMPLETED
# thread id: 11404  访问了: http://www.sina.com.cn
# thread id: 10196  访问了: http://www.baidu.com
# thread id: 2588  访问了: http://www.qq.com
# 第一个网页任务完成 url:http://www.baidu.com/ , len:2381 bytes!

# return_when=futures.FIRST_EXCEPTION
# thread id: 4776  访问了: http://www.sina.com.cn
# thread id: 12104  访问了: http://www.baidu.com
# thread id: 10692  访问了: http://www.qq.com
# 第一个网页任务完成 url:https://www.sina.com.cn/ , len:544154 bytes!
# 第一个网页任务完成 url:https://www.qq.com/ , len:223122 bytes!
# 第一个网页任务完成 url:http://www.baidu.com/ , len:2381 bytes!

# return_when=futures.ALL_COMPLETED
# thread id: 4612  访问了: http://www.sina.com.cn
# thread id: 11460  访问了: http://www.qq.com
# thread id: 7744  访问了: http://www.baidu.com
# 第一个网页任务完成 url:http://www.baidu.com/ , len:2381 bytes!
# 第一个网页任务完成 url:https://www.sina.com.cn/ , len:544154 bytes!
# 第一个网页任务完成 url:https://www.qq.com/ , len:223122 bytes!

回调函数:add_done_callback(fn)

import os, sys, time, requests, threading
from concurrent import futures

URLS = [
        'http://baidu.com',
        'http://www.qq.com',
        'http://www.sina.com.cn'
        ]

def load_url(url):
    print('tid:', threading.currentThread().ident, ',url:', url)
    with requests.get(url) as resp:
        return resp.content
def call_back(obj):
    print('->>>>>>>>>call_back , tid:', threading.currentThread().ident, ',obj:', obj)

with futures.ThreadPoolExecutor(max_workers=3) as ex:
    # mp = {ex.submit(load_url,url) : url for url in URLS}
    mp = dict()
    for url in URLS:
        f = ex.submit(load_url, url)
        mp[f] = url
        f.add_done_callback(call_back)
    for f in futures.as_completed(mp):
        url = mp[f]
        try:
            data = f.result()
        except Exception as exc:
            print(exc, ',url:', url)
        else:
            print('url:', url, ',len:', len(data), ',data[:20]:', data[:20])

# tid: 6036 ,url: http://baidu.com
# tid: 6044 ,url: http://www.qq.com
# tid: 3864 ,url: http://www.sina.com.cn
# ->>>>>>>>>call_back , tid: 6036 ,obj: <Future at 0x2f076d0 state=finished returned bytes>
# url: http://baidu.com ,len: 81 ,data[:20]: b'<html>\n<meta http-eq'
# ->>>>>>>>>call_back , tid: 6044 ,obj: <Future at 0x30158b0 state=finished returned bytes>
# url: http://www.qq.com ,len: 237492 ,data[:20]: b'<!doctype html>\n<htm'
# ->>>>>>>>>call_back , tid: 3864 ,obj: <Future at 0x3015c70 state=finished returned bytes>
# url: http://www.sina.com.cn ,len: 544154 ,data[:20]: b'<!DOCTYPE html>\n<!--'
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容