使用multiprocessing的问题总结

Python2中的multiprocessing模块,规避了GIL(Global Interpreter Lock)带来的多线程无法实际并发的缺点,设计了几乎与threading模块一模一样的API,目的就是方便我们在必要时可以使用multiprocessing模块替换threading模块达到真正的并发。

但是,因为线程的内存空间是共享的,而进程之间是独立的,所以使用multiprocessing模块时,进程间的同步会比线程间的同步遇到的问题要多。

1:threading中的daemon 和 multiprocessing中的daemon

The entire Python program exits when no alive non-daemon threads are left.

在threading中,如果启动线程时设置为daemon,则主进程要退出时,如果当前其中的线程都是daemon的,则主进程可顺利退出(其他线程也会退出),否则只要有一个non daemon线程存在,则主进程不会顺利退出。

class Thread(threading.Thread):
    def __init__(self, daemon):    
        super(Thread, self).__init__()
        self.daemon = daemon
    def run(self):
        while True:
            print 'in Thread'
            time.sleep(1)

def main():
    thread = Thread(True)
    thread.start()

    time.sleep(2)

    print 'main exit now'
    sys.exit(0)


if __name__ == '__main__':
    main()

在main中,如果启动Thread设置daemon为False,则当main调用sys.exit后,整个进程依然存在,线程依然持续有打印。将daemon设置为True,则当main退出时,整个进程就退出了。

When a process exits, it attempts to terminate all of its daemonic child processes.

在multiprocessing中,当主进程退出时,他会尝试终结其所有daemon的子进程。

class Process(multiprocessing.Process):
    def __init__(self, daemon):    
        super(Process, self).__init__()
        self.daemon = daemon
    def run(self):
        while True:
            print 'in Process'
            time.sleep(1)

def main():
    process = Process(True)
    process.start()

    time.sleep(2)

    print 'main exit now'
    sys.exit(0)

if __name__ == '__main__':
    main()

当启动Process子进程时,如果设置daemon为True,则当main主进程退出时,Process子进程也会退出。如果设置daemon为Fales,则不会。

Note that a daemonic process is not allowed to create child processes. Otherwise a daemonic process would leave its children orphaned if it gets terminated when its parent process exits. Additionally, these are not Unix daemons or services, they are normal processes that will be terminated (and not joined) if non-daemonic processes have exited.

daemon子进程不能在通过multiprocessing创建后代进程,否则当父进程退出后,它终结其daemon子进程,那孙子进程就成了孤儿进程了。当尝试这么做时,会报错:AssertionError: daemonic processes are not allowed to have children

但是,daemon子进程还可以通过subprocess创建后代进程

2:multiprocessing中的Process.terminate

Terminate the process. On Unix this is done using the SIGTERM signal; on Windows TerminateProcess() is used. Note that exit handlers and finally clauses, etc., will not be executed.
Note that descendant processes of the process will not be terminated – they will simply become orphaned.

因为terminate就是直接向该进程按发送SIGTERM信号,进程无法优雅退出。所以terminate方法不会杀掉该子进程的后代进程。即使后代进程是daemon的:

class SubsubProc(multiprocessing.Process):
    def __init__(self):    
        super(SubsubProc, self).__init__(name = 'SubsubProc')
        self.daemon = True
    def run(self):
        while True:
            print 'this is subsubproc'
            time.sleep(2)

class SubProc(multiprocessing.Process):
    def __init__(self):    
        super(SubProc, self).__init__(name = 'SubProc')
        self.daemon = False
    def run(self):
        subsubproc = SubsubProc()
        subsubproc.start()
        while True:
            print 'this is SubProc'
            time.sleep(1)

def main():
    subproc = SubProc()
    subproc.start()

    time.sleep(3)

    subproc.terminate()
    subproc.join()
    print 'subproc terminated'

    time.sleep(3600)
if __name__ == '__main__':
    main()

上面的代码中,主进程创建了SubProc子进程,在SubProc子进程中又创建了SubsubProc孙子进程。当主进程杀掉SubProc子进程后,不管孙子进程SubsubProc是否是daemon的,其都会一直存在,不会被杀掉。

If this method is used when the associated process is using a pipe or queue then the pipe or queue is liable to become corrupted and may become unusable by other process. Similarly, if the process has acquired a lock or semaphore etc. then terminating it is liable to cause other processes to deadlock.

如果进程使用了多进程共享的queue、pipe,则将其terminate时,这些queue或pipe将变的不可用。类似的,当该进程使用了锁或者信号量等共享对象时,杀掉该进程可能会导致其他进程死锁。

class Consumer(multiprocessing.Process):
    def __init__(self, lock):    
        super(Consumer, self).__init__(name = 'Consumer')
        self.lock = lock
    def run(self):
        print 'consumer wait the lock'
        time.sleep(1)
        self.lock.acquire()
        print 'consumer get the lock'
        time.sleep(1)
        self.lock.release()

