跳到主要内容

动态规划算法

RL 中的动态规划

动态规划(dynamic programming)是程序设计算法中非常重要的内容,能够高效解决一些经典问题,例如背包问题和最短路径规划。动态规划的基本思想是将待求解问题分解成若干个子问题,先求解子问题,然后从这些子问题的解得到目标问题的解。动态规划会保存已解决的子问题的答案,在求解目标问题的过程中,需要这些子问题答案时就可以直接利用,避免重复计算。

以下是两种主要的动态规划算法:

1、策略迭代(Policy Iteration)

策略迭代是一个两阶段的过程,包括策略评估和策略改进。

(1.) 策略评估(Policy Evaluation): 在这一步骤中,算法评估当前策略的价值,即计算在该策略下从每个状态出发的预期回报。这通常通过迭代应用贝尔曼期望方程来实现,直到状态价值函数收敛。

提示

贝尔曼期望方程:

V(s)=aAπ(as)sP(ss,a)(R(s,a,s)+γV(s))V(s) = \sum_{a \in A} \pi(a|s) \sum_{s'} P(s'|s,a) \left( R(s,a,s') + \gamma V(s') \right)

其中 π\pi 是策略,PP 是转移概率,RR 是奖励,γ\gamma 是折扣因子。

(2.) 策略改进(Policy Improvement):

  • 基于当前的价值函数来改进策略。对于每个状态,选择使得期望回报最大化的动作作为新的策略。
  • 策略改进定理保证这一过程会得到一个等于或优于当前策略的新策略。

策略迭代交替进行策略评估和策略改进,直到策略收敛。

2、价值迭代(Value Iteration)

价值迭代是策略迭代的一种特殊形式,它将策略评估和策略改进步骤结合在一起。

  • 在价值迭代中,你不需要等待策略评估步骤完全收敛。在每次迭代中,你都会更新状态的价值估计,并基于这个新的估计来改进策略。
  • 价值迭代使用的是贝尔曼最优方程:V(s)=maxaAsP(ss,a)(R(s,a,s)+γV(s))V(s) = \max_{a \in A} \sum_{s'} P(s'|s,a) \left( R(s,a,s') + \gamma V(s') \right)。这里,我们选择的是使价值最大化的动作,而不是根据当前策略选择动作。

价值迭代通常比策略迭代更快收敛,尤其是在状态价值函数变化很大的情况下。

基于动态规划的这两种强化学习算法要求事先知道环境的状态转移函数和奖励函数,也就是需要知道整个马尔可夫决策过程。在这样一个白盒环境中,不需要通过智能体和环境的大量交互来学习,可以直接用动态规划求解状态价值函数。但是,现实中的白盒环境很少,这也是动态规划算法的局限之处,我们无法将其运用到很多实际场景中。另外,策略迭代和价值迭代通常只适用于有限马尔可夫决策过程,即状态空间和动作空间是离散且有限的。

编写悬崖漫步环境

悬崖漫步是一个非常经典的强化学习环境,它要求一个智能体从起点出发,避开悬崖行走,最终到达目标位置。如图 4-1 所示,有一个 4×12 的网格世界,每一个网格表示一个状态。智能体的起点是左下角的状态,目标是右下角的状态,智能体在每一个状态都可以采取 4 种动作:上、下、左、右。如果智能体采取动作后触碰到边界墙壁则状态不发生改变,否则就会相应到达下一个状态。环境中有一段悬崖,智能体掉入悬崖或到达目标状态都会结束动作并回到起点,也就是说掉入悬崖或者达到目标状态是终止状态。智能体每走一步的奖励是 −1,掉入悬崖的奖励是 −100。

这里使用 Gymnasium 搭建悬崖漫步环境,代码如下:

import numpy as np
from gymnasium import Env, spaces
import os
import time
import numpy as np
import copy


class CliffWalkingEnv(Env):
metadata = {'render.modes': ['console']}

def __init__(self, width=12, height=4):
super(CliffWalkingEnv, self).__init__()
self.width = width
self.height = height
self.action_space = spaces.Discrete(4) # 上、下、左、右
self.observation_space = spaces.Discrete(
self.width * self.height) # 描述智能体可以观察到的所有不同状态

