Ensinando IA para distribuir tortas para lojas usando o aprendizado por reforço

Introdução



De alguma forma, ao ler o livro "Aprendizagem por Reforço: Uma Introdução", pensei em complementar meus conhecimentos teóricos com os práticos, mas não havia vontade de resolver o próximo problema de equilibrar uma barra, ensinar um agente a jogar xadrez ou inventar outra bicicleta.



Ao mesmo tempo, o livro continha um exemplo interessante de otimização da fila do cliente, que, por um lado, não é muito complicado em termos de implementação / compreensão do processo e, por outro lado, é bastante interessante e pode ser implementado com algum sucesso na vida real.



Tendo mudado ligeiramente este exemplo, tive a ideia, que será discutida mais tarde.



Formulação do problema



Então, imagine a seguinte foto:







Temos à nossa disposição uma padaria que produz 6 (condicionalmente) toneladas de tortas de framboesa todos os dias e distribui esses produtos em três lojas todos os dias.



No entanto, a melhor forma de fazer isso para que haja o mínimo de produtos vencidos possível (desde que a vida útil das tortas seja de três dias), se tivermos apenas três caminhões com capacidade de 1, 2 e 3 toneladas, respectivamente, em cada ponto de venda é mais lucrativo enviar apenas um caminhão (porque estão bem distantes) e, além disso, apenas uma vez por dia após assar as tortas e, além disso, não sabemos o poder de compra das nossas lojas (pois o negócio está apenas começando)?



Vamos concordar que a estratégia de layout FIFO funciona perfeitamente nas lojas, nas quais os clientes levam apenas as mercadorias que foram produzidas mais tarde que as outras, mas se a torta de framboesa não foi comprada em três dias, o pessoal da loja se livra dela.



Nós (condicionalmente) não sabemos qual será a demanda por tortas em um determinado dia em uma determinada loja, no entanto, em nossa simulação, definimos como segue para cada uma das três lojas: 3 ± 0,1, 1 ± 0,1, 2 ± 0,1.



Obviamente, a opção mais lucrativa para nós é enviar três toneladas para a primeira loja, uma para a segunda e duas toneladas de tortas para a terceira, respectivamente.



Para resolver este problema, usamos um ambiente de academia personalizado, bem como Deep Q Learning (implementação Keras).



Ambiente personalizado



Descreveremos o estado do ambiente com três números reais positivos - o restante dos produtos do dia atual em cada uma das três lojas. As ações do agente são números de 0 a 5 inclusive, denotando os índices da permutação dos inteiros 1, 2 e 3. É claro que a ação mais benéfica estará abaixo do 4º índice (3, 1, 2). Consideramos a tarefa como episódica, em um episódio de 30 dias.



import gym
from gym import error, spaces, utils
from gym.utils import seeding

import itertools
import random
import time

class ShopsEnv(gym.Env):
  metadata = {'render.modes': ['human']}
  
  #  ,   
  #  
  def __init__(self):
    self.state = [0, 0, 0]  #  
    self.next_state = [0, 0, 0]  #  
    self.done = False  #   
    self.actions = list(itertools.permutations([1, 2, 3]))  #    
    self.reward = 0  #    
    self.time_tracker = 0  #   
    
    self.remembered_states = []  #     
    
    #   
    t = int( time.time() * 1000.0 )
    random.seed( ((t & 0xff000000) >> 24) +
                 ((t & 0x00ff0000) >>  8) +
                 ((t & 0x0000ff00) <<  8) +
                 ((t & 0x000000ff) << 24)   )
  
  #       ()  
  def step(self, action_num):
    #      
    if self.done:
        return [self.state, self.reward, self.done, self.next_state]
    else:
        #    
        self.state = self.next_state
        
        #  
        self.remembered_states.append(self.state) 
    
        #  
        self.time_tracker += 1
        
        #       
        action = self.actions[action_num]
        
        #  ,    ( )
        self.next_state = [x + y for x, y in zip(action, self.state)]
        
        #    
        self.next_state[0] -= (3 + random.uniform(-0.1, 0.1))
        self.next_state[1] -= (1 + random.uniform(-0.1, 0.1))
        self.next_state[2] -= (2 + random.uniform(-0.1, 0.1))
        
        #    
        if any([x < 0 for x in self.next_state]):
            self.reward = sum([x for x in self.next_state if x < 0])
        else:
            self.reward = 1
            
        #       
        #     
        #       (    ),
        #      
        if self.time_tracker >= 3:
            remembered_state = self.remembered_states.pop(0)
            self.next_state = [max(x - y, 0) for x, y in zip(self.next_state, remembered_state)]
        else:
            self.next_state = [max(x, 0) for x in self.next_state]
        
        
        #     30 
        self.done = self.time_tracker == 30
        
        #      
        return [self.state, self.reward, self.done, self.next_state]
  
  #   
  def reset(self):
    #      
    self.state = [0, 0, 0]
    self.next_state = [0, 0, 0]
    self.done = False
    self.reward = 0
    self.time_tracker = 0
    
    self.remembered_states = []
    
    t = int( time.time() * 1000.0 )
    random.seed( ((t & 0xff000000) >> 24) +
                 ((t & 0x00ff0000) >>  8) +
                 ((t & 0x0000ff00) <<  8) +
                 ((t & 0x000000ff) << 24)   )
    
    #   
    return self.state
  
  #     :
  #      
  def render(self, mode='human', close=False):
    print('-'*20)
    print('First shop')
    print('Pies:', self.state[0])

    print('Second shop')
    print('Pies:', self.state[1])

    print('Third shop')
    print('Pies:', self.state[2])
    print('-'*20)
    print('')


