HBase
 sql >> Database >  >> NoSQL >> HBase

Sincronizzazione dei dati dei cluster HBase con lo strumento HashTable/SyncTable

La replica (trattata in questo precedente articolo del blog) è stata rilasciata da un po' ed è tra le funzionalità più utilizzate di Apache HBase. Avere cluster che replicano i dati con peer diversi è un'implementazione molto comune, sia come strategia di ripristino di emergenza o semplicemente come un modo semplice per replicare i dati tra ambienti di produzione/staging/sviluppo. Sebbene sia un modo efficiente per mantenere sincronizzati diversi database HBase con una latenza inferiore al secondo, la replica funziona solo sui dati acquisiti dopo che la funzionalità è stata abilitata. Ciò significa che tutti i dati preesistenti su tutti i cluster coinvolti nella distribuzione della replica dovranno comunque essere copiati tra i peer in qualche altro modo. Esistono diversi strumenti che possono essere utilizzati per sincronizzare i dati preesistenti su diversi cluster di peer. Snapshots, BulkLoad, CopyTable sono esempi ben noti di tali strumenti trattati nei precedenti post del blog di Cloudera. In questo articolo tratteremo HashTable/SyncTable, dettagliando alcune delle sue logiche di implementazione interna, i pro ei contro del suo utilizzo e come si confronta con alcune delle altre tecniche di copia dei dati menzionate sopra.

HashTable/SyncTable in breve

HashTable/SyncTable è uno strumento implementato come due lavori di riduzione della mappa che vengono eseguiti come singoli passaggi. È simile a CopyTable strumento, che può eseguire copie di dati di tabelle sia parziali che intere. A differenza di CopyTable copia solo i dati divergenti tra i cluster di destinazione, risparmiando sia la rete che le risorse di calcolo durante la procedura di copia.

Il primo passaggio da eseguire nel processo è la HashTable lavoro di riduzione della mappa. Questo dovrebbe essere eseguito sul cluster i cui dati devono essere copiati sul peer remoto, normalmente il cluster di origine. Di seguito viene mostrato un rapido esempio di come eseguirlo, più avanti in questo articolo viene fornita una spiegazione dettagliata di ciascuno dei parametri richiesti: 

hbase org.apache.hadoop.hbase.mapreduce.HashTable --families=cf my-table /hashes/test-tbl
…
20/04/28 05:05:48 INFO mapreduce.Job:  map 100% reduce 100%
20/04/28 05:05:49 INFO mapreduce.Job: Job job_1587986840019_0001 completed successfully
20/04/28 05:05:49 INFO mapreduce.Job: Counters: 68
…
File Input Format Counters 
Bytes Read=0
File Output Format Counters 
Bytes Written=6811788

Una volta che la Tabella Hash l'esecuzione del lavoro con il comando precedente è stata completata, alcuni file di output sono stati generati nel sorgente hdfs /hash/my-table directory:

hdfs dfs -ls -R /hashes/test-tbl
drwxr-xr-x   - root supergroup          0 2020-04-28 05:05 /hashes/test-tbl/hashes
-rw-r--r--   2 root supergroup          0 2020-04-28 05:05 /hashes/test-tbl/hashes/_SUCCESS
drwxr-xr-x   - root supergroup          0 2020-04-28 05:05 /hashes/test-tbl/hashes/part-r-00000
-rw-r--r--   2 root supergroup    6790909 2020-04-28 05:05 /hashes/test-tbl/hashes/part-r-00000/data
-rw-r--r--   2 root supergroup      20879 2020-04-28 05:05 /hashes/test-tbl/hashes/part-r-00000/index
-rw-r--r--   2 root supergroup         99 2020-04-28 05:04 /hashes/test-tbl/manifest
-rw-r--r--   2 root supergroup        153 2020-04-28 05:04 /hashes/test-tbl/partitions

