import sys import os import asyncio import aiohttp import traceback import time from tqdm import tqdm import logging def get_parent_directory(file_path, levels=1): """获取指定层级的父目录""" path = os.path.abspath(file_path) for _ in range(levels): path = os.path.dirname(path) return path parent_dir = get_parent_directory(__file__, 5) # 获取上五级目录 sys.path.append(parent_dir) from 测试组.脚本.造数脚本2.Util.random_util import RandomUtil from 测试组.脚本.造数脚本2.Util.dingtalk_helper import DingTalkHelper # 钉钉机器人 access_token 和 secret ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d' SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19' apiname = "设备维保通知" url = "http://192.168.6.168:5537/api/notify/save" headers = { "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NjA2MDczMTAsInVzZXJuYW1lIjoiZ2x5In0.3DC4-4oUMuIpSDT-dYn8bv9JvuPadIssUQfIKAnjAL4", "Content-Type": "application/json" } # 并发 worker 数(控制并发请求数),根据本机性能与被测服务能力调整,例如 50、100 NUM_WORKERS = 100 # 总请求数(不要一开始就设过高) TOTAL_REQUESTS = 2000000 # 每个请求最大重试次数 MAX_RETRIES = 3 # aiohttp 连接/超时设置 REQUEST_TIMEOUT = 60 # 秒 # ------------------------ dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET) # 预生成不变的大 body 字段,减少每次构造开销 LARGE_CONTENT = "压测中" * 500 FILES_PATH = "/userfiles/1588133301094375425/程序附件/notify/notify/2025/10/15/173933/cs.jpg" def create_animal_data(idx: int): random_code = RandomUtil.generate_random_number_string(0, 999999999) return { "id": "", "type": "6", "title": f"压测中{random_code}", "content": LARGE_CONTENT, "files": FILES_PATH, "status": "1", "notifyRecordIds": "", "isNotice": "0", "receivingRange": "1" } async def create_animal(session: aiohttp.ClientSession, index: int, max_retries: int = MAX_RETRIES): """ 单个请求,包含重试。返回 (success: bool, index: int, errmsg: str|None) """ attempt = 0 while attempt < max_retries: data = create_animal_data(index) try: async with session.post(url, json=data, headers=headers) as resp: text = await resp.text() if resp.status == 200: # 成功不频繁打印,交给外层进度统计 return True, index, None else: # 非 200 当成失败(可自定义对特定状态的判定) errmsg = f"状态码 {resp.status}, body: {text}" attempt += 1 # 指数退避,避免瞬时打爆目标 await asyncio.sleep(min(10, 2 ** attempt)) except Exception as e: # 记录简短异常信息(避免每次打印超长堆栈) err_msg = f"{type(e).__name__}: {str(e)}" attempt += 1 await asyncio.sleep(min(10, 2 ** attempt)) # 走到这里表示重试失败 return False, index, f"重试 {max_retries} 次后失败" async def worker(name: int, queue: asyncio.Queue, session: aiohttp.ClientSession, pbar: tqdm, success_counter: dict, failed_list: list, lock: asyncio.Lock): """ worker 从队列获取任务并执行 create_animal """ while True: idx = await queue.get() if idx is None: queue.task_done() break try: ok, index, errmsg = await create_animal(session, idx) async with lock: if ok: success_counter["count"] += 1 else: failed_list.append((index, errmsg)) pbar.update(1) except Exception as e: # 捕获意外错误,记录并继续 async with lock: failed_list.append((idx, f"Worker异常: {type(e).__name__}:{e}")) pbar.update(1) finally: queue.task_done() async def batch_create_animals(total: int, num_workers: int): timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT) connector = aiohttp.TCPConnector(limit=num_workers, limit_per_host=num_workers, force_close=False) # 减少session请求头每次设置的开销,也可在 post 中单独传 headers async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session: queue = asyncio.Queue() for i in range(1, total + 1): await queue.put(i) # 结束信号 for _ in range(num_workers): await queue.put(None) success_counter = {"count": 0} failed_list = [] lock = asyncio.Lock() # tqdm 在主线程创建 with tqdm(total=total, desc="创建进度") as pbar: workers = [ asyncio.create_task(worker(i, queue, session, pbar, success_counter, failed_list, lock)) for i in range(num_workers) ] # 等待所有 worker 完成 await asyncio.gather(*workers) # 汇总 total_success = success_counter["count"] total_failed = len(failed_list) summary = f"\n[SUMMARY] 成功创建 {total_success}/{total} 个 {apiname},失败 {total_failed} 个" print(summary) # 可只发送简短信息到钉钉(避免太长) dingtalk_helper.send_message( f"你的批量创建{apiname}模块数据已完成:成功创建 {total_success}/{total} 个,失败 {total_failed} 个(详见日志)。" ) # 如果需要打印失败详情,这里只打印前 50 条,避免控制台炸屏 if total_failed: print("失败示例(最多显示50条):") for idx, err in failed_list[:50]: print(f" #{idx} => {err}") if __name__ == '__main__': # 运行前建议: # 1) 先用小的总数(例如 100 或 1000)跑通; # 2) 观察目标服务器与本机负载,再把 TOTAL_REQUESTS 调高; # 3) 调整 NUM_WORKERS 与 REQUEST_TIMEOUT 以获得最佳吞吐/稳定性。 NUM_WORKERS = NUM_WORKERS TOTAL_REQUESTS = TOTAL_REQUESTS # 简单日志设置(只打印 WARNING 及以上,避免太多日志) logging.basicConfig(level=logging.WARNING, format='%(asctime)s %(levelname)s %(message)s') asyncio.run(batch_create_animals(TOTAL_REQUESTS, NUM_WORKERS))