Map-reduce è forse la più versatile delle operazioni di aggregazione supportate da MongoDB.
Map-Reduce è un popolare modello di programmazione originato da Google per l'elaborazione e l'aggregazione di grandi volumi di dati in parallelo. Una discussione dettagliata su Map-Reduce non rientra nell'ambito di questo articolo, ma essenzialmente è un processo di aggregazione in più fasi. I due passaggi più importanti sono la fase della mappa (elabora ogni documento ed emetti i risultati) e la fase di riduzione (raccogli i risultati emessi durante la fase della mappa).
MongoDB supporta tre tipi di operazioni di aggregazione:Map-Reduce, pipeline di aggregazione e comandi di aggregazione a scopo singolo. Puoi utilizzare questo documento di confronto MongoDB per vedere quale si adatta alle tue esigenze.https://scalegrid.io/blog/mongodb-performance-running-mongodb-map-reduce-operations-on-secondaries/
Nel mio ultimo post, abbiamo visto, con esempi, come eseguire pipeline di aggregazione sui secondari. In questo post, analizzeremo l'esecuzione dei lavori Map-Reduce sulle repliche secondarie di MongoDB.
Riduci mappa MongoDB
MongoDB supporta l'esecuzione di lavori Map-Reduce sui server di database. Ciò offre la flessibilità di scrivere attività di aggregazione complesse che non sono facilmente eseguibili tramite pipeline di aggregazione. MongoDB ti consente di scrivere mappe personalizzate e ridurre funzioni in Javascript che possono essere passate al database tramite Mongo shell o qualsiasi altro client. Su set di dati di grandi dimensioni e in costante crescita, si può anche considerare l'esecuzione di processi Map-Reduce incrementali per evitare di elaborare ogni volta dati meno recenti.
Storicamente, i metodi map e reduce venivano eseguiti in un contesto a thread singolo. Tuttavia, tale limitazione è stata rimossa nella versione 2.4.
Perché eseguire i lavori Map-Reduce sul secondario?
Come altri lavori di aggregazione, anche Map-Reduce è un lavoro "batch" ad alta intensità di risorse, quindi è adatto per l'esecuzione su repliche di sola lettura. Le avvertenze in tal senso sono:
1) Dovrebbe essere corretto utilizzare dati leggermente obsoleti. Oppure puoi modificare il problema di scrittura per assicurarti che le repliche siano sempre sincronizzate con il primario. Questa seconda opzione presuppone che un colpo sulle prestazioni di scrittura sia accettabile.
2) L'output del lavoro Map-Reduce non deve essere scritto in un'altra raccolta all'interno del database, ma piuttosto essere restituito all'applicazione (cioè nessuna scrittura sul database).
Diamo un'occhiata a come farlo tramite esempi, sia dalla shell mongo che dal driver Java.
Riduzione mappa sui set di repliche
Set di dati
A scopo illustrativo, utilizzeremo un set di dati piuttosto semplice:un dump giornaliero del record di transazione da un rivenditore. Una voce di esempio è simile a:
RS-replica-0:PRIMARY> use test switched to db test RS-replica-0:PRIMARY> show tables txns RS-replica-0:PRIMARY> db.txns.findOne() { "_id" : ObjectId("584a3b71cdc1cb061957289b"), "custid" : "cust_66", "txnval" : 100, "items" : [{"sku": sku1", "qty": 1, "pr": 100}, ...], ... }
Nei nostri esempi, calcoleremo la spesa totale di un determinato cliente in quel giorno. Pertanto, dato il nostro schema, i metodi map e reduce saranno simili a:
var mapFunction = function() { emit(this.custid, this.txnval); } // Emit the custid and txn value from each record var reduceFunction = function(key, values) { return Array.sum(values); } // Sum all the txn values for a given custid
Con il nostro schema stabilito, diamo un'occhiata a Map-Reduce in azione.
Shell MongoDB
Per garantire che un lavoro Map-Reduce venga eseguito sul secondario, la preferenza di lettura deve essere impostata su secondaria . Come abbiamo detto sopra, affinché un Map-Reduce venga eseguito su un secondario, l'output del risultato deve essere inline (In effetti, questo è l'unico valore in uscita consentito sui secondari). Vediamo come funziona.
$ mongo -u admin -p pwd --authenticationDatabase admin --host RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017 MongoDB shell version: 3.2.10 connecting to: RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017/test 2016-12-09T08:15:19.347+0000 I NETWORK [thread1] Starting new replica set monitor for server-1.servers.example.com:27017,server-2.servers.example.com:27017 2016-12-09T08:15:19.349+0000 I NETWORK [ReplicaSetMonitorWatcher] starting RS-replica-0:PRIMARY> db.setSlaveOk() RS-replica-0:PRIMARY> db.getMongo().setReadPref('secondary') RS-replica-0:PRIMARY> db.getMongo().getReadPrefMode() secondary RS-replica-0:PRIMARY> var mapFunc = function() { emit(this.custid, this.txnval); } RS-replica-0:PRIMARY> var reduceFunc = function(key, values) { return Array.sum(values); } RS-replica-0:PRIMARY> db.txns.mapReduce(mapFunc, reduceFunc, {out: { inline: 1 }}) { "results" : [ { "_id" : "cust_0", "value" : 72734 }, { "_id" : "cust_1", "value" : 67737 }, ... ] "timeMillis" : 215, "counts" : { "input" : 10000, "emit" : 10000, "reduce" : 909, "output" : 101 }, "ok" : 1 }
Uno sguardo ai log sul secondario conferma che il lavoro è stato effettivamente eseguito sul secondario.
... 2016-12-09T08:17:24.842+0000 D COMMAND [conn344] mr ns: test.txns 2016-12-09T08:17:24.843+0000 I COMMAND [conn344] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms 2016-12-09T08:17:24.865+0000 I COMMAND [conn344] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms 2016-12-09T08:17:25.063+0000 I COMMAND [conn344] command test.txns command: mapReduce { mapreduce: "txns", map: function () { emit(this.custid, this.txnval); }, reduce: function (key, values) { return Array.sum(values); }, out: { inline: 1.0 } } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:78 reslen:4233 locks:{ Global: { acquireCount: { r: 366 } }, Database: { acquireCount: { r: 3, R: 180 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_command 220ms ...
Java
Ora proviamo a eseguire un lavoro Map-Reduce sulle repliche di lettura da un'applicazione Java. Sul driver Java MongoDB, l'impostazione della preferenza di lettura fa il trucco. L'output è inline per impostazione predefinita, quindi non è necessario passare parametri aggiuntivi. Ecco un esempio che utilizza la versione del driver 3.2.2:
public class MapReduceExample { private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-replica-0"; private static final String COL_NAME = "txns"; private static final String DEF_DB = "test"; public MapReduceExample() { } public static void main(String[] args) { MapReduceExample writer = new MapReduceExample(); writer.mapReduce(); } public static final String mapfunction = "function() { emit(this.custid, this.txnval); }"; public static final String reducefunction = "function(key, values) { return Array.sum(values); }"; private void mapReduce() { printer("Initializing..."); Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary()); MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options); MongoClient client = new MongoClient(uri); MongoDatabase database = client.getDatabase(DEF_DB); MongoCollection collection = database.getCollection(COL_NAME); MapReduceIterable iterable = collection.mapReduce(mapfunction, reducefunction); // inline by default MongoCursor cursor = iterable.iterator(); while (cursor.hasNext()) { Document result = cursor.next(); printer("Customer: " + result.getString("_id") + ", Total Txn value: " + result.getDouble("value")); } printer("Done..."); } ... }
Come risulta dai log, il lavoro è stato eseguito sul secondario:
... 2016-12-09T08:32:31.419+0000 D COMMAND [conn371] mr ns: test.txns 2016-12-09T08:32:31.420+0000 I COMMAND [conn371] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms 2016-12-09T08:32:31.444+0000 I COMMAND [conn371] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms 2016-12-09T08:32:31.890+0000 I COMMAND [conn371] command test.txns command: mapReduce { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { inline: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:156 reslen:4331 locks:{ Global: { acquireCount: { r: 722 } }, Database: { acquireCount: { r: 3, R: 358 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_query 470ms ...
MongoDB Map-Reduce su cluster sharded
MongoDB supporta Map-Reduce su cluster partizionati, sia quando una raccolta partizionata è l'input sia quando è l'output di un lavoro Map-Reduce. Tuttavia, MongoDB attualmente non supporta l'esecuzione di processi di riduzione della mappa sui secondari di un cluster partizionato. Quindi anche se l'opzione fuori è impostato su inline , i lavori Map-Reduce verranno sempre eseguiti sui primari di un cluster partizionato. Questo problema viene monitorato tramite questo bug JIRA.
La sintassi dell'esecuzione di un processo Map-Reduce su un cluster partizionato è la stessa di un set di repliche. Quindi valgono gli esempi forniti nella sezione precedente. Se l'esempio Java precedente viene eseguito su un cluster partizionato, sui primari vengono visualizzati messaggi di registro che indicano che il comando è stato eseguito lì.
... 2016-11-24T08:46:30.828+0000 I COMMAND [conn357] command test.$cmd command: mapreduce.shardedfinish { mapreduce.shardedfinish: { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { in line: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true, $queryOptions: { $readPreference: { mode: "secondary" } } }, inputDB: "test", shardedOutputCollection: "tmp.mrs.txns_1479977190_0", shards: { Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 123, timing: { mapTime: 51, emitLoop: 116, reduceTime: 9, mode: "mixed", total: 123 }, counts: { input: 9474, emit: 9474, reduce: 909, output: 101 }, ok: 1.0, $gleS tats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 71, timing: { mapTime: 8, emitLoop: 63, reduceTime: 4, mode: "mixed", total: 71 }, counts: { input: 1526, emit: 1526, reduce: 197, output: 101 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } } }, shardCounts: { Sha rd-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { input: 9474, emit: 9474, reduce: 909, output: 101 }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { inpu t: 1526, emit: 1526, reduce: 197, output: 101 } }, counts: { emit: 11000, input: 11000, output: 202, reduce: 1106 } } keyUpdates:0 writeConflicts:0 numYields:0 reslen:4368 locks:{ Global: { acquireCount: { r: 2 } }, Database: { acquireCount: { r: 1 } }, Collection: { acqu ireCount: { r: 1 } } } protocol:op_command 115ms 2016-11-24T08:46:30.830+0000 I COMMAND [conn46] CMD: drop test.tmp.mrs.txns_1479977190_0 ...
Visita la nostra pagina del prodotto MongoDB per scoprire il nostro ampio elenco di funzionalità.