{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Hands-On: Unsupervised Learning - Reinforcement Learning\n",
"### AI and Predictive Analytics in Data-Center Environments - http://dcai.bsc.es"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this hands-on we will see an example of _reinforcement learning_, another unsupervised learning method which does not learn from a labeled dataset but rather from a series of actions and its corresponding rewards. Let's start with some theory:"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let **Temporal Differencing** Q-Learning be the formula:\n",
"\n",
" $$Q(s, a) \\leftarrow Q(s, a) + \\alpha (R(s) + \\gamma \\cdot max_{a'} Q(s', a') - Q(s, a))$$\n",
"\n",
"...also known as SARSA when the new state selection is obtained from our current $Q(s,a)$ look-up table.\n",
"\n",
" * $Q(s,a)$ : is the **score** of the current status given the current action\n",
" * $\\\\alpha$ : is the **learning rate**\n",
" * $R(s)$ : is the **reward** for reaching status *s*\n",
" * $\\\\gamma$ : is the **discount factor** for the expected new score from next status\n",
" * $max_{a'} Q(s',a')$ : is the **best expected score** for available actions and status\n",
"\n",
"We also consider the Utility Function $U(s)$ as\n",
"\n",
"$$U(s) = max_{a} Q(s,a) $$\n",
"\n",
"so we want at each step maximize the *Utility* of our state by choosing the best action each time:\n",
"\n",
"$$Q(s, a) \\leftarrow Q(s, a) + \\alpha (R(s) + \\gamma \\cdot U(s') - Q(s, a))$$\n",
"\n",
"When *s* is a final state, we consider the utility as the final reward obtained."
]
},
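{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the update concrete, here is a small illustrative sketch (not part of the original hands-on code) that applies the formula once by hand: starting from $Q(s,a) = 0.5$ with $\\alpha = 0.5$, $\\gamma = 1.0$, reward $R(s) = 0$ and best expected next score $\\max_{a'} Q(s',a') = 0.5$, the score is already consistent and does not change; with $R(s) = 1$ it moves towards the higher return:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# One Temporal Difference update: Q(s,a) <- Q(s,a) + alpha*(R(s) + gamma*U(s') - Q(s,a))\n",
"def td_update(q_sa, reward, best_next_q, alpha, gamma):\n",
"    return q_sa + alpha * (reward + gamma * best_next_q - q_sa)\n",
"\n",
"print(td_update(0.5, 0.0, 0.5, alpha=0.5, gamma=1.0))  # consistent: stays at 0.5\n",
"print(td_update(0.5, 1.0, 0.5, alpha=0.5, gamma=1.0))  # pulled up towards the reward: 1.0"
]
},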
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Implementation of the Q-Learning function"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before implementing the **qlearn** function itself we need to implement the utility function which exhaustively finds the best score, action and state given a set of valid actions and current scoring:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import numpy.random as rd\n",
"import math\n",
"\n",
"# Auxiliar function to get the index of a given action\n",
"def find_action_index(actions, a):\n",
" return [x==a[0] and y==a[1] for x, y in actions].index(True)\n",
"\n",
"# Utility function // Fitness function\n",
"def utility_function(s, valid_actions, q_scoring):\n",
" best_q = -math.inf\n",
" best_a = -math.inf\n",
" best_s = s\n",
" \n",
" # Shuffle the actions\n",
" shuffled_actions = rd.permutation(valid_actions)\n",
" for a_prime in shuffled_actions:\n",
" # Tentative status\n",
" s_prime = a_prime + s\n",
" # Check score for Q(state', action')\n",
" q_prime = q_scoring[s_prime[0], s_prime[1], find_action_index(actions, a_prime)]\n",
" \n",
" if(q_prime > best_q):\n",
" best_q = q_prime\n",
" best_a = a_prime\n",
" best_s = s_prime\n",
" return {\"q\": best_q, \"a\": best_a, \"s\": best_s }"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We also need a **viability function** that returns the list of possible actions from a given state:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Viability function\n",
"def viability_function(actions, s):\n",
" valid_actions = []\n",
" for a in actions:\n",
" a_prime = a\n",
" if s[0] + a_prime[0] >= 0 \\\n",
" and s[1] + a_prime[1] >= 0 \\\n",
" and s[0] + a_prime[0] < rewards.shape[0] \\\n",
" and s[1] + a_prime[1] < rewards.shape[1]:\n",
" valid_actions.append(a_prime)\n",
" return np.array(valid_actions)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we can create the **qlearn** function:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# qlearn function\n",
"def qlearn(actions, rewards, s_initial, alpha, gamma, max_iters, q_scoring=None):\n",
" # Initialize scoring, state and action variables\n",
" if q_scoring is None:\n",
" q_scoring = np.full([rewards.shape[0], rewards.shape[1], len(actions)], 0.5)\n",
" s = s_initial\n",
" a = actions[0]\n",
" \n",
" # Initialize counters for breaking loop\n",
" convergence_count = 0\n",
" iteration_count = 0\n",
" prev_status = (-1,-1)\n",
" \n",
" while iteration_count < max_iters:\n",
" # Get valid actions\n",
" valid_actions = viability_function(actions, s)\n",
" \n",
" # Get maximum Q in current Status s\n",
" best = utility_function(s, valid_actions, q_scoring)\n",
" \n",
" # Check convergence. We add a convergence threshold to stop\n",
" convergence_count = convergence_count+1 if (all((s == best['q']).flatten()) \\\n",
" or (all(prev_status == best['s']))) else 0\n",
" if convergence_count > 5: break\n",
" \n",
" # Solve Reward\n",
" r = rewards[s[0], s[1]]\n",
" \n",
" # Update State-Action Table\n",
" action_index = find_action_index(actions, a)\n",
" current_score = q_scoring[s[0], s[1], action_index]\n",
" q_scoring[s[0], s[1], action_index] = current_score + alpha * (r + gamma*best['q'] - current_score)\n",
" \n",
" # Change the status\n",
" prev_status = s\n",
" s = best['s']\n",
" a = best['a']\n",
" iteration_count = iteration_count + 1\n",
" \n",
" print(\"Iteration: {}\".format(iteration_count))\n",
" print(\" Best Position: ({},{})\".format(*best['s']))\n",
" print(\" Best Action: ({},{})\".format(*best['a']))\n",
" print(\" Best Value: {}\".format(best['q']))\n",
" print()\n",
"\n",
" \n",
" return { \"s\": s, \"a\": a, \"r\": rewards[s[0], s[1]], \"it\": iteration_count, \"q\": q_scoring }"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Case of example"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Consider a bidimensional space of 4 x 4 cells, with the following rewards for being in each cell:\n",
"\n",
"| | [,0] | [,1] | [,2] | [,3] |\n",
"|:--------:|:----:|:----:|:-----:|:----:|\n",
"| **[0,]** | +0 | +0 | +0 | +0 |\n",
"| **[1,]** | +0 | +0 | +0 | +0 |\n",
"| **[2,]** | +0 | +0 | -0.04 | +0 |\n",
"| **[3,]** | +0 | +0 | +1 | -0.1 |\n",
"\n",
"There is a goal point, position **[3,2]**, with high reward for being on it, and no reward or negative reward for leaving it.\n",
"\n",
"Our actions are *king* movements in a chess game, plus the *No Operation* movement. Adding the NOP movement allows us to remain in the best position when found, then exhaust the convergence steps until loop breaks, finishing the game. The NOP has as drawback that we could get stuck in a local sub-optimal, while forcing us to always move could let us escape from them.\n",
"\n",
"Problem Details:\n",
" * Space has dimensions 4 x 4\n",
" * Goal is to reach [3,2] (We don't tell which is the goal, but rather we reward it better)\n",
" * Start point is at Random\n",
" * Reward depends only on the current position\n",
" "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": false
},
"outputs": [],
"source": [
"# Seed the RNG for repeatability\n",
"rd.seed(2019)\n",
"\n",
"# Generate the king movement by iterating all possible coordinates \n",
"# between -1 and 1 for the two coordinates\n",
"actions = np.array([(i,j) for i in range(-1,2) for j in range(-1,2)])\n",
"\n",
"# Generate the reward map\n",
"rewards = np.zeros((4,4))\n",
"rewards[2,2] = -0.04\n",
"rewards[3,2] = 1\n",
"rewards[3,3] = -0.1\n",
"\n",
"# Choose an initial state uniformly at random from all possible positions\n",
"initial_state = (rd.randint(0,4), rd.randint(0,4))\n",
"\n",
"alpha = 0.5\n",
"gamma = 1.0\n",
"max_iters = 50\n",
"\n",
"# Run the function\n",
"final_result = qlearn(actions, rewards, initial_state, alpha=alpha, gamma=gamma, max_iters=max_iters)"
]
},
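{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (an illustrative sketch, not part of the original hands-on), we can list the viable actions from the corner state (0,0): only the movements that keep us inside the 4 x 4 grid should remain, i.e. 4 of the 9 king-plus-NOP movements:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# From the corner (0,0) only NOP, right, down and down-right are viable\n",
"corner_actions = viability_function(actions, (0, 0))\n",
"print(corner_actions)\n",
"print(len(corner_actions))  # 4"
]
},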
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can see how qlearn has been able to find the spot in 37 iterations"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(\"Final Position: ({},{})\".format(*final_result['s']))\n",
"print(\"Final Action: ({},{})\".format(*final_result['a']))\n",
"print(\"Final Reward: {}\".format(final_result['r']))\n",
"print(\"Iterations: {}\".format(final_result['it']))\n",
"print(\"Final Score:\")\n",
"print(final_result['q'])"
]
}
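,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, following the definition $U(s) = \\max_{a} Q(s,a)$ from the theory above, we can derive the learned utility map from the final scoring table (an illustrative sketch using the variables defined above). The highest utility should concentrate around the goal cell [3,2]:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Utility map: U(s) = max over the actions of Q(s, a)\n",
"utility_map = final_result['q'].max(axis=2)\n",
"print(np.round(utility_map, 3))\n",
"\n",
"# The cell with the highest learned utility\n",
"best_cell = np.unravel_index(np.argmax(utility_map), utility_map.shape)\n",
"print(\"Highest-utility cell: {}\".format(best_cell))"
]
}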
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
}
},
"nbformat": 4,
"nbformat_minor": 4
}