# 悬崖的位置
self.cliff = np.array([i for i in range(
self.width * (self.height - 1) + 1, self.width * self.height - 1)])

self.start = self.width * (self.height - 1) # 起始位置
self.goal = self.width * self.height - 1 # 终点位置
self.state = None

def reset(self):
self.state = self.start
return self.state

# 这个 step 实际就是一个状态转移函数,它接收一个动作作为输入,然后返回下一个状态、回报、是否达到终止状态以及其他信息。
def step(self, action):
y, x = divmod(self.state, self.width)
if action == 0: # 上
y = max(y - 1, 0)
elif action == 1: # 右
x = min(x + 1, self.width - 1)
elif action == 2: # 下
y = min(y + 1, self.height - 1)
elif action == 3: # 左
x = max(x - 1, 0)

new_state = y * self.width + x
reward = -1 # 每走一步减1分
done = False

if new_state in self.cliff:
reward = -100 # 落入悬崖减100分
new_state = self.start # 重置到起始位置
elif new_state == self.goal:
done = True # 达到终点

self.state = new_state
return new_state, reward, done, {}

def render(self, mode='console'):
if mode != 'console':
raise NotImplementedError()

os.system('cls')
grid = np.zeros(self.width * self.height, dtype=int)

# 下面使用数字表示不同的物体
grid[self.start] = 1 # 起点
grid[self.goal] = 2 # 终点
grid[self.cliff] = -1 # 悬崖
grid[self.state] = 7 # 智能体当前位置

for i in range(self.height):
row = ' '.join(f"{grid[j]:>2}" for j in range(
i*self.width, (i+1)*self.width))
print(row)

编写一个简单的测试代码

def test_cliff_walking(env, episodes=10):
for episode in range(episodes):
state = env.reset()
done = False
total_reward = 0
steps = 0

print(f"\nEpisode {episode + 1}\nStart")
env.render()

while not done:
action = env.action_space.sample() # 随机选择一个动作
next_state, reward, done, _ = env.step(action)
total_reward += reward
steps += 1

# 等待 0.3s
time.sleep(0.3)
env.render()
print(
f"\nAction: {action}, Next State: {next_state}, Reward: {reward}, Done: {done}")

if done:
print(
f"\nEnd of Episode {episode + 1}: Total Reward = {total_reward}, Steps = {steps}")
break


# 创建环境实例
env = CliffWalkingEnv()

# 测试悬崖漫步环境
test_cliff_walking(env)

可以看到控制台能正常的显示环境,以及智能体的位置和动作,如下所示:

 0  0  0  0  0  0  0  0  0  0  0  0
7 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0
1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 2

Action: 0, Next State: 12, Reward: -1, Done: False

这里的数字表示不同的物体,1 表示起点,2 表示终点,-1 表示悬崖,7 表示智能体当前位置。

首先是策略迭代算法

策略迭代是一种在强化学习和马尔可夫决策过程(MDP)中寻找最优策略的方法。它由两个主要步骤组成:策略评估(Policy Evaluation)和策略提升(Policy Improvement)。这两个步骤交替进行,直到策略收敛到最优策略。

  • 循环开始:从一个初始策略开始。
  • 策略评估:对当前策略进行评估,计算每个状态的值。
  • 策略提升:根据评估结果,更新策略,使其在每个状态都选择最优动作。
  • 重复:用新的策略再次进行策略评估,然后进行策略提升,如此循环。

策略评估

在策略评估步骤中,目标是计算当前策略下的状态价值函数。给定一个策略 π\pi,策略评估求解每个状态的价值,即在该策略下从该状态开始的期望回报。

