HBase
 sql >> Database >  >> NoSQL >> HBase

Procedura:Scansione di tabelle Apache HBase salate con intervalli di chiavi specifici per regione in MapReduce

Grazie a Pengyu Wang, sviluppatore di software presso FINRA, per il permesso di ripubblicare questo post.

Le tabelle HBase Apache salate con pre-split sono una soluzione HBase comprovata ed efficace per fornire una distribuzione uniforme del carico di lavoro tra i RegionServer e prevenire gli hot spot durante le scritture di massa. In questo progetto, una chiave di riga è composta da una chiave logica più il sale all'inizio. Un modo per generare sale è calcolare n (numero di regioni) modulo sul codice hash della chiave della riga logica (data, ecc.).

Chiavi di salatura

Ad esempio, una tabella che accetta il caricamento di dati su base giornaliera potrebbe utilizzare chiavi di riga logiche che iniziano con una data e vogliamo suddividere in anticipo questa tabella in 1.000 regioni. In questo caso, prevediamo di generare 1.000 diversi sali. Il sale può essere generato, ad esempio, come:

StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey 

logicalKey = 2015-04-26|abc
rowKey = 893|2015-04-26|abc

L'output di hashCode() con modulo fornisce la casualità per il valore del sale da "000" a "999". Con questa trasformazione chiave, la tabella viene pre-divisa sui confini del sale mentre viene creata. Ciò renderà i volumi di riga distribuiti uniformemente durante il caricamento degli HFiles con il bulkload di MapReduce. Garantisce che le chiavi di riga con lo stesso sale cadano nella stessa regione.

In molti casi d'uso, come l'archiviazione dei dati, è necessario eseguire la scansione o copiare i dati su un particolare intervallo di chiavi logiche (intervallo di date) utilizzando il processo MapReduce. I lavori MapReduce della tabella standard vengono impostati fornendo il Scan istanza con attributi di intervallo di chiavi.

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);
scan.setStartRow(Bytes.toBytes("2015-04-26"));
scan.setStopRow(Bytes.toBytes("2015-04-27"));

/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
tablename,
scan,
DataScanMapper.class,
ImmutableBytesWritable.class,
KeyValue.class,
job, 
true, 
TableInputFormat.class
);
…

Tuttavia, l'impostazione di un tale lavoro diventa difficile per i tavoli pre-spaccati salati. Le chiavi di inizio e fine riga saranno diverse per ciascuna regione perché ognuna ha un sale univoco. E non possiamo specificare più intervalli in una Scan esempio.

Per risolvere questo problema, dobbiamo esaminare come funziona la tabella MapReduce. In genere, il framework MapReduce crea un'attività mappa per leggere ed elaborare ogni suddivisione di input. Ogni divisione viene generata in InputFormat base della classe, con il metodo getSplits() .

Nel lavoro MapReduce della tabella HBase, TableInputFormat è usato come InputFormat . All'interno dell'implementazione, getSplits() il metodo viene sovrascritto per recuperare le chiavi di inizio e fine riga da Scan esempio. Poiché i tasti della riga di inizio e fine si estendono su più regioni, l'intervallo è diviso per i confini della regione e restituisce l'elenco di TableSplit oggetti che coprono l'intervallo della chiave di scansione. Invece di essere basato sul blocco HDFS, TableSplit s sono basati sulla regione. Sovrascrivendo getSplits() metodo, siamo in grado di controllare il TableSplit .

Creazione di TableInputFormat personalizzato

Per modificare il comportamento di getSplits() metodo, una classe personalizzata che estende TableInputFormat è obbligatorio. Lo scopo di getSplits() qui serve per coprire l'intervallo di chiavi logiche in ciascuna regione, costruire il proprio intervallo di chiavi di riga con il loro sale unico. La classe HTable fornisce il metodo getStartEndKeys() che restituisce le chiavi di inizio e fine riga per ciascuna regione. Da ogni tasto di avvio, analizza il sale corrispondente per la regione.

Pair keys = table.getStartEndKeys();
for (int i = 0; i < keys.getFirst().length; i++) {

// The first 3 bytes is the salt, for the first region, start key is empty, so apply “000”
if (keys.getFirst()[i].length == 0) {
regionSalt = "000";
} else {
regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
}
…
}

La configurazione del lavoro supera l'intervallo di chiavi logiche

