Repetimos a análise de coorte. Uma abordagem integrada - Python, SQL, Power BI

Boa tarde, queridos leitores! Este artigo é uma continuação do Python Revisiting Power BI Cohort Analysis ( link ). Eu recomendo fortemente que você conheça pelo menos brevemente, caso contrário, a história subsequente será incompreensível para você. Já se passou muito tempo desde seu lançamento no Habr. Eu revisei completamente a metodologia para resolver esses problemas. Meu primeiro desejo era simplesmente reescrever o material antigo, mas após alguma deliberação, cheguei à conclusão de que seria um passo mais razoável formalizar os desenvolvimentos em um novo manuscrito.





Qual é a causa raiz da minha "insatisfação" com Python e Power BI? A linguagem Python / R com bibliotecas temáticas e Power BI (Tableau, Qlik) pode cobrir 70-80% das necessidades de negócios no cálculo de métricas complexas e na construção de visualizações. Mas apenas quando se trata de processar conjuntos de dados relativamente pequenos com dados já agregados. Se estamos falando sobre manipulação preliminar de dados em escala industrial, então aqui o jogo muda para o lado do servidor com o banco de dados e usa SQL. Não abordei esse ponto na publicação anterior, então decidi eliminar essa omissão aqui.





Para desenvolver e testar consultas SQL, escolhi o banco de dados PostgreSQL. Instalei esse banco de dados localmente em um laptop. Não fiz nenhuma configuração específica, deixei todos os parâmetros como estão. Para repetir as etapas descritas no material, iniciar um contêiner com PostgreSQL também é adequado se você for amigo do Docker.





Você pode encontrar conjuntos de dados em formato csv e arquivos com scripts no GitHub ( link ). Como as informações foram preparadas com antecedência para carregamento direto, só precisei usar o programa pgAdmin integrado. O tempo de carregamento é de pouco mais de 1 milhão de linhas no modo editor gráfico, 4-5 segundos. Essa métrica se tornou uma referência porque não consegui superá-la com o código Python. O carregamento de dados no PostgreSQL usando scripts para as necessidades do exemplo de demonstração pode não ter sido implementado, mas não estamos procurando maneiras fáceis de análise.





A primeira etapa é criar a tabela de vendas. O código em si é extremamente simples e não requer nenhum comentário adicional.





import psycopg2

#   
conn = psycopg2.connect("dbname='db' user='postgres' password='gfhjkm' host='localhost' port='5432'")

print("Database opened successfully")

#  
cursor = conn.cursor()

with conn:
    cursor.execute("""
            DROP TABLE IF EXISTS sales;
        """)

    cursor.execute("""
            CREATE TABLE IF NOT EXISTS sales (
              id SERIAL PRIMARY KEY,
              date DATE NOT NULL, 
              promo TEXT NOT NULL,
              site TEXT NOT NULL,
              user_id TEXT NOT NULL,
              transaction_id INTEGER NOT NULL,
              amount INTEGER NOT NULL);
        """)


print("Operation done successfully")

#    
cursor.close()
conn.close()
      
      



A tabela é formada, executamos o seguinte script para gravar dados no banco de dados. Pandas e sqlalchemy trabalham em conjunto. Paralelamente, medimos o tempo usando datetime.





import os
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
from datetime import datetime

start_time = datetime.now()

#   
engine = create_engine('postgresql://postgres:gfhjkm@localhost:5432/db')

print("Database opened successfully")

#     
path_to_data = "C:/Users/Pavel/PycharmProjects/database/"
#    
sale_records = pd.read_csv(os.path.join(path_to_data, "ohortAnalysis_2016_2018.csv"),
                           sep=";", parse_dates=["date"], dayfirst=True)
postgresql_table = "sales"
#    
sale_records.to_sql(postgresql_table, engine, if_exists='append', index=False)

print("Operation done successfully")

end_time = datetime.now()
print('Duration: {}'.format(end_time - start_time))
      
      



3 26 . . , sqlalchemy .





import psycopg2
from datetime import datetime

start_time = datetime.now()

#   
conn = psycopg2.connect("dbname='db' user='postgres' password='gfhjkm' host='localhost' port='5432'")
print("Database opened successfully")

#  
cursor = conn.cursor()

#     
path_to_data = "C:/Users/Pavel/PycharmProjects/database/"
#    
sale_records = pd.read_csv(os.path.join(path_to_data, "ohortAnalysis_2016_2018.csv"),
                           sep=";", parse_dates=["date"], dayfirst=True)