过程:

  • 对于每个状态 ss,初始化一个估计的价值 V(s)V(s)(通常初始化为0)。
  • 迭代更新每个状态的价值,直到价值函数收敛。这意味着价值函数的变化在连续两次迭代之间非常小。
  • 更新公式基于贝尔曼期望方程:V(s)=aAπ(as)s,rp(s,rs,a)[r+γV(s)]V(s) = \sum_{a \in A} \pi(a|s) \sum_{s', r} p(s', r|s, a) \left[ r + \gamma V(s') \right],这里的 π(as)\pi(a|s) 表示这个动作 aa 在状态 ss 的概率,其中 p(s,rs,a)p(s', r|s, a) 是从状态 ss 采取动作 aa 后转移到状态 ss' 并接收奖励 rr 的概率,γ\gamma 是折扣因子。

注意,在确定性环境中,对于给定的状态 ss 和动作 aa,结果(即下一个状态 ss' 和奖励 rr)是固定的,不涉及概率。因此,在这种情况下,状态转移概率 p(s,rs,a)p(s', r|s, a) 要么是 0(不可能发生的转移),要么是 1(一定发生的转移)。这意味着在计算期望回报时,不需要显式地考虑状态转移概率,因为每个动作导致的结果是确定的。

策略提升

策略提升步骤的目的是根据当前的价值函数改进策略。通过这一步骤,策略变得更好,因为它在每个状态都选择能够带来更高期望回报的动作。

过程:

  • 对于每个状态 ss,考虑采取所有可能动作 aa 后的期望回报,并选择能够产生最高回报的动作作为该状态下的最优动作。
  • 这一步骤可以形式化为:π(s)=argmaxas,rp(s,rs,a)[r+γV(s)]\pi'(s) = \arg\max_{a} \sum_{s', r} p(s', r|s, a) \left[ r + \gamma V(s') \right],其中 π(s)\pi'(s) 是改进后的策略,这里的 p(s,rs,a)p(s', r|s, a) 表示在给定当前状态 ss 和选择的动作 aa 的情况下,转移到下一个状态 ss 并获得奖励 rr 的概率。这个概率通常被称为 状态转移概率
  • 如果新策略和旧策略在所有状态下都相同,那么策略收敛,算法结束。否则,使用新策略进行下一轮的策略评估。

策略迭代算法

编写一个策略迭代算法。策略迭代包含策略评估和策略提升两个步骤,这两个步骤交替进行直到策略收敛。

为什么需要多次迭代而不是一次就能计算出来?

  • 初次评估基于随机或者不完整的信息:初始状态价值通常是基于一个随机或者非最优化的策略,这意味着初始的价值估计可能不够准确。
  • 每次迭代改进策略和价值估计:每次进行策略评估和策略提升后,你会得到一个更好的策略和更准确的状态价值估计。这个过程需要重复进行,因为每次策略的改变都会影响状态的价值估计。
  • 收敛到最优策略:经过多轮迭代后,策略和状态价值估计会逐渐稳定,最终收敛到最优策略。

我这里简要介绍下下面的代码的实现思路:

1、首先最开始 PolicyIteration 初始化的时候,都是给所有的策略都赋值为 1/4,也就是每个动作的概率都是 1/4,这样就保证了每个状态下的策略是均匀的,是随机的。 2、然后根据这个策略进行策略评估,得到状态价值函数。因为每个状态的价值更新都可能影响其他状态的价值估计,所以需要进行多轮的迭代,直到状态价值函数收敛。 3、然后根据这个取得的状态价值函数再拿回来去更新策略,这样就得到了新的策略。 4、然后比较新旧策略,如果相同就说明策略已经稳定了,就可以退出循环了,如果不相同就继续进行策略评估和策略提升,直到策略稳定。

所以可以看到,这个算法其实就是策略评估和策略提升交替执行,因为有了策略变更,所以会导致状态价值函数的变化,所以需要重新进行策略评估,然后再进行策略提升,这样就会一直循环下去,直到策略稳定。

