Flask token配置与 用户分组(三)

Token的配置

生成Token: TimedJSONWebSignatureSerializer进行序列化

from itsdangerous import TimedJSONWebSignatureSerializer as Serializer

from flask import current_app, jsonify
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer

from app.libs.enums import ClientTypeEnum
from app.libs.redprint import RedPrint
from app.models.user import User
from app.validators.forms import ClientForm

api = RedPrint('token')


@api.route('', methods=['POST'])
def get_token():
    form = ClientForm().validate_for_api()
    promise = {
        ClientTypeEnum.USER_EMAIL: User.verify
    }
    identify = promise[ClientTypeEnum(form.type.data)](
        form.account.data,
        form.secret.data
    )
    # 生成令牌
    expiration = current_app.config['TOKEN_EXPIRATION']
    token = generate_auth_token(identify['uid'],
                                form.type.data,
                                identify['scope'],
                                expiration)

    # 注意需要进行 ascii 编码
    t = {
        'token': token.decode('ascii')
    }
    return jsonify(t, 201)


def generate_auth_token(uid, ac_type, scope=None, expiration=7200):
    """
    生成token,将用户的id,作用域,用户类型,过期时间写入token
    :param uid: 用户id
    :param ac_type: 用户类型
    :param scope: 权限域
    :param expiration: 过期时间 秒
    :return:
    """
    s = Serializer(current_app.config['SECRET_KEY'],
                   expires_in=expiration,
                   )
    return s.dumps({'uid': uid,
                    'type': ac_type.value,
                    'scope': scope
                    })

Token的验证,在用户调取接口等相关的操作的时候,对token进行验证的操作,传递token是在http的请求头中 Authorization 字段中

#使用示例,注意必须放到放到方法名上
@api.route('', methods=['GET'])
@auth.login_required
def get_user():
    uid = g.user.uid
    user = User.query.filter_by(id=uid).first_or_404()
    return jsonify(user)


#验证相关代码
from collections import namedtuple
from flask import current_app, g, request
from flask_httpauth import HTTPBasicAuth
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, BadSignature, SignatureExpired
from app.libs.error_code import AuthFailed, Forbidden
from app.libs.scope import is_in_scope

auth = HTTPBasicAuth()
#使用nametuple生成类
User = namedtuple('User', ['uid', 'ac_type', 'scope'])

@auth.verify_password
def verify_password(token, password):
    # http的协议规范
    # key = Authorization
    # value = basic base64(rjl:111111)
    user_info = verify_auth_token(token)
    if not user_info:
        return False
    else:
        g.user = user_info
        return True

# 验证不通过,直接返回相关的信息,从token中获取用户相关的信息
def verify_auth_token(token):
    s = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(token)
    except BadSignature:
        raise AuthFailed(msg='token is invalid', error_code=1002)
    except SignatureExpired:
        raise AuthFailed(msg='token is expired', error_code=1003)
    uid = data['uid']
    ac_type = data['type']
    scope = data['scope']
    # request 视图函数
    allow = is_in_scope(scope, request.endpoint)
    if not allow:
        raise Forbidden()
    return User(uid, ac_type, scope)

用户对接口操作权限的判断

在用户登录操作的时候把用户的等级标志写入token中,根据token的信息,与当前用户的等级进行判断!

将相应用户可以访问的视图函数,存储起来,当用户请求的时候,查看是> 否在该作用域中。

定义作于域:

class Scope:
    # 视图函数级别的区分
    allow_api = []

    # 模块级别的定义区分
    allow_module = []

    # 排除
    forbidden_api = []

    def __add__(self, other):
        self.allow_api = self.allow_api + other.allow_api
        # set去重,再转为list
        self.allow_api = list(set(self.allow_api))

        self.allow_module = self.allow_module + other.allow_module
        self.allow_module = list(set(self.allow_module))

        return self

#定义管理权限组
class AdminScope(Scope):
    allow_module = [
        'v1.user'
    ]
    # def __init__(self):
    #     self + UserScope()

#普通用户组
class UserScope(Scope):
    allow_api = [
        'v1.user+get_user'
    ]

    def __init__(self):
        pass

#超级管理员组
class SuperScope(Scope):
    """
    超级管理员,权限相加
    """
    allow_api = [

    ]
    allow_module = [
        'v1.user'
    ]
    def __init__(self):
        self + AdminScope() + UserScope()


#验证权限
def is_in_scope(scope, endpoint):
    """
    根据视图函数名判断,该视图函数是否在 权限组中
    :param scope:
    :param endpoint:
    :return:
    """
    # 反射 ,注意endpoint的路径是带有 blueprint的路径的
    scope = globals()[scope]()
    # 查看定义 redprint的 位置
    splits = endpoint.split('+')
    red_name = splits[0]
    if endpoint in scope.forbidden_api:
        return False
    if endpoint in scope.allow_api:
        return True
    if red_name in scope.allow_module:
        return True
    else:
        return False

与token的数据进行校验

. . .
uid = data['uid']
ac_type = data['type']
scope = data['scope']
# request 视图函数
allow = is_in_scope(scope, request.endpoint)
if not allow:
    raise Forbidden()
. . .

tips:模块级别的校验,需要更改 redprint的 endpoint

class RedPrint:
def __init__(self, name):
    self.name = name
    self.mound = []

def route(self, rule, **options):
    def decorator(f):
        self.mound.append((f, rule, options))
        return f

    return decorator

def register(self, bp, url_prefix=None):
    """
    将 redprint注册到 blueprint,实际调用 blueprint代码
    """
    if url_prefix is None:
        url_prefix = '/' + self.name
    for f, rule, options in self.mound:
        # 自定义 endpoint
        endpoint = self.name + '+' + options.pop("endpoint", f.__name__)
        bp.add_url_rule(url_prefix + rule, endpoint, f, **options)
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 11,187评论 6 13
  • 22年12月更新:个人网站关停,如果仍旧对旧教程有兴趣参考 Github 的markdown内容[https://...
    tangyefei阅读 35,241评论 22 257
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,010评论 19 139
  • 日子一天一天过,看着2018年过了六分之一的时候,感觉一阵恐慌,什么事情都还没有做,时间流逝太快,再不明确和改变,...
    王少军_6de4阅读 106评论 0 1
  • 2018年6月 8日 周五 晴 我身体不舒服了两天,儿子就像变了一个小大人,今天上学要求他爸...
    张志成娘阅读 336评论 0 4