query = "INSERT INTO sales (date, promo, site, user_id, transaction_id, amount) values (%s, %s, %s, %s, %s, %s)"
dataset_for_db = sale_records.values.tolist()

cursor.executemany(query, dataset_for_db)
conn.commit()

print("Operation done successfully")

#    
cursor.close()
conn.close()

end_time = datetime.now()
print('Duration: {}'.format(end_time - start_time))
      
      



10 . – pandas.





import psycopg2
from datetime import datetime

start_time = datetime.now()

#   
conn = psycopg2.connect("dbname='db' user='postgres' password='gfhjkm' host='localhost' port='5432'")
print("Database opened successfully")

#  
cursor = conn.cursor()


#  .       
with open('ohortAnalysis_2016_2018.csv', 'r', encoding='UTF8') as f:
    next(f)
    cursor.copy_from(f, 'sales', sep=';', columns=('date','promo','site','user_id','transaction_id','amount'))
    conn.commit()

f.close()

print("Operation done successfully")

#    
cursor.close()
conn.close()

end_time = datetime.now()
print('Duration: {}'.format(end_time - start_time))
      
      



7 . . . . , pandas .





SQL , . . Python Power BI , . SQL .





SELECT s3.date,
	s3.user_id,
	s3.date - s2.first_date AS delta_days,
	ceil((s3.date - s2.first_date)::real/30::real)*30 AS cohort_days,
	to_char(s2.first_date,'YYYY-MM') AS first_transaction
	s3.amount
FROM public.sales AS s3
LEFT JOIN
				(SELECT s1.user_id,
						MIN(s1.date) AS first_date
					FROM public.sales AS s1
					GROUP BY s1.user_id) AS s2 ON s3.user_id = s2.user_id
ORDER BY s3.user_id,
	s3.date


SELECT  s.date,
		s.user_id,
		s.date - FIRST_VALUE(s.date) OVER(PARTITION BY s.user_id ORDER BY s.date) AS delta_days,
		ceil((s.date - FIRST_VALUE(s.date) OVER(PARTITION BY s.user_id ORDER BY s.date))::real/30::real)*30 AS cohort_days,
		to_char(FIRST_VALUE(s.date) OVER(PARTITION BY s.user_id ORDER BY s.date),'YYYY-MM') AS first_transaction,
		s.amount
FROM public.sales AS s
ORDER BY s.user_id,
	s.date
      
      



, , , . . PostgreSQL to_char().





( ) . - , CASE. , 3 . . , , . PostgreSQL . - .





. – 30 . 30. 0 30, 0, . 0 30 30. , . , 30 30, 1, . , , . PostgreSQL ceil(). 30 .





. INTEGER INTEGER, . ! , ::real .





: SQL .





, .





, .





SELECT r2.first_transaction,
		r2.cohort_days,
		--r2.total_amount,
		--sum(r2.total_amount) OVER (PARTITION BY r2.first_transaction ORDER BY r2.first_transaction, r2.cohort_days) as cumsum_amount,
		--first_value(r2.total_amount) OVER (PARTITION BY r2.first_transaction ORDER BY r2.first_transaction, r2.cohort_days) as first_total_amount,
		round((sum(r2.total_amount) OVER (PARTITION BY r2.first_transaction ORDER BY r2.first_transaction, r2.cohort_days)/ 
		first_value(r2.total_amount) OVER (PARTITION BY r2.first_transaction ORDER BY r2.first_transaction, r2.cohort_days)-1),3) as percent_cumsum_amount
FROM 
		(SELECT r.first_transaction, r.cohort_days, sum(r.amount) AS total_amount		
		FROM public.report_cohort_analysis AS r
		GROUP BY r.first_transaction, r.cohort_days
		ORDER BY r.first_transaction, r.cohort_days) as r2
      
      



, . , . (- ). - . ().





, .





. – SQL. PostgreSQL CROSSTAB, . BI . Power BI , ( , Python). ( SQL). .





Eu gostaria de concluir esta publicação com o seguinte pensamento. As melhores soluções de análise são construídas em torno da combinação ideal dos recursos de várias plataformas, em vez de extrair todos os frutos de uma ferramenta.





Isso é tudo. Toda saúde, boa sorte e sucesso profissional!








All Articles