纯手工撸LLM:两个小时内的成果

关于LLM的一些技术点,目前都解开了,再重新手撸,加深印象。用AI学AI,让豆包按我的要求生成了如下的代码,直接手撸一次,运行成果。可以胡言乱语。如果提示词是语料的开头,它也可以跟着说下去的,这证明方案是OK的。

1,corpus.txt语料库文本文件

像如下这样的文本行,我收集了几百行。

我家的小猫喜欢抓老鼠,晚上的时候,它会蹲在墙角,一动不动,等老鼠出来,一下子扑上去,抓住老鼠就叼给我们看。
奶奶的院子里有几只麻雀,我在窗台上撒米,它们就会飞过来吃,有时候还会落在窗台上,歪着头看我写字。
我在河边看到一只小螃蟹,横着爬,有硬硬的壳,我把它放在手里,它会用钳子夹我的手指,轻轻的,一点都不疼。
邻居家的鹅带着小鹅去河里游泳,小鹅黄黄的,跟在大鹅后面,大鹅时不时地回头看看,怕小鹅走丢了。
我家的小乌龟喜欢晒太阳,我把它放在阳台上,它就会伸长脖子,四肢张开,晒得懒洋洋的,特别可爱。

2,vocab.py语汇表及语料库文件

# 词汇表:模拟20000词表(实际可通过BPE/WordPiece扩展)
# 包含语料中所有token + 特殊符号(<pad>填充, <unk>未知, <bos>开头, <eos>结尾)

class Vocab(object):
    def __init__(self, corpus):
        self.special_tokens = ['<pad>', '<unk>', '<bos>', '<eos>', ]
        self.token2id = {}
        self.id2token = {}
        self.build_vocab(corpus)

    def build_vocab(self, corpus):
        # 分词: 简单按字分,实际要用BPE等
        all_tokens = []
        for sentence in corpus:
            all_tokens.extend(list(sentence))
        # 去重并排序
        unique_tokens = list(set(all_tokens))
        unique_tokens.sort()

        # 加入特殊符号
        all_tokens = self.special_tokens + unique_tokens

        # 构建映射
        for idx, token in enumerate(all_tokens):
            self.token2id[token] = idx
            self.id2token[idx] = token

    def __len__(self):
        return len(self.token2id)

    def token_to_id(self, token):
        return self.token2id.get(token, self.token2id['<unk>'])

    def id_to_token(self, idx):
        return self.id2token.get(idx, '<unk>')

# 语料列表(示例50条,实际可扩展真实语料)

"""
corpus = [
    "春天到了,公园里的花都开了,桃花粉粉的,梨花白白的,蝴蝶在花丛中飞来飞去,小朋友们在草地上放风筝,笑声传遍了整个公园。",
    "早上起床后,我先喝一杯温水,然后刷牙洗脸,再吃妈妈煮的粥和鸡蛋,吃完早餐背着书包去学校,路上还能看到路边的小猫在晒太阳。"
]
"""
corpus = []
with open("copus.txt", 'r', encoding='utf-8') as f:
    corpus_line = f.readlines()
    for line in corpus_line:
        corpus.append(line.strip())

# 初始化词汇表(模拟20000词汇表,实际可扩展)
vocab = Vocab(corpus)
print(f'词汇表大小: {len(vocab)}')

if __name__ == '__main__':
    print(corpus)

3,模型框架文件

import math

import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiHeadAttention(nn.Module):
    # 多头注意力:捕捉token间的依赖关系,比如,吃和苹果的动宾关系
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.d_k = d_model // num_heads # 每个头的维度

        # 线性层:将输入映射到Q/K/V(同层共享参数)
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        batch_size,seq_len, d_model = x.size()

        #1, 生成Q/K/V(batch_size, seq_len, d_model)
        q = self.w_q(x)
        k = self.w_k(x)
        v = self.w_v(x)

        # 2, 拆分多头: (batch_size, num_heads, seq_len, d_k)
        q = q.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        k = k.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        v = v.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)

        # 3, 计算注意力分数: Q*K^T / sqrt(d_k) (捕捉token间的相似度)
        scores = torch.matmul(q, k.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.d_k, dtype=torch.float32))

        # 4,掩码(防止看到未来token,生成式模型必需)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        # 5,Softmax归一化,得到注意权重(权重越高,越关注对应原token)
        attn_weights = F.softmax(scores, dim=-1)

        # 6. 加权求和V,得到多头输出
        output = torch.matmul(attn_weights, v)

        # 7, 拼接多头,线性投影
        output = output.transpose(1, 2).contiguous().view(batch_size, seq_len, d_model)

        return output # 输出包括token间的关系特征