class PolicyIteration:
def __init__(self, env, discount_factor=0.9, theta=1e-9):
self.env = env
self.discount_factor = discount_factor
self.theta = theta
# 初始化状态价值函数和策略,这个 V 的含义是状态价值函数,它是一个数组,数组的长度等于状态空间的大小。
self.V = np.zeros(env.observation_space.n)
# np.ones 创建了一个填充有 1 的数组,这意味着每个状态-动作对的初始值都是 1。
# 除以 env.action_space.n:这个操作将每个状态-动作对的值除以动作空间的大小,从而使得每个状态下所有动作的初始概率相等。
# 这样,对于每个状态,策略分配的概率分布是均匀的,意味着初始策略是完全随机的。
self.policy = np.ones(
[env.observation_space.n, env.action_space.n]) / env.action_space.n

def policy_evaluation(self, max_iterations=10000):
"""
策略评估函数:根据当前策略计算状态价值函数。
在这一步骤中,算法评估当前策略的价值,即计算在该策略下从每个状态出发的预期回报。
这通常通过迭代应用贝尔曼期望方程来实现,直到状态价值函数收敛。
"""
iteration = 0
while True:
delta = 0
new_v = np.zeros(self.env.observation_space.n)
# 遍历所有状态
for s in range(self.env.observation_space.n):
v = 0
# 遍历所有可能的动作
for a, action_prob in enumerate(self.policy[s]):
next_state, reward, done = self.compute_next_state_reward(s, a)
# 状态价值函数的计算
# 当 done(表示是否达到终止状态)为真时,对应的未来价值(self.V[next_state])应该是 0。
# 这是因为在终止状态之后,没有进一步的回报或状态转移。如果不考虑 done 的值,可能会导致对状态价值的估计不准确。
v += action_prob * (reward + self.discount_factor * self.V[next_state] * (1 - done))
new_v[s] = v
delta = max(delta, np.abs(v - self.V[s]))

# 再把上面计算得到的新的状态价值函数赋值给 self.V
self.V = new_v
# 注意,需要检查 discount_factor 不能设置成 1,否则会永远无法收敛
if delta < self.theta or iteration >= max_iterations:
break
iteration += 1
print(f"策略评估进行了 {iteration} 轮后完成")

def policy_improvement(self):
"""
策略提升函数:基于当前的价值函数来改进策略
对于每个状态,选择使得期望回报最大化的动作作为新的策略。
策略改进定理保证这一过程会得到一个等于或优于当前策略的新策略。
"""
for s in range(self.env.observation_space.n):
# 评估每个动作的价值
q = np.zeros(self.env.action_space.n)
for a in range(self.env.action_space.n):
next_state, reward, done = self.compute_next_state_reward(s, a)
# 状态改进
# 在确定性环境中,对于给定的状态 s 和动作 a,结果(即下一个状态 s' 和奖励 r)是固定的,不涉及概率。
# 所以这里的 p 是 1,不需要显式地考虑状态转移概率,因为每个动作导致的结果是确定的。
q[a] = reward + self.discount_factor * self.V[next_state] * (1 - done)

# 选择价值最大的动作
best_action = np.argmax(q)
# p.eye 这个函数生成一个单位矩阵(identity matrix),其维度等于动作空间的大小。
# 例如,如果动作空间大小为 4,则 np.eye(4) 会生成如下矩阵:
# [[1. 0. 0. 0.]
# [0. 1. 0. 0.]
# [0. 0. 1. 0.]
# [0. 0. 0. 1.]]
# 然后后面跟的这个 [best_action] 是在选取单位矩阵中对应于 best_action 的行。
# best_action 是该状态下预期回报最大的动作的索引。例如,如果 best_action 是 2,则选择单位矩阵的第三行 [0. 0. 1. 0.]。
self.policy[s] = np.eye(self.env.action_space.n)[best_action]

print("策略提升完成")
return self.policy

def compute_next_state_reward(self, state, action):
"""
计算下一个状态和回报: 这里之所以不调用 env 函数的 step 函数是因为
我们希望在计算下一个状态和回报时不改变当前环境的状态。且应该通过当前策略来计算。
"""
y, x = divmod(state, self.env.width)
if action == 0: # 上
y = max(y - 1, 0)
elif action == 1: # 右
x = min(x + 1, self.env.width - 1)
elif action == 2: # 下
y = min(y + 1, self.env.height - 1)
elif action == 3: # 左
x = max(x - 1, 0)

