hyb
2025-05-20 e8003b5c66494c398fa8b716e0872771e2ea4af8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
# encoding: utf-8
 
import json
import re
import logging
 
import pydash
import jsonpath
from httprunner import exceptions, utils
from loguru import logger as log
from httprunner.compat import OrderedDict, basestring, is_py2
 
text_extractor_regexp_compile = re.compile(r".*\(.*\).*")
list_condition_extractor_regexp_compile = re.compile(r'^for#\w+.*#\w.*')
 
logger = logging.getLogger(__name__)
 
 
class ResponseObject(object):
 
    def __init__(self, resp_obj):
        """ initialize with a requests.Response object
        @param (requests.Response instance) resp_obj
        """
        self.resp_obj = resp_obj
 
    def __getattr__(self, key):
        try:
            if key == "json":
                value = self.resp_obj.json()
            else:
                value = getattr(self.resp_obj, key)
 
            self.__dict__[key] = value
            return value
        except AttributeError:
            err_msg = "ResponseObject does not have attribute: {}".format(key)
            logger.error(err_msg)
            raise exceptions.ParamsError(err_msg)
 
    def _extract_field_with_regex(self, field):
        """ extract field from response content with regex.
            requests.Response body could be json or html text.
        @param (str) field should only be regex string that matched r".*\(.*\).*"
        e.g.
            self.text: "LB123abcRB789"
            field: "LB[\d]*(.*)RB[\d]*"
            return: abc
        """
        matched = re.search(field, self.text)
        if not matched:
            err_msg = u"Failed to extract data with regex! => {}\n".format(field)
            err_msg += u"response body: {}\n".format(self.text)
            logger.error(err_msg)
            raise exceptions.ExtractFailure(err_msg)
 
        return matched.group(1)
 
    def _extract_field_with_delimiter(self, field):
        """ response content could be json or html text.
        @param (str) field should be string joined by delimiter.
        e.g.
            "status_code"
            "headers"
            "cookies"
            "content"
            "headers.content-type"
            "content.person.name.first_name"
 
        support request body
        e.g.
            "request.body"
            "request.body.key"
        """
        # string.split(sep=None, maxsplit=-1) -> list of strings
        # e.g. "content.person.name" => ["content", "person.name"]
 
        try:
            top_query, sub_query = field.split('.', 1)
        except ValueError:
            top_query = field
            sub_query = None
 
        # request
        if top_query == 'request' and sub_query is not None:
            req = self.resp_obj.request
            if hasattr(req, 'body'):
                body = json.loads(req.body)
                if sub_query == 'body':
                    return body
                query_path = sub_query.replace('body.', '', 1)
                err_msg = f"request body not found: {field}"
                res = pydash.get(body, query_path, exceptions.ExtractFailure(err_msg))
                if isinstance(res, exceptions.ExtractFailure):
                    raise res
                return res
 
        # status_code
        if top_query in ["status_code", "encoding", "ok", "reason", "url"]:
            if sub_query:
                # status_code.XX
                err_msg = u"Failed to extract: {}\n".format(field)
                logger.error(err_msg)
                raise exceptions.ParamsError(err_msg)
 
            return getattr(self, top_query)
 
        # cookies
        elif top_query == "cookies":
            cookies = self.cookies.get_dict()
            if not sub_query:
                # extract cookies
                return cookies
 
            try:
                return cookies[sub_query]
            except KeyError:
                err_msg = u"Failed to extract cookie! => {}\n".format(field)
                err_msg += u"response cookies: {}\n".format(cookies)
                logger.error(err_msg)
                raise exceptions.ExtractFailure(err_msg)
 
        # elapsed
        elif top_query == "elapsed":
            available_attributes = u"available attributes: days, seconds, microseconds, total_seconds"
            if not sub_query:
                err_msg = u"elapsed is datetime.timedelta instance, attribute should also be specified!\n"
                err_msg += available_attributes
                logger.error(err_msg)
                raise exceptions.ParamsError(err_msg)
            elif sub_query in ["days", "seconds", "microseconds"]:
                return getattr(self.elapsed, sub_query)
            elif sub_query == "total_seconds":
                return self.elapsed.total_seconds()
            else:
                err_msg = "{} is not valid datetime.timedelta attribute.\n".format(sub_query)
                err_msg += available_attributes
                logger.error(err_msg)
                raise exceptions.ParamsError(err_msg)
 
        # headers
        elif top_query == "headers":
            headers = self.headers
            if not sub_query:
                # extract headers
                return headers
 
            try:
                return headers[sub_query]
            except KeyError:
                err_msg = u"Failed to extract header! => {}\n".format(field)
                err_msg += u"response headers: {}\n".format(headers)
                logger.error(err_msg)
                raise exceptions.ExtractFailure(err_msg)
 
        # response body
        elif top_query in ["content", "text", "json"]:
            try:
                body = self.json
            except exceptions.JSONDecodeError:
                body = self.text
 
            if not sub_query:
                # extract response body
                return body
 
            # 当body是dict时,使用jsonpath替换原有的取值方式
            return self._extract_with_jsonpath(body, field)
 
            if isinstance(body, (dict, list)):
                # content = {"xxx": 123}, content.xxx
                return utils.query_json(body, sub_query)
            elif sub_query.isdigit():
                # content = "abcdefg", content.3 => d
                return utils.query_json(body, sub_query)
            else:
                # content = "<html>abcdefg</html>", content.xxx
                err_msg = u"Failed to extract attribute from response body! => {}\n".format(field)
                err_msg += u"response body: {}\n".format(body)
                logger.error(err_msg)
                raise exceptions.ExtractFailure(err_msg)
 
        # new set response attributes in teardown_hooks
        elif top_query in self.__dict__:
            attributes = self.__dict__[top_query]
 
            if not sub_query:
                # extract response attributes
                return attributes
 
            if isinstance(attributes, (dict, list)):
                # attributes = {"xxx": 123}, content.xxx
                return utils.query_json(attributes, sub_query)
            elif sub_query.isdigit():
                # attributes = "abcdefg", attributes.3 => d
                return utils.query_json(attributes, sub_query)
            else:
                # content = "attributes.new_attribute_not_exist"
                err_msg = u"Failed to extract cumstom set attribute from teardown hooks! => {}\n".format(field)
                err_msg += u"response set attributes: {}\n".format(attributes)
                logger.error(err_msg)
                raise exceptions.TeardownHooksFailure(err_msg)
 
        # others
        else:
            err_msg = u"Failed to extract attribute from response! => {}\n".format(field)
            err_msg += u"available response attributes: status_code, cookies, elapsed, headers, content, text, json, encoding, ok, reason, url.\n\n"
            err_msg += u"If you want to set attribute in teardown_hooks, take the following example as reference:\n"
            err_msg += u"response.new_attribute = 'new_attribute_value'\n"
            logger.error(err_msg)
            raise exceptions.ParamsError(err_msg)
 
    def _extract_with_condition(self, field: str):
        """ condition extract
         for#content.res.list,id==1#content.a
        """
        field = field.replace(" ", "")
        separator = '#'
        keyword, valuepath_and_expression, extract_path = field.split(separator)
 
        if keyword == 'for':
            try:
                content = self.json
            except exceptions.JSONDecodeError:
                err_msg = "按条件提取只支持json格式的响应"
                log.error(err_msg)
                raise exceptions.ExtractFailure(err_msg)
 
            condition_list_path, expression = valuepath_and_expression.split(",")
            # 取值的时候,需要移除content.前缀
            condition_list = pydash.get(content,  condition_list_path.replace('content.', "", 1), None)
 
            err_msg = ""
            if not condition_list:
                err_msg = f'抽取条件:{condition_list_path}取值不存在'
            elif isinstance(condition_list, list) is False:
                err_msg = f'抽取条件的值只能是list类型,实际是{type(condition_list)}'
 
            if err_msg:
                log.error(err_msg)
                raise exceptions.ExtractFailure(err_msg)
 
            try:
                expect_path, expect_value = expression.split("==")
            except ValueError:
                err_msg = '抽取条件的表达式错误,正确写法如:id==1'
                log.error(err_msg)
                raise exceptions.ExtractFailure(err_msg)
 
            extract_value = None
            for d in condition_list:
                if expect_value == str(pydash.get(d, expect_path, "")):
                    # 当抽取条件满足时
                    # 如果抽取路径以content.开头,就从整个json取
                    # 否则,从当前的对象取
                    if extract_path.startswith('content.'):
                        extract_value = pydash.get(content, extract_path.replace('content.', "", 1))
                    else:
                        extract_value = pydash.get(d, extract_path)
                    break
 
            if not extract_value:
                err_msg = '抽取结果不存在'
                log.error(err_msg)
                raise exceptions.ExtractFailure(err_msg)
            return extract_value
 
    @staticmethod
    def _extract_with_jsonpath(obj: dict, field: str):
        path = field.replace("content", "$", 1)
        res: list = jsonpath.jsonpath(obj, path)
        if not res:
            err_msg = u"Failed to extract attribute from response body! => {}\n".format(field)
            err_msg += u"response body: {}\n".format(obj)
            raise exceptions.ExtractFailure(err_msg)
        else:
            return res[0]
 
    def extract_field(self, field):
        """ extract value from requests.Response.
        """
        if not isinstance(field, basestring):
            err_msg = u"Invalid extractor! => {}\n".format(field)
            logger.error(err_msg)
            raise exceptions.ParamsError(err_msg)
 
        msg = "extract: {}".format(field)
 
        if text_extractor_regexp_compile.match(field) and field.startswith("content.") is False:
            value = self._extract_field_with_regex(field)
        elif list_condition_extractor_regexp_compile.match(field.replace(" ", "")):
            value = self._extract_with_condition(field)
        else:
            value = self._extract_field_with_delimiter(field)
 
        if is_py2 and isinstance(value, unicode):
            value = value.encode("utf-8")
 
        msg += "\t=> {}".format(value)
        logger.debug(msg)
 
        return value
 
    def extract_response(self, extractors, context):
        """ extract value from requests.Response and store in OrderedDict.
        @param (list) extractors
            [
                {"resp_status_code": "status_code"},
                {"resp_headers_content_type": "headers.content-type"},
                {"resp_content": "content"},
                {"resp_content_person_first_name": "content.person.name.first_name"}
            ]
        @return (OrderDict) variable binds ordered dict
        """
        if not extractors:
            return {}
 
        logger.info("start to extract from response object.")
        extracted_variables_mapping = OrderedDict()
        extract_binds_order_dict = utils.convert_mappinglist_to_orderdict(extractors)
 
        for key, field in extract_binds_order_dict.items():
            if '$' in field:
                field = context.eval_content(field)
            extracted_variables_mapping[key] = self.extract_field(field)
 
        return extracted_variables_mapping