# -*- coding: utf-8 -*-
|
"""
|
@File : generator.py
|
@Time : 2023/9/11 15:47
|
@Author : geekbing
|
@LastEditTime : -
|
@LastEditors : -
|
@Description : 根据RequestInfo生成测试用例
|
"""
|
import logging
|
import json
|
import re
|
from collections import defaultdict
|
from enum import IntEnum
|
from typing import Any, Dict, List, Tuple, Union, Set
|
|
from urllib.parse import urlparse, parse_qs
|
|
from apps.exceptions.convert import GenerateError
|
from apps.schema.request import RequestInfo
|
from apps.schema.api_schema import APIBody, APISchema
|
from apps.schema.testcase_schema import RecordCaseSchema, APIBodySchema
|
from lunarlink.models import API
|
from lunarlink.utils.enums.RequestBodyEnum import BodyType
|
from lunarlink.utils.parser import Format
|
|
|
logger = logging.getLogger(__name__)
|
|
|
class CaseTag(IntEnum):
|
INTEGRATION_CASE = 2
|
|
|
# 忽略的字段
|
ignored_headers = {
|
"content-type",
|
"connection",
|
"date",
|
"content-length",
|
"host",
|
"access-control-allow-credentials",
|
"access-control-allow-origin",
|
"user-agent",
|
"server",
|
}
|
|
|
class CaseGenerator:
|
"""
|
根据RequestInfo数组生成测试用例
|
"""
|
|
@staticmethod
|
def ignore(key: str):
|
"""
|
检查指定的HTTP头部字段名称是否在预定义的忽略列表中。
|
|
:param key: 忽略字段 headers.Origin
|
:return: 如果字段应被忽略,则返回True;否则返回False。
|
"""
|
for ig in ignored_headers:
|
if key.lower().endswith(ig):
|
return True
|
return False
|
|
@staticmethod
|
def get_body_type(headers: Dict):
|
headers = {k.lower(): v for k, v in headers.items()}
|
content_type = headers.get("content-type", "").lower()
|
if "json" in content_type:
|
return BodyType.json
|
if "x-www-form" in content_type:
|
return BodyType.x_form
|
if "form" in content_type:
|
return BodyType.form
|
return BodyType.none
|
|
@staticmethod
|
def extract_variables(input_string: str):
|
"""
|
使用正则表达式提取 ${http_res_INDEX_variable::path} 格式的变量、索引和路径
|
|
:param input_string:
|
:return: 返回http_res_INDEX, http_res_INDEX_variable, path
|
"""
|
if isinstance(input_string, str):
|
matches = re.findall(r"\$http_res_(\d+)_(.*?)::(.+)", input_string)
|
else:
|
matches = []
|
|
if matches:
|
idx, var_name, path = matches[0]
|
full_variable = "http_res_" + idx + "_" + var_name
|
return "http_res_" + idx, full_variable, path
|
return None
|
|
@staticmethod
|
def generate_case(
|
length: int,
|
project_id: int,
|
case_dir: int,
|
api_dir: int,
|
config: Dict,
|
case_name: str,
|
requests: List[RequestInfo],
|
user: int,
|
) -> Tuple[RecordCaseSchema, List]:
|
"""
|
根据录制接口生成测试用例
|
|
:param api_dir: 接口目录
|
:param length:
|
:param project_id:
|
:param case_dir: 用例目录
|
:param config:
|
:param case_name:
|
:param requests:
|
:param user: 当前用户
|
:return:
|
"""
|
record_case_body = [config]
|
api_info_map = {}
|
|
# 先将 requests 转成 faster 格式 api
|
for r in range(len(requests)):
|
api_info = APIBodySchema() # 初始化 faster 格式的 api 模板
|
api_name = f"http_res_{r + 1}"
|
api_info.name = api_name
|
api_info.method = requests[r].request_method
|
# 请求头
|
headers = requests[r].request_headers
|
CaseGenerator.merge_headers(headers, api_info)
|
# 请求参数为: form、json
|
requests_body = requests[r].body
|
if requests_body:
|
CaseGenerator.merge_body(headers, api_info, requests_body)
|
# 请求参数为: params
|
CaseGenerator.merge_params(api_info, requests[r].url)
|
|
api_info_map.update({api_name: api_info.dict(by_alias=True)}) # 保持别名
|
|
# 遍历所有接口,将 htt_res_1_token 的值 content.token 加入 extract
|
for api_name, request_data in api_info_map.items():
|
for headers_key, headers_value in request_data["header"]["header"].items():
|
CaseGenerator.append_extract(
|
request_data, api_info_map, headers_key, headers_value, "headers"
|
)
|
|
for k, v in request_data["request"]["form"]["data"].items():
|
CaseGenerator.append_extract(request_data, api_info_map, k, v, "form")
|
|
for k, v in request_data["request"]["json"].items():
|
CaseGenerator.append_extract(request_data, api_info_map, k, v, "json")
|
|
for k, v in request_data["request"]["params"]["params"].items():
|
CaseGenerator.append_extract(request_data, api_info_map, k, v, "params")
|
|
api_instances = []
|
for _, v in api_info_map.items():
|
api = Format(v)
|
api.parse()
|
if api_dir:
|
api_body = APIBody(**api.testcase)
|
api_schema = APISchema(
|
name=api.name,
|
body=api_body,
|
url=api.url,
|
method=api.method,
|
project_id=project_id,
|
relation=api_dir,
|
creator_id=user,
|
).dict(by_alias=True)
|
api_instances.append(API(**api_schema))
|
record_case_body.append({"body": api.testcase})
|
|
return (
|
RecordCaseSchema(
|
length=length,
|
project_id=project_id,
|
relation=case_dir,
|
name=case_name,
|
tag=CaseTag.INTEGRATION_CASE.value,
|
body=record_case_body,
|
),
|
api_instances,
|
)
|
|
@staticmethod
|
def merge_headers(headers: Dict, api_info: APIBodySchema):
|
"""
|
将录制接口中的headers格式化后加入fastapi的headers中
|
|
:param headers: 录制接口中的headers
|
:param api_info: fastapi接口数据
|
:return:
|
"""
|
api_info.header.header.update(headers)
|
api_info.header.desc.update({key: "" for key in headers.keys()})
|
|
@staticmethod
|
def merge_body(headers: Dict, api_info: APIBodySchema, body: str):
|
"""
|
将录制接口中的body格式化后加入fastapi的body中
|
|
:param headers: 录制接口中的headers
|
:param api_info: fastapi接口数据
|
:param body: 接口请求body
|
:return:
|
"""
|
try:
|
body = json.loads(body)
|
except json.JSONDecodeError:
|
pass
|
except Exception as e:
|
logger.error(f"转换body变量失败: {e}")
|
else:
|
body_type = CaseGenerator.get_body_type(headers)
|
if body_type == BodyType.x_form:
|
api_info.request.form.data.update(body)
|
api_info.request.form.desc.update({key: "" for key in body.keys()})
|
elif body_type == BodyType.json:
|
api_info.request.json_data.update(body)
|
|
@staticmethod
|
def merge_params(api_info: APIBodySchema, url: str):
|
"""
|
将录制接口中的 query 参数提取后加入fastapi的params中
|
|
:param api_info: fastapi接口数据
|
:param url: 接口请求url
|
:return:
|
"""
|
parsed_url = urlparse(url)
|
api_info.url = parsed_url.path
|
query_params = parse_qs(parsed_url.query)
|
params = {k: v[0] for k, v in query_params.items()}
|
api_info.request.params.params.update(params)
|
api_info.request.params.desc.update({key: "" for key in params.keys()})
|
|
@staticmethod
|
def append_extract(
|
request_data: Dict, api_info_map: Dict, item_key, item_value, request_type: str
|
):
|
"""
|
提取引用变量,将接口中引用此变量的请求数据替换为变量值
|
|
:param request_data: 接口请求数据, 保存格式为{"header": {...}, "request": {...}}
|
:param api_info_map: 全部接口的请求信息和接口名的映射,保存格式为{"api_name": {"header": {...}, "request": {...}}
|
:param item_key: 键名
|
:param item_value: 键值
|
:param request_type: 消息请求类型(headers, json, form, params)
|
:return:
|
"""
|
variables = CaseGenerator.extract_variables(item_value)
|
if variables:
|
api_name = variables[0]
|
extract_var_name = variables[1]
|
extract_var_path = variables[2]
|
|
if request_type == "headers":
|
CaseGenerator.replace_faster_headers(request_data, item_key, variables)
|
elif request_type == "form":
|
CaseGenerator.replace_faster_form(request_data, item_key, variables)
|
elif request_type == "json":
|
CaseGenerator.replace_faster_json(request_data, item_key, variables)
|
elif request_type == "params":
|
CaseGenerator.replace_faster_params(request_data, item_key, variables)
|
if api_name in api_info_map:
|
if {extract_var_name: extract_var_path} not in api_info_map[api_name][
|
"extract"
|
]["extract"]:
|
api_info_map[api_name]["extract"]["extract"].append(
|
{extract_var_name: extract_var_path}
|
)
|
api_info_map[api_name]["extract"]["desc"].update(
|
{extract_var_name: ""}
|
)
|
|
@staticmethod
|
def replace_faster_form(request_data: Dict, var_name: str, variables: Tuple):
|
"""
|
替换fastapi格式请求form参数
|
|
:param request_data:
|
:param var_name:
|
:param variables:
|
:return:
|
"""
|
request_data["request"]["form"]["data"].update({var_name: f"${variables[1]}"})
|
|
@staticmethod
|
def replace_faster_json(request_data: Dict, var_name: str, variables: Tuple):
|
"""
|
替换fastapi格式请求json参数
|
|
:param request_data:
|
:param var_name:
|
:param variables:
|
:return:
|
"""
|
request_data["request"]["json"].update({var_name: f"${variables[1]}"})
|
|
@staticmethod
|
def replace_faster_params(request_data: Dict, var_name: str, variables: Tuple):
|
"""
|
替换fastapi格式请求params查询参数
|
|
:param request_data:
|
:param var_name:
|
:param variables:
|
:return:
|
"""
|
request_data["request"]["params"]["params"].update(
|
{var_name: f"${variables[1]}"}
|
)
|
|
@staticmethod
|
def replace_faster_headers(request_data: Dict, var_name: str, variables: Tuple):
|
request_data["header"]["header"].update({var_name: f"${variables[1]}"})
|
|
@staticmethod
|
def extract_field(requests: List[RequestInfo]) -> List[str]:
|
"""遍历接口,提取其中的变量并替换
|
|
:param requests: 录制流量接口信息
|
:return:
|
"""
|
value_to_path_map: Dict = defaultdict(list) # 变量值到路径的映射 {变量值: [变量路径, ...], ...}
|
replaced = [] # 替换记录
|
for i, item in enumerate(requests):
|
if "Content-Length" in item.request_headers:
|
item.request_headers.pop("Content-Length")
|
if "Content-Type" in item.response_headers:
|
item.response_headers.pop("Content-Type")
|
# 记录变量
|
CaseGenerator.record_vars(
|
request=item,
|
value_to_path_map=value_to_path_map,
|
var_name=f"http_res_{i + 1}",
|
)
|
if i > 0:
|
# 接口变量替换
|
CaseGenerator.replace_vars(item, value_to_path_map, replaced)
|
return replaced
|
|
@staticmethod
|
def replace_vars(request: RequestInfo, value_to_path_map: Dict, replaced: List):
|
"""
|
替换变量
|
|
:param request: 录制流量接口
|
:param value_to_path_map: 记录变量值, {变量值: [变量路径, ...], ...}
|
:param replaced: 替换记录
|
:return:
|
"""
|
# 提取响应内容和响应头部中的所有值
|
response_values = CaseGenerator.extract_response_values(request)
|
|
CaseGenerator.replace_url(request, value_to_path_map, replaced, response_values)
|
CaseGenerator.replace_headers(
|
request,
|
value_to_path_map,
|
replaced,
|
response_values,
|
)
|
CaseGenerator.replace_body(
|
request,
|
value_to_path_map,
|
replaced,
|
response_values,
|
)
|
|
@staticmethod
|
def extract_response_values(request: RequestInfo) -> Set:
|
"""
|
提取响应内容和响应头部中的所有值。
|
|
:param request: 录制流量接口
|
:return: 响应值集合
|
"""
|
response_values = set()
|
|
# 假设响应内容是 JSON 格式的字符串,提取所有值
|
if request.response_content:
|
try:
|
response_data = json.loads(request.response_content)
|
response_values.update(
|
CaseGenerator.get_values_from_json(response_data)
|
)
|
except json.JSONDecodeError:
|
pass
|
|
# 添加响应头部中的所有值,除了忽略的头部键
|
for key, value in request.response_headers.items():
|
if key.lower() not in ignored_headers:
|
response_values.add(value)
|
|
return response_values
|
|
@staticmethod
|
def get_values_from_json(data: Union[Dict, List]) -> Set:
|
"""
|
从 JSON 数据中递归提取所有值。
|
|
:param data: JSON 数据,可以是字典或列表
|
:return: 包含所有值的集合
|
"""
|
values = set()
|
if isinstance(data, dict):
|
for value in data.values():
|
values.update(CaseGenerator.get_values_from_json(value))
|
elif isinstance(data, list):
|
for item in data:
|
values.update(CaseGenerator.get_values_from_json(item))
|
else:
|
values.add(data)
|
return values
|
|
@staticmethod
|
def parse_url(url: str):
|
"""
|
解析完整的URL,提取协议、域名路径和查询参数。
|
|
:param url: 要解析的完整URL字符串。
|
:return: 一个包含协议、域名路径和查询参数列表的元组。
|
"""
|
url_parts = url.split("?")
|
base_url = url_parts[0]
|
query_params = [param for param in url_parts[1].split("&") if param] if len(url_parts) > 1 else []
|
protocol, path_with_domain = base_url.split("//")
|
return protocol, path_with_domain, query_params
|
|
@staticmethod
|
def replace_path_segments(
|
path_with_domain: str,
|
value_to_path_map: Dict,
|
replaced: List,
|
response_values: Set,
|
):
|
"""
|
替换URL路径中的各个段落。
|
|
:param replaced:
|
:param path_with_domain: 域名和路径组合的字符串。
|
:param value_to_path_map: 一个映射字典,用于替换路径中的变量。
|
:param replaced: 记录替换详情的列表
|
:param response_values: request_headers和request_content响应值集合
|
:return: 替换后的路径段落列表。
|
"""
|
new_url = []
|
domain_and_path_list = path_with_domain.split("/")
|
for segment in domain_and_path_list:
|
# 检查当前字段是否是自身响应中的值,如果是则跳过
|
if segment.lower() in value_to_path_map and segment not in response_values:
|
new_segment = value_to_path_map[segment.lower()][0]
|
new_url.append(new_segment)
|
replaced.append(f"{segment} => ${new_segment}")
|
else:
|
new_url.append(segment)
|
return new_url
|
|
@staticmethod
|
def replace_query_parameters(
|
query_params: List[str],
|
value_to_path_map: Dict,
|
replaced: List,
|
response_values: Set,
|
):
|
"""
|
替换查询参数中的值
|
|
:param query_params: 查询参数列表。
|
:param value_to_path_map: 一个映射字典,用于替换查询参数的值。
|
:param replaced: 替换后的路径段落列表。
|
:return: 替换后的查询参数字符串列表。
|
:param response_values: request_headers和request_content响应值集合
|
"""
|
new_query = []
|
for param in query_params:
|
param_name, param_value = param.split("=")
|
# 检查当前查询参数值是否是自身响应中的值,如果是则跳过
|
if (
|
param_value.lower() in value_to_path_map
|
and param_value not in response_values
|
):
|
new_param_value = value_to_path_map[param_value.lower()][0]
|
new_query.append(f"{param_name}=${new_param_value}")
|
replaced.append(f"{param_value} => ${new_param_value}")
|
else:
|
new_query.append(param)
|
return new_query
|
|
@staticmethod
|
def reconstruct_url(protocol: str, new_url: List[str], new_query: List[str]):
|
"""
|
重构URL,将协议、替换后的路径和查询参数组合成完整的URL。
|
|
:param protocol: URL的协议部分,如 http 或 https。
|
:param new_url: 经过替换的路径段落列表。
|
:param new_query: 经过替换的查询参数字符串列表。
|
:return: 重构后的完整URL字符串。
|
"""
|
return f"{protocol}//{'/'.join(new_url)}{'?' + '&'.join(new_query) if new_query else ''}"
|
|
@staticmethod
|
def replace_url(
|
request: RequestInfo,
|
value_to_path_map: Dict,
|
replaced: List,
|
response_values: Set,
|
):
|
"""
|
替换请求对象中的URL路径和查询参数。
|
|
:param request: 包含原始URL的请求对象。
|
:param value_to_path_map: 一个映射字典,用于替换URL的路径和查询参数中的值。
|
:param replaced: 记录替换详情的列表。
|
:param response_values: request_headers和request_content响应值集合
|
:return: None。函数直接修改传入的请求对象中的URL属性。
|
"""
|
protocol, path_with_domain, query_params = CaseGenerator.parse_url(request.url)
|
|
new_url = CaseGenerator.replace_path_segments(
|
path_with_domain,
|
value_to_path_map,
|
replaced,
|
response_values,
|
)
|
new_query = CaseGenerator.replace_query_parameters(
|
query_params,
|
value_to_path_map,
|
replaced,
|
response_values,
|
)
|
|
request.url = CaseGenerator.reconstruct_url(protocol, new_url, new_query)
|
|
@staticmethod
|
def replace_headers(
|
request: RequestInfo,
|
value_to_path_map: Dict,
|
replaced: List,
|
response_values: Set,
|
):
|
"""
|
替换请求接口中的request_headers。
|
|
:param request: 包含原始request_headers的请求对象。
|
:param value_to_path_map: 一个映射字典,用于替换request_headers的值。
|
:param replaced: 记录替换详情的列表。
|
:param response_values: 从响应内容和头部中提取的所有值的集合。
|
:return: None。函数直接修改传入的请求对象中的request_headers属性。
|
"""
|
for header_key, header_value in list(request.request_headers.items()):
|
if header_value not in response_values:
|
replacement = value_to_path_map.get(header_value)
|
if replacement:
|
new_header_value = replacement[0]
|
request.request_headers[header_key] = f"${new_header_value}"
|
replaced.append(f"{header_key} => ${new_header_value}")
|
|
@staticmethod
|
def replace_body(
|
request: RequestInfo,
|
value_to_path_map: Dict,
|
replaced: List,
|
response_values: Set,
|
):
|
"""
|
替换请求接口中的body。
|
|
:param request: 包含原始body的请求对象。
|
:param value_to_path_map: 一个映射字典,用于替换body的值。,{变量值: [变量路径, ...],...}
|
:param replaced: 记录替换详情的列表。
|
:param response_values: 从响应内容和头部中提取的所有值的集合。
|
:return: None。函数直接修改传入的请求对象中的body属性。
|
"""
|
if request.body:
|
try:
|
request_body = json.loads(request.body)
|
replaced_non_str_variables = [] # 非字符串变量替换路径
|
CaseGenerator.dfs_replace(
|
request_body,
|
value_to_path_map,
|
replaced_non_str_variables,
|
replaced,
|
response_values,
|
)
|
result = json.dumps(request_body, ensure_ascii=False)
|
for v in replaced_non_str_variables:
|
result = result.replace(f"'{v}'", f"{v}")
|
request.body = result
|
except json.JSONDecodeError:
|
pass
|
except Exception as e:
|
logger.error(f"转换body变量失败: {e}")
|
|
@staticmethod
|
def _dfs_replace_dict(
|
request_dict: Dict,
|
value_to_path_map: Dict,
|
replaced_non_str_variables: List,
|
replaced: List,
|
response_values: Set,
|
) -> None:
|
"""
|
递归地替换字典中的变量。
|
|
:param request_dict: 要处理的请求体字典。
|
:param value_to_path_map: 存储变量替换信息的字典。
|
:param replaced_non_str_variables: 存储被替换的非字符串变量。
|
:param replaced: 存储所有替换操作的列表。
|
:param response_values: 从响应内容和头部中提取的所有值的集。
|
:return:
|
"""
|
for key, value in request_dict.items():
|
is_str, new_value = CaseGenerator.dfs_replace(
|
value,
|
value_to_path_map,
|
replaced_non_str_variables,
|
replaced,
|
response_values,
|
)
|
# 如果得到的替换值不是None,并且这个值不在响应值中,进行替换
|
if new_value is not None and new_value not in response_values:
|
request_dict[key] = f"${new_value}"
|
if not is_str:
|
replaced_non_str_variables.append(f"${new_value}")
|
|
@staticmethod
|
def _dfs_replace_list(
|
request_list: List,
|
value_to_path_map: Dict,
|
replaced_non_str_variables: List,
|
replaced: List,
|
response_values: Set,
|
) -> None:
|
"""
|
递归地替换列表中的变量。
|
|
:param request_list: 要处理的请求体列表。
|
:param value_to_path_map: 存储变量替换信息的字典。
|
:param replaced_non_str_variables: 存储被替换的非字符串变量。
|
:param replaced: 存储所有替换操作的列表。
|
:param response_values: 响应体中的值,这些值不应被替换。
|
:return:
|
"""
|
for i, item in enumerate(request_list):
|
is_str, new_value = CaseGenerator.dfs_replace(
|
item,
|
value_to_path_map,
|
replaced_non_str_variables,
|
replaced,
|
response_values,
|
)
|
if new_value is not None and new_value not in response_values:
|
request_list[i] = f"${new_value}"
|
if not is_str:
|
replaced_non_str_variables.append(f"${new_value}")
|
|
@staticmethod
|
def _dfs_replace_value(value, value_to_path_map, replaced, response_values):
|
"""
|
替换基本数据类型的值。
|
|
:param value: 要替换的值。
|
:param value_to_path_map: 存储变量替换信息的字典。
|
:param replaced: 存储所有替换操作的列表。
|
:param response_values: 响应体中的值,这些值不应被替换。
|
:return: 替换结果和是否为字符串的标志。
|
"""
|
# 将非字符串类型的值转换为字符串以进行比较
|
value_str = str(value) if not isinstance(value, str) else value
|
|
# 如果这个值在映射中并且不在响应值中,使用映射中的新值进行替换
|
if value_str in value_to_path_map and value_str not in response_values:
|
new_value = value_to_path_map[value_str][0]
|
replaced.append(f"{value_str} => ${new_value}")
|
# 返回替换状态和新值,非字符串类型的变量也记录在replaced_non_str_variables中
|
return not isinstance(value, str), new_value
|
|
return None, None
|
|
@staticmethod
|
def dfs_replace(
|
request_body: Any,
|
value_to_path_map: Dict,
|
replaced_non_str_variables: List,
|
replaced: List,
|
response_values: Set,
|
):
|
"""
|
对请求体进行深度优先遍历,替换其中的变量。
|
|
:param request_body: 请求体,可能是字典、列表或基本数据类型。
|
:param value_to_path_map: 存储变量替换信息的字典。
|
:param replaced_non_str_variables: 存储被替换的非字符串变量。
|
:param replaced: 存储所有替换操作的列表。
|
:param response_values: 从响应内容和头部中提取的所有值的集。
|
:return:
|
"""
|
|
# 处理字典类型的请求体
|
if isinstance(request_body, dict):
|
CaseGenerator._dfs_replace_dict(
|
request_body,
|
value_to_path_map,
|
replaced_non_str_variables,
|
replaced,
|
response_values,
|
)
|
# 处理列表类型的请求体
|
elif isinstance(request_body, list):
|
CaseGenerator._dfs_replace_list(
|
request_body,
|
value_to_path_map,
|
replaced_non_str_variables,
|
replaced,
|
response_values,
|
)
|
# 处理基本数据类型的请求体
|
else:
|
return CaseGenerator._dfs_replace_value(
|
request_body,
|
value_to_path_map,
|
replaced,
|
response_values,
|
)
|
|
@staticmethod
|
def record_vars(request: RequestInfo, value_to_path_map: Dict, var_name: str):
|
"""
|
记录变量
|
:param request: 录制流量的接口信息
|
:param value_to_path_map: 变量值到路径的映射 {变量值: [变量路径, ...], ...}
|
:param var_name: http_res_{i + 1}
|
:return:
|
"""
|
CaseGenerator.split_headers(
|
request,
|
value_to_path_map,
|
var_name,
|
)
|
CaseGenerator.split_body(
|
request,
|
value_to_path_map,
|
var_name,
|
)
|
|
@staticmethod
|
def split_headers(
|
request: RequestInfo,
|
value_to_path_map: Dict,
|
var_name: str = "",
|
header_path_prefix: str = "headers",
|
):
|
"""
|
分析和记录请求头中的变量及其对应的路径。
|
|
:param request: 包含响应头的请求信息对象。
|
:param value_to_path_map: 一个映射,将变量值映射到其在请求中的路径。
|
:param var_name: 用于变量提取的基本名称。
|
:param header_path_prefix: 用于变量路径的前缀。
|
:return:
|
"""
|
try:
|
CaseGenerator.dfs(
|
request.response_headers,
|
var_name,
|
header_path_prefix,
|
value_to_path_map,
|
headers=True,
|
)
|
except Exception as e:
|
raise GenerateError(f"解析接口headers变量出错:{e}")
|
|
@staticmethod
|
def split_body(
|
request: RequestInfo,
|
value_to_path_map: Dict,
|
var_name: str = "",
|
content_path_prefix: str = "content",
|
):
|
"""
|
分析和记录请求体中的变量及其对应的路径。
|
|
:param request: 包含响应内容的请求信息对象。
|
:param value_to_path_map: 一个映射,将变量值映射到其在请求中的路径。
|
:param var_name: 用于变量提取的基本名称。
|
:param content_path_prefix: 用于变量路径的前缀。
|
:return:
|
"""
|
if request.body:
|
try:
|
body = json.loads(request.response_content)
|
CaseGenerator.dfs(
|
body=body,
|
var_name=var_name,
|
var_path=content_path_prefix,
|
value_to_path_map=value_to_path_map,
|
)
|
except json.JSONDecodeError:
|
# body不是JSON,跳过
|
pass
|
except Exception as e:
|
raise GenerateError(f"解析接口body变量出错:{e}")
|
|
@staticmethod
|
def dfs_dict(
|
body_dict,
|
var_name,
|
var_path,
|
value_to_path_map,
|
headers,
|
):
|
"""
|
递归地遍历响应的头部或内容,提取并记录变量及其路径。
|
|
:param body_dict:
|
:param var_name:
|
:param var_path:
|
:param value_to_path_map:
|
:param headers:
|
:return:
|
"""
|
for key, value in body_dict.items():
|
c_name = f"{var_name}_{key}"
|
c_path = f"{var_path}.{key}"
|
CaseGenerator.dfs(value, c_name, c_path, value_to_path_map, headers)
|
|
@staticmethod
|
def dfs_list(body_list, var_name, var_path, value_to_path_map, headers):
|
"""
|
递归地遍历响应的头部或内容,提取并记录变量及其路径。
|
|
:param body_list:
|
:param var_name:
|
:param var_path:
|
:param value_to_path_map:
|
:param headers:
|
:return:
|
"""
|
for i, item in enumerate(body_list):
|
c_name = f"{var_name}_{i}"
|
c_path = f"{var_path}.{i}"
|
CaseGenerator.dfs(item, c_name, c_path, value_to_path_map, headers)
|
|
@staticmethod
|
def process_basic_type(
|
value,
|
var_name,
|
var_path,
|
value_to_path_map,
|
headers,
|
):
|
"""
|
处理基本数据类型的值。
|
|
:param value:
|
:param var_name:
|
:param var_path:
|
:param value_to_path_map:
|
:param headers:
|
:return:
|
"""
|
if not headers or not CaseGenerator.ignore(var_path):
|
var_name_path = f"{var_name}::{var_path}"
|
# 如果是bool值,需要特殊处理一下,因为Python get False/True会变成get 0 1
|
if value is not None:
|
if isinstance(value, bool):
|
value_to_path_map[str(value)].append(var_name_path)
|
else:
|
value_to_path_map[value].append(var_name_path)
|
|
@staticmethod
|
def dfs(
|
body: Any,
|
var_name: str,
|
var_path: str,
|
value_to_path_map: Dict,
|
headers: bool = False,
|
):
|
"""
|
递归地遍历响应的头部或内容,提取并记录变量及其路径。
|
|
:param body: 可能是响应头或响应内容的部分,可以是字典或列表。
|
:param var_name: 当前变量的名称。
|
:param var_path: 当前变量的提取路径。
|
:param value_to_path_map: 变量到路径的映射。
|
:param headers: 指示当前处理的是否是响应头。
|
:return:
|
"""
|
if isinstance(body, list):
|
CaseGenerator.dfs_list(
|
body,
|
var_name,
|
var_path,
|
value_to_path_map,
|
headers,
|
)
|
elif isinstance(body, dict):
|
CaseGenerator.dfs_dict(
|
body,
|
var_name,
|
var_path,
|
value_to_path_map,
|
headers,
|
)
|
else:
|
CaseGenerator.process_basic_type(
|
body,
|
var_name,
|
var_path,
|
value_to_path_map,
|
headers,
|
)
|