Value Iteration
Bellman optimality equation (matrix-vector form):

$$
v = \max_{\pi} \left( r_\pi + \gamma P_\pi v \right)
$$

The contraction mapping theorem suggests an iterative algorithm:

$$
v_{k+1} = \max_{\pi} \left( r_\pi + \gamma P_\pi v_k \right), \qquad k = 0, 1, 2, \ldots
$$

where $v_0$ can be arbitrary. This algorithm eventually finds the optimal state value and an optimal policy.
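As a quick illustration of this iteration, here is a minimal sketch on a made-up two-state, two-action MDP (the arrays `P`, `r` and the discount `gamma` below are hypothetical, chosen only to show the fixed-point iteration converging):

```python
import numpy as np

# Hypothetical toy MDP, only for illustrating v_{k+1} = max_pi (r_pi + gamma * P_pi v_k).
# P[a, s, s'] = p(s' | s, a), r[a, s] = expected immediate reward of taking a in s.
P = np.array([[[0.9, 0.1],
               [0.2, 0.8]],
              [[0.1, 0.9],
               [0.7, 0.3]]])
r = np.array([[1.0, 0.0],
              [0.0, 2.0]])
gamma = 0.9
v = np.zeros(2)  # v_0 can be arbitrary; zeros is just a convenient choice

for _ in range(1000):
    q = r + gamma * np.einsum('ast,t->as', P, v)   # q_k(s,a) for every state-action pair
    v_new = q.max(axis=0)                          # elementwise max over actions
    converged = np.max(np.abs(v_new - v)) < 1e-10  # contraction => the iterates converge
    v = v_new
    if converged:
        break

print(v)  # approximately the optimal state value v*
```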
This algorithm is called value iteration. Each iteration consists of two steps.

Step 1: policy update. Given $v_k$, solve

$$
\pi_{k+1} = \arg\max_{\pi} \left( r_\pi + \gamma P_\pi v_k \right)
$$

Step 2: value update.

$$
v_{k+1} = r_{\pi_{k+1}} + \gamma P_{\pi_{k+1}} v_k
$$

Is $v_k$ a state value? No, $v_k$ is just an intermediate value produced during the iteration; it does not necessarily satisfy a Bellman equation. For example, $v_0$ can simply be chosen at random.
Algorithm
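One way to spell the two steps out per state (a sketch consistent with the matrix-vector form above and with the appendix code below): given $v_k$, each sweep first computes the action values and then acts greedily.

$$
q_k(s, a) = \sum_{r} p(r \mid s, a)\, r + \gamma \sum_{s'} p(s' \mid s, a)\, v_k(s')
$$

$$
\pi_{k+1}(a \mid s) =
\begin{cases}
1, & a = \arg\max_{a'} q_k(s, a'), \\
0, & \text{otherwise},
\end{cases}
\qquad
v_{k+1}(s) = \max_{a} q_k(s, a).
$$

The sweep is repeated until $\max_s |v_{k+1}(s) - v_k(s)|$ drops below a tolerance, at which point $v_k \approx v^*$ and the greedy policy is optimal.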
Appendix
Here is an implementation of the value iteration algorithm for the FrozenLake environment in Gymnasium. The code includes:
- two implementations: one using element-wise operations and the other using matrix form
- a function to print the state values and policy in a grid format
- a video of the agent’s performance in the FrozenLake environment
- a 32x32 map with 40% holes
Run it with `python value_iteration_frozenlake.py`:
# value_iteration_frozenlake.py
import gymnasium as gym
from gymnasium.envs.toy_text.frozen_lake import FrozenLakeEnv, generate_random_map
import numpy as np
from tabulate import tabulate
def value_iteration_element_wise(env, gamma=0.99, tol=1e-10):
    """
    Implement the value iteration algorithm for the FrozenLake environment.
    - refer: https://donglinkang2021.github.io/RL-Note/concepts/value-iteration#algorithm
    """
    # Get the system model, i.e. p(s'|s,a) and p(r|s,a);
    # for details, refer to https://donglinkang2021.github.io/RL-Note/concepts/Bellman-Equation#derivation
    model_P = env.P  # dict: state -> action -> list of (prob, next_state, reward, done)
    n_states = env.observation_space.n
    m_actions = env.action_space.n
    # Initialize state value
    state_value = np.zeros(n_states)
    action_value = np.zeros((n_states, m_actions))
    # Iterate to get the optimal state value v_{\pi^*} for the optimal policy \pi^*
    while True:
        old_state_value = state_value.copy()
        for state in range(n_states):
            # for each action, calculate the action value q(s,a)
            for action in range(m_actions):
                action_value[state][action] = sum([
                    prob * (
                        reward + gamma * state_value[next_state] * (not done)
                    ) for prob, next_state, reward, done in model_P[state][action]
                ])
            # Update state value for the current state
            state_value[state] = np.max(action_value[state])
        # Check convergence
        if np.max(np.abs(old_state_value - state_value)) < tol:
            break
    # Extract the optimal policy \pi^* (greedy w.r.t. the converged action values)
    policy = action_value.argmax(axis=1)
    return state_value, policy
def value_iteration_matrix_form(env, gamma=0.99, tol=1e-10):
    model_P = env.P  # dict: state -> action -> list of (prob, next_state, reward, done)
    n_states = env.observation_space.n
    m_actions = env.action_space.n
    # Initialize state value and action value
    V = np.zeros(n_states)
    Q = np.zeros((n_states, m_actions))
    # Transition probabilities p(s'|s,a), expected rewards, and a mask for terminal next states
    P_s = np.zeros((n_states, m_actions, n_states))
    R = np.zeros((n_states, m_actions))
    not_done = np.ones((n_states, m_actions, n_states))
    # Construct the transition and reward matrices
    for state in range(n_states):
        for action in range(m_actions):
            for prob, next_state, reward, done in model_P[state][action]:
                # accumulate, since several transition tuples may lead to the same next_state
                P_s[state][action][next_state] += prob
                R[state][action] += prob * reward
                if done:
                    # do not bootstrap from terminal next states
                    not_done[state][action][next_state] = 0.0
    while True:
        old_V = V.copy()
        # Update action value using matrix multiplication: Q = R + gamma * P V
        Q = R + gamma * np.einsum('ijk,k->ij', P_s * not_done, V)
        # Update state value
        V = np.max(Q, axis=1)
        # Check convergence
        if np.max(np.abs(old_V - V)) < tol:
            break
    pi = np.argmax(Q, axis=1)
    return V, pi
def print_state_value_and_policy(state_value, policy, map_size):
    """
    Print the state value and policy in a grid format using tabulate.
    Args:
        state_value (numpy.ndarray): State values for each state
        policy (numpy.ndarray): Policy (action) for each state
        map_size (int): Size of the square grid map
    """
    # Convert policy numbers to arrows for readability
    # 0: LEFT, 1: DOWN, 2: RIGHT, 3: UP
    policy_arrows = {0: "←", 1: "↓", 2: "→", 3: "↑"}
    # Reshape state value and policy to match the grid
    state_value_grid = state_value.reshape(map_size, map_size)
    policy_grid = np.array([policy_arrows[a] for a in policy]).reshape(map_size, map_size)
    # Format state values
    state_value_table = [[f"{val:.4f}" for val in row] for row in state_value_grid]
    # Print tables
    print("State Values:")
    print(tabulate(state_value_table, tablefmt="fancy_grid"))
    print("\nPolicy (← = LEFT, ↓ = DOWN, → = RIGHT, ↑ = UP):")
    print(tabulate(policy_grid, tablefmt="fancy_grid"))
def main():
    env = FrozenLakeEnv(
        desc=generate_random_map(
            size=32,  # size of the map
            p=0.6,    # probability that a tile is frozen
            seed=42
        ),
        is_slippery=False,        # slippery or not
        render_mode='rgb_array'   # for non-interactive mode, training
    )
    optimal_state_value, optimal_policy = value_iteration_element_wise(env)
    # optimal_state_value, optimal_policy = value_iteration_matrix_form(env)
    print_state_value_and_policy(optimal_state_value, optimal_policy, env.ncol)
    env = gym.wrappers.RecordVideo(
        env,
        episode_trigger=lambda num: num % 2 == 0,
        video_folder="results/videos",
        name_prefix="frozenlake-value-iteration",
    )
    state, info = env.reset()
    for _ in range(1000):
        # action = env.action_space.sample()
        action = optimal_policy[state]
        next_state, reward, terminated, truncated, info = env.step(action)
        state = next_state
        if terminated or truncated:
            break
    env.close()

if __name__ == "__main__":
    main()
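Beyond the recorded video, a quick numerical check of the policy can be useful. The helper below is a minimal sketch that is not part of the original script (the name `evaluate_policy` and the episode count are made up here); it rolls out the greedy policy and reports how often the agent reaches the goal. With `is_slippery=False`, a correct optimal policy should succeed on every episode whenever the goal is reachable.

```python
# Not part of the original script: a minimal, hypothetical sanity check
# that rolls out the greedy policy and reports the success rate.
def evaluate_policy(env, policy, n_episodes=10):
    successes = 0
    for _ in range(n_episodes):
        state, info = env.reset()
        terminated = truncated = False
        reward = 0.0
        while not (terminated or truncated):
            state, reward, terminated, truncated, info = env.step(int(policy[state]))
        successes += int(reward > 0)  # FrozenLake gives reward 1 only upon reaching the goal
    return successes / n_episodes
```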