MongoDB聚合

MongoDB聚合操作用于对数据的批量操作,将集合按条件分组后在进行一系列操作,诸如求和、求均值等。聚合操作能对集合进行复杂的操作,主要用于数理统计和数据挖掘。MongoDB中聚合操作的输入是i集合中的文档,输出可以是一个文档,也可是多条文档。MongoDB提供非常强大的聚合操作,可分为三种方式

  • 聚合管道(Aggregation Pipeline)
  • 单目聚合操作(Single Purpose Aggregation Operation)
  • MapReduce 编程模型

聚合管道

POSIX多线程使用方式中,有种叫做管道(流水线)的方式,其数据元素流串行地被一组线程按顺序执行。

聚合管道由阶段(Stage)组成,文档在一个阶段处理完毕后,聚合管道会把处理结果传到下一个阶段。聚合管的功能

  • 对文档进行过滤,查询出符合条件的文档。
  • 对文档进行变换,改变文档的输出形式。

聚合管道的每个阶段使用阶段操作符(Stage Operators)定义,在每个阶段操作符中可使用表达式操作符(Expression Operators)计算总和、均值、拼接或分割字符串等操作,直到每个阶段完结最后返回结果,返回的结果可直接输出,也可存储到集合中。

聚合管道到的用法

处理流程

  • db.collection.aggregate() 可同时使用多个管道,方便数据处理。
  • db.collection.aggregate() 使用 MongoDB 内置原生操作,聚合效率高且支持类似SQL中GroupBy的操作,不在需要用户编写自定义的JS例程。
  • 每个阶段管道限制100M的内存,若单节点管道超出极限,MongoDB产生错误。为了能够处理大型数据集,可设置 allowDiskUsetrue 为聚合管道节点把数据写入临时文件,以解决100M内存限制。
  • db.collection.aggregate() 可作用于分片集合,但结果不能输在分片集合,MapReduce可作用于在分片集合其结果也可输在分片集合中。
  • db.collection.aggregate() 返回一个指针(cursor),数据存放在内存中可直接操作,跟MongoShell一样。
  • db.collection.aggregate() 输出的结果只能保存在文档中,BSON Document大小限制为16M。

语法解析

db.collection.aggregate(pipeline, options)
db.collection.aggregate([{<stage>,...}], ...)

pipeline 参数

  • $project 对输入文档添加新字段或删除现有字段,可自定义显示哪些字段。
  • $match 根据条件过滤仅输出符合条件的文档,若放在pipeline前面,根据条件过滤数据并传输到写一个阶段管道,可提高后续数据处理效率。也可放在out之前用以对结果再一次过滤。
  • $redact 字段所处的document结构的级别
  • $limit 用来限制MongoDB聚合管道返回的文档数量
  • $skip 在聚合管道中跳过指定数量的文档并返回剩余的文档
  • $unwind 将文档中某个数组类型字段拆分成多条,每条包含数组中的一个值。
  • $sample 随机选择从起输入指定数量的文档,若大于或等于5%的collection的文档,$sample进行收集扫描并排序随后选择顶部文件。因此$sample在收集阶段是受排序的内存限制。
  • $sort 将输入文档排序后输出
  • $geoNear 用于地理位置数据分析
  • $out 必须为pipeline最后一个阶段管道,将最后计算结果写入到指定collection中。
  • $indexStats 返回数据集合的每个索引使用情况
  • $group 将集合中的文档分组,可用于统计结果,$group 首先将数据根据 key 进行分组。

$match

筛选条件,过滤不满足条件的文档,可使用常规查询操作符。

db.users.aggregate( {$match: {'age':{$gte:18}} } )

$project

  • 用于包含、排除字段,设置需查询或过滤的字段,0为过滤掉字段不显示,1为需查询的字段。
  • 用于对字段重命名
  • 投射中可使用表达式
db.users.aggregate(
  {$match: {age: {$gte: 18}}},
  {$project: {_id:0, username:1, created_at:1}}
)

// $project 当字段值为0或1时用于过滤字段,当键名为一个自定义的字符串,键值为$紧跟原字段表示要对该字段进行重命名。
db.users.aggregate(
  {$match: {age: {$gte: 18}}},
  {$project: {_id:0, username:1, nickname:$username }}
)

