Boa tarde, queridos leitores! Há poucos dias, relendo o livro de Anthony Molinaro “SQL. Uma coleção de receitas ”, em um dos capítulos me deparei com um tema que se dedicou a determinar o início e o fim da faixa de valores consecutivos. Depois de ler o material brevemente, lembrei imediatamente que já havia me deparado com essa questão como uma das tarefas do teste, mas então o tópico foi declarado como “A tarefa de encontrar sessões”. O truque da entrevista técnica não foi uma revisão do trabalho executado, mas uma das perguntas do entrevistador sobre como obter valores semelhantes usando o Spark. Preparando-me para a entrevista, não sabia se a empresa usa (ou talvez não ...) Apache Spark, portanto não coletou informações sobre uma nova ferramenta para mim naquela época. Restava apenas apresentar a hipótese de que a solução desejada poderia ser como um script,que pode ser escrito usando a biblioteca Pandas. Embora muito remotamente, ainda acertei o alvo, mas não consegui trabalhar nesta organização.
Para ser justo, quero observar que, ao longo dos anos, fiz pouco progresso no aprendizado do Apache Spark. Mas ainda quero compartilhar as melhores práticas com os leitores, já que muitos analistas nunca encontraram essa ferramenta e outros podem ter uma entrevista semelhante. Se você é um profissional Spark, pode sempre sugerir o código mais adequado nos comentários da postagem.
Este foi um preâmbulo, vamos proceder diretamente à análise deste tópico. Vamos primeiro escrever um script SQL. Mas primeiro, vamos criar um banco de dados e preenchê-lo com valores. Como este é um exemplo de demonstração, sugiro usar SQLite. Este banco de dados é inferior aos "colegas de trabalho" mais poderosos, mas suas capacidades de desenvolvimento de scripts são suficientes para nós por completo. Para automatizar as operações acima, escrevi o seguinte código em Python.
#
import sqlite3
#
projects = [
('2020-01-01', '2020-01-02'),
('2020-01-02', '2020-01-03'),
('2020-01-03', '2020-01-04'),
('2020-01-04', '2020-01-05'),
('2020-01-06', '2020-01-07'),
('2020-01-16', '2020-01-17'),
('2020-01-17', '2020-01-18'),
('2020-01-18', '2020-01-19'),
('2020-01-19', '2020-01-20'),
('2020-01-21', '2020-01-22'),
('2020-01-26', '2020-01-27'),
('2020-01-27', '2020-01-28'),
('2020-01-28', '2020-01-29'),
('2020-01-29', '2020-01-30')
]
try:
#
con = sqlite3.connect("projects.sqlite")
#
cur = con.cursor()
#
cur.execute("""CREATE TABLE IF NOT EXISTS projects (
proj_id INTEGER PRIMARY KEY AUTOINCREMENT,
proj_start TEXT,
proj_end TEXT)""")
#
cur.executemany("INSERT INTO projects VALUES(NULL, ?,?)", projects)
#
con.commit()
#
cur.close()
except sqlite3.Error as err:
print(" ", err)
finally:
#
con.close()
print(" ")
. DBeaver. , SQL .
select
p3.proj_group,
min(p3.proj_start) as date_start,
max(p3.proj_end) as date_end,
julianday(max(p3.proj_end))-julianday( min(p3.proj_end))+1 as delta
from
(select
p2.*,
sum(p2.flag)over(order by p2.proj_id) as proj_group
from
(select
p.proj_id ,
p.proj_start,
p.proj_end,
case
when lag(p.proj_end)over(order by p.proj_id) = p.proj_start then 0 else 1
end as flag
from projects as p) as p2) as p3
group by p3.proj_group
, . . , : . , . , , lag. 0, 1. , . . , . . , ( julianday SQLite). . Spark.
, Apache Spark , Hadoop. Java, Scala R, Spark PySpark. . Google Colab, . - , . , .
Linux OpenJDK, Spark. . findspark. , .
SQLite , . , .
Spark , . , . -, , , -, . , “ Spark. ”, , , , .
, , SQL. : , ( datediff).
, . , - , , , SQL Spark. , , . .
from pyspark.sql.functions import lag
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Equivalent of Pandas.dataframe.shift() method
w = Window().partitionBy().orderBy(col("proj_id"))
df_dataframe = df.withColumn('lag', F.lag("proj_end").over(w))
#...
# Equivalent of SQL- CASE WHEN...THEN...ELSE... END
df_dataframe = df_dataframe.withColumn('flag',F.when(df_dataframe["proj_start"] == df_dataframe["lag"],0).otherwise(1))
#...
# Cumsum by column flag
w = Window().partitionBy().orderBy(col("proj_id"))
df_dataframe = df_dataframe.withColumn("proj_group", F.sum("flag").over(w))
#...
# Equivalent of SQL - GROUP BY
from pyspark.sql.functions import min, max
df_group = df_dataframe.groupBy("proj_group").agg(min("proj_start").alias("date_start"), \
max("proj_end").alias("date_end"))
df_group = df_group.withColumn("delta", F.datediff(df_group.date_end,df_group.date_start))
df_group.show()
.
, . . , “” , .
Mesmo que você nunca tenha trabalhado com o Spark antes, isso não é motivo para recusar a competição por uma vaga. O básico do PySpark pode ser dominado em um curto espaço de tempo, desde que o background já tenha experiência em programação usando a biblioteca Pandas.
Não faltam livros no Spark.
Isso é tudo. Toda saúde, boa sorte e sucesso profissional!