0 项目简介
RNA逆折叠是计算生物学中的一个核心概念,指设计特定的RNA序列,使其能够折叠成预定的目标RNA结构。这一过程与传统的RNA折叠(即从已知序列预测其结构)形成鲜明对比,因其在设计功能性RNA分子方面的广泛应用而备受关注,涉及临床医学、工业生产等多个领域。
基于给定的 RNA 三维骨架结构,生成一个 RNA 序列。评估标准是序列的恢复率 (recovery rate),即算法生成的 RNA 序列,在多大程度上与真实能够折叠成目标结构的 RNA 序列相似。恢复率越高,表明算法性能越好。
1 导入模块
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch_geometric.data import Data, Batch
from torch_geometric.nn import GCNConv, global_mean_pool
import torch_geometric
from Bio import SeqIO
2 数据探索
数据探索性分析,是通过了解数据集,了解变量间的相互关系以及变量与预测值之间的关系,对已有数据在尽量少的先验假设下通过作图、制表、方程拟合、计算特征量等手段探索数据的结构和规律的一种数据分析方法,从而帮助我们后期更好地进行特征工程和建立模型,是机器学习中十分重要的一步。
# 数据探索性分析模块
# 查看coords seqs文件有多个个
seqs=glob.glob("./RNAdesignv1/train/seqs/*.fasta")
coords=glob.glob("./RNAdesignv1/train/coords/*.npy")
print(f"RNA序列文件有 {len(seqs)}个, 坐标文件有 {len(coords)}个")
#RNA序列文件有 2317个, 坐标文件有 2317个
#查看npy的数据类型
import numpy as np
file_path= "./RNAdesignv1/train/coords/1A9N_1_Q.npy"
data = np.load(file_path)
print(data.shape)
#(24, 7, 3)
# 对RNA序列的长度信息进行统计
import os
from Bio import SeqIO
import matplotlib.pyplot as plt
def calculate_sequence_lengths(folder_path):
sequence_lengths = []
for filename in os.listdir(folder_path):
if filename.endswith('.fasta'):
file_path = os.path.join(folder_path, filename)
try:
for record in SeqIO.parse(file_path, 'fasta'):
sequence_lengths.append(len(record.seq))
except Exception as e:
print(f"Error reading {file_path}: {e}")
return sequence_lengths
def plot_histogram(sequence_lengths):
plt.figure(figsize=(10, 6))
plt.hist(sequence_lengths, bins=50, edgecolor='black')
plt.title('Distribution of RNA Sequence Lengths')
plt.xlabel('Sequence Length')
plt.ylabel('Frequency')
plt.grid(True)
plt.show()
if __name__ == "__main__":
folder_path = './RNAdesignv1/train/seqs/' # 请将此路径替换为你的文件夹路径
lengths = calculate_sequence_lengths(folder_path)
print(lengths)
plot_histogram(lengths)
3 特征构建
# 图结构数据生成器
class RNAGraphBuilder:
@staticmethod
def build_graph(coord, seq):
"""将坐标和序列转换为图结构"""
num_nodes = coord.shape[0]
# 节点特征:展平每个节点的7个骨架点坐标
x = torch.tensor(coord.reshape(num_nodes, -1), dtype=torch.float32) # [N, 7*3]
# 边构建:基于序列顺序的k近邻连接
edge_index = []
for i in range(num_nodes):
# 连接前k和后k个节点
neighbors = list(range(max(0, i-Config.k_neighbors), i)) + \
list(range(i+1, min(num_nodes, i+1+Config.k_neighbors)))
for j in neighbors:
edge_index.append([i, j])
edge_index.append([j, i]) # 双向连接
edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()
# 节点标签
y = torch.tensor([Config.seq_vocab.index(c) for c in seq], dtype=torch.long)
return Data(x=x, edge_index=edge_index, y=y, num_nodes=num_nodes)
4 模型训练与验证
# 模型训练与验证
class RNADataset(torch.utils.data.Dataset):
def __init__(self, coords_dir, seqs_dir):
self.samples = []
# 读取所有数据并转换为图
for fname in os.listdir(coords_dir):
# 加载坐标数据
coord = np.load(os.path.join(coords_dir, fname)) # [L, 7, 3]
coord = np.nan_to_num(coord, nan=0.0) # 新增行:将NaN替换为0
# 加载对应序列
seq_id = os.path.splitext(fname)[0]
seq = next(SeqIO.parse(os.path.join(seqs_dir, f"{seq_id}.fasta"), "fasta")).seq
# 转换为图结构
graph = RNAGraphBuilder.build_graph(coord, str(seq))
self.samples.append(graph)
def __len__(self):
return len(self.samples)
def __getitem__(self, idx):
return self.samples[idx]
# 简单GNN模型
class GNNModel(nn.Module):
def __init__(self):
super().__init__()
# 特征编码
self.encoder = nn.Sequential(
nn.Linear(7*3, Config.hidden_dim),
nn.ReLU()
)
# GNN层
self.conv1 = GCNConv(Config.hidden_dim, Config.hidden_dim)
self.conv2 = GCNConv(Config.hidden_dim, Config.hidden_dim)
# 分类头
self.cls_head = nn.Sequential(
nn.Linear(Config.hidden_dim, len(Config.seq_vocab))
)
def forward(self, data):
# 节点特征编码
x = self.encoder(data.x) # [N, hidden]
# 图卷积
x = self.conv1(x, data.edge_index)
x = torch.relu(x)
x = self.conv2(x, data.edge_index)
x = torch.relu(x)
# 节点分类
logits = self.cls_head(x) # [N, 4]
return logits
# 训练函数
def train(model, loader, optimizer, criterion):
model.train()
total_loss = 0
for batch in loader:
batch = batch.to(Config.device)
optimizer.zero_grad()
# 前向传播
logits = model(batch)
# 计算损失
loss = criterion(logits, batch.y)
# 反向传播
loss.backward()
optimizer.step()
total_loss += loss.item()
return total_loss / len(loader)
# 评估函数
def evaluate(model, loader):
model.eval()
correct = 0
total = 0
with torch.no_grad():
for batch in loader:
batch = batch.to(Config.device)
logits = model(batch)
preds = logits.argmax(dim=1)
correct += (preds == batch.y).sum().item()
total += batch.y.size(0)
return correct / total
# 主流程
if __name__ == "__main__":
# 设置随机种子
torch.manual_seed(Config.seed)
# 加载数据集
full_dataset = RNADataset("./RNAdesignv1/train/coords", "./RNAdesignv1/train/seqs")
# 划分数据集
train_size = int(0.8 * len(full_dataset))
val_size = (len(full_dataset) - train_size) // 2
test_size = len(full_dataset) - train_size - val_size
train_set, val_set, test_set = torch.utils.data.random_split(
full_dataset, [train_size, val_size, test_size])
# 创建DataLoader
train_loader = torch_geometric.loader.DataLoader(
train_set, batch_size=Config.batch_size, shuffle=True)
val_loader = torch_geometric.loader.DataLoader(val_set, batch_size=Config.batch_size)
test_loader = torch_geometric.loader.DataLoader(test_set, batch_size=Config.batch_size)
# 初始化模型
model = GNNModel().to(Config.device)
optimizer = optim.Adam(model.parameters(), lr=Config.lr)
criterion = nn.CrossEntropyLoss()
# 训练循环
best_acc = 0
for epoch in range(Config.epochs):
train_loss = train(model, train_loader, optimizer, criterion)
val_acc = evaluate(model, val_loader)
print(f"Epoch {epoch+1}/{Config.epochs}")
print(f"Train Loss: {train_loss:.4f} | Val Acc: {val_acc:.4f}")
# 保存最佳模型
if val_acc > best_acc:
best_acc = val_acc
torch.save(model.state_dict(), "best_gnn_model.pth")
# 最终测试
model.load_state_dict(torch.load("best_gnn_model.pth",weights_only=True))
test_acc = evaluate(model, test_loader)
print(f"\nTest Accuracy: {test_acc:.4f}")
5 结果输出
#输出赛题提交格式的结果
import numpy as np
import torch
import torch.nn as nn
import os
from torch_geometric.nn import GCNConv
import torch.nn.functional as F
from torch_geometric.data import Data
import pandas as pd
import glob
class Config:
seed = 42
device = "cuda:7" if torch.cuda.is_available() else "cpu"
batch_size = 32
lr = 0.01
epochs = 20
seq_vocab = "AUCG"
coord_dims = 7 # 7个骨架点
hidden_dim = 128
k_neighbors = 5 # 每个节点的近邻数
class GNNModel(nn.Module):
def __init__(self):
super().__init__()
# 特征编码
self.encoder = nn.Sequential(
nn.Linear(7*3, Config.hidden_dim),
nn.ReLU()
)
# GNN层
self.conv1 = GCNConv(Config.hidden_dim, Config.hidden_dim)
self.conv2 = GCNConv(Config.hidden_dim, Config.hidden_dim)
# 分类头
self.cls_head = nn.Sequential(
nn.Linear(Config.hidden_dim, len(Config.seq_vocab))
)
def forward(self, data):
# 节点特征编码
x = self.encoder(data.x) # [N, hidden]
# 图卷积
x = self.conv1(x, data.edge_index)
x = torch.relu(x)
x = self.conv2(x, data.edge_index)
x = torch.relu(x)
# 节点分类
logits = self.cls_head(x) # [N, 4]
return logits
class RNAGraphBuilder:
@staticmethod
def build_graph(coord, seq):
"""将坐标和序列转换为图结构"""
num_nodes = coord.shape[0]
# 节点特征:展平每个节点的7个骨架点坐标
x = torch.tensor(coord.reshape(num_nodes, -1), dtype=torch.float32) # [N, 7*3]
# 边构建:基于序列顺序的k近邻连接
edge_index = []
for i in range(num_nodes):
# 连接前k和后k个节点
neighbors = list(range(max(0, i-Config.k_neighbors), i)) + \
list(range(i+1, min(num_nodes, i+1+Config.k_neighbors)))
for j in neighbors:
edge_index.append([i, j])
edge_index.append([j, i]) # 双向连接
edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()
# 节点标签
y = torch.tensor([Config.seq_vocab.index(c) for c in seq], dtype=torch.long)
return Data(x=x, edge_index=edge_index, y=y, num_nodes=num_nodes)
class RNASequenceGenerator:
def __init__(self, model_path):
self.model = GNNModel().to(Config.device)
self.model.load_state_dict(
torch.load(model_path, map_location=Config.device, weights_only=True)
)
self.model.eval()
def generate_sequences(self, coord_data, num_seq=5, temperature=1.0, top_k=3):
"""
生成候选RNA序列
:param coord_data: numpy数组 [L, 7, 3]
:param num_seq: 需要生成的序列数量
:param temperature: 温度参数控制多样性
:param top_k: 每个位置只考虑top_k高概率的碱基
:return: 生成的序列列表
"""
# 转换为图数据
graph = self._preprocess_data(coord_data)
graph = graph.to(Config.device) # 关键修复:转移数据到模型设备
# 获取概率分布
with torch.no_grad():
logits = self.model(graph)
probs = F.softmax(logits / temperature, dim=1) # [L, 4]
# 生成候选序列
sequences = set()
while len(sequences) < num_seq:
seq = self._sample_sequence(probs, top_k)
sequences.add(seq)
return list(sequences)[:num_seq]
def _preprocess_data(self, coord):
"""预处理坐标数据为图结构"""
# 创建伪序列(实际不会被使用)
dummy_seq = "A" * coord.shape[0]
return RNAGraphBuilder.build_graph(coord, dummy_seq)
def _sample_sequence(self, probs, top_k):
"""基于概率分布进行采样"""
seq = []
for node_probs in probs:
# 应用top-k筛选
topk_probs, topk_indices = torch.topk(node_probs, top_k)
# 重新归一化
norm_probs = topk_probs / topk_probs.sum()
# 采样
chosen = np.random.choice(topk_indices.cpu().numpy(), p=norm_probs.cpu().numpy())
seq.append(Config.seq_vocab[chosen])
return "".join(seq)
# 使用示例
if __name__ == "__main__":
# 加载生成器
generator = RNASequenceGenerator("best_gnn_model.pth")
result={
"pdb_id":[],
"seq":[]
}
# 示例输入(替换为真实骨架坐标)
npy_files=glob.glob("./saisdata/coords/*.npy")
for npy in npy_files:
id_name=os.path.basename(npy).split(".")[0]
coord = np.load(npy) # [L, 7, 3]
coord = np.nan_to_num(coord, nan=0.0) # 新增行:将NaN替换为0
# 生成1个候选序列
candidates = generator.generate_sequences(
coord,
num_seq=1,
temperature=0.8, # 适度多样性
top_k=4 # 每个位置考虑前4个可能
)
result["pdb_id"].append(id_name)
result["seq"].append(candidates[0])
result=pd.DataFrame(result)
result.to_csv("./saisresult/submit.csv",index=False)