MongoDB, MySQL和Redis的区别和使用场景
- MySQL是关系型数据库,支持事件
- MongoDB和Redis是非关系型数据库,不支持事件
- 如何快速选择
- 希望速度快的时候,使用MongoDB或者Redis
- 数据量过大的时候,可以将频繁使用的数据存入Redis,其他的存入MongoDB
- MongoDB不用提前建表建数据库,使用方便,字段数量不确定的时候使用MongoDB
- 后续需要用到数据之间的关系,可以考虑MySQL
数据库常用操作
查询所有的数据库
show dbs
切换到某个数据库
use databaseName
Notice:
- 当该数据库存在时,会切换到该数据库
- 当数据库不存在时,会自动创建该数据库,并切换到该数据库
查看当前所处的数据库
db
集合常用操作
创建一个新的集合
db.createCollection(name,{capped,autoIndexId,size,max,...})
查询数据
db.collection.find({cretiria})
:
返回所有满足过滤条件的文档
db.collection.findOne({cretiria})
:
返回满足过滤条件的第一个文档
pretty()
:
将输出文档进行格式化
索引文档嵌套字段
对于文档:
{
"address": {
"city": "Los Angeles",
"state": "California",
"pincode": "123"
},
"tags": [
"music",
"cricket",
"blogs"
],
"name": "Tom Benzamin"
}
假设我们需要通过city,state,pincode字段来检索文档
db.users.find({'address.city':'Los Angeles'})
逻辑运算符
and :{key1:value1,key2:value2}
or :{$or:[{key1:value1},{key2:value2}]}
and ,or: {key1:value1,$or:[{key2:value2},{key3:value3}]}
范围运算符
$in
: 等于数组内某个值,{age:{$in:[18,20]}}
$nin
:不等于数组内任何一个值,{age:{$nin:[18,20]}}
投影
db.collection.find({},{field1:value,field2:value})
:
- 0:表示不显示
- 1:表示显示
Notice: -
_id
字段默认显示; - 括号内除
_id
字段,包括项(1)和排除项(0)不能同时使用
MongoDB 的更新操作
以下语句会将 name 为
xiaowang
的整个文档替换为{'name':'xiaozhao'}
db.collection.updateOne({'name':'xiaowang'},{'name':'xiaozhao'})
db.collection.replaceOne({'name':'xiaowang'},{'name':'xiaozhao'})
以下语句会将 name 为
xiaowang
的这个文档中的 name 字段更新为 xiaozhao
db.collection.updateOne({'name':'xiaowang'},{$set:{'name':'xiaozhao'}})
如果希望所有满足条件的数据都得到更新,可以使用
db.collection.updateMany({'name':'xiaowang'},{$set:{'name':'xiaozhao'}})
如果将参数
upsert
设置为 true,那么当没有找到满足条件的文档时,插入的文档将作为新文档插入
db.collection.updateOne({'name':'xiaowang'},{'name':'xiaozhao'},{'upsert':true})
MongoDB 删除操作
删除满足条件的第一条数据
db.collection.deleteOne({criterion})
删除满足条件的所有数据
db.collection.deleteMany({critetion})
统计个数
方法count()用于统计结果集合中文档的数量
db.collectionName.find({criteria}).count()
Notice:在新版本4.0中建议使用countDocuments(criteria,limit,skip,...)
代替,上面的语句等价于:
db.collectionName.countDocuments({criteria})
相当于:
db.collection.aggregate([
{$match:<query>},
{$group:{_id:null,n:{$sum:1}}},
])
消除重复
distinct()
返回文档特定字段的不同值
db.collectionName.distinct('去重字段',{条件})
db.stu.distinct(hometown,{age:{$gt:18}})
skip()
, limit()
, sort()
三个函数在一天语句中的时候,执行的顺序是 sort()
, skip()
, 最后是limit()
.
sort({field1:num1, field2:num2})
: 1表示升序排列,-1表示降序排列
建立索引
MongoDB使用createIndex()函数来创建索引,3.0版本之前使用db.collection.ensureIndex() 函数创建索引。
默认情况下,索引字段是可以有重复的
-
如果想要创建唯一值索引,可以使用:
db.collection.createIndex({field:1},{unique:true})
-
创建联合索引(对多个字段同时创建索引)
db.collection.createIndex({field1:1,field2:-1})
-
查看集合内文档的所有索引
db.collection.getIndexes()
-
删除索引:
-
db.collection.dropIndexes()
## 删除所有索引 -
db.collection.dropIndex(index_name)
## 删除指定索引
-
数据库的备份和恢复
数据库备份语句:
mongodump -h dbhost -d dbname -o dbdirectory
-
dbhost
:服务器地址, -
dbname
:要备份的数据库 -
dbdirectory
:存放备份数据库的路径
Notice:如果要备份本地数据库,可直接使用:
mongodump -o dbdirectory
## 会把所有的数据库文档进行备份
mongodump -d databaseName -o dbdirectory
## 会把指定的数据库进行备份
数据库恢复语句:
mongorestore -h dbhost -d dbname --dir directory
-
dbhost
:服务器地址, -
dbname
:用于存放数据的数据库名称 -
directory
:存放备份数据库集合的路径
Notice:如果要恢复到本地数据库,可直接使用:
mongorestore -d dbname --dir directory
Notice:这里的directory
指的是集合所在的路径,是备份数据库时所指定的路径dbdirectory
的下一级文件夹
数据聚合操作
聚合(aggregate)是基于数据处理的聚合管道,每个文档通过一个由多个阶段(stage)组成的管道,可以对每个阶段的管道进行分组、过滤等功能,然后经过一系列的处理,输出相应的结果。
常用的管道:
-
$match
:过滤数据,只输出满足条件的文档 -
$group
:将集合中的文档分组,用于统计结果 -
$project
:修改输入文档的结构,如重命名、增加、删除字段、创建计算结果 -
$sort
:将输入文档排序后输出 -
$skip
:跳过指定数量的文档,并返回余下的文档 -
$limit
:限制通过该阶段的文档数 -
$unwind
:将数组类型的字段拆分
$group
会根据_id
后面的字段进行分组,取字段的值的时候需要用$
运算符
db.collection.aggregate([
{`$match`:{`field`:value}},
{`$group`:{'_id':field,count:{`$sum`:1},'avg_age':{'$avg':'$age'}}},
{`$project`:{'cate':'_id','counter':'$count','_id':0,'avg_age':1,}},
])
$group
还可以根据多个字段进行分组
db.collection.aggregate([
{$group:{'_id':{'country':'$country','city':'$city'}}},
## 分组之后的结果是{_id:{country:value,city:value}}
## 取字典嵌套的字段的值,可以用`_id.country`,`_id.city`
{$group:{'_id':{'country':'$_id.country','city':'$_id.city'},'count':{$sum:1}}},
{$project:{'country':'$_id.country','city':'$_id.city','_id':0}}
])
$group
的另一种用法:用于统计所有文档
db.collection.aggregate([
{`$match`:{`field`:value}},
{`$group`:{'_id':null,count:{`$sum`:1},'avg_age':{'$avg':'$age'}}},
])
$sort
可对多个字段进行排序
db.collection.aggregate([
{$sort:{field1:1,field2:-1}}
])
$unwind
将文档中某数组类型的字段拆分成多条,每条包含数组中的一个值
db.t.insertOne({'_id':1,'item':'t-shirt',`size`:['S','M','L']})
db.t.aggregate([
{$unwind:'$size'},
])
结果为:
{'_id':1,'item':'t-shirt','size':'S'}
{'_id':1,'item':'t-shirt','size':'M'}
{'_id':1,'item':'t-shirt','size':'L'}
Notice:在默认情况下,使用$unwind
对数组字段展开时,会自动忽略null
和空数组,
如果想要保留null和空数组,可以设置参数preserveNullAndEmptyArrays
为true。
db.collection.aggregate({$unwind:{path:'$field',preserveNullAndEmptyArrays:true}})
explain()
:可用于查看命令执行的详细情况,输出的信息详细级别依次为:queryPlanner
(默认),executionStats
和allPlanExecution
db.collection.explain('executionStats').find()
MongoDB 原子操作常用命令
$set
: 用来指定一个键并更新键值,若该键不存在,就创建该键
{$set:{field:value}}
$inc
:对文档的某个数字类的键进行增减操作
{$inc:{field:value}}
$push
:把value追加到field里面,field一定要是数组类型才行,如果field不存在,会新增加一个数组类型进去。
{$push:{field:value}}
$pushAll
:同$push
,只是一次可以追加多个值到一个数组字段内。
{$pushAll:{field:value_array}}
$pull
:从数组field内删除一个等于value的值
{$pull:{field:_value}}
$pop
:删除数组的第一个或最后一个元素
{$pop:{field:1}}
$rename
:修改字段名称
{$rename:{old_field_name:new_field_name}}
正则表达式
MongoDB使用$regex
操作符来设置匹配字符串的正则表达式。
使用正则表达式之前不需要做任何配置。
考虑以下文档:
{
"post_text": "enjoy the mongodb articles on runoob",
"tags": [
"mongodb",
"runoob"
]
}
使用正则表达式查找 post_text 包含 Runoob 字符串的文章
db.posts.find({post_text:{$regex:'Runoob'}})
db.posts.find({post_text:/Runoob/})
## 查找以enjoy开头
db.posts.find({post_text:/^enjoy/})
## 查找以runoob结尾
db.posts.find({post_text:/Runoob$/})
如果需要检索不区分大小写,可以设置$options
为$i
以下命令将查找不区分大小写的字符串runoob
db.posts.find({post_text:{$regex:"runoob",$options:"$i"}})
优化正则表达式查询
- 如果文档中设置了索引,那么使用索引相比于正则表达式匹配查找所有的数据查询速度更快。
- 如果正则表达式是前缀表达式,所有匹配的数据将以指定的前缀字符串开始;
MongoDB 固定集合 (Capped Collection)
MongoDB 固定集合是性能出色,且有着固定大小的集合,就像一个环形队列,当集合空间用完之后,再插入的元素就会覆盖最初始的元素。
Notice: 固定集合无法使用remove()
函数删除部分文档,但是可以删除全部文档
创建固定集合
db.createCollection('cappedCollection',{capped:true,size:maxSize,max:maxNum})
其中:size是整个集合空间的大小,单位是KB;
max是集合文档个数的上限,单位是'个';
如果空间大小达到上限,插入新文档的时候,会覆盖第一个文档;
如果文档个数到达上限,插入新文档的时候,同样会覆盖第一个文档。
判断一个集合是否为固定集合:
db.collection.isCapped()
如果需要将一个已经存在的集合转换为固定集合,可以使用:
db.runCommand({'convertToCapped':collection,size:value})
固定集合的属性
- 对固定集合进行插入数据,速度极快
- 按照插入顺序的查询,输出速度极快
- 能够在插入最新数据时,淘汰最早的数据
常用用途
- 存储日志信息
- 缓存一些少量的文档
启动 MongoDB 容器:
docker run -d --network host --name mongodb -e MONGO_INITDB_ROOT_USERNAME=admin -e MONGO_INITDB_ROOT_PASSWORD=admin mongo
-
从
MongoDB
中删除集合, 有2种方式:-
db.coll.remove({})
: 删除文件之后, 文件原本占用的物理空间不会被回收; 是删除符合条件的数据, 不会删除其他结构(如索引等); -
db.coll.drop()
: 删除文件之后, 文件原本占用的物理空间会进行回收;
针对物理空间没有进行回收的情况, 可以使用
compact
命令释放这些空间, 具体命令为db.runCommand({"compact":"collection_name"})
. -
-
主币交易要求交易
hash
(数据库中的_id
)和代币交易的交易hash log_idx
block_number
(transaction_hash
,block_number
,log_index
)的组合是唯一值. 在使用pymongo
批量插入时, 需要:try: eth_trans_coll.insert_many( trans, ordered=False, bypass_document_validation=True) except pymongo.errors.BulkWriteError as e: logging.debug(e)
备注:
orderd=False
时, 会并行插入数据库, 可以大幅提高插入效率; 同时, 即便部分数据插入时出错, 也不会影响其他数据的插入.bypass_document_validation=True
可以绕过文件验证环节, 提高插入效率.
索引
删除索引
- 批量删除索引:
db.coll_name.dropIndexes([idx_name_1,idx_name_2])
explain
支持 queryPlanner
(仅给出执行计划)、executionStats
(给出执行计划并执行)和 allPlansExecution
(前两种的结合)三种分析模式,默认使用的是 queryPlanner
:
Mongo
会通过优化分析选择其中一种更好的方案放置到 winningPlan
,最终的执行计划是 winningPlan
所描述的方式。其它稍次的方案则会被放置到 rejectedPlans
中,仅供参考。
所以 queryPlanner
的关注点是 winningPlan
,如果希望排除其它杂项的干扰,可以直接只返回 winningPlan
即可:
db.coll_name.find().explain().queryPlanner.winningPlan
winningPlan
中,总执行流程分为若干个 stage
(阶段),一个 stage
的分析基础可以是其它 stage
的输出结果。从这个案例来说,首先是通过 IXSCAN
(索引扫描)的方式获取到初步结果(索引得到的结果是所有符合查询条件的文档在磁盘中的位置信息),再通过 FETCH
的方式提取到磁盘上各个位置所对应的完整文档。这是一种很常见的索引查询计划(explain
返回的是一个树形结构,实际先执行的是子 stage
,再往上逐级执行父 stage
)
除了 queryPlanner
之外,还有一种非常有用的 executionStats
模式:
db.coll_name.find().explain('executionStats').executionStats
总体上大同小异,一些关键字段可以了解一下:
-
nReturned
:执行返回的文档数 -
executionTimeMillis
: 执行时间(ms) -
totalKeysExamined
:索引扫描条数 -
totalDocsExamined
:文档扫描条数 -
executionStages
:执行步骤
MongoBD
的索引采用B树结构, 相较于时间复杂度为O(1)
的哈希结构, B树虽然效率为O(logN)
, 但它允许范围查询, 这是哈希树无法做到的. 但是以下语句无法使用索引:
- 正则表达式,及'非'操作符, 如
$nin
,$not
等; - 算术运算符, 如
$mod
等; -
$where
子句;
覆盖索引
常见的执行步骤为:
- 在内存中扫描索引(
IXSCAN
), 得到需要的结果(完整文档在磁盘上的位置); - 从磁盘上根据索引结果, 拉取特定位置的文档(
FETCH
); - 执行字段映射, 得到需要的字段;
当查询的字段和要返回的字段都在一个组合索引中, 数据库就不需要取磁盘抓取完整的文档了. _id
这个字段默认会返回, 但是如果不需要的化, 可以不让其返回.
文件排序
索引的2个主要用途就是分组和排序, 使用索引筛选, 也是建立在排序的基础上. 所以可以在排序字段和筛选字段上建立组合索引,
MongoDB
返回的文档顺序和查询时的扫描顺序一致, 扫描顺序又分为集合扫描(COLLSCAN
)和索引扫描(IXSCAN
);
- 集合中没有索引, 或者索引失效时,
MongoDB
会扫描全部文档, 扫描的顺序和该集合中的文档在磁盘上的存储顺序相同; - 当扫描索引时, 会在内存中对索引值进行匹配, 得到所有等匹配到的文档后, 再根据情况决定是否到磁盘拉取相应的文档, 或进行下一阶段. 因为索引本身就是排好序的, 所以扫描顺序就取决于索引本身的顺序;
常见的几个阶段:
-
IXSCAN
:通过索引, 扫描出相关文档的位置; -
FETCH
: 根据前面扫描到的位置到磁盘上抓取相应的完整文档; -
SORT_KEY_GENERATOR
: 获取每一个文档排序所用的健值; -
SORT
: 进行内存排序, 返回最终结果;(在内存中排序, 效率非常低, 当结果集大于32MB时, 还会返回错误).
批量插入/更新
BTC 的主币交易和代币交易都是使用 transaction_hash 作为 _id, 插入重复数据时, 可以直接使用 updateone 和 bulk_write;
其他币种的代币交易在transaction_hash, block_number, log_index
上建有唯一值索引, 需要根据这三个字段筛选数据, 有两种方式
# 逐个指定需要更新字段
tasks = [UpdateOne({'transaction_hash': token_transfer['transaction_hash'],
'block_number': token_transfer['block_number'],
'log_index': token_transfer['log_index']},
{'$set':{'from_address':token_transfer['from_address'],
'to_address':token_transfer['to_address']}}, upsert=True)
for token_transfer in token_trans]
trans_coll.bulk_write(
tasks, ordered=False, bypass_document_validation=True)
# 把重复数据删除 再批量插入
tasks = [DeleteOne({'transaction_hash': token_transfer['transaction_hash'],
'block_number': token_transfer['block_number'],
'log_index': token_transfer['log_index'], })
for token_transfer in token_trans]
trans_coll.bulk_write(
tasks, ordered=False, bypass_document_validation=True)
token_transfer_coll.insert_many(
token_trans, ordered=False, bypass_document_validation=True)
# 错误的方式 整体更新会更变_id, 而MongoDB中 _id 不能更改,所以会报错
tasks = [UpdateOne({'transaction_hash': token_transfer['transaction_hash'],
'block_number': token_transfer['block_number'],
'log_index': token_transfer['log_index']},
{'$set':token_transfer, upsert=True)
for token_transfer in token_trans]
trans_coll.bulk_write(
tasks, ordered=False, bypass_document_validation=True)
TIPS
-
$OR
: 很可能会导致索引失效, 最好能用$IN
代替, 如:# 存在索引 "province_1_city_1" db.collection.find({ $or: [{ province: "广东省" }, { province: "福建省" }], city: "深圳市", }) # 建议使用 db.collection.find({ province: { $in: ["广东省", "福建省"] }, city: "深圳市", })
-
在数组中遍历:
数据样例:{ "_id" : "1d8149eb8d8475b98113b5011cf70e0b7a4dccff71286d28b8b4b641f94f1e46", "block_number" : 700000, "block_timestamp" : ISODate("2021-09-11T04:14:32Z"), "value" : NumberDecimal("640388640"), "is_coinbase" : true, "inputs" : [ [ "0000000000000000000000000000000000000000000000000000000000000000", NumberLong("4294967295") ] ], "outputs" : [ [ "bc1q7wedv4zdu5smt3shljpu5mgns48jn299mukymc", 640388640 ], [ ], [ ] ] } # 需要查询outputs中包含指定地址的交易 db.trans_test.find({'outputs':{'$elemMatch':{'0':{'$eq':'bc1q7wedv4zdu5smt3shljpu5mgns48jn299mukymc'}}}}) db.trans_test.find({'outputs':{'$elemMatch':{'0':'bc1q7wedv4zdu5smt3shljpu5mgns48jn299mukymc'}}})
优化案例
- 在
time
和value
上创建组合索引,db.usdt_token_trans.createIndex({'time':1,'value':1})
: - 分别在
time
和value
上创建索引:
> db.usdt_token_trans.find({$and:[{time:{$gte:ISODate("2021-05-01T18:11:00Z")}},
{time:{$lte:ISODate("2021-05-01T19:11:00Z")}},
{value:{$gte:NumberLong("2998999900")}},
{value:{$lte:NumberLong("2999000100")}}]}).explain("executionStats")
# 创建组合索引时:
"executionStats" : {
"executionSuccess" : true,
"nReturned" : 520,
"executionTimeMillis" : 39,
"totalKeysExamined" : 5411,
"totalDocsExamined" : 520,
"indexBounds" : {
"time" : [
"[new Date(1619892660000), new Date(1619896260000)]"
],
"value" : [
"[49999999900, inf.0]"
]
# 在 time 和 value 上单独创建索引时
"executionStats" : {
"executionSuccess" : true,
"nReturned" : 520,
"executionTimeMillis" : 85,
"totalKeysExamined" : 33226,
"totalDocsExamined" : 33226,
"indexBounds" : {
"time" : [
"[new Date(1619892660000), new Date(1619896260000)]"
]
},
-
由上面的例子可以看出, 创建组合索引时, 查询时需要遵守
最佳左前缀法则
(先使用前面的字段, 然后再使用后面的字段, 否则就是全文扫描);- 组合索引前面的字段是范围查询时, 后面字段的索引依然可以使用;
-
在多个子段上分别建立索引时, 如果查询条件只有该字段时, 会使用索引扫描, 速度比组合索引稍快;
但如果查询条件是多个字段时, 也只会选择一个索引, 速度比组合索引慢很多.
# 创建组合索引"value_1_time_1"
"executionStages" : {
"stage" : "FETCH",
"nReturned" : 13,
"executionTimeMillisEstimate" : 5,
"indexName" : "value_1_time_1",
"indexBounds" : {
"value" : [
"[2998999900, 2999000100]"
],
"time" : [
"[new Date(1619892660000), new Date(1619896260000)]"
]
},
# 创建组合索引 "time_1_value_1"
"executionStats" : {
"executionSuccess" : true,
"nReturned" : 13,
"executionTimeMillis" : 58,
"indexName" : "time_1_value_1",
"indexBounds" : {
"time" : [
"[new Date(1619892660000), new Date(1619896260000)]"
],
"value" : [
"[2998999900, 2999000100]"
]
},
- 两者都是使用组合索引, 只是索引中字段的顺序不一样, 查询速度竟然差别这么大. 不明白为什么? 可能的原因是:
-
time
和value
的索引空间不同,value
的差异性比较小, 索引占用的空间也比较小, 所以更快;
-
查询非空集合
-
MongoDB
查询非空集合,并比较几种用法的效率
db.testcollection.find({"field1.0":{$exists: true}})
db.testcollection.find({"field1":{'$elemMatch':{'$ne':null}}})
db.testcollection.find({$where:"this.field1.length>0"})
db.testcollection.find({"field1":{$gt: []}})
# 在管道中可以这样使用,效率依次下降
{'$match':{'outputs':{'$ne':[]}}},
{'$match':{'outputs':{'$elemMatch':{'ne':null}}}},
{'$match':{'outputs.0':{'$exists':true}}},
{'$match':{'outputs':{'$gt':['$size',0]}}},
allowDiskUse
当数据集过大时, 使用 MongoDB进行查询会突破内存上线,报错信息为‘'QueryExceededMemoryLimitNoDiskUseAllowed'’, 这时可以在 aggregate()
加上一个参数 {allowDiskUse:true}
以便可以使用磁盘进行缓存, 即:db.coll.aggregate([{}...],{'allowDiskUse':true})
. 调用pymongo
模块时,需要写成addr_tag_testdata.aggregate([{}...], allowDiskUse=True)
的形式.
Decimal
格式
-
在
Python
中使用decimal
格式可以使用decimal
模块:import decimal # 可以查看 decimal 的一些设置参数 decimal.getcontext() # 以下为输出: Context(prec=28, rounding=ROUND_HALF_EVEN, Emin=-999999, Emax=999999, capitals=1, clamp=0, flags=[], traps=[Overflow, DivisionByZero, InvalidOperation]) # 修改确定精度最后一位的方式 decimal.getcontext().rounding = ROUND_UP # 'ROUND_HALF_EVEN':5以下舍弃,6以上加1;'ROUND_UP':向上取;'ROUND_DOWN':向下取 # 指定精度为6位(指总位数 包括整数位), 默认为28位; decimal.getcontext().prec=6 decimal.Decimal(arg) # 把科学计算法表示的数字转换成普通方法表示的数字, 可以用以下方法 decimal.Decimal(eval(sci_number))
-
Decimal()
的参数可以是整型, 也可以是字符串, 但不能是浮点型. - 该类型可以进行四则运算;
- 无法直接存入
MongoDB
中; -
建议: 在平常使用时, 通过
Decimal()
或to_decimal()
转化为Decimal
类型; 需要存到MongoDB
中时, 再转化成bson.decimal128.Decimal128()
类型, 从MongoDB
取出之后, 再通过to_decimal()
转化为Decimal
类型.
-
-
bson.decimal128.Decima128()
可以直接存到MongoDB
中, 具体方法为:from bson.decimal128 import Decimal128 from decimal import Decimal Decimal128('1') Decimal128(Decimal(1)) # 在MongoDB中显示为: NumberDecimal("1")
-
Decimal128()
只能接受decimal.Decimal()
类型和字符串类型作为参数, 不接受整型和浮点型; -
Decimal128()
类型无法进行四则运算, 需要通过Decimal128().to_decimal()
转换成Python
自带的Decimal
类型才可以. - 在
MongoDB
的shell
中, 可以使用NumberDecimal('111')
直接创建Decimal
类型;toDecimal()
可以将其他类型转化为Decimal
格式; 最大可以容纳34位的十进制数值. -
注意: 任何类型转化为浮点型后, 都可能会发生变化, 如字符串或整型
1
转化为浮点型后可能变成1.00000000000000456
; 如果需要非常精确地表示, 需要使用decimal.Decimal('1')
, 这样不论结果扩大多少倍, 后面依然是0
.
-
Session
如果客户端和MongoDB
服务器在30分钟内没有互动, 服务器会自动释放客户端连接, 解决方案有2个:
-
创建 session 时, 指定 session_id, 并且每5分钟刷新一次该 session_id (官方推荐方案);
# 可以使用该方法获取 session_id with client.start_session() as session: session_id = session.session_id['id'] # 但是没有在 pymongo api 中找到刷新 session 的方法
在 mongo shell 中可通过以下方式实现: 参考文档
# 创建新的 session, 并获取对应的 session_id var session = db.getMongo().startSession() var sessionId = session.getSessionId().id var cursor = session.getDatabase("examples").getCollection("data").find().noCursorTimeout() # 每隔5分钟刷新一次该 session 的连接 var refreshTimestamp = new Date() // take note of time at operation start while (cursor.hasNext()) { // Check if more than 5 minutes have passed since the last refresh if ( (new Date()-refreshTimestamp)/1000 > 300 ) { print("refreshing session") db.adminCommand({"refreshSessions" : [sessionId]}) refreshTimestamp = new Date() } // process cursor normally }
对集合操作时, 指定 session, 参考文档
with client.start_session() as session:
db.coll.find({}, session=session)
-
查询数据时, 通过
batch_size
指定每个 batch 的大小, 保证该 batch 在30分钟内肯定能执行完;# 依然会返回全部数据, 但是每个 batch 只返回30条 迭代的每个 document 也一样 for document in db.collection.find().batch_size(30) print(document) import time import pymongo # 当 batch_size = 2 时, 耗时:4.8 S # 当 batch_size = 100 时, 耗时:0.42 S # 当 batch_size = 10000 时, 耗时:0.25 S # 不指定 batch_size 时, 耗时: 0.255 S # counter 最终都是 13929 with pymongo.MongoClient('192.168.1.7', 27017)as client: db = client['btc'] block_info_coll = db['block_info'] result = block_info_coll.find({}).batch_size(10) t0 = time.time() counter = 0 for block_info in result: counter += 1 print(counter) t1 = time.time() print(f'共耗时:{t1-t0} S')
Date
Date()
会返回当前时间 (字符串类型)
new Date()
会返回当前时间 (ISODate 类型)
也可以通过传递参数生成指定日期的时间. 可接受的参数类型有以下几种:
-
new Date('2022-01-01')
: 返回 ISODate("2022-01-01T00:00:00Z")(UTC 时区) -
new Date('2022-01-01T08:00:00')
: 返回 ISODate("2022-01-01T00:00:00Z") (会自动转换成UTC时区), 如果MongoDB的服务器时区就是UTC时区, 会返回 ISODate("2022-01-01T08:00:00Z") -
new Date('2022-01-01T08:00:00Z')
: 返回 ISODate("2022-01-01T08:00:00Z") -
new Date(<integer>)
: 以毫秒为单位的 UNIX 时间戳;
数据导出/导入
将 MongoDB 中指定的集合导出为文件的命令:
mongoexport -d btc_b -c transaction_60w --type csv -o transaction_60w.csv --limit 10 --sort '{block_number:1}' -f "_id,block_number,block_timestamp,is_coinbase,value,inputs,outputs,new_inputs"
遇到的问题: 怎么在导入时指定字段的类型, 如 block_number.int32()
, 对应的命令为:
mongoimport -d btc -c transaction_60w -f 'block_number.int32()' --type cs
v --columnsHaveTypes --drop --mode upsert transaction_60w.csv
使用该命令的难点在于: 导入时, 没有找到指定各个字段的类型.
可以导出为 json
格式, 会自带类型, 但是会将二进制导出为字符串, 在导入时没有找到将其类型指定为二进制的方法.
# 导出命令
mongoexport -d btc_b -c transaction_60w --type json -o transaction_60w.json --limit 10 -f "_id,block_number,block_timestamp,is_coinbase,value,inputs,outputs,new_inputs"
# 导入命令
mongoimport -d btc -c transaction_60w --type json --jsonArray --drop --mode upsert transaction_60w.json
使用 mongodump
导出指定的集合, 具体命令为:
# 导出指定集合
mongodump -d btc_b -c transaction_70w -o transaction_70w.bson
# 导入指定集合
mongorestore -d btc -c transaction_70w --drop transaction_70w.bson