例子是网上找的
想总结下 MongoDB(俗称“芒果”)通过 pymongo 在 Python 里的基本用法
cmd操作看这里
MongoDB使用小结:一些常用操作分享
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2017-07-13 14:04:21
# @Author : ditto (969956574@qq.com)
# @Link : https://github.com/dittoyy
# @Version : $Id$
######################这是没封装的pymongo1
# -*- coding: utf-8 -*-
import pymongo
import sys
import traceback
# Connection settings read by MongoConn. Authentication is only attempted
# when both 'username' and 'password' are set to non-empty values.
MONGODB_CONFIG = dict(
    host='127.0.0.1',
    port=27017,
    db_name='test',
    username=None,
    password=None,
)
class MongoConn(object):
    """Open a MongoDB client and select the database named in MONGODB_CONFIG.

    Attributes set by a successful construction:
        conn: the ``pymongo.MongoClient`` built from the configured host/port.
        db: the database ``MONGODB_CONFIG['db_name']`` on that client.
        username, password: credentials copied from MONGODB_CONFIG.
        connected: ``True`` when no credentials are configured, otherwise
            whatever ``db.authenticate(username, password)`` returned.

    Any exception raised during setup prints a fixed message and terminates
    the process with exit status 1.
    """

    def __init__(self):
        settings = MONGODB_CONFIG
        try:
            client = pymongo.MongoClient(settings['host'], settings['port'])
            self.conn = client
            self.db = client[settings['db_name']]
            self.username = settings['username']
            self.password = settings['password']
            if not (self.username and self.password):
                # No credentials configured: nothing to authenticate.
                self.connected = True
            else:
                self.connected = self.db.authenticate(self.username, self.password)
        except Exception:
            # The traceback is deliberately not printed; only a short notice.
            print('Connect Statics Database Fail.')
            sys.exit(1)
if __name__ == "__main__":
    import time

    my_conn = MongoConn()

    # Sample documents with explicit integer '_id' values (stored as int32);
    # the inner commented variants are the same documents without '_id'.
    # datas = [
    #     {'_id': 1, 'data': 12},
    #     {'_id': 2, 'data': 22},
    #     {'_id': 3, 'data': 'cc'},
    #     # {'data': 12},
    #     # {'data': 22},
    #     # {'data': 'cc'},
    # ]

    # The same loops as typed in the mongo shell (JavaScript):
    #   for(var i=0;i<=1000;i++){db.my1test1.insert({'_id':i,'js1':'js'+i,$inc:{'age':i}})}
    #   for(var i=0;i<=100;i++){db.my1test1.update({'_id':i},{$inc:{'age':i}},true)}
    #
    # Insert a large number of documents in a loop:
    # for i in range(1000000):
    #     db.my_collection.insert({"test": "tnt", "index": i})
    #
    # Literal Python translation of the shell insert loop. It does not work:
    # an unquoted $inc is not valid Python, and $inc is an update operator,
    # so it belongs in the update document of update(), not in insert().
    # for i in range(1000):
    #     my_conn.db.my1test1.insert({'_id':i,'js1':'js'+str(i),$inc:{'age':i}})
    #
    # Working form: apply $inc through update() with upsert enabled.
    # for i in range(1000):
    #     my_conn.db.my1test1.update({'_id': i}, {'$inc': {'age': i}}, True)
    #
    # Insert the sample documents into the 'my1test' collection:
    # my_conn.db['my1test'].insert(datas)

    start = time.time()
    # Read back every document of the 'my1test1' collection and print it.
    for document in my_conn.db['my1test1'].find({}):
        print(document)
    end = time.time()
    cost_time = end - start  # seconds spent fetching and printing

    print(my_conn.db['my1test1'].find().count())
<code>
插入($inc 是更新操作符,只能写在 update 的更新文档里;insert 的文档里直接写字段值即可,而且不加引号的 $inc 在 Python 里本身就是语法错误)
for i in range(100):
    my_conn.db.my1test1.insert({'_id':i,'js1':'js'+str(i),'age':i})
更新
for i in range(1000):
my_conn.db.my1test1.update({'_id':i},{'$inc':{'age':i}},True)
res=my_conn.db['my1test1'].find({})
for k in res:
print k
print my_conn.db['my1test1'].find().count()
</code>
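>pip install pymongo==2.8 (本文用到的 insert/update/save/remove/count/authenticate 等写法属于旧版 pymongo API,在 pymongo 4 中已被移除,所以这里固定安装 2.8)感叹一句,比mysql友好多了==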
# pymongo quick reference. ``db`` stands for a pymongo database handle,
# ``col`` for one of its collections, and ``start`` for a timestamp taken
# earlier with time.time().
cost = time.time() - start  # elapsed time in seconds
db.collection_names()  # names of all collections in the database
db.col.find_one()  # fetch one document from the collection
db.col.find_one().keys()  # field names (keys) of that document
for document in db.col.find():  # walk over every document in the collection
    print(document)
db.col.find().count()  # total number of documents
# Sort the query result by one key (ascending is the default direction).
db.col.find().sort("name", pymongo.ASCENDING)
# Sort by several keys: name ascending, then active_time descending.
db.col.find().sort([("name", pymongo.ASCENDING), ("active_time", pymongo.DESCENDING)])
# Update the matching document.
db.col.update({"name": "mike"}, {"$set": {"active_time": "20130408120000"}})
# Delete matching documents (calling it without a filter removes everything).
db.col.remove({"name": "mike"})
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2017-07-13 15:32:19
# @Author : ditto (969956574@qq.com)
# @Link : https://github.com/dittoyy
# @Version : $Id$
######################这是封装的pymongo2网上的
# -*- coding: utf-8 -*-
import pymongo
import sys
import traceback
# Connection settings read by MongoConn. Authentication is only attempted
# when both 'username' and 'password' are set to non-empty values.
MONGODB_CONFIG = dict(
    host='127.0.0.1',
    port=27017,
    db_name='test',
    username=None,
    password=None,
)
class Singleton(object):
    """Base class that gives each subclass exactly one shared instance.

    Pattern reference: http://ghostfromheaven.iteye.com/blog/1562618

    Calling the class (or any subclass) always returns the same object for
    that class. Python still invokes ``__init__`` on the returned object on
    every call, so subclasses with expensive initialisers should guard
    against repeated setup themselves.
    """

    def __new__(cls, *args, **kwargs):
        # Look only in this class's own namespace: hasattr() would also find
        # an instance cached on a parent class and hand the parent's object
        # to the subclass.
        if '_instance' not in cls.__dict__:
            # object.__new__ rejects extra arguments, so they are not
            # forwarded; __init__ still receives them from the call itself.
            cls._instance = super(Singleton, cls).__new__(cls)
        return cls._instance
class MongoConn(Singleton):
    """Process-wide MongoDB connection built from MONGODB_CONFIG.

    Attributes set by a successful construction:
        conn: the ``pymongo.MongoClient`` for the configured host/port.
        db: the database ``MONGODB_CONFIG['db_name']`` on that client.
        username, password: credentials copied from MONGODB_CONFIG.
        connected: ``True`` when no credentials are configured, otherwise
            the result of ``db.authenticate(username, password)``.

    Any exception raised during setup prints a fixed message and terminates
    the process with exit status 1.
    """

    def __init__(self):
        # Singleton returns the same object for every MongoConn() call and
        # Python then runs __init__ on it again. Without this guard each call
        # would build a fresh MongoClient and abandon the previous one.
        if getattr(self, 'connected', False):
            return
        try:
            self.conn = pymongo.MongoClient(MONGODB_CONFIG['host'], MONGODB_CONFIG['port'])
            self.db = self.conn[MONGODB_CONFIG['db_name']]
            self.username = MONGODB_CONFIG['username']
            self.password = MONGODB_CONFIG['password']
            if self.username and self.password:
                self.connected = self.db.authenticate(self.username, self.password)
            else:
                self.connected = True
        except Exception:
            # The traceback is deliberately not printed; only a short notice.
            print('Connect Statics Database Fail.')
            sys.exit(1)
def check_connected(conn):
    """Verify that *conn* reports a usable connection.

    Args:
        conn: object exposing a ``connected`` attribute.

    Raises:
        NameError: with the message 'stat:connected Error' when
            ``conn.connected`` is falsy.
    """
    if not conn.connected:
        # Call-style raise: the ``raise Exc, msg`` form only parses on
        # Python 2, while this form works on both Python 2 and 3.
        raise NameError('stat:connected Error')
def save(table, value):
    """Store one document in collection *table* via ``Collection.save``.

    Per the original note, one record is handled per call and whether it is
    inserted or updated depends on whether its '_id' already exists.
    Any exception is caught and its traceback printed; nothing is returned.
    """
    try:
        mongo = MongoConn()
        check_connected(mongo)
        collection = mongo.db[table]
        collection.save(value)
    except Exception:
        print(traceback.format_exc())
def insert(table, value):
    """Insert *value* into collection *table*.

    Per the original note, *value* may be a single document or a whole list
    of documents, and a duplicate '_id' is reported as an error. The call
    passes ``continue_on_error=True`` to pymongo's ``insert``.
    Any exception is caught and its traceback printed; nothing is returned.
    """
    try:
        mongo = MongoConn()
        check_connected(mongo)
        collection = mongo.db[table]
        collection.insert(value, continue_on_error=True)
    except Exception:
        print(traceback.format_exc())
def update(table, conditions, value, s_upsert=False, s_multi=False):
    """Run ``Collection.update`` on collection *table*.

    Args:
        table: collection name.
        conditions: filter document selecting what to update.
        value: update document (for example ``{'$set': {...}}``).
        s_upsert: forwarded as pymongo's ``upsert`` option.
        s_multi: forwarded as pymongo's ``multi`` option.

    Any exception is caught and its traceback printed; nothing is returned.
    """
    try:
        mongo = MongoConn()
        check_connected(mongo)
        options = {'upsert': s_upsert, 'multi': s_multi}
        mongo.db[table].update(conditions, value, **options)
    except Exception:
        print(traceback.format_exc())
def upsert_mary(table, datas):
    """Bulk upsert several documents into collection *table*, keyed by '_id'.

    For each document an ordered bulk operation is queued that matches on its
    '_id' and applies ``{'$set': document}`` with upsert enabled. Per the
    original notes, documents whose '_id' is not stored yet are inserted,
    and fields that do not exist yet are added.

    Args:
        table: collection name.
        datas: iterable of documents; every document must contain '_id'.

    An empty *datas* is a no-op. Any exception (including a document without
    '_id') is caught and its traceback printed; nothing is returned.
    """
    try:
        my_conn = MongoConn()
        check_connected(my_conn)
        bulk = my_conn.db[table].initialize_ordered_bulk_op()
        queued = False
        for data in datas:
            bulk.find({'_id': data['_id']}).upsert().update({'$set': data})
            queued = True
        # Executing a bulk with nothing queued is an error in pymongo, so an
        # empty input is skipped instead of ending in a printed traceback.
        if queued:
            bulk.execute()
    except Exception:
        print(traceback.format_exc())
def upsert_one(table, data):
    """Insert or update one document in collection *table*, keyed by '_id'.

    If no stored document matches ``data['_id']`` (an empty string is used
    as the lookup value when *data* has no '_id'), *data* is inserted.
    Otherwise every field of *data* except '_id' is written with ``$set``.

    Args:
        table: collection name.
        data: the document to store.

    Any exception is caught and its traceback printed; nothing is returned.
    """
    try:
        my_conn = MongoConn()
        check_connected(my_conn)
        collection = my_conn.db[table]
        query = {'_id': data.get('_id', '')}
        if not collection.find_one(query):
            collection.insert(data)
        else:
            # Build the $set payload from a filtered copy: popping '_id' from
            # *data* would change the caller's dict and would raise KeyError
            # when the caller passed no '_id' at all.
            fields = {key: val for key, val in data.items() if key != '_id'}
            if fields:
                collection.update(query, {'$set': fields})
            # else: *data* carried nothing but '_id', so there is nothing to set.
    except Exception:
        print(traceback.format_exc())
def find_one(table, value):
    """Return one document of collection *table* matching filter *value*.

    Returns whatever ``Collection.find_one`` returns. If an exception occurs
    its traceback is printed and ``None`` is returned.
    """
    found = None
    try:
        mongo = MongoConn()
        check_connected(mongo)
        found = mongo.db[table].find_one(value)
    except Exception:
        print(traceback.format_exc())
    return found
def find(table, value):
    """Return the result of ``Collection.find(value)`` on collection *table*.

    The result covers all documents matching filter *value*. If an exception
    occurs its traceback is printed and ``None`` is returned.
    """
    matches = None
    try:
        mongo = MongoConn()
        check_connected(mongo)
        matches = mongo.db[table].find(value)
    except Exception:
        print(traceback.format_exc())
    return matches
def select_colum(table, value, colum):
    """Query collection *table* with filter *value*, projecting field *colum*.

    Returns the result of ``Collection.find(value, {colum: 1})``. If an
    exception occurs its traceback is printed and ``None`` is returned.
    """
    selected = None
    try:
        mongo = MongoConn()
        check_connected(mongo)
        projection = {colum: 1}
        selected = mongo.db[table].find(value, projection)
    except Exception:
        print(traceback.format_exc())
    return selected
if __name__ == "__main__":
    # Load "<_id> <name>" pairs from a whitespace-separated text file. Both
    # values stay strings, so the '_id' "8" differs from the integer 8.
    file_path = 'testmon2.txt'
    company_list = []
    with open(file_path, "r") as in_file:
        for line in in_file:
            fields = line.split()
            if len(fields) < 2:
                # Blank or incomplete line: indexing it would raise IndexError.
                continue
            company_list.append({'_id': fields[0], 'name': fields[1]})
    print('-' * 15)
    # print(company_list)
    # upsert_mary('myt1', company_list)

    datas = [
        {'_id': 8, 'data': 88},
        {'_id': 9, 'data': 99},
        {'_id': 36, 'data': 3366},
    ]

    # Insert: the '_id' values must not exist yet, otherwise an error is
    # reported. (No error showed up here at first because one '_id' was the
    # number 8 and the other the string "8".)
    # insert('myt1', datas)

    # Insert or replace a single document.
    # data = {'_id': 6, 'data': 66}
    # save('myt1', data)

    # Update a document.
    # update('myt1', {'_id': 8}, {'$set': {'data': '888'}}, False, False)

    # Update or insert.
    # data = {'_id': 36, 'data': 'dsd'}
    # upsert_one('myt1', data)

    # Query, comparable to: select _id from myt1
    # res = select_colum('myt1', {}, '_id')
    # for k in res:
    #     for key, value in k.items():
    #         print('%s : %s' % (key, value))

    # Query, comparable to: select * from myt1
    # res = find('myt1', {})
    # for k in res:
    #     print(' '.join('%s : %s' % (key, value) for key, value in k.items()))

    # Query, comparable to: select * from myt1 limit 1
    res = find_one('myt1', {})
    # find_one gives None when nothing matched or the query failed, and
    # iterating None would raise TypeError.
    if res is not None:
        for k in res:
            print('%s : %s' % (k, res[k]))
# Index helpers. The original draft was disabled because it called the
# mongo-shell method names (ensureIndex / getIndexes / dropIndexes), which
# pymongo collections do not provide. The pymongo equivalents are used here.
def ensure_index(table, colum):
    """Create a descending index on field *colum* of collection *table*.

    Returns:
        The value returned by ``Collection.create_index``, or ``None`` when
        an exception occurred (its traceback is printed).
    """
    try:
        my_conn = MongoConn()
        check_connected(my_conn)
        return my_conn.db[table].create_index([(colum, pymongo.DESCENDING)])
    except Exception:
        print(traceback.format_exc())


def get_indexes(table):
    """Return the index information of collection *table*.

    Returns:
        The value returned by ``Collection.index_information()``, or ``None``
        when an exception occurred (its traceback is printed).
    """
    try:
        my_conn = MongoConn()
        check_connected(my_conn)
        return my_conn.db[table].index_information()
    except Exception:
        print(traceback.format_exc())


def drop_indexes(table, colum):
    """Drop the descending index on field *colum* of collection *table*.

    The index specifier matches the one built by ``ensure_index``.

    Returns:
        The value returned by ``Collection.drop_index``, or ``None`` when an
        exception occurred (its traceback is printed).
    """
    try:
        my_conn = MongoConn()
        check_connected(my_conn)
        return my_conn.db[table].drop_index([(colum, pymongo.DESCENDING)])
    except Exception:
        print(traceback.format_exc())
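########## The code below comes from a tests folder (MongoDB tests written against odo). It relies on pytest fixtures, which looks neat, but I could not get it to run, so it is left unexplored for now.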
from __future__ import absolute_import, division, print_function
import pytest
pymongo = pytest.importorskip('pymongo')
import os
from odo import discover, convert, append, resource, dshape, odo
from toolz import pluck
from copy import deepcopy
from bson.objectid import ObjectId
@pytest.fixture(scope='module')
def mongo_host_port():
    """Return the ``(host, port)`` of the MongoDB server used by the tests.

    The MONGO_IP and MONGO_PORT environment variables override the defaults
    'localhost' and 27017. The port is always returned as an ``int``.
    """
    import os
    host = os.environ.get('MONGO_IP', 'localhost')
    # Environment values are strings; convert so an overridden port has the
    # same type as the integer default.
    port = int(os.environ.get('MONGO_PORT', 27017))
    return (host, port)
@pytest.fixture(scope='module')
def conn(mongo_host_port):
    """Return a ``pymongo.MongoClient`` for the configured host and port.

    When creating the client raises ``ConnectionFailure``, the dependent
    tests are skipped with the reason 'No mongo server running'.
    """
    host, port = mongo_host_port
    try:
        client = pymongo.MongoClient(host=host, port=port)
    except pymongo.errors.ConnectionFailure:
        pytest.skip('No mongo server running')
    else:
        return client
@pytest.yield_fixture
def db(conn):
    """Yield the scratch database '_test_db' and drop it after the test."""
    # Item access instead of ``conn._test_db``: newer pymongo clients refuse
    # attribute access for names that start with an underscore, while
    # ``conn['_test_db']`` works on old and new versions alike.
    try:
        yield conn['_test_db']
    finally:
        conn.drop_database('_test_db')
@pytest.fixture
def raw_bank():
    """Return a fresh list of five account records (name, amount)."""
    rows = (
        ('Alice', 100),
        ('Alice', 200),
        ('Bob', 100),
        ('Bob', 200),
        ('Bob', 300),
    )
    return [{'name': name, 'amount': amount} for name, amount in rows]
@pytest.yield_fixture
def bank(db, raw_bank):
    """Yield the 'bank' collection filled with a copy of *raw_bank*.

    The collection is dropped again once the test has finished.
    """
    # A deep copy is inserted so the raw_bank fixture value is not the
    # object handed to insert() (presumably because insert may modify the
    # documents it receives; confirm against pymongo).
    records = deepcopy(raw_bank)
    db.bank.insert(records)
    try:
        yield db.bank
    finally:
        db.drop_collection('bank')
@pytest.yield_fixture
def empty_bank(db):
    """Yield the 'empty_bank' collection and drop it after the test.

    Nothing is inserted by this fixture itself.
    """
    name = 'empty_bank'
    try:
        yield db[name]
    finally:
        db.drop_collection(name)
# Datashape describing the bank records: a variable-length sequence of
# {name: string, amount: int} rows.
# NOTE(review): not referenced by the tests visible here (test_append_convert
# assigns its own local ``ds``); confirm whether it is still needed.
ds = dshape('var * {name: string, amount: int}')
def test_discover(bank, raw_bank):
    """discover() on the collection equals discover() on the raw records."""
    from_collection = discover(bank)
    from_records = discover(raw_bank)
    assert from_collection == from_records
def test_discover_empty_db(db):
    """A database without collections is discovered with no field names."""
    # NOTE: nothing has been added to the database because our fixtures
    # create a new db and collections for each test that depends on them,
    # so 'system.indexes' won't exist here yet.
    # The same is true in test_resource_db.
    names = discover(db).measure.names
    assert names == []
def test_discover_db(bank, db):
    """Once the bank fixture has run, discover(db) lists a 'bank' entry."""
    discovered_names = set(discover(db).measure.names)
    assert 'bank' in discovered_names
def test_resource_db(mongo_host_port):
    """resource() on a database URI yields the '_test_db' database, still empty."""
    host, port = mongo_host_port
    uri = 'mongodb://{}:{}/_test_db'.format(host, port)
    db = resource(uri)
    assert db.name == '_test_db'
    names = discover(db).measure.names
    assert names == []
def test_resource_collection(mongo_host_port):
    """resource() on a 'db::collection' URI yields that collection.

    Checks the collection name, its database name, and the host and port of
    the underlying connection.
    """
    host, port = mongo_host_port
    uri = 'mongodb://{}:{}/db::mycoll'.format(host, port)
    coll = resource(uri)
    assert coll.name == 'mycoll'
    database = coll.database
    assert database.name == 'db'
    assert coll.database.connection.host == host
    assert coll.database.connection.port == port
def test_append_convert(empty_bank, raw_bank):
    """Records appended to an empty collection read back as matching tuples."""
    ds = discover(raw_bank)
    assert set(ds.measure.names) == {'name', 'amount'}

    append(empty_bank, raw_bank, dshape=ds)

    stored = odo(empty_bank, list, dshape=ds)
    # Expected rows: the discovered fields plucked from each raw record.
    expected = list(pluck(ds.measure.names, raw_bank))
    assert stored == expected
@pytest.yield_fixture
def multiple_object_ids(db):
    """Yield a collection of two documents that each hold an extra ObjectId.

    Every document has integer fields 'x' and 'y' plus an 'other' field
    containing an ``ObjectId``. The collection is dropped after the test.
    """
    specs = ((1, 2, '1'), (3, 4, '2'))
    data = [
        {'x': x, 'y': y, 'other': ObjectId(digit * 24)}
        for x, y, digit in specs
    ]
    db.multiple_object_ids.insert(data)
    try:
        yield db.multiple_object_ids
    finally:
        db.drop_collection('multiple_object_ids')
def test_multiple_object_ids(multiple_object_ids):
    """Only the x and y fields are expected in the shape and the rows."""
    discovered = discover(multiple_object_ids)
    assert discovered == dshape('2 * {x: int64, y: int64}')
    rows = convert(list, multiple_object_ids)
    assert rows == [(1, 2), (3, 4)]