Value Iteration

Value iteration is an algorithm used in reinforcement learning to solve Markov Decision Processes (MDPs). It is a dynamic programming method that works by iteratively improving an estimate of the optimal value function for a given MDP.


To understand how value iteration works, it's important to first understand the concept of a value function in reinforcement learning. A state-value function maps each state to the expected long-term (discounted) reward obtained by starting in that state and following a given policy; the closely related action-value function instead takes a state-action pair. The goal of value iteration is to find the optimal value function, i.e. the value function that corresponds to maximizing the expected long-term reward in every state of the MDP.
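
For reference, these are the standard definitions for a discounted MDP, written in the usual notation (gamma is the discount factor, pi the policy, and R the reward; the notation is only here to make the terms precise and is not needed to follow the code below):

V^{\pi}(s) = \mathbb{E}_{\pi}\!\left[ \sum_{t=0}^{\infty} \gamma^{t} R_{t+1} \,\middle|\, S_{0} = s \right]

Q^{\pi}(s, a) = \mathbb{E}_{\pi}\!\left[ \sum_{t=0}^{\infty} \gamma^{t} R_{t+1} \,\middle|\, S_{0} = s,\ A_{0} = a \right]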


Value iteration is an algorithm for finding the optimal value function of an MDP. It works by iteratively updating the value of each state based on the values of the states that can be reached from it. The algorithm starts by initializing the value of every state to some initial value (typically zero) and then repeatedly sweeps over the states, applying a simple update rule: the new value of a state is the maximum, over all actions available in that state, of the expected immediate reward plus the discounted value of the resulting next state.
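
Written out, this update is the Bellman optimality backup, where V_k is the value estimate after k sweeps over the states:

V_{k+1}(s) = \max_{a} \sum_{s'} p(s' \mid s, a)\,\bigl[ r(s, a, s') + \gamma\, V_{k}(s') \bigr]

In the deterministic grid world used below, each action leads to exactly one next state, so the sum over s' collapses to a single term r + \gamma V_k(s').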


This update rule is applied iteratively until the values of the states stop changing (up to a small tolerance), at which point the estimate has converged to the optimal value function for the MDP. For a finite MDP with discount factor gamma < 1, the values are guaranteed to converge to the unique optimal value function; in practice, the loop is stopped once the largest change in any state value falls below a threshold theta.
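
The guarantee comes from the fact that the backup is a gamma-contraction in the max norm, with the optimal value function V^* as its fixed point:

\lVert V_{k+1} - V^{*} \rVert_{\infty} \;\le\; \gamma\, \lVert V_{k} - V^{*} \rVert_{\infty}

so the approximation error shrinks geometrically with the number of sweeps.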


In practice, value iteration is often used as a building block inside more complex reinforcement learning algorithms. It is simple to implement and converges quickly on small problems. However, it requires a full model of the MDP (the transition and reward functions) and sweeps over the entire state space on every iteration, so other algorithms such as policy iteration or model-free methods like Q-learning may be more suitable for some tasks.


The Grid World Environment
import numpy as np

class Action(): 
    def __init__(self):
        self.action_space = {'U': (-1, 0), 'D': (1, 0),\
                                'L': (0, -1), 'R': (0, 1)}
        self.possible_actions = list(self.action_space.keys())
        self.action_n = len(self.possible_actions)
        self.action_idxs = range(self.action_n)

    def get_action_by_idx(self, idx):
        return self.action_space[self.possible_actions[idx]]

    def get_action_by_key(self, key):
        return self.action_space[key]

class GridWorld():
    def __init__(self, shape=(3, 3), obstacles=[], terminal=None,\
                    agent_pos=np.array([0, 0])):
        self.action  = Action()
        self.shape = shape
        self.rows = shape[0]
        self.cols = shape[1]
        self.obstacles = obstacles
        self.agent_pos = agent_pos
        self.agent_init_pos = agent_pos
        if terminal is None:
            self.terminal_state = (self.rows-1, self.cols-1)
        else:
            self.terminal_state = terminal
        self.done = False

        self.add_state_action = lambda state, action : \
            tuple(np.array(state) + np.array(self.action.get_action_by_idx(action)))
        self.is_same_state = lambda s1, s2 : s1[0] == s2[0] and s1[1] == s2[1]

    def is_obstacle(self, state):
        for s in self.obstacles:
            if self.is_same_state(s, state):  
                return True
        return False

    def is_terminal(self, s):
        return self.is_same_state(s, self.terminal_state)  

    def is_edge(self, state):
        if state[0] < 0 or state[0] > self.rows -1 \
            or state[1] < 0 or state[1] > self.cols -1:
            return True
        return False

    def set_agent_pos(self, state):
        self.agent_pos = state

    def get_agent_pos(self):
        return self.agent_pos
    
    def step(self, action):
        # agent location is current agent location + action
        state = self.get_agent_pos()
        tmp_state = self.add_state_action(state, action) 
        #print(f"tmp_state{tmp_state}")
        if self.is_obstacle(tmp_state):
            # print("OBSTACLES")
            pass
        elif self.is_terminal(tmp_state):
            self.set_agent_pos(tmp_state)
            #print(f"terminal_state:{tmp_state}")
            self.done = True
            #print("Done")
        elif self.is_edge(tmp_state):
            # print("Edge")
            pass
        else:        
            self.set_agent_pos(tmp_state)
        
        reward = -1 if not self.done else 0

        return self.get_agent_pos(), reward, self.done, None

    def simulated_step(self, state, action):
        # Look ahead from an arbitrary state without moving the agent;
        # value iteration uses this to evaluate every action from every state.
        if self.is_terminal(state):
            #state = next_state
            return state, 0, True, None

        next_state = self.add_state_action(state, action) 
        if self.is_obstacle(next_state) or self.is_edge(next_state):
            pass
        else:        
            state = next_state
        return state, -1, False, None

    def reset(self):
        self.set_agent_pos(self.agent_init_pos)
        self.done = False
        return self.agent_init_pos

    def render(self):
        for r in range(self.rows):
            for c in range(self.cols):
                state = np.array((r, c))
                if self.is_terminal(state):
                    if self.done:
                        print('[O]', end="\t")
                    else:
                        print('[]', end="\t")
                elif  self.is_same_state(self.get_agent_pos(), state): 
                    print('O', end="\t")
                elif self.is_obstacle(state):
                    print('X', end="\t")
                else:
                    print('-', end="\t")
            print()


if __name__ == '__main__':
    env = GridWorld(shape=(5, 5), obstacles=((0, 1), (1, 1)))
    env.render()
    env.step(0) # 0 -> up
    env.step(1) # 1 -> down
    env.step(2) # 2 -> left
    env.render()
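
Note the difference between step, which actually moves the agent and updates the done flag, and simulated_step, which only answers "what would happen if this action were taken from this state?" without touching the agent's position. Value iteration relies entirely on the latter. A minimal sketch, using the GridWorld class defined above:

env = GridWorld(shape=(3, 3))

# look ahead from state (0, 0) for every action, without moving the agent
for action in env.action.action_idxs:
    next_state, reward, done, _ = env.simulated_step((0, 0), action)
    print(action, next_state, reward, done)

# the real agent has not moved
print(env.get_agent_pos())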
            
 
The Value Iteration Algorithm
import numpy as np
import time 

from utils import * 
from env import GridWorld

class ValueIteration():
    def __init__(self, env, theta=1e-6, gamma=.99):
        self.env = env
        self.env_shape = env.shape
        self.actions = env.action.action_idxs
        self.actions_n = len(self.actions)
        self.policy_probs = self._init_policy_probs_table()
        self.V = {}
        self.reset_V_table()
        self.gamma = gamma
        self.theta = theta
                            
    def _init_policy_probs_table(self):
        # start from a uniform random policy over all actions
        return np.full((self.env_shape[0], self.env_shape[1], self.actions_n),
                       1 / self.actions_n)

    def _policy(self, state):
        return self.policy_probs[state[0], state[1]]
        
    def choose_action(self, state):
        print(f"Selected action: {self._policy(state)}")
        return np.random.choice(self.actions, p=self._policy(state))

    def reset_V_table(self):
        # TODO: V table must be reset when the environment is reset
        for r in range(self.env.rows):
            for c in range(self.env.cols):
                self.V[(r, c)] = 0

    def value_iteration(self):
        delta = float("inf")
        
        while delta > self.theta: 
            delta = 0 

            for state in self.V:
                if self.env.is_terminal(state):
                    # skip the terminal state: its value stays 0
                    continue
                old_value = self.V[state]
                # keep track of the action probabilities 
                action_probs = None
                # keep track of the max Q value
                max_qsa = float('-inf')

                # look for the max Q value by taking fake (simulated) steps
                for action in self.actions:
                    next_state, r, _,_ = self.env.simulated_step(state, action)
                    qsa = r + self.gamma * self.V[(next_state[0], next_state[1])]

                    # identify the action with max Q value
                    if qsa > max_qsa:
                        max_qsa = qsa
                        action_probs = np.zeros(self.actions_n)
                        action_probs[action] = 1.0
                    
                # update the state-value table
                self.V[state] = max_qsa
                # update the probabilities of choosing an action
                # this is a deterministic policy
                self.policy_probs[state] = action_probs
                
                # keep the maximum difference of a state value
                delta = max(delta, abs(max_qsa - old_value))


if __name__ == "__main__":
    env = GridWorld(shape=np.array((5, 5)),
                    obstacles=np.array([[0, 1], [1, 1], [2, 1], [3, 1],
                                        [1, 3], [2, 3], [3, 3], [4, 3]]))
    
    state = env.reset()
    agent = ValueIteration(env)
    agent.value_iteration()
    steps = 0

    while True: 
        steps += 1
        clear()
        action = agent.choose_action(state)
        state, _, done, _ = env.step(action)
        env.render()
        if done:
            print(f"the agent reached terminal state in {steps} steps")
            plot_state_value_table(agent.V, env.cols)
            break

        time.sleep(.5)
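
The script imports clear and plot_state_value_table from a local utils module that is not shown above. To run the example without that file, a minimal stand-in could look like the sketch below; these two helpers are assumptions made for illustration, not the original utils implementation.

# utils.py -- assumed minimal replacements for the helpers used above
import os

def clear():
    # clear the terminal between renders (works on both Windows and Unix)
    os.system('cls' if os.name == 'nt' else 'clear')

def plot_state_value_table(V, cols):
    # print the state-value dictionary as a grid, one line per grid row
    for (r, c), value in V.items():
        print(f"{value:8.2f}", end="  ")
        if c == cols - 1:
            print()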