DDPG && TD3强化学习算法

DDPG:
“DQN 的连续动作版 + Actor-Critic”。

import random
from dataclasses import dataclass
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

# ============================================================
# 1) 最简单的连续控制环境(不用 gym)
# ============================================================
class Simple1DEnv:
    """
    目标:把状态 x 调到 0 附近(连续动作控制)

    reset() -> state
      返回初始状态 state = [x]

    step(action) -> next_state, reward, done, info
      next_state (sp): 执行动作后得到的下一状态(s')
      reward (r):      本步奖励
      done:            当前 episode 是否结束
      info:            额外信息字典(调试用,我们这里返回空 {})

    状态 state:  [x]  (shape=(1,))
    动作 action: a ∈ [-1, 1]  (连续)
    转移: x <- x + a + noise
    奖励: r = -(x^2) (越接近 0 越好)
    终止: 走满 max_steps 就 done=True
    """
    def __init__(self, max_steps=50):
        self.max_steps = max_steps
        self.reset()

    def reset(self):
        # 随机给一个初始 x
        self.x = np.random.uniform(-2.0, 2.0)
        self.t = 0  # 当前 episode 已走步数
        return np.array([self.x], dtype=np.float32)

    def step(self, a):
        # 1) 限制动作范围到 [-1, 1]
        a = float(np.clip(a, -1.0, 1.0))

        # 2) 环境带一点噪声(模拟真实控制的不确定性)
        noise = np.random.normal(0, 0.02)

        # 3) 状态更新:执行动作后得到新 x
        self.x = self.x + a + noise
        self.t += 1

        # 4) 奖励:离 0 越近越好
        reward = -(self.x ** 2)

        # 5) 是否结束:达到 max_steps 就结束一局
        done = (self.t >= self.max_steps)

        # 6) next_state:同样用向量表示
        next_state = np.array([self.x], dtype=np.float32)

        # 7) info:可放调试信息,我们这里不需要
        info = {}

        # 返回顺序:sp, r, done, info
        return next_state, float(reward), done, info


# ============================================================
# 2) Replay Buffer:存储经验 (s, a, r, s', done)
# ============================================================
@dataclass
class Batch:
    """
    一批训练样本(从 replay buffer 里采样得到)
    s:  [B, s_dim]
    a:  [B, a_dim]
    r:  [B, 1]
    sp: [B, s_dim]
    d:  [B, 1]  done 标志(1=结束,0=未结束)
    """
    s: torch.Tensor
    a: torch.Tensor
    r: torch.Tensor
    sp: torch.Tensor
    d: torch.Tensor


class ReplayBuffer:
    def __init__(self, capacity=100000):
        self.capacity = capacity
        self.data = []
        self.ptr = 0  # 用于循环覆盖

    def push(self, s, a, r, sp, done):
        """
        s:    当前状态 (np array shape=(1,))
        a:    动作 (np array shape=(1,))
        r:    奖励 (float)
        sp:   下一状态 (np array shape=(1,))
        done: 是否终止 (float 0/1)
        """
        item = (s, a, r, sp, done)
        if len(self.data) < self.capacity:
            self.data.append(item)
        else:
            self.data[self.ptr] = item
            self.ptr = (self.ptr + 1) % self.capacity

    def sample(self, batch_size):
        """
        随机采样 batch_size 条经验,并转为 torch.Tensor
        """
        batch = random.sample(self.data, batch_size)
        s, a, r, sp, d = map(np.array, zip(*batch))

        # r 和 done 变成 [B,1] 更方便做公式运算
        return Batch(
            s=torch.tensor(s, dtype=torch.float32),                 # [B,1]
            a=torch.tensor(a, dtype=torch.float32),                 # [B,1]
            r=torch.tensor(r, dtype=torch.float32).unsqueeze(-1),   # [B,1]
            sp=torch.tensor(sp, dtype=torch.float32),               # [B,1]
            d=torch.tensor(d, dtype=torch.float32).unsqueeze(-1),   # [B,1]
        )

    def __len__(self):
        return len(self.data)


# ============================================================
# 3) Actor / Critic 网络
# ============================================================
class Actor(nn.Module):
    """
    Actor:输入状态 s,输出动作 a(确定性策略 μ(s))
    输出用 tanh 限制到 [-1, 1]
    """
    def __init__(self, s_dim=1, a_dim=1, hidden=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(s_dim, hidden), nn.ReLU(),
            nn.Linear(hidden, hidden), nn.ReLU(),
            nn.Linear(hidden, a_dim), nn.Tanh()
        )

    def forward(self, s):
        return self.net(s)


