On this page
执行增量 Map-Reduce
在本页面
Map-reduce 操作可以处理复杂的聚合任务。为了执行 map-reduce 操作,MongoDB 提供了mapReduce命令,并在mongo shell 中提供了db.collection.mapReduce() wrapper 方法。
如果 map-reduce 数据集不断增长,则可能需要执行增量 map-reduce 而不是每次都对整个数据集执行 map-reduce 操作。
要执行增量 Map-Reduce:
在当前集合上运行 map-reduce 作业,然后将结果输出到单独的集合。
当您有更多数据要处理时,请使用以下命令运行后续的 map-reduce 作业:
query参数,该参数指定仅与新文档匹配的条件。out参数,该参数指定reduce动作以将新结果合并到现有输出集合中。
请考虑以下示例,在该示例中,您计划在每天结束时运行的sessions集合上进行 map-reduce 操作。
Data Setup
sessions集合包含每天记录用户会话的文档,例如:
db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );
db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );
当前集合的初始 Map-Reduce
运行第一个 map-reduce 操作,如下所示:
- 定义将
useridMap 到包含字段userid,total_time,count和avg_time的对象的 Map 函数:
var mapFunction = function() {
var key = this.userid;
var value = {
userid: this.userid,
total_time: this.length,
count: 1,
avg_time: 0
};
emit( key, value );
};
- 使用两个参数
key和values定义相应的 reduce 函数,以计算总时间和计数。key对应于userid,而values是一个数组,其元素对应于mapFunction中 Map 到userid的各个对象。
var reduceFunction = function(key, values) {
var reducedObject = {
userid: key,
total_time: 0,
count:0,
avg_time:0
};
values.forEach( function(value) {
reducedObject.total_time += value.total_time;
reducedObject.count += value.count;
}
);
return reducedObject;
};
- 使用两个参数
key和reducedValue定义 finalize 函数。该函数修改reducedValue文档以添加另一个字段average并返回修改后的文档。
var finalizeFunction = function (key, reducedValue) {
if (reducedValue.count > 0)
reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
return reducedValue;
};
- 使用
mapFunction,reduceFunction和finalizeFunction函数对session集合执行 map-reduce。将结果输出到集合session_stat。如果session_stat集合已经存在,则该操作将替换内容:
db.sessions.mapReduce( mapFunction,
reduceFunction,
{
out: "session_stat",
finalize: finalizeFunction
}
)
后续增量 Map-Reduce
以后,随着sessions集合的增长,您可以运行其他 map-reduce 操作。例如,将新文档添加到sessions集合中:
db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );
在一天结束时,对sessions集合执行增量 map-reduce,但使用query字段仅选择新文档。将结果输出到集合session_stat,但将reduce的内容与增量 map-reduce 的结果一起输出:
db.sessions.mapReduce( mapFunction,
reduceFunction,
{
query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },
out: { reduce: "session_stat" },
finalize: finalizeFunction
}
);