# -*- coding: utf-8 -*- """ @File : run.py @Time : 2023/2/6 10:51 @Author : geekbing @LastEditTime : - @LastEditors : - @Description : 运行API """ import logging from ast import literal_eval from django.core.exceptions import ObjectDoesNotExist from drf_yasg import openapi from drf_yasg.utils import swagger_auto_schema from rest_framework.decorators import api_view from rest_framework.response import Response from lunarlink.utils import loader, response from lunarlink import tasks from lunarlink.utils.decorator import request_log from lunarlink.utils.parser import Format from lunarlink import models from apps.exceptions.error import ( ApiNotFound, ConfigNotFound, CaseStepNotFound, ) logger = logging.getLogger(__name__) """运行方式 """ config_err = { "success": False, "msg": "指定的配置文件不存在", "code": "9999", } @api_view(["GET"]) @request_log(level="INFO") def run_api_pk(request, pk): """ 运行单个接口 """ config_name = request.query_params.get("config", "请选择") try: api = models.API.objects.get(id=pk) except ObjectDoesNotExist: return Response(response.API_NOT_FOUND) config = ( None if config_name == "请选择" else literal_eval( models.Config.objects.get(name=config_name, project=api.project).body ) ) summary = loader.debug_api( api=literal_eval(api.body), project=api.project.id, name=api.name, config=config, user=request.user.id, ) return Response(summary) @api_view(["POST"]) @request_log(level="INFO") def run_api(request): """run api by body""" config_name = request.data.pop("config", "请选择") api = Format(request.data) api.parse() config = None if config_name != "请选择": try: config = literal_eval( models.Config.objects.get( name=config_name, project__id=api.project ).body ) except ObjectDoesNotExist: logger.error(f"指定配置文件不存在:{config_name}") return Response(config_err) summary = loader.debug_api( api=api.testcase, project=api.project, name=api.name, config=config, user=request.user.id, ) return Response(summary) @swagger_auto_schema( method="get", manual_parameters=[ openapi.Parameter( "project", openapi.IN_QUERY, description="project id", type=openapi.TYPE_INTEGER, required=True, ), openapi.Parameter( "name", openapi.IN_QUERY, description="case name", type=openapi.TYPE_STRING, required=True, ), openapi.Parameter( "async", openapi.IN_QUERY, description="async", type=openapi.TYPE_STRING, ), ], operation_summary="执行单个测试用例集", ) @api_view(["GET"]) @request_log(level="INFO") def run_testsuite_pk(request, **kwargs): """ 执行测试用例集 URL参数 query string: project name """ pk = kwargs.get("pk") project = request.query_params["project"] name = request.query_params["name"] back_async = request.query_params.get("async", False) test_case = [] config = None test_list = ( models.CaseStep.objects.filter(case__id=pk).order_by("step").values("body") ) for content in test_list: body = literal_eval(content["body"]) if "base_url" in body["request"].keys(): try: config = literal_eval( models.Config.objects.get( name=body["name"], project__id=project ).body ) except ObjectDoesNotExist: return Response(response.CONFIG_NOT_EXISTS) else: continue test_case.append(body) # 异步执行 if back_async: tasks.async_debug_api.delay( api=test_case, project=project, name=name, config=config, user=request.user.id, ) summary = response.TASK_RUN_SUCCESS else: # 同步 summary = loader.debug_api( api=test_case, project=project, name=name, config=config, user=request.user.id, ) return Response(summary) @swagger_auto_schema( method="post", request_body=openapi.Schema( type=openapi.TYPE_OBJECT, required=[ "body", "project", "name", ], properties={ "body": openapi.Schema( type=openapi.TYPE_ARRAY, items=openapi.Schema( type=openapi.TYPE_OBJECT, ), description="body", ), "project": openapi.Schema(type=openapi.TYPE_INTEGER, description="project"), "name": openapi.Schema(type=openapi.TYPE_STRING, description="name"), }, ), ) @api_view(["POST"]) @request_log(level="INFO") def run_testsuite(request): """ 调式测试用例集 { name: str, body: dict, project: int, } """ try: body = request.data["body"] project = request.data["project"] name = request.data["name"] except ObjectDoesNotExist: return Response(response.KEY_MISS) test_case = [] config = None for test in body: try: test = loader.load_test(test=test, project=project) except ConfigNotFound: return Response(response.CONFIG_NOT_EXISTS) except ApiNotFound: return Response(response.API_NOT_FOUND) except CaseStepNotFound: return Response(response.CASE_STEP_NOT_EXIST) if "base_url" in test["request"].keys(): config = test continue test_case.append(test) summary = loader.debug_api( api=test_case, project=project, name=name, config=config, user=request.user.id, ) return Response(summary) @api_view(["POST"]) @request_log(level="INFO") def run_test(request): """ 测试用例中调式单个接口 入参: { body: dict, project: int, config: null or dict, } """ body = request.data.get("body") config = request.data.get("config", None) project = request.data.get("project") if config: try: config_obj = models.Config.objects.get(project=project, name=config["name"]) except ObjectDoesNotExist: return Response(response.CONFIG_NOT_EXISTS) config = literal_eval(config_obj.body) try: test = loader.load_test(test=body) except ConfigNotFound: return Response(response.CONFIG_NOT_EXISTS) except ApiNotFound: return Response(response.API_NOT_FOUND) except CaseStepNotFound: return Response(response.CASE_STEP_NOT_EXIST) summary = loader.debug_api( api=test, project=project, name=body.get("name", None), config=config, user=request.user.id, ) return Response(summary) @api_view(["POST"]) @request_log(level="INFO") def run_suite_tree(request): """ 选择目录节点执行用例集 { project: int relation: list name: str async: bool } """ config = None try: project = request.data["project"] relation = request.data["relation"] back_async = request.data["async"] report = request.data["name"] config_id = request.data.get("config_id") except KeyError: return Response(response.KEY_MISS) if config_id: # 前端有指定config, 会覆盖用例本身的config config = literal_eval(models.Config.objects.get(id=config_id).body) test_sets = [] suite_list = [] config_list = [] for relation_id in relation: case_name_id_mapping_list = list( models.Case.objects.filter(project__id=project, relation=relation_id) .order_by("id") .values("id", "name") ) for content in case_name_id_mapping_list: test_list = ( models.CaseStep.objects.filter(case__id=content["id"]) .order_by("step") .values("body") ) testcase_list = [] for test in test_list: body = literal_eval(test["body"]) if body["request"].get("url"): testcase_list.append(body) elif config is None and body["request"].get("base_url"): config = literal_eval( models.Config.objects.get( name=body["name"], project__id=project ).body ) config_list.append(config) test_sets.append(testcase_list) config = None suite_list.extend(case_name_id_mapping_list) if back_async: # 异步 tasks.async_debug_suite.delay( suite=test_sets, project=project, obj=suite_list, report=report, config=config_list, user=request.user.id, ) summary = loader.TEST_NOTE_EXISTS summary["msg"] = "用例运行中,请稍后查看报告" else: summary, _ = loader.debug_suite( suite=test_sets, project=project, obj=suite_list, config=config_list, save=True, user=request.user.id, ) return Response(summary) @api_view(["POST"]) @request_log(level="INFO") def run_multi_tests(request): """ 通过指定id, 运行多个指定用例 { "name": "批量运行2条用例", # 报告名 "project": 11, "case_config_mapping_list": [ { "config_name": "config_name1, "id": 153, # 用例id "name": "case_name1" # 用例名 } ] } """ try: project = request.data["project"] report_name = request.data["name"] except KeyError: return Response(response.KEY_MISS) # 默认同步运行用例 back_async = request.data.get("async") or False case_config_mapping_list = request.data["case_config_mapping_list"] config_body_mapping = {} # 解析用例列表中的配置 for config in case_config_mapping_list: config_name = config["config_name"] if not config_body_mapping.get(config_name): config_body_mapping[config_name] = literal_eval( models.Config.objects.get( name=config["config_name"], project__id=project ).body ) test_sets = [] suite_list = [] config_list = [] for case_config_mapping in case_config_mapping_list: case_id = case_config_mapping["id"] config_name = case_config_mapping["config_name"] # 获取用例的所有步骤 case_step_list = ( models.CaseStep.objects.filter(case__id=case_id) .order_by("step") .values("body") ) parsed_case_step_list = [] for case_step in case_step_list: body = literal_eval(case_step["body"]) if body["request"].get("url"): parsed_case_step_list.append(body) config_body = config_body_mapping[config_name] # 记录当前用例的配置信息 config_list.append(config_body) # 记录已经解析好的用例 test_sets.append(parsed_case_step_list) # 用例和配置的映射关系 suite_list.extend(case_config_mapping_list) if back_async: # 异步 tasks.async_debug_suite.delay( suite=test_sets, project=project, obj=suite_list, report=report_name, config=config_list, user=request.user.id, ) summary = loader.TEST_NOTE_EXISTS summary["msg"] = "用例运行中,请稍后查看报告" else: summary, _ = loader.debug_suite( suite=test_sets, project=project, obj=suite_list, config=config_list, save=True, user=request.user, report_name=report_name, ) return Response(summary)