python 多进程

基于官方文档:
https://docs.python.org/zh-cn/3/library/multiprocessing.html
日乐购,刚才看到的一个博客,写的都不太对,还是基于官方的比较稳妥
我就是喜欢抄官方的,哈哈

  • Class Process

通常我们使用Process实例化一个进程,并调用 他的 start() 方法启动它。
这种方法和 Thread 是一样的。

  • 一个最贱单的例子:
from multiprocessing import Process
def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('傻逼',))
    p.start()
    p.join()
    print("运行结束")
>> hello 傻逼
运行结束

上图中,我写了 p.join() 所以主进程是 等待 子进程执行完后,才执行 print("运行结束")
否则就是反过来了(这个不一定,看你的语句了,顺序其实是随机的)例如:

p = Process(target=f, args=('傻逼',))
    p.start()
    # time.sleep(1)
    print("运行结束")

>> 运行结束
hello 傻逼

主进加个 sleep

结果:
hello 傻逼 
运行结束 (大约等了1秒后,才输出 运行结束)

所以不加join() ,其实子进程和主进程是各干各的,谁也不等谁。都执行完后,文件运行就结束了

  • 查看子进程和父进程的 id
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  #  获取父进程的 pid  (process id)
    print('process id:', os.getpid())


def f(name):
    info('function f') # use info
    print('hello', name)


if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

>> main line
module name: __main__
parent process: 14212
process id: 1940
function f
module name: __mp_main__
parent process: 1940
process id: 15020
hello bob

上面我们用了 os.getpid() 和 os.getppid() 获取 当前进程,和父进程的id
下面就讲一下,这两个函数的用法:
os.getpid()
返回当前进程的id
os.getppid()
返回父进程的id。 父进程退出后,unix 返回初始化进程(1)中的一个
windows返回相同的id (可能被其他进程使用了)
这也就解释了,为啥我上面 的程序运行多次, 第一次打印的parentid 都是 14212 了。
而子进程的父级 process id 是调用他的那个进程的 id : 1940

  • 上下文和启动子进程
    这个内容太多需要,蛮蛮总结。wc。

视频笔记:
多进程:使用大致方法:

  • 多进程通信

参考:进程通信(pipe和queue)

  • queue 管道:队列是线程和进程安全的(不会竞争)
    q = mp.Queue()
    把q 传给args, 然后在里面 q.put(你的东西)
    取值在外面直接 q.get() # 先进先出
    普通的
    Process 函数用 return,我们也拿不到任何返回值 (智能通过 queue 或者共享内存来交换数据)

pool.map (函数可以有return 也可以共享内存或queue) 结果直接是个列表

def b(x):
    # return 20
    print("child process")
if __name__ == "__main__":
    p  =Pool(processes=2)
    print(p.map(b, [1,2,3,4]))
>>
child process
child process
child process
child process
[None, None, None, None]

poll.apply_async() (同map,只不过是一个进程,返回结果用 xx.get() 获得)

  • 共享内存使用:
    multiprocessing.Value("type", value) # 单个值 value 智能是数字类型的,具体看 type 的设置 比如 i是 有符号整形
    multiprocessing.Array("type", [single level list])
  • Pipe 管道

Pipe() 函数返回一个由管道连接的连接对象

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

返回的两个连接对象 Pipe() 表示管道的两端。每个连接对象都有 send()recv() 方法

  • 资源竞争 枷锁
    lock = mp.Lock()
    把 lock 传给进程的args 参数
    在进程函数里面写 lock.acquire() 枷锁
    lock.release() 释放锁
练习过程中出现的问题:
from multiprocessing import Pool
pool = Pool()
def search(a):

    # while a<1000000:
    #     a+=1
    print("子进程结束")
    return 1
if __name__ == "__main__":
    # pool = Pool()
    a = time.time()
    result = pool.map(search, [1,2,10])
    b = time.time()
    print(b-a)

报错:

RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...
        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

参考 :https://blog.csdn.net/xiemanR/article/details/71700531

把 pool = Pool() 放到 if name == "main": 下面初始化搞定。
结果:

子进程结束
子进程结束
子进程结束
0.7355780601501465

这个肯定有解释的


spawn.py 的解释

测试多进程计算效果:
进程池运行:

class aa():

    def search(self, a):

        while a<100000000:
            a+=1
        print("子进程结束")
        return 1
if __name__ == "__main__":
    pool = Pool()
    c = aa()
    a = time.time()
    result = pool.map(c.search, [1,2,10])
    b = time.time()
    print(b-a)

结果:

子进程结束
子进程结束
子进程结束
16.176724195480347
红框是程序运行的状态

普通计算:

class aa():

    def search(self, a):

        while a<100000000:
            a+=1
        print("子进程结束")
        return 1
if __name__ == "__main__":
    pool = Pool()
    c = aa()
    a = time.time()
    # result = pool.map(c.search, [1,2,10])
    c.search(1)
    c.search(2)
    c.search(10)

    b = time.time()
    print(b-a)

