Hands-On: Unsupervised Learning - Reinforcement Learning

AI and Predictive Analytics in Data-Center Environments - http://dcai.bsc.es

In this hands-on we will see an example of reinforcement learning, a method that, like unsupervised learning, does not learn from a labeled dataset but rather from a series of actions and their corresponding rewards. Let's start with some theory:

The Temporal Difference Q-Learning update rule is:

$$Q(s, a) \leftarrow Q(s, a) + \alpha (R(s) + \gamma \cdot \max_{a'} Q(s', a') - Q(s, a))$$

...which coincides with the SARSA update when the next action is always chosen greedily from our current $Q(s,a)$ look-up table.

• $Q(s,a)$ : the score of the current state $s$ given the current action $a$
• $\alpha$ : the learning rate
• $R(s)$ : the reward for reaching state $s$
• $\gamma$ : the discount factor applied to the expected score of the next state
• $\max_{a'} Q(s',a')$ : the best expected score over the actions $a'$ available in the next state $s'$
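
To make the update rule concrete, here is a minimal sketch of a single update step. The function name `td_update` and the numeric values are illustrative assumptions and are not part of the hands-on code.

```python
def td_update(q_sa, reward, best_next_q, alpha, gamma):
    """Apply one temporal-difference update to a single Q(s, a) value."""
    # Target: immediate reward plus discounted best score of the next state
    td_target = reward + gamma * best_next_q
    # Move the current score a fraction alpha towards the target
    return q_sa + alpha * (td_target - q_sa)

# Illustrative numbers only: Q(s, a) = 0.5, R(s) = 1, max Q(s', a') = 0.5
new_q = td_update(q_sa=0.5, reward=1.0, best_next_q=0.5, alpha=0.5, gamma=1.0)
print(new_q)  # 1.0
```

With $\alpha = 0.5$ the score moves halfway from its old value (0.5) towards the target (1.5).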

We also define the Utility Function $U(s)$ as

$$U(s) = \max_{a} Q(s,a)$$

so at each step we want to maximize the Utility of our state by choosing the best action each time:

$$Q(s, a) \leftarrow Q(s, a) + \alpha (R(s) + \gamma \cdot U(s') - Q(s, a))$$

When s is a final state, we consider the utility as the final reward obtained.
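
As a small illustration of the Utility Function, the sketch below reads $U(s)$ from a Q table stored as a NumPy array indexed by row, column and action. The table values, the function name `utility` and the `final_reward` parameter are hypothetical and only serve this example.

```python
import numpy as np

# Hypothetical Q table for a 2 x 2 grid with 3 actions per state
q_table = np.array([
    [[0.1, 0.5, 0.2], [0.0, 0.3, 0.9]],
    [[0.4, 0.4, 0.1], [0.7, 0.2, 0.6]],
])

def utility(q_table, s, final_reward=None):
    """U(s) = max_a Q(s, a); for a final state the final reward is used instead."""
    if final_reward is not None:
        return final_reward
    return q_table[s[0], s[1]].max()

u_regular = utility(q_table, (0, 1))                  # best score in cell (0, 1)
u_final = utility(q_table, (1, 1), final_reward=1.0)  # final state: the reward itself
print(u_regular, u_final)
```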

Implementation of the Q-Learning function

Before implementing the qlearn function itself, we need to implement the utility function, which exhaustively finds the best score, action and state given a set of valid actions and the current scoring table:

In [ ]:
import numpy as np
import numpy.random as rd
import math

# Auxiliary function to get the index of a given action
def find_action_index(actions, a):
    return [x==a[0] and y==a[1] for x, y in actions].index(True)

# Utility function // Fitness function
# Note: relies on the global variable `actions` defined in the example below
def utility_function(s, valid_actions, q_scoring):
    best_q = -math.inf
    best_a = None
    best_s = s

    # Shuffle the actions so that ties are broken at random
    shuffled_actions = rd.permutation(valid_actions)
    for a_prime in shuffled_actions:
        # Tentative state
        s_prime = a_prime + s
        # Check score for Q(state', action')
        q_prime = q_scoring[s_prime[0], s_prime[1], find_action_index(actions, a_prime)]

        if q_prime > best_q:
            best_q = q_prime
            best_a = a_prime
            best_s = s_prime
    return {"q": best_q, "a": best_a, "s": best_s}

We also need a viability function that returns the list of possible actions from a given state:

In [ ]:
# Viability function
# Note: relies on the global variable `rewards` for the grid dimensions
def viability_function(actions, s):
    valid_actions = []
    for a in actions:
        a_prime = a
        # Keep the action only if the target cell lies inside the grid
        if s[0] + a_prime[0] >= 0 \
                and s[1] + a_prime[1] >= 0 \
                and s[0] + a_prime[0] < rewards.shape[0] \
                and s[1] + a_prime[1] < rewards.shape[1]:
            valid_actions.append(a_prime)
    return np.array(valid_actions)
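
The same bounds check can be sketched in vectorized form. The function name `valid_moves` and its explicit `shape` argument are assumptions made for this illustration, so that the snippet runs on its own without the global `rewards` array.

```python
import numpy as np

def valid_moves(actions, s, shape):
    """Keep the actions whose target cell lies inside a grid of the given shape."""
    targets = actions + np.asarray(s)
    inside = np.all((targets >= 0) & (targets < np.asarray(shape)), axis=1)
    return actions[inside]

# King moves plus NOP, built the same way as in the example further below
king_moves = np.array([(i, j) for i in range(-1, 2) for j in range(-1, 2)])

corner = valid_moves(king_moves, (0, 0), (4, 4))
center = valid_moves(king_moves, (1, 1), (4, 4))
print(len(corner), len(center))  # 4 9
```

From a corner only four actions stay inside the grid (including the NOP), while from an interior cell all nine are valid.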

Now we can create the qlearn function:

In [ ]:
# qlearn function
def qlearn(actions, rewards, s_initial, alpha, gamma, max_iters, q_scoring=None):
    # Initialize scoring, state and action variables
    if q_scoring is None:
        q_scoring = np.full([rewards.shape[0], rewards.shape[1], len(actions)], 0.5)
    s = s_initial
    a = actions[0]

    # Initialize counters for breaking the loop
    convergence_count = 0
    iteration_count = 0
    prev_status = (-1,-1)

    while iteration_count < max_iters:
        # Get valid actions
        valid_actions = viability_function(actions, s)

        # Get maximum Q reachable from the current state s
        best = utility_function(s, valid_actions, q_scoring)

        # Check convergence: count consecutive steps in which the best next
        # state equals the previous state, and stop after more than 5 of them.
        # (The first test compares s with the score best['q'], not best['s'].)
        convergence_count = convergence_count+1 if (all((s == best['q']).flatten()) \
            or (all(prev_status == best['s']))) else 0
        if convergence_count > 5: break

        # Solve Reward
        r = rewards[s[0], s[1]]

        # Update State-Action Table
        action_index = find_action_index(actions, a)
        current_score = q_scoring[s[0], s[1], action_index]
        q_scoring[s[0], s[1], action_index] = current_score + alpha * (r + gamma*best['q'] - current_score)

        # Change the state
        prev_status = s
        s = best['s']
        a = best['a']
        iteration_count = iteration_count + 1

        # Log the step just taken
        print("Iteration: {}".format(iteration_count))
        print(" Best Position: ({},{})".format(*best['s']))
        print(" Best Action: ({},{})".format(*best['a']))
        print(" Best Value: {}".format(best['q']))
        print()

    return { "s": s, "a": a, "r": rewards[s[0], s[1]], "it": iteration_count, "q": q_scoring }
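
The stopping rule in qlearn is a counter of consecutive "no progress" steps that resets whenever progress is made. The sketch below isolates that pattern. The function name `steps_until_stop`, the `patience` parameter and the boolean flags are hypothetical stand-ins for the convergence test used in the loop.

```python
def steps_until_stop(no_progress_flags, patience=5):
    """Return the 1-based step at which the loop would break, or None."""
    count = 0
    for step, no_progress in enumerate(no_progress_flags, start=1):
        # Increase the counter on a no-progress step, reset it otherwise
        count = count + 1 if no_progress else 0
        if count > patience:
            return step
    return None

# One step with progress followed by six steps without: break on step 7
print(steps_until_stop([False] + [True] * 6))  # 7
# Only five consecutive no-progress steps: the threshold is never exceeded
print(steps_until_stop([True] * 5))            # None
```

Because the test is `count > patience`, six consecutive no-progress steps are needed before the loop breaks.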

Example case

Consider a two-dimensional space of 4 x 4 cells, with the following rewards for being in each cell:

          [,0]   [,1]   [,2]   [,3]
    [0,]   +0     +0     +0     +0
    [1,]   +0     +0     +0     +0
    [2,]   +0     +0   -0.04    +0
    [3,]   +0     +0     +1    -0.1

There is a goal point, position [3,2], with a high reward for being on it, and zero or negative reward for leaving it.

Our actions are the king's moves in chess, plus a No Operation (NOP) move. Adding the NOP allows us to remain in the best position once it is found, and then exhaust the convergence steps until the loop breaks, finishing the game. The drawback of the NOP is that we could get stuck in a local optimum, whereas being forced to always move could let us escape from it.
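
As a small sketch of the two action sets discussed above, the snippet below builds the king moves with the NOP and a variant without it. The variable names `all_moves` and `moves_without_nop` are illustrative; the hands-on itself only uses the set that includes the NOP.

```python
import numpy as np

# All (row, column) offsets between -1 and 1: eight king moves plus the NOP (0, 0)
all_moves = np.array([(i, j) for i in range(-1, 2) for j in range(-1, 2)])

# Dropping the (0, 0) row removes the NOP and forces the agent to move
moves_without_nop = all_moves[np.any(all_moves != 0, axis=1)]

print(len(all_moves), len(moves_without_nop))  # 9 8
```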

Problem Details:

• The space has dimensions 4 x 4
• The goal is to reach [3,2] (we don't tell the agent which cell is the goal, but rather reward it more)
• The start point is chosen at random
• The reward depends only on the current position

In [ ]:
# Seed the RNG for repeatability
rd.seed(2019)

# Generate the king movement by iterating all possible coordinates
# between -1 and 1 for the two coordinates
actions = np.array([(i,j) for i in range(-1,2) for j in range(-1,2)])

# Generate the reward map
rewards = np.zeros((4,4))
rewards[2,2] = -0.04
rewards[3,2] = 1
rewards[3,3] = -0.1

# Choose an initial state uniformly at random from all possible positions
initial_state = (rd.randint(0,4), rd.randint(0,4))

alpha = 0.5
gamma = 1.0
max_iters = 50

# Run the function
final_result = qlearn(actions, rewards, initial_state, alpha=alpha, gamma=gamma, max_iters=max_iters)

We can see that qlearn was able to find the goal cell in 37 iterations:

In [ ]:
print("Final Position: ({},{})".format(*final_result['s']))
print("Final Action: ({},{})".format(*final_result['a']))
print("Final Reward: {}".format(final_result['r']))
print("Iterations: {}".format(final_result['it']))
print("Final Score:")
print(final_result['q'])