Redis
 sql >> Database >  >> NoSQL >> Redis

Redis su Spark:attività non serializzabile

In Spark, le funzioni su RDD s (come map qui) vengono serializzati e inviati agli esecutori per l'elaborazione. Ciò implica che tutti gli elementi contenuti in tali operazioni devono essere serializzabili.

La connessione Redis qui non è serializzabile in quanto apre connessioni TCP al DB di destinazione che sono legate alla macchina su cui è stata creata.

La soluzione è creare quelle connessioni sugli esecutori, nel contesto di esecuzione locale. Ci sono pochi modi per farlo. Due che mi vengono in mente sono:

  • rdd.mapPartitions :consente di elaborare un'intera partizione in una volta, e quindi di ammortizzare il costo di creazione delle connessioni)
  • Gestione connessioni singleton:crea la connessione una volta per esecutore

mapPartitions è più semplice in quanto tutto ciò che richiede è una piccola modifica alla struttura del programma:

val perhit = perhitFile.mapPartitions{partition => 
    val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
    val res = partition.map{ x =>
        ...
        val refStr = r.hmget(...) // use r to process the local data
    }
    r.close // take care of resources
    res
}

Un gestore di connessione singleton può essere modellato con un oggetto che contiene un riferimento pigro a una connessione (nota:funzionerà anche un riferimento mutabile).

object RedisConnection extends Serializable {
   lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}

Questo oggetto può quindi essere utilizzato per creare un'istanza di 1 connessione per JVM di lavoro e viene utilizzato come Serializable oggetto in un'operazione di chiusura.

val perhit = perhitFile.map{x => 
    val param = f(x)
    val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
    }
}

Il vantaggio dell'utilizzo dell'oggetto singleton è un sovraccarico minore poiché le connessioni vengono create solo una volta da JVM (invece di 1 per partizione RDD)

Ci sono anche alcuni svantaggi:

  • La pulizia delle connessioni è complicata (gancio di spegnimento/timer)
  • è necessario garantire la sicurezza dei thread delle risorse condivise

(*) codice fornito a scopo illustrativo. Non compilato o testato.