MongoDB
 sql >> Database >  >> NoSQL >> MongoDB

Streaming di dati in tempo reale con MongoDB Change Streams

Di recente, MongoDB ha rilasciato una nuova funzionalità a partire dalla versione 3.6, Change Streams. Questo ti dà l'accesso istantaneo ai tuoi dati che ti aiuta a rimanere aggiornato con le modifiche ai tuoi dati. Nel mondo di oggi, tutti vogliono le notifiche istantanee piuttosto che riceverle dopo alcune ore o minuti. Per alcune applicazioni, è fondamentale inviare notifiche in tempo reale a tutti gli utenti iscritti per ogni singolo aggiornamento. MongoDB ha reso questo processo davvero semplice introducendo questa funzionalità. In questo articolo, conosceremo il flusso di modifiche di MongoDB e le sue applicazioni con alcuni esempi.

Definizione dei flussi di cambiamento

I flussi di modifiche non sono altro che il flusso in tempo reale di eventuali modifiche che si verificano nel database o nella raccolta o anche nelle distribuzioni. Ad esempio, ogni volta che si verifica un aggiornamento (Inserisci, Aggiorna o Elimina) in una raccolta specifica, MongoDB attiva un evento di modifica con tutti i dati che sono stati modificati.

Puoi definire flussi di modifiche su qualsiasi raccolta proprio come qualsiasi altro normale operatore di aggregazione utilizzando l'operatore $changeStream e il metodo watch(). Puoi anche definire il flusso di modifiche utilizzando il metodo MongoCollection.watch().

Esempio

db.myCollection.watch()

Modifica le funzionalità degli stream

  • Filtraggio modifiche

    Puoi filtrare le modifiche per ricevere notifiche di eventi solo per alcuni dati mirati.

    Esempio:

    pipeline = [
       {
         $match: { name: "Bob" }
       } ];
    changeStream = collection.watch(pipeline);

    Questo codice ti assicurerà di ricevere aggiornamenti solo per i record il cui nome è uguale a Bob. In questo modo puoi scrivere qualsiasi pipeline per filtrare i flussi di modifiche.

  • Riprendere i flussi di modifica

    Questa funzione garantisce che non vi siano perdite di dati in caso di guasti. Ogni risposta nel flusso contiene il token di ripristino che può essere utilizzato per riavviare il flusso da un punto specifico. Per alcuni frequenti errori di rete, il driver mongodb tenterà di ristabilire la connessione con gli abbonati utilizzando il token di ripristino più recente. Sebbene, in caso di errore completo dell'applicazione, i client dovrebbero mantenere il token di ripristino per riprendere lo stream.

  • Stream di modifiche ordinati

    MongoDB utilizza un orologio logico globale per ordinare tutti gli eventi del flusso di modifiche su tutte le repliche e gli shard di qualsiasi cluster, quindi il ricevitore riceverà sempre le notifiche nello stesso ordine in cui sono stati applicati i comandi al database.

  • Eventi con documenti completi

    MongoDB restituisce la parte dei documenti corrispondenti per impostazione predefinita. Tuttavia, puoi modificare la configurazione del flusso di modifiche per ricevere un documento completo. Per farlo, passa { fullDocument:“updateLookup”} al metodo di osservazione.
    Esempio:

    collection = db.collection("myColl")
    changeStream = collection.watch({ fullDocument: “updateLookup”})
  • Durata

    I flussi di modifiche notificheranno solo i dati impegnati nella maggior parte delle repliche. Ciò assicurerà che gli eventi siano generati dai dati di persistenza della maggioranza garantendo la durabilità del messaggio.

  • Sicurezza/Controllo accessi

    I flussi di modifica sono molto sicuri. Gli utenti possono creare flussi di modifiche solo sulle raccolte per le quali dispongono delle autorizzazioni di lettura. Puoi creare flussi di modifiche in base ai ruoli utente.

Diversinines Diventa un DBA MongoDB - Portare MongoDB in produzioneScopri ciò che devi sapere per distribuire, monitorare, gestire e ridimensionare MongoDBScarica gratuitamente

Esempio di flussi di cambiamento

In questo esempio, creeremo flussi di modifiche sulla raccolta di azioni per ricevere una notifica quando il prezzo di un'azione supera qualsiasi soglia.

  • Configura il cluster

    Per utilizzare i flussi di modifiche, dobbiamo prima creare un set di repliche. Eseguire il comando seguente per creare un set di repliche a nodo singolo.

    mongod --dbpath ./data --replSet “rs”
  • Inserisci alcuni record nella raccolta Azioni

    var docs = [
     { ticker: "AAPL", price: 210 },
     { ticker: "AAPL", price: 260 },
     { ticker: "AAPL", price: 245 },
     { ticker: "AAPL", price: 255 },
     { ticker: "AAPL", price: 270 }
    ];
    db.Stocks.insert(docs)
  • Configura l'ambiente del nodo e installa le dipendenze

    mkdir mongo-proj && cd mongo-proj
    npm init -y
    npm install mongodb --save
  • Iscriviti per le modifiche

    Crea un file index.js e inserisci il codice seguente.

    const mongo = require("mongodb").MongoClient;
    mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => {
     console.log("Connected to MongoDB server");
     // Select DB and Collection
     const db = client.db("mydb");
     const collection = db.collection("Stocks");
     pipeline = [
       {
         $match: { "fullDocument.price": { $gte: 250 } }
       }
     ];
     // Define change stream
     const changeStream = collection.watch(pipeline);
     // start listen to changes
     changeStream.on("change", function(event) {
       console.log(JSON.stringify(event));
     });
    });

    Ora esegui questo file:

    node index.js
  • Inserisci un nuovo record in db per ricevere un aggiornamento

    db.Stocks.insert({ ticker: “AAPL”, price: 280 })

    Ora controlla la tua console, riceverai un aggiornamento da MongoDB.
    Esempio di risposta:

    {
    "_id":{
    "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"},
    "operationType":"insert",
    "clusterTime":"6655565945622233089",
    "fullDocument":{
    "_id":"5c5d51f73aca83479b48de6e",
    "ticker":"AAPL",
    "Price":300
    },
    "ns":{"db":"mydb","coll":"Stocks"},
    "documentKey":{"_id":"5c5d51f73aca83479b48de6e"}
    }

Qui puoi modificare il valore del parametro operationType con le seguenti operazioni per ascoltare diversi tipi di modifiche in una raccolta:

  • Inserisci
  • Sostituisci (tranne ID univoco)
  • Aggiorna
  • Elimina
  • Invalida (ogni volta che Mongo restituisce un cursore non valido)

Altre modalità di modifica dei flussi

È possibile avviare i flussi di modifica rispetto a un database e alla distribuzione allo stesso modo della raccolta. Questa funzione è stata rilasciata da MongoDB versione 4.0. Di seguito sono riportati i comandi per aprire un flusso di modifiche rispetto al database e alle distribuzioni.

Against DB: db.watch()
Against deployment: Mongo.watch()

Conclusione

MongoDB Change Streams semplifica l'integrazione tra frontend e backend in tempo reale e senza interruzioni. Questa funzione può aiutarti a utilizzare MongoDB per il modello pubsub in modo da non dover più gestire le distribuzioni Kafka o RabbitMQ. Se la tua applicazione richiede informazioni in tempo reale, devi controllare questa funzionalità di MongoDB. Spero che questo post ti aiuti a iniziare con i flussi di modifiche di MongoDB.