HBase
 sql >> Database >  >> NoSQL >> HBase

Apache HBase + Apache Hadoop + Xceivers

Introduzione

Alcune delle proprietà di configurazione trovate in Apache Hadoop hanno un effetto diretto sui client, come Apache HBase. Una di queste proprietà si chiama "dfs.datanode.max.xcievers" e appartiene al sottoprogetto HDFS. Definisce il numero di thread lato server e, in una certa misura, socket utilizzati per le connessioni dati. L'impostazione di questo numero troppo basso può causare problemi durante la crescita o l'aumento dell'utilizzo del cluster. Questo post ti aiuterà a capire cosa succede tra il client e il server e come determinare un numero ragionevole per questa proprietà.

Il problema

Poiché HBase archivia tutto ciò di cui ha bisogno all'interno di HDFS, il limite superiore rigido imposto dalla proprietà di configurazione "dfs.datanode.max.xcievers" può comportare la disponibilità di risorse insufficienti per HBase, manifestandosi come IOException su entrambi i lati della connessione. Ecco un esempio dalla mailing list HBase [1], in cui i seguenti messaggi sono stati inizialmente registrati sul lato RegionServer:

2008-11-11 19:55:52,451 INFO org.apache.hadoop.dfs.DFSClient: Eccezione in createBlockOutputStream java.io.IOException:impossibile leggere dallo stream
2008-11-11 19:55:52,451 INFO org.apache.hadoop.dfs.DFSClient: abbandono del blocco blk_-5467014108758633036_595771
2008-11- 11 19:55:58,455 WARN org.apache.hadoop.dfs.DFSClient: DataStreamer Eccezione:java.io.IOException:Impossibile creare un nuovo blocco.
2008-11-11 19:55:58,455 WARN org.apache .hadoop.dfs.DFSClient:Error Recovery for block blk_-5467014108758633036_595771 bad datanode[0]
2008-11-11 19:55:58,482 FATAL org.apache.hadoop.hbase.regionserver.Flusher:è richiesta la riproduzione dell'hlog . Forzare lo spegnimento del server

La correlazione con i log di Hadoop DataNode ha rivelato la seguente voce:

ERRORE org.apache.hadoop.dfs.DataNode: DatanodeRegistration(10.10.10.53:50010,storageID=DS-1570581820-10.10.10.53-50010-1224117842339,infoPort=50075, ipcPort=50020):DataXceiver:java.io.IOException: xceiverCount 258 supera il limite di xciever simultanei 256

In questo esempio, il valore basso di "dfs.datanode.max.xcievers" per i DataNode ha causato la chiusura dell'intero RegionServer. Questa è davvero una brutta situazione. Sfortunatamente, non esiste una regola rigida che spieghi come calcolare il limite richiesto. Si consiglia comunemente di aumentare il numero dal valore predefinito di 256 a qualcosa come 4096 (vedere [1], [2], [3], [4] e [5] per riferimento). Questo viene fatto aggiungendo questa proprietà al file hdfs-site.xml di tutti i DataNode (notare che è scritto in modo errato):

    dfs.datanode.max.xcievers
4096

Nota:dovrai riavviare i tuoi DataNode dopo aver apportato questa modifica al file di configurazione.

Questo dovrebbe aiutare con il problema di cui sopra, ma potresti comunque voler sapere di più su come funziona tutto insieme e cosa sta facendo HBase con queste risorse. Ne discuteremo nel resto di questo post. Ma prima di farlo, dobbiamo essere chiari sul motivo per cui non puoi semplicemente impostare questo numero molto alto, diciamo 64K e farla finita.

C'è una ragione per un limite superiore, ed è duplice:in primo luogo, i thread hanno bisogno del proprio stack, il che significa che occupano memoria. Per i server attuali ciò significa 1 MB per thread[6] per impostazione predefinita. In altre parole, se si utilizzano tutti i 4096 thread DataXceiver, sono necessari circa 4 GB di heap per ospitarli. Ciò riduce lo spazio assegnato ai memstore e alle cache dei blocchi, nonché a tutte le altre parti mobili della JVM. Nel peggiore dei casi, potresti imbatterti in un'OutOfMemoryException e il processo RegionServer è toast. Vuoi impostare questa proprietà su un numero ragionevolmente alto, ma nemmeno troppo alto.

