Perform Incremental Map-Reduce

Map-reduce operations can handle complex aggregation tasks. To perform map-reduce operations, MongoDB provides the mapReduce command and, in the mongo shell, the db.collection.mapReduce() wrapper method.

If the map-reduce data set is constantly growing, you may want to perform an incremental map-reduce rather than performing the map-reduce operation over the entire data set each time.

To perform incremental map-reduce:

  1. Run a map-reduce job over the current collection and output the result to a separate collection.
  2. When you have more data to process, run subsequent map-reduce job with:
    • the query parameter that specifies conditions that match only the new documents.
    • the out parameter that specifies the reduce action to merge the new results into the existing output collection.

Consider the following example where you schedule a map-reduce operation on a sessions collection to run at the end of each day.

Data Setup

The sessions collection contains documents that log users’ sessions each day, for example:

db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );

db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );

Initial Map-Reduce of Current Collection

Run the first map-reduce operation as follows:

  1. Define the map function that maps the userid to an object that contains the fields userid, total_time, count, and avg_time:

    var mapFunction = function() {
                          var key = this.userid;
                          var value = {
                                        userid: this.userid,
                                        total_time: this.length,
                                        count: 1,
                                        avg_time: 0
                                       };
    
                          emit( key, value );
                      };
    
  2. Define the corresponding reduce function with two arguments key and values to calculate the total time and the count. The key corresponds to the userid, and the values is an array whose elements corresponds to the individual objects mapped to the userid in the mapFunction.

    var reduceFunction = function(key, values) {
    
                            var reducedObject = {
                                                  userid: key,
                                                  total_time: 0,
                                                  count:0,
                                                  avg_time:0
                                                };
    
                            values.forEach( function(value) {
                                                  reducedObject.total_time += value.total_time;
                                                  reducedObject.count += value.count;
                                            }
                                          );
                            return reducedObject;
                         };
    
  3. Define the finalize function with two arguments key and reducedValue. The function modifies the reducedValue document to add another field average and returns the modified document.

    var finalizeFunction = function (key, reducedValue) {
    
                              if (reducedValue.count > 0)
                                  reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    
                              return reducedValue;
                           };
    
  4. Perform map-reduce on the session collection using the mapFunction, the reduceFunction, and the finalizeFunction functions. Output the results to a collection session_stat. If the session_stat collection already exists, the operation will replace the contents:

    db.sessions.mapReduce( mapFunction,
                           reduceFunction,
                           {
                             out: "session_stat",
                             finalize: finalizeFunction
                           }
                         )
    

Subsequent Incremental Map-Reduce

Later, as the sessions collection grows, you can run additional map-reduce operations. For example, add new documents to the sessions collection:

db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );

At the end of the day, perform incremental map-reduce on the sessions collection, but use the query field to select only the new documents. Output the results to the collection session_stat, but reduce the contents with the results of the incremental map-reduce:

db.sessions.mapReduce( mapFunction,
                       reduceFunction,
                       {
                         query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },
                         out: { reduce: "session_stat" },
                         finalize: finalizeFunction
                       }
                     );