Pairwise Global Sequence Alignment

For a global alignment of two chromosome-segment sequences (forcing an end-to-end alignment even when the lengths differ or there are local mismatches), tools based on the Needleman-Wunsch algorithm are recommended: they are built specifically for global alignment and compute the overall similarity across the full length of both sequences. The concrete steps are as follows.

Recommended tool: EMBOSS needle (the classic global-alignment program)
The needle program in EMBOSS (the European Molecular Biology Open Software Suite) is the standard implementation of the Needleman-Wunsch algorithm. It performs global alignment of DNA or protein sequences and reports the key metrics: identity, similarity, gap penalties, and score. For megabase-scale fragments like the ones below, however, needle's memory use becomes prohibitive, so the actual commands use stretcher, the EMBOSS program that implements a memory-efficient variant of Needleman-Wunsch for long sequences.
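For fragments small enough for needle itself, a typical call looks like this (a sketch: the file names are placeholders; 10.0 and 0.5 are needle's default gap-open and gap-extend penalties):

needle -asequence seqA.fasta -bsequence seqB.fasta -gapopen 10.0 -gapextend 0.5 -outfile global_alignment.txt

For the megabase fragments in this post, stretcher is the practical choice: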

stretcher -asequence IR64_Chr01_17337793-18303009.fasta -bsequence 13-65_Chr01_16794358-17543870.fasta -outfile long_global_alignment.txt 

If the two sequences differ in length (in this example the IR64 fragment is about 965 kb and the 13-65 fragment about 750 kb), the global alignment inserts gaps at the ends or in the middle of the shorter sequence to force a full-length match. The reported Identity may then be lower than in a local alignment, but it better reflects the overall similarity.
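Concretely, 18303009 − 17337793 ≈ 965 kb for the IR64 fragment and 17543870 − 16794358 ≈ 750 kb for the 13-65 fragment, so the alignment must contain at least roughly 216 kb of gap positions just to absorb the length difference, before any local mismatches add more.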

Software download
wget ftp://ftp.ebi.ac.uk/pub/software/unix/EMBOSS/EMBOSS-6.6.0.tar.gz
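After downloading, the archive has to be unpacked and compiled. A standard source build, sketched here and run from the download directory, is:

tar -xzf EMBOSS-6.6.0.tar.gz
cd EMBOSS-6.6.0
./configure
make
# libtool places the compiled binaries under emboss/.libs/, the path the chown below touches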

Fix ownership of the compiled binaries (note that chown changes the file owner rather than the execute bit):
sudo chown -R fengting /public/home/fengting/soft/emboss/EMBOSS-6.6.0/emboss/.libs/

Optionally, render the alignment as an image:
prettyplot -sequence global_alignment.txt -outfile alignment_plot.png

To compare each chromosome's centromeric region against the corresponding IR64 sequence, I need a script that reads every fasta file in the current directory. File names follow the pattern IR64_Chr12_10885438-11747949.fasta: the part before the first underscore is the sample name, the second field is the chromosome name, and the remainder is the position. The script should print commands of the form stretcher -asequence IR64_Chr01_17337793-18303009.fasta -bsequence 13-65_Chr01_16794358-17543870.fasta -outfile IR64_Chr01-13-65_Chr01.txt, where -asequence is always one of IR64's 12 per-chromosome fasta files, -bsequence is the matching chromosome file from another sample, and -outfile encodes the IR64 chromosome together with the other sample's chromosome. An example listing follows, then the script that generates the commands.
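For example, the directory might contain (file names taken from the commands in this post):

IR64_Chr01_17337793-18303009.fasta
IR64_Chr12_10885438-11747949.fasta
13-65_Chr01_16794358-17543870.fasta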

#!/bin/bash

# Associative arrays holding the IR64 files and the other samples' files
declare -A ir64_files  # key: chromosome name (e.g. Chr01), value: file name
declare -A other_samples  # key: original sample name (e.g. Pulut-Hitam-2), value: name of that sample's nested array (- replaced by _)

# Walk over every fasta file in the current directory
for fasta in *.fasta; do
    [ -f "$fasta" ] || continue  # skip anything that is not a regular file

    # Parse the file name (sample_chromosome_position.fasta)
    prefix="${fasta%.fasta}"
    IFS='_' read -r sample chr pos <<< "$prefix"

    # Sanitize the sample name: replace - with _ so it is a legal variable name
    sample_safe="${sample//-/_}"  # replace every - with _

    # Sort the file into the right bucket
    if [ "$sample" = "IR64" ]; then
        ir64_files["$chr"]="$fasta"
    else
        # If this sample has no nested array yet, create one
        if [ -z "${other_samples[$sample]}" ]; then
            array_name="sample_${sample_safe}"  # array name: sample_<sanitized name>
            declare -A "$array_name"  # declare the nested array
            other_samples["$sample"]="$array_name"  # remember which array belongs to this sample
        fi
        # Store the current file under this sample's chromosome key
        array_name="${other_samples[$sample]}"
        eval "${array_name}[$chr]='$fasta'"
    fi
done

# Make sure IR64 files were found at all
if [ ${#ir64_files[@]} -eq 0 ]; then
    echo "Error: no IR64 fasta files found"
    exit 1
fi

# Collect IR64's chromosome names, sorted in natural (version) order
sorted_chrs=($(printf "%s\n" "${!ir64_files[@]}" | sort -V))

# Emit one command per chromosome, in sorted order
for chr in "${sorted_chrs[@]}"; do
    # Iterate over all other samples (original names)
    for sample in "${!other_samples[@]}"; do
        array_name="${other_samples[$sample]}"
        # Does this sample have a file for this chromosome?
        eval "has_chr=\${${array_name}[$chr]+_exists_}"
        if [ -n "$has_chr" ]; then
            # Look up both file names
            ir64_file="${ir64_files[$chr]}"
            eval "sample_file=\${${array_name}[$chr]}"
            # Output file name (keep the original sample name, hyphens included)
            outfile="IR64_${chr}-${sample}_${chr}.txt"
            # Print the stretcher command
            echo "stretcher -asequence $ir64_file -bsequence $sample_file -outfile $outfile"
        fi
    done
done
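The script only prints the stretcher commands instead of running them, so the job list can be checked first. Assuming it is saved as gen_stretcher_cmds.sh (a name chosen here for illustration):

bash gen_stretcher_cmds.sh > run_stretcher.sh   # inspect the generated commands, then
bash run_stretcher.sh                           # or hand them to a scheduler / GNU parallel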

Extracting the results:
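The parser below relies on the summary block of each stretcher report sitting at fixed line numbers (21-25 of the file). For orientation, that block looks like this; the numbers here are made-up placeholders, only the field layout matters:

# Length: 972315
# Identity:  812345/972315 (83.5%)
# Similarity:  857110/972315 (88.1%)
# Gaps:       65123/972315 ( 6.7%)
# Score: 1234567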

import os
import re
import pandas as pd
import matplotlib.pyplot as plt
from typing import Any, Dict, Optional

# -------------------------- Core configuration --------------------------
LENGTH_LINE = 21      # "# Length: xxx"
IDENTITY_LINE = 22    # "# Identity: xxx/xxx (xx.x%)"
SIMILARITY_LINE = 23  # "# Similarity: xxx/xxx (xx.x%)"
GAPS_LINE = 24        # "# Gaps: xxx/xxx (xx.x%)"
SCORE_LINE = 25       # "# Score: xxxxx"
GROUP_FILE_PATH = "group.txt"
GROUP_SEP = "\t"
OUTPUT_CSV = "alignment_stats_with_groups.csv"
OUTPUT_PLOT = "sample_group_alignment_plot.png"


# -------------------------- Helper: extract info from one txt report --------------------------
def extract_txt_info(txt_path: str) -> Optional[Dict[str, Any]]:
    file_name = os.path.basename(txt_path)
    name_pattern = r"^IR64_(Chr\d+)-([^_]+)_Chr\d+\.txt$"
    name_match = re.match(name_pattern, file_name)
    if not name_match:
        print(f"Warning: skipping file with unexpected name → {file_name}")
        return None

    info = {
        "file_name": file_name,
        "chromosome": name_match.group(1),
        "sample": name_match.group(2)  # sample name (used later to merge the group file)
    }

    try:
        with open(txt_path, "r", encoding="utf-8") as f:
            lines = [line.strip() for line in f]
    except Exception as e:
        print(f"Error: failed to read {file_name} → {e}")
        return None

    if len(lines) < SCORE_LINE:
        print(f"Warning: {file_name} has fewer than {SCORE_LINE} lines, skipping")
        return None

    # Extract the total alignment length
    length_match = re.search(r"# Length: (\d+)", lines[LENGTH_LINE-1])
    info["total_length"] = int(length_match.group(1)) if length_match else None
    # Extract identity
    iden_match = re.search(r"# Identity: +(\d+)/(\d+) \(\s*([\d\.]+%)\)", lines[IDENTITY_LINE-1])
    if iden_match:
        info["identity_pct"] = float(iden_match.group(3)[:-1])  # strip the trailing %
    # Extract similarity
    sim_match = re.search(r"# Similarity: +(\d+)/(\d+) \(\s*([\d\.]+%)\)", lines[SIMILARITY_LINE-1])
    if sim_match:
        info["similarity_pct"] = float(sim_match.group(3)[:-1])
    # Extract gaps
    gaps_match = re.search(r"# Gaps: +(\d+)/(\d+) \(\s*([\d\.]+%)\)", lines[GAPS_LINE-1])
    if gaps_match:
        info["gaps_pct"] = float(gaps_match.group(3)[:-1])
    # Extract the score
    score_match = re.search(r"# Score: (-?\d+)", lines[SCORE_LINE-1])
    info["score"] = int(score_match.group(1)) if score_match else None

    # Drop reports with any key metric missing
    if any(key not in info or info[key] is None for key in ["identity_pct", "similarity_pct", "gaps_pct", "score"]):
        print(f"Warning: {file_name} is missing key fields, skipping")
        return None

    return info


# -------------------------- Helper: read the group file --------------------------
def read_group_file() -> Optional[pd.DataFrame]:
    if not os.path.exists(GROUP_FILE_PATH):
        print(f"Error: group file {GROUP_FILE_PATH} does not exist!")
        return None

    try:
        # Read the group file, keeping the sample-name column (ID) and all grouping columns
        group_df = pd.read_csv(
            GROUP_FILE_PATH,
            sep=GROUP_SEP,
            header=0,
            dtype=str,
            na_filter=False
        )
    except Exception as e:
        print(f"Error: failed to read the group file → {e}")
        return None

    # Check the required column (the sample-name column ID must be present)
    if "ID" not in group_df.columns:
        print("Error: the group file has no 'ID' column (sample names), cannot merge")
        return None

    # Rename grouping columns (shorter references later; all grouping info is kept)
    group_df.rename(columns={
        "Other alias": "alias",
        "Eco-type": "eco_type"
    }, inplace=True)

    print(f"Group file loaded: {len(group_df)} samples")
    return group_df


# -------------------------- Main workflow --------------------------
def main():
    # 1. Extract info from the txt reports
    print("=== Step 1: extract info from the txt reports ===")
    txt_infos = []
    for file in os.listdir("."):
        if file.endswith(".txt") and re.match(r"^IR64_Chr\d+-.*_Chr\d+\.txt$", file):
            info = extract_txt_info(file)
            if info:
                txt_infos.append(info)

    if not txt_infos:
        print("Error: no valid txt info extracted, exiting")
        return
    info_df = pd.DataFrame(txt_infos)
    print(f"Extracted {len(info_df)} txt reports (sample examples: {info_df['sample'].unique()[:3]}...)")

    # 2. Read the group file and merge on the sample column (info_df.sample ↔ group_df.ID)
    print("\n=== Step 2: merge group info on the sample column ===")
    group_df = read_group_file()
    if group_df is not None:
        # Merge on sample name (keep every txt report; unmatched samples get Unknown)
        merged_df = pd.merge(
            info_df,
            group_df,
            left_on="sample",  # sample name parsed from the txt file name
            right_on="ID",     # sample name in the group file
            how="left"
        )
        # Fill in grouping info for samples missing from the group file
        for col in group_df.columns:
            if col != "ID":
                merged_df[col] = merged_df[col].fillna("Unknown")
        # Drop the redundant ID column (identical to sample)
        merged_df.drop(columns=["ID"], inplace=True)
    else:
        # No group file: add placeholder grouping columns
        merged_df = info_df.copy()
        merged_df["Taxa1"] = "NoGroup"   # capitalized to match the group_col check below
        merged_df["Taxa2"] = "NoGroup"
        merged_df["eco_type"] = "NoGroup"
        print("Warning: no group file, grouping info set to NoGroup")

    # Save the merged table
    merged_df.to_csv(OUTPUT_CSV, index=False, encoding="utf-8-sig")
    print(f"Merged data saved → {OUTPUT_CSV} ({len(merged_df.columns)} columns)")

    # 3. Per-group statistics of the key metrics
    print("\n=== Step 3: per-group statistics of the key metrics ===")
    # Pick the grouping column (prefer Taxa1, fall back to eco_type)
    group_col = "Taxa1" if "Taxa1" in merged_df.columns else "eco_type"
    if group_col not in merged_df.columns:
        group_col = "sample"  # last resort: group by sample name
    print(f"Grouping column: {group_col} ({merged_df[group_col].nunique()} groups)")

    # Aggregate each metric with mean/std/count using the standard dict-style
    # agg spec {column: [functions]} (a tuple list is not a valid spec for
    # DataFrameGroupBy.agg and raises an error)
    stats_result = merged_df.groupby(group_col).agg({
        "identity_pct": ["mean", "std", "count"],
        "similarity_pct": ["mean", "std", "count"],
        "gaps_pct": ["mean", "std", "count"],
        "score": ["mean", "std", "count"]
    }).round(2)

    # Print the statistics (indexing into the hierarchical columns)
    print(f"\nPer-group statistics (grouping column: {group_col}):")
    for group in stats_result.index:
        print(f"\n[{group}] (n = {stats_result.loc[group, ('identity_pct', 'count')]})")
        print(f"  Identity:   {stats_result.loc[group, ('identity_pct', 'mean')]}% ± {stats_result.loc[group, ('identity_pct', 'std')]}")
        print(f"  Similarity: {stats_result.loc[group, ('similarity_pct', 'mean')]}% ± {stats_result.loc[group, ('similarity_pct', 'std')]}")
        print(f"  Gaps:       {stats_result.loc[group, ('gaps_pct', 'mean')]}% ± {stats_result.loc[group, ('gaps_pct', 'std')]}")
        print(f"  Score:      {stats_result.loc[group, ('score', 'mean')]} ± {stats_result.loc[group, ('score', 'std')]}")

    # 4. Plot per-group charts from the statistics
    print("\n=== Step 4: plot per-group statistics ===")
    plt.rcParams["font.sans-serif"] = ["DejaVu Sans", "SimHei"]
    plt.rcParams["axes.unicode_minus"] = False

    # Prepare the plotting data (group names and per-metric means)
    groups = stats_result.index.tolist()
    identity_mean = stats_result[("identity_pct", "mean")].tolist()
    similarity_mean = stats_result[("similarity_pct", "mean")].tolist()
    gaps_mean = stats_result[("gaps_pct", "mean")].tolist()
    score_mean = stats_result[("score", "mean")].tolist()
    sample_count = stats_result[("identity_pct", "count")].tolist()  # samples per group
    tick_labels = [f"{g}\n(n={c})" for g, c in zip(groups, sample_count)]

    # 2x2 grid of subplots
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    fig.suptitle(f"Per-group statistics (grouping column: {group_col})", fontsize=16)
    axes = axes.flatten()

    # 1. Identity
    axes[0].bar(groups, identity_mean, color="#FF6B6B", alpha=0.8)
    axes[0].set_title("Identity (%)")
    axes[0].set_xlabel("Group (sample count)")
    axes[0].set_xticks(range(len(groups)))  # fix tick positions before relabeling
    axes[0].set_xticklabels(tick_labels, rotation=45, ha="right")
    axes[0].grid(axis="y", linestyle="--", alpha=0.5)
    # Value labels on the bars
    for i, v in enumerate(identity_mean):
        axes[0].text(i, v+1, f"{v:.1f}", ha="center", va="bottom")

    # 2. Similarity
    axes[1].bar(groups, similarity_mean, color="#4ECDC4", alpha=0.8)
    axes[1].set_title("Similarity (%)")
    axes[1].set_xlabel("Group (sample count)")
    axes[1].set_xticks(range(len(groups)))
    axes[1].set_xticklabels(tick_labels, rotation=45, ha="right")
    axes[1].grid(axis="y", linestyle="--", alpha=0.5)
    for i, v in enumerate(similarity_mean):
        axes[1].text(i, v+1, f"{v:.1f}", ha="center", va="bottom")

    # 3. Gaps
    axes[2].bar(groups, gaps_mean, color="#45B7D1", alpha=0.8)
    axes[2].set_title("Gaps (%)")
    axes[2].set_xlabel("Group (sample count)")
    axes[2].set_xticks(range(len(groups)))
    axes[2].set_xticklabels(tick_labels, rotation=45, ha="right")
    axes[2].grid(axis="y", linestyle="--", alpha=0.5)
    for i, v in enumerate(gaps_mean):
        axes[2].text(i, v+1, f"{v:.1f}", ha="center", va="bottom")

    # 4. Score
    axes[3].bar(groups, score_mean, color="#96CEB4", alpha=0.8)
    axes[3].set_title("Score")
    axes[3].set_xlabel("Group (sample count)")
    axes[3].set_xticks(range(len(groups)))
    axes[3].set_xticklabels(tick_labels, rotation=45, ha="right")
    axes[3].grid(axis="y", linestyle="--", alpha=0.5)
    for i, v in enumerate(score_mean):
        axes[3].text(i, v + max(score_mean)*0.05, f"{v:.0f}", ha="center", va="bottom")

    plt.tight_layout()
    plt.savefig(OUTPUT_PLOT, dpi=300, bbox_inches="tight")
    print(f"Plot saved → {OUTPUT_PLOT}")

    print("\n=== All done! ===")
    print(f"1. Merged data: {OUTPUT_CSV}")
    print(f"2. Plots:       {OUTPUT_PLOT}")


if __name__ == "__main__":
    main()
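The script expects a tab-separated group.txt next to the alignment reports. Based on the columns referenced above, a minimal sketch of its layout (placeholder values; real files may carry more columns) is:

ID	Other alias	Taxa1	Eco-type
13-65	<alias>	<taxa>	<eco-type>

Then run the parser from the directory containing the stretcher reports (the file name extract_alignment_stats.py is just an example):

python extract_alignment_stats.py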