Python 多进程的自定义共享数据类型

最近项目要用到Python多进程,进程是1+N式的,1用来向一个数据结构中不断的写数据,其他N个进程从同时从这个数据结构中读数据,每条数据都是一次性的。因此:

  • 当前进程可以作为这里的1进程,然后默认新建N个子进程。N的大小一般和CPU核数相同,如果是生产环境,不想让程序占满服务器的资源,可以再设置一个最大进程数。
  • 因为是共享操作,这个数据结构就需要是进程安全的。

在网上查了一些关于Python多进程的资料,发现大多都只是从官方文档中随便找了些代码(有的连改都没改),都是些不能用来实用的初级知识,实在无法称其为教程(这里面就包括我之前写的Python多进程文章)。经过我不断的筛选和阅读官方文档,终于让我找到了一个可以用的方法。

这里先列出我找到的对我有帮助的文章列表:

正式写代码之前,先要说清楚几个点。

理解多进程

这点可以参考下我之前写的文章线程、进程、协程。对于一个进程来说,如果其调用了join(),它会阻塞自己,一直要等到其他进程都退出后才会继续执行下去。

这就解决了我的第一个问题,就是让主进程作为1进程不断写入数据,具体的操作方法就是在新建子进程时不调用其join()方法。

由浅入深的几个例子

multiprocessing库提供了两种方式建立子进程Process类和Pool类,前者需要对每个子进程进行操作,包括通信、同步和共享;后者则是对前者的封装,其维护了一定的进程池并实现了异步操作。当然还有一些其他的操作,这里就不赘述了。

场景

我这里需要的共享数据结构是一个最小堆,堆中的存储的是一条条记录(大概30W条),并按照某个记录中的时间戳排序。1进程会定时刷新这个堆,N进程会不断从堆中pop数据,处理后根据新的时间戳再push进堆中。

这其中其实涉及到了几个点:建立多进程、给子进程传入参数、自定义堆结构、共享锁。这里我一一解释。

建立子进程并传入参数的方式

前面说了,建立子进程一般来说有两种方式:Process和Pool,这里我用了Process。为什么不用Pool呢?Pool的优势是可以使用异步操作,但这里有个问题就是普通的multiprocess.Lock不支持Pool,需要使用Manager().Lock()才行,这样就显得笨重了,加上我这个项目对异步要求并不高,就采用了Process类用来创建子进程。

这部分代码网上搜一下有很多,我也放出我的测试代码:

from multiprocessing import Process
import os