Questi sono necessari come input per la SyncTable correre. Tabella di sincronizzazione deve essere lanciato sul peer di destinazione. Il comando seguente esegue SyncTable per l'output di HashTable dall'esempio precedente. Utilizza il dryrun opzione spiegata più avanti in questo articolo:

hbase org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase hdfs://source-cluster-active-nn/hashes/test-tbl test-tbl test-tbl
…
org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
HASHES_MATCHED=97146
HASHES_NOT_MATCHED=2
MATCHINGCELLS=17
MATCHINGROWS=2
RANGESNOTMATCHED=2
ROWSWITHDIFFS=2
SOURCEMISSINGCELLS=1
TARGETMISSINGCELLS=1

Come riferimento rapido, puoi semplicemente sostituire i parametri forniti su entrambi gli esempi con i valori ambientali effettivi. Il resto di questo articolo tratterà i dettagli di implementazione in modo più approfondito.

Perché due passaggi diversi?

L'obiettivo principale di questo strumento è identificare e copiare solo i dati mancanti tra i due cluster. Tabella Hash funziona come un lavoro di partizionamento orizzontale/indicizzazione, analizzando batch di dati della tabella e generando indici hash per ciascuno di questi lotti. Questi sono gli output scritti nei file sotto /hash/my-table directory hdfs passata come uno dei parametri del lavoro. Come accennato in precedenza, questo output è richiesto da SyncTable lavoro. Tabella di sincronizzazione esegue la scansione della tabella di destinazione in locale nelle stesse dimensioni batch utilizzate da HashTable, e calcola anche i valori hash per questi batch utilizzando la stessa funzione utilizzata da HashTable. Quindi confronta l'hash batch locale valore con quello della HashTable produzione. Se i valori hash sono uguali, significa che l'intero batch è identico nei due cluster e non è necessario copiare nulla su quel segmento. In caso contrario, apre una scansione per il batch nel cluster di origine, controllando se ciascuna delle celle esiste già nel cluster di destinazione, copiando solo quelle divergenti. Su set di dati sparsi e leggermente diversi, ciò comporterebbe la copia di molti meno dati tra i due cluster. Richiederebbe anche solo un piccolo numero di celle da scansionare nell'origine per verificare la mancata corrispondenza.

Parametri richiesti

Tabella Hash richiede solo due parametri:il nome della tabella e il percorso di output in cui verranno scritti gli hash correlati e altri meta file di informazioni. Tabella di sincronizzazione utilizza HashTable output dir come input, insieme ai nomi delle tabelle rispettivamente nel cluster di origine e nel cluster di destinazione. Dal momento che stiamo usando HashTable /Tabella di sincronizzazione per spostare i dati tra cluster remoti, sourcezkcluster l'opzione deve essere definita per SyncTable . Questo dovrebbe essere l'indirizzo del quorum di Zookeeper del cluster di origine. In questo esempio di articolo, abbiamo anche fatto riferimento direttamente all'indirizzo del namenode attivo del cluster di origine, in modo che SyncTable leggerà il file di output hash direttamente dal cluster di origine. In alternativa, Tabella Hash l'output può essere copiato manualmente dal cluster di origine al cluster remoto (con distcp, ad esempio).

NOTA:l'utilizzo di cluster remoti in diversi regni Kerberos è supportato solo da CDH 6.2.1 in poi.

Opzioni avanzate

Entrambi HashTable e SyncTable offrono opzioni opzionali extra che possono essere regolate per risultati ottimali.

Tabella Hash consente di filtrare i dati sia per chiave di riga che per ora di modifica, con startrow/starttime, stoprow/stoptime proprietà, rispettivamente. L'ambito del set di dati può anche essere limitato da versioni e famiglie proprietà. La dimensione batch La proprietà definisce la dimensione di ogni porzione che verrà sottoposta a hash. Ciò influisce direttamente sulle prestazioni di sincronizzazione. In casi di pochissime discrepanze, l'impostazione di valori di dimensioni batch maggiori può portare a prestazioni migliori poiché porzioni maggiori del set di dati verrebbero ignorate senza che sia necessario scansionare da SyncTable.

