import sys import os import asyncio import aiohttp import traceback from tqdm import tqdm # 导入进度条库 import time def get_parent_directory(file_path, levels=1): """获取指定层级的父目录""" path = os.path.abspath(file_path) for _ in range(levels): path = os.path.dirname(path) return path parent_dir = get_parent_directory(__file__, 5) # 获取上五级目录 sys.path.append(parent_dir) from 测试组.脚本.造数脚本2.Util.random_util import RandomUtil from 测试组.脚本.造数脚本2.Util.dingtalk_helper import DingTalkHelper # 钉钉机器人 access_token 和 secret ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d' SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19' # 请求URL和请求头 apiname = "设备维保通知" url = "http://192.168.6.168:5537/api/notify/save" headers = { "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NjA2MDczMTAsInVzZXJuYW1lIjoiZ2x5In0.3DC4-4oUMuIpSDT-dYn8bv9JvuPadIssUQfIKAnjAL4", "Content-Type": "application/json" } dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET) gender = "1" mother_id = "" mother_wholeCode = "" father_id = "" father_wholeCode = "" algebra = 1 preCode = "性能测试" # 生成请求体 def create_animal_data(): random_code = RandomUtil.generate_random_number_string(0, 999999999) # random_date = RandomUtil.generate_random_date("2023-01-01", "2025-03-25") return { "id": "", "type": "6", "title": f"压测中{random_code}", "content": ( "压测中" * 500 ), "files": "/userfiles/1588133301094375425/程序附件/notify/notify/2025/10/15/173933/cs.jpg", "status": "1", "notifyRecordIds": "", "isNotice": "0", "receivingRange": "1" } # 异步发送单个请求,并返回是否成功 async def create_animal(session: aiohttp.ClientSession, index: int, max_retries: int = 3): data = create_animal_data() attempt = 0 while attempt < max_retries: try: async with session.post(url, json=data, headers=headers) as response: resp_text = await response.text() if response.status == 200: print(f"[INFO] 第 {index} 个{apiname}创建成功,响应:{resp_text}") return True else: print(f"[ERROR] 第 {index} 个{apiname}创建失败,状态码:{response.status},响应:{resp_text}") return False except Exception as e: attempt += 1 status_code = getattr(e, 'status', 'N/A') error_trace = traceback.format_exc() full_log = ( f"请求URL: {url}\n" f"请求头: {headers}\n" f"请求数据: {data}\n" f"状态码: {status_code}\n" f"异常信息: {str(e)}\n" f"堆栈跟踪:\n{error_trace}\n" f"重试次数: {attempt}/{max_retries}" ) print(f"[EXCEPTION] 第 {index} 个{apiname}请求异常:\n{full_log}\n") if attempt < max_retries: # 重试前等待 1 秒 await asyncio.sleep(1) else: return False # 批量异步创建信息,同时显示进度条 async def batch_create_animals(batch_size: int): tasks = [] # 设置超时时间为60秒(可根据需要调整) timeout = aiohttp.ClientTimeout(total=60) async with aiohttp.ClientSession(timeout=timeout) as session: for i in range(1, batch_size + 1): task = asyncio.ensure_future(create_animal(session, i)) tasks.append(task) success_count = 0 # 使用 asyncio.as_completed 逐个等待任务完成,并更新进度条 with tqdm(total=batch_size, desc="创建进度") as pbar: for finished in asyncio.as_completed(tasks): result = await finished if result: success_count += 1 pbar.update(1) summary = f"\n[SUMMARY] 成功创建 {success_count}/{batch_size} 个{apiname}" print(summary) dingtalk_helper.send_message( f"你的批量创建{apiname}模块数据已完成:成功创建 {success_count}/{batch_size} 个{apiname},可前往系统查看!") # 主入口 if __name__ == '__main__': n = 100000 # 可根据需要调整批量创建数量 asyncio.run(batch_create_animals(n))