python并发编程:多线程

并发编程之多线程

python中的并发编程,一个是协程,另一个就是多线程了。它们都用于IO操作频繁的场景。

基于Thread的多线程

python3提供了一个内置模块thread.Thread可以方便我们创建多线程。可以在函数中创建,也可以用类继承Thread.

Thread有几个重要的方法,start(),run(),join()

start()

开始线程活动。

它在一个线程里最多只能被调用一次。它安排对象的 run() 方法在一个独立的控制进程中调用。

如果同一个线程对象中调用这个方法的次数大于一次,会抛出 RuntimeError 。

run()

代表线程活动的方法。

你可以在子类型里重载这个方法。 标准的 run() 方法会对作为 target 参数传递给该对象构造器的可调用对象(如果存在)发起调用,并附带从 args 和 kwargs 参数分别获取的位置和关键字参数。

join(timeout)

等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结 -- 不管是正常终结还是抛出未处理异常 -- 或者直到发生超时,超时选项是可选的。

参考文档、文章:

https://docs.python.org/zh-cn/3/library/threading.html

https://www.cnblogs.com/wongbingming/p/9028851.html

在函数中用Thread创建线程。

Thread有三个重要的参数,target和args、kwargs

target:函数名,即指向线程运行时所执行的函数。

args:target函数的参数,是一个元组。is the argument tuple for the target invocation. Defaults to ().

kwargs:target函数参数,是一个字典。is a dictionary of keyword arguments for the target>

import time
from threading import Thread

def recv_msg(msg,n):
    print("got a msg:{}\n".format(msg))
    #开始处理消息
    time.sleep(2)
    print("process msg end:{}".format(msg))
    
    
def main():
    thread_1 = Thread(target=recv_msg,args=('Hello',10000000))
    thread_1.start()
    
    thread_2 = Thread(target=recv_msg,args=('World',100000000))
    thread_2.start()
    
    thread_1.join()
    thread_2.join()
    
start = time.perf_counter()
main() 
end = time.perf_counter()
print("spend time:{}".format(end-start))
got a msg:Hello

got a msg:World

process msg end:Hello
process msg end:World
spend time:2.006348734999847

join会阻塞主线程,这有当join的子线程执行完了,才会往下执行,按理我们只需join最长的一个线程即可,但很多情况下,我们是不知道哪个线程会运行最长的。
全部join,最后运行的时间也是最长的线程所花费的时间。但是需要保证所有线程都已经start了。如果写成:
thread1.start()
thread1.join()
thread2.start()
thread2.join()
这样线程就没什么用了。

import time
from threading import Thread


global global_a
def thread_sleep(n):
    print("thread sleep {}s\n".format(n))
    time.sleep(n)
    global global_a
    global_a = n
    print("sleep {}s end\n".format(n))
    
    
def main():
    thread_1 = Thread(target=thread_sleep,args=(2,))
    thread_2 = Thread(target=thread_sleep,args=(5,))
    thread_3 = Thread(target=thread_sleep,args=(10,))
    
    thread_1.start()
    thread_2.start()
    thread_3.start()
    print("start join 1")
    thread_1.join()
    print("start join 2")
    thread_2.join()
    print("start join 3")
    thread_3.join()
    
    
start = time.perf_counter()
main() 
end = time.perf_counter()
print("spend time:{}".format(end-start))
print(global_a)

#输出
   thread sleep 2s
    
    thread sleep 5s
    
    thread sleep 10s
    start join 1
    
    sleep 2s end
    
    start join 2
    sleep 5s end
    
    start join 3
    sleep 10s end
    
    spend time:10.003887921000569
    10

自定义线程类,继承Thread,需要重写run()方法

