指定位置序列的提取

位置信息
import os
import sys
import argparse
from Bio import SeqIO


def read_cer_file(cer_path):
    """读取cer文件,返回有效样本信息(样本名、染色体、区间)"""
    samples = []
    # 检查cer文件是否存在
    if not os.path.exists(cer_path):
        print(f"错误:cer文件不存在 → {cer_path}")
        sys.exit(1)
    
    with open(cer_path, 'r') as f:
        header = f.readline()  # 跳过表头(若无需跳过可删除此句)
        line_num = 1
        for line in f:
            line_num += 1
            line = line.strip()
            if not line:
                continue  # 跳过空行
            
            parts = line.split()
            if len(parts) < 5:
                print(f"警告:行 {line_num} 列数不足(需≥5列),跳过 → {line}")
                continue
            
            # 解析并校验数值(起始、终止、染色体总长)
            try:
                start = int(parts[2])
                end = int(parts[3])
                chrom_length = int(parts[4])
            except ValueError as e:
                print(f"警告:行 {line_num} 数值无效({e}),跳过 → {line}")
                continue
            
            # 确保起始位置小于终止位置
            if start >= end:
                print(f"警告:行 {line_num} 起始≥终止,已交换 → 原{start}-{end} → 新{end}-{start}")
                start, end = end, start
            
            samples.append({
                'sample_name': parts[0],
                'chromosome': parts[1],
                'start': start,
                'end': end,
                'chrom_length': chrom_length
            })
    
    if not samples:
        print(f"错误:从cer文件中未读取到有效样本信息 → {cer_path}")
        sys.exit(1)
    return samples


def extract_sequence(record, start, end):
    """提取序列片段(处理区间超出序列长度的情况)"""
    seq_total_len = len(record.seq)
    # 修正区间:确保不超出序列实际长度(1-based)
    actual_start = max(1, start)
    actual_end = min(seq_total_len, end)
    
    # 若修正后区间无效(起始>终止),返回失败
    if actual_start > actual_end:
        print(f"警告:序列区间无效 → 原{start}-{end},序列总长{seq_total_len},无法提取")
        return None, (start, end, False)
    
    # 提取子序列(Python切片为0-based,需减1)
    sub_seq = record.seq[actual_start - 1:actual_end]
    return sub_seq, (actual_start, actual_end, True)


