hyb
2025-05-14 87453ffd761425b9f363a09a0f8fe07d770cb325
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
# -*- coding: utf-8 -*-
"""
@File    : loader.py
@Time    : 2023/1/16 15:27
@Author  : geekbing
@LastEditTime : -
@LastEditors : -
@Description : -
"""
import concurrent.futures
import copy
import datetime
import importlib
import json
import logging
import os
import shutil
import sys
import time
import types
import tempfile
from ast import literal_eval
from typing import Dict, List, Tuple, Union
from concurrent.futures import ThreadPoolExecutor
 
from bs4 import BeautifulSoup
from django.core.exceptions import ObjectDoesNotExist
from requests.utils import dict_from_cookiejar
from requests.cookies import RequestsCookieJar
 
from backend.settings import BASE_DIR
from lunarlink import models
from lunarlink.utils.parser import Format
from lunarlink.views.report import ConvertRequest
from httprunner import HttpRunner
from apps.exceptions.error import (
    ApiNotFound,
    ConfigNotFound,
    CaseStepNotFound,
)
 
logger = logging.getLogger(__name__)
 
TEST_NOTE_EXISTS = {
    "code": "0102",
    "status": False,
    "msg": "节点下没有接口或用例集",
}
 
 
def is_function(tup: Tuple) -> bool:
    """
    判断元组内的对象是否为function, 真返回True, 否返回False
    :param tup:
    :return:
    """
    name, item = tup
    return isinstance(item, types.FunctionType)
 
 
def is_variable(tup: Tuple):
    """
    Takes (name, object) tuple, returns True if it is a variable.
    :param tup:
    :return:
    """
    name, item = tup
    if callable(item):
        # function or class
        return False
 
    if isinstance(item, types.ModuleType):
        # imported module
        return False
 
    if name.startswith("_"):
        # private property
        return False
 
    return True
 
 
class FileLoader:
    @staticmethod
    def dump_python_file(python_file, data):
        """dump python file"""
        with open(python_file, "w", encoding="utf-8") as stream:
            stream.write(data)
 
    @staticmethod
    def load_python_module(file_path):
        """load python module.
 
        :param file_path: python path
        :return:
            dict: variables and functions mapping for specified python module
 
                {
                    "variables": {},
                    "functions": {}
                }
        """
        debugtalk_module = {"variables": {}, "functions": {}}
        debugtalk_module_name = "debugtalk"
        # 修复切换项目后,debugtalk 有缓存
        if sys.modules.get(debugtalk_module_name):
            del sys.modules[debugtalk_module_name]
        sys.path.insert(0, file_path)
        module = importlib.import_module(debugtalk_module_name)
        # 修复重载bug
        importlib.reload(module)
        sys.path.pop(0)
 
        for name, item in vars(module).items():
            if is_function((name, item)):
                debugtalk_module["functions"][name] = item
            elif is_variable((name, item)):
                if isinstance(item, tuple):
                    continue
                debugtalk_module["variables"][name] = item
            else:
                pass
 
        return debugtalk_module
 
 
def load_debugtalk(project: int):
    """
    import debugtalk.py in sys.path and reload
    :param project:
    :return:
    """
    # debugtalk.py
    code = models.Debugtalk.objects.get(project__id=project).code
 
    tempfile_path = tempfile.mkdtemp(
        prefix="LunarLink", dir=os.path.join(BASE_DIR, "tempWorkDir")
    )
    file_path = os.path.join(tempfile_path, "debugtalk.py")
    os.chdir(tempfile_path)
    try:
        FileLoader.dump_python_file(file_path, code)
        debugtalk = FileLoader.load_python_module(os.path.dirname(file_path))
        return debugtalk, file_path
    except Exception as e:
        logger.error(e)
        os.chdir(BASE_DIR)  # 递归删除文件夹下的所有子文件夹和子文件
        shutil.rmtree(os.path.dirname(file_path))
 
 