In secondo luogo, avendo questi numerosi thread attivi vedrai anche la tua CPU sempre più caricata. Ci saranno molti cambi di contesto per gestire tutto il lavoro simultaneo, il che sottrae risorse per il lavoro reale. Come per le preoccupazioni sulla memoria, vuoi che il numero di thread non cresca all'infinito, ma fornisca un limite superiore ragionevole, ed è a questo che serve "dfs.datanode.max.xcievers".

Dettagli del file system Hadoop

Dal lato client, la libreria HDFS fornisce l'astrazione denominata Path. Questa classe rappresenta un file in un file system supportato da Hadoop, rappresentato dalla classe FileSystem. Esistono alcune implementazioni concrete della classe astratta FileSystem, una delle quali è DistributedFileSytem, ​​che rappresenta HDFS. Questa classe a sua volta racchiude l'effettiva classe DFSClient che gestisce tutte le interazioni con i server remoti, ovvero il NameNode e i numerosi DataNode.

Quando un client, come HBase, apre un file, lo fa, ad esempio, chiamando i metodi open() o create() della classe FileSystem, qui le incarnazioni più semplicistiche

  public DFSInputStream open(String src) genera IOException
public FSDataOutputStream create(Path f) genera IOException

L'istanza di flusso restituita è ciò che richiede un socket e un thread lato server, che vengono utilizzati per leggere e scrivere blocchi di dati. Fanno parte del contratto per lo scambio di dati tra il client e il server. Nota che ci sono altri protocolli basati su RPC in uso tra le varie macchine, ma ai fini di questa discussione possono essere ignorati.

L'istanza stream restituita è una classe DFSOutputStream o DFSInputStream specializzata, che gestisce tutte le interazioni con NameNode per capire dove risiedono le copie dei blocchi e la comunicazione di dati per blocco per DataNode.

Sul lato server, DataNode esegue il wrapping di un'istanza di DataXceiverServer, che è la classe effettiva che legge la chiave di configurazione sopra e genera anche l'eccezione sopra quando viene superato il limite.

All'avvio di DataNode, crea un gruppo di thread e avvia l'istanza DataXceiverServer citata in questo modo:

  this.threadGroup =new ThreadGroup(“dataXceiverServer”);
this.dataXceiverServer =new Daemon( threadGroup,
new DataXceiverServer(ss, conf, this));
this.threadGroup.setDaemon(true); // distruggi automaticamente quando è vuoto

Tieni presente che il thread DataXceiverServer occupa già un punto del gruppo di thread. Il DataNode ha anche questa classe interna per recuperare il numero di thread attualmente attivi in ​​questo gruppo:

  /** Numero di ricevitori simultanei per nodo. */
int getXceiverCount() {
return threadGroup ==null ? 0 :threadGroup.activeCount();
}

La lettura e la scrittura di blocchi, avviati dal client, determina la creazione di una connessione, che viene racchiusa dal thread DataXceiverServer in un'istanza DataXceiver. Durante questo passaggio di consegne, viene creato un thread e registrato nel gruppo di thread sopra. Quindi per ogni operazione di lettura e scrittura attiva viene tracciato un nuovo thread sul lato server. Se il conteggio dei thread nel gruppo supera il massimo configurato, la suddetta eccezione viene generata e registrata nei log di DataNode:

  if (curXceiverCount> dataXceiverServer.maxXceiverCount) {
throw new IOException(“xceiverCount ” + curXceiverCount
+ ” supera il limite di xciever simultanei ”
+ dataXceiverServer.maxXceiverCount);
}

Implicazioni per i clienti

Ora, la domanda è:in che modo la lettura e la scrittura del client sono correlate ai thread lato server. Prima di entrare nei dettagli, però, utilizziamo le informazioni di debug che la classe DataXceiver registra quando viene creata e chiusa

  LOG.debug("Il numero di connessioni attive è:" + datanode.getXceiverCount());

LOG.debug(datanode.dnRegistration + “:Il numero di connessioni attive è:”     + datanode.getXceiverCount());

e monitora durante l'avvio di HBase cosa è registrato sul DataNode. Per semplicità, questo viene fatto su una configurazione pseudo distribuita con una singola istanza DataNode e RegionServer. Quanto segue mostra la parte superiore della pagina di stato di RegionServer.