def extract_and_merge_sequences(samples, fa_dir, output_single_dir, output_merged_dir):
    """
    提取样本序列:
    1. 为每个染色体生成单独序列文件(output_single_dir)
    2. 将同一样本的所有染色体序列合并到一个文件(output_merged_dir)
    """
    # 创建输出目录(不存在则新建)
    os.makedirs(output_single_dir, exist_ok=True)
    os.makedirs(output_merged_dir, exist_ok=True)
    
    # 用于分组存储同一样本的所有染色体序列(键:样本名,值:SeqRecord列表)
    sample_seq_groups = {}
    
    for sample in samples:
        s_name = sample['sample_name']
        chrom = sample['chromosome']
        start = sample['start']
        end = sample['end']
        
        # 1. 查找当前样本的FASTA文件(支持多种常见命名格式)
        possible_fasta_names = [
            f"{s_name}.fasta", f"{s_name}.fa",  # 样本整体FASTA(含所有染色体)
            f"{s_name}_{chrom}.fasta", f"{s_name}_{chrom}.fa"  # 样本-染色体单独FASTA
        ]
        fasta_path = None
        for fn in possible_fasta_names:
            test_path = os.path.join(fa_dir, fn)
            if os.path.exists(test_path):
                fasta_path = test_path
                break
        
        # 若未找到FASTA文件,跳过当前样本-染色体
        if not fasta_path:
            print(f"警告:未找到样本 {s_name} 的FASTA文件(已尝试{len(possible_fasta_names)}种格式),跳过染色体 {chrom}")
            continue
        
        # 2. 加载FASTA文件并匹配染色体
        try:
            # 读取所有序列,用字典存储(键:序列ID,值:SeqRecord)
            seq_records = {rec.id: rec for rec in SeqIO.parse(fasta_path, 'fasta')}
        except Exception as e:
            print(f"错误:加载FASTA文件失败 → {fasta_path},{e},跳过染色体 {chrom}")
            continue
        
        # 匹配染色体(优先精确匹配,其次模糊匹配)
        chrom_id = None
        # 精确匹配:染色体名完全等于序列ID
        if chrom in seq_records:
            chrom_id = chrom
        # 模糊匹配:序列ID中包含染色体名(如"Chr01"在"IR64_Chr01"中)
        else:
            for rec_id in seq_records:
                if chrom in rec_id:
                    chrom_id = rec_id
                    break
        
        # 若未匹配到染色体,跳过
        if not chrom_id:
            print(f"警告:样本 {s_name} 的FASTA中未找到染色体 {chrom},跳过")
            continue
        
        # 3. 提取染色体的目标区间序列
        full_seq_rec = seq_records[chrom_id]
        sub_seq, (actual_start, actual_end, is_valid) = extract_sequence(full_seq_rec, start, end)
        if not is_valid:
            continue
        
        # 4. 生成单个染色体的序列文件
        sub_seq_len = len(sub_seq)
        single_fn = f"{s_name}_{chrom}_{actual_start}-{actual_end}.fasta"
        single_path = os.path.join(output_single_dir, single_fn)
        # 写入FASTA(描述包含实际区间和长度)
        with open(single_path, 'w') as f:
            desc = f"{s_name}_{chrom} | 实际区间:{actual_start}-{actual_end} | 长度:{sub_seq_len}bp"
            f.write(f">{desc}\n")
            # 按80bp换行(符合FASTA标准格式)
            for i in range(0, sub_seq_len, 80):
                f.write(str(sub_seq[i:i+80]) + "\n")
        print(f"已生成单独序列文件 → {single_path}")
        
        # 5. 将当前染色体序列加入样本分组(用于后续合并)
        # 创建SeqRecord对象(包含完整元信息)
        merged_rec = SeqIO.SeqRecord(
            seq=sub_seq,
            id=f"{s_name}_{chrom}_{actual_start}-{actual_end}",  # 唯一ID
            description=f"染色体:{chrom} | 原始区间:{start}-{end} | 实际区间:{actual_start}-{actual_end} | 长度:{sub_seq_len}bp"
        )
        # 加入分组(若样本首次出现,初始化列表)
        if s_name not in sample_seq_groups:
            sample_seq_groups[s_name] = []
        sample_seq_groups[s_name].append(merged_rec)
    
    # 6. 合并同一样本的所有染色体序列并生成文件
    if not sample_seq_groups:
        print("警告:未收集到任何可合并的样本序列")
        return
    
    for s_name, seq_records in sample_seq_groups.items():
        merged_fn = f"{s_name}_merged.fasta"
        merged_path = os.path.join(output_merged_dir, merged_fn)
        # 写入合并文件
        SeqIO.write(seq_records, merged_path, "fasta")
        print(f"已生成样本合并文件 → {merged_path}(含{len(seq_records)}条染色体序列)")
    
    print(f"\n=== 序列提取完成 ===")
    print(f"单独染色体文件总数:{sum(len(recs) for recs in sample_seq_groups.values())}")
    print(f"合并样本文件总数:{len(sample_seq_groups)}")
    print(f"单独文件目录:{output_single_dir}")
    print(f"合并文件目录:{output_merged_dir}")


def main():
    # 解析命令行参数
    parser = argparse.ArgumentParser(description='仅提取样本序列并合并同一样本到单个文件')
    # 必选参数
    parser.add_argument('cer_file', help='cer文件路径(格式:样本名 染色体 起始 终止 染色体总长)')
    parser.add_argument('fa_dir', help='样本FASTA文件所在文件夹路径')
    # 可选参数(输出目录)
    parser.add_argument('-o', '--output', default='sequence_extraction_results', 
                        help='总输出目录(默认:sequence_extraction_results)')
    args = parser.parse_args()

    # 定义子输出目录(总目录下分两个子目录,结构更清晰)
    output_single_dir = os.path.join(args.output, 'single_chromosome_sequences')  # 单个染色体文件
    output_merged_dir = os.path.join(args.output, 'merged_sample_sequences')      # 样本合并文件

    # 核心流程:读取cer → 提取并合并序列
    print("=== 步骤1:读取cer文件 ===")
    samples = read_cer_file(args.cer_file)
    print(f"成功读取 {len(samples)} 个有效样本-染色体区间\n")

    print("=== 步骤2:提取序列并合并同一样本 ===")
    extract_and_merge_sequences(samples, args.fa_dir, output_single_dir, output_merged_dir)


if __name__ == "__main__":
    main()
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容