python multiprocessing模块实现多进程任务中运行多进程子任务

python multiprocessing模块实现多进程任务中运行多进程子任务,并实现并发控制。
起因是想使用celery+ansible做任务执行与回收,代码写好后,发现卧槽 celery不允许他执行的任务再建立子进程,这就比较尴尬了,封装好的ansible接口不能用?? 那怎么做多任务自动执行呢?

研究celery 发现他的默认执行的多进程机制是multiprocessing模块的Pool,通过代码测试这个模块,发现他也不允许自己的任务再建立子进程,于是乎大概明白什么回事。 后边可以研究下如何修改celery worker的默认并发机制。

不过本次想要绕过他,于是测试了multiprocessing的Process模块,发现这个模块是允许任务中建立子进程的。
然后就开始自己写任务执行器,先拿mysql简单做个任务队列,测通后再换kafka或者其他。
测试完成后,任务可以并行执行了,但是节奏得控制啊,要不好几万并发 自己不就挂了?翻了半天文档发现multiprocessing的Semaphore模块可以做到,测试后,代码如下。

#!/bin/python

from multiprocessing import Pool
from multiprocessing import Process,Semaphore,current_process
import sys, os, time, random, json

project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath('__file__'))))
sys.path.append(project_dir)
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "wk_api.settings")

import django

django.setup()

from wkexe.models import WorkTables
from wkexe.models import ExecuteTables
from utils.ansible_api import ANSRunner


def RunPlaybook(ip, ymldir):
    s.acquire()
    print(time.strftime('%H:%M:%S'), current_process().name + " 获得锁运行");
    time.sleep(random.random() * 5)
    rbt = ANSRunner(ip)
    rbt.run_playbook(playbook_path='%s' % (ymldir))
    result = json.dumps(rbt.get_playbook_result(), indent=4)
    print(result)
    s.release()
    print(time.strftime('%H:%M:%S'), current_process().name + " 释放锁结束");


def RunModel(ip, model, module_args):
    s.acquire()
    print(time.strftime('%H:%M:%S'), current_process().name + " 获得锁运行");
    rbt = ANSRunner(ip)
    rbt.run_model(model, module_args)
    result = json.dumps(rbt.get_model_result(), indent=4)
    print(result)
    s.release()
    print(time.strftime('%H:%M:%S'), current_process().name + " 释放锁结束");



if __name__ == '__main__':
    while True:
        p_list = []
        works = WorkTables.objects.all().filter(status=10)
        for work in works:
            concurrent = work.concurrent  # 并发参数
            executeInfo = work.executetables_set.all()  # 需要执行的设备
            taskType = work.taskname.tasktype  # 任务类型
            p = Pool(1)
            for ip in executeInfo:
                ipAdd = ip.ip
                ipId = ip.id
                if taskType == 0:
                    ymlDir = work.taskname.taskymldir
                    print(ymlDir)
                    p = Process(target=RunPlaybook, args=(ipAdd, ymlDir))
                    # p.start()
                    # p.join()
                    # p.apply_async(RunPlaybook, args=(ipAdd, ymlDir))
                    p_list.append(p)
                elif taskType == 1:
                    model = work.taskname.taskmodel
                    modelArgs = work.taskname.taskargs
                    p = Process(target=RunModel, args=(ipAdd, model, modelArgs))
                    # p.start()
                    # p.join()
                    # p.apply_async(RunModel, args=(ipAdd, model, modelArgs))
                    p_list.append(p)
                else:
                    print("未定义的任务类型")

        s = Semaphore(concurrent) #用来控制对共享资源的访问数量,可以控制同一时刻并发的进程数
        for p in p_list:
            p.start()

        #
        for p in p_list:
            # p.close()
            p.join()
        print("执行完成")
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 漫山桃花夭,倾天雪花飘。坐看青松变琼枝,枝头春芽笑。 有心把春闹,沉睡晚来到。洒下清白在人间,不与...
    方叔书阅读 579评论 0 5
  • 在前一篇文章中,我们讨论了什么是架构。事实上,这些基础概念对于做架构是非常重要的,大部分人对于每天都习以为常的概念...
    阿凡提说AI阅读 262评论 0 1
  • 高楼 落地窗 冬天
    前辈s阅读 305评论 0 0
  • 感谢大家一路以来的支持,小编好开心,好想喝两杯 交朋友大家都喜欢 交朋友最怕的是我把你当朋友你却把我当偶像 交闺蜜...
    爱叮叮阅读 1,762评论 0 1

友情链接更多精彩内容