For a global alignment of two chromosome-segment sequences (forcing alignment over the full length, even when the lengths differ or there are local mismatches), use a tool based on the Needleman-Wunsch algorithm. These tools are built for global alignment and report the overall similarity of the full-length sequences. Concrete steps follow:
Recommended tool: EMBOSS Needle / Stretcher (classic global-alignment tools)
The needle program in EMBOSS (the European Molecular Biology Open Software Suite) is the standard implementation of the Needleman-Wunsch algorithm. It supports global alignment of DNA and protein sequences and reports the key statistics: identity, similarity, and gaps. For megabase-scale segments like these, needle's quadratic memory use is prohibitive, so use stretcher, the linear-memory variant of the same global alignment:
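To illustrate what a Needleman-Wunsch global alignment computes, here is a minimal scorer in Python. This is a sketch only: the match/mismatch/gap values are illustrative, not EMBOSS defaults, and for chromosome-scale inputs you need stretcher's linear-memory implementation, not this quadratic-time toy.

```python
def nw_score(a, b, match=1, mismatch=-1, gap=-2):
    """Global (Needleman-Wunsch) alignment score of two sequences,
    computed with a rolling dynamic-programming row."""
    prev = [j * gap for j in range(len(b) + 1)]  # row for the empty prefix of a
    for i, ca in enumerate(a, 1):
        curr = [i * gap]  # aligning a[:i] against the empty prefix of b
        for j, cb in enumerate(b, 1):
            diag = prev[j - 1] + (match if ca == cb else mismatch)
            curr.append(max(diag, prev[j] + gap, curr[j - 1] + gap))
        prev = curr
    return prev[-1]
```

Unlike a local (Smith-Waterman) score, this value can go negative, because every position of both sequences must be paid for with a match, mismatch, or gap.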
stretcher -asequence IR64_Chr01_17337793-18303009.fasta -bsequence 13-65_Chr01_16794358-17543870.fasta -outfile long_global_alignment.txt
If the two sequences differ in length (in this example, the IR64 segment is ~965 kb and the 13-65 segment ~750 kb), the global alignment inserts gaps at the ends or in the middle of the shorter sequence to force full-length alignment. The Identity may then be lower than in a local alignment, but it better reflects overall similarity.

Software download
* wget ftp://ftp.ebi.ac.uk/pub/software/unix/EMBOSS/EMBOSS-6.6.0.tar.gz
Fix ownership so the compiled binaries are usable
sudo chown -R fengting /public/home/fengting/soft/emboss/EMBOSS-6.6.0/emboss/.libs/
prettyplot -sequence global_alignment.txt -outfile alignment_plot.png
To look at how each chromosome's centromeric region differs from the IR64 sequence, I need a script that reads every fasta file in the current directory. The names follow the pattern IR64_Chr12_10885438-11747949.fasta: the sample name comes before the first _, the chromosome name second, and the position last. The script should print commands of the form stretcher -asequence IR64_Chr01_17337793-18303009.fasta -bsequence 13-65_Chr01_16794358-17543870.fasta -outfile IR64_Chr01-13-65_Chr01.txt, where -asequence always takes one of IR64's 12 chromosome fasta files, -bsequence the other sample's file for the same chromosome, and -outfile is named from the IR64 chromosome and the matching sample chromosome.
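The filename convention can be checked with a quick parse. A small sketch (the helper name is my own, not part of any script here):

```python
def parse_fasta_name(fname):
    """Split 'SAMPLE_ChrNN_start-end.fasta' into its three fields.
    The sample name itself may contain '-', e.g. '13-65', which is why
    we split on '_' rather than on '-'."""
    stem = fname[:-len(".fasta")]
    sample, chrom, pos = stem.split("_")
    return sample, chrom, pos

parse_fasta_name("13-65_Chr01_16794358-17543870.fasta")
# -> ("13-65", "Chr01", "16794358-17543870")
```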
#!/bin/bash
# Associative arrays for the IR64 files and the other samples' files
declare -A ir64_files     # key: chromosome name (e.g. Chr01), value: file name
declare -A other_samples  # key: original sample name (e.g. Pulut-Hitam-2), value: name of its nested array

# Walk every fasta file in the current directory
for fasta in *.fasta; do
    [ -f "$fasta" ] || continue  # skip non-files
    # Parse the file name (sample_chromosome_position.fasta)
    prefix="${fasta%.fasta}"
    IFS='_' read -r sample chr pos <<< "$prefix"
    # Sanitise the sample name: replace '-' with '_' so it is legal in a variable name
    sample_safe="${sample//-/_}"
    # Classify the file
    if [ "$sample" = "IR64" ]; then
        ir64_files["$chr"]="$fasta"
    else
        # First time we see this sample: create its nested array
        if [ -z "${other_samples[$sample]}" ]; then
            array_name="sample_${sample_safe}"       # array name: sample_<sanitised name>
            declare -A "$array_name"                 # declare the nested array
            other_samples["$sample"]="$array_name"   # remember which array belongs to this sample
        fi
        # Store the current file under its chromosome key
        array_name="${other_samples[$sample]}"
        eval "${array_name}[$chr]='$fasta'"
    fi
done

# Make sure IR64 files were found at all
if [ ${#ir64_files[@]} -eq 0 ]; then
    echo "Error: no IR64 fasta files found"
    exit 1
fi

# Sort IR64's chromosome names in natural (version) order
sorted_chrs=($(printf "%s\n" "${!ir64_files[@]}" | sort -V))

# Emit one stretcher command per chromosome and sample
for chr in "${sorted_chrs[@]}"; do
    for sample in "${!other_samples[@]}"; do
        array_name="${other_samples[$sample]}"
        # Does this sample have a file for this chromosome?
        eval "has_chr=\${${array_name}[$chr]+_exists_}"
        if [ -n "$has_chr" ]; then
            ir64_file="${ir64_files[$chr]}"
            eval "sample_file=\${${array_name}[$chr]}"
            # The output name keeps the original sample name (with '-')
            outfile="IR64_${chr}-${sample}_${chr}.txt"
            echo "stretcher -asequence $ir64_file -bsequence $sample_file -outfile $outfile"
        fi
    done
done
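The same pairing logic can be cross-checked in a few lines of Python. This is a verification sketch, not a replacement for the bash generator; one deliberate difference is that samples are sorted here, whereas the bash script iterates them in hash order.

```python
from collections import defaultdict

def stretcher_commands(fastas):
    """Rebuild the stretcher command list from a list of fasta file names,
    using the same pairing rules as the bash script above."""
    ir64, others = {}, defaultdict(dict)
    for f in fastas:
        sample, chrom, _pos = f[:-len(".fasta")].split("_")
        if sample == "IR64":
            ir64[chrom] = f
        else:
            others[sample][chrom] = f
    return [
        f"stretcher -asequence {ir64[c]} -bsequence {others[s][c]} "
        f"-outfile IR64_{c}-{s}_{c}.txt"
        for c in sorted(ir64)   # Chr01..Chr12 sort lexically == numerically (zero-padded)
        for s in sorted(others)
        if c in others[s]       # skip samples lacking this chromosome
    ]
```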

Extracting the results:
import os
import re
import pandas as pd
import matplotlib.pyplot as plt
from typing import Dict, Optional

# -------------------------- Core configuration --------------------------
LENGTH_LINE = 21      # "# Length: xxx"
IDENTITY_LINE = 22    # "# Identity: xxx/xxx (xx.x%)"
SIMILARITY_LINE = 23  # "# Similarity: xxx/xxx (xx.x%)"
GAPS_LINE = 24        # "# Gaps: xxx/xxx ( 6.7%)"
SCORE_LINE = 25       # "# Score: xxxxx"
GROUP_FILE_PATH = "group.txt"
GROUP_SEP = "\t"
OUTPUT_CSV = "alignment_stats_with_groups.csv"
OUTPUT_PLOT = "sample_group_alignment_plot.png"
# -------------------------- Helper: parse one stretcher txt --------------------------
def extract_txt_info(txt_path: str) -> "Optional[Dict[str, object]]":
    file_name = os.path.basename(txt_path)
    name_pattern = r"^IR64_(Chr\d+)-([^_]+)_Chr\d+\.txt$"
    name_match = re.match(name_pattern, file_name)
    if not name_match:
        print(f"Warning: skipping badly named file -> {file_name}")
        return None
    info = {
        "file_name": file_name,
        "chromosome": name_match.group(1),
        "sample": name_match.group(2)  # sample name, used later to merge in the group table
    }
    try:
        with open(txt_path, "r", encoding="utf-8") as f:
            lines = [line.strip() for line in f]
    except Exception as e:
        print(f"Error: could not read {file_name} -> {e}")
        return None
    if len(lines) < SCORE_LINE:
        print(f"Warning: {file_name} has fewer than {SCORE_LINE} lines, skipping")
        return None
    # Alignment length
    length_match = re.search(r"# Length: (\d+)", lines[LENGTH_LINE - 1])
    info["total_length"] = int(length_match.group(1)) if length_match else None
    # Identity
    iden_match = re.search(r"# Identity: +(\d+)/(\d+) \(\s*([\d\.]+%)\)", lines[IDENTITY_LINE - 1])
    if iden_match:
        info["identity_pct"] = float(iden_match.group(3)[:-1])
    # Similarity
    sim_match = re.search(r"# Similarity: +(\d+)/(\d+) \(\s*([\d\.]+%)\)", lines[SIMILARITY_LINE - 1])
    if sim_match:
        info["similarity_pct"] = float(sim_match.group(3)[:-1])
    # Gaps
    gaps_match = re.search(r"# Gaps: +(\d+)/(\d+) \(\s*([\d\.]+%)\)", lines[GAPS_LINE - 1])
    if gaps_match:
        info["gaps_pct"] = float(gaps_match.group(3)[:-1])
    # Score
    score_match = re.search(r"# Score: (-?\d+)", lines[SCORE_LINE - 1])
    info["score"] = int(score_match.group(1)) if score_match else None
    # Drop files whose key statistics are missing
    if any(key not in info or info[key] is None for key in ["identity_pct", "similarity_pct", "gaps_pct", "score"]):
        print(f"Warning: {file_name} is missing key statistics, skipping")
        return None
    return info
# -------------------------- Helper: read the group file --------------------------
def read_group_file() -> "Optional[pd.DataFrame]":
    if not os.path.exists(GROUP_FILE_PATH):
        print(f"Error: group file {GROUP_FILE_PATH} does not exist!")
        return None
    try:
        # Keep the sample-name ("ID") column and every grouping column
        group_df = pd.read_csv(
            GROUP_FILE_PATH,
            sep=GROUP_SEP,
            header=0,
            dtype=str,
            na_filter=False
        )
    except Exception as e:
        print(f"Error: could not read group file -> {e}")
        return None
    # The sample-name column "ID" is mandatory for the merge
    if "ID" not in group_df.columns:
        print("Error: group file lacks an 'ID' (sample name) column, cannot merge")
        return None
    # Rename the grouping columns for easier reference downstream
    group_df.rename(columns={
        "Other alias": "alias",
        "Eco-type": "eco_type"
    }, inplace=True)
    print(f"Group file read: {len(group_df)} samples")
    return group_df
# -------------------------- Main workflow --------------------------
def main():
    # 1. Parse the stretcher output files
    print("=== Step 1: parse the txt files ===")
    txt_infos = []
    for file in os.listdir("."):
        if file.endswith(".txt") and re.match(r"^IR64_Chr\d+-.*_Chr\d+\.txt$", file):
            info = extract_txt_info(file)
            if info:
                txt_infos.append(info)
    if not txt_infos:
        print("Error: no valid txt information extracted, exiting")
        return
    info_df = pd.DataFrame(txt_infos)
    print(f"Parsed {len(info_df)} txt files (e.g. {info_df['sample'].unique()[:3]}...)")

    # 2. Read the group file and merge on the sample column (info_df.sample <-> group_df.ID)
    print("\n=== Step 2: merge the group information on the sample column ===")
    group_df = read_group_file()
    if group_df is not None:
        # Left merge: keep every txt sample; unmatched ones are filled below
        merged_df = pd.merge(
            info_df,
            group_df,
            left_on="sample",  # sample name from the txt file name
            right_on="ID",     # sample name in the group file
            how="left"
        )
        # Fill the grouping columns of unmatched samples
        for col in group_df.columns:
            if col != "ID":
                merged_df[col] = merged_df[col].fillna("Unknown")
        # Drop the redundant ID column (identical to sample)
        merged_df.drop(columns=["ID"], inplace=True)
    else:
        # Without a group file, add placeholder grouping columns
        merged_df = info_df.copy()
        merged_df["taxa1"] = "NoGroup"
        merged_df["taxa2"] = "NoGroup"
        merged_df["eco_type"] = "NoGroup"
        print("Warning: no group file, grouping set to NoGroup")

    # Save the merged table
    merged_df.to_csv(OUTPUT_CSV, index=False, encoding="utf-8-sig")
    print(f"Merged data saved -> {OUTPUT_CSV} ({len(merged_df.columns)} columns)")
    # 3. Per-group statistics of the key metrics
    print("\n=== Step 3: per-group statistics of the key metrics ===")
    # Choose the grouping column: Taxa1 if present, else eco_type, else sample
    group_col = "Taxa1" if "Taxa1" in merged_df.columns else "eco_type"
    if group_col not in merged_df.columns:
        group_col = "sample"  # fallback: group by sample name
    print(f"Grouping column: {group_col} ({merged_df[group_col].nunique()} groups)")
    # Dict-style agg ({column: [functions]}) yields hierarchical result columns,
    # which is what the (column, function) lookups below expect
    stats_result = merged_df.groupby(group_col).agg({
        "identity_pct": ["mean", "std", "count"],
        "similarity_pct": ["mean", "std", "count"],
        "gaps_pct": ["mean", "std", "count"],
        "score": ["mean", "std", "count"]
    }).round(2)
    # Print the statistics via the hierarchical column index
    print(f"\nPer-group statistics (grouping column: {group_col}):")
    for group in stats_result.index:
        print(f"\n[{group}] (n = {stats_result.loc[group, ('identity_pct', 'count')]})")
        print(f"  Identity:   {stats_result.loc[group, ('identity_pct', 'mean')]}% ± {stats_result.loc[group, ('identity_pct', 'std')]}")
        print(f"  Similarity: {stats_result.loc[group, ('similarity_pct', 'mean')]}% ± {stats_result.loc[group, ('similarity_pct', 'std')]}")
        print(f"  Gaps:       {stats_result.loc[group, ('gaps_pct', 'mean')]}% ± {stats_result.loc[group, ('gaps_pct', 'std')]}")
        print(f"  Score:      {stats_result.loc[group, ('score', 'mean')]} ± {stats_result.loc[group, ('score', 'std')]}")
    # 4. Plot the per-group statistics
    print("\n=== Step 4: plot the per-group statistics ===")
    plt.rcParams["font.sans-serif"] = ["DejaVu Sans", "SimHei"]
    plt.rcParams["axes.unicode_minus"] = False
    # Plot data: group names and the per-metric means
    groups = stats_result.index.tolist()
    identity_mean = stats_result[("identity_pct", "mean")].tolist()
    similarity_mean = stats_result[("similarity_pct", "mean")].tolist()
    gaps_mean = stats_result[("gaps_pct", "mean")].tolist()
    score_mean = stats_result[("score", "mean")].tolist()
    sample_count = stats_result[("identity_pct", "count")].tolist()
    xticklabels = [f"{g}\n(n={c})" for g, c in zip(groups, sample_count)]

    # 2 x 2 grid of bar charts, one panel per metric
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    fig.suptitle(f"Per-group statistics (grouping column: {group_col})", fontsize=16)
    axes = axes.flatten()
    panels = [
        ("Identity (%)", identity_mean, "#FF6B6B"),
        ("Similarity (%)", similarity_mean, "#4ECDC4"),
        ("Gaps (%)", gaps_mean, "#45B7D1"),
        ("Score", score_mean, "#96CEB4"),
    ]
    for ax, (title, values, color) in zip(axes, panels):
        ax.bar(groups, values, color=color, alpha=0.8)
        ax.set_title(title)
        ax.set_xlabel("Group (sample count)")
        # Fix the tick positions before relabelling, otherwise newer matplotlib warns
        ax.set_xticks(range(len(groups)))
        ax.set_xticklabels(xticklabels, rotation=45, ha="right")
        ax.grid(axis="y", linestyle="--", alpha=0.5)
        # Value labels above each bar (scores need a larger offset than percentages)
        offset = max(values) * 0.05 if title == "Score" else 1
        fmt = "{:.0f}" if title == "Score" else "{:.1f}"
        for i, v in enumerate(values):
            ax.text(i, v + offset, fmt.format(v), ha="center", va="bottom")

    plt.tight_layout()
    plt.savefig(OUTPUT_PLOT, dpi=300, bbox_inches="tight")
    print(f"Plot saved -> {OUTPUT_PLOT}")
    print("\n=== All steps finished! ===")
    print(f"1. Merged data: {OUTPUT_CSV}")
    print(f"2. Plot: {OUTPUT_PLOT}")

if __name__ == "__main__":
    main()
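Because the parser reads fixed line numbers out of the stretcher report, it helps to sanity-check the header regex against a stretcher-style line. A quick sketch (the numbers in the sample line are fabricated for illustration):

```python
import re

# A fabricated stretcher-style header line, matching the "# Identity:" format
line = "# Identity:    123456/965217 (12.8%)"
m = re.search(r"# Identity: +(\d+)/(\d+) \(\s*([\d\.]+%)\)", line)
# group(1)/group(2) are the matched/total counts; group(3) is the percentage
# with its trailing '%', which is stripped before converting to float
pct = float(m.group(3)[:-1]) if m else None
```

If stretcher's column spacing ever changes (e.g. extra padding inside the parentheses), the `\s*` before the percentage is what keeps the match working.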