本篇帖子的分享内容主要也是和前面大基因组拆分相呼应的,利用前面拆分的基因组文件,对变异检测结果文件中拆分的染色体部分进行合并,并输出最终结果。
work_path=工作目录
#前期拆分基因组时生成的文件,也可手动做成
path/to/contrast.id
## 格式如下(各列以制表符分隔):
#拆分后染色体号 旧染色体号 要更改为的新染色体号 坐标偏移量(原有坐标-现有坐标) 合并后新染色体的总长度
Chr01_S1 Chr01 new_01 0 551383505
Chr01_S2 Chr01 new_01 91897250 551383505
Chr01_S3 Chr01 new_01 183794500 551383505
Chr01_S4 Chr01 new_01 275691750 551383505
Chr01_S5 Chr01 new_01 367589000 551383505
#要更改的vcf,压缩非压缩均可
input.vcf(.gz)
#输出vcf文件,gz格式的
out.vcf.gz
python add_vcf.py -p ${work_path} \
-c path/to/contrast.id \
-v input.vcf(.gz) -o out.vcf.gz
脚本内容如下:
import argparse
import gzip
import os
import re
# Extracts the contig name from a "##contig=<ID=...>" header line.  The name
# ends at the next comma or at the closing ">" so that both
# "<ID=x,length=n>" and "<ID=x,length=n,assembly=y>" are accepted.
_CONTIG_ID_RE = re.compile(r"[<,]ID=([^,>]+)")

# First two bytes of every gzip stream (bgzip/BGZF output starts with them too).
_GZIP_MAGIC = b"\x1f\x8b"


def parse_args(argv=None):
    """Parse the command line.

    argv: list of argument strings; None means use sys.argv[1:].
    Returns the argparse namespace with attributes p (work directory),
    c (contrast file), v (input VCF) and o (output VCF).
    """
    parser = argparse.ArgumentParser(
        description="Merge split chromosomes of a VCF back into the "
                    "original chromosomes and shift positions accordingly."
    )
    parser.add_argument("-p", "--path", dest="p", default=".",
                        help="work path")
    parser.add_argument("-c", "--contrast_id", dest="c",
                        default="contrast.id", help="contrast.id")
    parser.add_argument("-v", "--vcf", dest="v", default="input.vcf.gz",
                        help="old vcf")
    parser.add_argument("-o", "--out_file", dest="o",
                        default="input_add.vcf.gz", help="outfile")
    return parser.parse_args(argv)


def load_contrast(path):
    """Read the contrast table that maps split contigs to merged contigs.

    Each data line has at least five whitespace-separated columns:
    split contig name, old name, new name, coordinate offset, merged length.
    Blank lines and lines starting with "#" are ignored.

    Returns a dict: split contig name -> list of the remaining columns, so
    entry[1] is the new name, entry[-2] the offset and entry[-1] the length.
    Raises ValueError for a data line with fewer than five columns.
    """
    mapping = {}
    with open(path, "r") as handle:
        for number, raw in enumerate(handle, 1):
            fields = raw.split()
            if not fields or fields[0].startswith("#"):
                continue
            if len(fields) < 5:
                raise ValueError(
                    "{}: line {}: expected 5 columns, found {}".format(
                        path, number, len(fields)))
            mapping[fields[0]] = fields[1:]
    return mapping


def _is_gzip(path):
    """Return True when the file starts with the gzip magic bytes."""
    # Sniffing the content is more reliable than looking at the file name:
    # a "gz" substring may occur anywhere in a path.
    with open(path, "rb") as handle:
        return handle.read(2) == _GZIP_MAGIC


def _open_vcf(path):
    """Open a plain or gzip-compressed VCF for reading as UTF-8 text."""
    if _is_gzip(path):
        return gzip.open(path, "rt", encoding="utf-8")
    return open(path, "r", encoding="utf-8")


def _lookup(mapping, contig):
    """Return the contrast entry for contig, or raise a descriptive KeyError."""
    try:
        return mapping[contig]
    except KeyError:
        raise KeyError(
            "contig {!r} is not listed in the contrast file".format(contig))


def _rewrite_contig_header(line, mapping, written):
    """Translate one "##contig=<...>" header line.

    Returns the line to write, or None when the merged contig has already
    been written.  `written` is the set of merged contig names emitted so
    far; it is updated in place.
    """
    match = _CONTIG_ID_RE.search(line)
    if match is None:
        # No ID field to translate; keep the line untouched.
        return line
    contig = match.group(1).strip()
    entry = _lookup(mapping, contig)
    new_name = entry[1]
    if new_name in written:
        # Several split contigs collapse into one merged contig: emit once.
        return None
    if new_name != contig:
        written.add(new_name)
        return "##contig=<ID={},length={},assembly=unknown>\n".format(
            new_name, entry[-1])
    return line


def _shift_record(line, mapping):
    """Rename the contig and shift POS of one VCF data line.

    Returns the rewritten line, or None for a blank line.  Only the CHROM
    and POS columns are changed; coordinates stored in other columns (for
    example an END= key in INFO) are left as they are.
    """
    fields = line.strip().split("\t")
    if fields == [""]:
        return None
    if len(fields) < 2:
        raise ValueError("malformed VCF record: {!r}".format(line))
    entry = _lookup(mapping, fields[0])
    fields[0] = entry[1]
    fields[1] = str(int(fields[1]) + int(entry[-2]))
    return "\t".join(fields) + "\n"


def main(argv=None):
    """Merge the split contigs of the input VCF and write the result.

    The contrast file, input VCF and output file are resolved relative to
    the work path; absolute paths are used as given.  Records are written in
    input order.

    NOTE: the output is compressed with Python's gzip module, which produces
    ordinary gzip rather than BGZF, so it has to be recompressed with bgzip
    before it can be indexed with tabix.
    """
    args = parse_args(argv)
    contrast_path = os.path.join(args.p, args.c)
    vcf_path = os.path.join(args.p, args.v)
    out_path = os.path.join(args.p, args.o)

    mapping = load_contrast(contrast_path)
    written = set()

    # Open the input first so that a missing input does not leave behind a
    # freshly truncated output file.
    with _open_vcf(vcf_path) as vcf, gzip.open(out_path, "wb") as out:
        for line in vcf:
            if line.startswith("#"):
                if line.startswith("##contig=<"):
                    line = _rewrite_contig_header(line, mapping, written)
                    if line is None:
                        continue
                out.write(line.encode("utf-8"))
            else:
                record = _shift_record(line, mapping)
                if record is not None:
                    out.write(record.encode("utf-8"))


if __name__ == "__main__":
    main()
往期内容
整理不易,给个赞再走呗~~