def parse_tests(
    testcases: List,
    debugtalk: Dict,
    name=None,
    config=None,
    project=None,
):
    """
    get test case structure
 
    :param testcases:
    :param debugtalk: 驱动代码
    :param name:
    :param config: 配置文件
    :param project:
    :return:
    """
 
    refs = {
        "env": {},
        "def-api": {},
        "def-testcase": {},
        "debugtalk": debugtalk,
    }
 
    test_set = {
        "config": {"name": testcases[-1]["name"], "variables": []},
        "teststeps": testcases,
    }
 
    if config:
        test_set["config"] = config
 
    if name:
        test_set["config"]["name"] = name
 
    # 获取当前项目的全局变量
    global_variables = (
        models.Variables.objects.filter(project=project).all().values("key", "value")
    )
    # 并集,重复内容只保留一个
    all_config_variables_keys = set().union(
        *(d.keys() for d in test_set["config"].setdefault("variables", []))
    )
    global_variables_list_of_dict = []
    for item in global_variables:
        if item["key"] not in all_config_variables_keys:
            global_variables_list_of_dict.append({item["key"]: item["value"]})
 
    # 有 variables 就直接 extend,没有就加一个[], 再 extend
    # 配置的 variables 和全局变量重叠,优先使用配置中的 variables
    test_set["config"].setdefault("variables", []).extend(global_variables_list_of_dict)
    test_set["config"]["refs"] = refs
 
    # 配置中的变量和全局变量合并
    variables_mapping = {}
    if config:
        for variables in config["variables"]:
            variables_mapping.update(variables)
 
    return test_set
 
 
def debug_api(
    api: Union[Dict, List],
    project: int,
    name=None,
    config=None,
    save=True,
    user=None,
):
    """
    调式接口
 
    :param api:
    :param project:
    :param name:
    :param config:
    :param save:
    :param user:
    :return:
    """
 
    if len(api) == 0:
        return TEST_NOTE_EXISTS
 
    # testcase
    if isinstance(api, dict):
        """
        httprunner scripts or teststeps
        """
        api = [api]
 
    # 参数化参数过滤,只加载api中调用到的参数
    if config and config.get("parameters"):
        api_params = []
        for item in api:
            params = (
                item["request"].get("params", {})
                or item["request"].get("json", {})
                or item["request"].get("data", {})
            )
            for v in params.values():
                if isinstance(v, list):
                    api_params.extend(v)
                else:
                    api_params.append(v)
        parameters = []
        for value in config["parameters"]:
            for key in value.keys():
                # key可能是key-key1这种模式, 所以需要分割
                for i in key.split("-"):
                    if "$" + i in api_params:
                        parameters.append(value)
                        break
 
        config["parameters"] = parameters
 
    debugtalk = load_debugtalk(project=project)
    debugtalk_content = debugtalk[0]
    debugtalk_path = debugtalk[1]
    os.chdir(os.path.dirname(debugtalk_path))
    try:
        testcase_list = [
            parse_tests(
                testcases=api,
                debugtalk=debugtalk_content,
                name=name,
                config=config,
                project=project,
            )
        ]
 
        kwargs = {"failfast": False}
        runner = HttpRunner(**kwargs)
        runner.run(path_or_testcases=testcase_list)
        summary = parse_summary(summary=runner.summary)
 
        if save:
            # 保存报告信息
            save_summary(
                name=name,
                summary=summary,
                project=project,
                report_type=1,
                user=user,
            )
 
        # 复制一份 response 的 json
        for details in summary.get("details", []):
            for record in details.get("records", []):
                json_data = record["meta_data"]["response"].pop("json", {})
                if json_data:
                    record["meta_data"]["response"]["jsonCopy"] = json_data
        ConvertRequest.generate_curl(report_details=summary["details"])
        return summary
    except Exception as e:
        logger.error(f"debug_api error")
        raise SyntaxError(str(e))
    finally:
        os.chdir(BASE_DIR)
        shutil.rmtree(os.path.dirname(debugtalk_path))
 
 
