hyb
2025-12-30 5e753a15ff53faab2261a53367e44d38caf87041
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import asyncio
import aiohttp
import traceback
import time
from Util.random_util import RandomUtil  # 你已有的工具
from Util.dingtalk_helper import DingTalkHelper
from tqdm import tqdm
import logging
 
# 钉钉机器人 access_token 和 secret
ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d'
SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19'
 
apiname = "设备维保通知"
url = "http://192.168.6.168:5537/api/notify/save"
headers = {
    "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NjA2MDczMTAsInVzZXJuYW1lIjoiZ2x5In0.3DC4-4oUMuIpSDT-dYn8bv9JvuPadIssUQfIKAnjAL4",
    "Content-Type": "application/json"
}
 
# 并发 worker 数(控制并发请求数),根据本机性能与被测服务能力调整,例如 50、100
NUM_WORKERS = 100
 
# 总请求数(不要一开始就设过高)
TOTAL_REQUESTS = 2000000
 
# 每个请求最大重试次数
MAX_RETRIES = 3
 
# aiohttp 连接/超时设置
REQUEST_TIMEOUT = 60  # 秒
# ------------------------
 
dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET)
 
# 预生成不变的大 body 字段,减少每次构造开销
LARGE_CONTENT = "压测中" * 500
FILES_PATH = "/userfiles/1588133301094375425/程序附件/notify/notify/2025/10/15/173933/cs.jpg"
 
 
def create_animal_data(idx: int):
    random_code = RandomUtil.generate_random_number_string(0, 999999999)
    return {
        "id": "",
        "type": "6",
        "title": f"压测中{random_code}",
        "content": LARGE_CONTENT,
        "files": FILES_PATH,
        "status": "1",
        "notifyRecordIds": "",
        "isNotice": "0",
        "receivingRange": "1"
    }
 
 
async def create_animal(session: aiohttp.ClientSession, index: int, max_retries: int = MAX_RETRIES):
    """
    单个请求,包含重试。返回 (success: bool, index: int, errmsg: str|None)
    """
    attempt = 0
    while attempt < max_retries:
        data = create_animal_data(index)
        try:
            async with session.post(url, json=data, headers=headers) as resp:
                text = await resp.text()
                if resp.status == 200:
                    # 成功不频繁打印,交给外层进度统计
                    return True, index, None
                else:
                    # 非 200 当成失败(可自定义对特定状态的判定)
                    errmsg = f"状态码 {resp.status}, body: {text}"
                    attempt += 1
                    # 指数退避,避免瞬时打爆目标
                    await asyncio.sleep(min(10, 2 ** attempt))
        except Exception as e:
            # 记录简短异常信息(避免每次打印超长堆栈)
            err_msg = f"{type(e).__name__}: {str(e)}"
            attempt += 1
            await asyncio.sleep(min(10, 2 ** attempt))
    # 走到这里表示重试失败
    return False, index, f"重试 {max_retries} 次后失败"
 
 
async def worker(name: int, queue: asyncio.Queue, session: aiohttp.ClientSession,
                 pbar: tqdm, success_counter: dict, failed_list: list, lock: asyncio.Lock):
    """
    worker 从队列获取任务并执行 create_animal
    """
    while True:
        idx = await queue.get()
        if idx is None:
            queue.task_done()
            break
        try:
            ok, index, errmsg = await create_animal(session, idx)
            async with lock:
                if ok:
                    success_counter["count"] += 1
                else:
                    failed_list.append((index, errmsg))
                pbar.update(1)
        except Exception as e:
            # 捕获意外错误,记录并继续
            async with lock:
                failed_list.append((idx, f"Worker异常: {type(e).__name__}:{e}"))
                pbar.update(1)
        finally:
            queue.task_done()
 
 
async def batch_create_animals(total: int, num_workers: int):
    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
    connector = aiohttp.TCPConnector(limit=num_workers, limit_per_host=num_workers, force_close=False)
    # 减少session请求头每次设置的开销,也可在 post 中单独传 headers
    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        queue = asyncio.Queue()
        for i in range(1, total + 1):
            await queue.put(i)
 
        # 结束信号
        for _ in range(num_workers):
            await queue.put(None)
 
        success_counter = {"count": 0}
        failed_list = []
        lock = asyncio.Lock()
 
        # tqdm 在主线程创建
        with tqdm(total=total, desc="创建进度") as pbar:
            workers = [
                asyncio.create_task(worker(i, queue, session, pbar, success_counter, failed_list, lock))
                for i in range(num_workers)
            ]
            # 等待所有 worker 完成
            await asyncio.gather(*workers)
 
        # 汇总
        total_success = success_counter["count"]
        total_failed = len(failed_list)
        summary = f"\n[SUMMARY] 成功创建 {total_success}/{total} 个 {apiname},失败 {total_failed} 个"
        print(summary)
 
        # 可只发送简短信息到钉钉(避免太长)
        dingtalk_helper.send_message(
            f"你的批量创建{apiname}模块数据已完成:成功创建 {total_success}/{total} 个,失败 {total_failed} 个(详见日志)。"
        )
 
        # 如果需要打印失败详情,这里只打印前 50 条,避免控制台炸屏
        if total_failed:
            print("失败示例(最多显示50条):")
            for idx, err in failed_list[:50]:
                print(f"  #{idx} => {err}")
 
 
if __name__ == '__main__':
    # 运行前建议:
    # 1) 先用小的总数(例如 100 或 1000)跑通;
    # 2) 观察目标服务器与本机负载,再把 TOTAL_REQUESTS 调高;
    # 3) 调整 NUM_WORKERS 与 REQUEST_TIMEOUT 以获得最佳吞吐/稳定性。
    NUM_WORKERS = NUM_WORKERS
    TOTAL_REQUESTS = TOTAL_REQUESTS
 
    # 简单日志设置(只打印 WARNING 及以上,避免太多日志)
    logging.basicConfig(level=logging.WARNING, format='%(asctime)s %(levelname)s %(message)s')
 
    asyncio.run(batch_create_animals(TOTAL_REQUESTS, NUM_WORKERS))