python中threading库详解及实例

python中threading库

另一篇文章:pthreading库详解

threading模块

threading模块除了Thread类之外,好包括其他很多的同步机制,下面来看一下threading模块汇总所包含的对象。

对象描述

Thread执行线程的对象

Lock锁对象

RLock递归锁,是一个线程可以再次拥有已持有的锁对象

Condition条件变量对象,使一个线程等待另一个线程满足特定的条件触发

Event事件对象,普通版的Condition

Semaphore信号量,为线程间共享的资源提供一个“计数器”,计数开始值为设置的值,默认为1

BoundedSemaphore与Semaphore相同,有边界,不能超过设置的值

Timer定时运行的线程对象,定时器

Barrier界限,当达到某一界限后才可以继续执行

看到threading有这么多对象,是不是有些懵了,下面一个个的来看一下

Thread类

Thread类是threading模块的主要主要执行对象。Thread对象有三个数据属性,name(线程名)、ident(线程的标识)、daemon(布尔值,是否是守护线程)。这三个数据属性可以直接通过对象进行调用并进行设置。

守护线程一般是一个等待客户端请求服务的服务器,进程退出时,该线程在正常情况下不会退出

Thread类还有一些对象方法

对象方法描述

__init__()实例化一个线程对象

start()开始执行线程

run()定义线程功能方法(一般在子类中进行重写)

join(timeout=None)直至启动的线程终止或timeout秒,否则一直挂起,多用于主线程进行阻塞等待子线程运行完毕。

isAlivel/is_alive()线程是否存活

注意

__init__()完整函数如下__init__(group=None,target=None,name=None,args=(),kwargs={},verbose=None,daemon=None),Thread对象实例化需要一个可调用的target(可以是一个函数,也可是一个可调用的类实例),参数args或者kwargs。

说了这么多,那怎么创建线程呢?一般有两种方法:

创建Thread实例,传给其一个函数或可调用的类实例

派生Thread的子类,并创建子类的实例

一般来说,创建Thread实例并传递一个函数和派生Thread子类比较常用,后者更符合面向对象且比较容易扩展

创建Thread实例,传给它一个函数

importrandomimportthreadingfromtimeimportctime,sleepdefloop(nloop,nsec):print('start loop ',nloop,' sec:',nsec,' at:',ctime())    sleep(nsec)    print('end loop ',nloop,' done at:',ctime())defmain():print('starting at:',ctime())    threads = []foriinrange(3):        t = threading.Thread(target=loop,args=(i,random.randint(1,5)))        threads.append(t)foriinrange(3):        threads[i].start()foriinrange(3):        threads[i].join()    print('all done at:',ctime())if__name__ =='__main__':    main()

传递给Thread实例一个函数其实和thread模块中差不多,这里随机生成3个Thread实例,分别运行随机事件,然后通过循环让线程启动threads[i].start(),然后通过join()让主线程等待结束。

打印结果如下

startingat:ThuSep7 17:53:442017startloop0sec: 1at:ThuSep7 17:53:442017startloop1sec: 5at:ThuSep7 17:53:442017startloop2sec: 3at:ThuSep7 17:53:442017endloop0doneat:ThuSep7 17:53:452017endloop2doneat:ThuSep7 17:53:472017endloop1doneat:ThuSep7 17:53:492017alldoneat:ThuSep7 17:53:492017

如果将join代码注释掉的话,主线程将不会等待子线程运行,打印结果如下:

startingat:ThuSep7 17:56:112017startloop0sec: 2at:ThuSep7 17:56:112017startloop1sec: 4at:ThuSep7 17:56:112017startloop2sec: 5at:ThuSep7 17:56:112017alldoneat:ThuSep7 17:56:112017endloop0doneat:ThuSep7 17:56:132017endloop1doneat:ThuSep7 17:56:152017endloop2doneat:ThuSep7 17:56:162017

可以看到all done语句已经先执行完毕。然后各个子线程仍然在运行直到结束。

创建Thread实例,传给它一个可调用类的实例

这里需要解释一下,一般实现了__call__方法的类,其实例可以像函数一样进行调用(其实函数就是可调用的对象),称之为可调用类的实例。这样的话,只需要在类中实现__call__方法即可

importrandomimportthreadingfromtimeimportctime, sleepclassThreadFunc(object):def__init__(self,func,args,name=''):self.name = name        self.func = func        self.args = argsdef__call__(self):self.func(*self.args)defloop(nloop,nsec):print('start loop ',nloop,' sec:',nsec,' at:',ctime())    sleep(nsec)    print('end loop ',nloop,' at:',ctime())defmain():print('start at:',ctime())    threads = []    loops = range(3)foriinloops:        t = threading.Thread(target=ThreadFunc(loop,(i,random.randint(1,5)),loop.__name__))        threads.append(t)foriinloops:        threads[i].start()foriinloops:        threads[i].join()    print('all done at:',ctime())if__name__ =='__main__':    main()