// 通过修改字段名达到生成字段副本,以便后续操作符使用。
db.users.aggregate(
  { $match: {age: {$gte: 18 } } },
  { $project: {id:$_id, username:1 } }
)

算术表达式可对数值运算

db.users.aggregate(
  // 对 score 字段值加1后作为 scores 的值
  { $project: {scores: { $add : [$score,$score,1]  } } },
)
db.users.aggregate(
  // $subtract:[exp1, exp2] 数组中第一个元素减去第二个元素
  { $project: {scores: { $subtract: [$score,1]  } } },
)
db.users.aggregate(
  // $multiply:[exp1, exp2] 数组中多个元素相乘
  { $project: {scores: { $multiply: [$score, 2, 5]  } } },
)
db.users.aggregate(
  // $divide:[exp1, exp2] 数组中第一个元素除以第二个元素
  { $project: {scores: { $divide: [$score, 2, 5]  } } },
)
db.users.aggregate(
  // $mod:[exp1, exp2] 数组中第一个元素除以第二个元素的余数
  { $project: {scores: { $mod: [$score, 2]  } } },
)

字符串操作

db.users.aggregate(
  // $substr:[exp, startOffset, numToReturn] 字符串截取
  { $project: {nickname: { $substr: [$nickname, 2, 6]  } } },
)

db.users.aggregate(
  // $concat:[exp1, exp2, exp3...] 字符串拼接,将数组中多个元素拼接在一起
  { $project: {fullname: { $concat: [$firstname, $lastname]  } } },
)

db.users.aggregate(
  // $toLower:exp 字符串转为小写
  { $project: {nickname: { $toLower: $username } } },
)

db.users.aggregate(
  // $toUpper:exp 字符串转为大写
  { $project: {nickname: { $toUpper: $username } } },
)

为所有文档新增字段

db.users.update({}, {$set, {publish_at:new Date()} }, true, true)

日期表达式

db.users.aggregate(
  { $project: { 
      'year': { $year: $created_at },
      'month': { $month: $created_at }, 
      'dayOfMonth': { $dayOfMonth: $created_at }, 
      'dayOfWeek': { $dayOfWeek: $created_at }, 
      'dayOfYear': { $dayOfYear: $created_at },  
      'hour': { $hour: $created_at }, 
      'minute': { $minute: $created_at }, 
      'second': { $second: $created_at }, 
    } 
  }
)

时间间隔(秒数)

db.users.aggregate(
  { 
    $project: { 
      'fasttime': {
         $subtract: [ { $second: new Date() }, { $second: $created_at } ]
        } 
     } 
  }
)

字符串比较

db.users.aggregate(
  // $cmp:[exp1, exp2] 字符串比较,相同为0,小于返回负数, 大于返回正数
  { $project: { result : { $cmp: [ $age, 18 ] } } }
)
db.users.aggregate(
  // $strcasecmp:[exp1, exp2] 字符串比较,相同为0,小于返回-1, 大于返回1
  { $project: { result : { $strcasecmp: [ $username, 'junchow' ] } } }
)

逻辑条件

db.users.aggregate(
  // $eq 判断表达式是否相等
  { $project: { result : {$eq: [$username, 'junchow' ] } } }
)

db.users.aggregate(
  // $and [exp1, exp2...expn] 连接多条件,所有条件为真则表达式为真
  { $project: { result: { $and : [ {$eq : [$username:'junchow']}, {$gt:[$age : 18]} ] } }  }
)

db.users.aggregate({
  // $not exp 用于取反操作
  $project: {result: {$not:{$eq:[$username, 'junchow']}} }
})

db.users.aggregate({
  // $cond:[booleanExp, trueExp, falseExp] 三目运算符
  $project : { result: {$cond: [{$eq:[$username:'junchow']}, true, false] } }
})

db.users.aggregate({
  // $ifNull:[exp, replacementExpr] 若条件为null则返回表达式值,若字段不存在时字段值为null
  $project: { result: { $ifNull : [ $notExistField, 'not exist is null' ] } }
})

$group

$group分组使用_id指定要分组的键名,用来自定义字段统计。

db.users.aggregate({
  $match : { age: { $gte : 18 } }
},{
  $group : { _id:$username, count:{$sum:1} }
});

// 多字段分组
db.users.aggregate({
  $match: {age: {$gte:18}  }},
  $group: {_id:{username:$username, age:$ge}, 'count':{$sum:1} }        
})

