1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# -*- coding: utf-8 -*-
"""
@File    : request.py
@Time    : 2023/9/8 15:06
@Author  : geekbing
@LastEditTime : -
@LastEditors : -
@Description : 转化mitmproxy请求和响应数据(request and response data)
"""
import json
from typing import TypeVar
 
from pydantic import BaseModel
from loguru import logger
 
body = TypeVar("body", bytes, str)
 
 
def convert_cookies_to_dict(cookies):
    """
    将 mitmproxy 的 cookies 对象转换为可 JSON 序列化的字典
 
    :param cookies: mitmproxy cookies 对象
    :return: 可序列化的字典
    """
    result = {}
    for key, value in cookies.items():
        if hasattr(value, 'items'):
            result[key] = dict(value.items())
        else:
            result[key] = value
    return result
 
 
class RequestInfo(BaseModel):
    url: str
    body: str
    request_method: str
    request_headers: dict
    response_headers: dict
    cookies: dict
    request_cookies: dict
    response_content: str
    status_code: int
 
    def __init__(self, flow=None, **kwargs):
        if flow:
            kwargs.update(
                dict(
                    status_code=flow.response.status_code,
                    url=flow.request.url,
                    request_method=flow.request.method,
                    request_headers=dict(flow.request.headers),
                    response_headers=dict(flow.response.headers),
                    response_content=self.get_response(flow.response),
                    body=self.get_body(flow.request),
                    cookies=convert_cookies_to_dict(flow.response.cookies),
                    request_cookies=convert_cookies_to_dict(flow.request.cookies),
                )
            )
        super().__init__(**kwargs)
 
    @classmethod
    def translate_json(cls, request_data):
        """
        将请求数据转换为json格式
 
        :param request_data: 接口请求数据
        :return: 返回json格式字符串
        """
        try:
            if isinstance(request_data, dict):
                return json.dumps(request_data, indent=4, ensure_ascii=False)
            else:
                return json.dumps(
                    json.loads(request_data), indent=4, ensure_ascii=False
                )
        except (TypeError, json.JSONDecodeError) as e:
            logger.bind(name=None).warning(f"解析json格式失败: {e}")
            return request_data
 
    @classmethod
    def get_response(cls, response):
        content_type = response.headers.get("Content-Type").lower()
        if "json" in content_type:
            return cls.translate_json(response.text)
        if "text" in content_type or "xml" in content_type:
            return response.text
        return response.data.decode("utf-8")
 
    @classmethod
    def get_body(cls, request):
        if len(request.content) == 0:
            return ""
        content_type = request.headers.get("Content-Type").lower()
        if "json" in content_type:
            return cls.translate_json(request.text)
        if "text" in content_type or "xml" in content_type:
            return request.text
        if "x-www-form-urlencoded" in content_type:
            return json.dumps(dict(request.urlencoded_form))
        return request.data.decode("utf-8")
 
    def dumps(self):
        try:
            return self.json(ensure_ascii=False)
        except Exception as e:
            logger.error(f"序列化错误: {e}")
            return None