Django JWT认证实现

配置JWT认证

先通过 pip install djangorestframework 命令下载 Django REST framework 库,再通过 pip install djangorestframework-simplejwt 命令下载 Django REST framework Simple JWT 库。它们提供了 JWT 的 Django 应用。

配置与编码

settings.py 文件里加入以下内容,以支持 JWT 认证:

REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework_simplejwt.authentication.JWTAuthentication'
    ],
}

在某个应用的 views.py 文件下,写一个测试用的视图。

from rest_framework.views import APIView
from rest_framework import permissions
from rest_framework.response import Response
from rest_framework_simplejwt import authentication

class AutoTestView(APIView):
    permission_classes = [permissions.IsAuthenticated]
    authentication_classes = (authentication.JWTAuthentication,)

    def get(self, request, *args, **kwargs):
        print('authenticate: ', request.successful_authenticator.authenticate(request))
        print('authenticate_header: ', request.successful_authenticator.authenticate_header(request))
        print('get_header: ', request.successful_authenticator.get_header(request))
        print('get_raw_token: ', request.successful_authenticator.get_raw_token(request.successful_authenticator.get_header(request)))
        print('get_validated_token: ', request.successful_authenticator.get_validated_token(request.successful_authenticator.get_raw_token(request.successful_authenticator.get_header(request))))
        print('get_user: ', request.successful_authenticator.get_user(request.successful_authenticator.get_validated_token(request.successful_authenticator.get_raw_token(request.successful_authenticator.get_header(request)))))
        print('www_authenticate_realm: ', request.successful_authenticator.www_authenticate_realm)
        return Response('O get K')

    def post(self, request, *args, **kwargs):
        return Response('O post K')

urls.py 文件下导入 JWT 的两个视图,以及我们的测试视图的路由:

...
from rest_framework_simplejwt.views import (TokenObtainPairView, TokenRefreshView)
from django.conf.urls import url
from foundation import views as foundation_views

urlpatterns = [
    ...
    url(r'^firmware/auth/token/obtain/$', TokenObtainPairView.as_view(), name='obtain_token'),
    url(r'^firmware/auth/token/refresh/$', TokenRefreshView.as_view(), name='refresh_token'),
    url(r'^firmware/auth/token/test/$', foundation_views.AutoTestView.as_view(), name='test_token'),
]

使用示例

获取 Token:

django_djangorestframeworksimplejwt_1.png

通过 Token 获取视图信息:

django_djangorestframeworksimplejwt_3.png

通过 refresh 刷新 Token:

django_djangorestframeworksimplejwt_2.png

自定义JWT认证

同样的,要先通过 pip install djangorestframework 命令下载 Django REST framework 库,不同的是,接下来要通过 pip install djangorestframework-jwt 命令下载 Django REST framework JWT 库。

配置认证库

settings.py 文件里加入 djangorestframeworkdjangorestframework-jwt 库的配置:

……
INSTALLED_APPS = [
    ……
    'rest_framework',
]
……
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': (
        # 自定义 JWT Token 认证类
        'foundation.utils.jwt_customize.TokenAuth',
    ),
}
JWT_AUTH = {
    # 用户 Token 有效期
    'JWT_EXPIRATION_DELTA': datetime.timedelta(days=1),
    # Token 前缀
    'JWT_AUTH_HEADER_PREFIX': 'JWT',
    # 自定义响应信息
    'JWT_RESPONSE_PAYLOAD_HANDLER': 'foundation.utils.jwt_customize.jwt_response_payload_handler'
}
AUTHENTICATION_BACKENDS = [
    # 自定义 JWT 的企业 AD 域认证
    'foundation.utils.jwt_customize.UsernameAdAuthBackend',
]
……
# 企业 AD 域信息
AD_DOMAIN_INFO = {
    'ACTIVATE': True,
    'AD_SERVER': ['xxx.xx.xx.xxx'],
    'AD_SERVER_PORT': xxx,
    'AD_DN': 'xxx@xxx.local',
    'AD_PASSWORD': 'xxx',
}

自定义认证类

