# -*- coding: utf-8 -*- """ @File : suite.py @Time : 2023/2/23 15:50 @Author : geekbing @LastEditTime : - @LastEditors : - @Description : 测试用例 """ import json import logging from enum import Enum from typing import List from django.core.exceptions import ObjectDoesNotExist from django_celery_beat import models as celery_models from django.db import transaction from django.db.models import Q from django.utils import timezone from django.utils.decorators import method_decorator from rest_framework.status import HTTP_400_BAD_REQUEST from rest_framework.views import APIView from rest_framework.mixins import DestroyModelMixin from rest_framework.viewsets import GenericViewSet from rest_framework.response import Response from apps.exceptions.error import ( ApiNotFound, CaseStepNotFound, ConfigNotFound, RelationNotFound, ) from apps.schema.request import RequestInfo from backend.utils.redis_manager import RedisHelper from backend.utils.request_util import get_request_ip from lunarlink import models, serializers from lunarlink.utils import response from lunarlink.utils import prepare from lunarlink.utils.decorator import request_log from lunarlink.utils.enums.TreeTypeEnum import TreeType from lunarlink.utils.query_filters import filter_by_time_range, filter_by_node from lunarlink.utils.request.generator import CaseGenerator logger = logging.getLogger(__name__) class SearchType(Enum): API = "2" CASE = "1" tag_options = { "冒烟用例": 1, "集成用例": 2, "监控用例": 3, "核心用例": 4, } class TestCaseView(GenericViewSet, DestroyModelMixin): queryset = models.Case.objects serializer_class = serializers.CaseSerializer @method_decorator(request_log(level="INFO")) def get(self, request): """ 查询指定CASE列表 { "project": int "node": int } """ ser = serializers.CaseSearchSerializer(data=request.query_params) if ser.is_valid(): node = ser.validated_data.get("node") project = ser.validated_data.get("project") search = ser.validated_data.get("search") search_type = ser.validated_data.get("searchType") case_type = ser.validated_data.get("caseType") only_me = ser.validated_data.get("onlyMe") creator = ser.validated_data.get("creator") start_time = ser.validated_data.get("start_time") end_time = ser.validated_data.get("end_time") queryset = ( self.get_queryset().filter(project__id=project).order_by("-create_time") ) # 根据创建时间过滤 queryset = filter_by_time_range(queryset, start_time, end_time) if only_me is True: queryset = queryset.filter(creator=request.user.id) if creator: queryset = queryset.filter(creator__name=creator) try: queryset = filter_by_node(queryset, project, node, TreeType.CASE.value) except RelationNotFound: return Response(response.TREE_NOT_EXISTS) if case_type != "": queryset = queryset.filter(tag=case_type) if search != "": # 用例名称搜索 if search_type == SearchType.CASE.value: queryset = queryset.filter(name__contains=search) # API名称或API URL搜索 elif search_type == SearchType.API.value: case_id = self.case_step_search(search) queryset = queryset.filter(pk__in=case_id) pagination_query = self.paginate_queryset(queryset) serializer = self.get_serializer(pagination_query, many=True) return self.get_paginated_response(serializer.data) else: return Response(ser.errors, status=HTTP_400_BAD_REQUEST) @method_decorator(request_log(level="INFO")) def post(self, request): """ 新增测试用例集 { length: int, name: str, project: int, relation: int, tag: str, body: [{ id: int, name: str, url: str, method: str, project: int, relation: int, body: {}, rig_env: int, tag: int, tag_name: str, update_time: datetime, is_deleted: bool, creator: str, }], cases: [{ case_name: str, case_id: str, }], relation_name: str, } """ project_id = int(request.data.pop("project", 0)) if not models.Project.objects.filter(id=project_id).exists(): return Response(response.PROJECT_NOT_EXISTS) request.data["tag"] = tag_options[request.data["tag"]] step_body_list = request.data.pop("body", None) # 数据库事务处理 try: with transaction.atomic(): case_obj = models.Case.objects.create( **request.data, project_id=project_id, creator=request.user, ) prepare.generate_casestep( step_body_list=step_body_list, case_obj=case_obj, creator=request.user, ) except ConfigNotFound: return Response(response.CONFIG_NOT_EXISTS) except ApiNotFound: return Response(response.API_NOT_FOUND) except Exception as e: logger.error(f"An unexpected error occurred: {e}", exc_info=True) return Response({"error": "An unexpected error occurred"}, status=500) return Response({"test_id": case_obj.id, **response.CASE_ADD_SUCCESS}) @staticmethod def case_step_search(search: str): """ 搜索case_step的url或name 返回对应的case_id """ case_id = models.CaseStep.objects.filter( Q(name__contains=search) | Q(url__contains=search) ).values("case_id") case_id = set(item["case_id"] for _, item in enumerate(case_id)) return case_id @method_decorator(request_log(level="INFO")) def patch(self, request, pk): """ 更新测试用例集 { name: str id: int body: [] project: int } """ step_body_list = request.data.pop("body", None) try: case_obj = models.Case.objects.get(id=pk) except ObjectDoesNotExist: return Response(response.CASE_NOT_EXISTS) try: prepare.update_casestep( step_body_list=step_body_list, case_obj=case_obj, creator=request.user, updater=request.user.id, ) except ConfigNotFound: return Response(response.CONFIG_NOT_EXISTS) except ApiNotFound: return Response(response.API_NOT_FOUND) except CaseStepNotFound: return Response(response.CASE_STEP_NOT_EXIST) except Exception as e: logger.error(f"An unexpected error occurred: {e}", exc_info=True) return Response({"error": "An unexpected error occurred"}, status=500) request.data["tag"] = tag_options[request.data["tag"]] models.Case.objects.filter(id=pk).update( **request.data, update_time=timezone.now(), updater=request.user.id, ) return Response(response.CASE_UPDATE_SUCCESS) @method_decorator(request_log(level="INFO")) def copy(self, request, pk): """复制测试用例 pk int: test id { name: test name relation: int project: int } """ try: name = request.data["name"] except KeyError: return Response(response.KEY_MISS) username = request.user.id if "|" in name: resp = self.split(pk, name) else: try: case = models.Case.objects.get(id=pk) except ObjectDoesNotExist: return Response(response.CASE_NOT_EXISTS) case.id = None case.name = name case.creator = request.user case.updater = username case.save() case_step = models.CaseStep.objects.filter(case__id=pk) for step in case_step: step.id = None step.case = case step.creator = request.user step.updater = username step.save() resp = response.CASE_ADD_SUCCESS return Response(resp) @method_decorator(request_log(level="INFO")) def destroy(self, request, pk): """ 单个删除 pk: test id """ case_obj = models.Case.objects.filter(id=pk).first() if not case_obj: return Response(response.CASE_NOT_EXISTS) # description为项目id, args为用例id if celery_models.PeriodicTask.objects.filter( args__contains=str(case_obj.id) ).exists(): return Response(response.CASE_IS_USED) case_obj.is_deleted = True case_obj.updater = request.user.id case_obj.update_time = timezone.now() case_obj.save() # 这将触发 pre_save 信号,并执行 delete_related_steps 处理器函数 return Response(response.CASE_DELETE_SUCCESS) @method_decorator(request_log(level="INFO")) def bulk_destroy(self, request, *args, **kwargs): """ 批量删除用例 [ { id:int } ] """ ids = [content["id"] for content in request.data] objs = list(models.Case.objects.filter(id__in=ids)) if not objs: return Response(response.CASE_NOT_EXISTS) unused_ids = [] try: with transaction.atomic(): for obj in objs: if celery_models.PeriodicTask.objects.filter( args__contains=str(obj.id) ).exists(): continue else: unused_ids.append(obj.id) if not unused_ids: return Response(response.CASE_IS_USED) # 批量更新待删除的用例 models.Case.objects.filter(id__in=unused_ids).update( is_deleted=True, update_time=timezone.now(), updater=request.user.id, ) # 批量更新待删除用例的CaseStep models.CaseStep.objects.filter(case__in=unused_ids).update( is_deleted=True, update_time=timezone.now(), updater=request.user.id, ) except Exception as e: return Response({"error": str(e)}, status=400) return Response(response.CASE_DELETE_SUCCESS) @staticmethod def split(pk, name: str): """切割用例 :param pk: 用例id :param name: 用例名称 :return: """ split_case_name = name.split("|")[0] split_condition = name.split("|")[1] # 更新原有case长度 case = models.Case.objects.get(id=pk) case_step = models.CaseStep.objects.filter( case__id=pk, name__contains=split_condition ) case_step_length = len(case_step) case.length -= case_step_length case.save() new_case = models.Case.objects.filter(name=split_case_name).last() if new_case: new_case.length += case_step_length new_case.save() case_step.update(case=new_case) else: # 创建一条新的case case.id = None case.name = split_case_name case.length = case_step_length case.save() # 把原来的case_step中的case_id改成新的case_id case_step.update(case=case) return response.CASE_SPILT_SUCCESS @method_decorator(request_log(level="INFO")) def move(self, request): """ 移动测试用例到其它目录 """ project: int = request.data.get("project") relation: int = request.data.get("relation") cases: list = request.data.get("case") ids = [case["id"] for case in cases] objs = models.Case.objects.filter(project=project, id__in=ids) if objs: objs.update(relation=relation) else: return Response(response.CASE_NOT_EXISTS) return Response(response.CASE_UPDATE_SUCCESS) @method_decorator(request_log(level="INFO")) def update_tag(self, request): """批量更新用例类型""" case_ids: list = request.data.get("case_ids", []) project_id: int = request.data.get("project_id", 0) tag: list = request.data.get("tag") objs = models.Case.objects.filter(project_id=project_id, pk__in=case_ids) if objs: objs.update(tag=tag) else: return Response(response.CASE_NOT_EXISTS) return Response(response.TAG_UPDATE_SUCCESS) @method_decorator(request_log(level="INFO")) def put(self, request, **kwargs): """ 用例步骤同步成功 将api最新的name, body, url, method更新到用例case_step中 pk: case_id """ # case_id pk = kwargs.get("pk") # 在case_step表中找出case_id对应的所有记录,并且排除config api_id_list_of_dict = ( models.CaseStep.objects.filter(case_id=pk) .exclude(method="config") .values("source_api_id", "step") ) if api_id_list_of_dict: # 通过source_api_id找到原来的api # 把原来的api的name, body, url, method更新到case_step中 for item in api_id_list_of_dict: source_api_id: int = item["source_api_id"] # 不存在api_id的直接跳过 if source_api_id == 0: continue step: int = item["step"] source_api = ( models.API.objects.filter(pk=source_api_id) .values("name", "body", "url", "method") .first() ) if source_api is not None: models.CaseStep.objects.filter( case_id=pk, source_api_id=source_api_id, step=step ).update(**source_api) models.Case.objects.filter(pk=pk).update(update_time=timezone.now()) else: return Response(response.CASE_NOT_EXISTS) return Response(response.CASE_STEP_SYNC_SUCCESS) class CaseStepView(APIView): """测试用例step操作视图""" @method_decorator(request_log(level="INFO")) def get(self, request, **kwargs): """ 获取用例集信息 pk: case_id """ pk = kwargs.get("pk") queryset = models.CaseStep.objects.filter(case__id=pk).order_by("step") serializer = serializers.CaseStepSerializer(instance=queryset, many=True) try: case_obj = models.Case.objects.get(id=pk) except ObjectDoesNotExist: return Response(response.CASE_NOT_EXISTS) resp = { "case": serializers.CaseSerializer(instance=case_obj, many=False).data, "step": serializer.data, } return Response(resp) class RecordStartView(APIView): """开始录制接口请求""" @method_decorator(request_log(level="INFO")) def get(self, request): regex = request.query_params.get("regex") client_ip = request.query_params.get("ip") is_local_str = request.query_params.get("local", "true") # 默认为"true"字符串 is_local = is_local_str.lower() == "true" # 转换字符串为布尔值 user_id = request.user.id if not client_ip: client_ip = get_request_ip(request) record = RedisHelper.get_address_record(client_ip) if record: record_data = json.loads(record) if record_data.get("user_id", "") != user_id: return Response(response.RECORD_IS_RUNNING) RedisHelper.set_address_record(user_id, client_ip, regex, is_local) return Response(response.RECORD_START_SUCCESS) class RecordStopView(APIView): """停止录制接口请求""" @method_decorator(request_log(level="INFO")) def get(self, request): client_ip = request.query_params.get("ip") if not client_ip: client_ip = get_request_ip(request) record = RedisHelper.get_address_record(client_ip) if record: record_data = json.loads(record) RedisHelper.remove_user_record(record_data.get("user_id", "")) RedisHelper.remove_address_record(client_ip) return Response(response.RECORD_STOP_SUCCESS) class RecordStatusView(APIView): """获取录制接口请求状态""" @method_decorator(request_log(level="INFO")) def get(self, request): client_ip = request.query_params.get("ip") user_id = request.user.id if not client_ip: client_ip = get_request_ip(request) address_record = RedisHelper.get_address_record(client_ip) user_record = RedisHelper.get_user_record(user_id) is_recording = False regex = "" ip = "" is_local = True if address_record: record_data = json.loads(address_record) if record_data.get("user_id", "") == user_id: is_recording = True regex = record_data.get("regex", "") ip = record_data.get("ip", "") is_local = record_data.get("local", "") else: is_recording = False if user_record: record_data = json.loads(user_record) is_recording = True regex = record_data.get("regex", "") ip = record_data.get("ip", "") is_local = record_data.get("local", "") data = RedisHelper.list_record_data(user_id) return Response( dict(results=data, regex=regex, ip=ip, status=is_recording, local=is_local) ) class RecordRemoveView(APIView): """删除录制数据""" @method_decorator(request_log(level="INFO")) def get(self, request): index = request.query_params.get("index") user_id = request.user.id RedisHelper.remove_record_data(user_id, index) return Response(response.RECORD_REMOVE_SUCCESS) class GenerateCaseView(APIView): """录制生成用例""" @method_decorator(request_log(level="INFO")) def post(self, request): case_name = request.data.get("name") length = request.data.get("length") project_id = request.data.get("project") case_dir = request.data.get("case_dir") api_dir = request.data.get("api_dir") config = request.data.get("config") raw_requests = request.data.get("requests") if not raw_requests: return Response(response.RECORD_DATA_ERROR) if not models.Project.objects.filter(id=project_id).exists(): return Response(response.PROJECT_NOT_EXISTS) config_name = config.get("body").get("name") try: config_obj = models.Config.objects.get( name=config_name, project_id=project_id ) except models.Config.DoesNotExist: return Response(response.CONFIG_NOT_EXISTS) requests = [RequestInfo(**item) for item in raw_requests] CaseGenerator.extract_field(requests) record_case, api_instances = CaseGenerator.generate_case( length=length, project_id=project_id, case_dir=case_dir, api_dir=api_dir, config=config, case_name=case_name, requests=requests, user=request.user.id, ) record_case_dict = record_case.dict() step_body_list = record_case_dict.pop("body", None) try: with transaction.atomic(): case_obj = models.Case.objects.create( **record_case_dict, creator=request.user ) api_ids = self.save_api_instances(api_instances) prepare.generate_record_casestep( api_ids=api_ids, body=step_body_list, case_obj=case_obj, config_obj=config_obj, user=request.user, is_generate_api=bool(api_dir), ) except Exception as e: logger.error(f"An unexpected error occurred: {e}", exc_info=True) return Response({"error": "An unexpected error occurred"}, status=500) return Response({"test_id": case_obj.id, **response.CASE_GENERATOR_SUCCESS}) @staticmethod def save_api_instances(api_instances: List) -> List: """ 保存API实例, 提取主键id并返回 :param api_instances: :return: """ api_ids = [] if api_instances: for instance in api_instances: instance.save() api_ids.append(instance.id) return api_ids class ConvertCaseView(APIView): """导入har或其他用例数据文件生成用例""" pass