| | |
| | | from rest_framework_jwt.settings import api_settings |
| | | from rest_framework.response import Response |
| | | from rest_framework.views import APIView |
| | | from rest_framework.viewsets import GenericViewSet |
| | | from rest_framework.viewsets import GenericViewSet, ModelViewSet |
| | | from rest_framework.decorators import action |
| | | from drf_yasg.utils import swagger_auto_schema |
| | | from rest_framework import status |
| | | from rest_framework.permissions import IsAuthenticated |
| | | from rest_framework.filters import SearchFilter |
| | | from backend.utils.auth import MyJWTAuthentication |
| | | from backend.utils.permissions import CustomIsAdminUser |
| | | import logging |
| | | |
| | | from backend.utils.request_util import save_login_log |
| | | from lunarlink.models import LoginLog |
| | |
| | | except KeyError: |
| | | return Response(response.KEY_MISS) |
| | | |
| | | user = authenticate(username=username, password=password) |
| | | |
| | | if user is None: |
| | | try: |
| | | user = User.objects.get(username=username) |
| | | except User.DoesNotExist: |
| | | return Response(response.LOGIN_FAILED) |
| | | |
| | | # 0后面还需要优化定义明确 |
| | | if not user.check_password(password): |
| | | return Response(response.LOGIN_FAILED) |
| | | |
| | | if hasattr(user, 'status') and user.status == 'pending': |
| | | return Response({ |
| | | "success": False, |
| | | "msg": "你提交的注册还在审批中,暂时无法登录,可联系管理员确认!" |
| | | }, status=status.HTTP_403_FORBIDDEN) |
| | | |
| | | if hasattr(user, 'status') and user.status == 'rejected': |
| | | return Response({ |
| | | "success": False, |
| | | "msg": "你的注册申请未通过审核,无法登录!" |
| | | }, status=status.HTTP_403_FORBIDDEN) |
| | | |
| | | if user.is_active == 0: |
| | | return Response(response.USER_BLOCKED) |
| | | |
| | | # JWT token creation |
| | | from django.utils import timezone |
| | | user.last_login = timezone.now() |
| | | user.save(update_fields=['last_login']) |
| | | |
| | | jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER |
| | | jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER |
| | | |
| | |
| | | "user": user.username, |
| | | "name": user.name, |
| | | "is_superuser": user.is_superuser, |
| | | "is_staff": user.is_staff, |
| | | "show_hosts": user.show_hosts, |
| | | "token": token, |
| | | } |
| | |
| | | "code": "9999", |
| | | "success": False, |
| | | "msg": str(e) |
| | | }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) |
| | | }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) |
| | | |
| | | |
| | | class UserManagementViewSet(ModelViewSet): |
| | | """ |
| | | 用户管理视图集 |
| | | 超级用户可以管理所有用户,普通用户只能查看自己的信息 |
| | | """ |
| | | authentication_classes = [MyJWTAuthentication] |
| | | permission_classes = [IsAuthenticated] |
| | | pagination_class = None |
| | | filter_backends = [SearchFilter] |
| | | search_fields = ['username', 'name'] |
| | | |
| | | def get_queryset(self): |
| | | if self.request.user.is_superuser: |
| | | return User.objects.filter(status='approved').prefetch_related('groups') |
| | | else: |
| | | return User.objects.filter(id=self.request.user.id).prefetch_related('groups') |
| | | |
| | | def get_serializer_class(self): |
| | | if self.action == 'create': |
| | | return serializers.UserCreateSerializer |
| | | elif self.action in ['update', 'partial_update']: |
| | | return serializers.UserUpdateSerializer |
| | | else: |
| | | return serializers.UserDetailSerializer |
| | | |
| | | def get_permissions(self): |
| | | if self.action in ['create', 'update', 'partial_update', 'destroy']: |
| | | self.permission_classes = [IsAuthenticated, CustomIsAdminUser] |
| | | return super().get_permissions() |
| | | |
| | | def create(self, request, *args, **kwargs): |
| | | try: |
| | | return super().create(request, *args, **kwargs) |
| | | except Exception as e: |
| | | import logging |
| | | logger = logging.getLogger(__name__) |
| | | logger.error(f"创建用户失败: {str(e)}", exc_info=True) |
| | | return Response({ |
| | | "success": False, |
| | | "msg": f"创建用户失败: {str(e)}" |
| | | }, status=status.HTTP_400_BAD_REQUEST) |
| | | |
| | | def update(self, request, *args, **kwargs): |
| | | try: |
| | | return super().update(request, *args, **kwargs) |
| | | except Exception as e: |
| | | import logging |
| | | logger = logging.getLogger(__name__) |
| | | logger.error(f"更新用户失败: {str(e)}", exc_info=True) |
| | | return Response({ |
| | | "success": False, |
| | | "msg": f"更新用户失败: {str(e)}" |
| | | }, status=status.HTTP_400_BAD_REQUEST) |
| | | |
| | | def perform_create(self, serializer): |
| | | serializer.save() |
| | | |
| | | def perform_update(self, serializer): |
| | | serializer.save() |
| | | |
| | | def perform_destroy(self, instance): |
| | | instance.delete() |
| | | |
| | | |
| | | class RegisterView(APIView): |
| | | """ |
| | | 用户注册视图 |
| | | """ |
| | | |
| | | authentication_classes = () |
| | | permission_classes = () |
| | | |
| | | @swagger_auto_schema(request_body=serializers.UserRegisterSerializer) |
| | | def post(self, request): |
| | | """ |
| | | 用户注册 |
| | | { |
| | | username: str |
| | | password: str |
| | | confirm_password: str |
| | | name: str |
| | | phone: str |
| | | groups: list (optional) |
| | | } |
| | | """ |
| | | serializer = serializers.UserRegisterSerializer(data=request.data) |
| | | if not serializer.is_valid(): |
| | | return Response({ |
| | | "success": False, |
| | | "msg": serializer.errors |
| | | }, status=status.HTTP_400_BAD_REQUEST) |
| | | |
| | | try: |
| | | user = serializer.save() |
| | | return Response({ |
| | | "success": True, |
| | | "msg": "你的注册申请已提交,待管理员审核中!", |
| | | "data": { |
| | | "id": user.id, |
| | | "username": user.username |
| | | } |
| | | }, status=status.HTTP_201_CREATED) |
| | | except Exception as e: |
| | | import logging |
| | | logger = logging.getLogger(__name__) |
| | | logger.error(f"注册失败: {str(e)}", exc_info=True) |
| | | return Response({ |
| | | "success": False, |
| | | "msg": f"注册失败: {str(e)}" |
| | | }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) |
| | | |
| | | |
| | | class UserApprovalViewSet(ModelViewSet): |
| | | """ |
| | | 用户审批视图集 |
| | | """ |
| | | authentication_classes = [MyJWTAuthentication] |
| | | permission_classes = [IsAuthenticated] |
| | | pagination_class = None |
| | | filter_backends = [SearchFilter] |
| | | search_fields = ['username', 'name'] |
| | | |
| | | def get_queryset(self): |
| | | if self.request.user.is_superuser or self.request.user.is_staff: |
| | | return User.objects.filter(status='pending').prefetch_related('groups') |
| | | else: |
| | | return User.objects.none() |
| | | |
| | | def get_serializer_class(self): |
| | | return serializers.UserDetailSerializer |
| | | |
| | | def list(self, request, *args, **kwargs): |
| | | queryset = self.get_queryset() |
| | | serializer = self.get_serializer(queryset, many=True) |
| | | return Response({ |
| | | "success": True, |
| | | "results": serializer.data |
| | | }) |
| | | |
| | | @swagger_auto_schema(request_body=serializers.UserApprovalSerializer) |
| | | @action(detail=True, methods=['post']) |
| | | def approve(self, request, pk=None): |
| | | """ |
| | | 审批用户 |
| | | { |
| | | action: 'approve' | 'reject' |
| | | reason: str (optional) |
| | | } |
| | | """ |
| | | user_id = pk |
| | | try: |
| | | user = User.objects.get(id=user_id) |
| | | except User.DoesNotExist: |
| | | return Response({ |
| | | "success": False, |
| | | "msg": "用户不存在" |
| | | }, status=status.HTTP_404_NOT_FOUND) |
| | | |
| | | serializer = serializers.UserApprovalSerializer(data=request.data) |
| | | if not serializer.is_valid(): |
| | | return Response({ |
| | | "success": False, |
| | | "msg": serializer.errors |
| | | }, status=status.HTTP_400_BAD_REQUEST) |
| | | |
| | | action = serializer.validated_data['action'] |
| | | |
| | | if action == 'approve': |
| | | user.status = 'approved' |
| | | user.is_active = True |
| | | user.save() |
| | | return Response({ |
| | | "success": True, |
| | | "msg": "用户审批已通过" |
| | | }) |
| | | elif action == 'reject': |
| | | user.status = 'rejected' |
| | | user.save() |
| | | return Response({ |
| | | "success": True, |
| | | "msg": "用户审批未通过" |
| | | }) |
| | | |
| | | |
| | | class GroupListView(APIView): |
| | | """ |
| | | 获取分组列表(无需认证) |
| | | 用于注册时选择分组 |
| | | """ |
| | | authentication_classes = () |
| | | permission_classes = () |
| | | |
| | | def get(self, request): |
| | | """ |
| | | 获取所有分组列表 |
| | | """ |
| | | from django.contrib.auth.models import Group |
| | | groups = Group.objects.all() |
| | | group_list = [{"id": group.id, "name": group.name} for group in groups] |
| | | return Response(group_list) |