Skip to main content

PPO算法

想象一下,你在学习骑自行车。

TRPO就像是一个过于谨慎的教练,每走一步都要用复杂的数学计算来确保你不会摔倒。虽然安全,但学习进度缓慢。PPO则像是一个经验丰富的教练,用更简单有效的方法帮你保持平衡,让你快速进步。

TRPO算法虽然效果出色,但其复杂的计算过程让人头疼:首先,你需要计算二阶导数(黑塞矩阵);然后使用复杂的共轭梯度法求解约束优化问题。每一步更新都要进行线性搜索。

而PPO算法的核心洞察很精妙:我们不一定需要精确求解约束优化问题,只要保证策略更新不要太大就行。基于这个想法,PPO提出了两种简单实用的方法。

1. PPO-惩罚

PPO-惩罚的思路很直接:与其费劲地求解带约束的问题,不如把约束条件变成目标函数的一部分。就像在骑自行车时,与其严格限制车把的转动角度,不如在偏离方向时轻轻纠正。

PPO-惩罚的优化目标:

maxθE[πθ(as)πθold(as)A(s,a)]βE[DKL(πθoldπθ)]\max_{\theta} \mathbb{E} \left[ \frac{\pi_{\theta}(a|s)}{\pi_{\theta_{old}}(a|s)} A(s,a) \right] - \beta \cdot \mathbb{E}[D_{KL}(\pi_{\theta_{old}} \parallel \pi_{\theta})]

这个公式包含两个部分:第一部分 πθ(as)πθold(as)A(s,a)\frac{\pi_{\theta}(a|s)}{\pi_{\theta_{old}}(a|s)} A(s,a) 衡量策略改进的效果,第二部分 βDKL(πθoldπθ)\beta \cdot D_{KL}(\pi_{\theta_{old}} \parallel \pi_{\theta}) 惩罚策略变化太大。

β\beta 不是固定值,而是根据策略变化程度动态调整:当KL散度太小(策略更新太保守)时减小惩罚,允许更大更新;当KL散度太大(策略更新太激进)时增大惩罚,限制更新幅度;否则保持当前惩罚系数。这种自适应机制确保了策略更新既不会太小(学习太慢),也不会太大(训练不稳定)。

2. PPO-截断

Image

PPO-截断采用了更巧妙的思路:直接限制新旧策略的差异,而不是通过复杂的惩罚项。就像给策略更新装上了"保险杠",确保它不会偏离太远。

PPO-截断的目标函数:

LCLIP(θ)=E[min(r(θ)A,clip(r(θ),1ϵ,1+ϵ)A)]L^{CLIP}(\theta) = \mathbb{E} \left[ \min\left( r(\theta)A, \text{clip}\left( r(\theta), 1-\epsilon, 1+\epsilon \right)A \right) \right]

其中 r(θ)=πθ(as)πθold(as)r(\theta) = \frac{\pi_{\theta}(a|s)}{\pi_{\theta_{old}}(a|s)} 是重要性权重。

当优势函数 A>0A > 0(好动作)时,我们希望增加这个动作的概率,但不能无限制地增加:如果 r(θ)1+ϵr(\theta) \leq 1+\epsilon,使用 r(θ)Ar(\theta)A;如果 r(θ)>1+ϵr(\theta) > 1+\epsilon,使用 (1+ϵ)A(1+\epsilon)A。这就像说:"这个动作很好,可以增加它的概率,但最多只能增加到原来的 (1+ϵ)(1+\epsilon) 倍"。

当优势函数 A<0A < 0(坏动作)时,我们希望减少这个动作的概率,但不能无限制地减少:如果 r(θ)1ϵr(\theta) \geq 1-\epsilon,使用 r(θ)Ar(\theta)A;如果 r(θ)<1ϵr(\theta) < 1-\epsilon,使用 (1ϵ)A(1-\epsilon)A。这就像说:"这个动作不好,可以减少它的概率,但最多只能减少到原来的 (1ϵ)(1-\epsilon) 倍"。

