Value Iteration
Value iteration is a dynamic programming algorithm used in reinforcement learning to solve Markov Decision Processes (MDPs). It works by iteratively improving an estimate of the optimal value function for a given MDP.
To understand how value iteration works, it's important to first understand the concept of a value function in reinforcement learning. A state-value function maps each state to the expected long-term (cumulative, discounted) reward the agent will collect from that state onward while following a given policy; the closely related action-value function does the same for a state-action pair. The goal is to find the optimal value function, i.e. the one obtained by acting optimally, because an optimal policy can be read directly from it.
Value iteration is an algorithm for finding the optimal value function of an MDP. It works by iteratively updating the value of each state based on the values of the states that can be reached from it. The algorithm starts by initializing the value of every state to some initial value (typically zero) and then repeatedly applies a simple update rule: the new value of a state is the best value achievable by any single action from that state, namely the expected immediate reward of that action plus the discounted value of the state it leads to.
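Written in standard MDP notation (a transition model P and reward function R, which the grid world implemented below specializes to deterministic moves with a reward of -1 per step), the update applied to every state s at iteration k is the Bellman optimality backup:

$$V_{k+1}(s) = \max_{a} \sum_{s'} P(s' \mid s, a)\,\bigl[ R(s, a, s') + \gamma\, V_k(s') \bigr]$$

where $\gamma \in [0, 1)$ is the discount factor.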
This update rule is applied iteratively until the values stop changing (up to a small tolerance), at which point the algorithm has found the optimal value function for the MDP. For a discount factor below one the update is a contraction, so the values are guaranteed to converge to the unique optimal value function; in practice the loop is stopped as soon as the largest change of any state's value during a sweep falls below a small threshold.
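Concretely, and as a standard consequence of the contraction property, the worst-case error of the estimate shrinks by at least a factor of $\gamma$ on every sweep:

$$\lVert V_{k+1} - V^{*} \rVert_{\infty} \le \gamma\, \lVert V_{k} - V^{*} \rVert_{\infty}$$

so reaching a tolerance of $\epsilon$ takes on the order of $\log(1/\epsilon) / \log(1/\gamma)$ sweeps.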
In practice, value iteration is often used as a subroutine in more complex reinforcement learning algorithms. It is simple to implement and converges quickly on small problems, which makes it a useful building block for more advanced methods. However, it is not always the most efficient way to solve an MDP: it needs a full model of the environment and a sweep over every state, and other algorithms such as policy iteration or the model-free Q-learning may be more suitable for some tasks. The two listings below put the algorithm to work: the first implements a small GridWorld environment (env.py), and the second implements value iteration on top of it.
import numpy as np


# env.py -- a small, deterministic GridWorld environment.
class Action():
    def __init__(self):
        # Each action is a (row, col) offset; indexes 0-3 map to U, D, L, R.
        self.action_space = {'U': (-1, 0), 'D': (1, 0),
                             'L': (0, -1), 'R': (0, 1)}
        self.possible_actions = list(self.action_space.keys())
        self.action_n = len(self.possible_actions)
        self.action_idxs = range(self.action_n)

    def get_action_by_idx(self, idx):
        return self.action_space[self.possible_actions[idx]]

    def get_action_by_key(self, key):
        return self.action_space[key]


class GridWorld():
    def __init__(self, shape=(3, 3), obstacles=[], terminal=None,
                 agent_pos=np.array([0, 0])):
        self.action = Action()
        self.shape = shape
        self.rows = shape[0]
        self.cols = shape[1]
        self.obstacles = obstacles
        self.agent_pos = agent_pos
        self.agent_init_pos = agent_pos
        # By default the terminal state is the bottom-right cell.
        if terminal is None:
            self.terminal_state = (self.rows - 1, self.cols - 1)
        else:
            self.terminal_state = terminal
        self.done = False
        self.add_state_action = lambda state, action: \
            tuple(np.array(state) + np.array(self.action.get_action_by_idx(action)))
        self.is_same_state = lambda s1, s2: s1[0] == s2[0] and s1[1] == s2[1]

    def is_obstacle(self, state):
        for s in self.obstacles:
            if self.is_same_state(s, state):
                return True
        return False

    def is_terminal(self, s):
        return self.is_same_state(s, self.terminal_state)

    def is_edge(self, state):
        if state[0] < 0 or state[0] > self.rows - 1 \
                or state[1] < 0 or state[1] > self.cols - 1:
            return True
        return False

    def set_agent_pos(self, state):
        self.agent_pos = state

    def get_agent_pos(self):
        return self.agent_pos

    def step(self, action):
        # The next agent location is the current location plus the action offset.
        state = self.get_agent_pos()
        tmp_state = self.add_state_action(state, action)
        if self.is_obstacle(tmp_state):
            pass  # bumping into an obstacle leaves the agent where it is
        elif self.is_terminal(tmp_state):
            self.set_agent_pos(tmp_state)
            self.done = True
        elif self.is_edge(tmp_state):
            pass  # moving off the grid leaves the agent where it is
        else:
            self.set_agent_pos(tmp_state)
        reward = -1 if not self.done else 0
        return self.get_agent_pos(), reward, self.done, None

    def simulated_step(self, state, action):
        # Like step(), but evaluated from an arbitrary state without moving
        # the agent -- this is the model that the planner queries.
        if self.is_terminal(state):
            return state, 0, True, None
        next_state = self.add_state_action(state, action)
        if self.is_obstacle(next_state) or self.is_edge(next_state):
            pass
        else:
            state = next_state
        return state, -1, False, None

    def reset(self):
        self.set_agent_pos(self.agent_init_pos)
        self.done = False
        return self.agent_init_pos

    def render(self):
        for r in range(self.rows):
            for c in range(self.cols):
                state = np.array((r, c))
                if self.is_terminal(state):
                    if self.done:
                        print('[O]', end="\t")
                    else:
                        print('[]', end="\t")
                elif self.is_same_state(self.get_agent_pos(), state):
                    print('O', end="\t")
                elif self.is_obstacle(state):
                    print('X', end="\t")
                else:
                    print('-', end="\t")
            print()


if __name__ == '__main__':
    env = GridWorld(shape=(5, 5), obstacles=((0, 1), (1, 1)))
    env.render()
    env.step(0)  # 0 -> up
    env.step(1)  # 1 -> down
    env.step(2)  # 2 -> left
    env.render()
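If you run this file directly, the two render() calls should print roughly the following (the agent O starts in the top-left corner, X marks obstacles, [] is the terminal cell; the 'up' and 'left' moves are blocked by the grid edge, so only the 'down' move takes effect):

O   X   -   -   -
-   X   -   -   -
-   -   -   -   -
-   -   -   -   -
-   -   -   -   []

-   X   -   -   -
O   X   -   -   -
-   -   -   -   -
-   -   -   -   -
-   -   -   -   []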
import numpy as np
import time

from utils import *          # provides clear() and plot_state_value_table()
from env import GridWorld    # the GridWorld defined above, saved as env.py


class ValueIteration():
    def __init__(self, env, theta=1e-6, gamma=.99):
        self.env = env
        self.env_shape = env.shape
        self.actions = env.action.action_idxs
        self.actions_n = len(self.actions)
        self.policy_probs = self._init_policy_probs_table()
        self.V = {}
        self.reset_V_table()
        self.gamma = gamma
        self.theta = theta

    def _init_policy_probs_table(self):
        # Start from a uniform random policy: every action equally likely in every state.
        return np.full((self.env_shape[0], self.env_shape[1], self.actions_n),
                       1 / self.actions_n)

    def _policy(self, state):
        return self.policy_probs[state[0], state[1]]

    def choose_action(self, state):
        print(f"Action probabilities: {self._policy(state)}")
        return np.random.choice(self.actions, p=self._policy(state))

    def reset_V_table(self):
        # NOTE: the V table must be reset whenever the environment is reset
        for r in range(self.env.rows):
            for c in range(self.env.cols):
                self.V[(r, c)] = 0

    def value_iteration(self):
        delta = float("inf")
        while delta > self.theta:
            delta = 0
            for state in self.V:
                if self.env.is_terminal(state):
                    continue  # the terminal state keeps a value of 0
                old_value = self.V[state]
                # keep track of the action probabilities
                action_probs = None
                # keep track of the max Q value
                max_qsa = float('-inf')
                # look for the max Q value by taking fake (simulated) steps
                for action in self.actions:
                    next_state, r, _, _ = self.env.simulated_step(state, action)
                    qsa = r + self.gamma * self.V[(next_state[0], next_state[1])]
                    # identify the action with the max Q value
                    if qsa > max_qsa:
                        max_qsa = qsa
                        action_probs = np.zeros(self.actions_n)
                        action_probs[action] = 1.0
                # update the state-value table
                self.V[state] = max_qsa
                # update the probabilities of choosing an action
                # (this makes the policy deterministic: greedy w.r.t. V)
                self.policy_probs[state] = action_probs
                # keep the maximum change of any state value in this sweep
                delta = max(delta, abs(max_qsa - old_value))


if __name__ == "__main__":
    env = GridWorld(shape=np.array((5, 5)),
                    obstacles=np.array([[0, 1], [1, 1], [2, 1], [3, 1],
                                        [1, 3], [2, 3], [3, 3], [4, 3]]))
    state = env.reset()
    agent = ValueIteration(env)
    agent.value_iteration()
    steps = 0
    while True:
        steps += 1
        clear()
        action = agent.choose_action(state)
        state, _, done, _ = env.step(action)
        env.render()
        if done:
            print(f"the agent reached the terminal state in {steps} steps")
            plot_state_value_table(agent.V, env.cols)
            break
        time.sleep(.5)
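A minimal follow-up sketch: once value_iteration() has run, the greedy policy stored in policy_probs can be printed as arrows. It assumes the agent and env objects built in the __main__ block above.

arrows = {0: '↑', 1: '↓', 2: '←', 3: '→'}  # same index order as Action: U, D, L, R
for r in range(env.rows):
    row = []
    for c in range(env.cols):
        if env.is_terminal((r, c)):
            row.append('T')
        elif env.is_obstacle((r, c)):
            row.append('X')
        else:
            # the policy is deterministic after value iteration, so argmax picks its action
            row.append(arrows[int(np.argmax(agent.policy_probs[r, c]))])
    print('\t'.join(row))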