我们同样传入 1 2 10 三个参数测试:

子进程结束
子进程结束
子进程结束
24.153281211853027
红框为执行计算的过程,看出来很平稳

其实对比下来开始快了一半的;
我们把循环里的数字去掉一个 0;
单进程:

子进程结束
子进程结束
子进程结束
3.1451985836029053

多进程:

子进程结束
子进程结束
子进程结束
2.4246084690093994

两次测试 单进程/进程池 分别为 0.669 和 0.772 几乎成正比的。
问题 二:
视图:
post 视图里面

class xxviews():
    def  post(request):
        music = Music()
        result = music.run()
    return ...

Music 类:

pool = Pool()
class Music():
        def __init__(self):
                pass
        def run(self):
                result  = self.search(...)
                pass
        def search(self, keyword, target_src):
             pool.map(....)     

直接报错:


好像是说 子进程自己不可以在创建子进程了

写在 类里面也 在函数里用 self.pool 调用也不行,也是相同的错误。

最后 把 pool = Pool 直接写在 search 函数里面,奇迹出现了:


class Music():
        def __init__(self):
                pass
        def run(self):
                result  = self.search(...)
                pass
        def search(self, keyword, target_src):
              pool = Pool()      
              pool.map(....)       
              pass
成功运行了

前台也能显示搜索的音乐结果了


总结一点,进程这个东西,最好 写在 直接运行的函数里面,而不是 一个函数跳来跳去。因为最后可能 是在子进程的子进程运行的,这是不许的,会报错。
还有一点,多进程运行的函数对象,不能是 lambda 函数。也许lambda 虚拟,在内存??

使用 pool.map 子进程 函数报错,导致整个 pool 挂了:
参考:https://blog.csdn.net/hedongho/article/details/79139606
主要你要,对函数内部捕获错误,而不能让异常抛出就可以了。

关于map 传多个函数参数
我一开始,就是正常思维,多个参数,搞个元祖,让参数一一对应不就行了:

class aa():

    def search(self, a,b):

        while a<10000000:
            a+=1
        print("子进程结束")
        return 1
if __name__ == "__main__":
    pool = Pool()
    c = aa()
    a = time.time()
    result = pool.map(c.search, ((1,3),))

报错:

   return self._map_async(func, iterable, mapstar, chunksize).get()
  File "E:\Program\python\lib\multiprocessing\pool.py", line 657, in get
    raise self._value
TypeError: search() missing 1 required positional argument: 'b'

参考:
https://blog.csdn.net/qq_15969343/article/details/84672527
普通的 process 当让可以穿多个参数,map 却不知道咋传的。
apply_async 和map 一样,不知道咋传的。

最简单的方法:
使用 starmap 而不是 map

class aa():

    def search(self, a,b):

        while a<10000000:
            a+=1
        print("子进程结束")
        return 1
if __name__ == "__main__":
    pool = Pool()
    c = aa()
    a = time.time()
    result = pool.starmap(c.search, ((1,3),))
    b = time.time()
    print(b-a)

结果:
子进程结束
1.8399453163146973
成功拿到结果了

关于map 和 starmap 不同的地方看源码:

def mapstar(args):
    return list(map(*args))

def starmapstar(args):
    return list(itertools.starmap(args[0], args[1]))

def map(self, func, iterable, chunksize=None):
        '''
        Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned.
        '''
        return self._map_async(func, iterable, mapstar, chunksize).get()

def starmap(self, func, iterable, chunksize=None):
        '''
        Like `map()` method but the elements of the `iterable` are 
expected to
        be iterables as well and will be unpacked as arguments. Hence
        `func` and (a, b) becomes func(a, b).
        '''
        return self._map_async(func, iterable, starmapstar, chunksize).get()

关于apply_async() ,我没找到多参数的方法,大不了用 一个迭代的 starmap 实现。哈哈

关于 上面源码里面有 itertools.starmap
itertools 用法参考:
https://docs.python.org/zh-cn/3/library/itertools.html#itertool-functions

有个问题,多进程最好不要使用全部的 cpu , 因为这样可能影响其他任务,所以 在进程池 添加 process 参数 指定,cpu 个数:

Pool(processes=os.cpu_count()-1)

上面就是预留了 一个cpu 干其他事的

后面直接使用 Queue 遇到这个问题:

RuntimeError: Queue objects should only be shared between processes through inheritance

解决:
Manager().Queue() 代替 Queue()

from multiprocessing import Manager, Queue
queue = Manager().Queue()
#  queue =   Queue()

因为 queue.get() 是堵塞型的,所以可以提前判断是不是 空的,以免堵塞进程。比如下面这样:
使用 queue.empty() 空为True

 current_search = "稍等" if music_current_search.empty() else 
music_current_search.get()
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354