import sys import os # 将上一级目录加入模块搜索路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import asyncio import aiohttp import time import traceback import datetime import pandas as pd import pymysql import random from tqdm import tqdm from Util.dingtalk_helper import DingTalkHelper # 将上一级目录加入模块搜索路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # --- 配置 --- ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d' SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19' apiname = "pda笼位更新" url = "http://qilu.baoyizn.com:9935/api/base/cage/cage/trainingCage" headers = { "user-agent": "Mozilla/5.0 (Linux; Android 10; DS-MDT301 Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/74.0.3729.186 Mobile Safari/537.36 uni-app Html5Plus/1.0 (Immersed/24.0)", "Content-Type": "application/json; charset=utf-8", "Accept-Encoding": "gzip" } NUM_WORKERS = 100 TOTAL_REQUESTS = 20000 MAX_RETRIES = 3 REQUEST_TIMEOUT = 60 OUTPUT_DIR = './load_test_report' # --- 数据库配置 --- # DB_CONFIG = { # 'host': 'rm-wz97y18i4t16hfsk6qo.mysql.rds.aliyuncs.com', # 'database': 'srps_qilu', # 'user': 'dev', # 'password': 'Hello@112', # 'charset': 'utf8mb4' # } DB_CONFIG = { 'host': 'rm-wz90i533p18631e685o.mysql.rds.aliyuncs.com', 'database': 'srps_qilu_prod', 'user': 'dev', 'password': 'Hello@112', 'charset': 'utf8mb4' } # --- 初始化 --- dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET) class DataManager: """数据管理器,负责从数据库和Excel加载数据""" def __init__(self): self.cage_ids = [] self.user_data = [] # 存储用户ID和对应的课题组ID self.user_tokens = [] # 存储用户token def load_user_tokens(self, excel_file='TestData/userinfo.xlsx'): """从Excel加载用户token""" try: df = pd.read_excel(excel_file) self.user_tokens = df['token'].tolist() print(f"成功加载 {len(self.user_tokens)} 个用户token") return True except Exception as e: print(f"加载用户token失败: {e}") return False def load_database_data(self): """从数据库加载笼位ID和用户信息""" try: conn = pymysql.connect(**DB_CONFIG) # 获取笼位ID with conn.cursor() as cursor: cursor.execute("SELECT id FROM l_cage WHERE id IS NOT NULL") self.cage_ids = [row[0] for row in cursor.fetchall()] print(f"成功加载 {len(self.cage_ids)} 个笼位ID") # 获取用户ID和课题组ID with conn.cursor() as cursor: cursor.execute("SELECT id, research_group_ids FROM sys_user WHERE research_group_ids IS NOT NULL AND research_group_ids != ''") for user_id, research_groups in cursor.fetchall(): # 处理多个课题组ID(用逗号分隔的情况) if research_groups and ',' in research_groups: group_ids = [gid.strip() for gid in research_groups.split(',') if gid.strip()] elif research_groups: group_ids = [research_groups.strip()] else: continue self.user_data.append({ 'user_id': user_id, 'research_group_ids': group_ids }) print(f"成功加载 {len(self.user_data)} 个有效用户数据") conn.close() return len(self.cage_ids) > 0 and len(self.user_data) > 0 except Exception as e: print(f"数据库连接失败: {e}") return False def get_random_cage_id(self): """随机获取一个笼位ID""" return random.choice(self.cage_ids) if self.cage_ids else None def get_random_user_data(self): """随机获取一个用户数据(用户ID和课题组ID)""" if not self.user_data: return None, None user = random.choice(self.user_data) user_id = user['user_id'] research_group_id = random.choice(user['research_group_ids']) return user_id, research_group_id def get_random_token(self): """随机获取一个用户token""" return random.choice(self.user_tokens) if self.user_tokens else None # 全局数据管理器 data_manager = DataManager() def create_cage_data(): """创建笼位更新请求数据""" cage_id = data_manager.get_random_cage_id() user_id, research_group_id = data_manager.get_random_user_data() if not cage_id or not user_id or not research_group_id: return None return { "cage": {"id": str(cage_id)}, "researchGroup": {"id": research_group_id}, "user": {"id": user_id} } async def perform_request(session: aiohttp.ClientSession, index: int, max_retries: int = MAX_RETRIES): """执行单个请求""" attempt = 0 last_err = None token = data_manager.get_random_token() if not token: return { 'index': index, 'timestamp': time.time(), 'status_code': 0, 'latency_ms': 0, 'response_size': None, 'error': 'No available token' } # 动态设置token到headers current_headers = headers.copy() current_headers['token'] = token while attempt < max_retries: data = create_cage_data() if not data: return { 'index': index, 'timestamp': time.time(), 'status_code': 0, 'latency_ms': 0, 'response_size': None, 'error': 'No available cage/user data' } start = time.time() try: async with session.post(url, json=data, headers=current_headers) as resp: text = await resp.text() latency_ms = (time.time() - start) * 1000.0 status = resp.status if status == 200: return { 'index': index, 'timestamp': time.time(), 'status_code': status, 'latency_ms': latency_ms, 'response_size': len(text) if text is not None else None, 'error': None } else: last_err = f'status_{status}:{text}' attempt += 1 await asyncio.sleep(min(10, 2 ** attempt)) except Exception as e: latency_ms = (time.time() - start) * 1000.0 last_err = f'{type(e).__name__}:{str(e)}' attempt += 1 await asyncio.sleep(min(10, 2 ** attempt)) # 最终失败 return { 'index': index, 'timestamp': time.time(), 'status_code': 0, 'latency_ms': latency_ms if 'latency_ms' in locals() else 0, 'response_size': None, 'error': last_err } async def worker(name: int, queue: asyncio.Queue, session: aiohttp.ClientSession, gen, pbar, success_counter: dict, failed_list: list, lock: asyncio.Lock): """工作线程""" while True: idx = await queue.get() if idx is None: queue.task_done() break try: res = await perform_request(session, idx) # 记录到报告生成器 if gen: gen.record_result( index=res['index'], timestamp=res['timestamp'], status_code=int(res['status_code']), latency_ms=float(res['latency_ms']), response_size=res.get('response_size'), error=res.get('error') ) async with lock: if res['status_code'] and 200 <= res['status_code'] < 300: success_counter['count'] += 1 else: failed_list.append((res['index'], res.get('error'))) pbar.update(1) except Exception as e: async with lock: failed_list.append((idx, f'Worker异常:{type(e).__name__}:{e}')) pbar.update(1) finally: queue.task_done() async def batch_update_cages(total: int, num_workers: int): """批量更新笼位""" # 动态加载报告生成器模块 gen = None try: import importlib.util # 使用您提供的绝对路径 report_path = 'H:\\项目\\造数脚本\\Util\\stress_test_report_generator.py' if os.path.exists(report_path): spec = importlib.util.spec_from_file_location('report_module', report_path) report_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(report_module) LoadTestReportGenerator = getattr(report_module, 'LoadTestReportGenerator') gen = LoadTestReportGenerator(test_name=f'{apiname}压测任务', report_title='压测详细报告') print(f"成功加载压测报告生成器: {report_path}") else: # 备用:尝试相对路径 script_dir = os.path.dirname(os.path.abspath(__file__)) report_path = os.path.join(script_dir, 'Util', 'stress_test_report_generator.py') if os.path.exists(report_path): spec = importlib.util.spec_from_file_location('report_module', report_path) report_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(report_module) LoadTestReportGenerator = getattr(report_module, 'LoadTestReportGenerator') gen = LoadTestReportGenerator(test_name=f'{apiname}压测任务', report_title='压测详细报告') print(f"成功加载压测报告生成器: {report_path}") else: print('警告: 未找到压测报告生成器,将跳过报告生成') except Exception as e: print(f'加载压测报告生成器失败: {e},将跳过报告生成') # 初始化数据 if not data_manager.load_user_tokens(): print("加载用户token失败,退出压测") return if not data_manager.load_database_data(): print("加载数据库数据失败,退出压测") return timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT) connector = aiohttp.TCPConnector(limit=num_workers, limit_per_host=num_workers, force_close=False) async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session: queue = asyncio.Queue() for i in range(1, total + 1): await queue.put(i) for _ in range(num_workers): await queue.put(None) success_counter = {'count': 0} failed_list = [] lock = asyncio.Lock() with tqdm(total=total, desc='压测进度') as pbar: workers = [ asyncio.create_task(worker(i, queue, session, gen, pbar, success_counter, failed_list, lock)) for i in range(num_workers) ] await asyncio.gather(*workers) # 任务完成,生成报告 if gen: os.makedirs(OUTPUT_DIR, exist_ok=True) outputs = gen.generate_report(OUTPUT_DIR, formats=['html', 'json', 'csv', 'docx']) stats = gen.compute_stats() # 构造钉钉摘要消息 now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') msg = [f'【{apiname} 压测报告】', f'生成时间:{now_str}'] msg.append(f"总请求数:{stats.get('total_requests',0)},成功:{stats.get('success_count',0)},失败:{stats.get('fail_count',0)},成功率:{stats.get('success_rate',0):.2%}") if 'duration_seconds' in stats: msg.append(f"总耗时(s):{stats.get('duration_seconds',0):.2f},平均吞吐(req/s):{stats.get('throughput_rps',0):.2f}") if 'latency_ms' in stats: lat = stats.get('latency_ms', {}) msg.append(f"延迟(ms) - 平均:{lat.get('avg',0):.2f},P90:{lat.get('p90',0):.2f},P95:{lat.get('p95',0):.2f},P99:{lat.get('p99',0):.2f}") msg.append(f"使用用户数:{len(data_manager.user_tokens)}") msg.append(f"可用笼位数:{len(data_manager.cage_ids)}") msg.append(f"有效用户数:{len(data_manager.user_data)}") # 列出生成的报告文件 if outputs: file_list = [] for k, v in outputs.items(): if k == 'charts': for cname, cpath in v.items(): file_list.append(os.path.abspath(cpath)) else: file_list.append(os.path.abspath(v)) msg.append('生成文件:') for p in file_list: msg.append(p) final_msg = '\n'.join(msg) else: # 如果没有报告生成器,计算基础统计 stats = { 'total_requests': total, 'success_count': success_counter['count'], 'fail_count': len(failed_list), 'success_rate': success_counter['count'] / total if total > 0 else 0 } # 基础统计消息 now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') msg = [f'【{apiname} 压测报告】', f'生成时间:{now_str}'] msg.append(f"总请求数:{stats.get('total_requests',0)},成功:{stats.get('success_count',0)},失败:{stats.get('fail_count',0)},成功率:{stats.get('success_rate',0):.2%}") msg.append(f"使用用户数:{len(data_manager.user_tokens)}") msg.append(f"可用笼位数:{len(data_manager.cage_ids)}") msg.append(f"有效用户数:{len(data_manager.user_data)}") final_msg = '\n'.join(msg) # 发送钉钉消息 try: dingtalk_helper.send_message(final_msg) print('钉钉消息发送成功') except Exception as e: print('发送钉钉消息失败:', e) print('\n[压测摘要]') print('成功数:', success_counter['count'], ' 失败数:', len(failed_list)) if failed_list: print('失败示例(最多显示20条):') for idx, err in failed_list[:20]: print(f' #{idx} => {err}') if __name__ == '__main__': # 运行前检查依赖 try: import pymysql import pandas print("依赖检查通过,开始压测...") except ImportError as e: print(f"缺少依赖: {e}") print("请安装: pip install pymysql pandas") exit(1) # 运行压测 asyncio.run(batch_update_cages(TOTAL_REQUESTS, NUM_WORKERS))