与TRPO相比,PPO虽然理论保证较弱,但实现更简单、计算效率更高、超参数更鲁棒,实际效果同等优秀。在实际应用中,PPO-截断通常更受欢迎,因为它只需要调节一个超参数 ϵ\epsilon(表示进行截断的范围),实现更加简单,在大多数任务上表现优异,训练过程更加稳定。通常 ϵ=0.2\epsilon = 0.2 是一个很好的默认值,在大多数任务上都能取得不错的效果。

3. PPO算法完整实现

import gymnasium as gym  # 改为 gymnasium
import torch
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import random
from tqdm import tqdm

# 自定义工具函数,替代 rl_utils
def compute_advantage(gamma, lmbda, td_delta):
td_delta = td_delta.detach().numpy()
advantage_list = []
advantage = 0.0
for delta in td_delta[::-1]:
advantage = gamma * lmbda * advantage + delta
advantage_list.append(advantage)
advantage_list.reverse()
return torch.tensor(advantage_list, dtype=torch.float)

def moving_average(a, window_size):
cumulative_sum = np.cumsum(np.insert(a, 0, 0))
middle = (cumulative_sum[window_size:] - cumulative_sum[:-window_size]) / window_size
r = np.arange(1, window_size-1, 2)
begin = np.cumsum(a[:window_size-1])[::2] / r
end = (np.cumsum(a[:-window_size:-1])[::2] / r)[::-1]
return np.concatenate((begin, middle, end))

def train_on_policy_agent(env, agent, num_episodes):
return_list = []
for i in range(10):
with tqdm(total=int(num_episodes/10), desc='Iteration %d' % i) as pbar:
for i_episode in range(int(num_episodes/10)):
episode_return = 0
transition_dict = {
'states': [],
'actions': [],
'next_states': [],
'rewards': [],
'dones': []
}
state, _ = env.reset()
done = False
while not done:
action = agent.take_action(state)
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
transition_dict['states'].append(state)
transition_dict['actions'].append(action)
transition_dict['next_states'].append(next_state)
transition_dict['rewards'].append(reward)
transition_dict['dones'].append(done)
state = next_state
episode_return += reward
return_list.append(episode_return)
agent.update(transition_dict)
if (i_episode+1) % 10 == 0:
pbar.set_postfix({'episode': '%d' % (num_episodes/10 * i + i_episode+1),
'return': '%.3f' % np.mean(return_list[-10:])})
pbar.update(1)
return return_list

class PolicyNet(torch.nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim):
super(PolicyNet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

def forward(self, x):
x = F.relu(self.fc1(x))
return F.softmax(self.fc2(x), dim=1)

class ValueNet(torch.nn.Module):
def __init__(self, state_dim, hidden_dim):
super(ValueNet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, 1)

def forward(self, x):
x = F.relu(self.fc1(x))
return self.fc2(x)

class PPO:
''' PPO算法,采用截断方式 '''
def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
lmbda, epochs, eps, gamma, device):
self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
self.critic = ValueNet(state_dim, hidden_dim).to(device)
self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),
lr=actor_lr)
self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
lr=critic_lr)
self.gamma = gamma
self.lmbda = lmbda
self.epochs = epochs # 一条序列的数据用来训练轮数
self.eps = eps # PPO中截断范围的参数
self.device = device

def take_action(self, state):
state = torch.tensor([state], dtype=torch.float).to(self.device)
probs = self.actor(state)
action_dist = torch.distributions.Categorical(probs)
action = action_dist.sample()
return action.item()

def update(self, transition_dict):
states = torch.tensor(np.array(transition_dict['states']),
dtype=torch.float).to(self.device)
actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(
self.device)
rewards = torch.tensor(transition_dict['rewards'],
dtype=torch.float).view(-1, 1).to(self.device)
next_states = torch.tensor(np.array(transition_dict['next_states']),
dtype=torch.float).to(self.device)
dones = torch.tensor(transition_dict['dones'],
dtype=torch.float).view(-1, 1).to(self.device)
td_target = rewards + self.gamma * self.critic(next_states) * (1 -
dones)
td_delta = td_target - self.critic(states)
advantage = compute_advantage(self.gamma, self.lmbda,
td_delta.cpu()).to(self.device)
old_log_probs = torch.log(self.actor(states).gather(1,
actions)).detach()

