
The TRPO Algorithm

Imagine learning to walk a tightrope: that is what a traditional policy gradient algorithm faces. You place each step carefully, yet a step that is too large can suddenly throw you off balance and send you falling. The same "one wrong step loses the whole game" failure mode exists in deep reinforcement learning: when the policy is a deep network, updating the parameters along the gradient direction with too large a step size can cause the policy's performance to collapse abruptly.

TRPO (Trust Region Policy Optimization) is the safety rope for this tightrope walker. Its key idea is elegant: at each parameter update, delimit a "trust region" and improve the policy only within this safe region, so that every update reliably improves performance.

1. The TRPO Algorithm

Let us look at TRPO's idea from a mathematical perspective. Suppose the current policy is $\pi_\theta$ with parameters $\theta$. Our goal is to find new parameters $\theta'$ such that the new policy $\pi_{\theta'}$ performs at least as well as the old one, i.e. $J(\theta') \geq J(\theta)$. Here the objective $J(\theta)$ is defined as the expected value of the initial state:

$$J(\theta) = \mathbb{E}_{s_0} \left[ V^{\pi_\theta}(s_0) \right]$$

To compare the performance of the new and old policies, we use the performance difference lemma, which states:

$$J(\theta') - J(\theta) = \mathbb{E}_{\tau \sim \pi_{\theta'}} \left[ \sum_{t=0}^{\infty} \gamma^t A^{\pi_\theta}(s_t, a_t) \right]$$

where $A^{\pi_\theta}(s_t, a_t) = Q^{\pi_\theta}(s_t, a_t) - V^{\pi_\theta}(s_t)$ is the advantage function, measuring how much better action $a_t$ is in state $s_t$ than the policy's average behavior.

This formula tells us that if the expected advantage of the new policy is nonnegative in every state, the policy's performance is guaranteed to improve. The catch is that the new policy $\pi_{\theta'}$ has not yet interacted with the environment, so we cannot evaluate this expectation directly.

Starting from the definition of the state value function, for any state $s$ we have:

$$V^{\pi_{\theta'}}(s) - V^{\pi_\theta}(s) = \mathbb{E}_{\pi_{\theta'}} \left[ \sum_{t=0}^{\infty} \gamma^t A^{\pi_\theta}(s_t, a_t) \,\middle|\, s_0 = s \right]$$

This is because:

$$\begin{aligned}
&\mathbb{E}_{\pi_{\theta'}} \left[ \sum_{t=0}^{\infty} \gamma^t A^{\pi_\theta}(s_t, a_t) \,\middle|\, s_0 = s \right] \\
&= \mathbb{E}_{\pi_{\theta'}} \left[ \sum_{t=0}^{\infty} \gamma^t \left( r(s_t, a_t) + \gamma V^{\pi_\theta}(s_{t+1}) - V^{\pi_\theta}(s_t) \right) \,\middle|\, s_0 = s \right] \\
&= \mathbb{E}_{\pi_{\theta'}} \left[ \sum_{t=0}^{\infty} \gamma^t r(s_t, a_t) + \sum_{t=0}^{\infty} \gamma^{t+1} V^{\pi_\theta}(s_{t+1}) - \sum_{t=0}^{\infty} \gamma^t V^{\pi_\theta}(s_t) \,\middle|\, s_0 = s \right] \\
&= \mathbb{E}_{\pi_{\theta'}} \left[ \sum_{t=0}^{\infty} \gamma^t r(s_t, a_t) + \sum_{t=1}^{\infty} \gamma^t V^{\pi_\theta}(s_t) - \sum_{t=0}^{\infty} \gamma^t V^{\pi_\theta}(s_t) \,\middle|\, s_0 = s \right] \\
&= \mathbb{E}_{\pi_{\theta'}} \left[ \sum_{t=0}^{\infty} \gamma^t r(s_t, a_t) - V^{\pi_\theta}(s) \,\middle|\, s_0 = s \right] \\
&= V^{\pi_{\theta'}}(s) - V^{\pi_\theta}(s)
\end{aligned}$$

Taking the expectation over the initial state $s_0$ yields:

$$J(\theta') - J(\theta) = \mathbb{E}_{s_0} \left[ V^{\pi_{\theta'}}(s_0) - V^{\pi_\theta}(s_0) \right] = \mathbb{E}_{\tau \sim \pi_{\theta'}} \left[ \sum_{t=0}^{\infty} \gamma^t A^{\pi_\theta}(s_t, a_t) \right]$$
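Since this lemma is the backbone of everything that follows, a small numerical check can make it concrete. The sketch below builds a hypothetical two-state, two-action MDP (all transition probabilities, rewards, and policies are made up for illustration), evaluates both sides of the identity exactly, and confirms they match.

import numpy as np

gamma = 0.9
rho = np.array([0.5, 0.5])                      # initial state distribution
P = np.array([[[0.8, 0.2], [0.3, 0.7]],         # P[s, a, s']
              [[0.5, 0.5], [0.1, 0.9]]])
r = np.array([[1.0, 0.0], [0.5, 2.0]])          # r[s, a]
pi_old = np.array([[0.9, 0.1], [0.2, 0.8]])     # pi(a|s), old policy
pi_new = np.array([[0.6, 0.4], [0.5, 0.5]])     # a perturbed new policy

def value(pi):
    """Exact V^pi by solving (I - gamma * P_pi) V = r_pi."""
    P_pi = np.einsum('sa,sat->st', pi, P)
    r_pi = np.einsum('sa,sa->s', pi, r)
    return np.linalg.solve(np.eye(2) - gamma * P_pi, r_pi), P_pi

V_old, _ = value(pi_old)
V_new, P_new = value(pi_new)

# Advantage of the OLD policy: A(s,a) = r(s,a) + gamma * E[V_old(s')] - V_old(s)
Q_old = r + gamma * np.einsum('sat,t->sa', P, V_old)
A_old = Q_old - V_old[:, None]

# Discounted state visitation of the NEW policy:
# nu^T = (1 - gamma) * rho^T * (I - gamma * P_new)^{-1}
nu = (1 - gamma) * np.linalg.solve((np.eye(2) - gamma * P_new).T, rho)

lhs = rho @ (V_new - V_old)                              # J(theta') - J(theta)
rhs = np.einsum('s,sa,sa->', nu, pi_new, A_old) / (1 - gamma)
print(lhs, rhs)   # the two numbers agree up to floating-point error

The right-hand side already uses the state visitation form derived next, which is exactly why the identity is useful: it expresses the performance gap through the old policy's advantage function alone.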

Next, we rewrite the trajectory expectation in terms of the state visitation distribution. Define the (normalized) discounted state visitation distribution $\nu^{\pi_{\theta'}}(s)$ as:

$$\nu^{\pi_{\theta'}}(s) = (1 - \gamma) \sum_{t=0}^{\infty} \gamma^t P_t^{\pi_{\theta'}}(s)$$

where $P_t^{\pi_{\theta'}}(s)$ is the probability of being in state $s$ at time step $t$ under policy $\pi_{\theta'}$. Then:

$$\begin{aligned}
J(\theta') - J(\theta) &= \sum_{t=0}^{\infty} \gamma^t \mathbb{E}_{s_t \sim P_t^{\pi_{\theta'}}} \mathbb{E}_{a_t \sim \pi_{\theta'}(\cdot|s_t)} \left[ A^{\pi_\theta}(s_t, a_t) \right] \\
&= \frac{1}{1 - \gamma} \mathbb{E}_{s \sim \nu^{\pi_{\theta'}}} \mathbb{E}_{a \sim \pi_{\theta'}(\cdot|s)} \left[ A^{\pi_\theta}(s, a) \right]
\end{aligned}$$

Therefore, to guarantee $J(\theta') \geq J(\theta)$, it suffices that:

$$\mathbb{E}_{s \sim \nu^{\pi_{\theta'}}} \mathbb{E}_{a \sim \pi_{\theta'}(\cdot|s)} \left[ A^{\pi_\theta}(s, a) \right] \geq 0$$

Ideally, if $\mathbb{E}_{a \sim \pi_{\theta'}(\cdot|s)} \left[ A^{\pi_\theta}(s, a) \right] \geq 0$ holds for every state $s$, performance improves monotonically.

A Clever Approximation

Optimizing the condition above directly is impractical, because the state visitation distribution $\nu^{\pi_{\theta'}}$ of the new policy is unknown. TRPO therefore approximates it with the old policy's state distribution $\nu^{\pi_\theta}$ and defines the surrogate objective:

$$L_\theta(\theta') = \mathbb{E}_{s \sim \nu^{\pi_\theta}} \mathbb{E}_{a \sim \pi_{\theta'}(\cdot|s)} \left[ A^{\pi_\theta}(s, a) \right]$$

To handle the action distribution, we apply importance sampling:

$$L_\theta(\theta') = \mathbb{E}_{s \sim \nu^{\pi_\theta}} \mathbb{E}_{a \sim \pi_\theta(\cdot|s)} \left[ \frac{\pi_{\theta'}(a|s)}{\pi_\theta(a|s)} A^{\pi_\theta}(s, a) \right]$$

To keep this approximation accurate, TRPO adds a KL divergence constraint, yielding the optimization problem:

$$\begin{aligned}
\max_{\theta'} \quad & L_\theta(\theta') \\
\text{s.t.} \quad & \mathbb{E}_{s \sim \nu^{\pi_\theta}} \left[ D_{\mathrm{KL}}\big(\pi_\theta(\cdot|s) \,\|\, \pi_{\theta'}(\cdot|s)\big) \right] \leq \delta
\end{aligned}$$

where $\delta$ is the radius of the trust region. This constraint keeps the new policy close to the old one, so that the state visitation distribution barely changes and the surrogate objective $L_\theta(\theta')$ remains a reasonable approximation of $J(\theta') - J(\theta)$.
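For concreteness, here is a minimal sketch of how both quantities in the constrained problem are estimated from samples. It assumes a categorical policy network, a batch of states, column-vector action indices, and an advantage tensor; the names are illustrative, chosen to mirror the full implementation in Section 7.

import torch

def surrogate_and_kl(actor_new, actor_old, states, actions, advantage):
    probs_old = actor_old(states).detach()
    probs_new = actor_new(states)
    # Importance-sampling surrogate: E[ pi_new(a|s) / pi_old(a|s) * A(s,a) ]
    ratio = probs_new.gather(1, actions) / probs_old.gather(1, actions)
    surrogate = torch.mean(ratio * advantage)
    # Average KL divergence between the old and new action distributions
    dist_old = torch.distributions.Categorical(probs_old)
    dist_new = torch.distributions.Categorical(probs_new)
    kl = torch.mean(torch.distributions.kl.kl_divergence(dist_old, dist_new))
    return surrogate, kl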


2. The Constrained Optimization Problem

Solving the constrained problem directly is complicated, so in practice TRPO approximates it with Taylor expansions. For notational convenience, we write $\theta_k$ instead of the previous $\theta$, denoting the policy parameters after the $k$-th iteration.

2.1 Taylor Expansion Approximation

Expand the objective and the constraint around $\theta_k$:

  • First-order approximation of the objective

    $$L_{\theta_k}(\theta) \approx L_{\theta_k}(\theta_k) + g^T(\theta - \theta_k)$$

    where $g = \nabla_\theta L_{\theta_k}(\theta)\big|_{\theta=\theta_k}$ is the gradient of the objective.

  • Second-order approximation of the constraint

    $$\bar{D}_{\mathrm{KL}}(\theta_k \,\|\, \theta) \approx \frac{1}{2}(\theta - \theta_k)^T H (\theta - \theta_k)$$

    where $H$ is the Hessian matrix of the average KL divergence between the two policies. The expansion starts at second order because at $\theta = \theta_k$ the KL divergence is zero and its gradient vanishes; a numerical check follows this list.
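The toy check below (a sketch on a hypothetical single-state softmax policy, unrelated to any real network) confirms this numerically: along an arbitrary direction $d$, the ratio $\mathrm{KL}/\varepsilon^2$ approaches a constant as $\varepsilon \to 0$, which is exactly $\frac{1}{2} d^T H d$.

import torch

theta_k = torch.tensor([0.2, -0.5, 1.0])   # old logits (made-up numbers)
d = torch.tensor([0.3, -0.1, 0.2])         # an arbitrary direction

def kl(theta):
    # KL( softmax(theta_k) || softmax(theta) )
    p = torch.softmax(theta_k, dim=0)
    q = torch.softmax(theta, dim=0)
    return torch.sum(p * (torch.log(p) - torch.log(q)))

for eps in [1e-1, 1e-2, 1e-3]:
    val = kl(theta_k + eps * d).item()
    print(eps, val, val / eps**2)   # val/eps^2 converges to (1/2) d^T H d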

2.2 The Approximate Optimization Problem

After these approximations, the optimization problem becomes:

$$\begin{aligned}
\max_{\theta} \quad & g^T(\theta - \theta_k) \\
\text{s.t.} \quad & \frac{1}{2}(\theta - \theta_k)^T H (\theta - \theta_k) \leq \delta
\end{aligned}$$

2.3 Solving with the KKT Conditions

The KKT conditions give the solution of this problem directly:

$$\theta - \theta_k = \sqrt{\frac{2\delta}{g^T H^{-1} g}}\, H^{-1} g$$
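As a quick sanity check of this closed-form step, the sketch below (with a made-up positive-definite $H$ and gradient $g$; no real policy is involved) verifies that the constraint is active at the optimum, i.e. the quadratic form equals exactly $\delta$.

import numpy as np

delta = 0.01
g = np.array([1.0, -2.0, 0.5])
A = np.random.default_rng(0).normal(size=(3, 3))
H = A @ A.T + np.eye(3)            # any symmetric positive-definite matrix

# step = sqrt(2*delta / (g^T H^{-1} g)) * H^{-1} g
Hinv_g = np.linalg.solve(H, g)
step = np.sqrt(2 * delta / (g @ Hinv_g)) * Hinv_g
print(0.5 * step @ H @ step)       # ~0.01, the constraint holds with equality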

3. The Conjugate Gradient Method

A policy network typically has many thousands of parameters, so explicitly computing and storing the inverse $H^{-1}$ of the Hessian matrix is computationally infeasible.

TRPO instead uses the conjugate gradient method to compute the update direction $x = H^{-1} g$ directly. Let $\beta$ be the largest step size that still satisfies the KL constraint; the constraint then gives:

$$\frac{1}{2}(\beta x)^T H (\beta x) = \delta$$

Solving for $\beta$:

$$\beta = \sqrt{\frac{2\delta}{x^T H x}}$$

and the parameter update becomes:

$$\theta_{k+1} = \theta_k + \beta x$$

Within the conjugate gradient iterations we never form or store the Hessian matrix $H$ itself; we only need Hessian-vector products. For any vector $v$, the product $Hv$ can be computed as:

$$Hv = \nabla_\theta \left[ \big( \nabla_\theta \bar{D}_{\mathrm{KL}}(\theta_k \,\|\, \theta) \big)^T v \right] \Big|_{\theta=\theta_k}$$

that is, first take the dot product of the KL gradient with $v$, then differentiate the resulting scalar once more.
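The following toy demonstration shows the trick with torch.autograd: differentiate once with create_graph=True, dot with $v$, then differentiate again. The scalar function f here is an arbitrary stand-in for the average KL divergence, chosen so the exact Hessian is easy to verify by hand.

import torch

theta = torch.tensor([1.0, 2.0], requires_grad=True)
v = torch.tensor([0.5, -1.0])

f = theta[0]**2 * theta[1] + theta[1]**3          # stand-in scalar function
grad = torch.autograd.grad(f, theta, create_graph=True)[0]
hv = torch.autograd.grad(torch.dot(grad, v), theta)[0]

# Explicit Hessian at theta = (1, 2): [[2y, 2x], [2x, 6y]] = [[4, 2], [2, 12]]
print(hv)   # tensor([ 0., -11.]), which equals H @ v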

The conjugate gradient procedure is as follows (a NumPy transcription follows the list):

  1. Initialize $x_0 = 0$, $r_0 = g$, $p_0 = r_0$

  2. Iterate for $i = 0, 1, \dots$:

    • Compute $\alpha_i = \frac{r_i^T r_i}{p_i^T H p_i}$
    • Update $x_{i+1} = x_i + \alpha_i p_i$
    • Update $r_{i+1} = r_i - \alpha_i H p_i$
    • If $\|r_{i+1}\|$ is sufficiently small, exit the loop
    • Compute $\beta_i = \frac{r_{i+1}^T r_{i+1}}{r_i^T r_i}$
    • Update $p_{i+1} = r_{i+1} + \beta_i p_i$

  3. Output $x = x_{i+1}$
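Below is a direct NumPy transcription of this loop, checked against a dense solve. It is a sketch: in TRPO the matrix-vector product H @ p would be supplied by the Hessian-vector product above, so $H$ itself is never materialized.

import numpy as np

def conjugate_gradient(H_matvec, g, max_iter=10, tol=1e-10):
    x = np.zeros_like(g)
    r = g.copy()
    p = r.copy()
    rdotr = r @ r
    for _ in range(max_iter):
        Hp = H_matvec(p)                   # only H @ p is ever needed
        alpha = rdotr / (p @ Hp)
        x += alpha * p
        r -= alpha * Hp
        new_rdotr = r @ r
        if new_rdotr < tol:                # residual small enough: done
            break
        p = r + (new_rdotr / rdotr) * p
        rdotr = new_rdotr
    return x

# Verify against a direct solve on a random positive-definite system
A = np.random.default_rng(1).normal(size=(5, 5))
H = A @ A.T + 5 * np.eye(5)
g = np.arange(5, dtype=float)
x = conjugate_gradient(lambda p: H @ p, g, max_iter=50)
print(np.allclose(x, np.linalg.solve(H, g)))   # True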

4. Line Search

Because TRPO relies on first- and second-order Taylor approximations, which are not exact, the candidate $\theta_{k+1}$ may fail to improve on $\theta_k$, or may violate the KL divergence constraint.

TRPO therefore ends each iteration with a line search to make sure the parameter update satisfies both conditions. Concretely, it finds the smallest nonnegative integer $i$ such that, with:

$$\theta_{k+1} = \theta_k + \alpha^i \beta x$$

the resulting $\theta_{k+1}$ still satisfies the KL constraint and improves the surrogate objective, where $\alpha \in (0,1)$ is a hyperparameter controlling how far the line search backtracks.
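A minimal sketch of this backtracking loop is shown below. Here surrogate_fn and kl_fn are hypothetical stand-ins for the surrogate objective and the average KL divergence, replaced by toy quadratics so that the snippet runs on its own.

import numpy as np

def line_search(theta_k, full_step, surrogate_fn, kl_fn, delta,
                alpha=0.5, max_backtracks=15):
    old_obj = surrogate_fn(theta_k)
    for i in range(max_backtracks):
        theta_new = theta_k + alpha**i * full_step
        # accept the first i that improves the objective and stays in the region
        if surrogate_fn(theta_new) > old_obj and kl_fn(theta_new) < delta:
            return theta_new
    return theta_k                         # fall back to the old parameters

theta_k = np.zeros(2)
full_step = np.array([1.0, 1.0])
surrogate_fn = lambda th: -np.sum((th - 0.1)**2)       # toy objective
kl_fn = lambda th: 0.5 * np.sum((th - theta_k)**2)     # toy "KL divergence"
print(line_search(theta_k, full_step, surrogate_fn, kl_fn, delta=0.01))
# prints [0.0625 0.0625]: the step was shrunk four times before being accepted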

5. Algorithm Workflow

The complete TRPO procedure is as follows:

  1. Initialize the policy network parameters $\theta$ and the value network parameters $\phi$

  2. For each iteration $k = 1, 2, \dots$:

    a. Trajectory sampling: collect trajectories with the current policy $\pi_{\theta_k}$

    b. Advantage estimation: estimate the advantage $A^{\pi_{\theta_k}}(s,a)$ of each state-action pair from the collected data and the value network

    c. Gradient computation: compute the gradient of the surrogate objective, $g = \nabla_\theta L_{\theta_k}(\theta)\big|_{\theta=\theta_k}$

    d. Conjugate gradient: compute $x = H^{-1} g$ with the conjugate gradient method

    e. Line search: find a suitable $i$ and update the policy network parameters:

    $$\theta_{k+1} = \theta_k + \alpha^i \beta x$$

    where $i$ is the smallest nonnegative integer such that the update improves the policy and satisfies the KL constraint

    f. Value network update: update the value network parameters (in the same way as in Actor-Critic)

  3. End loop

6. Generalized Advantage Estimation

TRPO requires an estimate of the advantage function $A^{\pi_\theta}(s,a)$. A common choice is Generalized Advantage Estimation (GAE). First define the temporal-difference (TD) error:

$$\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$$

where $V$ is a learned state value function.

Following the idea of multi-step temporal differences, the $k$-step advantage estimate is:

$$A_t^{(k)} = \sum_{l=0}^{k-1} \gamma^l \delta_{t+l}$$

GAE takes an exponentially weighted average of these estimates across different step counts:

$$A_t^{\mathrm{GAE}(\gamma,\lambda)} = \sum_{l=0}^{\infty} (\gamma\lambda)^l \delta_{t+l}$$

where $\lambda \in [0,1]$ is the hyperparameter introduced by GAE.

  • When $\lambda = 0$, $A_t^{\mathrm{GAE}} = \delta_t$: only the one-step TD advantage is used
  • When $\lambda = 1$, $A_t^{\mathrm{GAE}} = \sum_{l=0}^{\infty} \gamma^l \delta_{t+l}$: the advantages from all step counts are summed in full

By tuning $\lambda$, we can trade off bias against variance and obtain a more stable advantage estimate. The following function computes GAE by iterating over the TD errors backwards, using the recursion $A_t = \delta_t + \gamma\lambda A_{t+1}$:

def compute_advantage(gamma, lmbda, td_delta):
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    for delta in td_delta[::-1]:   # accumulate backwards: A_t = delta_t + gamma*lambda*A_{t+1}
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()
    return torch.tensor(advantage_list, dtype=torch.float)
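A quick usage example, assuming torch is imported and the function above is defined (the TD errors are made-up numbers). With $\gamma = \lambda = 1$ the advantages reduce to plain suffix sums of the deltas:

deltas = torch.tensor([1.0, 1.0, 1.0])
print(compute_advantage(1.0, 1.0, deltas))  # tensor([3., 2., 1.])
print(compute_advantage(0.9, 0.9, deltas))  # tensor([2.4661, 1.8100, 1.0000])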

7. Complete TRPO Implementation

import torch
import numpy as np
import gymnasium as gym  # gymnasium replaces the legacy gym package
import matplotlib.pyplot as plt
import torch.nn.functional as F
import copy
import random
from tqdm import tqdm  # used by train_on_policy_agent below

# Self-contained utility functions, replacing the rl_utils module
def compute_advantage(gamma, lmbda, td_delta):
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    for delta in td_delta[::-1]:
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()
    return torch.tensor(advantage_list, dtype=torch.float)

def moving_average(a, window_size):
    cumulative_sum = np.cumsum(np.insert(a, 0, 0))
    middle = (cumulative_sum[window_size:] - cumulative_sum[:-window_size]) / window_size
    r = np.arange(1, window_size - 1, 2)
    begin = np.cumsum(a[:window_size - 1])[::2] / r
    end = (np.cumsum(a[:-window_size:-1])[::2] / r)[::-1]
    return np.concatenate((begin, middle, end))

def train_on_policy_agent(env, agent, num_episodes):
    return_list = []
    for i in range(10):
        with tqdm(total=int(num_episodes/10), desc='Iteration %d' % i) as pbar:
            for i_episode in range(int(num_episodes/10)):
                episode_return = 0
                transition_dict = {
                    'states': [],
                    'actions': [],
                    'next_states': [],
                    'rewards': [],
                    'dones': []
                }
                state, _ = env.reset()
                done = False
                while not done:
                    action = agent.take_action(state)
                    next_state, reward, terminated, truncated, _ = env.step(action)
                    done = terminated or truncated
                    transition_dict['states'].append(state)
                    transition_dict['actions'].append(action)
                    transition_dict['next_states'].append(next_state)
                    transition_dict['rewards'].append(reward)
                    transition_dict['dones'].append(done)
                    state = next_state
                    episode_return += reward
                return_list.append(episode_return)
                agent.update(transition_dict)
                if (i_episode + 1) % 10 == 0:
                    pbar.set_postfix({'episode': '%d' % (num_episodes/10 * i + i_episode + 1),
                                      'return': '%.3f' % np.mean(return_list[-10:])})
                pbar.update(1)
    return return_list

class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)

class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)

class TRPO:
    """ The TRPO algorithm """
    def __init__(self, hidden_dim, state_space, action_space, lmbda,
                 kl_constraint, alpha, critic_lr, gamma, device):
        state_dim = state_space.shape[0]
        action_dim = action_space.n
        # The policy network needs no optimizer: its parameters are set
        # directly by the trust-region update
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
                                                 lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda  # GAE parameter
        self.kl_constraint = kl_constraint  # maximum allowed KL divergence
        self.alpha = alpha  # line-search shrinkage coefficient
        self.device = device

    def take_action(self, state):
        # Wrapping the state in a list triggers the slow-tensor warning seen
        # in the run log below; np.array([state]) would silence it
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def hessian_matrix_vector_product(self, states, old_action_dists, vector):
        # Compute the product of the Hessian matrix and a vector
        new_action_dists = torch.distributions.Categorical(self.actor(states))
        kl = torch.mean(
            torch.distributions.kl.kl_divergence(old_action_dists,
                                                 new_action_dists))  # average KL divergence
        kl_grad = torch.autograd.grad(kl,
                                      self.actor.parameters(),
                                      create_graph=True)
        kl_grad_vector = torch.cat([grad.view(-1) for grad in kl_grad])
        # Dot the KL gradient with the vector first, then differentiate again
        kl_grad_vector_product = torch.dot(kl_grad_vector, vector)
        grad2 = torch.autograd.grad(kl_grad_vector_product,
                                    self.actor.parameters())
        grad2_vector = torch.cat([grad.view(-1) for grad in grad2])
        return grad2_vector

    def conjugate_gradient(self, grad, states, old_action_dists):  # conjugate gradient solver
        x = torch.zeros_like(grad)
        r = grad.clone()
        p = grad.clone()
        rdotr = torch.dot(r, r)
        for i in range(10):  # main conjugate gradient loop
            Hp = self.hessian_matrix_vector_product(states, old_action_dists,
                                                    p)
            alpha = rdotr / torch.dot(p, Hp)
            x += alpha * p
            r -= alpha * Hp
            new_rdotr = torch.dot(r, r)
            if new_rdotr < 1e-10:
                break
            beta = new_rdotr / rdotr
            p = r + beta * p
            rdotr = new_rdotr
        return x

    def compute_surrogate_obj(self, states, actions, advantage, old_log_probs,
                              actor):  # compute the surrogate objective
        log_probs = torch.log(actor(states).gather(1, actions))
        ratio = torch.exp(log_probs - old_log_probs)
        return torch.mean(ratio * advantage)

    def line_search(self, states, actions, advantage, old_log_probs,
                    old_action_dists, max_vec):  # line search
        old_para = torch.nn.utils.convert_parameters.parameters_to_vector(
            self.actor.parameters())
        old_obj = self.compute_surrogate_obj(states, actions, advantage,
                                             old_log_probs, self.actor)
        for i in range(15):  # main line-search loop
            coef = self.alpha**i
            new_para = old_para + coef * max_vec
            new_actor = copy.deepcopy(self.actor)
            torch.nn.utils.convert_parameters.vector_to_parameters(
                new_para, new_actor.parameters())
            new_action_dists = torch.distributions.Categorical(
                new_actor(states))
            kl_div = torch.mean(
                torch.distributions.kl.kl_divergence(old_action_dists,
                                                     new_action_dists))
            new_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                 old_log_probs, new_actor)
            if new_obj > old_obj and kl_div < self.kl_constraint:
                return new_para
        return old_para

    def policy_learn(self, states, actions, old_action_dists, old_log_probs,
                     advantage):  # update the policy network
        surrogate_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                   old_log_probs, self.actor)
        grads = torch.autograd.grad(surrogate_obj, self.actor.parameters())
        obj_grad = torch.cat([grad.view(-1) for grad in grads]).detach()
        # Solve x = H^(-1)g with the conjugate gradient method
        descent_direction = self.conjugate_gradient(obj_grad, states,
                                                    old_action_dists)

        Hd = self.hessian_matrix_vector_product(states, old_action_dists,
                                                descent_direction)
        max_coef = torch.sqrt(2 * self.kl_constraint /
                              (torch.dot(descent_direction, Hd) + 1e-8))
        new_para = self.line_search(states, actions, advantage, old_log_probs,
                                    old_action_dists,
                                    descent_direction * max_coef)  # line search
        torch.nn.utils.convert_parameters.vector_to_parameters(
            new_para, self.actor.parameters())  # set the parameters found by line search

    def update(self, transition_dict):
        states = torch.tensor(np.array(transition_dict['states']),
                              dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(
            self.device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(np.array(transition_dict['next_states']),
                                   dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(self.device)
        td_target = rewards + self.gamma * self.critic(next_states) * (1 -
                                                                       dones)
        td_delta = td_target - self.critic(states)
        advantage = compute_advantage(self.gamma, self.lmbda,
                                      td_delta.cpu()).to(self.device)
        old_log_probs = torch.log(self.actor(states).gather(1,
                                                            actions)).detach()
        old_action_dists = torch.distributions.Categorical(
            self.actor(states).detach())
        critic_loss = torch.mean(
            F.mse_loss(self.critic(states), td_target.detach()))
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()  # update the value network
        # Update the policy network
        self.policy_learn(states, actions, old_action_dists, old_log_probs,
                          advantage)

# Main program
if __name__ == "__main__":
    num_episodes = 500
    hidden_dim = 128
    gamma = 0.98
    lmbda = 0.95
    critic_lr = 1e-2
    kl_constraint = 0.0005
    alpha = 0.5
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    env_name = 'CartPole-v1'  # the v1 version of the environment
    env = gym.make(env_name)

    # Set random seeds for reproducibility
    random.seed(0)
    np.random.seed(0)
    torch.manual_seed(0)

    agent = TRPO(hidden_dim, env.observation_space, env.action_space, lmbda,
                 kl_constraint, alpha, critic_lr, gamma, device)
    return_list = train_on_policy_agent(env, agent, num_episodes)

    episodes_list = list(range(len(return_list)))
    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    plt.plot(episodes_list, return_list)
    plt.xlabel('Episodes')
    plt.ylabel('Returns')
    plt.title('TRPO on {}'.format(env_name))
    plt.grid(True, alpha=0.3)

    plt.subplot(1, 2, 2)
    mv_return = moving_average(return_list, 9)
    plt.plot(episodes_list[:len(mv_return)], mv_return)
    plt.xlabel('Episodes')
    plt.ylabel('Returns')
    plt.title('TRPO on {} (Moving Average)'.format(env_name))
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()
Run Results

(Figure: episode returns and their moving average for TRPO on CartPole-v1)

(.venv) PS F:\BLOG\ROT-Blog\docs\Control\强化学习> python .\1.py
Iteration 0: 0%| | 0/50 [00:00<?, ?it/s]F:\BLOG\ROT-Blog\docs\Control\强化学习\1.py:101: UserWarning: Creating a tensor from a list of numpy.ndarrays is extremely slow. Please consider converting the
list to a single numpy.ndarray with numpy.array() before converting to a tensor. (Triggered internally at C:\actions-runner\_work\pytorch\pytorch\pytorch\torch\csrc\utils\tensor_new.cpp:256.)
state = torch.tensor([state], dtype=torch.float).to(self.device)
Iteration 0: 100%|█████████████████████████████████████████████████████████████████████████████████| 50/50 [00:02<00:00, 24.66it/s, episode=50, return=53.000]
Iteration 1: 100%|████████████████████████████████████████████████████████████████████████████████| 50/50 [00:02<00:00, 21.10it/s, episode=100, return=98.600]
Iteration 2: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:03<00:00, 14.84it/s, episode=150, return=119.700]
Iteration 3: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:03<00:00, 14.00it/s, episode=200, return=119.500]
Iteration 4: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:04<00:00, 12.17it/s, episode=250, return=120.800]
Iteration 5: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:03<00:00, 13.38it/s, episode=300, return=120.700]
Iteration 6: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:03<00:00, 13.75it/s, episode=350, return=124.600]
Iteration 7: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:03<00:00, 13.72it/s, episode=400, return=124.100]
Iteration 8: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:03<00:00, 13.28it/s, episode=450, return=125.100]
Iteration 9: 100%|███████████████████████████████████████████████████████████████████████████████| 50/50 [00:03<00:00, 13.84it/s, episode=500, return=117.000]