提取sv信息
import csv
def extract_and_filter_sv(vcf_input_path, csv_output_path):
"""
从VCF文件提取并过滤SV:保留长度≥30bp且类型非BND的SV
:param vcf_input_path: 输入VCF文件路径
:param csv_output_path: 输出CSV文件路径
"""
headers = ["染色体(CHROM)", "起始位置(POS)", "结束位置(END)", "SV类型(SVTYPE)", "长度(bp)"]
with open(vcf_input_path, "r", encoding="utf-8") as vcf_file, \
open(csv_output_path, "w", newline="", encoding="utf-8") as csv_file:
csv_writer = csv.writer(csv_file)
csv_writer.writerow(headers)
for line in vcf_file:
line = line.strip()
if line.startswith("#"):
continue # 跳过注释行
columns = line.split("\t")
chrom = columns[0]
pos = columns[1]
info_field = columns[7]
# 解析INFO字段:提取SVTYPE、END、SVLEN
sv_type = "未知"
end_pos = "未知"
sv_len = None # 用于存储SV长度(优先取SVLEN,无则用END-POS)
info_items = info_field.split(";")
for item in info_items:
if item.startswith("SVTYPE="):
sv_type = item.split("=")[1]
elif item.startswith("END="):
end_pos = item.split("=")[1]
elif item.startswith("SVLEN="):
# SVLEN可能为负数(如DEL表示缺失长度),取绝对值
try:
sv_len = abs(int(item.split("=")[1]))
except (ValueError, IndexError):
sv_len = None # 解析失败则忽略
# 过滤条件1:排除BND类型
if sv_type == "BND":
continue
# 计算SV长度(优先用SVLEN,无则用END-POS)
length = None
if sv_len is not None:
length = sv_len
else:
# 尝试用END-POS计算(需确保pos和end_pos为数字)
try:
pos_int = int(pos)
end_int = int(end_pos)
length = end_int - pos_int + 1 # 包含起止位置的总长度
except (ValueError, TypeError):
# 若pos或end_pos无法转换为数字,视为长度无效
continue
# 过滤条件2:保留长度≥30bp的SV
if length is not None and length >= 30:
csv_writer.writerow([chrom, pos, end_pos, sv_type, length])
print(f"过滤完成!结果已保存至:{csv_output_path}")
# 配置路径
INPUT_VCF = "IR64pb1.vcf" # 输入VCF文件路径
OUTPUT_CSV = "filtered_sv.csv" # 输出过滤后的CSV路径
if __name__ == "__main__":
extract_and_filter_sv(INPUT_VCF, OUTPUT_CSV)

提取序列