La parte importante è nella sezione "Metriche", dove dice "storefiles=22". Quindi, supponendo che HBase abbia almeno tanti file da gestire, oltre ad alcuni file extra per il registro write-ahead, dovremmo vedere lo stato del messaggio dei registri sopra che abbiamo almeno 22 "connessioni attive". Avviamo HBase e controlliamo i file di registro di DataNode e RegionServer:

Riga di comando:

$ bin/start-hbase.sh

Registro DataNode:

2012-03-05 13:01:35,309 DEBUG org.apache.hadoop.hdfs.server.datanode. DataNode:il numero di connessioni attive è:1
2012-03-05 13:01:35,315 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS- 1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):il numero di connessioni attive è:2
12/03/05 13:01:35 INFO regionserver.MemStoreFlusher:globalMemStoreLimit=396.7m, globalMemStoreLimitLowMark=347.1m, maxHeap=991.7m
03/12/05 13:01:39 INFO http.HttpServer:La porta restituita da webServer.getConnectors()[0].getLocalPort() prima che open() sia -1 . Apertura del listener su 60030
2012-03-05 13:01:40,003 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:1
12/03/05 13:01:40 INFO regionserver.HRegionServer:Ricevuta richiesta per aprire la regione:-ROOT-,,0.70236052
2012-03-05 13:01:40,882 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Il numero di connessioni attive è:3
2012-03-05 13:01:40,884 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448 -10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):il numero di connessioni attive è:4
2012-03-05 13:01:40,888 DEBUG org.apache.hadoop.hdfs.server. datanode.DataNode:Il numero di connessioni attive è:3

03/12/05 13:01:40 INFO regionserver.HRegion:Onlined -ROOT-,,0.70236052; next sequenceid=63083
2012-03-05 13:01:40,982 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:3
2012-03-05 13 :01:40,983 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=5002):Il numero di connessioni attive è:4

03/12/05 13:01:41 INFO regionserver.HRegionServer:Ricevuta richiesta per aprire la regione:.META.,,1.1028785192
2012-03 -05 13:01:41,026 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:3
2012-03-05 13:01:41,027 DEBUG org.apache.hadoop. hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):il numero di connessioni attive è:4

12/03/05 13:01:41 INFO regionserver.HRegione:Onlined .META.,,1.1028785192; next sequenceid=63082
2012-03-05 13:01:41,109 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:3
2012-03-05 13 :01:41,114 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:4
2012-03-05 13:01:41,117 DEBUG org.apache.hadoop.hdfs.server .datanode.DataNode:Il numero di connessioni attive è:5
12/03/05 13:01:41 INFO regionserver.HRegionServer:Ricevuta richiesta per aprire 16 regioni
12/03/05 13 :01:41 INFO regionserver.HRegionServer:Ricevuta richiesta per aprire la regione:usertable,,1330944810191.62a312d67981c86c42b6bc02e6ec7e3f.
03/12/05 13:01:41 INFO regionserver.HRegionServer:Ricevuta richiesta per aprire la regione:usertable,user112031 1330944810191.90d287473fe223f0ddc137020efda25d.

2012-03-05 13:01:41,246 DEBUG org.apache.hadoop.hdfs.server.datanode. DataNode:il numero di connessioni attive è:6
2012-03-05 13:01:41,248 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:il numero di connessioni attive è:7
...
2012-03-05 13:01:41,257 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772 , infoPort=50075, ipcPort=50020):Il numero di connessioni attive è:10
2012-03-05 13:01:41,257 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0. 0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):il numero di connessioni attive è:9

03/12/05 13:01:41 INFO regionserver.HRegion:tabella utente online,user1120311784,1330944810191.90d287473fe223f0ddc137020efda25d.; next sequenceid=62917
12/03/05 13:01:41 INFO regionserver.HRegion:Onlined usertable,,1330944810191.62a312d67981c86c42b6bc02e6ec7e3f.; next sequenceid=62916

12/03/05 13:01:41 INFO regionserver.HRegion:Onlined usertable,user1361265841,1330944811370.80663fcf291e3ce00080599964f406ba.; next sequenceid=62919
2012-03-05 13:01:41,474 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:6
2012-03-05 13 :01:41,491 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:7
2012-03-05 13:01:41,495 DEBUG org.apache.hadoop.hdfs.server .datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):il numero di connessioni attive è:8
2012-03 -05 13:01:41,508 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:7