def debug_suite(
    suite,
    project,
    obj,
    config=None,
    save=True,
    user=None,
    report_type=1,
    report_name="",
    allow_parallel=False,
):
    """debug suite
 
    :param suite: list[list[dict]], 用例列表
    :param project: int, 项目id
    :param obj: list[dict] [{"id": int "name": str}], 用例的名称和id
    :param config: list[dict], 每个用例运行的配置
    :param save:
    :param user:
    :param report_type: int, 默认类型是调试
    :param report_name:
    :param allow_parallel: bool, 是否允许并行
    :return:
    """
    if len(suite) == 0:
        return TEST_NOTE_EXISTS, 0
 
    debugtalk = load_debugtalk(project=project)
    debugtalk_content = debugtalk[0]
    debugtalk_path = debugtalk[1]
    os.chdir(os.path.dirname(debugtalk_path))
 
    # 先记录配置的名称,parse_tests会改变config
    config_name_list = [d["name"] for d in config]
 
    try:
        test_sets = create_test_sets(
            suite=suite,
            obj=obj,
            debugtalk_content=debugtalk_content,
            config=config,
            project=project,
        )
 
        if allow_parallel:
            summary = debug_suite_parallel(test_sets)
        else:
            kwargs = {"failfast": False}
            runner = HttpRunner(**kwargs)
            runner.run(test_sets)
            summary = parse_summary(runner.summary)
 
        # 统计用例级别的数据
        summary = update_summary(
            obj=obj,
            test_sets=test_sets,
            project=project,
            summary=summary,
            config_name_list=config_name_list,
        )
 
        report_id = 0
        if save:
            report_id = save_summary(
                name=report_name or f"批量运行{len(test_sets)}条用例",
                summary=summary,
                project=project,
                report_type=report_type,
                user=user,
            )
        # 复制一份response的json
        summary = process_response_json(summary)
        return summary, report_id
    except Exception as e:
        raise SyntaxError(str(e))
    finally:
        os.chdir(BASE_DIR)
        shutil.rmtree(os.path.dirname(debugtalk_path))
 
 
def create_test_sets(suite, obj, debugtalk_content, config, project):
    """根据给定的 suite, debugtalk_content, config 和 project,创建 test_sets
    :param obj:
    :param suite:
    :param debugtalk_content:
    :param config:
    :param project:
    :return:
    """
    test_sets = []
    for index in range(len(suite)):
        # copy.deepcopy 修复引用bug
        testcases = copy.deepcopy(
            parse_tests(
                testcases=suite[index],
                debugtalk=debugtalk_content,
                name=obj[index]["name"],
                config=config[index],
                project=project,
            )
        )
        test_sets.append(testcases)
    return test_sets
 
 
def update_summary(obj, test_sets, project, summary, config_name_list):
    """
    根据给定的 summary, config_name_list 和 details,更新 summary。
    """
    details: List = summary["details"]
    failure_case_config_mapping_list = []
    for index, detail in enumerate(details):
        if detail["success"] is False:
            # 用例失败时, 记录用例执行的配置
            failure_case_config = {"config_name": config_name_list[index]}
            failure_case_config.update(obj[index])
            failure_case_config_mapping_list.append(failure_case_config)
    case_count = len(test_sets)
    case_fail_rate = f"{len(failure_case_config_mapping_list) / case_count:.2%}"
    summary["stat"].update(
        {
            "failure_case_config_mapping_list": failure_case_config_mapping_list,
            "case_count": case_count,
            "case_fail_rate": case_fail_rate,
            "project": project,
        }
    )
    return summary
 
 
def process_response_json(summary):
    """
    处理 summary 中的 response json,以便在 summary 中添加 jsonCopy。
    """
    for _details in summary.get("details", []):
        for record in _details.get("records", []):
            json_data = record["meta_data"]["response"].pop("json", {})
            if json_data:
                record["meta_data"]["response"]["jsonCopy"] = json_data
    return summary
 
 
def debug_suite_parallel(test_sets: List):
    """
    并行运行用例
    :param test_sets:
    :return:
    """
 
    def run_test(test_set: Dict):
        kwargs = {"failfast": False}
        runner = HttpRunner(**kwargs)
        runner.run([test_set])
        return parse_summary(runner.summary)
 
    start = time.time()
    # 限制最多10个线程
    workers = min(len(test_sets), 10)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(run_test, t): t for t in test_sets}
        results = [
            future.result() for future in concurrent.futures.as_completed(futures)
        ]
 
    duration = time.time() - start
    return merge_parallel_result(results, duration)
 
 
