个人网站搭建过程记录(一)

OnekkiSite搭建过程

使用的框架

  • flask
  • flask_sqlalchemy
  • flask_script
  • flask-sqlacodegen
  • pymysql
  • flask-wtf
  • flask-bcrypy
  • flask-login
  • flask-principal
  • flask-celery-helper
  • redis
  • flask-cache
  • flask-admin
  • flask-restful

搭建开发环境

第一阶段

创建环境

pipenv —python 3.7

进入环境

pipenv shell

目录结构

.
├── LICENSE
├── Pipfile
├── README.md
├── app
│   ├── __init__.py
│   ├── admin
│   ├── site
│   ├── static
│   ├── templates
│   ├── test
│   └── utils
├── config.py
└── manage.py

编写配置文件

config.py

import os

# 项目绝对路径
BASE_DIR = os.getcwd()
# 模版文件路径
TEMPLATES_DIR = os.path.join(BASE_DIR, 'templates')
# 静态文件路径
STATIC_DIR = os.path.join(BASE_DIR, 'static')

# 数据库URI
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:123456@39.107.230.35:3306/onekki_site'

# 查询跟踪, 不太需要, False, 不占用额外的内存
SQLALCHEMY_TRACK_MODIFICATIONS = False

编写启动文件

app/__init__.py

from flask import Flask
# mysql引入
from flask_sqlalchemy import SQLAlchemy
# 配置文件引入
from config import SQLALCHEMY_DATABASE_URI, SQLALCHEMY_TRACK_MODIFICATIONS

db = SQLAlchemy()

def create_app():
    """创建app的方法"""
    # 生成Flask对象
    app = Flask(__name__)
    # 配置app的mysql
    app.config['SQLALCHEMY_DATABASE_URI'] = SQLALCHEMY_DATABASE_URI
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = SQLALCHEMY_TRACK_MODIFICATIONS
    # 用SQLAlchemy初始化app
    db.init_app(app=app)
    
    # 在这还可以配置其他模块, 如socketio
    
    # 放回Flask对象app
    return app

在site模块中创建蓝图

app/site/__init__.py

from flask import Blueprint

from config import TEMPLATES_DIR, STATIC_DIR

# 创建一个蓝图对象, 设置别名, 模板文件地址, 静态文件地址
site = Blueprint('site', __name__,
    template_folder=TEMPLATES_DIR,
    static_folder=STATIC_DIR)

# 这里导入是为了在解释时, 蓝图能加载到views文件中的路由数据
from app.site import views

为site模块设置路由

app/site/views.py

# 获取蓝图
from app.site import site
# 获取数据库模型对象和SQLAlchemy对象db, 注意不可使用App模块中的db
from app.site.models import *

# 设置路由
@site.route('/')
def index():
    return '首页'

admin模块或者其他模块(blog)配置与之相同

注册蓝图启动服务器

**manage.py**
from flask_script import Manager

from app import create_app
from app.site import site

# 创建app
app = create_app()
# 注册site的蓝图
app.register_blueprint(site, url_prefix = '/site')

# 通过app创建manager对象
manager = Manager(app)

if __name__ == '__main__':
    # 运行服务器
    manager.run()
  • 注意:严格遵守python导入包的编写顺序
  1. Python Build In 内构包
  2. Python 第三方库
  3. 用户自定义模块

配置完成,运行服务器

python manager.py runserver -h 127.0.0.1 -p 5000 -d

访问服务器

http://127.0.0.1:5000/site/

以上参考文档

https://www.jianshu.com/p/11dad266f0b5

改善配置方式

config.py

import os

# 项目绝对路径
BASE_DIR = os.getcwd()
# 模版文件路径
TEMPLATES_DIR = os.path.join(BASE_DIR, 'templates')
# 静态文件路径
STATIC_DIR = os.path.join(BASE_DIR, 'static')

# 数据库URI
MYSQL_DATABASE_URI = 'mysql+pymysql://root:123456@39.107.230.35:3306/onekki_site'

# 查询跟踪, 不太需要, False, 不占用额外的内存
SQLALCHEMY_TRACK_MODIFICATIONS = False

class BaseConfig(object):
    SQLALCHEMY_DATABASE_URI = MYSQL_DATABASE_URI
    
    @staticmethod
    def init_app(app):
        pass

class DevelopmentConfig(BaseConfig):
    DEBUG = True
    pass


class TestingConfig(BaseConfig):
    TESTING = True
    pass

class ProductionConfig(BaseConfig):
    pass


configs = {
    'development': DevelopmentConfig,
    'testing': TestingConfig,
    'production': ProductionConfig,
    'default': DevelopmentConfig
}

app/__init__.py

from flask import Flask
# mysql引入
from flask_sqlalchemy import SQLAlchemy
# 配置文件引入
from config import configs

db = SQLAlchemy()

def create_app(config_name):
    """创建app的方法"""
    # 生成Flask对象
    app = Flask(__name__)

    # 导入配置
    app.config.from_object(configs[config_name])
    configs[config_name].init_app(app)
    # 用SQLAlchemy初始化app
    db.init_app(app=app)
    
    # 在这还可以配置其他模块, 如socketio

    # 放回Flask对象app
    return app

manager.py

# 创建app
app = create_app(os.getenv('FLASK_CONFIG') or 'default')

第二阶段

  • 注意

创建数据库时用对应的应用前缀

初始化SQLAchemy

db = SQLAlchemy(app)

创建数据模型和表

  • 方式1:

用数据库客户端创建好数据库, 然后通过flask-sqlacodegen生成model文件(ORM模型)

# 整体映射database数据库并写入model.py文件
flask-sqlacodegen \
'mysql+pymysql://username:password@127.0.0.1/database' \
--outfile 'model.py' \
--flask

# 映射table数据表并写入table.py文件
flask-sqlacodegen \
'mysql+pymysql://username:password@127.0.0.1/database' \
--table table \
--outfile 'model.py' \
--flask

本人使用

flask-sqlacodegen 'mysql+pymysql://root:zwq123456@39.107.230.35/onekki_site' --outfile "gen_models.py" --flask --schema onekki_site

注意下面的写法的区别

flask-sqlacodegen 'mysql+pymysql://root:zwq123456@39.107.230.35/onekki_site' > "gen_models.py"
  • 方式2:

直接定义表结构, 然后通过manage.py添加创建数据库表的指令

manage.py

import os
from flask_script import Manager, Server

from app import create_app, db
from app.site import site

# 创建app
app = create_app(os.getenv('FLASK_CONFIG') or 'default')
# 注册site的蓝图
app.register_blueprint(site, url_prefix = '/site')

# 通过app创建manager对象
manager = Manager(app)

# 从模型创建数据库的指令
manager.add_command("server", Server())
@manager.shell
def make_shell_context():
    return dict(app=app,db=db,Model1=models.Model1,Model2=models.Model2,...)

if __name__ == '__main__':
    # 运行服务器
    manager.run()

在终端执行, 即可完成对应表的创建

(OnekkiSite) bash-3.2$ python manage.py shell
>>> db.create_all()

本人采用了方式1, 但是在后续更新数据表结构会采用另一种方式, 这将在下文中提到

注意把生成文件里的

db = SQLAlchemy()

换成, 避免二次创建db

from app import db

SqlAlchemy数据的CRUD

from models import Users
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import create_engine


engine = create_engine("mysql+pymysql://root:199199@127.0.0.1:3306/blog?charset=utf8", max_overflow=2, pool_size=5)
Connection = sessionmaker(bind=engine)

# 每次执行操作, 都要创建一个Connection, 两种连接都可以
# conn = Connection()
conn = scoped_session(Connection)