// $sum:val 对每个文档加val求和
// $avg:val 对每个文档求均值
db.users.aggregate({
  $group: { _id:$username, count:{$avg:$age} }
})

db.users.aggregate({
  $group: { _id:$username, count:{$max:$age}  }
})

db.users.aggregate({
  $group: {_id:$username, count:{$min:$age} }
})

// $first:val 获取分组中首个
db.users.aggregate({
  $group:{_id:$username, count:{$first: $age} }
})
db.users.aggregate({
  $group:{_id:$username, count:{$last: $age} }
})
db.users.aggregate({
  $group: {_id:$username, count:{$addToSet: $age} }
})
db.users.aggregate({
  $group:{_id:$username, count:{$push: $age} }
})

聚合运算

group

先选定分组所依据的键,而后将集合依据选定键值的不同分成若干组。然后可通过聚合每一组内的文档,产生一个结果文档。

group不支持分片集群,无法进行分布式运算(shard cluster)。若需要支持分布式需使用aggregatemapReduce

db.collection.group(document)

{
  # 分组字段
  key:{key1, key2:1},
  # 查询条件
  cond:{},
  # 聚合函数
  reduce:function(current, result){},
  # 初始化
  initial:{},
  # 统计一组后的回调函数
  finalize:function(){}
}

计算每个栏目下商品个数

SELECT COUNT(*) FROM goods GROUP BY category_id;

db.goods.group({
  key:{category_id:1},
  cond:{},//所有
  reduce:function(current, result){//current对应当前行,result对应分组中的多行
    result.total += 1;
  },
  initial:{total:0}
})

查看每个栏目下商品价格大于100的数量

SELECT category,goods_name FROM goods WHERE 1=1 AND price>100 GROUP BY category_id

db.goods.group({
  key:{category_id:1},
  cond:{price:{$gt:100}},
  reduce:function(current,result){
    result.count += 1;
  },
  initial:{count:0}
})

计算每个栏目下商品库存量

SELECT category_id,SUM(store) FROM goods WHERE 1=1 AND GROUP BY category_id

db.goods.group({
  key:{category_id:1},
  cond:{},
  initial:{sum:0},
  reduce:function(current,result){
    result += current.store;
  }
});

获取每个栏目下最贵的商品价格

SELECT catetory_id,MAX(price) FROM goods GROUP BY category_id

db.goods.group({
  key:{category_id:1},
  cond:{},
  initial:{max:0},
  reduce:function(current,result){
    if(current.price > result.max){
      result.max = current.price;
    }
  }
});

查询每个栏目下商品的平均价格

SELECT category_id,AVERAGE(price) FROM goods GROUP BY category_id

db.goods.group({
  key:{category_id:1},
  cond:{},
  reduce:function(current,result){
    result.total += current.price;
    result.count  += 1;
  },
  initial:{total:0, count:0},//进组result
  finalize:functioin(result){//出组 result
    result.average = result.total/result.count;
  }
})

aggregate

aggregate聚合框架与sql对比

  • $match WHERE
  • $group GROUP BY
  • $project SELECT
  • $sort ORDER BY
  • $limit LIMIT
  • $sum SUM()
  • $sum COUNT()

查询每个栏目下商品数量

SELECT COUNT(*) FROM goods GROUP BY category_id

db.goods.aggregate([
  {$group:{_id:'$category_id', count:{$sum:1}}},
  {$project:{_id:0, category_id:'$category_id', count:'$count'}},
  {$sort:{count:1}}
])

查询每个栏目下价格大于100的商品个数

SELECT COUNT(*) FROM goods WHERE 1=1 AND price>100 GROUP BY category_id

db.goods.aggregate([
  {$match:{price:{$gt:100}}},
  {$group:{_id:'$category_id'}, count:{$sum:1}},
  {$project:{_id:0, category_id:'$_id.category_id', count:'$count'}},
  {$sort:{count:1}}
])

查询每个栏目下价格大于100的商品个数,仅显示个数大于3的。

SELECT category_id,COUNT(*) AS count WHERE 1=1 AND price>100 GROUP BY category_id HAVING count>3

db.goods.aggregate([
  {$match:{price:{$gt:100}}},
  {$group:{_id:'$category_id', count:{$sum:1}}},
  {$match:{count:{$gt:3}}},
  {$project:{_id:0, category_id:'$_id.category_id', count:'$count'}},
  {$sort:{count:1}}
])

