# -*- coding: utf-8 -*-
|
"""
|
@File : task.py
|
@Time : 2023/3/9 10:57
|
@Author : geekbing
|
@LastEditTime : -
|
@LastEditors : -
|
@Description : -
|
"""
|
|
import json
|
import logging
|
|
from django_celery_beat import models as celery_models
|
|
from apps.exceptions.error import TaskNotFound
|
from lunarlink.utils.parser import format_json
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
class Task:
    """
    Scheduled (periodic) task operations.

    Builds ``django_celery_beat`` ``PeriodicTask`` rows, and the
    ``CrontabSchedule`` rows they reference, from a request body.
    """

    def __init__(self, **body):
        """
        Initialise the task from request body data.

        :param body: request body data. The keys ``name``, ``data``,
            ``crontab``, ``switch``, ``project`` and ``strategy`` are read
            with ``body[...]`` and therefore raise ``KeyError`` when absent;
            every other key is optional.
        """
        logger.info("before process task data:\n %s", format_json(body))
        self.__name = body["name"]
        self.__data = body["data"]
        self.__crontab = body["crontab"]
        self.__switch = body["switch"]
        # Dotted path of the celery task that every periodic task runs.
        self.__task = "lunarlink.tasks.schedule_debug_suite"
        self.__project = body["project"]
        # Despite the attribute name, this dict is the full keyword-argument
        # payload that is JSON-serialised into ``PeriodicTask.kwargs``.
        self.__email = {
            "strategy": body["strategy"],
            "mail_cc": body.get("mail_cc"),
            "receiver": body.get("receiver"),
            "crontab": self.__crontab,
            "project": self.__project,
            "task_name": self.__name,
            "webhook": body.get("webhook"),
            "updater": body.get("updater"),
            "creator": body.get("creator"),
            "ci_project_ids": body.get("ci_project_ids", []),
            "ci_env": body.get("ci_env", "请选择"),
            "is_parallel": body.get("is_parallel", False),
            "config": body.get("config", "请选择"),
        }
        # Filled in by format_crontab().
        self.__crontab_time = None

    def format_crontab(self):
        """
        Parse the crontab expression into ``CrontabSchedule`` filter fields.

        The expression is read positionally as
        ``minute hour day_of_month month_of_year day_of_week``. Fields may be
        separated by any run of whitespace, and leading/trailing whitespace
        is ignored. Fields after the fifth are ignored.

        :raises IndexError: if the expression has fewer than five fields.
        """
        # split() with no argument collapses repeated whitespace; split(" ")
        # would yield empty-string fields and silently shift the schedule.
        cron_fields = self.__crontab.split()
        # Indexing (not tuple unpacking) keeps IndexError as the failure mode
        # for short expressions.
        self.__crontab_time = {
            "day_of_week": cron_fields[4],
            "month_of_year": cron_fields[3],
            "day_of_month": cron_fields[2],
            "hour": cron_fields[1],
            "minute": cron_fields[0],
        }

    def _resolve_crontab(self):
        """
        Return the ``CrontabSchedule`` matching the parsed crontab fields.

        Reuses the first existing row with identical fields and creates a new
        row only when none exists. ``format_crontab()`` must have been called
        first.

        :return: the existing or newly created ``CrontabSchedule`` instance.
        """
        schedules = celery_models.CrontabSchedule.objects
        # filter().first() is kept on purpose: unlike get_or_create() it does
        # not raise when several identical schedule rows already exist.
        crontab = schedules.filter(**self.__crontab_time).first()
        if crontab is None:
            crontab = schedules.create(**self.__crontab_time)
        return crontab

    def add_task(self):
        """
        Create a new periodic task from the data given at construction.

        :raises IndexError: if the crontab expression has fewer than five
            fields.
        """
        self.format_crontab()
        crontab = self._resolve_crontab()
        celery_models.PeriodicTask.objects.create(
            # Prefixed with the project because periodic task names must be
            # unique.
            name=f"{self.__project}_{self.__name}",
            task=self.__task,
            args=json.dumps(self.__data, ensure_ascii=False),
            kwargs=json.dumps(self.__email, ensure_ascii=False),
            enabled=self.__switch,
            description=self.__project,
            crontab=crontab,
        )

    def update_task(self, task_id):
        """
        Update an existing periodic task with the data given at construction.

        Rewrites the task's name, crontab schedule, enabled flag, args and
        kwargs, then saves it.

        :param task_id: ``id`` of the ``PeriodicTask`` to update.
        :raises TaskNotFound: if no periodic task with that id exists.
        :raises IndexError: if the crontab expression has fewer than five
            fields.
        """
        self.format_crontab()
        try:
            task_obj = celery_models.PeriodicTask.objects.get(id=task_id)
        except celery_models.PeriodicTask.DoesNotExist as exc:
            raise TaskNotFound(f"task {task_id} not found") from exc

        # Resolved only after the task is known to exist, so that a missing
        # task does not leave a newly created schedule row behind.
        crontab = self._resolve_crontab()

        task_obj.name = f"{self.__project}_{self.__name}"
        task_obj.crontab = crontab
        task_obj.enabled = self.__switch
        task_obj.args = json.dumps(self.__data, ensure_ascii=False)
        task_obj.kwargs = json.dumps(self.__email, ensure_ascii=False)
        task_obj.save()
|