Um táxi não tripulado conduz patos de borracha amarelos pela cidade! Módulo de verificação de problemas para a plataforma Gym-Duckietown

Em 2040, a maioria das grandes cidades do mundo vai dirigir carros sem motoristas, dizem os analistas . Mas para relaxar na estrada daqui a 20 anos, precisamos fazer um bom trabalho em algoritmos de direção autônoma. Para isso, o MIT desenvolveu a plataforma Duckietown , que permite fazer isso com custos mínimos. Em Duckietown, robôs móveis de baixo custo transportam patos de borracha amarelos em um modelo reduzido da cidade. Com base nessa plataforma, são realizadas Olimpíadas de Direção AI e lançados cursos em universidades sobre o uso de tecnologias de inteligência artificial na gestão de veículos não tripulados.



Neste artigo, falarei sobre meu projeto de curso, no qual trabalhei em conjunto com o Laboratório de Algoritmos de Robôs Móveis JetBrains Research : Sobre o Issue Checker que escrevi para o emulador Gym-Duckietown . Falaremos sobre o sistema de testes e a integração deste sistema com plataformas educacionais online que utilizam a tecnologia External Grader - por exemplo, com a plataforma Stepik.org .









Sobre o autor



Meu nome é Daniil Plushenko, e sou um aluno do primeiro (segundo) ano do programa de Mestrado " Programação e Análise de Dados " no HSE de São Petersburgo. Em 2019, concluí meu bacharelado em Matemática Aplicada e Ciência da Computação na mesma universidade.



Plataforma Duckietown



Duckietown é um projeto de pesquisa de veículos autônomos . Os organizadores do projeto criaram uma plataforma que ajuda a introduzir uma nova abordagem à aprendizagem no campo da inteligência artificial e da robótica. Tudo começou como um curso no MIT em 2016, mas aos poucos se espalhou pelo mundo e em diferentes níveis de ensino: do ensino médio ao mestrado.



A plataforma Duckietown tem duas partes. Em primeiro lugar, é um modelo reduzido de um ambiente de transporte urbano com estradas, edifícios, sinais de trânsito e obstáculos. Em segundo lugar, é o transporte. Pequenos robôs móveis (Duckiebots) executando um Raspberry Pi recebem informações sobre o mundo ao seu redor por meio de uma câmera e transportam os moradores da cidade - patos de borracha amarelos - pelas estradas.







Tenho trabalhado com o emulador Duckietown. É chamadoGym-Duckietown , e é um projeto de código aberto escrito em Python. O emulador coloca seu bot dentro da cidade, muda sua posição dependendo do algoritmo que você está usando (ou do botão que você pressionou), redesenha a imagem e escreve a posição atual do bot nos logs.  



Se você estiver interessado em tentar, recomendo clonar você mesmo o repositório e executar o manual_control.py : desta forma, o bot pode ser controlado usando as setas do teclado.



Captura de tela do emulador



O emulador pode ser usado como um ambiente para executar tarefas. Vamos definir o seguinte problema: em um determinado mapa, que consiste em uma faixa, você precisa dirigir um metro em linha reta.







Para resolver o problema, você pode usar o seguinte algoritmo:



for _ in range(25):
    env.step([1, 0])
    env.render()


Esta variável envarmazena o estado do ambiente.

O método step recebe como entrada action: uma lista de dois elementos que descrevem a ação do bot. O primeiro elemento define a velocidade, o segundo - o ângulo de rotação. O método renderredesenha a imagem, levando em consideração a nova posição do bot. O número de passos e a velocidade foram escolhidos empiricamente: esses são os valores necessários para que o bot se desloque exatamente um metro em linha reta.



Se desejar usar este snippet em manual_control.py, cole-o aqui . O código até este ponto está ocupado carregando o ambiente. Para simplificar, você pode reutilizá-lo e adicionar a solução acima ao problema.



Sistema de teste



Gostaria de poder verificar tais tarefas automaticamente: realizar a implementação do algoritmo de controle do bot, simular a viagem e relatar se o algoritmo proposto realiza a tarefa corretamente. Tal sistema de testes possibilitaria a utilização do ambiente durante a preparação para competições de gestão autônoma de transportes, bem como para fins educacionais: emitir um conjunto de problemas aos alunos e verificar suas soluções automaticamente. Eu me engajei no seu desenvolvimento enquanto trabalhava em um projeto de curso e a seguir contarei o que consegui.



