In Spark, le funzioni su RDD
s (come map
qui) vengono serializzati e inviati agli esecutori per l'elaborazione. Ciò implica che tutti gli elementi contenuti in tali operazioni devono essere serializzabili.
La connessione Redis qui non è serializzabile in quanto apre connessioni TCP al DB di destinazione che sono legate alla macchina su cui è stata creata.
La soluzione è creare quelle connessioni sugli esecutori, nel contesto di esecuzione locale. Ci sono pochi modi per farlo. Due che mi vengono in mente sono:
rdd.mapPartitions
:consente di elaborare un'intera partizione in una volta, e quindi di ammortizzare il costo di creazione delle connessioni)- Gestione connessioni singleton:crea la connessione una volta per esecutore
mapPartitions
è più semplice in quanto tutto ciò che richiede è una piccola modifica alla struttura del programma:
val perhit = perhitFile.mapPartitions{partition =>
val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
val res = partition.map{ x =>
...
val refStr = r.hmget(...) // use r to process the local data
}
r.close // take care of resources
res
}
Un gestore di connessione singleton può essere modellato con un oggetto che contiene un riferimento pigro a una connessione (nota:funzionerà anche un riferimento mutabile).
object RedisConnection extends Serializable {
lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}
Questo oggetto può quindi essere utilizzato per creare un'istanza di 1 connessione per JVM di lavoro e viene utilizzato come Serializable
oggetto in un'operazione di chiusura.
val perhit = perhitFile.map{x =>
val param = f(x)
val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
}
}
Il vantaggio dell'utilizzo dell'oggetto singleton è un sovraccarico minore poiché le connessioni vengono create solo una volta da JVM (invece di 1 per partizione RDD)
Ci sono anche alcuni svantaggi:
- La pulizia delle connessioni è complicata (gancio di spegnimento/timer)
- è necessario garantire la sicurezza dei thread delle risorse condivise
(*) codice fornito a scopo illustrativo. Non compilato o testato.