perl处理字符串是真的舒服,也很省事,很多代码用perl一行解决用python却要很多行。但是不得不说python是主流趋势了。除了李恒等老一辈大神坚持perl貌似大家都在转py了。(人家主程序也是C好不)。
接触python一周,最大的感受是:用的人多,文档是真的全啊!基本你把问题描述出来就一定能找到中文的处理方式,再不行也有完备的manual可以看。对于一些复杂的功能python提供的包确实更加丰富而且因为面向对象也更加贴近主流编程思想。
多进程主要用的是multiprocessiong模块,这个模块其实是对多线程threading模块的封装,因此大部分的参数、思想都是相似的。
咱们搞生信的简单了解了解就行了:多进程 - 廖雪峰的官方网站 (liaoxuefeng.com)
研究了2天【多进程】,把我的模板贴一下:
import sys
import os
import time
import getopt
from multiprocessing import Pool #用进程池对象创建多进程
#适用于一个可以切块批量进行的文件(进程间不需要内存交流)
def vcfFormat(vcfInput): #需要并发的处理脚本
if xxx:
##do something...
else:
raise Exception("抛出异常:xxxx")
return xxx #返回结果给func run
def run(data, index, size): # 用于将文件切片。index是每个进程,size是总共的进程数
size = math.ceil(int(count) / size) #向上取整确保每个切片内的行数都包含在内
start = size * index #向上取整后需要读的文件位置起点
end = (index + 1) * size if (index + 1) * size < int(count) else int(count) #末尾
vcfInput=open(data) #因为每次进入子进程都要变化句柄的位置,所以一定要在这里重新打开并且关闭句柄(因为句柄从头读到尾是不会变的,如果在程序前面只打开一次会报IOwrapper错误就是这个原因)
temp_data = vcfInput.readlines()[start:end] #处理这部分的数据
vcfInput.close() #打开后必须关闭句柄,下此再进入才不会报错
return(vcfFormat(temp_data)) # 进入正式的处理子函数,返回格式化后的vcf数组,当然返回的并不是直接的结果而是AsyncResult对象,需要get方法提取
#main func(vcfInput是输入文件)
#获取文件行数
count = os.popen("wc -l "+vcfInput+"|cut -d \" \" -f 1").read().strip("\n")
processor = int(threads) #threads是参数指定的并行数目
res = [] #多进程的结果存在数组里
p = Pool(processor) #创建Pool实例,进程池中放入threads个进程
try: #使用try..except结构来捕获子进程抛出的异常
for i in range(processor):
#res.append(p.apply_async(run, args=(vcfInput, i, processor,),error_callback=throw_exception)) #不完美解决方案,稍后会说明
res.append(p.apply_async(run, args=(vcfInput, i, processor,))) #在for里循环apply_async异步调用线程
sys.stderr.write(str(i) + " processor started !\n") #在错误流里面监控进程构建,没啥用的一行
p.close() #关闭进程池,无法再调用了
p.join() #阻塞进程,所有子进程都结束才进行下一步(必须先关闭进程池,也就是上一行)
for i in res: #读取res数组中的结果
print(*i.get(),sep="\n") #get方法queue的方式推出进程运行结果(异常也通过get方法抛出!!)
sys.stderr.write('Time cost = %fs\n' % (time_end - time_start))
except Exception as e: #如果get抛出的使Exception对象则被except捕获并打印到标准错误流
sys.stderr.write(str(e))
p.terminate() #结束进程
OK,那么这段代码实现的是怎么样的运行方式呢:
python3 script.py [options] input.vcf >result.vcf 2>log
这样做的好处非常明显:
- 捕获子进程的错误流,使想要的结果和错误流分离
- 优雅的异常抛出、退出
当然在网上看了很多教程,在实现过程中踩了亿点点坑:
- 捕获子进程的错误流,并且停止主程序。
这应该是初学者(我)最先碰到的问题,如果简单的在并发的函数里使用sys.stderr.write记录会发现是不会被主程序的错误流记录到的。这主要原因是fork创建的是父进程的子进程,子父进程并不能用标准错误通信。也就是说你打印的错误流并不是主进程的错误流,而是打印到了那个子进程的错误流!但是使用raise抛出异常就可以被捕获传递(我的理解是类似return的机制,也会被记录到AsyncResult对象中)
- 网上的另一种解决方案是Pool.apply_async自带的参数:error_callback
#由于子函数异常所以AsyncResult记录的就是Exception对象
def throw_exception(exceptionName):
sys.stderr.write(str(exceptionName)) #打印异常
sys.stderr.write('子进程发生异常,进程号为%s' % (os.getpid()))
os.killpg(os.getpgid(os.getpid()), signal.SIGKILL) #杀掉进程结束
#在执行apply_async中,定义error_callback说明在run抛出异常后则执行error_callback指定的函数(ps这个函数只能有一个参数)
res.append(p.apply_async(run, args=(vcfInput, i, processor,),error_callback=throw_exception))
这样做不需要try..except结构也可以使程序优雅的结束。但是如果把第一点理解了的话(或者自己试试)就知道这样只能把错误尴尬的打印到屏幕,还是无法打印给主程序的错误流中。
为啥呢?很简单,因为error_callback的调用还是在apply_async的方法内,还是属于当前子进程的范围内,只不过这个子进程的第一个函数抛出了异常所以开始执行第二个子函数而已。因此在第二个函数打印错误还是打印到了子进程的错误流里,当然无法保存到主进程中。
- 返回的对象观念。try..except的触发机制
因为之前用perl基本没啥面向对象的观念,以为通过Pool调用的子函数返回后就是我们要的结果了(不论是正确的结果还是raise返回的异常)。所以有个坑是我之前的写法:
try:
#XXXXXXX
p.apply_async(xxxx)
except Exception as e:
sys.stderr.write(str(e))
p.terminate()
p.close()
p.join()
for i in res:
print(*i.get(),sep="\n")
sys.stderr.write('Time cost = %fs\n' % (time_end - time_start))
我的想法是try中的程序返回了 子函数 抛出的异常,被except捕获然后结束。如果try中的程序正常运行则p.join()阻塞后由后面的for输出正确的结果。而那些不是多进程的函数确实也是这样做的。
但是这里面就忽略了其实Pool.apply_async返回的并不是直接的结果,而是一个叫做AsyncResult的对象。
其中get()方法才是返回子进程的结果!所以如果上面那么写(而不是在try中调用get方法)except是不会捕获Exception对象的~
希望你有收获