OnekkiSite搭建过程
使用的框架
- flask
- flask_sqlalchemy
- flask_script
- flask-sqlacodegen
- pymysql
- flask-wtf
- flask-bcrypy
- flask-login
- flask-principal
- flask-celery-helper
- redis
- flask-cache
- flask-admin
- flask-restful
搭建开发环境
第一阶段
创建环境
pipenv —python 3.7
进入环境
pipenv shell
目录结构
.
├── LICENSE
├── Pipfile
├── README.md
├── app
│ ├── __init__.py
│ ├── admin
│ ├── site
│ ├── static
│ ├── templates
│ ├── test
│ └── utils
├── config.py
└── manage.py
编写配置文件
config.py
import os
# 项目绝对路径
BASE_DIR = os.getcwd()
# 模版文件路径
TEMPLATES_DIR = os.path.join(BASE_DIR, 'templates')
# 静态文件路径
STATIC_DIR = os.path.join(BASE_DIR, 'static')
# 数据库URI
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:123456@39.107.230.35:3306/onekki_site'
# 查询跟踪, 不太需要, False, 不占用额外的内存
SQLALCHEMY_TRACK_MODIFICATIONS = False
编写启动文件
app/__init__.py
from flask import Flask
# mysql引入
from flask_sqlalchemy import SQLAlchemy
# 配置文件引入
from config import SQLALCHEMY_DATABASE_URI, SQLALCHEMY_TRACK_MODIFICATIONS
db = SQLAlchemy()
def create_app():
"""创建app的方法"""
# 生成Flask对象
app = Flask(__name__)
# 配置app的mysql
app.config['SQLALCHEMY_DATABASE_URI'] = SQLALCHEMY_DATABASE_URI
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = SQLALCHEMY_TRACK_MODIFICATIONS
# 用SQLAlchemy初始化app
db.init_app(app=app)
# 在这还可以配置其他模块, 如socketio
# 放回Flask对象app
return app
在site模块中创建蓝图
app/site/__init__.py
from flask import Blueprint
from config import TEMPLATES_DIR, STATIC_DIR
# 创建一个蓝图对象, 设置别名, 模板文件地址, 静态文件地址
site = Blueprint('site', __name__,
template_folder=TEMPLATES_DIR,
static_folder=STATIC_DIR)
# 这里导入是为了在解释时, 蓝图能加载到views文件中的路由数据
from app.site import views
为site模块设置路由
app/site/views.py
# 获取蓝图
from app.site import site
# 获取数据库模型对象和SQLAlchemy对象db, 注意不可使用App模块中的db
from app.site.models import *
# 设置路由
@site.route('/')
def index():
return '首页'
admin模块或者其他模块(blog)配置与之相同
注册蓝图启动服务器
**manage.py**
from flask_script import Manager
from app import create_app
from app.site import site
# 创建app
app = create_app()
# 注册site的蓝图
app.register_blueprint(site, url_prefix = '/site')
# 通过app创建manager对象
manager = Manager(app)
if __name__ == '__main__':
# 运行服务器
manager.run()
- 注意:严格遵守python导入包的编写顺序
- Python Build In 内构包
- Python 第三方库
- 用户自定义模块
配置完成,运行服务器
python manager.py runserver -h 127.0.0.1 -p 5000 -d
访问服务器
http://127.0.0.1:5000/site/
以上参考文档
改善配置方式
config.py
import os
# 项目绝对路径
BASE_DIR = os.getcwd()
# 模版文件路径
TEMPLATES_DIR = os.path.join(BASE_DIR, 'templates')
# 静态文件路径
STATIC_DIR = os.path.join(BASE_DIR, 'static')
# 数据库URI
MYSQL_DATABASE_URI = 'mysql+pymysql://root:123456@39.107.230.35:3306/onekki_site'
# 查询跟踪, 不太需要, False, 不占用额外的内存
SQLALCHEMY_TRACK_MODIFICATIONS = False
class BaseConfig(object):
SQLALCHEMY_DATABASE_URI = MYSQL_DATABASE_URI
@staticmethod
def init_app(app):
pass
class DevelopmentConfig(BaseConfig):
DEBUG = True
pass
class TestingConfig(BaseConfig):
TESTING = True
pass
class ProductionConfig(BaseConfig):
pass
configs = {
'development': DevelopmentConfig,
'testing': TestingConfig,
'production': ProductionConfig,
'default': DevelopmentConfig
}
app/__init__.py
from flask import Flask
# mysql引入
from flask_sqlalchemy import SQLAlchemy
# 配置文件引入
from config import configs
db = SQLAlchemy()
def create_app(config_name):
"""创建app的方法"""
# 生成Flask对象
app = Flask(__name__)
# 导入配置
app.config.from_object(configs[config_name])
configs[config_name].init_app(app)
# 用SQLAlchemy初始化app
db.init_app(app=app)
# 在这还可以配置其他模块, 如socketio
# 放回Flask对象app
return app
manager.py
# 创建app
app = create_app(os.getenv('FLASK_CONFIG') or 'default')
第二阶段
- 注意
创建数据库时用对应的应用前缀
初始化SQLAchemy
db = SQLAlchemy(app)
创建数据模型和表
- 方式1:
用数据库客户端创建好数据库, 然后通过flask-sqlacodegen生成model文件(ORM模型)
# 整体映射database数据库并写入model.py文件
flask-sqlacodegen \
'mysql+pymysql://username:password@127.0.0.1/database' \
--outfile 'model.py' \
--flask
# 映射table数据表并写入table.py文件
flask-sqlacodegen \
'mysql+pymysql://username:password@127.0.0.1/database' \
--table table \
--outfile 'model.py' \
--flask
本人使用
flask-sqlacodegen 'mysql+pymysql://root:zwq123456@39.107.230.35/onekki_site' --outfile "gen_models.py" --flask --schema onekki_site
注意下面的写法的区别
flask-sqlacodegen 'mysql+pymysql://root:zwq123456@39.107.230.35/onekki_site' > "gen_models.py"
- 方式2:
直接定义表结构, 然后通过manage.py添加创建数据库表的指令
manage.py
import os
from flask_script import Manager, Server
from app import create_app, db
from app.site import site
# 创建app
app = create_app(os.getenv('FLASK_CONFIG') or 'default')
# 注册site的蓝图
app.register_blueprint(site, url_prefix = '/site')
# 通过app创建manager对象
manager = Manager(app)
# 从模型创建数据库的指令
manager.add_command("server", Server())
@manager.shell
def make_shell_context():
return dict(app=app,db=db,Model1=models.Model1,Model2=models.Model2,...)
if __name__ == '__main__':
# 运行服务器
manager.run()
在终端执行, 即可完成对应表的创建
(OnekkiSite) bash-3.2$ python manage.py shell
>>> db.create_all()
本人采用了方式1, 但是在后续更新数据表结构会采用另一种方式, 这将在下文中提到
注意把生成文件里的
db = SQLAlchemy()
换成, 避免二次创建db
from app import db
SqlAlchemy数据的CRUD
from models import Users
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import create_engine
engine = create_engine("mysql+pymysql://root:199199@127.0.0.1:3306/blog?charset=utf8", max_overflow=2, pool_size=5)
Connection = sessionmaker(bind=engine)
# 每次执行操作, 都要创建一个Connection, 两种连接都可以
# conn = Connection()
conn = scoped_session(Connection)
obj1 = Users(name='dayu')
# 增加一条数据
# conn.add(obj1)
#
# # 增加多条数据
# conn.add_all([
# Users(name='dayu3'),
# Users(name='dayu4'),
# Users(name='dayu5'),
# Users(name='dayu6')
# ])
# # 查询数据
# user_list = conn.query(Users).all()
# for user in user_list:
# print(user.id, user.name)
# # 查询id大于2的数据
# user_list = conn.query(Users).filter(Users.id > 2)
# for user in user_list:
# print(user.id, user.name)
# 删除id大于2的
# conn.query(Users).filter(Users.id > 2).delete()
# 修改
conn.query(Users).filter(Users.id == 2).update({'name': 'da'})
# 提交事务
conn.commit()
# 关闭Connection
conn.close()
数据表的关系
app/blog/models.py
# coding: utf-8
from app import db
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import relationship
class BlogArticle(db.Model):
__tablename__ = 'blog_article'
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(45))
content = db.Column(db.Text)
publish_time = db.Column(db.DateTime)
user_id = db.Column(db.ForeignKey('blog_user.id'), index=True)
tag_id = db.Column(db.ForeignKey('blog_tag.id'), index=True)
tag = db.relationship('BlogTag', primaryjoin='BlogArticle.tag_id == BlogTag.id', backref='blog_articles')
user = db.relationship('BlogUser', primaryjoin='BlogArticle.user_id == BlogUser.id', backref='blog_articles')
class BlogComment(db.Model):
__tablename__ = 'blog_comment'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255))
content = db.Column(db.Text)
time = db.Column(db.DateTime)
article_id = db.Column(db.ForeignKey('blog_article.id'), index=True)
article = db.relationship('BlogArticle', primaryjoin='BlogComment.article_id == BlogArticle.id', backref='blog_comments')
class BlogTag(db.Model):
__tablename__ = 'blog_tag'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255))
article_id = db.Column(db.ForeignKey('blog_tag.id'), index=True)
article = db.relationship('BlogTag', remote_side=[id], primaryjoin='BlogTag.article_id == BlogTag.id', backref='blog_tags')
class BlogUser(db.Model):
__tablename__ = 'blog_user'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255))
password = db.Column(db.String(255))
第三阶段
Jinjia的使用
- 变量
{{ variables }}
- 注释
{# Documents #}
- 控制语句
{% controller_flow %}
if语句
{% if user.is_logged_in() %}
<a href='/logout'>Logout</a>
{% else %}
<a href='/login'>Login</a>
{% endif %}
循环
{% for post in posts if post.text %}
<div>
<h1>{{ post.title }}</h1>
<p>{{ post.text | safe }}</p>
</div>
{% endfor %}
# or
{% for post in posts %}
{{ loop.index }}-{{ post.title }}
{% endfor %}
- 过滤器
无参数
{{ variable | filter_name }}
带参数
{{ variables | filter_name(*args) }}
- 宏
{% macro input(name, label, value='', type='text')%}
<div class="form-group">
<label for"{{ name }}">{{ label }}</div>
<input type="{{ type }}" name="{{ name }}"
value="{{ value | escape }}" class="form-control">
</div>
{% endmacro %}
调用宏
{{ input('name', 'Name') }}
结果
<div class="form-group">
<label for"name">Name</div>
<input type="text" name="name" value="" class="form-control">
</div>
- 兼容javascript
{% raw %}
JavaScript statements
{% endraw %}
Jinjia常用过滤器
- default
{{ post.date | default("2016-11-22") }}
- float
{{ 75 | float }}
- int
{{ 75.5 | int }}
- length
The Count: {{ post.tags | length }}
- title
# 首字母大写
{{ "post title" | title }}
- round
# 浮点数精度
{{ 3.14159 | round(1) }}
# common 参数:四舍五入
# floor 参数:截取整数部分
# ceil 参数:向上取整
{{ 4.7 | rount(1, "common")}}
- join
# 列表转","分割的字符串
{{ ['JmilkFan', 'fanguiju' ] | join(',')}}
- tojson
{{ {"key": "value"} | tojson }}
{{ posts | tojson | safe }}
- truncate
# 截取字符串, 并在后面加省略号
{{ "a long stringggggggggggggggggg " | truncate(5) }}
- escape
# 转义打印
{{ "<h1>Title<\h1>" | escape }}
- safe
# 用escape会转义失败, 换safe解决
{{ "<h1>Post Title"</h1> | safe }}
- 自定义过滤器
定义函数
def count_substring_from_python(string, sub):
return string.count(sub)
声明函数
app.jinja_env.filters['count_substring'] = count_substring_from_python
调用
{{ variable | filter_name("String") }}
Flask的特殊变量和方法(在模版中可以直接访问)
- config对象
app.config 对象, 其包含了 class DevConfig 的属性
{{ config.SQLALCHEMY_DATABASE_URI }}
- request对象
Flask 中表示当前请求的 request 对象
{{ request.url }}
- session对象
Flask 的 session 对象
{{ session.new }}
- url_for方法
{{ url_for('home') }}
# /post/1024
{{ url_for('post', post_id=1024) }}
- get_flashed_messages方法
返回之前在 Flask 中通过
flash()
传入的信息列表, 类似实现一个队列
{% for message in get_flashed_messages() %}
{{ message }}
{% endfor %}
优化调整
- 将app/admin/models.py、app/blog/models.py、app/site/models.py 删除, model写在app/db/models.py下
- 将db = SQLAlchemy()写在models.py文件中
- 配置文件添加JSON_AS_ASCII = False来让返回的json数据中文不显示为unicode
编写视图和前端页面
app/blog/views.py
from flask import render_template, jsonify
from sqlalchemy import func
# from flask_sqlalchemy import Pagination
# 获取蓝图
from app.blog import blog
# 获取数据库模型对象和SQLAlchemy对象db, 注意不可使用App模块中的db
# from app.blog.models import BlogArticle, BlogTag
# from app import db
from app.db.models import *
def json_return(code, msg,data):
return jsonify({"code":code, "msg":msg, "data": data})
# 设置路由
@blog.route('/')
def index():
return render_template("blog/index.html")
@blog.route("/article/<int:page>")
def article(page=1):
articles = BlogArticle.query.paginate(page, 10)
# return json_return(200, "success", [i.serialize for i in articles])
return render_template("blog/article.html", articles=articles)
@blog.route("/user/<int:page>")
def user(page=1):
users = BlogUser.query.all()
# return json_return(200, "success", [i.serialize for i in articles])
return render_template("blog/user.html", users=users)
@blog.route("/tag/<int:page>")
def tag(page=1):
tags = BlogTag.query.all()
# return json_return(200, "success", [i.serialize for i in articles])
return render_template("blog/tag.html", tags=tags)
@blog.route("/comment/<int:page>")
def comment(page=1):
comments = BlogComment.query.all()
# return json_return(200, "success", [i.serialize for i in articles])
return render_template("blog/comment.html", comments=comments)
app/templates/blog/article.html
{{ articles }}
其他页面和上面类似:comment.html、tag.html、user.html
编写Jinjia模版
在编写之前到bootstrap官网下载css和js文件, 解压到static文件夹
app/tempates/blog/base.html
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>{% block title %}{% endblock %}</title>
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width,, initial-scale=1">
<!-- 全局引入bootstrap css -->
<link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='bootstrap/css/bootstrap.min.css') }}">
<!-- <link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='css/test.css') }}"> -->
<!-- 宏定义用于生成 Bootstrap 风格的分页链接列表 -->
{% macro render_pagination(pagination, endpoint) %}
<nav>
<ul class="pagination">
<li class="page-item">
{% if pagination.has_prev %}
<a class="page-link" href="{{ url_for('blog.article', page=pagination.prev().page) }}" aria-label="Previous">
{% else %}
<a class="page-link" href="{{ url_for('blog.article', page=1) }}" aria-label="Previous">
{% endif %}
<span aria-hidden="true">«</span>
</a>
</li>
{% for page in pagination.iter_pages() %}
{% if page %}
{% if page!=pagination.page %}
<li class="page-item">
<a class="page-link" href="{{ url_for(endpoint, page=page) }}">
{{page}}
</a>
</li>
{% else %}
<li class="page-item active"><a class="page-link" href="">{{page}}</a></li>
{% endif %}
{% else %}
<li class="page-item"><a class="page-link">...</a></li>
{% endif %}
{% endfor %}
<li class="page-item">
<a class="page-link" href="{{ url_for('blog.article', page=pagination.next().page) }}" aria-label="Next">
<span aria-hidden="true">»</span>
</a>
</li>
</ul>
</nav>
{% endmacro %}
</head>
<body>
<div class="container">
<div class="jumbotron">
<h1><a class="button" href="{{ url_for('blog.article', page=1) }}">Onekki's Blog</a></h1>
<p>Welcom to onekki's blog!</p>
</div>
{% block body %}
body_content
{% endblock %}
</div>
<!-- 全局引入jquery -->
<script src="{{ url_for('static', filename='js/jquery-3.4.1.min.js') }}"></script>
<script src="{{ url_for('static', filename='js/jquery-1.11.3.min.js') }}"></script>
<!-- 全局引入bootstrap js -->
<script src="{{ url_for('static', filename='bootstrap/js/bootstrap.min.js') }}"></script>
<!-- <script src="{{ url_for('static', filename='js/bootstrap.js') }}"></script> -->
</body>
</html>
模版继承
article.html
{% extends "blog/base.html" %}
{% block title %}Onekki Blog{% endblock %}
{% block body %}{{ articles }}{% endblock %}
添加分页
{% extends "blog/base.html" %}
{% block title %}Onekki Blog{% endblock %}
{% block body %}
<div class="row">
<div class="col-lg-9">
<!-- 分页 -->
{% for article in articles.items %}
<div class="row">
<div class="col-lg-12">
<h2>{{ article.title }}</h2>
</div>
</div>
<div class="row">
<div class="col-lg-12">
{{ article.content|truncate(255)|safe}}
<!-- <a href="{{ url_for('blog.article', page=1) }}">Read More</a> -->
</div>
</div>
{% endfor %}
</div>
</div>
<div class="row">
<!-- 调用父模版的 macro render_pagination -->
{{ render_pagination(articles, 'blog.article') }}
</div>
{% endblock %}
- 记录一下项目结构
.
├── LICENSE
├── Pipfile
├── Pipfile.lock
├── README.md
├── __pycache__
│ ├── config.cpython-37.pyc
│ └── manager.cpython-37.pyc
├── app
│ ├── __init__.py
│ ├── __pycache__
│ │ └── __init__.cpython-37.pyc
│ ├── admin
│ │ ├── __init__.py
│ │ └── views.py
│ ├── blog
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-37.pyc
│ │ │ └── views.cpython-37.pyc
│ │ └── views.py
│ ├── db
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-37.pyc
│ │ │ └── models.cpython-37.pyc
│ │ └── models.py
│ ├── site
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-37.pyc
│ │ │ └── views.cpython-37.pyc
│ │ └── views.py
│ ├── static
│ │ ├── css
│ │ │ ├── bootstrap-grid.css
│ │ │ ├── bootstrap-grid.css.map
│ │ │ ├── bootstrap-grid.min.css
│ │ │ ├── bootstrap-grid.min.css.map
│ │ │ ├── bootstrap-reboot.css
│ │ │ ├── bootstrap-reboot.css.map
│ │ │ ├── bootstrap-reboot.min.css
│ │ │ ├── bootstrap-reboot.min.css.map
│ │ │ ├── bootstrap.css
│ │ │ ├── bootstrap.css.map
│ │ │ ├── bootstrap.min.css
│ │ │ └── bootstrap.min.css.map
│ │ └── js
│ │ ├── bootstrap.bundle.js
│ │ ├── bootstrap.bundle.js.map
│ │ ├── bootstrap.bundle.min.js
│ │ ├── bootstrap.bundle.min.js.map
│ │ ├── bootstrap.js
│ │ ├── bootstrap.js.map
│ │ ├── bootstrap.min.js
│ │ └── bootstrap.min.js.map
│ ├── templates
│ │ └── blog
│ │ ├── article.html
│ │ ├── base.html
│ │ ├── comment.html
│ │ ├── index.html
│ │ ├── tag.html
│ │ └── user.html
│ ├── test
│ └── utils
├── config.py
├── gen_models.py
└── manager.py
服务器表单验证 - WTForms
WTForms作用
- 保证安全的前提下, 对常见的表单类型进行输入的合法性验证
- Jinja HTML 渲染
- 预防跨域请求伪造(CSRF)和SQL 注入
WTForms组成
-
字段
表示表单的输入框, 会做一些初步的输入检查
-
检验器
是一组被附加到字段(输入框)上的函数, 用于对输入数据的检验, 确保输入我们期望的数据
-
表单
是一个 Python 类, 其中包含了 字段(类属性) 和 检验器, 在接收到 HTTP POST 请求时, 会根据定义的检验器规则来对输入数据进行检验
-
常用字段类型
-
fields.DateField
对应了 Python 中的 Date 对象,可以接收一个 format 可选参数来设定 Date 格式,该参数需要传入一个 strftime(格式化输出时间) 的字符串。
-
fields.IntegerField
将提交的数据强制转换成为整数,并在模板上渲染成为一个数字类型的输入框。
-
fileds.FloatField
将提交的数据强制转换成为浮点数,并在模板上渲染成为一个数字类型的输入框。
-
fields.StringField
普通的文本输入框,会将输入的内容强制转换成为 String 类型对象。
-
fields.RadioField
代表一组单项选择框,接收一个 choices 参数,该参数需要传入一个以 Tuple 为元素的 List 类型对象,这些参数值表示了显示的选项和返回值。
fields.SelectField/fields.SelectMultipleField
-
-
校验器
一般能够通过其命名来得知其作用,所有的检验器都能够接收一个 message 参数,该参数表示了输入的数据没有通过验证时,返回的错误信息。
validators.DataRequired()
validators.Email()
validators.Length(min=-1, max=-1)
validators.NumberRange(min=None, max=None)
validators.Optional()
validators.Regexp(regex)
validators.URL()
-
自定义校验器
自定义检验器所需要做的事情就是:实现一个函数 接收表单对象和字段对象作为参数 当没有通过验证时,触发一个 wtform.VaildationError 异常
例如
import re
from wtforms import ValidationError
def custom_email(form_object, field_object):
"""Define a vaildator"""
if not re.match(r"[^@+@[^@]+\.[^@]]+", field_object.data):
raise ValidationError('Field must be a valid email address.')
基本使用
- 配置密钥
自己生成密钥
cat /dev/urandom | tr -cd 'a-f0-9' | head -c 32
将生成的密钥写入配置项
SECRET_KEY = '生成的密钥'
-
用法
- 表单类需要继承 Flask WTF 扩展提供的 Form 类
- 表单类中的一个类属性,就代表了一个字段,即输入框。wtforms 提供了多种类型的字段类
- 字段类的第一个参数为输入框标题,第二个参数为绑定到该字段的检验器列表,由 wtforms.validators 提供
app/form/forms.py
from flask_wtf import Form
from wtforms import StringField, TextField
from wtforms.validators import DataRequired, Length
class CommentForm(Form):
"""Form vaildator for comment."""
# Set some field(InputBox) for enter the data.
# patam validators: setup list of validators
name = StringField(
'Name',
validators=[DataRequired(), Length(max=255)])
text = TextField(u'Comment', validators=[DataRequired()])
app/__init__.py
import wt_forms
完善博客列表的视图和模版以及命名修改
- 主要内容
- 通过点亮tag来筛选博客
- article.html修改为article_list.html其他也都类似
编写博客页面
页面包含了博客标题、内容, 评论数据, tag数据, 添加评论的功能
编写视图
app/blog/views.py
@blog.route('/article/<int:id>', methods=['GET','POST'])
def article(id):
form = CommentForm()
if form.validate_on_submit():
new_comment = BlogComment()
new_comment.name=form.name.data
new_comment.content=form.content.data
new_comment.time=datetime.datetime.now()
new_comment.article_id=id
db.session.add(new_comment)
db.session.commit()
article = BlogArticle.query.get_or_404(id)
tag_list = [article.tag]
comment_list = BlogComment.query.filter(BlogComment.article_id==id).all()
recent_article_list, top_tag_list = sidebar_data()
return render_template('blog/article.html',
article=article,
form=form,
tag_list=tag_list,
comment_list=comment_list,
recent_article_list=recent_article_list,
top_tag_list=top_tag_list)
app/templates/article.html
{% extends "blog/base.html" %}
{% block title %}Onekki Blog{% endblock %}
{% block body %}
<h3>{{ article.title }}</h3>
{{ article.publish_time }}:
{{ article.content }}
<div class="col-lg-12">
<h3>New Comment</h3>
<form method="POST" action="{{ url_for('blog.article', id=article.id) }}">
{{ form.hidden_tag() }}
<div>
{{ form.name.label }}
{% if form.name.errors %}
{% for e in form.name.errors %}
<p class="help-block">{{ e }}</p>
{% endfor %}
{% endif %}
{{ form.name(class_="form-control") }}
</div>
<div class="form-group">
{{ form.content.label }}
{% if form.content.errors %}
{% for e in form.content.errors %}
<p class="help-block">{{ e }}</p>
{% endfor %}
{% endif %}
{{ form.content(class_='form-control') }}
</div>
<input class="btn btn-primary" type="submit" value="Add Comment">
</form>
</div>
<div class="row">
<h5>Tags</h5>
<ul>
{% for tag in tag_list %}
<li><a href="{{ url_for('blog.article_list', page=1, tag_id=tag.id) }}">{{ tag.name }}</a></li>
{% endfor %}
</ul>
</div>
<div class="row">
<h5>Comments</h5>
<ul>
{% for comment in comment_list %}
<div>
name:{{ comment.name }} <br/>
content:{{ comment.content }}
</div>
{% endfor %}
</ul>
</div>
{% endblock %}
第四阶段
用工厂模式生成应用对象
- 工厂模式
通过某个接口或对对象来创建另一个对戏, 这个接口函数被称之为工厂函数
工厂模式可以推迟到在程序运行的时候才动态决定要创建哪个类的实例,而不是在编译时就必须知道要实例化哪个类
- 工厂函数
一个用于创建对象的接口(create_object_interface(variables)), 让子类来决定(根据不同 variables 作为条件来判断)实例化那一个类的对象
例如
#!/usr/bin/env python
# -*- coding: utf-8 -*-
class Circle(object):
def draw(self):
print 'draw circle'
class Rectangle(object):
def draw(self):
print 'draw Rectangle'
class ShapeFactory(object):
def create(self, shape):
if shape == 'Circle':
return Circle()
elif shape == 'Rectangle':
return Rectangle()
else:
return None
fac = ShapeFactory()
obj = fac.create('Circle')
obj.draw()
TODO
Bcrypt密文存储账户信息
- 与普通的哈希加密方式, 更不容易被暴力破解
安装配置Flask Bcrypy
bcrypt = Bcrypt()
bcrypt.init_app(app)
app/db/models.py
class BlogUser(db.Model):
__tablename__ = 'blog_user'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255))
password = db.Column(db.String(255))
def __init__(self, name, password):
self.name = name
self.password = self.set_password(password)
def __repr__(self):
return self.name
def set_password(self, password):
return bcrypt.check_password_hash(self.password, password)
def check_password(self, password):
return bcrypt.check_password_hash(self.password, password)
app/form/forms.py
class LoginForm(FlaskForm):
name = StringField('name', [DataRequired()], Length(max=255))
password = PasswordField('password', [DataRequired()])
def validate(self):
is_valid = super(LoginForm, self).validate()
# 输入是否合法
if not is_valid:
return False
# 用户是否存在
user = BlogUser.query.filter_by(name=self.name.data).first()
if not user:
self.name.errors.append('用户不存在')
return False
# 密码是否正确
if not user.check_password(self.password.data):
self.name.errors.append("密码错误")
return False
return True
实现注册表单与应用 reCAPTCHA 来实现验证码
reCAPTCHA
CMU 设计了一个名叫 reCAPTCHA 的强大系统,让他们的电脑去向人类求助。具体做法是:将 OCR 软件无法识别的文字扫描图传给世界各大网站,用以替换原来的验证码图片,这些网站的用户在正确识别出这些文字之后,其答案便会被传回CMU。所以 reCAPTCHA 本质上是一个披着验证码皮的分布式文字识别系统(OCR)
(卧槽, 长见识了)
-
部署reCAPTCHA
- 注册一个 Google 用户名
- 进入到 reCAPTCHA 官网
- 输入你的 blog 名(随意填写)和域名(只支持域名和子域名,现在我们暂时使用 localhost,等部署到线上之后也需要将新的域名填入),就会得到一个 Public Key,就可以把它用在你的 reCAPTCHA 插件上了,同时 reCAPTCHA 也支持多个站点
客户端
写在模版里
<script src='https://www.google.com/recaptcha/api.js'></script>
<div class="g-recaptcha" data-sitekey="<your public key>"></div>
- 服务端
在服务端请求
https://www.google.com/recaptcha/api/siteverify
-
实际应用
config.py
class BaseConfig(object):
SQLALCHEMY_DATABASE_URI = MYSQL_DATABASE_URI
# 查询跟踪,不太需要,False,不占用额外的内存
SQLALCHEMY_TRACK_MODIFICATIONS = False
JSON_AS_ASCII = False
SECRET_KEY = 'CCCD756A-954C-4390-A38F-C018340769BD'
RECAPTCHA_PUBLIC_KEY = "6LcqjqsUAAAAAItx4Fhs-bhG9L4rcXT4hbXooZTL"
RECAPTCHA_PRIVATE_KEY = "6LcqjqsUAAAAAJlGpkoXjT1HY-nQ1lQLklwqUWug"
@staticmethod
def init_app(app):
pass
app/form/forms.py
from flask_wtf import FlaskForm, RecaptchaField
from wtforms import StringField, TextField, IntegerField, PasswordField
from wtforms.validators import DataRequired, Length, EqualTo
from app.db.models import BlogUser
class RegisterForm(FlaskForm):
name = StringField('name', [DataRequired(), Length(max=255)])
password = PasswordField('password', [DataRequired(), Length(min=2)])
confirm = PasswordField('password', [DataRequired(), EqualTo('password')])
recaptcha = RecaptchaField()
def validate(self):
is_valid = not super(RegisterForm, self).validate()
# 输入是否合法
if not is_valid:
return False
# 用户是否存在
user = BlogUser.query.filter_by(name=self.name.data).first()
if user:
self.name.errors.append('用户已经存在')
return False
return True
class LoginForm(FlaskForm):
name = StringField('name', [DataRequired(), Length(max=255)])
password = PasswordField('password', [DataRequired()])
def validate(self):
is_valid = super(LoginForm, self).validate()
# 输入是否合法
if not is_valid:
return False
# 用户是否存在
user = BlogUser.query.filter_by(name=self.name.data).first()
if not user:
self.name.errors.append('用户不存在')
return False
# 密码是否正确
if not user.check_password(self.password.data):
self.name.errors.append("密码错误")
return False
return True
class CommentForm(FlaskForm):
name = StringField(
'Name',
validators=[DataRequired(), Length(max=255)]
)
content = TextField(
'Content',
validators = [DataRequired()]
)
app/db/models.py
# coding: utf-8
from app import db
from sqlalchemy.orm import relationship
from flask_sqlalchemy import SQLAlchemy
from flask_bcrypt import Bcrypt
bcrypt = Bcrypt()
db = SQLAlchemy()
def dump_datetime(value):
if value is None:
return None
return [value.strftime("%Y-%m-%d"), value.strftime("%H:%M:%S")]
class BlogArticle(db.Model):
__tablename__ = 'blog_article'
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(45))
content = db.Column(db.Text)
publish_time = db.Column(db.DateTime)
user_id = db.Column(db.ForeignKey('blog_user.id'), index=True)
tag_id = db.Column(db.ForeignKey('blog_tag.id'), index=True)
tag = db.relationship('BlogTag', primaryjoin='BlogArticle.tag_id == BlogTag.id', backref='blog_articles')
user = db.relationship('BlogUser', primaryjoin='BlogArticle.user_id == BlogUser.id', backref='blog_articles')
def __repr__(self):
return self.title
@property
def serialize(self):
return {
"id": self.id,
"title": self.title,
"content": self.content,
"publish_time": dump_datetime(self.publish_time),
"user_id": self.user_id,
"tag_id": self.tag_id
}
class BlogComment(db.Model):
__tablename__ = 'blog_comment'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255))
content = db.Column(db.Text)
time = db.Column(db.DateTime)
article_id = db.Column(db.ForeignKey('blog_article.id'), index=True)
article = db.relationship('BlogArticle', primaryjoin='BlogComment.article_id == BlogArticle.id', backref='blog_comments')
def __repr__(self):
return self.name
class BlogTag(db.Model):
__tablename__ = 'blog_tag'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255))
def __repr__(self):
return self.name
class BlogUser(db.Model):
__tablename__ = 'blog_user'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255))
password = db.Column(db.String(255))
def __init__(self, name, password):
self.name = name
self.password = self.set_password(password)
def __repr__(self):
return self.name
def set_password(self, password):
return bcrypt.generate_password_hash(password)
def check_password(self, password):
return bcrypt.check_password_hash(self.password, password)
app/tempates/blog/base.html
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>{% block title %}{% endblock %}</title>
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width,, initial-scale=1">
<!-- 全局引入bootstrap css -->
<link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='bootstrap/css/bootstrap.min.css') }}">
<!-- <link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='css/test.css') }}"> -->
<!-- 宏定义用于生成 Bootstrap 风格的分页链接列表 -->
{% macro render_pagination(endpoint, pagination, tag_id) %}
<nav>
<ul class="pagination">
<li class="page-item">
{% if pagination.has_prev %}
<a class="page-link" href="{{ url_for(endpoint, page=pagination.prev().page, tag_id=tag_id) }}" aria-label="Previous">
{% else %}
<a class="page-link" href="{{ url_for(endpoint, page=1, tag_id=tag_id) }}" aria-label="Previous">
{% endif %}
<span aria-hidden="true">«</span>
</a>
</li>
{% for page in pagination.iter_pages() %}
{% if page %}
{% if page!=pagination.page %}
<li class="page-item">
<a class="page-link" href="{{ url_for(endpoint, page=page, tag_id=tag_id) }}">
{{page}}
</a>
</li>
{% else %}
<li class="page-item active"><a class="page-link" href="">{{page}}</a></li>
{% endif %}
{% else %}
<li class="page-item"><a class="page-link">...</a></li>
{% endif %}
{% endfor %}
<li class="page-item">
<a class="page-link" href="{{ url_for(endpoint, page=pagination.next().page, tag_id=tag_id) }}" aria-label="Next">
<span aria-hidden="true">»</span>
</a>
</li>
</ul>
</nav>
{% endmacro %}
<!-- 添加对reCAPTCHA的支持 -->
{% block captcha %}
{% endblock %}
</head>
<body>
<div class="container">
<div class="jumbotron">
<h1><a href="{{ url_for('blog.article_list', page=1, tag_id=tag_id) }}"></a>Onekki's Blog</a></h1>
<p>Welcom to onekki's blog!</p>
</div>
{% with message_list = get_flashed_messages(with_categories=true) %}
{% if message %}
{% for category, message in message_list %}
<div class="button close" data-dismiss="alert" aria-label="Close">
<button type="button" class="close" data-dismiss="alert" aria-label="Close">
<span aria-hidden="true">×</span>
</button>
{{ message_list }}
</div>
{% endfor %}
{% endif %}
{% endwith %}
{% block body %}
body_content
{% endblock %}
</div>
<!-- 全局引入jquery -->
<script src="{{ url_for('static', filename='js/jquery-3.4.1.min.js') }}"></script>
<script src="{{ url_for('static', filename='js/jquery-1.11.3.min.js') }}"></script>
<!-- 全局引入bootstrap js -->
<script src="{{ url_for('static', filename='bootstrap/js/bootstrap.min.js') }}"></script>
<!-- <script src="{{ url_for('static', filename='js/bootstrap.js') }}"></script> -->
</body>
</html>
app/templates/login.html
{% extends "blog/base.html" %}
{% block body %}
<div class="col-lg-3">
<form method="POST" action="{{ url_for('blog.login') }}">
{{ form.hidden_tag() }}
<div>
{{ form.name.label }}
{% if form.name.errors %}
{% for error in form.name.errors %}
<p class="help-block">{{ error }}</p>
{% endfor %}
{% endif %}
{{ form.name(class_="form-control") }}
</div>
<div>
{{ form.password.label }}
{% if form.password.errors %}
{% for error in form.password.errors %}
<p class="help-block">{{ error }}</p>
{% endfor %}
{% endif %}
{{ form.password(class_="form-control") }}
</div>
<input type="submit" class="btn btn-primary" value="Login" >
</form>
</div>
{% endblock %}
app/templates/register.html
{% extends "blog/base.html" %}
{% block title %}
Register
{% endblock %}
{% block captcha %}
<script src="https://www.google.com/recaptcha/api.js" async defer></script>
{% endblock %}
{% block body %}
<div class="col-lg-3">
<form method="POST" action="{{ url_for('blog.register') }}">
{{ form.hidden_tag() }}
<div>
{{ form.name.label }}
{{ form.name(class_="form-control")}}
</div>
<div class="form-group">
{{ form.password.label }}
{{ form.password(class_="form-control") }}
</div>
<div class="form-group">
{{ form.confirm.label }}
{{ form.confirm(class_="form-control") }}
</div>
<input type="submit" class="btn btn-primary" value="Register">
<div class="g-recaptcha" data-sitekey="6LcqjqsUAAAAAItx4Fhs-bhG9L4rcXT4hbXooZTL"></div>
</form>
</div>
{% endblock %}
新增文章、修改文章
-
新增文章 修改文章
app/form/forms.py
class ArticleForm(FlaskForm):
title = StringField('title', [DataRequired(), Length(max=255)])
content = TextAreaField('content', [DataRequired()])
app/blog/views.py
# 新增文章
@blog.route('/article_add', methods=['GET', 'POST'])
def article_add():
form = ArticleForm()
if form.validate_on_submit():
new_article = BlogArticle()
new_article.title = form.title.data
new_article.content = form.content.data
new_article.publish_time = datetime.datetime.now()
db.session.add(new_article)
db.session.commit()
return redirect(url_for('blog.article_list', page=1))
return render_template('blog/article_add.html', form=form)
# 修改文章
@blog.route('/article_update/<int:id>', methods=['GET', 'POST'])
def article_update(id):
article = BlogArticle.query.get_or_404(id)
form = ArticleForm()
if form.validate_on_submit():
article.title = form.title.data
article.content = form.content.data
article.publish_time = datetime.datetime.now()
db.session.add(article)
db.session.commit()
return redirect(url_for('blog.article', id=article.id))
form.title.data = article.title
form.content.data = article.content
return render_template('blog/article_update.html', form=form, article=article)
包括ckeditor.js 富文本编辑器
app/templates/article_add.html
{% extends "blog/base.html" %}
{% block title %} New Article {% endblock %}
{% block body %}
<div class="row">
<h1 class="text-center">Create A New Article</h1>
<form method="POST" action="{{ url_for('blog.article_add') }}">
{{ form.hidden_tag() }}
<div class="form-group">
{% if form.title.errors %}
{% for e in form.title.errors %}
<pa class="help-block">{{ e }}</pa>
{% endfor %}
{% endif %}
{{ form.title(class_="form-control") }}
</div>
<div class="form-group">
{{ form.content.label }}
{% if form.content.errors %}
{% for e in form.content.errors %}
<p class="help-block">{{ e }}</p>
{% endfor %}
{% endif %}
{{ form.content(class_="form-control", id="editor", rows="10", cols="80") }}
</div>
<input class="btn btn-primary" type="submit" value="Submit">
</form>
</div>
{% endblock %}
{% block js %}
<script src="{{ url_for('static', filename='ckeditor/ckeditor.js') }}"></script>
<script>
CKEDITOR.replace('editor');
</script>
{% endblock js %}
app/templates/article_update.html
{% extends 'blog/base.html' %}
{% block title %} Edit Article {% endblock %}
{% block body %}
<div class="row">
<h1 class="text-center">Edit the Article</h1>
<form method="POST" action="{{ url_for('blog.article_update', id=article.id) }}">
{{ form.hidden_tag() }}
<div class="form-group">
{{ form.title.label }}
{% if form.title.errors %}
{% for e in form.title.errors %}
<p class="help-block">{{ e }}</p>
{% endfor %}
{% endif %}
{{ form.title(class_="form-control", value=article.title) }}
</div>
<div class="form-group">
{{ form.content.label }}
{% if form.content.errors %}
{% for e in form.content.errors %}
<p class="help-block">{{ e }}</p>
{% endfor %}
{% endif %}
{{ form.content(class_="form-control", id="editor", rows="10", cols="80") }}
</div>
<input class="btn btn-primary" type="submit" value="Submit">
</form>
</div>
{% endblock body %}
{% block js %}
<script src="{{ url_for('static', filename='ckeditor/ckeditor.js') }}"></script>
<script>
CKEDITOR.replace('editor');
</script>
{% endblock %}
article.html
...
<h3>{{ article.title }}</h3>
<div class="row">
<div class="col-lg-6">
<p>Written By {{ article.user_id }} on {{ article.publish_time }}</p>
<p>{{ article.content|safe }}</p>
</div>
<div class="column">
<div class="col-lg-2">
<a class="btn btn-primary" href="{{ url_for('blog.article_add') }}">New</a>
</div>
</div>
<div class="column">
<div class="col-lg-2">
<a class="btn btn-primary" href="{{ url_for('blog.article_update', id=article.id) }}">Edit</a>
</div>
</div>
</div>
<div class="col-lg-12">
...
第五阶段
使用flask-login完成用户登录信息的保存
-
配置
app/plugin/login_manager.py
from flask_login import LoginManager
login_manager = LoginManager()
login_manager.login_view = "blog.login"
login_manager.session_protection = "strong"
login_manager.login_message = "Please login to access this page."
login_manager.login_message_category = "info"
@login_manager.user_loader
def user_loader(id):
"""Load the user's info."""
from app.database.models import BlogUser
return BlogUser.query.filter_by(id=id).first()
注意
期间修改了数据库表的结构
- 把tag和article改成多对多
- 把user和role改成多对多
重构了models目录
修改了插件引入方式
现在的目录结构为:
.
├── LICENSE
├── Pipfile
├── Pipfile.lock
├── README.md
├── app
│ ├── __init__.py
│ ├── admin
│ │ ├── __init__.py
│ │ └── views.py
│ ├── blog
│ │ ├── __init__.py
│ │ └── views.py
│ ├── database
│ │ ├── __init__.py
│ │ │ └── __init__.cpython-37.pyc
│ │ └── models
│ │ ├── __init__.py
│ │ ├── blog_article.py
│ │ ├── blog_comment.py
│ │ ├── blog_role.py
│ │ ├── blog_tag.py
│ │ ├── blog_user.py
│ │ └── many2many.py
│ ├── form
│ │ ├── __init__.py
│ │ ├── blog_article.py
│ │ ├── blog_comment.py
│ │ ├── blog_login.py
│ │ └── blog_register.py
│ ├── plugin
│ │ ├── __init__.py
│ │ ├── bcrypt.py
│ │ ├── login_manager.py
│ │ ├── principal.py
│ │ └── sqlalchemy.py
│ ├── site
│ │ ├── __init__.py
│ │ └── views.py
│ ├── static
│ │ ├── bootstrap
│ │ │ ├── css
│ │ │ │ ├── ...
│ │ │ └── js
│ │ │ ├── ...
│ │ ├── ckeditor
│ │ │ ├── ...
│ │ ├── css
│ │ │ └── test.css
│ │ └── js
│ │ ├── jquery-1.11.3.min.js
│ │ └── jquery-3.4.1.min.js
│ ├── templates
│ │ └── blog
│ │ ├── article.html
│ │ ├── article_add.html
│ │ ├── article_list.html
│ │ ├── article_update.html
│ │ ├── base.html
│ │ ├── comment.html
│ │ ├── index.html
│ │ ├── login.html
│ │ ├── register.html
│ │ ├── tag.html
│ │ └── user.html
│ └── utils
├── config.py
├── gen_models.py
└── manager.py
-
配置模型
app/database/models/BlogUser.py
# coding: utf-8
from flask_login import AnonymousUserMixin
from app import db, bcrypt
class BlogUser(db.Model):
__tablename__ = 'blog_user'
__table_args__ = {'schema': 'onekki_site'}
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255), unique=True)
password = db.Column(db.String(255))
roles = db.relationship('BlogRole', secondary='onekki_site.blog_user_role', backref=db.backref('blog_users', lazy='dynamic'))
# 自定义
def __init__(self, name, password):
self.name = name
self.password = self.set_password(password)
defalut = db.session.query('BlogRole').filter_by(name="default").one()
self.roles.append(defalut)
def __repr__(self):
return "<BlogUser:`{}`>".format(self.name)
def set_password(self, password):
return bcrypt.generate_password_hash(password)
def check_password(self, password):
return bcrypt.check_password_hash(self.password, password)
def is_authenticated(self):
if isinstance(self, AnonymousUserMixin):
return False
else:
return True
def is_active(self):
return True
def is_anonymous(self):
if isinstance(self, AnonymousUserMixin):
return True
else:
return False
def get_id(self):
return self.id
app/blog/views.py
# 登录
@blog.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm()
if form.validate_on_submit():
user = BlogUser.query.filter_by(name=form.name.data).one()
login_user(user, remember=form.remember.data)
flash("登录成功", category="success")
return redirect(url_for('blog.article_list', page=1))
return render_template('blog/login.html',form=form)
# 新增文章
@blog.route('/article_add', methods=['GET', 'POST'])
@login_required
def article_add():
...
# 修改文章
@blog.route('/article_update/<int:id>', methods=['GET', 'POST'])
@login_required
使用flask-principal验证用户的权限
-
配置插件
app/plugin/principal.py
from flask_principal import Principal, Permission, RoleNeed
principal = Principal()
permission_admin = Permission(RoleNeed('admin'))
permission_poster = Permission(RoleNeed('poster'))
permission_default = Permission(RoleNeed('default'))
-
使用
app/__init__.py
def create_app(config_name):
...
principal.init_app(app)
@identity_loaded.connect_via(app)
def on_identity_loaded(sender, identity):
identity.user = current_user
if hasattr(current_user, 'id'):
identity.provides.add(UserNeed(current_user.id))
if hasattr(current_user, 'roles'):
for role in current_user.roles:
identity.provides.add(RoleNeed(role.name))
...
return app
app/blog/views.py
# 登录
@blog.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm()
if form.validate_on_submit():
user = BlogUser.query.filter_by(name=form.name.data).one()
login_user(user, remember=form.remember.data)
identity_changed.send(
current_app._get_current_object(), identity=Identity(user.id)
)
flash("登录成功", category="success")
return redirect(url_for('blog.article_list', page=1))
return render_template('blog/login.html',form=form)
# 登出
@blog.route('/logout', methods=['GET', 'POST'])
def logout():
logout_user()
identity_changed.send(
current_app._get_current_object(), identity=AnonymousIdentity()
)
flash("退出登录成功", category="success")
return redirect(url_for('blog.login'))
# 修改文章
@blog.route('/article_update/<int:id>', methods=['GET', 'POST'])
@login_required
@permission_poster.require(http_exception=403)
def article_update(id):
article = BlogArticle.query.get_or_404(id)
if not current_user:
return redirect(url_for('blog.login'))
if current_user != article.user:
return redirect(url_for('blog.article', id=id))
permission = Permission(UserNeed(article.user.id))
if permission.can() or permission_admin.can():
form = ArticleForm()
if form.validate_on_submit():
article.title = form.title.data
article.content = form.content.data
article.publish_time = datetime.datetime.now()
db.session.add(article)
db.session.commit()
return redirect(url_for('blog.article', id=article.id))
else:
abort(403)
form.title.data = article.title
form.content.data = article.content
return render_template('blog/article_update.html', form=form, article=article)
gen_models.py
# coding: utf-8
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Table, Text
from sqlalchemy.orm import relationship
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class BlogArticle(db.Model):
__tablename__ = 'blog_article'
__table_args__ = {'schema': 'onekki_site'}
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(45))
content = db.Column(db.Text)
publish_time = db.Column(db.DateTime)
user_id = db.Column(db.ForeignKey('onekki_site.blog_user.id'), index=True)
user = db.relationship('BlogUser', primaryjoin='BlogArticle.user_id == BlogUser.id', backref='blog_articles')
tags = db.relationship('BlogTag', secondary='onekki_site.blog_article_tag', backref='blog_articles')
t_blog_article_tag = db.Table(
'blog_article_tag',
db.Column('article_id', db.ForeignKey('onekki_site.blog_article.id'), primary_key=True, nullable=False),
db.Column('tag_id', db.ForeignKey('onekki_site.blog_tag.id'), primary_key=True, nullable=False, index=True),
schema='onekki_site'
)
class BlogComment(db.Model):
__tablename__ = 'blog_comment'
__table_args__ = {'schema': 'onekki_site'}
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255))
content = db.Column(db.Text)
time = db.Column(db.DateTime)
article_id = db.Column(db.ForeignKey('onekki_site.blog_article.id'), index=True)
article = db.relationship('BlogArticle', primaryjoin='BlogComment.article_id == BlogArticle.id', backref='blog_comments')
class BlogRole(db.Model):
__tablename__ = 'blog_role'
__table_args__ = {'schema': 'onekki_site'}
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255), unique=True)
description = db.Column(db.String(255))
users = db.relationship('BlogUser', secondary='onekki_site.blog_user_role', backref='blog_roles')
class BlogTag(db.Model):
__tablename__ = 'blog_tag'
__table_args__ = {'schema': 'onekki_site'}
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255))
class BlogUser(db.Model):
__tablename__ = 'blog_user'
__table_args__ = {'schema': 'onekki_site'}
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255), unique=True)
password = db.Column(db.String(255))
t_blog_user_role = db.Table(
'blog_user_role',
db.Column('user_id', db.ForeignKey('onekki_site.blog_user.id'), primary_key=True, nullable=False),
db.Column('role_id', db.ForeignKey('onekki_site.blog_role.id'), primary_key=True, nullable=False, index=True),
schema='onekki_site'
)
Flask-Celery-Helper 实现异步任务
- 文件上传、邮件发送等
- 要和消息队列配合使用
- 这里采用redis
安装redis
- 我选择在ubuntu服务器上安装了redius
配置方法网上都有
配置celery
config.py
class DevelopmentConfig(BaseConfig):
DEBUG = True
CELERY_RESULT_BACKEND = 'redis://39.107.230.35:6379/0'
CELERY_BROKER_URL = 'redis://39.107.230.35:6379/0'
CACHE_TYPE = 'simple'
pass
- 在app中注册celery对象
- 要知道 celery application和flask application是两个进程
- flask支持将context注入到其他application中,这样其他application就可以访问flask application里的内容
用工厂模式创建celery对象
celery_runner.py
import os
from celery import Celery, Task
from app import create_app
def make_celery(app):
celery = Celery(
app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
flask_app = create_app(os.getenv('FLASK_CONFIG') or 'default')
celery = make_celery(flask_app)
-
数据库新建表
app/database/models/blog_reminder.py
from app import db
class BlogReminder(db.Model):
__tablename__ = 'blog_reminder'
__table_args__ = {'schema': 'onekki_site'}
id = db.Column(db.Integer, primary_key=True)
time = db.Column(db.DateTime)
email = db.Column(db.String(255))
content = db.Column(db.Text)
-
创建tasks
app/tasks/blog_task.py
import smtplib
from datetime import datetime
from email.mime.text import MIMEText
from flask_mail import Message
from app.plugins import celery, mail
from app.database.models import BlogReminder
@celery.task(
bind=True,
ignore_result=True,
default_retry_delay=300,
max_retries=5
)
def remind(self, primary_key):
reminder = BlogReminder.query.get(primary_key)
msg = MIMEText(remind.content)
msg['Subject'] = "Welcome!"
msg['From'] = "<your email>"
msg['To'] = reminder.email
try:
smtp_server = smtplib.SMTP('39.107.230.35')
smtp_server.starttls()
smtp_server.login("<user>", "<password>")
smtp_server.sendmail(
"<your_email>", [reminder.email], msg.as_string()
)
smtp_server.close()
return
except Exception as e:
self.rety(exc=e)
def on_reminder_save(mapper, connect, self):
remind.apply_async(args=(self.id), eta=self.date)
回调函数on_reminder_save
- 主程序监听数据库插入消息
from sqlalchemy import event
def create_app(object_name):
...
event.listen(BlogReminder, 'after_insert', on_reminder_save)
...
用flask-cache实现网页缓存加速
注册flask-cache到app
在config中配置缓存类型
class DevConfig(Config):
...
CACHE_TYPE = 'simple'
-
缓存无参数的普通函数
app/controllers/blog/views.py
@cache.cached(timeout=7200, key_prefix='sidebar_dada')
def sidebar_data():
recent_article_list = BlogArticle.query.order_by(
BlogArticle.publish_time.desc()
).limit(5).all()
count = func.count(t_blog_article_tag.c.article_id).label('total')
top_tag_list = db.session.query(
BlogTag.id, BlogTag.name, count
).filter(
BlogTag.id==t_blog_article_tag.c.tag_id
).group_by(BlogTag.id).order_by(count.desc()).limit(5).all()
return recent_article_list, top_tag_list
- 缓存带参数的普通函数(我没写)
@staticmethod
@cache.memoize(60)
def verify_auth_token(token):
"""Validate the token whether is night."""
serializer = Serializer(
current_app.config['SECRET_KEY'])
try:
# serializer object already has tokens in itself and wait for
# compare with token from HTTP Request /api/posts Method `POST`.
data = serializer.loads(token)
except SignatureExpired:
return None
except BadSignature:
return None
user = User.query.filter_by(id=data['id']).first()
return user
- 缓存无动态参数的视图函数
@blog.route('/article_list/<int:page>')
@cache.cached(timeout=60)
def article_list(page=1):
filters = []
tag_id = request.values.get('tag_id')
if tag_id != None:
filters.append(t_blog_article_tag.c.tag_id==tag_id)
...
- 缓存带动态参数的视图函数
# 获取文章
@blog.route('/article/<int:id>', methods=['GET','POST'])
@cache.cached(timeout=60, key_prefix=make_cache_key)
def article(id):
form = CommentForm()
...
flask-admin实现后台管理
注册flask-admin
BaseView基础管理页面
ModelView模型管理页面
文章修改增强页面(为TextArea注入class)
文件管理
-
权限安全
app/controllers/views.py
# 获取蓝图
# from app.admin import admin
# # 获取数据库模型对象和SQLAlchemy对象db,注意不可使用App模块中的db
# # from app.admin.models import *
# # 设置路由
# @admin.route('/')
# def index():
# return 'admin首页'
from flask_admin import BaseView, expose
class CustomView(BaseView):
@expose('/')
def index(self):
return self.render('admin/custom.html')
@expose('/second_page')
def second_page(self):
return self.render('admin/second_page.html')
from flask_admin.contrib.sqla import ModelView
class CustomModelView(ModelView):
pass
from app.forms import CKTextAreaField
class ArticleView(CustomModelView):
form_overrides = dict(content=CKTextAreaField)
column_searchable_list = ('content', 'title')
column_filters = ('publish_time',)
create_template = 'admin/article_update.html'
edit_template = 'admin/article_update.html'
from flask_admin.contrib.fileadmin import FileAdmin
class CustomFileView(FileAdmin):
pass
其中ArticleView要在forms新建widgets
app/forms/blog/blog_admin.py
from wtforms import (
widgets,
StringField,
TextField,
TextAreaField,
PasswordField,
BooleanField,
ValidationError
)
class CKTextAreaWidget(widgets.TextArea):
def __call__(self, field, **kwargs):
kwargs.setdefault('class_', 'ckeditor')
return super(CKTextAreaWidget, self).__call__(field, **kwargs)
class CKTextAreaField(TextAreaField):
widget = CKTextAreaWidget()
app/__init__.py
def create_app(config_name):
admin.init_app(app)
admin.add_view(CustomView(name='Custom'))
model_list = [BlogComment, BlogReminder, BlogRole, BlogTag, BlogUser]
for model in model_list:
admin.add_view(
CustomModelView(model, db.session, category='Models')
)
admin.add_view(ArticleView(BlogArticle, db.session, name='EditArticle'))
admin.add_view(
CustomFileView(os.path.join(os.path.dirname(__file__), 'static'),
'/static',
name='Static Files')
)
custom.html
{% extends 'admin/master.html' %}
{% block body %}
This is the custom view!
<a href="{{ url_for('customview.second_page') }}">Link</a>
{% endblock %}
article_edit.html
{% extends 'admin/model/edit.html' %}
{% block tail %}
{{ super() }}
<script src="//cdn.ckeditor.com/4.4.7/standard/ckeditor.js"></script>
{% endblock %}