Numerical Computing with Python
上QQ阅读APP看书,第一时间看更新

Grid world example using value and policy iteration algorithms with basic Python

The classic grid world example has been used to illustrate value and policy iterations with Dynamic Programming to solve MDP's Bellman equations. In the following grid, the agent will start at the south-west corner of the grid in (1,1) position and the goal is to move towards the north-east corner, to position (4,3). Once it reaches the goal, the agent will get a reward of +1. During the journey, it should avoid the danger zone (4,2), because this will give out a negative penalty of reward -1. The agent cannot get into the position where the obstacle (2,2) is present from any direction. Goal and danger zones are the terminal states, which means the agent continues to move around until it reaches one of these two states. The reward for all the other states would be -0.02. Here, the task is to determine the optimal policy (direction to move) for the agent at every state (11 states altogether), so that the agent's total reward is the maximum, or so that the agent can reach the goal as quickly as possible. The agent can move in 4 directions: north, south, east and west.

The complete code was written in the Python programming language with class implementation. For further reading, please refer to object oriented programming in Python to understand class, objects, constructors, and so on.

Import the random package for generating moves in any of the N, E, S, W directions:

>>> import random,operator

The following argmax function calculated the maximum state among the given states, based on the value for each state:

>>> def argmax(seq, fn):
... best = seq[0]; best_score = fn(best)
... for x in seq:
... x_score = fn(x)
... if x_score > best_score:
... best, best_score = x, x_score
... return best

To add two vectors at component level, the following code has been utilized for:

>>> def vector_add(a, b):
... return tuple(map(operator.add, a, b))

Orientations provide what the increment value would be, which needs to be added to the existing position of the agent; orientations can be applied on the x-axis or y-axis:

>>> orientations = [(1,0), (0, 1), (-1, 0), (0, -1)]

The following function is used to turn the agent in the right direction, as we know at every command the agent moves in that direction about 80% of the time, whilst 10% of the time it would move right, and 10% it would move left.:

>>> def turn_right(orientation):
... return orientations[orientations.index(orientation)-1]
>>> def turn_left(orientation):
... return orientations[(orientations.index(orientation)+1) % len(orientations)]
>>> def isnumber(x):
... return hasattr(x, '__int__')

The Markov decision process is defined as a class here. Every MDP is defined by an initial position, state, transition model, reward function, and gamma values.

>>> class MDP:
... def __init__(self, init_pos, actlist, terminals, transitions={}, states=None, gamma=0.99):
... if not (0 < gamma <= 1):
... raise ValueError("MDP should have 0 < gamma <= 1 values")
... if states:
... self.states = states
... else:
... self.states = set()
... self.init_pos = init_pos
... self.actlist = actlist
... self.terminals = terminals
... self.transitions = transitions
... self.gamma = gamma
... self.reward = {}

Returns a numeric reward for the state:

... def R(self, state):
... return self.reward[state]

Transition model with from a state and an action returns a list of (probability, result-state) pairs for each state:

... def T(self, state, action):
... if(self.transitions == {}):
... raise ValueError("Transition model is missing")
... else:
... return self.transitions[state][action]

Set of actions that can be performed at a particular state:

... def actions(self, state):
... if state in self.terminals:
... return [None]
... else:
... return self.actlist

Class GridMDP is created for modeling a 2D grid world with grid values at each state, terminal positions, initial position, and gamma value (discount):

>>> class GridMDP(MDP):
... def __init__(self, grid, terminals, init_pos=(0, 0), gamma=0.99):

The following code is used for reversing the grid, as we would like to see row 0 at the bottom instead of at the top:

... grid.reverse()

The following __init__ command is a constructor used within the grid class for initializing parameters:

... MDP.__init__(self, init_pos, actlist=orientations,
terminals=terminals, gamma=gamma)
... self.grid = grid
... self.rows = len(grid)
... self.cols = len(grid[0])
... for x in range(self.cols):
... for y in range(self.rows):
... self.reward[x, y] = grid[y][x]
... if grid[y][x] is not None:
... self.states.add((x, y))

State transitions provide randomly 80% toward the desired direction and 10% for left and right. This is to model the randomness in a robot which might slip on the floor, and so on:

... def T(self, state, action):
... if action is None:
... return [(0.0, state)]
... else:
... return [(0.8, self.go(state, action)),
... (0.1, self.go(state, turn_right(action))),
... (0.1, self.go(state, turn_left(action)))]

Returns the state that results from going in the direction, subject to where that state is in the list of valid states. If the next state is not in the list, like hitting the wall, then the agent should remain in the same state:

