import collections import csv import importlib import io import json import os import sys import logging import yaml from httprunner import builtin, exceptions, parser, utils, validator from httprunner.compat import OrderedDict logger = logging.getLogger(__name__) ############################################################################### # file loader ############################################################################### def _check_format(file_path, content): """check testcase format if valid""" # TODO: replace with JSON schema validation if not content: # testcase file content is empty err_msg = "Testcase file content is empty: {}".format(file_path) logger.error(err_msg) raise exceptions.FileFormatError(err_msg) elif not isinstance(content, (list, dict)): # testcase file content does not match testcase format err_msg = "Testcase file content format invalid: {}".format(file_path) logger.error(err_msg) raise exceptions.FileFormatError(err_msg) def load_yaml_file(yaml_file): """load yaml file and check file content format""" with io.open(yaml_file, "r", encoding="utf-8") as stream: yaml_content = yaml.load(stream) _check_format(yaml_file, yaml_content) return yaml_content def load_json_file(json_file): """load json file and check file content format""" with io.open(json_file, encoding="utf-8") as data_file: try: json_content = json.load(data_file) except exceptions.JSONDecodeError: err_msg = "JSONDecodeError: JSON file format error: {}".format(json_file) logger.error(err_msg) raise exceptions.FileFormatError(err_msg) _check_format(json_file, json_content) return json_content def load_csv_file(csv_file): """load csv file and check file content format @param csv_file: csv file path e.g. csv file content: username,password test1,111111 test2,222222 test3,333333 @return list of parameter, each parameter is in dict format e.g. [ {'username': 'test1', 'password': '111111'}, {'username': 'test2', 'password': '222222'}, {'username': 'test3', 'password': '333333'} ] """ csv_content_list = [] with io.open(csv_file, encoding="utf-8") as csvfile: reader = csv.DictReader(csvfile) for row in reader: csv_content_list.append(row) return csv_content_list def load_file(file_path): if not os.path.isfile(file_path): raise exceptions.FileNotFound("{} does not exist.".format(file_path)) file_suffix = os.path.splitext(file_path)[1].lower() if file_suffix == ".json": return load_json_file(file_path) elif file_suffix in [".yaml", ".yml"]: return load_yaml_file(file_path) elif file_suffix == ".csv": return load_csv_file(file_path) else: # '' or other suffix err_msg = "Unsupported file format: {}".format(file_path) logger.warning(err_msg) return [] def load_folder_files(folder_path, recursive=True): """load folder path, return all files endswith yml/yaml/json in list. Args: folder_path (str): specified folder path to load recursive (bool): load files recursively if True Returns: list: files endswith yml/yaml/json """ if isinstance(folder_path, (list, set)): files = [] for path in set(folder_path): files.extend(load_folder_files(path, recursive)) return files if not os.path.exists(folder_path): return [] file_list = [] for dirpath, dirnames, filenames in os.walk(folder_path): filenames_list = [] for filename in filenames: if not filename.endswith((".yml", ".yaml", ".json")): continue filenames_list.append(filename) for filename in filenames_list: file_path = os.path.join(dirpath, filename) file_list.append(file_path) if not recursive: break return file_list def load_dot_env_file(dot_env_path): """load .env file. Args: dot_env_path (str): .env file path Returns: dict: environment variables mapping { "UserName": "debugtalk", "Password": "123456", "PROJECT_KEY": "ABCDEFGH" } Raises: exceptions.FileFormatError: If .env file format is invalid. """ if not os.path.isfile(dot_env_path): raise exceptions.FileNotFound(".env file path is not exist.") logger.info("Loading environment variables from {}".format(dot_env_path)) env_variables_mapping = {} with io.open(dot_env_path, "r", encoding="utf-8") as fp: for line in fp: # maxsplit=1 if "=" in line: variable, value = line.split("=", 1) elif ":" in line: variable, value = line.split(":", 1) else: raise exceptions.FileFormatError(".env format error") env_variables_mapping[variable.strip()] = value.strip() utils.set_os_environ(env_variables_mapping) return env_variables_mapping def locate_file(start_path, file_name): """locate filename and return file path. searching will be recursive upward until current working directory. Args: start_path (str): start locating path, maybe file path or directory path Returns: str: located file path. None if file not found. Raises: exceptions.FileNotFound: If failed to locate file. """ if os.path.isfile(start_path): start_dir_path = os.path.dirname(start_path) elif os.path.isdir(start_path): start_dir_path = start_path else: raise exceptions.FileNotFound("invalid path: {}".format(start_path)) file_path = os.path.join(start_dir_path, file_name) if os.path.isfile(file_path): return file_path # current working directory if os.path.abspath(start_dir_path) in [os.getcwd(), os.path.abspath(os.sep)]: raise exceptions.FileNotFound( "{} not found in {}".format(file_name, start_path) ) # locate recursive upward return locate_file(os.path.dirname(start_dir_path), file_name) ############################################################################### ## debugtalk.py module loader ############################################################################### def load_python_module(module): """load python module. Args: module: python module Returns: dict: variables and functions mapping for specified python module { "variables": {}, "functions": {} } """ debugtalk_module = {"variables": {}, "functions": {}} for name, item in vars(module).items(): if validator.is_function((name, item)): debugtalk_module["functions"][name] = item elif validator.is_variable((name, item)): if isinstance(item, tuple): continue debugtalk_module["variables"][name] = item else: pass return debugtalk_module def load_builtin_module(): """load built_in module""" built_in_module = load_python_module(builtin) return built_in_module def load_debugtalk_module(): """load project debugtalk.py module debugtalk.py should be located in project working directory. Returns: dict: debugtalk module mapping { "variables": {}, "functions": {} } """ # load debugtalk.py module imported_module = importlib.import_module("debugtalk") debugtalk_module = load_python_module(imported_module) return debugtalk_module def get_module_item(module_mapping, item_type, item_name): """get expected function or variable from module mapping. Args: module_mapping(dict): module mapping with variables and functions. { "variables": {}, "functions": {} } item_type(str): "functions" or "variables" item_name(str): function name or variable name Returns: object: specified variable or function object. Raises: exceptions.FunctionNotFound: If specified function not found in module mapping exceptions.VariableNotFound: If specified variable not found in module mapping """ try: return module_mapping[item_type][item_name] except KeyError: err_msg = "{} not found in debugtalk.py module!\n".format(item_name) err_msg += "module mapping: {}".format(module_mapping) if item_type == "functions": raise exceptions.FunctionNotFound(err_msg) else: raise exceptions.VariableNotFound(err_msg) ############################################################################### ## testcase loader ############################################################################### def _load_teststeps(test_block, project_mapping): """load teststeps with api/testcase references Args: test_block (dict): test block content, maybe in 3 formats. # api reference { "name": "add product to cart", "api": "api_add_cart()", "validate": [] } # testcase reference { "name": "add product to cart", "suite": "create_and_check()", "validate": [] } # define directly { "name": "checkout cart", "request": {}, "validate": [] } Returns: list: loaded teststeps list """ def extend_api_definition(block): ref_call = block["api"] def_block = _get_block_by_name(ref_call, "def-api", project_mapping) _extend_block(block, def_block) teststeps = [] # reference api if "api" in test_block: extend_api_definition(test_block) teststeps.append(test_block) # reference testcase elif "suite" in test_block: # TODO: replace suite with testcase ref_call = test_block["suite"] block = _get_block_by_name(ref_call, "def-testcase", project_mapping) # TODO: bugfix lost block config variables for teststep in block["teststeps"]: if "api" in teststep: extend_api_definition(teststep) teststeps.append(teststep) # define directly else: teststeps.append(test_block) return teststeps def _load_testcase(raw_testcase, project_mapping): """load testcase/testsuite with api/testcase references Args: raw_testcase (list): raw testcase content loaded from JSON/YAML file: [ # config part { "config": { "name": "", "def": "suite_order()", "request": {} } }, # teststeps part { "test": {...} }, { "test": {...} } ] project_mapping (dict): project_mapping Returns: dict: loaded testcase content { "config": {}, "teststeps": [teststep11, teststep12] } """ loaded_testcase = {"config": {}, "teststeps": []} for item in raw_testcase: # TODO: add json schema validation if not isinstance(item, dict) or len(item) != 1: raise exceptions.FileFormatError("Testcase format error: {}".format(item)) key, test_block = item.popitem() if not isinstance(test_block, dict): raise exceptions.FileFormatError("Testcase format error: {}".format(item)) if key == "config": loaded_testcase["config"].update(test_block) elif key == "test": loaded_testcase["teststeps"].extend( _load_teststeps(test_block, project_mapping) ) else: logger.warning( "unexpected block key: {}. block key should only be 'config' or 'test'.".format( key ) ) return loaded_testcase def _get_block_by_name(ref_call, ref_type, project_mapping): """get test content by reference name. Args: ref_call (str): call function. e.g. api_v1_Account_Login_POST($UserName, $Password) ref_type (enum): "def-api" or "def-testcase" project_mapping (dict): project_mapping Returns: dict: api/testcase definition. Raises: exceptions.ParamsError: call args number is not equal to defined args number. """ function_meta = parser.parse_function(ref_call) func_name = function_meta["func_name"] call_args = function_meta["args"] block = _get_test_definition(func_name, ref_type, project_mapping) def_args = block.get("function_meta", {}).get("args", []) if len(call_args) != len(def_args): err_msg = "{}: call args number is not equal to defined args number!\n".format( func_name ) err_msg += "defined args: {}\n".format(def_args) err_msg += "reference args: {}".format(call_args) logger.error(err_msg) raise exceptions.ParamsError(err_msg) args_mapping = {} for index, item in enumerate(def_args): if call_args[index] == item: continue args_mapping[item] = call_args[index] if args_mapping: block = parser.substitute_variables(block, args_mapping) return block def _get_test_definition(name, ref_type, project_mapping): """get expected api or testcase. Args: name (str): api or testcase name ref_type (enum): "def-api" or "def-testcase" project_mapping (dict): project_mapping Returns: dict: expected api/testcase info if found. Raises: exceptions.ApiNotFound: api not found exceptions.TestcaseNotFound: testcase not found """ block = project_mapping.get(ref_type, {}).get(name) if not block: err_msg = "{} not found!".format(name) if ref_type == "def-api": raise exceptions.ApiNotFound(err_msg) else: # ref_type == "def-testcase": raise exceptions.TestcaseNotFound(err_msg) return block def _extend_block(ref_block, def_block): """extend ref_block with def_block. Args: def_block (dict): api definition dict. ref_block (dict): reference block Returns: dict: extended reference block. Examples: >>> def_block = { "name": "get token 1", "request": {...}, "validate": [{'eq': ['status_code', 200]}] } >>> ref_block = { "name": "get token 2", "extract": [{"token": "content.token"}], "validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}] } >>> _extend_block(def_block, ref_block) { "name": "get token 2", "request": {...}, "extract": [{"token": "content.token"}], "validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}] } """ # TODO: override variables def_validators = def_block.get("validate") or def_block.get("validators", []) ref_validators = ref_block.get("validate") or ref_block.get("validators", []) def_extrators = ( def_block.get("extract") or def_block.get("extractors") or def_block.get("extract_binds", []) ) ref_extractors = ( ref_block.get("extract") or ref_block.get("extractors") or ref_block.get("extract_binds", []) ) ref_block.update(def_block) ref_block["validate"] = _merge_validator(def_validators, ref_validators) ref_block["extract"] = _merge_extractor(def_extrators, ref_extractors) def _convert_validators_to_mapping(validators): """convert validators list to mapping. Args: validators (list): validators in list Returns: dict: validators mapping, use (check, comparator) as key. Examples: >>> validators = [ {"check": "v1", "expect": 201, "comparator": "eq"}, {"check": {"b": 1}, "expect": 200, "comparator": "eq"} ] >>> _convert_validators_to_mapping(validators) { ("v1", "eq"): {"check": "v1", "expect": 201, "comparator": "eq"}, ('{"b": 1}', "eq"): {"check": {"b": 1}, "expect": 200, "comparator": "eq"} } """ validators_mapping = {} for validator in validators: validator = parser.parse_validator(validator) if not isinstance(validator["check"], collections.Hashable): check = json.dumps(validator["check"]) else: check = validator["check"] key = (check, validator["comparator"]) validators_mapping[key] = validator return validators_mapping def _merge_validator(def_validators, ref_validators): """merge def_validators with ref_validators. Args: def_validators (list): ref_validators (list): Returns: list: merged validators Examples: >>> def_validators = [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}] >>> ref_validators = [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}] >>> _merge_validator(def_validators, ref_validators) [ {"check": "v1", "expect": 201, "comparator": "eq"}, {"check": "s2", "expect": 16, "comparator": "len_eq"}, {"check": "s3", "expect": 12, "comparator": "len_eq"} ] """ if not def_validators: return ref_validators elif not ref_validators: return def_validators else: def_validators_mapping = _convert_validators_to_mapping(def_validators) ref_validators_mapping = _convert_validators_to_mapping(ref_validators) def_validators_mapping.update(ref_validators_mapping) return list(def_validators_mapping.values()) def _merge_extractor(def_extrators, ref_extractors): """merge def_extrators with ref_extractors Args: def_extrators (list): [{"var1": "val1"}, {"var2": "val2"}] ref_extractors (list): [{"var1": "val111"}, {"var3": "val3"}] Returns: list: merged extractors Examples: >>> def_extrators = [{"var1": "val1"}, {"var2": "val2"}] >>> ref_extractors = [{"var1": "val111"}, {"var3": "val3"}] >>> _merge_extractor(def_extrators, ref_extractors) [ {"var1": "val111"}, {"var2": "val2"}, {"var3": "val3"} ] """ if not def_extrators: return ref_extractors elif not ref_extractors: return def_extrators else: extractor_dict = OrderedDict() for api_extrator in def_extrators: if len(api_extrator) != 1: logger.warning("incorrect extractor: {}".format(api_extrator)) continue var_name = list(api_extrator.keys())[0] extractor_dict[var_name] = api_extrator[var_name] for test_extrator in ref_extractors: if len(test_extrator) != 1: logger.warning("incorrect extractor: {}".format(test_extrator)) continue var_name = list(test_extrator.keys())[0] extractor_dict[var_name] = test_extrator[var_name] extractor_list = [] for key, value in extractor_dict.items(): extractor_list.append({key: value}) return extractor_list def load_folder_content(folder_path): """load api/testcases/testsuites definitions from folder. Args: folder_path (str): api/testcases/testsuites files folder. Returns: dict: api definition mapping. { "tests/api/basic.yml": [ {"api": {"def": "api_login", "request": {}, "validate": []}}, {"api": {"def": "api_logout", "request": {}, "validate": []}} ] } """ items_mapping = {} for file_path in load_folder_files(folder_path): items_mapping[file_path] = load_file(file_path) return items_mapping def load_api_folder(api_folder_path): """load api definitions from api folder. Args: api_folder_path (str): api files folder. api file should be in the following format: [ { "api": { "def": "api_login", "request": {}, "validate": [] } }, { "api": { "def": "api_logout", "request": {}, "validate": [] } } ] Returns: dict: api definition mapping. { "api_login": { "function_meta": {"func_name": "api_login", "args": [], "kwargs": {}} "request": {} }, "api_logout": { "function_meta": {"func_name": "api_logout", "args": [], "kwargs": {}} "request": {} } } """ api_definition_mapping = {} api_items_mapping = load_folder_content(api_folder_path) for api_file_path, api_items in api_items_mapping.items(): # TODO: add JSON schema validation for api_item in api_items: key, api_dict = api_item.popitem() api_def = api_dict.pop("def") function_meta = parser.parse_function(api_def) func_name = function_meta["func_name"] if func_name in api_definition_mapping: logger.warning("API definition duplicated: {}".format(func_name)) api_dict["function_meta"] = function_meta api_definition_mapping[func_name] = api_dict return api_definition_mapping def load_test_folder(test_folder_path): """load testcases definitions from folder. Args: test_folder_path (str): testcases files folder. testcase file should be in the following format: [ { "config": { "def": "create_and_check", "request": {}, "validate": [] } }, { "test": { "api": "get_user", "validate": [] } } ] Returns: dict: testcases definition mapping. { "create_and_check": [ {"config": {}}, {"test": {}}, {"test": {}} ], "tests/testcases/create_and_get.yml": [ {"config": {}}, {"test": {}}, {"test": {}} ] } """ test_definition_mapping = {} test_items_mapping = load_folder_content(test_folder_path) for test_file_path, items in test_items_mapping.items(): # TODO: add JSON schema validation testcase = {"config": {}, "teststeps": []} for item in items: key, block = item.popitem() if key == "config": testcase["config"].update(block) if "def" not in block: test_definition_mapping[test_file_path] = testcase continue testcase_def = block.pop("def") function_meta = parser.parse_function(testcase_def) func_name = function_meta["func_name"] if func_name in test_definition_mapping: logger.warning("API definition duplicated: {}".format(func_name)) testcase["function_meta"] = function_meta test_definition_mapping[func_name] = testcase else: # key == "test": testcase["teststeps"].append(block) return test_definition_mapping def locate_debugtalk_py(start_path): """locate debugtalk.py file. Args: start_path (str): start locating path, maybe testcase file path or directory path """ try: debugtalk_path = locate_file(start_path, "debugtalk.py") return os.path.abspath(debugtalk_path) except exceptions.FileNotFound: return None def load_project_tests(test_path, dot_env_path=None): """load api, testcases, .env, builtin module and debugtalk.py. api/testcases folder is relative to project_working_directory Args: test_path (str): test file/folder path, locate pwd from this path. dot_env_path (str): specified .env file path Returns: dict: project loaded api/testcases definitions, environments and debugtalk.py module. """ project_mapping = {} debugtalk_path = locate_debugtalk_py(test_path) # locate PWD with debugtalk.py path if debugtalk_path: # The folder contains debugtalk.py will be treated as PWD. project_working_directory = os.path.dirname(debugtalk_path) else: # debugtalk.py is not found, use os.getcwd() as PWD. project_working_directory = os.getcwd() # add PWD to sys.path sys.path.insert(0, project_working_directory) # load .env dot_env_path = dot_env_path or os.path.join(project_working_directory, ".env") if os.path.isfile(dot_env_path): project_mapping["env"] = load_dot_env_file(dot_env_path) else: project_mapping["env"] = {} # load debugtalk.py if debugtalk_path: project_mapping["debugtalk"] = load_debugtalk_module() else: project_mapping["debugtalk"] = {"variables": {}, "functions": {}} project_mapping["def-api"] = load_api_folder( os.path.join(project_working_directory, "api") ) # TODO: replace suite with testcases project_mapping["def-testcase"] = load_test_folder( os.path.join(project_working_directory, "suite") ) return project_mapping def load_tests(path, dot_env_path=None): """load testcases from file path, extend and merge with api/testcase definitions. Args: path (str/list): testcase file/foler path. path could be in several types: - absolute/relative file path - absolute/relative folder path - list/set container with file(s) and/or folder(s) dot_env_path (str): specified .env file path Returns: list: testcases list, each testcase is corresponding to a file [ { # testcase data structure "config": { "name": "desc1", "path": "testcase1_path", "variables": [], # optional "request": {} # optional "refs": { "debugtalk": { "variables": {}, "functions": {} }, "env": {}, "def-api": {}, "def-testcase": {} } }, "teststeps": [ # teststep data structure { 'name': 'test step desc1', 'variables': [], # optional 'extract': [], # optional 'validate': [], 'request': {}, 'function_meta': {} }, teststep2 # another teststep dict ] }, testcase_dict_2 # another testcase dict ] """ if isinstance(path, (list, set)): testcases_list = [] for file_path in set(path): testcases = load_tests(file_path, dot_env_path) if not testcases: continue testcases_list.extend(testcases) return testcases_list if not os.path.exists(path): err_msg = "path not exist: {}".format(path) logger.error(err_msg) raise exceptions.FileNotFound(err_msg) if not os.path.isabs(path): path = os.path.join(os.getcwd(), path) if os.path.isdir(path): files_list = load_folder_files(path) testcases_list = load_tests(files_list, dot_env_path) elif os.path.isfile(path): try: raw_testcase = load_file(path) project_mapping = load_project_tests(path, dot_env_path) testcase = _load_testcase(raw_testcase, project_mapping) testcase["config"]["path"] = path testcase["config"]["refs"] = project_mapping testcases_list = [testcase] except exceptions.FileFormatError: testcases_list = [] return testcases_list def load_locust_tests(path, dot_env_path=None): """load locust testcases Args: path (str): testcase/testsuite file path. dot_env_path (str): specified .env file path Returns: dict: locust testcases with weight { "config": {...}, "tests": [ # weight 3 [teststep11], [teststep11], [teststep11], # weight 2 [teststep21, teststep22], [teststep21, teststep22] ] } """ raw_testcase = load_file(path) project_mapping = load_project_tests(path, dot_env_path) config = {"refs": project_mapping} tests = [] for item in raw_testcase: key, test_block = item.popitem() if key == "config": config.update(test_block) elif key == "test": teststeps = _load_teststeps(test_block, project_mapping) weight = test_block.pop("weight", 1) for _ in range(weight): tests.append(teststeps) return {"config": config, "tests": tests}