使用 sqlalchemy 有一段时间了,简单总结下使用方法和常用的查询操作。
初始化
主要包含数据库连接、表创建、创建会话以及通过上下文管理器来管理会话的开启与关闭。
from contextlib import contextmanager
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 连接数据库
engine = create_engine("sqlite:///test.db")
Base = declarative_base()
class User(Base):
__tablename__ = "user"
id = Column(Integer, primary_key=True, autoincrement=True, comment="自增主键")
username = Column(String, nullable=False, comment="用户名")
password = Column(String, nullable=False, comment="密码")
age = Column(Integer, comment="年龄")
country_id = Column(Integer, comment="国家id")
class Country(Base):
__tablename__ = "country"
id = Column(Integer, primary_key=True, autoincrement=True, comment="自增主键")
name = Column(Integer, nullable=False, comment="国家名")
# 创建数据表(所有继承了 Base 的表都会被创建)
Base.metadata.create_all(engine)
# 创建当前数据库的 Session 对象
Session = sessionmaker(engine)
# 创建会话对象,会话结束后需要关闭
session = Session()
# 关闭会话
session.close()
# 可以定义一个上下文管理器,实现会话的开启与关闭
@contextmanager
def open_session():
"""
Usage:
with open_session() as session:
# your sql code
"""
session = Session()
try:
yield session
session.commit()
except:
session.rollback()
raise
finally:
session.close()
基本查询
主要记录一下常用的几种查询:and、or、between、连表查询、分组查询还有排序以及对应的 sql 语句(还是原生 sql 写起来顺手一些 - -)。
# 1. and
# select * from user where age > 20 and id > 5;
session.query(User).filter(User.age > 20, User.id > 5).all()
# 2. or
# select * from user where age = 18 or id > 10;
from sqlalchemy import or_
session.query(User).filter(or_(User.age == 18, User.id > 10)).all()
# 3. between
# select * from user where age between 18 and 30;
session.query(User).filter(User.age.between(18, 30)).all()
# 3. 连表查询
# select u.username, u.age, c.name from user u join country c on u.country_id=c.id where u.age >= 18;
session.query(
User.username, User.age, Country.name).join(Country, User.country_id == Country.id).filter(User.age >= 18).all()
# 4. 分组查询
# select country_id, count(1) user_cnt from user group by country_id;
from sqlalchemy import func
session.query(User.country_id, func.count(1).label("user_cnt")).group_by(User.country_id).all()
# 5. 排序
# select * from user where country_id = 1 order by id desc;
session.query(User).filter(User.country_id == 1).order_by(User.id.desc()).all()