hyb
2025-05-14 87453ffd761425b9f363a09a0f8fe07d770cb325
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from django.contrib.auth import authenticate, get_user_model, logout
from django.db.models import Q
from django.utils.decorators import method_decorator
from rest_framework_jwt.settings import api_settings
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.viewsets import GenericViewSet
from drf_yasg.utils import swagger_auto_schema
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from backend.utils.auth import MyJWTAuthentication
 
from backend.utils.request_util import save_login_log
from lunarlink.models import LoginLog
from lunaruser.common import response
from lunaruser import serializers
from lunarlink.utils.decorator import request_log
 
 
User = get_user_model()
 
 
class LoginView(APIView):
    """
    登录视图,用户名与密码匹配返回token
    """
 
    authentication_classes = ()
    permission_classes = ()
 
    @swagger_auto_schema(request_body=serializers.UserLoginSerializer)
    def post(self, request):
        """
        用户名密码一致返回token
        {
            username: str
            password: str
        }
        """
        try:
            username = request.data["username"]
            password = request.data["password"]
        except KeyError:
            return Response(response.KEY_MISS)
 
        user = authenticate(username=username, password=password)
 
        if user is None:
            return Response(response.LOGIN_FAILED)
 
        # 0后面还需要优化定义明确
        if user.is_active == 0:
            return Response(response.USER_BLOCKED)
 
        # JWT token creation
        jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
        jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
 
        payload = jwt_payload_handler(user)
        token = jwt_encode_handler(payload)
 
        response.LOGIN_SUCCESS.update(
            {
                "id": user.id,
                "user": user.username,
                "name": user.name,
                "is_superuser": user.is_superuser,
                "show_hosts": user.show_hosts,
                "token": token,
            }
        )
        request.user = user
        save_login_log(request=request)
        return Response(response.LOGIN_SUCCESS)
 
 
class UserView(APIView):
    def get(self, request):
        users = User.objects.filter(is_active=1)
        ser = serializers.UserModelSerializer(instance=users, many=True)
        return Response(ser.data)
 
 
class LoginLogView(GenericViewSet):
    """
    登录日志接口
    list:查询
    """
 
    queryset = LoginLog.objects.all()
    serializer_class = serializers.LoginLogSerializer
 
    @method_decorator(request_log(level="DEBUG"))
    def list(self, request):
        search = request.query_params.get("search")
        queryset = self.get_queryset().order_by("-create_time")
        if search:
            queryset = queryset.filter(
                Q(ip__contains=search) | Q(name__contains=search)
            )
 
        pagination_queryset = self.paginate_queryset(queryset)
        serializer = self.get_serializer(pagination_queryset, many=True)
 
        return self.get_paginated_response(serializer.data)
 
 
# 在 LoginView 下方添加
class ChangePasswordView(APIView):
    authentication_classes = [MyJWTAuthentication]
    permission_classes = [IsAuthenticated]
 
    @swagger_auto_schema(request_body=serializers.ChangePasswordSerializer)
    def post(self, request):
        serializer = serializers.ChangePasswordSerializer(data=request.data)
        if not serializer.is_valid():
            return Response({
                "code": "0100",
                "success": False,
                "msg": serializer.errors
            }, status=status.HTTP_400_BAD_REQUEST)
 
        user = request.user
        if not user.check_password(serializer.validated_data['old_password']):
            return Response({
                "code": "0106",
                "success": False,
                "msg": "旧密码不正确"
            }, status=status.HTTP_400_BAD_REQUEST)
 
        try:
            user.set_password(serializer.validated_data['new_password'])
            user.save()
            # 先执行登出再返回响应
            logout(request)
            return Response({
                "code": "0000",  # 确保前后端使用统一成功码
                "success": True,
                "msg": "密码修改成功,请重新登录"  # 统一提示消息
            })
        except Exception as e:
            return Response({
                "code": "9999",
                "success": False,
                "msg": str(e)
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)