03/12/05 13:01:41 INFO regionserver .HRegion:tabella utente in linea,user1964968041,1330944848231.dd89596e9129e1caa7e07f8a491c9734.; next sequenceid=62920
2012-03-05 13:01:41,618 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:6
2012-03-05 13 :01:41,621 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=5002):Il numero di connessioni attive è:7

2012-03-05 13:01:41,829 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID =DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Il numero di connessioni attive è:7
12/03/05 13:01:41 INFO regionserver.HRegion:Onlined usertable ,utente515290649,1330944849739.d23924dc9e9d5891f332c337977af83d.; next sequenceid=62926
2012-03-05 13:01:41,832 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:6
2012-03-05 13 :01:41,838 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=5002):Il numero di connessioni attive è:7
12/03/05 13:01:41 INFO regionserver.HRegion:Onlined usertable,user757669512,1330944850808.cd0d6f16d8ae9cf0c9277f5d6c6c6b9f.; next sequenceid=62929

2012-03-05 14:01:39,711 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:il numero di connessioni attive è:4
2012 -03-05 22:48:41,945 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):il numero di connessioni attive è:4
12/03/05 22:48:41 INFO regionserver.HRegion:Onlined usertable,user757669512,1330944850808.cd0d6f16d8ae9cf0c9277f5d6c6c6b9f.; next sequenceid=62929
2012-03-05 22:48:41,963 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64 -50010-1321352233772, infoPort=50075, ipcPort=50020):il numero di connessioni attive è:4

Puoi vedere come le regioni vengono aperte una dopo l'altra, ma quello che potresti anche notare è che il numero di connessioni attive non sale mai a 22, arriva a malapena a 10. Perché? Per capirlo meglio, dobbiamo vedere come i file in HDFS vengono mappati all'istanza di DataXceiver lato server e i thread effettivi che rappresentano.

Immersione profonda di Hadoop

I summenzionati DFSInputStream e DFSOutputStream sono davvero delle facciate attorno ai soliti concetti di flusso. Avvolgono la comunicazione client-server in queste interfacce Java standard, mentre internamente instradano il traffico a un DataNode selezionato, che è quello che contiene una copia del blocco corrente. Ha la libertà di aprire e chiudere queste connessioni secondo necessità. Quando un client legge un file in HDFS, le classi della libreria client passano in modo trasparente da un blocco all'altro, e quindi da DataNode a DataNode, quindi deve aprire e chiudere le connessioni secondo necessità.

Il DFSInputStream ha un'istanza di una classe DFSClient.BlockReader che apre la connessione al DataNode. L'istanza stream chiama blockSeekTo() per ogni chiamata a read() che si occupa dell'apertura della connessione, se non ce n'è già una. Una volta che un blocco è stato letto completamente, la connessione viene chiusa. La chiusura dello stream ha ovviamente lo stesso effetto.

Il DFSOutputStream ha una classe helper simile, il DataStreamer. Tiene traccia della connessione al server, che viene avviata dal metodo nextBlockOutputStream(). Ha ulteriori classi interne che aiutano a scrivere i dati del blocco, che omettiamo qui per brevità.

Sia i blocchi di scrittura che quelli di lettura richiedono un thread per contenere il socket e i dati intermedi sul lato server, racchiusi nell'istanza DataXceiver. A seconda di ciò che sta facendo il tuo client, vedrai il numero di connessioni fluttuare attorno al numero di file attualmente a cui si accede in HDFS.

Tornando all'enigma di HBase sopra:il motivo per cui non vedi fino a 22 (e più) connessioni durante l'avvio è che mentre le regioni si aprono, l'unico dato richiesto è il blocco di informazioni di HFile. Questo blocco viene letto per ottenere dettagli vitali su ciascun file, ma poi viene chiuso di nuovo. Ciò significa che la risorsa lato server viene rilasciata in rapida successione. Le restanti quattro connessioni sono più difficili da determinare. Puoi usare JStack per scaricare tutti i thread sul DataNode, che in questo esempio mostra questa voce:

"DataXceiver for client /127.0.0.1:64281 [sending block blk_5532741233443227208_4201]" daemon prio=5 tid=7fb96481d000 nid=0x1178b4000 eseguibile [1178b3000]
java.lang.Thread.State:RUNNABLE

"DataXceiver for client /127.0.0.1:64172 [receiving block blk_-2005512129579433420_4199 client=DFSClient_hb_rs_10.0.0.29 ,60020,1330984111693_1330984118810]” daemon prio=5 tid=7fb966109000 nid=0x1169cb000 eseguibile [1169ca000]
java.lang.Thread.State:RUNNABLE

