# -*- coding: utf-8 -*-
"""
@File    : parser.py
@Time    : 2023/1/14 15:10
@Author  : geekbing
@LastEditTime : -
@LastEditors : -
@Description : API test case parsing
"""
import datetime
import json
import logging
from ast import literal_eval
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import zip_longest
from typing import Any, Dict, Generator, List, Tuple, Union

import json5
import requests

from lunarlink import models
from lunaruser import models as user_models
from lunarlink.utils.tree import (
    get_all_ycatid,
    get_tree_max_id,
    get_tree_ycatid_mapping,
)

logger = logging.getLogger(__name__)


def _as_timestamp(value: Any) -> int:
    """Coerce a stored timestamp to ``int``; unusable values become 0."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return 0


class Format:
    """
    Parse a front-end payload into a standard HttpRunner script
    (front end -> back end).
    """

    def __init__(self, body: Dict, level: str = "test"):
        """Read the fields to be parsed out of ``body``.

        The keys read below are::

            body => {
                header: {header: {...}, desc: {...}},
                request: {
                    form: {data: {...}, desc: {...}},
                    json: {...},
                    params: {params: {...}, desc: {...}},
                    files: {files: {...}, desc: {...}},
                },
                extract: {extract: [...], desc: {...}},
                validate: {validate: [...]},
                variables: {variables: [...], desc: {...}},
                hooks: {setup_hooks: [...], teardown_hooks: [...]},
                url, method, name, times, nodeId, project,
                # level == "config" only:
                base_url, is_default, parameters: {parameters, desc},
            }

        Several top-level keys are popped, so ``body`` is mutated.

        :param body: payload sent by the front end (or built by
            ``Yapi.yapi2faster``)
        :param level: ``"test"`` for an API/step, ``"config"`` for a
            configuration
        """
        try:
            header = body.get("header", {})
            variables = body.get("variables", {})
            hooks = body.get("hooks", {})

            self.name = body.pop("name", None)
            self.__headers = header.get("header", {})
            self.__variables = variables.get("variables", {})
            self.__setup_hooks = hooks.get("setup_hooks", {})
            self.__teardown_hooks = hooks.get("teardown_hooks", {})

            if level == "test":
                # Request parameters only exist at "test" level; a config
                # carries none.
                request = body.get("request", {})
                extract = body.get("extract", {})

                self.__params = request.get("params", {}).get("params", {})
                self.__data = request.get("form", {}).get("data", {})
                self.__json = request.get("json", {})
                self.__files = request.get("files", {}).get("files", {})

                self.__desc = {
                    "header": header.get("desc", {}),
                    "data": request.get("form", {}).get("desc", {}),
                    "files": request.get("files", {}).get("desc", {}),
                    "params": request.get("params", {}).get("desc", {}),
                    "variables": variables.get("desc", {}),
                }

                self.url = body.pop("url", None)
                self.method = body.pop("method", None)
                self.__times = body.pop("times", None)
                self.__extract = extract.get("extract", {})
                self.__validate = body.pop("validate", {}).get("validate", {})
                self.__desc["extract"] = extract.get("desc", {})
            else:
                self.__params = {}
                self.__data = {}
                self.__json = {}
                self.__files = {}

            if level == "config":
                self.__desc = {
                    "header": header.get("desc", {}),
                    "variables": variables.get("desc", {}),
                }
                # These keys are mandatory for a config; a missing one raises
                # KeyError, which is handled below.
                self.base_url = body.pop("base_url")
                self.is_default = body.pop("is_default")
                self.__parameters = body["parameters"].pop("parameters")
                self.__desc["parameters"] = body["parameters"].pop("desc")

            self.__level = level
            self.testcase = None
            self.project = body.pop("project", None)
            self.relation = body.pop("nodeId", None)

            # lunarlink APIs have no rig_id field; default it for compatibility.
            self.rig_id = body.get("rig_id", 0)
            self.rig_env = body.get("rig_env", 0)
        except KeyError as exc:
            # Best effort is kept (no re-raise), but the failure is no longer
            # silent: attributes assigned after the failing line stay unset.
            logger.warning(
                "Format: required key %s is missing for level %r; "
                "the object is only partially initialised",
                exc,
                level,
            )

    def parse(self):
        """
        Build the normalised HttpRunner structure and store it in
        ``self.testcase``.

        The result still contains the "desc" field, which has to be removed
        before execution.

        :return: None
        """
        # __init__ may have aborted before these were assigned.
        if not hasattr(self, "rig_id"):
            self.rig_id = None
        if not hasattr(self, "rig_env"):
            self.rig_env = 0

        test = {}
        if self.__level == "test":
            test = {
                "name": self.name,
                "rig_id": self.rig_id,
                "times": self.__times,
                "request": {"url": self.url, "method": self.method, "verify": False},
                "desc": self.__desc,
            }
            if self.__extract:
                test["extract"] = self.__extract
            if self.__validate:
                test["validate"] = self.__validate
        elif self.__level == "config":
            test = {
                "name": self.name,
                "request": {
                    "base_url": self.base_url,
                },
                "desc": self.__desc,
            }
            if self.__parameters:
                test["parameters"] = self.__parameters

        if self.__headers:
            test["request"]["headers"] = self.__headers
        if self.__params:
            test["request"]["params"] = self.__params
        if self.__data:
            test["request"]["data"] = self.__data
        if self.__json:
            test["request"]["json"] = self.__json
        # Some interfaces require an empty JSON body to be sent explicitly.
        if self.__json == {}:
            test["request"]["json"] = {}
        if self.__files:
            test["request"]["files"] = self.__files
        if self.__variables:
            test["variables"] = self.__variables
        if self.__setup_hooks:
            test["setup_hooks"] = self.__setup_hooks
        if self.__teardown_hooks:
            test["teardown_hooks"] = self.__teardown_hooks

        self.testcase = test


class Parse:
    """
    Convert a standard HttpRunner script into the front-end format
    (back end -> front end).
    """

    def __init__(self, body: Dict, level: str = "test"):
        """
        body: => {
            "name": "get token with $user_agent, $os_platform, $app_version",
            "request": {
                "url": "/api/get-token",
                "method": "POST",
                "headers": {
                    "app_version": "$app_version",
                    "os_platform": "$os_platform",
                    "user_agent": "$user_agent"
                },
                "json": {
                    "sign": "${get_sign($user_agent, $device_sn, $os_platform, $app_version)}"
                }
            },
            "extract": [
                {"token": "content.token"}
            ],
            "validate": [
                {"eq": ["status_code", 200]},
                {"eq": ["headers.Content-Type", "application/json"]},
                {"eq": ["content.success", true]}
            ],
            "setup_hooks": [],
            "teardown_hooks": []
        }

        :param body: stored HttpRunner-style script
        :param level: ``"test"`` or ``"config"``
        """
        self.name = body.get("name")
        self.__request = body.get("request")
        self.__variables = body.get("variables")
        # An explicit null is treated like a missing hook list.
        self.__setup_hooks = body.get("setup_hooks") or []
        self.__teardown_hooks = body.get("teardown_hooks") or []
        self.__desc = body.get("desc")

        if level == "test":
            # Imported scripts may lack "times"; default to a single run.
            self.__times = body.get("times", 1)
            self.__extract = body.get("extract")
            self.__validate = body.get("validate")
        if level == "config":
            self.__parameters = body.get("parameters")

        self.__level = level
        self.testcase = None

    @staticmethod
    def __get_type(content: Any) -> Tuple:
        """Return ``(type_code, display_value)`` for ``content``.

        Lists and dicts are rendered as JSON, ``None`` is returned unchanged
        and everything else goes through ``str()``. Types missing from the
        table fall back to the string code.
        """
        var_type = {
            "str": 1,
            "int": 2,
            "float": 3,
            "bool": 4,
            "list": 5,
            "dict": 6,
            "NoneType": 7,
        }
        key = type(content).__name__

        # Hack: a string that references a variable via "$int..." is reported
        # as an int so the value keeps its integer type on the front end.
        if key == "str" and "$int" in content:
            return var_type["int"], content
        if key == "NoneType":
            return var_type["NoneType"], content

        if key in ("list", "dict"):
            content = json.dumps(content, ensure_ascii=False)
        else:
            content = str(content)
        # Unknown types (e.g. tuple) fall back to "string" instead of raising.
        return var_type.get(key, var_type["str"]), content

    def __desc_for(self, section: str, key: Any) -> Any:
        """Return the description stored for ``key`` in ``section``, or ''."""
        all_desc = self.__desc if isinstance(self.__desc, dict) else {}
        section_desc = all_desc.get(section)
        if isinstance(section_desc, dict):
            return section_desc.get(key, "")
        return ""

    def parse_http(self):
        """Build the front-end representation and store it in ``self.testcase``.

        Note that "headers", "data", "params" and "json" are popped from the
        request dict taken from ``body``.
        """

        # Each placeholder is a fresh object so that no two fields of the
        # result share (and accidentally co-mutate) the same list.
        def blank_rows():
            return [{"key": "", "value": "", "desc": ""}]

        def blank_typed_rows():
            return [{"key": "", "value": "", "desc": "", "type": 1}]

        # Initial skeleton of the result.
        test = {
            "name": self.name,
            "header": blank_rows(),
            "request": {
                "data": blank_typed_rows(),
                "params": blank_typed_rows(),
                "json_data": "",
            },
            "variables": blank_typed_rows(),
            "hooks": [
                {
                    "setup": "",
                    "teardown": "",
                }
            ],
        }

        if self.__level == "test":
            test["times"] = self.__times
            test["method"] = self.__request["method"]
            test["url"] = self.__request["url"]
            test["validate"] = [
                {
                    "expect": "",
                    "actual": "",
                    "comparator": "equals",
                    "type": 1,
                }
            ]
            test["extract"] = blank_rows()

            if self.__extract:
                test["extract"] = [
                    {
                        "key": key,
                        "value": value,
                        "desc": self.__desc_for("extract", key),
                    }
                    for content in self.__extract
                    for key, value in content.items()
                ]

            if self.__validate:
                test["validate"] = []
                for content in self.__validate:
                    for key, value in content.items():
                        obj = Parse.__get_type(value[1])
                        # Older assertions have no third (description) item;
                        # a None description is shown as ''.
                        desc = ""
                        if len(value) >= 3:
                            desc = value[2] or ""
                        test["validate"].append(
                            {
                                "expect": obj[1],
                                "actual": value[0],
                                "comparator": key,
                                "type": obj[0],
                                "desc": desc,
                            }
                        )

        elif self.__level == "config":
            test["base_url"] = self.__request["base_url"]
            test["parameters"] = blank_rows()
            if self.__parameters:
                test["parameters"] = [
                    {
                        "key": key,
                        "value": Parse.__get_type(value)[1],
                        "desc": self.__desc_for("parameters", key),
                    }
                    for content in self.__parameters
                    for key, value in content.items()
                ]

        if self.__request.get("headers"):
            test["header"] = [
                {"key": key, "value": value, "desc": self.__desc_for("header", key)}
                for key, value in self.__request.pop("headers").items()
            ]

        if self.__request.get("data"):
            test["request"]["data"] = []
            for key, value in self.__request.pop("data").items():
                obj = Parse.__get_type(value)
                test["request"]["data"].append(
                    {
                        "key": key,
                        "value": obj[1],
                        "type": obj[0],
                        "desc": self.__desc_for("data", key),
                    }
                )

        if self.__request.get("params"):
            test["request"]["params"] = [
                {
                    "key": key,
                    "value": value,
                    "type": 1,
                    "desc": self.__desc_for("params", key),
                }
                for key, value in self.__request.pop("params").items()
            ]

        if self.__request.get("json"):
            test["request"]["json_data"] = json.dumps(
                self.__request.pop("json"),
                indent=4,
                separators=(",", ": "),
                ensure_ascii=False,
            )

        if self.__variables:
            test["variables"] = []
            for content in self.__variables:
                for key, value in content.items():
                    obj = Parse.__get_type(value)
                    test["variables"].append(
                        {
                            "key": key,
                            "value": obj[1],
                            "desc": self.__desc_for("variables", key),
                            "type": obj[0],
                        }
                    )

        if self.__setup_hooks or self.__teardown_hooks:
            # Pair hooks by position; the shorter list is padded with ''.
            test["hooks"] = [
                {"setup": setup, "teardown": teardown}
                for setup, teardown in zip_longest(
                    self.__setup_hooks, self.__teardown_hooks, fillvalue=""
                )
            ]

        self.testcase = test


class Yapi:
    """Client that reads interfaces from a YApi server and converts them
    into this platform's API format."""

    def __init__(
        self,
        yapi_base_url: str,
        token: str,
        faster_project_id: int,
    ):
        """
        :param yapi_base_url: base URL of the YApi server
        :param token: token sent with every YApi request
        :param faster_project_id: id of the platform project the APIs are
            imported into
        """
        self.__yapi_base_url = yapi_base_url
        self.__token = token
        self.faster_project_id = faster_project_id
        self.api_info: List = []
        self.api_ids: List = []
        # self.category_info: List = []
        # Basic API information, without the request payload.
        self.api_list_url = self.__yapi_base_url + "/api/interface/list"
        # API details, including the full request payload.
        self.api_details_url = self.__yapi_base_url + "/api/interface/get"
        # All category menus; also contains the basic API information.
        self.category_info_url = self.__yapi_base_url + "/api/interface/list_menu"

    def get_category_info(self) -> Dict:
        """Fetch the interface menu (category) list.

        :return: the YApi response when its ``errcode`` is 0, otherwise a
            failure dict ``{"errcode": 1, "errmsg": ..., "data": []}``. The
            failure dict is also returned when the request itself fails, so
            callers can always index ``["data"]``.
        """
        failure = {"errcode": 1, "errmsg": "获取yapi的目录失败!", "data": []}
        try:
            res = requests.get(
                self.category_info_url, params={"token": self.__token}
            ).json()
        except Exception as e:
            logger.error(f"获取yapi的目录失败:{e}")
            return failure

        if isinstance(res, dict) and res.get("errcode") == 0:
            return res
        return failure

    def get_api_uptime_mapping(self):
        """Map every YApi interface id to its update time.

        :return: ``{api_id: api_up_time}``
        """
        mapping = {}
        for category_info in self.get_category_info().get("data") or []:
            for api in category_info.get("list") or []:
                mapping[api["_id"]] = api["up_time"]
        return mapping

    def get_category_id_name_mapping(self):
        """Map YApi category ids to category names.

        Categories without any interface are skipped.

        :return: ``{category_id: category_name}``, or ``None`` when the
            category list could not be fetched
        """
        res = self.get_category_info()
        if res.get("errcode") != 0:
            return None

        return {
            category_info.get("_id"): category_info.get("name")
            for category_info in res.get("data") or []
            # Skip empty categories.
            if category_info.get("list")
        }

    def get_api_info_list(self):
        """Fetch the interface list.

        :return: the YApi response when its ``errcode`` is 0, otherwise
            ``None`` (request failure or non-zero ``errcode``)
        """
        try:
            res = requests.get(
                self.api_list_url,
                params={
                    "token": self.__token,
                    "page": 1,
                    "limit": 100000,
                },
            ).json()
        except Exception as e:
            logger.error(f"获取api list失败: {e}")
            return None

        if isinstance(res, dict) and res.get("errcode") == 0:
            return res
        return None

    def get_api_ids(self) -> List:
        """
        Return the ids of all interfaces in the YApi project.

        :return: list of interface ids; empty when the list request failed
        """
        api_list = self.get_api_info_list()
        if not api_list:
            return []
        # The rest of this module reads the identifier as "_id"; "id" is kept
        # as a fallback for payloads that use that key.
        return [
            api["_id"] if "_id" in api else api["id"]
            for api in api_list["data"]["list"]
        ]

    def get_batch_api_detail(self, api_ids: List[int]) -> Generator[dict, None, None]:
        """
        Fetch the details of the given interfaces with a thread pool.

        Interfaces whose request fails are logged and skipped.

        :param api_ids: interface ids to fetch
        :return: generator yielding the "data" part of each successful
            response, in completion order
        """
        token = self.__token

        def fetch_api_detail(session, api_id):
            try:
                response = session.get(
                    self.api_details_url, params={"token": token, "id": api_id}
                )
                # Raises HTTPError for 4xx/5xx responses.
                response.raise_for_status()
                return response.json()["data"]
            except requests.HTTPError as http_err:
                logger.error(f"HTTP error occurred: {http_err}")
            except Exception as e:
                logger.error(f"Error occurred: {e}")
            return None

        # One shared session for all requests; the context manager closes it
        # once the generator is exhausted or closed.
        with requests.Session() as session:
            with ThreadPoolExecutor(max_workers=10) as executor:
                futures = {
                    executor.submit(fetch_api_detail, session, api_id)
                    for api_id in api_ids
                }
                for future in as_completed(futures):
                    api_detail = future.result()
                    if api_detail is not None:
                        yield api_detail

    @staticmethod
    def get_variable_default_value(
        variable_type: str, variable_value: Union[Dict, Any]
    ):
        """
        Return the default value for a variable of the given schema type.

        :param variable_type: schema type name (case-insensitive)
        :param variable_value: schema node of the field; non-dict values
            yield ``""``
        :return: the node's "default" (or 0) for numeric types, the current
            local time formatted as ``%Y-%m-%d %H:%M:%S`` for "date",
            otherwise ``""``
        """
        if not isinstance(variable_value, dict):
            return ""
        # A schema "type" may be something other than a string (e.g. a list
        # of types); there is no sensible default for that.
        if not isinstance(variable_type, str):
            return ""

        variable_type = variable_type.lower()
        if variable_type in ("integer", "number", "bigdecimal"):
            return variable_value.get("default", 0)
        if variable_type == "date":
            return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return ""

    def create_relation_id(self, project_id):
        """Add a tree node for every YApi category missing from the project tree.

        :param project_id: id of the project whose API tree (type=1) is updated
        :return: None
        """
        category_id_name_mapping: Dict = self.get_category_id_name_mapping()
        if category_id_name_mapping is None:
            # The category list could not be fetched; nothing to add.
            return

        obj = models.Relation.objects.get(project_id=project_id, type=1)
        eval_tree: List = literal_eval(obj.tree)
        yapi_catids: List = list(get_all_ycatid(eval_tree, []))

        for cat_id, cat_name in category_id_name_mapping.items():
            if cat_id in yapi_catids:
                continue
            tree_id = get_tree_max_id(tree=eval_tree)
            eval_tree.append(
                {
                    "id": tree_id + 1,
                    "yapi_catid": cat_id,
                    "label": cat_name,
                    "children": [],
                }
            )

        obj.tree = json.dumps(eval_tree, ensure_ascii=False)
        obj.save()

    @staticmethod
    def _load_req_body(req_body_other: Any, api_id: Any) -> Any:
        """Parse a YApi ``req_body_other`` string.

        Strict JSON is tried first, then JSON5 (which accepts comments).

        :return: the parsed value, or ``None`` when both parsers fail
        """
        message = f"yapi: {api_id}, req_body json loads failed: {req_body_other}"
        try:
            return json.loads(req_body_other)
        except json.JSONDecodeError:
            # Not strict JSON; fall through to the JSON5 parser.
            pass
        except Exception:
            logger.error(message)
            return None

        try:
            return json5.loads(req_body_other, encoding="utf8")
        except Exception:
            logger.error(message)
            return None

    def _collect_json_variables(self, api_info_template, req_body, api_id):
        """Turn the top-level schema properties of ``req_body`` into variables.

        Only one level below an "object" property is visited.
        TODO: walk all "properties" nodes recursively.
        """
        if not isinstance(req_body, dict):
            return
        req_body_properties = req_body.get("properties")
        if not isinstance(req_body_properties, dict):
            return

        for field_name, field_value in req_body_properties.items():
            if not isinstance(field_value, dict):
                continue

            # For "anyOf", the first alternative describes the field.
            any_of = field_value.get("anyOf")
            if isinstance(any_of, list) and any_of and isinstance(any_of[0], dict):
                field_value = any_of[0]

            field_type = field_value.get("type", "unKnow")
            if field_type == "unKnow":
                logger.error(f"yapi: {api_id}, req_body json type is unKnow")

            if field_type == "array":
                items = field_value.get("items")
                if not isinstance(items, dict):
                    items = {}
                if field_name == "conditions":
                    # Special field: the generic query conditions.
                    set_customized_variable(api_info_template, items)
                elif items.get("type") not in ("array", "object"):
                    self.set_ordinary_variable(
                        api_info_template=api_info_template,
                        field_name=field_name,
                        field_type=field_type,
                        field_value=field_value,
                    )
            elif field_type == "object":
                properties = field_value.get("properties")
                if not isinstance(properties, dict):
                    continue
                for property_name, property_value in properties.items():
                    if not isinstance(property_value, dict):
                        continue
                    property_type = property_value.get("type", "unKnow")
                    if property_type not in ("array", "object"):
                        self.set_ordinary_variable(
                            api_info_template=api_info_template,
                            field_name=property_name,
                            field_type=property_type,
                            field_value=property_value,
                        )
            else:
                self.set_ordinary_variable(
                    api_info_template=api_info_template,
                    field_name=field_name,
                    field_type=field_type,
                    field_value=field_value,
                )

    def yapi2faster(self, source_api_info):
        """Convert a single YApi interface into the platform's API format.

        :param source_api_info: interface detail dict returned by YApi; must
            contain "_id" and "catid"
        :return: dict in the shape consumed by ``Format``, plus the
            ``yapi_*`` bookkeeping fields
        """
        logger.info(f"正在处理yapi的接口id是{source_api_info.get('_id')}")
        api_info_template = {
            "header": {
                "header": {},
                "desc": {},
            },
            "request": {
                "form": {
                    "data": {},
                    "desc": {},
                },
                "json": {},
                "params": {
                    "params": {},
                    "desc": {},
                },
                "files": {
                    "files": {},
                    "desc": {},
                },
            },
            "extract": {
                "extract": [],
                "desc": {},
            },
            "validate": {
                "validate": [],
            },
            "variables": {
                "variables": [],
                "desc": {},
            },
            "hooks": {
                "setup_hooks": [],
                "teardown_hooks": [],
            },
            "url": "",
            "method": "",
            "name": "",
            "times": 1,
            "nodeId": 0,
            "project": self.faster_project_id,
        }
        default_validator = {"equals": ["status_code", 200]}
        api_info_template["validate"]["validate"].append(default_validator)

        # Cap the name length to avoid overflowing the name column.
        api_info_template["name"] = source_api_info.get("title", "默认api名称")[:100]

        # Rewrite "{var}" path placeholders as "$var".
        api_info_template["url"] = (
            source_api_info.get("path", "").replace("{", "$").replace("}", "")
        )
        api_info_template["method"] = source_api_info.get("method", "GET")

        # YApi category id and interface id.
        api_info_template["yapi_catid"] = source_api_info["catid"]
        api_info_template["yapi_id"] = source_api_info["_id"]
        # Ten-digit timestamps.
        api_info_template["yapi_add_time"] = source_api_info.get("add_time", "")
        api_info_template["yapi_up_time"] = source_api_info.get("up_time", "")
        # Original author in YApi.
        api_info_template["yapi_username"] = source_api_info.get("username", "")

        req_body_type = source_api_info.get("req_body_type")
        req_body_other = source_api_info.get("req_body_other", "")
        if req_body_type == "json" and req_body_other != "":
            # Bodies that only parse as JSON5 (e.g. with comments) are
            # processed as well, not just strictly valid JSON.
            req_body = self._load_req_body(req_body_other, source_api_info["_id"])
            self._collect_json_variables(
                api_info_template, req_body, source_api_info["_id"]
            )

        variables = api_info_template["variables"]

        req_query: List = source_api_info.get("req_query", [])
        if req_query:
            params = api_info_template["request"]["params"]
            for param in req_query:
                param_name = param["name"]
                param_desc = param.get("desc", "")
                params["params"][param_name] = f"${param_name}"
                params["desc"][param_name] = param_desc
                variables["variables"].append({param_name: param.get("example", "")})
                variables["desc"][param_name] = param_desc

        req_body_form: List = source_api_info.get("req_body_form", [])
        if req_body_form:
            form = api_info_template["request"]["form"]
            for data in req_body_form:
                form_name = data.get("name")
                form_desc = data.get("desc", "")
                form["data"][form_name] = f"${form_name}"
                form["desc"][form_name] = form_desc
                variables["variables"].append({form_name: data.get("example", "")})
                variables["desc"][form_name] = form_desc

        req_params: List = source_api_info.get("req_params", [])
        if req_params:
            for param in req_params:
                param_name = param.get("name")
                variables["variables"].append({param_name: param.get("example", "")})
                variables["desc"][param_name] = param.get("desc", "")

        return api_info_template

    def set_ordinary_variable(
        self, api_info_template, field_name, field_type, field_value
    ):
        """Register a scalar JSON body field as ``$field_name`` with a variable.

        :param api_info_template: template dict being filled (mutated)
        :param field_name: name of the JSON field / variable
        :param field_type: schema type, used to choose the default value
        :param field_value: schema node; its "description" becomes the
            variable description
        """
        api_info_template["request"]["json"][field_name] = f"${field_name}"
        api_info_template["variables"]["variables"].append(
            {field_name: self.get_variable_default_value(field_type, field_value)}
        )
        api_info_template["variables"]["desc"][field_name] = field_value.get(
            "description", ""
        )

    def get_parsed_apis(self, api_info) -> List:
        """
        Convert YApi interface details into ``models.API`` instances.

        The instances are only constructed here, not saved.

        :param api_info: iterable of interface detail dicts; non-dict items
            are ignored
        :return: list of ``models.API`` instances
        """
        apis = [self.yapi2faster(api) for api in api_info if isinstance(api, dict)]
        proj = models.Project.objects.get(id=self.faster_project_id)
        obj = models.Relation.objects.get(project_id=self.faster_project_id, type=1)
        yapi_user = user_models.MyUser.objects.filter(name="yapi").first()
        yapi_user_id = yapi_user.id if yapi_user else None
        eval_tree: List = literal_eval(obj.tree)
        tree_ycatid_mapping = get_tree_ycatid_mapping(value=eval_tree)

        api_instances = []
        for api in apis:
            format_api = Format(api)
            format_api.parse()
            yapi_catid: int = api["yapi_catid"]
            api_body = {
                "name": format_api.name,
                "body": format_api.testcase,
                "url": format_api.url,
                "method": format_api.method,
                "project": proj,
                "relation": tree_ycatid_mapping.get(yapi_catid, 0),
                # Taken directly from the original YApi interface.
                "yapi_catid": yapi_catid,
                "yapi_id": api["yapi_id"],
                "yapi_add_time": api["yapi_add_time"],
                "yapi_up_time": api["yapi_up_time"],
                "yapi_username": api["yapi_username"],
                # Defaults to the "yapi" user.
                "creator_id": yapi_user_id,
            }
            api_instances.append(models.API(**api_body))
        return api_instances

    @staticmethod
    def merge_api(
        api_instances: List, apis_imported_from_yapi: List
    ) -> Tuple[List, List]:
        """
        Merge freshly parsed YApi interfaces with the already imported ones.

        Two cases are handled:
            1. the parsed ``yapi_id`` is not on the platform yet -> create;
            2. the ``yapi_id`` exists and the parsed ``yapi_up_time`` is newer
               than the imported one -> update the imported instance.

        Timestamps that cannot be converted to ``int`` are treated as 0.

        :param api_instances: parsed API instances
        :param apis_imported_from_yapi: API instances already imported
        :return: ``(instances_to_update, instances_to_create)``
        """
        # For duplicate yapi_ids the last imported instance wins.
        imported_by_id = {api.yapi_id: api for api in apis_imported_from_yapi}

        new_api_instances = []
        update_api_instances = []
        for api in api_instances:
            if api.yapi_id not in imported_by_id:
                new_api_instances.append(api)
                continue

            imported_api = imported_by_id[api.yapi_id]
            if _as_timestamp(api.yapi_up_time) > _as_timestamp(
                imported_api.yapi_up_time
            ):
                imported_api.method = api.method
                imported_api.name = api.name
                imported_api.url = api.url
                imported_api.body = api.body
                imported_api.yapi_up_time = api.yapi_up_time
                update_api_instances.append(imported_api)

        return update_api_instances, new_api_instances

    def get_create_or_update_apis(self, imported_apis_mapping):
        """
        Return the YApi ids that have to be created and updated.

        create: the ``yapi_id`` is not in ``imported_apis_mapping``
        update: the ``yapi_id`` is in ``imported_apis_mapping`` and YApi's
            ``up_time`` is newer than the imported one

        Timestamps that cannot be converted to ``int`` are treated as 0.

        :param imported_apis_mapping: ``{yapi_id: yapi_up_time}`` of the
            interfaces already imported
        :return: ``(create_ids, update_ids)``
        """
        api_uptime_mapping: Dict = self.get_api_uptime_mapping()

        create_ids = []
        update_ids = []
        for yapi_id, yapi_up_time in api_uptime_mapping.items():
            # Membership, not truthiness: an imported API whose stored
            # up_time is empty must be updated, not created a second time.
            if yapi_id not in imported_apis_mapping:
                create_ids.append(yapi_id)
            elif _as_timestamp(yapi_up_time) > _as_timestamp(
                imported_apis_mapping[yapi_id]
            ):
                update_ids.append(yapi_id)

        return create_ids, update_ids


