
位置信息
import os
import sys
import argparse
from Bio import SeqIO
def read_cer_file(cer_path):
"""读取cer文件,返回有效样本信息(样本名、染色体、区间)"""
samples = []
# 检查cer文件是否存在
if not os.path.exists(cer_path):
print(f"错误:cer文件不存在 → {cer_path}")
sys.exit(1)
with open(cer_path, 'r') as f:
header = f.readline() # 跳过表头(若无需跳过可删除此句)
line_num = 1
for line in f:
line_num += 1
line = line.strip()
if not line:
continue # 跳过空行
parts = line.split()
if len(parts) < 5:
print(f"警告:行 {line_num} 列数不足(需≥5列),跳过 → {line}")
continue
# 解析并校验数值(起始、终止、染色体总长)
try:
start = int(parts[2])
end = int(parts[3])
chrom_length = int(parts[4])
except ValueError as e:
print(f"警告:行 {line_num} 数值无效({e}),跳过 → {line}")
continue
# 确保起始位置小于终止位置
if start >= end:
print(f"警告:行 {line_num} 起始≥终止,已交换 → 原{start}-{end} → 新{end}-{start}")
start, end = end, start
samples.append({
'sample_name': parts[0],
'chromosome': parts[1],
'start': start,
'end': end,
'chrom_length': chrom_length
})
if not samples:
print(f"错误:从cer文件中未读取到有效样本信息 → {cer_path}")
sys.exit(1)
return samples
def extract_sequence(record, start, end):
"""提取序列片段(处理区间超出序列长度的情况)"""
seq_total_len = len(record.seq)
# 修正区间:确保不超出序列实际长度(1-based)
actual_start = max(1, start)
actual_end = min(seq_total_len, end)
# 若修正后区间无效(起始>终止),返回失败
if actual_start > actual_end:
print(f"警告:序列区间无效 → 原{start}-{end},序列总长{seq_total_len},无法提取")
return None, (start, end, False)
# 提取子序列(Python切片为0-based,需减1)
sub_seq = record.seq[actual_start - 1:actual_end]
return sub_seq, (actual_start, actual_end, True)
def extract_and_merge_sequences(samples, fa_dir, output_single_dir, output_merged_dir):
"""
提取样本序列:
1. 为每个染色体生成单独序列文件(output_single_dir)
2. 将同一样本的所有染色体序列合并到一个文件(output_merged_dir)
"""
# 创建输出目录(不存在则新建)
os.makedirs(output_single_dir, exist_ok=True)
os.makedirs(output_merged_dir, exist_ok=True)
# 用于分组存储同一样本的所有染色体序列(键:样本名,值:SeqRecord列表)
sample_seq_groups = {}
for sample in samples:
s_name = sample['sample_name']
chrom = sample['chromosome']
start = sample['start']
end = sample['end']
# 1. 查找当前样本的FASTA文件(支持多种常见命名格式)
possible_fasta_names = [
f"{s_name}.fasta", f"{s_name}.fa", # 样本整体FASTA(含所有染色体)
f"{s_name}_{chrom}.fasta", f"{s_name}_{chrom}.fa" # 样本-染色体单独FASTA
]
fasta_path = None
for fn in possible_fasta_names:
test_path = os.path.join(fa_dir, fn)
if os.path.exists(test_path):
fasta_path = test_path
break
# 若未找到FASTA文件,跳过当前样本-染色体
if not fasta_path:
print(f"警告:未找到样本 {s_name} 的FASTA文件(已尝试{len(possible_fasta_names)}种格式),跳过染色体 {chrom}")
continue
# 2. 加载FASTA文件并匹配染色体
try:
# 读取所有序列,用字典存储(键:序列ID,值:SeqRecord)
seq_records = {rec.id: rec for rec in SeqIO.parse(fasta_path, 'fasta')}
except Exception as e:
print(f"错误:加载FASTA文件失败 → {fasta_path},{e},跳过染色体 {chrom}")
continue
# 匹配染色体(优先精确匹配,其次模糊匹配)
chrom_id = None
# 精确匹配:染色体名完全等于序列ID
if chrom in seq_records:
chrom_id = chrom
# 模糊匹配:序列ID中包含染色体名(如"Chr01"在"IR64_Chr01"中)
else:
for rec_id in seq_records:
if chrom in rec_id:
chrom_id = rec_id
break
# 若未匹配到染色体,跳过
if not chrom_id:
print(f"警告:样本 {s_name} 的FASTA中未找到染色体 {chrom},跳过")
continue
# 3. 提取染色体的目标区间序列
full_seq_rec = seq_records[chrom_id]
sub_seq, (actual_start, actual_end, is_valid) = extract_sequence(full_seq_rec, start, end)
if not is_valid:
continue
# 4. 生成单个染色体的序列文件
sub_seq_len = len(sub_seq)
single_fn = f"{s_name}_{chrom}_{actual_start}-{actual_end}.fasta"
single_path = os.path.join(output_single_dir, single_fn)
# 写入FASTA(描述包含实际区间和长度)
with open(single_path, 'w') as f:
desc = f"{s_name}_{chrom} | 实际区间:{actual_start}-{actual_end} | 长度:{sub_seq_len}bp"
f.write(f">{desc}\n")
# 按80bp换行(符合FASTA标准格式)
for i in range(0, sub_seq_len, 80):
f.write(str(sub_seq[i:i+80]) + "\n")
print(f"已生成单独序列文件 → {single_path}")
# 5. 将当前染色体序列加入样本分组(用于后续合并)
# 创建SeqRecord对象(包含完整元信息)
merged_rec = SeqIO.SeqRecord(
seq=sub_seq,
id=f"{s_name}_{chrom}_{actual_start}-{actual_end}", # 唯一ID
description=f"染色体:{chrom} | 原始区间:{start}-{end} | 实际区间:{actual_start}-{actual_end} | 长度:{sub_seq_len}bp"
)
# 加入分组(若样本首次出现,初始化列表)
if s_name not in sample_seq_groups:
sample_seq_groups[s_name] = []
sample_seq_groups[s_name].append(merged_rec)
# 6. 合并同一样本的所有染色体序列并生成文件
if not sample_seq_groups:
print("警告:未收集到任何可合并的样本序列")
return
for s_name, seq_records in sample_seq_groups.items():
merged_fn = f"{s_name}_merged.fasta"
merged_path = os.path.join(output_merged_dir, merged_fn)
# 写入合并文件
SeqIO.write(seq_records, merged_path, "fasta")
print(f"已生成样本合并文件 → {merged_path}(含{len(seq_records)}条染色体序列)")
print(f"\n=== 序列提取完成 ===")
print(f"单独染色体文件总数:{sum(len(recs) for recs in sample_seq_groups.values())}")
print(f"合并样本文件总数:{len(sample_seq_groups)}")
print(f"单独文件目录:{output_single_dir}")
print(f"合并文件目录:{output_merged_dir}")
def main():
# 解析命令行参数
parser = argparse.ArgumentParser(description='仅提取样本序列并合并同一样本到单个文件')
# 必选参数
parser.add_argument('cer_file', help='cer文件路径(格式:样本名 染色体 起始 终止 染色体总长)')
parser.add_argument('fa_dir', help='样本FASTA文件所在文件夹路径')
# 可选参数(输出目录)
parser.add_argument('-o', '--output', default='sequence_extraction_results',
help='总输出目录(默认:sequence_extraction_results)')
args = parser.parse_args()
# 定义子输出目录(总目录下分两个子目录,结构更清晰)
output_single_dir = os.path.join(args.output, 'single_chromosome_sequences') # 单个染色体文件
output_merged_dir = os.path.join(args.output, 'merged_sample_sequences') # 样本合并文件
# 核心流程:读取cer → 提取并合并序列
print("=== 步骤1:读取cer文件 ===")
samples = read_cer_file(args.cer_file)
print(f"成功读取 {len(samples)} 个有效样本-染色体区间\n")
print("=== 步骤2:提取序列并合并同一样本 ===")
extract_and_merge_sequences(samples, args.fa_dir, output_single_dir, output_merged_dir)
if __name__ == "__main__":
main()