MongoDB聚合操作用于对数据的批量操作,将集合按条件分组后在进行一系列操作,诸如求和、求均值等。聚合操作能对集合进行复杂的操作,主要用于数理统计和数据挖掘。MongoDB中聚合操作的输入是i集合中的文档,输出可以是一个文档,也可是多条文档。MongoDB提供非常强大的聚合操作,可分为三种方式
- 聚合管道(Aggregation Pipeline)
- 单目聚合操作(Single Purpose Aggregation Operation)
- MapReduce 编程模型
聚合管道
POSIX多线程使用方式中,有种叫做管道(流水线)的方式,其数据元素流串行地被一组线程按顺序执行。
聚合管道由阶段(Stage)组成,文档在一个阶段处理完毕后,聚合管道会把处理结果传到下一个阶段。聚合管的功能
- 对文档进行过滤,查询出符合条件的文档。
- 对文档进行变换,改变文档的输出形式。
聚合管道的每个阶段使用阶段操作符(Stage Operators)定义,在每个阶段操作符中可使用表达式操作符(Expression Operators)计算总和、均值、拼接或分割字符串等操作,直到每个阶段完结最后返回结果,返回的结果可直接输出,也可存储到集合中。
处理流程
-
db.collection.aggregate()
可同时使用多个管道,方便数据处理。 -
db.collection.aggregate()
使用 MongoDB 内置原生操作,聚合效率高且支持类似SQL中GroupBy的操作,不在需要用户编写自定义的JS例程。 - 每个阶段管道限制100M的内存,若单节点管道超出极限,MongoDB产生错误。为了能够处理大型数据集,可设置
allowDiskUse
为true
为聚合管道节点把数据写入临时文件,以解决100M内存限制。 -
db.collection.aggregate()
可作用于分片集合,但结果不能输在分片集合,MapReduce可作用于在分片集合其结果也可输在分片集合中。 -
db.collection.aggregate()
返回一个指针(cursor),数据存放在内存中可直接操作,跟MongoShell一样。 -
db.collection.aggregate()
输出的结果只能保存在文档中,BSON Document大小限制为16M。
语法解析
db.collection.aggregate(pipeline, options)
db.collection.aggregate([{<stage>,...}], ...)
pipeline 参数
-
$project
对输入文档添加新字段或删除现有字段,可自定义显示哪些字段。 -
$match
根据条件过滤仅输出符合条件的文档,若放在pipeline前面,根据条件过滤数据并传输到写一个阶段管道,可提高后续数据处理效率。也可放在out之前用以对结果再一次过滤。 -
$redact
字段所处的document结构的级别 -
$limit
用来限制MongoDB聚合管道返回的文档数量 -
$skip
在聚合管道中跳过指定数量的文档并返回剩余的文档 -
$unwind
将文档中某个数组类型字段拆分成多条,每条包含数组中的一个值。 -
$sample
随机选择从起输入指定数量的文档,若大于或等于5%的collection的文档,$sample进行收集扫描并排序随后选择顶部文件。因此$sample在收集阶段是受排序的内存限制。 -
$sort
将输入文档排序后输出 -
$geoNear
用于地理位置数据分析 -
$out
必须为pipeline最后一个阶段管道,将最后计算结果写入到指定collection中。 -
$indexStats
返回数据集合的每个索引使用情况 -
$group
将集合中的文档分组,可用于统计结果,$group 首先将数据根据 key 进行分组。
$match
筛选条件,过滤不满足条件的文档,可使用常规查询操作符。
db.users.aggregate( {$match: {'age':{$gte:18}} } )
$project
- 用于包含、排除字段,设置需查询或过滤的字段,0为过滤掉字段不显示,1为需查询的字段。
- 用于对字段重命名
- 投射中可使用表达式
db.users.aggregate(
{$match: {age: {$gte: 18}}},
{$project: {_id:0, username:1, created_at:1}}
)
// $project 当字段值为0或1时用于过滤字段,当键名为一个自定义的字符串,键值为$紧跟原字段表示要对该字段进行重命名。
db.users.aggregate(
{$match: {age: {$gte: 18}}},
{$project: {_id:0, username:1, nickname:$username }}
)
// 通过修改字段名达到生成字段副本,以便后续操作符使用。
db.users.aggregate(
{ $match: {age: {$gte: 18 } } },
{ $project: {id:$_id, username:1 } }
)
算术表达式可对数值运算
db.users.aggregate(
// 对 score 字段值加1后作为 scores 的值
{ $project: {scores: { $add : [$score,$score,1] } } },
)
db.users.aggregate(
// $subtract:[exp1, exp2] 数组中第一个元素减去第二个元素
{ $project: {scores: { $subtract: [$score,1] } } },
)
db.users.aggregate(
// $multiply:[exp1, exp2] 数组中多个元素相乘
{ $project: {scores: { $multiply: [$score, 2, 5] } } },
)
db.users.aggregate(
// $divide:[exp1, exp2] 数组中第一个元素除以第二个元素
{ $project: {scores: { $divide: [$score, 2, 5] } } },
)
db.users.aggregate(
// $mod:[exp1, exp2] 数组中第一个元素除以第二个元素的余数
{ $project: {scores: { $mod: [$score, 2] } } },
)
字符串操作
db.users.aggregate(
// $substr:[exp, startOffset, numToReturn] 字符串截取
{ $project: {nickname: { $substr: [$nickname, 2, 6] } } },
)
db.users.aggregate(
// $concat:[exp1, exp2, exp3...] 字符串拼接,将数组中多个元素拼接在一起
{ $project: {fullname: { $concat: [$firstname, $lastname] } } },
)
db.users.aggregate(
// $toLower:exp 字符串转为小写
{ $project: {nickname: { $toLower: $username } } },
)
db.users.aggregate(
// $toUpper:exp 字符串转为大写
{ $project: {nickname: { $toUpper: $username } } },
)
为所有文档新增字段
db.users.update({}, {$set, {publish_at:new Date()} }, true, true)
日期表达式
db.users.aggregate(
{ $project: {
'year': { $year: $created_at },
'month': { $month: $created_at },
'dayOfMonth': { $dayOfMonth: $created_at },
'dayOfWeek': { $dayOfWeek: $created_at },
'dayOfYear': { $dayOfYear: $created_at },
'hour': { $hour: $created_at },
'minute': { $minute: $created_at },
'second': { $second: $created_at },
}
}
)
时间间隔(秒数)
db.users.aggregate(
{
$project: {
'fasttime': {
$subtract: [ { $second: new Date() }, { $second: $created_at } ]
}
}
}
)
字符串比较
db.users.aggregate(
// $cmp:[exp1, exp2] 字符串比较,相同为0,小于返回负数, 大于返回正数
{ $project: { result : { $cmp: [ $age, 18 ] } } }
)
db.users.aggregate(
// $strcasecmp:[exp1, exp2] 字符串比较,相同为0,小于返回-1, 大于返回1
{ $project: { result : { $strcasecmp: [ $username, 'junchow' ] } } }
)
逻辑条件
db.users.aggregate(
// $eq 判断表达式是否相等
{ $project: { result : {$eq: [$username, 'junchow' ] } } }
)
db.users.aggregate(
// $and [exp1, exp2...expn] 连接多条件,所有条件为真则表达式为真
{ $project: { result: { $and : [ {$eq : [$username:'junchow']}, {$gt:[$age : 18]} ] } } }
)
db.users.aggregate({
// $not exp 用于取反操作
$project: {result: {$not:{$eq:[$username, 'junchow']}} }
})
db.users.aggregate({
// $cond:[booleanExp, trueExp, falseExp] 三目运算符
$project : { result: {$cond: [{$eq:[$username:'junchow']}, true, false] } }
})
db.users.aggregate({
// $ifNull:[exp, replacementExpr] 若条件为null则返回表达式值,若字段不存在时字段值为null
$project: { result: { $ifNull : [ $notExistField, 'not exist is null' ] } }
})
$group
$group分组使用_id指定要分组的键名,用来自定义字段统计。
db.users.aggregate({
$match : { age: { $gte : 18 } }
},{
$group : { _id:$username, count:{$sum:1} }
});
// 多字段分组
db.users.aggregate({
$match: {age: {$gte:18} }},
$group: {_id:{username:$username, age:$ge}, 'count':{$sum:1} }
})
// $sum:val 对每个文档加val求和
// $avg:val 对每个文档求均值
db.users.aggregate({
$group: { _id:$username, count:{$avg:$age} }
})
db.users.aggregate({
$group: { _id:$username, count:{$max:$age} }
})
db.users.aggregate({
$group: {_id:$username, count:{$min:$age} }
})
// $first:val 获取分组中首个
db.users.aggregate({
$group:{_id:$username, count:{$first: $age} }
})
db.users.aggregate({
$group:{_id:$username, count:{$last: $age} }
})
db.users.aggregate({
$group: {_id:$username, count:{$addToSet: $age} }
})
db.users.aggregate({
$group:{_id:$username, count:{$push: $age} }
})
聚合运算
group
先选定分组所依据的键,而后将集合依据选定键值的不同分成若干组。然后可通过聚合每一组内的文档,产生一个结果文档。
group
不支持分片集群,无法进行分布式运算(shard cluster)。若需要支持分布式需使用aggregate
或mapReduce
。
db.collection.group(document)
{
# 分组字段
key:{key1, key2:1},
# 查询条件
cond:{},
# 聚合函数
reduce:function(current, result){},
# 初始化
initial:{},
# 统计一组后的回调函数
finalize:function(){}
}
计算每个栏目下商品个数
SELECT COUNT(*) FROM goods GROUP BY category_id;
db.goods.group({
key:{category_id:1},
cond:{},//所有
reduce:function(current, result){//current对应当前行,result对应分组中的多行
result.total += 1;
},
initial:{total:0}
})
查看每个栏目下商品价格大于100的数量
SELECT category,goods_name FROM goods WHERE 1=1 AND price>100 GROUP BY category_id
db.goods.group({
key:{category_id:1},
cond:{price:{$gt:100}},
reduce:function(current,result){
result.count += 1;
},
initial:{count:0}
})
计算每个栏目下商品库存量
SELECT category_id,SUM(store) FROM goods WHERE 1=1 AND GROUP BY category_id
db.goods.group({
key:{category_id:1},
cond:{},
initial:{sum:0},
reduce:function(current,result){
result += current.store;
}
});
获取每个栏目下最贵的商品价格
SELECT catetory_id,MAX(price) FROM goods GROUP BY category_id
db.goods.group({
key:{category_id:1},
cond:{},
initial:{max:0},
reduce:function(current,result){
if(current.price > result.max){
result.max = current.price;
}
}
});
查询每个栏目下商品的平均价格
SELECT category_id,AVERAGE(price) FROM goods GROUP BY category_id
db.goods.group({
key:{category_id:1},
cond:{},
reduce:function(current,result){
result.total += current.price;
result.count += 1;
},
initial:{total:0, count:0},//进组result
finalize:functioin(result){//出组 result
result.average = result.total/result.count;
}
})
aggregate
aggregate聚合框架与sql对比
- $match WHERE
- $group GROUP BY
- $project SELECT
- $sort ORDER BY
- $limit LIMIT
- $sum SUM()
- $sum COUNT()
查询每个栏目下商品数量
SELECT COUNT(*) FROM goods GROUP BY category_id
db.goods.aggregate([
{$group:{_id:'$category_id', count:{$sum:1}}},
{$project:{_id:0, category_id:'$category_id', count:'$count'}},
{$sort:{count:1}}
])
查询每个栏目下价格大于100的商品个数
SELECT COUNT(*) FROM goods WHERE 1=1 AND price>100 GROUP BY category_id
db.goods.aggregate([
{$match:{price:{$gt:100}}},
{$group:{_id:'$category_id'}, count:{$sum:1}},
{$project:{_id:0, category_id:'$_id.category_id', count:'$count'}},
{$sort:{count:1}}
])
查询每个栏目下价格大于100的商品个数,仅显示个数大于3的。
SELECT category_id,COUNT(*) AS count WHERE 1=1 AND price>100 GROUP BY category_id HAVING count>3
db.goods.aggregate([
{$match:{price:{$gt:100}}},
{$group:{_id:'$category_id', count:{$sum:1}}},
{$match:{count:{$gt:3}}},
{$project:{_id:0, category_id:'$_id.category_id', count:'$count'}},
{$sort:{count:1}}
])
查看每个栏目下商品的库存量
SELECT category_id,SUM(store) WHERE 1=1 GROUP BY category_id
db.goods.aggregate([
{$group:{_id:'$category_id', count:{$sum:1}, total:{$sum:'$store'}}},
{$project:{_id:0, category_id:'$_id.category_id', count:'$count', total:'$total'}},
{$sort:{total:1}}
])
查询每个栏目下商品的平均价格并升序排序
SELECT category_id,AVG(price) AS avg FROM goods GROUP BY category_id ORDER BY avg ASC
db.goods.aggregate([
{$group:{_id:'$category_id', count:{$sum:1}, average:{$avg:'$price'}}},
{$sort:average:-1}
])
mapReduce
mapReduce是一个轻松并行化到多台服务器的聚合方法,它会拆分问题,再将各个部分发送到不同机器上,让每台机器都完成一部分。当所有机器都完成后,再将结果汇集起来形成最终完整的结果。
mapReduce最开始是映射(map),将操作映射到集合中的每个文档。这个操作要么无作为,要么产生一些键和x个值。接着进入中间环节(洗牌shuffle),按照分组并将产生的键值组成列表放到对应的键中。化简(reduce)则把列表中的值化简成一个单值。这个值被返回,然后接着进行洗牌,直到每个键的列表只有一个值为止,这个值就是最后的结果。
mapReduce的代价是速度,group不是很快,mapReduce更慢,绝不要用在实时环境中,要作为后台任务运行,将创建一个保存结果的结合,可对这个集合进行实时查询。
mapReduce的工作过程
- map 映射
现将同一个组的数据映射到一个文档(数组)上,在映射环节想要得到文档中每个键,map()使用emit()返回要处理的值。edit()会给mapReduce一个键和一个值,键类似group所使用键key。 - reduce 归约
将数组(同一组)数据进行运算,reduce()由两个参数,一个是key也就是emit()返回的第一个值,另一个是数组,由一个或多个对应于键的文档构成。reduce()一定要能够被反复调用,不论是映射环节还是前一个简化环节。
mapReduce语法
db.runCommand({
mapreduce:字符串,集合名
map:函数,
reduce:函数,
[query:文档,发往map()前先给过渡的文档],
[sort:文档,发往map()前先给文档排序],
[limit:整数,发往map()的文档数量上限],
[out:字符串,统计结果保存的集合],
[keeptemp:布尔值,链接关闭时临时结果集合是否保存],
[finalize:函数,将reduce的结果发给此函数做最后处理],
[scope:文档,js代码中要用到的变量],
//jsMode=true时 BSON>JS>map>reduce>BSON
//jsMode=false时 BSON>JS>map>BSON>JS>reduce>BSON,可处理非常大的mapreduce
[jsMode:布尔值,是否减少执行过程中BSON和JS的转换,默认为true],
[verbose:布尔值,是否产生更加详细的服务器日期,默认为true]
})
MongoDB没有模式,所以并不晓得每个文档由多少个键,通常找到集合的所有键的最好方法就是用MapReduce。
查询结合中所有键
var map = function(){
for(var k in this){ //this当前映射文档的引用
emit(k, {count:1});//将文档某个键的计数返回
}
}
var reduce = function(key,emits){
var total = 0;
for(var k in emits){
}
}
计算每个栏目下商品库存总量
SELECT category_id,SUM(store) AS sum FROM goods GROUP BY category_id
var map = function(){
emit(this.category_id, this.store);//获得栏目下商品的库存量
};
var reduce = function(key,store){
return Array.sum(store);
}
db.goods.mapReduce(map,reduce,query,{out:'result'});
查询每个栏目下商品的平均价格
var map = function(){
emit(this.category_id, this.price);
}
var reduce = function(category_id,price){
return Array.avg(price);
}
db.goods.mapReduce(map, reduce, {out:'result'})
将MongoDB组成的shard分片集群把地震数据分布到各节点上,将中国区域按10个经度10个维度为一组约30块,并用mapReduce计算地震数据,统计每组上每月的地震次数及地震级别。分析出结果把地震高发区用偏红色标注,低发区偏绿标注。