Spark schemaEvolution na prática

Caros leitores, bom dia!



Neste artigo, o consultor líder da linha de negócios Big Data Solutions da Neoflex descreve em detalhes as opções para a construção de vitrines de estrutura variável usando Apache Spark.



Como parte de um projeto de análise de dados, muitas vezes surge a tarefa de construir marts com base em dados estruturados vagamente.



Normalmente, esses são logs ou respostas de vários sistemas, salvos como JSON ou XML. Os dados são carregados no Hadoop, então você precisa construir um showcase a partir deles. Podemos organizar o acesso à loja criada, por exemplo, através do Impala.



Nesse caso, o layout da vitrine de destino é anteriormente desconhecido. Além disso, o diagrama ainda não pode ser traçado com antecedência, pois depende dos dados, e estamos lidando com esses dados muito fracamente estruturados.



Por exemplo, hoje a seguinte resposta é registrada:



{source: "app1", error_code: ""}


e amanhã a seguinte resposta vem do mesmo sistema:



{source: "app1", error_code: "error", description: "Network error"}


Como resultado, mais um campo deve ser adicionado à vitrine - descrição, e ninguém sabe se virá ou não.



A tarefa de criar um mart sobre esses dados é razoavelmente padrão, e o Spark possui várias ferramentas para isso. JSON e XML são suportados para análise de dados brutos, e o suporte schemaEvolution é fornecido para um esquema anteriormente desconhecido.



À primeira vista, a solução parece simples. Precisamos pegar uma pasta com JSON e lê-la em um dataframe. O Spark criará um esquema e transformará os dados aninhados em estruturas. Em seguida, tudo precisa ser salvo em parquet, que também é suportado no Impala, registrando o mostruário no metastore Hive.



Tudo parece simples.



No entanto, não fica claro, a partir dos exemplos curtos na documentação, o que fazer com uma série de problemas na prática.



A documentação descreve uma abordagem não para a criação de uma vitrine, mas para a leitura de JSON ou XML em um dataframe.



Ou seja, é simplesmente fornecido como ler e analisar JSON:



df = spark.read.json(path...)


Isso é o suficiente para disponibilizar os dados para o Spark.



Na prática, o cenário é muito mais complicado do que apenas ler arquivos JSON de uma pasta e criar um dataframe. A situação fica assim: já existe uma determinada vitrine, novos dados chegam todos os dias, eles precisam ser adicionados à vitrine, não esquecendo que o esquema pode ser diferente.



O esquema usual para construir uma vitrine é o seguinte:



Etapa 1. Os dados são carregados no Hadoop, seguidos de recarga diária e adicionados a uma nova partição. Acontece que a pasta com os dados iniciais particionada por dias.



Passo 2.Durante o carregamento de inicialização, essa pasta é lida e analisada pelo Spark. O dataframe resultante é salvo em um formato disponível para análise, por exemplo, em parquet, que pode então ser importado para o Impala. Isso cria uma vitrine de destino com todos os dados acumulados até este ponto.



Etapa 3. Um download é criado para atualizar a vitrine todos os dias.

Surge a questão do carregamento incremental, a necessidade de particionar o mostruário e a questão de apoiar o esquema geral do mostruário.



Vamos dar um exemplo. Digamos que a primeira etapa da construção do armazenamento seja implementada e a exportação de arquivos JSON para uma pasta seja configurada.



Não é um problema criar um dataframe a partir deles e salvá-lo como um mostruário. Esta é a primeira etapa que você pode encontrar facilmente na documentação do Spark:



df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)


Tudo parece estar bem.



Lemos e analisamos o JSON, em seguida, salvamos o dataframe como um parquete, registrando-o com o Hive de qualquer maneira conveniente:



df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')


Temos uma vitrine.



Mas, no dia seguinte, novos dados da fonte foram adicionados. Temos uma pasta com JSON, e um showcase criado com base nesta pasta. Depois de carregar o próximo bloco de dados da origem, o data mart fica sem dados por um dia.



Uma solução lógica seria particionar a vitrine por dia, o que permitirá adicionar uma nova partição a cada dia seguinte. O mecanismo para isso também é bem conhecido, o Spark permite que você grave partições separadamente.



Primeiro, fazemos o carregamento de inicialização, salvando os dados conforme descrito acima, adicionando apenas o particionamento. Essa ação é chamada de inicialização da vitrine e é feita apenas uma vez:



df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)


No dia seguinte, carregamos apenas uma nova partição:



df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")


Tudo o que resta é registrar-se novamente no Hive para atualizar o esquema.

No entanto, é aqui que surgem os problemas.



Primeiro problema. Mais cedo ou mais tarde, o parquete resultante não poderá ser lido. Isso tem a ver com a forma como o parquet e o JSON abordam os campos vazios de maneira diferente.



Vamos considerar uma situação típica. Por exemplo, JSON chega ontem:



 1: {"a": {"b": 1}},


e hoje o mesmo JSON se parece com isto:



 2: {"a": null}


Digamos que temos duas partições diferentes com uma linha cada.

Quando lermos todos os dados de origem, o Spark será capaz de determinar o tipo e entender que "a" é um campo do tipo "estrutura", com um campo aninhado "b" do tipo INT. Mas, se cada partição foi salva separadamente, então um parquete com esquemas de partição incompatíveis é obtido:



df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)


Esta situação é bem conhecida, portanto, uma opção foi especialmente adicionada para remover campos vazios ao analisar os dados iniciais:



df = spark.read.json("...", dropFieldIfAllNull=True)


Neste caso, o parquet será constituído por divisórias que podem ser lidas em conjunto.