Tabella di sincronizzazione fornisce una corsa a secco opzione che consente un'anteprima delle modifiche da applicare nella destinazione.

Tabella di sincronizzazione il comportamento predefinito consiste nel rispecchiare i dati di origine sul lato di destinazione, quindi qualsiasi cella aggiuntiva presente nella destinazione ma assente nell'origine finisce per essere eliminata sul lato di destinazione. Ciò può essere indesiderabile durante la sincronizzazione di cluster in una configurazione di replica attivo-attivo e, in tali casi, doDeletes le opzioni possono essere impostate su false, saltando la replica delle eliminazioni sulla destinazione. C'è anche un simile doPuts flag per i casi in cui celle aggiuntive non devono essere inserite nel cluster di destinazione.

Analisi delle uscite

Tabella Hash genera alcuni file con metainformazioni per SyncTable, quelli, tuttavia, non sono leggibili dall'uomo. Non esegue alcuna modifica sui dati esistenti, quindi le informazioni correlate sono di scarso interesse per un contesto utente.

Tabella di sincronizzazione è il passaggio che applica realmente le modifiche al target e potrebbe essere importante rivederne il riepilogo prima di alterare effettivamente i dati del cluster target (vedi dryrun opzione sopra menzionata). Pubblica alcuni contatori rilevanti alla fine della mappa riduce l'esecuzione. Osservando i valori dell'esempio sopra, possiamo vedere che c'erano 97148 hash delle partizioni (segnalato da BATCHES contatore), che SyncTable rilevate divergenze solo in due di esse (secondo gli HASHES_MATCHED e HASHES_NOT_MACTHED contatori). Inoltre, all'interno delle due partizioni con hash diversi,  17 celle su 2 righe corrispondevano (come riportato da MATCHING_CELLS e MATCHING_ROWS, rispettivamente), ma c'erano anche 2 righe divergenti, su queste due partizioni (secondo RANGESNOTMATCHED e ROWSWITHDIFFS ). Infine, SOURCEMISSINGCELLS e TARGETMISSINGCELLS dicci in dettaglio se le celle erano presenti solo sul cluster di origine o di destinazione. In questo esempio, il cluster di origine aveva una cella che non era nella destinazione, ma la destinazione aveva anche una cella che non era nell'origine. Da SyncTable è stato eseguito senza specificare dryrun opzione e impostazione doDeletes opzione su falso , il lavoro ha eliminato la cella aggiuntiva nel cluster di destinazione e ha aggiunto la cella aggiuntiva trovata nell'origine al cluster di destinazione. Supponendo nessuna scrittura si verificano su entrambi i cluster, una successiva esecuzione della stessa SyncTable il comando nel cluster di destinazione non mostrerebbe differenze:

hbase org.apache.hadoop.hbase.mapreduce.SyncTable --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase hdfs://nn:9000/hashes/test-tbl test-tbl test-tbl
…
org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
HASHES_MATCHED=97148
…

Scenari applicabili

Sincronizzazione dati

A prima vista, HashTable/SyncTable potrebbe sembrare sovrapporsi a CopyTable strumento, ma ci sono ancora scenari specifici in cui entrambi gli strumenti sarebbero più adatti. Come primo esempio di confronto, utilizzando HashTable/SyncTable per un caricamento iniziale di una tabella contenente 100.004 righe e una dimensione totale dei dati di 5,17 GB sono necessari pochi minuti solo per SyncTable per completare:

