"""User-facing API views: login, registration, approval and user management."""
import logging

from django.contrib.auth import authenticate, get_user_model, logout
from django.contrib.auth.models import Group
from django.db.models import Q
from django.utils import timezone
from django.utils.decorators import method_decorator
from drf_yasg.utils import swagger_auto_schema
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.filters import SearchFilter
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.viewsets import GenericViewSet, ModelViewSet
from rest_framework_jwt.settings import api_settings

from backend.utils.auth import MyJWTAuthentication
from backend.utils.permissions import CustomIsAdminUser
from backend.utils.request_util import save_login_log
from lunarlink.models import LoginLog
from lunaruser.common import response
from lunaruser import serializers
from lunarlink.utils.decorator import request_log

logger = logging.getLogger(__name__)

User = get_user_model()


class LoginView(APIView):
    """Login view: returns a JWT token when username and password match."""

    # Login must be reachable without credentials.
    authentication_classes = ()
    permission_classes = ()

    @swagger_auto_schema(request_body=serializers.UserLoginSerializer)
    def post(self, request):
        """Return a token when the username and password match.

        Expected request body::

            {
                username: str
                password: str
            }

        Responds with ``response.KEY_MISS`` when a field is absent,
        ``response.LOGIN_FAILED`` for an unknown user or wrong password,
        HTTP 403 for pending/rejected registrations, and
        ``response.USER_BLOCKED`` for inactive accounts.
        """
        try:
            username = request.data["username"]
            password = request.data["password"]
        except KeyError:
            return Response(response.KEY_MISS)

        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            return Response(response.LOGIN_FAILED)

        if not user.check_password(password):
            return Response(response.LOGIN_FAILED)

        # The user model may not define ``status``; treat a missing attribute
        # as "no registration state to check".
        registration_status = getattr(user, "status", None)
        if registration_status == "pending":
            return Response(
                {
                    "success": False,
                    "msg": "你提交的注册还在审批中,暂时无法登录,可联系管理员确认!",
                },
                status=status.HTTP_403_FORBIDDEN,
            )
        if registration_status == "rejected":
            return Response(
                {
                    "success": False,
                    "msg": "你的注册申请未通过审核,无法登录!",
                },
                status=status.HTTP_403_FORBIDDEN,
            )

        if not user.is_active:
            return Response(response.USER_BLOCKED)

        user.last_login = timezone.now()
        user.save(update_fields=["last_login"])

        jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
        jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
        token = jwt_encode_handler(jwt_payload_handler(user))

        # Build a per-request copy instead of calling ``.update()`` on the
        # shared module-level dict: mutating it would leave one user's token
        # and profile in process-global state visible to concurrent requests.
        login_result = {
            **response.LOGIN_SUCCESS,
            "id": user.id,
            "user": user.username,
            "name": user.name,
            "is_superuser": user.is_superuser,
            "is_staff": user.is_staff,
            "show_hosts": user.show_hosts,
            "token": token,
        }

        # The view has no authentication classes, so attach the user manually
        # before the login log is written from the request.
        request.user = user
        save_login_log(request=request)
        return Response(login_result)


class UserView(APIView):
    """List all active users."""

    def get(self, request):
        """Return every user with ``is_active=1``, serialized."""
        users = User.objects.filter(is_active=1)
        ser = serializers.UserModelSerializer(instance=users, many=True)
        return Response(ser.data)


class LoginLogView(GenericViewSet):
    """Login log endpoint.

    list: query login log entries.
    """

    queryset = LoginLog.objects.all()
    serializer_class = serializers.LoginLogSerializer

    @method_decorator(request_log(level="DEBUG"))
    def list(self, request):
        """Return paginated login logs, newest first.

        The optional ``search`` query parameter filters on entries whose
        ``ip`` or ``name`` contains the given text.
        """
        search = request.query_params.get("search")
        queryset = self.get_queryset().order_by("-create_time")
        if search:
            queryset = queryset.filter(
                Q(ip__contains=search) | Q(name__contains=search)
            )
        pagination_queryset = self.paginate_queryset(queryset)
        serializer = self.get_serializer(pagination_queryset, many=True)
        return self.get_paginated_response(serializer.data)


