Datawhale AI春训营 RNA结构预测学习笔记

0 项目简介

RNA逆折叠是计算生物学中的一个核心概念,指设计特定的RNA序列,使其能够折叠成预定的目标RNA结构。这一过程与传统的RNA折叠(即从已知序列预测其结构)形成鲜明对比,因其在设计功能性RNA分子方面的广泛应用而备受关注,涉及临床医学、工业生产等多个领域。

基于给定的 RNA 三维骨架结构,生成一个 RNA 序列。评估标准是序列的恢复率 (recovery rate),即算法生成的 RNA 序列,在多大程度上与真实能够折叠成目标结构的 RNA 序列相似。恢复率越高,表明算法性能越好。

1 导入模块


import os

import numpy as np

import torch

import torch.nn as nn

import torch.optim as optim

from torch_geometric.data import Data, Batch

from torch_geometric.nn import GCNConv, global_mean_pool

import torch_geometric

from Bio import SeqIO

2 数据探索

数据探索性分析,是通过了解数据集,了解变量间的相互关系以及变量与预测值之间的关系,对已有数据在尽量少的先验假设下通过作图、制表、方程拟合、计算特征量等手段探索数据的结构和规律的一种数据分析方法,从而帮助我们后期更好地进行特征工程和建立模型,是机器学习中十分重要的一步。

# 数据探索性分析模块
# 查看coords seqs文件有多个个

seqs=glob.glob("./RNAdesignv1/train/seqs/*.fasta")
coords=glob.glob("./RNAdesignv1/train/coords/*.npy")

print(f"RNA序列文件有 {len(seqs)}个, 坐标文件有 {len(coords)}个")
#RNA序列文件有 2317个, 坐标文件有 2317个

#查看npy的数据类型
import numpy as np

file_path= "./RNAdesignv1/train/coords/1A9N_1_Q.npy"
data = np.load(file_path)

print(data.shape)
#(24, 7, 3)
# 对RNA序列的长度信息进行统计
import os
from Bio import SeqIO
import matplotlib.pyplot as plt

def calculate_sequence_lengths(folder_path):
    sequence_lengths = []
    for filename in os.listdir(folder_path):
        if filename.endswith('.fasta'):
            file_path = os.path.join(folder_path, filename)
            try:
                for record in SeqIO.parse(file_path, 'fasta'):
                    sequence_lengths.append(len(record.seq))
            except Exception as e:
                print(f"Error reading {file_path}: {e}")
    return sequence_lengths

def plot_histogram(sequence_lengths):
    plt.figure(figsize=(10, 6))
    plt.hist(sequence_lengths, bins=50, edgecolor='black')
    plt.title('Distribution of RNA Sequence Lengths')
    plt.xlabel('Sequence Length')
    plt.ylabel('Frequency')
    plt.grid(True)
    plt.show()

if __name__ == "__main__":
    folder_path = './RNAdesignv1/train/seqs/'  # 请将此路径替换为你的文件夹路径
    lengths = calculate_sequence_lengths(folder_path)
    print(lengths)
    plot_histogram(lengths)

3 特征构建

# 图结构数据生成器
class RNAGraphBuilder:
    @staticmethod
    def build_graph(coord, seq):
        """将坐标和序列转换为图结构"""
        num_nodes = coord.shape[0]
        
        # 节点特征:展平每个节点的7个骨架点坐标
        x = torch.tensor(coord.reshape(num_nodes, -1), dtype=torch.float32)  # [N, 7*3]
        
        # 边构建:基于序列顺序的k近邻连接
        edge_index = []
        for i in range(num_nodes):
            # 连接前k和后k个节点
            neighbors = list(range(max(0, i-Config.k_neighbors), i)) + \
                       list(range(i+1, min(num_nodes, i+1+Config.k_neighbors)))
            for j in neighbors:
                edge_index.append([i, j])
                edge_index.append([j, i])  # 双向连接
        
        edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()
        
        # 节点标签
        y = torch.tensor([Config.seq_vocab.index(c) for c in seq], dtype=torch.long)
        
        return Data(x=x, edge_index=edge_index, y=y, num_nodes=num_nodes)

4 模型训练与验证

# 模型训练与验证
class RNADataset(torch.utils.data.Dataset):
    def __init__(self, coords_dir, seqs_dir):
        self.samples = []
        
        # 读取所有数据并转换为图
        for fname in os.listdir(coords_dir):
            # 加载坐标数据
            coord = np.load(os.path.join(coords_dir, fname))  # [L, 7, 3]
            coord = np.nan_to_num(coord, nan=0.0)  # 新增行:将NaN替换为0
            # 加载对应序列
            seq_id = os.path.splitext(fname)[0]
            seq = next(SeqIO.parse(os.path.join(seqs_dir, f"{seq_id}.fasta"), "fasta")).seq
            
            # 转换为图结构
            graph = RNAGraphBuilder.build_graph(coord, str(seq))
            self.samples.append(graph)

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        return self.samples[idx]