class TestClass(object):
    def __init__(self, *args, **kwargs):
        pass

    def func(self, i):
        print('SubProcess[{}]: {}'.format(i, os.getpid()))

    def start(self):
        print('Main Process: {}, begin'.format(os.getpid()))
        processes = [Process(target=self.func, args=(i,)) for i in range(3)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print('Main Process: {}, end'.format(os.getpid()))


if __name__ == '__main__':
    TestClass().start()

这里有个地方让我疑惑了很久,因为官网上有一块代码是这样的:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    print 'hello world', i
    l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

这一度让我以为只要用Process+Lock就能满足我的需求了。试了之后发现其实这样是没办法在进程间共享数据的(每个进程都有独立的PCB,还记得吗)。这样只能保证多进程在访问公共资源时不会产生冲突(比如说标准输入输出,也就是上面这个例子)。如果要进程间共享数据,需要使用QueuePipe,或共享内存(ValueArray形式),或用Manager(后面会说),不管哪种,都不能满足我“自定义共享最小堆”的需求。

自定义共享数据

这节和多进程没关系,主要是讲我要用到的这个数据结构。关于堆是什么我就不解释了。Python中有个叫heapq的库,这个库中,堆中的数据可以是一个tuple,在排序比较的时候会优先比较tuple[0]的内容,如果相等再比较tuple[1]……以此类推。文档中有一个关于优先级序列的实现很有参考意义,这也要求tuple中的几个数据必须是可以比较的(对象实现了__lt__, __gt__,__eq__方法)。这里我要实现的是一个支持自定义比较函数的堆,我称为HeapQueueWithComparer,直接给出代码:

"""支持自定义比较函数(cmp)的堆队列heap queue"""
import heapq


class HeapQueueWithComparer(object):
    def __init__(self, initial=None, comparer=lambda x: x, heapify=heapq.heapify):
        self.comparer = comparer
        self.data = []

        if initial:
            self.data = [(self.comparer(item), item) for item in initial]
            heapify(self.data)
        else:
            self.data = []

    def push(self, item, heappush=heapq.heappush):
        heappush(self.data, (self.comparer(item), item))

    def pop(self, heappop=heapq.heappop):
        return heappop(self.data)[1] if self.data else None

其原理就是根据comparer生成一个特征值,然后将其和数据本身作为一个元组在堆中进行排序。用起来的时候可以这么用:

import random
from collections import namedtuple

volume_t = namedtuple('volume_t', ['length', 'width', 'height', 'id'])

def __get_rand():
    return random.randint(1, 10)

data = [volume_t(__get_rand(), __get_rand(), __get_rand(), i) for i in range(5)]
heap_cmp = HeapQueueWithComparer(data, lambda x: x.length*x.width*x.height)

[print(heap_cmp.pop()) for i in range(len(data))]

这里是根据一个长方体的体积排序,用namedtuple的原因是使代码更可读,理论上用tuple是一样的。

接下来要解决的一个问题就是如何让这个堆再多个进程中共享并且不会产生不一致的问题。前文其实提了,进程间共享数据的一种方式是使用server process,实际中使用就是multiprocessing.managers,它会新建一个额外的进程用来管理共享的数据,而其本身会作为一个代理,每次子进程需要操作共享数据,实际上都是通过这个server process进行操作的,文档中称之为proxy。一个manager对象支持list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array。因此,我们需要自定义一个Manager,并让它支持带锁的操作。这里提一句,Manager也可以是远程的,不过这里用不到。

自定义Manager需要继承BaseManager,测试代码如下。注意这里和上一个例子中的代码并不完全一致,我是觉得再用lambda对代码的可读性影响太大了。:

import heapq
import os
import random
import time
from collections import namedtuple
from multiprocessing import Lock, Process
from multiprocessing.managers import BaseManager


class HeapManager(BaseManager):
    def __init__(self):
        super().__init__()
        self.lock = Lock()
        self.data = []

    def sync(self, rules_changed):
        with self.lock:
            if self.data:
                self.data[:] = []
            self.data = [(item[0]*item[1]*item[2], item,) for item in rules_changed]
            heapq.heapify(self.data)

    def pop(self):
        with self.lock:
            return heapq.heappop(self.data)[1] if self.data else None

    def push(self, item):
        with self.lock:
            heapq.heappush(self.data, (item[0]*item[1]*item[2], item,))


HeapManager.register('HeapManager', HeapManager)
volume_t = namedtuple('volume_t', ['length', 'width', 'height', 'id'])


class TestClass(object):
    def __init__(self, *args, **kwargs):
        manager = HeapManager()
        manager.start()
        self.hm = manager.HeapManager()
        self.lock = Lock()

    def func(self, i):
        while True:
            with self.lock:
                item = self.hm.pop()
            if not item:
                print('Begin to sleep 2s.')
                time.sleep(2)
            else:
                print('Process-{} {} {}'.format(i, os.getpid(), item))

    def start(self):
        jobs = [Process(target=self.func, args=(i, )) for i in range(3)]
        [j.start() for j in jobs]
        # [j.join() for j in jobs] # 不用这行可以不阻塞主进程

        def __get_rand():
            return random.randint(1, 10)

        while True:
            data = [volume_t(__get_rand(), __get_rand(), __get_rand(), i) for i in range(10)]
            with self.lock:
                self.hm.sync(data)
            print('Main {}, add {}.'.format(os.getpid(), data))
            print(sorted(data))
            time.sleep(5)


if __name__ == '__main__':
    TestClass().start()

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容