文 / 秦未
文/秦未
为了先了解Flask_SqlAlchemy,我先来张思维导图:
实时预览地址:https://mubu.com/doc/5vS_tvNin
代码示例看下面:
先下载 flask-sqlalchemy 库
pip install flask-sqlalchemy
然后在app.py中导入:
from flask_sqlalchemy import SQLAlchemy
# 数据库操作2/获取当前目录绝对路径
basedir = path.abspath(path.dirname(__file__))
# 数据库操作3/数据库配置
app.config['SQLALCHEMY_DATABASE_URL'] = 'sqlite:///' + path.join(basedir, 'data_sqlite')
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
db = SQLAlchemy(app)
注意代码位置,为避免混乱,再贴一次源代码:
# /app/app.py
# -*- coding:utf-8 -*-
import datetime
from os import path
from flask import Flask, render_template, make_response
from flask_script import Manager
# from flask_bootstrap import Bootstrap
# 数据库操作1
from flask_sqlalchemy import SQLAlchemy
from werkzeug.routing import BaseConverter
class RegexConverter(BaseConverter):
def __init__(self, url_map, *items):
super(RegexConverter, self).__init__(url_map)
self.regex = items[0]
app = Flask(__name__)
app.url_map.converters['regex'] = RegexConverter
app.config.from_pyfile('config')
# Bootstrap(app)
# 数据库操作2/获取当前目录绝对路径
basedir = path.abspath(path.dirname(__file__))
# 数据库操作3/数据库配置
app.config['SQLALCHEMY_DATABASE_URL'] = 'sqlite:///' + path.join(basedir, 'data_sqlite')
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
db = SQLAlchemy(app)
manager = Manager(app)
@app.route('/')
def index():
# 获取cookies
# username = request.cookies.get('username')
# 封装render_template
response = make_response(render_template('blog/index.html', **{
'text': 'Hello, World',
}))
# 设置cookies--参数1:名称,参数2:值,expires指定过期时间;(记得导入datetime模块!)
outdate = datetime.datetime.today() + datetime.timedelta(days=30)
response.set_cookie('username', 'admin', expires=outdate)
return response
@app.route('/about')
def about():
return render_template('blog/about.html', **{
'text': '<h1>Hello, World</h1>',
'body': '## 演示'
})
@app.route('/user/<username>')
def user(username):
return F'<h1>{username}</h1>'
@app.route('/login/', methods=['GET', 'POST'])
def login():
# 判断请求方式
# if request.method == 'POST':
# # POST方式时数据是在form里面
# username = request.form.get('username')
# password = request.form.get('password')
# # 获取文件
# my_file = request.files.get('file')
# # 获取当前路径
# basepath = path.abspath(path.dirname(__file__))
# # 将路径连接
# upload_path = path.join(basepath, 'static/uploads/')
# # 保存文件并将文件名获取封装
# my_file.save(upload_path + secure_filename(my_file.filename))
# # 跳转upload函数
# return redirect(url_for('upload'))
# else:
# # GET方式时数据是在args里面
# username = request.args.get('username')
# password = request.args.get('password')
# return render_template('blog/login.html', method='GET')
from forms import LoginForm
form = LoginForm()
return render_template('blog/login.html', form=form)
@app.route('/upload')
def upload():
return '<h1>上传成功!</h1>'
# @manager.command
# def dev():
# # 导入包
# from livereload import Server
# # 封装
# live_server = Server(app.wsgi_app)
# # 设置监测文件路径
# live_server.watch('**/*.*')
# # 同时让它刷新
# live_server.serve(open_url=True)
@app.template_filter('md')
def markdown_to_html(text):
import mistune
markdown = mistune.Markdown()
return markdown(text)
def read_md(filename):
from functools import reduce
# 获取当前路径
basepath = path.abspath(path.dirname(__file__))
# 将路径连接
upload_path = path.join(basepath, filename)
with open(upload_path, encoding='UTF-8') as md_file:
content = reduce(lambda x, y: x + y, md_file.readlines())
return content
# 将read_md函数传递到前台
@app.context_processor
def inject_methods():
return dict(read_md=read_md)
if __name__ == '__main__':
# app.run(debug=True)
manager.run()
然后我们新建两个类:
class Role(db.Model):
__tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String, nullable=True)
# 与users表建立关系
users = db.relationship('User', basedir='roles')
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String, nullable=True)
password = db.Column(db.String, nullable=True)
# 设置外键
role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
说明:我们新建立了两个类,它们都继承了db.Model,tablename为表名称的定义,然后建立字段,
db的Column方法为我们定义了列的属性,第一个参数为字段类型,具体支持多少种,看上面的思维导图,
primary_key设置主键,nullable允许为空值,具体的参数定义非常的多,想详细了解的看:
Flask-SQLAlchemy 2.0 文档:http://www.pythondoc.com/flask-sqlalchemy/quickstart.html
现在我们的app.py内容为:
# /app/app.py
# -*- coding:utf-8 -*-
import datetime
from os import path
from flask import Flask, render_template, make_response
from flask_script import Manager
# from flask_bootstrap import Bootstrap
# 数据库操作1
from flask_sqlalchemy import SQLAlchemy
from werkzeug.routing import BaseConverter
class RegexConverter(BaseConverter):
def __init__(self, url_map, *items):
super(RegexConverter, self).__init__(url_map)
self.regex = items[0]
app = Flask(__name__)
app.url_map.converters['regex'] = RegexConverter
app.config.from_pyfile('config')
# Bootstrap(app)
# 数据库操作2/获取当前目录绝对路径
basedir = path.abspath(path.dirname(__file__))
# 数据库操作3/数据库配置
app.config['SQLALCHEMY_DATABASE_URL'] = 'sqlite:///' + path.join(basedir, 'data_sqlite')
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
db = SQLAlchemy(app)
manager = Manager(app)
class Role(db.Model):
__tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String, nullable=True)
# 与users表建立关系
users = db.relationship('User', basedir='roles')
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String, nullable=True)
password = db.Column(db.String, nullable=True)
# 设置外键
role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
@app.route('/')
def index():
# 获取cookies
# username = request.cookies.get('username')
# 封装render_template
response = make_response(render_template('blog/index.html', **{
'text': 'Hello, World',
}))
# 设置cookies--参数1:名称,参数2:值,expires指定过期时间;(记得导入datetime模块!)
outdate = datetime.datetime.today() + datetime.timedelta(days=30)
response.set_cookie('username', 'admin', expires=outdate)
return response
@app.route('/about')
def about():
return render_template('blog/about.html', **{
'text': '<h1>Hello, World</h1>',
'body': '## 演示'
})
@app.route('/user/<username>')
def user(username):
return F'<h1>{username}</h1>'
@app.route('/login/', methods=['GET', 'POST'])
def login():
# 判断请求方式
# if request.method == 'POST':
# # POST方式时数据是在form里面
# username = request.form.get('username')
# password = request.form.get('password')
# # 获取文件
# my_file = request.files.get('file')
# # 获取当前路径
# basepath = path.abspath(path.dirname(__file__))
# # 将路径连接
# upload_path = path.join(basepath, 'static/uploads/')
# # 保存文件并将文件名获取封装
# my_file.save(upload_path + secure_filename(my_file.filename))
# # 跳转upload函数
# return redirect(url_for('upload'))
# else:
# # GET方式时数据是在args里面
# username = request.args.get('username')
# password = request.args.get('password')
# return render_template('blog/login.html', method='GET')
from forms import LoginForm
form = LoginForm()
return render_template('blog/login.html', form=form)
@app.route('/upload')
def upload():
return '<h1>上传成功!</h1>'
# @manager.command
# def dev():
# # 导入包
# from livereload import Server
# # 封装
# live_server = Server(app.wsgi_app)
# # 设置监测文件路径
# live_server.watch('**/*.*')
# # 同时让它刷新
# live_server.serve(open_url=True)
@app.template_filter('md')
def markdown_to_html(text):
import mistune
markdown = mistune.Markdown()
return markdown(text)
def read_md(filename):
from functools import reduce
# 获取当前路径
basepath = path.abspath(path.dirname(__file__))
# 将路径连接
upload_path = path.join(basepath, filename)
with open(upload_path, encoding='UTF-8') as md_file:
content = reduce(lambda x, y: x + y, md_file.readlines())
return content
# 将read_md函数传递到前台
@app.context_processor
def inject_methods():
return dict(read_md=read_md)
if __name__ == '__main__':
# app.run(debug=True)
manager.run()
---end---