new_state = y * self.env.width + x
reward = -1 # 每走一步减1分
done = False

if new_state in self.env.cliff:
reward = -100 # 落入悬崖减100分
new_state = self.env.start # 重置到起始位置
elif new_state == self.env.goal:
done = True # 达到终点

return new_state, reward, done

def iterate_policy(self):
iteration = 0
while True:
print(f"\n策略迭代轮次 {iteration + 1}")
self.policy_evaluation() # 进行策略评估(即重新生成价值函数)
old_pi = copy.deepcopy(self.policy) # 将列表进行深拷贝,方便接下来进行比较
new_pi = self.policy_improvement() # 进行策略提升(即重新生成策略)
# 使用 np.array_equal 来比较两个数组是否相同
if np.array_equal(old_pi, new_pi):
print("策略稳定,迭代结束")
break
iteration += 1
return self.policy, self.V


# 创建悬崖漫步环境实例
env = CliffWalkingEnv()
env.reset()

# 创建策略迭代实例
agent = PolicyIteration(env, 0.9, 0.001)

# 运行策略迭代
policy, V = agent.iterate_policy()

# 打印最优策略和状态价值函数
print("最优策略: {0}".format(policy))
print("状态价值函数:")
print(V)

# 可视化最优策略
def simulate_best_path(env, policy):
state = env.reset() # 重置环境到起始状态
env.render() # 渲染起始状态

done = False
while not done:
# 选择当前状态下最优策略推荐的动作
action = np.argmax(policy[state])
# 执行动作并获取新状态
new_state, _, done, _ = env.step(action)
# 更新环境状态
env.state = new_state
# 渲染新状态
env.render()
time.sleep(0.5) # 稍作暂停以便观察

# 如果进入悬崖或到达目标,则结束
if new_state in env.cliff or new_state == env.goal:
break

state = new_state # 更新状态

# 使用最优策略进行模拟
simulate_best_path(env, policy)

价值迭代算法

从上面的代码运行结果中我们能发现,策略迭代中的策略评估需要进行很多轮才能收敛得到某一策略的状态函数,这需要很大的计算量,尤其是在状态和动作空间比较大的情况下。我们是否必须要完全等到策略评估完成后再进行策略提升呢?试想一下,可能出现这样的情况:虽然状态价值函数还没有收敛,但是不论接下来怎么更新状态价值,策略提升得到的都是同一个策略。

如果只在策略评估中进行一轮价值更新,然后直接根据更新后的价值进行策略提升,这样是否可以呢?答案是肯定的,这其实就是价值迭代算法,它可以被认为是一种 策略评估只进行了一轮更新的策略迭代算法。需要注意的是,价值迭代中不存在显式的策略,我们只维护一个状态价值函数。

确切来说,价值迭代可以看成一种动态规划过程,它利用的是贝尔曼最优方程:

V(s)=maxaAsP(ss,a)(R(s,a,s)+γV(s))V(s) = \max_{a \in A} \sum_{s'} P(s'|s,a) \left( R(s,a,s') + \gamma V(s') \right)

翻译成直白的话,它们之间的区别就是:

  1. 策略迭代算法

    • 策略优先:策略迭代首先从一个初始策略开始,这个策略可能是随机的或者基于某种启发式的。
    • 两个步骤循环:它分为两个主要步骤:策略评估和策略提升。
      • 策略评估:在这一步,算法计算当前策略下的每个状态的价值,即如果我按照这个策略行动,每个状态能带来多少回报的预期值。
      • 策略提升:然后,在策略提升步骤中,算法会更新策略,试图让每个状态的行动选择能带来更高的预期回报。
    • 迭代过程:这个过程反复进行,直到策略不再改变,此时我们认为找到了最优策略。
  2. 价值迭代算法

    • 价值优先:与策略迭代不同,价值迭代关注于找到每个状态的最大价值,而不是直接寻找策略。
    • 直接更新状态价值:它直接更新每个状态的价值,而不是先评估当前策略下的价值。在每次迭代中,算法计算在当前状态采取所有可能动作的预期回报,并选择最大值来更新该状态的价值。
    • 策略隐含在价值中:在价值迭代中,策略是隐含的。一旦状态价值稳定,最优策略可以通过查看每个状态下哪个动作会导致最大价值来简单地提取出来。
    • 迭代直到收敛:该过程反复进行,直到状态值的变化非常小(即收敛),此时我们可以根据最终的状态价值函数提取出最优策略。

