Le operazioni di aggregazione in MongoDB consentono di elaborare i record di dati, raggrupparli e restituire i risultati calcolati. MongoDB supporta tre tipi di operazioni di aggregazione:
- Comandi di aggregazione a scopo unico
- Riduci mappa
- Condotto di aggregazione
Puoi utilizzare questo documento di confronto MongoDB per vedere quale si adatta alle tue esigenze.
Conduttura di aggregazione
La pipeline di aggregazione è un framework MongoDB che fornisce l'aggregazione dei dati tramite una pipeline di elaborazione dati. Cioè, i documenti vengono inviati attraverso una pipeline in più fasi, filtrando, raggruppando e trasformando in altro modo i documenti in ogni fase. Fornisce SQL "GRUPPO PER ..." tipo di costrutti per MongoDB che vengono eseguiti sul database stesso. La documentazione di aggregazione fornisce utili esempi di creazione di tali pipeline.
Perché eseguire aggregazioni sul secondario?
Le pipeline di aggregazione sono operazioni ad alta intensità di risorse:ha senso scaricare i lavori di aggregazione sui secondari di un set di repliche MongoDB quando è possibile operare su dati leggermente obsoleti. Questo è in genere vero per le operazioni "batch" poiché non si aspettano di essere eseguite sui dati più recenti. Se l'output deve essere scritto in una raccolta, i lavori di aggregazione vengono eseguiti solo sul primario poiché solo il primario è scrivibile in MongoDB.
In questo post, ti mostreremo come garantire che le pipeline di aggregazione vengano eseguite sul secondario sia dalla shell mongo che da Java.
Esegui pipeline di aggregazione sul secondario da Mongo Shell e Java in MongoDBFai clic per twittareNota:utilizziamo il set di dati di esempio fornito da MongoDB nel loro esempio di aggregazione di codici postali per mostrare i nostri esempi. Puoi scaricarlo come indicato nell'esempio.
Pipeline di aggregazione sui set di repliche
Shell MongoDB
Impostazione della preferenza di lettura su secondaria fa il trucco quando si esegue un lavoro di aggregazione dalla shell mongo. Proviamo a recuperare tutti gli stati con popolazione superiore a 10 milioni (1a aggregazione nell'esempio dei codici postali). Sia la shell che il server eseguono MongoDB versione 3.2.10.
mongo -u admin -p <pwd> --authenticationDatabase admin --host RS-repl0-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017 RS-repl0-0:PRIMARY> use test switched to db test RS-repl0-0:PRIMARY> db.setSlaveOk() // Ok to run commands on a slave RS-repl0-0:PRIMARY> db.getMongo().setReadPref('secondary') // Set read pref RS-repl0-0:PRIMARY> db.getMongo().getReadPrefMode() secondary RS-repl0-0:PRIMARY> db.zips.aggregate( [ ... { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, ... { $match: { totalPop: { $gte: 10*1000*1000 } } } ... ] ) { "_id" : "CA", "totalPop" : 29754890 } { "_id" : "FL", "totalPop" : 12686644 } { "_id" : "PA", "totalPop" : 11881643 } { "_id" : "NY", "totalPop" : 17990402 } { "_id" : "OH", "totalPop" : 10846517 } { "_id" : "IL", "totalPop" : 11427576 } { "_id" : "TX", "totalPop" : 16984601 }
Uno sguardo ai log di MongoDB (con la registrazione abilitata per i comandi) sul secondario mostra che l'aggregazione è stata effettivamente eseguita sul secondario:
... 2016-12-05T06:20:14.783+0000 I COMMAND [conn200] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, { $match: { totalPop: { $gte: 10000000.0 } } } ], cursor: {} } keyUpdates:0 writeConflicts:0 numYields:229 reslen:338 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquire Count: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_command 49ms ...
Java
Dal driver Java MongoDB, impostare ancora una volta la preferenza di lettura fa il trucco. Ecco un esempio che utilizza la versione del driver 3.2.2:
public class AggregationChecker { /* * Data and code inspired from: * https://docs.mongodb.com/v3.2/tutorial/aggregation-zip-code-data-set/#return-states-with-populations-above-10-million */ private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-repl0-0"; private static final String COL_NAME = "zips"; private static final String DEF_DB = "test"; public AggregationChecker() { } public static void main(String[] args) { AggregationChecker writer = new AggregationChecker(); writer.aggregationJob(); } private void aggregationJob() { printer("Initializing..."); Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary()); MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options); MongoClient client = new MongoClient(uri); try { final DB db = client.getDB(DEF_DB); final DBCollection coll = db.getCollection(COL_NAME); // Avg city pop by state: https://docs.mongodb.com/manual/tutorial/aggregation-zip-code-data-set/#return-average-city-population-by-state Iterable iterable = coll.aggregate( Arrays.asList( new BasicDBObject("$group", new BasicDBObject("_id", new BasicDBObject("state", "$state").append("city", "$city")).append("pop", new BasicDBObject("$sum", "$pop"))), new BasicDBObject("$group", new BasicDBObject("_id", "$_id.state").append("avgCityPop", new BasicDBObject("$avg", "$pop"))))).results(); for (DBObject entry : iterable) { printer(entry.toString()); } } finally { client.close(); } printer("Done..."); } ... }
Accede al secondario:
... 2016-12-01T10:54:18.667+0000 I COMMAND [conn4113] command test.zips command: aggregate { aggregate: "zipcodes", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } }, { $group: { _id: "$_id.state", avgCityPop: { $avg: "$pop" } } } ] } keyUpdates:0 writeConflicts:0 numYields:229 reslen:2149 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquireCount: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_query 103ms ...
Nessuna operazione è stata registrata sul primario.
Pipeline di aggregazione su cluster partizionati
Le pipeline di aggregazione sono supportate nei cluster partizionati. Il comportamento dettagliato è spiegato nella documentazione. Per quanto riguarda l'implementazione, c'è poca differenza tra il set di repliche e il cluster partizionato quando si utilizza una pipeline di aggregazione.
Come impostare una pipeline di aggregazione su cluster partizionati in MongoDBClick To TweetShell MongoDB
Prima di importare i dati nel cluster partizionato, abilita il partizionamento orizzontale nella raccolta.
mongos> sh.enableSharding("test") mongos> sh.shardCollection("test.zips", { "_id" : "hashed" } )
Dopodiché, le operazioni sono le stesse del set di repliche:
mongos> db.setSlaveOk() mongos> db.getMongo().setReadPref('secondary') mongos> db.getMongo().getReadPrefMode() secondary mongos> db.zips.aggregate( [ ... { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, ... { $match: { totalPop: { $gte: 10*1000*1000 } } } ... ] ) { "_id" : "TX", "totalPop" : 16984601 } { "_id" : "PA", "totalPop" : 11881643 } { "_id" : "CA", "totalPop" : 29754890 } { "_id" : "FL", "totalPop" : 12686644 } { "_id" : "NY", "totalPop" : 17990402 } { "_id" : "OH", "totalPop" : 10846517 } { "_id" : "IL", "totalPop" : 11427576 }
Registri da uno dei secondari:
... 2016-12-02T05:46:24.627+0000 I COMMAND [conn242] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44258973083 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms 2016-12-02T05:46:24.641+0000 I COMMAND [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44258973083 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:51 reslen:1601 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 13ms ...
Java
Lo stesso codice applicabile nel set di repliche funziona correttamente con un cluster partizionato. Basta sostituire la stringa di connessione del set di repliche con quella del cluster partizionato. I registri di un secondario indicano che il lavoro è stato effettivamente eseguito sui secondari:
... 2016-12-02T05:39:12.339+0000 I COMMAND [conn130] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44228970872 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms 2016-12-02T05:39:12.371+0000 I COMMAND [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44228970872 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:12902 reslen:741403 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 30ms ...
Questo contenuto è stato utile? Fatecelo sapere twittandoci @scaledgridio e come sempre, se avete domande fatecelo sapere nei commenti qui sotto. Oh e! Non dimenticare di dare un'occhiata ai nostri prodotti di hosting MongoDB che possono risparmiare fino al 40% sui costi di hosting MongoDB® a lungo termine.