class Producer(multiprocessing.Process):
    def __init__(self, lock):    
        super(Producer, self).__init__(name = 'Producer')
        self.lock = lock
    def run(self):
        self.lock.acquire()
        print 'producer get the lock'
        time.sleep(100)
        self.lock.release()

def main():
    lock = multiprocessing.Lock()

    producer = Producer(lock)
    producer.start()

    consumer = Consumer(lock)
    consumer.start()

    time.sleep(3)

    producer.terminate()
    producer.join()
    print 'producer terminated'

    time.sleep(3600)

if __name__ == '__main__':
    main()

producer子进程首先得到了锁,然后进入睡眠。consumer子进程等待其释放锁。3秒之后,producer子进程被主进程杀掉,从而导致producer没有机会释放锁,导致consumer永远等待下去。

Queue内部使用了Lock,因而对于使用了Queue的进程进行terminate自然也是不安全的。

如果确实需要终结使用这些对象的process,可以使用multiprocessing.Event,控制process中的主循环:

class Consumer(multiprocessing.Process):
    def __init__(self, lock):    
        super(Consumer, self).__init__(name = 'Consumer')
        self.lock = lock
    def run(self):
        print 'consumer wait the lock'
        time.sleep(3)
        self.lock.acquire()
        print 'consumer get the lock'
        time.sleep(1)
        self.lock.release()

class Producer(multiprocessing.Process):
    def __init__(self, lock, stop_event):    
        super(Producer, self).__init__(name = 'Producer')
        self.lock = lock
        self.stop_event = stop_event
    def run(self):
        while not self.stop_event.is_set():
            self.lock.acquire()
            print 'producer get the lock'
            time.sleep(2)
            self.lock.release()

def main():
    lock = multiprocessing.Lock()

    stop_event = multiprocessing.Event()
    stop_event.clear()
    producer = Producer(lock, stop_event)
    producer.start()

    consumer = Consumer(lock)
    consumer.start()

    time.sleep(1)

    stop_event.set()
    producer.join()
    print 'producer terminated'

    time.sleep(3600)

if __name__ == '__main__':
    main()
3:关闭父进程描述符

通过multiprocessing创建的子进程,它创建了父进程中所有描述符的副本。这就很容易造成一些意想不到的问题。比如一个httpserver父进程收到了客户端的http请求之后,动态创建了子进程,然后在父进程中关闭已与该客户端建链的socket。此时关闭操作不会发生实际作用(发送FIN包),因为在子进程中还存在一个socket的副本。

要想避免这种情况,要么在父进程中打开任何描述符之前创建子进程;要么就是在子进程中关闭不必要的描述符。比如,下面的方法可以在子进程中调用,关闭所有socket类型的描述符:

def close_socketfd_with_procfs():
    proc_path = '/proc/self/fd'

    for fdstr in os.listdir(proc_path):
        fd = int(fdstr)
        try:
            mode = os.fstat(fd).st_mode
            if stat.S_ISSOCK(mode):
                os.close(fd)
        except OSError:
                pass
4:multiprocessing.Queue的实现

Queue的内部实现使用了collections.deque和multiprocessing.Pipe。对Queue首次进行put操作时,Queue内部就会在后台启动一个daemon为True的threading.Thread。put操作仅仅是将对象append到deque中。由后台线程负责将deque中的对象取出,然后send到Pipe中。

启动后台线程时,还会在当前进程中注册一个Finalize,Finalize主要用于multiprocessing.Process进程退出时做清理工作。Queue.put注册的Finalize,就是在进程退出时,要等待后台线程完成当前的工作,也就是将已经put的对象全部发送出去。

如果put到队列中的对象特别大,或者队列已经满了,也就是只有消费者get之后,后台线程才能真正的send完成。这种情况下,如果process主循环退出了,实际上其后台线程还是阻塞在send操作,而没有真正退出,除非消费者及时的get操作。如果消费者的处理比较慢,则可能会有问题:

class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        super(Consumer, self).__init__(name = 'Consumer')
        self.queue = queue
    def run(self):
        while True:
            count = self.queue.get()
            print 'Consumer get count ', count[0]
            time.sleep(3)

class Producer(multiprocessing.Process):
    def __init__(self, queue, stop_event):    
        super(Producer, self).__init__(name = 'Producer')
        self.queue = queue
        self.stop_event = stop_event
    def run(self):
        count = 0
        while not self.stop_event.is_set():
            self.queue.put(str(count)*65536)
            print 'producer put count ', count
            count += 1
            time.sleep(1)
        print 'producer stop loop now'

def main():
    queue = multiprocessing.Queue()

    stop_event = multiprocessing.Event()
    stop_event.clear()
    producer = Producer(queue, stop_event)
    producer.start()

    consumer = Consumer(queue)
    consumer.start()

    time.sleep(10)

    stop_event.set()
    producer.join()
    print 'producer terminated'

    time.sleep(3600)

上面的代码中,生产者每隔1s产生一个大消息(长度大于65536),底层管道的容量默认为65536,所以,只有消费者get之后,后台线程的send操作才能返回。消费者每隔3s才能消费一个消息。所以,当生产者退出循环时,还无法真正结束,必须等待后台线程发送完所有已经put的消息:

producer put count  0
Consumer get count  0
producer put count  1
producer put count  2
producer put count  3
Consumer get count  1
producer put count  4
producer put count  5
Consumer get count  2
producer put count  6
producer put count  7
producer put count  8
Consumer get count  3
producer put count  9
producer stop loop now
Consumer get count  4
Consumer get count  5
Consumer get count  6
Consumer get count  7
Consumer get count  8
producer terminated
Consumer get count  9

出现这种情况一般不会是调用者希望看到的,调用者调用stop_event.set()就是希望生产者进程能马上退出,而不会希望其继续存在一段时间,并且这段时间还取决于消费者的消费速度。

解决这个问题,要么是调用Queue.cancel_join_thread,要么是使用SimpleQueue。但两种方法都有缺点。

Queue.cancel_join_thread的作用,实际上就是把注册的Finalize删除,从而在进程退出时,无需等待后台线程完成send,而直接退出。这样做的问题就是:已经put到Queue中的消息会丢失,更严重的问题是,因为进程直接退出,后台线程也强制退出,有可能导致后台线程持有的锁得不到释放(如果此时后台线程正在send的话),导致再也无法向该Queue中put消息:

class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        super(Consumer, self).__init__(name = 'Consumer')
        self.queue = queue
    def run(self):
        while True:
            count = self.queue.get()
            print 'Consumer get count ', count[0]
            time.sleep(3)


class Producer(multiprocessing.Process):
    def __init__(self, queue, stop_event):    
        super(Producer, self).__init__(name = 'Producer')
        self.queue = queue
        self.stop_event = stop_event
    def run(self):
        count = 0
        while not self.stop_event.is_set():
            self.queue.put(str(count)*65536)
            print 'producer put count ', count
            count += 1
            time.sleep(1)
        self.queue.cancel_join_thread()
        print 'producer stop loop now'


def main():
    queue = multiprocessing.Queue()

    stop_event = multiprocessing.Event()
    stop_event.clear()
    producer = Producer(queue, stop_event)
    producer.start()

    consumer = Consumer(queue)
    consumer.start()

    time.sleep(10)

    stop_event.set()
    producer.join()
    print 'producer terminated'

    print queue._wlock
    queue._wlock.acquire()
    print 'get the lock'

    time.sleep(3600)

if __name__ == '__main__':
    main()

上面代码的结果如下:

producer put count  0
Consumer get count  0
producer put count  1
producer put count  2
producer put count  3
Consumer get count  1
producer put count  4
producer put count  5
producer put count  6
Consumer get count  2
producer put count  7
producer put count  8
Consumer get count  3
producer put count  9
producer stop loop now
producer terminated
<Lock(owner=SomeOtherProcess)>

可见,虽然Producer的主循环退出之后该进程就结束了。但是从4到9的数据也丢了。而且,该队列内部的锁也没有得到释放(”get the lock”始终没有打印出来),这是很严重的问题了。

SimpleQueue的实现没有后台线程。对于大对象而言,实际上是生产者put,消费者get,生产者put…这样的操作依次进行;

另一种解决方法是,使用Queue,当Producer进程退出主循环时,直接自己取尽队列中的对象,以免后台线程阻塞。

6:multiprocessing.get_logger和multiprocessing.log_to_stderr

multiprocessing模块内部会打印调试信息到logger中,可以使用multiprocessing.get_logger获取该logger。但是该接口返回的logger没有handler,也没有设置日志级别。因此,要想使用的话需要自己添加handler并设置Level。

如果希望调试信息直接打印到stderr,则可以调用multiprocessing.log_to_stderr接口。该接口为get_logger获取的logger添加了handler,因而可以直接使用。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容

  • @(python)[笔记] 目录 一、什么是进程 1.1 进程的概念 进程的概念起源于操作系统,是操作系统最核心的...
    CaiGuangyin阅读 1,247评论 0 9
  • 1.进程 1.1多线程的引入 现实生活中 有很多的场景中的事情是同时进行的,比如开车的时候手和脚共同来驾驶汽车,再...
    TENG书阅读 495评论 0 0
  • 本月,新媒矿每周对平台上文章阅读量达到10万+的文章做了分析,先前有一种说法是13字以内的标题更容易获得阅读量,但...
    新媒矿阅读 241评论 0 1
  • 离线钱包,就是不联网的钱包,不联网就不用担心私钥泄漏,当然要是电脑挂了或丢了那就呵呵了,还是需要备份私钥的。简单的...
    十三流阅读 6,408评论 2 0
  • 今日,我忆起了生命中的三堆篝火,于是,我又乘上时光机,穿越往事。 那是一个秋天,我确定!在靖安的一个...
    简爱简乐阅读 1,633评论 3 2