Learning SQLAlchemy
Basic knowledges
-
SQLAlchemy 有两个主要部分:
- Core(Relational Model focused)
- Object Relational Model(ORM)(User Data Model focused)
-
SQLAlchemy提供了一种统一的方式来操作不同的数据库,包括:SQLite,PostgreSQL,MySQL,MS SQL,Oracle……
from sqlalchemy import create_engine engine = create_engine("URL") connection = engine.connect() # URL的结构:dialaect[+driver]://user:password@host:port/dbname # dialaect:数据库的名称,比如mysql,oracle,postgresql…… # e.g."mysql://scott:tiger@localhost/test # e.g."sqlite:///abc.sqlite"
-
Engine:SQLAlchemy与其他数据库交互的接口
-
Connection string(URL)
:提供寻找(登陆)数据库所必须的所有详细信息 -
engine.table_names()
:返回数据库中所有表的名称的列表
-
Reflection:读取数据库,构建SQLAlchemy表格对象
from sqlalchemy import MetaData, Table
# MetaData:存储数据库信息(比如各种表格)的目录
metadata = MetaData()
#第一个参数传入数据库的名称
# autoload参数默认为False,此时可以手动定义和添加column对象,若参数设定为True,则自动从数据库中导出column对象,导出的对象可能会替换我们设定的column对象
census = Table('census',metadata,autoload=True,autoload_with=engine)
#使用repr()功能来预览表格的细节,可以查到列名、列数据的类型
print(repr(census))
print(metadata.tables) #以字典的形式返回metadata中的所有表格
print(metadata.tables['census']) #等价于repr(census)
# databasename.columns.keys() 返回列名组成的列表
print(census.columns.keys())
- 基本的SQL querying:
from seqalchemy import create_engine
engine = create_engine('sqlite:///census.sqlite')
connection = engine.connect()
stmt = 'SELECT * FROM people'
result_proxy = connection.execute(stmt)
results = result_proxy.fetchall()
#输出行
first_row = results[0]
print(first_row)
>> ('Illinois','M',0,89600,95012)
#输出该行数据对应的列名,使用.keys()方法,返回结果是列表
print(first_row.keys())
>>['state','sex','age','pop2000','pop2008']
#输出具体某一列的数值
print(first_row.state)
- 使用SQLAlchemy querying:
- 创建
engine
- 创建
connection
- 创建
metadata
reflection table
- 选择
query
的方法 -
execute
&fetchall()
-
execute
的结果是ResultProxy
-
fetchall()
的结果是ResultSet
,是真实的数据值
-
- 创建
from sqlalchemy import create_engine, MetaData, Table, select
engine = create_engine('sqlite:///census.sqlite')
connection = engine.connect()
metadata = MetaData()
census = Table('census',metadata,autoload=True,autoload_with=engine)
stmt = select([census]) #select的参数是个list,这里仅包含了census一个元素
results = connection.execute(stmt).fetchall()
#读取第一行第一列的数据
print(results[0][0]) or print(results[0]['column name'])
Applying Filtering, Ordering and Grouping to Queries
Filter
- 使用
where
方法来进行条件过滤
stmt = select([census])
stmt = stmt.where(census.columns.state == 'California')
results = connection.execute(stmt).fetchall()
for result in results:
print(result.state, result.age)
- 复杂的条件判断
-
in_(),like(),between()
:-
in_()
:匹配列表中的值 -
like()
:匹配通配符的部分值 -
between()
:检查返回值是否在提供的区间内
-
- 这些表达式都是列对象的方法
-
stmt = selcet([census])
stmt = stmt.where(census.columns.state.startwith('New'))
for result in connection.execute(stmt): #SQLAlchemy的特性,可以直接使ResultProxy作为循环的目标
print(result.state, result.pop2000)
-
连词的使用:
and_(),not_(),or_()
- 注意使用前需要导入算子,
and_()
和or_()
方法也可以使用|
和&
算子来实现,但是要记得不同条件要用括号括起来
e.g.1
- 注意使用前需要导入算子,
from sqlalchemy import or_
stmt = select([census])
stmt = stmt.where(
or_(census.columns.state == 'California',
census.columns.state == 'New York')
)
#写法二:
#stmt = stmt.where(
# (census.columns.state == 'California') |
# (census.columns.state == 'New York')
#)
for result in connection.execute(stmt):
print(result.state, result.sex)
e.g.2
stmt = select([census])
#查找名字在列表states中的州
stmt = stmt.where(census.columns.state.in_(states))
e.g.3
#查找纽约州年纪为21岁或37岁的数据
stmt = select([census])
stmt = stmt.where(
and_(census.columns.state == 'New York',
or_(census.columns.age == 21,
census.columns.age == 37)
)
)
-
输出结果排序:
简单排序:
order_by()
语句,默认按升序排序,对于字母,即按字母表顺序排序-
e.g.
stmt = select([census.columns.state]) stmt = stmt.order_by(census.columns.state) #降序排序 from sqlalchemy import desc stmt = stmt.order_by(desc(census.columns.state))
复杂排序:在
order_by()
语句中传入多列,用逗号隔开,按传入列的先后顺序排序-
e.g.
from sqlalchemy import desc stmt = select([census.columns.state,census.columns.age]) #州按升序排序,年龄按降序排序 stmt = stmt.order_by(census.columns.state, desc(census.columns.age)) results = connection.execute(stmt).fetchall() print(results) >> [('Alabama', 85), ('Alabama', 85), ('Alabama', 84), ('Alabama', 84), ('Alabama', 83), ('Alabama', 83), ('Alabama', 82), ('Alabama', 82), ('Alabama', 81), ('Alabama', 81), ('Alabama', 80), ('Alabama', 80), ('Alabama', 79), ('Alabama', 79), ('Alabama', 78), ('Alabama', 78), ('Alabama', 77), ('Alabama', 77), ('Alabama', 76), ('Alabama', 76)]
Counting,Summing and Grouping Data
- 聚合函数的功能集成在
func
模块中,注意不要直接import sum()……
,会与Python的内置函数冲突
from sqlalchemy import func
#求和
stmt = select([func.sum(census.columns.pop2008)])
results = connection.execute(stmt).scalar()
#.scalar() get just the value of a query that returns only one row and column
print(results)
>>302876613
-
Group by
stmt = select([census.columns.sex,func.sum(census.columns.pop2008)]) stmt = stmt.group_by(census.columns.sex) results = connection.execute(stmt).fetchall() print(results) >> [('F',153959198),('M',148917415)]
-
SQLAlchemy 在 ResultSet 中自动为 functions 生成列名
- 列名通常为:
func_#
,比如count_1
- 导致操作困难
- 使用
label()
函数来更改列名
print(results[0].keys()) >> ['sex',u'sum_1'] #更改生成列的列名 stmt = select([census.columns.sex, func.sum(census.columns.pop2008).label('pop2008_sum')]) stmt = stmt.group_by(census.columns.sex) results = connection.execute(stmt).fetchall() print(results[0],keys()) >> ['sex','pop2008_sum']
- 列名通常为:
#多列聚合类似于多类排序,按照传入列的顺序进行聚合
#求出不同性别下,各年龄段在2008年的总人口数
stmt = select([census.columns.sex,census.columns.age,
func.sum(census.columns.pop2008)])
stmt = stmt.group_by(census.columns.sex, census.columns.age)
results = connection.execute(stmt).fetchall()
print(results)
>>
[('F',0,2105442),('F',1,2087705),('F',2,2037280)……]
-
distinct()
方法:按列中不同的值分类
#求出一共统计了多少个州的数据
stmt = select([func.count(census.columns.state.distinct())])
distinct_state_count = connection.execute(stmt).scalar()
print(distinct_state_count)
>>51
#打印出统计的各个州的名称
stmt = select([census.columns.state.distinct()])
different_state = connection.execute(stmt).fetchall()
print(different_state)
>>
[('Illinois',), ('New Jersey',), ('District of Columbia',), ('North Dakota',), ('Florida',), ('Maryland',), ('Idaho',), ('Massachusetts',), ('Oregon',), ('Nevada',), ('Michigan',), ('Wisconsin',), ('Missouri',), ('Washington',), ('North Carolina',), ('Arizona',), ('Arkansas',), ('Colorado',), ……]
#复杂聚合
from sqlalchemy import func
stmt = select([census.columns.state,func.count(census.columns.age)])
stmt = stmt.group_by(census.columns.state)
results = connection.execute(stmt).fetchall() #返回结果是list
print(results)
print(results[0].keys())
>>
[('Alabama', 172), ('Alaska', 172), ('Arizona', 172), ('Arkansas', 172), ('California', 172), ('Colorado', 172), ('Connecticut', 172), ('Delaware', 172), ('District of Columbia', 172), ('Florida', 172), ('Georgia', 172), ('Hawaii', 172), ('Idaho', 172), ('Illinois', 172), ('Indiana', 172), ('Iowa', 172), ('Kansas', 172), ('Kentucky', 172), ('Louisiana', 172), ('Maine', 172), ('Maryland', 172), ('Massachusetts', 172), ('Michigan', 172), ('Minnesota', 172), ('Mississippi', 172), ('Missouri', 172), ('Montana', 172), ……]
['state', 'count_1']
- 可以先将
func
函数的表达式写出并赋给一个变量,同时完成新增列的命名
#求出每个州2008年的总人数
from sqlalchemy import func
pop2008_sum = func.sum(census.columns.pop2008).label('population')
stmt = select([census.columns.state,pop2008_sum])
stmt = group_by(census.columns.state)
results = connection.execute(stmt).fetchall()
print(results)
>>
[('Alabama', 4649367), ('Alaska', 664546), ('Arizona', 6480767), ('Arkansas', 2848432), ('California', 36609002), ('Colorado', 4912947), ('Connecticut', 3493783), ('Delaware', 869221), ('District of Columbia', 588910), ('Florida', 18257662), ('Georgia', 9622508), ('Hawaii', 1250676), ('Idaho', 1518914), ('Illinois', 12867077), ('Indiana', 6373299), ('Iowa', 3000490), ('Kansas', 2782245), ('Kentucky', 4254964), ('Louisiana', 4395797), ('Maine', 1312972), ('Maryland', 5604174), ('Massachusetts', 6492024), ('Michigan', 9998854), ……]
- ResultsProxy 可以直接与pandas交互转换成DataFrame
import pandas as pd
df = pd.DataFrame(results)
df.columns = results[0].keys() #列名提取作为DataFrame的列
Advanced SQLAlchemy Queries
- 数值计算
#计算2000年到2008年之间人数最多的前5个年龄段
from sqlalchemy import desc
stmt = select([census.columns.age,
(census.columns.pop2008-census.columns.pop2000)
.label('pop_change')])
stmt = stmt.group_by(census.columns.age)
stmt = stmt.group_by(desc('pop_change'))
stmt = stmt.limit(5) #仅返回前5名
results = connection.execute(stmt).fetchall()
print(results)
- Case Statement
- 接受条件的列表来进行匹配,最终返回一个满足条件匹配的列对象
- 条件匹配最终以else子句结束,用来处理那些不匹配条件的情况
from sqlalchemy import case,func
#求纽约2008年的人口数
stmt = select([
func.sum(
case([
(census.columns.state == 'New York',census.columns.pop2008),
else_=0 #如果数据来自纽约,则返回其2008年人口数用以求和,否则返回0
])
)
])
results = connection.execute(stmt).fetchall()
print(results)
>> [(19465159,)]
- Cast Statement
- 用来进行数据类型的转换
- 整型转为浮点型方便进行除法运算
- 字符串转为日期和时间
- 参数接受列对象或者是表达式,以及目标数据类型
- 用来进行数据类型的转换
#求出居住在纽约的总人口占比
from sqlalchemy import case, cast, Float
stmt = select([
(func.sum(
case([(census.columns.state == 'New York',
censeus.columns.pop2008)],
else_=0)) #纽约的总人口数
/cast(func.sum(census.columns.pop2008),Float)*100 #除以2008年的总人口数 *100%
).label('ny_percent')
])
results = connection.execute(stmt).fetchall()
print(results)
>> [(Decimal('6.4267619765'),)]
#为了方便阅读也可以分开写
NY_pop2008 = func.sum(
case([
(census.columns.state == 'New York',census.columns.pop2008)
],else_=0)
) #求纽约的人口数
total_pop2008 = cast(func.sum(census.columns.pop2008),Float) #求总的人口数
stmt = select([NY_pop2008/total_pop2008*100])
percent = connection.execute(stmt).scalar()
print(percent)
SQL Relationships
- 对于已经定义好表间关系的表格,使用SQLAlchemy自动结合两张表
stmt = select([census.columns.pop2008,state_fact.columns.abbreviation])
results = connection.execute(stmt).fetchall()
- 对于没有预定义表间关系的表格,
join
接受一个表格以及额外的表达式来解释两张表的关系- 给
join
子句传入一个布尔表达式来解释两张表是怎样关联的 - only join rows from each table that can be related between the two columns
- 不允许在数据类型不同的列间建立关系
- 给
-
join
子句紧跟着select()
子句且在任意where()
,order_by
或group_by()
子句之前 - 当我们需要创建一个请求,不再从每个列表中选择需要的列,而是同时使用两个表时,使用
select_from
语句来实现,join
子句插入其中 - e.g.1
stmt = select([func.sum(census.columns.pop2000)])
stmt = stmt.select_from(census.join(state_fact))
stmt = stmt.where(state_fact.columns.circuit_court == '10')
result = connection.execute(stmt).scalar()
- e.g.2
stmt = select([func.sum(census.columns.pop2000)])
stmt = stmt.select_from(census.join(state_fact,
census.columns.state == state_fact.colums.name))
stmt = stmt.where(state_fact.columns.census_division_name == 'East South Central')
result = connection.execute(stmt).scalar()
- 使用分级表
- 分级表(hierarchical tables):
- Contain a relationship with themselves
- 通常用来储存:组织图(organizational charts),地理资料(geographic data),网络(networks)和关系图(relationship graphs)等
-
alias()
方法用来对同一张表创建两个不同的名字,即提供了一种通过不同名称来访问同一张表的方法
- 分级表(hierarchical tables):
managers = employees.alias()
stmt = select([manager.columns.name.label('manager'),
employees.colums.name.label('employee')])
stmt = stmt.select_from(employees.join(managers,
managers.columns.id == employees.colums.manager))
stmt = stmt.order_by(managers.colums.name)
print(connection.execute(stmt).fetchall())
>>
[(u'FILLMORE',u'GRANT'),(u'FILLMORE',u'ADAMS'),……]
- Dealing with Large ResultSet
- 当数据量过大时,可能会引起存储空间不够的问题
-
fetchmany()
方法允许仅读取部分数据,将需要提取数据的数量传入该方法 - 当没有数据时,返回空列表
- 在完成数据处理后要关闭
ResultProxy
#已经完成以下定义:
#more_results = True,字典state_count用来存储每个州出现的次数,results_proxy是ResultsProxy类型
while more_results:
partial_results = results_proxy.fetchmany(50)
if partial_results == []:
more_results = False
for row in partial_results:
state_count[row.state] += 1
results_proxy.close()
Creating and Manipulating your own Databases
Creating Databases and Tables
- 对于SQLite,可以直接使用
create_engine()
来创建数据库
from sqlalchemy import create_engine, Metadata
from sqlalchemy import (Table, Column, String, Integer, Decimal, Boolean)
engine = create_engine(URL)
metadata = Metadata()
employees = Table('employees',metadata,
Column('id', Integer()),
#设定name字段不允许出现重复值和空值
Column('name', String(255), unique=True, nullable=False),
#设定salary字段的默认值为100
Column('salary', Decimal(),default=100.00),
#设定active字段的默认值为True
Column('active', Boolean(),default=True))
metadata.create_all(engine)
#可以使用.constraints方法来查看表中设定了哪些限制
print(employees.constraints)
- 添加数据
from sqlalchemy import insert
#insert()方法接受表名作为参数,插入的数值写在.values()里
stmt = insert(employees).values(id=1,name='Jason',salary=1.00,active=True)
result_proxy = connection.execute(stmt) #注意insert方法不返回任何行,所以不用调用fetchall
print(result_proxy.rowcount) #.rowcount属性可以查看添加了多少行
#添加多行的方法:
#构建一条不带任何值的statement语句,构建一个字典的列表用来存储需要添加的值,然后在connection中同时将列表和语句传给execute()方法作为参数
stmt = insert(employees)
values_list = [
{'id':2, 'name':'Rebecca', 'salary':2.00, 'active':True},
{'id':3, 'name':'Bob', 'salary':0.00, 'active':False}
]
result_proxy = connection.execute(stmt,values_list)
print(result_proxy.rowcount)
>> 2
-
将CSV文件添加进表格
- 使用
CSV
模块来建立一个csv_reader
,csv_reader
是一个阅读器对象,可以迭代CSV文件中的行
import csv file = open('XXX.csv') csv_reader = csv.reader(file) stmt = insert(census) values_list = [] total_rowcount = 0 #使用枚举方法迭代读取csv文件 for idx, row in enumerate(csv_reader): data = {'state':row[0], 'sex':row[1], 'age':row[2], 'pop2000':row[3], 'pop2008':row[4]} values_list.append(data) if idx % 51 == 0:#验证数据是否添加完整(数据中共统计了52个州,即0-51) results = connection.execute(stmt,values_list) total_rowcount += results.rowcount #求出一共添加了多少组数据
- 使用
Updating Date in a Database
- 使用
update()
语句来进行更新,语法结构类似于insert()
- 使用
where()
子句来选择要更新的数据 - 使用
.values()
子句来更新数据的值
from sqlalchemy import update
stmt = update(employees)
stmt = stmt.where(employees.columns.id == '3')
stmt = stmt.values(active = True)
results_proxy = connection.execute(stmt)
print(result_proxy.rowcount)
#更新多条数据
stmt = update(employees)
stmt = stmt.where(employees.colums.active == True)
stmt = stmt.values(active = False, salary = 0.00)
result_proxy = connection.execute(stmt)
print(result_proxy.rowcount)
#同步更新:从本表或其他表中选出某个数据,用来作为更新的值更新现有表格中的数据
new_salary = select([employees.columns.salary])
new_salary = new_salary.order_by(desc(employees.columns.salary))
new_salary = new_salary.limit(1) #选出工资最高的值
stmt = update(employees)
stmt = stmt.values(salary = new_salary) #修改所有数据
result_proxy = connection.execute(stmt)
从表格中删除数据
- 使用
delete()
语句来执行删除功能 - 添加
where()
子句来确定需要删除的数据 - 删除的数据不易恢复,所以执行删除操作时请务必谨慎
- 检查删除的行数来防止误删除太多的数据
from sqlalchemy import delete
delete_stmt = delete(extra_employees)
result_proxy = connection.execute(delete_stmt) #不加任何条件限制,删除说有数据
stmt = delete(employees).where(employees.columns.id == '3')
result_proxy = connection.execute(stmt)
- 删除数据库中的表格,使用
drop()
语句
extra_employees.drop(engine)
print(extra_employees.exists(engine))
>> False
- 使用
drop_all()
语句删除所有表格
metadata.drop_all(engine)
print(engine.table_names())
>> []