Visão geral da nova IU para streaming estruturado no Apache Spark ™ 3.0

A tradução do artigo foi elaborada na véspera do início do curso de Engenharia de Dados .










O Structured Streaming foi introduzido pela primeira vez no Apache Spark 2.0. Essa plataforma se estabeleceu como a melhor escolha para a construção de aplicativos de streaming distribuídos. A unificação da API SQL / Dataset / DataFrame e das funções integradas do Spark torna muito mais fácil para os desenvolvedores implementar seus fundamentos complexos, como agregação de streaming, junção de stream-stream e suporte a janelas. Desde o lançamento do Structured Streaming, tem havido uma solicitação popular de desenvolvedores para melhorar o controle de streaming, assim como fizemos no Spark Streaming (como DStream). No Apache Spark 3.0, lançamos uma nova IU para streaming estruturado.



A nova UI Structured Streaming fornece uma maneira fácil de monitorar todos os jobs de streaming com estatísticas e insights acionáveis, tornando mais fácil solucionar problemas durante a depuração e melhorando a visibilidade da produção com métricas em tempo real. A IU apresenta dois conjuntos de estatísticas: 1) informações agregadas sobre um trabalho de consulta de streaming e 2) informações estatísticas detalhadas sobre solicitações de streaming, incluindo taxa de entrada, taxa de processo, linhas de entrada, duração do lote, duração da operação, etc.



Informações agregadas sobre streaming de jobs de consulta



Quando um desenvolvedor envia uma consulta de streaming SQL, ela aparece na guia Structured Streaming, que inclui consultas de streaming ativas e concluídas. A tabela de resultados fornecerá algumas informações básicas sobre solicitações de streaming, incluindo nome da solicitação, status, ID, runID, tempo de envio, duração da solicitação, ID do último pacote, bem como informações agregadas, como taxa média de recebimento e taxa média de processamento. Existem três tipos de status de solicitação de streaming: RUNNING, FINISHED e FAILED. Todos os pedidos FINISHED e FAILED são listados na tabela de pedidos de streaming concluída. A coluna Erro exibe os detalhes da exceção de solicitação com falha.







Podemos visualizar estatísticas detalhadas da solicitação de streaming clicando no link Run ID.



Informação estatística detalhada



A página Estatísticas exibe métricas, incluindo taxa de ingestão / processamento, latência e duração detalhada da operação, que são úteis para entender o estado de suas solicitações de streaming, facilitando a depuração de anomalias no processamento de solicitações.









Ele contém as seguintes métricas:



  • Taxa de entrada : taxa agregada (em todas as fontes) de chegada de dados.
  • Taxa de processo : a taxa agregada (em todas as fontes) na qual o Spark processa os dados.
  • Duração do lote : a duração de cada lote.
  • Duração da operação : o tempo necessário para realizar várias operações em milissegundos.


As transações monitoradas estão listadas abaixo:



  • addBatch: tempo gasto na leitura dos dados de entrada do microlote das fontes, processando-os e gravando os dados de saída do lote a sincronizar. Isso geralmente leva a maior parte do tempo do microlote.
  • getBatch: tempo necessário para preparar uma solicitação lógica para ler os dados de entrada do micropacote atual das fontes.
  • getOffset: o tempo que leva para consultar as fontes se elas tiverem novas entradas.
  • walCommit: Grava offsets em logs de metadados.
  • queryPlanning: Crie um plano de execução.


Deve-se observar que nem todas as operações listadas serão exibidas na IU. Existem diferentes operações com diferentes tipos de fontes de dados, portanto, algumas das operações listadas podem ser realizadas em uma solicitação de streaming.



Solução de problemas de desempenho de streaming usando a IU



Nesta seção, veremos alguns casos em que o novo fluxo estruturado de IU indica que algo fora do comum está acontecendo. Uma solicitação de demonstração de alto nível é semelhante a esta e, em cada caso, assumiremos algumas pré-condições:



import java.util.UUID

val bootstrapServers = ...
val topics = ...
val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString

val lines = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topics)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as[String]

val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

val query = wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .option("checkpointLocation", checkpointLocation)
    .start()


Latência aumentada devido à capacidade de processamento insuficiente



No primeiro caso, executamos uma solicitação para processar os dados do Apache Kafka o mais rápido possível. Para cada lote, um trabalho de streaming processa todos os dados disponíveis no Kafka. Se a potência de processamento for insuficiente para lidar com os dados de burst, a latência aumentará rapidamente. O julgamento mais intuitivo é que as linhas de entrada e a duração do lote crescerão linearmente. O parâmetro Input Rows especifica que o trabalho de streaming pode processar no máximo 8.000 gravações por segundo. Mas a taxa de entrada atual é de cerca de 20.000 registros por segundo. Podemos fornecer ao trabalho de threading mais recursos para ser executado ou podemos adicionar partições suficientes para lidar com todos os consumidores necessários para acompanhar os produtores.







Estável, mas com alta latência



Como este caso é diferente do anterior? A latência não aumenta, mas permanece estável, conforme mostrado na imagem a seguir:







Descobrimos que a taxa de processo pode permanecer estável na mesma taxa de entrada. Isso significa que o poder de processamento do trabalho é suficiente para processar os dados de entrada. No entanto, o tempo de processamento para cada lote, ou seja, o atraso, ainda é de 20 segundos. O principal motivo da alta latência é o excesso de dados em cada lote. Geralmente, podemos reduzir a latência aumentando o paralelismo desse trabalho. Depois de adicionar mais 10 partições Kafka e 10 núcleos para tarefas do Spark, descobrimos que a latência era de cerca de 5 segundos - muito melhor do que 20 segundos.







Use o gráfico de duração da operação para solução de problemas



O gráfico Duração da operação exibe a quantidade de tempo gasto na execução de várias operações em milissegundos. Isso é útil para entender o tempo de cada lote e facilitar a solução de problemas. Vamos usar o trabalho de melhoria de desempenho " SPARK-30915 : Evite ler o arquivo de log de metadados ao procurar a ID do lote mais recente" na comunidade Apache Spark como exemplo.

Antes dessa melhoria, cada lote subsequente após a compactação demorava mais tempo do que outros lotes, quando o log de metadados compactado se tornava enorme.







Depois de examinar o código, a leitura desnecessária do arquivo de log compactado foi encontrada e corrigida. O seguinte diagrama de Duração da Operação confirma o efeito esperado:







Planos para o futuro



Conforme mostrado acima, o novo UI Structured Streaming ajudará os desenvolvedores a controlar melhor seus jobs de streaming, tendo muito mais informações úteis sobre solicitações de streaming. Como uma versão inicial, a nova IU ainda está em desenvolvimento e será aprimorada em versões futuras. Existem vários recursos que podem ser implementados em um futuro não muito distante, incluindo, mas não se limitando ao seguinte:



  • Saiba mais sobre a execução de consulta de streaming: dados atrasados, marcas d'água, métricas de estado de dados e muito mais.
  • Suporte à UI de streaming estruturado no Spark History Server.
  • Pistas mais perceptíveis para comportamento incomum: latência, etc.


Experimente uma nova IU



Experimente esta nova interface de usuário do Spark Streaming no Apache Spark 3.0 no novo Databricks Runtime 7.1. Se você estiver usando blocos de notas do Databricks, isso também fornecerá uma maneira fácil de observar o status de qualquer solicitação de streaming no bloco de notas e gerenciar suas solicitações . Você pode se inscrever para uma conta gratuita no Databricks e começar em minutos gratuitamente, sem nenhuma informação de crédito.






A qualidade dos dados em DWH é a consistência do Data Warehouse. webinar grátis.






Leitura recomendada:



Ferramenta de criação de dados, ou o que o Data Warehouse e o Smoothie têm

em comum Delta Lake Dive: Aplicação e evolução do esquema

Apache Parquet de alta velocidade em Python com Apache Arrow



All Articles