TableInputFormat recupera la chiave di avvio e arresto da Scan esempio. Dal momento che non possiamo utilizzare Scan nel nostro lavoro MapReduce, potremmo usare Configuration invece di passare queste due variabili e solo la chiave logica di inizio e fine è abbastanza buona (una variabile potrebbe essere una data o altre informazioni commerciali). Il getSplits() il metodo ha JobContext argomento, L'istanza di configurazione può essere letta come context.getConfiguration() .

Nel driver MapReduce:

Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);

conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

In Custom TableInputFormat :

@Override 
public List getSplits(JobContext context) throws IOException {
conf = context.getConfiguration();
String scanStart = conf.get("logical.scan.start");
String scanStop = conf.get("logical.scan.stop");
…
}

Ricostruisci la gamma di chiavi salate per regione

Ora che abbiamo il salt e la chiave logica di avvio/arresto per ciascuna regione, possiamo ricostruire l'intervallo di chiavi di riga effettivo.

byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

Creazione di un TableSplit per ogni regione

Con l'intervallo di chiavi di riga, ora possiamo inizializzare TableSplit esempio per la regione.

List splits = new ArrayList(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {
…
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
splits.add(split);
}

Un'altra cosa da guardare è la località dei dati. Il framework utilizza le informazioni sulla posizione in ogni divisione di input per assegnare un'attività della mappa nel relativo host locale. Per il nostro TableInputFormat , utilizziamo il metodo getTableRegionLocation() per recuperare la posizione della regione che serve la chiave di riga.

Questa posizione viene quindi passata a TableSplit costruttore. Ciò assicurerà che il mapper che elabora la suddivisione della tabella si trovi sullo stesso server della regione. Un metodo, chiamato DNS.reverseDns() , richiede l'indirizzo del server dei nomi HBase. Questo attributo è memorizzato nella configurazione “hbase.nameserver.address “.

this.nameServer = context.getConfiguration().get("hbase.nameserver.address", null);
…

public String getTableRegionLocation(HTable table, byte[] rowKey) throws IOException {
HServerAddress regionServerAddress = table.getRegionLocation(rowKey).getServerAddress();
InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
String regionLocation;
try {
regionLocation = reverseDNS(regionAddress);
} catch (NamingException e) {
regionLocation = regionServerAddress.getHostname();
}
return regionLocation;
}

protected String reverseDNS(InetAddress ipAddress) throws NamingException {
String hostName = this.reverseDNSCacheMap.get(ipAddress);
if (hostName == null) {
hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer));
this.reverseDNSCacheMap.put(ipAddress, hostName);
}
return hostName;
}

Un codice completo di getSplits sarà simile a questo:

@Override 
public List getSplits(JobContext context) throws IOException {
conf = context.getConfiguration();
table = getHTable(conf);
if (table == null) {
throw new IOException("No table was provided.");
}

// Get the name server address and the default value is null.
this.nameServer = conf.get("hbase.nameserver.address", null);
String scanStart = conf.get("region.scan.start");
String scanStop = conf.get("region.scan.stop");

Pair keys = table.getStartEndKeys();
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
throw new RuntimeException("At least one region is expected");
}
List splits = new ArrayList(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {

String regionLocation = getTableRegionLocation(table, keys.getFirst()[i]);

String regionSalt = null;
if (keys.getFirst()[i].length == 0) {
regionSalt = "000";
} else {
regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
}
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
splits.add(split);
}
log.info("Total table splits: " + splits.size());
return splits;
}

Utilizzare TableInoutFormat personalizzato nel driver MapReduce

Ora dobbiamo sostituire TableInputFormat classe con la build personalizzata che abbiamo usato per la configurazione del lavoro MapReduce della tabella.

Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);
HTableInterface status_table = new HTable(conf, status_tablename);

conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);

/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
tablename,
scan,
DataScanMapper.class,
ImmutableBytesWritable.class,
KeyValue.class,
job, 
true, 
MultiRangeTableInputFormat.class
);

L'approccio del TableInputFormat personalizzato fornisce una capacità di scansione efficiente e scalabile per le tabelle HBase progettate per utilizzare il sale per un carico di dati bilanciato. Poiché la scansione può ignorare qualsiasi chiave di riga non correlata, indipendentemente dalle dimensioni della tabella, la complessità della scansione è limitata solo alla dimensione dei dati di destinazione. Nella maggior parte dei casi d'uso, questo può garantire tempi di elaborazione relativamente coerenti man mano che la tabella cresce.