这个例子和上面的例子相同,只不过在实例化Thread的时候将ThreadFunc传递给target,当t调用start的时候,其会调用__call__方法。

看一下运行的结果:

importrandomimportthreadingfromtimeimportctime, sleepclassThreadFunc(object):def__init__(self,func,args,name=''):self.name = name        self.func = func        self.args = argsdef__call__(self):self.func(*self.args)defloop(nloop,nsec):print('start loop ',nloop,' sec:',nsec,' at:',ctime())    sleep(nsec)    print('end loop ',nloop,' at:',ctime())defmain():print('start at:',ctime())    threads = []    loops = range(3)foriinloops:        t = threading.Thread(target=ThreadFunc(loop,(i,random.randint(1,5)),loop.__name__))        threads.append(t)foriinloops:        threads[i].start()foriinloops:        threads[i].join()    print('all done at:',ctime())if__name__ =='__main__':    main()

派生Thread的子类,创建子类的实例。

派生Thread的子类,一般需要重写run方法。

importrandomfromthreadingimportThreadfromatexitimportregisterfromtimeimportctime, sleepclassThreadFunc(Thread):def__init__(self,func,args):Thread.__init__(self)        self.func = func        self.args = argsdefrun(self):self.func(*self.args)defloop(nloop,nsec):print('start loop ',nloop,' sec:',nsec,' at:',ctime())    sleep(nsec)    print('end loop ',nloop,' at:',ctime())defmain():print('start at:',ctime())    threads = []    loops = range(3)foriinloops:        t = ThreadFunc(loop,(i,random.randint(1,5)))        threads.append(t)foriinloops:        threads[i].start()@registerdef_atexit():print('all done at:',ctime())if__name__ =='__main__':    main()

需要注意的是,子类的构造函数必须先调用基类的构造函数,在基类的构造函数中对于相应的参数进行了设置。这里并没有用join来控制主线程等待子线程完成,而是使用atexit.register()来注册一个退出函数。来看一下结果:

startat:ThuSep7 18:33:222017startloop0sec: 4at:ThuSep7 18:33:222017startloop1sec: 3at:ThuSep7 18:33:222017startloop2sec: 4at:ThuSep7 18:33:222017endloop1at:ThuSep7 18:33:252017endloop0at:ThuSep7 18:33:262017endloop2at:ThuSep7 18:33:262017alldoneat:ThuSep7 18:33:262017

以上就是多线程的三种实现方式。

同步原语

一般在多线程代码中,一般有一些特定的函数或代码块不希望被多个线程同时执行,这就需要使用同步了。同步原语中,锁是最简单、最低级的机制,而信号量通常用于多线程竞争有限资源的情况。

锁(Lock)

锁只有两种状态,锁定和解锁。即支持两种操作:获得锁和释放锁。当多线程争夺锁的时候,允许第一个获得锁的线程进入临界区,并执行代码,其他后到达的线程将被阻塞,直到获得锁的线程执行完毕,退出临界区,并释放锁。其他等待的线程去争夺锁并进行临界区。

下面来看一下不适用锁的例子。

importrandomfromatexitimportregisterfromthreadingimportcurrentThread, Threadfromtimeimportctime, sleepremaining = []defloop(nsec):tname = currentThread().name    remaining.append(tname)    print('{0} started at:{1}'.format(tname,ctime()))    sleep(nsec)    remaining.pop()    print('{0} Completed at:{1} used {2}'.format(tname,ctime(),nsec))    print(remaining)defmain():foriinrange(4):        Thread(target=loop,args=(random.randint(1,5),)).start()@registerdef_axexit():print('all down at:',ctime())if__name__ =='__main__':    main()

上面的程序很简单,一个共享的列表remaining来存储剩余的线程,在每个线程中先将线程名称添加到该列表,运行完成后,则从列表中删除。下面来看一下运行的结果:

Thread-1startedat:ThuSep7 22:11:012017Thread-2startedat:ThuSep7 22:11:012017Thread-3startedat:ThuSep7 22:11:012017Thread-4startedat:ThuSep7 22:11:012017Thread-2Completedat:ThuSep7 22:11:022017usedtime1Thread-3Completedat:ThuSep7 22:11:022017usedtime1['Thread-1', 'Thread-2']['Thread-1', 'Thread-2']Thread-4Completedat:ThuSep7 22:11:032017usedtime2['Thread-1']Thread-1Completedat:ThuSep7 22:11:042017usedtime3[]alldownat:ThuSep7 22:11:042017

