使用Mongo聚合操作来进行重复的数据项清洗,并使用PyMongo加入到数据清洗组件中。
当前环境:PyMongo 3.6.1 / MongoDB 3.4.7 / Python 3.6.4 :: Anaconda, Inc.
在爬虫中断续爬时会出现少量数据重复的问题,我将数据去重放在了数据清洗环节,清洗的过程中顺带将重复的数据删除。
Mongo老版本的解决方案是建立单一索引,Mongo3.+可以使用聚合操作将重复的数据检索出来并进行删除。
元数据结构如下:
item = {
"_id" : ObjectId("..."),
"title" : "...", # 数据标题
"date" : "...", # 数据日期
"url" : "...", # 数据来源
"content" : "...",
"source" : "..."
"category" : "...",
...
}
需要根据「相同标题+相同日期+相同来源」判定数据重复,在管道中根据这三项条件分组($group)后计数将数量>1的匹配($match)出来,最后遍历删除(db.collections.remove())
聚合操作的过程
$group: 使用title/date/url作为条件进行分组组成新的_id,并计数+1,dups中存放元数据的_id
$match: 在$group得到的分组基础上匹配数量>1的项
Mongo Shell 查询重复数据的操作如下:
db.test.aggregate([
{
$group: { _id: {'title': '$title','date':'$date','url': '$url'},count: {$sum: 1},dups: {$addToSet: '$_id'}}
},
{
$match: {count: {$gt: 1}}
}
])
Mongo Shell 将查询到的结果删除操作:
db.test.aggregate([
... // 同上聚合操作,此处略
]).forEach(function(doc){
doc.dups.shift(); // 去除重复组的第一个元数据_id,得到除第一个之外的其他元组
db.test.remove({_id: {$in: doc.dups}}); // remove()删除这些重复的数据
})
PyMongo 操作代码如下:
使用bulk_write()进行批量删除
pipeline = [
{
'$group': {
'_id': {'title': '$title', 'date': '$date', 'url': '$url'},
'count': {'$sum': 1},
'dups': {
'$addToSet': '$_id'
}
},
},
{
'$match': {
'count': {
'$gt': 1
}
}
}
]
map_id = map(lambda doc: doc['dups'][1:], db['data_value'].aggregate(pipeline=pipeline))
list_id = [item for sublist in map_id for item in sublist]
print(db['data_value'] \
.bulk_write(list(map(lambda _id: DeleteOne({'_id': _id}), list_id))) \
.bulk_api_result)
一行代码鬼畜版:
print(db['data_value'].bulk_write(list(map(lambda _id: DeleteOne({'_id': _id}), [item for sublist in map(lambda doc: doc['dups'][1:], db['data_value'].aggregate(pipeline=[{'$group': {'_id': {'title': '$title', 'date': '$date', 'url': '$url'},'count': {'$sum': 1},'dups': {'$addToSet': '$_id'}},},{'$match': {'count': {'$gt': 1}}}])) for item in sublist]))).bulk_api_result)