跳到主要内容

动态规划算法

动态规划的基本思想是将待求解问题分解成若干个子问题,先求解子问题,然后从这些子问题的解得到目标问题的解。

基于动态规划的强化学习算法主要有两种:一是策略迭代(policy iteration),二是价值迭代(value iteration)。其中,策略迭代由两部分组成:策略评估(policy evaluation)和策略提升(policy improvement)。策略迭代中的策略评估使用贝尔曼期望方程来得到一个策略的状态价值函数,这是一个动态规划的过程;而价值迭代直接使用贝尔曼最优方程来进行动态规划,得到最终的最优状态价值。

Image

我们将基于一个非常经典的例子————悬崖漫步(CliffWalking)来介绍策略迭代算法。

它的环境:

import copy


class CliffWalkingEnv:
""" 悬崖漫步环境"""
def __init__(self, ncol=12, nrow=4):
self.ncol = ncol # 定义网格世界的列
self.nrow = nrow # 定义网格世界的行
# 转移矩阵P[state][action] = [(p, next_state, reward, done)]包含下一个状态和奖励
self.P = self.createP()

def createP(self):
# 初始化
P = [[[] for j in range(4)] for i in range(self.nrow * self.ncol)]
# 4种动作, change[0]:上,change[1]:下, change[2]:左, change[3]:右。坐标系原点(0,0)
# 定义在左上角
change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
for i in range(self.nrow):
for j in range(self.ncol):
for a in range(4):
# 位置在悬崖或者目标状态,因为无法继续交互,任何动作奖励都为0
if i == self.nrow - 1 and j > 0:
P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0,
True)]
continue
# 其他位置
next_x = min(self.ncol - 1, max(0, j + change[a][0]))
next_y = min(self.nrow - 1, max(0, i + change[a][1]))
next_state = next_y * self.ncol + next_x
reward = -1
done = False
# 下一个位置在悬崖或者终点
if next_y == self.nrow - 1 and next_x > 0:
done = True
if next_x != self.ncol - 1: # 下一个位置在悬崖
reward = -100
P[i * self.ncol + j][a] = [(1, next_state, reward, done)]
return P

1. 策略迭代算法

策略迭代是策略评估和策略提升不断循环交替,直至最后得到最优策略的过程。

1.1 策略评估

策略评估这一过程用来计算一个策略的状态价值函数。

我们将使用我们曾经推导的贝尔曼期望方程来进行策略评估:

