Mysql
 sql >> Database >  >> RDS >> Mysql

Utilizzo di Python e MySQL nel processo ETL:Utilizzo di Python e SQLAlchemy

Nei due articoli precedenti di questa serie, abbiamo discusso su come utilizzare Python e SQLAlchemy per eseguire il processo ETL. Oggi faremo lo stesso, ma questa volta utilizzando Python e SQL Alchemy senza comandi SQL in formato testuale. Questo ci consentirà di utilizzare SQLAlchemy indipendentemente dal motore di database a cui siamo connessi. Quindi, iniziamo.

Oggi parleremo di come eseguire il processo ETL utilizzando Python e SQLAlchemy. Creeremo uno script per estrarre i dati giornalieri dal nostro database operativo, trasformarlo e quindi caricarlo nel nostro data warehouse.

Questo è il terzo articolo della serie. Se non hai letto i primi due articoli (Utilizzo di Python e MySQL nel processo ETL e SQLAlchemy), ti consiglio vivamente di farlo prima di continuare.

L'intera serie è una continuazione della nostra serie di data warehouse:

  • Creazione di un DWH, prima parte:un modello di dati aziendali in abbonamento
  • Creazione di un DWH, parte seconda:un modello di dati aziendali in abbonamento
  • Creazione di un data warehouse, parte 3:un modello di dati aziendali in abbonamento

Bene, ora iniziamo con l'argomento di oggi. Per prima cosa, diamo un'occhiata ai modelli di dati.

I modelli di dati



Modello di dati del database operativo (attivo)




Modello dati DWH


Questi sono i due modelli di dati che useremo. Per ulteriori informazioni sui data warehouse (DWH), consulta questi articoli:

  • Lo schema a stella
  • Lo schema del fiocco di neve
  • Schema a stella vs. Schema a fiocco di neve

Perché SQLAlchemy?

L'idea alla base di SQLAlchemy è che dopo aver importato i database, non abbiamo bisogno di codice SQL specifico per il relativo motore di database. Invece, possiamo importare oggetti in SQLAlchemy e usare la sintassi SQLAlchemy per le istruzioni. Ciò ci consentirà di utilizzare la stessa lingua, indipendentemente dal motore di database a cui siamo collegati. Il vantaggio principale qui è che uno sviluppatore non ha bisogno di prendersi cura delle differenze tra i diversi motori di database. Il tuo programma SQLAlchemy funzionerà esattamente allo stesso modo (con modifiche minori) se esegui la migrazione a un motore di database diverso.

Ho deciso di utilizzare solo i comandi SQLAlchemy e gli elenchi Python per comunicare con la memoria temporanea e tra database diversi. I motivi principali alla base di questa decisione sono che 1) gli elenchi Python sono ben noti e 2) il codice sarebbe leggibile per chi non ha competenze in Python.

Questo non vuol dire che SQLAlchemy sia perfetto. Ha alcune limitazioni, di cui parleremo più avanti. Per ora, diamo solo un'occhiata al codice qui sotto:

Esecuzione dello script e del risultato

Questo è il comando Python usato per chiamare il nostro script. Lo script controlla i dati nel database operativo, confronta i valori con il DWH e importa i nuovi valori. In questo esempio, stiamo aggiornando i valori in due tabelle dimensionali e una tabella dei fatti; lo script restituisce l'output appropriato. L'intero script è scritto in modo da poterlo eseguire più volte al giorno. Eliminerà i dati "vecchi" per quel giorno e li sostituirà con nuovi.

Analizziamo l'intero script, partendo dall'alto.

Importazione di SQLAlchemy

La prima cosa che dobbiamo fare è importare i moduli che useremo nello script. Di solito, importerai i tuoi moduli mentre scrivi lo script. Nella maggior parte dei casi, non saprai esattamente di quali moduli avrai bisogno all'inizio.

from datetime import date

# import SQLAlchemy
from sqlalchemy import create_engine, select, MetaData, Table, and_, func, case

Abbiamo importato datetime di Python modulo, che ci fornisce classi che funzionano con le date.

Successivamente, abbiamo la sqlalchemy modulo. Non importeremo l'intero modulo, solo le cose di cui abbiamo bisogno, alcune specifiche per SQLAlchemy (create_engine , MetaData , Table ), alcune parti di istruzioni SQL (select , and_ , case ), e func , che ci consente di utilizzare funzioni come count() e somma() .

