Nelle due parti precedenti, abbiamo presentato il modello di database live per un'azienda basata su abbonamento e un data warehouse (DWH) che potremmo utilizzare per la creazione di report. Sebbene sia ovvio che dovrebbero lavorare insieme, non c'era alcuna connessione tra questi due modelli. Oggi faremo il passo successivo e scriveremo il codice per trasferire i dati dal database live al nostro DWH.
I modelli di dati
Prima di immergerci nel codice, ricordiamo a noi stessi i due modelli con cui lavoreremo. Il primo è il modello di dati transazionali che utilizzeremo per archiviare i nostri dati in tempo reale. Tenendo conto del fatto che gestiamo un'attività basata su abbonamento, dovremo memorizzare i dettagli del cliente e dell'abbonamento, gli ordini dei clienti e lo stato degli ordini.
C'è davvero molto che potremmo aggiungere a questo modello, come il monitoraggio dei pagamenti e la memorizzazione dei dati storici (in particolare le modifiche ai dati dei clienti e degli abbonamenti). Per enfatizzare il processo ETL (estrazione, trasformazione e caricamento), tuttavia, voglio mantenere questo modello il più semplice possibile.
L'utilizzo di un modello di dati transazionale come database di reporting potrebbe funzionare in alcuni casi, ma non in tutti i casi. Ne abbiamo già parlato, ma vale la pena ripeterlo. Se vogliamo separare le nostre attività di reporting dai nostri processi in tempo reale, dovremmo creare una sorta di database di reporting. Un data warehouse è una soluzione.
Il nostro DWH è incentrato su quattro tabelle dei fatti. I primi due tengono traccia del numero di clienti e abbonamenti a livello giornaliero. Gli altri due tengono traccia del numero di consegne e dei prodotti inclusi in queste consegne.
La mia ipotesi è che eseguiremo il nostro processo ETL una volta al giorno. Innanzitutto, compileremo le tabelle delle dimensioni con nuovi valori (ove necessario). Successivamente, compileremo le tabelle dei fatti.
Per evitare inutili ripetizioni, dimostrerò solo il codice che popolerà le prime due tabelle dimensionali e le prime due tabelle dei fatti. Le tabelle rimanenti possono essere popolate utilizzando un codice molto simile. Ti incoraggio a scrivere tu stesso il codice. Non c'è modo migliore per imparare qualcosa di nuovo che provarlo.
L'idea:tabelle dimensionali
L'idea generale è quella di creare procedure memorizzate che potremmo utilizzare regolarmente per popolare le tabelle delle dimensioni di DWH e delle tabelle dei fatti. Queste procedure trasferiranno i dati tra due database sullo stesso server. Ciò significa che alcune query all'interno di queste procedure utilizzeranno le tabelle di entrambi i database. Questo è previsto; dobbiamo confrontare lo stato del DWH con il DB live e apportare modifiche al DWH in base a ciò che sta accadendo nel DB live.
Abbiamo quattro tabelle dimensionali nel nostro DWH:dim_time
, dim_city
, dim_product
e dim_delivery_status
.
La dimensione temporale viene popolata aggiungendo la data precedente. Il presupposto principale è che eseguiremo questa procedura ogni giorno, dopo la chiusura dell'attività.
La città e le dimensioni del prodotto dipenderanno dai valori correnti memorizzati nella city
e product
dizionari nel database live. Se aggiungiamo qualcosa a questi dizionari, i nuovi valori verranno aggiunti alle tabelle delle dimensioni al prossimo aggiornamento DWH.
L'ultima tabella delle dimensioni è il dim_delivery_status
tavolo. Non verrà aggiornato perché contiene solo tre valori predefiniti. Una consegna è in transito, annullata o consegnata.
L'idea:tabelle dei fatti
Il popolamento delle tabelle dei fatti è in realtà il vero lavoro. Mentre i dizionari nel database live non contengono un attributo timestamp, le tabelle con i dati inseriti come risultato delle nostre operazioni lo fanno. Noterai due attributi di timestamp, time_inserted
e time_updated
, nel modello dati.
Ancora una volta, presumo che eseguiremo correttamente l'importazione DWH una volta al giorno. Questo ci permette di aggregare i dati a livello giornaliero. Conteremo il numero di clienti e abbonamenti attivi e cancellati, nonché le consegne e i prodotti consegnati per quella data.
Il nostro modello live funziona bene se eseguiamo una procedura di inserimento dopo il COB (chiusura dell'attività). Tuttavia, se vogliamo maggiore flessibilità, dovremmo apportare alcune modifiche al modello. Una di queste modifiche potrebbe essere una tabella della cronologia separata per tenere traccia del momento esatto in cui i dati relativi ai clienti o agli abbonamenti sono cambiati. Con la nostra attuale organizzazione, sapremo che la modifica è avvenuta, ma non sapremo se ci sono state modifiche prima di questa (ad es. un cliente ha cancellato ieri, ha riattivato il suo account dopo mezzanotte e poi ha cancellato di nuovo oggi) .
Popolare le tabelle dimensionali
Come accennato in precedenza, partirò dal presupposto che eseguiremo l'importazione DWH esattamente una volta al giorno. In caso contrario, avremmo bisogno di codice aggiuntivo per eliminare i dati appena inseriti dalle tabelle delle dimensioni e dei fatti. Per le tabelle dimensionali, ciò si limiterebbe all'eliminazione della data specificata.
Innanzitutto, verificheremo se la data specificata esiste nel dim_time
tavolo. In caso contrario, aggiungeremo una nuova riga alla tabella; se lo fa, non dobbiamo fare nulla. Nella maggior parte dei casi, tutte le date vengono inserite durante la distribuzione di produzione iniziale. Ma seguirò questo esempio per scopi didattici.
Per il dim_city
e dim_product
dimensioni, aggiungerò solo i nuovi valori rilevati nella city
e product
tavoli. Non effettuerò alcuna eliminazione perché qualsiasi valore inserito in precedenza potrebbe essere referenziato in una tabella dei fatti. Potremmo optare per una cancellazione graduale, ad es. avere un flag “attivo” che potremmo accendere e spegnere.
Per l'ultima tabella, dim_delivery_status
, non farò nulla perché conterrà sempre gli stessi tre valori.
Il codice seguente crea una procedura che popolerà le tabelle delle dimensioni dim_time
e dim_city
.
Per la dimensione temporale, aggiungerò la data di ieri. Presumo che il processo ETL inizi subito dopo la mezzanotte. Controllerò se quella dimensione esiste già e, in caso contrario, aggiungerò la nuova data nella tabella.
Per la dimensione della città, userò un LEFT JOIN per unire i dati dal database live e il database DWH per determinare quali righe mancano. Quindi aggiungerò solo i dati mancanti alla tabella delle dimensioni. Vale la pena ricordare che ci sono alcuni modi per verificare se i dati sono stati modificati. Questo processo è chiamato Change Data Capture o CDC. Un metodo comune consiste nel controllare timestamp o versioni aggiornate. Ci sono alcuni modi aggiuntivi, ma non rientrano nell'ambito di questo articolo.
Diamo un'occhiata al codice ora, che è scritto usando la sintassi MySQL .
DROP PROCEDURE IF EXISTS p_update_dimensions// CREATE PROCEDURE p_update_dimensions () BEGIN SET @time_exists = 0; SET @time_date = DATE_ADD(DATE(NOW()), INTERVAL -1 DAY); -- procedure populates dimension tables with new values -- dim_time SET @time_exists = (SELECT COUNT(*) FROM subscription_dwh.dim_time dim_time WHERE dim_time.time_date = @time_date); IF (@time_exists = 0) THEN INSERT INTO subscription_dwh.`dim_time`(`time_date`, `time_year`, `time_month`, `time_week`, `time_weekday`, `ts`) SELECT @time_date AS time_date, YEAR(@time_date) AS time_year, MONTH(@time_date) AS time_month, WEEK(@time_date) AS time_week, WEEKDAY(@time_date) AS time_weekday, NOW() AS ts; END IF; -- dim_city INSERT INTO subscription_dwh.`dim_city`(`city_name`, `postal_code`, `country_name`, `ts`) SELECT city_live.city_name, city_live.postal_code, country_live.country_name, Now() FROM subscription_live.city city_live INNER JOIN subscription_live.country country_live ON city_live.country_id = country_live.id LEFT JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name WHERE city_dwh.id IS NULL; END// -- CALL p_update_dimensions ()
Esecuzione di questa procedura, che eseguiamo utilizzando la procedura commentata CALL -- inserisce una nuova data e tutte le città mancanti nelle tabelle dimensionali. Prova ad aggiungere il tuo codice per popolare le restanti tabelle a due dimensioni con nuovi valori.
Il processo ETL in un data warehouse
L'idea principale alla base del data warehousing è quella di contenere i dati aggregati nel formato desiderato. Ovviamente, dovremmo conoscere quel formato prima ancora di iniziare a costruire il magazzino. Se abbiamo fatto tutto come previsto, possiamo ottenere tutti i vantaggi che un DWH ci offre. Il vantaggio principale è il miglioramento delle prestazioni durante l'esecuzione di query. Le nostre query funzionano con un minor numero di record (perché aggregati) e vengono eseguite sul database dei rapporti (anziché su quello attivo).
Ma prima di poter interrogare, dobbiamo memorizzare i fatti nel nostro database. Il modo in cui lo faremo dipende da cosa dobbiamo fare con i nostri dati in seguito. Se non abbiamo un buon quadro generale prima di iniziare a costruire il nostro DWH, potremmo presto trovarci nei guai! presto.
Il nome di questo processo è ETL:E =Estrai, T =Trasforma, L =Carica. Afferra i dati, li trasforma per adattarli alla struttura DWH e li carica nel DWH. Per essere precisi, il processo effettivo che utilizzeremo è ELT:Extract, Load, Transform. Poiché utilizziamo procedure archiviate, estrarremo i dati, li caricheremo e quindi li trasformeremo per soddisfare le nostre esigenze. È bene sapere che mentre ETL ed ELT sono leggermente diversi, i termini a volte sono usati in modo intercambiabile.
Popolare le tabelle dei fatti
Il popolamento delle tabelle dei fatti è il motivo per cui siamo davvero qui. Oggi compilerò due tabelle dei fatti, il fact_customer_subscribed
tabella e il fact_subscription_status
tavolo. Le restanti due tabelle dei fatti sono tue da provare come compiti a casa.
Prima di passare al popolamento della tabella dei fatti, è necessario presumere che le tabelle delle dimensioni siano popolate con nuovi valori. Il popolamento delle tabelle dei fatti segue lo stesso schema. Dal momento che hanno la stessa struttura, li spiegherò entrambi insieme.
Stiamo raggruppando i dati in base a due dimensioni:ora e città. La dimensione temporale verrà impostata su ieri e troveremo l'ID del record correlato nel dim_time
tabella confrontando le date (l'ultimo INNER JOIN in entrambe le query).
L'ID di dim_city
viene estratto unendo tutti gli attributi che formano una combinazione UNICA nella tabella delle dimensioni (nome della città, codice postale e nome del paese).
In questa query, testeremo i valori con CASE e poi li SOMMA. Per i clienti attivi e inattivi, non ho testato la data. Tuttavia, ho selezionato i valori così come sono per questi campi. Per gli account nuovi e cancellati, ho testato l'ora aggiornata.
DROP PROCEDURE IF EXISTS p_update_facts// CREATE PROCEDURE p_update_facts () BEGIN SET @time_date = DATE_ADD(DATE(NOW()), INTERVAL -1 DAY); -- procedure populates fact tables with new values -- fact_customer_subscribed INSERT INTO `fact_customer_subscribed`(`dim_city_id`, `dim_time_id`, `total_active`, `total_inactive`, `daily_new`, `daily_canceled`, `ts`) SELECT city_dwh.id AS dim_ctiy_id, time_dwh.id AS dim_time_id, SUM(CASE WHEN customer_live.active = 1 THEN 1 ELSE 0 END) AS total_active, SUM(CASE WHEN customer_live.active = 0 THEN 1 ELSE 0 END) AS total_inactive, SUM(CASE WHEN customer_live.active = 1 AND DATE(customer_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_new, SUM(CASE WHEN customer_live.active = 0 AND DATE(customer_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_canceled, MIN(NOW()) AS ts FROM subscription_live.`customer` customer_live INNER JOIN subscription_live.`city` city_live ON customer_live.city_id = city_live.id INNER JOIN subscription_live.`country` country_live ON city_live.country_id = country_live.id INNER JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name INNER JOIN subscription_dwh.dim_time time_dwh ON time_dwh.time_date = @time_date GROUP BY city_dwh.id, time_dwh.id; -- fact_subscription_status INSERT INTO `fact_subscription_status`(`dim_city_id`, `dim_time_id`, `total_active`, `total_inactive`, `daily_new`, `daily_canceled`, `ts`) SELECT city_dwh.id AS dim_ctiy_id, time_dwh.id AS dim_time_id, SUM(CASE WHEN subscription_live.active = 1 THEN 1 ELSE 0 END) AS total_active, SUM(CASE WHEN subscription_live.active = 0 THEN 1 ELSE 0 END) AS total_inactive, SUM(CASE WHEN subscription_live.active = 1 AND DATE(subscription_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_new, SUM(CASE WHEN subscription_live.active = 0 AND DATE(subscription_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_canceled, MIN(NOW()) AS ts FROM subscription_live.`customer` customer_live INNER JOIN subscription_live.`subscription` subscription_live ON subscription_live.customer_id = customer_live.id INNER JOIN subscription_live.`city` city_live ON customer_live.city_id = city_live.id INNER JOIN subscription_live.`country` country_live ON city_live.country_id = country_live.id INNER JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name INNER JOIN subscription_dwh.dim_time time_dwh ON time_dwh.time_date = @time_date GROUP BY city_dwh.id, time_dwh.id; END// -- CALL p_update_facts ()
Ancora una volta, ho commentato l'ultima riga. Rimuovi il commento e puoi usare questa riga per chiamare la procedura e inserire nuovi valori. Tieni presente che non ho eliminato alcun vecchio valore esistente, quindi questa procedura non funzionerà se abbiamo già valori per quella data e città. Questo può essere risolto eseguendo eliminazioni prima degli inserimenti.
Ricorda, dobbiamo popolare le restanti tabelle dei fatti nel nostro DWH. Ti incoraggio a provarlo tu stesso!
Un'altra cosa che consiglierei sicuramente è inserire l'intero processo all'interno di una transazione. Ciò assicurerebbe che tutti gli inserimenti abbiano esito positivo o che non ne venga eseguito nessuno. Questo è molto importante quando vogliamo evitare che i dati vengano inseriti parzialmente, ad es. se abbiamo più procedure per inserire dimensioni e fatti e alcune di esse fanno il loro lavoro mentre altre falliscono.
Cosa ne pensi?
Oggi abbiamo visto come potremmo eseguire il processo ELT/ETL e caricare i dati da un database live in un data warehouse. Sebbene il processo che abbiamo dimostrato sia piuttosto semplificato, contiene tutti gli elementi necessari per E(estrarre) i dati, T(trasformarli) in un formato adatto e infine L(caricarli) nel DWH. Cosa ne pensi? Raccontaci le tue esperienze nei commenti qui sotto.