Apache HBase è tutto incentrato sul darti accesso casuale, in tempo reale, in lettura/scrittura ai tuoi Big Data, ma come puoi ottenere in modo efficiente quei dati in HBase in primo luogo? Intuitivamente, un nuovo utente proverà a farlo tramite le API client o utilizzando un processo MapReduce con TableOutputFormat, ma questi approcci sono problematici, come imparerai di seguito. Al contrario, la funzione di caricamento in blocco HBase è molto più facile da usare e può inserire la stessa quantità di dati più rapidamente.
Questo post del blog introdurrà i concetti di base della funzione di caricamento in blocco, presenterà due casi d'uso e proporrà due esempi.
Panoramica del caricamento in blocco
Se hai uno di questi sintomi, il caricamento in blocco è probabilmente la scelta giusta per te:
- Dovevi modificare i tuoi MemStore per utilizzare la maggior parte della memoria.
- Dovevi utilizzare WAL più grandi o ignorarli del tutto.
- Le code di compattazione e svuotamento sono a centinaia.
- Il tuo GC è fuori controllo perché i tuoi inserti sono compresi nei MB.
- La tua latenza esce dal tuo SLA quando importi i dati.
La maggior parte di questi sintomi viene comunemente definita "dolore della crescita". L'utilizzo del caricamento collettivo può aiutarti a evitarli.
Nel linguaggio HBase, il caricamento in blocco è il processo di preparazione e caricamento di HFiles (il formato di file di HBase) direttamente nei RegionServer, aggirando così il percorso di scrittura e ovviando completamente a questi problemi. Questo processo è simile a ETL e si presenta così:
- Ispeziona la tabella per configurare un partizionamento ordine totale
- Carica il file delle partizioni nel cluster e lo aggiunge a DistributedCache
- Imposta il numero di attività di riduzione in modo che corrisponda al numero corrente di regioni
- Imposta la classe chiave/valore di output in modo che corrisponda ai requisiti di HFileOutputFormat
- Imposta il riduttore per eseguire l'ordinamento appropriato (KeyValueSortReducer o PutSortReducer)
A questo punto, verrà creato un file H per regione nella cartella di output. Tieni presente che i dati di input vengono quasi completamente riscritti, quindi avrai bisogno di almeno il doppio della quantità di spazio disponibile su disco rispetto alla dimensione del set di dati originale. Ad esempio, per un mysqldump da 100 GB dovresti avere almeno 200 GB di spazio su disco disponibile in HDFS. Puoi eliminare il file dump alla fine del processo.
Ecco un'illustrazione di questo processo. Il flusso di dati va dalla sorgente originale a HDFS, dove i RegionServer sposteranno semplicemente i file nelle directory delle loro regioni.
Casi d'uso
Carico del set di dati originale: Tutti gli utenti che migrano da un altro datastore dovrebbero considerare questo caso d'uso. Innanzitutto, devi eseguire l'esercizio di progettazione dello schema della tabella e quindi creare la tabella stessa, pre-divisa. I punti di divisione devono prendere in considerazione la distribuzione delle chiavi di riga e il numero di RegionServer. Consiglio di leggere la presentazione del mio collega Lars George sulla progettazione di schemi avanzati per qualsiasi caso d'uso serio.
Il vantaggio qui è che è molto più veloce scrivere i file direttamente che passare attraverso il percorso di scrittura di RegionServer (scrivendo sia su MemStore che su WAL) e poi eventualmente svuotare, compattare e così via. Significa anche che non devi ottimizzare il tuo cluster per un carico di lavoro pesante in scrittura e quindi ottimizzarlo di nuovo per il tuo normale carico di lavoro.
Carico incrementale: Diciamo che hai un set di dati attualmente servito da HBase, ma ora devi importare più dati in batch da una terza parte o hai un lavoro notturno che genera alcuni gigabyte che devi inserire. Probabilmente non è grande quanto il set di dati che HBase sta già servendo, ma potrebbe influenzare il 95° percentile della tua latenza. L'esecuzione del normale percorso di scrittura avrà l'effetto negativo di attivare più lavaggi e compattazioni durante l'importazione rispetto al normale. Questo ulteriore stress IO competerà con le tue query sensibili alla latenza.
Esempi
Puoi utilizzare i seguenti esempi nel tuo cluster Hadoop, ma le istruzioni sono fornite per Cloudera QuickStart VM, che è un cluster a nodo singolo, un sistema operativo guest e dati ed esempi di esempio inseriti in un'appliance di macchina virtuale per il tuo desktop.
Una volta avviata la VM, digli, tramite l'interfaccia web che si aprirà automaticamente, di distribuire CDH e quindi assicurati che sia avviato anche il servizio HBase.
Caricatore di massa TSV integrato
HBase viene fornito con un lavoro MR in grado di leggere un file di valori separati da delimitatori e di inviarlo direttamente in una tabella HBase o creare file H per il caricamento di massa. Qui andiamo a:
- Ottieni i dati di esempio e caricali su HDFS.
- Esegui il lavoro ImportTsv per trasformare il file in più HFile secondo una tabella preconfigurata.
- Prepara e carica i file in HBase.
Il primo passaggio consiste nell'aprire una console e utilizzare il comando seguente per ottenere dati di esempio:
curl -O https://people.apache.org/~jdcryans/word_count.csv
Ho creato questo file eseguendo un conteggio delle parole sul manoscritto originale di questo stesso post del blog e quindi generando il risultato in formato csv, senza titoli di colonna. Ora carica il file su HDFS:
hdfs dfs -put word_count.csv
Una volta completata la parte di estrazione del caricamento di massa, è necessario trasformare il file. Per prima cosa devi progettare il tavolo. Per semplificare le cose, chiamalo "conteggio parole":le chiavi di riga saranno le parole stesse e l'unica colonna conterrà il conteggio, in una famiglia che chiameremo "f". La procedura migliore durante la creazione di una tabella è dividerla in base alla distribuzione della chiave di riga, ma per questo esempio creeremo solo cinque regioni con punti di divisione distribuiti uniformemente nello spazio chiave. Apri la shell hbase:
hbase shell
Ed esegui il seguente comando per creare la tabella:
create 'wordcount', {NAME => 'f'}, {SPLITS => ['g', 'm', 'r', 'w']}
I quattro punti di divisione genereranno cinque regioni, in cui la prima regione inizia con una chiave di riga vuota. Per ottenere punti di divisione migliori potresti anche fare una rapida analisi per vedere come sono veramente distribuite le parole, ma lascio a te la scelta.
Se punti il browser della tua macchina virtuale su http://localhost:60010/ vedrai la nostra tabella appena creata e le sue cinque regioni tutte assegnate al RegionServer.
Ora è il momento di fare il lavoro pesante. Richiamando il jar HBase sulla riga di comando con lo script "hadoop" verrà mostrato un elenco di strumenti disponibili. Quello che vogliamo si chiama importtsv e ha il seguente utilizzo:
hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv ERROR: Wrong number of arguments: 0 Usage: importtsv -Dimporttsv.columns=a,b,c
La riga di comando che useremo è la seguente:
hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0- security.jar importtsv -Dimporttsv.separator=, -Dimporttsv.bulk.output=output -Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv
Ecco una carrellata dei diversi elementi di configurazione:
- -Dimporttsv.separator=, specifica che il separatore è una virgola.
- -Dimporttsv.bulk.output=output è un percorso relativo a dove verranno scritti gli HFiles. Poiché il tuo utente sulla VM è "cloudera" per impostazione predefinita, significa che i file saranno in /user/cloudera/output. Saltare questa opzione farà scrivere il lavoro direttamente su HBase.
- -Dimporttsv.columns=HBASE_ROW_KEY,f:count è un elenco di tutte le colonne contenute in questo file. La chiave di riga deve essere identificata utilizzando la stringa HBASE_ROW_KEY in maiuscolo; altrimenti non avvierà il lavoro. (Ho deciso di utilizzare il qualificatore "count" ma potrebbe essere qualsiasi altra cosa.)
Il lavoro dovrebbe essere completato entro un minuto, date le dimensioni ridotte dell'input. Tieni presente che sono in esecuzione cinque riduttori, uno per regione. Ecco il risultato su HDFS:
-rw-r--r-- 3 cloudera cloudera 4265 2013-09-12 13:13 output/f/2c0724e0c8054b70bce11342dc91897b -rw-r--r-- 3 cloudera cloudera 3163 2013-09-12 13:14 output/f/786198ca47ae406f9be05c9eb09beb36 -rw-r--r-- 3 cloudera cloudera 2487 2013-09-12 13:14 output/f/9b0e5b2a137e479cbc978132e3fc84d2 -rw-r--r-- 3 cloudera cloudera 2961 2013-09-12 13:13 output/f/bb341f04c6d845e8bb95830e9946a914 -rw-r--r-- 3 cloudera cloudera 1336 2013-09-12 13:14 output/f/c656d893bd704260a613be62bddb4d5f
Come puoi vedere, i file attualmente appartengono all'utente "cloudera". Per caricarli dobbiamo cambiare il proprietario in "hbase" o HBase non avrà il permesso di spostare i file. Esegui il seguente comando:
sudo -u hdfs hdfs dfs -chown -R hbase:hbase/user/cloudera/output
Per il passaggio finale, dobbiamo utilizzare lo strumento completebulkload per indicare dove si trovano i file e su quali tabelle stiamo caricando:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output wordcount
Tornando nella shell HBase, puoi eseguire il comando count che ti mostrerà quante righe sono state caricate. Se hai dimenticato di chown, il comando si bloccherà.
Lavoro MR personalizzato
Il caricatore di massa TSV è buono per la prototipazione, ma poiché interpreta tutto come stringhe e non supporta la manipolazione dei campi al momento della trasformazione, finirai per dover scrivere il tuo lavoro di MR. Il mio collega James Kinley, che lavora come architetto di soluzioni in Europa, ha scritto un lavoro del genere che useremo per il nostro prossimo esempio. I dati per il lavoro contengono messaggi pubblici di Facebook e Twitter relativi alle finali NBA 2010 (partita 1) tra Lakers e Celtics. Puoi trovare il codice qui. (La VM Quick Start viene fornita con git e maven installati in modo da poter clonare il repository su di essa.)
Guardando la classe Driver, i bit più importanti sono i seguenti:
job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); … // Auto configure partitioner and reducer HFileOutputFormat.configureIncrementalLoad(job, hTable);
Innanzitutto, il tuo Mapper deve produrre un ImmutableBytesWritable che contiene la chiave di riga e il valore di output può essere KeyValue, Put o Delete. Il secondo frammento mostra come configurare il riduttore; è infatti completamente gestito da HFileOutputFormat. configureIncrementalLoad() come descritto nella sezione "Trasformazione" in precedenza.
La classe HBaseKVMapper contiene solo il Mapper che rispetta la chiave di output e i valori configurati:
public class HBaseKVMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
Per eseguirlo dovrai compilare il progetto usando Maven e prendere i file di dati seguendo i collegamenti nel README. (Contiene anche lo script della shell per creare la tabella.) Prima di iniziare il lavoro, non dimenticare di caricare i file su HDFS e di impostare il tuo percorso di classe in modo che sia a conoscenza di HBase perché questa volta non utilizzerai il suo jar :
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/etc/hbase/conf/:/usr/lib/hbase/*
Potrai avviare il lavoro utilizzando una riga di comando simile a questa:
hadoop jar hbase-examples-0.0.1-SNAPSHOT.jar com.cloudera.examples.hbase.bulkimport.Driver -libjars /home/cloudera/.m2/repository/joda-time/joda-time/2.1/joda-time-2.1.jar, /home/cloudera/.m2/repository/net/sf/opencsv/opencsv/2.3/opencsv-2.3.jar RowFeeder\ for\ Celtics\ and\ Lakers\ Game\ 1.csv output2 NBAFinal2010
Come puoi vedere, le dipendenze del lavoro devono essere aggiunte separatamente. Infine, puoi caricare i file cambiando prima il loro proprietario e poi eseguendo lo strumento completebulkload:
sudo -u hdfs hdfs dfs -chown -R hbase:hbase/user/cloudera/output2 hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output2 NBAFinal2010
Potenziali problemi
Dati eliminati di recente che riappaiono. Questo problema si verifica quando un'eliminazione viene inserita tramite un caricamento di massa e viene compattata in modo significativo mentre il Put corrispondente è ancora in un MemStore. I dati saranno considerati cancellati quando il Delete si trova in un HFile ma, una volta rimosso durante la compattazione, il Put tornerà ad essere visibile. Se hai un caso d'uso del genere, considera la configurazione delle famiglie di colonne per mantenere le celle eliminate con KEEP_DELETED_CELLS nella shell o HColumnDescriptor.setKeepDeletedCells().
I dati caricati in blocco non possono essere sovrascritti da un altro caricamento in blocco. Questo problema si verifica quando due HFile caricati in blocco caricati in momenti diversi tentano di scrivere un valore diverso nella stessa cella, il che significa che hanno la stessa chiave di riga, famiglia, qualificatore e timestamp. Il risultato è che verrà restituito il primo valore inserito anziché il secondo. Questo bug verrà corretto in HBase 0.96.0 e CDH 5 (la prossima versione principale di CDH) e il lavoro viene svolto in HBASE-8521 per il ramo 0.94 e CDH 4.
Il caricamento in blocco provoca importanti compattazioni. Questo problema si verifica quando esegui carichi di massa incrementali e ci sono abbastanza file caricati in blocco per attivare una compattazione minore (la soglia predefinita è 3). Gli HFile vengono caricati con un numero di sequenza impostato su 0, quindi vengono prelevati per primi quando RegionServer seleziona i file per una compattazione e, a causa di un bug, selezionerà anche tutti i file rimanenti. Questo problema riguarderà seriamente coloro che hanno già grandi regioni (più GB) o che caricano spesso in blocco (ogni poche ore e meno) poiché molti dati verranno compattati. HBase 0.96.0 ha la correzione corretta e anche CDH 5; HBASE-8521 risolve il problema in 0.94 poiché agli HFile caricati in blocco viene ora assegnato un numero di sequenza corretto. HBASE-8283 può essere abilitato con hbase.hstore.useExploringCompation dopo 0.94.9 e CDH 4.4.0 per mitigare questo problema semplicemente essendo un algoritmo di selezione della compattazione più intelligente.
I dati caricati in blocco non vengono replicati . Poiché il caricamento in blocco ignora il percorso di scrittura, il WAL non viene scritto come parte del processo. La replica funziona leggendo i file WAL in modo da non vedere i dati caricati in blocco, e lo stesso vale per le modifiche che utilizzano Put.setWriteToWAL(true). Un modo per gestirlo è inviare i file non elaborati o gli HFile all'altro cluster ed eseguire l'altra elaborazione lì.
Conclusione
L'obiettivo di questo post del blog era di presentarti i concetti di base del caricamento in blocco di Apache HBase. Abbiamo spiegato come il processo è come eseguire ETL e che è molto meglio per i grandi set di dati rispetto all'utilizzo dell'API normale poiché ignora il percorso di scrittura. I due esempi sono stati inclusi per mostrare come semplici file TSV possono essere caricati in blocco su HBase e come scrivere il proprio Mapper per altri formati di dati.
Ora puoi provare a fare lo stesso usando un'interfaccia utente grafica tramite Hue.