for _ in range(self.epochs):
log_probs = torch.log(self.actor(states).gather(1, actions))
ratio = torch.exp(log_probs - old_log_probs)
surr1 = ratio * advantage
surr2 = torch.clamp(ratio, 1 - self.eps,
1 + self.eps) * advantage # 截断
actor_loss = torch.mean(-torch.min(surr1, surr2)) # PPO损失函数
critic_loss = torch.mean(
F.mse_loss(self.critic(states), td_target.detach()))
self.actor_optimizer.zero_grad()
self.critic_optimizer.zero_grad()
actor_loss.backward()
critic_loss.backward()
self.actor_optimizer.step()
self.critic_optimizer.step()

# 主程序
if __name__ == "__main__":
# 设置参数
actor_lr = 1e-3
critic_lr = 1e-2
num_episodes = 500
hidden_dim = 128
gamma = 0.98
lmbda = 0.95
epochs = 10
eps = 0.2
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# 创建环境
env_name = 'CartPole-v1' # 使用 v1 版本
env = gym.make(env_name)

# 设置随机种子
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)

# 获取状态和动作维度
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

# 创建PPO智能体
agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda,
epochs, eps, gamma, device)

# 训练智能体
return_list = train_on_policy_agent(env, agent, num_episodes)

# 绘制结果
episodes_list = list(range(len(return_list)))

plt.figure(figsize=(12, 5))

# 原始回报曲线
plt.subplot(1, 2, 1)
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.grid(True, alpha=0.3)

# 移动平均回报曲线
plt.subplot(1, 2, 2)
mv_return = moving_average(return_list, 9)
plt.plot(episodes_list[:len(mv_return)], mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {} (Moving Average)'.format(env_name))
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# 打印训练统计信息
print(f"训练完成,最终10个回合平均回报: {np.mean(return_list[-10:]):.2f}")
print(f"最大回报: {np.max(return_list)}")
print(f"平均回报: {np.mean(return_list):.2f} ± {np.std(return_list):.2f}")
运行结果

Image

(.venv) PS F:\BLOG\ROT-Blog\docs\Control\强化学习> python .\1.py
Iteration 0: 0%| | 0/50 [00:00<?, ?it/s]F:\BLOG\ROT-Blog\docs\Control\强化学习\1.py:100: UserWarning: Creating a tensor from a list of numpy.ndarrays is extremely slow. Please consider converting the
list to a single numpy.ndarray with numpy.array() before converting to a tensor. (Triggered internally at C:\actions-runner\_work\pytorch\pytorch\pytorch\torch\csrc\utils\tensor_new.cpp:256.)
state = torch.tensor([state], dtype=torch.float).to(self.device)
Iteration 0: 100%|████████████████████████████████████████████████████████████████████████████████| 50/50 [00:03<00:00, 12.91it/s, episode=50, return=229.600]
Iteration 1: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:04<00:00, 11.83it/s, episode=100, return=198.000]
Iteration 2: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:07<00:00, 6.44it/s, episode=150, return=399.000]
Iteration 3: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:08<00:00, 5.94it/s, episode=200, return=453.200]
Iteration 4: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:09<00:00, 5.38it/s, episode=250, return=479.400]
Iteration 5: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:09<00:00, 5.15it/s, episode=300, return=500.000]
Iteration 6: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:09<00:00, 5.32it/s, episode=350, return=455.500]
Iteration 7: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:09<00:00, 5.19it/s, episode=400, return=480.600]
Iteration 8: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:09<00:00, 5.15it/s, episode=450, return=500.000]
Iteration 9: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:09<00:00, 5.06it/s, episode=500, return=500.000]
训练完成,最终10个回合平均回报: 500.00
最大回报: 500.0
平均回报: 401.23 ± 147.74

本文字数:0

预计阅读时间:0 分钟


统计信息加载中...

有问题?请向我提出issue