obj1 = Users(name='dayu')

# 增加一条数据
# conn.add(obj1)
#
# # 增加多条数据
# conn.add_all([
#     Users(name='dayu3'),
#     Users(name='dayu4'),
#     Users(name='dayu5'),
#     Users(name='dayu6')
# ])

# # 查询数据
# user_list = conn.query(Users).all()
# for user in user_list:
#     print(user.id, user.name)

# # 查询id大于2的数据
# user_list = conn.query(Users).filter(Users.id > 2)
# for user in user_list:
#     print(user.id, user.name)

# 删除id大于2的
# conn.query(Users).filter(Users.id > 2).delete()

# 修改
conn.query(Users).filter(Users.id == 2).update({'name': 'da'})

# 提交事务
conn.commit()

# 关闭Connection
conn.close()

数据表的关系

app/blog/models.py

# coding: utf-8
from app import db
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import relationship


class BlogArticle(db.Model):
    __tablename__ = 'blog_article'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(45))
    content = db.Column(db.Text)
    publish_time = db.Column(db.DateTime)
    user_id = db.Column(db.ForeignKey('blog_user.id'), index=True)
    tag_id = db.Column(db.ForeignKey('blog_tag.id'), index=True)

    tag = db.relationship('BlogTag', primaryjoin='BlogArticle.tag_id == BlogTag.id', backref='blog_articles')
    user = db.relationship('BlogUser', primaryjoin='BlogArticle.user_id == BlogUser.id', backref='blog_articles')


class BlogComment(db.Model):
    __tablename__ = 'blog_comment'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255))
    content = db.Column(db.Text)
    time = db.Column(db.DateTime)
    article_id = db.Column(db.ForeignKey('blog_article.id'), index=True)

    article = db.relationship('BlogArticle', primaryjoin='BlogComment.article_id == BlogArticle.id', backref='blog_comments')


class BlogTag(db.Model):
    __tablename__ = 'blog_tag'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255))
    article_id = db.Column(db.ForeignKey('blog_tag.id'), index=True)

    article = db.relationship('BlogTag', remote_side=[id], primaryjoin='BlogTag.article_id == BlogTag.id', backref='blog_tags')


class BlogUser(db.Model):
    __tablename__ = 'blog_user'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255))
    password = db.Column(db.String(255))

第三阶段

Jinjia的使用

  • 变量
{{ variables }}
  • 注释
{# Documents #}
  • 控制语句
{% controller_flow %}

if语句

{% if user.is_logged_in() %}
    <a href='/logout'>Logout</a>
{% else %}
    <a href='/login'>Login</a>
{% endif %}

循环

{% for post in posts if post.text %}
    <div>
        <h1>{{ post.title }}</h1>
        <p>{{ post.text | safe }}</p>
    </div>
{% endfor %}

# or

{% for post in posts %}
    {{ loop.index }}-{{ post.title }}
{% endfor %}
  • 过滤器
无参数
{{ variable | filter_name }}
带参数
{{ variables | filter_name(*args) }}
{% macro input(name, label, value='', type='text')%}
    <div class="form-group">
        <label for"{{ name }}">{{ label }}</div>
        <input type="{{ type }}" name="{{ name }}"
            value="{{ value | escape }}" class="form-control">
    </div>
{% endmacro %}

调用宏

{{ input('name', 'Name') }}

结果

<div class="form-group">
  <label for"name">Name</div>
  <input type="text" name="name" value="" class="form-control">
</div>
  • 兼容javascript
{% raw %}
    JavaScript statements
{% endraw %}

Jinjia常用过滤器

  • default
{{ post.date | default("2016-11-22") }}
  • float
{{ 75 | float }}
  • int
{{ 75.5 | int }}
  • length
The Count: {{ post.tags | length }}
  • title
# 首字母大写
{{ "post title" | title }}
  • round
# 浮点数精度
{{ 3.14159 | round(1) }}

# common 参数:四舍五入
# floor 参数:截取整数部分
# ceil 参数:向上取整
{{ 4.7 | rount(1, "common")}}
  • join
# 列表转","分割的字符串
{{ ['JmilkFan', 'fanguiju' ] | join(',')}}
  • tojson
{{ {"key": "value"} | tojson }}
{{ posts | tojson | safe }}
  • truncate
# 截取字符串, 并在后面加省略号
{{ "a long stringggggggggggggggggg " | truncate(5) }}
  • escape
# 转义打印
{{ "<h1>Title<\h1>" | escape }}
  • safe
# 用escape会转义失败, 换safe解决
{{ "<h1>Post Title"</h1> | safe }}
  • 自定义过滤器

定义函数

def count_substring_from_python(string, sub):
    return string.count(sub)

声明函数

app.jinja_env.filters['count_substring'] = count_substring_from_python

调用

{{ variable | filter_name("String") }}

Flask的特殊变量和方法(在模版中可以直接访问)

  • config对象

app.config 对象, 其包含了 class DevConfig 的属性

{{ config.SQLALCHEMY_DATABASE_URI }}
  • request对象

Flask 中表示当前请求的 request 对象

{{ request.url }}
  • session对象

Flask 的 session 对象

{{ session.new }}
  • url_for方法
{{ url_for('home') }}
# /post/1024
{{ url_for('post', post_id=1024) }}  
  • get_flashed_messages方法

返回之前在 Flask 中通过 flash() 传入的信息列表, 类似实现一个队列

{% for message in get_flashed_messages() %}
    {{ message }}
{% endfor %}

优化调整

  • 将app/admin/models.py、app/blog/models.py、app/site/models.py 删除, model写在app/db/models.py下
  • 将db = SQLAlchemy()写在models.py文件中
  • 配置文件添加JSON_AS_ASCII = False来让返回的json数据中文不显示为unicode

编写视图和前端页面

app/blog/views.py

from flask import render_template, jsonify
from sqlalchemy import func
# from flask_sqlalchemy import Pagination
# 获取蓝图
from app.blog import blog
# 获取数据库模型对象和SQLAlchemy对象db, 注意不可使用App模块中的db
# from app.blog.models import BlogArticle, BlogTag
# from app import db
from app.db.models import *

def json_return(code, msg,data):
    return jsonify({"code":code, "msg":msg, "data": data})

# 设置路由
@blog.route('/')
def index():
    return render_template("blog/index.html")
   
@blog.route("/article/<int:page>")
def article(page=1):
    articles = BlogArticle.query.paginate(page, 10)
    # return json_return(200, "success", [i.serialize for i in articles])
    return render_template("blog/article.html", articles=articles)

@blog.route("/user/<int:page>")
def user(page=1):
    users = BlogUser.query.all()
    # return json_return(200, "success", [i.serialize for i in articles])
    return render_template("blog/user.html", users=users)

@blog.route("/tag/<int:page>")
def tag(page=1):
    tags = BlogTag.query.all()
    # return json_return(200, "success", [i.serialize for i in articles])
    return render_template("blog/tag.html", tags=tags)

@blog.route("/comment/<int:page>")
def comment(page=1):
    comments = BlogComment.query.all()
    # return json_return(200, "success", [i.serialize for i in articles])
    return render_template("blog/comment.html", comments=comments)

app/templates/blog/article.html

{{ articles }}

其他页面和上面类似:comment.html、tag.html、user.html

编写Jinjia模版

在编写之前到bootstrap官网下载css和js文件, 解压到static文件夹

app/tempates/blog/base.html

<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8">
        <title>{% block title %}{% endblock %}</title>
        <meta http-equiv="X-UA-Compatible" content="IE=edge">
        <meta name="viewport" content="width=device-width,, initial-scale=1">
        <!-- 全局引入bootstrap css -->
        <link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='bootstrap/css/bootstrap.min.css') }}">
        <!-- <link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='css/test.css') }}"> -->
        <!-- 宏定义用于生成 Bootstrap 风格的分页链接列表 -->
        {% macro render_pagination(pagination, endpoint) %}
            <nav>
                <ul class="pagination">
                    <li class="page-item">
                        {% if pagination.has_prev %}
                            <a class="page-link" href="{{ url_for('blog.article', page=pagination.prev().page) }}" aria-label="Previous">
                        {% else %}
                            <a class="page-link" href="{{ url_for('blog.article', page=1) }}" aria-label="Previous">
                        {% endif %}
                            <span aria-hidden="true">«</span>
                        </a>
                    </li>
                    {% for page in pagination.iter_pages() %}
                        {% if page %}
                             {% if page!=pagination.page %}
                                <li class="page-item">
                                    <a class="page-link" href="{{ url_for(endpoint, page=page) }}">
                                        {{page}}
                                    </a>
                                </li>
                            {% else %}
                                <li class="page-item active"><a class="page-link" href="">{{page}}</a></li>
                            {% endif %}
                        {% else %}
                            <li class="page-item"><a class="page-link">...</a></li>
                        {% endif %}
                    {% endfor %}
                    <li class="page-item">
                        <a class="page-link" href="{{ url_for('blog.article', page=pagination.next().page) }}" aria-label="Next">
                            <span aria-hidden="true">»</span>
                        </a>
                    </li>
                </ul>
            </nav>
        {% endmacro %}
    </head>
    <body>
        <div class="container">
            <div class="jumbotron">
                <h1><a class="button" href="{{ url_for('blog.article', page=1) }}">Onekki's Blog</a></h1>
                <p>Welcom to onekki's blog!</p>
            </div>
            {% block body %}
                body_content
            {% endblock %}
        </div>
        <!-- 全局引入jquery -->
        <script src="{{  url_for('static', filename='js/jquery-3.4.1.min.js') }}"></script>
        <script src="{{  url_for('static', filename='js/jquery-1.11.3.min.js') }}"></script>
        <!-- 全局引入bootstrap js -->
        <script src="{{ url_for('static', filename='bootstrap/js/bootstrap.min.js') }}"></script>
        <!-- <script src="{{ url_for('static', filename='js/bootstrap.js') }}"></script> -->
    </body>
