一点python多进程multiprocessing的感悟

一 python多进程multiprocessing 主要是process和pool两个类, pool类主要是两个方法:pool.apply_async和 pool.apply

1.Process 类

Process 类用来描述一个进程对象。创建子进程的时候,只需要传入一个执行函数和函数的参数即可完成 Process 示例的创建。

star() 方法启动进程,
join() 方法实现进程间的同步,等待所有进程退出。
close() 用来阻止多余的进程涌入进程池 Pool 造成进程阻塞。

multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

target 是函数名字,需要调用的函数
args 函数需要的参数,以 tuple 的形式传入


参考; https://www.jb51.net/article/141825.htm

2. Pool 类

可以提供指定数量的进程供用户使用,默认是 CPU 核数。当有新的请求提交到 Poll 的时候,如果池子没有满,会创建一个进程来执行,否则就会让该请求等待。

  • Pool 对象调用 join 方法会等待所有的子进程执行完毕
  • 调用 join 方法之前,必须调用 close
  • 调用 close 之后就不能继续添加新的 Process 了

二.一些实际bug:

  1. 多进程的使用的时候 [p.join() for p in process_pool if p.is_alive()] 的位置很关键,如果在for 循环里面则输出还是单次创建一个进程,然后杀死此进程,在创建新的进程,这个时候估计是阻塞的,
    如果p.join 和 p.close 在 任务for循环 的同级(也就是外面),则是同时创建指定个数的进程,此时我的程序是参数process_count来控制,如果process_count等于8,那么就会同时创造八个进程打开八个浏览器窗口.
    例子1:在for循环里面:阻塞式
def main(url, process_count):
    process_pool=[]
    for i in range(20000):
        # sleep(1)
        print(f"{i} is starting")

        start_time=time.perf_counter()
        while len(process_pool) >= process_count:
            # time.sleep(1)
            alive_pool=[]
            for p in process_pool:
                if p.is_alive():
                    alive_pool.append(p)
                else:
                    p.close()
            process_pool=alive_pool
        p=multiprocessing.Process(target=add, args=(url,))

        p.start()

        process_pool.append(p)

    [p.join() for p in process_pool if p.is_alive()]
    print(f"{i} is close")
    end_time=time.perf_counter()
    logger.debug(f'handle {i} use time {end_time - start_time}')

例子2:在for循环外面,就会同时创建进程

运行结果:

2.启动多进程时的两个方法的对比:

import multiprocessing
from multiprocessing import Pool
# 多进程一
if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    p=Pool(4)
    for i in range(20000):
        print(f"{i} is starting")
        p.apply_async(add, args=(url,))
        sleep(2)
        print(f"{i} is close")
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

# 多进程二
if __name__ == '__main__':
    process_count=8
    url="https://www.jianshu.com/p/8728110ad2b2"
    main(url, process_count)


  • 方法1: p.apply_async(add, args=(url,))
    注意:如果传递给apply_async()的函数如果有参数,需要以元组的形式传递 并在最后一个参数后面加上 ,号,如果没有加, 号,提交到进程池的任务也是不会执行的

  • 方法2: p=multiprocessing.Process(target=add, args=(url,))

Python 多进程模块multiprocessing的使用
python的多进程主要通过multiprocessing来实现,主要使用到了两个类,一个是Process,一个是Pool,下面来详细说一下这两个类的使用场景。

* Pool

顾名思义这个类实现的是进程池功能,我们可以使用Pool类来高效地创建进程池,代码如下

from multiprocessing import Pool
import time
def test(arg):
    print(arg)
    time.sleep(3)
process_pool = Pool(4)  # 进程的数量最好不要超过机器的CPU核数
for i in range(4):
    process_pool.apply_async(test, (i))
    process_pool.apply(test, (i))
  
process_pool.close()  # close表示进程池不再接受新的进程
process_pool.join()   # join表示等待进程池中的所有进程执行完,要不然主函数会直接执行完并关闭
* Pool的两个坑 (apply 和 apply_async)

在向进程池添加进程的时候有两种方法,一个是apply, 一个是apply_async
• apply函数会同步执行,一个任务执行完成后再执行下一个任务,就和同步代码一样一样的,所以如果你想实现多进程并发操作,不要用apply
• apply_async函数会异步执行,对,就是你想要的异步执行,但是有一个问题千万注意一下:apply_async函数会把你的所有的进程任务先加载到内存中,然后等待执行!这在一般情况下肯可能没有问题,但是有一种情况要小心:如果你的每一个进程需要加载很大的文件到内存,这个操作会把你的内存撑爆导致死机。

from multiprocessing import Process

def test(data):
    print("push datat to db")

process_pool = []
process_count = 8

for file in file_list:
    with open(file) as f:
        data = f.readlines()
    
    tasks = [data[:10000], data[10000:20000], data[20000:30000], data[30000:]]
    for task in tasks:
        # 如果进程池已满,就等待
        while len(process_pool) >= process_count:
            time.sleep(1)
            # 将已经执行完毕的进程移除进程池
            process_pool = [p for p in process_pool if p.is_alive()]
        
        # 如果进程池有空位就将任务添加进去
        p = Process(target=test, args=(task,))
        p.start()
        process_pool.append(p)

# 等待没有执行完成的进程执行完
[p.join() for p in process_pool if p.is_alive()]
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,258评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,335评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,225评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,126评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,140评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,098评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,018评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,857评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,298评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,518评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,400评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,993评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,638评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,661评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352