注意
点击 这里 下载完整的示例代码
强化学习 (DQN) 教程¶
创建于:2017年3月24日 | 最后更新:2024年6月18日 | 最后验证:2024年11月5日
- Author: Adam Paszke
本教程展示了如何使用PyTorch在Gymnasium的CartPole-v1任务上训练深度Q学习(DQN)代理。
你可能会发现阅读原始的Deep Q Learning (DQN)论文很有帮助
任务
代理必须在两个动作之间做出决定——向左或向右移动小车——以便附着在其上的杆保持直立。您可以在Gymnasium的网站上找到有关该环境和其他更具挑战性的环境的更多信息。
CartPole¶
当代理观察到环境的当前状态并选择一个动作时,环境会转换到一个新的状态,并且还会返回一个奖励,该奖励表示动作的后果。在这个任务中,每个增量时间步的奖励为+1,如果杆子倒下太远或小车移动超过中心2.4个单位,环境将终止。这意味着表现更好的场景将运行更长时间,积累更大的回报。
CartPole任务的设计使得代理的输入是4个表示环境状态的实数值(位置、速度等)。我们直接使用这4个输入,不进行任何缩放,并将它们传递到一个具有2个输出的小型全连接网络中,每个输出对应一个动作。网络被训练来预测给定输入状态下每个动作的期望值。然后选择具有最高期望值的动作。
包
首先,让我们导入所需的包。首先,我们需要 gymnasium 用于环境, 通过使用 pip 安装。这是原始 OpenAI Gym 项目的一个分支,自 Gym v0.19 以来由同一团队维护。 如果您在 Google Colab 中运行此代码,请运行:
%%bash
pip3 install gymnasium[classic_control]
我们还将使用以下来自 PyTorch 的内容:
神经网络 (
torch.nn)优化 (
torch.optim)自动微分 (
torch.autograd)
import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
env = gym.make("CartPole-v1")
# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
from IPython import display
plt.ion()
# if GPU is to be used
device = torch.device(
"cuda" if torch.cuda.is_available() else
"mps" if torch.backends.mps.is_available() else
"cpu"
)
回放记忆¶
我们将使用经验回放记忆来训练我们的DQN。它存储了代理观察到的转换,使我们能够稍后重用这些数据。通过随机从中采样,构建批次的转换是不相关的。已经证明,这大大稳定并改进了DQN训练过程。
为此,我们需要两个类:
Transition- 一个命名元组,表示我们环境中的单个转换。它基本上将(状态,动作)对映射到它们的(下一个状态,奖励)结果,其中状态是稍后描述的屏幕差异图像。ReplayMemory- 一个有限大小的循环缓冲区,用于保存最近观察到的转换。它还实现了.sample()方法,用于选择随机的一批转换进行训练。
Transition = namedtuple('Transition',
('state', 'action', 'next_state', 'reward'))
class ReplayMemory(object):
def __init__(self, capacity):
self.memory = deque([], maxlen=capacity)
def push(self, *args):
"""Save a transition"""
self.memory.append(Transition(*args))
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
现在,让我们定义我们的模型。但首先,让我们快速回顾一下什么是DQN。
DQN算法¶
我们的环境是确定性的,因此为了简单起见,这里提出的所有方程也都是以确定性的方式表述的。在强化学习文献中,它们还会包含对环境中的随机转移的期望。
我们的目标是训练一个策略,试图最大化折扣后的累积奖励 \(R_{t_0} = \sum_{t=t_0}^{\infty} \gamma^{t - t_0} r_t\),其中 \(R_{t_0}\) 也被称为回报。折扣率 \(\gamma\) 应该是一个介于 \(0\) 和 \(1\) 之间的常数, 以确保总和收敛。较低的 \(\gamma\) 使得 来自不确定的遥远未来的奖励对我们的智能体来说不如近未来的奖励重要, 因为它对近未来的奖励更有信心。这也鼓励智能体在时间上更接近的时候收集奖励, 而不是在时间上遥远的未来收集等价的奖励。
Q-learning 的主要思想是,如果我们有一个函数 \(Q^*: State \times Action \rightarrow \mathbb{R}\),它可以告诉我们 在给定状态下采取某个动作后的回报,那么我们就可以轻松地构建一个最大化奖励的策略:
然而,我们并不了解世界的所有事物,因此我们无法直接获取\(Q^*\)。但是,由于神经网络是通用的函数逼近器,我们可以简单地创建一个并训练它以近似\(Q^*\)。
对于我们的训练更新规则,我们将使用一个事实,即每个策略的\(Q\)函数都遵循贝尔曼方程:
等式两边的差异被称为时间差异误差,\(\delta\):
为了最小化这个误差,我们将使用Huber损失。当误差较小时,Huber损失表现得像均方误差,但当误差较大时,它表现得像平均绝对误差——这使得它在\(Q\)的估计非常嘈杂时对异常值更加稳健。我们从回放内存中采样一批转换\(B\)来计算这个损失:
Q-网络¶
我们的模型将是一个前馈神经网络,它接收当前和之前的屏幕补丁之间的差异。它有两个输出,分别代表\(Q(s, \mathrm{left})\)和\(Q(s, \mathrm{right})\)(其中\(s\)是网络的输入)。实际上,网络试图预测在当前输入下采取每个动作的预期回报。
class DQN(nn.Module):
def __init__(self, n_observations, n_actions):
super(DQN, self).__init__()
self.layer1 = nn.Linear(n_observations, 128)
self.layer2 = nn.Linear(128, 128)
self.layer3 = nn.Linear(128, n_actions)
# Called with either one element to determine next action, or a batch
# during optimization. Returns tensor([[left0exp,right0exp]...]).
def forward(self, x):
x = F.relu(self.layer1(x))
x = F.relu(self.layer2(x))
return self.layer3(x)
训练¶
超参数和实用工具¶
这个单元格实例化了我们的模型及其优化器,并定义了一些实用工具:
select_action- 将根据ε贪婪策略选择一个动作。简单来说,我们有时会使用我们的模型来选择动作,有时我们只是均匀地采样一个动作。选择随机动作的概率将从EPS_START开始,并会以指数方式衰减到EPS_END。EPS_DECAY控制衰减的速率。plot_durations- 一个用于绘制每集持续时间的辅助工具, 同时显示最近100集的平均值(官方评估中使用的指标)。图表将位于包含主训练循环的单元格下方,并在每集结束后更新。
# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA is the discount factor as mentioned in the previous section
# EPS_START is the starting value of epsilon
# EPS_END is the final value of epsilon
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR is the learning rate of the ``AdamW`` optimizer
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4
# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()
n_observations = len(state)
policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)
steps_done = 0
def select_action(state):
global steps_done
sample = random.random()
eps_threshold = EPS_END + (EPS_START - EPS_END) * \
math.exp(-1. * steps_done / EPS_DECAY)
steps_done += 1
if sample > eps_threshold:
with torch.no_grad():
# t.max(1) will return the largest column value of each row.
# second column on max result is index of where max element was
# found, so we pick action with the larger expected reward.
return policy_net(state).max(1).indices.view(1, 1)
else:
return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)
episode_durations = []
def plot_durations(show_result=False):
plt.figure(1)
durations_t = torch.tensor(episode_durations, dtype=torch.float)
if show_result:
plt.title('Result')
else:
plt.clf()
plt.title('Training...')
plt.xlabel('Episode')
plt.ylabel('Duration')
plt.plot(durations_t.numpy())
# Take 100 episode averages and plot them too
if len(durations_t) >= 100:
means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
means = torch.cat((torch.zeros(99), means))
plt.plot(means.numpy())
plt.pause(0.001) # pause a bit so that plots are updated
if is_ipython:
if not show_result:
display.display(plt.gcf())
display.clear_output(wait=True)
else:
display.display(plt.gcf())
训练循环¶
最后,训练我们模型的代码。
在这里,你可以找到一个optimize_model函数,它执行优化的一步。它首先采样一个批次,将所有张量连接成一个,计算\(Q(s_t, a_t)\)和\(V(s_{t+1}) = \max_a Q(s_{t+1}, a)\),并将它们组合成我们的损失。根据定义,如果\(s\)是终止状态,我们设置\(V(s) = 0\)。我们还使用目标网络来计算\(V(s_{t+1})\)以增加稳定性。目标网络在每一步通过由超参数TAU控制的软更新进行更新,该超参数之前已定义。
def optimize_model():
if len(memory) < BATCH_SIZE:
return
transitions = memory.sample(BATCH_SIZE)
# Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
# detailed explanation). This converts batch-array of Transitions
# to Transition of batch-arrays.
batch = Transition(*zip(*transitions))
# Compute a mask of non-final states and concatenate the batch elements
# (a final state would've been the one after which simulation ended)
non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
batch.next_state)), device=device, dtype=torch.bool)
non_final_next_states = torch.cat([s for s in batch.next_state
if s is not None])
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
# Compute Q(s_t, a) - the model computes Q(s_t), then we select the
# columns of actions taken. These are the actions which would've been taken
# for each batch state according to policy_net
state_action_values = policy_net(state_batch).gather(1, action_batch)
# Compute V(s_{t+1}) for all next states.
# Expected values of actions for non_final_next_states are computed based
# on the "older" target_net; selecting their best reward with max(1).values
# This is merged based on the mask, such that we'll have either the expected
# state value or 0 in case the state was final.
next_state_values = torch.zeros(BATCH_SIZE, device=device)
with torch.no_grad():
next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values
# Compute the expected Q values
expected_state_action_values = (next_state_values * GAMMA) + reward_batch
# Compute Huber loss
criterion = nn.SmoothL1Loss()
loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))
# Optimize the model
optimizer.zero_grad()
loss.backward()
# In-place gradient clipping
torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
optimizer.step()
下面,你可以找到主要的训练循环。开始时,我们重置环境并获取初始的state张量。然后,我们采样一个动作,执行它,观察下一个状态和奖励(总是1),并优化我们的模型一次。当回合结束时(我们的模型失败),我们重新开始循环。
下面,如果GPU可用,num_episodes设置为600,否则计划50集,以便训练不会花费太长时间。然而,50集不足以在CartPole上观察到良好的性能。你应该看到模型在600个训练集内不断达到500步。训练RL代理可能是一个嘈杂的过程,因此如果未观察到收敛,重新开始训练可能会产生更好的结果。
if torch.cuda.is_available() or torch.backends.mps.is_available():
num_episodes = 600
else:
num_episodes = 50
for i_episode in range(num_episodes):
# Initialize the environment and get its state
state, info = env.reset()
state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
for t in count():
action = select_action(state)
observation, reward, terminated, truncated, _ = env.step(action.item())
reward = torch.tensor([reward], device=device)
done = terminated or truncated
if terminated:
next_state = None
else:
next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
# Store the transition in memory
memory.push(state, action, next_state, reward)
# Move to the next state
state = next_state
# Perform one step of the optimization (on the policy network)
optimize_model()
# Soft update of the target network's weights
# θ′ ← τ θ + (1 −τ )θ′
target_net_state_dict = target_net.state_dict()
policy_net_state_dict = policy_net.state_dict()
for key in policy_net_state_dict:
target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
target_net.load_state_dict(target_net_state_dict)
if done:
episode_durations.append(t + 1)
plot_durations()
break
print('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()

/usr/local/lib/python3.10/dist-packages/gymnasium/utils/passive_env_checker.py:249: DeprecationWarning:
`np.bool8` is a deprecated alias for `np.bool_`. (Deprecated NumPy 1.24)
Complete
这是说明整体结果数据流的图表。
动作要么随机选择,要么基于策略选择,从gym环境中获取下一步样本。我们将结果记录在回放内存中,并在每次迭代中运行优化步骤。优化从回放内存中随机选取一批数据进行新策略的训练。在优化过程中,也会使用“较旧”的target_net来计算预期的Q值。每一步都会对其权重进行软更新。
脚本总运行时间: (3 分钟 57.238 秒)