Embora aqueles que fizeram isso na prática riam amargamente. Por quê? Porque mais duas situações podem surgir. Ou três. Ou quatro. O primeiro, que quase certamente aparecerá, é que os tipos numéricos parecerão diferentes em diferentes arquivos JSON. Por exemplo, {intField: 1} e {intField: 1.1}. Se esses campos forem encontrados em uma parte, a mesclagem do esquema lerá tudo corretamente, levando ao tipo mais preciso. Mas se for diferente, então um terá intField: int e o outro intField: double.



Existe o seguinte sinalizador para lidar com esta situação:



df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)


Agora temos uma pasta onde as partições estão localizadas, que podem ser lidas em um único dataframe e um parquete válido para toda a vitrine. Sim? Não.



Lembre-se que registramos a tabela no Hive. O Hive não diferencia maiúsculas de minúsculas em nomes de campo, enquanto o parquet diferencia maiúsculas de minúsculas. Portanto, as partições com esquemas: field1: int e Field1: int são iguais para Hive, mas não para Spark. Lembre-se de colocar os nomes dos campos em letras minúsculas.



Depois disso, tudo parece estar bem.



No entanto, nem tudo é tão simples. Surge um segundo problema, também conhecido. Como cada nova partição é salva separadamente, os arquivos do serviço Spark estarão na pasta da partição, por exemplo, o sinalizador de sucesso da operação _SUCCESS. Isso gerará um erro ao tentar parquete. Para evitar isso, você precisa definir a configuração, impedindo que o Spark adicione arquivos de serviço à pasta:



hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")


Parece que agora, todos os dias, uma nova partição parquet é adicionada à pasta da vitrine de destino, onde os dados analisados ​​para o dia são armazenados. Tomamos cuidado para que não houvesse partições com conflito de tipo de dados.



Mas, diante de nós está o terceiro problema. Ora, não se conhece o esquema geral, aliás, no Hive, a tabela com o esquema errado, visto que cada nova partição, muito provavelmente, introduzia distorção no esquema.



Você precisa registrar novamente a tabela. Isso pode ser feito de forma simples: leia o parquet da vitrine novamente, pegue o esquema e crie um DDL baseado nele, com o qual registre novamente a pasta no Hive como uma tabela externa, atualizando o esquema da vitrine de destino.



Estamos diante de um quarto problema. A primeira vez que registramos a tabela, contamos com o Spark. Agora nós mesmos fazemos isso, e você precisa se lembrar que os campos de parquet podem começar com caracteres que não são válidos para o Hive. Por exemplo, o Spark lança linhas que não podem ser analisadas no campo "corrupt_record". Esse campo não pode ser registrado no Hive sem escapar.



Sabendo disso, temos o esquema:



f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)


Code ("_corrupt_record", "` _corrupt_record` ") +" "+ f [1] .replace (": "," `:"). Substituir ("<", "<` "). Substituir (", " , ",` "). replace (" array <`", "array <") torna o DDL seguro, isto é, em vez de:



create table tname (_field1 string, 1field string)


Com nomes de campo como "_field1, 1field", um DDL seguro é criado onde nomes de campo são escapados: crie a tabela `tname` (string` _field1`, string `1field`).



Surge a pergunta: como obter o dataframe com o esquema completo corretamente (em código pf)? Como faço para obter esse pf? Este é o quinto problema. Releia o esquema de todas as partições da pasta com arquivos em parquet da vitrine de destino? Este é o método mais seguro, mas o mais difícil.



O esquema já está no Hive. Você pode obter um novo esquema combinando o esquema de toda a tabela e a nova partição. Portanto, você precisa pegar o esquema da tabela do Hive e combiná-lo com o novo esquema de partição. Isso pode ser feito lendo os metadados de teste do Hive, salvando-os em uma pasta temporária e lendo as duas partições com o Spark ao mesmo tempo.



Basicamente, você tem tudo o que precisa: o esquema da tabela original no Hive e uma nova partição. Também temos os dados. Tudo o que resta é obter um novo esquema que combine o esquema da vitrine e novos campos da partição criada:



from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")


Em seguida, criamos o DDL para registrar a tabela, como no trecho anterior.

Se toda a cadeia estiver funcionando corretamente, ou seja - houve uma carga de inicialização e no Hive uma tabela criada corretamente, então obtemos um esquema de tabela atualizado.



E o último problema é que você não pode simplesmente adicionar uma partição à tabela Hive, pois ela será quebrada. Você precisa forçar o Hive a corrigir a estrutura da partição:



from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)


A simples tarefa de ler JSON e criar uma vitrine baseada nele se traduz na superação de uma série de dificuldades implícitas, para as quais você deve procurar soluções separadamente. Embora essas soluções sejam simples, demoram muito para serem encontradas.



Para implementar a construção da vitrine, tive que:



  • Adicionar partições à vitrine, livrando-se dos arquivos de serviço
  • Lidar com campos vazios nos dados originais que o Spark digitou
  • Lance tipos simples para string
  • Converta os nomes dos campos em minúsculas
  • Despejo de dados separado e registro de tabela no Hive (criação DDL)
  • Lembre-se de escapar de nomes de campo que podem não ser compatíveis com Hive
  • Aprenda a atualizar o cadastro de uma mesa no Hive


Resumindo, notamos que a decisão de construir vitrines esconde muitas armadilhas. Portanto, se surgirem dificuldades na implementação, é melhor entrar em contato com um parceiro experiente com conhecimentos de sucesso.



Obrigado por ler este artigo, esperamos que as informações sejam úteis.



All Articles