Vπ(s)=aπ(as)[r(s,a)+γs,rP(s,rs,a)Vπ(s)]V^\pi(s) = \sum_{a} \pi(a \mid s) [r(s, a) + \gamma \sum_{s', r} P(s', r \mid s, a) V^\pi(s')]

根据动态规划的思想,我们可以通过迭代方式求解贝尔曼期望方程。将计算下一个状态的价值视为子问题,当前状态的价值作为当前问题。

迭代更新公式

Vk+1(s)=aπ(as)[R(s,a)+γsP(ss,a)Vk(s)]V_{k+1}(s) = \sum_{a} \pi(a \mid s) \left[ R(s, a) + \gamma \sum_{s'} P(s' \mid s, a) V_k(s') \right]

其中 Vk(s)V_k(s) 表示第 kk 轮迭代时状态 ss 的价值估计。

我们可以选定任意初始值 V0V_0。根据贝尔曼期望方程,可以得知 VπV^\pi 是以上更新公式的一个不动点(fixed point)。事实上,可以证明当 kk \to \infty 时,序列 Vk{V_k} 会收敛到 VπV^\pi,所以可以据此来计算得到一个策略的状态价值函数。

不动点定理

贝尔曼期望方程是一个不动点方程,满足:

Vπ=Tπ(Vπ)V^\pi = \mathcal{T}^\pi(V^\pi)

其中 Tπ\mathcal{T}^\pi 是策略 π\pi 的贝尔曼期望算子。

收敛性证明

定理:对于任意初始值 V0V_0,序列 {Vk}\{V_k\} 收敛到 VπV^\pi

证明: 定义贝尔曼期望算子 Tπ\mathcal{T}^\pi

(TπV)(s)=aπ(as)[R(s,a)+γsP(ss,a)V(s)](\mathcal{T}^\pi V)(s) = \sum_{a} \pi(a \mid s) \left[ R(s, a) + \gamma \sum_{s'} P(s' \mid s, a) V(s') \right]

可以证明 Tπ\mathcal{T}^\pi 是一个 γ\gamma-压缩映射:

TπV1TπV2γV1V2\|\mathcal{T}^\pi V_1 - \mathcal{T}^\pi V_2\|_\infty \leq \gamma \|V_1 - V_2\|_\infty

根据巴拿赫不动点定理,Tπ\mathcal{T}^\pi 有唯一不动点 VπV^\pi,且迭代序列 Vk+1=TπVkV_{k+1} = \mathcal{T}^\pi V_k 收敛到该不动点。

收敛速度为:

VkVπγkV0Vπ\|V_k - V^\pi\|_\infty \leq \gamma^k \|V_0 - V^\pi\|_\infty

提前终止条件:

在实际实现过程中,如果某一轮的值变化 maxsSVk+1(s)Vk(s)<θ\max_{s \in \mathcal{S}} |V_{k+1}(s) - V_k(s)| < \theta 非常小(其中 θ\theta 是一个预设的小正数),可以提前结束策略评估。这样做可以提升效率,并且得到的价值也非常接近真实的价值。

1.2 策略提升

策略提升的目标是根据当前的状态价值函数 VπV^\pi 来改进策略 π\pi。假设此时对于策略 π\pi,我们已经知道其价值 VπV^\pi,也就是知道了在策略 π\pi 下从每一个状态出发最终得到的期望回报。

我们要如何改变策略来获得在状态 ss 下更高的期望回报呢?假设智能体在状态 ss 下采取动作 aa,之后的动作依旧遵循策略 π\pi,此时得到的期望回报其实就是动作价值 Qπ(s,a)Q^\pi(s, a)。如果我们有 Qπ(s,a)>Vπ(s)Q^\pi(s, a) > V^\pi(s),则说明在状态 ss 下采取动作 aa 会比原来的策略 π\pi 得到更高的期望回报。

以上假设只是针对一个状态,现在假设存在一个确定性策略 π\pi',在任意一个状态 ss 下,都满足:

Qπ(s,π(s))Vπ(s)Q^\pi(s, \pi'(s)) \geq V^\pi(s)

于是在任意状态 ss 下,我们有:

Vπ(s)Vπ(s)V^{\pi'}(s) \geq V^\pi(s)

这便是策略提升定理(policy improvement theorem)。

于是我们可以直接贪心地在每一个状态选择动作价值最大的动作,也就是:

π(s)=argmaxaQπ(s,a)=argmaxa[R(s,a)+γsP(ss,a)Vπ(s)]\pi'(s) = \arg\max_a Q^\pi(s, a) = \arg\max_a \left[ R(s, a) + \gamma \sum_{s'} P(s' \mid s, a) V^\pi(s') \right]

我们发现构造的贪心策略 π\pi' 满足策略提升定理的条件,所以策略 π\pi' 能够比策略 π\pi 更好或者至少与其一样好。这个根据贪心法选取动作从而得到新的策略的过程称为策略提升

我们需要证明:如果对于所有状态 ss,有 Qπ(s,π(s))Vπ(s)Q^\pi(s, \pi'(s)) \geq V^\pi(s),那么对于所有状态 ss,有 Vπ(s)Vπ(s)V^{\pi'}(s) \geq V^\pi(s)

证明

根据动作价值的定义:

Qπ(s,π(s))=R(s,π(s))+γsP(ss,π(s))Vπ(s)Q^\pi(s, \pi'(s)) = R(s, \pi'(s)) + \gamma \sum_{s'} P(s' \mid s, \pi'(s)) V^\pi(s')

由假设条件 Qπ(s,π(s))Vπ(s)Q^\pi(s, \pi'(s)) \geq V^\pi(s),我们得到:

R(s,π(s))+γsP(ss,π(s))Vπ(s)Vπ(s)(1)R(s, \pi'(s)) + \gamma \sum_{s'} P(s' \mid s, \pi'(s)) V^\pi(s') \geq V^\pi(s) \quad \text{(1)}

现在考虑新策略 π\pi' 的状态价值函数。根据贝尔曼方程:

Vπ(s)=R(s,π(s))+γsP(ss,π(s))Vπ(s)(2)V^{\pi'}(s) = R(s, \pi'(s)) + \gamma \sum_{s'} P(s' \mid s, \pi'(s)) V^{\pi'}(s') \quad \text{(2)}

我们定义价值函数的差:

Δ(s)=Vπ(s)Vπ(s)\Delta(s) = V^{\pi'}(s) - V^\pi(s)

将公式 (1) 和 (2) 结合,我们得到:

Vπ(s)Vπ(s)γsP(ss,π(s))[Vπ(s)Vπ(s)]V^{\pi'}(s) - V^\pi(s) \geq \gamma \sum_{s'} P(s' \mid s, \pi'(s)) [V^{\pi'}(s') - V^\pi(s')]

即:

Δ(s)γsP(ss,π(s))Δ(s)(3)\Delta(s) \geq \gamma \sum_{s'} P(s' \mid s, \pi'(s)) \Delta(s') \quad \text{(3)}

公式 (3) 表明,Δ(s)\Delta(s) 满足一个线性不等式系统。由于 γ<1\gamma < 1P(ss,π(s))P(s' \mid s, \pi'(s)) 是概率分布,这个不等式系统的唯一解是 Δ(s)0\Delta(s) \geq 0 对于所有状态 ss 成立。

更严格地,我们可以通过迭代方式证明。定义序列 {Δk(s)}\{\Delta_k(s)\},其中 Δ0(s)=0\Delta_0(s) = 0,且:

Δk+1(s)=γsP(ss,π(s))Δk(s)\Delta_{k+1}(s) = \gamma \sum_{s'} P(s' \mid s, \pi'(s)) \Delta_k(s')

由于 Δ0(s)=0Δ(s)\Delta_0(s) = 0 \leq \Delta(s),且算子 T(Δ)(s)=γsP(ss,π(s))Δ(s)T(\Delta)(s) = \gamma \sum_{s'} P(s' \mid s, \pi'(s)) \Delta(s') 是单调的,通过数学归纳法可得 Δk(s)Δ(s)\Delta_k(s) \leq \Delta(s) 对于所有 kk 成立。

又因为 Δk+1ΔkγΔkΔk1\|\Delta_{k+1} - \Delta_k\|_\infty \leq \gamma \|\Delta_k - \Delta_{k-1}\|_\infty,序列 {Δk}\{\Delta_k\} 收敛到 0。因此,Δ(s)0\Delta(s) \geq 0 对于所有状态 ss 成立。

这就证明了 Vπ(s)Vπ(s)V^{\pi'}(s) \geq V^\pi(s) 对于所有状态 ss 成立。

当策略提升之后得到的策略 π\pi' 和之前的策略 π\pi 一样时,说明策略迭代达到了收敛,此时 π\piπ\pi' 就是最优策略。

此时满足:

π(s)=argmaxaQπ(s,a)\pi(s) = \arg\max_a Q^\pi(s, a)

这正好是贝尔曼最优方程的条件,因此 π\pi 是最优策略。

2. 策略迭代算法

策略迭代算法是一种求解马尔可夫决策过程(MDP)的经典方法,它通过交替执行策略评估和策略提升两个步骤来逐步改进策略,直至收敛到最优策略。算法的完整过程如下:

首先对当前策略进行策略评估,计算其状态价值函数;然后基于该状态价值函数进行策略提升,得到一个改进的新策略;接着继续评估新策略、提升策略……如此循环迭代,直至收敛到最优策略。

策略稳定标志 ← True

对于每一个状态 s ∈ S:

  • 旧动作 ← π(s)
  • π(s) ← argmax_a [R(s,a) + γ Σ_s' P(s'|s,a) V(s')]
  • 如果 旧动作 ≠ π(s), 则 策略稳定标志 ← False

若 策略稳定标志 = True, 则停止算法并返回 V 和 π。

import numpy as np

def policy_iteration(env, gamma=0.99, theta=1e-8):
"""
策略迭代算法实现

参数:
env: 环境对象,需要提供以下属性:
- nS: 状态数量
- nA: 动作数量
- P: 转移概率函数,P[s][a] = [(prob, next_state, reward, done), ...]
gamma: 折扣因子
theta: 收敛阈值

返回:
policy: 最优策略,形状为 (nS, nA)
V: 最优价值函数,形状为 (nS,)
"""
# 初始化随机策略(均匀分布)和价值函数
policy = np.ones([env.nS, env.nA]) / env.nA
V = np.zeros(env.nS)

policy_stable = False
iteration_count = 0

while not policy_stable:
iteration_count += 1

# 策略评估阶段
while True:
delta = 0
for s in range(env.nS):
v_old = V[s]
new_value = 0

# 计算当前策略下的状态价值
for a, action_prob in enumerate(policy[s]):
for prob, next_state, reward, done in env.P[s][a]:
# 如果到达终止状态,下一个状态价值为0
next_value = 0 if done else V[next_state]
new_value += action_prob * prob * (reward + gamma * next_value)

V[s] = new_value
delta = max(delta, abs(v_old - V[s]))

if delta < theta:
break

# 策略提升阶段
policy_stable = True
for s in range(env.nS):
# 保存旧动作(概率最大的动作)
old_action = np.argmax(policy[s])

# 计算每个动作的动作价值
action_values = np.zeros(env.nA)
for a in range(env.nA):
for prob, next_state, reward, done in env.P[s][a]:
next_value = 0 if done else V[next_state]
action_values[a] += prob * (reward + gamma * next_value)

# 选择最优动作(可能有多个相同价值的动作)
best_actions = np.where(action_values == np.max(action_values))[0]

# 创建新的策略(均匀分布在最优动作上)
new_policy = np.zeros(env.nA)
for a in best_actions:
new_policy[a] = 1.0 / len(best_actions)

# 检查策略是否改变
if not np.array_equal(policy[s], new_policy):
policy_stable = False

policy[s] = new_policy

print(f"迭代 {iteration_count} 完成,策略{'已稳定' if policy_stable else '未稳定'}")

return policy, V

# 辅助函数:显示策略
def display_policy(policy, shape=None):
"""
以可读格式显示策略

参数:
policy: 策略数组,形状为 (nS, nA)
shape: 环境的形状 (rows, cols),可选
"""
# 动作符号映射
action_symbols = {0: '↑', 1: '→', 2: '↓', 3: '←'} # 上下左右

if shape is not None:
rows, cols = shape
for row in range(rows):
line = ""
for col in range(cols):
state = row * cols + col
if state < len(policy):
action = np.argmax(policy[state])
line += action_symbols.get(action, str(action)) + " "
else:
line += "X " # 无效状态
print(line)
else:
for s in range(len(policy)):
action = np.argmax(policy[s])
print(f"状态 {s}: {action_symbols.get(action, str(action))}")

# 辅助函数:显示价值函数
def display_values(V, shape=None):
"""
以可读格式显示价值函数

参数:
V: 价值函数数组,形状为 (nS,)
shape: 环境的形状 (rows, cols),可选
"""
if shape is not None:
rows, cols = shape
for row in range(rows):
line = ""
for col in range(cols):
state = row * cols + col
if state < len(V):
line += f"{V[state]:.2f} "
else:
line += "X " # 无效状态
print(line)
else:
for s in range(len(V)):
print(f"状态 {s}: {V[s]:.4f}")

# 简单的网格世界环境示例
class SimpleGridWorld:
"""
简单的网格世界环境
"""
def __init__(self, rows=3, cols=3):
self.rows = rows
self.cols = cols
self.nS = rows * cols
self.nA = 4 # 上下左右

# 定义转移概率
self.P = {s: {a: [] for a in range(self.nA)} for s in range(self.nS)}

# 目标状态(右下角)
self.goal_state = self.nS - 1

# 初始化转移概率
self._init_transitions()

def _init_transitions(self):
"""初始化转移概率"""
for s in range(self.nS):
row, col = s // self.cols, s % self.cols

for a in range(self.nA):
# 计算下一个状态
if a == 0: # 上
next_row, next_col = max(0, row-1), col
elif a == 1: # 右
next_row, next_col = row, min(self.cols-1, col+1)
elif a == 2: # 下
next_row, next_col = min(self.rows-1, row+1), col
elif a == 3: # 左
next_row, next_col = row, max(0, col-1)

next_state = next_row * self.cols + next_col

# 计算奖励
if next_state == self.goal_state:
reward = 1.0
done = True
else:
reward = 0.0
done = False

# 转移概率为1(确定性环境)
self.P[s][a].append((1.0, next_state, reward, done))

# 示例使用
if __name__ == "__main__":
# 创建环境
env = SimpleGridWorld(rows=3, cols=3)

# 运行策略迭代
print("开始策略迭代...")
policy, V = policy_iteration(env, gamma=0.9, theta=1e-6)

# 显示结果
print("\n最优策略:")
display_policy(policy, shape=(3, 3))

print("\n最优价值函数:")
display_values(V, shape=(3, 3))

# 验证策略
print("\n策略验证:")
for s in range(env.nS):
action = np.argmax(policy[s])
action_symbol = {0: '上', 1: '右', 2: '下', 3: '左'}[action]
print(f"状态 {s}: 动作 {action_symbol}, 价值 {V[s]:.4f}")
Details

运行结果为:

开始策略迭代... 迭代 1 完成,策略未稳定 迭代 2 完成,策略未稳定 迭代 3 完成,策略已稳定

最优策略: → → ↓ → → ↓ → → →

最优价值函数: 0.73 0.81 0.90 0.81 0.90 1.00 0.90 1.00 1.00

策略验证: 状态 0: 动作 右, 价值 0.7290 状态 1: 动作 右, 价值 0.8100 状态 2: 动作 下, 价值 0.9000 状态 3: 动作 右, 价值 0.8100 状态 4: 动作 右, 价值 0.9000 状态 5: 动作 下, 价值 1.0000 状态 6: 动作 右, 价值 0.9000 状态 7: 动作 右, 价值 1.0000 状态 8: 动作 右, 价值 1.0000

3. 价值迭代算法

价值迭代基于贝尔曼最优方程:

V(s)=maxa[R(s,a)+γsP(ss,a)V(s)]V^*(s) = \max_a \left[ R(s, a) + \gamma \sum_{s'} P(s' \mid s, a) V^*(s') \right]

将贝尔曼最优方程写成迭代更新的形式:

Vk+1(s)=maxa[R(s,a)+γsP(ss,a)Vk(s)]V_{k+1}(s) = \max_a \left[ R(s, a) + \gamma \sum_{s'} P(s' \mid s, a) V_k(s') \right]

Vk+1V_{k+1}VkV_k 足够接近时,VkV_k 就是贝尔曼最优方程的不动点,此时对应着最优状态价值函数 VV^*

得到最优状态价值函数后,我们可以从中恢复出最优策略:

π(s)=argmaxa[R(s,a)+γsP(ss,a)V(s)]\pi^*(s) = \arg\max_a \left[ R(s, a) + \gamma \sum_{s'} P(s' \mid s, a) V^*(s') \right]
  1. 初始化:随机初始化价值函数 V(s)V(s)
  2. 价值迭代
while $max_s |V_{k+1}(s) - V_k(s)| > θ $
do:
- 对于每一个状态 $s ∈ S$:
- $V_{k+1}(s) = max_a [ R(s,a) + γ * Σ_s' P(s'|s,a) * V_k(s') ]$
- end while
  1. 策略提取:返回一个确定性策略
import numpy as np

def value_iteration(env, gamma=0.99, theta=1e-8):
"""
价值迭代算法实现

参数:
env: 环境对象,需要提供以下属性:
- nS: 状态数量
- nA: 动作数量
- P: 转移概率函数,P[s][a] = [(prob, next_state, reward, done), ...]
gamma: 折扣因子
theta: 收敛阈值

返回:
policy: 最优策略,形状为 (nS, nA)
V: 最优价值函数,形状为 (nS,)
"""
# 初始化价值函数
V = np.zeros(env.nS)

iteration_count = 0

while True:
iteration_count += 1
delta = 0

# 对每个状态进行价值更新
for s in range(env.nS):
v_old = V[s]

# 计算每个动作的期望回报
action_values = np.zeros(env.nA)
for a in range(env.nA):
for prob, next_state, reward, done in env.P[s][a]:
# 如果到达终止状态,下一个状态价值为0
next_value = 0 if done else V[next_state]
action_values[a] += prob * (reward + gamma * next_value)

# 选择最大的动作价值作为新的状态价值
V[s] = np.max(action_values)
delta = max(delta, abs(v_old - V[s]))

print(f"迭代 {iteration_count}, 最大变化: {delta:.6f}")

if delta < theta:
break

# 从最优价值函数中提取策略
policy = np.zeros([env.nS, env.nA])
for s in range(env.nS):
# 计算每个动作的期望回报
action_values = np.zeros(env.nA)
for a in range(env.nA):
for prob, next_state, reward, done in env.P[s][a]:
next_value = 0 if done else V[next_state]
action_values[a] += prob * (reward + gamma * next_value)

# 选择最优动作(可能有多个相同价值的动作)
best_actions = np.where(action_values == np.max(action_values))[0]

# 创建确定性策略(均匀分布在最优动作上)
for a in best_actions:
policy[s][a] = 1.0 / len(best_actions)

return policy, V

# 使用相同的环境和辅助函数
if __name__ == "__main__":
# 创建环境
env = SimpleGridWorld(rows=3, cols=3)

# 运行价值迭代
print("开始价值迭代...")
policy, V = value_iteration(env, gamma=0.9, theta=1e-6)

# 显示结果
print("\n最优策略:")
display_policy(policy, shape=(3, 3))

print("\n最优价值函数:")
display_values(V, shape=(3, 3))

# 验证策略
print("\n策略验证:")
for s in range(env.nS):
action = np.argmax(policy[s])
action_symbol = {0: '上', 1: '右', 2: '下', 3: '左'}[action]
print(f"状态 {s}: 动作 {action_symbol}, 价值 {V[s]:.4f}")

在冰湖环境中,试验这两种算法:

import gym
env = gym.make("FrozenLake-v0") # 创建环境
env = env.unwrapped # 解封装才能访问状态转移矩阵P
env.render() # 环境渲染,通常是弹窗显示或打印出可视化的环境

holes = set()
ends = set()
for s in env.P:
for a in env.P[s]:
for s_ in env.P[s][a]:
if s_[2] == 1.0: # 获得奖励为1,代表是目标
ends.add(s_[1])
if s_[3] == True:
holes.add(s_[1])
holes = holes - ends
print("冰洞的索引:", holes)
print("目标的索引:", ends)

for a in env.P[14]: # 查看目标左边一格的状态转移信息
print(env.P[14][a])

# 这个动作意义是Gym库针对冰湖环境事先规定好的
# 策略迭代算法
action_meaning = ['<', 'v', '>', '^']
theta = 1e-5
gamma = 0.9
agent = PolicyIteration(env, theta, gamma)
agent.policy_iteration()
print_agent(agent, action_meaning, [5, 7, 11, 12], [15])
# 价值迭代算法
action_meaning = ['<', 'v', '>', '^']
theta = 1e-5
gamma = 0.9
agent = ValueIteration(env, theta, gamma)
agent.value_iteration()
print_agent(agent, action_meaning, [5, 7, 11, 12], [15])