</html>

模版继承

article.html

{% extends "blog/base.html" %}
{% block title %}Onekki Blog{% endblock %}
{% block body %}{{ articles }}{% endblock %}

添加分页

{% extends "blog/base.html" %}
{% block title %}Onekki Blog{% endblock %}
{% block body %}
    <div class="row">
        <div class="col-lg-9">
            <!-- 分页 -->
            {% for article in articles.items %}
                <div class="row">
                    <div class="col-lg-12">
                        <h2>{{ article.title }}</h2>
                    </div>
                </div>
                <div class="row">
                    <div class="col-lg-12">
                        {{ article.content|truncate(255)|safe}}
                        <!-- <a href="{{ url_for('blog.article', page=1) }}">Read More</a> -->
                    </div>
                </div>
            {% endfor %}
        </div>
    </div>
    <div class="row">
        <!-- 调用父模版的 macro render_pagination -->
        {{ render_pagination(articles, 'blog.article') }}
    </div>
{% endblock %}
  • 记录一下项目结构
.
├── LICENSE
├── Pipfile
├── Pipfile.lock
├── README.md
├── __pycache__
│   ├── config.cpython-37.pyc
│   └── manager.cpython-37.pyc
├── app
│   ├── __init__.py
│   ├── __pycache__
│   │   └── __init__.cpython-37.pyc
│   ├── admin
│   │   ├── __init__.py
│   │   └── views.py
│   ├── blog
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-37.pyc
│   │   │   └── views.cpython-37.pyc
│   │   └── views.py
│   ├── db
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-37.pyc
│   │   │   └── models.cpython-37.pyc
│   │   └── models.py
│   ├── site
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-37.pyc
│   │   │   └── views.cpython-37.pyc
│   │   └── views.py
│   ├── static
│   │   ├── css
│   │   │   ├── bootstrap-grid.css
│   │   │   ├── bootstrap-grid.css.map
│   │   │   ├── bootstrap-grid.min.css
│   │   │   ├── bootstrap-grid.min.css.map
│   │   │   ├── bootstrap-reboot.css
│   │   │   ├── bootstrap-reboot.css.map
│   │   │   ├── bootstrap-reboot.min.css
│   │   │   ├── bootstrap-reboot.min.css.map
│   │   │   ├── bootstrap.css
│   │   │   ├── bootstrap.css.map
│   │   │   ├── bootstrap.min.css
│   │   │   └── bootstrap.min.css.map
│   │   └── js
│   │       ├── bootstrap.bundle.js
│   │       ├── bootstrap.bundle.js.map
│   │       ├── bootstrap.bundle.min.js
│   │       ├── bootstrap.bundle.min.js.map
│   │       ├── bootstrap.js
│   │       ├── bootstrap.js.map
│   │       ├── bootstrap.min.js
│   │       └── bootstrap.min.js.map
│   ├── templates
│   │   └── blog
│   │       ├── article.html
│   │       ├── base.html
│   │       ├── comment.html
│   │       ├── index.html
│   │       ├── tag.html
│   │       └── user.html
│   ├── test
│   └── utils
├── config.py
├── gen_models.py
└── manager.py

服务器表单验证 - WTForms

WTForms作用
  • 保证安全的前提下, 对常见的表单类型进行输入的合法性验证
  • Jinja HTML 渲染
  • 预防跨域请求伪造(CSRF)和SQL 注入
WTForms组成
  • 字段

    表示表单的输入框, 会做一些初步的输入检查

  • 检验器

    是一组被附加到字段(输入框)上的函数, 用于对输入数据的检验, 确保输入我们期望的数据

  • 表单

    是一个 Python 类, 其中包含了 字段(类属性) 和 检验器, 在接收到 HTTP POST 请求时, 会根据定义的检验器规则来对输入数据进行检验

  • 常用字段类型

    • fields.DateField

      对应了 Python 中的 Date 对象,可以接收一个 format 可选参数来设定 Date 格式,该参数需要传入一个 strftime(格式化输出时间) 的字符串。

    • fields.IntegerField

      将提交的数据强制转换成为整数,并在模板上渲染成为一个数字类型的输入框。

    • fileds.FloatField

      将提交的数据强制转换成为浮点数,并在模板上渲染成为一个数字类型的输入框。

    • fields.StringField

      普通的文本输入框,会将输入的内容强制转换成为 String 类型对象。

    • fields.RadioField

      代表一组单项选择框,接收一个 choices 参数,该参数需要传入一个以 Tuple 为元素的 List 类型对象,这些参数值表示了显示的选项和返回值。

    • fields.SelectField/fields.SelectMultipleField

  • 校验器

    一般能够通过其命名来得知其作用,所有的检验器都能够接收一个 message 参数,该参数表示了输入的数据没有通过验证时,返回的错误信息。

    • validators.DataRequired()

    • validators.Email()

    • validators.Length(min=-1, max=-1)

    • validators.NumberRange(min=None, max=None)

    • validators.Optional()

    • validators.Regexp(regex)

    • validators.URL()

    • 自定义校验器

      自定义检验器所需要做的事情就是:实现一个函数 接收表单对象和字段对象作为参数 当没有通过验证时,触发一个 wtform.VaildationError 异常

      例如

import re

from wtforms import ValidationError

