# encoding: utf-8 import io import logging import os import platform import time import unittest from base64 import b64encode try: from collections.abc import Iterable except ImportError: from collections import Iterable from datetime import datetime from jinja2 import Template from markupsafe import escape from httprunner.__about__ import __version__ from httprunner.compat import basestring, bytes, json, numeric_types logger = logging.getLogger(__name__) def get_platform(): return { "httprunner_version": __version__, "python_version": "{} {}".format( platform.python_implementation(), platform.python_version() ), "platform": platform.platform(), } def get_summary(result): """get summary from test result""" summary = { "success": result.wasSuccessful(), "stat": { "testsRun": result.testsRun, "failures": len(result.failures), "errors": len(result.errors), "skipped": len(result.skipped), "expectedFailures": len(result.expectedFailures), "unexpectedSuccesses": len(result.unexpectedSuccesses), }, } summary["stat"]["successes"] = ( summary["stat"]["testsRun"] - summary["stat"]["failures"] - summary["stat"]["errors"] - summary["stat"]["skipped"] - summary["stat"]["expectedFailures"] - summary["stat"]["unexpectedSuccesses"] ) if getattr(result, "records", None): summary["time"] = { "start_at": result.start_at, "setup_hooks_duration": result.setup_hooks_duration, "teardown_hooks_duration": result.teardown_hooks_duration, "duration": result.duration, } summary["records"] = result.records else: summary["records"] = [] return summary def aggregate_stat(origin_stat, new_stat): """aggregate new_stat to origin_stat. Args: origin_stat (dict): origin stat dict, will be updated with new_stat dict. new_stat (dict): new stat dict. """ for key in new_stat: if key not in origin_stat: origin_stat[key] = new_stat[key] elif key == "start_at": # start datetime origin_stat[key] = min(origin_stat[key], new_stat[key]) else: origin_stat[key] += new_stat[key] def render_html_report(summary, html_report_name=None, html_report_template=None): """render html report with specified report name and template if html_report_name is not specified, use current datetime if html_report_template is not specified, use default report template """ if not html_report_template: html_report_template = os.path.join( os.path.abspath(os.path.dirname(__file__)), "templates", "report_template.html", ) logger.debug("No html report template specified, use default.") else: logger.info("render with html report template: {}".format(html_report_template)) logger.info("Start to render Html report ...") logger.debug("render data: {}".format(summary)) report_dir_path = os.path.join(os.getcwd(), "reports") start_at_timestamp = int(summary["time"]["start_at"]) summary["time"]["start_datetime"] = datetime.fromtimestamp( start_at_timestamp ).strftime("%Y-%m-%d %H:%M:%S") if html_report_name: summary["html_report_name"] = html_report_name report_dir_path = os.path.join(report_dir_path, html_report_name) html_report_name += "-{}.html".format(start_at_timestamp) else: summary["html_report_name"] = "" html_report_name = "{}.html".format(start_at_timestamp) if not os.path.isdir(report_dir_path): os.makedirs(report_dir_path) for index, suite_summary in enumerate(summary["details"]): if not suite_summary.get("name"): suite_summary["name"] = "test suite {}".format(index) for record in suite_summary.get("records"): meta_data = record["meta_data"] stringify_data(meta_data, "request") stringify_data(meta_data, "response") with io.open(html_report_template, "r", encoding="utf-8") as fp_r: template_content = fp_r.read() report_path = os.path.join(report_dir_path, html_report_name) with io.open(report_path, "w", encoding="utf-8") as fp_w: rendered_content = Template( template_content, extensions=["jinja2.ext.loopcontrols"] ).render(summary) fp_w.write(rendered_content) logger.info("Generated Html report: {}".format(report_path)) return report_path def stringify_data(meta_data, request_or_response): """ meta_data = { "request": {}, "response": {} } """ headers = meta_data[request_or_response]["headers"] request_or_response_dict = meta_data[request_or_response] for key, value in request_or_response_dict.items(): if isinstance(value, list): value = json.dumps(value, indent=2, ensure_ascii=False) elif isinstance(value, bytes): try: encoding = meta_data["response"].get("encoding") if not encoding or encoding == "None": encoding = "utf-8" if ( request_or_response == "response" and key == "content" and "image" in meta_data["response"]["content_type"] ): # display image value = "data:{};base64,{}".format( meta_data["response"]["content_type"], b64encode(value).decode(encoding), ) else: value = escape(value.decode(encoding)) except UnicodeDecodeError: pass elif not isinstance(value, (basestring, numeric_types, Iterable)): # class instance, e.g. MultipartEncoder() value = repr(value) meta_data[request_or_response][key] = value class HtmlTestResult(unittest.TextTestResult): """A html result class that can generate formatted html results. Used by TextTestRunner. """ def __init__(self, stream, descriptions, verbosity): super(HtmlTestResult, self).__init__(stream, descriptions, verbosity) self.records = [] def _record_test(self, test, status, attachment=""): data = { "name": test.shortDescription(), "status": status, "attachment": attachment, "meta_data": {}, } if hasattr(test, "meta_data"): data["meta_data"] = test.meta_data self.records.append(data) def startTestRun(self): self.start_at = time.time() def startTest(self, test): """add start test time""" super(HtmlTestResult, self).startTest(test) logger.info(test.shortDescription()) def addSuccess(self, test): super(HtmlTestResult, self).addSuccess(test) self._record_test(test, "success") print("") def addError(self, test, err): super(HtmlTestResult, self).addError(test, err) self._record_test(test, "error", self._exc_info_to_string(err, test)) print("") def addFailure(self, test, err): super(HtmlTestResult, self).addFailure(test, err) self._record_test(test, "failure", self._exc_info_to_string(err, test)) print("") def addSkip(self, test, reason): super(HtmlTestResult, self).addSkip(test, reason) self._record_test(test, "skipped", reason) print("") def addExpectedFailure(self, test, err): super(HtmlTestResult, self).addExpectedFailure(test, err) self._record_test(test, "ExpectedFailure", self._exc_info_to_string(err, test)) print("") def addUnexpectedSuccess(self, test): super(HtmlTestResult, self).addUnexpectedSuccess(test) self._record_test(test, "UnexpectedSuccess") print("") @property def duration(self): case_elapsed = 0 for record in self.records: elapsed_ms = record["meta_data"]["response"]["elapsed_ms"] if isinstance(elapsed_ms, (int, float)): case_elapsed += record["meta_data"]["response"]["elapsed_ms"] # 毫秒转秒级,保留三位 total_duration = ( case_elapsed / 1000 + self.setup_hooks_duration + self.teardown_hooks_duration ) return round(total_duration, 3) @property def setup_hooks_duration(self): # 整个case的前置函数消耗时间 res = 0 for record in self.records: setup_hooks_duration = record["meta_data"]["request"].get( "setup_hooks_duration" ) if setup_hooks_duration: res += setup_hooks_duration return res @property def teardown_hooks_duration(self): # 整个case的后置函数消耗时间 res = 0 for record in self.records: teardown_hooks_duration = record["meta_data"]["response"].get( "teardown_hooks_duration" ) if teardown_hooks_duration: res += teardown_hooks_duration return res