class ChangePasswordView(APIView):
    """Let the authenticated user change their own password."""

    authentication_classes = [MyJWTAuthentication]
    permission_classes = [IsAuthenticated]

    @swagger_auto_schema(request_body=serializers.ChangePasswordSerializer)
    def post(self, request):
        """Validate the old password, store the new one and log the user out.

        Returns HTTP 400 with code ``0100`` on invalid input or ``0106`` when
        the old password does not match, and HTTP 500 with code ``9999`` if
        saving or logging out raises.
        """
        serializer = serializers.ChangePasswordSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(
                {
                    "code": "0100",
                    "success": False,
                    "msg": serializer.errors,
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        user = request.user
        if not user.check_password(serializer.validated_data["old_password"]):
            return Response(
                {
                    "code": "0106",
                    "success": False,
                    "msg": "旧密码不正确",
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        # NOTE(review): ``logout`` runs after the password has been saved, so
        # a failure there is reported as 9999 although the password changed.
        try:
            user.set_password(serializer.validated_data["new_password"])
            user.save()
            # Log out first, then return the response.
            logout(request)
            return Response(
                {
                    "code": "0000",  # success code shared by frontend and backend
                    "success": True,
                    "msg": "密码修改成功,请重新登录",  # unified prompt message
                }
            )
        except Exception as e:
            return Response(
                {
                    "code": "9999",
                    "success": False,
                    "msg": str(e),
                },
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )


class UserManagementViewSet(ModelViewSet):
    """User management view set.

    Superusers can manage all approved users; other users only see their own
    record.
    """

    authentication_classes = [MyJWTAuthentication]
    permission_classes = [IsAuthenticated]
    pagination_class = None
    filter_backends = [SearchFilter]
    search_fields = ["username", "name"]

    def get_queryset(self):
        """Return approved users for superusers, otherwise only the caller."""
        if self.request.user.is_superuser:
            return User.objects.filter(status="approved").prefetch_related("groups")
        return User.objects.filter(id=self.request.user.id).prefetch_related("groups")

    def get_serializer_class(self):
        """Pick the serializer matching the current action."""
        if self.action == "create":
            return serializers.UserCreateSerializer
        if self.action in ["update", "partial_update"]:
            return serializers.UserUpdateSerializer
        return serializers.UserDetailSerializer

    def get_permissions(self):
        """Additionally require ``CustomIsAdminUser`` for write actions."""
        if self.action in ["create", "update", "partial_update", "destroy"]:
            self.permission_classes = [IsAuthenticated, CustomIsAdminUser]
        return super().get_permissions()

    def create(self, request, *args, **kwargs):
        """Create a user; any exception is logged and returned as HTTP 400."""
        # NOTE(review): the broad catch also converts framework exceptions
        # raised inside ``super().create`` into a 400 response.
        try:
            return super().create(request, *args, **kwargs)
        except Exception as e:
            logger.error("创建用户失败: %s", e, exc_info=True)
            return Response(
                {
                    "success": False,
                    "msg": f"创建用户失败: {str(e)}",
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

    def update(self, request, *args, **kwargs):
        """Update a user; any exception is logged and returned as HTTP 400."""
        # NOTE(review): the broad catch also converts framework exceptions
        # raised inside ``super().update`` into a 400 response.
        try:
            return super().update(request, *args, **kwargs)
        except Exception as e:
            logger.error("更新用户失败: %s", e, exc_info=True)
            return Response(
                {
                    "success": False,
                    "msg": f"更新用户失败: {str(e)}",
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

    def perform_create(self, serializer):
        """Persist the new user through the serializer."""
        serializer.save()

    def perform_update(self, serializer):
        """Persist the updated user through the serializer."""
        serializer.save()

    def perform_destroy(self, instance):
        """Delete the user instance."""
        instance.delete()


class RegisterView(APIView):
    """User registration view."""

    # Registration must be reachable without credentials.
    authentication_classes = ()
    permission_classes = ()

    @swagger_auto_schema(request_body=serializers.UserRegisterSerializer)
    def post(self, request):
        """Register a user.

        Expected request body::

            {
                username: str
                password: str
                confirm_password: str
                name: str
                phone: str
                groups: list (optional)
            }

        Returns HTTP 201 with the new user's id and username, HTTP 400 with
        the serializer errors on invalid input, or HTTP 500 if saving raises.
        """
        serializer = serializers.UserRegisterSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(
                {
                    "success": False,
                    "msg": serializer.errors,
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            user = serializer.save()
            return Response(
                {
                    "success": True,
                    "msg": "你的注册申请已提交,待管理员审核中!",
                    "data": {
                        "id": user.id,
                        "username": user.username,
                    },
                },
                status=status.HTTP_201_CREATED,
            )
        except Exception as e:
            logger.error("注册失败: %s", e, exc_info=True)
            return Response(
                {
                    "success": False,
                    "msg": f"注册失败: {str(e)}",
                },
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )


class UserApprovalViewSet(ModelViewSet):
    """User approval view set."""

    authentication_classes = [MyJWTAuthentication]
    permission_classes = [IsAuthenticated]
    pagination_class = None
    filter_backends = [SearchFilter]
    search_fields = ["username", "name"]

    def get_queryset(self):
        """Return pending users for staff/superusers, nothing for others."""
        if self.request.user.is_superuser or self.request.user.is_staff:
            return User.objects.filter(status="pending").prefetch_related("groups")
        return User.objects.none()

    def get_serializer_class(self):
        """Always serialize with ``UserDetailSerializer``."""
        return serializers.UserDetailSerializer

    def list(self, request, *args, **kwargs):
        """Return the queryset wrapped as ``{"success": True, "results": [...]}``."""
        queryset = self.get_queryset()
        serializer = self.get_serializer(queryset, many=True)
        return Response(
            {
                "success": True,
                "results": serializer.data,
            }
        )

    @swagger_auto_schema(request_body=serializers.UserApprovalSerializer)
    @action(detail=True, methods=["post"])
    def approve(self, request, pk=None):
        """Approve or reject a user registration.

        Expected request body::

            {
                action: 'approve' | 'reject'
                reason: str (optional)
            }

        Returns HTTP 403 for callers who are neither staff nor superuser,
        HTTP 404 when the user does not exist and HTTP 400 on invalid input.
        """
        # The lookup below does not go through ``get_queryset``, so apply the
        # same staff/superuser rule here; otherwise any authenticated user
        # could approve or reject registrations.
        if not (request.user.is_superuser or request.user.is_staff):
            return Response(
                {
                    "success": False,
                    "msg": "无权限执行此操作",
                },
                status=status.HTTP_403_FORBIDDEN,
            )

        try:
            user = User.objects.get(id=pk)
        except User.DoesNotExist:
            return Response(
                {
                    "success": False,
                    "msg": "用户不存在",
                },
                status=status.HTTP_404_NOT_FOUND,
            )

        serializer = serializers.UserApprovalSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(
                {
                    "success": False,
                    "msg": serializer.errors,
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Named ``decision`` so the imported ``action`` decorator is not shadowed.
        decision = serializer.validated_data["action"]

        if decision == "approve":
            user.status = "approved"
            user.is_active = True
            user.save()
            return Response(
                {
                    "success": True,
                    "msg": "用户审批已通过",
                }
            )
        if decision == "reject":
            user.status = "rejected"
            user.save()
            return Response(
                {
                    "success": True,
                    "msg": "用户审批未通过",
                }
            )

        # Never fall through and return None from a view.
        return Response(
            {
                "success": False,
                "msg": "无效的审批操作",
            },
            status=status.HTTP_400_BAD_REQUEST,
        )


class GroupListView(APIView):
    """Return the group list without authentication.

    Used to pick groups during registration.
    """

    authentication_classes = ()
    permission_classes = ()

    def get(self, request):
        """Return all groups as a list of ``{"id", "name"}`` dicts."""
        group_list = [
            {"id": group.id, "name": group.name} for group in Group.objects.all()
        ]
        return Response(group_list)