from django.contrib.auth import authenticate, get_user_model, logout
|
from django.db.models import Q
|
from django.utils.decorators import method_decorator
|
from rest_framework_jwt.settings import api_settings
|
from rest_framework.response import Response
|
from rest_framework.views import APIView
|
from rest_framework.viewsets import GenericViewSet, ModelViewSet
|
from rest_framework.decorators import action
|
from drf_yasg.utils import swagger_auto_schema
|
from rest_framework import status
|
from rest_framework.permissions import IsAuthenticated
|
from rest_framework.filters import SearchFilter
|
from backend.utils.auth import MyJWTAuthentication
|
from backend.utils.permissions import CustomIsAdminUser
|
import logging
|
|
from backend.utils.request_util import save_login_log
|
from lunarlink.models import LoginLog
|
from lunaruser.common import response
|
from lunaruser import serializers
|
from lunarlink.utils.decorator import request_log
|
|
|
# Resolve the active user model once at import time via Django's get_user_model();
# every view below queries users through this name.
User = get_user_model()
|
|
|
class LoginView(APIView):
    """
    Login view: returns a JWT when the username and password match.
    """

    # Login has to be reachable anonymously, so both checks are disabled here.
    authentication_classes = ()
    permission_classes = ()

    @swagger_auto_schema(request_body=serializers.UserLoginSerializer)
    def post(self, request):
        """
        Return a token when the username and password match.
        {
            username: str
            password: str
        }
        """
        try:
            username = request.data["username"]
            password = request.data["password"]
        except (KeyError, TypeError):
            # TypeError covers a non-mapping body (e.g. a JSON list), which
            # would otherwise surface as an unhandled 500.
            return Response(response.KEY_MISS)

        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            return Response(response.LOGIN_FAILED)

        if not user.check_password(password):
            return Response(response.LOGIN_FAILED)

        # The user model may not define `status`; treat a missing attribute
        # as "no approval workflow" exactly like the hasattr() checks did.
        account_status = getattr(user, "status", None)
        if account_status == "pending":
            return Response({
                "success": False,
                "msg": "你提交的注册还在审批中,暂时无法登录,可联系管理员确认!"
            }, status=status.HTTP_403_FORBIDDEN)

        if account_status == "rejected":
            return Response({
                "success": False,
                "msg": "你的注册申请未通过审核,无法登录!"
            }, status=status.HTTP_403_FORBIDDEN)

        if not user.is_active:
            return Response(response.USER_BLOCKED)

        from django.utils import timezone

        # Only touch last_login so concurrent edits to other fields are not overwritten.
        user.last_login = timezone.now()
        user.save(update_fields=["last_login"])

        payload = api_settings.JWT_PAYLOAD_HANDLER(user)
        token = api_settings.JWT_ENCODE_HANDLER(payload)

        # Build a per-request copy instead of mutating the shared module-level
        # LOGIN_SUCCESS dict: updating it in place keeps the last user's token
        # and profile in process-wide state and can race between concurrent
        # requests.
        body = {
            **response.LOGIN_SUCCESS,
            "id": user.id,
            "user": user.username,
            "name": user.name,
            "is_superuser": user.is_superuser,
            "is_staff": user.is_staff,
            "show_hosts": user.show_hosts,
            "token": token,
        }

        # save_login_log reads the user from the request, so attach it first.
        request.user = user
        save_login_log(request=request)
        return Response(body)
|
|
|
class UserView(APIView):
    """Lists the users whose ``is_active`` flag is set."""

    def get(self, request):
        """Return all active users serialized with ``UserModelSerializer``."""
        active_users = User.objects.filter(is_active=1)
        return Response(
            serializers.UserModelSerializer(instance=active_users, many=True).data
        )
|
|
|
class LoginLogView(GenericViewSet):
    """
    Login log endpoint.

    list: query login log entries
    """

    queryset = LoginLog.objects.all()
    serializer_class = serializers.LoginLogSerializer

    @method_decorator(request_log(level="DEBUG"))
    def list(self, request):
        """
        Return login logs, newest first.

        The optional ``search`` query parameter keeps only entries whose
        ``ip`` or ``name`` contains the given text.
        """
        search = request.query_params.get("search")
        queryset = self.get_queryset().order_by("-create_time")
        if search:
            queryset = queryset.filter(
                Q(ip__contains=search) | Q(name__contains=search)
            )

        page = self.paginate_queryset(queryset)
        if page is None:
            # paginate_queryset() returns None when no paginator is configured;
            # serializing None / calling get_paginated_response() would crash,
            # so fall back to the full, unpaginated result.
            return Response(self.get_serializer(queryset, many=True).data)

        serializer = self.get_serializer(page, many=True)
        return self.get_paginated_response(serializer.data)