# 简单GNN模型
class GNNModel(nn.Module):
    def __init__(self):
        super().__init__()
        # 特征编码
        self.encoder = nn.Sequential(
            nn.Linear(7*3, Config.hidden_dim),
            nn.ReLU()
        )
        
        # GNN层
        self.conv1 = GCNConv(Config.hidden_dim, Config.hidden_dim)
        self.conv2 = GCNConv(Config.hidden_dim, Config.hidden_dim)
        
        # 分类头
        self.cls_head = nn.Sequential(
            nn.Linear(Config.hidden_dim, len(Config.seq_vocab))
        )
        
    def forward(self, data):
        # 节点特征编码
        x = self.encoder(data.x)  # [N, hidden]
        
        # 图卷积
        x = self.conv1(x, data.edge_index)
        x = torch.relu(x)
        x = self.conv2(x, data.edge_index)
        x = torch.relu(x)
        
        # 节点分类
        logits = self.cls_head(x)  # [N, 4]
        return logits

# 训练函数
def train(model, loader, optimizer, criterion):
    model.train()
    total_loss = 0
    for batch in loader:
        batch = batch.to(Config.device)
        optimizer.zero_grad()
        
        # 前向传播
        logits = model(batch)
        
        # 计算损失
        loss = criterion(logits, batch.y)
        
        # 反向传播
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    return total_loss / len(loader)

# 评估函数
def evaluate(model, loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch in loader:
            batch = batch.to(Config.device)
            logits = model(batch)
            preds = logits.argmax(dim=1)
            correct += (preds == batch.y).sum().item()
            total += batch.y.size(0)
    return correct / total

# 主流程
if __name__ == "__main__":
    # 设置随机种子
    torch.manual_seed(Config.seed)
    
    # 加载数据集
    full_dataset = RNADataset("./RNAdesignv1/train/coords", "./RNAdesignv1/train/seqs")
    
    # 划分数据集
    train_size = int(0.8 * len(full_dataset))
    val_size = (len(full_dataset) - train_size) // 2
    test_size = len(full_dataset) - train_size - val_size
    train_set, val_set, test_set = torch.utils.data.random_split(
        full_dataset, [train_size, val_size, test_size])
    
    # 创建DataLoader
    train_loader = torch_geometric.loader.DataLoader(
        train_set, batch_size=Config.batch_size, shuffle=True)
    val_loader = torch_geometric.loader.DataLoader(val_set, batch_size=Config.batch_size)
    test_loader = torch_geometric.loader.DataLoader(test_set, batch_size=Config.batch_size)
    
    # 初始化模型
    model = GNNModel().to(Config.device)
    optimizer = optim.Adam(model.parameters(), lr=Config.lr)
    criterion = nn.CrossEntropyLoss()
    
    # 训练循环
    best_acc = 0
    for epoch in range(Config.epochs):
        train_loss = train(model, train_loader, optimizer, criterion)
        val_acc = evaluate(model, val_loader)
        
        print(f"Epoch {epoch+1}/{Config.epochs}")
        print(f"Train Loss: {train_loss:.4f} | Val Acc: {val_acc:.4f}")
        
        # 保存最佳模型
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), "best_gnn_model.pth")
    
    # 最终测试
    model.load_state_dict(torch.load("best_gnn_model.pth",weights_only=True))
    test_acc = evaluate(model, test_loader)
    print(f"\nTest Accuracy: {test_acc:.4f}")

5 结果输出

#输出赛题提交格式的结果
import numpy as np
import torch
import torch.nn as nn
import os
from torch_geometric.nn import GCNConv
import torch.nn.functional as F
from torch_geometric.data import Data
import pandas as pd
import glob

class Config:
    seed = 42
    device = "cuda:7" if torch.cuda.is_available() else "cpu"
    batch_size = 32
    lr = 0.01
    epochs = 20
    seq_vocab = "AUCG"
    coord_dims = 7  # 7个骨架点
    hidden_dim = 128
    k_neighbors = 5  # 每个节点的近邻数

