最近用python
要写一些大量爬取api
数据,然后分析、存储的程序,api
提供的数据格式完全一致,只是url
不同而已,这是一个明显受限于网络和数据库的问题,因此想利用用一些python
对并发编程方面的支持,以提高cpu
的利用率。我是用的是python 2.7
版本。目前标准库已经支持多线程、多进程模型,协程方面也有相当有名的第三方库gevent
。由于我的程序执行的任务基本完全一致,每个任务需要做的事情也不多,但是任务量相当大,如果每个任务均发起起一个进程/线程,那么仅进程/线程的创建、销毁、上下文切换都需要消耗相当大的系统资源。我的想法是使用进城池/线程池,这样系统中存在的总进程数/线程数不变,每个进程/线程可以根据自己的实际执行情况从任务队列中获取任务。python
作为著名的以库多、库封装良好著称的语言,对这方面的支持也是相当优秀,至少解决了我的需求。在此将python
中多进程多线程以及协程池的使用做一个大概的总结。
multiprocessing
进城池
python
对多进程的支持都通过multiprocessing
这个库提供。先看python
中对进程池的支持,我仅仅使用如下几行代码就完成了将自己原来单进程的程序改造为多进程的程序。
import multiprocessing
p = multiprocessing.Pool(multiprocessing.cpu_count() * 2 + 1)
p.map_async(func=do_crawler, iterable=get_tasks(), chunksize=5)
p.close()
p.join()
其中multiprocessing.cpu_count
可以获得系统的cpu core num
,所以我的进城池中共有multiprocessing.cpu_count() * 2 + 1
的进程数。 类似python
中的map
函数,可以将一个函数依次作用于一个序列,我们的p.map_async
就是依次将func
函数依次作用于iterable
参数提供的任务序列,不过是在多进程中进行的而已。另外说明一下这个chuncksize
参数,默认情况下进城池中的每个进程从iterable
中拿到一个任务,然后传给func
函数,执行完毕后再到iterable
中取下一个任务。这个chunck_size
就是进程一次可以从iterable
中获取chunck_size
个任务,然后他把这chunck_size
个任务执行完毕之后再去取任务,在任务量相当大的情况下适当调大chunck_size
可以发挥更好的性能。
多进程通信
多进程编程自然是好,但是由于多个进程被分配到不同的虚拟内存中,不能相互访问,因此多进程编程就必须解决多进程通信的问题。可以使用的方法也有很多,管道(multiprocessing.Pipe)
、共享内存(multiprocessing.sharedtypes)
、队列(multiprocessing.Queue)
、套接字甚至自己搞一个文件来解决这个问题也未尝不可。python
中对这些都有良好的支持。我对其中的Queue
比较感兴趣,看了文档,说Queue
实际上底层还是用的Pipe
,队列会有个后台线程,每次放入一个元素,这个元素就会被pickle
序列化,然后队列的后台线程就会把他冲到底层管道中去,当然这个队列是可以有多个消费者进程多个生产者进程安全的共同操作。除了同一个机器上多进程之间的通信,python
还通过multiprocessing.manager
模块提供了分布式的多进程通信。
multiprocessing.dummy
突然出来multiprocessing.dummy
这么一个模块,搞得我云里雾里的,python
标准库说这个模块为多线程复制的multiprocessing
,我才发现多线程的待遇实在一般。而且多线程执行时,python
中的全局锁还会导致每一时刻同时只有一个线程可以执行,现如今手机都有四核的年代,这个全局锁来的有些太不识时务了。那为啥还要用这个多线程呢?无他,因为进程的创建、销毁、切换都比线程大,而且进程之间共享数据也没有线程方便。所以也要看看这个multiprocessing.dummy
,线程池是这样使用的
from multiprocessing import dummy
p = dummy.Pool(multiprocessing.cpu_count() * 2 + 1)
p.map_async(func=do_crawler, iterable=get_tasks(), chunksize=5)
p.close()
p.join()
除了import
的模块不同之外,其他完全和多进程一致。
gevent
gevent
是一个第三方库,他提供了python
对协程的支持,通过monkey
在底层也对IO
多路复用提供了支撑,可以说是专门为IO bound
的程序量身订做的。他的使用也很简单,还是用一个gevent.Pool
的例子来看一下,方便和之前对比。
from gevent import pool
p = pool.Pool(multiprocessing.cpu_count() * 2 + 1)
p.map_async(func=do_crawler, iterable=get_tasks(), chunksize=5)
p.join()
和之前的长得实在太像了,这样方便我们使用,python
库的作者真是用心良苦。