Introdução
De alguma forma, ao ler o livro "Aprendizagem por Reforço: Uma Introdução", pensei em complementar meus conhecimentos teóricos com os práticos, mas não havia vontade de resolver o próximo problema de equilibrar uma barra, ensinar um agente a jogar xadrez ou inventar outra bicicleta.
Ao mesmo tempo, o livro continha um exemplo interessante de otimização da fila do cliente, que, por um lado, não é muito complicado em termos de implementação / compreensão do processo e, por outro lado, é bastante interessante e pode ser implementado com algum sucesso na vida real.
Tendo mudado ligeiramente este exemplo, tive a ideia, que será discutida mais tarde.
Formulação do problema
Então, imagine a seguinte foto:
Temos à nossa disposição uma padaria que produz 6 (condicionalmente) toneladas de tortas de framboesa todos os dias e distribui esses produtos em três lojas todos os dias.
No entanto, a melhor forma de fazer isso para que haja o mínimo de produtos vencidos possível (desde que a vida útil das tortas seja de três dias), se tivermos apenas três caminhões com capacidade de 1, 2 e 3 toneladas, respectivamente, em cada ponto de venda é mais lucrativo enviar apenas um caminhão (porque estão bem distantes) e, além disso, apenas uma vez por dia após assar as tortas e, além disso, não sabemos o poder de compra das nossas lojas (pois o negócio está apenas começando)?
Vamos concordar que a estratégia de layout FIFO funciona perfeitamente nas lojas, nas quais os clientes levam apenas as mercadorias que foram produzidas mais tarde que as outras, mas se a torta de framboesa não foi comprada em três dias, o pessoal da loja se livra dela.
Nós (condicionalmente) não sabemos qual será a demanda por tortas em um determinado dia em uma determinada loja, no entanto, em nossa simulação, definimos como segue para cada uma das três lojas: 3 ± 0,1, 1 ± 0,1, 2 ± 0,1.
Obviamente, a opção mais lucrativa para nós é enviar três toneladas para a primeira loja, uma para a segunda e duas toneladas de tortas para a terceira, respectivamente.
Para resolver este problema, usamos um ambiente de academia personalizado, bem como Deep Q Learning (implementação Keras).
Ambiente personalizado
Descreveremos o estado do ambiente com três números reais positivos - o restante dos produtos do dia atual em cada uma das três lojas. As ações do agente são números de 0 a 5 inclusive, denotando os índices da permutação dos inteiros 1, 2 e 3. É claro que a ação mais benéfica estará abaixo do 4º índice (3, 1, 2). Consideramos a tarefa como episódica, em um episódio de 30 dias.
import gym
from gym import error, spaces, utils
from gym.utils import seeding
import itertools
import random
import time
class ShopsEnv(gym.Env):
metadata = {'render.modes': ['human']}
# ,
#
def __init__(self):
self.state = [0, 0, 0] #
self.next_state = [0, 0, 0] #
self.done = False #
self.actions = list(itertools.permutations([1, 2, 3])) #
self.reward = 0 #
self.time_tracker = 0 #
self.remembered_states = [] #
#
t = int( time.time() * 1000.0 )
random.seed( ((t & 0xff000000) >> 24) +
((t & 0x00ff0000) >> 8) +
((t & 0x0000ff00) << 8) +
((t & 0x000000ff) << 24) )
# ()
def step(self, action_num):
#
if self.done:
return [self.state, self.reward, self.done, self.next_state]
else:
#
self.state = self.next_state
#
self.remembered_states.append(self.state)
#
self.time_tracker += 1
#
action = self.actions[action_num]
# , ( )
self.next_state = [x + y for x, y in zip(action, self.state)]
#
self.next_state[0] -= (3 + random.uniform(-0.1, 0.1))
self.next_state[1] -= (1 + random.uniform(-0.1, 0.1))
self.next_state[2] -= (2 + random.uniform(-0.1, 0.1))
#
if any([x < 0 for x in self.next_state]):
self.reward = sum([x for x in self.next_state if x < 0])
else:
self.reward = 1
#
#
# ( ),
#
if self.time_tracker >= 3:
remembered_state = self.remembered_states.pop(0)
self.next_state = [max(x - y, 0) for x, y in zip(self.next_state, remembered_state)]
else:
self.next_state = [max(x, 0) for x in self.next_state]
# 30
self.done = self.time_tracker == 30
#
return [self.state, self.reward, self.done, self.next_state]
#
def reset(self):
#
self.state = [0, 0, 0]
self.next_state = [0, 0, 0]
self.done = False
self.reward = 0
self.time_tracker = 0
self.remembered_states = []
t = int( time.time() * 1000.0 )
random.seed( ((t & 0xff000000) >> 24) +
((t & 0x00ff0000) >> 8) +
((t & 0x0000ff00) << 8) +
((t & 0x000000ff) << 24) )
#
return self.state
# :
#
def render(self, mode='human', close=False):
print('-'*20)
print('First shop')
print('Pies:', self.state[0])
print('Second shop')
print('Pies:', self.state[1])
print('Third shop')
print('Pies:', self.state[2])
print('-'*20)
print('')
Importações principais
import numpy as np #
import pandas as pd #
import gym #
import gym_shops #
from tqdm import tqdm #
#
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import clear_output
sns.set_color_codes()
#
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import random #
Definindo um agente
class DQLAgent():
def __init__(self, env):
#
self.state_size = 3 #
self.action_size = 6 #
# replay()
self.gamma = 0.99
self.learning_rate = 0.01
# adaptiveEGreedy()
self.epsilon = 0.99
self.epsilon_decay = 0.99
self.epsilon_min = 0.0001
self.memory = deque(maxlen = 5000) # 5000 , -
# (NN)
self.model = self.build_model()
# Deep Q Learning
def build_model(self):
model = Sequential()
model.add(Dense(10, input_dim = self.state_size, activation = 'sigmoid')) #
model.add(Dense(50, activation = 'sigmoid')) #
model.add(Dense(10, activation = 'sigmoid')) #
model.add(Dense(self.action_size, activation = 'sigmoid')) #
model.compile(loss = 'mse', optimizer = Adam(lr = self.learning_rate))
return model
#
def remember(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done))
#
def act(self, state):
# 0 1 epsilon
# (exploration)
if random.uniform(0,1) <= self.epsilon:
return random.choice(range(6))
else:
#
act_values = self.model.predict(state)
return np.argmax(act_values[0])
#
def replay(self, batch_size):
# ,
if len(self.memory) < batch_size:
return
minibatch = random.sample(self.memory, batch_size) # batch_size
#
for state, action, reward, next_state, done in minibatch:
if done: # -
target = reward
else:
#
target = reward + self.gamma * np.amax(self.model.predict(next_state)[0])
# target = R(s,a) + gamma * max Q`(s`,a`)
# target (max Q` value) , s`
train_target = self.model.predict(state) # s --> NN --> Q(s,a) = train_target
train_target[0][action] = target
self.model.fit(state, train_target, verbose = 0)
# exploration rate,
# epsilon
def adaptiveEGreedy(self):
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
Treine o agente
# gym
env = gym.make('shops-v0')
agent = DQLAgent(env)
#
batch_size = 100
episodes = 1000
#
progress_bar = tqdm(range(episodes), position=0, leave=True)
for e in progress_bar:
#
state = env.reset()
state = np.reshape(state, [1, 3])
# , id
time = 0
taken_actions = []
sum_rewards = 0
#
while True:
#
action = agent.act(state)
#
taken_actions.append(action)
#
next_state, reward, done, _ = env.step(action)
next_state = np.reshape(next_state, [1, 3])
#
sum_rewards += reward
#
agent.remember(state, action, reward, next_state, done)
#
state = next_state
# replay
agent.replay(batch_size)
# epsilon
agent.adaptiveEGreedy()
#
time += 1
#
progress_bar.set_postfix_str(s='mean reward: {}, time: {}, epsilon: {}'.format(round(sum_rewards/time, 3), time, round(agent.epsilon, 3)), refresh=True)
#
if done:
#
clear_output(wait=True)
sns.distplot(taken_actions, color="y")
plt.title('Episode: ' + str(e))
plt.xlabel('Action number')
plt.ylabel('Occurrence in %')
plt.show()
break
Testando o agente
import time
trained_model = agent #
state = env.reset() #
state = np.reshape(state, [1,3])
#
time_t = 0
MAX_EPISOD_LENGTH = 1000 #
taken_actions = []
mean_reward = 0
#
progress_bar = tqdm(range(MAX_EPISOD_LENGTH), position=0, leave=True)
for time_t in progress_bar:
#
action = trained_model.act(state)
next_state, reward, done, _ = env.step(action)
next_state = np.reshape(next_state, [1,3])
state = next_state
taken_actions.append(action)
#
clear_output(wait=True)
env.render()
progress_bar.set_postfix_str(s='time: {}'.format(time_t), refresh=True)
print('Reward:', round(env.reward, 3))
time.sleep(0.5)
mean_reward += env.reward
if done:
break
#
sns.distplot(taken_actions, color='y')
plt.title('Test episode - mean reward: ' + str(round(mean_reward/(time_t+1), 3)))
plt.xlabel('Action number')
plt.ylabel('Occurrence in %')
plt.show()
Total
Assim, o agente entendeu rapidamente como agir de forma mais lucrativa.
Em geral, ainda há muito espaço para experimentação: você pode aumentar o número de lojas, diversificar ações ou até mesmo alterar os hiperparâmetros do modelo de treinamento - e isso é só o começo da lista.