Introduzione
Python è ampiamente utilizzato da Data Engineer e Data Scientist per risolvere tutti i tipi di problemi, dalle pipeline ETL/ELT alla creazione di modelli di machine learning. Apache HBase è un sistema di archiviazione dati efficace per molti flussi di lavoro, ma l'accesso a questi dati in modo specifico tramite Python può essere difficile. Per i professionisti dei dati che desiderano utilizzare i dati archiviati in HBase, il recente progetto a monte "hbase-connectors" può essere utilizzato con PySpark per le operazioni di base.
In questa serie di blog, spiegheremo come configurare PySpark e HBase insieme per l'uso di base di Spark e per i lavori gestiti in CDSW. Per coloro che non hanno familiarità con CDSW, è una piattaforma di data science aziendale self-service sicura per i data scientist per gestire le proprie pipeline di analisi, accelerando così i progetti di machine learning dall'esplorazione alla produzione. Per ulteriori informazioni su CDSW, visita la pagina del prodotto Cloudera Data Science Workbench.
In questo post, verranno spiegate e dimostrate diverse operazioni insieme all'output di esempio. Per contesto, tutte le operazioni di esempio in questo post del blog specifico vengono eseguite con una distribuzione CDSW.
Prerequisiti:
- Disporre di un cluster CDP con HBase e Spark
- Se hai intenzione di seguire gli esempi tramite CDSW, avrai bisogno che sia installato – Installazione di Cloudera Data Science Workbench
- Python 3 è installato su ogni nodo nello stesso percorso
Configurazione:
Innanzitutto, HBase e Spark devono essere configurati insieme affinché le query Spark SQL funzionino correttamente. Per fare ciò ci sono due parti:in primo luogo, configurare i server della regione HBase tramite Cloudera Manager; e in secondo luogo, assicurati che il runtime di Spark disponga di binding HBase. Una nota da tenere a mente, tuttavia, è che Cloudera Manager ha già impostato alcune variabili di configurazione e di ambiente per puntare automaticamente Spark su HBase per te. Tuttavia, il primo passaggio della configurazione delle query Spark SQL è comune a tutti i tipi di distribuzione su cluster CDP, ma il secondo è leggermente diverso a seconda del tipo di distribuzione.
Configurazione dei server della regione HBase
- Vai su Cloudera Manager e seleziona il servizio HBase.
- Cerca "ambiente regionserver"
- Aggiungi una nuova variabile di ambiente utilizzando lo snippet di configurazione avanzata dell'ambiente RegionServer (Valvola di sicurezza):
- Chiave:HBASE_CLASSPATH
- Valore:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib /hbase-spark-protocol-shaded.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.11.12.jar
Assicurati di utilizzare i numeri di versione appropriati.
- Riavvia i server regionali.
Dopo aver eseguito i passaggi precedenti, segui i passaggi seguenti a seconda se desideri una distribuzione CDSW o non CDSW.
Aggiunta di binding HBase a Spark Runtime in distribuzioni non CDSW
Per distribuire la shell o utilizzare correttamente spark-submit, utilizza i seguenti comandi per assicurarti che spark disponga dei collegamenti HBase corretti.
pyspark –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded. vaso
spark-submit –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol- vaso.ombreggiato
Aggiunta di binding HBase a Spark Runtime nelle distribuzioni CDSW
Per configurare CDSW con HBase e PySpark, sono necessari alcuni passaggi.
1) Assicurati che Python 3 sia installato su ogni nodo del cluster e prendi nota del percorso
2) Crea un nuovo progetto in CDSW e usa un modello PySpark
3) Apri il progetto, vai su Impostazioni -> Motore -> Variabili d'ambiente.
4) Imposta PYSPRK3_DRIVER_PYTHON e PYSPRK3_PYTHON al percorso in cui Python è installato sui nodi del cluster (percorso annotato nel passaggio 1).
Di seguito è riportato un esempio di come dovrebbe apparire.
5) Nel tuo progetto, vai su File -> spark-defaults.conf e aprilo nel Workbench
6) Copia e incolla la riga sottostante in quel file e assicurati che sia salvato prima di iniziare una nuova sessione.
spark.jars=/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
A questo punto, CDSW è ora configurato per eseguire lavori PySpark su HBase! Il resto di questo post del blog fa riferimento ad alcune operazioni di esempio su una distribuzione CDSW.
Operazioni di esempio
Operazioni di posizionamento
Esistono due modi per inserire e aggiornare le righe in HBase. Il primo e più consigliato metodo è creare un catalogo, che è uno schema che mapperà le colonne di una tabella HBase su un dataframe PySpark specificando il nome della tabella e lo spazio dei nomi. La creazione di questo formato JSON definito dall'utente è il metodo preferito poiché può essere utilizzato anche con altre operazioni. Per ulteriori informazioni sui cataloghi, fare riferimento a questa documentazione http://hbase.apache.org/book.html#_define_catalog. Il secondo metodo utilizza un parametro di mappatura specifico chiamato "hbase.columns.mapping" che accetta solo una stringa di coppie chiave-valore.
- Utilizzo dei cataloghi
from pyspark.sql import Row from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("SampleApplication")\ .getOrCreate() tableCatalog = ''.join("""{ "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"}, "rowkey":"key", "columns":{ "key":{"cf":"rowkey", "col":"key", "type":"int"}, "empId":{"cf":"personal","col":"empId","type":"string"}, "empName":{"cf":"personal", "col":"empName", "type":"string"}, "empState":{"cf":"personal", "col":"empWeight", "type":"string"} } }""".split()) employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')] employeeRDD = spark.sparkContext.parallelize(employee) employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3])) employeeDF = spark.createDataFrame(employeeMap) employeeDF.write.format("org.apache.hadoop.hbase.spark") \ .options(catalog=tableCatalog, newTable=5) \ .option("hbase.spark.use.hbasecontext", False) \ .save() # newTable refers to the NumberOfRegions which has to be > 3
Verifica che una nuova tabella chiamata "tblEmployee" venga creata in HBase semplicemente aprendo la shell HBase ed eseguendo il comando seguente:
scansiona 'tblEmployee', {'LIMIT' => 2}
L'utilizzo dei cataloghi può anche consentire di caricare facilmente le tabelle HBase. Questo sarà discusso in una prossima puntata.
- Utilizzo di hbase.columns.mapping
Durante la scrittura del PySpark Dataframe, è possibile aggiungere un'opzione denominata "hbase.columns.mapping" per includere una stringa che mappa correttamente le colonne. Questa opzione ti consente solo di inserire righe in tabelle esistenti.
Nella shell HBase, creiamo prima una tabella crea 'tblEmployee2', 'personal'
Ora in PySpark inseriamo 2 righe usando "hbase.columns.mapping"
from pyspark.sql import Row from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("SampleApplication")\ .getOrCreate() employee = [(10, 'jonD', 'Jon Daniels', 170.7), (6, 'billR', 'Bill Robert', 200.1)] employeeRDD = spark.sparkContext.parallelize(employee) employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empWeight=float(x[3]))) employeeDF = spark.createDataFrame(employeeMap) employeeDF.write.format("org.apache.hadoop.hbase.spark") \ .option("hbase.columns.mapping", "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") \ .option("hbase.table", "tblEmployee2") \ .option("hbase.spark.use.hbasecontext", False) \ .save()
Ancora una volta, verifica che una nuova tabella denominata "tblEmployee2" abbia queste nuove righe.
scansiona 'tblEmployee2', {'LIMIT' => 2}
Questo completa i nostri esempi su come inserire righe tramite PySpark nelle tabelle HBase. Nella prossima puntata, parlerò delle operazioni di acquisizione e scansione, PySpark SQL e alcuni problemi di risoluzione dei problemi. Fino ad allora dovresti ottenere un cluster CDP e farti strada attraverso questi esempi.