在MongoDB中使用批量操作或块操作「Bulk Write」在效率上有非常大的提升,适合大量写操作
第一次尝试使用批量操作进行数据清洗,并且用PyMongo模拟了少量数据来进行测试,构造50w条数据进行插入或更新操作。
模拟环境:
PyMongo 3.6.1
MongoDB 3.4.7
Python 3.6.4 :: Anaconda, Inc.
模拟数据项:
items = [
{'i': 0},
{'i': 1},
{'i': 2},
{'i': 3},
{'i': 4},
...
{'i': 500000},
]
按条插入/更新的情况如下:
方法 | 总数 | 单次条数 | 时间 | 语句 |
---|---|---|---|---|
save | 50w | 1 | 00:02:54 | db['test'].save(item) |
insert | 50w | 1 | 00:02:50 | db['test'].insert(item) |
insert批量插入的情况如下:
方法 | 总数 | 单次条数 | 时间 | 语句 |
---|---|---|---|---|
insert | 50w | 1k | 00:00:07 | db['test'].insert(items) |
insert | 50w | 10k | 00:00:08 | db['test'].insert(items) |
块操作的情况如下:
方法 | 总数 | 单次 | 时间 | 语句 |
---|---|---|---|---|
bulk_write + InsertOne | 50w | 1k | 00:00:09 | db['test'].bulk_write(list(map(InsertOne, items))) |
bulk_write + InsertOne | 50w | 10k | 00:00:07 | db['test'].bulk_write(list(map(InsertOne, items))) |
bulk_write + InsertOne | 50w | 50w | 00:00:09 | db['test'].bulk_write(list(map(InsertOne, items))) |
bulk_write + ReplaceOne | 50w | 1k | 00:00:20 | db['test'].bulk_write(list(map(lambda item: ReplaceOne({'_id': item['_id']}, item, upsert=True), items))) |
bulk_write + ReplaceOne | 50w | 10k | 00:00:21 | db['test'].bulk_write(list(map(lambda item: ReplaceOne({'_id': item['_id']}, item, upsert=True), items))) |
bulk_write + ReplaceOne | 50w | 50w | 00:00:22 | db['test'].bulk_write(list(map(lambda item: ReplaceOne({'_id': item['_id']}, item, upsert=True), items))) |
bulk_write + UpdateOne | 50w | 1k | 00:00:20 | db['test'].bulk_write(list(map(lambda item: UpdateOne({'_id': item['_id']}, {'$set': {'i': item['i']}}, upsert=True),items))) |
bulk_write + UpdateOne | 50w | 10k | 00:00:21 | db['test'].bulk_write(list(map(lambda item: UpdateOne({'_id': item['_id']}, {'$set': {'i': item['i']}}, upsert=True),items))) |
bulk_write + UpdateOne | 50w | 50w | 00:00:22 | db['test'].bulk_write(list(map(lambda item: UpdateOne({'_id': item['_id']}, {'$set': {'i': item['i']}}, upsert=True),items))) |
bulk_write + UpdateOne + InsertOne | 100w | 10k | 00:00:38 | db['test'].bulk_write(list(map(InsertOne, items1)) + list(map(lambda item: UpdateOne({'_id': item['_id']}, {'$set': {'i': 0}}, upsert=True),items2))) |
模拟代码如下:
import pymongo
import time
from pymongo import InsertOne, ReplaceOne, UpdateOne
from pymongo.errors import BulkWriteError
settings = {
'MONGO_HOST': "***", # 数据库地址
'MONGO_PORT': ***, # 数据库端口
'MONGO_DB': "***", # 数据库名
'MONGO_USER': "***", # 用户名
'MONGO_PSW': "***", # 密码
}
client = pymongo.MongoClient(host=settings['MONGO_HOST'],port=settings['MONGO_PORT'])
client.admin.authenticate(settings['MONGO_USER'], settings['MONGO_PSW'],mechanism='SCRAM-SHA-1')
db = client[settings['MONGO_DB']]
l1 = []
for i in range(500000, 1000001):
l1.append({'i': i})
l2 = list(db['test'].find({}))
start_time = time.time()
page = 0
count = 10000
while True:
skip = page * count
page = page + 1
items1 = l1[skip:skip + count]
items2 = l2[skip:skip + count]
items = list(map(InsertOne, items1)) + list(map(InsertOne, items1))
try:
db['test'].bulk_write( \
list(map(InsertOne, items1)) + \
list(map(lambda item: UpdateOne({'_id': item['_id']}, {'$set': {'i': 0}}, upsert=True),items2)))
except BulkWriteError as bwe:
print(bwe.details)
else:
print(page)
if page == 50:
break
end_time = time.time()
consume_time = end_time - start_time
consume_time = '{:0>2s}'.format(str(int(consume_time // 3600))) \
+ ':{:0>2s}'.format(str(int((consume_time // 60) % 60))) \
+ ':{:0>2s}'.format(str(int(consume_time % 60)))
print(consume_time)
注意:bulk_write(list)传入的list不能为空,会出现报错信息。
经过测试,可以看到批量操作与单条操作的写入效率相差非常大,Insert批量插入与Bulk Write快操作效率基本相同。
但bulk_write()可以将增删改操作合在一起,具有更好的灵活性。
吐槽:手贱循环了一个亿的数据进列表,系统直接跑死机了,PyCharm/SecureCRT/Studio 3T环境全部崩溃,连搜狗输入法都崩了!!!摔!!!