#!/usr/bin/env python3
import pandas as pd
import numpy as np
import argparse
import re
from pathlib import Path
def parse_chromosome(chrom):
"""将染色体名称解析为可排序的元组"""
chrom = str(chrom)
match = re.match(r'chr(\d+|X|Y|MT)', chrom, re.IGNORECASE)
if match:
part = match.group(1)
if part.isdigit():
return (1, int(part)) # 数字染色体
else:
return (2, part.upper()) # 性染色体和线粒体
return (3, chrom) # 其他类型的染色体标记
def calculate_sv_stats(input_file, output_file=None, sort_chromosomes=True):
"""
读取SV数据文件,计算各染色体的SV统计信息并输出为CSV
参数:
input_file: 输入文件路径
output_file: 输出文件路径(默认:输入文件名添加_stats.csv后缀)
sort_chromosomes: 是否按染色体编号排序
"""
try:
# 读取文件
df = pd.read_csv(
input_file,
sep='\s+',
header=None,
names=['chromosome', 'start', 'end'],
dtype={'chromosome': str, 'start': int, 'end': int}
)
# 计算每个SV的长度
df['length'] = df['end'] - df['start']
# 按染色体分组计算统计量
chrom_stats = df.groupby('chromosome').agg(
count=('length', 'count'), # SV数量
total_length=('length', 'sum'), # 总长度
mean_length=('length', 'mean'), # 平均长度
sd_length=('length', 'std') # 标准差
).reset_index()
# 计算所有SV的整体统计量(新增功能)
all_sv_mean = df['length'].mean()
all_sv_sd = df['length'].std()
# 计算均值±标准差(保留两位小数)
chrom_stats['mean±sd'] = chrom_stats.apply(
lambda row: f"{row['mean_length']:.2f} ± {row['sd_length']:.2f}",
axis=1
)
# 计算整体统计量
total_count = chrom_stats['count'].sum()
total_length = chrom_stats['total_length'].sum()
# 添加总计行(包含整体均值和标准差)
total_row = pd.DataFrame({
'chromosome': ['Total'],
'count': [total_count],
'total_length': [total_length],
'mean_length': [all_sv_mean],
'sd_length': [all_sv_sd],
'mean±sd': [f"{all_sv_mean:.2f} ± {all_sv_sd:.2f}"]
})
result = pd.concat([chrom_stats, total_row], ignore_index=True)
# 按染色体排序(如果启用)
if sort_chromosomes:
# 提取染色体编号进行排序
chrom_order = {chrom: parse_chromosome(chrom) for chrom in result['chromosome']}
result['sort_key'] = result['chromosome'].map(chrom_order)
# 对染色体进行排序(Total行始终放在最后)
result = result.sort_values(by=['sort_key'], na_position='last')
result = result.drop(columns=['sort_key'])
# 确定输出文件路径
if not output_file:
input_path = Path(input_file)
output_file = f"{input_path.stem}_stats.csv"
# 保存为CSV文件
result.to_csv(output_file, index=False)
print(f"分析完成,结果已保存至: {output_file}")
print("\n各染色体的SV统计结果概览:")
print(result[['chromosome', 'count', 'total_length', 'mean±sd']].to_string(index=False))
return result
except Exception as e:
print(f"处理过程出错: {str(e)}")
return None
def main():
"""命令行入口函数"""
parser = argparse.ArgumentParser(
description='分析结构变异(SV)数据,计算各染色体的数量、总长度、均值±标准差及整体统计量'
)
parser.add_argument('input_file', help='输入SV数据文件路径')
parser.add_argument('-o', '--output', help='输出CSV文件路径')
parser.add_argument('--no-sort', action='store_true', help='不按染色体编号排序')
args = parser.parse_args()
calculate_sv_stats(
args.input_file,
args.output,
sort_chromosomes=not args.no_sort
)
if __name__ == "__main__":
main()
输入文件格式
输出
例:
SV的起止位置是1-2000,则长度为:
2000-1+1=2000