Questo post sul blog è stato pubblicato su Hortonworks.com prima della fusione con Cloudera. Alcuni collegamenti, risorse o riferimenti potrebbero non essere più accurati.
Siamo orgogliosi di annunciare l'anteprima tecnica di Spark-HBase Connector, sviluppato da Hortonworks in collaborazione con Bloomberg.
Il connettore Spark-HBase sfrutta l'API dell'origine dati (SPRK-3247) introdotta in Spark-1.2.0. Colma il divario tra il semplice archivio HBase Key Value e complesse query SQL relazionali e consente agli utenti di eseguire analisi dei dati complesse su HBase utilizzando Spark. Un DataFrame HBase è un DataFrame Spark standard ed è in grado di interagire con qualsiasi altra origine dati come Hive, ORC, Parquet, JSON, ecc.
Sfondo
Esistono diversi connettori Spark HBase open source disponibili come pacchetti Spark, come progetti indipendenti o nel trunk HBase.
Spark è passato alle API Dataset/DataFrame, che forniscono l'ottimizzazione del piano di query integrata. Ora, gli utenti finali preferiscono utilizzare l'interfaccia basata su DataFrames/Dataset.
Il connettore HBase nel trunk HBase ha un ricco supporto a livello RDD, ad es. BulkPut, ecc., ma il suo supporto DataFrame non è così ricco. Il connettore trunk HBase si basa sull'HadoopRDD standard con TableInputFormat integrato in HBase e presenta alcune limitazioni delle prestazioni. Inoltre, BulkGet eseguito nel driver potrebbe essere un singolo punto di errore.
Ci sono altre implementazioni alternative. Prendi Spark-SQL-on-HBase come esempio. Applica tecniche di ottimizzazione personalizzate molto avanzate incorporando il proprio piano di ottimizzazione delle query all'interno del motore Spark Catalyst standard, invia l'RDD a HBase ed esegue attività complicate, come l'aggregazione parziale, all'interno del coprocessore HBase. Questo approccio è in grado di ottenere prestazioni elevate, ma è difficile da mantenere a causa della sua complessità e della rapida evoluzione di Spark. Anche consentire l'esecuzione di codice arbitrario all'interno di un coprocessore può comportare rischi per la sicurezza.
Il connettore Spark-on-HBase (SHC) è stato sviluppato per superare questi potenziali colli di bottiglia e punti deboli. Implementa l'API Spark Datasource standard e sfrutta il motore Spark Catalyst per l'ottimizzazione delle query. Parallelamente, l'RDD viene costruito da zero invece di utilizzare TableInputFormat al fine di ottenere prestazioni elevate. Con questo RDD personalizzato, tutte le tecniche critiche possono essere applicate e implementate completamente, come lo sfoltimento delle partizioni, lo sfoltimento delle colonne, il pushdown dei predicati e la località dei dati. Il design rende la manutenzione molto semplice, pur ottenendo un buon compromesso tra prestazioni e semplicità.
Architettura
Presumiamo che Spark e HBase siano distribuiti nello stesso cluster e che gli esecutori Spark siano posizionati insieme ai server della regione, come illustrato nella figura seguente.
Figura 1. Architettura del connettore Spark-on-HBase
Ad alto livello, il connettore tratta sia Scan che Get in modo simile ed entrambe le azioni vengono eseguite negli esecutori. Il driver elabora la query, aggrega le scansioni/acquisisce in base ai metadati della regione e genera attività per regione. Le attività vengono inviate agli esecutori preferiti collocati insieme al server della regione e vengono eseguite in parallelo negli esecutori per ottenere una migliore localizzazione e concorrenza dei dati. Se una regione non contiene i dati richiesti, al server della regione non viene assegnata alcuna attività. Un'attività può essere costituita da più scansioni e BulkGet e le richieste di dati da parte di un'attività vengono recuperate da un solo server dell'area e questo server dell'area sarà anche la località preferita per l'attività. Si noti che il driver non è coinvolto nell'esecuzione del lavoro reale, ad eccezione delle attività di pianificazione. Ciò evita che il conducente sia il collo di bottiglia.
Catalogo tabelle
Per portare la tabella HBase come tabella relazionale in Spark, definiamo una mappatura tra le tabelle HBase e Spark, denominata Catalogo tabelle. Ci sono due parti critiche di questo catalogo. Uno è la definizione della chiave di riga e l'altro è il mapping tra la colonna della tabella in Spark e la famiglia di colonne e il qualificatore di colonna in HBase. Fare riferimento alla sezione Utilizzo per i dettagli.
Supporto nativo di Avro
Il connettore supporta il formato Avro in modo nativo, poiché è una pratica molto comune mantenere i dati strutturati in HBase come array di byte. L'utente può salvare direttamente il record Avro in HBase. Internamente, lo schema Avro viene convertito automaticamente in un tipo di dati Spark Catalyst nativo. Si noti che entrambe le parti chiave-valore in una tabella HBase possono essere definite in formato Avro. Fare riferimento agli esempi/casi di test nel repository per l'utilizzo esatto.
Predicare Pushdown
Il connettore recupera solo le colonne richieste dal server della regione per ridurre il sovraccarico della rete ed evitare l'elaborazione ridondante nel motore Spark Catalyst. I filtri HBase standard esistenti vengono utilizzati per eseguire il push-down del predicato senza sfruttare la capacità del coprocessore. Poiché HBase non è a conoscenza del tipo di dati ad eccezione dell'array di byte e dell'incoerenza dell'ordine tra i tipi primitivi Java e l'array di byte, dobbiamo preelaborare la condizione del filtro prima di impostare il filtro nell'operazione di scansione per evitare qualsiasi perdita di dati. All'interno del server della regione, i record che non corrispondono alla condizione della query vengono filtrati.
Eliminazione delle partizioni
Estraendo la chiave di riga dai predicati, dividiamo Scan/BulkGet in più intervalli non sovrapposti, solo i server della regione che hanno i dati richiesti eseguiranno Scan/BulkGet. Attualmente, l'eliminazione della partizione viene eseguita sulla prima dimensione delle chiavi di riga. Ad esempio, se una chiave di riga è "key1:key2:key3", l'eliminazione della partizione sarà basata solo su "key1". Si noti che le condizioni WHERE devono essere definite con attenzione. In caso contrario, l'eliminazione della partizione potrebbe non avere effetto. Ad esempio, WHERE rowkey1> "abc" OR column ="xyz" (dove rowkey1 è la prima dimensione di rowkey e column è una normale colonna hbase) risulterà in una scansione completa, poiché dobbiamo coprire tutti gli intervalli perché della O logica.
Località dati
Quando un esecutore Spark è collocato insieme ai server dell'area HBase, la località dei dati viene ottenuta identificando la posizione del server dell'area e si impegna a posizionare l'attività con il server dell'area. Ogni esecutore esegue Scan/BulkGet sulla parte dei dati co-localizzati sullo stesso host.
Scansione e raccolta in blocco
Questi due operatori sono esposti agli utenti specificando WHERE CLAUSE, ad es. WHERE column> x e column
Utilizzo
Di seguito viene illustrata la procedura di base su come utilizzare il connettore. Per maggiori dettagli e casi d'uso avanzati, come Avro e supporto per chiavi composite, fare riferimento agli esempi nel repository.
1) Definisci il catalogo per la mappatura dello schema:
[code language="scala"]def catalog = s"""{ |"table":{"namespace":"default", "name":"table1"}, |"rowkey":"key", |"columns":{ |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"}, |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, |"col3":{"cf":"cf3", "col":"col3", "type":"float"}, |"col4":{"cf":"cf4", "col":"col4", "type":"int"}, |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}, |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"}, |"col7":{"cf":"cf7", "col":"col7", "type":"string"}, |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"} |} |}""".stripMargin [/code]
2) Preparare i dati e popolare la tabella HBase:
case class HBaseRecord(col0:String, col1:Boolean, col2:Double, col3:Float, col4:Int, col5:Long, col6:Short, col7:String, col8:Byte)
oggetto HBaseRecord {def apply(i:Int, t:String):HBaseRecord ={ val s =s”””row${“%03d”.format(i)}””” HBaseRecord(s, i % 2 ==0, i.toDouble, i.toFloat, i, i.toLong, i.toShort, s"String$i:$t", i.toByte) }}
val data =(da 0 a 255).map { i => HBaseRecord(i, "extra")}
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> “5”))
.format(“org.apache.spark. sql.execution.datasources.hbase”)
.save()
3) Caricare DataFrame:
def withCatalog(cat:String):DataFrame ={
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format( “org.apache.spark.sql.execution.datasources.hbase”)
.load()
}
val df =withCatalog(catalog)
4) Query integrata nella lingua:
val s =df.filter((($”col0″ <=“riga050″ &&$”col0”> “riga040”) ||
$”col0″ ===“riga005” ||
$”col0″ ===“riga020” ||
$”col0″ === “r20” ||
$”col0″ <=“riga005”) &&
($”col4″ ===1 ||
$”col4″ ===42))
.select(“col0”, “col1”, “col4”)
s .mostra
5) Query SQL:
df.registerTempTable(“table”)
sqlContext.sql(“select count(col1) from table”).show
Configurazione del pacchetto Spark
Gli utenti possono utilizzare il connettore Spark-on-HBase come pacchetto Spark standard. Per includere il pacchetto nella tua applicazione Spark usa:
spark-shell, pyspark o spark-submit
> $SPRK_HOME/bin/spark-shell –pacchetti zhzhan:shc:0.0.11-1.6.1-s_2.10
Gli utenti possono anche includere il pacchetto come dipendenza nel file SBT. Il formato è spark-package-name:version
spDependencies +=“zhzhan/shc:0.0.11-1.6.1-s_2.10”
Esecuzione in cluster protetto
Per l'esecuzione in un cluster abilitato per Kerberos, l'utente deve includere i jar relativi a HBase nel percorso di classe poiché il recupero e il rinnovo del token HBase viene eseguito da Spark ed è indipendente dal connettore. In altre parole, l'utente deve avviare l'ambiente nel modo normale, tramite kinit o fornendo principal/keytab. Gli esempi seguenti mostrano come eseguire in un cluster sicuro sia con la modalità yarn-client che con la modalità yarn-cluster. Nota che SPARK_CLASSPATH deve essere impostato per entrambe le modalità e il jar di esempio è solo un segnaposto per Spark.
export SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase- client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar
Supponiamo che hrt_qa sia un account senza testa, l'utente può utilizzare il seguente comando per kinit:
kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa
/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-client –packages zhzhan:shc:0.0.11- 1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2. 0-106-hadoop2.7.1.2.4.2.0-106.jar
/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-cluster –files /etc/hbase/conf/hbase -site.xml –packages zhzhan:shc:0.0.11-1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark- client/lib/spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar
Mettere tutto insieme
Abbiamo appena fornito una rapida panoramica di come HBase supporta Spark a livello di DataFrame. Con l'API DataFrame, le applicazioni Spark possono lavorare con i dati archiviati nella tabella HBase con la stessa facilità con i dati archiviati in altre origini dati. Con questa nuova funzionalità, i dati nelle tabelle HBase possono essere facilmente utilizzati dalle applicazioni Spark e da altri strumenti interattivi, ad es. gli utenti possono eseguire una query SQL complessa su una tabella HBase all'interno di Spark, eseguire un join di tabelle su Dataframe o integrarsi con Spark Streaming per implementare un sistema più complicato.
Cosa c'è dopo?
Attualmente, il connettore è ospitato nel repository Hortonworks e pubblicato come pacchetto Spark. È in corso la migrazione al trunk Apache HBase. Durante la migrazione, abbiamo identificato alcuni bug critici nel trunk HBase e verranno corretti insieme all'unione. Il lavoro della community è monitorato dall'ombrello HBase JIRA HBASE-14789, inclusi HBASE-14795 e HBASE-14796 per ottimizzare l'architettura di elaborazione sottostante per Scan and BulkGet, HBASE-14801 per fornire un'interfaccia utente JSON per facilità d'uso, HBASE-15336 per il percorso di scrittura DataFrame, HBASE-15334 per il supporto Avro, HBASE-15333 per supportare i tipi primitivi Java, come short, int, long, float e double, ecc., HBASE-15335 per supportare la chiave di riga composita e HBASE-15572 per aggiungere una semantica di timestamp opzionale. Non vediamo l'ora di produrre una versione futura del connettore che renda il connettore ancora più facile da utilizzare.
Riconoscimento
Vogliamo ringraziare Hamel Kothari, Sudarshan Kadambi e il team di Bloomberg per averci guidato in questo lavoro e anche aiutandoci a convalidare questo lavoro. Vogliamo anche ringraziare la community di HBase per aver fornito il proprio feedback e per averlo migliorato. Infine, questo lavoro ha sfruttato le lezioni delle precedenti integrazioni di Spark HBase e vogliamo ringraziare i loro sviluppatori per aver aperto la strada.
Riferimento:
SHC:https://github.com/hortonworks/shc-release
Pacchetto Spark:http://spark-packages.org/package/zhzhan/shc
Apache HBase: https://hbase.apache.org/
Apache Spark:http://spark.apache.org/