Apis são descritos como classes, por exemplo
class Categories(JsonEndpoint):
url = "http://127.0.0.1:8888/categories"
params = {"page": range(100), "language": "en"}
headers = {"User-Agent": get_user_agent}
results_key = "*.slug"
categories = Categories()
class Posts(JsonEndpoint):
url = "http://127.0.0.1:8888/categories/{category}/posts"
params = {"page": range(100), "language": "en"}
url_params = {"category": categories.iter_results()}
results_key = "posts"
async def comments(self, post):
comments = Comments(
self.session,
url_params={"category": post.url.params["category"], "id": post["id"]},
)
return [comment async for comment in comments]
posts = Posts()
Params e url_params podem conter funções (como aqui get_user_agent - retorna um useragent aleatório), intervalo, iteradores, iteradores aguardáveis e assíncronos (para que você possa vinculá-los).
Os cabeçalhos de parâmetros e cookies também podem conter funções e aguardar.
A categoria api no exemplo acima retorna uma matriz de objetos que possuem um slug, o iterador retornará exatamente esses. Ao deslizar este iterador para os url_params de postagens, o iterador irá iterar recursivamente sobre todas as categorias e todas as páginas em cada uma. Ele será abortado quando encontrar um erro 404 ou algum outro erro e passar para a próxima categoria.
E os repositórios têm um exemplo de servidor aiohttp para essas classes para que tudo possa ser testado.
Além de obter parâmetros, você pode passá-los como data ou json e definir outro método.
results_key é pontilhada e tentará extrair as chaves dos resultados. Por exemplo, "comments. *. Text" retornará o texto de cada comentário do array dentro dos comentários.
Os resultados são embalados em um invólucro com propriedades url e params. url é derivado de uma string que também possui parâmetros. Assim, poderá saber quais os parâmetros utilizados para obter este resultado, o que é demonstrado no método dos comentários.
Também existe uma classe Sink básica para lidar com os resultados. Por exemplo, dobrando-os em mq ou um banco de dados. Ele trabalha em tarefas separadas e recebe dados via asyncio.Queue.
class LoggingSink(Sink):
def transform(self, obj):
return repr(obj)
async def init(self):
from loguru import logger
self.logger = logger
async def process(self, obj):
self.logger.info(obj)
return True
sink = LoggingSink(num_tasks=1)
Um exemplo do Sink mais simples. O método transform nos permite fazer alguma manipulação do objeto e retornar None se não for adequado para nós. Essa. em temas você também pode fazer validação.
Sink é um gerenciador de contexto assíncrono, que, ao ser encerrado, em teoria, irá esperar até que todos os objetos na fila sejam processados, para então cancelar suas tarefas.
E finalmente, para amarrar tudo junto, fiz uma classe Worker. Ele aceita um ponto de extremidade e vários coletores. Por exemplo,
worker = Worker(endpoint=posts, sinks=[loggingsink, mongosink])
worker.run()
run executará o asyncio.run_until_complete para o pipeline de trabalho. Ele também possui um método de transformação.
Há também uma classe WorkerGroup que permite criar vários workers de uma vez e fazer um asyncio.gather para eles.
O código contém um exemplo de servidor que gera dados por meio de faker e manipuladores para seus terminais. Acho que isso é o mais óbvio.
Tudo isso está em um estágio inicial de desenvolvimento e, até agora, mudei frequentemente a API. Mas agora parece ter chegado a como deveria ser. Vou apenas mesclar solicitações e comentários ao meu código.