hyb
2025-05-14 87453ffd761425b9f363a09a0f8fe07d770cb325
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# -*- coding: utf-8 -*-
"""
@File    : CI.py
@Time    : 2023/3/14 11:25
@Author  : geekbing
@LastEditTime : -
@LastEditors : -
@Description : 持续集成CI视图
"""
 
import datetime
import json
import re
import time
 
from ast import literal_eval
from typing import Dict
 
import xmltodict
from django.http import HttpResponse
from django.core.exceptions import ObjectDoesNotExist
from django.conf import settings
from django_celery_beat.models import PeriodicTask
from drf_yasg.utils import swagger_auto_schema
 
from lunarlink import models
from lunarlink.utils import loader, qy_message
from lunarlink.utils.decorator import request_log
from django.utils.decorators import method_decorator
from rest_framework import status
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet
from lunarlink.serializers import CISerializer, CIReportSerializer
from lunarlink.utils.loader import save_summary
from lunarlink.utils import response
 
 
def summary2junit(summary: Dict) -> Dict:
    """初始化JUnit的数据结构
    :param summary:
    :return:
    """
    res = {
        "testsuites": {
            "testsuite": {
                "errors": 0,
                "failures": 0,
                "hostname": "",
                "name": "",
                "skipped": 0,
                "tests": 0,
                "time": "0",
                "timestamp": "20210524T18:04:50.941913",
                "testcase": [],
            }
        }
    }
 
    time_info = summary.get("time")
    res["testsuites"]["testsuite"]["time"] = time_info.get("duration")
    start_at: str = time_info.get("start_at")
    timestamp = datetime.datetime.fromtimestamp(int(float(start_at))).strftime(
        "%Y-%m-%dT%H:%M:%S.%f"
    )
    res["testsuites"]["testsuite"]["timestamp"] = timestamp
 
    details = summary.get("details", [])
    res["testsuites"]["testsuite"]["tests"] = len(details)
    for detail in details:
        test_case = build_testcase(detail, res)
        res["testsuites"]["testsuite"]["testcase"].append(test_case)
 
    return res
 
 
def build_testcase(detail, res):
    """
    构建Junit Testcase
    :param detail:
    :param res:
    :return:
    """
    test_case = {"classname": "", "file": "", "line": "", "name": "", "time": ""}
    name = detail.get("name")
    test_case["classname"] = name  # 对应junit的Suite
    records = detail.get("records")
    test_case["line"] = len(records)
    test_case["time"] = detail["time"]["duration"]
    result = detail.get("success")
    step_names = []
    for index, record in enumerate(records):
        step_names.append(f"{index}-{record['name']}")
    test_case["name"] = "\n".join(step_names)  # 对应junit的每个case的Name
 
    # 记录错误case的详细信息
    if result is False:
        case_error, failure_details = build_failure_detail(records)
        if case_error:
            res["testsuites"]["testsuite"]["errors"] += 1
        else:
            res["testsuites"]["testsuite"]["failures"] += 1
 
        failure = {"message": "断言或者抽取失败", "#text": "\n".join(failure_details)}
        test_case["failure"] = failure
 
    return test_case
 
 
def build_failure_detail(records):
    """记录错误case的详细信息
    :param records:
    :return:
    """
    case_error = False
    failure_details = []
    for index, record in enumerate(records):
        step_status = record.get("status")
        if step_status == "failure":
            failure_details.append(
                f"{index}-{record['name']}\n{record.get('attachment')}\n{'*' * 68}"
            )
        elif step_status == "error":
            case_error = True
    return case_error, failure_details
 
 
