Perform Incremental Map-Reduce

On this page

Map-reduce operations can handle complex aggregation tasks. To perform map-reduce operations, MongoDB provides the mapReduce command and, in the mongo shell, the db.collection.mapReduce() wrapper method.

If the map-reduce data set is constantly growing, you may want to perform an incremental map-reduce rather than performing the map-reduce operation over the entire data set each time.

To perform incremental map-reduce:

  • Run a map-reduce job over the current collection and output the result to a separate collection.

  • When you have more data to process, run subsequent map-reduce job with:

  • the query parameter that specifies conditions that match only the new documents.

  • the out parameter that specifies the reduce action to merge the new results into the existing output collection.

Consider the following example where you schedule a map-reduce operation on a sessions collection to run at the end of each day.

Data Setup

The sessions collection contains documents that log users’ sessions each day, for example:

db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );

db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );

Initial Map-Reduce of Current Collection

Run the first map-reduce operation as follows:

  • Define the map function that maps the userid to an object that contains the fields userid , total_time , count , and avg_time :
var mapFunction = function() {
                      var key = this.userid;
                      var value = {
                                    userid: this.userid,
                                    total_time: this.length,
                                    count: 1,
                                    avg_time: 0
                                   };

                      emit( key, value );
                  };
  • Define the corresponding reduce function with two arguments key and values to calculate the total time and the count. The key corresponds to the userid , and the values is an array whose elements corresponds to the individual objects mapped to the userid in the mapFunction .
var reduceFunction = function(key, values) {

                        var reducedObject = {
                                              userid: key,
                                              total_time: 0,
                                              count:0,
                                              avg_time:0
                                            };

                        values.forEach( function(value) {
                                              reducedObject.total_time += value.total_time;
                                              reducedObject.count += value.count;
                                        }
                                      );
                        return reducedObject;
                     };
  • Define the finalize function with two arguments key and reducedValue . The function modifies the reducedValue document to add another field average and returns the modified document.
var finalizeFunction = function (key, reducedValue) {

                          if (reducedValue.count > 0)
                              reducedValue.avg_time = reducedValue.total_time / reducedValue.count;

                          return reducedValue;
                       };
  • Perform map-reduce on the session collection using the mapFunction , the reduceFunction , and the finalizeFunction functions. Output the results to a collection session_stat . If the session_stat collection already exists, the operation will replace the contents:
db.sessions.mapReduce( mapFunction,
                       reduceFunction,
                       {
                         out: "session_stat",
                         finalize: finalizeFunction
                       }
                     )

Subsequent Incremental Map-Reduce

Later, as the sessions collection grows, you can run additional map-reduce operations. For example, add new documents to the sessions collection:

db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );

At the end of the day, perform incremental map-reduce on the sessions collection, but use the query field to select only the new documents. Output the results to the collection session_stat , but reduce the contents with the results of the incremental map-reduce:

db.sessions.mapReduce( mapFunction,
                       reduceFunction,
                       {
                         query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },
                         out: { reduce: "session_stat" },
                         finalize: finalizeFunction
                       }
                     );