Python进阶 - 高性能计算之多线程

写在前面

这个系列是笔者学习python一些进阶功能的笔记和思考,水平有限,错漏在所难免,还请方家不吝指教。

什么是线程?

首先应该了解操作系统如何支持多任务运行的,这里有“并行”和“并发”两个概念。

  • 并行: 在同一时刻有多条指令在多个处理器上同时执行,通俗点就是CPU数>=任务数的情况;
  • 并发:在同一时刻只能有一条指令执行,但是多个任务被快速轮换执行,使得宏观上让人感觉到有多个任务同时执行的效果。通俗点来说就是CPU数<任务数的情况。

在操作系统中运行的一个程序就是一个进程(Process),它是代码+资源的组合。而在一个进程中,有一个或者多个线程(Thread)。线程是进程的组成部分,一个进程至少有一个主线程来完成进程从开始到结束的全部操作。

单核CPU如何运行多个线程?

  • 时间片轮转:操作系统让每个程序依次运行极短的时间
  • 优先级调度:让高优先级的任务优先占用CPU

python中多线程的实现

threading模块

t1 = threading.Thread(target = someFunction, args = ()),此时会创建一个线程对象,但是不会直接创建线程。用args关键词可以为函数传入参数,但是注意这里传入的参数因为数量不定,因此一定要是一个元组

调用线程对象的start方法才会创建线程,并且让线程开始运行。

如下面的例子:

import threading


def my_func(cnt):
    for i in range(cnt):
        print(i)


if __name__ == '__main__':
    t1 = threading.Thread(target=my_func, args=(10,))  # 创建一个线程
    t1.start() # 启动子线程

主线程需要负责回收分配给子线程的资源,因此主线程一定会晚于子线程结束。

继承thread类

用继承了Thread等类也可以实现线程创建。但是这个类中一定需要定义run方法,这样在start启动线程后会自动调用run方法,例如以下程序:

from time import sleep
import threading

class myThread(threading.Thread):
    def run(self):
        for i in range(10):
            sleep(1)
            msg = "I'm " + self.name + " @ " + str(i)
            print(msg)

def main():
    testThread = myThread()
    testThread.start()

if __name__ == "__main__":
    main()

会得到以下结果:

I'm Thread-1 @ 0
I'm Thread-1 @ 1
I'm Thread-1 @ 2
I'm Thread-1 @ 3
I'm Thread-1 @ 4
I'm Thread-1 @ 5
I'm Thread-1 @ 6
I'm Thread-1 @ 7
I'm Thread-1 @ 8
I'm Thread-1 @ 9

如果需要调用一系列的函数,那么可以将其封装在这个类中,然后在run方法里调用。

不同线程中全局变量的共享

在不同的线程中,全局变量是共享的,这使得不同线程之间协作处理数据非常方便。但是这种共享也会有负面影响 -- 造成资源竞争,例如如下代码:

from time import sleep
import threading

g_num = 0

