Il problema è nel tuo codice. Poiché sovrascrivi una tabella da cui stai tentando di leggere, cancelli in modo efficace tutti i dati prima che Spark possa effettivamente accedervi.
Ricorda che Spark è pigro. Quando crei un Dataset
Spark recupera i metadati richiesti, ma non carica i dati. Quindi non esiste una cache magica che conserverà il contenuto originale. I dati verranno caricati quando è effettivamente richiesto. Eccolo quando esegui write
azione e quando inizi a scrivere non ci sono più dati da recuperare.
Quello che ti serve è qualcosa del genere:
- Crea un
Dataset
. -
Applica le trasformazioni richieste e scrivi i dati in una tabella MySQL intermedia.
-
TRUNCATE
l'input originale eINSERT INTO ... SELECT
dalla tabella intermedia oDROP
la tabella originale eRENAME
tavola intermedia.
Un approccio alternativo, ma meno favorevole, sarebbe:
- Crea un
Dataset
. - Applica le trasformazioni richieste e scrivi i dati in una tabella Spark persistente (
df.write.saveAsTable(...)
o equivalente) TRUNCATE
l'input originale.- Rileggi i dati e salva (
spark.table(...).write.jdbc(...)
) - Tavolo Drop Spark.
Non possiamo sottolineare abbastanza che usando Spark cache
/ persist
non è la strada da percorrere. Anche con il conservativo StorageLevel
(MEMORY_AND_DISK_2
/ MEMORY_AND_DISK_SER_2
) i dati memorizzati nella cache possono andare persi (errori del nodo), causando errori di correttezza silenziosi.