前面说了 PPO 算法, 下面一鼓作气, 把其他的相关的强化学习也一并学习下。这里还是给出我学习的一些视频链接
视频链接: 不愧是顶会收割机!迪哥精讲强化学习4大主流算法:PPO、Q-learning、DQN、A3C 50集入门到精通!
一、 Q-Learning
1.1 Q-Learning思想
在说 DQN 算法之前先说下 Q-Learning 算法,该算法是当时谷歌发明的一种强化学习算法, 该算法是一种基于值函数的强化学习算法, 它通过学习一个状态-动作值函数(Q函数)来选择最优策略, Q-Learning是一种无模型(model-free)的强化学习方法, 即它不需要了解环境的状态(转移概率以及奖励函数),仅依赖于环境的交互。Q-Learning 的目标是通过不断更新 Q 值,使得 Agent 能够选择在给定状态下能获得
得最大累计奖励的动作,Q-Learning的一个重要特点是它保证在探索足够多的状态-动作对后, 最终会收敛到最优策略。
Q 函数的定义: Q-Learning中, Q 函数
表示在状态
下采取的动作
所能获得的期望回报。Q 函数是Q-Learning 的核心, 通过对
对Q值的不断更新,最终得到最优的 Q 函数
Q-Learning核心思想:
Q-Learning的核心思想就是通过贝尔曼方差来更新 Q 值, 贝尔曼方程描述了某一状态-动作对的 Q 值与其后续状态-动作之间的关系, 在 Q-Learning 中, 更新公式为:
- 其中
和
分别是当前状态和当前动作
是 Agent 在执行动作
后, 从环境中获得的及时奖励
是折扣因子,表示未来奖励的衰减程度
![]()
是状态
下所有可能动作的最大 Q值,代表智能体在下一个状态下选择最优动作后的预期回报。
是学习率, 控制每次
值更新的步长
通过以上公式,Q-Learning 在每个时间步 t 都会根据当前的经验(状态、动作、奖励、下一个状态)来更新 Q 值。随着学习的进行, Q 值会逐渐收敛到一个最优 Q 值, 从而得到最优策略。
-
Q-Learning 算法推导
Q-Learning 的更新公式来自于 贝尔曼最优方程(Bellman Optimality Equation),它为求解最优值函数提供了递归关系。假设是最优状态-动作,即在每个状态下,选择最优动作可以获得最大回报。根据贝尔曼最优方程,我们有:
这表示,某一状态-动作对的
值等于当前奖励
加上未来状态
下, 采取最优动作
所得到的最大预期回报, 通过与实际更新公式的对比, Q-Learning 通过贝尔曼方程递归地更新
值, 是的 Q 值逐渐逼近最优值
核心思想:获取的奖励 = 眼前的瞬时奖励(做了一个动作就能获取到的经验) + 记忆奖励(按照训练时的经验, 上一个动作发生后, 接下来怎么做才能获得更大奖励)
首先需要获取数据, 也就是玩游戏的记录:
, 其中
代表当前状态,
代表当前动作,
代表下一时刻状态,
代表当前执行动作之后获取到的瞬时奖励。
模型的目标等于:
, 也就是说当前计算的 Q 值等于当前获得的瞬时奖励加上下一个状态执行不同动作获得收益最大动作的 Q 值, 这里的
值因为是下一步,影响贡献度有打折的成分在里面, 这里的
可以理解为期望。
模型的目标函数:
-
基于
查表
(DQN使用网络替代查表)进行迭代更新使得奖励值更大
这里我们选择
-greedy Policy策略, 可以根据代码了解
1.2 Q-Learning 简单示例
如下图是一个密室逃脱
下面给出 Q-Learning 的代码, 代码来源Q-learning_Nogo.py
import numpy as np # 导入NumPy库,用于数值计算
import pandas as pd # 导入Pandas库,用于数据结构操作
import matplotlib.pyplot as plt # 导入Matplotlib库,用于绘制图形
import time # 导入time库,用于控制程序暂停时间
ALPHA = 0.1 # 学习率,决定每次Q值更新的幅度
GAMMA = 0.95 # 折扣因子,决定未来奖励的权重
EPSILION = 0.9 # epsilon-greedy策略中的探索概率,控制随机选择动作的比例
N_STATE = 20 # 状态空间的大小,表示状态的数量
ACTIONS = ['left', 'right'] # 可用的动作集合,左移和右移
MAX_EPISODES = 200 # 最大训练回合数
FRESH_TIME = 0.1 # 每步之间的时间间隔,用于渲染环境
# 构建Q表,Q表存储每个状态-动作对的Q值
def build_q_table(n_state, actions):
q_table = pd.DataFrame(
np.zeros((n_state, len(actions))), # 创建一个形状为(n_state, len(actions))的全零矩阵
np.arange(n_state), # 状态的行索引为0到n_state-1
actions # 动作的列索引为'left'和'right'
)
return q_table # 返回Q表
# 选择当前状态下的动作
def choose_action(state, q_table):
# epsilon-greedy 策略
state_action = q_table.loc[state, :] # 获取当前状态下所有动作的Q值
if np.random.uniform() > EPSILION or (state_action == 0).all(): # 探索(随机选择)或当Q值全为0时
action_name = np.random.choice(ACTIONS) # 随机选择一个动作
else: # 利用(选择Q值最大的动作)
action_name = state_action.idxmax() # 选择Q值最大对应的动作
return action_name # 返回选择的动作
# 获取环境反馈,依据当前状态和所选动作返回下一个状态和奖励
def get_env_feedback(state, action):
if action == 'right': # 如果选择了向右的动作
if state == N_STATE - 2: # 如果已经到达倒数第二个状态
next_state = 'terminal' # 终止状态
reward = 1 # 到达终止状态时给予奖励1
else:
next_state = state + 1 # 否则,状态右移
reward = -0.5 # 每步奖励为-0.5
else: # 如果选择了向左的动作
if state == 0: # 如果已经到达最左端
next_state = 0 # 保持在状态0
else:
next_state = state - 1 # 否则,状态左移
reward = -0.5 # 每步奖励为-0.5
return next_state, reward # 返回下一个状态和奖励
# 更新环境的状态并打印出来
def update_env(state, episode, step_counter):
env = ['-'] * (N_STATE - 1) + ['T'] # 创建一个状态列表,其中T表示目标终止状态
if state == 'terminal': # 如果状态是终止状态
print("Episode {}, the total step is {}".format(episode + 1, step_counter)) # 输出当前回合和步数
final_env = ['-'] * (N_STATE - 1) + ['T'] # 终止状态时的环境状态
return True, step_counter # 返回终止状态和步数
else:
env[state] = '*' # 将当前状态位置标记为*表示智能体所在的位置
env = ''.join(env) # 将状态列表转为字符串显示
print(env) # 打印当前环境状态
time.sleep(FRESH_TIME) # 暂停FRESH_TIME秒以控制显示速度
return False, step_counter # 返回非终止状态和步数
# Q-learning算法实现
def q_learning():
q_table = build_q_table(N_STATE, ACTIONS) # 构建Q表
step_counter_times = [] # 用于存储每个回合的步数
for episode in range(MAX_EPISODES): # 遍历每个回合
state = 0 # 每个回合从状态0开始
is_terminal = False # 是否到达终止状态的标志
step_counter = 0 # 步数计数器
update_env(state, episode, step_counter) # 更新环境并显示
while not is_terminal: # 如果没有到达终止状态
action = choose_action(state, q_table) # 选择动作
next_state, reward = get_env_feedback(state, action) # 获取环境反馈
next_q = q_table.loc[state, action] # 获取当前状态-动作对的Q值
if next_state == 'terminal': # 如果到达终止状态
is_terminal = True # 标记为终止状态
q_target = reward # 目标Q值为奖励值
else: # 如果没有到达终止状态
delta = reward + GAMMA * q_table.iloc[next_state, :].max() - q_table.loc[state, action] # 计算TD误差
q_table.loc[state, action] += ALPHA * delta # 更新Q值
state = next_state # 更新状态
is_terminal, steps = update_env(state, episode, step_counter + 1) # 更新环境并显示
step_counter += 1 # 步数+1
if is_terminal: # 如果到达终止状态
step_counter_times.append(steps) # 记录回合的步数
return q_table, step_counter_times # 返回更新后的Q表和每个回合的步数列表
# 程序入口
if __name__ == '__main__':
q_table, step_counter_times = q_learning() # 运行Q-learning算法
print("Q table\n{}\n".format(q_table)) # 打印最终的Q表
print('end') # 打印训练结束信息
# 绘制每回合步数的图表
plt.plot(step_counter_times, 'g-') # 以绿色线条绘制步数
plt.ylabel("steps") # 设置Y轴标签为"steps"
plt.title("Q-Learning Algorithm") # 设置图标题
plt.show() # 显示图表
print("The step_counter_times is {}".format(step_counter_times)) # 打印每个回合的步数
二、DQN 算法
但是在实际问题中, 不太能用矩阵进行表示, 因此需要用神经网络进行替代, 更新表的过程可以用更新网络的参数替代。数据的获取可以使用 replay buffers 方式构建,用的时候取一个 batch 就可以了, 这种方法其实也就是
off policy
的策略。下面有几个核心技术
- Experience Replay(经验回放)
○ 不像 q-table 的 q-learning, 每一步都学习(update)该步的内容(experience), 对于 q-table 而言, 每一步都学习该步的内容,神经网络连续地学习 时间上相关性高的内容(事实上, 时间 t 的学习内容和时间 t+1 的学习内容非常相似,这样的话收敛会很慢)
○ 将每一步(step)的内容存储在经验池(experience pool)并随机从经验池中提取内容(replay, 回放) 让 NN 学习, 这也是一种批次化(batch), 使用经验池中的多个步骤的经验
- Loss Function 使用 huber 而不是 square loss,误差很大时(
),平方误差会导致误差函数的输出过大,导致学习难以稳定
![]()
重点
- 设置奖励:评估每一步的结果, 为 agent 建立目标
- 利用经验回放进行学习(experience replay: 将经验按照(s, a, s_, r)的形式存储在记忆库中, 每次随机抽取一个 batch 大小的 transition 数据训练网络
- 更新目标网络(Target Network: 目标网络原有的网络同构, 学习一定次数后, 再将评估网络的权重赋给目标网络, 目标网络的引入增加了学习的稳定性
下面给出 DQN 相关代码, 很简单看下就知道了DQN_Nogo.py。
import torch # 导入PyTorch库,提供深度学习功能
import torch.nn as nn # 导入PyTorch中的神经网络模块
import torch.nn.functional as F # 导入PyTorch中常用的函数式API,如激活函数
import numpy as np # 导入NumPy库,用于数值计算
import gym # 导入Gym库,提供强化学习环境
import matplotlib.pyplot as plt # 导入matplotlib库,用于绘图
import copy # 导入copy库,用于复制对象
# hyper-parameters
BATCH_SIZE = 128 # 每次更新时,从记忆池中采样的样本数量
LR = 0.01 # 学习率,用于优化器
GAMMA = 0.90 # 折扣因子,用于计算未来奖励
EPISILO = 0.9 # epsilon-greedy策略中的epsilon值,控制探索和利用的平衡
MEMORY_CAPACITY = 2000 # 记忆池的最大容量
Q_NETWORK_ITERATION = 100 # 每100次学习步骤更新一次目标网络
env = gym.make("CartPole-v1") # 创建Gym环境,使用CartPole-v1环境
env = env.unwrapped # 获取原始环境,解除环境的封装
NUM_ACTIONS = env.action_space.n # 获取动作空间的维度,即可选的动作数
NUM_STATES = env.observation_space.shape[0] # 获取状态空间的维度,即每个状态的特征数
ENV_A_SHAPE = 0 if isinstance(env.action_space.sample(), int) else env.action_space.sample.shape # 确定动作的形状(如果是离散动作,返回0)
class Net(nn.Module): # 定义一个继承自nn.Module的神经网络类
"""神经网络结构,用于近似Q值函数"""
def __init__(self):
super(Net, self).__init__() # 初始化父类的构造函数
self.fc1 = nn.Linear(NUM_STATES, 50) # 第一层,全连接层,输入维度为NUM_STATES,输出维度为50
self.fc1.weight.data.normal_(0, 0.1) # 对权重进行正态分布初始化,均值为0,标准差为0.1
self.fc2 = nn.Linear(50, 30) # 第二层,全连接层,输入维度为50,输出维度为30
self.fc2.weight.data.normal_(0, 0.1) # 对权重进行正态分布初始化
self.out = nn.Linear(30, NUM_ACTIONS) # 输出层,全连接层,输入维度为30,输出维度为NUM_ACTIONS
self.out.weight.data.normal_(0, 0.1) # 对输出层的权重进行正态分布初始化
def forward(self, x): # 定义前向传播函数
x = self.fc1(x) # 通过第一层
x = F.relu(x) # 使用ReLU激活函数
x = self.fc2(x) # 通过第二层
x = F.relu(x) # 使用ReLU激活函数
action_prob = self.out(x) # 输出动作的Q值
return action_prob # 返回动作的Q值
class DQN(): # 定义DQN类
"""深度Q网络(DQN)类,包含Q学习的实现"""
def __init__(self):
super(DQN, self).__init__() # 初始化父类的构造函数
self.eval_net, self.target_net = Net(), Net() # 创建评估网络(eval_net)和目标网络(target_net)
self.learn_step_counter = 0 # 学习步骤计数器
self.memory_counter = 0 # 记忆池中的样本计数器
self.memory = np.zeros((MEMORY_CAPACITY, NUM_STATES * 2 + 2)) # 初始化一个空的记忆池
# 记忆池保存的内容:[状态, 动作, 奖励, 下一个状态]
self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=LR) # 使用Adam优化器
self.loss_func = nn.MSELoss() # 定义损失函数为均方误差
def choose_action(self, state):
state = torch.unsqueeze(torch.FloatTensor(state), 0) # 将状态转换为浮动张量,并添加一个维度,使其成为batch的形式
if np.random.randn() <= EPISILO: # epsilon-greedy策略,随机探索
action_value = self.eval_net.forward(state) # 获取评估网络的输出Q值
action = torch.max(action_value, 1)[1].data.numpy() # 选择Q值最大的动作
action = action[0] if ENV_A_SHAPE == 0 else action.reshape(ENV_A_SHAPE) # 如果动作空间是离散的,直接取最大值;否则根据动作空间的形状调整
else: # 探索阶段,随机选择动作
action = np.random.randint(0, NUM_ACTIONS) # 从动作空间中随机选择一个动作
action = action if ENV_A_SHAPE == 0 else action.reshape(ENV_A_SHAPE) # 根据动作空间的形状调整动作
return action # 返回选择的动作
def store_transition(self, state, action, reward, next_state):
transition = np.hstack((state, [action, reward], next_state)) # 将状态、动作、奖励和下一个状态拼接为一个过渡
index = self.memory_counter % MEMORY_CAPACITY # 计算当前样本在记忆池中的存储位置
self.memory[index, :] = transition # 将过渡存入记忆池
self.memory_counter += 1 # 增加记忆池计数器
def learn(self):
# 每Q_NETWORK_ITERATION步更新一次目标网络
if self.learn_step_counter % Q_NETWORK_ITERATION == 0:
self.target_net.load_state_dict(self.eval_net.state_dict()) # 目标网络的参数与评估网络同步
self.learn_step_counter += 1 # 增加学习步骤计数器
# 从记忆池中随机抽取一个批次
sample_index = np.random.choice(MEMORY_CAPACITY, BATCH_SIZE) # 从记忆池中随机选择一个批次
batch_memory = self.memory[sample_index, :] # 获取批次中的过渡
batch_state = torch.FloatTensor(batch_memory[:, :NUM_STATES]) # 提取状态部分
batch_action = torch.LongTensor(batch_memory[:, NUM_STATES:NUM_STATES+1].astype(int)) # 提取动作部分
batch_reward = torch.FloatTensor(batch_memory[:, NUM_STATES+1:NUM_STATES+2]) # 提取奖励部分
batch_next_state = torch.FloatTensor(batch_memory[:,-NUM_STATES:]) # 提取下一个状态部分
# 计算当前Q值
q_eval = self.eval_net(batch_state).gather(1, batch_action) # 使用评估网络计算当前状态下,选择的动作的Q值
q_next = self.target_net(batch_next_state).detach() # 计算目标网络输出的Q值(detach避免计算梯度)
q_target = batch_reward + GAMMA * q_next.max(1)[0].view(BATCH_SIZE, 1) # 计算目标Q值
loss = self.loss_func(q_eval, q_target) # 计算损失函数(均方误差)
self.optimizer.zero_grad() # 清零梯度
loss.backward() # 反向传播计算梯度
self.optimizer.step() # 更新评估网络的参数
def reward_func(env, x, x_dot, theta, theta_dot):
# 计算奖励函数,考虑了小车与杆的偏差
r1 = (env.x_threshold - abs(x))/env.x_threshold - 0.5 # 小车位置偏差的奖励
r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5 # 杆的角度偏差的奖励
reward = r1 + r2 # 总奖励
return reward # 返回奖励
if __name__ == '__main__':
dqn = DQN() # 创建DQN对象
episodes = 400 # 训练的总回合数
print("Collecting Experience....") # 输出提示信息
reward_list = [] # 用于存储每一回合的奖励
plt.ion() # 开启交互式绘图模式
fig, ax = plt.subplots() # 创建绘图窗口
for i in range(episodes): # 遍历每一回合
state, _ = env.reset() # 重置环境,获取初始状态
ep_reward = 0 # 初始化该回合的奖励
while True: # 回合进行
env.render() # 渲染环境,可视化当前状态
action = dqn.choose_action(state) # 根据当前状态选择动作
next_state, _, done, info, _ = env.step(action) # 执行动作,获取下一个状态、奖励、是否结束标志等信息
x, x_dot, theta, theta_dot = next_state # 提取环境返回的状态变量
reward = reward_func(env, x, x_dot, theta, theta_dot) # 根据状态变量计算奖励
dqn.store_transition(state, action, reward, next_state) # 将当前的过渡存储到记忆池中
ep_reward += reward # 累加当前回合的奖励
if dqn.memory_counter >= MEMORY_CAPACITY: # 如果记忆池已满
dqn.learn() # 开始学习
if done: # 如果当前回合结束
print("episode: {} , the episode reward is {}".format(i, round(ep_reward, 3))) # 输出当前回合的奖励
if done: # 如果回合结束
break # 跳出循环
state = next_state # 更新状态,进入下一个时间步
r = copy.copy(reward) # 复制当前回合的奖励
reward_list.append(r) # 将奖励加入奖励列表
ax.set_xlim(0, 300) # 设置绘图的x轴范围
# ax.cla() # 可选:清空当前绘图(注释掉以保持之前的图形)
ax.plot(reward_list, 'g-', label='total_loss') # 绘制奖励曲线
plt.pause(0.001) # 暂停一小段时间,用于更新图形