Avvertimento pesante Non ho mai usato questa libreria prima e la mia conoscenza di basso livello di alcuni concetti è un po'... carente. Per lo più sto leggendo il tutorial. Sono abbastanza sicuro che chiunque abbia svolto un lavoro asincrono lo leggerà e riderà, ma potrebbe essere un utile punto di partenza per altre persone. Avvertimento!
Iniziamo con qualcosa di un po' più semplice, dimostrando come uno Stream
lavori. Possiamo convertire un iteratore di Result
s in un flusso:
extern crate futures;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
let payloads = stream::iter(payloads.into_iter());
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
}
Questo ci mostra un modo per consumare il flusso. Usiamo and_then
per fare qualcosa per ogni payload (qui semplicemente stampandolo) e poi for_each
per convertire il Stream
di nuovo in un Future
. Possiamo quindi eseguire il futuro chiamando lo strano nome forget
metodo.
Il prossimo è collegare la libreria Redis al mix, gestendo un solo messaggio. Dal momento che get_message()
il metodo sta bloccando, dobbiamo introdurre alcuni thread nel mix. Non è una buona idea eseguire una grande quantità di lavoro in questo tipo di sistema asincrono poiché tutto il resto verrà bloccato. Ad esempio:
A meno che non sia diversamente disposto, è necessario assicurarsi che le implementazioni di questa funzione si concludano molto rapidamente .
In un mondo ideale, la cassa redis verrebbe costruita su una libreria come i futures ed esporrebbe tutto questo in modo nativo.
extern crate redis;
extern crate futures;
use std::thread;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let msg = pubsub.get_message().expect("Unable to get message");
let payload: Result<String, _> = msg.get_payload();
tx.send(payload).forget();
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
La mia comprensione diventa più sfocata qui. In un thread separato, blocchiamo il messaggio e lo inseriamo nel canale quando lo riceviamo. Quello che non capisco è perché dobbiamo tenerci sul manico del filo. Mi aspetterei che foo.forget
si bloccherebbe, aspettando che lo stream sia vuoto.
In una connessione telnet al server Redis, invia questo:
publish rust awesome
E vedrai che funziona. L'aggiunta di istruzioni print mostra che (per me) il foo.forget
viene eseguita prima che il thread venga generato.
I messaggi multipli sono più complicati. Il Sender
si consuma per evitare che la parte generatrice sia troppo avanti rispetto alla parte consumatrice. Ciò si ottiene restituendo un altro futuro da send
! Dobbiamo spostarlo indietro da lì per riutilizzarlo per la prossima iterazione del ciclo:
extern crate redis;
extern crate futures;
use std::thread;
use std::sync::mpsc;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let mut tx = tx;
while let Ok(msg) = pubsub.get_message() {
let payload: Result<String, _> = msg.get_payload();
let (next_tx_tx, next_tx_rx) = mpsc::channel();
tx.send(payload).and_then(move |new_tx| {
next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
futures::finished(())
}).forget();
tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
}
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
Sono sicuro che col passare del tempo ci sarà più ecosistema per questo tipo di interoperabilità. Ad esempio, la cassa di futures-cpupool potrebbe probabilmente essere esteso per supportare un caso d'uso simile a questo.