基因有效位点构建单倍型

移除缺失位点,基于有效位点构建单倍型,统计每个单倍型的位点数量
结果文件中可以看出,保留了缺失位点

核心实现说明(位点个数列 site_count):
数据收集:
1、用 haplotype_sites 字典记录每个单倍型实际保留的位点列表(retained_sites),这些位点是排除了缺失数据后的有效位点。
2、位点个数计算逻辑:
对于正常单倍型:site_count = len(retained_sites),即有效位点的数量
对于全缺失单倍型(所有位点都缺失):site_count = 0(明确标记无有效位点)
3、结果写入:
在输出文件(*_haplotypes.tsv 和 *_sample_haplotypes.tsv)中,通过新增的 site_count 列写入计算结果,表头明确包含该列

运行脚本后,输出文件中将清晰展示每个单倍型的位点数量,例如:

gene_id  orig_geneID  Merged_Anno  retained_sites  site_count  haplotype  frequency  count
GeneA    GeneA_123    FunctionX    10001,10005,10010  3        10001:A/T;10005:G/G;10010:C/T  0.65  13

其中 site_count 列直接显示该单倍型由 3 个有效位点组成。

import os
import sys
import pandas as pd
from collections import defaultdict

def identify_haplotypes(
    input_file, 
    output_prefix=None, 
    anno_col_name='Anno', 
    merge_sep=';', 
    orig_geneID_col_name='orig_geneID',
    gene_id_col_name='gene_id'
):
    """
    识别单倍型并添加位点个数统计列(site_count)
    功能:移除缺失位点,基于有效位点构建单倍型,统计每个单倍型的位点数量
    """
    # 读取输入文件
    print(f"正在读取文件: {input_file}")
    try:
        df = pd.read_csv(input_file, sep='\t')
        
        # 检查必要列是否存在
        required_cols = [gene_id_col_name, orig_geneID_col_name, anno_col_name, 'REF', 'ALT', 'pos']
        for col in required_cols:
            if col not in df.columns:
                print(f"错误: 未找到列 '{col}',请检查输入文件")
                return None
        # 获取样本列(假设第5-37列为样本列)
        if len(df.columns) < 40:
            print(f"错误: 输入文件列数不足,需至少40列,实际有 {len(df.columns)} 列")
            return None
        sample_cols = df.columns[4:37]
        print(f"检测到 {len(sample_cols)} 个样本列")
        
        # 定义全缺失单倍型标识
        missing_haplotype = ("all_sites_missing",)
        
        # 结果存储结构
        haplotype_results = defaultdict(lambda: defaultdict(int))  # 单倍型计数
        sample_haplotypes = defaultdict(lambda: defaultdict(list))  # 样本-单倍型对应关系
        gene_annotations = defaultdict(set)  # 基因注释
        haplotype_sites = defaultdict(dict)  # 记录每个单倍型的位点列表(用于计算个数)
        
        # 第一步:整合基因注释
        print("整合基因注释信息...")
        for (gene_id, orig_geneID), group in df.groupby([gene_id_col_name, orig_geneID_col_name]):
            annos = group[anno_col_name].dropna().unique()
            valid_annos = [a.strip() for a in annos if str(a).strip() not in ['', 'NA', '.']]
            gene_annotations[(gene_id, orig_geneID)].update(valid_annos)
        
        # 第二步:识别单倍型并统计位点个数
        print("识别单倍型并计算位点个数...")
        for (gene_id, orig_geneID), group in df.groupby([gene_id_col_name, orig_geneID_col_name]):
            if len(group) <= 1:
                continue  # 跳过只有1个SNP的基因(无法形成单倍型)
            
            # 处理每个样本
            for sample in sample_cols:
                genotypes = group[sample].tolist()
                refs = group['REF'].tolist()
                alts = group['ALT'].tolist()
                positions = group['pos'].tolist()
                
                retained_sites = []  # 保留的有效位点(排除缺失)
                haplotype_parts = []  # 单倍型组成部分
                
                # 遍历每个位点,仅保留非缺失位点
                for genotype, ref, alt, pos in zip(genotypes, refs, alts, positions):
                    if pd.isna(genotype) or genotype in ['.', './.', '.|.']:
                        continue  # 跳过缺失位点
                    
                    # 解析基因型为实际碱基
                    alleles = genotype.split('/')
                    base_alleles = [ref if a == '0' else alt for a in alleles]
                    haplotype_parts.append(f"{pos}:{'/'.join(base_alleles)}")
                    retained_sites.append(pos)  # 记录有效位点
                
                # 构建单倍型键(区分全缺失和正常单倍型)
                if not haplotype_parts:  # 所有位点都缺失
                    hap_key = missing_haplotype
                else:
                    # 按位置排序确保单倍型一致性
                    sorted_pairs = sorted(zip(retained_sites, haplotype_parts), key=lambda x: x[0])
                    sorted_sites, sorted_parts = zip(*sorted_pairs)
                    hap_key = (tuple(sorted_parts),)
                    retained_sites = sorted_sites  # 更新为排序后的位点列表
                
                # 更新统计结果
                haplotype_results[(gene_id, orig_geneID)][hap_key] += 1
                sample_haplotypes[(gene_id, orig_geneID)][hap_key].append(sample)
                haplotype_sites[(gene_id, orig_geneID)][hap_key] = retained_sites  # 保存位点列表(用于计算个数)
        
        # 计算单倍型频率
        haplotype_frequencies = {
            (g, o): {h: c/sum(hs.values()) for h, c in hs.items()} 
            for (g, o), hs in haplotype_results.items()
        }
        
        # 输出结果(包含site_count列)
        if output_prefix:
            # 1. 单倍型汇总文件(含site_count)
            summary_file = f"{output_prefix}_haplotypes.tsv"
            with open(summary_file, 'w') as f:
                # 表头添加site_count列
                f.write("gene_id\torig_geneID\tMerged_Anno\tretained_sites\tsite_count\thaplotype\tfrequency\tcount\n")
                
                for (gene_id, orig_geneID), haplotypes in haplotype_results.items():
                    merged_anno = merge_sep.join(sorted(gene_annotations[(gene_id, orig_geneID)])) if gene_annotations[(gene_id, orig_geneID)] else 'NA'
                    
                    for hap, count in sorted(haplotypes.items(), key=lambda x: -x[1]):
                        freq = haplotype_frequencies[(gene_id, orig_geneID)][hap]
                        
                        # 计算位点个数(核心步骤)
                        if hap == missing_haplotype:
                            # 全缺失单倍型位点个数为0
                            site_count = 0
                            sites_str = "none"
                            hap_str = "all_sites_missing"
                        else:
                            # 正常单倍型位点个数 = 保留位点的长度
                            sites = haplotype_sites[(gene_id, orig_geneID)][hap]
                            site_count = len(sites)  # 位点个数计算
                            sites_str = ",".join(map(str, sites))
                            hap_str = ";".join(hap[0])
                        
                        # 写入包含site_count的行
                        f.write(f"{gene_id}\t{orig_geneID}\t{merged_anno}\t{sites_str}\t{site_count}\t{hap_str}\t{freq:.4f}\t{count}\n")
            print(f"单倍型汇总文件已保存: {summary_file}")
            
            # 2. 样本-单倍型关系文件(含site_count)
            sample_file = f"{output_prefix}_sample_haplotypes.tsv"
            with open(sample_file, 'w') as f:
                header = "gene_id\torig_geneID\tMerged_Anno\tretained_sites\tsite_count\thaplotype\tfrequency\tcount\t" + "\t".join(sample_cols) + "\n"
                f.write(header)
                
                for (gene_id, orig_geneID), haplotypes in haplotype_results.items():
                    merged_anno = merge_sep.join(sorted(gene_annotations[(gene_id, orig_geneID)])) if gene_annotations[(gene_id, orig_geneID)] else 'NA'
                    
                    for hap, count in sorted(haplotypes.items(), key=lambda x: -x[1]):
                        freq = haplotype_frequencies[(gene_id, orig_geneID)][hap]
                        samples_with_hap = sample_haplotypes[(gene_id, orig_geneID)][hap]
                        
                        # 计算位点个数(与汇总文件逻辑一致)
                        if hap == missing_haplotype:
                            site_count = 0
                            sites_str = "none"
                            hap_str = "all_sites_missing"
                        else:
                            sites = haplotype_sites[(gene_id, orig_geneID)][hap]
                            site_count = len(sites)  # 位点个数计算
                            sites_str = ",".join(map(str, sites))
                            hap_str = ";".join(hap[0])
                        
                        # 生成样本数据列
                        sample_data = ['all_missing' if s in samples_with_hap and hap == missing_haplotype 
                                      else '1' if s in samples_with_hap 
                                      else '0' for s in sample_cols]
                        
                        # 写入行数据
                        line = f"{gene_id}\t{orig_geneID}\t{merged_anno}\t{sites_str}\t{site_count}\t{hap_str}\t{freq:.4f}\t{count}\t" + "\t".join(sample_data) + "\n"
                        f.write(line)
            print(f"样本-单倍型关系文件已保存: {sample_file}")
        
        return haplotype_frequencies, sample_haplotypes, gene_annotations, haplotype_sites
    
    except Exception as e:
        print(f"处理出错: {e}")
        return None, None, None, None

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("用法: python haplotype_analysis.py <输入文件路径> [输出前缀]")
        sys.exit(1)
    
    input_file = sys.argv[1]
    output_prefix = sys.argv[2] if len(sys.argv) > 2 else "haplotype_result"
    
    if not os.path.isfile(input_file):
        print(f"错误: 输入文件 '{input_file}' 不存在")
        sys.exit(1)
    
    # 执行分析
    results = identify_haplotypes(
        input_file, 
        output_prefix, 
        orig_geneID_col_name='orig_geneID',
        gene_id_col_name='gene_id',
        anno_col_name='Anno'
    )
    
    if all(res is not None for res in results):
        print("\n分析完成!结果文件已生成,包含位点个数统计列(site_count)")

更新:
添加missing_sites 列,retained_sites 和 missing_sites 列的数据始终以分号分隔,满足后续数据处理的格式要求。

还有 48% 的精彩内容
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
支付 ¥8.80 继续阅读

推荐阅读更多精彩内容