如上面的路径,创建一个 jwt_customize.py 文件,并编写三个自定义的认证类与方法:

from django.contrib.auth.backends import ModelBackend
from rest_framework.exceptions import AuthenticationFailed
from rest_framework_jwt.serializers import VerifyJSONWebTokenSerializer
from rest_framework import serializers
from x_atp_firmware.settings import JWT_AUTH
from django.contrib.auth.models import User, Group
from foundation.utils.ad_login import ldap_auth
from django.contrib.auth import get_user_model
from x_atp_firmware.settings import AD_DOMAIN_INFO

UserModel = get_user_model()

class TokenAuth:
    """
    自定义 JWT Token 认证类
    """
    @staticmethod
    def authenticate(request):
        """
        重写 authenticate
        """
        # 获取请求头的 Authorization 字段
        headers_token = request.headers.get('Authorization', None)
        # 校验 Authorization 是否符合规范
        if not headers_token:
            raise serializers.ValidationError({'Authorization': '该字段是必填项。'})
        elif headers_token.find(JWT_AUTH['JWT_AUTH_HEADER_PREFIX'] + ' ') == -1:
            raise serializers.ValidationError({'Token': '该字段不符合规范。'})
        # 提取 Authorization 中的 JWT Token 信息
        headers_token = headers_token.split(JWT_AUTH['JWT_AUTH_HEADER_PREFIX'] + ' ')[1]
        token = {'token': headers_token}
        # 调用默认的验证逻辑
        valid_data = VerifyJSONWebTokenSerializer().validate(token)
        user = valid_data['user']
        if user:
            # 返回用户名称与认证Token
            return user, valid_data['token']
        else:
            raise AuthenticationFailed('认证失败')

class UsernameAdAuthBackend(ModelBackend):
    """
    自定义企业 AD 域的登录验证
    """
    def authenticate(self, request, username=None, password=None, **kwargs):
        """
        重写 authenticate
        """
        if request and request.path_info == '/admin/login/':
            # 如果是来自Admin后台的请求,直接传递给默认认证函数
            try:
                user = User.objects.get(username=username)
                # 判断密码是否正确
                if not user.check_password(password):
                    return None
            except UserModel.DoesNotExist:
                return None
        elif not AD_DOMAIN_INFO['ACTIVATE']:
            # 如果未启用AD域登录,则跳过验证
            user = User.objects.get(username=username)
        else:
            login_res = ldap_auth(username, password)
            # 判断返回值是否正常的字典类型
            if isinstance(login_res, dict):
                # 判断是否AD域认证是否成功
                if login_res['result'] is True:
                    # 判断数据库中是否已存在用户组
                    if not Group.objects.filter(name=login_res['organization']['title']):
                        # 创建新用户组
                        group = Group.objects.create(name=login_res['organization']['title'])
                    else:
                        group = Group.objects.get(name=login_res['organization']['title'])
                    # 判断数据库中是否已存在用户信息
                    if not User.objects.filter(username=login_res['account']['s_am_account_name']):
                        # 创建新用户数据
                        user = User.objects.create_user(username=login_res['account']['s_am_account_name'],
                                                        email=login_res['account']['mail'],
                                                        last_name=login_res['account']['sn'],
                                                        first_name=login_res['account']['given_name'],
                                                        password=password)
                    else:
                        user = User.objects.get(username=username)
                    # 为登录用户设置用户组
                    user.groups.add(group)
                else:
                    raise serializers.ValidationError(detail='企业AD域异常: ' + str(login_res))
            elif not login_res:
                # 判断返回值是否为False
                raise serializers.ValidationError(detail='内部账户名称不存在')
            elif login_res == 'auth fail':
                # 判断返回值是否为`auth fail`
                raise serializers.ValidationError(detail='内部登录密码不正确')
            else:
                raise serializers.ValidationError(detail='企业AD域异常: ' + str(login_res))
        return user

