Redis
 sql >> Database >  >> NoSQL >> Redis

Come implementare un flusso di future per una chiamata di blocco utilizzando futures.rs e Redis PubSub?

Avvertimento pesante Non ho mai usato questa libreria prima e la mia conoscenza di basso livello di alcuni concetti è un po'... carente. Per lo più sto leggendo il tutorial. Sono abbastanza sicuro che chiunque abbia svolto un lavoro asincrono lo leggerà e riderà, ma potrebbe essere un utile punto di partenza per altre persone. Avvertimento!

Iniziamo con qualcosa di un po' più semplice, dimostrando come uno Stream lavori. Possiamo convertire un iteratore di Result s in un flusso:

extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

Questo ci mostra un modo per consumare il flusso. Usiamo and_then per fare qualcosa per ogni payload (qui semplicemente stampandolo) e poi for_each per convertire il Stream di nuovo in un Future . Possiamo quindi eseguire il futuro chiamando lo strano nome forget metodo.

Il prossimo è collegare la libreria Redis al mix, gestendo un solo messaggio. Dal momento che get_message() il metodo sta bloccando, dobbiamo introdurre alcuni thread nel mix. Non è una buona idea eseguire una grande quantità di lavoro in questo tipo di sistema asincrono poiché tutto il resto verrà bloccato. Ad esempio:

A meno che non sia diversamente disposto, è necessario assicurarsi che le implementazioni di questa funzione si concludano molto rapidamente .

In un mondo ideale, la cassa redis verrebbe costruita su una libreria come i futures ed esporrebbe tutto questo in modo nativo.

extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

La mia comprensione diventa più sfocata qui. In un thread separato, blocchiamo il messaggio e lo inseriamo nel canale quando lo riceviamo. Quello che non capisco è perché dobbiamo tenerci sul manico del filo. Mi aspetterei che foo.forget si bloccherebbe, aspettando che lo stream sia vuoto.

In una connessione telnet al server Redis, invia questo:

publish rust awesome

E vedrai che funziona. L'aggiunta di istruzioni print mostra che (per me) il foo.forget viene eseguita prima che il thread venga generato.

I messaggi multipli sono più complicati. Il Sender si consuma per evitare che la parte generatrice sia troppo avanti rispetto alla parte consumatrice. Ciò si ottiene restituendo un altro futuro da send ! Dobbiamo spostarlo indietro da lì per riutilizzarlo per la prossima iterazione del ciclo:

extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Sono sicuro che col passare del tempo ci sarà più ecosistema per questo tipo di interoperabilità. Ad esempio, la cassa di futures-cpupool potrebbe probabilmente essere esteso per supportare un caso d'uso simile a questo.