# -*- coding: utf-8 -*- """ @File : CI.py @Time : 2023/3/14 11:25 @Author : geekbing @LastEditTime : - @LastEditors : - @Description : 持续集成CI视图 """ import datetime import json import re import time from ast import literal_eval from typing import Dict import xmltodict from django.http import HttpResponse from django.core.exceptions import ObjectDoesNotExist from django.conf import settings from django_celery_beat.models import PeriodicTask from drf_yasg.utils import swagger_auto_schema from lunarlink import models from lunarlink.utils import loader, qy_message from lunarlink.utils.decorator import request_log from django.utils.decorators import method_decorator from rest_framework import status from rest_framework.response import Response from rest_framework.viewsets import GenericViewSet from lunarlink.serializers import CISerializer, CIReportSerializer from lunarlink.utils.loader import save_summary from lunarlink.utils import response def summary2junit(summary: Dict) -> Dict: """初始化JUnit的数据结构 :param summary: :return: """ res = { "testsuites": { "testsuite": { "errors": 0, "failures": 0, "hostname": "", "name": "", "skipped": 0, "tests": 0, "time": "0", "timestamp": "20210524T18:04:50.941913", "testcase": [], } } } time_info = summary.get("time") res["testsuites"]["testsuite"]["time"] = time_info.get("duration") start_at: str = time_info.get("start_at") timestamp = datetime.datetime.fromtimestamp(int(float(start_at))).strftime( "%Y-%m-%dT%H:%M:%S.%f" ) res["testsuites"]["testsuite"]["timestamp"] = timestamp details = summary.get("details", []) res["testsuites"]["testsuite"]["tests"] = len(details) for detail in details: test_case = build_testcase(detail, res) res["testsuites"]["testsuite"]["testcase"].append(test_case) return res def build_testcase(detail, res): """ 构建Junit Testcase :param detail: :param res: :return: """ test_case = {"classname": "", "file": "", "line": "", "name": "", "time": ""} name = detail.get("name") test_case["classname"] = name # 对应junit的Suite records = detail.get("records") test_case["line"] = len(records) test_case["time"] = detail["time"]["duration"] result = detail.get("success") step_names = [] for index, record in enumerate(records): step_names.append(f"{index}-{record['name']}") test_case["name"] = "\n".join(step_names) # 对应junit的每个case的Name # 记录错误case的详细信息 if result is False: case_error, failure_details = build_failure_detail(records) if case_error: res["testsuites"]["testsuite"]["errors"] += 1 else: res["testsuites"]["testsuite"]["failures"] += 1 failure = {"message": "断言或者抽取失败", "#text": "\n".join(failure_details)} test_case["failure"] = failure return test_case def build_failure_detail(records): """记录错误case的详细信息 :param records: :return: """ case_error = False failure_details = [] for index, record in enumerate(records): step_status = record.get("status") if step_status == "failure": failure_details.append( f"{index}-{record['name']}\n{record.get('attachment')}\n{'*' * 68}" ) elif step_status == "error": case_error = True return case_error, failure_details class CIView(GenericViewSet): authentication_classes = [] serializer_class = CISerializer pagination_class = None @swagger_auto_schema(operation_summary="gitlab-ci触发自动化用例运行") @method_decorator(request_log(level="INFO")) def run_ci_tests(self, request): """ gitlab-ci发送请求 测试平台解析参数,定时任务中的ci_project_ic和ci_env同时匹配时,返回触发执行任务 """ ser = CISerializer(data=request.data) if ser.is_valid(): task_name = "lunarlink.tasks.schedule_debug_suite" ci_project_id: int = ser.validated_data.get("ci_project_id") ci_env: str = ser.validated_data.get("env") query = PeriodicTask.objects.filter(enabled=1, task=task_name) pk_kwargs_list = query.values("pk", "kwargs", "description") enabled_task_ids = [] project = None for pk_kwargs in pk_kwargs_list: pk: int = pk_kwargs["pk"] kwargs: dict = json.loads(pk_kwargs["kwargs"]) ci_project_ids: list = literal_eval( kwargs.get("ci_project_ids") or "[]" ) if isinstance(ci_project_ids, int): ci_project_ids = [ci_project_ids] # 定时任务中的ci_project_id和ci_env同时匹配 if ( ci_project_id in ci_project_ids and ci_env and ci_env == kwargs.get("ci_env") ): enabled_task_ids.append(pk) project = pk_kwargs["description"] # 没有匹配用例,直接返回 if not enabled_task_ids: timestamp = datetime.datetime.fromtimestamp(int(time.time())).strftime( "%Y-%m-%dT%H:%M:%S.%f" ) not_found_case_res = { "testsuites": { "testsuite": { "errors": 0, "failures": 0, "hostname": "", "name": "没有找到匹配的用例", "skipped": 0, "tests": 0, "time": "0", "timestamp": timestamp, "testcase": [], } } } xml_data = xmltodict.unparse(not_found_case_res) return HttpResponse(xml_data, content_type="text/xml") test_sets = [] suite_list = [] config_list = [] config = None webhook_set = set() override_config_body = None for task_id in enabled_task_ids: task_obj = query.filter(id=task_id).first() # 如果task中存在重载配置,就覆盖用例中的配置 override_config = json.loads(task_obj.kwargs).get("config") if ( override_config_body is None and override_config and override_config != "请选择" ): try: override_config_body = literal_eval( models.Config.objects.get( name=override_config, project__id=project ).body ) except ObjectDoesNotExist: return Response(response.CONFIG_NOT_EXISTS) if task_obj: # 判断webhook是否合法 case_ids = task_obj.args url = json.loads(task_obj.kwargs).get("webhook") url_pattern = r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+" if re.match(url_pattern, url): webhook_set.add(url) else: continue # 反查出一个task中包含的所有用例 suite = list( models.Case.objects.filter(pk__in=literal_eval(case_ids)) .order_by("id") .values("id", "name") ) for case in suite: case_step_list = ( models.CaseStep.objects.filter(case__id=case["id"]) .order_by("step") .values("body") ) testcase_list = [] for case_step in case_step_list: body = literal_eval(case_step["body"]) if body["request"].get("url"): testcase_list.append(body) elif config is None and body["request"].get("base_url"): # 当前步骤时配置 # 如果task中存在重载配置,就覆盖用例中的配置 if override_config_body: config = override_config_body else: try: config = literal_eval( models.Config.objects.get( name=body["name"], project__id=project ).body ) except ObjectDoesNotExist: return Response(response.CONFIG_NOT_EXISTS) config_list.append(config) test_sets.append(testcase_list) config = None suite_list.extend(suite) override_config_body = None # 同步运行用例 summary, _ = loader.debug_suite( suite=test_sets, project=project, obj=suite_list, config=config_list, save=False, ) ci_project_namespace = ser.validated_data["ci_project_namespace"] ci_project_name = ser.validated_data["ci_project_name"] ci_job_id = ser.validated_data["ci_job_id"] summary["name"] = f"{ci_project_namespace}_{ci_project_name}_job{ci_job_id}" report_id = save_summary( name=summary.get("name"), summary=summary, project=project, report_type=4, user=ser.validated_data["start_job_user"], ci_metadata=ser.validated_data, ) junit_results = summary2junit(summary) xml_data = xmltodict.unparse(junit_results) summary["task_name"] = "gitlab-ci_" + summary.get("name") summary["report_id"] = report_id for webhook in webhook_set: # TODO: 还需要优化企微发送,加入ci集成参数 qy_message.send_message( summary=summary, webhook=webhook, ci_job_url=ser.validated_data["ci_job_url"], ci_pipeline_url=ser.validated_data["ci_pipeline_url"], case_count=junit_results["testsuites"]["testsuite"]["tests"], ) return HttpResponse(xml_data, content_type="text/xml") else: return Response(ser.errors, status=status.HTTP_400_BAD_REQUEST) @swagger_auto_schema( query_serializer=CIReportSerializer(), operation_summary="获取gitlab-ci运行的报告url", ) def get_ci_report_url(self, request): """获取gitlab-ci运行的报告url""" ser = CIReportSerializer(data=request.query_params) if ser.is_valid(): ci_job_id = ser.validated_data["ci_job_id"] report_obj = models.Report.objects.filter(ci_job_id=ci_job_id).first() if report_obj: report_url = f"{settings.BASE_REPORT_URL}/{report_obj.id}/" else: return Response(data=f"查找的ci_job_id: {ci_job_id}不存在") return Response(data=report_url) else: return Response(data=ser.errors, status=status.HTTP_400_BAD_REQUEST)