粳稻渗入片段鉴定纯Python脚本
# 用法:python xiangengbin_final.py a
# 输入:a.rlt(3列空格分隔:染色体 位置 基因型)
# 输出:a.bin(4列表制表符分隔:染色体 起始 终止 基因型)
# 核心优化:全流程日志、隐性错误捕获、宽松参数默认值、强制输出保障
import sys
def main():
# ====================== 可调整参数(宽松默认值,确保有输出) ======================
WIN_SIZE = 3 # 滑动窗口大小(更小,适配低SNP密度)
GENO_THRESHOLD = 2 # 基因型判定阈值(更低,易得到有效分型)
FRAGMENT_THRESHOLD = 500000 # 最小片段长度(更低,避免有效片段被过滤)
# ================================================================================
# 步骤1:命令行参数检查(强制提示用法,避免无参数退出)
if len(sys.argv) < 2:
print("【错误】参数不足!")
print("用法:python xiangengbin_final.py <prefix>")
print("示例:python xiangengbin_final.py a")
sys.exit(1)
prefix = sys.argv[1]
input_rlt = f"{prefix}.rlt"
out_bin = f"{prefix}.bin"
print(f"【信息】开始执行,前缀:{prefix}")
print(f"【信息】输入文件:{input_rlt}")
print(f"【信息】输出文件:{out_bin}")
print(f"【信息】参数配置:窗口大小={WIN_SIZE},基因型阈值={GENO_THRESHOLD},片段阈值={FRAGMENT_THRESHOLD}")
# 步骤2:读取输入文件(捕获文件不存在/空文件错误)
vcf_info = []
try:
with open(input_rlt, 'r', encoding='utf-8') as f:
line_count = 0
for line in f.readlines():
line = line.strip()
if not line:
continue
line_count += 1
# 强制过滤非3列行,确保格式有效
line_split = line.split(" ")
if len(line_split) < 3:
print(f"【警告】跳过无效行(非3列):{line}")
continue
vcf_info.append(line_split)
if line_count == 0:
print(f"【错误】输入文件 {input_rlt} 为空!无有效数据可处理")
sys.exit(1)
print(f"【信息】成功读取 {input_rlt},共 {len(vcf_info)} 条有效SNP记录")
except FileNotFoundError:
print(f"【错误】输入文件 {input_rlt} 不存在!请确认文件在当前目录")
sys.exit(1)
except PermissionError:
print(f"【错误】无权限读取输入文件 {input_rlt}!")
sys.exit(1)
except Exception as e:
print(f"【错误】读取文件失败:{str(e)}")
sys.exit(1)
# 步骤3:添加扩展字段(后续存储判定结果)
for item in vcf_info:
# 添加12个扩展字段(初始值0)
item.extend([0] * 12)
print(f"【信息】成功为SNP记录添加扩展字段")
# 步骤4:统计各染色体SNP数量
chrom = [0] * 13 # 索引0未使用,1-12对应染色体1-12
valid_chrom_count = 0
for item in vcf_info:
chr_num = item[0]
if chr_num in [str(i) for i in range(1, 13)]:
chrom[int(chr_num)] += 1
valid_chrom_count += 1
if valid_chrom_count == 0:
print(f"【错误】无有效染色体记录(仅支持1-12号染色体)!")
sys.exit(1)
print(f"【信息】各染色体SNP数量:")
for i in range(1, 13):
if chrom[i] > 0:
print(f" 染色体{i}:{chrom[i]}条SNP")
# 步骤5:生成染色体累计计数(用于划分区间)
chromosome_accum = [0] * 13
count = 0
for i in range(13):
count += chrom[i]
chromosome_accum[i] = count
print(f"【信息】成功生成染色体累计计数")
# 步骤6:滑动窗口统计(核心分型步骤,带边界判断)
half_win_size = int(WIN_SIZE / 2)
ind_count, jap_count, hetero_count = 0, 0, 0
valid_geno_site = 0 # 统计有效分型位点数量
for chr_idx in range(1, 13):
chr_start = chromosome_accum[chr_idx - 1] + half_win_size
chr_end = chromosome_accum[chr_idx] - half_win_size
# 跳过SNP数量不足的染色体
if chr_start >= chr_end:
print(f"【警告】染色体{chr_idx}:SNP数量不足({chrom[chr_idx]}条),跳过窗口统计")
continue
# 遍历当前染色体有效位点
for site_idx in range(chr_start, chr_end):
# 统计窗口内各基因型数量
ind_count, jap_count, hetero_count = 0, 0, 0
for win_idx in range(site_idx - half_win_size, site_idx + half_win_size + 1):
geno = vcf_info[win_idx][2].lower() # 强制小写,兼容大小写不一致
if geno == "ind":
ind_count += 1
elif geno == "jap":
jap_count += 1
elif geno == "hetero":
hetero_count += 1
# 基因型判定(宽松阈值)
max_geno_count = max(ind_count, jap_count, hetero_count)
if max_geno_count >= GENO_THRESHOLD:
if max_geno_count == ind_count:
vcf_info[site_idx][3] = "ind"
valid_geno_site += 1
elif max_geno_count == jap_count:
vcf_info[site_idx][3] = "jap"
valid_geno_site += 1
elif max_geno_count == hetero_count:
vcf_info[site_idx][3] = "hetero"
valid_geno_site += 1
# 记录计数
vcf_info[site_idx][4] = ind_count
vcf_info[site_idx][5] = jap_count
vcf_info[site_idx][6] = hetero_count
# 填充未判定位点为unknown
for item in vcf_info:
if item[3] == 0:
item[3] = "unknown"
print(f"【信息】滑动窗口统计完成,共 {valid_geno_site} 个有效分型位点(非unknown)")
# 步骤7:识别基因型断点
raw_breakpoints = []
total_snp = chromosome_accum[-1]
if total_snp > 1:
for i in range(total_snp - 1):
curr_chr = vcf_info[i][0]
next_chr = vcf_info[i+1][0]
curr_geno = vcf_info[i][3]
next_geno = vcf_info[i+1][3]
# 染色体相同且基因型不同 → 断点
if curr_chr == next_chr and curr_geno != next_geno:
curr_pos = int(vcf_info[i][1])
next_pos = int(vcf_info[i+1][1])
breakpoint_pos = int((curr_pos + next_pos) / 2)
raw_breakpoints.append([curr_chr, curr_geno, breakpoint_pos, next_geno])
print(f"【信息】成功识别 {len(raw_breakpoints)} 个基因型断点")
# 步骤8:水稻染色体长度(硬编码,固定值)
chr_length_dict = {
'1': 43270923, '2': 35937250, '3': 36413819,
'4': 35502694, '5': 29958434, '6': 31248787,
'7': 29697621, '8': 28443022, '9': 23012720,
'10': 23207287, '11': 29021106, '12': 27531856
}
# 步骤9:构建初始基因组片段
initial_fragments = []
breakpoint_len = len(raw_breakpoints)
if breakpoint_len > 1:
for i in range(breakpoint_len - 1):
# 第一个断点:染色体起始到第一个断点
if i == 0:
frag = [raw_breakpoints[i][0], 1, raw_breakpoints[i][2], raw_breakpoints[i][1]]
initial_fragments.append(frag)
# 最后一个断点:断点到染色体末端
if i == breakpoint_len - 2:
chr_num = raw_breakpoints[i+1][0]
end_pos = chr_length_dict[chr_num]
frag = [chr_num, raw_breakpoints[i+1][2], end_pos, raw_breakpoints[i+1][3]]
initial_fragments.append(frag)
# 跨染色体处理
elif raw_breakpoints[i][0] != raw_breakpoints[i+1][0]:
# 下一条染色体起始片段
next_chr = raw_breakpoints[i+1][0]
frag1 = [next_chr, 1, raw_breakpoints[i+1][2], raw_breakpoints[i+1][1]]
# 当前染色体末端片段
curr_chr = raw_breakpoints[i][0]
curr_end = chr_length_dict[curr_chr]
frag2 = [curr_chr, raw_breakpoints[i][2] + 1, curr_end, raw_breakpoints[i][3]]
initial_fragments.extend([frag1, frag2])
# 同染色体断点间片段
elif raw_breakpoints[i][3] == raw_breakpoints[i+1][1] and raw_breakpoints[i][0] == raw_breakpoints[i+1][0]:
chr_num = raw_breakpoints[i][0]
start_pos = raw_breakpoints[i][2] + 1
end_pos = raw_breakpoints[i+1][2]
geno = raw_breakpoints[i][3]
frag = [chr_num, start_pos, end_pos, geno]
initial_fragments.append(frag)
print(f"【信息】成功构建 {len(initial_fragments)} 个初始基因组片段")
# 步骤10:过滤短片段(宽松阈值)
filtered_fragments = []
for frag in initial_fragments:
try:
start = int(frag[1])
end = int(frag[2])
geno = frag[3]
# 过滤条件:长度>阈值 + 基因型非unknown
if (end - start) > FRAGMENT_THRESHOLD and geno not in ["unknown", 0, "0"]:
filtered_fragments.append(frag)
except (ValueError, IndexError):
continue
print(f"【信息】片段过滤完成,保留 {len(filtered_fragments)} 个有效长度片段")
# 步骤11:第一次染色体排序(1-12顺序)
sorted_fragments = []
for chr_num in [str(i) for i in range(1, 13)]:
for frag in filtered_fragments:
if frag[0] == chr_num:
sorted_fragments.append(frag)
print(f"【信息】第一次染色体排序完成")
# 步骤12:二次识别断点优化片段
refined_breakpoints = []
sorted_len = len(sorted_fragments)
if sorted_len > 1:
for i in range(sorted_len - 1):
curr_chr = sorted_fragments[i][0]
next_chr = sorted_fragments[i+1][0]
curr_geno = sorted_fragments[i][3]
next_geno = sorted_fragments[i+1][3]
if curr_chr == next_chr and curr_geno != next_geno:
curr_end = int(sorted_fragments[i][2])
next_start = int(sorted_fragments[i+1][1])
breakpoint_pos = int((curr_end + next_start) / 2)
refined_breakpoints.append([curr_chr, curr_geno, breakpoint_pos, next_geno])
print(f"【信息】成功识别 {len(refined_breakpoints)} 个优化断点")
# 步骤13:重构最终片段
final_fragments = []
refined_len = len(refined_breakpoints)
if refined_len > 1:
for i in range(refined_len - 1):
# 第一个片段
if i == 0:
frag = [refined_breakpoints[i][0], 1, refined_breakpoints[i][2], refined_breakpoints[i][1]]
final_fragments.append(frag)
# 末尾片段处理
if i == refined_len - 2:
curr_chr = refined_breakpoints[i][0]
next_chr = refined_breakpoints[i+1][0]
# 跨染色体
if curr_chr != next_chr:
curr_end = chr_length_dict[curr_chr]
frag = [curr_chr, refined_breakpoints[i][2] + 1, curr_end, refined_breakpoints[i][3]]
frag1 = [next_chr, 1, refined_breakpoints[i+1][2], refined_breakpoints[i+1][1]]
frag2 = [next_chr, refined_breakpoints[i+1][2] + 1, chr_length_dict[next_chr], refined_breakpoints[i+1][3]]
final_fragments.extend([frag, frag1, frag2])
# 同染色体(与前一条不同)
elif curr_chr == next_chr and refined_breakpoints[i][0] != refined_breakpoints[i-1][0]:
frag = [curr_chr, 1, refined_breakpoints[i][2], refined_breakpoints[i][1]]
frag1 = [curr_chr, refined_breakpoints[i][2] + 1, refined_breakpoints[i+1][2], refined_breakpoints[i][3]]
frag2 = [curr_chr, refined_breakpoints[i+1][2] + 1, chr_length_dict[curr_chr], refined_breakpoints[i+1][3]]
final_fragments.extend([frag, frag1, frag2])
# 同染色体(与前一条相同)
elif curr_chr == next_chr and refined_breakpoints[i][0] == refined_breakpoints[i-1][0]:
frag = [curr_chr, refined_breakpoints[i-1][2] + 1, refined_breakpoints[i][2], refined_breakpoints[i][1]]
frag1 = [curr_chr, refined_breakpoints[i][2] + 1, refined_breakpoints[i+1][2], refined_breakpoints[i][3]]
frag2 = [curr_chr, refined_breakpoints[i+1][2] + 1, chr_length_dict[curr_chr], refined_breakpoints[i+1][3]]
final_fragments.extend([frag, frag1, frag2])
# 跨染色体(非末尾)
elif refined_breakpoints[i][0] != refined_breakpoints[i+1][0] and i <= refined_len - 3:
next_chr = refined_breakpoints[i+1][0]
frag1 = [next_chr, 1, refined_breakpoints[i+1][2], refined_breakpoints[i+1][1]]
curr_chr = refined_breakpoints[i][0]
curr_end = chr_length_dict[curr_chr]
frag2 = [curr_chr, refined_breakpoints[i][2] + 1, curr_end, refined_breakpoints[i][3]]
final_fragments.extend([frag1, frag2])
# 同染色体断点间(非末尾)
elif refined_breakpoints[i][3] == refined_breakpoints[i+1][1] and refined_breakpoints[i][0] == refined_breakpoints[i+1][0] and i <= refined_len - 3:
chr_num = refined_breakpoints[i][0]
start_pos = refined_breakpoints[i][2] + 1
end_pos = refined_breakpoints[i+1][2]
geno = refined_breakpoints[i][3]
frag = [chr_num, start_pos, end_pos, geno]
final_fragments.append(frag)
print(f"【信息】成功重构 {len(final_fragments)} 个优化片段")
# 步骤14:补充缺失染色体(确保12条染色体都有输出,避免缺失)
existing_chroms = [frag[0] for frag in final_fragments]
for chr_num in [str(i) for i in range(1, 13)]:
if chr_num not in existing_chroms:
# 优先获取有效基因型,无则默认unknown
default_geno = "unknown"
for item in vcf_info:
if item[0] == chr_num and item[3] != "unknown":
default_geno = item[3]
break
# 补充全长片段
default_frag = [chr_num, 1, chr_length_dict[chr_num], default_geno]
final_fragments.append(default_frag)
print(f"【信息】补充缺失染色体 {chr_num},基因型:{default_geno}")
# 步骤15:第二次染色体排序(确保1-12顺序)
final_sorted_fragments = []
for chr_num in [str(i) for i in range(1, 13)]:
for frag in final_fragments:
if frag[0] == chr_num:
final_sorted_fragments.append(frag)
print(f"【信息】第二次染色体排序完成,共 {len(final_sorted_fragments)} 个最终片段")
# 步骤16:去重(避免重复片段)
unique_fragments = []
seen_frags = set()
for frag in final_sorted_fragments:
frag_tuple = (frag[0], str(frag[1]), str(frag[2]), frag[3])
if frag_tuple not in seen_frags:
seen_frags.add(frag_tuple)
unique_fragments.append(frag)
print(f"【信息】片段去重完成,保留 {len(unique_fragments)} 个唯一片段")
# 步骤17:强制写入输出文件(捕获权限错误)
try:
with open(out_bin, 'w+', encoding='utf-8') as f:
for frag in unique_fragments:
# 制表符分隔,无末尾多余字符
frag_line = "\t".join(map(str, frag))
f.write(frag_line + "\n")
print(f"【成功】输出文件 {out_bin} 已生成!")
print(f"【成功】最终结果:{len(unique_fragments)} 个基因组片段(查看 {out_bin} 获取详情)")
except PermissionError:
print(f"【错误】无权限写入输出文件 {out_bin}!请切换到有写入权限的目录")
sys.exit(1)
except Exception as e:
print(f"【错误】写入文件失败:{str(e)}")
sys.exit(1)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print(f"【信息】用户手动终止脚本执行")
sys.exit(1)
except Exception as e:
print(f"【致命错误】脚本执行异常:{str(e)}")
sys.exit(1)
原参数199、120、300000,实际要根据SNP密度调整