可以把策略迭代看作是 “先有策略,后优化价值”,而价值迭代则是 “先优化价值,策略自然随之而来”。

价值迭代算法流程如下:

  1. 初始化状态价值函数:在价值迭代算法的开始,首先初始化状态价值函数 V(s)V(s) 为零或任意初始值。这个状态价值函数表示在特定状态 ss 下开始的预期回报。

  2. 价值迭代主循环:价值迭代的核心在于反复更新状态价值函数 V(s)V(s)。对于每个状态 ss,算法遍历所有可能的动作 aa,然后计算每个动作的价值 Q(s,a)Q(s, a),这个价值是根据当前的状态价值函数、可能的下一个状态 ss'、回报 RR 和折扣因子 γ\gamma 来计算的。价值 Q(s,a)Q(s, a) 计算如下:

    Q(s,a)=R(s,a)+γsP(ss,a)V(s)Q(s, a) = R(s, a) + \gamma \sum_{s'} P(s'|s, a) V(s')

    其中,P(ss,a)P(s'|s, a) 是从状态 ss 执行动作 aa 转移到状态 ss' 的概率。接着,算法会选择这些 Q(s,a)Q(s, a) 值中的最大值作为新的状态价值 V(s)V(s)

  3. 价值函数收敛判断:迭代继续进行,直到状态价值函数的变化小于一个预设的阈值 θ\theta。换句话说,当所有状态的价值不再有显著变化时,我们可以认为价值函数已经收敛。

  4. 策略提取:一旦状态价值函数收敛,算法进入策略提取阶段。对于每个状态 ss,算法再次遍历所有动作 aa,计算 Q(s,a)Q(s, a)。此时,选择使 Q(s,a)Q(s, a) 最大化的动作作为该状态下的最优动作。这样,对于每个状态,我们都能找到一个最优动作,从而构成一个最优策略。

  5. 结果输出:最终,算法输出最优策略和收敛的状态价值函数。最优策略指示在每个状态下应选择哪个动作以最大化长期回报,而状态价值函数提供了在该策略下从每个状态开始的预期回报。

价值迭代算法的优点在于它不需要显式维护策略,而是直接通过不断迭代更新状态价值函数来隐式地改进策略。这使得价值迭代在某些情况下比策略迭代更为高效和简洁。

现在来编写价值迭代的代码

class ValueIteration:
def __init__(self, env, discount_factor=0.9, theta=1e-9):
self.env = env
self.discount_factor = discount_factor
self.theta = theta
self.V = np.zeros(env.observation_space.n)
# np.ones 创建了一个填充有 1 的数组,这意味着每个状态-动作对的初始值都是 1。
# 除以 env.action_space.n:这个操作将每个状态-动作对的值除以动作空间的大小,从而使得每个状态下所有动作的初始概率相等。
# 这样,对于每个状态,策略分配的概率分布是均匀的,意味着初始策略是完全随机的。
self.policy = np.ones(
[env.observation_space.n, env.action_space.n]) / env.action_space.n

def value_iteration(self, max_iterations=10000):
"""
价值迭代函数:在每次迭代中同时更新状态价值函数和策略。
该方法不需要策略评估和策略提升的明确分离,而是在每次迭代中直接更新状态值。
"""
for iteration in range(max_iterations):
delta = 0
for s in range(self.env.observation_space.n):
v = self.V[s]
q = np.zeros(self.env.action_space.n)
for a in range(self.env.action_space.n):
next_state, reward, done = self.compute_next_state_reward(s, a)
q[a] = reward + self.discount_factor * self.V[next_state] * (1 - done)
self.V[s] = max(q) # 直接取得最大值,然后更新状态价值函数
delta = max(delta, abs(v - self.V[s]))
if delta < self.theta:
break
print(f"价值迭代进行了 {iteration + 1} 轮后完成")

def extract_policy(self):
"""
提取策略:基于价值函数生成最优策略。
对于每个状态,选择使得期望回报最大化的动作作为策略。
"""
policy = np.zeros([self.env.observation_space.n, self.env.action_space.n])
for s in range(self.env.observation_space.n):
q = np.zeros(self.env.action_space.n)
for a in range(self.env.action_space.n):
next_state, reward, done = self.compute_next_state_reward(s, a)
q[a] = reward + self.discount_factor * self.V[next_state] * (1 - done)
best_action = np.argmax(q)
policy[s] = np.eye(self.env.action_space.n)[best_action]
return policy

def iterate_value(self):
self.value_iteration()
return self.extract_policy(), self.V

def compute_next_state_reward(self, state, action):
"""
计算下一个状态和回报: 这里之所以不调用 env 函数的 step 函数是因为
我们希望在计算下一个状态和回报时不改变当前环境的状态。且应该通过当前策略来计算。
"""
y, x = divmod(state, self.env.width)
if action == 0: # 上
y = max(y - 1, 0)
elif action == 1: # 右
x = min(x + 1, self.env.width - 1)
elif action == 2: # 下
y = min(y + 1, self.env.height - 1)
elif action == 3: # 左
x = max(x - 1, 0)

new_state = y * self.env.width + x
reward = -1 # 每走一步减1分
done = False

if new_state in self.env.cliff:
reward = -100 # 落入悬崖减100分
new_state = self.env.start # 重置到起始位置
elif new_state == self.env.goal:
done = True # 达到终点

return new_state, reward, done

冰壶问题

这里可以直接使用 gym 库提供的冰壶问题的环境,来测试我们的算法。

import gymnasium as gym
import numpy as np


class ValueIteration:
def __init__(self, env, discount_factor=0.9, theta=1e-9):
self.env = env
self.discount_factor = discount_factor
self.theta = theta
self.V = np.zeros(env.observation_space.n)
self.policy = np.ones(
[env.observation_space.n, env.action_space.n]) / env.action_space.n

def value_iteration(self, max_iterations=10000):
for iteration in range(max_iterations):
delta = 0
for s in range(self.env.observation_space.n):
v = self.V[s]
q = np.zeros(self.env.action_space.n)
for a in range(self.env.action_space.n):
for prob, next_state, reward, done in self.env.P[s][a]:
q[a] += prob * (reward + self.discount_factor *
self.V[next_state] * (1 - done))
self.V[s] = max(q)
delta = max(delta, abs(v - self.V[s]))
if delta < self.theta:
break
print(f"价值迭代进行了 {iteration + 1} 轮后完成")

def extract_policy(self):
policy = np.zeros(
[self.env.observation_space.n, self.env.action_space.n])
for s in range(self.env.observation_space.n):
q = np.zeros(self.env.action_space.n)
for a in range(self.env.action_space.n):
for prob, next_state, reward, done in self.env.P[s][a]:
q[a] += prob * (reward + self.discount_factor *
self.V[next_state] * (1 - done))
best_action = np.argmax(q)
policy[s] = np.eye(self.env.action_space.n)[best_action]
return policy

def iterate_value(self):
self.value_iteration()
return self.extract_policy(), self.V


# 创建 FrozenLake 环境实例
env = gym.make("FrozenLake-v1", is_slippery=False,
map_name="4x4", render_mode="human")
env.reset()

# 创建价值迭代实例
agent = ValueIteration(env, 0.9, 0.001)

# 运行价值迭代
policy, V = agent.iterate_value()

# 打印最优策略和状态价值函数
print("最优策略:")
print(policy)
print("状态价值函数:")
print(V)


def simulate_best_path(env, policy):
state, _ = env.reset()
env.render()
done = False
while not done:
action = np.argmax(policy[state])
new_state, _, done, _, _ = env.step(action)
env.render()
state = new_state


# 使用最优策略进行模拟
simulate_best_path(env, policy)

References