aio api crawler

Olá. Comecei a trabalhar em uma biblioteca para extrair dados de diferentes APIs json. Ele também pode ser usado para testar a API.



Apis são descritos como classes, por exemplo



class Categories(JsonEndpoint):
    url = "http://127.0.0.1:8888/categories"
    params = {"page": range(100), "language": "en"}
    headers = {"User-Agent": get_user_agent}
    results_key = "*.slug"

categories = Categories()


class Posts(JsonEndpoint):
    url = "http://127.0.0.1:8888/categories/{category}/posts"
    params = {"page": range(100), "language": "en"}
    url_params = {"category": categories.iter_results()}
    results_key = "posts"

    async def comments(self, post):
        comments = Comments(
            self.session,
            url_params={"category": post.url.params["category"], "id": post["id"]},
        )
        return [comment async for comment in comments]

posts = Posts()


Params e url_params podem conter funções (como aqui get_user_agent - retorna um useragent aleatório), intervalo, iteradores, iteradores aguardáveis ​​e assíncronos (para que você possa vinculá-los).



Os cabeçalhos de parâmetros e cookies também podem conter funções e aguardar.



A categoria api no exemplo acima retorna uma matriz de objetos que possuem um slug, o iterador retornará exatamente esses. Ao deslizar este iterador para os url_params de postagens, o iterador irá iterar recursivamente sobre todas as categorias e todas as páginas em cada uma. Ele será abortado quando encontrar um erro 404 ou algum outro erro e passar para a próxima categoria.



E os repositórios têm um exemplo de servidor aiohttp para essas classes para que tudo possa ser testado.



Além de obter parâmetros, você pode passá-los como data ou json e definir outro método.



results_key é pontilhada e tentará extrair as chaves dos resultados. Por exemplo, "comments. *. Text" retornará o texto de cada comentário do array dentro dos comentários.



Os resultados são embalados em um invólucro com propriedades url e params. url é derivado de uma string que também possui parâmetros. Assim, poderá saber quais os parâmetros utilizados para obter este resultado, o que é demonstrado no método dos comentários.



Também existe uma classe Sink básica para lidar com os resultados. Por exemplo, dobrando-os em mq ou um banco de dados. Ele trabalha em tarefas separadas e recebe dados via asyncio.Queue.



class LoggingSink(Sink):
    def transform(self, obj):
        return repr(obj)

    async def init(self):
        from loguru import logger

        self.logger = logger

    async def process(self, obj):
        self.logger.info(obj)
        return True

sink = LoggingSink(num_tasks=1)


Um exemplo do Sink mais simples. O método transform nos permite fazer alguma manipulação do objeto e retornar None se não for adequado para nós. Essa. em temas você também pode fazer validação.



Sink é um gerenciador de contexto assíncrono, que, ao ser encerrado, em teoria, irá esperar até que todos os objetos na fila sejam processados, para então cancelar suas tarefas.



E finalmente, para amarrar tudo junto, fiz uma classe Worker. Ele aceita um ponto de extremidade e vários coletores. Por exemplo,



worker = Worker(endpoint=posts, sinks=[loggingsink, mongosink])
worker.run()


run executará o asyncio.run_until_complete para o pipeline de trabalho. Ele também possui um método de transformação.



Há também uma classe WorkerGroup que permite criar vários workers de uma vez e fazer um asyncio.gather para eles.



O código contém um exemplo de servidor que gera dados por meio de faker e manipuladores para seus terminais. Acho que isso é o mais óbvio.



Tudo isso está em um estágio inicial de desenvolvimento e, até agora, mudei frequentemente a API. Mas agora parece ter chegado a como deveria ser. Vou apenas mesclar solicitações e comentários ao meu código.



All Articles