# -*- coding: utf-8 -*-
|
"""
|
@File : auth.py
|
@Time : 2023/1/13 15:06
|
@Author : geekbing
|
@LastEditTime : -
|
@LastEditors : -
|
@Description : 登录认证
|
"""
|
|
import jwt
|
from django.contrib.auth import get_user_model
|
from django.utils.translation import gettext as _
|
from rest_framework import exceptions
|
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
|
from rest_framework_jwt.authentication import jwt_get_username_from_payload
|
from rest_framework_jwt.settings import api_settings
|
|
jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
|
jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
|
jwt_decode_handler = api_settings.JWT_DECODE_HANDLER
|
|
|
class MyJWTAuthentication(JSONWebTokenAuthentication):
|
def authenticate(self, request):
|
"""
|
Returns a two-tuple of `User` and token if a valid signature has been
|
supplied using JWT-based authentication. Otherwise, returns `None`.
|
"""
|
jwt_value = request.META.get("HTTP_AUTHORIZATION", None)
|
try:
|
payload = jwt_decode_handler(jwt_value)
|
except jwt.ExpiredSignature:
|
msg = "签名过期"
|
raise exceptions.AuthenticationFailed(msg)
|
except jwt.DecodeError:
|
msg = "签名解析失败"
|
raise exceptions.AuthenticationFailed(msg)
|
except jwt.InvalidTokenError:
|
raise exceptions.AuthenticationFailed()
|
|
user = self.authenticate_credentials(payload)
|
|
return user, jwt_value
|
|
def authenticate_credentials(self, payload):
|
"""
|
Returns an active user that matches the payload's user id and email.
|
"""
|
User = get_user_model()
|
username = jwt_get_username_from_payload(payload)
|
|
if not username:
|
msg = _("Invalid payload.")
|
raise exceptions.AuthenticationFailed(msg)
|
|
try:
|
user = User.objects.get_by_natural_key(username)
|
except User.DoesNotExist:
|
msg = "用户不存在"
|
raise exceptions.AuthenticationFailed(msg)
|
|
if not user.is_active:
|
msg = "用户已禁用"
|
raise exceptions.AuthenticationFailed(msg)
|
|
return user
|