关于python多进程使用(Queue、生产者和消费者)

图片发自简书App

关于\color{#D86683}{多进程}的生产者和消费者的实现,刚好最近有用到,简单总结记录下:

多进程

\color{#D86683}{进程}是系统独立调度核分配系统资源(CPU、内存)的基本单位,进程之间是相互独立的,每启动一个新的进程相当于把数据进行了一次克隆。
python提供了多种方法实现了多进程中间的\color{#D86683}{通信和数据共享}(可以修改同一份数据)。

关于GIL

GIL的全称是 Global Interpreter Lock(全局解释器锁),来源是 Python 设计之初的考虑,为了数据安全所做的决定。
某个线程想要执行,必须先拿到 GIL,我们可以把 GIL 看作是“通行证”,并且在一个 Python 进程中,GIL 只有一个,这就导致了多线程抢占GIL耗时。这就是为什么在多核CPU上,Python 的多线程效率并不高的根本原因。
所以有必要学习下多进程的使用。

多进程使用示例

\color{#68A6D3}{multiprocessing\_queue.py}

#!/usr/bin/env Python
# -- coding: utf-8 --

"""
@version: v1.0
@author: narutohyc
@file: multiprocessing_queue.py
@Description: 多进程队列使用示例
@time: 2020/5/14 15:53
"""

from multiprocessing import Process, Queue, Manager
from multiprocessing import cpu_count
import os
import time

class MultiProcessingQueue:
    def __init__(self):
        # 进程数
        self.num_of_worker = cpu_count()
        # 进程队列大小,根据不同的任务需求
        self.size_of_queue = 10

    def start_work(self):
        print("start_work 开始")

        # 进程队列
        process_list = []

        # 新建一个大小为10的队列
        work_queue = Queue(self.size_of_queue)

        # 进程间共享列表, 其他的还有共享字典等,都是进程安全的
        dealed_sample_lst = Manager().list()

        # 多个生产者
        # 这里要注意任务的拆分,数据的产生来源是不是可以被多个生产者共享
        num_os_producer = 4
        for _ in range(num_os_producer):
            sent = Process(target=self.productor, args=(work_queue, dealed_sample_lst,))
            sent.start()
            process_list.append(sent)

        # 多个消费者
        for _ in range(self.num_of_worker - num_os_producer):
            process = Process(target=self.consumer, args=(work_queue, dealed_sample_lst,))
            process.start()
            process_list.append(process)
        
        [process.join() for process in process_list[:num_os_producer]]
        # 这里需要加入结束标识,还有就是JoinableQueue的方式
        for _ in range(self.num_of_worker - 1):
            work_queue.put(None)
        [process.join() for process in process_list[num_os_producer:]]
        print("start_work 结束")
        return dealed_sample_lst

    def productor(self, work_queue: Queue, dealed_sample_lst):
        print("生产者开始工作")
        for ii in range(100):
            work_queue.put(Task(task_name=f'{str(os.getpid())}-{str(ii)}', data=[ii for _ in range(2)]))
            if ii % 30 == 0:
                time.sleep(1)
                print("生产者休息ing")

        '''
        JoinableQueue 比Queue多了task_done() 与join()两个函数,多用于生产者消费者问题。
        task_done()是用在get()后,发送通知说我get完了
        join()是说Queue里所有的task都已处理。
        '''
        print("生产者工作结束")

    def consumer(self, work_queue: Queue, dealed_sample_lst):
        while True:
            task: Task = work_queue.get()
            if task is None:
                break

            # 处理数据
            task.data = [ii * 2 for ii in task.data]
            dealed_sample_lst.append(task)
            print(task)

        print(f'进程{os.getpid()} 处理结束')

\color{#68A6D3}{任务对象}

# 注意这里不能放太多的数据,不然会导致多进程效率变很低
class Task:
    def __init__(self, task_name: str, data: list, **kwargs):
        self.task_name = task_name
        self.data = data

    def __repr__(self):
        return f'task_name:{self.task_name} data:{self.data}'

\color{#68A6D3}{调用示例}

def multiprocessing_queue_test():
    multiprocessing_queue = MultiProcessingQueue()
    dealed_sample_lst = multiprocessing_queue.start_work()
    # for sample in dealed_sample_lst:
    #     print(sample)
    print("测试结束")

if __name__ == '__main__':
    multiprocessing_queue_test()

\color{#68A6D3}{结束输出}

# 删了一些多余的输出
start_work 开始
生产者开始工作
task_name:28868-0 data:[0, 0]
生产者休息ing
task_name:28868-1 data:[2, 2]
...
task_name:28868-6 data:[12, 12]
生产者休息ing
task_name:28868-31 data:[62, 62]
...
task_name:28868-58 data:[116, 116]
生产者休息ing
task_name:28868-61 data:[122, 122]
...
生产者休息ing
task_name:28868-91 data:[182, 182]
...
task_name:28868-96 data:[192, 192]
生产者工作结束
进程29208 处理结束
进程28496 处理结束
task_name:28868-98 data:[196, 196]
进程30200 处理结束
task_name:28868-99 data:[198, 198]
进程30072 处理结束
start_work 结束
测试结束
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,137评论 6 511
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,824评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,465评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,131评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,140评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,895评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,535评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,435评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,952评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,081评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,210评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,896评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,552评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,089评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,198评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,531评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,209评论 2 357