# -*- coding: utf-8 -*- """ @File : report.py @Time : 2023/2/13 09:42 @Author : geekbing @LastEditTime : - @LastEditors : - @Description : 测试报告视图 """ import json import re from ast import literal_eval from shlex import quote from typing import Dict from django.core.exceptions import ObjectDoesNotExist from django.db import transaction from django.shortcuts import render from django.utils.decorators import method_decorator from django.utils import timezone from rest_framework.permissions import AllowAny from rest_framework.response import Response from rest_framework.viewsets import GenericViewSet from backend.utils import pagination from lunarlink import models, serializers from lunarlink.utils import response from lunarlink.utils.convert2hrp import Hrp from lunarlink.utils.decorator import request_log class ConvertRequest: @classmethod def _to_curl(cls, request, compressed: bool = False, verify: bool = True): """Return string with curl command by provided request object :param request: :param compressed: If `True` then `--compressed` argument will be added to result :param verify: :return: """ parts = [ ("curl", None), ("-X", request.method), ] parts += [ (None, request.url), ] for k, v in sorted(request.headers.items()): parts += [("-H", "{0}: {1}".format(k, v))] if request.body: body = request.body if isinstance(body, bytes): body = body.decode("utf-8") if isinstance(body, dict): body = json.dumps(body) parts += [("-d", body)] if compressed: parts += [("--compressed", None)] if not verify: parts += [("--insecure", None)] flat_parts = [] for k, v in parts: if k: flat_parts.append(quote(k)) if v: flat_parts.append(quote(v)) if k == "-H": flat_parts.append(" \\\n") return " ".join(flat_parts) @classmethod def _make_fake_req(cls, request_meta_dict): class RequestMeta: ... req = RequestMeta() setattr(req, "method", request_meta_dict["method"]) setattr(req, "url", request_meta_dict["url"]) setattr(req, "headers", request_meta_dict["headers"]) body = request_meta_dict.get("body") or request_meta_dict.get("data") setattr(req, "body", body) return req @classmethod def to_curl(cls, req: Dict) -> str: _req = cls._make_fake_req(req) return cls._to_curl(_req, compressed=True, verify=False) @classmethod def to_hrp(cls, req: Dict) -> Dict: hrp = Hrp(faster_req_json=req) return hrp.get_testcase().dict() @classmethod def generate_curl(cls, report_details, convert_type=("curl",)): for detail in report_details: for record in detail["records"]: meta_data = record["meta_data"] for t in convert_type: req = meta_data["request"] method_name = f"to_{t}" method = getattr(ConvertRequest, method_name) record["meta_data"][t] = method(req) class ReportView(GenericViewSet): """报告视图""" queryset = models.Report.objects serializer_class = serializers.ReportSerializer pagination_class = pagination.MyPageNumberPagination def get_authenticators(self): # 查看报告详情不需要鉴权 pattern = re.compile(r"/api/lunarlink/reports/\d+") if ( self.request.method == "GET" and re.search(pattern, self.request.path) is not None ): return [] # 不需要任何鉴权 return super().get_authenticators() # 默认所有鉴权 def get_permissions(self): # 如果是look方法,不需要任何权限 if self.action == "look": return [AllowAny()] return super().get_permissions() @method_decorator(request_log(level="DEBUG")) def list(self, request): """获取测试报告列表""" project = request.query_params.get("project") search = request.query_params.get("search") report_type = request.query_params.get("reportType") report_status = request.query_params.get("reportStatus") only_me = request.query_params.get("onlyMe") queryset = ( self.get_queryset().filter(project__id=project).order_by("-update_time") ) # 前端传过来是小写的字符串,不是python的True if only_me == "true": queryset = queryset.filter(creator=request.user) if search != "": queryset = queryset.filter(name__contains=search) if report_type != "": queryset = queryset.filter(type=report_type) if report_status != "": queryset = queryset.filter(status=report_status) page_report = self.paginate_queryset(queryset) serializer = self.get_serializer(page_report, many=True) return self.get_paginated_response(serializer.data) @method_decorator(request_log(level="INFO")) def look(self, request, pk): """ 查看报告 查看报告详情 """ try: report = models.Report.objects.get(id=pk) report_detail = models.ReportDetail.objects.get(report__id=pk) except ObjectDoesNotExist: return Response(response.REPORT_NOT_EXISTS) summary = json.loads(report.summary) summary["details"] = literal_eval(report_detail.summary_detail) ConvertRequest.generate_curl(summary["details"], convert_type=("curl",)) summary["html_report_name"] = report.name return render(request, template_name="report_template.html", context=summary) @method_decorator(request_log(level="INFO")) def destroy(self, request, pk): """ 单个删除 pk: id """ report_obj = models.Report.objects.filter(id=pk).first() if not report_obj: return Response(response.REPORT_NOT_EXISTS) report_obj.is_deleted = True report_obj.updater = request.user.id report_obj.update_time = timezone.now() report_obj.save() # 这将触发 pre_save 信号,并执行 delete_related_report_detail 处理器函数 return Response(response.REPORT_DEL_SUCCESS) @method_decorator(request_log(level="INFO")) def bulk_destroy(self, request): """ 批量删除报告 [{id:int}] """ ids = [content["id"] for content in request.data] # TODO: 如果有大量的数据,考虑性能问题 objs = list(models.Report.objects.filter(id__in=ids)) # 将QuerySet转换为list if not objs: return Response(response.REPORT_NOT_EXISTS) try: with transaction.atomic(): models.Report.objects.filter(id__in=ids).update( is_deleted=True, update_time=timezone.now(), updater=request.user.id, ) models.ReportDetail.objects.filter(report__in=objs).update( is_deleted=True ) except Exception as e: return Response({"error": str(e)}, status=400) return Response(response.REPORT_DEL_SUCCESS) # TODO: 下载报告 待实现 @method_decorator(request_log(level="INFO")) def download(self, request, **kwargs): """下载报告""" pass