最近遇到一个需求,大致情景是这样的:多个worker同时从rabbitmq消息队列中取数进行处理,并将处理后的数据插入或更新到同一个MongoDB表中,该过程会存在大量并发,并涉及一些高级插入和更新语法。
注:由于个人是用python,因此后续的mongodb操作是基于pymongo来进行的
MongoDB文档数据结构设计
数据结构大致是这样的:
{"_id":"id", "name":"公司全称", "abbs":["公司简称1", "公司简称2", "公司简称3"], "source":["网站1", "网站2", "网站3"]}
数据集最终要求
对最终的数据集要求如下:
-
_id
字段为MongoDB自带的唯一值索引字段 -
name
字段,即公司的全称字段在整个表中必须唯一 -
abbs
字段,即公司简称字段为列表结构,更新时需直接用新的结果覆盖原结果 -
source
字段,即数据来源网站字段为列表结构,更新时需将不存在的网站新增,已存在的网站不能重复。
解决思路
当不考虑并发时,常规思路可能是这样的
在每次数据入库前,都先查询当前表中是否已有"name”字段的文档存在, 存在则更新, 不存在则插入:
if db.collection.find({"name": name_example}).count() == 0:
db.collection.insert_one(dict_example)
else:
# 重置更新abbs字段
db.collection.update_one({"name":name_example}, {"$set": {"abbs":abbs_example}})
# 增量不重复的方式更新source字段
db.collection.update_one({"name":name_example}, {"$addToSet":{"abbs":{"$each":abbs_example}}})
这里在更新时使用到了$set
、$addToSet
、$each
操作,这里不得不先讲一下这三个操作的含义:
- $set: 用于修改文档中指定的字段值,若不存在则创建
- $addToSet:将数组作为数据集使用进行的更新的时候, 若要保证数组内的元素不能重复,进行增量更新,则可使用该方法。但该方法并不会拆分给定的数组,会将整个数组作为元素插入到已有数组中。因此,通常会搭配下面的方法使用。
-
$each:一般会结合
$addToSet
一起使用,会先拆分数组元素,再批量进行addToSet唯一写入的操作。
或者你还知道可以使用upsert=True
的方式来更简单的合并插入和更新的操作。
当存在并发操作的情况时,应该这样
但是在我这个需求的实际情况下并不能直接这样操作,因为存在并发插入或更新时,你可以想象一个并发场景:
场景:在数据表中还没有某个name字段数据时,两个name字段相同的写入请求同时进来了,他们同时做了if判断,都得到当前字段不存在的结果,此时两个连接对象都将会做插入的操作,这样得到的结果将是数据表中同时存在两个name字段相同的文档。
然后该怎么避免这种情况呢,其实对于MongoDB来说,也可以简单巧妙的避开这一点,我使用的方式是设置唯一值索引字段,如果name字段是唯一的,那就不会出现重复插入的情况了。
-
为指定字段创建唯一值索引
在MongoDB中_id字段默认时唯一值索引字段,它永远不会重复,我其实们也可以添加其他字段为唯一值索引字段,创建代码如下:
# 单个字段唯一索引
db.collection.createIndex({name:1},{unique:true}
如需要多个字段联合唯一索引,可参照如下方式:
# 多个字段联合索引示例
db.collection.createIndex({name_one:1,name_two:1},{unique:true})
- 在设置好唯一索引字段后,代码逻辑修改如下:
try:
db.collection.insert_one(dict_example)
except DuplicateKeyError:
# 重置更新abbs字段
db.collection.update_one({"name":name_example}, {"$set": {"abbs":abbs_example}})
# 增量不重复的方式更新source字段
db.collection.update_one({"name":name_example}, {"$addToSet":{"abbs":{"$each":abbs_example}}})
这里直接去掉了每次更新数据前的查询操作,改为先直接进行insert操作,如果当前索引字段name
不存在,则会插入成功,若已经存在,则会捕捉异常并执行更新操作。即使在并发情况下,由于唯一索引机制,也不会出现同时插入相同文档的结果,并发下插入失败的的操作将会重新进行后续的更新操作。
这样既提高了MongoDB的写入的效率也解决了并发下重复数据的问题,故在此做个记录,希望能帮到有需要的人。如果有更好的解决办法,欢迎提出。