class FFN(nn.Module):
    # 煎饼网络,学习特征组合,比如春天+花开->生机勃勃的场景特征
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff) # 升维,捕捉更复杂的特征
        self.linear2 = nn.Linear(d_ff, d_model)  # 降维,融合特征后回到原维度
        self.gelu = nn.GELU() # 非线性激活,让特征组合更灵活

    def forward(self, x):
        # 升维 ->激活 ->降维,学习token特征的非线性组合
        x = self.linear1(x) # 768 -> 2048扩展特征空间
        x = self.gelu(x)  # 非线性变性,比如 苹果 + 吃 -> 食物特征
        x = self.linear2(x)  # 2048 -> 768,融合特征后输出
        return x # 输出更抽象的复合特征

class DecoderLayer(nn.Module):
    # 单个decoder层,注意力(抓关系) + FFN (学特征)
    def __init__(self, d_model, num_heads, d_ff):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads) # 自注意力,捕捉当前序列内的关系
        self.ffn = FFN(d_model, d_ff) # FFN, 学习特征组合
        self.norm1 = nn.LayerNorm(d_model)  # 归一化,稳定训练
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(p=0.1)

    def forward(self, x, mask):
        # 自注意力 + 残差连接(防止信息丢失)
        attn_output = self.self_attn(x, mask)
        x = self.norm1(x + self.dropout(attn_output)) # 残差,保留原始信息+新增关系特征

        # FFN + 残差连接(特征升华)
        ffn_output = self.ffn(x)
        x = self.norm2(x + self.dropout(ffn_output)) # 残差,保留关系特征+新增复合特征

        return x # 输出融合了关系和复合特征的向量

class GPT(nn.Module):
    # 极简生成模型Decode-only
    def __init__(self, vocab_size, d_model=128, num_heads=4, num_layers=3, d_ff=512):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)  # 词嵌入,token -> 高维向量
        self.pos_embedding = nn.Embedding(1024, d_model)  # 位置 嵌入, 捕捉token顺序
        self.layers = nn.ModuleList([
            DecoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)
        ])
        self.fc = nn.Linear(d_model, vocab_size)  # 输出层,特征->词汇表概率

    def generate_mask(self, seq_len):
        # 生成因果掩码,防止模型看到未来token
        mask = torch.tril(torch.ones(seq_len, seq_len))
        return mask

    def forward(self, x):
        batch_size, seq_len = x.size()

        # 1,词嵌入 + 位置嵌入 (保留token语义+顺序)
        token_emb = self.embedding(x)  # (batch, seq_len, d_model)
        pos_emb = self.pos_embedding(torch.arange(seq_len, device=x.device)) # (seq_len, d_model)
        x = token_emb + pos_emb  # 融合语义和位置

        #2, 生成因果掩码
        mask = self.generate_mask(seq_len).to(x.device)

        #3,逐层处理,每层都用注意力抓关系,FFN学特征
        for layer in self.layers:
            x = layer(x, mask)

        #4 ,输出层,高维特征->词汇表维度(每个token对应词汇表概率)
        logits = self.fc(x) # (batch, seq_len, vocab_size)

        return logits

4 训练文件,生成模型

import torch
import torch.nn as nn
import torch.optim as optim
from vocab import Vocab, corpus
from model import GPT

# 1, 数据预处理,语料->token ID序列
vocab = Vocab(corpus)
bos_id = vocab.token_to_id('<bos>') # 句子开头
eos_id = vocab.token_to_id('<eos>') # 句子结尾
pad_id = vocab.token_to_id('<pad>') # 填充

