python多线程之二——threading模块

上一篇文章讲了python多线程的基础知识和thread模块,这一篇着重讲解一下threading模块

threading模块

threading模块除了Thread类之外,好包括其他很多的同步机制,下面来看一下threading模块汇总所包含的对象。

对象 描述
Thread 执行线程的对象
Lock 锁对象
RLock 递归锁,是一个线程可以再次拥有已持有的锁对象
Condition 条件变量对象,使一个线程等待另一个线程满足特定的条件触发
Event 事件对象,普通版的Condition
Semaphore 信号量,为线程间共享的资源提供一个“计数器”,计数开始值为设置的值,默认为1
BoundedSemaphore 与Semaphore相同,有边界,不能超过设置的值
Timer 定时运行的线程对象,定时器
Barrier 界限,当达到某一界限后才可以继续执行

看到threading有这么多对象,是不是有些懵了,下面一个个的来看一下

Thread类

Thread类是threading模块的主要主要执行对象。Thread对象有三个数据属性,name(线程名)、ident(线程的标识)、daemon(布尔值,是否是守护线程)。这三个数据属性可以直接通过对象进行调用并进行设置。

守护线程一般是一个等待客户端请求服务的服务器,进程退出时,该线程在正常情况下不会退出
Thread类还有一些对象方法

对象方法 描述
__init__() 实例化一个线程对象
start() 开始执行线程
run() 定义线程功能方法(一般在子类中进行重写)
join(timeout=None) 直至启动的线程终止或timeout秒,否则一直挂起,多用于主线程进行阻塞等待子线程运行完毕。
isAlivel/is_alive() 线程是否存活

注意
__init__()完整函数如下__init__(group=None,target=None,name=None,args=(),kwargs={},verbose=None,daemon=None),Thread对象实例化需要一个可调用的target(可以是一个函数,也可是一个可调用的类实例),参数args或者kwargs。

说了这么多,那怎么创建线程呢?一般有两种方法:

  • 创建Thread实例,传给其一个函数或可调用的类实例
  • 派生Thread的子类,并创建子类的实例
    一般来说,创建Thread实例并传递一个函数和派生Thread子类比较常用,后者更符合面向对象且比较容易扩展
  1. 创建Thread实例,传给它一个函数
import random
import threading
from time import ctime,sleep

def loop(nloop,nsec):
    print('start loop ',nloop,' sec:',nsec,' at:',ctime())
    sleep(nsec)
    print('end loop ',nloop,' done at:',ctime())

def main():
    print('starting at:',ctime())
    threads = []

    for i in range(3):
        t = threading.Thread(target=loop,args=(i,random.randint(1,5)))
        threads.append(t)

    for i in range(3):
        threads[i].start()

    for i in range(3):
        threads[i].join()

    print('all done at:',ctime())

if __name__ == '__main__':
    main()

传递给Thread实例一个函数其实和thread模块中差不多,这里随机生成3个Thread实例,分别运行随机事件,然后通过循环让线程启动threads[i].start(),然后通过join()让主线程等待结束。
打印结果如下

starting at: Thu Sep  7 17:53:44 2017
start loop  0  sec: 1  at: Thu Sep  7 17:53:44 2017
start loop  1  sec: 5  at: Thu Sep  7 17:53:44 2017
start loop  2  sec: 3  at: Thu Sep  7 17:53:44 2017
end loop  0  done at: Thu Sep  7 17:53:45 2017
end loop  2  done at: Thu Sep  7 17:53:47 2017
end loop  1  done at: Thu Sep  7 17:53:49 2017
all done at: Thu Sep  7 17:53:49 2017

如果将join代码注释掉的话,主线程将不会等待子线程运行,打印结果如下:

starting at: Thu Sep  7 17:56:11 2017
start loop  0  sec: 2  at: Thu Sep  7 17:56:11 2017
start loop  1  sec: 4  at: Thu Sep  7 17:56:11 2017
start loop  2  sec: 5  at: Thu Sep  7 17:56:11 2017
all done at: Thu Sep  7 17:56:11 2017
end loop  0  done at: Thu Sep  7 17:56:13 2017
end loop  1  done at: Thu Sep  7 17:56:15 2017
end loop  2  done at: Thu Sep  7 17:56:16 2017