class Critic(nn.Module):
    """
    Critic:输入 (s, a),输出 Q(s,a)
    """
    def __init__(self, s_dim=1, a_dim=1, hidden=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(s_dim + a_dim, hidden), nn.ReLU(),
            nn.Linear(hidden, hidden), nn.ReLU(),
            nn.Linear(hidden, 1)
        )

    def forward(self, s, a):
        x = torch.cat([s, a], dim=-1)  # 拼接成 [B, s_dim+a_dim]
        return self.net(x)             # 输出 [B,1]


# ============================================================
# 4) target 网络软更新:θ' ← τθ + (1-τ)θ'
# ============================================================
@torch.no_grad()
def soft_update(target: nn.Module, online: nn.Module, tau: float):
    for tp, p in zip(target.parameters(), online.parameters()):
        tp.data.mul_(1 - tau)
        tp.data.add_(tau * p.data)


# ============================================================
# 5) DDPG 训练
# ============================================================
def train_ddpg(
    episodes=200,
    max_steps=50,
    gamma=0.99,        # 折扣因子
    tau=0.01,          # target 软更新速度
    batch_size=64,
    warmup=500,        # replay buffer 先攒一些经验再训练
    explore_std=0.2,   # 探索噪声标准差
    seed=0,
    debug_first_steps=0  # >0 时打印前 N 步交互内容,帮助理解 sp/r/done
):
    # 固定随机种子,便于复现
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)

    env = Simple1DEnv(max_steps=max_steps)
    buf = ReplayBuffer(capacity=50000)

    # online 网络(用于训练)
    actor = Actor()
    critic = Critic()

    # target 网络(用于计算更稳定的 target)
    actor_t = Actor()
    critic_t = Critic()
    actor_t.load_state_dict(actor.state_dict())
    critic_t.load_state_dict(critic.state_dict())

    # 优化器
    act_opt = optim.Adam(actor.parameters(), lr=1e-3)
    crt_opt = optim.Adam(critic.parameters(), lr=1e-3)

    total_env_steps = 0

    for ep in range(1, episodes + 1):
        # reset 得到初始状态 s
        s = env.reset()
        ep_return = 0.0

        for step in range(max_steps):
            total_env_steps += 1

            # ------------------------------------------------------------
            # A) 用 actor 产生动作(确定性动作) + 噪声(用于探索)
            # ------------------------------------------------------------
            with torch.no_grad():
                # s shape (1,) -> tensor [1,1]
                s_tensor = torch.tensor(s, dtype=torch.float32).unsqueeze(0)
                a = actor(s_tensor).squeeze(0).numpy()  # shape (1,)

            # 加探索噪声:DDPG 最常见的简单探索方式
            a = a + np.random.normal(0, explore_std, size=a.shape)
            a = np.clip(a, -1.0, 1.0)

            # ------------------------------------------------------------
            # B) 与环境交互:step() 返回 (sp, r, done, info)
            # ------------------------------------------------------------
            # sp   = next_state (s') 执行动作后新的状态
            # r    = reward 本步奖励
            # done = 是否结束本局
            # info = 额外信息(可忽略)
            sp, r, done, info = env.step(a[0])

            if debug_first_steps > 0 and total_env_steps <= debug_first_steps:
                print(f"[debug step {total_env_steps}] s={s}, a={a}, sp={sp}, r={r:.4f}, done={done}, info={info}")

            # 存经验到 replay buffer
            buf.push(s, a.astype(np.float32), r, sp, float(done))

            # 更新当前状态
            s = sp
            ep_return += r

            # ------------------------------------------------------------
            # C) 如果 buffer 足够大,就开始训练更新
            # ------------------------------------------------------------
            if len(buf) >= max(warmup, batch_size):
                batch = buf.sample(batch_size)

                # =========================
                # 1) 更新 Critic
                # 目标 y = r + γ*(1-done)*Q_target(s', actor_target(s'))
                # =========================
                with torch.no_grad():
                    ap = actor_t(batch.sp)                 # target actor 产生下一步动作 a'
                    q_target_next = critic_t(batch.sp, ap) # target critic 评估 Q(s',a')
                    y = batch.r + gamma * (1 - batch.d) * q_target_next

                q = critic(batch.s, batch.a)               # 当前 critic 的 Q(s,a)
                critic_loss = ((q - y) ** 2).mean()        # MSE

                crt_opt.zero_grad()
                critic_loss.backward()
                crt_opt.step()

                # =========================
                # 2) 更新 Actor
                # 让 actor 输出的动作在 critic 看来 Q 更大:
                # maximize E[ Q(s, actor(s)) ]
                # 等价于 minimize -E[ Q(s, actor(s)) ]
                # =========================
                actor_loss = -critic(batch.s, actor(batch.s)).mean()

                act_opt.zero_grad()
                actor_loss.backward()
                act_opt.step()

                # =========================
                # 3) 软更新 target 网络
                # =========================
                soft_update(actor_t, actor, tau)
                soft_update(critic_t, critic, tau)

            if done:
                break

        # 打印训练进度:Return 越接近 0 越好(因为 reward=-(x^2) 是负数)
        if ep % 10 == 0:
            print(f"Episode {ep:4d} | Return {ep_return: .3f} | Buffer {len(buf)}")

    print("Training finished.")


if __name__ == "__main__":
    # debug_first_steps=10 会打印最开始 10 步的 s,a,sp,r,done,帮助你对 sp/r/done 有直观感受
    train_ddpg(debug_first_steps=10)

TD3
因为在GR-RL模型当中提到了TD3算法, 这里我们将TD3算法原理做一个简单了解。

一、思想

用一个“演员”(Actor)输出连续动作,用两个“裁判”(两个 Critic)给动作打分;更新时取更保守的那个分数,并且裁判更新更勤、演员更新更慢,同时在计算目标值时给目标动作加点小噪声来“抹平”尖峰误差

二、流程

  • Actor网络: 策略网络\pi_{\theta}(s), 用来预测下一步动作
  • 两个Critic网络Q_{\phi1}(s,a), Q_{\phi2}(s,a), 估计价值(动作好不好)
  • 目标网络\phi_{\theta'}, Q_{\phi1'}, Q_{\phi2'}(更慢的拷贝,用于稳定目标)
  • 经验回放池:Replay Buffer: 存(s, a, r, s', done)

步骤一 收集数据: 基于当前状态s, 策略网络\phi_{\theta}(s)预测产生动作actor,为了探索通常加入探索噪音\alpha \Leftarrow \alpha+\epsilon, 基于\alpha执行得到动作r, s', done, 把(s,a,r,s',done)存入Replay Buffer

步骤二 采样计算TD3目标动作
随机抽取一批历史经验来训练,首先对目标动作会加"小噪音"并裁剪, 并裁剪到[-c,c]之间,(直觉:不要让 critic 只在某个特别尖的动作点上给出离谱高分;加点噪声等于在附近做“平均”,更稳。)
\tilde{a} = \pi_{\theta'}(s') + \text{clip}(\epsilon, -c, c)

步骤三 用“双 Q 的最小值”算目标 Q
防止actor 被“骗”去追假的高分。取 min 更保守,能压住虚高
y = r + \gamma(1 - \text{done}) \cdot \min\left(Q_{\phi_1}'(s', \tilde{a}), Q_{\phi_2}'(s', \tilde{a})\right)

步骤四 高频更新Critic 延迟更新Actor

  • 高频更新Critic
    \mathcal{L}(\phi_i) = \mathbb{E}\left[\left(Q_{\phi_i}(s,a) - y\right)^2\right],\ i=1,2

  • 延迟更新Actor
    这是 TD3 第三个重要点:actor 不每次都更新,比如每更新 critic 2 次才更新 actor 1 次(policy_delay=2), 直觉:先让裁判更靠谱一点,再让演员根据裁判意见调整,不然演员会跟着“不靠谱裁判”乱跑。
    \max_{\theta} \mathbb{E}\left[Q_{\phi_1}(s, \pi_{\theta}(s))\right]
    通常Q_{\phi1}来做梯度即可

步骤五 软更新目标网络
每次(更新 actor 时通常也一起)做:
\begin{aligned} \theta' &\leftarrow \tau\theta + (1-\tau)\theta' \\ \phi_i' &\leftarrow \tau\phi_i + (1-\tau)\phi_i' \end{aligned}


核心

  • 双 Critic 取最小值:抑制 Q 过高估计

  • 目标动作加噪声:目标更平滑,不追尖峰

  • Actor 延迟更新:先把 critic 学稳再更新策略

    三、具体实例


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容