def custom_email(form_object, field_object):
    """Define a vaildator"""

    if not re.match(r"[^@+@[^@]+\.[^@]]+", field_object.data):
        raise ValidationError('Field must be a valid email address.')
基本使用
  • 配置密钥

自己生成密钥

cat /dev/urandom | tr -cd 'a-f0-9' | head -c 32

将生成的密钥写入配置项

SECRET_KEY = '生成的密钥'
  • 用法

    • 表单类需要继承 Flask WTF 扩展提供的 Form 类
    • 表单类中的一个类属性,就代表了一个字段,即输入框。wtforms 提供了多种类型的字段类
    • 字段类的第一个参数为输入框标题,第二个参数为绑定到该字段的检验器列表,由 wtforms.validators 提供

    app/form/forms.py

from flask_wtf import Form
from wtforms import StringField, TextField
from wtforms.validators import DataRequired, Length


class CommentForm(Form):
    """Form vaildator for comment."""

    # Set some field(InputBox) for enter the data.
    # patam validators: setup list of validators
    name = StringField(
        'Name',
        validators=[DataRequired(), Length(max=255)])

    text = TextField(u'Comment', validators=[DataRequired()])

app/__init__.py

import wt_forms
完善博客列表的视图和模版以及命名修改
  • 主要内容
    • 通过点亮tag来筛选博客
    • article.html修改为article_list.html其他也都类似

编写博客页面

页面包含了博客标题、内容, 评论数据, tag数据, 添加评论的功能

编写视图

app/blog/views.py

@blog.route('/article/<int:id>', methods=['GET','POST'])
def article(id):

    form = CommentForm()
    if form.validate_on_submit():
        new_comment = BlogComment()
        new_comment.name=form.name.data
        new_comment.content=form.content.data
        new_comment.time=datetime.datetime.now()
        new_comment.article_id=id
        db.session.add(new_comment)
        db.session.commit()

    article = BlogArticle.query.get_or_404(id)
    tag_list = [article.tag]
    comment_list = BlogComment.query.filter(BlogComment.article_id==id).all()
    
    recent_article_list, top_tag_list = sidebar_data()

    return render_template('blog/article.html',
                            article=article,
                            form=form,
                            tag_list=tag_list,
                            comment_list=comment_list,
                            recent_article_list=recent_article_list,
                            top_tag_list=top_tag_list)

app/templates/article.html

{% extends "blog/base.html" %}
{% block title %}Onekki Blog{% endblock %}
{% block body %}
    <h3>{{ article.title }}</h3>
    {{ article.publish_time }}:
    {{ article.content }}

    <div class="col-lg-12">
        <h3>New Comment</h3>
        <form method="POST" action="{{ url_for('blog.article', id=article.id) }}">
            {{ form.hidden_tag() }}
            <div>
                {{ form.name.label }}
                {% if form.name.errors %}
                    {% for e in form.name.errors %}
                        <p class="help-block">{{ e }}</p>
                    {% endfor %}
                {% endif %}
                {{ form.name(class_="form-control") }}
            </div>
            <div class="form-group">
                {{ form.content.label }}
                {% if form.content.errors %}
                    {% for e in form.content.errors %}
                        <p class="help-block">{{ e }}</p>
                    {% endfor %}
                {% endif %}
                {{ form.content(class_='form-control') }}
            </div>
            <input class="btn btn-primary" type="submit" value="Add Comment">
        </form>
    </div>
    <div class="row">
        <h5>Tags</h5>
        <ul>
            {% for tag in tag_list %}
                <li><a href="{{ url_for('blog.article_list', page=1, tag_id=tag.id) }}">{{ tag.name }}</a></li>
            {% endfor %}
        </ul>
    </div>
    <div class="row">
        <h5>Comments</h5>
        <ul>
            {% for comment in comment_list %}
                <div>
                    name:{{ comment.name }} <br/>
                    content:{{ comment.content }}
                </div>
            {% endfor %}
        </ul>
    </div>
{% endblock %}

第四阶段

用工厂模式生成应用对象

  • 工厂模式

通过某个接口或对对象来创建另一个对戏, 这个接口函数被称之为工厂函数
工厂模式可以推迟到在程序运行的时候才动态决定要创建哪个类的实例,而不是在编译时就必须知道要实例化哪个类

  • 工厂函数

一个用于创建对象的接口(create_object_interface(variables)), 让子类来决定(根据不同 variables 作为条件来判断)实例化那一个类的对象

例如

#!/usr/bin/env python
# -*- coding: utf-8 -*-

class Circle(object):
    def draw(self):
        print 'draw circle'

class Rectangle(object):
    def draw(self):
        print 'draw Rectangle'

class ShapeFactory(object):
    def create(self, shape):
        if shape == 'Circle':
            return Circle()
        elif shape == 'Rectangle':
            return Rectangle()
        else:
            return None

fac = ShapeFactory()
obj = fac.create('Circle')
obj.draw()

TODO

Bcrypt密文存储账户信息

  • 与普通的哈希加密方式, 更不容易被暴力破解
安装配置Flask Bcrypy
bcrypt = Bcrypt()
bcrypt.init_app(app)

app/db/models.py

class BlogUser(db.Model):
    __tablename__ = 'blog_user'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255))
    password = db.Column(db.String(255))

    def __init__(self, name, password):
        self.name = name
        self.password = self.set_password(password)

    def __repr__(self):
        return self.name
    
    def set_password(self, password):
        return bcrypt.check_password_hash(self.password, password)
    
    def check_password(self, password):
        return bcrypt.check_password_hash(self.password, password)

app/form/forms.py

class LoginForm(FlaskForm):
    name = StringField('name', [DataRequired()], Length(max=255))
    password = PasswordField('password', [DataRequired()])
    def validate(self):
        is_valid = super(LoginForm, self).validate()
        
        # 输入是否合法
        if not is_valid:
            return False
        # 用户是否存在
        user = BlogUser.query.filter_by(name=self.name.data).first()
        if not user:
            self.name.errors.append('用户不存在')
            return False
        # 密码是否正确
        if not user.check_password(self.password.data):
            self.name.errors.append("密码错误")
            return False
        return True

实现注册表单与应用 reCAPTCHA 来实现验证码

reCAPTCHA

CMU 设计了一个名叫 reCAPTCHA 的强大系统,让他们的电脑去向人类求助。具体做法是:将 OCR 软件无法识别的文字扫描图传给世界各大网站,用以替换原来的验证码图片,这些网站的用户在正确识别出这些文字之后,其答案便会被传回CMU。所以 reCAPTCHA 本质上是一个披着验证码皮的分布式文字识别系统(OCR)
(卧槽, 长见识了)

  • 部署reCAPTCHA

    • 注册一个 Google 用户名
    • 进入到 reCAPTCHA 官网
    • 输入你的 blog 名(随意填写)和域名(只支持域名和子域名,现在我们暂时使用 localhost,等部署到线上之后也需要将新的域名填入),就会得到一个 Public Key,就可以把它用在你的 reCAPTCHA 插件上了,同时 reCAPTCHA 也支持多个站点
  • 客户端

写在模版里

<script src='https://www.google.com/recaptcha/api.js'></script>
<div class="g-recaptcha" data-sitekey="<your public key>"></div>
  • 服务端

在服务端请求

https://www.google.com/recaptcha/api/siteverify
  • 实际应用

    config.py

