""" 集成压测脚本(带压测报告生成并通过钉钉发送摘要) 说明: - 该脚本基于之前的稳定 worker/队列 实现,运行后会记录每条请求的时间、状态码和延迟。 - 运行结束后会调用Util目录下的压测报告生成器(文件名: stress_test_report_generator.py),输出 HTML/JSON/CSV/(可选)DOCX 等文件。 - 生成后会把关键统计摘要通过 DingTalk 机器人发送(调用 DingTalkHelper.send_message)。 - 安装依赖:aiohttp, tqdm, numpy/pandas/matplotlib/python-docx(可选) - 确保 DingTalkHelper 类在你的 `Util.dingtalk_helper` 中可用,且 ACCESS_TOKEN/SECRET 正确。 """ import sys import os # 将上一级目录加入模块搜索路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import asyncio import aiohttp import time import traceback import datetime from tqdm import tqdm from Util.random_util import RandomUtil from Util.dingtalk_helper import DingTalkHelper import pymysql import random # --- 配置 --- ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d' SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19' # 数据库配置 DB_CONFIG = { 'host': '192.168.6.190', 'port': 3306, 'user': 'dev', 'password': 'Hello@112', 'database': 'srps_ecnu', 'charset': 'utf8mb4', 'cursorclass': pymysql.cursors.DictCursor } apiname = "入驻笼位" url = "http://192.168.6.190:5561/api/base/cage/cage/enterCage" headers = { "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NjcwODE5NzMsInVzZXJuYW1lIjoiZ2x5In0.Gk5C1A26dmC3Q-deDUQtwS5Ssj0DSzQ7PcUNSJKl2Mw", "Content-Type": "application/json" } NUM_WORKERS = 100 TOTAL_REQUESTS = 244469 MAX_RETRIES = 3 REQUEST_TIMEOUT = 60 OUTPUT_DIR = './load_test_report' # --- 初始化 --- dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET) LARGE_CONTENT = "压测中" * 500 FILES_PATH = "/userfiles/1588133301094375425/程序附件/notify/notify/2025/10/15/173933/cs.jpg" # 全局变量,存储从数据库获取的笼位列表 cage_list = [] def fetch_cages_from_db(): """从数据库获取笼位信息""" global cage_list try: connection = pymysql.connect(**DB_CONFIG) with connection.cursor() as cursor: # 查询所有笼位的id和code,可以添加where条件过滤需要测试的笼位 sql = "SELECT id, code FROM l_cage WHERE status = 1" # 假设有status字段,可以根据实际情况调整 cursor.execute(sql) results = cursor.fetchall() if not results: # 如果没有查询条件,查询所有笼位 sql = "SELECT id, code FROM l_cage" cursor.execute(sql) results = cursor.fetchall() cage_list = results print(f"从数据库获取到 {len(cage_list)} 个笼位") # 如果笼位数量少于请求数,打印警告 if len(cage_list) < TOTAL_REQUESTS: print(f"警告: 笼位数量({len(cage_list)})少于请求数({TOTAL_REQUESTS}),将循环使用笼位") connection.close() return cage_list except Exception as e: print(f"数据库连接失败: {e}") # 如果数据库连接失败,使用默认的笼位作为后备 cage_list = [{"id": "10386", "code": "hyb压测笼架0001-1-A"}] print(f"使用默认笼位: {cage_list[0]}") return cage_list def get_random_cage(index: int): """根据索引获取笼位,如果笼位数量不足则循环使用""" if not cage_list: # 如果列表为空,尝试重新获取 fetch_cages_from_db() if not cage_list: # 如果还是为空,返回默认值 return {"id": "10386", "code": "hyb压测笼架0001-1-A"} # 使用索引取模来循环获取笼位 cage_index = index % len(cage_list) return cage_list[cage_index] def create_animal_data(idx: int): """创建动物数据,使用动态获取的笼位信息""" random_code = RandomUtil.generate_random_number_string(0, 999999999) random_femaleNum = RandomUtil.generate_random_number_string(1, 5) random_maleNum = RandomUtil.generate_random_number_string(1, 5) random_data = RandomUtil.generate_random_date("2023-01-01", "2025-12-19") # 获取笼位信息 cage_info = get_random_cage(idx) return { "enterCageList": [ { "cage": { "id": cage_info["id"], "code": cage_info["code"] }, "cageStatus": "2", # 笼具状态 "user": { "id": "1995379969088860162" # 操作用户ID }, "researchGroup": { "id": "1995379941721026561", "name": "hyb课题组2" # 课题组信息 }, "femaleNum": random_femaleNum, # 雌性数量 "maleNum": random_maleNum, # 雄性数量 "enterTime": random_data, # 入笼时间 "expectLeaveTime": random_data, # 预计离笼时间 "leaveCenterTime": "", # 离开中心时间(为空) "animalVariety": { "id": "1595669319989637121", "name": "小鼠" # 动物品种 }, "animalStrain": { "id": "1595669618858962945", "name": "BALB/c" # 动物品系 }, "specifications": f"规格规格{random_code}", # 规格说明 "animalList": [], # 动物列表(空数组) "groupUser": [], # 组内用户(空数组) "ethicCode": f"伦理编号伦理编号{random_code}" # 伦理编号 } ] } async def perform_request(session: aiohttp.ClientSession, index: int, max_retries: int = MAX_RETRIES): attempt = 0 last_err = None while attempt < max_retries: data = create_animal_data(index) start = time.time() try: async with session.post(url, json=data, headers=headers) as resp: text = await resp.text() latency_ms = (time.time() - start) * 1000.0 status = resp.status if status == 200: return { 'index': index, 'timestamp': time.time(), 'status_code': status, 'latency_ms': latency_ms, 'response_size': len(text) if text is not None else None, 'error': None, 'cage_id': data['enterCageList'][0]['cage']['id'], # 记录使用的笼位ID 'cage_code': data['enterCageList'][0]['cage']['code'] # 记录使用的笼位编码 } else: last_err = f'status_{status}:{text}' attempt += 1 await asyncio.sleep(min(10, 2 ** attempt)) except Exception as e: latency_ms = (time.time() - start) * 1000.0 last_err = f'{type(e).__name__}:{str(e)}' attempt += 1 await asyncio.sleep(min(10, 2 ** attempt)) # 获取当前请求使用的笼位信息用于记录 data = create_animal_data(index) # 最终失败 return { 'index': index, 'timestamp': time.time(), 'status_code': 0, 'latency_ms': latency_ms if 'latency_ms' in locals() else 0, 'response_size': None, 'error': last_err, 'cage_id': data['enterCageList'][0]['cage']['id'], 'cage_code': data['enterCageList'][0]['cage']['code'] } async def worker(name: int, queue: asyncio.Queue, session: aiohttp.ClientSession, gen, pbar, success_counter: dict, failed_list: list, lock: asyncio.Lock): while True: idx = await queue.get() if idx is None: queue.task_done() break try: res = await perform_request(session, idx) # 记录到报告生成器 gen.record_result( index=res['index'], timestamp=res['timestamp'], status_code=int(res['status_code']), latency_ms=float(res['latency_ms']), response_size=res.get('response_size'), error=res.get('error') ) async with lock: if res['status_code'] and 200 <= res['status_code'] < 300: success_counter['count'] += 1 # 记录成功的笼位信息 if 'cage_id' in res and 'cage_code' in res: if 'success_cages' not in success_counter: success_counter['success_cages'] = [] cage_info = f"{res['cage_code']}({res['cage_id']})" if cage_info not in success_counter['success_cages']: success_counter['success_cages'].append(cage_info) else: failed_list.append((res['index'], res.get('error'), res.get('cage_code', '未知笼位'))) pbar.update(1) except Exception as e: async with lock: failed_list.append((idx, f'Worker异常:{type(e).__name__}:{e}', '未知笼位')) pbar.update(1) finally: queue.task_done() async def batch_create_animals(total: int, num_workers: int): # 首先从数据库获取笼位信息 print("正在从数据库获取笼位信息...") fetch_cages_from_db() if not cage_list: print("错误: 无法获取笼位信息,压测终止") return print(f"获取到 {len(cage_list)} 个笼位,将进行{apiname}压测") # 动态加载报告生成器模块(支持中文文件名) gen = None try: import importlib.util script_dir = os.path.dirname(os.path.abspath(__file__)) report_path = os.path.join(script_dir, 'H:\\项目\\造数脚本\\Util\\stress_test_report_generator.py') if os.path.exists(report_path): spec = importlib.util.spec_from_file_location('report_module', report_path) report_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(report_module) LoadTestReportGenerator = getattr(report_module, 'LoadTestReportGenerator') else: # 备用:尝试直接导入模块名(若你的文件名已改为 ascii) from report_generator import LoadTestReportGenerator # type: ignore gen = LoadTestReportGenerator(test_name='压测任务', report_title='压测详细报告') except Exception as e: print('无法加载压测报告生成器,请确认stress_test_report_generator.py 文件位置正确。\n', e) raise timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT) connector = aiohttp.TCPConnector(limit=num_workers, limit_per_host=num_workers, force_close=False) async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session: queue = asyncio.Queue() for i in range(1, total + 1): await queue.put(i) for _ in range(num_workers): await queue.put(None) success_counter = {'count': 0} failed_list = [] lock = asyncio.Lock() with tqdm(total=total, desc='创建进度') as pbar: workers = [ asyncio.create_task(worker(i, queue, session, gen, pbar, success_counter, failed_list, lock)) for i in range(num_workers) ] await asyncio.gather(*workers) # 任务完成,生成报告 os.makedirs(OUTPUT_DIR, exist_ok=True) outputs = gen.generate_report(OUTPUT_DIR, formats=['html', 'json', 'csv', 'docx']) stats = gen.compute_stats() # 构造钉钉摘要消息(中文) now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') msg = [f'【{apiname} 压测报告】', f'生成时间:{now_str}'] msg.append(f"总请求数:{stats.get('total_requests',0)},成功:{stats.get('success_count',0)},失败:{stats.get('fail_count',0)},成功率:{stats.get('success_rate',0):.2%}") msg.append(f"总耗时(s):{stats.get('duration_seconds',0):.2f},平均吞吐(req/s):{stats.get('throughput_rps',0):.2f}") lat = stats.get('latency_ms', {}) msg.append(f"延迟(ms) - 平均:{lat.get('avg',0):.2f},P90:{lat.get('p90',0):.2f},P95:{lat.get('p95',0):.2f},P99:{lat.get('p99',0):.2f}") # 添加笼位信息统计 msg.append(f"数据库笼位总数:{len(cage_list)}") if 'success_cages' in success_counter: msg.append(f"成功使用的笼位数:{len(success_counter['success_cages'])}") # 显示部分成功笼位示例 if len(success_counter['success_cages']) > 0: sample_cages = success_counter['success_cages'][:5] # 显示前5个 msg.append(f"成功笼位示例:{', '.join(sample_cages)}") if len(success_counter['success_cages']) > 5: msg.append(f"...等{len(success_counter['success_cages'])}个笼位") # 列出生成的报告文件 file_list = [] for k, v in outputs.items(): if k == 'charts': for cname, cpath in v.items(): file_list.append(os.path.abspath(cpath)) else: file_list.append(os.path.abspath(v)) msg.append('生成文件:') for p in file_list: msg.append(p) final_msg = '\n'.join(msg) # 发送钉钉消息 try: dingtalk_helper.send_message(final_msg) except Exception as e: print('发送钉钉消息失败:', e) print('\n[SUMMARY] 已生成报告并发送钉钉摘要。') print('成功数:', success_counter['count'], ' 失败数:', len(failed_list)) print(f'数据库笼位总数:{len(cage_list)}') if 'success_cages' in success_counter: print(f'成功使用的笼位数:{len(success_counter["success_cages"])}') if failed_list: print('失败示例(最多显示50条):') for idx, err, cage_code in failed_list[:50]: print(f' #{idx} 笼位[{cage_code}] => {err}') if __name__ == '__main__': # 检查数据库连接 try: test_conn = pymysql.connect(**DB_CONFIG) test_conn.close() print("数据库连接测试成功") except Exception as e: print(f"数据库连接测试失败: {e}") print("将使用默认笼位进行压测") # 运行前建议先用小规模测试 TOTAL_REQUESTS = TOTAL_REQUESTS NUM_WORKERS = NUM_WORKERS asyncio.run(batch_create_animals(TOTAL_REQUESTS, NUM_WORKERS))