安装
需要pip install flask
这是一个十分轻便的框架,开发迅速,成本上相比Django框架要少很多,而且十分灵活,可以完全按自己的需求来写,但是功能上相比Django肯定也差一些,快速开发web服务器的时候这个用的还是很方便的
基本使用
flask框架只需要写好需要返回的结果函数(视图函数),然后在这些结果函数上面定义好对应的每个路由(即网页路径)即可,然后通过访问这些路由就是web的请求结果了
示例1
from flask import Flask
#实例化Flask对象,__name__有两个功能:
#方便Flask框架寻找资源
#当出错时,方便寻找问题处
app = Flask(__name__)
#路由映射
@app.route('/')
def index():
return "<h1>hello<h1>"
#上面那个'/'就是一个路由,路径是127.0.0.1:5000/,在这个路径下就会执行index函数,结果页面就展示hello
if __name__ == '__main__':
app.run(debug=True)
#启动web服务,监听用户请求,相当于while True: Listen()
#debug默认为False,此时一旦服务器出错,只会在控制台显示错误信息,而设置为True就能够在页面显示错误信息
#并且debug下如果服务器文件发生更改,也能够检测到,并修改页面
默认是5000端口,所以在http://127.0.0.1:5000/下可以看到
配置文件
前面的debug参数配置是写在启动函数里的,但有时还是需要一个单独的文件来作为配置文件,所以这里就介绍下通过引用外部文件来进行配置的方法:
1.创建配置文件,里面写参数配置(注意参数必须为大写,比如在主程序里debug是小写,但是到了配置文件里必须写成DEBUG,否则没效果)
2.在主程序导入配置文件
3.使用:app.config.from_object(配置文件)
,来导入配置
举例:
配置文件b.py
DEBUG=True
主程序
from flask import Flask
import b
app = Flask(__name__)
app.config.from_object(b)
#导入配置
@app.route('/')
def index():
return "<h1>hel<h1>"
if __name__ == '__main__':
app.run()
结果可以发现和前面效果是一样的,当然这个是对于文件的配置,app.config
是一个类似字典一样的数据类型,如果只是希望配置简单的几条数据,可以直接在主程序里通过修改app.config
这个字典来进行配置,举例:
from flask import Flask
import b
app = Flask(__name__)
app.config['DEBUG'] = True
#加入单个配置
@app.route('/')
def index():
return "<h1>hel<h1>"
if __name__ == '__main__':
app.run()
结果和前面的也是一样的
路由配置
从上面的示例可以看出路由的配置就是通过装饰器来完成的,这里再来个例子:
示例
from flask import Flask
app = Flask(__name__)
str1 = "aaa"
str2 = "bbb"
str3 = "ccc"
@app.route('/%s' % str1) #分别定义了三个路由
def index1():
return "{'name':'%s','age':20}" % str1
@app.route('/%s' % str2)
def index2():
return "{'name':'%s','age':20}" % str2
@app.route('/%s' % str3)
def index3():
return "{'name':{'%s', '%s', '%s'},'ages':{20, 30, 40} }" % (str1, str2, str3)
if __name__ == '__main__':
app.run(debug=True, host='192.168.17.1', port=6666)
可以看到上面定义了3个路由,每个路由返回的内容都不一样,于是我们用爬虫分别访问这三个路由,观察结果:
import requests
res1 = requests.get("http://192.168.17.1:6666/aaa")
res2 = requests.get("http://192.168.17.1:6666/bbb")
res3 = requests.get("http://192.168.17.1:6666/ccc")
a = eval(res1.text)
print(a, type(a))
b = eval(res2.text)
print(b, type(b))
c = eval(res3.text)
print(c, type(c))
结果:
{'age': 20, 'name': 'aaa'} <class 'dict'>
{'age': 20, 'name': 'bbb'} <class 'dict'>
{'ages': {40, 20, 30}, 'name': {'aaa', 'bbb', 'ccc'}} <class 'dict'>
可以看出不同的路由返回了其对应的请求结果,通过这个就可以很轻松的实现一些简易的小接口之类的了
动态路由
前面的路由都是事先定义好的,而动态路由的话,对方可以请求到没有事先定义的路由,一个简单的方法可以通过<变量名>
来获取内容,如果要控制该变量类型的话可以在前面加变量类型:
,比如int型就:<int:id>
,举例:
web服务端
from flask import Flask
app = Flask(__name__)
@app.route('/abc/<int:id>')
def index2(id):
return "{'name':'%s','age':20}" % id
#路由路径为/abc/数字
@app.route("/name=<name>&psd=<psd>")
def show(name, psd):
return "{'name':'%s', 'password': '%s'}" % (name, psd)
#路由路径为/name=参数,然后会获取该参数并返回
if __name__ == "__main__":
app.run()
用爬虫访问发现:
import requests
res1 = requests.get("http://127.0.0.1:5000/name=111&psd=222")
res2 = requests.get("http://180.212.38.57:6666/abc/1")
res3 = requests.get("http://180.212.38.57:6666/abc/a1")
print(res1.text)
print(res2.text)
print(res3.text)
结果:
{'name':'111', 'password': '222'}
{'name':'1','age':20}
<title>404 Not Found</title>
...
可以看出发送的请求1里,111和222被web服务端获取了,并返回;请求2也正确访问了;请求3因为abc后面不是数字,所以访问错误
获取路由
前面都是通过路由来执行对应的函数,现在我们可以通过函数来获取对应的路由,使用方法:
1.需要使用到flask下的url_for
方法,from flask import url_for
2.在该方法里写入要知道路由的函数名,参数可选
举例:
from flask import Flask, url_for, redirect
app = Flask(__name__)
@app.route('/xxx')
def xxx():
return "hello"
@app.route('/yyy/<id>')
def yyy(id):
return id
@app.route('/get_url')
def main():
return url_for('xxx') + " " + url_for('yyy', id='zzz')
#返回执行xxx函数对应的路由,执行yyy函数,参数为zzz对应的路由
@app.route('/visit_url')
def main2():
return redirect(url_for('xxx'))
#重定向访问执行xxx函数的路由
if __name__ == '__main__':
app.run(debug=True)
访问http://127.0.0.1:5000/get_url和http://127.0.0.1:5000/visit_url结果为:
/xxx /yyy/zzz
hello
重定向
要实现网页的重定向,需要导入flask下的redirect
,然后在需要重定向的地方return redirect('url')
即可,举例:
from flask import Flask, redirect
app = Flask(__name__)
@app.route('/xxx')
def xxx():
return "你跳转到了xxx"
@app.route('/redirect_url')
def main():
return redirect("/xxx")
#需要用到return redirect重定向
return "aaa"
if __name__ == '__main__':
app.run(debug=True)
访问http://127.0.0.1:5000/redirect_url可以发现跳转到了http://127.0.0.1:5000/xxx
请求上下文
每当对该网页进行访问时,必然会发送请求,在flask中如果要读取这些请求,那么就可以通过导入其下的request
来读取,举例:
from flask import Flask
from flask import request
app = Flask(__name__)
@app.route('/<id>')
def index1(id):
res = request
user = res.headers.get('user-agent')
return user
if __name__ == '__main__':
app.run(debug=True, host='180.212.38.57', port=6666)
分别用网站和python爬虫访问得到结果如下:
网站:
Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko
python爬虫:
python-requests/2.18.4
GET和POST
flask的路由默认都是只允许get方法,如果想要使用post方法,就要在路由里给methods
参数进行配置,如果要判断是哪种请求,可以通过request
下的method
属性获取,如果要获取post参数,则通过request.form
这个字典来获取,举例:
from flask import Flask
from flask import request
app = Flask(__name__)
#两种都能访问
@app.route('/aaa', methods=['GET', 'POST'])
def index1():
if request.method == 'GET':
return "这是GET请求"
else:
return "这是POST请求<br>" + str(request.form)
#只能post
@app.route('/111', methods=['POST'])
def index2():
if request.method == 'GET':
return "GET是不可能GET了"
else:
return "这是POST请求"
#只能get
@app.route('/222')
def index3():
return "POST是不可能POST了,默认只能GET"
if __name__ == '__main__':
app.run(debug=True)
分别通过get和post方法的爬虫访问:
import requests
res_aaa_get = requests.get("http://127.0.0.1:5000/aaa")
res_aaa_post = requests.post("http://127.0.0.1:5000/aaa", data={'user':'aaa','psd':'bbb'})
res_111_get = requests.get("http://127.0.0.1:5000/111")
res_111_post = requests.post("http://127.0.0.1:5000/111")
res_222_get = requests.get("http://127.0.0.1:5000/222")
res_222_post = requests.post("http://127.0.0.1:5000/222")
print(res_aaa_get.text)
print(res_aaa_post.text)
print(res_111_get.text)
print(res_111_post.text)
print(res_222_get.text)
print(res_222_post.text)
结果为:
这是GET请求
这是POST请求<br>ImmutableMultiDict([('user', 'aaa'), ('psd', 'bbb')])
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>405 Method Not Allowed</title>
<h1>Method Not Allowed</h1>
<p>The method is not allowed for the requested URL.</p>
这是POST请求
POST是不可能POST了,默认只能GET
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>405 Method Not Allowed</title>
<h1>Method Not Allowed</h1>
<p>The method is not allowed for the requested URL.</p>
可以发现默认只能get,可以根据需求设置可以用哪种方法或者都用
注:
当获取的是checkbox
的数据时,通过request.form.getlist()
获取
获取请求参数
获取请求头是通过request
下的headers
获取,如果要获取传递的参数,则可以通过args
获取,举例:
from flask import Flask, redirect
from flask import request
app = Flask(__name__)
@app.route('/<id>')
def index1(id):
res = request.args
return str(res) + "<br>" + res['aa']
if __name__ == '__main__':
app.run(debug=True)
访问http://127.0.0.1:5000/w?aa=sda&sda=gsda,可以看到输出结果为:
ImmutableMultiDict([('aa', 'sda'), ('sda', 'gsda')])
sda
可以发现args下就是一个接收参数的字典,所以可以以字典方式对其进行操作
获取访问ip
通过request
下的remote_addr
获取,举例:
@app.route('/')
def index():
return str(request.remote_addr)
#返回访问的用户ip地址
更多request下属性参考:
https://www.cnblogs.com/wangjikun/p/6935592.html
https://blog.csdn.net/yannanxiu/article/details/53116652
return返回值
return返回参数有3个,第一个是返回的内容,也就是页面展示的信息;第二个是响应的状态码,默认是200;第三个是一个字典,可以添加到http响应当中
响应状态码
视图函数默认返回的状态码是200,如果要返回别的状态码,可以在return里第二个参数进行设置,举例:
@app.route('/xxx')
def xxx():
return "no found", 404
#返回404
通过爬虫访问观察发现:
import requests
a = requests.get('http://127.0.0.1:5000/xxx')
print(a.status_code)
结果为:
404
响应状态码还可以通过flask
下的abort
实现,但是这个和return返回状态码的控制不同,return返回状态码的情况下返回的页面信息也是return里面的,就像前面的例子里返回的内容是我们定义的no found
,在abort下返回的内容则是服务器自带的错误返回信息,举例:
from flask import abort
@app.route('/yyy')
def yyy():
abort(404)
return "nothing"
通过爬虫观察发现:
import requests
import pprint as p
a = requests.get('http://127.0.0.1:5000/yyy')
print(a.status_code)
print(a.text)
结果:
404
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>404 Not Found</title>
<h1>Not Found</h1>
<p>The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.</p>
可以看出虽然返回的也是404,但是返回的内容却不是我们定义的"nothing",而是服务器自带的异常语句
http响应头
视图函数的第三个返回参数,举例:
@app.route('/xxx')
def xxx():
return "no found", 200, {'aaa':'sfas'}
#添加了'aaa'到响应头
通过爬虫观察发现:
import requests
import pprint as p
a = requests.get('http://127.0.0.1:5000/xxx')
p.pprint(eval(str(a.headers)))
结果为:
{
'Content-Length': '8',
'Content-Type': 'text/html; charset=utf-8',
'Date': 'Wed, 03 Oct 2018 08:51:56 GMT',
'Server': 'Werkzeug/0.14.1 Python/3.5.2',
'aaa': 'sfas'
}
可以看到响应的请求头最后一行里多了个'aaa':'sfas'
response响应
前面return返回的三个参数的方式还可以用flask
下的make_response()
来实现,其接收的三个参数和return返回的三个是一样的,在这里可以设置headers、cookie等,举例:
@app.route('/xxx')
def xxx():
response = make_response('it will set a cookie')
#相当于return的第一个参数,即返回内容
response.set_cookie('a_cookie', 'the content of cookie')
#设置cookie名和值
return response
#返回response
通过爬虫观察发现:
import requests
import pprint as p
a = requests.get('http://127.0.0.1:5000/xxx')
p.pprint(eval(str(a.headers)))
print(a.text)
结果:
{
'Content-Length': '20',
'Content-Type': 'text/html; charset=utf-8',
'Date': 'Wed, 03 Oct 2018 09:33:41 GMT',
'Server': 'Werkzeug/0.14.1 Python/3.5.2',
'Set-Cookie': 'a_cookie="the content of cookie"; Path=/'
}
#response headers
it will set a cookie
#content
可以看到返回的响应头里set-cookie
里多了键值
设置headers
如ajax跨域请求通过服务端方式的解决,可以参考:
https://blog.csdn.net/lovebyz/article/details/52584551
模板语言JInjia2
flask也有自己的模板语言,其可以实现主程序和静态文件的交互等
调用静态文件
使用步骤:
1.在主程序导入对应函数:from flask import render_template
2.在当前路径下创建一个名为:templates
的文件夹(flask调用静态文件时会自动在当前路径的templates
文件夹下寻找)
3.如果需要往静态文件里传参,那么就在render_template
里加入参数的键值
4.传入的参数在静态文件里通过语法:{{参数}}
获取
举例:
###### 主程序 ######
from flask import Flask
from flask import render_template
app = Flask(__name__)
@app.route('/111')
def index1():
return render_template('a.html')
#会自动寻找./templates下的a.html文件
@app.route('/222/<name>&<psd>')
def index2(name, psd):
return render_template('a.html', username=name, password=psd)
#传入name和password
@app.route('/333')
def index3():
di = {
'username': 'aaa',
'password': 'bbb'
}
return render_template('a.html', **di)
#字典传参,此时是传字典的所有键值对,而不是传字典,所以在模板语言里di是不存在的
#如果想要传整个字典,那么就可以di=di
if __name__ == '__main__':
app.run(debug=True)
###### 静态文件a.html ######
<!DOCTYPE html>
<html>
<head>
<title>aaa</title>
</head>
<body>
<h1>hello {{username}}</h1>
<h2>you password is {{password}}</h2>
<!-- 显示账号密码 -->
</body>
</html>
结果用爬虫访问可以看到结果:
import requests
res_111 = requests.get("http://127.0.0.1:5000/111")
res_222 = requests.get("http://127.0.0.1:5000/222/abc&def")
res_333 = requests.get("http://127.0.0.1:5000/333")
print(res_111.text)
print(res_222.text)
print(res_333.text)
部分结果为:
<h1>hello </h1>
<h2>you password is </h2>
#res_111
<h1>hello abc</h1>
<h2>you password is def</h2>
#res_222
<h1>hello aaa</h1>
<h2>you password is bbb</h2>
#res_333
注:
因为用render_template
调用静态页面时会通过模板语言渲染该页面,所以静态文件里如果有像js之类的语句很容易被用模板语言进行解析,结果报错,所以一般被调用的文件建议只是单纯的html,而没有css和js的存在,如果文件有css和js,并且不想被模板语言渲染的话,可以用flask下的send_file()
来不渲染直接调用文件,或者用url_for('static', filename='')
直接静态调用static
路径下的文件也可以(后面会说,但是前者是在py文件里用,后者是在支持模板语言的文件里用),这里对send_file()
进行举例:
from flask import send_file
@app.route('/111')
def index1():
return send_file('templates/render.html')
#返回的页面是不经过模板语言渲染的
静态文件获取参数
前面演示了在静态文件中用:{{属性}}
来获取传递的参数,其还有别的调用方法,比如传来的如果是类,那么就可以用.
来调用里面的属性,举例:
###### flask主要代码 ######
class People():
name = 'dawson'
age = 20
def abc(self):
return "这是调用函数返回的"
#新建了一个类
@app.route('/333')
def index3():
p = People()
di = {
'people': p
}
return render_template('a.html', **di)
#传入了一个类
###### a.html ######
<h1>hello {{people.name}}</h1>
<h2>you age is {{people['age']}}</h2>
{{people.abc()}}
<!-- 调用类的属性和方法,用了.和[]两种方式 -->
通过爬虫访问发现结果为:
<h1>hello dawson</h1>
<h2>you age is 20</h2>
这是调用函数返回的
可以看出.
和[]
两种方式都可以获取参数
if判断
模板语言可以在静态文件里进行if判断,基本语法如下:
{% if 条件 %}
...
{% elif %}
...
{% else %}
...
{% endif %}
使用举例
###### 主程序主要代码 ######
@app.route('/111')
def index1():
return render_template('a.html')
@app.route('/222/<name>&<psd>')
def index2(name, psd):
return render_template('a.html', username=name, password=psd)
#可以看出111的时候没传参,222的时候传参了
###### a.html ######
{% if username %}
<h1>{{username}},你好</h1>
{% else %}
<h1>请登录</h1>
{% endif %}
用爬虫分别访问,结果为:
<h1>请登录</h1>
#http://127.0.0.1:5000/111
...
<h1>abc,你好</h1>
#http://127.0.0.1:5000/222/abc&def
for循环
基本语法:
{% for xxx in xxx %}
...
{% endfor %}
使用举例
###### 主程序 ######
@app.route('/333')
def index3():
di = {
'name':'abc',
'age':20
}
return render_template('a.html', di=di)
#传入一个字典
###### a.html ######
{% for k,v in di.items() %}
{{k}}:{{v}}<br>
<!-- 循环输出键值 -->
{% endfor %}
{% for i in range(2) %}
{{i}}<br>
<!-- 也可以用range这样python自带的函数 -->
{% endfor %}
结果为:
name:abc<br>
age:20<br>
0<br>
1<br>
过滤器
模板语言里有很多过滤器,其能够将传来的数据进行一定处理后展示出来,比如默认过滤器(default
)可以当数据不存在时设置默认值,长度过滤器(length
)可以计算参数长度等,基本语法如下:
{{参数 | 过滤器 }}
当然不同过滤器语法也有些不同,举例:
###### 主程序 ######
@app.route('/222/<name>&<psd>')
def index2(name, psd):
return render_template('a.html', username=name, password=psd)
@app.route('/333')
def index3():
di = {
'name':'abc',
'age':20,
'li':[1,2,3,4,5]
}
return render_template('a.html', **di)
###### a.html ######
{{ username | default('默认用户')}}
<!-- 默认过滤器,设置默认值,如果username不存在时为默认用户 -->
<br>
{{ li | length}}
<!-- 长度过滤器,计算列表、字符串等数据长度,li的长度,li不存在默认为0 -->
分别访问结果为:
默认用户
5
#http://127.0.0.1:5000/333
abc
0
#http://127.0.0.1:5000/222/abc&def
更多过滤器参考:https://www.jianshu.com/p/3127ac233518
自定义过滤器参考:https://blog.csdn.net/fanlei5458/article/details/80341278
继承和block
像新闻之类的网站的新闻页基本页面结构都相同,如果说每个页面都去重复写这个结构语句显然太浪费,因此就可以写一个专门的模板文件给这些新闻页继承,然后在里面的某个版块写各自的新闻即可,这里要实现这种情景需要执行以下几步:
1.写一个模板文件xxx.html
2.在模板文件可以被添加内容的版块写上{% block 版块名 %}{% endblock %}
3.在要继承该模板的文件里写上{% extends '模板名' %}
,然后通过语句:{% block 版块名 %}添加在版块的内容{% endblock %}
,来往版块里添加自己的内容
4.模板文件的版块里可以写内容,文件继承后对应版块不会显示该内容,如果想要该内容可以通过{{super()}}
来获取
举例:
###### 模板文件b.html ######
<!DOCTYPE html>
<html>
<head>
<title>模板文件</title>
</head>
<body>
<h1>这是父模板的内容</h1>
<hr>
{% block main %}
<h1>这是父模板块1的内容</h1>
{% endblock %}
{% block main2 %}
<h1>这是父模板块2的内容</h1>
{% endblock %}
</body>
</html>
###### 继承模板的文件 ######
{% extends 'b.html' %}
{% block main %}
{{super()}}
{% endblock %}
{% block main2 %}
<hr>
<h1>这里是子模板的内容</h1>
{% endblock %}
可以看出模板文件新建了两个版块:main
和main2
,然后其他文件继承他,并在这些版块里写内容,其中版块1通过super()
继承了模板文件的内容,版块2没有继承内容,最终页面结果为:
...
<title>模板文件</title>
...
<h1>这是父模板的内容</h1>
<hr>
<h1>这是父模板块1的内容</h1>
<hr>
<h1>这里是子模板的内容2</h1>
调用本地文件
前面的方法调用文件的目录都是在服务器上,比如在html里想要导入本地的e.js
可能会写:
<script type="text/javascript" src="e.js"></script>
结果却是显示文件找不到,因为此时他寻找的路径是:http://127.0.0.1:5000/e.js,而不是本地的相对路径,所以为了导入这些本地路径的文件,需要下面几步:
1.创建一个static文件夹,里面用来存放各种静态文件
2.通过:{{url_for('static', filename='文件名')}}
,来调用static路径下的文件
举例:
<script type="text/javascript" src="{{url_for('static', filename='e.js')}}"></script>
#调用了static/e.js
<a href="{{url_for('static', filename='a.html')}}">aaa</a>
#跳转到static/a.html文件,并且不会用模板语言解析
注:
这种调用本地文件的方式是静态调用,因此不会用模板语言对其进行解析,而且是在支持模板语言的地方使用,在主程序中用这个返回的是个路径,不会调用文件,要调用静态文件就用send_file()
flash消息机制
flash是基于session的一种消息处理机制,其可以传递自定义的flash信息,而后台或者前台通过get_flashed_messages()
方法获取到flash信息,并且该消息机制有个特点就是只能在一次请求中使用,之后消失。使用举例:
- 后端:
from flask import Flask, flash, get_flashed_messages, render_template
import os
app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(24)
# 使用flash时需要配置secret_key,从而保证flash和session的唯一性,以免被别的session获取到信息
@app.route('/')
def test():
flash("msg")
# 往前台传递消息
return render_template("test.html")
if __name__ == '__main__':
app.run(debug=True, port=5000)
- 前台:
<body>
{{ get_flashed_messages() }}
{#使用内置方法获取flash,结果为:['msg'],可以看到获取的是flash传递的消息列表#}
</body>
基于flash的重定向消息传递
该消息机制还可以解决重定向时的页面信息交互,比如访问路由A时,在路由A中重定向跳转到了路由B,那么A中的信息就容易丢失而无法传递到B,一种解决方式是重定向时往地址里添加get参数,但这样会导致一个问题:重定向后的页面如果刷新则每次都会获取一样的get参数,并且也可能被恶意利用,在前台传入一些敏感数据。为了避免这些情况,可以使用flash来传递消息,因为flash基于session,所以即使重定向以后,消息还是能够传递的,举例:
from flask import Flask, flash, get_flashed_messages, render_template, redirect
import os
app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(24)
# 使用flash时需要配置secret_key,从而保证flash和session的唯一性,以免被别的session获取到信息
@app.route('/')
def test():
flash("msg0", 'aaa')
# 第一个代表返回信息,第二个代表flash类型,可以自定义,然后前台过滤筛选对应类型的信息
flash("msg1", "info")
flash("msg2", "erros")
return redirect('/aaa', 302)
# 重定向到另一个路由,由于flash是基于session,所以可以传递过去
@app.route('/aaa')
def aaa():
msg = get_flashed_messages(category_filter=['aaa', 'info'])
# 筛选出类型为aaa或info的flash信息
if not msg:
return "nothing"
return str(msg)
# 结果会收到:['msg0', 'msg1']
if __name__ == '__main__':
app.run(debug=True, port=5000)
数据库操作
flask的灵活性几乎不限制你使用哪种数据库,所以一种方法是你可以直接导入那个数据库模块,然后进行操作,另一种就是用flask-sqlalchemy框架(pip install flask-sqlalchemy
)来管理数据库,使用该模块能够通过orm模型对数据库进行操作,而不需要像原来那样通过sql语句来操作,从而使得操作更加简便。这里也主要讲flask-sqlalchemy
结合mysql
的使用
flask-sqlalchemy连接池问题
flask-sqlalchemy
是基于sqlalchemy
的数据库orm框架,两个使用方法大同小异,后面几节介绍的是flask-sqlalchemy
的操作方法(但因为连接池的原因,所以本节先介绍一点sqlalchemy
的针对连接池的解决方案)
在flask-sqlalchemy
中,进行数据库session操作后,使用close()
方法并不会将session关闭,而是存在一个连接池里进行复用,但是连接池满了会容易让程序崩了(谁也不希望网站跑着跑着崩了吧...),一种处理方法是设置连接池最大数量:SQLALCHEMY_POOL_SIZE=数量(默认是5)
,但是这个如果调用数据库少还好,一旦到了上限终究也不是办法,此时建议另一种方法:配置时选择不使用连接池,下面给上配置代码:
from sqlalchemy import create_engine // 初始化配置方法
from sqlalchemy.orm import sessionmaker // 创建orm对象
from sqlalchemy.pool import NullPool // 不使用连接池
SQLALCHEMY_DATABASE_URI='mysql+mysqlconnector://root:password@localhost:3306/test'
# 初始化配置,格式:数据库类型+数据库驱动名://用户名:密码@IP:端口/数据库名
engine = create_engine(SQLALCHEMY_DATABASE_URI, poolclass=NullPool)
# 初始化配置,并选择不使用连接池
DB = sessionmaker(bind=engine)
# 创建一个orm操作对象
那么接下来使用的时候只需要实例化一个orm的session
对象,即可进行操作了,并且要记得在操作完成时使用close()
进行关闭,具体可参考廖雪峰老师的教程:https://www.liaoxuefeng.com/wiki/1016959663602400/1017803857459008
连接池问题可参考:
https://blog.csdn.net/Yaokai_AssultMaster/article/details/80958052
https://blog.csdn.net/weiwangchao_/article/details/80185009
连接数据库
连接数据库需要执行以下几步:
1.from flask_sqlalchemy import SQLAlchemy
2.实例化SQLAlchemy
3.配置数据库参数,最好写在配置文件
3.导入配置后,通过create_all()
测试是否配置成功
举例
###### 配置文件config.py ######
# 连接mysql格式:dialect+driver://username:password@host:port/database
DIALECT = 'mysql' #数据库
DRIVER = 'mysqldb' #操作的驱动
#驱动去https://www.lfd.uci.edu/~gohlke/pythonlibs/下安装对应版本mysqlclient,2.x版本安装mysql-python
USERNAME = 'root'
PASSWORD = ''
HOST = '127.0.0.1'
PORT = '3306'
DATABASE = ''
SQLALCHEMY_DATABASE_URI = "%s+%s://%s:%s@%s:%s/%s?charset=utf8" % (DIALECT, DRIVER, USERNAME, PASSWORD, HOST, PORT, DATABASE)
SQLALCHEMY_TRACK_MODIFICATIONS = False
###### 主程序 ######
from flask_sqlalchemy import SQLAlchemy
import config
app = Flask(__name__)
app.config.from_object(config)
#导入配置
db = SQLAlchemy(app)
#实例化
db.create_all()
#一般是创建表或者数据库用,这里用来测试是否连接成功,连接失败程序是跑不起来的
注:
不同数据连接的配置不同,可以参考:https://www.cnblogs.com/fengff/p/8669363.html
创建数据表
在orm模型当中,一个表就是一个类,不过这个类要继承于db.Model
,一个列通过db.Column
来创建,创建时需要定义数据类型和字段修饰等,举例:
from flask_sqlalchemy import SQLAlchemy
import config
app = Flask(__name__)
app.config.from_object(config)
db = SQLAlchemy(app)
class clothes(db.Model):
#继承自db.Model
__tablename__ = 'clothes_shop'
#设置表名,不设置默认为类名
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
#整型,主键,自增
name = db.Column(db.String(50), nullable=False)
#char和varchar都是用String,非空
cost = db.Column(db.Float, nullable=True)
content = db.Column(db.Text, nullable=True)
db.create_all()
#创建表和数据库用的
注:
上面创建表的语句,如果不存在该表则会创建一个,如果存在的话,就不会对原来的表进行修改,而定义的几个列的意思是会用到表里的哪几列数据。此时如果定义了表里没有的列则用到该表时会报错
注2:
一些常见的数据类型:
db.Integer
db.Float
db.String #这个包括char和varchar
db.Text
db.Boolean
db.Date
db.DateTime
db.Time
添加数据
步骤:
1.实例化该类(表),并把要添加的数据作为参数传入
2.通过db.session.add()
添加
3.通过db.session.commit()
提交事务
举例:
@app.route('/111')
def index1():
clothes = Clothes(name='bbb', cost=23, content='sdaf')
#添加数据的内容
db.session.add(clothes)
#添加数据
db.session.commit()
#操作都是事务机制,所以只有提交后才会真正改变数据
return "True"
注:
对于大量数据的插入,如果使用add
循环插入,效率将十分低下,此时可以使用bulk_save_objects([orm对象1, orm对象2, ...])
或者bulk_insert_mappings(orm对象类, [对象参数字典1, 对象参数字典2, ...])
进行批量插入,举例:
# bulk_save_objects示例
li = [Clothes(name='aaa', cost=10, content='aaa'), Clothes(name='bbb', cost=23, content='sdaf')]
db.session.bulk_save_objects(li)
# bulk_insert_mappings示例
li = [{"name": "aaa", "cost": 10, "content":"aaa"}, {"name": "bbb", "cost": 20, "content":"bbb"}]
db.session.bulk_insert_mappings(Clothes, li)
批量插入操作对比:
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
import config
from time import time
app = Flask(__name__)
app.config.from_object(config)
db = SQLAlchemy(app)
class User(db.Model):
__tablename__ = 'user'
name = db.Column(db.String(45), nullable=False, primary_key=True)
password = db.Column(db.String(45))
money = db.Column(db.String(45))
db.create_all()
@app.route('/')
def index():
start = time()
# 方式一:基于orm对象一条一条插入,耗时:12s(相当于执行100000条语句,且每次都需要实例化orm对象,效率低且开销大,不建议)
for i in range(100000):
user = User(name=str(i), password=str(i), money=str(i))
db.session.add(user)
# 方式二:基于bulk_save_objects插入批量orm对象,耗时:6s(需要实例化大量的orm对象,开销也不小)
# li = []
# for i in range(100000):
# user = User(name=str(i), password=str(i), money=str(i))
# li.append(user)
# db.session.bulk_save_objects(li)
# 方式三:基于bulk_insert_mappings插入,耗时:4s(直接传入需要映射的对象和参数字典,相对消耗小一些,在效率上和安全性能上较为平衡)
# li = []
# for i in range(100000):
# li.append({"name": str(i), "password": str(i), "money":str(i)})
# db.session.bulk_insert_mappings(User, li)
# 方式四:合并成单条语句插入,耗时:2.6s(合成单条sql语句,并直接执行,效率较高,但没有对sql进行检查校验,安全性能差一些)
# s = "insert into user values"
# for i in range(100000):
# s += "({}, {}, {}),".format(i, i, i)
# db.session.execute(s[:-1])
db.session.commit()
cost_time = time() - start
return str(cost_time)
if __name__ == '__main__':
app.run(debug=True)
查询数据
一般格式:表名.query.过滤器.过滤函数
常用的过滤器有:all()
-查询所有、filter()
-过滤查询、limit()
-限制结果数量、order_by()
-按条件排序、group_by()
-按条件分组
常用的过滤函数有:all()
-以列表返回所有数据、first()
-返回第一个,没有就None
、first_or_404()
-返回第一个,没有就返回404响应、get()
-查找主键对应行、get_or_404()
-...没有就404、count()
-返回查询结果数量
举例:
from sqlalchemy import or_
@app.route('/111')
def index1():
clothes = Clothes.query.all()
#查询表里所有数据
clothes1 = Clothes.query.filter(Clothes.name == 'bbb').all()
#查询表里name='bbb'的所有数据,all()可以查所有的,返回的是个列表
clothes2 = Clothes.query.filter(Clothes.name == 'bbb').first()
#first相当于返回all的第一条
clothes3 = Clothes.query.filter(or_(Clothes.name == 'bbb', Clothes.name == 'aaa')).first()
#查询name为aaa或bbb的第一条数据
return str(clothes1[0].name) + str(clothes2.name)
#输出查询结果的name
更新数据
步骤:
1.查询要更新的数据
2.从查询结果中更新数据
3.提交事务
举例:
@app.route('/111')
def index1():
clothes = Clothes.query.filter(Clothes.name == 'bbb').first()
clothes.name = 'aaa'
#把第一条name为bbb的改为aaa
db.session.commit()
return str(clothes.name)
#可以发现返回的数据是aaa,数据库对应数据也修改了
注:
批量更新示例:
clothes = Clothes.query.filter(Clothes.name == 'aaa').update({Clothes.name: 'bbb'})
# 所有name为aaa的数据的name都更新为bbb
删除数据
步骤:
1.查询要删除的数据
2.将查询的结果数据删除
3.提交事务
举例:
@app.route('/111')
def index1():
clothes = Clothes.query.filter(Clothes.name == 'aaa').first()
db.session.delete(clothes)
#删除数据
db.session.commit()
return "True"
注:
批量删除示例:
clothes = Clothes.query.filter(Clothes.name == 'aaa').delete()
索引
- 主键索引:直接在字段定义时设置
primary_key
属性值为True
,即可,举例:
xxx = db.Column(db.String(32), nullable=False, primary_key=True)
- 普通索引:直接在字段定义时设置
index
属性值为True
,即可,举例:
xxx = db.Column(db.String(32), nullable=False, index=True)
- 联合索引:在类的
__table_args__
属性中使用Index
创建,第一个参数为索引名,后面传入多个字段,举例:
__table_args__ = (
db.Index('联合索引名字', '字段1', '字段2', '...'),
)
- 唯一索引:直接在字段定义时设置
unique
属性值为True
,即可,举例:
xxx = db.Column(db.String(32), nullable=False, unique=True)
- 联合唯一索引:在类的
__table_args__
属性中使用UniqueConstraint
创建,传入多个字段,并设置name
参数值为联合索引名,举例:
__table_args__ = (
db.UniqueConstraint('字段1', '字段2', '...', name='联合索引名字'),
)
外键
举例:
class Clothes(db.Model):
__tablename__ = 'clothes_shop'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(50))
cost = db.Column(db.Float, nullable=True)
psd = db.Column(db.String(20), nullable=False)
class Food(db.Model):
__tablename__ = 'food'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String, db.ForeignKey('clothes_shop.name'),nullable=False)
#这是name为clothes_shop表下name的外键
#注意外键设置要在字符修饰前(关键字参数必须在位置参数后)
cost = db.Column(db.Integer, nullable=True)
number = db.Column(db.Integer)
更多orm操作参考:
https://www.cnblogs.com/zhaoyunlong/p/10368654.html
Session操作
session一般都是存在服务端,这也是其和cookie的主要区别。但是在flask中有些不一样,session都是加密后然后发送到cookie处
添加Session
步骤
1.from flask import session
2.配置SECRET_KEY
的值
3.设置session
里的键值对(session相当于一个字典)
举例:
from flask import session
import os
app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(24)
#生成24位随机字符串
@app.route('/111')
def index1():
session['username'] = 'dawson'
#设置session里的值,其相当于一个字典
return "True"
访问后会发现浏览器里多了个127.0.0.1的cookie,其名为session
读取session
因为session是类似于一个字典的存在,所以读取就像字典那样读取即可,举例:
@app.route('/222')
def index2():
return session.get('username', 'nothing')
#返回session['username']的值,用get设置缺省值,防止不存在报错
删除session
还是像字典那样操作,举例:
@app.route('/333')
def index3():
session.clear()
#清空session
return "Clear"
@app.route('/444')
def index4():
session.pop('username')
#弹出session中的username
return "pop"
设置session有效期
设置session.permanent
为True
即可,默认是31天,可以通过配置PERMANENT_SESSION_LIFETIME
来修改有效时间,举例:
from flask import session
from datetime import timedelta
app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(24)
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=1)
#设置1小时有效时间,格式为时间格式,所以需要用到datetime下的timedelta
#没设置的话默认31天
@app.route('/111')
def index1():
session['username'] = 'dawson'
session['password'] = '123456'
session.permanent = True
#设置其有有效时间,不设置默认会话关闭即失效
return "True"
全局变量g
一般函数里的变量都只有自己能用,因此会通过global
设置为全局变量,但是当调用别的文件里的函数时,global
声明也没用了,只能通过函数传参,但是在flask里,一般调用的函数都是别的文件里的,因此flask下有一个g
模块可以实现多个文件里的全局变量,使用步骤:
1.from flask import g
2.通过g.变量名
设置全局变量
3.通过g.变量名
来调用
举例:
###### 主程序 ######
from flask import Flask
from flask import g
import config
@app.route('/222')
def index2():
g.x = 1000
#定义全局变量x
config.abc()
#调用abc()函数,没有传参
return str(g.x)
#会发现x变成1001
###### 被调用文件config.py ######
from flask import g
#被调用文件也要导入g
def abc():
g.x += 1
#全局变量x+1
可以看出g.x在两个文件里共用了。但是要注意的是这种全局变量只在一次请求中有效,下次请求又没了,比如下面这个:
@app.route('/222')
def index2():
g.x = 1000
config.abc()
g.x *= 2
return str(g.x)
@app.route('/333')
def index3():
return str(g.x)
先访问/222
再访问/333
,结果会发现g.x
不存在,因为第二次访问时,/333
里没有定义g.x
,所以自然就没了,因此g.x
一般用于传递用户数据,或者是在一次请求的地方使用
钩子函数
一般函数都是按顺序执行,比如执行函数A->函数B
,钩子函数则可以插入到他们之间运行,结果就成了函数A->钩子函数->函数B
,有种类似类里的魔法方法的感觉。
在编写上类似于一个装饰器,只需定义其执行的函数即可
before_request
在请求前执行,举例:
@app.route('/333')
def index3():
return "True"
@app.route('/444')
def index4():
return "True"
#钩子函数
@app.before_request
def abc():
print('aaa')
结果会发现每次不论访问哪个链接时都会先在控制台输出'aaa'
after_request
返回请求时执行,会接收response对象,举例:
@app.after_request
def deal_response(response):
print("返回数据!")
return response
context_processor
上下文处理器钩子函数,需要返回一个字典,这个字典会传给所有模板对象,举例:
###### 主程序 ######
@app.route('/333')
def index3():
return render_template('a.html')
@app.route('/444')
def index4():
return render_template('b.html')
@app.context_processor
def abc():
return {'username':'dawson'}
#返回一个字典
###### a.html ######
你好:{{username}}
###### b.html ######
<h1>{{username}}</h1>
结果会发现访问上面的两个链接,跳转时页面里都获得了字典内容
常用钩子函数参考
https://www.jianshu.com/p/17fa68cf38a6
蓝图
用于定义某一路由功能下对应的功能模块,类似django中的app,使用蓝图能让我们更加规范合理的进行代码开发,使用步骤如下:
- 创建蓝图
- 编写蓝图下视图函数
- 注册蓝图
简单示例
为了方便示例,这里将蓝图代码都编写在一个文件当中(然而在实际的开发中,最好还是分别给不同的蓝图创建对应的文件编写视图函数):
from flask import Flask, Blueprint
app = Flask(__name__)
shop = Blueprint('shop', __name__)
# 创建一个蓝图
# shop = Blueprint('shop', __name__, static_folder='/static', template_folder='/templates')
# 多了蓝图的静态资源和模板资源路径的配置
@shop.route('/')
# 蓝图下编写路由函数
def shop_index():
return "shop page"
app.register_blueprint(shop, url_prefix="/shop")
# 注册蓝图,并配置路由前缀
# 注意需要在编写完蓝图代码后注册蓝图
@app.route('/')
def index():
return "main page"
if __name__ == '__main__':
app.run()
基于蓝图的模块化开发
通过蓝图我们可以很方便的进行模块化开发,并且很好的隔离开各部分代码,还有如一般作用于全局的钩子函数也可以只注册才某个蓝图下生效,例如还是对上面的代码进行修改实现指定路由下才生效的钩子函数:
from flask import Flask, Blueprint
app = Flask(__name__)
shop = Blueprint('shop', __name__)
@shop.route('/')
def shop_index():
return "shop page"
@shop.after_request
# 仅蓝图下有效的钩子函数
def deal_shop_response(response):
print("局部钩子")
return response
app.register_blueprint(shop, url_prefix="/shop")
@app.route('/')
def index():
return "main page"
@app.after_request
# 全局下有效的钩子函数
def deal_response(response):
print("全局钩子!")
return response
if __name__ == '__main__':
app.run()
上面我们实现了两个钩子函数,一个全局钩子(deal_response
)、一个局部钩子(deal_shop_response
),结果访问/
路由时只有全局钩子执行,访问/shop/
下路由时全局和局部钩子都执行(先局部、再全局)
其他
基于CBV开发
前面的示例都是基于FBV(基于函数式视图)开发,但flask中也提供了CBV(基于类视图)开发,简单示例如下:
from flask import Flask, views
app = Flask(__name__)
def auth(func):
def wrapper(*args, **kwargs):
print("开始认证")
result = func(*args, **kwargs)
print("认证成功")
return result
return wrapper
class Test(views.MethodView):
# 基于CBV开发需要继承views.MethodView类
methods = ['GET', 'POST']
# 配置允许请求的方式
decorators = [auth, ]
# 配置装饰器
def get(self):
return "get方法"
def post(self):
return "post方法"
app.add_url_rule("/test", view_func=Test.as_view(name='test'))
# 添加路由
if __name__ == '__main__':
app.run(debug=True, port=5000)
项目自动reload导致崩溃问题
开发时,由于使用flask自带的app.run
方法运行,经常容易因为修改代码或者测试而导致程序崩溃,此时可以在app.run
里配置下面的参数禁止程序reload,从而避免问题:
use_reloader=False
并发处理问题
项目部署时如果使用flask自带的app.run
方法默认是只有单线程的,因此如果同时访问时将可能出现阻塞问题(比如进入要路由要等待10秒,那么两个请求同时访问时,必须是等待前一个访问结束后,后一个才能访问),一种解决方法是可以在app.run
方法里加上配置参数:threaded=True
,代表允许多线程,或者配置参数processes=True
,代表允许多进程(注意线程进程只能二选一),代码如下所示:
app.run(threaded=True)
# 支持多线程
# app.run(processes=True)
# 支持多进程
但这种方法终究只能算作一种临时方案,在面对大量的访问请求场景,一般情况下还是通过在外层通过nginx增加服务器数量和负载均衡,内层使用uwsgi进行多进程处理,或者gunicron、gevent等协程处理来实现高并发性能(内层的这几种处理方案基本都是需要在Linux环境下才能够实现,不过有个mod_wsgi是基于apache的,貌似支持windows)
gevent处理并发示例
使用自带的app.run()
无法支持gevent
的协程处理,所以需要使用gevent.pywsgi
下的WSGIServer
实现,举例:
from flask import Flask
from gevent.pywsgi import WSGIServer
from gevent import monkey
import time
monkey.patch_all()
app = Flask(__name__)
@app.route('/')
def test():
time.sleep(10)
return 'test'
@app.route('/2')
def test2():
time.sleep(10)
return 'test2'
if __name__ == '__main__':
http_server = WSGIServer(('127.0.0.1', 5000), app)
http_server.serve_forever()
跨域处理
流内容传输
后台可以使用stream_with_context
包装一个流内容的生成器,并通过Response
对象包装返回,stream_with_context
内部能够保持服务端和客户端通信的上下文,从而不断向客户端发送数据,直到生成器抛出异常。然后前台通过一些异步标签,如<img>
/<video>
/...来接收数据,但最好不要直接页面请求或者通过ajax进行获取,因为服务端会一直等待全部数据发送完毕才会断开,期间请求将会一直处于等待状态,从而造成等待超时之类的情况。
参考:
https://www.cnblogs.com/jsben/p/4909984.html
https://dormousehole.readthedocs.io/en/latest/patterns/streaming.html
流内容传输另一种方式:可以通过websocket与服务器进行连接,然后服务器主动将数据内容发送给客户端,举例:
后端
from flask import Flask
from flask_sockets import Sockets
import datetime
import time
import random
import base64
from glob import glob
app = Flask(__name__)
sockets = Sockets(app)
path = "xxx/*.jpg"
# 图片文件路径
li_path = glob(path)
li_img = []
for each in li_path:
with open(each, "rb") as f:
content = f.read()
li_img.append(content)
li = [base64.encodebytes(each) for each in li_img]
@sockets.route('/echo')
def echo_socket(ws):
while not ws.closed:
ws.send(random.choice(li)) #发送数据
time.sleep(.3)
if __name__ == "__main__":
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
server = pywsgi.WSGIServer(('127.0.0.1', 5000), app, handler_class=WebSocketHandler)
print('server start')
server.serve_forever()
前端
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>Title</title>
<script src="https://cdn.bootcss.com/jquery/3.2.0/jquery.js"></script>
</head>
<body>
<img src="" />
<script>
var ws = new WebSocket("ws://127.0.0.1:5000/echo");
ws.onmessage = function (event) {
var reader = new FileReader();
reader.onload = function (event) {
$("img").attr("src", `data:image/jpg;base64,${reader.result}`);
};
reader.readAsText(event.data);
};
</script>
</body>
</html>
更多flask参考
官方文档(中文):http://docs.jinkan.org/docs/flask/
https://blog.csdn.net/huangzhang_123/article/details/75206316
http://www.pythondoc.com/flask-mega-tutorial/
三种框架的对比参考
https://www.jianshu.com/p/9960a9667a5c
https://www.cnblogs.com/wuzaipei/p/9694379.html