class GNNModel(nn.Module):
    def __init__(self):
        super().__init__()
        # 特征编码
        self.encoder = nn.Sequential(
            nn.Linear(7*3, Config.hidden_dim),
            nn.ReLU()
        )

        # GNN层
        self.conv1 = GCNConv(Config.hidden_dim, Config.hidden_dim)
        self.conv2 = GCNConv(Config.hidden_dim, Config.hidden_dim)

        # 分类头
        self.cls_head = nn.Sequential(
            nn.Linear(Config.hidden_dim, len(Config.seq_vocab))
        )

    def forward(self, data):
        # 节点特征编码
        x = self.encoder(data.x)  # [N, hidden]

        # 图卷积
        x = self.conv1(x, data.edge_index)
        x = torch.relu(x)
        x = self.conv2(x, data.edge_index)
        x = torch.relu(x)

        # 节点分类
        logits = self.cls_head(x)  # [N, 4]
        return logits

class RNAGraphBuilder:
    @staticmethod
    def build_graph(coord, seq):
        """将坐标和序列转换为图结构"""
        num_nodes = coord.shape[0]

        # 节点特征:展平每个节点的7个骨架点坐标
        x = torch.tensor(coord.reshape(num_nodes, -1), dtype=torch.float32)  # [N, 7*3]

        # 边构建:基于序列顺序的k近邻连接
        edge_index = []
        for i in range(num_nodes):
            # 连接前k和后k个节点
            neighbors = list(range(max(0, i-Config.k_neighbors), i)) + \
                       list(range(i+1, min(num_nodes, i+1+Config.k_neighbors)))
            for j in neighbors:
                edge_index.append([i, j])
                edge_index.append([j, i])  # 双向连接

        edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()

        # 节点标签
        y = torch.tensor([Config.seq_vocab.index(c) for c in seq], dtype=torch.long)

        return Data(x=x, edge_index=edge_index, y=y, num_nodes=num_nodes)

class RNASequenceGenerator:
    def __init__(self, model_path):
        self.model = GNNModel().to(Config.device)
        self.model.load_state_dict(
            torch.load(model_path, map_location=Config.device, weights_only=True)
        )
        self.model.eval()

    def generate_sequences(self, coord_data, num_seq=5, temperature=1.0, top_k=3):
        """
        生成候选RNA序列
        :param coord_data: numpy数组 [L, 7, 3]
        :param num_seq: 需要生成的序列数量
        :param temperature: 温度参数控制多样性
        :param top_k: 每个位置只考虑top_k高概率的碱基
        :return: 生成的序列列表
        """
        # 转换为图数据
        graph = self._preprocess_data(coord_data)
        graph = graph.to(Config.device)  # 关键修复:转移数据到模型设备
        # 获取概率分布
        with torch.no_grad():
            logits = self.model(graph)
            probs = F.softmax(logits / temperature, dim=1)  # [L, 4]

        # 生成候选序列
        sequences = set()
        while len(sequences) < num_seq:
            seq = self._sample_sequence(probs, top_k)
            sequences.add(seq)

        return list(sequences)[:num_seq]

    def _preprocess_data(self, coord):
        """预处理坐标数据为图结构"""
        # 创建伪序列(实际不会被使用)
        dummy_seq = "A" * coord.shape[0]
        return RNAGraphBuilder.build_graph(coord, dummy_seq)

    def _sample_sequence(self, probs, top_k):
        """基于概率分布进行采样"""
        seq = []
        for node_probs in probs:
            # 应用top-k筛选
            topk_probs, topk_indices = torch.topk(node_probs, top_k)
            # 重新归一化
            norm_probs = topk_probs / topk_probs.sum()
            # 采样
            chosen = np.random.choice(topk_indices.cpu().numpy(), p=norm_probs.cpu().numpy())
            seq.append(Config.seq_vocab[chosen])
        return "".join(seq)

# 使用示例
if __name__ == "__main__":
    # 加载生成器
    generator = RNASequenceGenerator("best_gnn_model.pth")
    result={
     "pdb_id":[],
     "seq":[]
    }
    # 示例输入(替换为真实骨架坐标)
    npy_files=glob.glob("./saisdata/coords/*.npy")
    for npy in npy_files:
        id_name=os.path.basename(npy).split(".")[0]
        coord = np.load(npy)  # [L, 7, 3]
        coord = np.nan_to_num(coord, nan=0.0)  # 新增行:将NaN替换为0

        # 生成1个候选序列
        candidates = generator.generate_sequences(
            coord,
            num_seq=1,
            temperature=0.8,  # 适度多样性
            top_k=4          # 每个位置考虑前4个可能
        )
        result["pdb_id"].append(id_name)
        result["seq"].append(candidates[0])
    result=pd.DataFrame(result)
    result.to_csv("./saisresult/submit.csv",index=False)
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容