Queste sono le uniche voci DataXceiver (in questo esempio), quindi il conteggio nel gruppo di thread è un po' fuorviante. Ricordiamo che il thread del demone DataXceiverServer rappresenta già una voce in più, che combinata con le due precedenti rappresenta le tre connessioni attive, il che in realtà significa tre thread attivi. Il motivo per cui il registro indica invece quattro è che registra il conteggio da un thread attivo che sta per terminare. Quindi, poco dopo che il conteggio di quattro è stato registrato, in realtà è uno in meno, cioè tre e quindi corrisponde al nostro numero di thread attivi.

Si noti inoltre che le classi helper interne, ad esempio PacketResponder, occupano un altro thread nel gruppo mentre sono attive. L'output di JStack indica questo fatto, elencando il thread come tale:

 "PacketResponder 0 for Block blk_-2005512129579433420_4199" daemon prio=5 tid=7fb96384d000 nid=0x116ace000 in Object.wait () [116acd000]
java.lang.Thread.State:TIMED_WAITING (su oggetto monitor)
at java.lang.Object.wait (metodo nativo)
at org.apache.hadoop. hdfs.server.datanode.BlockReceiver$PacketResponder \
.lastDataNodeRun(BlockReceiver.java:779)
– bloccato (un org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder)
su org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder.run(BlockReceiver.java:870)
su java.lang.Thread.run(Thread.java:680)

Questo thread è attualmente nello stato TIMED_WAITING e non è considerato attivo. Ecco perché il conteggio emesso dalle istruzioni di registro DataXceiver non include questo tipo di thread. Se diventano attivi a causa dell'invio di dati da parte del client, il conteggio dei thread attivi aumenterà nuovamente. Un'altra cosa da notare è che questo thread non ha bisogno di una connessione separata, o socket, tra il client e il server. Il PacketResponder è solo un thread sul lato server per ricevere i dati del blocco e trasmetterli al successivo DataNode nella pipeline di scrittura.

Il comando Hadoop fsck ha anche un'opzione per segnalare quali file sono attualmente aperti per la scrittura:

$ hadoop fsck /hbase -openforwrite
FSCK iniziato da larsgeorge da /10.0.0.29 per il percorso / hbase a lun mar 05 22:59:47 CET 2012
……/hbase/.logs/10.0.0.29,60020,1330984111693/10.0.0.29%3A60020.1330984118842 0 byte, 1 blocco(i), OPENFORWRITE:………………………………..Stato:SANO
Dimensione totale:     2088783626 B
Dir totali:     54
File totali:   45

Ciò non si riferisce immediatamente a un thread lato server occupato, poiché questi sono allocati dall'ID blocco. Ma puoi dedurne che c'è un blocco aperto per la scrittura. Il comando Hadoop ha opzioni aggiuntive per stampare i file effettivi e l'ID blocco di cui sono composti:

$ hadoop fsck /hbase -files -blocks
FSCK iniziato da larsgeorge da /10.0.0.29 per percorso /hbase a mar 06 mar 10:39:50 CET 2012


/hbase/.META./1028785192/.tmp


/hbase/.META./1028785192/info
/hbase/.META./1028785192/info/4027596949915293355 36517 byte, 1 blocco/i:  OK
0. blk_5532741233443227208_4201 len=36517 repl=1


Stato:HEALTHY
Dimensione totale:     2088788703 B
Total dirs :     54
File totali:     45 (File attualmente in fase di scrittura:1)
Blocchi totali (convalidati):     64 (dimensione media del blocco 32637323 B) (Blocchi di file aperti totali (non convalidati):1)
Blocchi minimamente replicati:     64 (100,0 %)

Questo ti dà due cose. Innanzitutto, il riepilogo afferma che esiste un blocco di file aperto al momento dell'esecuzione del comando, corrispondente al conteggio riportato dall'opzione "-openforwrite" sopra. In secondo luogo, l'elenco dei blocchi accanto a ciascun file consente di abbinare il nome del thread al file che contiene il blocco a cui si accede. In questo esempio il blocco con l'ID “blk_5532741233443227208_4201” viene inviato dal server al client, qui un RegionServer. Questo blocco appartiene all'HBase .META. tabella, come mostrato dall'output del comando Hadoop fsck. La combinazione di JStack e fsck può fungere da povero sostituto di lsof (uno strumento sulla riga di comando di Linux per "elencare i file aperti").