def jwt_response_payload_handler(token, user=None, request=None):
    """
    自定义返回 Token 认证信息
    :param token: JWT 认证 Token
    :param user: 用户对象
    :param request: 请求对象
    :return: 认证信息
    """
    return {
        "token": token,
        'id': user.id,
        'username': user.username,
        'email': user.email,
        'exp': JWT_AUTH['JWT_EXPIRATION_DELTA']
    }

还需要创建一个 ad_login.py 文件,编写企业AD域的连接认证方法:

import logging
from ldap3 import Connection, SUBTREE, ServerPool
from x_atp_firmware.settings import AD_DOMAIN_INFO

# 域控服务器ip地址
LDAP_SERVER_POOL = AD_DOMAIN_INFO['AD_SERVER']
# 端口
LDAP_SERVER_PORT = AD_DOMAIN_INFO['AD_SERVER_PORT']
# 拥有查询权限的域账号
ADMIN_DN = AD_DOMAIN_INFO['AD_DN']
# 对应的密码
ADMIN_PASSWORD = AD_DOMAIN_INFO['AD_PASSWORD']
SEARCH_BASE = 'ou=OU,dc=LEEDARSON,dc=LOCAL'

def ldap_auth(username, password):
    """
    通过 AD 域认证并获取用户资料
    :param username: 用户AD账号
    :param password: 用户AD密码
    :return: 认证信息
    """
    ldap_server_pool = ServerPool(LDAP_SERVER_POOL)
    conn = Connection(ldap_server_pool, user=ADMIN_DN, password=ADMIN_PASSWORD,
                      check_names=True, lazy=False, raise_exceptions=False)
    logging.warning('x_atp_firmware.foundation.utils.ad_login.ldap_auth (AD域连接): ' + str(conn))
    conn.open()
    conn.bind()
    res = conn.search(
        search_base=SEARCH_BASE,
        # 查询所有用户
        search_filter='(sAMAccountName={})'.format(username),
        search_scope=SUBTREE,
        # sAMAccountName=账号,cn=用户中文名,sn=姓,givenName=名,mail=邮件
        # department=部门,manager=经理, title=头衔
        attributes=['cn', 'sn', 'ou', 'givenName', 'mail', 'sAMAccountName', 'department', 'manager', 'title',
                    'directReports'],
        # 使用`ALL_ATTRIBUTES`可以获取所有属性值
        # attributes=ALL_ATTRIBUTES,
        paged_size=5
    )
    if res:
        # 开始同步
        entry = conn.response[0]
        # dn包含了ou信息dc信息等,在做域验登录时可以作为验证账号
        _dn = entry['dn']
        attr_dict = entry['attributes']
        # 使用dn检查密码
        try:
            conn2 = Connection(ldap_server_pool, user=_dn, password=password,
                               check_names=True, lazy=False, raise_exceptions=False)
            conn2.bind()
            if conn2.result['description'] == 'success':
                res = {'result': True,
                       'account': {
                           's_am_account_name': attr_dict['sAMAccountName'],
                           'cn': attr_dict['cn'],
                           'sn': attr_dict['sn'],
                           'given_name': attr_dict['givenName'],
                           'mail': attr_dict['mail'],
                       },
                       'organization': {
                           'title': attr_dict['title'],
                           'department': attr_dict['department'],
                           'manager': attr_dict['manager'],
                           'ou': attr_dict['ou'],
                       }}
                return res
            else:
                # 返回认证失败信息
                return 'auth fail'
        except Exception as exc:
            return exc
    else:
        return False

使用方式

同样,在某个应用的 views.py 文件下写一个测试代码。

from rest_framework.views import APIView

class AutoTestView(APIView):
    """
    基础-验证-Token-测试
    URL /foundation/auth/token/test/
    """
    @staticmethod
    def options(request, *args, **kwargs):
        print(request.user, request.auth)

调用示例

通过用户名称与密码登录:

django_djangorestframeworkjwt_1.png

通过用户获取的 Token 发送请求到服务器:

django_djangorestframeworkjwt_2.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,110评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,443评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,474评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,881评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,902评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,698评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,418评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,332评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,796评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,968评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,110评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,792评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,455评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,003评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,130评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,348评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,047评论 2 355