查看每个栏目下商品的库存量

SELECT category_id,SUM(store) WHERE 1=1 GROUP BY category_id

db.goods.aggregate([
  {$group:{_id:'$category_id', count:{$sum:1}, total:{$sum:'$store'}}},
  {$project:{_id:0, category_id:'$_id.category_id', count:'$count', total:'$total'}},
  {$sort:{total:1}}
])

查询每个栏目下商品的平均价格并升序排序

SELECT category_id,AVG(price) AS avg FROM goods GROUP BY category_id ORDER BY avg ASC

db.goods.aggregate([
  {$group:{_id:'$category_id', count:{$sum:1}, average:{$avg:'$price'}}},
  {$sort:average:-1}
])

mapReduce

mapReduce是一个轻松并行化到多台服务器的聚合方法,它会拆分问题,再将各个部分发送到不同机器上,让每台机器都完成一部分。当所有机器都完成后,再将结果汇集起来形成最终完整的结果。

mapReduce最开始是映射(map),将操作映射到集合中的每个文档。这个操作要么无作为,要么产生一些键和x个值。接着进入中间环节(洗牌shuffle),按照分组并将产生的键值组成列表放到对应的键中。化简(reduce)则把列表中的值化简成一个单值。这个值被返回,然后接着进行洗牌,直到每个键的列表只有一个值为止,这个值就是最后的结果。

mapReduce的代价是速度,group不是很快,mapReduce更慢,绝不要用在实时环境中,要作为后台任务运行,将创建一个保存结果的结合,可对这个集合进行实时查询。

mapReduce的工作过程

  • map 映射
    现将同一个组的数据映射到一个文档(数组)上,在映射环节想要得到文档中每个键,map()使用emit()返回要处理的值。edit()会给mapReduce一个键和一个值,键类似group所使用键key。
  • reduce 归约
    将数组(同一组)数据进行运算,reduce()由两个参数,一个是key也就是emit()返回的第一个值,另一个是数组,由一个或多个对应于键的文档构成。reduce()一定要能够被反复调用,不论是映射环节还是前一个简化环节。

mapReduce语法

db.runCommand({
  mapreduce:字符串,集合名
  map:函数,
  reduce:函数,
  [query:文档,发往map()前先给过渡的文档],
  [sort:文档,发往map()前先给文档排序],
  [limit:整数,发往map()的文档数量上限],
  [out:字符串,统计结果保存的集合],
  [keeptemp:布尔值,链接关闭时临时结果集合是否保存],
  [finalize:函数,将reduce的结果发给此函数做最后处理],
  [scope:文档,js代码中要用到的变量],
  //jsMode=true时 BSON>JS>map>reduce>BSON
  //jsMode=false时 BSON>JS>map>BSON>JS>reduce>BSON,可处理非常大的mapreduce
  [jsMode:布尔值,是否减少执行过程中BSON和JS的转换,默认为true],
  [verbose:布尔值,是否产生更加详细的服务器日期,默认为true]
})
mapReduce

MongoDB没有模式,所以并不晓得每个文档由多少个键,通常找到集合的所有键的最好方法就是用MapReduce。

查询结合中所有键

var map = function(){
  for(var k in this){ //this当前映射文档的引用
    emit(k, {count:1});//将文档某个键的计数返回
  }
}
var reduce = function(key,emits){
  var total = 0;
  for(var k in emits){

    }
}

计算每个栏目下商品库存总量

SELECT category_id,SUM(store) AS sum FROM goods GROUP BY category_id

var map = function(){
  emit(this.category_id, this.store);//获得栏目下商品的库存量
};
var reduce = function(key,store){
  return Array.sum(store);
}
db.goods.mapReduce(map,reduce,query,{out:'result'});

查询每个栏目下商品的平均价格

var map = function(){
  emit(this.category_id, this.price);
}
var reduce = function(category_id,price){
  return Array.avg(price);
}
db.goods.mapReduce(map, reduce, {out:'result'})

将MongoDB组成的shard分片集群把地震数据分布到各节点上,将中国区域按10个经度10个维度为一组约30块,并用mapReduce计算地震数据,统计每组上每月的地震次数及地震级别。分析出结果把地震高发区用偏红色标注,低发区偏绿标注。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342