# -*- coding: utf-8 -*-
|
"""
|
@File : tasks.py
|
@Time : 2023/2/6 11:07
|
@Author : geekbing
|
@LastEditTime : -
|
@LastEditors : -
|
@Description : 定时任务、异步任务
|
"""
|
|
import logging
|
from ast import literal_eval
|
from enum import IntEnum
|
|
from celery import shared_task, Task
|
from django_bulk_update.helper import bulk_update
|
from django_celery_beat.models import PeriodicTask
|
from django.db.models import F
|
from django.core.exceptions import ObjectDoesNotExist
|
from lunarlink import models
|
from lunarlink.utils.loader import save_summary, debug_api, debug_suite
|
from lunarlink.utils.parser import Yapi
|
from lunarlink.utils import qy_message, email_helper
|
from lunarlink.utils.message_template import dd_msg_template
|
from lunarlink.utils import response
|
from lunarlink.utils.dingtalk_helper import DingTalkHelper
|
from lunarlink.utils.message_template import parse_message
|
import time
|
|
# 填写你的钉钉机器人 access_token 和 secret
|
ACCESS_TOKEN = 'd42ec7c82717e8cad1c06c2caef4484cac590eb779aecec8e541750c29588d16'
|
SECRET = 'SEC96c0ef4507f86bdcb5c0c1237a32260130127825ec9439f9c54f69e9fd1a7056'
|
# 创建 DingTalkHelper 实例
|
dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET)
|
|
logger = logging.getLogger(__name__)
|
|
|
class ReportType(IntEnum):
|
DEPLOY = 4
|
TIMING = 3
|
|
|
def update_task_total_run_count(task_id):
|
"""增加任务的总运行次数
|
:param task_id:
|
:return:
|
"""
|
if task_id:
|
PeriodicTask.objects.filter(id=task_id).update(
|
date_changed=F("date_changed"),
|
total_run_count=F("total_run_count") + 1,
|
)
|
|
|
class MyBaseTask(Task):
|
def run(self, *args, **kwargs):
|
pass
|
|
def on_failure(self, exc, task_id, args, kwargs, einfo):
|
update_task_total_run_count(kwargs.get("task_id"))
|
|
def on_success(self, retval, task_id, args, kwargs):
|
update_task_total_run_count(kwargs.get("task_id"))
|
|
|
@shared_task
|
def async_import_yapi_api(yapi_base_url, yapi_token, project_id):
|
"""异步导入yapi接口"""
|
yapi = Yapi(
|
yapi_base_url=yapi_base_url,
|
token=yapi_token,
|
faster_project_id=project_id,
|
)
|
apis_imported_from_yapi = models.API.objects.filter(
|
project_id=project_id,
|
creator__name="yapi",
|
)
|
imported_apis_mapping = {
|
api.yapi_id: api.yapi_up_time for api in apis_imported_from_yapi
|
}
|
create_ids, update_ids = yapi.get_create_or_update_apis(
|
imported_apis_mapping=imported_apis_mapping
|
)
|
try:
|
# 获取yapi的分组, 然后更新api tree
|
yapi.create_relation_id(project_id=yapi.faster_project_id)
|
except Exception as e:
|
logger.error(f"导入yapi失败:{e}")
|
return {"status": "error", "message": response.YAPI_ADD_FAILED}
|
|
# 通过id获取所有api的详情
|
create_ids.extend(update_ids)
|
if not create_ids:
|
return {"status": "error", "message": response.YAPI_NOT_NEED_CREATE_OR_UPDATE}
|
|
new_api_instances = []
|
update_api_instances = []
|
for api_detail in yapi.get_batch_api_detail(api_ids=create_ids):
|
# 把yapi解析成符合faster的api格式
|
api_instances = yapi.get_parsed_apis(api_info=[api_detail])
|
updated_api, new_api = yapi.merge_api(
|
api_instances=api_instances,
|
apis_imported_from_yapi=apis_imported_from_yapi,
|
)
|
new_api_instances.extend(new_api)
|
update_api_instances.extend(updated_api)
|
|
created_objs = models.API.objects.bulk_create(objs=new_api_instances)
|
bulk_update(update_api_instances)
|
|
created_apis_count = len(created_objs)
|
updated_apis_count = len(update_api_instances)
|
return {
|
"status": "success",
|
"created_apis_count": created_apis_count,
|
"updated_apis_count": updated_apis_count,
|
}
|
|
|
@shared_task
|
def async_debug_api(api, project, name, config=None, user=None):
|
"""
|
异步执行api
|
:param api:
|
:param project:
|
:param name:
|
:param config:
|
:param user:
|
:return:
|
"""
|
summary = debug_api(
|
api=api,
|
project=project,
|
config=config,
|
save=False,
|
user=user,
|
)
|
report_id = save_summary(
|
name=name,
|
summary=summary,
|
project=project,
|
user=user,
|
)
|
|
return {
|
"status": "success",
|
"report_id": report_id,
|
}
|
|
|
@shared_task
|
def async_debug_suite(suite, project, obj, report, config, user=None):
|
"""异步执行suite"""
|
summary, _ = debug_suite(
|
suite=suite,
|
project=project,
|
obj=obj,
|
config=config,
|
save=False,
|
user=user,
|
)
|
report_id = save_summary(
|
name=report,
|
summary=summary,
|
project=project,
|
user=user,
|
)
|
|
return {
|
"status": "success",
|
"report_id": report_id,
|
}
|
|
|
def get_test_suite(args):
|
"""
|
获取测试用例集
|
:param args:
|
:return:
|
"""
|
case_ids = [pk for pk in args if models.Case.objects.filter(id=pk).exists()]
|
cases = models.Case.objects.in_bulk(case_ids)
|
return [{"name": case.name, "id": case.id} for case in cases.values()]
|
|
|
def process_override_config(kwargs, project):
|
"""
|
处理覆盖用例原有配置
|
|
:param kwargs:
|
:param project:
|
:return:
|
"""
|
override_config = kwargs.get("config", "")
|
override_config_body = None
|
if override_config and override_config != "请选择":
|
try:
|
override_config_body = literal_eval(
|
models.Config.objects.get(
|
name=override_config, project__id=project
|
).body
|
)
|
except ObjectDoesNotExist:
|
logger.error(response.CONFIG_NOT_EXISTS["msg"])
|
return override_config_body
|
|
|
def build_test_sets(suite, project, override_config_body):
|
"""
|
构建测试集
|
|
:param suite:
|
:param project:
|
:param override_config_body:
|
:return:
|
"""
|
test_sets = []
|
config_list = []
|
for content in suite:
|
test_list = (
|
models.CaseStep.objects.filter(case__id=content["id"])
|
.order_by("step")
|
.values("body")
|
)
|
|
testcase_list = []
|
config = None
|
for test in test_list:
|
body = literal_eval(test["body"])
|
if "base_url" in body["request"].keys():
|
if override_config_body:
|
config = override_config_body
|
continue
|
try:
|
config = literal_eval(
|
models.Config.objects.get(
|
name=body["name"], project__id=project
|
).body
|
)
|
except ObjectDoesNotExist:
|
logger.error(response.CONFIG_NOT_EXISTS["msg"])
|
continue
|
testcase_list.append(body)
|
config_list.append(config)
|
test_sets.append(testcase_list)
|
|
return test_sets, config_list
|
|
|
def execute_test_suite(test_sets, project, suite, config_list, is_parallel):
|
"""
|
执行测试套件
|
|
:param test_sets:
|
:param project:
|
:param suite:
|
:param config_list:
|
:param is_parallel:
|
:return:
|
"""
|
return debug_suite(
|
suite=test_sets,
|
project=project,
|
obj=suite,
|
config=config_list,
|
allow_parallel=is_parallel,
|
save=False,
|
)
|
|
|
def prepare_report_details(kwargs):
|
"""
|
准备报告详情
|
|
:param kwargs:
|
:return:
|
"""
|
task_name = kwargs["task_name"]
|
if kwargs.get("run_type") == "deploy":
|
task_name = "部署_" + task_name
|
report_type = ReportType.DEPLOY.value
|
else:
|
report_type = ReportType.TIMING.value
|
|
return task_name, report_type
|
|
|
def save_schedule_summary(task_name, summary, project, report_type):
|
"""
|
保存测试报告
|
|
:param task_name:
|
:param summary:
|
:param project:
|
:param report_type:
|
:return:
|
"""
|
return save_summary(
|
name=task_name,
|
summary=summary,
|
project=project,
|
report_type=report_type,
|
)
|
|
|
def send_notifications(args, kwargs, summary, task_name, report_id):
|
"""
|
发送通知
|
|
:param args:
|
:param kwargs:
|
:param summary:
|
:param task_name:
|
:param report_id:
|
:return:
|
"""
|
strategy = kwargs.get("strategy")
|
if strategy == "始终发送" or (
|
strategy == "仅失败发送" and summary["stat"]["failures"] > 0
|
):
|
summary.update({"task_name": task_name, "report_id": report_id})
|
webhook = kwargs.get("webhook", "")
|
email_recipient = kwargs.get("receiver", "")
|
email_cc = kwargs.get("mail_cc", "")
|
|
# 解析 summary 获取各项数据
|
parsed_data = parse_message(summary, case_count=len(args))
|
t_name = parsed_data["task_name"]
|
duration = parsed_data["duration"]
|
case_count = parsed_data["case_count"]
|
pass_count = parsed_data["pass_count"]
|
error_count = parsed_data["error_count"]
|
fail_count = parsed_data["fail_count"]
|
fail_rate = parsed_data["fail_rate"]
|
report_url = parsed_data["report_url"]
|
creator = kwargs.get("creator", "")
|
updater = kwargs.get("updater", "")
|
|
# 生成钉钉消息模板(返回字典,包含 msgtype 和 markdown)
|
dd_message = dd_msg_template(
|
task_name=t_name,
|
duration=duration,
|
case_count=case_count,
|
pass_count=pass_count,
|
error_count=error_count,
|
fail_count=fail_count,
|
fail_rate=fail_rate,
|
report_url=report_url,
|
creator=creator,
|
updater=updater,
|
)
|
# 将 dd_message 中的 markdown 内容传递给钉钉发送方法
|
message_text = dd_message["markdown"]["content"]
|
|
if webhook:
|
qy_message.send_message(
|
summary=summary,
|
webhook=webhook,
|
case_count=len(args),
|
)
|
dingtalk_helper.send_markdown(title=t_name, text=message_text)
|
if email_recipient:
|
email_helper.send(
|
summary=summary,
|
email_recipient=email_recipient,
|
email_cc=email_cc,
|
case_count=len(args),
|
)
|
dingtalk_helper.send_markdown(title=t_name, text=message_text)
|
|
|
@shared_task(base=MyBaseTask, queue="beat_tasks")
|
def schedule_debug_suite(*args, **kwargs):
|
"""定时任务"""
|
project = kwargs.get("project")
|
suite = get_test_suite(args)
|
override_config_body = process_override_config(kwargs, project)
|
test_sets, config_list = build_test_sets(
|
suite=suite,
|
project=project,
|
override_config_body=override_config_body,
|
)
|
|
is_parallel = kwargs.get("is_parallel", False)
|
summary, _ = execute_test_suite(
|
test_sets=test_sets,
|
project=project,
|
suite=suite,
|
config_list=config_list,
|
is_parallel=is_parallel,
|
)
|
|
task_name, report_type = prepare_report_details(kwargs)
|
|
report_id = save_schedule_summary(
|
task_name=task_name,
|
summary=summary,
|
project=project,
|
report_type=report_type,
|
)
|
|
send_notifications(
|
args,
|
kwargs,
|
summary,
|
task_name,
|
report_id,
|
)
|
|
return {
|
"status": "success",
|
"report_id": report_id,
|
}
|