Il JStack segnala anche che esiste un thread DataXceiver, con un PacketResponder di accompagnamento, per l'ID blocco "blk_-2005512129579433420_4199", ma questo ID manca dall'elenco dei blocchi riportati da fsck. Questo perché il blocco non è ancora terminato e quindi non disponibile per i lettori. In altre parole, Hadoop fsck segnala solo blocchi completi (o sincronizzati[7][8], per le versioni Hadoop che supportano questa funzione).

Torna a HBase

L'apertura di tutte le regioni non richiede tutte le risorse sul server che ti saresti aspettato. Tuttavia, se esegui la scansione dell'intera tabella HBase, forzi HBase a leggere tutti i blocchi in tutti i file H:

Shell HBase:

hbase(main):003:0> scan 'usertable'

1000000 riga/e in 1460,3120 secondi

Registro DataNode:

2012-03-05 14:42:20,580 DEBUG org.apache.hadoop.hdfs.server.datanode. DataNode:il numero di connessioni attive è:6
2012-03-05 14:43:23,293 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:il numero di connessioni attive è:7
2012 -03-05 14:43:23,299 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):il numero di connessioni attive è:8

2012-03-05 14:49:24,332 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0. 0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):il numero di connessioni attive è:11
2012-03-05 14:49:24,332 DEBUG org .apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:10
2012-03-05 14:49:59,987 DEBUG org.apache.hadoop.hdfs.server.datanod e.DataNode:il numero di connessioni attive è:11
2012-03-05 14:51:12,603 ​​DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):il numero di connessioni attive è:12
2012-03-05 14:51:12,605 DEBUG org.apache.hadoop.hdfs .server.datanode.DataNode:Il numero di connessioni attive è:11
2012-03-05 14:51:46,473 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:12

2012-03-05 14:56:59,420 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:15
2012-03-05 14:57:31,722 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:16
2012-03-05 14:58:24,909 DEBUG org.apache.hadoop.hdfs. server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of act ive connessioni è:17
2012-03-05 14:58:24,910 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:16

2012-03-05 15:04:17,688 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:21
2012-03-05 15:04:17,689 DEBUG org.apache .hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):il numero di connessioni attive è:22
2012-03-05 15:04:54,545 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:21
2012-03-05 15:05:55,901 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):il numero di connessioni attive è :22
2012-03-05 15:05:55,901 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Il numero di connessioni attive è:21

Il numero di connessioni attive raggiunge ora le 22 inafferrabili. Nota che questo conteggio include già il thread del server, quindi siamo ancora un po' a corto di quello che potremmo considerare il massimo teorico, basato sul numero di file che HBase deve gestire.

Cosa significa tutto questo?

Quindi, di quanti "xcievers (sic)" hai bisogno? Dato che usi solo HBase, puoi semplicemente monitorare la metrica "storefiles" sopra (che ottieni anche tramite Ganglia o JMX) e aggiungere una piccola percentuale per i file di registro intermedi e write-ahead. Questo dovrebbe funzionare per i sistemi in movimento. Tuttavia, se dovessi determinare quel numero su un sistema inattivo e completamente compattato e presumere che sia il massimo, potresti scoprire che questo numero è troppo basso una volta che inizi ad aggiungere più file di archivio durante i normali svuotamenti del memstore, cioè non appena inizi a aggiungere dati alle tabelle HBase. Oppure, se utilizzi anche MapReduce sullo stesso cluster, l'aggregazione del registro Flume e così via. Dovrai tenere conto di quei file extra e, cosa più importante, aprire blocchi per la lettura e la scrittura.

Nota ancora che gli esempi in questo post utilizzano un singolo DataNode, qualcosa che non avrai su un cluster reale. A tal fine, dovrai dividere il numero totale di file di archivio (secondo la metrica HBase) per il numero di DataNode che hai. Se, ad esempio, hai un conteggio di file store di 1000 e il tuo cluster ha 10 DataNode, dovresti essere a posto con il valore predefinito di 256 thread xceiver per DataNode.