Collegamento ai database

Dovremo connetterci a due database sul nostro server. Se necessario, potremmo connetterci a più database (MySQL, SQL Server o qualsiasi altro) da server diversi. In questo caso, entrambi i database sono database MySQL e sono archiviati sul mio computer locale.

# connect to databases
engine_live = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_live')
connection_live = engine_live.connect()
engine_dwh = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_dwh')
connection_dwh = engine_dwh.connect()

metadata = MetaData(bind=None)

Abbiamo creato due motori e due connessioni. Non entrerò nei dettagli qui perché l'abbiamo già spiegato nell'articolo precedente.

Aggiornamento di dim_time Dimensione

Obiettivo:inserire la data di ieri se non è già inserita nella tabella.

Nel nostro script, aggiorneremo due tabelle dimensionali con nuovi valori. Il resto segue lo stesso schema, quindi lo esamineremo solo una volta; non abbiamo bisogno di annotare un codice quasi identico un paio di volte in più.

L'idea è molto semplice. Eseguiremo sempre lo script per inserire nuovi dati per ieri. Pertanto, dobbiamo verificare se quella data è stata inserita nella tabella delle dimensioni. Se è già lì, non faremo nulla; se non lo è, lo aggiungeremo. Diamo un'occhiata al codice per aggiornare il dim_time tabella.

Innanzitutto, verificheremo se la data esiste. Se non esiste, lo aggiungeremo. Iniziamo con la memorizzazione della data di ieri in una variabile. In Python, lo fai in questo modo:

yesterday = date.fromordinal(date.today().toordinal()-1)
yesterday_str = str(yesterday)

La prima riga prende una data corrente, la converte in un valore numerico, sottrae 1 da quel valore e riconverte quel valore numerico in una data (ieri =oggi – 1 ). La seconda riga memorizza la data in formato testuale.

Successivamente, verificheremo se la data è già nel database:

table_dim_time = Table('dim_time', metadata, autoload = True, autoload_with = engine_dwh)
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday_str)
result = connection_dwh.execute(stmt).fetchall()
date_exists = len(result)

Dopo aver caricato la tabella, eseguiremo una query che dovrebbe restituire tutte le righe della tabella delle dimensioni in cui il valore di data/ora è uguale a ieri. Il risultato potrebbe avere 0 (nessuna data nella tabella) o 1 riga (la data è già nella tabella).

Se la data non è già nella tabella, useremo il comando insert() per aggiungerla:

if date_exists == 0:
  print("New value added.")
  stmt = table_dim_time.insert().values(time_date=yesterday, time_year=yesterday.year, time_month=yesterday.month, time_week=yesterday.isocalendar()[1], time_weekday=yesterday.weekday())
  connection_dwh.execute(stmt)
else:
  print("No new values.")

Una cosa nuova qui che vorrei sottolineare è l'utilizzo di. .year , .month , .isocalendar()[1] e .weekday per ottenere dateparts.

Aggiornamento di dim_city Dimensione

Obiettivo:inserire nuove città se ce ne sono (ovvero confrontare l'elenco delle città nel database live con l'elenco delle città nel DWH e aggiungere quelle mancanti).

Aggiornamento di dim_time la dimensione era piuttosto semplice. Abbiamo semplicemente verificato se nella tabella era presente una data e l'abbiamo inserita se non era già presente. Per testare un valore nel database DWH, abbiamo utilizzato una variabile Python (ieri ). Utilizzeremo di nuovo quel processo, ma questa volta con gli elenchi.

Poiché non esiste un modo semplice per combinare tabelle di database diversi in una singola query SQLAlchemy, non possiamo utilizzare l'approccio descritto nella Parte 1 di questa serie. Pertanto, avremo bisogno di un oggetto per memorizzare i valori necessari per comunicare tra questi due database. Ho deciso di usare le liste, perché sono comuni e fanno il loro lavoro.

Per prima cosa, caricheremo il country e city tabelle da un database live negli oggetti rilevanti.