class BaseConfig(object):
    SQLALCHEMY_DATABASE_URI = MYSQL_DATABASE_URI
    # 查询跟踪,不太需要,False,不占用额外的内存
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    JSON_AS_ASCII = False
    SECRET_KEY = 'CCCD756A-954C-4390-A38F-C018340769BD'
    RECAPTCHA_PUBLIC_KEY = "6LcqjqsUAAAAAItx4Fhs-bhG9L4rcXT4hbXooZTL"
    RECAPTCHA_PRIVATE_KEY = "6LcqjqsUAAAAAJlGpkoXjT1HY-nQ1lQLklwqUWug"
    @staticmethod
    def init_app(app):
        pass

app/form/forms.py

from flask_wtf import FlaskForm, RecaptchaField
from wtforms import StringField, TextField, IntegerField, PasswordField
from wtforms.validators import DataRequired, Length, EqualTo

from app.db.models import BlogUser

class RegisterForm(FlaskForm):
    name = StringField('name', [DataRequired(), Length(max=255)])
    password = PasswordField('password', [DataRequired(), Length(min=2)])
    confirm = PasswordField('password', [DataRequired(), EqualTo('password')])
    recaptcha = RecaptchaField()
    
    def validate(self):
        is_valid = not super(RegisterForm, self).validate()

        # 输入是否合法
        if not is_valid:
            return False
        
        # 用户是否存在
        user = BlogUser.query.filter_by(name=self.name.data).first()
        if user:
            self.name.errors.append('用户已经存在')
            return False
        return True

class LoginForm(FlaskForm):
    name = StringField('name', [DataRequired(), Length(max=255)])
    password = PasswordField('password', [DataRequired()])
    def validate(self):
        is_valid = super(LoginForm, self).validate()
        
        # 输入是否合法
        if not is_valid:
            return False
        # 用户是否存在
        user = BlogUser.query.filter_by(name=self.name.data).first()
        if not user:
            self.name.errors.append('用户不存在')
            return False
        # 密码是否正确
        if not user.check_password(self.password.data):
            self.name.errors.append("密码错误")
            return False
        return True
    

class CommentForm(FlaskForm):
    name = StringField(
        'Name',
        validators=[DataRequired(), Length(max=255)]
    )
    content = TextField(
        'Content',
        validators = [DataRequired()]
    )

app/db/models.py

# coding: utf-8
from app import db
from sqlalchemy.orm import relationship
from flask_sqlalchemy import SQLAlchemy
from flask_bcrypt import Bcrypt

bcrypt = Bcrypt()
db = SQLAlchemy()

def dump_datetime(value):
    if value is None:
        return None
    return [value.strftime("%Y-%m-%d"), value.strftime("%H:%M:%S")]

class BlogArticle(db.Model):
    __tablename__ = 'blog_article'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(45))
    content = db.Column(db.Text)
    publish_time = db.Column(db.DateTime)
    user_id = db.Column(db.ForeignKey('blog_user.id'), index=True)
    tag_id = db.Column(db.ForeignKey('blog_tag.id'), index=True)

    tag = db.relationship('BlogTag', primaryjoin='BlogArticle.tag_id == BlogTag.id', backref='blog_articles')
    user = db.relationship('BlogUser', primaryjoin='BlogArticle.user_id == BlogUser.id', backref='blog_articles')
    
    def __repr__(self):
        return self.title
        
    @property
    def serialize(self):
        return {
            "id": self.id,
            "title": self.title,
            "content": self.content,
            "publish_time": dump_datetime(self.publish_time),
            "user_id": self.user_id,
            "tag_id": self.tag_id
        } 

class BlogComment(db.Model):
    __tablename__ = 'blog_comment'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255))
    content = db.Column(db.Text)
    time = db.Column(db.DateTime)
    article_id = db.Column(db.ForeignKey('blog_article.id'), index=True)

    article = db.relationship('BlogArticle', primaryjoin='BlogComment.article_id == BlogArticle.id', backref='blog_comments')

    def __repr__(self):
        return self.name

class BlogTag(db.Model):
    __tablename__ = 'blog_tag'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255))
   
    def __repr__(self):
        return self.name

class BlogUser(db.Model):
    __tablename__ = 'blog_user'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255))
    password = db.Column(db.String(255))

    def __init__(self, name, password):
        self.name = name
        self.password = self.set_password(password)

    def __repr__(self):
        return self.name
    
    def set_password(self, password):
        return bcrypt.generate_password_hash(password)
    
    def check_password(self, password):
        return bcrypt.check_password_hash(self.password, password)

app/tempates/blog/base.html

<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8">
        <title>{% block title %}{% endblock %}</title>
        <meta http-equiv="X-UA-Compatible" content="IE=edge">
        <meta name="viewport" content="width=device-width,, initial-scale=1">
        <!-- 全局引入bootstrap css -->
        <link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='bootstrap/css/bootstrap.min.css') }}">
        <!-- <link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='css/test.css') }}"> -->
        <!-- 宏定义用于生成 Bootstrap 风格的分页链接列表 -->
        {% macro render_pagination(endpoint, pagination,  tag_id) %}
            <nav>
                <ul class="pagination">
                    <li class="page-item">
                        {% if pagination.has_prev %}
                            <a class="page-link" href="{{ url_for(endpoint, page=pagination.prev().page, tag_id=tag_id) }}" aria-label="Previous">
                        {% else %}
                            <a class="page-link" href="{{ url_for(endpoint, page=1, tag_id=tag_id) }}" aria-label="Previous">
                        {% endif %}
                            <span aria-hidden="true">«</span>
                        </a>
                    </li>
                    {% for page in pagination.iter_pages() %}
                        {% if page %}
                             {% if page!=pagination.page %}
                                <li class="page-item">
                                    <a class="page-link" href="{{ url_for(endpoint, page=page, tag_id=tag_id) }}">
                                        {{page}}
                                    </a>
                                </li>
                            {% else %}
                                <li class="page-item active"><a class="page-link" href="">{{page}}</a></li>
                            {% endif %}
                        {% else %}
                            <li class="page-item"><a class="page-link">...</a></li>
                        {% endif %}
                    {% endfor %}
                    <li class="page-item">
                        <a class="page-link" href="{{ url_for(endpoint, page=pagination.next().page, tag_id=tag_id) }}" aria-label="Next">
                            <span aria-hidden="true">»</span>
                        </a>
                    </li>
                </ul>
            </nav>
        {% endmacro %}
        <!-- 添加对reCAPTCHA的支持 -->
        {% block captcha %}
        {% endblock %}
    </head>
    <body>
        <div class="container">
            <div class="jumbotron">
                <h1><a href="{{ url_for('blog.article_list', page=1, tag_id=tag_id) }}"></a>Onekki's Blog</a></h1>
                <p>Welcom to onekki's blog!</p>
            </div>
            {% with message_list = get_flashed_messages(with_categories=true) %}
                {% if message %}
                    {% for category, message in message_list %}
                        <div class="button close" data-dismiss="alert" aria-label="Close">
                            <button type="button" class="close" data-dismiss="alert" aria-label="Close">
                                <span aria-hidden="true">&times;</span>
                            </button>
                            {{ message_list }}
                        </div>
                    {% endfor %}
                {% endif %}
                
            {% endwith %}
            {% block body %}
                body_content
            {% endblock %}
        </div>
        <!-- 全局引入jquery -->
        <script src="{{  url_for('static', filename='js/jquery-3.4.1.min.js') }}"></script>
        <script src="{{  url_for('static', filename='js/jquery-1.11.3.min.js') }}"></script>
        <!-- 全局引入bootstrap js -->
        <script src="{{ url_for('static', filename='bootstrap/js/bootstrap.min.js') }}"></script>
        <!-- <script src="{{ url_for('static', filename='js/bootstrap.js') }}"></script> -->
    </body>
</html>

app/templates/login.html

