SQLAlchemy 入门教程
前言
目前,许多主流的语言,都实现了对象关系映射(Object Relational Mapper
,简称ORM
)的库包。ORM
的主要功能是将数据库表中的每条记录映射成一个对象。所有的数据库操作,都转化为对象的操作。这样可以增加代码的可读性和安全性。
ORM
优点:
- 简洁易读:将数据表抽象为对象(数据模型),更直观易读。
- 可移植:封装了多种数据库引擎,面对多个数据库,操作基本一致,代码易维护。
- 更安全:有效避免
SQL
注入。
当然性能上会低于直接执行SQL
语句,本文介绍SQLAlchemy
的一些基础操作。
1. 建立连接
任何SQLAlchemy
应用程序的开始都是一个名为engine
. 此对象充当连接到特定数据库的中心源,提供工厂和称为 connection pool
对于这些数据库连接。引擎通常是一个只为特定数据库服务器创建一次的全局对象,并使用一个URL
字符串进行配置,该字符串将描述如何连接到数据库主机或后端。
# dialect[+driver]://user:password@host/dbname[?key=value..]
engine = create_engine("mysql://scott:tiger@hostname/dbname", encoding='latin1', echo=True)
创建engine
的URL
格式为dialect[+driver]://user:password@host/dbname[?key=value..]
,其中dialect
表示数据库类型例如:mysql
、oracle
、postgresql
等,而driver
代表使用的数据库API
如:psycopg2
、pyodbc
等。
create_engine.echo
:设置为True
时会打印日志。可以查看调用的具体SQL
语句方便调试。create_engine.future
:标志设置为True
以便我们充分利用 2.0 style ,1.x
-->2.0
最主要的API
更改是从select()
改使用Query
对象。create_engine.pool_size
:连接池的大小默认为5个,设置为0时表示连接无限制。create_engine.pool_recycle
:设置时间以限制数据库多久没连接自动断开。
# python3 下使用pymysql连接mysql。
engine = create_engine(
"mysql+pymysql://user:pwd@host/dbname",
echo=True,
pool_size=8,
pool_recycle=3600
)
# 连接SQLite
engine = create_engine("sqlite+pysqlite:///:memory:", echo=True, future=True)
2. ORM的会话
使用ORM
时,基本的事务/数据库交互对象称为Session
。
在SQLAlchemy
中,这个对象通常传递我们给它的SQL
语句,它管理ORM
映射对象的持久性操作。
Session
的主要目的是建立与数据库的会话,它维护你加载和关联的所有数据库对象。它是数据库查询Query
的一个入口。
Session
通常在我们需要对数据库进行操作时创建。
一旦一个Session
创建成功,我们在这个Session
下完成多个事务(transaction
)。
究竟何时创建和关闭Session
,不能一概而论,但是一个原则是Session
不应该在操作事务的方法中创建。
sessionmaker
函数是配置Session
的工厂,通过它建立的配置参数创建产生新的Session
。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
some_engine = create_engine('mysql+pymysql://username:password@localhost/mydb?charset=utf8')
ession = sessionmaker(bind=some_engine)
session = Session()
Session
的常见操作方法包括:
flush
:预提交,提交到数据库文件,还未写入数据库文件中。commit
:提交了一个事务。rollback
:回滚。close
:关闭会话。
3. 创建模型
前面有提到ORM
的重要特点,那么我们操作表的时候就需要通过操作对象来实现,现在我们来创建一个类,以常见的用户表举例:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String, Integer, SMALLINT, DECIMAL, Enum, TEXT, TIMESTAMP
Base = declarative_base()
class Users(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True)
email = Column(String(64))
def __init__(self, name, email):
self.name = name
self.email = email
declarative_base()
是sqlalchemy
内部封装的一个方法,通过其构造一个基类,这个基类和它的子类,可以将Python
类和数据库表关联映射起来。
数据库表模型类通过__tablename__
和表关联起来,Column
表示数据表的列。
4. 生成表
Base.metadata.create_all(engine)
创建表,如果存在则忽略,执行以上代码,就会发现在数据库中创建了users
表。
5. 抽象模型
现在我们修改之前的用户表,加入新字段和一张订单表。
使用了一个基类,定义了共同的字段:id
和创建更新时间。值得注意的是基类中设置了__abstract__
如果不设置将会报错。
order
中定义了一个枚举类型,同时order
和user
使用user_id
进行关联并未使用外键。
import enum
from sqlalchemy import Column, String, Integer, SMALLINT, FLOAT, DECIMAL, Enum
from sqlalchemy.sql.functions import current_timestamp, current_time
from orm.base_model import BaseTimestampModel
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class PayType(enum.Enum):
alipay = 0
unionpay = 1
weixin = 2
balance = 3
combo = 4
package = 5
company = 6
offline = 7
class BaseModel(Base):
__abstract__ = True
id = Column(Integer, primary_key=True, autoincrement=True)
created_at = Column(TIMESTAMP, nullable=False, default=current_timestamp())
updated_at = Column(TIMESTAMP, nullable=False, default=current_timestamp()
class User(BaseModel):
__tablename__ = 'user'
mobile = Column(String)
nickname = Column(String)
status = Column(SMALLINT, default=1)
appid = Column(String, nullable=True)
def __init__(self, nickname, mobile):
self.nickname = nickname
self.mobile = mobile
class Order(BaseModel):
__tablename__ = 'order'
user_id = Column(Integer)
ordersn = Column(String, unique=True)
order_type = Column(SMALLINT, default=2)
pay_type = Column(Enum(PayType), default=PayType.alipay)
price = Column(DECIMAL)
6. 新增数据
add_user = User("test", "1351232322")
session.add(add_user)
session.commit()
session.add()
将会把Model
加入当前session
维护的持久空间(可以从session.dirty
看到)中,直到commit
时提交到数据库。
- Q1:
add
之后如何直接返回对象的属性?
可以在add
之后执行db.session.flush()
,这样便可在session
中get
到对象的属性。
- Q2:如何进行批量插入,性能比较?
批量插入共有以下几种方法,对它们的批量做了比较,分别是:
session.add_all()
< bulk_save_object()
< bulk_insert_mappings()
< SQLAlchemy_core()
7. 查询数据
查询是最常用的一个操作了,举个最简单的查询例子:
# 检索id = 1
user = session.query(Users).filter(id == 1).first()
# 检索全部用户
users = session.query(Users).all()
使用Join
查询
# 未使用外键
q = session.query(User).join(Order, User.id==Order.user_id)
# 方法2
q = session.query(User).join(Order, Order.user_id)
# 配置了外键
q = session.query(User).join(Order)
下一个例子使用SQL
中的函数和Left join
,检索当日下单用户昵称和订单信息。
from sqlalchemy.sql.functions import current_date
result = session.query(Order, User.nickname).outerjoin(
User, User.id == Order.user_id
).filter(
Order.order_type != 2,
sqlalchemy.func.date(Order.created_at) == current_date
).all()
使用 in
、or
查询语句。
# 检索id在[1, 2, 3]中的用户。
r = session.query(User).filter(User.id.in_([1,2,3])).all()
# 使用or
query = bus_session.query(User)
filters = [User.id.in_([1, 2, 3]), User.nickname.ilike("ab%")]
r = query.filter(sqlalchemy.or_(*filters)).all()
8. 更新数据
更新数据有两种方法,一种是使用query
中的update
方法:
session.query(User).filter_by(id=1).update({'name': "Jack"})
另一种是操作对应的表模型:
user = session.query(User).filter_by(name="Jack").first()
user.name = "test"
session.add(users)
session.commit()
9. 删除数据
和更新数据类似,删除数据也有两种方法,第一种:
user = session.query(User).filter(User.name == "test").first()
session.delete(user)
session.commit()
批量删除的时候建议使用第二种:
session.query(User).filter(User.name == "test").delete()
session.commit()