# dim_city
print("\nUpdating... dim_city")
table_city = Table('city', metadata, autoload = True, autoload_with = engine_live)
table_country = Table('country', metadata, autoload = True, autoload_with = engine_live)
table_dim_city = Table('dim_city', metadata, autoload = True, autoload_with = engine_dwh)

Poi caricheremo il dim_city tabella dall'acqua calda sanitaria in un elenco:

# load whole dwh table in the list
stmt = select([table_dim_city]);
table_dim_city_list = connection_dwh.execute(stmt).fetchall()

Quindi faremo lo stesso per i valori del database live. Ci uniremo ai tavoli country e city quindi abbiamo tutti i dati necessari in questo elenco:

# load all live values in the list
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name])\
	.select_from(table_city\
	.join(table_country))
table_city_list = connection_live.execute(stmt).fetchall()

Ora scorreremo l'elenco contenente i dati dal database live. Per ogni record, confronteremo i valori (city_name , postal_code e country_name ). Se non troviamo tali valori, aggiungeremo un nuovo record in dim_city tabella.

# loop through live_db table
# for each record test if it is missing in the dwh table
new_values_added = 0
for city in table_city_list:
	id = -1;
	for dim_city in table_dim_city_list:
		if city[0] == dim_city[1] and city[1] == dim_city[2] and city[2] == dim_city[3]:
			id = dim_city[0]
	if id == -1:
		stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2])
		connection_dwh.execute(stmt)
		new_values_added = 1
if new_values_added == 0:
	print("No new values.")
else:
	print("New value(s) added.")

Per determinare se il valore è già nel DWH, abbiamo testato una combinazione di attributi che dovrebbero essere univoci. (La chiave primaria del database live non ci aiuta molto qui.) Possiamo usare un codice simile per aggiornare altri dizionari. Non è la soluzione più bella, ma è comunque piuttosto elegante. E farà esattamente ciò di cui abbiamo bisogno.

Aggiornamento del fact_customer_subscribed Tabella

Obiettivo:se disponiamo di vecchi dati per la data di ieri, eliminali prima. Aggiungi i dati di ieri nel DWH, indipendentemente dal fatto che abbiamo eliminato qualcosa nel passaggio precedente o meno.

Dopo aver aggiornato tutte le tabelle delle dimensioni, dovremmo aggiornare le tabelle dei fatti. Nel nostro script, aggiorneremo solo una tabella dei fatti. Il ragionamento è lo stesso della sezione precedente:l'aggiornamento di altre tabelle seguirebbe lo stesso schema, quindi ripeteremmo principalmente il codice.

Prima di inserire valori nella tabella dei fatti, è necessario conoscere i valori delle relative chiavi dalle tabelle dimensionali. Per farlo, caricheremo nuovamente le dimensioni negli elenchi e le confronteremo con i valori del database live.

La prima cosa che faremo è caricare il cliente e fact_customer_subscribed tabelle in oggetti:

# fact_customer_subscribed
print("\nUpdating... fact_customer_subscribed")

table_customer = Table('customer', metadata, autoload = True, autoload_with = engine_live)
table_fact_customer_subscribed = Table('fact_customer_subscribed', metadata, autoload = True, autoload_with = engine_dwh)

Ora dovremo trovare le chiavi per la relativa dimensione temporale. Poiché inseriamo sempre i dati di ieri, cercheremo quella data nel dim_time tabella e usa il suo ID. La query restituisce 1 riga e l'ID è nella prima posizione (l'indice inizia da 0, quindi è result[0][0] ):

# find key for the dim_time dimension
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday)
result = connection_dwh.execute(stmt).fetchall()
dim_time_id = result[0][0]

Per quel momento, elimineremo tutti i record associati dalla tabella dei fatti:

# delete any existing data in the fact table for that time dimension value
stmt = table_fact_customer_subscribed.delete().where(table_fact_customer_subscribed.columns.dim_time_id == dim_time_id)
connection_dwh.execute(stmt)

Ok, ora abbiamo l'ID della dimensione temporale memorizzata nel dim_time_id variabile. Questo è stato facile perché possiamo avere un solo valore della dimensione temporale. La storia sarà diversa per la dimensione della città. Innanzitutto, caricheremo tutto i valori di cui abbiamo bisogno – valori che descrivono in modo univoco la città (non l'ID) e valori aggregati:

