Il problema è nel tuo codice. Poiché sovrascrivi una tabella da cui stai tentando di leggere, cancelli in modo efficace tutti i dati prima che Spark possa effettivamente accedervi.
Ricorda che Spark è pigro. Quando crei un Dataset Spark recupera i metadati richiesti, ma non carica i dati. Quindi non esiste una cache magica che conserverà il contenuto originale. I dati verranno caricati quando è effettivamente richiesto. Eccolo quando esegui write azione e quando inizi a scrivere non ci sono più dati da recuperare.
Quello che ti serve è qualcosa del genere:
- Crea un
Dataset. -
Applica le trasformazioni richieste e scrivi i dati in una tabella MySQL intermedia.
-
TRUNCATEl'input originale eINSERT INTO ... SELECTdalla tabella intermedia oDROPla tabella originale eRENAMEtavola intermedia.
Un approccio alternativo, ma meno favorevole, sarebbe:
- Crea un
Dataset. - Applica le trasformazioni richieste e scrivi i dati in una tabella Spark persistente (
df.write.saveAsTable(...)o equivalente) TRUNCATEl'input originale.- Rileggi i dati e salva (
spark.table(...).write.jdbc(...)) - Tavolo Drop Spark.
Non possiamo sottolineare abbastanza che usando Spark cache / persist non è la strada da percorrere. Anche con il conservativo StorageLevel (MEMORY_AND_DISK_2 / MEMORY_AND_DISK_SER_2 ) i dati memorizzati nella cache possono andare persi (errori del nodo), causando errori di correttezza silenziosi.