# -*- coding: utf-8 -*- """ @File : schedule.py @Time : 2023/3/9 10:42 @Author : geekbing @LastEditTime : - @LastEditors : - @Description : 定时任务视图 """ import logging import json from ast import literal_eval from django.core.exceptions import ObjectDoesNotExist from django.db.models import Q from django.utils.decorators import method_decorator from django_celery_beat import models from rest_framework.viewsets import GenericViewSet from rest_framework.response import Response from apps.exceptions.error import TaskNotFound from backend.utils import pagination from backend.celery import app from lunarlink import serializers from lunarlink.utils import response from lunarlink.utils.decorator import request_log from lunarlink.utils.task import Task logger = logging.getLogger(__name__) class ScheduleView(GenericViewSet): """ 定时任务增删改查 """ queryset = models.PeriodicTask.objects serializer_class = serializers.PeriodicTaskSerializer pagination_class = pagination.MyPageNumberPagination @method_decorator(request_log(level="INFO")) def list(self, request): """ 获取定时任务列表 query string """ project = request.query_params.get("project") task_name = request.query_params.get("task_name") creator = request.query_params.get("creator") schedule = ( self.get_queryset().filter(description=project).order_by("-date_changed") ) if task_name: schedule = schedule.filter(name__contains=task_name) if creator: schedule = schedule.filter(kwargs__contains=f'"creator": "{creator}"') page_schedule = self.paginate_queryset(schedule) serializer = self.get_serializer(page_schedule, many=True) return self.get_paginated_response(serializer.data) @method_decorator(request_log(level="INFO")) def add(self, request): """ 新增定时任务 { name: str crontab: str switch: bool data: [int, int] strategy: str receiver: str copy: str project: int } """ project = request.data.get("project") name = request.data.get("name") if models.PeriodicTask.objects.filter( name=f"{project}_{name}", description=project ).exists(): return Response(response.TASK_HAS_EXISTS) ser = serializers.ScheduleDeSerializer(data=request.data) if ser.is_valid(): request.data.update({"creator": request.user.name}) task = Task(**request.data) try: task.add_task() except Exception as e: logger.error(f"An unexpected error occurred: {e}", exc_info=True) return Response({"error": "An unexpected error occurred"}, status=500) return Response(response.TASK_ADD_SUCCESS) else: return Response({**response.TASK_ADD_FAILURE, "msg": ser.errors}) @method_decorator(request_log(level="INFO")) def update(self, request, pk): """ 更新定时任务 """ project = request.data.get("project") name = request.data.get("name") if models.PeriodicTask.objects.filter( ~Q(id=pk), name=f"{project}_{name}" ).exists(): return Response(response.TASK_DUPLICATE_NAME) ser = serializers.ScheduleDeSerializer(data=request.data) if ser.is_valid(): task = Task(**request.data) try: task.update_task(task_id=pk) except TaskNotFound: return Response(response.TASK_NOT_EXISTS) return Response(response.TASK_UPDATE_SUCCESS) else: return Response({**response.TASK_UPDATE_FAILURE, "msg": ser.errors}) @method_decorator(request_log(level="INFO")) def patch(self, request, pk): """ 更新任务的状态 {"switch": bool} """ try: switch = request.data["switch"] except KeyError: return Response(response.KEY_MISS) try: task_obj = self.get_queryset().get(pk=pk) except ObjectDoesNotExist: return Response(response.TASK_NOT_EXISTS) task_obj.enabled = switch kwargs = json.loads(task_obj.kwargs) kwargs["updater"] = request.user.name task_obj.kwargs = json.dumps(kwargs, ensure_ascii=False) task_obj.save() return Response(response.TASK_UPDATE_SUCCESS) @method_decorator(request_log(level="INFO")) def delete(self, request, pk): """ 删除任务 query string """ try: task = models.PeriodicTask.objects.get(id=pk) except ObjectDoesNotExist: return Response(response.TASK_NOT_EXISTS) task.enabled = False # 关闭任务 task.delete() return Response(response.TASK_DEL_SUCCESS) @method_decorator(request_log(level="INFO")) def copy(self, request, pk): """ 复制定时任务 {"name": string} """ try: task_name = request.data["name"] except KeyError: return Response(response.KEY_MISS) try: task_obj = self.get_queryset().get(pk=pk) except ObjectDoesNotExist: return Response(response.TASK_NOT_EXISTS) if task_obj.name == task_name: return Response(response.TASK_COPY_FAILURE) if ( self.get_queryset() .filter(name=f"{task_obj.description}_{task_name}") .exists() ): return Response(response.TASK_COPY_FAILURE) task_obj.id = None task_obj.name = f"{task_obj.description}_{task_name}" task_obj.total_run_count = 0 kwargs = json.loads(task_obj.kwargs) kwargs["creator"] = request.user.name kwargs["updater"] = "" task_obj.kwargs = json.dumps(kwargs, ensure_ascii=False) task_obj.save() return Response(response.TASK_COPY_SUCCESS) @method_decorator(request_log(level="INFO")) def run(self, request, pk): """ 手动执行定时任务 query string """ try: task_obj = models.PeriodicTask.objects.get(id=pk) except ObjectDoesNotExist: return Response(response.TASK_NOT_EXISTS) task_name = "lunarlink.tasks.schedule_debug_suite" args = literal_eval(task_obj.args) kwargs = json.loads(task_obj.kwargs) kwargs["task_id"] = task_obj.id app.send_task( name=task_name, args=args, kwargs=kwargs, queue="beat_tasks", ) return Response(response.TASK_RUN_SUCCESS)