def merge_parallel_result(results: List, duration: float):
    """
    合并并行的结果,保持和串行的运行结果一致
    :param results: 用例执行结果
    :param duration: 用例执行时间
    :return:
    """
    base_result: Dict = results.pop()
    for result in results:
        base_result["success"] = result["success"] and base_result["success"]
        for k, v in base_result["stat"].items():
            base_result["stat"][k] = v + result["stat"][k]
 
        for k, v in base_result["time"].items():
            if k == "start_at":
                base_result["time"][k] = min(v, result["time"][k])
            else:
                base_result["time"][k] = v + result["time"][k]
        base_result["details"].extend(result["details"])
    base_result["time"]["duration"] = duration
 
    # 删除多余的key
    keys = list(base_result.keys())
    for k in keys:
        if k not in ("success", "stat", "time", "platform", "details"):
            base_result.pop(k)
    return base_result
 
 
def parse_summary(summary):
    """序列化summary
    :param summary:
    :return:
    """
 
    for detail in summary["details"]:
        for record in detail["records"]:
            for key, value in record["meta_data"]["request"].items():
                if isinstance(value, bytes):
                    record["meta_data"]["request"][key] = value.decode("utf-8")
                if isinstance(value, RequestsCookieJar):
                    record["meta_data"]["request"][key] = dict_from_cookiejar(value)
 
            for key, value in record["meta_data"]["response"].items():
                if isinstance(value, bytes):
                    record["meta_data"]["response"][key] = value.decode("utf-8")
                if isinstance(value, RequestsCookieJar):
                    record["meta_data"]["response"][key] = dict_from_cookiejar(value)
 
            if "text/html" in record["meta_data"]["response"]["content_type"]:
                record["meta_data"]["response"]["content"] = BeautifulSoup(
                    record["meta_data"]["response"]["content"], features="html.parser"
                ).prettify()
 
            if record["status"] == "failure":
                record["meta_data"].update({"validators": []})
 
    return summary
 
 
def save_summary(name, summary, project, report_type=2, user=None, ci_metadata=None):
    """保存报告信息"""
 
    if ci_metadata is None:
        ci_metadata = {}
 
    if "status" in summary.keys():
        return
 
    if name == "" or name is None:
        name = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
 
    # 需要先复制一份,不然会影响debug_api返回给前端的报告
    summary = copy.deepcopy(summary)
    summary_detail = summary.pop("details")
    report = models.Report.objects.create(
        **{
            "project": models.Project.objects.get(id=project),
            "name": name,
            "type": report_type,
            "status": summary["success"],
            "summary": json.dumps(summary, ensure_ascii=False),
            "creator_id": user,
            "ci_metadata": ci_metadata,
            "ci_project_id": ci_metadata.get("ci_project_id"),
            "ci_job_id": ci_metadata.get("ci_job_id", None),
        }
    )
 
    models.ReportDetail.objects.create(
        summary_detail=summary_detail,
        report=report,
    )
 
    return report.id
 
 
def load_test(test, project=None):
    """
    格式化测试用例
    :param test:
    :param project:
    :return:
    """
    try:
        format_http = Format(test["newBody"])
        format_http.parse()
        testcase = format_http.testcase
    except KeyError:
        if "case" in test.keys():
            if test["body"]["method"] == "config":
                try:
                    case_step = models.Config.objects.get(
                        name=test["body"]["name"], project=project
                    )
                except ObjectDoesNotExist:
                    raise ConfigNotFound("指定的配置不存在")
            else:
                try:
                    case_step = models.CaseStep.objects.get(id=test["id"])
                except ObjectDoesNotExist:
                    raise CaseStepNotFound("指定的用例步骤不存在")
        else:
            if test["body"]["method"] == "config":
                try:
                    case_step = models.Config.objects.get(
                        name=test["body"]["name"], project=project
                    )
                except ObjectDoesNotExist:
                    raise ConfigNotFound("指定的配置不存在")
            else:
                try:
                    case_step = models.API.objects.get(id=test["id"])
                except ObjectDoesNotExist:
                    raise ApiNotFound("指定的接口不存在")
        testcase = literal_eval(case_step.body)
        name = test["body"]["name"]
 
        if case_step.name != name:
            testcase["name"] = name
 
    return testcase