看上面的输出,Thread-2已经执行完毕,下面的remainlist却仍还有Thread-2,而Thread-4正在执行,却没有。这说明多个线程并行执行IO程序,同时有多个程序修改同一个变量导致值输出问题。这个时候需要用锁来防止多个线程同时修改共享数据。

修改代码如下:

importrandomfromatexitimportregisterfromthreadingimportcurrentThread, Thread, Lockfromtimeimportctime, sleepremaining = []lock = Lock()defloop(nsec):tname = currentThread().name    lock.acquire()    remaining.append(tname)    print('{0} started at:{1}'.format(tname,ctime()))    lock.release()    sleep(nsec)    lock.acquire()    remaining.pop()    print('{0} Completed at:{1} used time {2}'.format(tname,ctime(),nsec))    print(remaining)    lock.release()defmain():foriinrange(4):        Thread(target=loop,args=(random.randint(1,5),)).start()@registerdef_axexit():print('all down at:',ctime())if__name__ =='__main__':    main()

大部分代码都没有改动,只是在添加了全局变量lock,然后在remaining添加和删除线程名称的时候进行锁的获取和释放,来看一下运行的结果。

Thread-1startedat:ThuSep7 22:26:402017Thread-2startedat:ThuSep7 22:26:402017Thread-3startedat:ThuSep7 22:26:402017Thread-4startedat:ThuSep7 22:26:402017Thread-1Completedat:ThuSep7 22:26:412017usedtime1['Thread-1', 'Thread-2', 'Thread-3']Thread-4Completedat:ThuSep7 22:26:422017usedtime2['Thread-1', 'Thread-2']Thread-2Completedat:ThuSep7 22:26:422017usedtime2['Thread-1']Thread-3Completedat:ThuSep7 22:26:442017usedtime4[]alldownat:ThuSep7 22:26:442017

这样看结果是不是正常了。

可能你会觉得lock这样进行锁的获取和释放代码不太直观,你也可以使用with语句,如下:

withlock:    remaining.append(tname)    print('{0} started at:{1}'.format(tname,ctime()))

这样看起来是不是简洁多了,缩进的代码会自动进行加锁和释放锁的功能。

信号量

信号量其实是一个计数器,当资源消耗时递减,当资源释放时递增。资源的消耗与释放称为一次PV操作,P()源于probeer/proberen,也称为wait、try、acquire、pend或procure,是消耗资源使计数器递减的操作。V()源于verhogen/verhoog,也成为signal、increment、release、post、vacate,是释放资源,使其回到资源池的操作。不过在python中被固定为acquire和release操作。

下面通过生产者-消费者问题来了解一下信号量

场景:生产者生产产品,消费者消费产品,柜台上最多只能摆放5件产品

fromatexitimportregisterfromrandomimportrandrangefromthreadingimportLock, Thread, BoundedSemaphorefromtimeimportsleep, ctimelock = Lock()MAX =5product_on_shelf = BoundedSemaphore(MAX)defproduct():withlock:        print('product is doing')try:            product_on_shelf.release()exceptValueError:            print('shelf is full')else:            print('product is done')defconsume():withlock:        print('product is consume')ifproduct_on_shelf.acquire():            print('product is successed consume')else:            print('shelf is empty')defproducer(loops):print('producer is start for loops:',loops)foriinrange(loops):        product()        sleep(randrange(3))defcustomer(loops):print('customer is start for loops:', loops)foriinrange(loops):        consume()        sleep(randrange(3))defmain():print('starting at:',ctime())    nloops = randrange(3,6)    Thread(target=producer,args=(nloops,)).start()    Thread(target=customer,args=(nloops,)).start()@registerdef_atexit():print('all done at:',ctime())if__name__ =='__main__':    main()

这里定义了生产者和消费者两个线程,随机循环生产和消费产品。product和consume是生产产品和消费产品的函数,均通过锁进行同步,这里使用的是BoundedSemaphore,主要是因为其有边界,值为Max,初始状态下信号量的值为5。这里需要对release进行异常捕获,超过边界BoundedSemaphore会抛出ValueError异常,而acquire则返回boolean来表示获取是否成功。下面来看一下结果:

starting at: Fri Sep811:34:462017producerisstartforloops:3productisdoingshelfisfullcustomerisstartforloops:3productisconsumeproductissuccessed consumeproductisconsumeproductissuccessed consumeproductisdoingproductisdoneproductisconsumeproductissuccessed consumeproductisdoingproductisdoneall done at: Fri Sep811:34:502017

可以看到运行结果正常。

threading模块是对thread模块的封装,并添加了状态管理,并且通过lock扩展出来的Condition进行了多线程的通过管理,所以一般在实际生产中主要使用threading模块去处理问题。

threading模块暂时就讲到这里。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354