...
20/04/29 03:48:00 INFO mapreduce.Job: Running job: job_1587985272792_0011
20/04/29 03:48:09 INFO mapreduce.Job: Job job_1587985272792_0011 running in uber mode : false
20/04/29 03:48:09 INFO mapreduce.Job:  map 0% reduce 0%
20/04/29 03:54:08 INFO mapreduce.Job:  map 100% reduce 0%
20/04/29 03:54:09 INFO mapreduce.Job: Job job_1587985272792_0011 completed successfully
…
org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
EMPTY_BATCHES=97148
HASHES_NOT_MATCHED=97148
RANGESNOTMATCHED=97148
ROWSWITHDIFFS=100004
TARGETMISSINGCELLS=749589
TARGETMISSINGROWS=100004

Anche su un set di dati così piccolo, CopyTable eseguito più rapidamente (circa 3 minuti, mentre SyncTable ci sono voluti 6 minuti per copiare l'intero set di dati):

...
20/04/29 05:12:07 INFO mapreduce.Job: Running job: job_1587986840019_0005
20/04/29 05:12:24 INFO mapreduce.Job: Job job_1587986840019_0005 running in uber mode : false
20/04/29 05:12:24 INFO mapreduce.Job:  map 0% reduce 0%
20/04/29 05:13:16 INFO mapreduce.Job:  map 25% reduce 0%
20/04/29 05:13:49 INFO mapreduce.Job:  map 50% reduce 0%
20/04/29 05:14:37 INFO mapreduce.Job:  map 75% reduce 0%
20/04/29 05:15:14 INFO mapreduce.Job:  map 100% reduce 0%
20/04/29 05:15:14 INFO mapreduce.Job: Job job_1587986840019_0005 completed successfully
…
HBase Counters
BYTES_IN_REMOTE_RESULTS=2787236791
BYTES_IN_RESULTS=5549784428
MILLIS_BETWEEN_NEXTS=130808
NOT_SERVING_REGION_EXCEPTION=0
NUM_SCANNER_RESTARTS=0
NUM_SCAN_RESULTS_STALE=0
REGIONS_SCANNED=4
REMOTE_RPC_CALLS=1334
REMOTE_RPC_RETRIES=0
ROWS_FILTERED=0
ROWS_SCANNED=100004
RPC_CALLS=2657
RPC_RETRIES=0

Ora utilizziamo di nuovo entrambi gli strumenti per gestire le differenze sparse nel set di dati. Il test-tbl la tabella utilizzata in tutti questi esempi ha quattro regioni nel cluster di origine. Dopo che tutto il set di dati originale è stato copiato nel cluster di destinazione nell'esempio precedente, abbiamo aggiunto quattro righe aggiuntive solo sul lato sorgente, una per ciascuna delle regioni esistenti, quindi abbiamo eseguito HashTable/SyncTable di nuovo per sincronizzare entrambi i cluster:

20/04/29 05:29:23 INFO mapreduce.Job: Running job: job_1587985272792_0013
20/04/29 05:29:39 INFO mapreduce.Job: Job job_1587985272792_0013 running in uber mode : false
20/04/29 05:29:39 INFO mapreduce.Job:  map 0% reduce 0%
20/04/29 05:29:53 INFO mapreduce.Job:  map 50% reduce 0%
20/04/29 05:30:42 INFO mapreduce.Job:  map 100% reduce 0%
20/04/29 05:30:42 INFO mapreduce.Job: Job job_1587985272792_0013 completed successfully
…
org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
HASHES_MATCHED=97144
HASHES_NOT_MATCHED=4
MATCHINGCELLS=42
MATCHINGROWS=5
RANGESNOTMATCHED=4
ROWSWITHDIFFS=4
TARGETMISSINGCELLS=4
TARGETMISSINGROWS=4

Lo possiamo vedere con solo quattro partizioni non corrispondenti, SyncTable è stato notevolmente più veloce (circa un minuto per il completamento). Utilizzo di CopyTable per eseguire la stessa sincronizzazione ha mostrato i seguenti risultati:

20/04/29 08:32:38 INFO mapreduce.Job: Running job: job_1587986840019_0008
20/04/29 08:32:52 INFO mapreduce.Job: Job job_1587986840019_0008 running in uber mode : false
20/04/29 08:32:52 INFO mapreduce.Job:  map 0% reduce 0%
20/04/29 08:33:38 INFO mapreduce.Job:  map 25% reduce 0%
20/04/29 08:34:15 INFO mapreduce.Job:  map 50% reduce 0%
20/04/29 08:34:48 INFO mapreduce.Job:  map 75% reduce 0%
20/04/29 08:35:31 INFO mapreduce.Job:  map 100% reduce 0%
20/04/29 08:35:32 INFO mapreduce.Job: Job job_1587986840019_0008 completed successfully
…
HBase Counters
BYTES_IN_REMOTE_RESULTS=2762547723
BYTES_IN_RESULTS=5549784600
MILLIS_BETWEEN_NEXTS=340672
NOT_SERVING_REGION_EXCEPTION=0
NUM_SCANNER_RESTARTS=0
NUM_SCAN_RESULTS_STALE=0
REGIONS_SCANNED=4
REMOTE_RPC_CALLS=1323
REMOTE_RPC_RETRIES=0
ROWS_FILTERED=0
ROWS_SCANNED=100008
RPC_CALLS=2657
RPC_RETRIES=0

Copia tabella per sincronizzare le tabelle è stato necessario lo stesso tempo impiegato per la copia dell'intero set di dati, anche se ce n'erano solo quattro celle da copiare. Questo era ancora ok per questo set di dati molto piccolo e con un cluster inattivo, ma in casi d'uso di produzione con set di dati più grandi e in cui il cluster di destinazione potrebbe essere utilizzato anche da molte applicazioni client che scrivono dati su di esso, CopyTable peggioramento delle prestazioni rispetto a SyncTable sarebbe ancora più alto.

Vale la pena ricordare che ci sono anche strumenti/funzionalità aggiuntivi che potrebbero essere utilizzati in combinazione per un caricamento iniziale di un cluster di destinazione (la destinazione non ha alcun dato), come l'esportazione di snapshot, il caricamento in blocco o anche una copia diretta dell'originale directory della tabella dal cluster di origine. Per i caricamenti iniziali con grandi quantità di dati da copiare, acquisire uno snapshot di una tabella e quindi utilizzare ExportSnapshot strumento supererà gli strumenti di copia online come SyncTable o Copia tabella.

Verifica dell'integrità della replica

Un altro uso comune di HashTable/SyncTable è per il monitoraggio dello stato di replica tra cluster, durante la risoluzione di possibili problemi di replica. In questo scenario funziona come alternativa allo strumento VerifyReplication. In genere, quando si verifica lo stato tra due cluster, non si verificano discrepanze o un problema temporaneo in via opzionale ha causato la mancata sincronizzazione di una piccola parte del set di dati più grande. Nell'ambiente di test che abbiamo utilizzato per il nostro esempio precedente dovrebbero esserci 100.008 righe con valori corrispondenti su entrambi i cluster. L'esecuzione di SyncTable sul cluster di destinazione con l'opzione dryrun ci consentirà di identificare eventuali differenze:

20/05/04 10:47:25 INFO mapreduce.Job: Running job: job_1588611199158_0004
…
20/05/04 10:48:48 INFO mapreduce.Job:  map 100% reduce 0%
20/05/04 10:48:48 INFO mapreduce.Job: Job job_1588611199158_0004 completed successfully
…
HBase Counters
BYTES_IN_REMOTE_RESULTS=3753476784
BYTES_IN_RESULTS=5549784600
ROWS_SCANNED=100008
…
org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
HASHES_MATCHED=97148
...

Unlike SyncTable we must run the VerifyReplication tool on the source cluster. We pass the peer id as one of its parameters so that it can find the remote cluster to scan for comparison:
20/05/04 11:01:58 INFO mapreduce.Job: Running job: job_1588611196128_0001
…
20/05/04 11:04:39 INFO mapreduce.Job:  map 100% reduce 0%
20/05/04 11:04:39 INFO mapreduce.Job: Job job_1588611196128_0001 completed successfully
…
HBase Counters
BYTES_IN_REMOTE_RESULTS=2761955495
BYTES_IN_RESULTS=5549784600
…

org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication$Verifier$Counters
GOODROWS=100008
...

Senza differenze, SyncTable trova tutti gli hash corrispondenti tra le partizioni di origine e di destinazione e quindi evita la necessità di eseguire nuovamente la scansione del cluster di origine remoto. VerifyReplication esegue un confronto uno per uno per ciascuna cella in entrambi i cluster che potrebbero già comportare un costo di rete elevato anche quando si tratta di set di dati così piccoli.

Aggiunta di una riga aggiuntiva nel cluster di origine ed esecuzione di nuovo dei controlli. Con VerifyReplication:

20/05/05 11:14:05 INFO mapreduce.Job: Running job: job_1588611196128_0004
…
20/05/05 11:16:32 INFO mapreduce.Job:  map 100% reduce 0%
20/05/05 11:16:32 INFO mapreduce.Job: Job job_1588611196128_0004 completed successfully
…
org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication$Verifier$Counters
BADROWS=1
GOODROWS=100008
ONLY_IN_SOURCE_TABLE_ROWS=1
…

Prima di poter utilizzare SyncTable, dobbiamo rigenerare nuovamente gli hash all'origine con HashTable, poiché ora è disponibile una nuova cella:

20/05/04 11:31:48 INFO mapreduce.Job: Running job: job_1588611196128_0003
…
20/05/04 11:33:15 INFO mapreduce.Job: Job job_1588611196128_0003 completed successfully
...

Ora SyncTable:

20/05/07 05:47:51 INFO mapreduce.Job: Running job: job_1588611199158_0014
…

20/05/07 05:49:20 INFO mapreduce.Job: Job job_1588611199158_0014 completed successfully

org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
HASHES_NOT_MATCHED=97148
MATCHINGCELLS=749593
MATCHINGROWS=100008
RANGESMATCHED=97147
RANGESNOTMATCHED=1
ROWSWITHDIFFS=1
TARGETMISSINGCELLS=1
TARGETMISSINGROWS=1

Possiamo vedere l'aumento del tempo di esecuzione dovuto alla scansione aggiuntiva e al confronto delle celle tra i due cluster remoti. Nel frattempo, il tempo di esecuzione di VerifyReplication ha mostrato poche variazioni.

Conclusione

Tabella Hash/Tabella di sincronizzazione è uno strumento prezioso per spostare i dati quando si tratta di discrepanze sparse tra i set di dati di due cluster. Fa uso del partizionamento dei dati e dell'hashing per rilevare in modo efficiente le differenze sugli intervalli dei due set di dati, riducendo il numero di celle da scansionare mentre si confrontano i dati dei due cluster, evitando anche inutili inserimenti di valori già esistenti nel cluster di destinazione. Tuttavia, non è un proiettile d'argento, come dimostrato in alcuni degli esempi precedenti, mentre potrebbe sembrare sovrapporsi in funzione con la CopyTable strumento, quest'ultimo può offrire prestazioni migliori quando si gestisce un intervallo sequenziale più ampio di celle non corrispondenti. In questo senso, HashTable/SyncTable sarebbe più applicabile nei casi in cui entrambi i cluster hanno già alcuni dati residenti o nei casi in cui una configurazione di replica esistente è stata interrotta dalla temporanea indisponibilità di uno dei peer.

Articoli correlati:

https://blog.cloudera.com/apache-hbase-replication-overview/

https://blog.cloudera.com/approaches-to-backup-and-disaster-recovery-in-hbase/

https://blog.cloudera.com/online-apache-hbase-backups-with-copytable/

https://blog.cloudera.com/introduction-to-apache-hbase-snapshots/