python多进程踩过的坑

背景

算法离线测试的上线后,随着业务的增长,算法的构建越来越频繁,数量也越来越多,最近一个任务中就包含28个算法。
随着压力的增大,算法离线测试需要算法并行测试来解决效率问题。

成果

# 28个算法单进程执行时间
Task 1111417 runs 58800.14 seconds.
# 多进程执行时间
Task 1111573 runs 2187.91 seconds.
# 时间由16小时缩短到36分钟!!

多进程demo

一般的多进程代码

from multiprocessing import Pool
import os, time, random
def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))
    return 0, name

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool()
    for i in range(4):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

遇到的问题

  1. 类中不能直接使用 pool
class Foo():
    def work(self, i):
        print("this is work~~")
    def run(self):
        p = Pool()
        for i in range(4):
            p.apply_async(self.work, args=(i,))
        p.close()
        p.join()

if __name__ == '__main__':
    foo = Foo()
    foo.run()

直接使用会报错 cPickle.PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup builtin.instancemethod failed
查了下官方文档发现 python 默认只能 pickle 以下的类型:

  • None, True, and False
  • integers, floating point numbers, complex numbers
  • strings, bytes, bytearrays
  • tuples, lists, sets, and dictionaries containing only picklable objects
  • functions defined at the top level of a module (using def, not lambda)
  • built-in functions defined at the top level of a module
  • classes that are defined at the top level of a module
  • instances of such classes whose dict or the result of calling getstate() is picklable (see section -
  • Pickling Class Instances for details).

函数只能pickle在顶层定义的函数,很明显的class内的函数无法被pickle因此会报错。
这是 python2 才会遇到的问题,据说 python3 已经解决

解决方法
有很多种解决方法比如:

  • 调用pathos包下的multiprocessing模块代替原生的multiprocessing。pathos中multiprocessing是用dill包改写过的,dill包可以将几乎所有python的类型都serialize,因此都可以被pickle。
  • 使用线程代替进程 from multiprocessing.pool import ThreadPool as Pool
  • 可以使用 copy_reg 来规避异常
  • 把调用的函数写在顶层规避
  • 重写类的内部函数规避
## 调用函数写在顶层
def func(x):
    return x*x
class someClass(object):
    def __init__(self,func):
        self.f = func
    def go(self):
        p= Pool(processes=4)
        for i in range(4)
            p.apply_async(func, args=(10, ))
        p.close()
        p.join()
a=someClass(func)
a.go()
## 重写类的内部函数
class testClass():
    def upMethod(self, a):
        print 'I am UP:%s' %a
        time.sleep(1)
    def downMethod(self, b):
        print 'I am DOWN:%s' %b
        time.sleep(1)
    def multiProcess(self):
        p = Pool(2)
        aObj=p.apply_async(self, args=('up', 't1',))
        aObj=p.apply_async(self, args=('down', 't2',))
        p.close()
        p.join()
    def __call__(self,sign, *args, **kwds):
        if sign=='up':
                return self.upMethod(*args, **kwds)
        elif sign=='down':
                return self.downMethod(*args, **kwds)
if __name__=='__main__':
    testObj=testClass()
    testObj.multiProcess()
  1. apply_async 函数子进程不执行情况
  • 参数需要以元组的形式传递,并在最后一个参数后面加上 ,号,如果没有加,子进程不会执行
# 解决方法
p.apply_async(func, args=(url,)) #需要在参数后面添加逗号
  • 代码中有队列相关的操作时,也会引起子进程不执行的问题
    不要使用 multiProcess 中的 queue ,要用 manager 中的 queue 。
  • 子进程遇到错误挂掉,却不会抛出任何错误
# 一种
try:
    do some shit
except Exception e:
    print/log traceback.format_exc()
# 另一种
p = Pool(1)
result = p.apply_async(func, args=(arg,))
result = result.get(11)
p.terminate()
p.join()
print(result)
# multiprocessing.pool.AsyncResult.get(timeout)实现 function raise error 时自己也 raise error ,
# 然后通过 pool.terminate() 就能结束进程,还可以设置超时、返回 function return value
# 注意 这里是每个进程都等待 timeout 秒,多进程同时等待 timeout 应将 pool.apply_async() 改为使用 pool.map_async()
  1. 多进程间的数据共享
  • 多进程之间不能使用普通的Python数据类型,比如平常使用的list或者dict由父进程传递给子进程后,子进程只可读,写无效。
# 建议使用
from multiprocessing import Manager  
manager = Manager()
  • 数据多层嵌套无效
# 解决方法-重写嵌套层
mdict['aaa'] = {}
tmp = mdict['aaa'] 
tmp['bbb'] = 666
mdict['aaa'] = tmp
  • 进行 dump 等序列化操作报错
解决方法:1、初始化数据内容;2、强制转化 list() dict()

其他问题

为什么不用线程

  1. 效率不高
    实际操作对比发现,单线程10分钟任务,多线程花费8分钟,多进程花费5分钟。
  2. 原因:GIL锁:Global Interpreter Lock
    任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
  3. 锁产生的原因
    多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。
  4. 锁优缺点
    好处:确保了某段关键代码只能由一个线程从头到尾完整地执行
    坏处:首先是阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。
    其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。