移除缺失位点,基于有效位点构建单倍型,统计每个单倍型的位点数量
结果文件中可以看出,保留了缺失位点
核心实现说明(位点个数列 site_count):
数据收集:
1、用 haplotype_sites 字典记录每个单倍型实际保留的位点列表(retained_sites),这些位点是排除了缺失数据后的有效位点。
2、位点个数计算逻辑:
对于正常单倍型:site_count = len(retained_sites),即有效位点的数量
对于全缺失单倍型(所有位点都缺失):site_count = 0(明确标记无有效位点)
3、结果写入:
在输出文件(*_haplotypes.tsv 和 *_sample_haplotypes.tsv)中,通过新增的 site_count 列写入计算结果,表头明确包含该列
运行脚本后,输出文件中将清晰展示每个单倍型的位点数量,例如:
gene_id orig_geneID Merged_Anno retained_sites site_count haplotype frequency count
GeneA GeneA_123 FunctionX 10001,10005,10010 3 10001:A/T;10005:G/G;10010:C/T 0.65 13
其中 site_count 列直接显示该单倍型由 3 个有效位点组成。
import os
import sys
import pandas as pd
from collections import defaultdict
def identify_haplotypes(
input_file,
output_prefix=None,
anno_col_name='Anno',
merge_sep=';',
orig_geneID_col_name='orig_geneID',
gene_id_col_name='gene_id'
):
"""
识别单倍型并添加位点个数统计列(site_count)
功能:移除缺失位点,基于有效位点构建单倍型,统计每个单倍型的位点数量
"""
# 读取输入文件
print(f"正在读取文件: {input_file}")
try:
df = pd.read_csv(input_file, sep='\t')
# 检查必要列是否存在
required_cols = [gene_id_col_name, orig_geneID_col_name, anno_col_name, 'REF', 'ALT', 'pos']
for col in required_cols:
if col not in df.columns:
print(f"错误: 未找到列 '{col}',请检查输入文件")
return None
# 获取样本列(假设第5-37列为样本列)
if len(df.columns) < 40:
print(f"错误: 输入文件列数不足,需至少40列,实际有 {len(df.columns)} 列")
return None
sample_cols = df.columns[4:37]
print(f"检测到 {len(sample_cols)} 个样本列")
# 定义全缺失单倍型标识
missing_haplotype = ("all_sites_missing",)
# 结果存储结构
haplotype_results = defaultdict(lambda: defaultdict(int)) # 单倍型计数
sample_haplotypes = defaultdict(lambda: defaultdict(list)) # 样本-单倍型对应关系
gene_annotations = defaultdict(set) # 基因注释
haplotype_sites = defaultdict(dict) # 记录每个单倍型的位点列表(用于计算个数)
# 第一步:整合基因注释
print("整合基因注释信息...")
for (gene_id, orig_geneID), group in df.groupby([gene_id_col_name, orig_geneID_col_name]):
annos = group[anno_col_name].dropna().unique()
valid_annos = [a.strip() for a in annos if str(a).strip() not in ['', 'NA', '.']]
gene_annotations[(gene_id, orig_geneID)].update(valid_annos)
# 第二步:识别单倍型并统计位点个数
print("识别单倍型并计算位点个数...")
for (gene_id, orig_geneID), group in df.groupby([gene_id_col_name, orig_geneID_col_name]):
if len(group) <= 1:
continue # 跳过只有1个SNP的基因(无法形成单倍型)
# 处理每个样本
for sample in sample_cols:
genotypes = group[sample].tolist()
refs = group['REF'].tolist()
alts = group['ALT'].tolist()
positions = group['pos'].tolist()
retained_sites = [] # 保留的有效位点(排除缺失)
haplotype_parts = [] # 单倍型组成部分
# 遍历每个位点,仅保留非缺失位点
for genotype, ref, alt, pos in zip(genotypes, refs, alts, positions):
if pd.isna(genotype) or genotype in ['.', './.', '.|.']:
continue # 跳过缺失位点
# 解析基因型为实际碱基
alleles = genotype.split('/')
base_alleles = [ref if a == '0' else alt for a in alleles]
haplotype_parts.append(f"{pos}:{'/'.join(base_alleles)}")
retained_sites.append(pos) # 记录有效位点
# 构建单倍型键(区分全缺失和正常单倍型)
if not haplotype_parts: # 所有位点都缺失
hap_key = missing_haplotype
else:
# 按位置排序确保单倍型一致性
sorted_pairs = sorted(zip(retained_sites, haplotype_parts), key=lambda x: x[0])
sorted_sites, sorted_parts = zip(*sorted_pairs)
hap_key = (tuple(sorted_parts),)
retained_sites = sorted_sites # 更新为排序后的位点列表
# 更新统计结果
haplotype_results[(gene_id, orig_geneID)][hap_key] += 1
sample_haplotypes[(gene_id, orig_geneID)][hap_key].append(sample)
haplotype_sites[(gene_id, orig_geneID)][hap_key] = retained_sites # 保存位点列表(用于计算个数)
# 计算单倍型频率
haplotype_frequencies = {
(g, o): {h: c/sum(hs.values()) for h, c in hs.items()}
for (g, o), hs in haplotype_results.items()
}
# 输出结果(包含site_count列)
if output_prefix:
# 1. 单倍型汇总文件(含site_count)
summary_file = f"{output_prefix}_haplotypes.tsv"
with open(summary_file, 'w') as f:
# 表头添加site_count列
f.write("gene_id\torig_geneID\tMerged_Anno\tretained_sites\tsite_count\thaplotype\tfrequency\tcount\n")
for (gene_id, orig_geneID), haplotypes in haplotype_results.items():
merged_anno = merge_sep.join(sorted(gene_annotations[(gene_id, orig_geneID)])) if gene_annotations[(gene_id, orig_geneID)] else 'NA'
for hap, count in sorted(haplotypes.items(), key=lambda x: -x[1]):
freq = haplotype_frequencies[(gene_id, orig_geneID)][hap]
# 计算位点个数(核心步骤)
if hap == missing_haplotype:
# 全缺失单倍型位点个数为0
site_count = 0
sites_str = "none"
hap_str = "all_sites_missing"
else:
# 正常单倍型位点个数 = 保留位点的长度
sites = haplotype_sites[(gene_id, orig_geneID)][hap]
site_count = len(sites) # 位点个数计算
sites_str = ",".join(map(str, sites))
hap_str = ";".join(hap[0])
# 写入包含site_count的行
f.write(f"{gene_id}\t{orig_geneID}\t{merged_anno}\t{sites_str}\t{site_count}\t{hap_str}\t{freq:.4f}\t{count}\n")
print(f"单倍型汇总文件已保存: {summary_file}")
# 2. 样本-单倍型关系文件(含site_count)
sample_file = f"{output_prefix}_sample_haplotypes.tsv"
with open(sample_file, 'w') as f:
header = "gene_id\torig_geneID\tMerged_Anno\tretained_sites\tsite_count\thaplotype\tfrequency\tcount\t" + "\t".join(sample_cols) + "\n"
f.write(header)
for (gene_id, orig_geneID), haplotypes in haplotype_results.items():
merged_anno = merge_sep.join(sorted(gene_annotations[(gene_id, orig_geneID)])) if gene_annotations[(gene_id, orig_geneID)] else 'NA'
for hap, count in sorted(haplotypes.items(), key=lambda x: -x[1]):
freq = haplotype_frequencies[(gene_id, orig_geneID)][hap]
samples_with_hap = sample_haplotypes[(gene_id, orig_geneID)][hap]
# 计算位点个数(与汇总文件逻辑一致)
if hap == missing_haplotype:
site_count = 0
sites_str = "none"
hap_str = "all_sites_missing"
else:
sites = haplotype_sites[(gene_id, orig_geneID)][hap]
site_count = len(sites) # 位点个数计算
sites_str = ",".join(map(str, sites))
hap_str = ";".join(hap[0])
# 生成样本数据列
sample_data = ['all_missing' if s in samples_with_hap and hap == missing_haplotype
else '1' if s in samples_with_hap
else '0' for s in sample_cols]
# 写入行数据
line = f"{gene_id}\t{orig_geneID}\t{merged_anno}\t{sites_str}\t{site_count}\t{hap_str}\t{freq:.4f}\t{count}\t" + "\t".join(sample_data) + "\n"
f.write(line)
print(f"样本-单倍型关系文件已保存: {sample_file}")
return haplotype_frequencies, sample_haplotypes, gene_annotations, haplotype_sites
except Exception as e:
print(f"处理出错: {e}")
return None, None, None, None
if __name__ == "__main__":
if len(sys.argv) < 2:
print("用法: python haplotype_analysis.py <输入文件路径> [输出前缀]")
sys.exit(1)
input_file = sys.argv[1]
output_prefix = sys.argv[2] if len(sys.argv) > 2 else "haplotype_result"
if not os.path.isfile(input_file):
print(f"错误: 输入文件 '{input_file}' 不存在")
sys.exit(1)
# 执行分析
results = identify_haplotypes(
input_file,
output_prefix,
orig_geneID_col_name='orig_geneID',
gene_id_col_name='gene_id',
anno_col_name='Anno'
)
if all(res is not None for res in results):
print("\n分析完成!结果文件已生成,包含位点个数统计列(site_count)")
更新:
添加missing_sites 列,retained_sites 和 missing_sites 列的数据始终以分号分隔,满足后续数据处理的格式要求。