前言
分享一些Mongdb常用的数据清洗方式
注:"Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in." 原因与解决:
- 原因
大数据计算、数据统计时,每个计算任务(job或task)都会使用独立有限的内存空间,mongodb没有提供复杂的内存分配模型(任务调度算法),仅限定每个stage最多使用100M内存,如果超过此值将终止计算并返回error;为了支持较大数据集合的处理,我们可以指定“allowDiskUse”参数将“溢出”的数据写入本地的临时文件中(临时的collection),这个参数我们通常需要设定为true。
- 解决方案:
{allowDiskUse:true}
查询重复数据
db.feedImg_all.aggregate([
{
$group: { _id: {'mvid': '$mvid','feed_id':'$feed_id'},count: {$sum: 1},dups: {$addToSet: '$_id'}}
},
{
$match: {count: {$gt: 1}}
}
],{allowDiskUse:true})
删除重复数据
db.xiuxiu_all.aggregate([
{
$group: { _id: {'mvid': '$mvid','feed_id':'$feed_id'},count: {$sum: 1},dups: {$addToSet: '$_id'}}
},
{
$match: {count: {$gt: 1}}
}
],{allowDiskUse:true}).forEach(function(doc){
doc.dups.shift(); // 去除重复组的第一个元数据_id,得到除第一个之外的其他元组
db.xiuxiu_all.remove({_id: {$in: doc.dups}}); // remove()删除这些重复的数据
})
删除重复数据(python版本)
# -*- coding:utf-8 -*-
import pymongo
from pymongo import DeleteOne
'''
@author: lcx
@time: 2018/11/15
@desc:
'''
pipeline = [
{
'$group': {
'_id': {'mvid': '$mvid', 'feed_id': '$feed_id'},
'count': {'$sum': 1},
'dups': {
'$addToSet': '$_id'
}
},
},
{
'$match': {
'count': {
'$gt': 1
}
}
}
]
myclient = pymongo.MongoClient(host='m3005.test.com',port=3005,connect=False)
db = myclient.deepnet_test
if __name__ == '__main__':
map_id = map(lambda doc: doc['dups'][1:], db['xiuxiu_all'].aggregate(pipeline=pipeline,allowDiskUse=True))
list_id = [item for sublist in map_id for item in sublist]
print(db['xiuxiu_all']
.bulk_write(list(map(lambda _id: DeleteOne({'_id': _id}), list_id)))
.bulk_api_result)
复制collection里的数据到另一个collection中
db.xiuxiu_all.find().forEach(function(x){
db.xiuxiu_all_bak.insert(x);
})
过滤重复字段并统计总记录数
db.feedImg_all.aggregate(
[
{$match:{"createTime": {"$gte": 1541606400, "$lt": 1541692800}}}, // 添加过滤条件
{$project:{"feedId": true}},
{$group:{_id: "$feedId"}},
{$group:{_id: null, count: {$sum:1}}}
], {allowDiskUse: true})