Importações principais



import numpy as np #  
import pandas as pd #  
import gym #  
import gym_shops #    
from tqdm import tqdm #   

#  
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import clear_output
sns.set_color_codes()

#  
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import random #   


Definindo um agente



class DQLAgent(): 
    
    def __init__(self, env):
        #    
       
        self.state_size = 3 #    
        self.action_size = 6 #    
        
        #    replay()
        self.gamma = 0.99
        self.learning_rate = 0.01
        
        #    adaptiveEGreedy()
        self.epsilon = 0.99
        self.epsilon_decay = 0.99
        self.epsilon_min = 0.0001
        
        self.memory = deque(maxlen = 5000) #   5000  ,    -   
        
        #   (NN)
        self.model = self.build_model()
    
    #      Deep Q Learning
    def build_model(self):
        model = Sequential()
        model.add(Dense(10, input_dim = self.state_size, activation = 'sigmoid')) #   
        model.add(Dense(50, activation = 'sigmoid')) #  
        model.add(Dense(10, activation = 'sigmoid')) #  
        model.add(Dense(self.action_size, activation = 'sigmoid')) #  
        model.compile(loss = 'mse', optimizer = Adam(lr = self.learning_rate))
        return model
    
    #    
    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
    
    #   
    def act(self, state):
        #     0  1  epsilon
        #     (exploration)
        if random.uniform(0,1) <= self.epsilon:
            return random.choice(range(6))
        else:
            #          
            act_values = self.model.predict(state)
            return np.argmax(act_values[0])
            
    
    #     
    def replay(self, batch_size):
        
        #   ,        
        if len(self.memory) < batch_size:
            return
        
        minibatch = random.sample(self.memory, batch_size) #  batch_size    
        #     
        for state, action, reward, next_state, done in minibatch:
            if done: #    -      
                target = reward
            else:
                #       
                target = reward + self.gamma * np.amax(self.model.predict(next_state)[0]) 
                # target = R(s,a) + gamma * max Q`(s`,a`)
                # target (max Q` value)     ,   s`  
            train_target = self.model.predict(state) # s --> NN --> Q(s,a) = train_target
            train_target[0][action] = target
            self.model.fit(state, train_target, verbose = 0)
    
    #    exploration rate,
    #   epsilon
    def adaptiveEGreedy(self):
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay




Treine o agente



#  gym   
env = gym.make('shops-v0')
agent = DQLAgent(env)

#   
batch_size = 100
episodes = 1000

#  
progress_bar = tqdm(range(episodes), position=0, leave=True)
for e in progress_bar:
    #  
    state = env.reset()
    state = np.reshape(state, [1, 3])

    #    , id       
    time = 0
    taken_actions = []
    sum_rewards = 0


    #   
    while True:
        #  
        action = agent.act(state)

        #  
        taken_actions.append(action)

        #     
        next_state, reward, done, _ = env.step(action)
        next_state = np.reshape(next_state, [1, 3])

        #     
        sum_rewards += reward

        #   
        agent.remember(state, action, reward, next_state, done)

        #    
        state = next_state

        #  replay
        agent.replay(batch_size)

        #  epsilon
        agent.adaptiveEGreedy()

        #   
        time += 1

        #   
        progress_bar.set_postfix_str(s='mean reward: {}, time: {}, epsilon: {}'.format(round(sum_rewards/time, 3), time, round(agent.epsilon, 3)), refresh=True)

        #     
        if done:
            #       
            clear_output(wait=True)
            sns.distplot(taken_actions, color="y")
            plt.title('Episode: ' + str(e))
            plt.xlabel('Action number')
            plt.ylabel('Occurrence in %')
            plt.show()
            break






Testando o agente



import time
trained_model = agent  #     
state = env.reset()  #  
state = np.reshape(state, [1,3])

#        
time_t = 0
MAX_EPISOD_LENGTH = 1000  #   
taken_actions = []
mean_reward = 0

#   
progress_bar = tqdm(range(MAX_EPISOD_LENGTH), position=0, leave=True)
for time_t in progress_bar:
    #     
    action = trained_model.act(state)
    next_state, reward, done, _ = env.step(action)
    next_state = np.reshape(next_state, [1,3])
    state = next_state
    taken_actions.append(action)

    #   
    clear_output(wait=True)
    env.render()
    progress_bar.set_postfix_str(s='time: {}'.format(time_t), refresh=True)
    print('Reward:', round(env.reward, 3))
    time.sleep(0.5)

    mean_reward += env.reward

    if done:
        break

#    
sns.distplot(taken_actions, color='y')
plt.title('Test episode - mean reward: ' + str(round(mean_reward/(time_t+1), 3)))
plt.xlabel('Action number')
plt.ylabel('Occurrence in %')
plt.show()    






Total



Assim, o agente entendeu rapidamente como agir de forma mais lucrativa.



Em geral, ainda há muito espaço para experimentação: você pode aumentar o número de lojas, diversificar ações ou até mesmo alterar os hiperparâmetros do modelo de treinamento - e isso é só o começo da lista.



Fontes






All Articles