可以看到all done语句已经先执行完毕。然后各个子线程仍然在运行直到结束。

  1. 创建Thread实例,传给它一个可调用类的实例
    这里需要解释一下,一般实现了__call__方法的类,其实例可以像函数一样进行调用(其实函数就是可调用的对象),称之为可调用类的实例。这样的话,只需要在类中实现__call__方法即可
import random
import threading
from time import ctime, sleep

class ThreadFunc(object):
    def __init__(self,func,args,name=''):
        self.name = name
        self.func = func
        self.args = args

    def __call__(self):
        self.func(*self.args)

def loop(nloop,nsec):
    print('start loop ',nloop,' sec:',nsec,' at:',ctime())
    sleep(nsec)
    print('end loop ',nloop,' at:',ctime())

def main():
    print('start at:',ctime())
    threads = []
    loops = range(3)
    for i in loops:
        t = threading.Thread(target=ThreadFunc(loop,(i,random.randint(1,5)),loop.__name__))
        threads.append(t)

    for i in loops:
        threads[i].start()

    for i in loops:
        threads[i].join()

    print('all done at:',ctime())

if __name__ == '__main__':
    main()

这个例子和上面的例子相同,只不过在实例化Thread的时候将ThreadFunc传递给target,当t调用start的时候,其会调用__call__方法。
看一下运行的结果:

import random
import threading
from time import ctime, sleep


class ThreadFunc(object):
    def __init__(self,func,args,name=''):
        self.name = name
        self.func = func
        self.args = args

    def __call__(self):
        self.func(*self.args)

def loop(nloop,nsec):
    print('start loop ',nloop,' sec:',nsec,' at:',ctime())
    sleep(nsec)
    print('end loop ',nloop,' at:',ctime())

def main():
    print('start at:',ctime())
    threads = []
    loops = range(3)
    for i in loops:
        t = threading.Thread(target=ThreadFunc(loop,(i,random.randint(1,5)),loop.__name__))
        threads.append(t)

    for i in loops:
        threads[i].start()

    for i in loops:
        threads[i].join()

    print('all done at:',ctime())

if __name__ == '__main__':
    main()
  1. 派生Thread的子类,创建子类的实例。
    派生Thread的子类,一般需要重写run方法。
import random
from threading import Thread
from atexit import register
from time import ctime, sleep

class ThreadFunc(Thread):
    def __init__(self,func,args):
        Thread.__init__(self)
        self.func = func
        self.args = args
        
    def run(self):
        self.func(*self.args)
        
def loop(nloop,nsec):
    print('start loop ',nloop,' sec:',nsec,' at:',ctime())
    sleep(nsec)
    print('end loop ',nloop,' at:',ctime())

def main():
    print('start at:',ctime())
    threads = []
    loops = range(3)
    for i in loops:
        t = ThreadFunc(loop,(i,random.randint(1,5)))
        threads.append(t)

    for i in loops:
        threads[i].start()

@register
def _atexit():
    print('all done at:',ctime())

if __name__ == '__main__':
    main()

需要注意的是,子类的构造函数必须先调用基类的构造函数,在基类的构造函数中对于相应的参数进行了设置。这里并没有用join来控制主线程等待子线程完成,而是使用atexit.register()来注册一个退出函数。来看一下结果:

start at: Thu Sep  7 18:33:22 2017
start loop  0  sec: 4  at: Thu Sep  7 18:33:22 2017
start loop  1  sec: 3  at: Thu Sep  7 18:33:22 2017
start loop  2  sec: 4  at: Thu Sep  7 18:33:22 2017
end loop  1  at: Thu Sep  7 18:33:25 2017
end loop  0  at: Thu Sep  7 18:33:26 2017
end loop  2  at: Thu Sep  7 18:33:26 2017
all done at: Thu Sep  7 18:33:26 2017

以上就是多线程的三种实现方式。

同步原语