import time
from threading import Thread
class MyThread(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name
        
    def run(self):
        print("run {}\n".format(self.name))
        self.count = 0
        time.sleep(1)
        for i in range(1000000):
            self.count += i
        print("result is :{}\n".format(self.count))
        
def main():
    thread_1 = MyThread("hello")
    thread_2 = MyThread("world")
    
    thread_1.start()
    thread_2.start()
    thread_1.join()
    thread_2.join()
    
main()


#输出结果:
 run hello
    
    run world
    
    result is :499999500000
    
    result is :499999500000

基于Futures的多线程

参考文章、文档:
https://time.geekbang.org/column/article/102562

https://docs.python.org/3/library/concurrent.futures.html#future-objects

https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter4/02_Using_the_concurrent.futures_Python_modules.html

Python 中的 Futures 模块,位于 concurrent.futures 和 asyncio 中,它们都表示带有延迟的操作。

Futures 会将处于等待状态的操作包裹起来放到队列中,这些操作的状态随时可以查询,当然,它们的结果或是异常,也能够在操作完成后被获取。
在协程中,我们使用到了ansyncio中的Future。这里我们的多线程,将会使用到concurrent.futures中的Future对象。

concurrent.futures 带来了线程池,使线程能够更加高效地利用。相比Thread,也更加方便使用

这个模块具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能

池由两部分组成,一部分是内部的队列,存放着待执行的任务;另一部分是一系列的进程或线程,用于执行这些任务 来源:https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter4/02_Using_the_concurrent.futures_Python_modules.html

组成部分:

concurrent.futures.Executor: 这是一个虚拟基类,提供了异步执行的方法。

submit(function, argument): 调度函数(可调用的对象)的执行,将 argument 作为参数传入。

map(function, argument): 将 argument 作为参数执行函数,以 异步 的方式。

shutdown(Wait=True): 发出让执行者释放所有资源的信号。

concurrent.futures.Future: 其中包括函数的异步执行。Future对象是submit任务(即带有参数的functions)到executor的实例。

Executor有两种子类,get_ipython自独立操作一个线程池或进程池,分别为:

concurrent.futures.ThreadPoolExecutor(max_workers) #操作线程池

concurrent.futures.ProcessPoolExecutor(max_workers)#操作进程池

import concurrent.futures 
#ThreadPoolExecutor创建一个线程池,max_workers表示最多有多少个worker并行执行任务。
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    #executor调用异步执行方法submit,执行任务,第一个参数是执行任务的函数,args和kwargs表示函数参数。返回一个Future对象
    future = executor.submit(pow, 2, 3)
    print(type(future))#输出:<class 'concurrent.futures._base.Future'>
    print(future.result()) #结果:8
<class 'concurrent.futures._base.Future'>
8
import concurrent.futures
import requests
import time

def download_one(url):
    resp = requests.get(url)
    print("Read {} from {}".format(len(resp.content),url))

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor:
        executor.map(download_one,sites)
        
def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
    
if __name__ == '__main__':
    main()
    
Read 191745 from https://en.wikipedia.org/wiki/Portal:Arts
Read 373799 from https://en.wikipedia.org/wiki/Portal:Biography
Read 319650 from https://en.wikipedia.org/wiki/Portal:History
Read 275172 from https://en.wikipedia.org/wiki/Portal:Society
Read 274280 from https://en.wikipedia.org/wiki/Portal:Technology
Read 323180 from https://en.wikipedia.org/wiki/Portal:Geography
Read 364444 from https://en.wikipedia.org/wiki/Computer_science
Read 554219 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 340435 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 560652 from https://en.wikipedia.org/wiki/PHP
Read 193173 from https://en.wikipedia.org/wiki/Node.js
Read 65944 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 364566 from https://en.wikipedia.org/wiki/Go_(programming_language)
Read 180470 from https://en.wikipedia.org/wiki/Portal:Science
Read 299680 from https://en.wikipedia.org/wiki/Portal:Mathematics
Download 15 sites in 1.7136735189997125 seconds

多线程对于CPU操作heavy的情况下,效果并不太明显,如,计算1-100000

import time
import concurrent.futures

def count(number):
    count = 0
    for i in range(number):
        count += i
   # print("count is:{}\n".format(count))
    return count
    


number_list = [50000,100000,200000,3000000]
#直接调用
start = time.perf_counter()

for num in number_list:
    print(count(num))
end = time.perf_counter()
print("直接调用花费时间:{}".format(end - start))



#多线程

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    future_to_num = {executor.submit(count,num):num for num in number_list}
  #  futures = [executor.submit(count,num) for num in number_list]
    #as_completed(fs)会yields 一个完成或取消的(总之Complete)的Future对象。
    for future in concurrent.futures.as_completed(future_to_num):
        num = future_to_num[future]
        print("reuslt of {} is :{}".format(num,future.result()))
        try:
            result = future.result() #获取结果
        except Exception as e:
            print("Exception:{}".format(e))
        
            
end = time.perf_counter()
print("多线程调用花费时间:{}".format(end - start))

1249975000
4999950000
19999900000
4499998500000
直接调用花费时间:0.23715457800062723
reuslt of 100000 is :4999950000
reuslt of 50000 is :1249975000
reuslt of 200000 is :19999900000
reuslt of 3000000 is :4499998500000
多线程调用花费时间:0.2344772459982778

这是因为,在Cpython下面,同一时刻,只能有一个线程执行。它的多线程是系统通过线程切换来实现的。这是因为CPython的GIL导致的。

对于IO密集型,因为很多时间是在等待。使用多线程,可以有效利用等待时间

而对于CPU密集型的话,多线程并不会并行计算,所以效果并不太明显,此时我们应该使用多进程。

GIL Global Interpreter Lock,即全局解释器锁。因为Cpython解释器的内存管理并不是线程安全的,即有多个线程的情况下,有可能同时去修改一个对象。于是Cpython使用简单的锁机制(大概即有一个全局锁,每次锁住一个线程,运行一定时间,然后释放锁,让其它线程获取锁,然后其它线程去执行),这样保证同一时间最多只有一个线程执行字节码。这样导致python没办法利用多核优势。

多线程 vs 协程

多线程与协程,都是并发编程。协程是由用户自己定义在什么时候暂停操作,让出控制权;
而多线程是我们只需submit好任务,剩下的交给操作系统。但是频繁的线程切换也是需要消耗资源和时间的。
什么时候用多线程,什么时候用协程,最终还是需要根据一定的场景决定。

The End!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,258评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,335评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,225评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,126评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,140评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,098评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,018评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,857评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,298评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,518评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,400评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,993评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,638评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,661评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容