from django.contrib.auth import authenticate, get_user_model, logout
from django.db.models import Q
from django.utils.decorators import method_decorator
from rest_framework_jwt.settings import api_settings
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.viewsets import GenericViewSet
from drf_yasg.utils import swagger_auto_schema
from rest_framework import status
from rest_framework.permissions import IsAuthenticated

from backend.utils.auth import MyJWTAuthentication
from backend.utils.request_util import save_login_log
from lunarlink.models import LoginLog
from lunaruser.common import response
from lunaruser import serializers
from lunarlink.utils.decorator import request_log

User = get_user_model()


class LoginView(APIView):
    """Login view: return a JWT token when username and password match."""

    # Login must be reachable without credentials, so both are disabled.
    authentication_classes = ()
    permission_classes = ()

    @swagger_auto_schema(request_body=serializers.UserLoginSerializer)
    def post(self, request):
        """Authenticate the user and return a token.

        Expected request body::

            {
                username: str
                password: str
            }

        Returns ``response.KEY_MISS`` when a field is absent,
        ``response.LOGIN_FAILED`` when authentication fails,
        ``response.USER_BLOCKED`` when the account is inactive, and otherwise
        a copy of ``response.LOGIN_SUCCESS`` extended with the user's details
        and the freshly issued token.
        """
        try:
            username = request.data["username"]
            password = request.data["password"]
        except (KeyError, TypeError):
            # TypeError covers a body that is not a JSON object (e.g. a list),
            # which would otherwise surface as an unhandled server error.
            return Response(response.KEY_MISS)

        user = authenticate(username=username, password=password)
        if user is None:
            return Response(response.LOGIN_FAILED)

        # TODO: the meaning of the "0"/inactive state still needs a clearer
        # definition.
        # NOTE(review): Django's default ModelBackend already rejects inactive
        # users in authenticate(); this branch is presumably only reachable
        # with a custom backend -- confirm.
        if not user.is_active:
            return Response(response.USER_BLOCKED)

        # JWT token creation
        payload = api_settings.JWT_PAYLOAD_HANDLER(user)
        token = api_settings.JWT_ENCODE_HANDLER(payload)

        # Build a fresh dict per request. Updating the module-level
        # LOGIN_SUCCESS in place would share one user's id/token with every
        # other request handled by this process.
        login_body = {
            **response.LOGIN_SUCCESS,
            "id": user.id,
            "user": user.username,
            "name": user.name,
            "is_superuser": user.is_superuser,
            "show_hosts": user.show_hosts,
            "token": token,
        }

        # Attach the authenticated user before the login log is written.
        request.user = user
        save_login_log(request=request)
        return Response(login_body)


class UserView(APIView):
    """List users whose ``is_active`` flag is set."""

    def get(self, request):
        """Return the serialized list of active users."""
        users = User.objects.filter(is_active=1)
        ser = serializers.UserModelSerializer(instance=users, many=True)
        return Response(ser.data)


class LoginLogView(GenericViewSet):
    """Login log endpoint.

    list: query login log entries.
    """

    queryset = LoginLog.objects.all()
    serializer_class = serializers.LoginLogSerializer

    @method_decorator(request_log(level="DEBUG"))
    def list(self, request):
        """Return a paginated list of login logs, newest first.

        The optional ``search`` query parameter filters entries whose ``ip``
        or ``name`` contains the given text.
        """
        search = request.query_params.get("search")
        queryset = self.get_queryset().order_by("-create_time")
        if search:
            queryset = queryset.filter(
                Q(ip__contains=search) | Q(name__contains=search)
            )
        pagination_queryset = self.paginate_queryset(queryset)
        serializer = self.get_serializer(pagination_queryset, many=True)
        return self.get_paginated_response(serializer.data)


class ChangePasswordView(APIView):
    """Let an authenticated user change their own password."""

    authentication_classes = [MyJWTAuthentication]
    permission_classes = [IsAuthenticated]

    @swagger_auto_schema(request_body=serializers.ChangePasswordSerializer)
    def post(self, request):
        """Validate the old password, store the new one, then log out.

        Responds with HTTP 400 when the payload is invalid or the old
        password does not match, HTTP 500 when saving or logging out raises,
        and a success body otherwise.
        """
        serializer = serializers.ChangePasswordSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(
                {
                    "code": "0100",
                    "success": False,
                    "msg": serializer.errors,
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        user = request.user
        if not user.check_password(serializer.validated_data['old_password']):
            return Response(
                {
                    "code": "0106",
                    "success": False,
                    "msg": "旧密码不正确",
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            user.set_password(serializer.validated_data['new_password'])
            user.save()
            # Log out first, then return the response.
            logout(request)
            return Response(
                {
                    # Keep the success code consistent between frontend and
                    # backend.
                    "code": "0000",
                    "success": True,
                    # Unified user-facing message.
                    "msg": "密码修改成功,请重新登录",
                }
            )
        except Exception as e:
            # NOTE(review): str(e) is sent to the client and may expose
            # internal details; also, if logout() is what raised, the new
            # password has already been saved. Consider logging the error
            # and returning a generic message.
            return Response(
                {
                    "code": "9999",
                    "success": False,
                    "msg": str(e),
                },
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )