DDPG:
“DQN 的连续动作版 + Actor-Critic”。

import random
from dataclasses import dataclass
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
# ============================================================
# 1) 最简单的连续控制环境(不用 gym)
# ============================================================
class Simple1DEnv:
"""
目标:把状态 x 调到 0 附近(连续动作控制)
reset() -> state
返回初始状态 state = [x]
step(action) -> next_state, reward, done, info
next_state (sp): 执行动作后得到的下一状态(s')
reward (r): 本步奖励
done: 当前 episode 是否结束
info: 额外信息字典(调试用,我们这里返回空 {})
状态 state: [x] (shape=(1,))
动作 action: a ∈ [-1, 1] (连续)
转移: x <- x + a + noise
奖励: r = -(x^2) (越接近 0 越好)
终止: 走满 max_steps 就 done=True
"""
def __init__(self, max_steps=50):
self.max_steps = max_steps
self.reset()
def reset(self):
# 随机给一个初始 x
self.x = np.random.uniform(-2.0, 2.0)
self.t = 0 # 当前 episode 已走步数
return np.array([self.x], dtype=np.float32)
def step(self, a):
# 1) 限制动作范围到 [-1, 1]
a = float(np.clip(a, -1.0, 1.0))
# 2) 环境带一点噪声(模拟真实控制的不确定性)
noise = np.random.normal(0, 0.02)
# 3) 状态更新:执行动作后得到新 x
self.x = self.x + a + noise
self.t += 1
# 4) 奖励:离 0 越近越好
reward = -(self.x ** 2)
# 5) 是否结束:达到 max_steps 就结束一局
done = (self.t >= self.max_steps)
# 6) next_state:同样用向量表示
next_state = np.array([self.x], dtype=np.float32)
# 7) info:可放调试信息,我们这里不需要
info = {}
# 返回顺序:sp, r, done, info
return next_state, float(reward), done, info
# ============================================================
# 2) Replay Buffer:存储经验 (s, a, r, s', done)
# ============================================================
@dataclass
class Batch:
"""
一批训练样本(从 replay buffer 里采样得到)
s: [B, s_dim]
a: [B, a_dim]
r: [B, 1]
sp: [B, s_dim]
d: [B, 1] done 标志(1=结束,0=未结束)
"""
s: torch.Tensor
a: torch.Tensor
r: torch.Tensor
sp: torch.Tensor
d: torch.Tensor
class ReplayBuffer:
def __init__(self, capacity=100000):
self.capacity = capacity
self.data = []
self.ptr = 0 # 用于循环覆盖
def push(self, s, a, r, sp, done):
"""
s: 当前状态 (np array shape=(1,))
a: 动作 (np array shape=(1,))
r: 奖励 (float)
sp: 下一状态 (np array shape=(1,))
done: 是否终止 (float 0/1)
"""
item = (s, a, r, sp, done)
if len(self.data) < self.capacity:
self.data.append(item)
else:
self.data[self.ptr] = item
self.ptr = (self.ptr + 1) % self.capacity
def sample(self, batch_size):
"""
随机采样 batch_size 条经验,并转为 torch.Tensor
"""
batch = random.sample(self.data, batch_size)
s, a, r, sp, d = map(np.array, zip(*batch))
# r 和 done 变成 [B,1] 更方便做公式运算
return Batch(
s=torch.tensor(s, dtype=torch.float32), # [B,1]
a=torch.tensor(a, dtype=torch.float32), # [B,1]
r=torch.tensor(r, dtype=torch.float32).unsqueeze(-1), # [B,1]
sp=torch.tensor(sp, dtype=torch.float32), # [B,1]
d=torch.tensor(d, dtype=torch.float32).unsqueeze(-1), # [B,1]
)
def __len__(self):
return len(self.data)
# ============================================================
# 3) Actor / Critic 网络
# ============================================================
class Actor(nn.Module):
"""
Actor:输入状态 s,输出动作 a(确定性策略 μ(s))
输出用 tanh 限制到 [-1, 1]
"""
def __init__(self, s_dim=1, a_dim=1, hidden=64):
super().__init__()
self.net = nn.Sequential(
nn.Linear(s_dim, hidden), nn.ReLU(),
nn.Linear(hidden, hidden), nn.ReLU(),
nn.Linear(hidden, a_dim), nn.Tanh()
)
def forward(self, s):
return self.net(s)
class Critic(nn.Module):
"""
Critic:输入 (s, a),输出 Q(s,a)
"""
def __init__(self, s_dim=1, a_dim=1, hidden=64):
super().__init__()
self.net = nn.Sequential(
nn.Linear(s_dim + a_dim, hidden), nn.ReLU(),
nn.Linear(hidden, hidden), nn.ReLU(),
nn.Linear(hidden, 1)
)
def forward(self, s, a):
x = torch.cat([s, a], dim=-1) # 拼接成 [B, s_dim+a_dim]
return self.net(x) # 输出 [B,1]
# ============================================================
# 4) target 网络软更新:θ' ← τθ + (1-τ)θ'
# ============================================================
@torch.no_grad()
def soft_update(target: nn.Module, online: nn.Module, tau: float):
for tp, p in zip(target.parameters(), online.parameters()):
tp.data.mul_(1 - tau)
tp.data.add_(tau * p.data)
# ============================================================
# 5) DDPG 训练
# ============================================================
def train_ddpg(
episodes=200,
max_steps=50,
gamma=0.99, # 折扣因子
tau=0.01, # target 软更新速度
batch_size=64,
warmup=500, # replay buffer 先攒一些经验再训练
explore_std=0.2, # 探索噪声标准差
seed=0,
debug_first_steps=0 # >0 时打印前 N 步交互内容,帮助理解 sp/r/done
):
# 固定随机种子,便于复现
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
env = Simple1DEnv(max_steps=max_steps)
buf = ReplayBuffer(capacity=50000)
# online 网络(用于训练)
actor = Actor()
critic = Critic()
# target 网络(用于计算更稳定的 target)
actor_t = Actor()
critic_t = Critic()
actor_t.load_state_dict(actor.state_dict())
critic_t.load_state_dict(critic.state_dict())
# 优化器
act_opt = optim.Adam(actor.parameters(), lr=1e-3)
crt_opt = optim.Adam(critic.parameters(), lr=1e-3)
total_env_steps = 0
for ep in range(1, episodes + 1):
# reset 得到初始状态 s
s = env.reset()
ep_return = 0.0
for step in range(max_steps):
total_env_steps += 1
# ------------------------------------------------------------
# A) 用 actor 产生动作(确定性动作) + 噪声(用于探索)
# ------------------------------------------------------------
with torch.no_grad():
# s shape (1,) -> tensor [1,1]
s_tensor = torch.tensor(s, dtype=torch.float32).unsqueeze(0)
a = actor(s_tensor).squeeze(0).numpy() # shape (1,)
# 加探索噪声:DDPG 最常见的简单探索方式
a = a + np.random.normal(0, explore_std, size=a.shape)
a = np.clip(a, -1.0, 1.0)
# ------------------------------------------------------------
# B) 与环境交互:step() 返回 (sp, r, done, info)
# ------------------------------------------------------------
# sp = next_state (s') 执行动作后新的状态
# r = reward 本步奖励
# done = 是否结束本局
# info = 额外信息(可忽略)
sp, r, done, info = env.step(a[0])
if debug_first_steps > 0 and total_env_steps <= debug_first_steps:
print(f"[debug step {total_env_steps}] s={s}, a={a}, sp={sp}, r={r:.4f}, done={done}, info={info}")
# 存经验到 replay buffer
buf.push(s, a.astype(np.float32), r, sp, float(done))
# 更新当前状态
s = sp
ep_return += r
# ------------------------------------------------------------
# C) 如果 buffer 足够大,就开始训练更新
# ------------------------------------------------------------
if len(buf) >= max(warmup, batch_size):
batch = buf.sample(batch_size)
# =========================
# 1) 更新 Critic
# 目标 y = r + γ*(1-done)*Q_target(s', actor_target(s'))
# =========================
with torch.no_grad():
ap = actor_t(batch.sp) # target actor 产生下一步动作 a'
q_target_next = critic_t(batch.sp, ap) # target critic 评估 Q(s',a')
y = batch.r + gamma * (1 - batch.d) * q_target_next
q = critic(batch.s, batch.a) # 当前 critic 的 Q(s,a)
critic_loss = ((q - y) ** 2).mean() # MSE
crt_opt.zero_grad()
critic_loss.backward()
crt_opt.step()
# =========================
# 2) 更新 Actor
# 让 actor 输出的动作在 critic 看来 Q 更大:
# maximize E[ Q(s, actor(s)) ]
# 等价于 minimize -E[ Q(s, actor(s)) ]
# =========================
actor_loss = -critic(batch.s, actor(batch.s)).mean()
act_opt.zero_grad()
actor_loss.backward()
act_opt.step()
# =========================
# 3) 软更新 target 网络
# =========================
soft_update(actor_t, actor, tau)
soft_update(critic_t, critic, tau)
if done:
break
# 打印训练进度:Return 越接近 0 越好(因为 reward=-(x^2) 是负数)
if ep % 10 == 0:
print(f"Episode {ep:4d} | Return {ep_return: .3f} | Buffer {len(buf)}")
print("Training finished.")
if __name__ == "__main__":
# debug_first_steps=10 会打印最开始 10 步的 s,a,sp,r,done,帮助你对 sp/r/done 有直观感受
train_ddpg(debug_first_steps=10)
TD3
因为在GR-RL模型当中提到了TD3算法, 这里我们将TD3算法原理做一个简单了解。
一、思想
用一个“演员”(Actor)输出连续动作,用两个“裁判”(两个 Critic)给动作打分;更新时取更保守的那个分数,并且裁判更新更勤、演员更新更慢,同时在计算目标值时给目标动作加点小噪声来“抹平”尖峰误差
二、流程
-
Actor网络: 策略网络
,
用来预测下一步动作 -
两个Critic网络:
,
,
估计价值(动作好不好) -
目标网络:
,
,
(
更慢的拷贝,用于稳定目标) -
经验回放池:Replay Buffer: 存(
)
步骤一 收集数据: 基于当前状态, 策略网络
预测产生动作
actor,为了探索通常加入探索噪音, 基于
执行得到动作
, 把
存入Replay Buffer
步骤二 采样计算TD3目标动作
随机抽取一批历史经验来训练,首先对目标动作会加"小噪音"并裁剪, 并裁剪到[-c,c]之间,(直觉:不要让 critic 只在某个特别尖的动作点上给出离谱高分;加点噪声等于在附近做“平均”,更稳。)
步骤三 用“双 Q 的最小值”算目标 Q
防止actor 被“骗”去追假的高分。取 min 更保守,能压住虚高
步骤四 高频更新Critic 延迟更新Actor
高频更新Critic
延迟更新Actor
这是 TD3 第三个重要点:actor 不每次都更新,比如每更新 critic 2 次才更新 actor 1 次(policy_delay=2),直觉:先让裁判更靠谱一点,再让演员根据裁判意见调整,不然演员会跟着“不靠谱裁判”乱跑。
通常
步骤五 软更新目标网络
每次(更新 actor 时通常也一起)做:


核心:
双 Critic 取最小值:抑制 Q 过高估计目标动作加噪声:目标更平滑,不追尖峰-
Actor 延迟更新:先把 critic 学稳再更新策略三、具体实例

