Desenvolvimento de modelo no PySpark ML em um conjunto de dados com diferentes tipos de dados para manequins enferrujados

Você já sabe como trabalhar com vários tipos de dados no PySpark ML? Não? Então você precisa nos visitar com urgência.



imagem



Olá! Quero abordar em detalhes um tópico interessante, mas, infelizmente, não é um tópico na documentação do Spark: como treinar um modelo no PySpark ML em um conjunto de dados com diferentes tipos de dados (strings e números)? O desejo de escrever este artigo foi causado pela necessidade de navegar na Internet por vários dias em busca do artigo necessário com o código, pois o tutorial oficial do Spark fornece um exemplo de trabalho não apenas com sinais de um tipo de dados, mas geralmente com um sinal, mas informações sobre como trabalhar com várias colunas os mais diferentes tipos de dados, não há. No entanto, tendo estudado em detalhes os recursos do PySpark para trabalhar com dados, consegui escrever um código funcional e entender como tudo acontece, o que quero compartilhar com você. A toda velocidade, amigos!



Inicialmente, vamos importar todas as bibliotecas necessárias para o trabalho, e depois vamos analisar o código em detalhes para que qualquer "bule enferrujado" que se preze, como, aliás, recentemente, vai entender tudo:



#  
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.functions as sf
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
#other types of regression models
#     
#from pyspark.ml.regression import LinearRegression
#from pyspark.ml.regression import RandomForestRegressor
#from pyspark.ml.regression import GeneralizedLinearRegression
#from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator


Agora vamos criar um contexto (local) do Spark e uma sessão do Spark e verificar se tudo funciona exibindo-o na tela. Criar uma sessão do Spark é o ponto de partida para trabalhar com conjuntos de dados no Spark:



#  
sc = SparkContext('local')
spark = SparkSession(sc)
spark






Existe uma ferramenta para trabalhar com dados, agora vamos carregá-la. O artigo usa um conjunto de dados retirado do site da competição de aprendizado de máquina Kaggle:

https://www.kaggle.com/unitednations/international-greenhouse-gas-emissions

que, após o download, é armazenado em path_csv no formato .csv e tem as seguintes opções:



  • cabeçalho: se em nosso arquivo a primeira linha for um cabeçalho, defina "verdadeiro"
  • delimitador: colocamos um sinal que separa os dados de uma linha por sinais, muitas vezes é "," ou ";"
  • inferSchema: se verdadeiro, o PySpark detectará automaticamente o tipo de cada coluna, caso contrário, você terá que escrevê-lo você mesmo


#   .csv  path_csv
path_csv = 'greenhouse_gas_inventory_data_data.csv'
data = spark.read.format("csv")\
        .option("header", "true")\
        .option("delimiter", ",")\
        .option("inferSchema", "true")\
        .load(path_csv)


Para entender melhor com que tipo de dados estamos lidando, vamos examinar algumas de suas linhas:



#   
data.show()




Vejamos também quantas linhas temos no conjunto de dados:

#  
data.select('year').count()






E, finalmente, vamos inferir os tipos de nossos dados, que, como nos lembramos, pedimos ao PySpark para determinar automaticamente usando a opção ("inferSchema", "true"):



#     
data.printSchema()






Agora vamos passar ao nosso prato principal - trabalhar com vários sinais de diferentes tipos de dados. O Spark pode treinar o modelo nos dados transformados, onde a coluna prevista é um vetor e as colunas com recursos também são um vetor, o que complica a tarefa ... Mas não desistimos, e para treinar o modelo no PySpark usaremos o Pipeline, para o qual passaremos um determinado plano de ação (variável estágios):



  1. etapa label_stringIdx: transformamos a coluna do conjunto de dados de valor que desejamos prever em uma string de vetor Spark e a renomeamos para rotular com o parâmetro handleInvalid = 'keep', o que significa que nossa coluna prevista suporta nulo
  2. step stringIndexer: converter colunas de string em strings categóricas do Spark
  3. encoder: ()
  4. assembler: Spark, , VectorAssembler(), ( ) (assemblerInputs) «features»
  5. gbt: PySpark ML GBTRegressor,


#value -      - 
stages = []
label_stringIdx = StringIndexer(inputCol = 'value', outputCol = 'label', handleInvalid = 'keep')
stages += [label_stringIdx]

#depend on categorical columns: country and types of emission
#   :    
categoricalColumns = ['country_or_area', 'category']
for categoricalCol in categoricalColumns:
    #        
    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + 'Index',
                                  handleInvalid = 'keep')
    encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(),
                            outputCol=categoricalCol + "classVec")
    stages += [stringIndexer, encoder]

#   : 
numericCols = ['year']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
#    - - 
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]


Vamos dividir nosso conjunto de dados em amostras de treinamento e teste em nossa proporção favorita de 70% a 30%, respectivamente, e começar a treinar o modelo usando uma árvore de aumento de regressão de gradiente (GBTRegressor), que deve prever o vetor de "rótulo" com base nos recursos previamente combinados em um vetor de "recursos" com limite iterável maxIter = 10:



#       (30% )
(trainingData, testData) = data.randomSplit([0.7, 0.3])

#  (   )
gbt = GBTRegressor(labelCol="label", featuresCol="features", maxIter=10)
stages += [gbt]

#   stages    
pipeline = Pipeline(stages=stages)


E agora só precisamos enviar ao computador um plano de ação e um conjunto de dados de treinamento:



#  
model = pipeline.fit(trainingData)

#     
predictions = model.transform(testData)


Vamos salvar nosso modelo para que possamos sempre voltar a usá-lo, sem treinamento adicional:



# 
pipeline.write().overwrite().save('model/gbtregr_model')


E se você decidir começar a usar o modelo treinado para previsões novamente, basta escrever:



#     
load_model = pipeline.read().load('model/gbtregr_model')




Então, vimos como em uma ferramenta para trabalhar com big data na linguagem Python, o PySpark, o trabalho com várias colunas de recursos de diferentes tipos de dados é implementado.



Agora é hora de aplicar isso aos seus modelos ...



All Articles