2021-08-03 【生信】用multiprocessing多进程处理数据

perl处理字符串是真的舒服,也很省事,很多代码用perl一行解决用python却要很多行。但是不得不说python是主流趋势了。除了李恒等老一辈大神坚持perl貌似大家都在转py了。(人家主程序也是C好不)。

接触python一周,最大的感受是:用的人多,文档是真的全啊!基本你把问题描述出来就一定能找到中文的处理方式,再不行也有完备的manual可以看。对于一些复杂的功能python提供的包确实更加丰富而且因为面向对象也更加贴近主流编程思想。
多进程主要用的是multiprocessiong模块,这个模块其实是对多线程threading模块的封装,因此大部分的参数、思想都是相似的。
咱们搞生信的简单了解了解就行了:多进程 - 廖雪峰的官方网站 (liaoxuefeng.com)

研究了2天【多进程】,把我的模板贴一下:

import sys
import os
import time
import getopt
from multiprocessing import Pool #用进程池对象创建多进程
#适用于一个可以切块批量进行的文件(进程间不需要内存交流)
def vcfFormat(vcfInput):  #需要并发的处理脚本
    if xxx:
    ##do something...
    else:
        raise Exception("抛出异常:xxxx")
    return xxx #返回结果给func run

def run(data, index, size):  # 用于将文件切片。index是每个进程,size是总共的进程数
    size = math.ceil(int(count) / size) #向上取整确保每个切片内的行数都包含在内
    start = size * index #向上取整后需要读的文件位置起点
    end = (index + 1) * size if (index + 1) * size < int(count) else int(count) #末尾
    vcfInput=open(data) #因为每次进入子进程都要变化句柄的位置,所以一定要在这里重新打开并且关闭句柄(因为句柄从头读到尾是不会变的,如果在程序前面只打开一次会报IOwrapper错误就是这个原因)
    temp_data = vcfInput.readlines()[start:end] #处理这部分的数据
    vcfInput.close() #打开后必须关闭句柄,下此再进入才不会报错
    return(vcfFormat(temp_data))  # 进入正式的处理子函数,返回格式化后的vcf数组,当然返回的并不是直接的结果而是AsyncResult对象,需要get方法提取

#main func(vcfInput是输入文件)
#获取文件行数
count = os.popen("wc -l "+vcfInput+"|cut -d \" \" -f 1").read().strip("\n")
processor = int(threads) #threads是参数指定的并行数目
res = [] #多进程的结果存在数组里
p = Pool(processor) #创建Pool实例,进程池中放入threads个进程
try: #使用try..except结构来捕获子进程抛出的异常
    for i in range(processor):
        #res.append(p.apply_async(run, args=(vcfInput, i, processor,),error_callback=throw_exception)) #不完美解决方案,稍后会说明
        res.append(p.apply_async(run, args=(vcfInput, i, processor,))) #在for里循环apply_async异步调用线程
        sys.stderr.write(str(i) + " processor started !\n") #在错误流里面监控进程构建,没啥用的一行
    p.close() #关闭进程池,无法再调用了
    p.join() #阻塞进程,所有子进程都结束才进行下一步(必须先关闭进程池,也就是上一行)
    for i in res: #读取res数组中的结果
        print(*i.get(),sep="\n") #get方法queue的方式推出进程运行结果(异常也通过get方法抛出!!)
    sys.stderr.write('Time cost = %fs\n' % (time_end - time_start))
except Exception as e: #如果get抛出的使Exception对象则被except捕获并打印到标准错误流
    sys.stderr.write(str(e))
    p.terminate() #结束进程

OK,那么这段代码实现的是怎么样的运行方式呢:

python3 script.py [options] input.vcf  >result.vcf 2>log

这样做的好处非常明显:

  1. 捕获子进程的错误流,使想要的结果和错误流分离
  2. 优雅的异常抛出、退出

当然在网上看了很多教程,在实现过程中踩了亿点点坑:

  • 捕获子进程的错误流,并且停止主程序。

这应该是初学者(我)最先碰到的问题,如果简单的在并发的函数里使用sys.stderr.write记录会发现是不会被主程序的错误流记录到的。这主要原因是fork创建的是父进程的子进程,子父进程并不能用标准错误通信。也就是说你打印的错误流并不是主进程的错误流,而是打印到了那个子进程的错误流!但是使用raise抛出异常就可以被捕获传递(我的理解是类似return的机制,也会被记录到AsyncResult对象中)

  • 网上的另一种解决方案是Pool.apply_async自带的参数:error_callback
#由于子函数异常所以AsyncResult记录的就是Exception对象
def throw_exception(exceptionName): 
    sys.stderr.write(str(exceptionName)) #打印异常
    sys.stderr.write('子进程发生异常,进程号为%s' % (os.getpid()))
    os.killpg(os.getpgid(os.getpid()), signal.SIGKILL) #杀掉进程结束

#在执行apply_async中,定义error_callback说明在run抛出异常后则执行error_callback指定的函数(ps这个函数只能有一个参数)
res.append(p.apply_async(run, args=(vcfInput, i, processor,),error_callback=throw_exception))

这样做不需要try..except结构也可以使程序优雅的结束。但是如果把第一点理解了的话(或者自己试试)就知道这样只能把错误尴尬的打印到屏幕,还是无法打印给主程序的错误流中。
为啥呢?很简单,因为error_callback的调用还是在apply_async的方法内,还是属于当前子进程的范围内,只不过这个子进程的第一个函数抛出了异常所以开始执行第二个子函数而已。因此在第二个函数打印错误还是打印到了子进程的错误流里,当然无法保存到主进程中。

  • 返回的对象观念。try..except的触发机制
    因为之前用perl基本没啥面向对象的观念,以为通过Pool调用的子函数返回后就是我们要的结果了(不论是正确的结果还是raise返回的异常)。所以有个坑是我之前的写法:
try:
    #XXXXXXX
    p.apply_async(xxxx)
except Exception as e:
    sys.stderr.write(str(e))
    p.terminate()
p.close()
p.join()
for i in res:
    print(*i.get(),sep="\n")
sys.stderr.write('Time cost = %fs\n' % (time_end - time_start))

我的想法是try中的程序返回了 子函数 抛出的异常,被except捕获然后结束。如果try中的程序正常运行则p.join()阻塞后由后面的for输出正确的结果。而那些不是多进程的函数确实也是这样做的。
但是这里面就忽略了其实Pool.apply_async返回的并不是直接的结果,而是一个叫做AsyncResult的对象。

AsyncResult中的方法

其中get()方法才是返回子进程的结果!所以如果上面那么写(而不是在try中调用get方法)except是不会捕获Exception对象的~


希望你有收获

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容