A sequência de etapas ao verificar a solução é assim:







Você mesmo pode adicionar tarefas ao sistema de teste ou referir-se às que eu fiz. Cada tarefa possui um gerador de condição - uma classe que gera um estado do ambiente. Ele contém o nome do mapa e a posição inicial do bot dentro da célula inicial. As coordenadas do alvo também são definidas: neste caso, é um ponto a um metro da posição inicial.



class Ride1MTaskGenerator(TaskGenerator):
    def __init__(self, args):
        super().__init__(args)

    def generate_task(self):
        env_loader = CVTaskEnv if self.args.is_cv_task else TrackingDuckietownEnv
        env = env_loader(
            map_name="straight_road",
            position_on_initial_road_tile=PositionOnInitialRoadTile(
                x_coefficient=0.5,
                z_coefficient=0.5,
                angle=0,
            ),
        )
        self.generated_task['target_coordinates'] = [[env.road_tile_size * 0.5 + 1, 0, env.road_tile_size * 0.5]]
        self.generated_task['env'] = env
        env.render()
        return self.generated_task


Aqui TrackingDuckietownEnve CVTaskEnvestão as classes de wrapper que são usadas para armazenar informações de viagem para análise posterior. 



class TrackingDuckietownEnv:
    def __init__(self, **kwargs):
        self.__wrapped = DuckietownEnv(**kwargs)
    def step(self, action):
        obs, reward, done, misc = self.__wrapped.step(action)
        message = misc['Simulator']['msg']
        if 'invalid pose' in message.lower():
            raise InvalidPoseException(message)
        for t in self.trackers:
            t.track(self)
        return obs, reward, done, misc


Os rastreadores coletam informações sobre o estado atual, por exemplo, sobre a posição do bot.



CVTaskEnvé usado se uma solução é necessária usando apenas informações da câmera ("visão computacional"), e não as funções do emulador: por exemplo, se você precisa saber a que distância o bot está do centro da faixa ou onde está o objeto visível mais próximo. CVTaskEnvChamar funções do emulador pode simplificar a solução do problema e a classe limita a invocação dos métodos do emulador. É usado quando o sinalizador é exibido is_cv_task



class CVTaskEnv:
    def __init__(self, **kwargs):
        self.__wrapped = TrackingDuckietownEnv(**kwargs)

    def __getattr__(self, item):
        if item in self.__wrapped.overriden_methods:
            return self.__wrapped.__getattribute__(item)
        ALLOWED_FOR_CV_TASKS = [
            'render', '_CVTaskEnv__wrapped', '_TrackingDuckietownEnv__wrapped',
            'road_tile_size', 'trip_statistics'
        ]
        if item in ALLOWED_FOR_CV_TASKS:
            return self.__wrapped.__getattr__(item)
        else:
            raise AttributeError(item + " call is not allowed in CV tasks")


Após a conclusão da execução da decisão - desde que não tenha sido interrompida por um timeout - a informação da viagem é passada por uma sequência de verificadores. Se todos os verificadores funcionaram com sucesso, o problema é considerado resolvido corretamente. Caso contrário, um veredicto explicativo é exibido - por exemplo, um bot caiu, saiu da estrada, etc.



Aqui está um dos verificadores padrão. Ele verifica se o bot voltou ao ponto de partida no final da viagem. Isso pode ser útil se, por exemplo, em uma tarefa você precisar chegar a um certo ponto e depois voltar.



class SameInitialAndFinalCoordinatesChecker(Checker):
    def __init__(self, maximum_deviation=0.1, **kwargs):
        super().__init__(**kwargs)
        self.maximum_deviation = maximum_deviation

    def check(self, generated_task, trackers, **kwargs):
        trip_statistics = next(x for x in trackers if isinstance(x, TripStatistics))
        trip_data = trip_statistics.trip_data
        if len(trip_data) == 0:
            return True
        initial_coordinates = trip_data[0].position.coordinates
        final_coordinates = trip_data[-1].position.coordinates
        return np.linalg.norm(initial_coordinates - final_coordinates) < self.maximum_deviation