... def go(self, state, direction):
... state1 = vector_add(state, direction)
... return state1 if state1 in self.states else state

Convert a mapping from (x, y) to v into [[..., v, ...]] grid:

... def to_grid(self, mapping):
... return list(reversed([[mapping.get((x, y), None)
... for x in range(self.cols)]
... for y in range(self.rows)]))

Convert orientations into arrows for better graphical representations:

... def to_arrows(self, policy):
... chars = {(1, 0): '>', (0, 1): '^', (-1, 0): '<', (0, -1):
'v', None: '.'}
... return self.to_grid({s: chars[a] for (s, a) in policy.items()})

The following code is used for solving an MDP, using value iterations, and returns optimum state values:

>>> def value_iteration(mdp, epsilon=0.001):
... STSN = {s: 0 for s in mdp.states}
... R, T, gamma = mdp.R, mdp.T, mdp.gamma
... while True:
... STS = STSN.copy()
... delta = 0
... for s in mdp.states:
... STSN[s] = R(s) + gamma * max([sum([p * STS[s1] for
... (p, s1) in T(s,a)]) for a in mdp.actions(s)])
... delta = max(delta, abs(STSN[s] - STS[s]))
... if delta < epsilon * (1 - gamma) / gamma:
... return STS

Given an MDP and a utility function STS, determine the best policy, as a mapping from state to action:

>>> def best_policy(mdp, STS):
... pi = {}
... for s in mdp.states:
... pi[s] = argmax(mdp.actions(s), lambda a: expected_utility(a, s, STS, mdp))
... return pi

The expected utility of doing a in state s, according to the MDP and STS:

>>> def expected_utility(a, s, STS, mdp):
... return sum([p * STS[s1] for (p, s1) in mdp.T(s, a)])

The following code is used to solve an MDP using policy iterations by alternatively performing policy evaluation and policy improvement steps:

>>> def policy_iteration(mdp):
... STS = {s: 0 for s in mdp.states}
... pi = {s: random.choice(mdp.actions(s)) for s in mdp.states}
... while True:
... STS = policy_evaluation(pi, STS, mdp)
... unchanged = True
... for s in mdp.states:
... a = argmax(mdp.actions(s),lambda a: expected_utility(a, s, STS, mdp))
... if a != pi[s]:
... pi[s] = a
... unchanged = False
... if unchanged:
... return pi

The following code is used to return an updated utility mapping U from each state in the MDP to its utility, using an approximation (modified policy iteration):

>>> def policy_evaluation(pi, STS, mdp, k=20):
... R, T, gamma = mdp.R, mdp.T, mdp.gamma
.. for i in range(k):
... for s in mdp.states:
... STS[s] = R(s) + gamma * sum([p * STS[s1] for (p, s1) in T(s, pi[s])])
... return STS

>>> def print_table(table, header=None, sep=' ', numfmt='{}'):
... justs = ['rjust' if isnumber(x) else 'ljust' for x in table[0]]
... if header:
... table.insert(0, header)
... table = [[numfmt.format(x) if isnumber(x) else x for x in row]
... for row in table]
... sizes = list(map(lambda seq: max(map(len, seq)),
... list(zip(*[map(str, row) for row in table]))))
... for row in table:
... print(sep.join(getattr(str(x), j)(size) for (j, size, x)
... in zip(justs, sizes, row)))

The following is the input grid of a 4 x 3 grid environment that presents the agent with a sequential decision-making problem:

>>> sequential_decision_environment = GridMDP([[-0.02, -0.02, -0.02, +1],
... [-0.02, None, -0.02, -1],
... [-0.02, -0.02, -0.02, -0.02]],
... terminals=[(3, 2), (3, 1)])

The following code is for performing a value iteration on the given sequential decision-making environment:

>>> value_iter = best_policy(sequential_decision_environment,value_iteration (sequential_decision_environment, .01))
>>> print("\n Optimal Policy based on Value Iteration\n")
>>> print_table(sequential_decision_environment.to_arrows(value_iter))

The code for policy iteration is:

>>> policy_iter = policy_iteration(sequential_decision_environment)
>>> print("\n Optimal Policy based on Policy Iteration & Evaluation\n")
>>> print_table(sequential_decision_environment.to_arrows(policy_iter))

From the preceding output with two results, we can conclude that both value and policy iterations provide the same optimal policy for an agent to move across the grid to reach the goal state in the quickest way possible. When the problem size is large enough, it is computationally advisable to go for value iteration rather than policy iteration, as in policy iterations, we need to perform two steps at every iteration of the policy evaluation and policy improvement.