{% extends "blog/base.html" %}
{% block body %}
    <div class="col-lg-3">
        <form method="POST" action="{{ url_for('blog.login') }}">
            {{ form.hidden_tag() }}
            <div>
                {{ form.name.label }}
                {% if form.name.errors %}
                    {% for error in form.name.errors %}
                        <p class="help-block">{{ error }}</p>
                    {% endfor %}
                {% endif %}
                {{ form.name(class_="form-control") }}
            </div>
            <div>
                {{ form.password.label }}
                {% if form.password.errors %}
                    {% for error in form.password.errors %}
                        <p class="help-block">{{ error }}</p>
                    {% endfor %}
                {% endif %}
                {{ form.password(class_="form-control") }}
            </div>
            <input type="submit" class="btn btn-primary" value="Login" >
        </form>
    </div>
{% endblock %}

app/templates/register.html

{% extends "blog/base.html" %}
{% block title %}
    Register
{% endblock %}
{% block captcha %}
    <script src="https://www.google.com/recaptcha/api.js" async defer></script>
{% endblock %}

{% block body %}
    <div class="col-lg-3">
        <form method="POST" action="{{ url_for('blog.register') }}">
            {{ form.hidden_tag() }}
            <div>
                {{ form.name.label }}
                {{ form.name(class_="form-control")}}
            </div>
            <div class="form-group">
                {{ form.password.label }}
                {{ form.password(class_="form-control") }}
            </div>
            <div class="form-group">
                {{ form.confirm.label }}
                {{ form.confirm(class_="form-control") }}
            </div>
            <input type="submit" class="btn btn-primary" value="Register">
            <div class="g-recaptcha" data-sitekey="6LcqjqsUAAAAAItx4Fhs-bhG9L4rcXT4hbXooZTL"></div>
        </form>
    </div>
{% endblock %}

新增文章、修改文章

  • 新增文章 修改文章

    app/form/forms.py

class ArticleForm(FlaskForm):
    title = StringField('title', [DataRequired(), Length(max=255)])
    content = TextAreaField('content', [DataRequired()])

app/blog/views.py

# 新增文章
@blog.route('/article_add', methods=['GET', 'POST'])
def article_add():
    form = ArticleForm()
    if form.validate_on_submit():
        new_article = BlogArticle()
        new_article.title = form.title.data
        new_article.content = form.content.data
        new_article.publish_time = datetime.datetime.now()

        db.session.add(new_article)
        db.session.commit()
        return redirect(url_for('blog.article_list', page=1))

    return render_template('blog/article_add.html', form=form)

# 修改文章
@blog.route('/article_update/<int:id>', methods=['GET', 'POST'])
def article_update(id):
    article = BlogArticle.query.get_or_404(id)
    form = ArticleForm()
    if form.validate_on_submit():
        article.title = form.title.data
        article.content = form.content.data
        article.publish_time = datetime.datetime.now()

        db.session.add(article)
        db.session.commit()
        return redirect(url_for('blog.article', id=article.id))
    
    form.title.data = article.title
    form.content.data = article.content
    return render_template('blog/article_update.html', form=form, article=article)

包括ckeditor.js 富文本编辑器

app/templates/article_add.html

{% extends "blog/base.html" %}

{% block title %} New Article {% endblock %}

{% block body %}
    <div class="row">
        <h1 class="text-center">Create A New Article</h1>
        <form method="POST" action="{{ url_for('blog.article_add') }}">
            {{ form.hidden_tag() }}
            <div class="form-group">
                {% if form.title.errors %}
                    {% for e in form.title.errors %}
                        <pa class="help-block">{{ e }}</pa>
                    {% endfor %}
                {% endif %}
                {{ form.title(class_="form-control") }}
            </div>
            <div class="form-group">
                {{ form.content.label }}
                {% if form.content.errors %}
                    {% for e in form.content.errors %}
                        <p class="help-block">{{ e }}</p>
                    {% endfor %}
                {% endif %}
                {{ form.content(class_="form-control", id="editor", rows="10", cols="80") }}
            </div>
            <input class="btn btn-primary" type="submit" value="Submit">
        </form>
    </div>
{% endblock %}

{% block js %}
    <script src="{{ url_for('static', filename='ckeditor/ckeditor.js') }}"></script>
    <script>
        CKEDITOR.replace('editor');
    </script>
{% endblock js %}

app/templates/article_update.html

{% extends 'blog/base.html' %}

{% block title %} Edit Article {% endblock %}

{% block body %}
    <div class="row">
        <h1 class="text-center">Edit the Article</h1>
        <form method="POST" action="{{ url_for('blog.article_update', id=article.id) }}">
            {{ form.hidden_tag() }}
            <div class="form-group">
                {{ form.title.label }}
                {% if form.title.errors %}
                    {% for e in form.title.errors %}
                        <p class="help-block">{{ e }}</p>
                    {% endfor %}
                {% endif %}
                {{ form.title(class_="form-control", value=article.title) }}                
            </div>
            <div class="form-group">
                {{ form.content.label }}
                {% if form.content.errors %}
                    {% for e in form.content.errors %}
                        <p class="help-block">{{ e }}</p>
                    {% endfor %}
                {% endif %}
                {{ form.content(class_="form-control", id="editor", rows="10", cols="80") }}              
            </div>
            <input class="btn btn-primary" type="submit" value="Submit">
        </form>
    </div>
{% endblock body %} 

{% block js %}
    <script src="{{ url_for('static', filename='ckeditor/ckeditor.js') }}"></script>
    <script>
        CKEDITOR.replace('editor');
    </script>
{% endblock %}

article.html

...
<h3>{{ article.title }}</h3>

    <div class="row">
        <div class="col-lg-6">
            <p>Written By {{ article.user_id }} on {{ article.publish_time }}</p>
            <p>{{ article.content|safe }}</p>
        </div> 
        <div class="column">
            <div class="col-lg-2">
                <a class="btn btn-primary" href="{{ url_for('blog.article_add') }}">New</a>
            </div>
        </div>
        <div class="column">
            <div class="col-lg-2">
                <a class="btn btn-primary" href="{{ url_for('blog.article_update', id=article.id) }}">Edit</a>
            </div>
        </div>
    </div>

    <div class="col-lg-12">
...

第五阶段

使用flask-login完成用户登录信息的保存

  • 配置

    app/plugin/login_manager.py

from flask_login import LoginManager
login_manager = LoginManager()

login_manager.login_view = "blog.login"
login_manager.session_protection = "strong"
login_manager.login_message = "Please login to access this page."
login_manager.login_message_category = "info"


@login_manager.user_loader
def user_loader(id):
    """Load the user's info."""

    from app.database.models import BlogUser
    return BlogUser.query.filter_by(id=id).first()

注意

期间修改了数据库表的结构

  • 把tag和article改成多对多
  • 把user和role改成多对多

重构了models目录

修改了插件引入方式

现在的目录结构为:

.
├── LICENSE
├── Pipfile
├── Pipfile.lock
├── README.md
├── app
│   ├── __init__.py
│   ├── admin
│   │   ├── __init__.py
│   │   └── views.py
│   ├── blog
│   │   ├── __init__.py
│   │   └── views.py
│   ├── database
│   │   ├── __init__.py
│   │   │   └── __init__.cpython-37.pyc
│   │   └── models
│   │       ├── __init__.py
│   │       ├── blog_article.py
│   │       ├── blog_comment.py
│   │       ├── blog_role.py
│   │       ├── blog_tag.py
│   │       ├── blog_user.py
│   │       └── many2many.py
│   ├── form
│   │   ├── __init__.py
│   │   ├── blog_article.py
│   │   ├── blog_comment.py
│   │   ├── blog_login.py
│   │   └── blog_register.py
│   ├── plugin
│   │   ├── __init__.py
│   │   ├── bcrypt.py
│   │   ├── login_manager.py
│   │   ├── principal.py
│   │   └── sqlalchemy.py
│   ├── site
│   │   ├── __init__.py
│   │   └── views.py
│   ├── static
│   │   ├── bootstrap
│   │   │   ├── css
│   │   │   │   ├── ...
│   │   │   └── js
│   │   │       ├── ...
│   │   ├── ckeditor
│   │   │   ├── ...
│   │   ├── css
│   │   │   └── test.css
│   │   └── js
│   │       ├── jquery-1.11.3.min.js
│   │       └── jquery-3.4.1.min.js
│   ├── templates
│   │   └── blog
│   │       ├── article.html
│   │       ├── article_add.html
│   │       ├── article_list.html
│   │       ├── article_update.html
│   │       ├── base.html
│   │       ├── comment.html
│   │       ├── index.html
│   │       ├── login.html
│   │       ├── register.html
│   │       ├── tag.html
│   │       └── user.html
│   └── utils
├── config.py
├── gen_models.py
└── manager.py
  • 配置模型

    app/database/models/BlogUser.py

# coding: utf-8
from flask_login import AnonymousUserMixin
from app import db, bcrypt


class BlogUser(db.Model):
    __tablename__ = 'blog_user'
    __table_args__ = {'schema': 'onekki_site'}

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255), unique=True)
    password = db.Column(db.String(255))

    roles = db.relationship('BlogRole', secondary='onekki_site.blog_user_role', backref=db.backref('blog_users', lazy='dynamic'))

    # 自定义
    def __init__(self, name, password):
        self.name = name
        self.password = self.set_password(password)

        defalut = db.session.query('BlogRole').filter_by(name="default").one()
        self.roles.append(defalut)
    
    def __repr__(self):
        return "<BlogUser:`{}`>".format(self.name)
    
    def set_password(self, password):
        return bcrypt.generate_password_hash(password)
    
    def check_password(self, password):
        return bcrypt.check_password_hash(self.password, password)
    
    def is_authenticated(self):
        if isinstance(self, AnonymousUserMixin):
            return False
        else:
            return True

    def is_active(self):
        return True
    
    def is_anonymous(self):
        if isinstance(self, AnonymousUserMixin):
            return True
        else:
            return False
    
    def get_id(self):
        return self.id

app/blog/views.py

# 登录
@blog.route('/login', methods=['GET', 'POST'])
def login():
    form = LoginForm()
    if form.validate_on_submit():
        user = BlogUser.query.filter_by(name=form.name.data).one()

        login_user(user, remember=form.remember.data)

        flash("登录成功", category="success")
        return redirect(url_for('blog.article_list', page=1))
    return render_template('blog/login.html',form=form)

# 新增文章
@blog.route('/article_add', methods=['GET', 'POST'])
@login_required
def article_add():
  ...
# 修改文章
@blog.route('/article_update/<int:id>', methods=['GET', 'POST'])
@login_required

使用flask-principal验证用户的权限

  • 配置插件

    app/plugin/principal.py

from flask_principal import Principal, Permission, RoleNeed

principal = Principal()

permission_admin = Permission(RoleNeed('admin'))
permission_poster = Permission(RoleNeed('poster'))
permission_default = Permission(RoleNeed('default'))
  • 使用

    app/__init__.py

def create_app(config_name):
   
    ...
    
    principal.init_app(app)
    
    @identity_loaded.connect_via(app)
    def on_identity_loaded(sender, identity):
        identity.user = current_user
        if hasattr(current_user, 'id'):
            identity.provides.add(UserNeed(current_user.id))
        if hasattr(current_user, 'roles'):
            for role in current_user.roles:
                identity.provides.add(RoleNeed(role.name))
        ...
    return app

app/blog/views.py

# 登录
@blog.route('/login', methods=['GET', 'POST'])
def login():
    form = LoginForm()
    if form.validate_on_submit():
        user = BlogUser.query.filter_by(name=form.name.data).one()

        login_user(user, remember=form.remember.data)
        identity_changed.send(
            current_app._get_current_object(), identity=Identity(user.id)
        )

        flash("登录成功", category="success")
        return redirect(url_for('blog.article_list', page=1))
    return render_template('blog/login.html',form=form)

# 登出
@blog.route('/logout', methods=['GET', 'POST'])
def logout():
    logout_user()
    identity_changed.send(
        current_app._get_current_object(), identity=AnonymousIdentity()
    )
    flash("退出登录成功", category="success")
    return redirect(url_for('blog.login'))
  
  
# 修改文章
@blog.route('/article_update/<int:id>', methods=['GET', 'POST'])
@login_required
@permission_poster.require(http_exception=403)
def article_update(id):
    article = BlogArticle.query.get_or_404(id)

    if not current_user:
        return redirect(url_for('blog.login'))

    if current_user != article.user:
        return redirect(url_for('blog.article', id=id))
    
    permission = Permission(UserNeed(article.user.id))
    if permission.can() or permission_admin.can():
        form = ArticleForm()
        if form.validate_on_submit():
            article.title = form.title.data
            article.content = form.content.data
            article.publish_time = datetime.datetime.now()

            db.session.add(article)
            db.session.commit()
            return redirect(url_for('blog.article', id=article.id))
    else:
        abort(403)
    
    form.title.data = article.title
    form.content.data = article.content
    return render_template('blog/article_update.html', form=form, article=article)

gen_models.py

# coding: utf-8
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Table, Text
from sqlalchemy.orm import relationship
from flask_sqlalchemy import SQLAlchemy


db = SQLAlchemy()


class BlogArticle(db.Model):
    __tablename__ = 'blog_article'
    __table_args__ = {'schema': 'onekki_site'}

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(45))
    content = db.Column(db.Text)
    publish_time = db.Column(db.DateTime)
    user_id = db.Column(db.ForeignKey('onekki_site.blog_user.id'), index=True)

    user = db.relationship('BlogUser', primaryjoin='BlogArticle.user_id == BlogUser.id', backref='blog_articles')
    tags = db.relationship('BlogTag', secondary='onekki_site.blog_article_tag', backref='blog_articles')


t_blog_article_tag = db.Table(
    'blog_article_tag',
    db.Column('article_id', db.ForeignKey('onekki_site.blog_article.id'), primary_key=True, nullable=False),
    db.Column('tag_id', db.ForeignKey('onekki_site.blog_tag.id'), primary_key=True, nullable=False, index=True),
    schema='onekki_site'
)


class BlogComment(db.Model):
    __tablename__ = 'blog_comment'
    __table_args__ = {'schema': 'onekki_site'}

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255))
    content = db.Column(db.Text)
    time = db.Column(db.DateTime)
    article_id = db.Column(db.ForeignKey('onekki_site.blog_article.id'), index=True)

    article = db.relationship('BlogArticle', primaryjoin='BlogComment.article_id == BlogArticle.id', backref='blog_comments')


class BlogRole(db.Model):
    __tablename__ = 'blog_role'
    __table_args__ = {'schema': 'onekki_site'}

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255), unique=True)
    description = db.Column(db.String(255))

    users = db.relationship('BlogUser', secondary='onekki_site.blog_user_role', backref='blog_roles')


class BlogTag(db.Model):
    __tablename__ = 'blog_tag'
    __table_args__ = {'schema': 'onekki_site'}

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255))


class BlogUser(db.Model):
    __tablename__ = 'blog_user'
    __table_args__ = {'schema': 'onekki_site'}

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255), unique=True)
    password = db.Column(db.String(255))


