vcf变异信息提取及目标区间比对

提取sv信息

import csv

def extract_and_filter_sv(vcf_input_path, csv_output_path):
    """
    从VCF文件提取并过滤SV:保留长度≥30bp且类型非BND的SV
    :param vcf_input_path: 输入VCF文件路径
    :param csv_output_path: 输出CSV文件路径
    """
    headers = ["染色体(CHROM)", "起始位置(POS)", "结束位置(END)", "SV类型(SVTYPE)", "长度(bp)"]
    
    with open(vcf_input_path, "r", encoding="utf-8") as vcf_file, \
         open(csv_output_path, "w", newline="", encoding="utf-8") as csv_file:
        
        csv_writer = csv.writer(csv_file)
        csv_writer.writerow(headers)
        
        for line in vcf_file:
            line = line.strip()
            if line.startswith("#"):
                continue  # 跳过注释行
            
            columns = line.split("\t")
            chrom = columns[0]
            pos = columns[1]
            info_field = columns[7]
            
            # 解析INFO字段:提取SVTYPE、END、SVLEN
            sv_type = "未知"
            end_pos = "未知"
            sv_len = None  # 用于存储SV长度(优先取SVLEN,无则用END-POS)
            info_items = info_field.split(";")
            
            for item in info_items:
                if item.startswith("SVTYPE="):
                    sv_type = item.split("=")[1]
                elif item.startswith("END="):
                    end_pos = item.split("=")[1]
                elif item.startswith("SVLEN="):
                    # SVLEN可能为负数(如DEL表示缺失长度),取绝对值
                    try:
                        sv_len = abs(int(item.split("=")[1]))
                    except (ValueError, IndexError):
                        sv_len = None  # 解析失败则忽略
            
            # 过滤条件1:排除BND类型
            if sv_type == "BND":
                continue
            
            # 计算SV长度(优先用SVLEN,无则用END-POS)
            length = None
            if sv_len is not None:
                length = sv_len
            else:
                # 尝试用END-POS计算(需确保pos和end_pos为数字)
                try:
                    pos_int = int(pos)
                    end_int = int(end_pos)
                    length = end_int - pos_int + 1  # 包含起止位置的总长度
                except (ValueError, TypeError):
                    # 若pos或end_pos无法转换为数字,视为长度无效
                    continue
            
            # 过滤条件2:保留长度≥30bp的SV
            if length is not None and length >= 30:
                csv_writer.writerow([chrom, pos, end_pos, sv_type, length])
    
    print(f"过滤完成!结果已保存至:{csv_output_path}")

# 配置路径
INPUT_VCF = "IR64pb1.vcf"    # 输入VCF文件路径
OUTPUT_CSV = "filtered_sv.csv"   # 输出过滤后的CSV路径

if __name__ == "__main__":
    extract_and_filter_sv(INPUT_VCF, OUTPUT_CSV)
提取序列
绘图
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import argparse
from matplotlib.backends.backend_pdf import PdfPages
import re

# 设置中文显示
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False
plt.rcParams["figure.dpi"] = 300


def read_pos1(file_path):
    """读取pos1文件(染色体、start、end、类型)"""
    try:
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"文件不存在: {file_path}")
        
        df = pd.read_csv(
            file_path,
            sep='\t',
            header=None,
            dtype=str
        )
        if df.shape[1] != 4:
            raise ValueError(f"pos1文件应为4列,实际为{df.shape[1]}列")
        
        df.columns = ['chrom', 'start', 'end', 'type']
        df['start'] = pd.to_numeric(df['start'], errors='coerce')
        df['end'] = pd.to_numeric(df['end'], errors='coerce')
        
        invalid = df[df['start'].isna() | df['end'].isna()]
        if not invalid.empty:
            print(f"pos1文件中有{len(invalid)}行位置转换失败,已排除")
        df = df.dropna(subset=['start', 'end'])
        
        df['start'], df['end'] = np.where(
            df['start'] > df['end'],
            [df['end'], df['start']],
            [df['start'], df['end']]
        )
        df['length'] = df['end'] - df['start'] + 1
        return df
    except Exception as e:
        print(f"读取pos1失败: {e}")
        return None