|
|
|
# Password-change view (original placement note: "add below LoginView").
|
class ChangePasswordView(APIView):
    """Lets the authenticated user change their own password."""

    authentication_classes = [MyJWTAuthentication]
    permission_classes = [IsAuthenticated]

    @swagger_auto_schema(request_body=serializers.ChangePasswordSerializer)
    def post(self, request):
        """
        Verify the old password, store the new one and log the user out.

        Returns 400 with code "0100" on invalid input, 400 with code "0106"
        when the old password does not match, 500 with code "9999" when the
        new password cannot be saved, and code "0000" on success.
        """
        serializer = serializers.ChangePasswordSerializer(data=request.data)
        if not serializer.is_valid():
            return Response({
                "code": "0100",
                "success": False,
                "msg": serializer.errors
            }, status=status.HTTP_400_BAD_REQUEST)

        user = request.user
        if not user.check_password(serializer.validated_data['old_password']):
            return Response({
                "code": "0106",
                "success": False,
                "msg": "旧密码不正确"
            }, status=status.HTTP_400_BAD_REQUEST)

        logger = logging.getLogger(__name__)

        # Keep the try body limited to the step whose failure really means
        # "password not changed".
        try:
            user.set_password(serializer.validated_data['new_password'])
            user.save()
        except Exception as e:
            logger.exception("Failed to save new password for user id=%s", user.pk)
            return Response({
                "code": "9999",
                "success": False,
                "msg": str(e)
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        # Log out before responding. The password is already persisted at this
        # point, so a failure here (e.g. no session on the request) must not be
        # reported to the client as a failed password change.
        try:
            logout(request)
        except Exception:
            logger.warning(
                "Password changed but logout failed for user id=%s",
                user.pk,
                exc_info=True,
            )

        return Response({
            "code": "0000",  # keep the success code consistent between frontend and backend
            "success": True,
            "msg": "密码修改成功,请重新登录"  # unified prompt message
        })
|
|
|
class UserManagementViewSet(ModelViewSet):
    """
    User management viewset.

    Superusers work with every user whose status is 'approved'; any other
    user only sees their own record.
    """

    authentication_classes = [MyJWTAuthentication]
    permission_classes = [IsAuthenticated]
    pagination_class = None
    filter_backends = [SearchFilter]
    search_fields = ['username', 'name']

    def get_queryset(self):
        """Return the users visible to the requester, with groups prefetched."""
        requester = self.request.user
        if requester.is_superuser:
            visible = User.objects.filter(status='approved')
        else:
            visible = User.objects.filter(id=requester.id)
        return visible.prefetch_related('groups')

    def get_serializer_class(self):
        """Pick the serializer for the current action (detail serializer by default)."""
        per_action = {
            'create': serializers.UserCreateSerializer,
            'update': serializers.UserUpdateSerializer,
            'partial_update': serializers.UserUpdateSerializer,
        }
        return per_action.get(self.action, serializers.UserDetailSerializer)

    def get_permissions(self):
        """Additionally require CustomIsAdminUser for write actions."""
        write_actions = ('create', 'update', 'partial_update', 'destroy')
        if self.action in write_actions:
            self.permission_classes = [IsAuthenticated, CustomIsAdminUser]
        return super().get_permissions()

    def _failure_response(self, message):
        """Log *message* with the active traceback and wrap it in a 400 response.

        Must be called from inside an ``except`` block so that
        ``exc_info=True`` picks up the exception being handled.
        """
        logging.getLogger(__name__).error(message, exc_info=True)
        return Response(
            {"success": False, "msg": message},
            status=status.HTTP_400_BAD_REQUEST,
        )

    def create(self, request, *args, **kwargs):
        """Create a user; any exception is logged and reported as a 400."""
        try:
            return super().create(request, *args, **kwargs)
        except Exception as e:
            return self._failure_response(f"创建用户失败: {str(e)}")

    def update(self, request, *args, **kwargs):
        """Update a user; any exception is logged and reported as a 400."""
        try:
            return super().update(request, *args, **kwargs)
        except Exception as e:
            return self._failure_response(f"更新用户失败: {str(e)}")

    def perform_create(self, serializer):
        """Persist the new user through the serializer."""
        serializer.save()

    def perform_update(self, serializer):
        """Persist the changes through the serializer."""
        serializer.save()

    def perform_destroy(self, instance):
        """Delete the user row."""
        instance.delete()
|
|
|
class RegisterView(APIView):
    """
    User registration view.
    """

    # Registration is open to anonymous callers.
    authentication_classes = ()
    permission_classes = ()

    @swagger_auto_schema(request_body=serializers.UserRegisterSerializer)
    def post(self, request):
        """
        Register a user.
        {
            username: str
            password: str
            confirm_password: str
            name: str
            phone: str
            groups: list (optional)
        }
        """
        form = serializers.UserRegisterSerializer(data=request.data)
        if not form.is_valid():
            invalid = {"success": False, "msg": form.errors}
            return Response(invalid, status=status.HTTP_400_BAD_REQUEST)

        try:
            created = form.save()
            accepted = {
                "success": True,
                "msg": "你的注册申请已提交,待管理员审核中!",
                "data": {"id": created.id, "username": created.username},
            }
            return Response(accepted, status=status.HTTP_201_CREATED)
        except Exception as e:
            # Anything raised while saving is logged with its traceback and
            # reported to the caller as a 500.
            logging.getLogger(__name__).error(f"注册失败: {str(e)}", exc_info=True)
            failed = {"success": False, "msg": f"注册失败: {str(e)}"}
            return Response(failed, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
class UserApprovalViewSet(ModelViewSet):
    """
    User approval viewset.

    Staff and superusers can list users whose status is 'pending' and approve
    or reject a registration through the ``approve`` action.
    """

    # NOTE(review): the inherited ModelViewSet `create` action is still only
    # guarded by IsAuthenticated — confirm whether it should be exposed at all.
    authentication_classes = [MyJWTAuthentication]
    permission_classes = [IsAuthenticated]
    pagination_class = None
    filter_backends = [SearchFilter]
    search_fields = ['username', 'name']

    def get_queryset(self):
        """Pending users for staff/superusers; an empty queryset for everyone else."""
        requester = self.request.user
        if requester.is_superuser or requester.is_staff:
            return User.objects.filter(status='pending').prefetch_related('groups')
        return User.objects.none()

    def get_serializer_class(self):
        """Every action on this viewset uses the detail serializer."""
        return serializers.UserDetailSerializer

    def list(self, request, *args, **kwargs):
        """Return the pending users wrapped in ``{"success", "results"}``."""
        # Run the declared filter backends so `?search=` actually applies
        # SearchFilter/search_fields; calling get_queryset() alone ignored them.
        queryset = self.filter_queryset(self.get_queryset())
        serializer = self.get_serializer(queryset, many=True)
        return Response({
            "success": True,
            "results": serializer.data
        })

    @swagger_auto_schema(request_body=serializers.UserApprovalSerializer)
    @action(detail=True, methods=['post'])
    def approve(self, request, pk=None):
        """
        Approve or reject a user.
        {
            action: 'approve' | 'reject'
            reason: str (optional)
        }
        """
        # The lookup below bypasses get_queryset(), so enforce the same
        # staff/superuser rule explicitly; otherwise any authenticated user
        # could approve or reject arbitrary accounts.
        reviewer = request.user
        if not (reviewer.is_superuser or reviewer.is_staff):
            return Response({
                "success": False,
                "msg": "无权限审批用户"
            }, status=status.HTTP_403_FORBIDDEN)

        try:
            user = User.objects.get(id=pk)
        except (User.DoesNotExist, ValueError, TypeError):
            # ValueError/TypeError: a pk that cannot be coerced to the id
            # field's type is reported as "not found" rather than a 500.
            return Response({
                "success": False,
                "msg": "用户不存在"
            }, status=status.HTTP_404_NOT_FOUND)

        serializer = serializers.UserApprovalSerializer(data=request.data)
        if not serializer.is_valid():
            return Response({
                "success": False,
                "msg": serializer.errors
            }, status=status.HTTP_400_BAD_REQUEST)

        # Named `decision` so the imported `action` decorator is not shadowed.
        decision = serializer.validated_data['action']

        if decision == 'approve':
            user.status = 'approved'
            user.is_active = True
            user.save()
            return Response({
                "success": True,
                "msg": "用户审批已通过"
            })

        if decision == 'reject':
            user.status = 'rejected'
            user.save()
            return Response({
                "success": True,
                "msg": "用户审批未通过"
            })

        # Defensive fallback: a handler must never return None, even if the
        # serializer lets an unexpected action value through.
        return Response({
            "success": False,
            "msg": "不支持的审批操作"
        }, status=status.HTTP_400_BAD_REQUEST)
|
|
|
class GroupListView(APIView):
    """
    Group list endpoint (no authentication required).

    Used to choose groups during registration.
    """

    authentication_classes = ()
    permission_classes = ()

    def get(self, request):
        """
        Return every group as a list of ``{"id": ..., "name": ...}`` dicts.
        """
        from django.contrib.auth.models import Group

        payload = []
        for grp in Group.objects.all():
            payload.append({"id": grp.id, "name": grp.name})
        return Response(payload)
|