Questo non è stato ancora pubblicato, ma nel ramo principale di Alpakka, MongoSource.apply
accetta un parametro di tipo:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Pertanto, con la prossima versione 0.18 di Alpakka, sarai in grado di fare quanto segue:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
Nota che source
qui presuppone che todoCollection.find()
restituisce un Observable[TodoMongo]
; modificare i tipi secondo necessità.
Nel frattempo, puoi semplicemente aggiungere manualmente il codice sopra. Ad esempio:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Nota che MyMongoSource
è definito per risiedere in akka.stream.alpakka.mongodb.scaladsl
pacchetto (come MongoSource
), perché ObservableToPublisher
è una classe privata del pacchetto. Dovresti usare MyMongoSource
nello stesso modo in cui useresti MongoSource
:
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())