class CIView(GenericViewSet):
    authentication_classes = []
    serializer_class = CISerializer
    pagination_class = None
 
    @swagger_auto_schema(operation_summary="gitlab-ci触发自动化用例运行")
    @method_decorator(request_log(level="INFO"))
    def run_ci_tests(self, request):
        """
        gitlab-ci发送请求
 
        测试平台解析参数,定时任务中的ci_project_ic和ci_env同时匹配时,返回触发执行任务
        """
        ser = CISerializer(data=request.data)
        if ser.is_valid():
            task_name = "lunarlink.tasks.schedule_debug_suite"
            ci_project_id: int = ser.validated_data.get("ci_project_id")
            ci_env: str = ser.validated_data.get("env")
 
            query = PeriodicTask.objects.filter(enabled=1, task=task_name)
            pk_kwargs_list = query.values("pk", "kwargs", "description")
            enabled_task_ids = []
            project = None
            for pk_kwargs in pk_kwargs_list:
                pk: int = pk_kwargs["pk"]
                kwargs: dict = json.loads(pk_kwargs["kwargs"])
                ci_project_ids: list = literal_eval(
                    kwargs.get("ci_project_ids") or "[]"
                )
                if isinstance(ci_project_ids, int):
                    ci_project_ids = [ci_project_ids]
 
                # 定时任务中的ci_project_id和ci_env同时匹配
                if (
                    ci_project_id in ci_project_ids
                    and ci_env
                    and ci_env == kwargs.get("ci_env")
                ):
                    enabled_task_ids.append(pk)
                    project = pk_kwargs["description"]
 
            # 没有匹配用例,直接返回
            if not enabled_task_ids:
                timestamp = datetime.datetime.fromtimestamp(int(time.time())).strftime(
                    "%Y-%m-%dT%H:%M:%S.%f"
                )
                not_found_case_res = {
                    "testsuites": {
                        "testsuite": {
                            "errors": 0,
                            "failures": 0,
                            "hostname": "",
                            "name": "没有找到匹配的用例",
                            "skipped": 0,
                            "tests": 0,
                            "time": "0",
                            "timestamp": timestamp,
                            "testcase": [],
                        }
                    }
                }
 
                xml_data = xmltodict.unparse(not_found_case_res)
                return HttpResponse(xml_data, content_type="text/xml")
 
            test_sets = []
            suite_list = []
            config_list = []
            config = None
            webhook_set = set()
            override_config_body = None
            for task_id in enabled_task_ids:
                task_obj = query.filter(id=task_id).first()
                # 如果task中存在重载配置,就覆盖用例中的配置
                override_config = json.loads(task_obj.kwargs).get("config")
                if (
                    override_config_body is None
                    and override_config
                    and override_config != "请选择"
                ):
                    try:
                        override_config_body = literal_eval(
                            models.Config.objects.get(
                                name=override_config, project__id=project
                            ).body
                        )
                    except ObjectDoesNotExist:
                        return Response(response.CONFIG_NOT_EXISTS)
 
                if task_obj:
                    # 判断webhook是否合法
                    case_ids = task_obj.args
                    url = json.loads(task_obj.kwargs).get("webhook")
                    url_pattern = r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
                    if re.match(url_pattern, url):
                        webhook_set.add(url)
                else:
                    continue
                # 反查出一个task中包含的所有用例
                suite = list(
                    models.Case.objects.filter(pk__in=literal_eval(case_ids))
                    .order_by("id")
                    .values("id", "name")
                )
                for case in suite:
                    case_step_list = (
                        models.CaseStep.objects.filter(case__id=case["id"])
                        .order_by("step")
                        .values("body")
                    )
                    testcase_list = []
                    for case_step in case_step_list:
                        body = literal_eval(case_step["body"])
                        if body["request"].get("url"):
                            testcase_list.append(body)
                        elif config is None and body["request"].get("base_url"):
                            # 当前步骤时配置
                            # 如果task中存在重载配置,就覆盖用例中的配置
                            if override_config_body:
                                config = override_config_body
                            else:
                                try:
                                    config = literal_eval(
                                        models.Config.objects.get(
                                            name=body["name"], project__id=project
                                        ).body
                                    )
                                except ObjectDoesNotExist:
                                    return Response(response.CONFIG_NOT_EXISTS)
                    config_list.append(config)
                    test_sets.append(testcase_list)
                    config = None
                suite_list.extend(suite)
                override_config_body = None
            # 同步运行用例
            summary, _ = loader.debug_suite(
                suite=test_sets,
                project=project,
                obj=suite_list,
                config=config_list,
                save=False,
            )
            ci_project_namespace = ser.validated_data["ci_project_namespace"]
            ci_project_name = ser.validated_data["ci_project_name"]
            ci_job_id = ser.validated_data["ci_job_id"]
            summary["name"] = f"{ci_project_namespace}_{ci_project_name}_job{ci_job_id}"
 
            report_id = save_summary(
                name=summary.get("name"),
                summary=summary,
                project=project,
                report_type=4,
                user=ser.validated_data["start_job_user"],
                ci_metadata=ser.validated_data,
            )
            junit_results = summary2junit(summary)
            xml_data = xmltodict.unparse(junit_results)
            summary["task_name"] = "gitlab-ci_" + summary.get("name")
            summary["report_id"] = report_id
            for webhook in webhook_set:
                # TODO: 还需要优化企微发送,加入ci集成参数
                qy_message.send_message(
                    summary=summary,
                    webhook=webhook,
                    ci_job_url=ser.validated_data["ci_job_url"],
                    ci_pipeline_url=ser.validated_data["ci_pipeline_url"],
                    case_count=junit_results["testsuites"]["testsuite"]["tests"],
                )
            return HttpResponse(xml_data, content_type="text/xml")
        else:
            return Response(ser.errors, status=status.HTTP_400_BAD_REQUEST)
 
    @swagger_auto_schema(
        query_serializer=CIReportSerializer(),
        operation_summary="获取gitlab-ci运行的报告url",
    )
    def get_ci_report_url(self, request):
        """获取gitlab-ci运行的报告url"""
        ser = CIReportSerializer(data=request.query_params)
        if ser.is_valid():
            ci_job_id = ser.validated_data["ci_job_id"]
            report_obj = models.Report.objects.filter(ci_job_id=ci_job_id).first()
            if report_obj:
                report_url = f"{settings.BASE_REPORT_URL}/{report_obj.id}/"
            else:
                return Response(data=f"查找的ci_job_id: {ci_job_id}不存在")
            return Response(data=report_url)
        else:
            return Response(data=ser.errors, status=status.HTTP_400_BAD_REQUEST)