#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
小麦660K芯片变异定位工具
功能:通过比对探针序列与参考基因组,精确定位小麦660K芯片中SNP和Indel变异的基因组位置,
并按优先级(1.比对质量 2.正向链匹配)去重,最终生成标准化的变异位点信息。
"""
import pandas as pd
import re
import subprocess
import argparse
import logging
import os
import sys
import time
from pathlib import Path
import pysam
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple, Dict, Optional, Iterator, Any
from functools import wraps
# 配置日志系统
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# 预编译正则表达式(支持SNP和Indel变异模式)
PROBE_PATTERN = re.compile(r'\[([ATCG-]+)/([ATCG-]+)\]') # 匹配变异标记,如[A/T]或[ATCG/-]
VALID_DNA = re.compile(r'^[ATCG-]+$') # 验证DNA序列合法性
# 碱基互补字典
COMPLEMENT_BASE = {'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C', '-': '-'}
def reverse_complement(seq: str) -> str:
"""计算DNA序列的反向互补链"""
complement = {'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C', '-': '-'}
return ''.join([complement[c] for c in reversed(seq)])
def get_complement_base(base: str) -> str:
"""获取单个碱基的互补碱基"""
return COMPLEMENT_BASE.get(base, base)
def timer(func):
"""性能计时装饰器,记录函数执行时间"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
logger.info(f"{func.__name__} 执行时间: {end_time - start_time:.2f}秒")
return result
return wrapper
def check_bwa_version() -> bool:
"""检查BWA版本是否支持-evalue参数(用于调整比对严格度)"""
try:
result = subprocess.run(
['bwa', 'mem', '--help'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return '-evalue' in result.stdout
except Exception as e:
logger.warning(f"无法检查BWA版本: {str(e)}")
return False
def read_probe_data(csv_path: Path, chunksize: int = 50000) -> Iterator[pd.DataFrame]:
"""分块读取CSV探针数据(避免大文件内存溢出)"""
if not csv_path.exists():
raise FileNotFoundError(f"探针数据文件不存在: {csv_path}")
# 检查文件是否包含必要的列
sample_df = pd.read_csv(csv_path, nrows=1)
required_columns = ['probeset_id', 'Flank']
for col in required_columns:
if col not in sample_df.columns:
raise ValueError(f"CSV文件缺少必要的列: {col}")
logger.info(f"分块读取CSV文件,每块 {chunksize} 行")
return pd.read_csv(csv_path, chunksize=chunksize)
def parse_probe_sequence(flank: str) -> Tuple[Optional[str], Optional[str], Optional[int],
Optional[str], Optional[str]]:
"""解析探针序列中的变异信息(从Flank列提取)"""
match = PROBE_PATTERN.search(flank)
if not match:
return None, None, None, None, None # 无变异标记的探针
ref_allele = match.group(1) # 参考等位基因(如[A/T]中的A)
alt_allele = match.group(2) # 变异等位基因(如[A/T]中的T)
variant_pos = match.start() # 变异标记在原始序列中的起始位置('['的索引)
# 生成参考型和变异型完整序列(替换变异标记)
ref_sequence = flank.replace(match.group(0), ref_allele)
alt_sequence = flank.replace(match.group(0), alt_allele)
return ref_allele, alt_allele, variant_pos, ref_sequence, alt_sequence
def preprocess_original_probes(csv_path: Path) -> Dict[str, Tuple[str, str]]:
"""预处理原始CSV,建立probeset_id到变异碱基的映射(用于后续验证)"""
logger.info(f"预处理原始CSV文件: {csv_path}")
probe_variants = {}
for chunk in pd.read_csv(csv_path, chunksize=50000):
for _, row in chunk.iterrows():
probe_id = row['probeset_id']
flank = row['Flank']
ref_allele, alt_allele, _, _, _ = parse_probe_sequence(flank)
if ref_allele and alt_allele:
probe_variants[probe_id] = (ref_allele, alt_allele)
logger.debug(f"探针 {probe_id} 原始变异: {ref_allele}/{alt_allele}")
logger.info(f"预处理完成,共加载 {len(probe_variants)} 个探针的变异信息")
return probe_variants
def get_reference_base(ref_genome: Path, chrom: str, pos: int) -> Optional[str]:
"""从参考基因组获取指定位置的碱基(1-based坐标)"""
try:
# 使用samtools faidx提取单碱基(需提前对参考基因组建立索引)
result = subprocess.run(
['samtools', 'faidx', str(ref_genome), f"{chrom}:{pos}-{pos}"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=10
)
# 解析结果(第二行为碱基序列)
lines = result.stdout.strip().split('\n')
if len(lines) >= 2 and lines[0].startswith(f">{chrom}:{pos}"):
base = lines[1].upper()
logger.debug(f"提取参考碱基 {chrom}:{pos} -> {base}")
return base
logger.debug(f"无法获取 {chrom}:{pos} 的参考碱基")
return None
except Exception as e:
logger.warning(f"获取参考碱基失败 {chrom}:{pos}: {str(e)}")
return None
def process_probe_row(row) -> Tuple[List[str], List[str], List[str], List[str]]:
"""处理单条探针数据,生成双端FASTQ格式的序列(用于BWA比对)"""
probe_id = row.probeset_id
flank = row.Flank
ref_allele, alt_allele, variant_pos, ref_sequence, alt_sequence = parse_probe_sequence(flank)
if ref_sequence is None:
logger.debug(f"探针 {probe_id} 序列格式无效(无变异标记),跳过")
return [], [], [], []
# 验证序列合法性(仅允许ATCG和-,-表示缺失)
if not VALID_DNA.match(ref_sequence) or not VALID_DNA.match(alt_sequence):
logger.debug(f"探针 {probe_id} 包含非法字符(仅允许ATCG-),跳过")
return [], [], [], []
# 生成质量值(Q40,对应ASCII码'I',表示最高质量)
qual_ref = 'I' * len(ref_sequence)
qual_alt = 'I' * len(alt_sequence)
# 生成R2反向互补序列(双端测序的R2为R1的反向互补链)
ref_sequence_r2 = reverse_complement(ref_sequence)
alt_sequence_r2 = reverse_complement(alt_sequence)
# 再次验证长度匹配(序列与质量值必须等长)
if len(ref_sequence) != len(qual_ref) or len(alt_sequence) != len(qual_alt):
logger.debug(f"探针 {probe_id} 序列与质量值长度不匹配,跳过")
return [], [], [], []
# 构建FASTQ条目(格式:@ID\n序列\n+\n质量值\n)
ref_r1 = [f"@{probe_id}|{variant_pos}|ref/1\n{ref_sequence}\n+\n{qual_ref}\n"]
ref_r2 = [f"@{probe_id}|{variant_pos}|ref/2\n{ref_sequence_r2}\n+\n{qual_ref}\n"]
alt_r1 = [f"@{probe_id}|{variant_pos}|alt/1\n{alt_sequence}\n+\n{qual_alt}\n"]
alt_r2 = [f"@{probe_id}|{variant_pos}|alt/2\n{alt_sequence_r2}\n+\n{qual_alt}\n"]
return ref_r1, ref_r2, alt_r1, alt_r2
@timer
def generate_fastq(csv_path: Path, output_dir: Path, threads: int = 4) -> Tuple[Path, Path, Path, Path]:
"""生成双端FASTQ文件(参考和变异等位基因分别生成)"""
ref_r1_path = output_dir / "ref_R1.fastq" # 参考等位基因R1
ref_r2_path = output_dir / "ref_R2.fastq" # 参考等位基因R2(反向互补)
alt_r1_path = output_dir / "alt_R1.fastq" # 变异等位基因R1
alt_r2_path = output_dir / "alt_R2.fastq" # 变异等位基因R2(反向互补)
logger.info(f"生成双端FASTQ文件:")
logger.info(f"参考样本: {ref_r1_path}, {ref_r2_path}")
logger.info(f"变异样本: {alt_r1_path}, {alt_r2_path}")
# 确保输出目录存在
output_dir.mkdir(parents=True, exist_ok=True)
with open(ref_r1_path, 'w') as ref_r1, \
open(ref_r2_path, 'w') as ref_r2, \
open(alt_r1_path, 'w') as alt_r1, \
open(alt_r2_path, 'w') as alt_r2, \
ThreadPoolExecutor(max_workers=threads) as executor:
total_probes = 0
valid_probes = 0
for chunk_idx, chunk in enumerate(read_probe_data(csv_path)):
logger.debug(f"处理第 {chunk_idx + 1} 块数据,{len(chunk)} 条探针")
results = executor.map(process_probe_row, chunk.itertuples(index=False))
for ref_r1_entries, ref_r2_entries, alt_r1_entries, alt_r2_entries in results:
total_probes += 1
if ref_r1_entries and ref_r2_entries and alt_r1_entries and alt_r2_entries:
ref_r1.writelines(ref_r1_entries)
ref_r2.writelines(ref_r2_entries)
alt_r1.writelines(alt_r1_entries)
alt_r2.writelines(alt_r2_entries)
valid_probes += 1
logger.info(f"FASTQ生成完成: 总探针 {total_probes}, 有效 {valid_probes}")
return ref_r1_path, ref_r2_path, alt_r1_path, alt_r2_path
def run_bwa_alignment_pair(read1: Path, read2: Path, ref_index: str, output_sam: Path,
threads: int = 8, relax: bool = False, supports_evalue: bool = True) -> None:
"""运行BWA双端比对(处理一对R1/R2文件,生成SAM格式结果)"""
# 验证索引文件是否存在(BWA索引需提前用bwa index构建)
index_extensions = ['.bwt', '.pac', '.ann', '.amb', '.sa']
if not any(Path(f"{ref_index}{ext}").exists() for ext in index_extensions):
raise FileNotFoundError(f"BWA索引文件不存在: {ref_index}(需提前用bwa index构建)")
logger.info(f"BWA比对: {read1} + {read2} -> {output_sam}")
# 构建BWA命令
bwa_cmd = [
'bwa', 'mem',
'-t', str(threads), # 线程数
ref_index, # 参考基因组索引前缀
str(read1), # R1输入文件
str(read2) # R2输入文件
]
# 放宽比对参数(适用于短探针或低复杂度序列,提高比对成功率)
if relax:
if supports_evalue:
# 新版本BWA参数(支持-evalue)
bwa_cmd.extend(['-evalue', '1e-5', '-word_size', '10', '-k', '10'])
else:
# 旧版本BWA兼容参数
bwa_cmd.extend(['-w', '100', '-d', '100', '-r', '1.5', '-k', '10'])
try:
with open(output_sam, 'w') as sam_out:
result = subprocess.run(
bwa_cmd,
stdout=sam_out,
stderr=subprocess.PIPE,
check=True,
text=True
)
# 验证SAM文件非空(空文件可能表示输入序列异常或索引错误)
if os.path.getsize(output_sam) == 0:
raise RuntimeError("BWA生成空SAM文件,可能输入序列异常或索引错误")
except subprocess.CalledProcessError as e:
logger.error(f"BWA比对失败: {e.stderr}")
raise RuntimeError("BWA比对失败,请检查索引和输入文件")
@timer
def run_bwa_samples(ref_r1: Path, ref_r2: Path, alt_r1: Path, alt_r2: Path,
ref_index: str, output_dir: Path, threads: int = 8, relax: bool = False) -> Tuple[Path, Path]:
"""为参考和变异两个样本运行BWA双端比对"""
ref_sam = output_dir / "ref_alignments.sam" # 参考等位基因比对结果
alt_sam = output_dir / "alt_alignments.sam" # 变异等位基因比对结果
supports_evalue = check_bwa_version() # 检查BWA版本支持的参数
# 比对参考等位基因样本
run_bwa_alignment_pair(ref_r1, ref_r2, ref_index, ref_sam, threads, relax, supports_evalue)
# 比对变异等位基因样本
run_bwa_alignment_pair(alt_r1, alt_r2, ref_index, alt_sam, threads, relax, supports_evalue)
return ref_sam, alt_sam
def clean_sam(sam_path: Path, cleaned_sam_path: Path) -> None:
"""清理SAM文件中的异常行(CIGAR字符串与序列长度不匹配的记录)"""
logger.info(f"清理SAM文件: {sam_path} -> {cleaned_sam_path}")
invalid_lines = 0
with open(sam_path, 'r') as in_sam, open(cleaned_sam_path, 'w') as out_sam:
for line_num, line in enumerate(in_sam):
# 保留表头(以@开头的行,如@HD、@SQ等)
if line.startswith('@'):
out_sam.write(line)
continue
# 解析SAM记录(至少11个字段)
fields = line.strip().split('\t')
if len(fields) < 11:
invalid_lines += 1
continue
qname, flag, rname, pos, mapq, cigar, rnext, pnext, tlen, seq, qual = fields[:11]
# 计算CIGAR总长度(需与序列长度匹配)
cigar_length = 0
cigar_match = re.findall(r'(\d+)([MIDNSHPX=])', cigar)
for length, op in cigar_match:
# 这些操作会贡献查询序列长度(M/I/S/=/X/H)
if op in ['M', 'I', 'S', '=', 'X', 'H']:
cigar_length += int(length)
# 检查CIGAR与序列长度是否匹配(不匹配的为异常行)
if len(seq) != cigar_length:
invalid_lines += 1
logger.debug(f"SAM行 {line_num+1} 无效: CIGAR长度 {cigar_length} 与序列长度 {len(seq)} 不匹配")
continue
# 保留有效行
out_sam.write(line)
logger.info(f"SAM清理完成: 无效行 {invalid_lines},保留 {line_num + 1 - invalid_lines} 行")
@timer
def convert_to_bam(sam_file: Path, output_bam: Path) -> None:
"""转换SAM到BAM并排序(先清理异常行,减少后续分析错误)"""
logger.info(f"SAM->BAM: {sam_file} -> {output_bam}")
# 先清理SAM文件(移除异常行)
cleaned_sam = sam_file.with_suffix('.cleaned.sam')
clean_sam(sam_file, cleaned_sam)
# 转换清理后的SAM为BAM并排序
view_cmd = ['samtools', 'view', '-bS', str(cleaned_sam)] # SAM转BAM(二进制格式,节省空间)
sort_cmd = ['samtools', 'sort', '-o', str(output_bam)] # 排序BAM(按坐标排序)
try:
# 管道执行转换和排序(减少中间文件)
view_proc = subprocess.Popen(view_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
sort_proc = subprocess.run(
sort_cmd,
stdin=view_proc.stdout,
stderr=subprocess.PIPE,
text=True,
check=True
)
# 检查错误信息
view_stderr = view_proc.communicate()[1]
if view_proc.returncode != 0:
raise RuntimeError(f"samtools view失败: {view_stderr}")
if sort_proc.stderr:
logger.debug(f"samtools sort信息: {sort_proc.stderr}")
# 创建CSI索引(支持超长染色体,小麦基因组可能包含超长scaffold)
subprocess.run(['samtools', 'index', '-c', str(output_bam)], check=True)
logger.info(f"生成CSI索引: {output_bam}.csi(支持超长染色体)")
except subprocess.CalledProcessError as e:
logger.error(f"samtools失败: {e.stderr}")
raise
finally:
# 清理临时文件(清理后的SAM)
if cleaned_sam.exists():
cleaned_sam.unlink()
@timer
def convert_samples_to_bam(ref_sam: Path, alt_sam: Path, output_dir: Path) -> Tuple[Path, Path]:
"""转换两个样本的SAM到BAM(参考和变异等位基因分别处理)"""
ref_bam = output_dir / "ref_alignments_sorted.bam" # 参考等位基因排序BAM
alt_bam = output_dir / "alt_alignments_sorted.bam" # 变异等位基因排序BAM
convert_to_bam(ref_sam, ref_bam)
convert_to_bam(alt_sam, alt_bam)
return ref_bam, alt_bam
@timer
def parse_bam_results(bam_file: Path, sample_type: str, output_csv: Path, min_mapq: int = 20) -> pd.DataFrame:
"""解析BAM文件,提取变异位点的基因组位置及相关信息"""
logger.info(f"解析BAM: {bam_file} (样本类型: {sample_type})")
# 检查并创建CSI索引(若不存在)
csi_index = Path(f"{bam_file}.csi")
if not csi_index.exists():
pysam.index(str(bam_file), '-c') # 创建CSI索引
results = []
with pysam.AlignmentFile(bam_file, "rb", index_filename=str(csi_index)) as bam:
for read in bam:
# 只处理R1读段(避免与R2重复计数,两者来自同一模板)
if read.is_read2:
continue
# 过滤未比对或低质量比对的读段(mapping_quality < min_mapq)
if read.is_unmapped or read.mapping_quality < min_mapq:
continue
# 过滤次要比对和嵌合比对(仅保留主要比对)
if read.is_secondary or read.is_supplementary:
continue
# 解析读段ID中的信息(格式:probe_id|variant_pos|type/1)
read_id = read.query_name
parts = read_id.split('|')
if len(parts) != 3:
continue # 格式异常的读段ID,跳过
probe_id, var_pos_str, _ = parts
try:
var_pos_in_probe = int(var_pos_str) # 变异在探针中的位置(0-based)
except ValueError:
continue # 位置格式异常,跳过
# 获取比对的基本信息
chrom = bam.get_reference_name(read.reference_id) # 染色体名称(如chr1A)
strand = '+' if not read.is_reverse else '-' # 链方向(+为正向,-为反向)
ref_positions = read.get_reference_positions(full_length=True) # 碱基级基因组位置
# 定位变异位点对应的基因组位置
if 0 <= var_pos_in_probe < len(ref_positions):
genomic_pos_0based = ref_positions[var_pos_in_probe]
if genomic_pos_0based is not None: # 排除未比对的位置(如软裁剪区域)
genomic_pos = genomic_pos_0based + 1 # 转换为1-based坐标(生物学常用)
results.append({
'probeset_id': probe_id,
'sample_type': sample_type,
'chromosome': chrom,
'genomic_position': genomic_pos,
'strand': strand,
'mapping_quality': read.mapping_quality,
'cigar': read.cigarstring
})
# 保存解析结果
results_df = pd.DataFrame(results)
results_df.to_csv(output_csv, index=False)
logger.info(f"{sample_type}样本解析完成: {len(results_df)} 条有效结果")
return results_df
@timer
def parse_both_samples(ref_bam: Path, alt_bam: Path, output_csv: Path, min_mapq: int = 20,
ref_genome: Optional[Path] = None, original_csv: Optional[Path] = None) -> None:
"""解析两个样本的BAM结果,合并、去重并进行碱基比对验证"""
# 解析参考和变异样本的BAM结果
ref_df = parse_bam_results(ref_bam, "ref", output_csv.parent / "ref_variants.csv", min_mapq)
alt_df = parse_bam_results(alt_bam, "alt", output_csv.parent / "alt_variants.csv", min_mapq)
# 合并两个样本的结果
combined_df = pd.concat([ref_df, alt_df], ignore_index=True)
logger.info(f"合并后原始结果: {len(combined_df)} 条记录")
# 加载原始探针的变异信息(用于后续验证)
probe_variants = preprocess_original_probes(original_csv) if original_csv else {}
# 去重:按优先级排序(1.mapping_quality降序 2.正向链优先级2>反向链优先级1)
# 1. 添加链方向优先级列(正向链+为2,反向链-为1)
combined_df['strand_priority'] = combined_df['strand'].apply(lambda x: 2 if x == '+' else 1)
# 2. 按优先级排序:先按mapping_quality降序,再按strand_priority降序
combined_df = combined_df.sort_values(
by=['mapping_quality', 'strand_priority'],
ascending=[False, False]
)
# 3. 定义去重关键列(同一探针+染色体+基因组位置视为同一变异位点)
dedup_columns = ['probeset_id', 'chromosome', 'genomic_position']
dedup_df = combined_df.drop_duplicates(subset=dedup_columns, keep='first') # 保留优先级最高的记录
# 4. 删除临时优先级列
dedup_df = dedup_df.drop(columns=['strand_priority'])
logger.info(f"去重后结果: 原始{len(combined_df)}条 -> 去重后{len(dedup_df)}条(按mapping_quality和链优先级)")
# 与参考基因组碱基和原始探针变异信息比对,确定参考/变异碱基
if ref_genome and original_csv and not dedup_df.empty and probe_variants:
dedup_df = dedup_df.assign(
ref_base=None, # 参考碱基(来自原始探针)
var_base=None, # 变异碱基(来自原始探针)
match_strand=None # 匹配的链方向(正向+或反向-)
)
match_count = 0
complement_match_count = 0 # 反向互补链匹配计数
no_match_count = 0
for idx, row in dedup_df.iterrows():
# 提取参考基因组对应位置的碱基(正向链)
ref_base = get_reference_base(ref_genome, row['chromosome'], row['genomic_position'])
if not ref_base:
no_match_count += 1
continue
# 获取原始探针的变异等位基因
probe_id = row['probeset_id']
if probe_id not in probe_variants:
no_match_count += 1
continue
orig_ref, orig_alt = probe_variants[probe_id]
matched = False
# 1. 检查正向链碱基是否匹配
if ref_base == orig_ref:
dedup_df.at[idx, 'ref_base'] = orig_ref
dedup_df.at[idx, 'var_base'] = orig_alt
dedup_df.at[idx, 'match_strand'] = '+'
match_count += 1
matched = True
logger.debug(f"探针 {probe_id} 正向链匹配参考等位基因: {orig_ref} (参考碱基: {ref_base})")
elif ref_base == orig_alt:
dedup_df.at[idx, 'ref_base'] = orig_alt
dedup_df.at[idx, 'var_base'] = orig_ref
dedup_df.at[idx, 'match_strand'] = '+'
match_count += 1
matched = True
logger.debug(f"探针 {probe_id} 正向链匹配变异等位基因: {orig_alt} (参考碱基: {ref_base})")
# 2. 若正向链不匹配,检查反向链互补碱基
if not matched:
complement_ref_base = get_complement_base(ref_base) # 计算反向互补碱基
if complement_ref_base == orig_ref:
dedup_df.at[idx, 'ref_base'] = orig_ref
dedup_df.at[idx, 'var_base'] = orig_alt
dedup_df.at[idx, 'match_strand'] = '-' # 标记为反向链匹配
complement_match_count += 1
matched = True
logger.debug(f"探针 {probe_id} 反向链匹配参考等位基因: {orig_ref} (互补碱基: {complement_ref_base})")
elif complement_ref_base == orig_alt:
dedup_df.at[idx, 'ref_base'] = orig_alt
dedup_df.at[idx, 'var_base'] = orig_ref
dedup_df.at[idx, 'match_strand'] = '-' # 标记为反向链匹配
complement_match_count += 1
matched = True
logger.debug(f"探针 {probe_id} 反向链匹配变异等位基因: {orig_alt} (互补碱基: {complement_ref_base})")
# 3. 仍不匹配(未匹配到参考或变异等位基因)
if not matched:
no_match_count += 1
logger.debug(f"探针 {probe_id} 正反链均不匹配: {ref_base}(+)/{get_complement_base(ref_base)}(-) vs {orig_ref}/{orig_alt}")
logger.info(f"碱基匹配统计: 总记录 {len(dedup_df)}, "
f"正向链匹配 {match_count}, "
f"反向链互补匹配 {complement_match_count}, "
f"均不匹配 {no_match_count}")
# 保存最终结果
dedup_df.to_csv(output_csv, index=False)
logger.info(f"最终结果保存至: {output_csv},共 {len(dedup_df)} 条记录")
def cleanup_files(files: List[Path]) -> None:
"""清理临时文件(FASTQ和SAM,节省磁盘空间)"""
for file in files:
if file.exists():
logger.info(f"清理临时文件: {file}")
file.unlink()
def main():
"""主函数:解析命令行参数并执行分析流程"""
parser = argparse.ArgumentParser(
description='小麦660K芯片变异定位工具(双端模式)\n\n'
'功能:通过比对探针序列与参考基因组,精确定位小麦660K芯片中SNP和Indel变异的基因组位置,\n'
'并按优先级(1.比对质量 2.正向链匹配)去重,最终生成标准化的变异位点信息。',
formatter_class=argparse.RawTextHelpFormatter # 保留换行符,使帮助信息更易读
)
# 添加必需参数
required_args = parser.add_argument_group('必需参数')
required_args.add_argument('--input-csv', required=True, type=Path,
help='输入CSV文件路径(必须包含probeset_id和Flank列)\n'
'• probeset_id: 探针唯一标识符\n'
'• Flank: 探针序列,需包含变异标记(如ATCG[C/T]GATT)\n'
'变异标记格式:[ref/alt],其中ref为参考碱基,alt为变异碱基')
required_args.add_argument('--ref-index', required=True,
help='参考基因组BWA索引前缀(如/path/to/wheat_ref.fa)\n'
'预处理要求:\n'
'1. 使用bwa index构建索引:\n'
' bwa index /path/to/wheat_ref.fa\n'
'2. 生成的索引文件包括:.bwt, .pac, .ann, .amb, .sa')
required_args.add_argument('--ref-genome', required=True, type=Path,
help='参考基因组FASTA文件路径(如/path/to/wheat_ref.fa)\n'
'预处理要求:\n'
'1. 使用samtools faidx构建索引:\n'
' samtools faidx /path/to/wheat_ref.fa\n'
'2. 生成的索引文件为:wheat_ref.fa.fai')
# 添加可选参数
optional_args = parser.add_argument_group('可选参数')
optional_args.add_argument('--output-dir', type=Path, default=Path('660Kresult'),
help='输出结果目录(默认:当前目录下的660Kresult)\n'
'结果文件包括:\n'
'• variant_positions_combined.csv: 最终变异定位结果\n'
'• 中间文件:FASTQ、SAM、BAM及索引文件')
optional_args.add_argument('--threads', type=int, default=8,
help='并行处理线程数(默认:8)\n'
'取值范围:1-32\n'
'建议设置:不超过CPU核心数,过大会增加内存占用')
optional_args.add_argument('--cleanup', action='store_true',
help='运行完成后清理临时文件(FASTQ和SAM)\n'
'临时文件约占总空间的80%,建议最终运行时启用,调试时禁用')
optional_args.add_argument('--verbose', action='store_true',
help='显示详细日志信息(包括调试级别的匹配细节)\n'
'用于排查错误,正常运行可省略')
optional_args.add_argument('--relax', action='store_true',
help='使用放宽的BWA比对参数(适合短探针或低复杂度序列)\n'
'默认参数适合长度>50bp的探针,短探针建议启用此选项\n'
'注意:放宽参数可能增加假阳性结果')
optional_args.add_argument('--min-mapq', type=int, default=10,
help='最低比对质量阈值(默认:10)\n'
'取值范围:0-60\n'
'• 值越高,筛选越严格(如20+)\n'
'• 值越低,保留更多比对(如5-10)\n'
'建议根据数据质量调整,高质量数据可设为20')
# 添加使用示例
usage_examples = parser.add_argument_group('使用示例')
usage_examples.add_argument('--examples', action='store_true', help='显示使用示例并退出')
args = parser.parse_args()
# 显示使用示例
if args.examples:
print("""
小麦660K芯片变异定位工具使用示例:
1. 基本用法:
python wheat_660k_variant_mapper.py \\
--input-csv wheat_660k_probes.csv \\
--ref-index /path/to/wheat_ref.fa \\
--ref-genome /path/to/wheat_ref.fa \\
--output-dir results \\
--threads 16
2. 放宽比对参数(适合短探针):
python wheat_660k_variant_mapper.py \\
--input-csv wheat_660k_probes.csv \\
--ref-index /path/to/wheat_ref.fa \\
--ref-genome /path/to/wheat_ref.fa \\
--relax \\
--min-mapq 20
3. 启用详细日志并清理临时文件:
python wheat_660k_variant_mapper.py \\
--input-csv wheat_660k_probes.csv \\
--ref-index /path/to/wheat_ref.fa \\
--ref-genome /path/to/wheat_ref.fa \\
--verbose \\
--cleanup
4. 完整参数示例:
python wheat_660k_variant_mapper.py \\
--input-csv wheat_660k_probes.csv \\
--ref-index /path/to/wheat_ref.fa \\
--ref-genome /path/to/wheat_ref.fa \\
--output-dir wheat_660k_mapping \\
--threads 24 \\
--relax \\
--min-mapq 20 \\
--cleanup \\
--verbose
""")
sys.exit(0)
# 验证必需参数
if not args.input_csv or not args.ref_index or not args.ref_genome:
parser.error("必需参数缺失: --input-csv, --ref-index, --ref-genome")
# 配置日志级别
if args.verbose:
logger.setLevel(logging.DEBUG)
logger.debug("启用详细日志模式")
# 验证输入文件是否存在
if not args.input_csv.exists():
logger.error(f"输入CSV文件不存在: {args.input_csv}")
sys.exit(1)
if not args.ref_genome.exists():
logger.error(f"参考基因组文件不存在: {args.ref_genome}")
sys.exit(1)
# 验证参考基因组是否有FAI索引
fai_index = Path(f"{args.ref_genome}.fai")
if not fai_index.exists():
logger.warning(f"参考基因组FAI索引不存在,尝试创建...")
try:
subprocess.run(['samtools', 'faidx', str(args.ref_genome)], check=True)
logger.info(f"成功创建FAI索引: {fai_index}")
except subprocess.CalledProcessError:
logger.error("无法创建FAI索引,请手动执行: samtools faidx /path/to/wheat_ref.fa")
sys.exit(1)
# 确保输出目录存在
args.output_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"输出目录: {args.output_dir}")
# 记录命令行参数
logger.info(f"运行参数:")
logger.info(f" 输入CSV: {args.input_csv}")
logger.info(f" 参考索引: {args.ref_index}")
logger.info(f" 参考基因组: {args.ref_genome}")
logger.info(f" 输出目录: {args.output_dir}")
logger.info(f" 线程数: {args.threads}")
logger.info(f" 宽松比对: {args.relax}")
logger.info(f" 最小比对质量: {args.min_mapq}")
logger.info(f" 清理临时文件: {args.cleanup}")
logger.info(f" 详细日志: {args.verbose}")
try:
# 1. 生成双端FASTQ文件
ref_r1, ref_r2, alt_r1, alt_r2 = generate_fastq(
args.input_csv,
args.output_dir,
args.threads
)
# 2. 运行BWA比对
ref_sam, alt_sam = run_bwa_samples(
ref_r1, ref_r2, alt_r1, alt_r2,
args.ref_index, args.output_dir,
args.threads, args.relax
)
# 3. 转换SAM为BAM
ref_bam, alt_bam = convert_samples_to_bam(
ref_sam, alt_sam, args.output_dir
)
# 4. 解析BAM结果并生成最终变异位点
final_output = args.output_dir / "variant_positions_combined.csv"
parse_both_samples(
ref_bam, alt_bam, final_output,
args.min_mapq, args.ref_genome, args.input_csv
)
# 5. 清理临时文件
if args.cleanup:
cleanup_files([ref_r1, ref_r2, alt_r1, alt_r2, ref_sam, alt_sam])
logger.info(f"分析完成!最终结果保存在: {final_output}")
except Exception as e:
logger.error(f"程序执行失败: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()######文件格式如下,
probeset_id,Flank
AX-108725169,CAGCATTGCTCTTAGTAAAACGGATATTTTGAGTG[A/T]ATTACATCTGCAATCCTTAAACTTAAGCAGTGCAC
AX-108725176,GGCGTTGAGAATTTACTAGGTGAAATCGAAGACTA[A/G]CAAAACTAATATAAATTGCAGCTTAGTGGAGGCCA
AX-108725179,ACACCAACCAACAATTCGGTGTATCTGACAAAACT[A/G]AAAAATAGCATAACACTAGTCTTTGCACAACAAAA
AX-108725180,GGTGTGTTTTCTCTTTCAAACCGATTATCATCCAA[A/T]TCTATTTCCAGGCGAAAGGATCTATCAGGACTTTC
AX-108725184,AAACCATTTGGATTTATGTGGAAAAAAAGCAACTA[C/T]GATTTCGGCTCTCTCGTGGAAAACTATATGTATTT
AX-108725185,CTTGTGTCAACTTAATTTGCCATTAAAAAAGCGTT[A/C]GCTATGGATTCTTCGCCGTTAAAGACAGATAAGAA
AX-108725186,TTTTTAGTGTTTTCTCCAAGAAATAATGAGTGAAA[C/T]TTAGTCATTTGCCAACGAAGTTATGCAACTCACCG
AX-108725189,GTGACGAGTAGAAATATCTAGAGTTAAGACTGCAA[A/C]TCATCTTCATCAATAGTACAAGGCAAGTTCACACT