def read_filtered_sv(file_path):
    """读取filtered_sv.csv,自动适配列结构"""
    try:
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"文件不存在: {file_path}")
        
        df = pd.read_csv(
            file_path,
            sep=',',
            header=None,
            dtype=str
        )
        print("\nfiltered_sv.csv前2行预览:")
        print(df.head(2))
        
        if df.shape[1] < 4:
            raise ValueError(f"filtered_sv.csv至少需要4列,实际为{df.shape[1]}列")
        
        numeric_cols = []
        for col in df.columns:
            if pd.to_numeric(df[col], errors='coerce').notna().mean() > 0.8:
                numeric_cols.append(col)
        
        if len(numeric_cols) < 2:
            raise ValueError("未找到足够的数值列(推测为start和end)")
        start_col, end_col = numeric_cols[0], numeric_cols[1]
        
        type_candidates = [col for col in df.columns if col not in numeric_cols and col != 0]
        type_col = None
        for col in type_candidates:
            if df[col].str.contains('DEL|INS|DUP|INV', case=False).any():
                type_col = col
                break
        if type_col is None:
            type_col = type_candidates[0]
            print(f"警告:默认使用第{type_col+1}列作为类型列")
        
        df_parsed = pd.DataFrame({
            'chrom': df[0],
            'start': pd.to_numeric(df[start_col], errors='coerce'),
            'end': pd.to_numeric(df[end_col], errors='coerce'),
            'type': df[type_col]
        })
        
        invalid = df_parsed[df_parsed['start'].isna() | df_parsed['end'].isna()]
        if not invalid.empty:
            print(f"filtered_sv.csv中有{len(invalid)}行位置转换失败,已排除")
        df_parsed = df_parsed.dropna(subset=['start', 'end'])
        
        df_parsed['start'], df_parsed['end'] = np.where(
            df_parsed['start'] > df_parsed['end'],
            [df_parsed['end'], df_parsed['start']],
            [df_parsed['start'], df_parsed['end']]
        )
        df_parsed['length'] = df_parsed['end'] - df_parsed['start'] + 1
        return df_parsed
    except Exception as e:
        print(f"读取filtered_sv.csv失败: {e}")
        return None


def find_overlapping_regions(pos1_df, sv_df):
    """寻找两个文件中的重叠区间"""
    overlaps = []
    common_chroms = set(pos1_df['chrom']).intersection(set(sv_df['chrom']))
    
    for chrom in common_chroms:
        pos1_chrom = pos1_df[pos1_df['chrom'] == chrom].reset_index(drop=True)
        sv_chrom = sv_df[sv_df['chrom'] == chrom].reset_index(drop=True)
        
        for _, pos1_row in pos1_chrom.iterrows():
            p_start, p_end = pos1_row['start'], pos1_row['end']
            candidate_sv = sv_chrom[
                (sv_chrom['start'] <= p_end) & 
                (sv_chrom['end'] >= p_start)
            ]
            
            for _, sv_row in candidate_sv.iterrows():
                s_start, s_end = sv_row['start'], sv_row['end']
                overlap_start = max(p_start, s_start)
                overlap_end = min(p_end, s_end)
                overlap_length = overlap_end - overlap_start + 1
                
                overlaps.append({
                    'chrom': chrom,
                    'pos1_start': p_start, 'pos1_end': p_end, 'pos1_type': pos1_row['type'], 'pos1_length': pos1_row['length'],
                    'sv_start': s_start, 'sv_end': s_end, 'sv_type': sv_row['type'], 'sv_length': sv_row['length'],
                    'overlap_start': overlap_start, 'overlap_end': overlap_end, 'overlap_length': overlap_length,
                    'overlap_ratio_pos1': overlap_length / pos1_row['length'],
                    'overlap_ratio_sv': overlap_length / sv_row['length']
                })
    
    return pd.DataFrame(overlaps)