def test1(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("g_num in test1: %d" %g_num)


def test2(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("g_num in test2: %d" %g_num)


def main():
    t1 = threading.Thread(target=test1, args=(1000000,))
    t2 = threading.Thread(target=test2, args=(1000000,))

    t1.start()
    t2.start()
    sleep(5)

    print("g_num in main thread: %d" % g_num)


if __name__ == "__main__":
    main()

执行后会得到如下结果:

g_num in test1: 1221968
g_num in test2: 1323741
g_num in main thread: 1323741

这里得到的g_num并不是我们想象的2000000,因为在执行python代码时,我们使得数据自加的一句python代码会被翻译为好几句机器码:

  • 读取数据
  • 数据+1
  • 存储数据

在执行时,由于只使用了一个CPU,CPU会采用时间片轮转的方法来模拟多任务。因此实际上会在任意一步被cpu打断,例如在完成数据+1后,在存储数据时被打断,这样增加后的数据就没有存入内存,下一个线程从内存读取时,读到的就是没有自增前的数,这样可能使得两个线程中自增的数据相互覆盖,导致加到最后得到的值要比想象中的小。这种问题,也叫做“数据不同步”。

线程锁

当多个线程几乎同时修改一个数据时,需要进行同步控制。线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入<u>互斥锁</u>。

互斥锁会给资源附加一个状态:锁定/非锁定。

当某个线程需要修改资源时,先将其锁定,此时其他线程不可以修改该资源;到该线程修改结束后,释放资源,使其变为非锁定,其他的线程才能再次锁定该资源,进行修改。这样互斥锁就保证了每次只有一个线程进行写入操作,保证了多线程下全局变量的正确性。

为了实现互斥锁,threading模块中提供了Lock和RLock两个类:

  • threading.Lock是一个基本锁对象,每次只能锁定一次,其余的锁请求需要等锁释放后才能获取;
  • threading.RLock是可重入锁(Reentrant Lock),在同一个线程中可以进行多次锁定和释放,但是锁定和释放的方法必须成堆出现,如果调用了n次锁定,那么只有n次释放才能解锁。

Lock和RLock两个类都提供了以下方法来锁定和释放:

  • acquire(blocking = True, timeout = -1)进行锁定,blocking为True时会堵塞当前线程,直到其他线程释放该锁,当前线程获取到这个锁为止;而blocking为False的情况下则不会堵塞当前线程,而是往下执行
  • release()释放锁

对于上面的问题,我们可以用互斥锁来解决多个线程之间的资源竞争问题,如夏例:

from time import sleep
import threading

g_num = 0
mutex = threading.Lock() # 建立互斥锁

def test1(num):
    global g_num
    mutex.acquire(True) # 在更改数据之前获得互斥锁
    for i in range(num):
        g_num += 1
    mutex.release() # 更改完数据之后释放锁定
    print("g_num in test1: %d" %g_num)


def test2(num):
    global g_num
    mutex.acquire(True) # 在更改数据之前获得互斥锁
    for i in range(num):
        g_num += 1
    mutex.release() # 更改完数据之后释放锁定
    print("g_num in test2: %d" %g_num)


def main():
    t1 = threading.Thread(target=test1, args=(1000000,))
    t2 = threading.Thread(target=test2, args=(1000000,))

    t1.start()
    t2.start()
    sleep(5)

    print("g_num in main thread: %d" % g_num)


if __name__ == "__main__":
    main()

这样,我们在最后得到的就是我们预期的结果了:

g_num in test1: 1000000
g_num in test2: 2000000
g_num in main thread: 2000000

但是需要注意,用互斥锁会带来以下两个问题:

  • 阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式运行,其他模式处于堵塞的状态,效率就大大下降了;
  • 由于可以存在多个锁,不同的线程持有不同锁,并试图获取对方持有的锁时,可能会造成死锁。

死锁

线程间共享多个资源时,如果两个线程分别占有一部分资源并且同时等待对方的资源,就可能造成死锁。当出现死锁时,会造成应用停止响应,如下例:

import threading
from time import sleep


lockA = threading.Lock()
lockB = threading.Lock()

def testA():
    # 为进程A上锁
    lockA.acquire()
    print("----Lock A acquired in testA----")
    sleep(1)
    # 中间需要操作另一批数据,上锁B
    lockB.acquire()
    print("----Lock B acquired in testA----")
    sleep(2)
    # 数据操作完成,解开锁B
    lockB.release()
    print("----Lock B released in testA----")
    # 完成操作,释放锁A
    lockA.release()
    print("----Lock A released in testA----")


def testB():
    # 开始操作时为进程B上锁
    lockB.acquire()
    print("----Lock B acquired in testB----")
    sleep(1)
    # 进行下一步操作前需要获取锁A
    lockA.acquire()
    print("----Lock A acquired in testB----")
    sleep(2)
    # 进行完数据操作释放锁A
    lockA.release()
    print("----Lock A released in testB----")
    # 释放锁B
    lockB.release()
    print("----Lock B released in testB----")


def main():
    t1 = threading.Thread(target=testA)
    t2 = threading.Thread(target=testB)

    t1.start()
    t2.start()


if __name__ == "__main__":
    main()

在这个例子中,线程t1开始时,为lockA上锁,并进入睡眠(模拟一些耗时操作),而同时开始的线程t2为lockB上锁,等t1向下执行时,需要lockB,lockB处于上锁状态,因此线程t1堵塞,等待lockB被释放;而t2在执行时,acquire lockA失败,也进入了堵塞状态,等待lockA被释放。

这样,两个线程互相需要对方释放互斥锁,两个线程都无法向下执行,进入了死锁状态。

死锁的避免

  • 程序设计时尽量避免(例如银行家算法)
  • 添加超时时间限制

ThreadLocal

除了使用互斥锁以外,还有一种办法可以实现各线程间的数据隔离,那就是使用threading.local()。它会为各个变量创建完全属于他们自己的副本(也就是线程局部变量),这样各个线程操作的就是属于自己的私有资源,可以杜绝数据不同步的问题。

import threading
from time import sleep

from time import sleep
import threading

g_num = 0
local = threading.local()


def test1(num):
    global g_num
    local.g_num = g_num  # 将g_num绑定一个线程局部变量
    for i in range(num):
        local.g_num += 1
    print("g_num in test1: %d" % local.g_num)


def test2(num):
    global g_num
    local.g_num = g_num  # 将g_num绑定一个线程局部变量
    for i in range(num):
        local.g_num += 1
    print("g_num in test2: %d" % local.g_num)


def main():
    t1 = threading.Thread(target=test1, args=(1000000,))
    t2 = threading.Thread(target=test2, args=(1000000,))

    t1.start()
    t2.start()
    t1.join()  # 等待线程t1执行完毕
    t2.join()  # 等待线程t2执行完毕

    print("g_num in main thread: %d" % g_num)


if __name__ == "__main__":
    main()

结果为:

g_num in test1: 1000000
g_num in test2: 1000000
g_num in main thread: 0

可以看到,尽管我们将全局变量g_num绑定在线程局部变量中,但是每个线程操作的实际上是自己的线程局部变量,并不会作用于我们绑定上去的全局变量。

Python中多线程的问题

在Python中(当前使用版本3.7,在3.8中据说会用局部解释器绕开GIL的问题),多线程并不能真正有效利用多核,例如如下代码:

import threading, time, multiprocessing


def my_counter(num):
    i = 0
    for _ in range(num):
        i += 1
    return True


def main():
    # 单线程顺序执行
    thread_array = {}
    start_time = time.time()
    count_num = int(1e8)
    for tid in range(2):
        t = threading.Thread(target=my_counter, args=(count_num,))
        t.start()
        t.join()
    end_time = time.time()
    print("Total time for two sequential threads: ", round(end_time - start_time, 2), " s")

    # 双线程并行
    print("CPU num for current machine: ",multiprocessing.cpu_count())
    thread_array = {}
    start_time = time.time()
    for tid in range(2):
        t = threading.Thread(target=my_counter, args=(count_num,))
        t.start()
        thread_array[tid] = t
    for tid in thread_array.keys():
        thread_array[tid].join()
    end_time = time.time()
    print("Total time for multi-threads: ", round(end_time - start_time, 2), " s")

if __name__ == '__main__':
    main()

运行的结果如下:

Total time for two sequential threads:  11.32  s
CPU num for current machine:  4
Total time for multi-threads:  11.69  s

可以看到,在四核的电脑上,两个线程用多线程并行和单线程串行,速度并没有任何提高。

因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

解决方法

  • 用多进程替代多线程

multiprocess库的出现很大程度上是为了弥补thread库因为GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方便迁移。唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的GIL,因此也不会出现进程之间的GIL争抢。

当然multiprocess也不是万能良药。它的引入会增加程序实现时线程间数据通讯和同步的困难。就拿计数器来举例子,如果我们要多个线程累加同一个变量,对于thread来说,申明一个global变量,用thread.Lock的context包裹住三行就搞定了。而multiprocess由于进程之间无法看到对方的数据,只能通过在主线程申明一个Queue,put再get或者用share memory的方法。这个额外的实现成本使得本来就非常痛苦的多线程程序编码,变得更加痛苦了。

在将上面用threading实现的多任务改为multiprocessing:

import threading, time, multiprocessing


def my_counter(num):
    i = 0
    for _ in range(num):
        i += 1
    return True


def main():
    # 单进程顺序执行
    process_array = {}
    start_time = time.time()
    count_num = int(1e8)
    for pid in range(2):
        p = multiprocessing.Process(target=my_counter, args=(count_num,))
        p.start()
        p.join()
    end_time = time.time()
    print("Total time for two sequential processes: ", round(end_time - start_time, 2), " s")

    # 多进程并行
    print("CPU num for current machine: ", multiprocessing.cpu_count())
    process_array = {}
    start_time = time.time()
    for tid in range(2):
        t = multiprocessing.Process(target=my_counter, args=(count_num,))
        t.start()
        process_array[tid] = t
    for tid in process_array.keys():
        process_array[tid].join()
    end_time = time.time()
    print("Total time for multi-processings: ", round(end_time - start_time, 2), " s")


if __name__ == '__main__':
    main()

结果如下,可以看到多进程相比单进程,速度有了明显提升:

Total time for two sequential processes:  11.3  s
CPU num for current machine:  4
Total time for multi-processings:  7.91  s
  • 用其他解析器

之前也提到了既然GIL只是CPython的产物,那么其他解析器是不是更好呢?没错,像JPython和IronPython这样的解析器由于实现语言的特性,他们不需要GIL的帮助。然而由于用了Java/C#用于解析器实现,他们也失去了利用社区众多C语言模块有用特性的机会。所以这些解析器也因此一直都比较小众。毕竟功能和性能大家在初期都会选择前者,Done is better than perfect。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,390评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,821评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,632评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,170评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,033评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,098评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,511评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,204评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,479评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,572评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,341评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,893评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,171评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,486评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,676评论 2 335

推荐阅读更多精彩内容