oslo.db 是 oslo 系列工具库中专门用于管理数据库连接和会话的工具库,主要是对 SQLAlchemy 做进一步的封装。
GitHub 仓库: https://github.com/openstack/oslo.db
使用:
一、定义models:
from sqlalchemy import Column, Integer, String, Float, SmallInteger, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import backref, column_property, relationship, validates
BASE = declarative_base()
class Country(BASE):
    """Declarative model mapped to the ``country`` table.

    ``Code`` is the primary key and is the target of the foreign key
    ``city.CountryCode``.
    """

    __tablename__ = "country"

    Code = Column(String(3), nullable=False, primary_key=True)
    Name = Column(String(52), nullable=False)
    Continent = Column(String(25), nullable=False)
    Region = Column(String(26), nullable=False)
    # The second positional argument of ``Float`` is ``asdecimal``, not a
    # scale, so ``Float(10, 2)`` silently made results come back as Decimal.
    # The precision is passed by keyword instead. The generic Float type has
    # no scale parameter; the original intent was a scale of 2.
    SurfaceArea = Column(Float(precision=10), nullable=False)
    IndepYear = Column(SmallInteger, nullable=False)
    Population = Column(Integer, nullable=False)
    # Originally written as Float(3, 1); see the note on SurfaceArea.
    LifeExpectancy = Column(Float(precision=3))
    GNP = Column(Float(precision=10))
    GNPOld = Column(Float(precision=10))
    LocalName = Column(String(45), nullable=False)
    GovernmentForm = Column(String(45), nullable=False)
    HeadOfState = Column(String(60))
    Capital = Column(Integer)
    Code2 = Column(String(2), nullable=False)
class City(BASE):
    """Declarative model mapped to the ``city`` table.

    ``ID`` is the primary key; ``CountryCode`` references ``country.Code``.
    """

    __tablename__ = "city"

    ID = Column(Integer, primary_key=True)
    Name = Column(String(35), nullable=False)
    CountryCode = Column(
        String(3),
        ForeignKey("country.Code"),
        nullable=False,
    )
    District = Column(String(20), nullable=False)
    Population = Column(Integer, nullable=False)
二、常用操作
# -*- coding:utf-8 -*-
from oslo_db import exception as db_exc
from oslo_db import options
from oslo_db.sqlalchemy import session as db_session
import sqlalchemy
from sqlalchemy import MetaData
from sqlalchemy import or_, and_, case
from sqlalchemy.orm import joinedload, joinedload_all, undefer_group
from sqlalchemy.orm import RelationshipProperty
from sqlalchemy import sql
from sqlalchemy.sql.expression import bindparam
from sqlalchemy.sql.expression import desc
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql.expression import true
from sqlalchemy.sql import func
from sqlalchemy.sql import sqltypes
from sqlalchemy import text
from models import City, Country
_FACADE = None
def _create_facade_lazily():
    """Return the shared ``EngineFacade``, creating it on the first call.

    The instance is cached in the module-level ``_FACADE`` so later calls
    reuse it instead of building a new one.

    :return: the cached ``db_session.EngineFacade`` instance
    """
    global _FACADE
    if _FACADE is not None:
        return _FACADE
    # NOTE(review): the connection URL, including credentials, is hard-coded.
    connection_url = 'mysql+pymysql://root:wyue@localhost:3306/world?charset=latin1'
    _FACADE = db_session.EngineFacade(connection_url)
    return _FACADE
def get_engine():
    """Return the engine exposed by the shared facade.

    :return: the result of ``EngineFacade.get_engine()``
    """
    return _create_facade_lazily().get_engine()
def get_session(**kwargs):
    """Obtain a session from the shared facade.

    :param kwargs: forwarded unchanged to ``EngineFacade.get_session``
    :return: the result of ``EngineFacade.get_session(**kwargs)``
    """
    return _create_facade_lazily().get_session(**kwargs)
def dispose_engine():
    """Call ``dispose()`` on the engine returned by ``get_engine``."""
    engine = get_engine()
    engine.dispose()
def print_city_info(city=None, country=None):
    """Print selected fields of a city and/or a country as one dict.

    :param city: object exposing ``ID``, ``Name``, ``CountryCode``,
        ``District`` and ``Population``; skipped when falsy
    :param country: object exposing ``Code``, ``Name``, ``Continent``,
        ``Region``, ``SurfaceArea`` and ``IndepYear``; skipped when falsy
    :return: None
    """
    info = {}
    if city:
        info['city'] = {
            'id': city.ID,
            'name': city.Name,
            'country_code': city.CountryCode,
            'district': city.District,
            'population': city.Population,
        }
    if country:
        info['country'] = {
            'code': country.Code,
            'name': country.Name,
            'continent': country.Continent,
            'region': country.Region,
            'surfaceArea': country.SurfaceArea,
            'IndepYear': country.IndepYear,
        }
    # The call form of print is valid on both Python 2 and Python 3.
    print(info)
def query_city_all():
    """Load every ``City`` row and print it.

    Prints the generated SQL, then each city via ``print_city_info``.

    :return: None
    """
    session = get_session()
    with session.begin():
        query = session.query(City)
        print('SQL : %s' % str(query))
        for city in query.all():
            print_city_info(city)
def query_city_filter1():
    """Query cities with a single ``filter`` condition (``Name == 'Gaza'``).

    Prints the generated SQL, then each matching city.

    :return: None
    """
    session = get_session()
    with session.begin():
        query = session.query(City).filter(City.Name == 'Gaza')
        print('SQL : %s' % str(query))
        for city in query.all():
            print_city_info(city)
def query_city_filter2():
    """Query cities with several equality conditions via ``filter_by``.

    The conditions are kept in a dict and expanded as keyword arguments.
    Prints the generated SQL, then each matching city.

    :return: None
    """
    session = get_session()
    with session.begin():
        condition = {'CountryCode': 'GBR', 'District': 'England'}
        query = session.query(City).filter_by(**condition)
        print('SQL : %s' % str(query))
        for city in query.all():
            print_city_info(city)
def query_city_with_AND():
    """Query cities by combining conditions with ``and_``.

    Prints the generated SQL, then each matching city.

    :return: None
    """
    session = get_session()
    with session.begin():
        condition = [
            City.CountryCode == 'GBR',
            City.District == 'England',
            City.Name == 'Birkenhead',
        ]
        query = session.query(City).filter(and_(*condition))
        print('SQL : %s' % str(query))
        for city in query.all():
            print_city_info(city)
def query_city_with_OR():
    """Query cities by combining conditions with ``or_``.

    Prints the generated SQL, then each matching city.

    :return: None
    """
    session = get_session()
    with session.begin():
        condition = [City.District == 'England', City.Name == 'Birkenhead']
        query = session.query(City).filter(or_(*condition))
        print('SQL : %s' % str(query))
        for city in query.all():
            print_city_info(city)
def query_city_with_AND_OR():
    """Query cities with an ``or_`` clause nested inside an ``and_`` clause.

    Prints the generated SQL, then each matching city.

    :return: None
    """
    session = get_session()
    with session.begin():
        name_or_population = or_(
            City.Name == 'Maidstone',
            City.Population == 91069,
        )
        query = session.query(City).filter(
            and_(City.CountryCode == 'GBR', name_or_population)
        )
        print('SQL : %s' % str(query))
        for city in query.all():
            print_city_info(city)
def query_city_with_IN():
    """Query cities whose ``ID`` is in a fixed list, using ``in_``.

    Prints the generated SQL, then each matching city.

    :return: None
    """
    session = get_session()
    with session.begin():
        ids = [1, 2, 3, 5, 6, 8]
        query = session.query(City).filter(City.ID.in_(ids))
        print('SQL : %s' % str(query))
        for city in query.all():
            print_city_info(city)
def query_city_with_LIKE():
    """Pattern-match city names with ``like`` (names starting with ``He``).

    Prints the generated SQL, then each matching city.

    :return: None
    """
    session = get_session()
    with session.begin():
        query = session.query(City).filter(City.Name.like('He%'))
        print('SQL : %s' % str(query))
        for city in query.all():
            print_city_info(city)
def query_city_with_union():
    """Combine two city queries with ``union``.

    The first query matches names starting with ``He``; the second matches a
    fixed list of IDs. Prints the generated SQL, then each resulting city.

    :return: None
    """
    session = get_session()
    with session.begin():
        by_name = session.query(City).filter(City.Name.like('He%'))
        ids = [1, 2, 3, 5, 6, 8]
        by_id = session.query(City).filter(City.ID.in_(ids))
        query = by_name.union(by_id)
        print('SQL : %s' % str(query))
        for city in query.all():
            print_city_info(city)
def query_city_with_exists():
    """Query cities with an ``EXISTS`` condition.

    The ``exists()`` clause requires a country named ``Aruba`` whose code
    equals the city's ``CountryCode``. Prints the generated SQL, then each
    matching city.

    :return: None
    """
    session = get_session()
    with session.begin():
        in_aruba = sql.exists().where(and_(
            City.CountryCode == Country.Code,
            Country.Name == 'Aruba',
        ))
        query = session.query(City).filter(in_aruba)
        print('SQL : %s' % str(query))
        for city in query.all():
            print_city_info(city)
def query_city_country_join():
    """Join ``City`` to ``Country`` and print the first matching row.

    Selects the city name and country name for the country named ``Aruba``.
    Prints the generated SQL, then the result of ``query.first()``.

    :return: None
    """
    session = get_session()
    with session.begin():
        query = (
            session.query(City.Name, Country.Name)
            .join(Country)
            .filter(Country.Name == 'Aruba')
        )
        print('SQL : %s' % str(query))
        print(query.first())
def query_city_group_by():
    """Aggregate cities per country with ``GROUP BY`` / ``HAVING``.

    Selects the country code, the city count and the summed population,
    keeping groups with more than 10 cities and a total population above
    85699060. Prints the generated SQL, then the list of result rows.

    :return: None
    """
    session = get_session()
    with session.begin():
        city_count = func.count(City.ID)
        total_population = func.sum(City.Population)
        query = (
            session.query(City.CountryCode, city_count, total_population)
            .group_by(City.CountryCode)
            .having(and_(
                func.count(City.ID) > 10,
                func.sum(City.Population) > 85699060,
            ))
        )
        print('SQL : %s' % str(query))
        print(query.all())
def query_city_sub_query():
    """Select countries whose code appears in an aggregated city subquery.

    Generated SQL (column list abbreviated)::

        SELECT country."Code" AS "country_Code", ...
        FROM country
        WHERE country."Code" IN (SELECT city."CountryCode"
        FROM city GROUP BY city."CountryCode"
        HAVING count(city."ID") > :count_1 AND sum(city."Population") > :sum_1)

    Prints the generated SQL, then each matching country.

    :return: None
    """
    session = get_session()
    with session.begin():
        stmt = (
            session.query(City.CountryCode)
            .group_by(City.CountryCode)
            .having(and_(
                func.count(City.ID) > 10,
                func.sum(City.Population) > 85699060,
            ))
            .subquery()
        )
        query = session.query(Country).filter(Country.Code.in_(stmt))
        print('SQL : %s' % str(query))
        for country in query.all():
            print_city_info(country=country)
def query_city_with_sql():
    """Run a hand-written SQL statement through ``from_statement``.

    The statement counts cities per country code and keeps codes with more
    than 20 cities. Prints the generated SQL, then one line per result row.

    :return: None
    """
    session = get_session()
    with session.begin():
        qrySql = "select CountryCode as country_code, count(ID) cnt from city group by CountryCode having count(ID) > 20"
        # NOTE(review): plain string column names in query() may need explicit
        # column()/text() constructs on newer SQLAlchemy - confirm against the
        # installed version.
        query = session.query('country_code', 'cnt').from_statement(text(qrySql))
        print('SQL : %s' % str(query))
        for row in query.all():
            # tuple() makes the two-value formatting independent of whether
            # the row type is a real tuple subclass.
            print('country_code : %s , cnt : %s ' % tuple(row))
def query_city_order_limit():
    """Demonstrate ordering and paging with ``order_by``/``limit``/``offset``.

    Orders cities by ``ID`` with a limit of 10 and an offset of 5. Prints
    the generated SQL, then each returned city.

    :return: None
    """
    session = get_session()
    with session.begin():
        query = session.query(City).order_by(City.ID).limit(10).offset(5)
        print('SQL : %s' % str(query))
        for city in query.all():
            print_city_info(city)
def update_city():
    """Demonstrate two ways of updating the city with ``ID == 1``.

    First runs ``Query.update``, then assigns to an attribute of the loaded
    object. The row is printed before the update and after each step.

    :return: None
    """
    session = get_session()
    with session.begin():
        print('before update:')
        query = session.query(City).filter(City.ID == 1)
        print('SQL : %s' % str(query))
        print_city_info(query.first())

        print('1 after update:')
        # Approach 1: update through the query.
        query.update({City.Population: 4022222})
        print_city_info(query.first())

        print('2 after update:')
        # Approach 2: assign to the attribute of the loaded object.
        city = query.first()
        city.Population = 10
        print_city_info(query.first())
def add_city():
    """Insert a new city and print all cities whose name starts with ``SanMing``.

    :return: None
    """
    session = get_session()
    with session.begin():
        new_city = City()
        new_city.Name = 'SanMing2'
        new_city.CountryCode = 'CHN'
        new_city.District = 'Fujian'
        new_city.Population = 160691
        session.add(new_city)

        query = session.query(City).filter(City.Name.like('SanMing%'))
        # A separate loop name avoids rebinding the object added above.
        for city in query.all():
            print_city_info(city)
def delete_city():
    """Delete cities named ``SanMing2`` and print the remaining ``SanMing%`` cities.

    :return: None
    """
    session = get_session()
    with session.begin():
        # The value returned by delete() was never used, so it is not kept.
        session.query(City).filter(City.Name == 'SanMing2').delete()
        query = session.query(City).filter(City.Name.like('SanMing%'))
        for city in query.all():
            print_city_info(city)
def delete_city_in():
    """Delete cities selected with an ``IN`` condition.

    Prints the IDs of cities whose name starts with ``SanMin``, then deletes
    the cities whose ``ID`` is in ``range(4086, 4096)``.

    :return: None
    """
    session = get_session()
    with session.begin():
        print('before delete')
        matching_ids = session.query(City.ID).filter(City.Name.like('SanMin%'))
        print(matching_ids.all())
        # Author's note: by default delete() tries to remove the matching
        # objects from the session as well, and that step apparently does not
        # support the IN operator, so it fails. The workaround is to skip the
        # synchronization when deleting and expire the session's entities
        # afterwards. update() takes the same parameter; when a commit follows
        # immediately, synchronize_session=False is also faster.
        session.query(City).filter(City.ID.in_(range(4086, 4096))).delete(synchronize_session=False)
def query_city_with_between():
    """Query cities whose ``ID`` lies between 8 and 10, using ``between``.

    This example was previously also named ``query_city_with_IN``, which
    replaced the real IN example defined earlier in the file.

    Generated SQL::

        SELECT city."ID" AS "city_ID", city."Name" AS "city_Name", city."CountryCode" AS "city_CountryCode", city."District" AS "city_District", city."Population" AS "city_Population"
        FROM city
        WHERE city."ID" BETWEEN :ID_1 AND :ID_2

    Prints the generated SQL, then each matching city.

    :return: None
    """
    session = get_session()
    with session.begin():
        query = session.query(City).filter(City.ID.between(8, 10))
        print('SQL : %s' % str(query))
        for city in query.all():
            print_city_info(city)
if __name__ == "__main__":
    # Script entry point: runs the IN-based delete example.
    delete_city_in()
三、遇到的错误:
错误日志:
oslo_db.exception.DBError: (pymysql.err.InternalError) (1093, u"You can't specify target table 'city' for update in FROM clause")
[SQL: u'DELETE FROM city WHERE city.`ID` IN (SELECT city.`ID` \nFROM city \nWHERE city.`Name` = %(Name_1)s)'] [parameters: {u'Name_1': 'SanMing2'}]
出错的代码:
session = get_session()
with session.begin():
    # Failing example: the subquery selects from ``city``, the same table the
    # DELETE targets, and MySQL rejects it with error 1093 (see the log above).
    stmt = session.query(City.ID).filter(City.Name == 'SanMing2').subquery()
    session.query(City).filter(City.ID.in_(stmt)).delete(synchronize_session=False)
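原因:
MySQL 的 "You can't specify target table for update in FROM clause" 错误的意思是:不能在同一条语句中,先从某张表 SELECT 出一些值,再对同一张表中的这些记录执行 UPDATE 或 DELETE。
解决办法:先单独查询出要删除的 ID 列表,再用 in_(ID 列表) 执行删除;或者把子查询再包一层派生表(SELECT ... FROM (子查询) AS tmp),使其不再直接引用被修改的表。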