def sort_chromosomes(chrom_list):
    """将染色体按自然顺序排序(如chr1, chr2, ..., chr22,chrX,chrY)"""
    def chrom_key(chrom):
        # 提取染色体编号(去除非数字字符)
        num_match = re.search(r'\d+', chrom)
        if num_match:
            return (0, int(num_match.group()))  # 数字染色体排在前面
        elif 'X' in chrom:
            return (1, 0)  # X染色体
        elif 'Y' in chrom:
            return (1, 1)  # Y染色体
        else:
            return (2, chrom)  # 其他染色体
        
    return sorted(chrom_list, key=chrom_key)


def generate_chromosome_stats(pos1_df, sv_df, overlap_df, output_dir):
    """生成每条染色体的统计结果"""
    # 1. 各染色体区间数量统计
    pos1_chrom_counts = pos1_df['chrom'].value_counts().rename('pos1_count').to_frame()
    sv_chrom_counts = sv_df['chrom'].value_counts().rename('sv_count').to_frame()
    overlap_chrom_counts = overlap_df['chrom'].value_counts().rename('overlap_count').to_frame()
    
    # 合并统计结果
    chrom_stats = pos1_chrom_counts.join(sv_chrom_counts, how='outer').join(overlap_chrom_counts, how='outer').fillna(0)
    chrom_stats = chrom_stats.astype(int)
    
    # 计算重叠比例(相对于各自染色体的总区间数)
    chrom_stats['overlap_ratio_pos1'] = chrom_stats.apply(
        lambda row: row['overlap_count'] / row['pos1_count'] if row['pos1_count'] > 0 else 0, axis=1
    )
    chrom_stats['overlap_ratio_sv'] = chrom_stats.apply(
        lambda row: row['overlap_count'] / row['sv_count'] if row['sv_count'] > 0 else 0, axis=1
    )
    
    # 按染色体顺序排序
    chrom_stats = chrom_stats.reindex(sort_chromosomes(chrom_stats.index))
    
    # 保存染色体统计
    chrom_stats.to_csv(os.path.join(output_dir, 'chromosome_stats.csv'), encoding='utf-8')
    print("染色体层面统计已保存到 chromosome_stats.csv")
    return chrom_stats


def generate_statistics(pos1_df, sv_df, overlap_df, chrom_stats, output_dir):
    """生成总体统计结果"""
    stats = {
        'pos1总区间数': len(pos1_df),
        'filtered_sv总区间数': len(sv_df),
        '重叠区间数': len(overlap_df),
        '重叠区间占pos1比例': f"{len(overlap_df)/len(pos1_df):.2%}" if len(pos1_df) > 0 else '0%',
        '重叠区间占sv比例': f"{len(overlap_df)/len(sv_df):.2%}" if len(sv_df) > 0 else '0%',
        '平均重叠长度': f"{overlap_df['overlap_length'].mean():.1f}" if not overlap_df.empty else '0',
        '最大重叠长度': f"{overlap_df['overlap_length'].max()}" if not overlap_df.empty else '0',
        '最小重叠长度': f"{overlap_df['overlap_length'].min()}" if not overlap_df.empty else '0',
        '含重叠区间的染色体数': sum(chrom_stats['overlap_count'] > 0)
    }
    
    with open(os.path.join(output_dir, 'statistics.txt'), 'w', encoding='utf-8') as f:
        for key, value in stats.items():
            f.write(f"{key}: {value}\n")
    
    if not overlap_df.empty:
        overlap_df.to_csv(os.path.join(output_dir, 'overlap_details.csv'), index=False, encoding='utf-8')
    
    print("总体统计结果已保存")
    return stats


