Questo post sul blog è stato pubblicato su Hortonworks.com prima della fusione con Cloudera. Alcuni collegamenti, risorse o riferimenti potrebbero non essere più accurati.
Nel 2016 abbiamo pubblicato la seconda versione v1.0.1 di Spark HBase Connector (SHC). In questo blog analizzeremo le principali funzionalità che abbiamo implementato quest'anno.
Supporta il programmatore Phoenix
SHC può essere utilizzato per scrivere i dati nel cluster HBase per un'ulteriore elaborazione a valle. Supporta la serializzazione Avro per i dati di input e output e per impostazione predefinita è una serializzazione personalizzata utilizzando un semplice meccanismo di codifica nativo. Durante la lettura dei dati di input, SHC spinge verso il basso i filtri su HBase per scansioni efficienti dei dati. Data la popolarità dei dati Phoenix in HBase, sembra naturale supportare i dati Phoenix come input per HBase oltre ai dati Avro. Inoltre, l'impostazione predefinita della semplice codifica binaria nativa sembra suscettibile di modifiche future ed è un rischio per gli utenti che scrivono dati da SHC in HBase. Ad esempio, con il futuro di SHC, la compatibilità con le versioni precedenti deve essere gestita correttamente. Quindi l'impostazione predefinita, SHC deve passare a un formato più standard e ben testato come Phoenix.
Per il supporto della chiave composita, prima di questa funzione, era necessario fissare la lunghezza del valore di ciascuna dimensione, ad eccezione dell'ultima dimensione della chiave composita. Questa limitazione è stata rimossa dal programmatore Phoenix. Attualmente, se gli utenti scelgono Phoenix come codificatore di dati, non devono specificare la lunghezza di ciascuna parte della chiave composita nel catalogo.
Poiché Phoenix è il codificatore predefinito, l'unica modifica per gli utenti è che se desiderano utilizzare PrimitiveType come codificatore di dati, devono specificare "tableCoder":"PrimitiveType" nei loro cataloghi per notificare a SHC che desiderano utilizzare PrimitiveType invece di Phoenix come “tableCoder”.
def catalog =s”””{
|”table”:{“namespace”:”default”, “name”:”table1″, “tableCoder”:”PrimitiveType”},
|”rowkey ”:”key”,
|”columns”:{
|”col0″:{“cf”:”rowkey”, “col”:”key”, “type”:”string”} ,
|”col1″:{“cf”:”cf1″, “col”:”col1″, “type”:”boolean”},
|”col2″:{“cf”:”cf2″, “col”:”col2″, “type”:”double”},
|”col3″:{“cf”:”cf3″, “col”:”col3″, “type” :”float”},
|”col4″:{“cf”:”cf4″, “col”:”col4″, “type”:”int”},
|”col5″:{“cf”:”cf5″, “col”:”col5″, “type”:”bigint”},
|”col6″:{“cf”:”cf6″, “col”:”col6 ″, “type”:”smallint”},
|”col7″:{“cf”:”cf7″, “col”:”col7″, “type”:”string”},
|”col8″:{“cf”:”cf8″, “col”:”col8″, “type”:”tinyint”}
|}
|}”””.stripMargin
Memorizza nella cache le connessioni Spark HBase
SHC non ha memorizzato nella cache gli oggetti di connessione su HBase prima. In particolare, la chiamata a "ConnectionFactory.createConnection" veniva effettuata ogni volta che SHC aveva bisogno di visitare le tabelle e le regioni HBase. Gli utenti possono vederlo semplicemente guardando i registri dell'esecutore e osservando le connessioni di Zookeeper stabilite per ogni richiesta. Nella documentazione di Interface Connection, si dice che la creazione della connessione è un'operazione pesante e che le implementazioni della connessione sono thread-safe. Quindi, per i processi di lunga durata, sarebbe molto utile per SHC mantenere una connessione memorizzata nella cache. Con questa funzione, SHC riduce drasticamente il numero di connessioni create e migliora notevolmente le sue prestazioni nel processo.
Supporta le famiglie di colonne duplicate
SHC ha supportato il supporto delle famiglie di colonne duplicate. Ora gli utenti possono definire i propri cataloghi in questo modo:
def catalog =s”””{
|”table”:{“namespace”:”default”, “name”:”table1″, “tableCoder”:”PrimitiveType”},
|”rowkey ”:”key”,
|”columns”:{
|”col0″:{“cf”:”rowkey”, “col”:”key”, “type”:”string”} ,
|”col1″:{“cf”:”cf1″, “col”:”col1″, “type”:”boolean”},
|”col2″:{“cf”:”cf1″, “col”:”col2″, “type”:”double”},
|”col3″:{“cf”:”cf1″, “col”:”col3″, “type” :”float”},
|”col4″:{“cf”:”cf2″, “col”:”col4″, “type”:”int”},
|”col5″:{“cf”:”cf2″, “col”:”col5″, “type”:”bigint”},
|”col6″:{“cf”:”cf3″, “col”:”col6 ″, “type”:”smallint”},
|”col7″:{“cf”:”cf3″, “col”:”col7″, “type”:”string”},
|”col8″:{“cf”:”cf3″, “col”:”col8″, “type”:”tinyint”}
|}
|}”””.stripMargin
Nella definizione del catalogo sopra, le colonne "col0", "col1" e "col2" hanno la stessa famiglia di colonne "cf1".
Utilizza l'API Spark UnhandledFilters
SHC ha anche implementato Spark API unhandledFilters, che è un'ottimizzazione efficace. Questa API informa Spark sui filtri che SHC non stanno implementando invece di restituire tutti i filtri. Il comportamento precedente, in questo caso, consisteva nel riapplicare tutti i filtri una volta estratti i dati in Spark. Questo dovrebbe essere idempotente, quindi non cambia alcun dato, ma può essere costoso se i filtri sono complicati.
Comunità SHC
La comunità SHC è più grande e più influente di un anno fa. Nel 2016 abbiamo tenuto discorsi all'Hadoop Summit e al meetup HBase/Spark e abbiamo scritto blog dettagliati. Con l'aumento del numero di utenti SHC, stiamo ricevendo un numero maggiore di domande degli utenti. Siamo molto felici di vedere una maggiore adozione di SHC e se hai qualche idea su come migliorarlo ulteriormente, ti preghiamo di fornirci un feedback tramite Hortonworks Community Connection.
RICONOSCIMENTO
Vogliamo ringraziare il team di Bloomberg per averci guidato in questo lavoro e anche aiutandoci a convalidare questo lavoro. Vogliamo anche ringraziare la community di HBase per aver fornito il proprio feedback e per averlo migliorato. Infine, questo lavoro ha sfruttato le lezioni delle precedenti integrazioni di Spark HBase e vogliamo ringraziare i loro sviluppatori per aver aperto la strada.
RIFERIMENTO:
SHC: https://github.com/hortonworks-spark/shc
Apache HBase: https://hbase.apache.org/
Apache Spark: http://spark.apache.org/
Apache Phoenix: https://phoenix.apache.org/