Chapter 4: Policy Gradient

Reinforcement learning has three components:

  • Actor
  • Environment
  • Reward Function

In reinforcement learning:

  • The environment and the reward function are given before learning starts; they are not under your control.
  • The only thing you can do is adjust the Actor's policy so that the Actor obtains the maximum reward.
  • The policy takes the observation from the environment as input and outputs the action the Actor should execute.

Policy of Actor

The policy \pi is a network with parameters \theta:

  • Input: the machine's observation, represented as a vector or a matrix;
  • Output: the output-layer neurons of the network, each corresponding to one action.
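As a minimal sketch (the layer sizes here are illustrative, not from the original), such a policy can be a small PyTorch network whose softmax output is a distribution over actions:

import torch
import torch.nn as nn

# Illustrative policy network pi_theta: observation in, action distribution out.
policy = nn.Sequential(
    nn.Linear(4, 32),    # input: an observation vector with 4 features
    nn.ReLU(),
    nn.Linear(32, 2),    # output: one neuron per action
    nn.Softmax(dim=-1),  # normalize the outputs into action probabilities
)

obs = torch.randn(1, 4)     # a dummy observation
action_probs = policy(obs)  # pi_theta(.|s); each row sums to 1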

Take a video game as an example:

  • The initial game screen is denoted s_1.
  • The Actor's internal network decides on the first action, denoted a_1.
  • After the first action is executed, the Actor receives reward r_1 and the game presents a new screen s_2.
  • The Actor then executes the second action a_2 and receives reward r_2.
  • This continues until the game is judged to be over.
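A minimal sketch of this interaction loop, assuming the classic gym API (env.step returns a 4-tuple) and using a random Actor for brevity:

import gym

env = gym.make("CartPole-v0")
state = env.reset()                     # the initial screen s_1
done = False
while not done:
    action = env.action_space.sample()  # a_t; a real Actor would use pi_theta
    state, reward, done, _ = env.step(action)  # reward r_t and next state
env.close()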

Actor, Environment, Reward

In one game, i.e. one episode, the states s output by the environment and the actions a taken by the Actor form a trajectory:
\text{Trajectory} \ \tau = \{s_1, a_1, s_2, a_2, \cdots, s_T, a_T\}
If the parameters \theta of the Actor's policy network are given, the probability of a particular trajectory is:
\begin{split} p_{\theta}(\tau) &= p(s_1) p_{\theta}(a_1|s_1)p(s_2|s_1, a_1) p_{\theta}(a_2|s_2)p(s_3|s_2, a_2) \cdots \\ &= p(s_1) \prod_{t=1}^T p_{\theta}(a_t|s_t)p(s_{t+1}|s_t, a_t) \end{split}
This probability depends on two parts:

  • Environment: p(s_{t+1}|s_t, a_t)
  • Actor: p_{\theta}(a_t|s_t)
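As a small illustration (the tensors below are made up), the Actor's contribution to \log p_\theta(\tau) is just the sum of the per-step log-probabilities of the chosen actions:

import torch

# Made-up per-step distributions pi_theta(.|s_t) and the actions a_t taken.
probs = torch.tensor([[0.7, 0.3], [0.2, 0.8], [0.5, 0.5]])
actions = torch.tensor([0, 1, 0])

# Actor part of log p_theta(tau); the environment terms p(s_1) and
# p(s_{t+1}|s_t, a_t) multiply in as well but do not depend on theta.
log_p_actor = torch.log(probs[torch.arange(3), actions]).sum()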

The reward function assigns a score to each action taken in a given state.

  • The total reward is a random variable.
  • Given the parameters \theta, we can compute the expected reward:
    \overline{R}_\theta = \sum_\tau R(\tau)p_{\theta}(\tau) = E_{\tau\sim p_\theta(\tau)}[R(\tau)]
  • We sample trajectories \tau from the distribution p_{\theta}(\tau) and take the expected value of R(\tau); our goal is to maximize this expected reward.
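Summing over all trajectories is intractable in practice, so \overline{R}_\theta is estimated by sampling. A sketch, where rollout() is a hypothetical helper that plays one episode with the current policy and returns its total reward R(\tau):

import numpy as np

def estimate_expected_reward(rollout, N=100):
    # Monte Carlo estimate of the expected reward: average R(tau) over N episodes.
    return np.mean([rollout() for _ in range(N)])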

Policy Gradient

  • To maximize the expected reward, we use gradient ascent.
  • Using the identity
    \nabla f(x) = f(x) \nabla \log f(x)
    we have:
    \begin{split} \nabla \overline{R}_\theta &= \sum_\tau R(\tau) \nabla p_\theta(\tau) \\ &= \sum_\tau R(\tau) p_\theta(\tau) \frac{\nabla p_\theta(\tau)}{p_\theta(\tau)} \\ &= \sum_\tau R(\tau) p_\theta(\tau) \nabla \log p_\theta(\tau) \\ &= E_{\tau\sim p_\theta(\tau)}[R(\tau) \nabla \log p_\theta(\tau)] \end{split}
  • Sample N trajectories \tau, compute R(\tau) \nabla \log p_\theta(\tau) for each, and average them to approximate the gradient, which is then used to update the policy parameters \theta:
    E_{\tau\sim p_\theta(\tau)}[R(\tau) \nabla \log p_\theta(\tau)] \approx \frac{1}{N} \sum_{n=1}^N R(\tau^n) \nabla \log p_\theta(\tau^n)
  • To compute \nabla \log p_\theta(\tau), note that the environment terms p(s_1) and p(s_{t+1}|s_t, a_t) do not depend on \theta, so their gradients vanish:
    \begin{split} \nabla \log p_\theta(\tau) &= \nabla \log \left( p(s_1) \prod_{t=1}^T p_{\theta}(a_t|s_t)p(s_{t+1}|s_t, a_t) \right) \\ &= \nabla \log p(s_1) + \nabla \sum_{t=1}^T \log p_{\theta}(a_t|s_t) + \nabla \sum_{t=1}^T \log p(s_{t+1}|s_t, a_t) \\ &= \nabla \sum_{t=1}^T \log p_{\theta}(a_t|s_t) = \sum_{t=1}^T \nabla \log p_{\theta}(a_t|s_t) \end{split}
  • Putting everything together (see the sketch after this list):
    \nabla \overline{R}_\theta \approx \frac{1}{N} \sum_{n=1}^N \sum_{t=1}^{T_n} R(\tau^n) \nabla \log p_\theta (a_t^n|s_t^n)
    (Figure: policy gradient update)
  • Each batch of sampled data is used only once: after \theta is updated, the distribution p_\theta(\tau) changes, so new trajectories must be sampled with the updated policy.
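A minimal sketch of this sampled update in PyTorch (the variable names are illustrative): since optimizers minimize, gradient ascent on \overline{R}_\theta becomes descent on its negative.

import torch

def policy_gradient_loss(log_probs, returns):
    # log_probs[n] is a list of log pi_theta(a_t^n|s_t^n) tensors for trajectory n;
    # returns[n] is R(tau^n). Minimizing this loss is gradient ascent on R_bar.
    loss = 0.0
    for traj_log_probs, R in zip(log_probs, returns):
        loss = loss - R * torch.stack(traj_log_probs).sum()
    return loss / len(returns)

After loss.backward() and an optimizer step, the batch is discarded and new trajectories are sampled with the updated policy.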

Tips

Tip 1: Add a Baseline

  • In many games the reward R(\tau) is always positive.
  • Because every sampled action then gets a positive weight and the output probabilities are normalized, the probabilities of actions that happen not to be sampled shrink, even if they are good actions.
  • Improvement: introduce a baseline b (a short unbiasedness check follows this list):
    \begin{split} \nabla \overline{R}_\theta &\approx \frac{1}{N} \sum_{n=1}^N \sum_{t=1}^T (R(\tau^n) - b) \nabla \log p_\theta (a_t^n|s_t^n) \\ b &\approx E[R(\tau)] \end{split}
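A one-line check (standard, though not in the original) that subtracting b keeps the estimator unbiased: the baseline term has zero expectation,
E_{\tau\sim p_\theta(\tau)}[b \nabla \log p_\theta(\tau)] = b \sum_\tau p_\theta(\tau) \nabla \log p_\theta(\tau) = b \sum_\tau \nabla p_\theta(\tau) = b \nabla \sum_\tau p_\theta(\tau) = b \nabla 1 = 0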

Tip 2: Assign Suitable Credit

  • Every state-action pair in a trajectory is weighted by the same reward term R(\tau).
  • The weight should instead reflect how good each individual action is.
  • Improvements:
    • Count only the rewards obtained after the action is executed.
    • Discounted return: discount rewards that lie further in the future (a sketch of this computation follows this list):
      \begin{split} \nabla \overline{R}_\theta &\approx \frac{1}{N} \sum_{n=1}^N \sum_{t=1}^{T_n} \left( \sum_{t'=t}^{T_n} \gamma^{t'-t} r_{t'}^n - b \right) \nabla \log p_\theta (a_t^n|s_t^n) \\ \gamma &\in [0,1] \end{split}
  • Advantage function: A^{\theta}(s_t,a_t)=R-b measures how much better taking a_t in s_t is relative to the baseline, i.e. a relative advantage.
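A small sketch of computing these discounted returns-to-go G_t backwards in one pass (the same quantity that update_policy computes in the implementation below):

def discounted_returns(rewards, gamma=0.9):
    # G_t = sum over t' >= t of gamma^(t'-t) * r_{t'}, accumulated from the end.
    returns = [0.0] * len(rewards)
    G = 0.0
    for t in reversed(range(len(rewards))):
        G = rewards[t] + gamma * G
        returns[t] = G
    return returns

# e.g. discounted_returns([1, 1, 1]) -> [2.71, 1.9, 1.0] (up to float rounding)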

REINFORCE: Monte Carlo Policy Gradient

(Figure: Monte Carlo policy gradient algorithm)
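In outline (a standard statement of REINFORCE):

  • Initialize the policy parameters \theta.
  • Sample one episode \{s_1, a_1, r_1, \cdots, s_T, a_T, r_T\} by running \pi_\theta.
  • For each step t, compute the return G_t = \sum_{t'=t}^{T} \gamma^{t'-t} r_{t'}.
  • Update \theta \leftarrow \theta + \alpha \sum_{t=1}^T G_t \nabla \log \pi_\theta(a_t|s_t).
  • Repeat until the policy converges.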

Implementing REINFORCE

import numpy as np
import matplotlib.pyplot as plt
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

GAMMA = 0.9  # discount factor


class PolicyNetwork(nn.Module):
    def __init__(self, num_inputs, num_actions, hidden_size, learning_rate=3e-4):
        super().__init__()

        self.num_actions = num_actions
        self.linear1 = nn.Linear(num_inputs, hidden_size)
        self.linear2 = nn.Linear(hidden_size, num_actions)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def forward(self, state):
        x = F.relu(self.linear1(state))
        x = F.softmax(self.linear2(x), dim=1)
        return x

    def get_action(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = self.forward(state)
        # Sample an action from pi_theta(.|s) (not argmax); detach only for the
        # sampling step so that log_prob keeps its gradient history.
        action = np.random.choice(
            self.num_actions, p=np.squeeze(probs.detach().numpy())
        )
        log_prob = torch.log(probs.squeeze(0)[action])
        return action, log_prob


def update_policy(policy_network, rewards, log_probs):
    # Discounted return G_t from every time step (reward-to-go).
    discounted_rewards = []

    for t in range(len(rewards)):
        Gt = 0
        pw = 0
        for r in rewards[t:]:
            Gt = Gt + GAMMA**pw * r
            pw += 1
        discounted_rewards.append(Gt)

    # Normalize the returns; subtracting the mean acts as a simple baseline.
    discounted_rewards = torch.tensor(discounted_rewards)
    discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (
        discounted_rewards.std() + 1e-9
    )

    # Loss = -sum_t G_t * log pi_theta(a_t|s_t): minimizing it performs
    # gradient ascent on the expected return.
    policy_gradient = []
    for log_prob, Gt in zip(log_probs, discounted_rewards):
        policy_gradient.append(-log_prob * Gt)

    policy_network.optimizer.zero_grad()
    policy_gradient = torch.stack(policy_gradient).sum()
    policy_gradient.backward()
    policy_network.optimizer.step()


def main():
    env = gym.make("CartPole-v0")
    policy_net = PolicyNetwork(
        num_inputs=env.observation_space.shape[0],
        num_actions=env.action_space.n,
        hidden_size=128,
    )

    max_episode_num = 5000
    max_steps = 10000
    numsteps = []
    avg_numsteps = []
    all_rewards = []

    for episode in range(max_episode_num):
        state = env.reset()
        log_probs = []
        rewards = []

        for steps in range(max_steps):
            env.render()  # rendering every step is slow; comment out to train faster
            action, log_prob = policy_net.get_action(state)
            new_state, reward, done, _ = env.step(action)
            log_probs.append(log_prob)
            rewards.append(reward)

            if done:
                update_policy(policy_net, rewards, log_probs)
                numsteps.append(steps)
                avg_numsteps.append(np.mean(numsteps[-10:]))
                all_rewards.append(np.sum(rewards))
                print(
                    "episode: {:4}, total reward: {:5.1f}, average_reward: {:5.1f}, length: {:4}".format(
                        episode,
                        np.sum(rewards),
                        np.mean(all_rewards[-10:]),
                        steps,
                    )
                )
                break

            state = new_state

    env.close()
    plt.plot(numsteps)
    plt.plot(avg_numsteps)
    plt.xlabel("Episode")
    plt.ylabel("Steps per episode")
    plt.show()


if __name__ == "__main__":
    main()
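Note: the script above assumes the classic gym API, where env.reset() returns the observation and env.step() returns a 4-tuple. Under gym >= 0.26 or gymnasium, reset() returns (observation, info) and step() returns a 5-tuple with separate terminated and truncated flags, so those two calls would need small adjustments.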
(Figure: policy gradient training process)

(Figure: training results)

References

Deriving Policy Gradients and Implementing REINFORCE