def set_customized_variable(api_info_template, items):
    """Fill the template for the special "conditions" field.

    Does nothing unless ``items`` describes an object.

    :param api_info_template: template dict being filled (mutated)
    :param items: "items" schema node of the "conditions" array
    """
    if items.get("type") != "object":
        return

    properties: dict = items.get("properties") or {}
    attr_name: dict = properties.get("attributeName", {})
    # A missing or empty enum falls back to a single empty name.
    attribute_name_enum: list = attr_name.get("enum") or [""]
    target_value: list = [f"${value}" for value in attribute_name_enum]

    variables = api_info_template["variables"]

    # Default template of the query-condition field.
    api_info_template["request"]["json"]["conditions"] = {
        "attributeName": f"${attribute_name_enum[0]}",
        "rangeType": "$rangeType",
        "targetValue": target_value,
    }
    for attr in attribute_name_enum:
        variables["variables"].append({attr: ""})
        variables["desc"][attr] = attr_name.get("description", "")

    # Comparison type of the query condition; same empty-enum fallback as
    # attributeName so that indexing [0] cannot fail.
    range_type: dict = properties.get("rangeType", {})
    range_type_enum: list = range_type.get("enum") or [""]
    variables["variables"].append({"rangeType": range_type_enum[0]})
    variables["desc"][
        "rangeType"
    ] = f"条件匹配方式:{','.join(str(item) for item in range_type_enum)}"

    # Default ordering.
    api_info_template["request"]["json"]["orderBy"] = [
        {
            "attributeName": f"${attribute_name_enum[0]}",
            "rankType": "DESC",
        }
    ]


def format_json(value):
    """
    Format a JSON-compatible object as a readable, indented string.

    :param value: object to format
    :return: the formatted string, or ``value`` unchanged when it cannot be
        serialised (unsupported type, circular reference, overflow)
    """
    try:
        return json.dumps(value, indent=4, separators=(",", ": "), ensure_ascii=False)
    except (TypeError, ValueError, OverflowError):
        # ValueError covers circular references.
        return value