Neste artigo, falarei sobre meu projeto de curso, no qual trabalhei em conjunto com o Laboratório de Algoritmos de Robôs Móveis JetBrains Research : Sobre o Issue Checker que escrevi para o emulador Gym-Duckietown . Falaremos sobre o sistema de testes e a integração deste sistema com plataformas educacionais online que utilizam a tecnologia External Grader - por exemplo, com a plataforma Stepik.org .
Sobre o autor
Meu nome é Daniil Plushenko, e sou um aluno do primeiro (segundo) ano do programa de Mestrado " Programação e Análise de Dados " no HSE de São Petersburgo. Em 2019, concluí meu bacharelado em Matemática Aplicada e Ciência da Computação na mesma universidade.
Plataforma Duckietown
Duckietown é um projeto de pesquisa de veículos autônomos . Os organizadores do projeto criaram uma plataforma que ajuda a introduzir uma nova abordagem à aprendizagem no campo da inteligência artificial e da robótica. Tudo começou como um curso no MIT em 2016, mas aos poucos se espalhou pelo mundo e em diferentes níveis de ensino: do ensino médio ao mestrado.
A plataforma Duckietown tem duas partes. Em primeiro lugar, é um modelo reduzido de um ambiente de transporte urbano com estradas, edifícios, sinais de trânsito e obstáculos. Em segundo lugar, é o transporte. Pequenos robôs móveis (Duckiebots) executando um Raspberry Pi recebem informações sobre o mundo ao seu redor por meio de uma câmera e transportam os moradores da cidade - patos de borracha amarelos - pelas estradas.
Tenho trabalhado com o emulador Duckietown. É chamadoGym-Duckietown , e é um projeto de código aberto escrito em Python. O emulador coloca seu bot dentro da cidade, muda sua posição dependendo do algoritmo que você está usando (ou do botão que você pressionou), redesenha a imagem e escreve a posição atual do bot nos logs.
Se você estiver interessado em tentar, recomendo clonar você mesmo o repositório e executar o manual_control.py : desta forma, o bot pode ser controlado usando as setas do teclado.
Captura de tela do emulador
O emulador pode ser usado como um ambiente para executar tarefas. Vamos definir o seguinte problema: em um determinado mapa, que consiste em uma faixa, você precisa dirigir um metro em linha reta.
Para resolver o problema, você pode usar o seguinte algoritmo:
for _ in range(25):
env.step([1, 0])
env.render()
Esta variável
env
armazena o estado do ambiente.
O método step recebe como entrada
action
: uma lista de dois elementos que descrevem a ação do bot. O primeiro elemento define a velocidade, o segundo - o ângulo de rotação. O método render
redesenha a imagem, levando em consideração a nova posição do bot. O número de passos e a velocidade foram escolhidos empiricamente: esses são os valores necessários para que o bot se desloque exatamente um metro em linha reta.
Se desejar usar este snippet em manual_control.py, cole-o aqui . O código até este ponto está ocupado carregando o ambiente. Para simplificar, você pode reutilizá-lo e adicionar a solução acima ao problema.
Sistema de teste
Gostaria de poder verificar tais tarefas automaticamente: realizar a implementação do algoritmo de controle do bot, simular a viagem e relatar se o algoritmo proposto realiza a tarefa corretamente. Tal sistema de testes possibilitaria a utilização do ambiente durante a preparação para competições de gestão autônoma de transportes, bem como para fins educacionais: emitir um conjunto de problemas aos alunos e verificar suas soluções automaticamente. Eu me engajei no seu desenvolvimento enquanto trabalhava em um projeto de curso e a seguir contarei o que consegui.
A sequência de etapas ao verificar a solução é assim:
Você mesmo pode adicionar tarefas ao sistema de teste ou referir-se às que eu fiz. Cada tarefa possui um gerador de condição - uma classe que gera um estado do ambiente. Ele contém o nome do mapa e a posição inicial do bot dentro da célula inicial. As coordenadas do alvo também são definidas: neste caso, é um ponto a um metro da posição inicial.
class Ride1MTaskGenerator(TaskGenerator):
def __init__(self, args):
super().__init__(args)
def generate_task(self):
env_loader = CVTaskEnv if self.args.is_cv_task else TrackingDuckietownEnv
env = env_loader(
map_name="straight_road",
position_on_initial_road_tile=PositionOnInitialRoadTile(
x_coefficient=0.5,
z_coefficient=0.5,
angle=0,
),
)
self.generated_task['target_coordinates'] = [[env.road_tile_size * 0.5 + 1, 0, env.road_tile_size * 0.5]]
self.generated_task['env'] = env
env.render()
return self.generated_task
Aqui
TrackingDuckietownEnv
e CVTaskEnv
estão as classes de wrapper que são usadas para armazenar informações de viagem para análise posterior.
class TrackingDuckietownEnv:
def __init__(self, **kwargs):
self.__wrapped = DuckietownEnv(**kwargs)
…
…
def step(self, action):
obs, reward, done, misc = self.__wrapped.step(action)
message = misc['Simulator']['msg']
if 'invalid pose' in message.lower():
raise InvalidPoseException(message)
for t in self.trackers:
t.track(self)
return obs, reward, done, misc
Os rastreadores coletam informações sobre o estado atual, por exemplo, sobre a posição do bot.
CVTaskEnv
é usado se uma solução é necessária usando apenas informações da câmera ("visão computacional"), e não as funções do emulador: por exemplo, se você precisa saber a que distância o bot está do centro da faixa ou onde está o objeto visível mais próximo. CVTaskEnv
Chamar funções do emulador pode simplificar a solução do problema e a classe limita a invocação dos métodos do emulador. É usado quando o sinalizador é exibido is_cv_task
.
class CVTaskEnv:
def __init__(self, **kwargs):
self.__wrapped = TrackingDuckietownEnv(**kwargs)
def __getattr__(self, item):
if item in self.__wrapped.overriden_methods:
return self.__wrapped.__getattribute__(item)
ALLOWED_FOR_CV_TASKS = [
'render', '_CVTaskEnv__wrapped', '_TrackingDuckietownEnv__wrapped',
'road_tile_size', 'trip_statistics'
]
if item in ALLOWED_FOR_CV_TASKS:
return self.__wrapped.__getattr__(item)
else:
raise AttributeError(item + " call is not allowed in CV tasks")
Após a conclusão da execução da decisão - desde que não tenha sido interrompida por um timeout - a informação da viagem é passada por uma sequência de verificadores. Se todos os verificadores funcionaram com sucesso, o problema é considerado resolvido corretamente. Caso contrário, um veredicto explicativo é exibido - por exemplo, um bot caiu, saiu da estrada, etc.
Aqui está um dos verificadores padrão. Ele verifica se o bot voltou ao ponto de partida no final da viagem. Isso pode ser útil se, por exemplo, em uma tarefa você precisar chegar a um certo ponto e depois voltar.
class SameInitialAndFinalCoordinatesChecker(Checker):
def __init__(self, maximum_deviation=0.1, **kwargs):
super().__init__(**kwargs)
self.maximum_deviation = maximum_deviation
def check(self, generated_task, trackers, **kwargs):
trip_statistics = next(x for x in trackers if isinstance(x, TripStatistics))
trip_data = trip_statistics.trip_data
if len(trip_data) == 0:
return True
initial_coordinates = trip_data[0].position.coordinates
final_coordinates = trip_data[-1].position.coordinates
return np.linalg.norm(initial_coordinates - final_coordinates) < self.maximum_deviation
Esse sistema de teste pode ser usado no modo manual, ou seja, iniciar manualmente o teste e, em seguida, estudar visualmente o veredicto. Se quiséssemos, por exemplo, lançar um curso online sobre transporte autônomo na Stepik.org , precisaríamos de integração com a plataforma. Isso será discutido na próxima parte do artigo.
Integração com plataformas online
Para tarefas de teste, é frequentemente utilizada a tecnologia External Grader, que foi desenvolvida pela plataforma edX .
Ao usar o Graduador Externo, a plataforma educacional não verifica as tarefas por conta própria, mas gera uma fila de pacotes que são enviados para outro dispositivo. A funcionalidade de conexão de fila é implementada no projeto xqueue-watcher . O Xqueue-watcher busca os pacotes e então eles são testados pelo validador (que geralmente faz mais ações não triviais do que comparações de texto / número). Depois disso, o veredicto de verificação é enviado de volta para o lado da plataforma educacional.
Vamos considerar com mais detalhes o momento de conexão com a fila. Depois que a plataforma educacional fornecer os dados de conexão, eles precisarão ser adicionados aoarquivos de configuração e, no método de classificação , implemente o lançamento de verificação diretamente. Instruções mais detalhadas podem ser encontradas aqui e aqui .
O Xqueue-watcher chama o endpoint get_submission , que recuperará o pacote da fila, se possível. Depois disso, ela vai para o teste. O xqueue-watcher então chama put_result para retornar o veredicto.
Você pode iniciar o xqueue-watcher assim:
make requirements && python -m xqueue_watcher -d conf.d/
Digamos que desejamos usar a tecnologia External Grader, mas não queremos executar o curso em uma plataforma online. O Xqueue-watcher é implementado supondo que haja algum armazenamento de arquivo onde os arquivos com soluções são carregados (as plataformas têm esse armazenamento). Podemos modificar o Xqueue para que o armazenamento de arquivos não seja mais necessário, e tais sistemas podem ser executados, em geral, até mesmo em nosso laptop.
Primeiro, você precisa aprender a manter a fila de pacotes em si. A funcionalidade de fila é fornecida pelo projeto xqueue .
Foto tirada da documentação .
Você pode executá-lo assim:
apt-get install libaio1 libaio-dev
apt-get install libmysqlclient-dev
pip3 install -r requirements.txt
python3 manage.py migrate
python3 manage.py runserver $xqueue_address
Pode ser necessário criar um arquivo ~ / edx / edx.log
Por padrão, xqueue fornece ao xqueue-watcher não o conteúdo dos pacotes, ou seja, arquivos com a solução do problema, mas links para esses arquivos no armazenamento de arquivos. Para não depender do armazenamento de arquivos, você pode fazer os próprios arquivos transferidos e armazená-los na mesma máquina em que o xqueue-watcher está sendo executado.
Veja como o código-fonte precisa ser modificado para conseguir isso:
A implementação do método _upload em lms_interface.py foi substituída por esta:
def _upload(file_to_upload, path, name):
'''
Upload file using the provided keyname.
Returns:
URL to access uploaded file
'''
full_path = os.path.join(path, name)
return default_storage.save(full_path, file_to_upload)
Se nenhum armazenamento de arquivo foi conectado, então este método salvará o arquivo com a solução no caminho $ queue_name / $ file_to_upload_hash.
Na implementação de get_sumbission no arquivo ext_interface.py, em vez desta linha, escreva:
xqueue_files = json.loads(submission.urls)
for xqueue_file in xqueue_files.keys():
with open(xqueue_files[xqueue_file], 'r') as f:
xqueue_files[xqueue_file] = f.readlines()
Vamos transferir não links (caminhos) para arquivos, mas seu conteúdo.
Cada solução é executada em um contêiner docker "único" com recursos limitados, ou seja, um contêiner separado é criado para a execução de cada solução, que é excluído após o término do teste. Para criar esses contêineres e executar comandos neles, o portainer-api é usado (na verdade, como um wrapper sobre a API Docker).
Resultado
Neste artigo, falei sobre como foram criados o sistema de testes e as tarefas de transporte autônomo, bem como a integração desse sistema com plataformas educacionais online que utilizam a tecnologia External Grader. Espero que em breve seja lançado um curso com este sistema, e a parte sobre integração com plataformas online seja útil para quem deseja criar seu próprio curso offline ou online.