t_blog_user_role = db.Table(
    'blog_user_role',
    db.Column('user_id', db.ForeignKey('onekki_site.blog_user.id'), primary_key=True, nullable=False),
    db.Column('role_id', db.ForeignKey('onekki_site.blog_role.id'), primary_key=True, nullable=False, index=True),
    schema='onekki_site'
)

Flask-Celery-Helper 实现异步任务

  • 文件上传、邮件发送等
  • 要和消息队列配合使用
  • 这里采用redis

安装redis

  • 我选择在ubuntu服务器上安装了redius

配置方法网上都有

配置celery

config.py

class DevelopmentConfig(BaseConfig):
    DEBUG = True
    CELERY_RESULT_BACKEND = 'redis://39.107.230.35:6379/0'
    CELERY_BROKER_URL = 'redis://39.107.230.35:6379/0'
    CACHE_TYPE = 'simple'
    pass
  • 在app中注册celery对象
  • 要知道 celery application和flask application是两个进程
  • flask支持将context注入到其他application中,这样其他application就可以访问flask application里的内容

用工厂模式创建celery对象

celery_runner.py

import os
from celery import Celery, Task
from app import create_app

def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )

    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

flask_app = create_app(os.getenv('FLASK_CONFIG') or 'default')
celery = make_celery(flask_app)
  • 数据库新建表

    app/database/models/blog_reminder.py

from app import db

class BlogReminder(db.Model):
    __tablename__ = 'blog_reminder'
    __table_args__ = {'schema': 'onekki_site'}

    id = db.Column(db.Integer, primary_key=True)
    time = db.Column(db.DateTime)
    email = db.Column(db.String(255))
    content = db.Column(db.Text)
  • 创建tasks

    app/tasks/blog_task.py

import smtplib
from datetime import datetime
from email.mime.text import MIMEText

from flask_mail import Message

from app.plugins import celery, mail
from app.database.models import BlogReminder

@celery.task(
    bind=True,
    ignore_result=True,
    default_retry_delay=300,
    max_retries=5
)
def remind(self, primary_key):
    reminder = BlogReminder.query.get(primary_key)
    
    msg = MIMEText(remind.content)
    msg['Subject'] = "Welcome!"
    msg['From'] = "<your email>"
    msg['To'] = reminder.email
    try:
        smtp_server = smtplib.SMTP('39.107.230.35')
        smtp_server.starttls()
        smtp_server.login("<user>", "<password>")
        smtp_server.sendmail(
            "<your_email>", [reminder.email], msg.as_string()
        )
        smtp_server.close()
        return
    except Exception as e:
        self.rety(exc=e)

def on_reminder_save(mapper, connect, self):
    remind.apply_async(args=(self.id), eta=self.date)

回调函数on_reminder_save

  • 主程序监听数据库插入消息
from sqlalchemy import event
def create_app(object_name):
  ...
    event.listen(BlogReminder, 'after_insert', on_reminder_save)
  ...

用flask-cache实现网页缓存加速

  • 注册flask-cache到app

  • 在config中配置缓存类型

class DevConfig(Config):
    ...
  CACHE_TYPE = 'simple'
  • 缓存无参数的普通函数

    app/controllers/blog/views.py

@cache.cached(timeout=7200, key_prefix='sidebar_dada')
def sidebar_data():
    recent_article_list = BlogArticle.query.order_by(
        BlogArticle.publish_time.desc()
    ).limit(5).all()
    count = func.count(t_blog_article_tag.c.article_id).label('total')

    top_tag_list = db.session.query(
        BlogTag.id, BlogTag.name, count
    ).filter(
        BlogTag.id==t_blog_article_tag.c.tag_id
    ).group_by(BlogTag.id).order_by(count.desc()).limit(5).all()

    return recent_article_list, top_tag_list
  • 缓存带参数的普通函数(我没写)
@staticmethod
    @cache.memoize(60)
    def verify_auth_token(token):
        """Validate the token whether is night."""

        serializer = Serializer(
            current_app.config['SECRET_KEY'])
        try:
            # serializer object already has tokens in itself and wait for 
            # compare with token from HTTP Request /api/posts Method `POST`.
            data = serializer.loads(token)
        except SignatureExpired:
            return None
        except BadSignature:
            return None

        user = User.query.filter_by(id=data['id']).first()
        return user
  • 缓存无动态参数的视图函数
@blog.route('/article_list/<int:page>')
@cache.cached(timeout=60)
def article_list(page=1):
    filters = []
    tag_id = request.values.get('tag_id')
    if tag_id != None:
        filters.append(t_blog_article_tag.c.tag_id==tag_id)
        ...
  • 缓存带动态参数的视图函数
# 获取文章
@blog.route('/article/<int:id>', methods=['GET','POST'])
@cache.cached(timeout=60, key_prefix=make_cache_key)
def article(id):

    form = CommentForm()
    ...

flask-admin实现后台管理

  • 注册flask-admin

  • BaseView基础管理页面

  • ModelView模型管理页面

  • 文章修改增强页面(为TextArea注入class)

  • 文件管理

  • 权限安全

    app/controllers/views.py

# 获取蓝图
# from app.admin import admin
# # 获取数据库模型对象和SQLAlchemy对象db,注意不可使用App模块中的db
# # from app.admin.models import *

# # 设置路由
# @admin.route('/')
# def index():
#     return 'admin首页'

from flask_admin import BaseView, expose


class CustomView(BaseView):
    @expose('/')
    def index(self):
        return self.render('admin/custom.html')
    
    @expose('/second_page')
    def second_page(self):
        return self.render('admin/second_page.html')

from flask_admin.contrib.sqla import ModelView

class CustomModelView(ModelView):
    pass

from app.forms import CKTextAreaField

class ArticleView(CustomModelView):
    form_overrides = dict(content=CKTextAreaField)

    column_searchable_list = ('content', 'title')

    column_filters = ('publish_time',)

    create_template = 'admin/article_update.html'

    edit_template = 'admin/article_update.html'
   
from flask_admin.contrib.fileadmin import FileAdmin

class CustomFileView(FileAdmin):
    pass

其中ArticleView要在forms新建widgets

app/forms/blog/blog_admin.py

from wtforms import (
    widgets,
    StringField,
    TextField,
    TextAreaField,
    PasswordField,
    BooleanField,
    ValidationError
)

class CKTextAreaWidget(widgets.TextArea):
    def __call__(self, field, **kwargs):
        kwargs.setdefault('class_', 'ckeditor')
        return super(CKTextAreaWidget, self).__call__(field, **kwargs)

class CKTextAreaField(TextAreaField):
    widget = CKTextAreaWidget()

app/__init__.py

def create_app(config_name):
   admin.init_app(app)
    admin.add_view(CustomView(name='Custom'))

    model_list = [BlogComment, BlogReminder, BlogRole, BlogTag, BlogUser]
    for model in model_list:
        admin.add_view(
            CustomModelView(model, db.session, category='Models')
        )
    admin.add_view(ArticleView(BlogArticle, db.session, name='EditArticle'))
    
    admin.add_view(
      CustomFileView(os.path.join(os.path.dirname(__file__), 'static'),
      '/static', 
      name='Static Files')
    )

custom.html

{% extends 'admin/master.html' %}
{% block body %}
  This is the custom view!
  <a href="{{ url_for('customview.second_page') }}">Link</a>
{% endblock %}

article_edit.html

{% extends 'admin/model/edit.html' %}
{% block tail %}
{{ super() }}
<script src="//cdn.ckeditor.com/4.4.7/standard/ckeditor.js"></script>
{% endblock %}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容