Esse sistema de teste pode ser usado no modo manual, ou seja, iniciar manualmente o teste e, em seguida, estudar visualmente o veredicto. Se quiséssemos, por exemplo, lançar um curso online sobre transporte autônomo na Stepik.org , precisaríamos de integração com a plataforma. Isso será discutido na próxima parte do artigo.



Integração com plataformas online



Para tarefas de teste, é frequentemente utilizada a tecnologia External Grader, que foi desenvolvida pela plataforma edX .



Ao usar o Graduador Externo, a plataforma educacional não verifica as tarefas por conta própria, mas gera uma fila de pacotes que são enviados para outro dispositivo. A funcionalidade de conexão de fila é implementada no projeto xqueue-watcher . O Xqueue-watcher busca os pacotes e então eles são testados pelo validador (que geralmente faz mais ações não triviais do que comparações de texto / número). Depois disso, o veredicto de verificação é enviado de volta para o lado da plataforma educacional.



Vamos considerar com mais detalhes o momento de conexão com a fila. Depois que a plataforma educacional fornecer os dados de conexão, eles precisarão ser adicionados aoarquivos de configuração e, no método de classificação , implemente o lançamento de verificação diretamente. Instruções mais detalhadas podem ser encontradas aqui e aqui .



O Xqueue-watcher chama o endpoint get_submission , que recuperará o pacote da fila, se possível. Depois disso, ela vai para o teste. O xqueue-watcher então chama put_result para retornar o veredicto.



Você pode iniciar o xqueue-watcher assim:



make requirements && python -m xqueue_watcher -d conf.d/


Digamos que desejamos usar a tecnologia External Grader, mas não queremos executar o curso em uma plataforma online. O Xqueue-watcher é implementado supondo que haja algum armazenamento de arquivo onde os arquivos com soluções são carregados (as plataformas têm esse armazenamento). Podemos modificar o Xqueue para que o armazenamento de arquivos não seja mais necessário, e tais sistemas podem ser executados, em geral, até mesmo em nosso laptop.



Primeiro, você precisa aprender a manter a fila de pacotes em si. A funcionalidade de fila é fornecida pelo projeto xqueue .





Foto tirada da documentação .



Você pode executá-lo assim:



apt-get install libaio1 libaio-dev
apt-get install libmysqlclient-dev
pip3 install -r requirements.txt
python3 manage.py migrate
python3 manage.py runserver $xqueue_address


Pode ser necessário criar um arquivo ~ / edx / edx.log



Por padrão, xqueue fornece ao xqueue-watcher não o conteúdo dos pacotes, ou seja, arquivos com a solução do problema, mas links para esses arquivos no armazenamento de arquivos. Para não depender do armazenamento de arquivos, você pode fazer os próprios arquivos transferidos e armazená-los na mesma máquina em que o xqueue-watcher está sendo executado. 



Veja como o código-fonte precisa ser modificado para conseguir isso:



A implementação do método _upload em lms_interface.py foi substituída por esta:



def _upload(file_to_upload, path, name):
    '''
    Upload file using the provided keyname.
    Returns:
        URL to access uploaded file
    '''
    full_path = os.path.join(path, name)
    return default_storage.save(full_path, file_to_upload)


Se nenhum armazenamento de arquivo foi conectado, então este método salvará o arquivo com a solução no caminho $ queue_name / $ file_to_upload_hash.



Na implementação de get_sumbission no arquivo ext_interface.py, em vez desta linha, escreva:



xqueue_files = json.loads(submission.urls)
for xqueue_file in xqueue_files.keys():
    with open(xqueue_files[xqueue_file], 'r') as f:
        xqueue_files[xqueue_file] = f.readlines()


Vamos transferir não links (caminhos) para arquivos, mas seu conteúdo.



Cada solução é executada em um contêiner docker "único" com recursos limitados, ou seja, um contêiner separado é criado para a execução de cada solução, que é excluído após o término do teste. Para criar esses contêineres e executar comandos neles, o portainer-api é usado (na verdade, como um wrapper sobre a API Docker).



Resultado



Neste artigo, falei sobre como foram criados o sistema de testes e as tarefas de transporte autônomo, bem como a integração desse sistema com plataformas educacionais online que utilizam a tecnologia External Grader. Espero que em breve seja lançado um curso com este sistema, e a parte sobre integração com plataformas online seja útil para quem deseja criar seu próprio curso offline ou online.



All Articles