def plot_chromosome_distributions(pos1_df, sv_df, overlap_df, chrom_stats, output_pdf):
    """绘制染色体分布相关图表并添加到PDF"""
    with PdfPages(output_pdf) as pdf:
        # 1. 各染色体区间数量对比(堆叠条形图)
        plt.figure(figsize=(14, 6))
        chrom_order = sort_chromosomes(chrom_stats.index)
        bar_width = 0.6
        
        # 绘制pos1和sv的区间数
        plt.bar(chrom_order, chrom_stats['pos1_count'], width=bar_width, label='pos1区间数', color='skyblue', alpha=0.8)
        plt.bar(chrom_order, chrom_stats['sv_count'], width=bar_width, label='filtered_sv区间数', color='orange', alpha=0.6, bottom=chrom_stats['pos1_count'])
        
        # 叠加重叠区间数(用黑色线条标记)
        for i, chrom in enumerate(chrom_order):
            overlap_count = chrom_stats.loc[chrom, 'overlap_count']
            if overlap_count > 0:
                plt.text(i, chrom_stats.loc[chrom, 'pos1_count'] + chrom_stats.loc[chrom, 'sv_count'] + 5, 
                         f"重叠: {overlap_count}", ha='center', fontsize=8, color='black')
        
        plt.title('各染色体区间数量分布')
        plt.xlabel('染色体')
        plt.ylabel('区间数量')
        plt.xticks(rotation=45, ha='right')
        plt.legend()
        plt.tight_layout()
        pdf.savefig()
        plt.close()
        
        # 2. 各染色体重叠比例(双轴图)
        if not overlap_df.empty:
            plt.figure(figsize=(14, 6))
            ax1 = plt.gca()
            ax2 = ax1.twinx()
            
            # 左侧轴:重叠区间数量
            ax1.bar(chrom_order, chrom_stats['overlap_count'], width=0.4, label='重叠区间数', color='green', alpha=0.7)
            ax1.set_ylabel('重叠区间数量', color='green')
            ax1.tick_params(axis='y', labelcolor='green')
            
            # 右侧轴:重叠比例(相对于pos1和sv)
            ax2.plot(chrom_order, chrom_stats['overlap_ratio_pos1'], label='重叠/pos1比例', color='blue', marker='o')
            ax2.plot(chrom_order, chrom_stats['overlap_ratio_sv'], label='重叠/sv比例', color='red', marker='s')
            ax2.set_ylabel('重叠比例', color='black')
            ax2.set_ylim(0, 1.1)  # 比例范围0-1
            
            # 合并图例
            lines1, labels1 = ax1.get_legend_handles_labels()
            lines2, labels2 = ax2.get_legend_handles_labels()
            ax1.legend(lines1 + lines2, labels1 + labels2, loc='upper left')
            
            plt.title('各染色体重叠区间数量与比例')
            plt.xlabel('染色体')
            plt.xticks(rotation=45, ha='right')
            plt.tight_layout()
            pdf.savefig()
            plt.close()
        
        # 3. 各染色体类型分布(堆叠条形图)
        plt.figure(figsize=(16, 8))
        
        # pos1各染色体类型分布
        plt.subplot(2, 1, 1)
        pos1_chrom_type = pos1_df.groupby(['chrom', 'type']).size().unstack(fill_value=0)
        pos1_chrom_type = pos1_chrom_type.reindex(sort_chromosomes(pos1_chrom_type.index))
        pos1_chrom_type.plot(kind='bar', stacked=True, width=0.8, ax=plt.gca())
        plt.title('pos1各染色体区间类型分布')
        plt.xlabel('染色体')
        plt.ylabel('区间数量')
        plt.xticks(rotation=45, ha='right')
        plt.legend(title='类型', bbox_to_anchor=(1.05, 1), loc='upper left')
        
        # sv各染色体类型分布
        plt.subplot(2, 1, 2)
        sv_chrom_type = sv_df.groupby(['chrom', 'type']).size().unstack(fill_value=0)
        sv_chrom_type = sv_chrom_type.reindex(sort_chromosomes(sv_chrom_type.index))
        sv_chrom_type.plot(kind='bar', stacked=True, width=0.8, ax=plt.gca())
        plt.title('filtered_sv各染色体区间类型分布')
        plt.xlabel('染色体')
        plt.ylabel('区间数量')
        plt.xticks(rotation=45, ha='right')
        plt.legend(title='类型', bbox_to_anchor=(1.05, 1), loc='upper left')
        
        plt.tight_layout()
        pdf.savefig()
        plt.close()
        
        # 4. 原有基础图表(保留之前的类型和长度分析)
        # 类型分布对比
        plt.figure(figsize=(12, 5))
        plt.subplot(1, 2, 1)
        pos1_type_counts = pos1_df['type'].value_counts()
        sns.barplot(x=pos1_type_counts.index, y=pos1_type_counts.values, palette='Blues_d')
        plt.title('pos1区间类型分布')
        plt.xlabel('类型')
        plt.ylabel('数量')
        plt.xticks(rotation=45)
        
        plt.subplot(1, 2, 2)
        sv_type_counts = sv_df['type'].value_counts()
        sns.barplot(x=sv_type_counts.index, y=sv_type_counts.values, palette='Oranges_d')
        plt.title('filtered_sv区间类型分布')
        plt.xlabel('类型')
        plt.ylabel('数量')
        plt.xticks(rotation=45)
        plt.tight_layout()
        pdf.savefig()
        plt.close()
        
        # 重叠类型对热力图
        if not overlap_df.empty:
            plt.figure(figsize=(10, 8))
            type_pairs = overlap_df.groupby(['pos1_type', 'sv_type']).size().unstack(fill_value=0)
            sns.heatmap(type_pairs, annot=True, fmt='d', cmap='YlGnBu')
            plt.title('重叠区间的类型对分布')
            plt.tight_layout()
            pdf.savefig()
            plt.close()
        
        # 重叠长度分布
        if not overlap_df.empty:
            plt.figure(figsize=(10, 6))
            sns.histplot(np.log10(overlap_df['overlap_length']), kde=True, color='green', bins=30)
            plt.title('重叠区间长度分布 (log10)')
            plt.xlabel('重叠长度 (log10)')
            plt.ylabel('频数')
            plt.tight_layout()
            pdf.savefig()
            plt.close()
    
    print(f"染色体分布图表已保存到PDF: {output_pdf}")


