跳到主要内容

DQN算法

1. DQN 算法

Q-Learning算法使用一张完备的存储所有动作Q值的表格,但实际上我们往往不可能存储一个正常二维以及三维的Q值数据,这太大了。所以,我们转变思路,用函数拟合的方法来估计Q值,将这个Q值表格视为数据,并用参数化的函数QθQ_\theta来拟合数据。

近似方法往往存在较大的误差,而DQN 算法可以解决连续状态下离散动作的问题,由此减小误差。

1.1 CartPole 环境

CartPole 指路:https://github.com/openai/gym/wiki/CartPole-v0

它的状态值是连续的,动作值是离散的。

这是一个典型的倒立摆小车环境,调过平衡车的都不陌生吧。这个agent的任务是左右移动保持车杆竖直,若杆的倾斜度数过大,或者车子离初始位置左右的偏离程度过大,或者坚持时间到达 200 帧,则游戏结束。智能体的状态是一个维数为 4 的向量,每一维都是连续的,其动作是离散的,动作空间大小为 2。

我们可以列这个环境的状态空间的表格:

维度意义最小值最大值
0车的位置-2.42.4
1车的速度-∞
2杆的角度≈ -41.8°≈ 41.8°
3杆尖端的速度-∞

动作空间:

标号动作
0向左移动小车
1向右移动小车

现在我们想在类似车杆的环境中得到动作价值函数 Q(s,a)Q(s,a)。由于状态每一维度的值都是连续的,无法使用表格记录,因此采用函数拟合的思想。神经网络凭借其强大的表达能力,成为理想的拟合工具。

对于不同的动作空间,Q网络有两种主要架构:

连续动作空间

  • 输入:状态 ss 和动作 aa
  • 输出:标量值 Q(s,a)Q(s,a)
  • 数学表达:Qθ(s,a)Q_\theta(s,a),其中 θ\theta 为网络参数

离散动作空间

  • 输入:状态 ss
  • 输出:所有可能动作的Q值向量 [Q(s,a1),Q(s,a2),...,Q(s,an)][Q(s,a_1), Q(s,a_2), ..., Q(s,a_n)]
  • 数学表达:Qθ(s)=[Qθ(s,a1),Qθ(s,a2),...,Qθ(s,an)]Q_\theta(s) = [Q_\theta(s,a_1), Q_\theta(s,a_2), ..., Q_\theta(s,a_n)]

