MongoDB
 sql >> Database >  >> NoSQL >> MongoDB

Importa CSV utilizzando lo schema Mongoose

Puoi farlo con fast-csv ottenendo le headers dalla definizione dello schema che restituirà le righe analizzate come "oggetti". In realtà hai delle discrepanze, quindi le ho contrassegnate con le correzioni:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

Finché lo schema è effettivamente allineato al CSV fornito, va bene. Queste sono le correzioni che posso vedere, ma se hai bisogno che i nomi dei campi effettivi siano allineati in modo diverso, devi adattarli. Ma c'era fondamentalmente un Number nella posizione in cui è presente una String ed essenzialmente un campo aggiuntivo, che presumo sia quello vuoto nel CSV.

Le cose generali sono ottenere l'array di nomi di campo dallo schema e passarlo nelle opzioni quando si crea l'istanza del parser csv:

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

Una volta che lo fai effettivamente, ottieni un "Oggetto" invece di un array:

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

Non preoccuparti dei "tipi" perché Mongoose eseguirà il cast dei valori in base allo schema.

Il resto avviene all'interno del gestore per i data evento. Per la massima efficienza stiamo usando insertMany() scrivere nel database solo una volta ogni 10.000 righe. Il modo in cui effettivamente va al server e ai processi dipende dalla versione di MongoDB, ma 10.000 dovrebbero essere abbastanza ragionevoli in base al numero medio di campi che importeresti per una singola raccolta in termini di "compromesso" per l'utilizzo della memoria e la scrittura di un richiesta di rete ragionevole. Riduci il numero se necessario.

La parte importante è contrassegnare queste chiamate come async funzioni e await il risultato di insertMany() prima di continuare. Inoltre abbiamo bisogno di pause() lo stream e resume() su ogni articolo altrimenti si corre il rischio di sovrascrivere il buffer di documenti da inserire prima che vengano effettivamente inviati. Il pause() e resume() sono necessari per mettere "contropressione" sul tubo, altrimenti gli oggetti continuano semplicemente a "uscire" e sparare i data evento.

Naturalmente il controllo per le 10.000 voci richiede di verificarlo sia ad ogni iterazione che al completamento dello stream in modo da svuotare il buffer e inviare eventuali documenti rimanenti al server.

Questo è davvero quello che vuoi fare, dato che di certo non vuoi inviare una richiesta asincrona al server sia su "ogni" iterazione attraverso i data evento o essenzialmente senza attendere il completamento di ciascuna richiesta. Riuscirai a non controllarlo per "file molto piccoli", ma per qualsiasi carico del mondo reale supererai sicuramente lo stack di chiamate a causa delle chiamate asincrone "in volo" che non sono ancora state completate.

Cordiali saluti:un package.json Usato. Il mz è facoltativo in quanto è solo una Promise modernizzata libreria abilitata di librerie "integrate" del nodo standard che sono semplicemente abituato a usare. Il codice è ovviamente completamente intercambiabile con il fs modulo.

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

In realtà, con Node v8.9.xe versioni successive, possiamo anche semplificare tutto questo con un'implementazione di AsyncIterator attraverso lo stream-to-iterator modulo. È ancora in Iterator<Promise<T>> modalità, ma dovrebbe funzionare fino a quando il nodo v10.x non diventa LTS stabile:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

Fondamentalmente, tutta la gestione degli "eventi" dello stream, la messa in pausa e la ripresa vengono sostituiti da un semplice for ciclo:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

Facile! Questo viene ripulito nella successiva implementazione del nodo con for..await..of quando diventa più stabile. Ma quanto sopra funziona bene dalla versione specificata e superiore.