MongoDB
 sql >> Database >  >> NoSQL >> MongoDB

Salva un CSV molto grande su mongoDB usando mongoose

Benvenuto in streaming. Quello che vuoi veramente è un "stream con eventi" che elabori il tuo input "un pezzo alla volta" e, naturalmente, idealmente da un delimitatore comune come il carattere "newline" che stai attualmente utilizzando.

Per cose davvero efficienti, puoi aggiungere l'utilizzo di MongoDB "Bulk API" inserti per rendere il caricamento il più veloce possibile senza consumare tutta la memoria della macchina o i cicli della CPU.

Non sostenendo in quanto sono disponibili varie soluzioni, ma ecco un elenco che utilizza la line- pacchetto flusso di input per rendere semplice la parte "terminatore di linea".

Definizioni dello schema solo per "esempio":

var LineInputStream = require("line-input-stream"),
    fs = require("fs"),
    async = require("async"),
    mongoose = require("mongoose"),
    Schema = mongoose.Schema;

var entrySchema = new Schema({},{ strict: false })

var Entry = mongoose.model( "Schema", entrySchema );

var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));

stream.setDelimiter("\n");

mongoose.connection.on("open",function(err,conn) { 

    // lower level method, needs connection
    var bulk = Entry.collection.initializeOrderedBulkOp();
    var counter = 0;

    stream.on("error",function(err) {
        console.log(err); // or otherwise deal with it
    });

    stream.on("line",function(line) {

        async.series(
            [
                function(callback) {
                    var row = line.split(",");     // split the lines on delimiter
                    var obj = {};             
                    // other manipulation

                    bulk.insert(obj);  // Bulk is okay if you don't need schema
                                       // defaults. Or can just set them.

                    counter++;

                    if ( counter % 1000 == 0 ) {
                        stream.pause();
                        bulk.execute(function(err,result) {
                            if (err) callback(err);
                            // possibly do something with result
                            bulk = Entry.collection.initializeOrderedBulkOp();
                            stream.resume();
                            callback();
                        });
                    } else {
                        callback();
                    }
               }
           ],
           function (err) {
               // each iteration is done
           }
       );

    });

    stream.on("end",function() {

        if ( counter % 1000 != 0 )
            bulk.execute(function(err,result) {
                if (err) throw err;   // or something
                // maybe look at result
            });
    });

});

Quindi generalmente l'interfaccia "stream" "scompone l'input" per elaborare "una riga alla volta". Ciò ti impedisce di caricare tutto in una volta.

Le parti principali sono "Bulk Operations API" da MongoDB. Ciò ti consente di "accodare" molte operazioni alla volta prima di inviarle effettivamente al server. Quindi in questo caso con l'uso di un "modulo", le scritture vengono inviate solo per 1000 voci elaborate. Puoi davvero fare qualsiasi cosa fino al limite di 16 MB BSON, ma mantienilo gestibile.

Oltre alle operazioni che vengono elaborate in blocco, è disponibile un "limitatore" aggiuntivo da async biblioteca. Non è realmente necessario, ma ciò garantisce che essenzialmente non sia in elaborazione più del "limite di modulo" dei documenti in qualsiasi momento. Gli "inserimenti" batch generali non hanno costi di IO diversi dalla memoria, ma le chiamate "execute" indicano che l'IO è in elaborazione. Quindi aspettiamo piuttosto che fare la fila per più cose.

Ci sono sicuramente soluzioni migliori che puoi trovare per i dati di tipo CSV di "elaborazione del flusso" che sembra essere. Ma in generale questo ti dà i concetti su come farlo in modo efficiente in termini di memoria senza consumare anche i cicli della CPU.