测试组/Test_platform/Interface_automation/backend/apps/lunaruser/views.py
@@ -4,11 +4,15 @@
from rest_framework_jwt.settings import api_settings
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.viewsets import GenericViewSet
from rest_framework.viewsets import GenericViewSet, ModelViewSet
from rest_framework.decorators import action
from drf_yasg.utils import swagger_auto_schema
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from rest_framework.filters import SearchFilter
from backend.utils.auth import MyJWTAuthentication
from backend.utils.permissions import CustomIsAdminUser
import logging
from backend.utils.request_util import save_login_log
from lunarlink.models import LoginLog
@@ -43,16 +47,33 @@
        except KeyError:
            return Response(response.KEY_MISS)
        user = authenticate(username=username, password=password)
        if user is None:
        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            return Response(response.LOGIN_FAILED)
        # 0后面还需要优化定义明确
        if not user.check_password(password):
            return Response(response.LOGIN_FAILED)
        if hasattr(user, 'status') and user.status == 'pending':
            return Response({
                "success": False,
                "msg": "你提交的注册还在审批中,暂时无法登录,可联系管理员确认!"
            }, status=status.HTTP_403_FORBIDDEN)
        if hasattr(user, 'status') and user.status == 'rejected':
            return Response({
                "success": False,
                "msg": "你的注册申请未通过审核,无法登录!"
            }, status=status.HTTP_403_FORBIDDEN)
        if user.is_active == 0:
            return Response(response.USER_BLOCKED)
        # JWT token creation
        from django.utils import timezone
        user.last_login = timezone.now()
        user.save(update_fields=['last_login'])
        jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
        jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
@@ -65,6 +86,7 @@
                "user": user.username,
                "name": user.name,
                "is_superuser": user.is_superuser,
                "is_staff": user.is_staff,
                "show_hosts": user.show_hosts,
                "token": token,
            }
@@ -143,4 +165,206 @@
                "code": "9999",
                "success": False,
                "msg": str(e)
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class UserManagementViewSet(ModelViewSet):
    """
    用户管理视图集
    超级用户可以管理所有用户,普通用户只能查看自己的信息
    """
    authentication_classes = [MyJWTAuthentication]
    permission_classes = [IsAuthenticated]
    pagination_class = None
    filter_backends = [SearchFilter]
    search_fields = ['username', 'name']
    def get_queryset(self):
        if self.request.user.is_superuser:
            return User.objects.filter(status='approved').prefetch_related('groups')
        else:
            return User.objects.filter(id=self.request.user.id).prefetch_related('groups')
    def get_serializer_class(self):
        if self.action == 'create':
            return serializers.UserCreateSerializer
        elif self.action in ['update', 'partial_update']:
            return serializers.UserUpdateSerializer
        else:
            return serializers.UserDetailSerializer
    def get_permissions(self):
        if self.action in ['create', 'update', 'partial_update', 'destroy']:
            self.permission_classes = [IsAuthenticated, CustomIsAdminUser]
        return super().get_permissions()
    def create(self, request, *args, **kwargs):
        try:
            return super().create(request, *args, **kwargs)
        except Exception as e:
            import logging
            logger = logging.getLogger(__name__)
            logger.error(f"创建用户失败: {str(e)}", exc_info=True)
            return Response({
                "success": False,
                "msg": f"创建用户失败: {str(e)}"
            }, status=status.HTTP_400_BAD_REQUEST)
    def update(self, request, *args, **kwargs):
        try:
            return super().update(request, *args, **kwargs)
        except Exception as e:
            import logging
            logger = logging.getLogger(__name__)
            logger.error(f"更新用户失败: {str(e)}", exc_info=True)
            return Response({
                "success": False,
                "msg": f"更新用户失败: {str(e)}"
            }, status=status.HTTP_400_BAD_REQUEST)
    def perform_create(self, serializer):
        serializer.save()
    def perform_update(self, serializer):
        serializer.save()
    def perform_destroy(self, instance):
        instance.delete()
class RegisterView(APIView):
    """
    用户注册视图
    """
    authentication_classes = ()
    permission_classes = ()
    @swagger_auto_schema(request_body=serializers.UserRegisterSerializer)
    def post(self, request):
        """
        用户注册
        {
            username: str
            password: str
            confirm_password: str
            name: str
            phone: str
            groups: list (optional)
        }
        """
        serializer = serializers.UserRegisterSerializer(data=request.data)
        if not serializer.is_valid():
            return Response({
                "success": False,
                "msg": serializer.errors
            }, status=status.HTTP_400_BAD_REQUEST)
        try:
            user = serializer.save()
            return Response({
                "success": True,
                "msg": "你的注册申请已提交,待管理员审核中!",
                "data": {
                    "id": user.id,
                    "username": user.username
                }
            }, status=status.HTTP_201_CREATED)
        except Exception as e:
            import logging
            logger = logging.getLogger(__name__)
            logger.error(f"注册失败: {str(e)}", exc_info=True)
            return Response({
                "success": False,
                "msg": f"注册失败: {str(e)}"
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class UserApprovalViewSet(ModelViewSet):
    """
    用户审批视图集
    """
    authentication_classes = [MyJWTAuthentication]
    permission_classes = [IsAuthenticated]
    pagination_class = None
    filter_backends = [SearchFilter]
    search_fields = ['username', 'name']
    def get_queryset(self):
        if self.request.user.is_superuser or self.request.user.is_staff:
            return User.objects.filter(status='pending').prefetch_related('groups')
        else:
            return User.objects.none()
    def get_serializer_class(self):
        return serializers.UserDetailSerializer
    def list(self, request, *args, **kwargs):
        queryset = self.get_queryset()
        serializer = self.get_serializer(queryset, many=True)
        return Response({
            "success": True,
            "results": serializer.data
        })
    @swagger_auto_schema(request_body=serializers.UserApprovalSerializer)
    @action(detail=True, methods=['post'])
    def approve(self, request, pk=None):
        """
        审批用户
        {
            action: 'approve' | 'reject'
            reason: str (optional)
        }
        """
        user_id = pk
        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            return Response({
                "success": False,
                "msg": "用户不存在"
            }, status=status.HTTP_404_NOT_FOUND)
        serializer = serializers.UserApprovalSerializer(data=request.data)
        if not serializer.is_valid():
            return Response({
                "success": False,
                "msg": serializer.errors
            }, status=status.HTTP_400_BAD_REQUEST)
        action = serializer.validated_data['action']
        if action == 'approve':
            user.status = 'approved'
            user.is_active = True
            user.save()
            return Response({
                "success": True,
                "msg": "用户审批已通过"
            })
        elif action == 'reject':
            user.status = 'rejected'
            user.save()
            return Response({
                "success": True,
                "msg": "用户审批未通过"
            })
class GroupListView(APIView):
    """
    获取分组列表(无需认证)
    用于注册时选择分组
    """
    authentication_classes = ()
    permission_classes = ()
    def get(self, request):
        """
        获取所有分组列表
        """
        from django.contrib.auth.models import Group
        groups = Group.objects.all()
        group_list = [{"id": group.id, "name": group.name} for group in groups]
        return Response(group_list)