一般在多线程代码中,一般有一些特定的函数或代码块不希望被多个线程同时执行,这就需要使用同步了。同步原语中,锁是最简单、最低级的机制,而信号量通常用于多线程竞争有限资源的情况。

锁(Lock)

锁只有两种状态,锁定和解锁。即支持两种操作:获得锁和释放锁。当多线程争夺锁的时候,允许第一个获得锁的线程进入临界区,并执行代码,其他后到达的线程将被阻塞,直到获得锁的线程执行完毕,退出临界区,并释放锁。其他等待的线程去争夺锁并进行临界区。
下面来看一下不适用锁的例子。

import random
from atexit import register
from threading import currentThread, Thread
from time import ctime, sleep

remaining = []

def loop(nsec):
    tname = currentThread().name
    remaining.append(tname)
    print('{0} started at:{1}'.format(tname,ctime()))
    sleep(nsec)
    remaining.pop()
    print('{0} Completed at:{1} used {2}'.format(tname,ctime(),nsec))
    print(remaining)

def main():
    for i in range(4):
        Thread(target=loop,args=(random.randint(1,5),)).start()

@register
def _axexit():
    print('all down at:',ctime())

if __name__ == '__main__':
    main()

上面的程序很简单,一个共享的列表remaining来存储剩余的线程,在每个线程中先将线程名称添加到该列表,运行完成后,则从列表中删除。下面来看一下运行的结果:

Thread-1 started at:Thu Sep  7 22:11:01 2017
Thread-2 started at:Thu Sep  7 22:11:01 2017
Thread-3 started at:Thu Sep  7 22:11:01 2017
Thread-4 started at:Thu Sep  7 22:11:01 2017
Thread-2 Completed at:Thu Sep  7 22:11:02 2017 used time 1
Thread-3 Completed at:Thu Sep  7 22:11:02 2017 used time 1
['Thread-1', 'Thread-2']
['Thread-1', 'Thread-2']
Thread-4 Completed at:Thu Sep  7 22:11:03 2017 used time 2
['Thread-1']
Thread-1 Completed at:Thu Sep  7 22:11:04 2017 used time 3
[]
all down at: Thu Sep  7 22:11:04 2017

看上面的输出,Thread-2已经执行完毕,下面的remainlist却仍还有Thread-2,而Thread-4正在执行,却没有。这说明多个线程并行执行IO程序,同时有多个程序修改同一个变量导致值输出问题。这个时候需要用锁来防止多个线程同时修改共享数据。
修改代码如下:

import random
from atexit import register
from threading import currentThread, Thread, Lock
from time import ctime, sleep

remaining = []
lock = Lock()
def loop(nsec):
    tname = currentThread().name
    lock.acquire()
    remaining.append(tname)
    print('{0} started at:{1}'.format(tname,ctime()))
    lock.release()
    sleep(nsec)
    lock.acquire()
    remaining.pop()
    print('{0} Completed at:{1} used time {2}'.format(tname,ctime(),nsec))
    print(remaining)
    lock.release()

def main():
    for i in range(4):
        Thread(target=loop,args=(random.randint(1,5),)).start()

@register
def _axexit():
    print('all down at:',ctime())

if __name__ == '__main__':
    main()

大部分代码都没有改动,只是在添加了全局变量lock,然后在remaining添加和删除线程名称的时候进行锁的获取和释放,来看一下运行的结果。

Thread-1 started at:Thu Sep  7 22:26:40 2017
Thread-2 started at:Thu Sep  7 22:26:40 2017
Thread-3 started at:Thu Sep  7 22:26:40 2017
Thread-4 started at:Thu Sep  7 22:26:40 2017
Thread-1 Completed at:Thu Sep  7 22:26:41 2017 used time 1
['Thread-1', 'Thread-2', 'Thread-3']
Thread-4 Completed at:Thu Sep  7 22:26:42 2017 used time 2
['Thread-1', 'Thread-2']
Thread-2 Completed at:Thu Sep  7 22:26:42 2017 used time 2
['Thread-1']
Thread-3 Completed at:Thu Sep  7 22:26:44 2017 used time 4
[]
all down at: Thu Sep  7 22:26:44 2017

这样看结果是不是正常了。
可能你会觉得lock这样进行锁的获取和释放代码不太直观,你也可以使用with语句,如下:

with lock:
    remaining.append(tname)
    print('{0} started at:{1}'.format(tname,ctime()))

这样看起来是不是简洁多了,缩进的代码会自动进行加锁和释放锁的功能。

信号量

信号量其实是一个计数器,当资源消耗时递减,当资源释放时递增。资源的消耗与释放称为一次PV操作,P()源于probeer/proberen,也称为wait、try、acquire、pend或procure,是消耗资源使计数器递减的操作。V()源于verhogen/verhoog,也成为signal、increment、release、post、vacate,是释放资源,使其回到资源池的操作。不过在python中被固定为acquire和release操作。

下面通过生产者-消费者问题来了解一下信号量
场景:生产者生产产品,消费者消费产品,柜台上最多只能摆放5件产品

from atexit import register
from random import randrange
from threading import Lock, Thread, BoundedSemaphore
from time import sleep, ctime

lock = Lock()
MAX = 5
product_on_shelf = BoundedSemaphore(MAX)

def product():
    with lock:
        print('product is doing')
        try:
            product_on_shelf.release()
        except ValueError:
            print('shelf is full')
        else:
            print('product is done')

def consume():
    with lock:
        print('product is consume')
        if product_on_shelf.acquire():
            print('product is successed consume')
        else:
            print('shelf is empty')

def producer(loops):
    print('producer is start for loops:',loops)
    for i in range(loops):
        product()
        sleep(randrange(3))

def customer(loops):
    print('customer is start for loops:', loops)
    for i in range(loops):
        consume()
        sleep(randrange(3))

def main():
    print('starting at:',ctime())
    nloops = randrange(3,6)
    Thread(target=producer,args=(nloops,)).start()
    Thread(target=customer,args=(nloops,)).start()

@register
def _atexit():
    print('all done at:',ctime())

if __name__ == '__main__':
    main()

这里定义了生产者和消费者两个线程,随机循环生产和消费产品。product和consume是生产产品和消费产品的函数,均通过锁进行同步,这里使用的是BoundedSemaphore,主要是因为其有边界,值为Max,初始状态下信号量的值为5。这里需要对release进行异常捕获,超过边界BoundedSemaphore会抛出ValueError异常,而acquire则返回boolean来表示获取是否成功。下面来看一下结果:

starting at: Fri Sep  8 11:34:46 2017
producer is start for loops: 3
product is doing
shelf is full
customer is start for loops: 3
product is consume
product is successed consume
product is consume
product is successed consume
product is doing
product is done
product is consume
product is successed consume
product is doing
product is done
all done at: Fri Sep  8 11:34:50 2017

可以看到运行结果正常。

threading模块是对thread模块的封装,并添加了状态管理,并且通过lock扩展出来的Condition进行了多线程的通过管理,所以一般在实际生产中主要使用threading模块去处理问题。
threading模块暂时就讲到这里。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容

  • 线程 引言&动机 考虑一下这个场景,我们有10000条数据需要处理,处理每条数据需要花费1秒,但读取数据只需要0....
    不浪漫的浪漫_ea03阅读 359评论 0 0
  • 引言&动机 考虑一下这个场景,我们有10000条数据需要处理,处理每条数据需要花费1秒,但读取数据只需要0.1秒,...
    chen_000阅读 505评论 0 0
  • 来源:数据分析网Threading 模块从 Python 1.5.2 版开始出现,用于增强底层的多线程模块 thr...
    PyChina阅读 1,738评论 0 5
  • 线程状态新建,就绪,运行,阻塞,死亡。 线程同步多线程可以同时运行多个任务,线程需要共享数据的时候,可能出现数据不...
    KevinCool阅读 798评论 0 0
  • The DK2 于2014年春,Oculus发布了第二代开发版头显设备,代号为DK2。与DK1相比,Oculus ...
    小太阳会发光诺阅读 200评论 0 0