通常DQN(以及Q-learning)只能处理动作离散的情况,因为在价值更新过程中需要计算 maxaQ(s,a)\max_a Q(s',a) 操作。

回顾Q-learning的更新规则:

Q(st,at)Q(st,at)+α[rt+γmaxaQ(st+1,a)Q(st,at)]Q(s_t,a_t) \gets Q(s_t,a_t) + \alpha[r_t + \gamma \max_{a'} Q(s_{t+1},a') - Q(s_t,a_t)]

定义时序差分(TD)目标:

yt=rt+γmaxaQθ(st+1,a)y_t = r_t + \gamma \max_{a'} Q_\theta(s_{t+1},a')

对于一组经验数据 {(si,ai,ri,si+1)}i=1N\{(s_i, a_i, r_i, s_{i+1})\}_{i=1}^N,Q网络的损失函数构造为均方误差形式:

θ=argminθ12Ni=1N[Qθ(si,ai)(ri+γmaxaQθ(si+1,a))]2\theta^* = \arg\min_\theta \frac{1}{2N} \sum_{i=1}^N \left[Q_\theta(s_i,a_i) - \left(r_i + \gamma \max_{a'} Q_\theta(s_{i+1},a')\right)\right]^2

这个除以 2N2N 的形式在求导时会使平方项的系数变为 1,简化梯度计算:

θL(θ)=1Ni=1N(Qθ(si,ai)yi)θQθ(si,ai)\nabla_\theta \mathcal{L}(\theta) = \frac{1}{N} \sum_{i=1}^N \left(Q_\theta(s_i,a_i) - y_i\right) \nabla_\theta Q_\theta(s_i,a_i)

从而Q-Learning扩展为神经网络形式,也就是深度 Q 网络(deep Q network,DQN)。

DQN是离线策略算法,因此我们在收集数据的时候可以使用一个ϵ\epsilon -贪婪策略来平衡探索与利用,存储收集到的数据。

除此之外,我们需要两个提升DQN表现的工具。

1.2 经验回放

在一般的监督学习中,训练数据通常都是独立同分布的——就像从一个大箱子里随机抓取糖果,每次品尝的味道都不相关。但强化学习中的交互数据却大不相同:智能体在环境中一步步探索,前后状态紧密相连,就像看连续剧一样,如果只按顺序学习,很容易陷入“最近偏见”,只记得刚发生的事。

为了巧妙解决这个问题,DQN 引入了“经验回放”机制。想象智能体有一个“记忆宝库”(回放缓冲区),每次与环境互动获得的珍贵经验(状态、动作、奖励、下一状态)都被存储起来。训练时,它不再按顺序复习,而是从宝库中随机抽取一批记忆碎片来学习。这种方式有两个妙处:

  1. 打破数据的“连环套”:原本连续的经验就像一串珍珠项链,颗颗相连。通过随机重播,我们把项链拆散,让每颗珍珠独立闪耀,使神经网络不再被近期记忆绑架,而是看到更全面的模式。(在 MDP 中交互采样得到的数据本身不满足独立假设,因为这一时刻的状态和上一时刻的状态有关。非独立同分布的数据对训练神经网络有很大的影响,会使神经网络拟合到最近训练的数据上。采用经验回放可以打破样本之间的相关性,让其满足独立假设。)

  2. 让每份经验“物尽其用”:在传统 Q-learning 中,每份经验就像一次性餐具,用后即弃。而现在,每段记忆都可以被反复品味、多次学习,极大提升了样本利用率——这正是深度神经网络最需要的“营养餐”。

通过这种巧妙的设计,DQN 既继承了 Q-learning 的精华,又克服了神经网络训练中的关键障碍,让智能体在复杂环境中学得更稳、更聪明。

1.3 目标网络

DQN 算法的最终优化目标是让 QθQ_\theta 逼近 TD 目标 r+γmaxaQθ(s,a)r + \gamma \max_{a'} Q_\theta(s',a')。然而这里存在一个内在矛盾:TD 目标本身就包含神经网络的输出,因此在更新网络参数 θ\theta 的同时,目标本身也在不断变化,这就像是在追逐一个移动的靶子,极易导致训练过程震荡和发散。

为了解决这一稳定性问题,DQN 引入了目标网络(target network) 的巧妙设计。其核心思想是:既然训练过程中 Q 网络的持续更新会导致目标不断漂移,不如先将 TD 目标中的 Q 网络暂时"冻结"。

具体实现需要两套独立的 Q 网络:

(1) 训练网络 QθQ_\theta:负责计算损失函数中的预测值 Qθ(s,a)Q_\theta(s,a),每一步都通过梯度下降进行参数更新。其作用是算法的"先锋部队",不断探索优化方向。

(2) 目标网络 QθQ_{\theta^-} :负责计算损失函数中的目标值 r+γmaxaQθ(s,a)r + \gamma \max_{a'} Q_{\theta^-}(s',a'),其参数 θ\theta^- 保持相对固定,提供稳定的学习目标,防止目标过度漂移

两套网络参数不能时刻保持一致,因为这样就没有学习的意义了。为了让目标真正稳定,DQN 采用延迟同步机制,可以理解为“小步快跑,定期校准”:

  • 训练网络:每一步都实时更新,θθαθL(θ)\theta \gets \theta - \alpha \nabla_\theta \mathcal{L}(\theta)
  • 目标网络:每隔 CC 步才与训练网络同步一次,θθ\theta^- \gets \theta

. 初始化阶段

  • 随机初始化训练网络参数 θ\theta
  • 复制参数初始化目标网络:θθ\theta^- \gets \theta
  • 创建空的经验回放池 D\mathcal{D}
  1. 交互学习循环(对每个序列 e=1e=1EE
    • 获取环境初始状态 s1s_1
    • 对每个时间步 t=1t=1TT
      • 探索行动:以 ϵ\epsilon-贪婪策略选择动作 at={随机动作以概率 ϵargmaxaQθ(st,a)以概率 1ϵa_t = \begin{cases} \text{随机动作} & \text{以概率 } \epsilon \\ \arg\max_a Q_\theta(s_t,a) & \text{以概率 } 1-\epsilon \end{cases}
      • 环境交互:执行 ata_t,获得奖励 rtr_t 和下一状态 st+1s_{t+1}
      • 经验存储:将转换 (st,at,rt,st+1)(s_t, a_t, r_t, s_{t+1}) 存入回放池 D\mathcal{D}
      • 批次学习:如果 D\mathcal{D} 中数据足够,随机采样 NN 个转换 {(si,ai,ri,si+1)}i=1N\{(s_i,a_i,r_i,s_{i+1})\}_{i=1}^N
        • 对每个样本计算目标值:yi=ri+γmaxaQθ(si+1,a)y_i = r_i + \gamma \max_{a'} Q_{\theta^-}(s_{i+1},a')
        • 计算损失:L(θ)=1Ni(Qθ(si,ai)yi)2\mathcal{L}(\theta) = \frac{1}{N} \sum_i (Q_\theta(s_i,a_i) - y_i)^2
        • 梯度下降更新训练网络:θθαθL(θ)\theta \gets \theta - \alpha \nabla_\theta \mathcal{L}(\theta)
      • 定期同步:每隔 CC 步更新目标网络:θθ\theta^- \gets \theta

通过这种双网络架构和延迟同步机制,DQN 成功解决了深度强化学习中的训练稳定性难题,让智能体能够在复杂环境中稳健学习。

2. Code

我们用车杆环境测试一下DQN的效果:

import gymnasium as gym
import numpy as np
import random
from collections import deque
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
from typing import List, Tuple

class QNetwork(nn.Module):
"""Q网络:输入状态,输出各动作的Q值"""

def __init__(self, state_size: int, action_size: int, hidden_size: int = 64):
super(QNetwork, self).__init__()
self.fc1 = nn.Linear(state_size, hidden_size)
self.fc2 = nn.Linear(hidden_size, hidden_size)
self.fc3 = nn.Linear(hidden_size, action_size)

def forward(self, state: torch.Tensor) -> torch.Tensor:
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
return self.fc3(x)

class ReplayBuffer:
"""经验回放缓冲区"""

def __init__(self, buffer_size: int, batch_size: int):
self.memory = deque(maxlen=buffer_size)
self.batch_size = batch_size

def add(self, state: np.ndarray, action: int, reward: float,
next_state: np.ndarray, done: bool):
"""添加经验到缓冲区"""
self.memory.append((state, action, reward, next_state, done))

def sample(self) -> Tuple:
"""随机采样一批经验"""
experiences = random.sample(self.memory, k=self.batch_size)

states = torch.from_numpy(np.vstack([e[0] for e in experiences])).float()
actions = torch.from_numpy(np.vstack([e[1] for e in experiences])).long()
rewards = torch.from_numpy(np.vstack([e[2] for e in experiences])).float()
next_states = torch.from_numpy(np.vstack([e[3] for e in experiences])).float()
dones = torch.from_numpy(np.vstack([e[4] for e in experiences]).astype(np.uint8)).float()

return states, actions, rewards, next_states, dones

def __len__(self) -> int:
return len(self.memory)

class DQNAgent:
"""DQN智能体"""

def __init__(self, state_size: int, action_size: int):
self.state_size = state_size
self.action_size = action_size

# 超参数
self.buffer_size = 10000
self.batch_size = 32
self.lr = 0.001
self.gamma = 0.99
self.tau = 0.01 # 软更新参数
self.update_every = 4 # 更新频率

# Q网络
self.qnetwork_local = QNetwork(state_size, action_size)
self.qnetwork_target = QNetwork(state_size, action_size)
self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr=self.lr)

# 经验回放
self.memory = ReplayBuffer(self.buffer_size, self.batch_size)

# 时间步计数器
self.t_step = 0

def step(self, state: np.ndarray, action: int, reward: float,
next_state: np.ndarray, done: bool):
"""存储经验并学习"""
# 存储经验
self.memory.add(state, action, reward, next_state, done)

# 每隔一定步数学习
self.t_step = (self.t_step + 1) % self.update_every
if self.t_step == 0 and len(self.memory) > self.batch_size:
experiences = self.memory.sample()
self.learn(experiences)

def act(self, state: np.ndarray, eps: float = 0.0) -> int:
"""根据当前策略选择动作"""
state = torch.from_numpy(state).float().unsqueeze(0)
self.qnetwork_local.eval()
with torch.no_grad():
action_values = self.qnetwork_local(state)
self.qnetwork_local.train()

# Epsilon-greedy策略
if random.random() > eps:
return np.argmax(action_values.cpu().data.numpy())
else:
return random.choice(np.arange(self.action_size))

def learn(self, experiences: Tuple):
"""使用一批经验更新网络参数"""
states, actions, rewards, next_states, dones = experiences

# 计算当前Q值
q_current = self.qnetwork_local(states).gather(1, actions)

# 计算目标Q值
q_targets_next = self.qnetwork_target(next_states).detach().max(1)[0].unsqueeze(1)
q_targets = rewards + (self.gamma * q_targets_next * (1 - dones))

# 计算损失
loss = F.mse_loss(q_current, q_targets)

# 最小化损失
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()

# 软更新目标网络
self.soft_update()

def soft_update(self):
"""软更新目标网络参数"""
for target_param, local_param in zip(self.qnetwork_target.parameters(),
self.qnetwork_local.parameters()):
target_param.data.copy_(self.tau * local_param.data +
(1.0 - self.tau) * target_param.data)

def train_dqn(env_name: str = "CartPole-v1", n_episodes: int = 1000,
max_t: int = 1000, eps_start: float = 1.0,
eps_end: float = 0.01, eps_decay: float = 0.995):
"""训练DQN智能体"""

# 创建环境和智能体 - 使用Gymnasium
env = gym.make(env_name)
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size)

scores = [] # 每回合得分
scores_window = deque(maxlen=100) # 最近100回合得分
eps = eps_start # 探索率

for i_episode in range(1, n_episodes + 1):
# Gymnasium的reset返回(state, info)
state, _ = env.reset()
score = 0

for t in range(max_t):
# 选择并执行动作
action = agent.act(state, eps)
# Gymnasium的step返回(state, reward, terminated, truncated, info)
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated

# 学习
agent.step(state, action, reward, next_state, done)

state = next_state
score += reward

if done:
break

scores_window.append(score)
scores.append(score)
eps = max(eps_end, eps_decay * eps) # 衰减探索率

# 打印训练进度
if i_episode % 100 == 0:
print(f'\rEpisode {i_episode}\tAverage Score: {np.mean(scores_window):.2f}')

# 检查是否解决环境
if np.mean(scores_window) >= 195.0:
print(f'\nEnvironment solved in {i_episode} episodes!')
torch.save(agent.qnetwork_local.state_dict(), 'checkpoint.pth')
break

env.close()
return scores

def plot_scores(scores: List[float]):
"""绘制训练得分曲线"""
plt.figure(figsize=(12, 6))
plt.plot(scores, alpha=0.6, label='Episode Score')

# 计算移动平均
window_size = 100
moving_avg = []
for i in range(len(scores)):
if i < window_size:
moving_avg.append(np.mean(scores[:i+1]))
else:
moving_avg.append(np.mean(scores[i-window_size+1:i+1]))

plt.plot(moving_avg, linewidth=2, label=f'{window_size}-Episode Moving Average')
plt.xlabel('Episode')
plt.ylabel('Score')
plt.title('DQN Training Performance on CartPole-v1')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('dqn_training.png', dpi=300, bbox_inches='tight')
plt.show()

def test_agent(env_name: str = "CartPole-v1", model_path: str = 'checkpoint.pth'):
"""测试训练好的智能体"""
# 使用Gymnasium
env = gym.make(env_name, render_mode='human') # 添加render_mode以显示可视化
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# 加载训练好的模型
agent = DQNAgent(state_size, action_size)
agent.qnetwork_local.load_state_dict(torch.load(model_path))

state, _ = env.reset()
score = 0

for t in range(1000):
action = agent.act(state) # 测试时不探索
next_state, reward, terminated, truncated, _ = env.step(action)
state = next_state
score += reward

if terminated or truncated:
break

print(f"Test Score: {score}")
env.close()

if __name__ == "__main__":
# 设置随机种子
torch.manual_seed(0)
np.random.seed(0)
random.seed(0)

print("开始训练DQN智能体...")
scores = train_dqn(n_episodes=1000)

print("绘制训练曲线...")
plot_scores(scores)

print("测试训练好的智能体...")
test_agent()
Details
ImageImage

PS F:\BLOG\ROT-Blog\docs\Control\强化学习> python .\1.py

开始训练DQN智能体...

Episode 100 Average Score: 23.13

Episode 200 Average Score: 160.99

Environment solved in 214 episodes!

绘制训练曲线...

测试训练好的智能体...

Test Score: 278.0

本文字数:0

预计阅读时间:0 分钟


统计信息加载中...

有问题?请向我提出issue