# prepare data for insert
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name, func.sum(case([(table_customer.columns.active == 1, 1)], else_=0)).label('total_active'), func.sum(case([(table_customer.columns.active == 0, 1)], else_=0)).label('total_inactive'), func.sum(case([(and_(table_customer.columns.active == 1, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_new'), func.sum(case([(and_(table_customer.columns.active == 0, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_canceled')])\
	.select_from(table_customer\
	.join(table_city)\
	.join(table_country))\
	.group_by(table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name)

Ci sono alcune cose che vorrei sottolineare sulla query precedente:

  • func.sum(...) è SOMMA(...) da "SQL standard".
  • Il case(...) la sintassi usa and_ prima delle condizioni, non tra di loro.
  • .label(...) funziona come un alias SQL AS.
  • Stiamo usando \ per passare alla riga successiva e aumentare la leggibilità della query. (Fidati di me, è praticamente illeggibile senza la barra - l'ho provato :))
  • .group_by(...) svolge il ruolo di GROUP BY di SQL.

Successivamente, esamineremo tutti i record restituiti utilizzando la query precedente. Per ogni record, confronteremo i valori che definiscono in modo univoco una città (city_name , postal_code , country_name ) con i valori memorizzati nell'elenco creato da DWH dim_city tavolo. Se tutti e tre i valori corrispondono, memorizzeremo l'ID dall'elenco e lo utilizzeremo per inserire nuovi dati. In questo modo, per ogni record, avremo ID per entrambe le dimensioni:

# loop through all new records
# use time dimension
# for each record find key for city dimension
# insert row
new_values = connection_live.execute(stmt).fetchall()
for new_value in new_values:
	dim_city_id = -1;
	for dim_city in table_dim_city_list:
		if new_value[0] == dim_city[1] and new_value[1] == dim_city[2] and new_value[2] == dim_city[3]:
			dim_city_id = dim_city[0]
	if dim_city_id > 0:	
		stmt_insert = table_fact_customer_subscribed.insert().values(dim_city_id=dim_city_id, dim_time_id=dim_time_id, total_active=new_value[3], total_inactive=new_value[4], daily_new=new_value[5], daily_canceled=new_value[6])
		connection_dwh.execute(stmt_insert)
		dim_city_id = -1
print("Completed.")

E il gioco è fatto. Abbiamo aggiornato il nostro DWH. Lo script sarebbe molto più lungo se aggiornassimo tutte le dimensioni e le tabelle dei fatti. La complessità sarebbe anche maggiore quando una tabella dei fatti è correlata a più tabelle dimensionali. In tal caso, avremmo bisogno di un per loop per ogni tabella dimensionale.

Non funziona!

Sono rimasto molto deluso quando ho scritto questo script e poi ho scoperto che qualcosa del genere non funzionava:

stmt = select([table_city.columns.city_name])\
	.select_from(table_city\
	.outerjoin(table_dim_city, table_city.columns.city_name == table_dim_city.columns.city_name))\
	.where(table_dim_city.columns.id.is_(None))

In questo esempio, sto cercando di utilizzare tabelle di due database diversi. Se stabiliamo due connessioni separate, la prima connessione non "vedrà" le tabelle di un'altra connessione. Se ci colleghiamo direttamente al server e non a un database, non saremo in grado di caricare le tabelle.

Fino a quando questo non cambierà (si spera presto), dovrai utilizzare una sorta di struttura (ad esempio quello che abbiamo fatto oggi) per comunicare tra i due database. Ciò complica il codice, perché è necessario sostituire una singola query con due elenchi e nidificati for loop.

Condividi le tue opinioni su SQLAlchemy e Python

Questo è stato l'ultimo articolo di questa serie. Ma chi lo sa? Forse proveremo un altro approccio nei prossimi articoli, quindi resta sintonizzato. Nel frattempo, condividi le tue opinioni su SQLAlchemy e Python in combinazione con i database. Cosa pensi che ci manchi in questo articolo? Cosa aggiungeresti? Dicci nei commenti qui sotto.

Puoi scaricare lo script completo che abbiamo utilizzato in questo articolo qui.

E un ringraziamento speciale va a Dirk J Bosman (@dirkjobosman), che ha consigliato questa serie di articoli.