有一张数据库,存储的是从kafka过来的流水数据,由于一次误操作,导致部分数据出现的两份,已经引起线上问题,需要进行修复。
db.tablecardlogin.aggregate([{
$group: {
//按照以上字段聚合
_id:{account:'$account', customerId:'$customerId', openid:'$openid',opt_time: '$opt_time'},
//然后求和放入count字段中
count:{$sum:1},
//取出每一条的id加到返回值中的dups字段中
dups: {$addToSet: '$_id'}
}
},
{
$match:{
//选择count大于1的
count:{$gt:1}
},
}],
//数据集太大,需要支持放到磁盘计算
{allowDiskUse: true}
).forEach(function(doc){
//删除dups数组中的第一个元素
doc.dups.shift();
//从数据库中删除
db.tablecardlogin.remove({_id: {$in: doc.dups}});
})