Il caso peggiore sarebbe il numero di tutti i lettori e scrittori attivi, cioè quelli che stanno attualmente inviando o ricevendo dati. Ma poiché questo è difficile da determinare in anticipo, potresti prendere in considerazione la possibilità di creare una riserva decente. Inoltre, poiché il processo di scrittura ha bisogno di un thread aggiuntivo, sebbene di durata più breve (per PacketResponder), devi tenerne conto. Quindi una formula ragionevole, ma piuttosto semplicistica potrebbe essere:

Questa formula tiene conto del fatto che sono necessari circa due thread per uno scrittore attivo e un altro per un lettore attivo. Questo viene quindi sommato e diviso per il numero di DataNode, poiché devi specificare "dfs.datanode.max.xcievers" per DataNode.

Se ritorni allo screenshot HBase RegionServer sopra, hai visto che c'erano 22 file di archivio. These are immutable and will only be read, or in other words occupy one thread only. For all memstores that are flushed to disk you need two threads – but only until they are fully written. The files are finalized and closed for good, cleaning up any thread in the process. So these come and go based on your flush frequency. Same goes for compactions, they will read N files and write them into a single new one, then finalize the new file. As for the write-ahead logs, these will occupy a thread once you have started to add data to any table. There is a log file per server, meaning that you can only have twice as many active threads for these files as you have RegionServers.

For a pure HBase setup (HBase plus its own HDFS, with no other user), we can estimate the number of needed DataXceiver’s with the following formula:

Since you will be hard pressed to determine the active number of store files, flushes, and so on, it might be better to estimate the theoretical maximum instead. This maximum value takes into account that you can only have a single flush and compaction active per region at any time. The maximum number of logs you can have active matches the number of RegionServers, leading us to this formula:

Obviously, the number of store files will increase over time, and the number of regions typically as well. Same for the numbers of servers, so keep in mind to adjust this number over time. In practice, you can add a buffer of, for example, 20%, as shown in the formula below – in an attempt to not force you to change the value too often.

On the other hand, if you keep the number of regions fixed per server[9], and rather split them manually, while adding new servers as you grow, you should be able to keep this configuration property stable for each server.

Final Advice &TL;DR

Here is the final formula you want to use:

It computes the maximum number of threads needed, based on your current HBase vitals (no. of store files, regions, and region servers). It also adds a fudge factor of 20% to give you room for growth. Keep an eye on the numbers on a regular basis and adjust the value as needed. You might want to use Nagios with appropriate checks to warn you when any of the vitals goes over a certain percentage of change.

Note:Please make sure you also adjust the number of file handles your process is allowed to use accordingly[10]. This affects the number of sockets you can use, and if that number is too low (default is often 1024), you will get connection issues first.

Finally, the engineering devil on one of your shoulders should already have started to snicker about how horribly non-Erlang-y this is, and how you should use an event driven approach, possibly using Akka with Scala[11] – if you want to stay within the JVM world. Bear in mind though that the clever developers in the community share the same thoughts and have already started to discuss various approaches[12][13].

Links:

  • [1] http://old.nabble.com/Re%3A-xceiverCount-257-exceeds-the-limit-of-concurrent-xcievers-256-p20469958.html
  • [2] http://ccgtech.blogspot.com/2010/02/hadoop-hdfs-deceived-by-xciever.html
  • [3] https://issues.apache.org/jira/browse/HDFS-1861 “Rename dfs.datanode.max.xcievers and bump its default value”
  • [4] https://issues.apache.org/jira/browse/HDFS-1866 “Document dfs.datanode.max.transfer.threads in hdfs-default.xml”
  • [5] http://hbase.apache.org/book.html#dfs.datanode.max.xcievers
  • [6] http://www.oracle.com/technetwork/java/hotspotfaq-138619.html#threads_oom
  • [7] https://issues.apache.org/jira/browse/HDFS-200 “In HDFS, sync() not yet guarantees data available to the new readers”
  • [8] https://issues.apache.org/jira/browse/HDFS-265 “Revisit append”
  • [9] http://search-hadoop.com/m/CBBoV3z24H1 “HBase, mail # user – region size/count per regionserver”
  • [10] http://hbase.apache.org/book.html#ulimit “ulimit and nproc”
  • [11] http://akka.io/ “Akka”
  • [12] https://issues.apache.org/jira/browse/HDFS-223 “Asynchronous IO Handling in Hadoop and HDFS”
  • [13] https://issues.apache.org/jira/browse/HDFS-918 “Use single Selector and small thread pool to replace many instances of BlockSender for reads”