def main():
    parser = argparse.ArgumentParser(description='分析pos1和filtered_sv.csv的染色体分布及重叠区间')
    parser.add_argument('pos1_file', help='pos1文件的路径')
    parser.add_argument('sv_file', help='filtered_sv.csv文件的路径')
    parser.add_argument('-o', '--output', default='sv_chrom_analysis', help='结果输出目录(默认:sv_chrom_analysis)')
    args = parser.parse_args()
    
    # 读取数据
    print(f"正在读取pos1文件: {args.pos1_file}")
    pos1_df = read_pos1(args.pos1_file)
    print(f"正在读取filtered_sv文件: {args.sv_file}")
    sv_df = read_filtered_sv(args.sv_file)
    
    if pos1_df is None or sv_df is None:
        print("文件读取失败,程序退出")
        return
    
    # 计算重叠区间
    print("正在计算重叠区间...")
    overlap_df = find_overlapping_regions(pos1_df, sv_df)
    print(f"找到 {len(overlap_df)} 个重叠区间")
    
    # 创建输出目录
    output_dir = args.output
    os.makedirs(output_dir, exist_ok=True)
    print(f"结果将保存到目录: {output_dir}")
    
    # 生成染色体层面统计
    chrom_stats = generate_chromosome_stats(pos1_df, sv_df, overlap_df, output_dir)
    
    # 生成总体统计
    generate_statistics(pos1_df, sv_df, overlap_df, chrom_stats, output_dir)
    
    # 绘制所有图表(含染色体分布)
    output_pdf = os.path.join(output_dir, 'chromosome_analysis_plots.pdf')
    plot_chromosome_distributions(pos1_df, sv_df, overlap_df, chrom_stats, output_pdf)
    
    print("分析完成!")


if __name__ == "__main__":
    main()
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。