def preprocess(corpus, vocab, max_len=50):
    # 将语料转为模型输入: <bos> + Token序列 +<eos>,不足填写
    data = []
    for sentence in corpus:
        # 分词 + ID
        tokens = list(sentence)
        ids = [vocab.token_to_id(tokens) for tokens in tokens]
        # 加特殊符号
        ids = [bos_id] + ids + [eos_id]
        # 截断/填充
        if len(ids) > max_len:
            ids = ids[:max_len]
        else:
            ids +=  [pad_id] * (max_len - len(ids))
        data.append(ids)
    return torch.tensor(data)

# 预处理数据
train_data = preprocess(corpus, vocab)
print(f'训练数据形状:{train_data.shape}') # (50, 50): 50条语料,每条50个token

# 2, 初始化模型,优化器,损失函数
vocab_size = len(vocab)
model = GPT(vocab_size=vocab_size)
optimizer = optim.Adam(model.parameters(), lr=5e-4)
criterion = nn.CrossEntropyLoss(ignore_index=pad_id) # 忽略填充token的损失

# 3,多批次训练(参数全局共享,多批次更新)
batch_size = 8
epochs = 200

for epoch in range(epochs):
    model.train()
    total_loss = 0

    # 分批次训练
    for i in range(0, len(train_data), batch_size):
        batch = train_data[i:i + batch_size] # 取批次数据(共享模型参数)
        inputs = batch[:, :-1] # 输入,去年最后一个token,预测下一个
        targets = batch[:, 1:] # 目标: 去掉第一个token

        # 前向训练
        logits = model(inputs) # 所有批次共享同一个模型参数

        # 计算损失(logits: (batch, seq_len-1, vocab_size), targets: (batch, seq_len-1))
        loss = criterion(logits.reshape(-1, vocab_size), targets.reshape(-1))

        # 反向传播
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    if (epoch + 1) % 10 == 0:
        print(f'Epoch {epoch + 1}, loss: {total_loss / len(train_data):.4f}')

# 保存模型
torch.save(model.state_dict(), 'gpt_model.pth')
print('模型训练完成并保存')


5,模拟生成

import torch
from vocab import Vocab, corpus
from model import GPT

# 加载词汇表和模型
vocab = Vocab(corpus)
vocab_size = len(vocab)

model = GPT(vocab_size=vocab_size)
model.load_state_dict(torch.load('gpt_model.pth', map_location=torch.device('cpu')))
model.eval() # 推理模式

def generate_text(model, vocab, prompt, max_len=50):
    # 生成文本,从prompt开始,逐token预测
    # 预处理promp: <bos> + token ID
    prompt_tokens = list(prompt)
    input_ids = [vocab.token_to_id('<bos>')] + [vocab.token_to_id(token) for token in prompt_tokens]
    input_ids = torch.tensor([input_ids], dtype=torch.long) # (1, seq_len)

    with torch.no_grad():
        remaining_len = max_len - input_ids.size(1)
        for _ in range(remaining_len):
            # 前向传播,获取当前序列的logits
            logits = model(input_ids) #(1,seq_len, vocab_size)
            # 取最后一个token的logits(预测下一个token)
            next_token_logits = logits[:, -1, :]  #(1, vocab_size)
            # Softmax -> 概率,选概率最高的token
            next_token_id = torch.argmax(next_token_logits, dim=-1, keepdim=True)
            # 加入输入序列
            input_ids = torch.cat([input_ids, next_token_id], dim=-1)
            # 遇到<eos>停止
            if next_token_id == vocab.token_to_id('eos'):
                break
    # 解码: ID->token
    generate_tokens =[vocab.id_to_token(idx) for idx in input_ids[0].tolist()]
    # 去掉特殊符号,拼接成文本
    generate_text = ''.join([t for t in generate_tokens if t not in ['<bos>', '<eos>', '<pad>']])
    return generate_text

# 测试生成
prompt = '蜜蜂就会飞出去采蜜'
generated = generate_text(model, vocab, prompt)
print(f'Prompt: {prompt}')
print(f'Generated: {generated}')

6,贴一个示例

image.png
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容