Ho risolto il mio problema. Il motivo dei conteggi incoerenti era il MongoDefaultPartitioner che avvolge MongoSamplePartitioner che utilizza il campionamento casuale. Ad essere onesti, questo è un default piuttosto strano per quanto mi riguarda. Personalmente preferirei invece avere un partizionatore lento ma coerente. I dettagli per le opzioni del partizionatore sono disponibili nelle opzioni di configurazione documentazione.
codice:
val df = spark.read
.format("com.mongodb.spark.sql.DefaultSource")
.option("uri", "mongodb://127.0.0.1/enron_mail.messages")
.option("partitioner", "spark.mongodb.input.partitionerOptions.MongoPaginateBySizePartitioner ")
.load()