
DDPG Algorithm

1. The DDPG Algorithm

The previous chapters introduced policy-gradient-based algorithms: REINFORCE, Actor-Critic, and two improved algorithms, TRPO and PPO. These algorithms share a common trait: they are all on-policy algorithms, which means their sample efficiency is relatively low.

Recall the DQN algorithm. DQN directly estimates the optimal action-value function Q and can therefore learn off-policy, but it can only handle environments with a finite action space, because it must pick the action with the largest value among all actions. If the number of actions is infinite, we could discretize the action space, but this is rather coarse and does not allow fine-grained control.
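
To see why discretization is coarse, consider Pendulum-v1, whose torque is a continuous value in [-2, 2]. The sketch below (the bin count of 11 is an arbitrary, hypothetical choice) turns the continuous range into a finite menu that a DQN-style argmax could search, at the cost of never producing any torque in between the grid points:

import numpy as np

# Hypothetical discretization of Pendulum-v1's continuous torque in [-2, 2]:
# a DQN-style agent would pick an argmax over these 11 candidate values only.
num_bins = 11
discrete_actions = np.linspace(-2.0, 2.0, num_bins)

def index_to_torque(action_index):
    # Map a discrete action index back to a continuous torque for env.step().
    return np.array([discrete_actions[action_index]])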

The deep deterministic policy gradient (DDPG) algorithm addresses this problem: it constructs a deterministic policy and uses gradient ascent to maximize the Q value. DDPG is also an Actor-Critic algorithm. The REINFORCE, TRPO, and PPO algorithms we studied earlier learn stochastic policies, whereas DDPG learns a deterministic policy.

1.1 Deterministic vs. Stochastic Policies

The policies we studied before are stochastic and can be written as $\pi(a|s)$; a deterministic policy can instead be written as $a = \mu(s)$. Analogous to the policy gradient theorem, we can derive the deterministic policy gradient theorem:

$$\nabla_\theta J(\mu_\theta) = \mathbb{E}_{s \sim \rho^\beta}\left[\nabla_\theta \mu_\theta(s)\, \nabla_a Q^\mu(s,a)\big|_{a=\mu_\theta(s)}\right]$$

where $\beta$ is the behavior policy used to collect data.

1.2 The Actor-Critic Framework

We can understand this theorem as follows. Suppose we already have a Q function and are given a state. Because the action space is now infinite, we cannot enumerate all actions to find the one with the largest Q value, so we instead use the policy to find the action that maximizes Q, i.e., $\max_a Q(s,a)$. Here the Q function plays the role of the Critic and the policy $\mu$ plays the role of the Actor, giving an Actor-Critic framework.
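
In code, this corresponds to updating the Actor by gradient ascent on the Critic's output. The following is a minimal sketch, assuming the `actor`, `critic`, and `actor_optimizer` objects from the complete implementation in Section 4 are already constructed and `states` is a batch of state tensors:

import torch

# Minimal sketch of the Actor update: gradient ascent on Q(s, mu(s)).
def actor_update_step(actor, critic, actor_optimizer, states):
    actions = actor(states)                       # a = mu_theta(s), differentiable in theta
    actor_loss = -critic(states, actions).mean()  # minimizing -Q is maximizing Q
    actor_optimizer.zero_grad()
    actor_loss.backward()                         # chain rule yields the deterministic policy gradient
    actor_optimizer.step()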

1.3 Target Networks and Soft Updates

DDPG uses four neural networks: one each for the Actor and the Critic, plus a target network for each of them. DDPG updates its target networks with soft updates:

$$\theta^- \leftarrow \tau \theta + (1-\tau)\theta^-$$
$$\omega^- \leftarrow \tau \omega + (1-\tau)\omega^-$$

$\tau$ is usually a small number (e.g., 0.001); when $\tau = 1$, the update becomes the same hard update used by DQN.
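
In code, the soft update simply blends each target parameter toward the corresponding online parameter; the sketch below mirrors the `soft_update` method of the implementation in Section 4:

# Soft update: theta_target <- tau * theta + (1 - tau) * theta_target
def soft_update(net, target_net, tau):
    for target_param, param in zip(target_net.parameters(), net.parameters()):
        target_param.data.copy_(tau * param.data + (1.0 - tau) * target_param.data)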

1.4 Exploration

Because DDPG uses a deterministic policy, its inherent exploration is very limited. DDPG therefore adds a random noise term $\mathcal{N}$ to the behavior policy for exploration:

$$a_t = \mu(s_t|\theta) + \mathcal{N}_t$$
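
The original DDPG paper uses Ornstein-Uhlenbeck noise for $\mathcal{N}$, while the implementation in Section 4 uses simpler zero-mean Gaussian noise. A minimal sketch of the Gaussian variant, with `noisy_action` as a hypothetical helper name:

import numpy as np

# Gaussian exploration noise added to the deterministic action,
# then clipped back into the valid action range.
def noisy_action(mu_action, sigma, action_low, action_high):
    noise = sigma * np.random.randn(*np.shape(mu_action))
    return np.clip(mu_action + noise, action_low, action_high)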

2. Algorithm Procedure

The full DDPG procedure is as follows:

  1. Initialization

    • Initialize the Critic network $Q(s,a|\omega)$ and the Actor network $\mu(s|\theta)$ with random parameters $\omega$ and $\theta$
    • Copy the parameters $\omega^- \leftarrow \omega$ and $\theta^- \leftarrow \theta$ to initialize the target networks $Q^-$ and $\mu^-$
    • Initialize the replay buffer $\mathcal{R}$
  2. For each episode $e = 1$ to $M$:

    • Initialize a random process $\mathcal{N}$ for action exploration
    • Obtain the initial state $s_1$ from the environment
    • For each time step $t = 1$ to $T$:
      • Select the action $a_t = \mu(s_t|\theta) + \mathcal{N}_t$ according to the current policy and the noise
      • Execute action $a_t$, receive reward $r_t$, and observe the next state $s_{t+1}$
      • Store the transition $(s_t, a_t, r_t, s_{t+1})$ in the replay buffer $\mathcal{R}$
      • Sample $N$ tuples $(s_i, a_i, r_i, s_{i+1})$ from $\mathcal{R}$
      • For each tuple, compute the target value with the target networks: $y_i = r_i + \gamma Q^-(s_{i+1}, \mu^-(s_{i+1}|\theta^-)|\omega^-)$
      • Update the Critic network by minimizing the loss: $L = \frac{1}{N}\sum_i \left(y_i - Q(s_i,a_i|\omega)\right)^2$
      • Update the Actor network with the sampled policy gradient: $\nabla_\theta J \approx \frac{1}{N}\sum_i \nabla_a Q(s,a|\omega)\big|_{s=s_i,\,a=\mu(s_i)}\, \nabla_\theta \mu(s|\theta)\big|_{s_i}$
      • Update the target networks: $\omega^- \leftarrow \tau\omega + (1-\tau)\omega^-$, $\theta^- \leftarrow \tau\theta + (1-\tau)\theta^-$
  3. End of loop

3. Derivation of the Deterministic Policy Gradient

The derivation of the deterministic policy gradient theorem is based on the chain rule. Our objective is to maximize the expected return:

$$J(\mu_\theta) = \mathbb{E}_{s \sim \rho^\mu}\left[Q^\mu(s,\mu_\theta(s))\right]$$

Taking the gradient:

$$\nabla_\theta J(\mu_\theta) = \mathbb{E}_{s \sim \rho^\mu}\left[\nabla_\theta Q^\mu(s,\mu_\theta(s))\right]$$

Applying the chain rule:

$$\nabla_\theta Q^\mu(s,\mu_\theta(s)) = \nabla_\theta \mu_\theta(s)\, \nabla_a Q^\mu(s,a)\big|_{a=\mu_\theta(s)}$$

Therefore:

$$\nabla_\theta J(\mu_\theta) = \mathbb{E}_{s \sim \rho^\mu}\left[\nabla_\theta \mu_\theta(s)\, \nabla_a Q^\mu(s,a)\big|_{a=\mu_\theta(s)}\right]$$

In the actual algorithm we collect samples with a behavior policy $\beta$, so the final gradient estimate is:

$$\nabla_\theta J(\mu_\theta) \approx \mathbb{E}_{s \sim \rho^\beta}\left[\nabla_\theta \mu_\theta(s)\, \nabla_a Q^\mu(s,a)\big|_{a=\mu_\theta(s)}\right]$$

By combining the deterministic policy gradient with deep neural networks, DDPG successfully extends the idea of DQN to continuous action spaces and has become one of the important algorithms in deep reinforcement learning.
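
To make the chain-rule step concrete, the toy check below (a hypothetical one-dimensional example, not the networks from Section 4) verifies with autograd that $\nabla_\theta Q(s,\mu_\theta(s))$ equals $\nabla_\theta \mu_\theta(s)\,\nabla_a Q(s,a)\big|_{a=\mu_\theta(s)}$:

import torch

# Toy check of the deterministic policy gradient chain rule.
# Hypothetical 1-D example: mu_theta(s) = theta * s, Q(s, a) = -(a - s)^2.
theta = torch.tensor(0.7, requires_grad=True)
s = torch.tensor(1.5)

def mu(s, theta):
    return theta * s

def Q(s, a):
    return -(a - s) ** 2

# Left-hand side: autograd through the composition Q(s, mu_theta(s)).
a = mu(s, theta)
lhs = torch.autograd.grad(Q(s, a), theta)[0]

# Right-hand side: d mu / d theta times dQ/da evaluated at a = mu_theta(s).
dmu_dtheta = s  # since mu = theta * s
a_detached = mu(s, theta).detach().requires_grad_(True)
dQ_da = torch.autograd.grad(Q(s, a_detached), a_detached)[0]
rhs = dmu_dtheta * dQ_da

print(lhs.item(), rhs.item())  # both print the same value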

4. Complete DDPG Implementation

import random
import gymnasium as gym
import numpy as np
from tqdm import tqdm
import torch
from torch import nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from collections import deque

# Utility classes and functions
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        transitions = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*transitions)
        return (np.array(states), np.array(actions), np.array(rewards),
                np.array(next_states), np.array(dones))

    def size(self):
        return len(self.buffer)

def moving_average(a, window_size):
    cumulative_sum = np.cumsum(np.insert(a, 0, 0))
    middle = (cumulative_sum[window_size:] - cumulative_sum[:-window_size]) / window_size
    r = np.arange(1, window_size-1, 2)
    begin = np.cumsum(a[:window_size-1])[::2] / r
    end = (np.cumsum(a[:-window_size:-1])[::2] / r)[::-1]
    return np.concatenate((begin, middle, end))

def train_off_policy_agent(env, agent, num_episodes, replay_buffer, minimal_size, batch_size):
    return_list = []
    for i in range(10):
        with tqdm(total=int(num_episodes/10), desc='Iteration %d' % i) as pbar:
            for i_episode in range(int(num_episodes/10)):
                episode_return = 0
                state, _ = env.reset()
                done = False
                while not done:
                    action = agent.take_action(state)
                    # Make sure the action is a scalar value
                    if isinstance(action, np.ndarray):
                        action = action.item() if action.size == 1 else action[0]

                    next_state, reward, terminated, truncated, _ = env.step([action])
                    done = terminated or truncated
                    replay_buffer.add(state, action, float(reward), next_state, done)
                    state = next_state
                    episode_return += reward

                    if replay_buffer.size() > minimal_size:
                        b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(batch_size)
                        transition_dict = {
                            'states': b_s,
                            'actions': b_a,
                            'rewards': b_r,
                            'next_states': b_ns,
                            'dones': b_d
                        }
                        agent.update(transition_dict)
                return_list.append(episode_return)
                if (i_episode+1) % 10 == 0:
                    pbar.set_postfix({
                        'episode': '%d' % (num_episodes/10 * i + i_episode+1),
                        'return': '%.3f' % np.mean(return_list[-10:])
                    })
                pbar.update(1)
    return return_list

class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim, action_bound):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = torch.nn.Linear(hidden_dim, action_dim)
        self.action_bound = action_bound

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return torch.tanh(self.fc3(x)) * self.action_bound

class QValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(QValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
        self.fc_out = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x, a):
        # Make sure the action tensor has the right shape
        if a.dim() == 1:
            a = a.unsqueeze(1)
        cat = torch.cat([x, a], dim=1)
        x = F.relu(self.fc1(cat))
        x = F.relu(self.fc2(x))
        return self.fc_out(x)

class DDPG:
    def __init__(self, state_dim, hidden_dim, action_dim, action_bound, sigma, actor_lr, critic_lr, tau, gamma, device):
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim, action_bound).to(device)
        self.critic = QValueNet(state_dim, hidden_dim, action_dim).to(device)
        self.target_actor = PolicyNet(state_dim, hidden_dim, action_dim, action_bound).to(device)
        self.target_critic = QValueNet(state_dim, hidden_dim, action_dim).to(device)

        # Initialize the target networks with the same parameters
        self.target_critic.load_state_dict(self.critic.state_dict())
        self.target_actor.load_state_dict(self.actor.state_dict())

        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma
        self.sigma = sigma
        self.tau = tau
        self.action_dim = action_dim
        self.device = device

    def take_action(self, state):
        # Convert the state to a tensor and add a batch dimension
        state_tensor = torch.as_tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0)
        with torch.no_grad():
            action = self.actor(state_tensor).cpu().numpy()[0]
        # Add Gaussian exploration noise
        action = action + self.sigma * np.random.randn(self.action_dim)
        # Clip the action into the valid range
        action = np.clip(action, -self.actor.action_bound, self.actor.action_bound)
        return action

    def soft_update(self, net, target_net):
        for param_target, param in zip(target_net.parameters(), net.parameters()):
            param_target.data.copy_(param_target.data * (1.0 - self.tau) + param.data * self.tau)

    def update(self, transition_dict):
        # Convert the sampled batch to tensors with the right shapes
        states = torch.as_tensor(transition_dict['states'], dtype=torch.float32, device=self.device)
        actions = torch.as_tensor(transition_dict['actions'], dtype=torch.float32, device=self.device)
        rewards = torch.as_tensor(transition_dict['rewards'], dtype=torch.float32, device=self.device).view(-1, 1)
        next_states = torch.as_tensor(transition_dict['next_states'], dtype=torch.float32, device=self.device)
        dones = torch.as_tensor(transition_dict['dones'], dtype=torch.float32, device=self.device).view(-1, 1)

        # Make sure actions have a (batch, action_dim) shape
        if actions.dim() == 1:
            actions = actions.unsqueeze(1)

        # Update the Critic
        with torch.no_grad():
            next_actions = self.target_actor(next_states)
            next_q_values = self.target_critic(next_states, next_actions)
            q_targets = rewards + self.gamma * next_q_values * (1 - dones)

        current_q_values = self.critic(states, actions)
        critic_loss = F.mse_loss(current_q_values, q_targets)

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Update the Actor
        actor_actions = self.actor(states)
        actor_loss = -self.critic(states, actor_actions).mean()

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Soft-update the target networks
        self.soft_update(self.actor, self.target_actor)
        self.soft_update(self.critic, self.target_critic)

# Main program
if __name__ == "__main__":
    # Hyperparameters
    actor_lr = 3e-4
    critic_lr = 3e-3
    num_episodes = 200
    hidden_dim = 64
    gamma = 0.98
    tau = 0.005
    buffer_size = 10000
    minimal_size = 1000
    batch_size = 64
    sigma = 0.1  # Gaussian exploration noise scale
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    # Create the environment
    env_name = 'Pendulum-v1'
    env = gym.make(env_name)

    # Set random seeds
    random.seed(0)
    np.random.seed(0)
    torch.manual_seed(0)

    # Read environment parameters
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]
    action_bound = env.action_space.high[0]

    print(f"Environment: {env_name}")
    print(f"State dim: {state_dim}, action dim: {action_dim}, action bound: {action_bound}")

    # Create the replay buffer and the agent
    replay_buffer = ReplayBuffer(buffer_size)
    agent = DDPG(state_dim, hidden_dim, action_dim, action_bound, sigma,
                 actor_lr, critic_lr, tau, gamma, device)

    # Train the agent
    print("Training the DDPG agent...")
    return_list = train_off_policy_agent(env, agent, num_episodes, replay_buffer, minimal_size, batch_size)

    # Plot the results
    episodes_list = list(range(len(return_list)))

    plt.figure(figsize=(12, 5))

    # Raw return curve
    plt.subplot(1, 2, 1)
    plt.plot(episodes_list, return_list)
    plt.xlabel('Episodes')
    plt.ylabel('Returns')
    plt.title('DDPG on {}'.format(env_name))
    plt.grid(True, alpha=0.3)

    # Moving-average return curve
    plt.subplot(1, 2, 2)
    mv_return = moving_average(return_list, 9)
    plt.plot(episodes_list[:len(mv_return)], mv_return)
    plt.xlabel('Episodes')
    plt.ylabel('Returns')
    plt.title('DDPG on {} (Moving Average)'.format(env_name))
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Print training statistics
    print(f"\nTraining finished!")
    print(f"Mean return over the last 10 episodes: {np.mean(return_list[-10:]):.2f}")
    print(f"Max return: {np.max(return_list):.2f}")
    print(f"Mean return: {np.mean(return_list):.2f} ± {np.std(return_list):.2f}")

    env.close()
Run results
(Figure: episode return curve and moving-average return curve for DDPG on Pendulum-v1.)
(.venv) PS F:\BLOG\ROT-Blog\docs\Control\强化学习> python .\1.py
Environment: Pendulum-v1
State dim: 3, action dim: 1, action bound: 2.0
Training the DDPG agent...
Iteration 0: 100%|██████████████████████████████████████████████████████████████████████████████| 20/20 [00:12<00:00, 1.62it/s, episode=20, return=-1250.105]
Iteration 1: 100%|███████████████████████████████████████████████████████████████████████████████| 20/20 [00:16<00:00, 1.21it/s, episode=40, return=-831.800]
Iteration 2: 100%|███████████████████████████████████████████████████████████████████████████████| 20/20 [00:15<00:00, 1.26it/s, episode=60, return=-201.454]
Iteration 3: 100%|███████████████████████████████████████████████████████████████████████████████| 20/20 [00:15<00:00, 1.33it/s, episode=80, return=-184.158]
Iteration 4: 100%|██████████████████████████████████████████████████████████████████████████████| 20/20 [00:15<00:00, 1.31it/s, episode=100, return=-156.415]
Iteration 5: 100%|██████████████████████████████████████████████████████████████████████████████| 20/20 [00:15<00:00, 1.33it/s, episode=120, return=-153.596]
Iteration 6: 100%|██████████████████████████████████████████████████████████████████████████████| 20/20 [00:15<00:00, 1.26it/s, episode=140, return=-177.353]
Iteration 7: 100%|██████████████████████████████████████████████████████████████████████████████| 20/20 [00:15<00:00, 1.30it/s, episode=160, return=-211.238]
Iteration 8: 100%|██████████████████████████████████████████████████████████████████████████████| 20/20 [00:15<00:00, 1.33it/s, episode=180, return=-191.531]
Iteration 9: 100%|██████████████████████████████████████████████████████████████████████████████| 20/20 [00:15<00:00, 1.30it/s, episode=200, return=-141.028]

Training finished!
Mean return over the last 10 episodes: -141.03
Max return: -0.17
Mean return: -388.66 ± 437.20