hyb
2026-01-30 44480e71b27aa9d4cb8441f50c873f1b110e9691
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# -*- coding: utf-8 -*-
"""
@File    : task.py
@Time    : 2023/3/9 10:57
@Author  : geekbing
@LastEditTime : -
@LastEditors : -
@Description : -
"""
 
import json
import logging
 
from django_celery_beat import models as celery_models
 
from apps.exceptions.error import TaskNotFound
from lunarlink.utils.parser import format_json
 
 
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
 
 
class Task:
    """
    Create and update scheduled tasks stored as django-celery-beat records.

    An instance is built from a request body; ``add_task`` persists it as a
    new ``PeriodicTask`` and ``update_task`` applies it to an existing one.
    """

    # Fields read from a crontab expression, in order:
    # minute, hour, day of month, month of year, day of week.
    _CRON_FIELD_COUNT = 5

    def __init__(self, **body):
        """
        Initialise the task from request data.

        :param body: request body. ``name``, ``data``, ``crontab``,
            ``switch``, ``project`` and ``strategy`` are mandatory (a missing
            one raises ``KeyError``); every other key read here is optional.
        """
        logger.info("before process task data:\n %s", format_json(body))
        self.__name = body["name"]
        self.__data = body["data"]
        self.__crontab = body["crontab"]
        self.__switch = body["switch"]
        # Dotted path of the Celery task every schedule created here runs.
        self.__task = "lunarlink.tasks.schedule_debug_suite"
        self.__project = body["project"]
        # Serialised as the periodic task's kwargs (see add_task/update_task).
        self.__email = {
            "strategy": body["strategy"],
            "mail_cc": body.get("mail_cc"),
            "receiver": body.get("receiver"),
            "crontab": self.__crontab,
            "project": self.__project,
            "task_name": self.__name,
            "webhook": body.get("webhook"),
            "updater": body.get("updater"),
            "creator": body.get("creator"),
            "ci_project_ids": body.get("ci_project_ids", []),
            "ci_env": body.get("ci_env", "请选择"),
            "is_parallel": body.get("is_parallel", False),
            "config": body.get("config", "请选择"),
        }
        # Filled in by format_crontab().
        self.__crontab_time = None

    def format_crontab(self) -> None:
        """
        Parse the crontab expression into ``CrontabSchedule`` field values.

        The expression is split on any run of whitespace, so repeated spaces,
        tabs and leading/trailing blanks do not shift values into the wrong
        field. Only the first five fields are used; extra ones are ignored.

        :raises ValueError: if the expression has fewer than five fields.
        """
        fields = self.__crontab.split()
        if len(fields) < self._CRON_FIELD_COUNT:
            raise ValueError(
                f"crontab expression needs {self._CRON_FIELD_COUNT} fields "
                "(minute hour day_of_month month_of_year day_of_week), "
                f"got {len(fields)}: {self.__crontab!r}"
            )

        minute, hour, day_of_month, month_of_year, day_of_week = fields[
            : self._CRON_FIELD_COUNT
        ]
        self.__crontab_time = {
            "day_of_week": day_of_week,
            "month_of_year": month_of_year,
            "day_of_month": day_of_month,
            "hour": hour,
            "minute": minute,
        }

    def _resolve_crontab(self):
        """
        Return a ``CrontabSchedule`` matching the parsed crontab fields,
        creating one when none exists. ``format_crontab`` must run first.
        """
        schedules = celery_models.CrontabSchedule.objects
        # filter().first() instead of get_or_create(): if several matching
        # rows exist, get() would raise MultipleObjectsReturned.
        schedule = schedules.filter(**self.__crontab_time).first()
        if schedule is None:
            schedule = schedules.create(**self.__crontab_time)
        return schedule

    def add_task(self) -> None:
        """
        Create a new ``PeriodicTask`` from this instance's data.

        :raises ValueError: if the crontab expression is malformed.
        """
        self.format_crontab()
        crontab = self._resolve_crontab()

        celery_models.PeriodicTask.objects.create(
            # Prefixed with the project because periodic task names must be
            # unique.
            name=f"{self.__project}_{self.__name}",
            task=self.__task,
            args=json.dumps(self.__data, ensure_ascii=False),
            kwargs=json.dumps(self.__email, ensure_ascii=False),
            enabled=self.__switch,
            description=self.__project,
            crontab=crontab,
        )

    def update_task(self, task_id) -> None:
        """
        Apply this instance's data to an existing ``PeriodicTask``.

        :param task_id: primary key of the periodic task to update.
        :raises ValueError: if the crontab expression is malformed.
        :raises TaskNotFound: if no periodic task has the given id.
        """
        self.format_crontab()
        try:
            task_obj = celery_models.PeriodicTask.objects.get(id=task_id)
        except celery_models.PeriodicTask.DoesNotExist as exc:
            raise TaskNotFound(f"task {task_id} not found") from exc

        crontab = self._resolve_crontab()

        # NOTE(review): unlike add_task, ``description`` is not refreshed
        # here — confirm that a task's project can never change on update.
        task_obj.name = f"{self.__project}_{self.__name}"
        task_obj.crontab = crontab
        task_obj.enabled = self.__switch
        task_obj.args = json.dumps(self.__data, ensure_ascii=False)
        task_obj.kwargs = json.dumps(self.__email, ensure_ascii=False)
        task_obj.save()