绘图
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import argparse
from matplotlib.backends.backend_pdf import PdfPages
import re
# 设置中文显示
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False
plt.rcParams["figure.dpi"] = 300
def read_pos1(file_path):
"""读取pos1文件(染色体、start、end、类型)"""
try:
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
df = pd.read_csv(
file_path,
sep='\t',
header=None,
dtype=str
)
if df.shape[1] != 4:
raise ValueError(f"pos1文件应为4列,实际为{df.shape[1]}列")
df.columns = ['chrom', 'start', 'end', 'type']
df['start'] = pd.to_numeric(df['start'], errors='coerce')
df['end'] = pd.to_numeric(df['end'], errors='coerce')
invalid = df[df['start'].isna() | df['end'].isna()]
if not invalid.empty:
print(f"pos1文件中有{len(invalid)}行位置转换失败,已排除")
df = df.dropna(subset=['start', 'end'])
df['start'], df['end'] = np.where(
df['start'] > df['end'],
[df['end'], df['start']],
[df['start'], df['end']]
)
df['length'] = df['end'] - df['start'] + 1
return df
except Exception as e:
print(f"读取pos1失败: {e}")
return None
def read_filtered_sv(file_path):
"""读取filtered_sv.csv,自动适配列结构"""
try:
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
df = pd.read_csv(
file_path,
sep=',',
header=None,
dtype=str
)
print("\nfiltered_sv.csv前2行预览:")
print(df.head(2))
if df.shape[1] < 4:
raise ValueError(f"filtered_sv.csv至少需要4列,实际为{df.shape[1]}列")
numeric_cols = []
for col in df.columns:
if pd.to_numeric(df[col], errors='coerce').notna().mean() > 0.8:
numeric_cols.append(col)
if len(numeric_cols) < 2:
raise ValueError("未找到足够的数值列(推测为start和end)")
start_col, end_col = numeric_cols[0], numeric_cols[1]
type_candidates = [col for col in df.columns if col not in numeric_cols and col != 0]
type_col = None
for col in type_candidates:
if df[col].str.contains('DEL|INS|DUP|INV', case=False).any():
type_col = col
break
if type_col is None:
type_col = type_candidates[0]
print(f"警告:默认使用第{type_col+1}列作为类型列")
df_parsed = pd.DataFrame({
'chrom': df[0],
'start': pd.to_numeric(df[start_col], errors='coerce'),
'end': pd.to_numeric(df[end_col], errors='coerce'),
'type': df[type_col]
})
invalid = df_parsed[df_parsed['start'].isna() | df_parsed['end'].isna()]
if not invalid.empty:
print(f"filtered_sv.csv中有{len(invalid)}行位置转换失败,已排除")
df_parsed = df_parsed.dropna(subset=['start', 'end'])
df_parsed['start'], df_parsed['end'] = np.where(
df_parsed['start'] > df_parsed['end'],
[df_parsed['end'], df_parsed['start']],
[df_parsed['start'], df_parsed['end']]
)
df_parsed['length'] = df_parsed['end'] - df_parsed['start'] + 1
return df_parsed
except Exception as e:
print(f"读取filtered_sv.csv失败: {e}")
return None
def find_overlapping_regions(pos1_df, sv_df):
"""寻找两个文件中的重叠区间"""
overlaps = []
common_chroms = set(pos1_df['chrom']).intersection(set(sv_df['chrom']))
for chrom in common_chroms:
pos1_chrom = pos1_df[pos1_df['chrom'] == chrom].reset_index(drop=True)
sv_chrom = sv_df[sv_df['chrom'] == chrom].reset_index(drop=True)
for _, pos1_row in pos1_chrom.iterrows():
p_start, p_end = pos1_row['start'], pos1_row['end']
candidate_sv = sv_chrom[
(sv_chrom['start'] <= p_end) &
(sv_chrom['end'] >= p_start)
]
for _, sv_row in candidate_sv.iterrows():
s_start, s_end = sv_row['start'], sv_row['end']
overlap_start = max(p_start, s_start)
overlap_end = min(p_end, s_end)
overlap_length = overlap_end - overlap_start + 1
overlaps.append({
'chrom': chrom,
'pos1_start': p_start, 'pos1_end': p_end, 'pos1_type': pos1_row['type'], 'pos1_length': pos1_row['length'],
'sv_start': s_start, 'sv_end': s_end, 'sv_type': sv_row['type'], 'sv_length': sv_row['length'],
'overlap_start': overlap_start, 'overlap_end': overlap_end, 'overlap_length': overlap_length,
'overlap_ratio_pos1': overlap_length / pos1_row['length'],
'overlap_ratio_sv': overlap_length / sv_row['length']
})
return pd.DataFrame(overlaps)
def sort_chromosomes(chrom_list):
"""将染色体按自然顺序排序(如chr1, chr2, ..., chr22,chrX,chrY)"""
def chrom_key(chrom):
# 提取染色体编号(去除非数字字符)
num_match = re.search(r'\d+', chrom)
if num_match:
return (0, int(num_match.group())) # 数字染色体排在前面
elif 'X' in chrom:
return (1, 0) # X染色体
elif 'Y' in chrom:
return (1, 1) # Y染色体
else:
return (2, chrom) # 其他染色体
return sorted(chrom_list, key=chrom_key)
def generate_chromosome_stats(pos1_df, sv_df, overlap_df, output_dir):
"""生成每条染色体的统计结果"""
# 1. 各染色体区间数量统计
pos1_chrom_counts = pos1_df['chrom'].value_counts().rename('pos1_count').to_frame()
sv_chrom_counts = sv_df['chrom'].value_counts().rename('sv_count').to_frame()
overlap_chrom_counts = overlap_df['chrom'].value_counts().rename('overlap_count').to_frame()
# 合并统计结果
chrom_stats = pos1_chrom_counts.join(sv_chrom_counts, how='outer').join(overlap_chrom_counts, how='outer').fillna(0)
chrom_stats = chrom_stats.astype(int)
# 计算重叠比例(相对于各自染色体的总区间数)
chrom_stats['overlap_ratio_pos1'] = chrom_stats.apply(
lambda row: row['overlap_count'] / row['pos1_count'] if row['pos1_count'] > 0 else 0, axis=1
)
chrom_stats['overlap_ratio_sv'] = chrom_stats.apply(
lambda row: row['overlap_count'] / row['sv_count'] if row['sv_count'] > 0 else 0, axis=1
)
# 按染色体顺序排序
chrom_stats = chrom_stats.reindex(sort_chromosomes(chrom_stats.index))
# 保存染色体统计
chrom_stats.to_csv(os.path.join(output_dir, 'chromosome_stats.csv'), encoding='utf-8')
print("染色体层面统计已保存到 chromosome_stats.csv")
return chrom_stats
def generate_statistics(pos1_df, sv_df, overlap_df, chrom_stats, output_dir):
"""生成总体统计结果"""
stats = {
'pos1总区间数': len(pos1_df),
'filtered_sv总区间数': len(sv_df),
'重叠区间数': len(overlap_df),
'重叠区间占pos1比例': f"{len(overlap_df)/len(pos1_df):.2%}" if len(pos1_df) > 0 else '0%',
'重叠区间占sv比例': f"{len(overlap_df)/len(sv_df):.2%}" if len(sv_df) > 0 else '0%',
'平均重叠长度': f"{overlap_df['overlap_length'].mean():.1f}" if not overlap_df.empty else '0',
'最大重叠长度': f"{overlap_df['overlap_length'].max()}" if not overlap_df.empty else '0',
'最小重叠长度': f"{overlap_df['overlap_length'].min()}" if not overlap_df.empty else '0',
'含重叠区间的染色体数': sum(chrom_stats['overlap_count'] > 0)
}
with open(os.path.join(output_dir, 'statistics.txt'), 'w', encoding='utf-8') as f:
for key, value in stats.items():
f.write(f"{key}: {value}\n")
if not overlap_df.empty:
overlap_df.to_csv(os.path.join(output_dir, 'overlap_details.csv'), index=False, encoding='utf-8')
print("总体统计结果已保存")
return stats
def plot_chromosome_distributions(pos1_df, sv_df, overlap_df, chrom_stats, output_pdf):
"""绘制染色体分布相关图表并添加到PDF"""
with PdfPages(output_pdf) as pdf:
# 1. 各染色体区间数量对比(堆叠条形图)
plt.figure(figsize=(14, 6))
chrom_order = sort_chromosomes(chrom_stats.index)
bar_width = 0.6
# 绘制pos1和sv的区间数
plt.bar(chrom_order, chrom_stats['pos1_count'], width=bar_width, label='pos1区间数', color='skyblue', alpha=0.8)
plt.bar(chrom_order, chrom_stats['sv_count'], width=bar_width, label='filtered_sv区间数', color='orange', alpha=0.6, bottom=chrom_stats['pos1_count'])
# 叠加重叠区间数(用黑色线条标记)
for i, chrom in enumerate(chrom_order):
overlap_count = chrom_stats.loc[chrom, 'overlap_count']
if overlap_count > 0:
plt.text(i, chrom_stats.loc[chrom, 'pos1_count'] + chrom_stats.loc[chrom, 'sv_count'] + 5,
f"重叠: {overlap_count}", ha='center', fontsize=8, color='black')
plt.title('各染色体区间数量分布')
plt.xlabel('染色体')
plt.ylabel('区间数量')
plt.xticks(rotation=45, ha='right')
plt.legend()
plt.tight_layout()
pdf.savefig()
plt.close()
# 2. 各染色体重叠比例(双轴图)
if not overlap_df.empty:
plt.figure(figsize=(14, 6))
ax1 = plt.gca()
ax2 = ax1.twinx()
# 左侧轴:重叠区间数量
ax1.bar(chrom_order, chrom_stats['overlap_count'], width=0.4, label='重叠区间数', color='green', alpha=0.7)
ax1.set_ylabel('重叠区间数量', color='green')
ax1.tick_params(axis='y', labelcolor='green')
# 右侧轴:重叠比例(相对于pos1和sv)
ax2.plot(chrom_order, chrom_stats['overlap_ratio_pos1'], label='重叠/pos1比例', color='blue', marker='o')
ax2.plot(chrom_order, chrom_stats['overlap_ratio_sv'], label='重叠/sv比例', color='red', marker='s')
ax2.set_ylabel('重叠比例', color='black')
ax2.set_ylim(0, 1.1) # 比例范围0-1
# 合并图例
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc='upper left')
plt.title('各染色体重叠区间数量与比例')
plt.xlabel('染色体')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
pdf.savefig()
plt.close()
# 3. 各染色体类型分布(堆叠条形图)
plt.figure(figsize=(16, 8))
# pos1各染色体类型分布
plt.subplot(2, 1, 1)
pos1_chrom_type = pos1_df.groupby(['chrom', 'type']).size().unstack(fill_value=0)
pos1_chrom_type = pos1_chrom_type.reindex(sort_chromosomes(pos1_chrom_type.index))
pos1_chrom_type.plot(kind='bar', stacked=True, width=0.8, ax=plt.gca())
plt.title('pos1各染色体区间类型分布')
plt.xlabel('染色体')
plt.ylabel('区间数量')
plt.xticks(rotation=45, ha='right')
plt.legend(title='类型', bbox_to_anchor=(1.05, 1), loc='upper left')
# sv各染色体类型分布
plt.subplot(2, 1, 2)
sv_chrom_type = sv_df.groupby(['chrom', 'type']).size().unstack(fill_value=0)
sv_chrom_type = sv_chrom_type.reindex(sort_chromosomes(sv_chrom_type.index))
sv_chrom_type.plot(kind='bar', stacked=True, width=0.8, ax=plt.gca())
plt.title('filtered_sv各染色体区间类型分布')
plt.xlabel('染色体')
plt.ylabel('区间数量')
plt.xticks(rotation=45, ha='right')
plt.legend(title='类型', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
pdf.savefig()
plt.close()
# 4. 原有基础图表(保留之前的类型和长度分析)
# 类型分布对比
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
pos1_type_counts = pos1_df['type'].value_counts()
sns.barplot(x=pos1_type_counts.index, y=pos1_type_counts.values, palette='Blues_d')
plt.title('pos1区间类型分布')
plt.xlabel('类型')
plt.ylabel('数量')
plt.xticks(rotation=45)
plt.subplot(1, 2, 2)
sv_type_counts = sv_df['type'].value_counts()
sns.barplot(x=sv_type_counts.index, y=sv_type_counts.values, palette='Oranges_d')
plt.title('filtered_sv区间类型分布')
plt.xlabel('类型')
plt.ylabel('数量')
plt.xticks(rotation=45)
plt.tight_layout()
pdf.savefig()
plt.close()
# 重叠类型对热力图
if not overlap_df.empty:
plt.figure(figsize=(10, 8))
type_pairs = overlap_df.groupby(['pos1_type', 'sv_type']).size().unstack(fill_value=0)
sns.heatmap(type_pairs, annot=True, fmt='d', cmap='YlGnBu')
plt.title('重叠区间的类型对分布')
plt.tight_layout()
pdf.savefig()
plt.close()
# 重叠长度分布
if not overlap_df.empty:
plt.figure(figsize=(10, 6))
sns.histplot(np.log10(overlap_df['overlap_length']), kde=True, color='green', bins=30)
plt.title('重叠区间长度分布 (log10)')
plt.xlabel('重叠长度 (log10)')
plt.ylabel('频数')
plt.tight_layout()
pdf.savefig()
plt.close()
print(f"染色体分布图表已保存到PDF: {output_pdf}")
def main():
parser = argparse.ArgumentParser(description='分析pos1和filtered_sv.csv的染色体分布及重叠区间')
parser.add_argument('pos1_file', help='pos1文件的路径')
parser.add_argument('sv_file', help='filtered_sv.csv文件的路径')
parser.add_argument('-o', '--output', default='sv_chrom_analysis', help='结果输出目录(默认:sv_chrom_analysis)')
args = parser.parse_args()
# 读取数据
print(f"正在读取pos1文件: {args.pos1_file}")
pos1_df = read_pos1(args.pos1_file)
print(f"正在读取filtered_sv文件: {args.sv_file}")
sv_df = read_filtered_sv(args.sv_file)
if pos1_df is None or sv_df is None:
print("文件读取失败,程序退出")
return
# 计算重叠区间
print("正在计算重叠区间...")
overlap_df = find_overlapping_regions(pos1_df, sv_df)
print(f"找到 {len(overlap_df)} 个重叠区间")
# 创建输出目录
output_dir = args.output
os.makedirs(output_dir, exist_ok=True)
print(f"结果将保存到目录: {output_dir}")
# 生成染色体层面统计
chrom_stats = generate_chromosome_stats(pos1_df, sv_df, overlap_df, output_dir)
# 生成总体统计
generate_statistics(pos1_df, sv_df, overlap_df, chrom_stats, output_dir)
# 绘制所有图表(含染色体分布)
output_pdf = os.path.join(output_dir, 'chromosome_analysis_plots.pdf')
plot_chromosome_distributions(pos1_df, sv_df, overlap_df, chrom_stats, output_pdf)
print("分析完成!")
if __name__ == "__main__":
main()