Value Iteration

  • Bellman optimality equation (in matrix-vector form):

    $$v = \max_{\pi} \left( r_{\pi} + \gamma P_{\pi} v \right)$$

  • The contraction mapping theorem suggests an iterative algorithm:

    $$v_{k+1} = \max_{\pi} \left( r_{\pi} + \gamma P_{\pi} v_k \right), \quad k = 0, 1, 2, \ldots$$

    where $v_0$ can be arbitrary. This algorithm eventually finds the optimal state value and an optimal policy.

  • This algorithm is called value iteration.

  • Step 1: policy update. Given $v_k$, solve

    $$\pi_{k+1} = \arg\max_{\pi} \left( r_{\pi} + \gamma P_{\pi} v_k \right)$$

  • Step 2: value update.

    $$v_{k+1} = r_{\pi_{k+1}} + \gamma P_{\pi_{k+1}} v_k$$

    (The element-wise forms of these two steps are given right after this list.)
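
For reference, written element-wise (which is the form the Appendix code actually computes), the two steps are, for every state $s$:

$$q_k(s, a) = \sum_{r} p(r \mid s, a)\, r + \gamma \sum_{s'} p(s' \mid s, a)\, v_k(s')$$

$$\pi_{k+1}(a \mid s) = \begin{cases} 1, & a = a_k^*(s) \\ 0, & a \neq a_k^*(s) \end{cases}, \qquad a_k^*(s) = \arg\max_{a} q_k(s, a)$$

$$v_{k+1}(s) = \max_{a} q_k(s, a)$$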

Is $v_k$ a state value?

No. Here $v_k$ is just an intermediate value produced by the iteration; it does not necessarily satisfy the Bellman equation. For example, $v_0$ can simply be chosen at random.

Algorithm
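
A minimal Python sketch of one value-iteration sweep (Step 1 plus Step 2 over all states), assuming the tabular model is stored as P[s][a] = [(prob, next_state, reward, done), ...], which is the same layout Gymnasium's env.P uses. The function name value_iteration_sweep is illustrative only; the full FrozenLake implementation is in the Appendix.

import numpy as np

def value_iteration_sweep(P, V, gamma=0.99):
    """One sweep over all states: compute q-values, then do the greedy
    policy update (Step 1) and the value update (Step 2)."""
    n_states = len(P)
    n_actions = len(P[0])
    Q = np.zeros((n_states, n_actions))
    for s in range(n_states):
        for a in range(n_actions):
            # q_k(s,a) = sum over transitions of prob * (r + gamma * v_k(s'));
            # terminal transitions contribute no future value
            Q[s, a] = sum(prob * (r + gamma * V[s_next] * (not done))
                          for prob, s_next, r, done in P[s][a])
    policy = Q.argmax(axis=1)  # Step 1: pi_{k+1} is greedy w.r.t. q_k
    V_new = Q.max(axis=1)      # Step 2: v_{k+1}(s) = max_a q_k(s,a)
    return V_new, policy

Repeating the sweep until max_s |v_{k+1}(s) - v_k(s)| drops below a tolerance yields the optimal state value, and the last greedy policy is an optimal policy.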

Appendix

Here is an implementation of the value iteration algorithm for the FrozenLake environment in Gymnasium. The code includes:

  1. two implementations: one using element-wise operations and the other using the matrix form
  2. a function to print the state values and policy in a grid format
  3. a video of the agent’s performance in the FrozenLake environment
  4. a 32x32 map with 40% holes
python value_iteration_frozenlake.py
# value_iteration_frozenlake.py
import gymnasium as gym
from gymnasium.envs.toy_text.frozen_lake import FrozenLakeEnv, generate_random_map
import numpy as np
from tabulate import tabulate
 
def value_iteration_element_wise(env, gamma=0.99, tol=1e-10):
    """
    Implement the value iteration algorithm for the FrozenLake environment.
 
    - refer: https://donglinkang2021.github.io/RL-Note/concepts/value-iteration#algorithm
    """
    # Get the system model, which includes p(s'|s,a) and p(r|s,a) here,
    # for details, refer to https://donglinkang2021.github.io/RL-Note/concepts/Bellman-Equation#derivation
    model_P = env.P # n_states x m_actions x (prob, next_state, reward, done)
    n_states = env.observation_space.n
    m_actions = env.action_space.n
    
    # Initialize state value
    state_value = np.zeros(n_states)
    action_value = np.zeros((n_states, m_actions))
 
    # get the optimal state value v_{\pi^*} for the optimal policy \pi^*
    while True:
        old_state_value = state_value.copy()
        for state in range(n_states):
            # for each action, calculate the action value q(s,a)
            for action in range(m_actions):
                action_value[state][action] = sum([
                    prob * (
                        reward + gamma * state_value[next_state] * (not done)
                    ) for prob, next_state, reward, done in model_P[state][action]
                ])
 
            # Update state value for the current state
            state_value[state] = np.max(action_value[state])
 
        # Check convergence
        if np.max(np.abs(old_state_value - state_value)) < tol:
            break
 
    # get the optimal policy \pi^* (greedy w.r.t. the converged action values)
    policy = action_value.argmax(axis=1)
    return state_value, policy
 
def value_iteration_matrix_form(env, gamma=0.99, tol=1e-10):
    model_P = env.P # n_states x m_actions x (prob, next_state, reward, done)
    n_states = env.observation_space.n
    m_actions = env.action_space.n
    
    # Initialize state value and action value
    V = np.zeros(n_states)
    Q = np.zeros((n_states, m_actions))
 
    # Expected immediate reward R(s,a) = sum_{s'} p(s'|s,a) r(s,a,s'), and a
    # "continuation" transition matrix that folds (1 - done) into the
    # probabilities so terminal transitions contribute no future value.
    # Accumulating with += also handles is_slippery=True, where one (s,a)
    # pair can have several transition tuples.
    P_cont = np.zeros((n_states, m_actions, n_states))
    R = np.zeros((n_states, m_actions))

    # Construct the transition and reward matrices
    for state in range(n_states):
        for action in range(m_actions):
            for prob, next_state, reward, done in model_P[state][action]:
                P_cont[state][action][next_state] += prob * (not done)
                R[state][action] += prob * reward

    while True:
        old_V = V.copy()
        # Update action values in matrix form: Q = R + gamma * P_cont @ V
        Q = R + gamma * np.einsum('ijk,k->ij', P_cont, V)
 
        # Update state value
        V = np.max(Q, axis=1)
 
        # Check convergence
        if np.max(np.abs(old_V - V)) < tol:
            break
 
    pi = np.argmax(Q, axis=1)
    return V, pi
 
def print_state_value_and_policy(state_value, policy, map_size):
    """
    Print the state value and policy in a grid format using tabulate.
    
    Args:
        state_value (numpy.ndarray): State values for each state
        policy (numpy.ndarray): Policy (action) for each state
        map_size (int): Size of the square grid map
    """
    
    # Convert policy numbers to arrows for readability
    # 0: LEFT, 1: DOWN, 2: RIGHT, 3: UP
    policy_arrows = {0: "←", 1: "↓", 2: "→", 3: "↑"}
    
    # Reshape state value and policy to match the grid
    state_value_grid = state_value.reshape(map_size, map_size)
    policy_grid = np.array([policy_arrows[a] for a in policy]).reshape(map_size, map_size)
    
    # Format state values
    state_value_table = [[f"{val:.4f}" for val in row] for row in state_value_grid]
    
    # Print tables
    print("State Values:")
    print(tabulate(state_value_table, tablefmt="fancy_grid"))
    
    print("\nPolicy (← = LEFT, ↓ = DOWN, → = RIGHT, ↑ = UP):")
    print(tabulate(policy_grid, tablefmt="fancy_grid"))
 
 
def main():
 
    env = FrozenLakeEnv(
        desc = generate_random_map(
            size = 32, # size of the map
            p = 0.6, # probability that a tile is frozen
            seed = 42
        ),
        is_slippery = False, # slippery or not
        render_mode = 'rgb_array' # for non-interactive mode, training
    )
    
    optimal_state_value, optimal_policy = value_iteration_element_wise(env)
    # optimal_state_value, optimal_policy = value_iteration_matrix_form(env)
    print_state_value_and_policy(optimal_state_value, optimal_policy, env.ncol)
 
    env = gym.wrappers.RecordVideo(
        env,
        episode_trigger=lambda num: num % 2 == 0,
        video_folder="results/videos",
        name_prefix="frozenlake-value-iteration",
    )
 
    state, info = env.reset()
    for _ in range(1000):
        # action = env.action_space.sample()
        action = optimal_policy[state]
        next_state, reward, terminated, truncated, info = env.step(action)
        state = next_state
        if terminated or truncated:
            break
 
    env.close()
 
if __name__ == "__main__":
    main()
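
Note: depending on the Gymnasium version, rendering FrozenLake with render_mode='rgb_array' may additionally require pygame (e.g. pip install "gymnasium[toy-text]"), and gym.wrappers.RecordVideo writes its .mp4 files via moviepy; tabulate is only needed for the pretty-printed value/policy grids.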