| | |
| | | """ |
| | | import sys |
| | | import os |
| | | # 将上一级目录加入模块搜索路径 |
| | | sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| | | import asyncio |
| | | import aiohttp |
| | | import time |
| | | import traceback |
| | | import datetime |
| | | from tqdm import tqdm |
| | | from Util.random_util import RandomUtil |
| | | from Util.dingtalk_helper import DingTalkHelper |
| | | import pymysql |
| | | import pymysql.cursors |
| | | import random |
| | | def get_parent_directory(file_path, levels=1): |
| | | """获取指定层级的父目录""" |
| | | path = os.path.abspath(file_path) |
| | | for _ in range(levels): |
| | | path = os.path.dirname(path) |
| | | return path |
| | | parent_dir = get_parent_directory(__file__, 5) # 获取上五级目录 |
| | | sys.path.append(parent_dir) |
| | | from 测试组.脚本.造数脚本2.Util import TokenValidator, DingTalkHelper, RandomUtil, RequestRecord, LoadTestReportGenerator |
| | | |
| | | |
| | | # --- 配置 --- |
| | |
| | | 'charset': 'utf8mb4', |
| | | 'cursorclass': pymysql.cursors.DictCursor |
| | | } |
| | | # 账号密码配置 |
| | | username = "gly" |
| | | password = "Baoyi@1341" |
| | | # 创建获取token实例 |
| | | token_validator = TokenValidator() |
| | | |
| | | domain = "http://192.168.6.190:5561" |
| | | token = token_validator.get_token(domain, username, password) |
| | | |
| | | apiname = "入驻笼位" |
| | | url = "http://192.168.6.190:5561/api/base/cage/cage/enterCage" |
| | | url = f"{domain}/api/base/cage/cage/enterCage" |
| | | headers = { |
| | | "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NjcyMzc1MDgsInVzZXJuYW1lIjoiZ2x5In0.2N0rQ7Oy1B-Wg_fnywOrcDelYnCe5JOpd7-vwu_2H6U", |
| | | "token": token, |
| | | "Content-Type": "application/json" |
| | | } |
| | | |
| | | NUM_WORKERS = 100 |
| | | TOTAL_REQUESTS = 244481 |
| | | TOTAL_REQUESTS = 1 |
| | | MAX_RETRIES = 3 |
| | | REQUEST_TIMEOUT = 60 |
| | | OUTPUT_DIR = './load_test_report' |
| | |
| | | |
| | | # 全局变量,存储从数据库获取的笼位列表 |
| | | cage_list = [] |
| | | |
| | | |
| | | class DataManager: |
| | | """数据管理器,负责从数据库加载用户和课题组信息""" |
| | | |
| | | def __init__(self): |
| | | self.user_data = [] # 存储用户ID、用户名、课题组ID和课题组名称 |
| | | |
| | | def load_user_and_group_data(self): |
| | | """从数据库加载用户和课题组信息""" |
| | | try: |
| | | conn = pymysql.connect(**DB_CONFIG) |
| | | |
| | | # 获取用户ID、用户名和对应的课题组ID |
| | | with conn.cursor() as cursor: |
| | | cursor.execute(""" |
| | | SELECT |
| | | su.id as user_id, |
| | | su.name as user_name, |
| | | su.research_group_ids as research_group_ids |
| | | FROM sys_user su |
| | | WHERE su.research_group_ids IS NOT NULL AND su.research_group_ids != '' |
| | | """) |
| | | |
| | | results = cursor.fetchall() |
| | | |
| | | # 处理查询结果 |
| | | for row in results: |
| | | # 处理多个课题组ID(用逗号分隔的情况) |
| | | if row['research_group_ids'] and ',' in row['research_group_ids']: |
| | | group_ids = [gid.strip() for gid in row['research_group_ids'].split(',') if gid.strip()] |
| | | elif row['research_group_ids']: |
| | | group_ids = [row['research_group_ids'].strip()] |
| | | else: |
| | | continue |
| | | |
| | | # 获取所有相关的课题组信息 |
| | | group_info_list = [] |
| | | for group_id in group_ids: |
| | | # 查找对应的课题组名称,使用正确的表名 l_research_group |
| | | cursor.execute("SELECT id, name FROM l_research_group WHERE id = %s", (group_id,)) |
| | | group_info = cursor.fetchone() |
| | | if group_info: |
| | | group_info_list.append({ |
| | | 'group_id': group_info['id'], |
| | | 'group_name': group_info['name'] |
| | | }) |
| | | |
| | | if group_info_list: |
| | | self.user_data.append({ |
| | | 'user_id': row['user_id'], |
| | | 'user_name': row['user_name'], |
| | | 'groups': group_info_list |
| | | }) |
| | | |
| | | print(f"成功加载 {len(self.user_data)} 个有效用户数据") |
| | | conn.close() |
| | | return len(self.user_data) > 0 |
| | | |
| | | except Exception as e: |
| | | print(f"数据库连接失败: {e}") |
| | | return False |
| | | |
| | | def get_random_user_and_group(self): |
| | | """随机获取一个用户和对应的课题组信息""" |
| | | if not self.user_data: |
| | | return None, None, None, None |
| | | |
| | | # 随机选择一个用户 |
| | | user = random.choice(self.user_data) |
| | | user_id = user['user_id'] |
| | | user_name = user['user_name'] |
| | | |
| | | # 随机选择一个课题组 |
| | | group = random.choice(user['groups']) |
| | | group_id = group['group_id'] |
| | | group_name = group['group_name'] |
| | | |
| | | return user_id, user_name, group_id, group_name |
| | | |
| | | |
| | | # 创建数据管理器实例 |
| | | data_manager = DataManager() |
| | | |
| | | |
| | | |
| | | def fetch_cages_from_db(): |
| | |
| | | |
| | | |
| | | def create_animal_data(idx: int): |
| | | """创建动物数据,使用动态获取的笼位信息""" |
| | | """创建动物数据,使用动态获取的笼位、用户和课题组信息""" |
| | | random_code = RandomUtil.generate_random_number_string(0, 999999999) |
| | | random_femaleNum = RandomUtil.generate_random_number_string(1, 5) |
| | | random_maleNum = RandomUtil.generate_random_number_string(1, 5) |
| | |
| | | |
| | | # 获取笼位信息 |
| | | cage_info = get_random_cage(idx) |
| | | |
| | | # 从数据管理器获取随机的用户和课题组信息 |
| | | user_id, user_name, group_id, group_name = data_manager.get_random_user_and_group() |
| | | |
| | | # 如果无法获取用户和课题组信息,使用默认值 |
| | | if not user_id or not group_id: |
| | | user_id = "1995379969088860162" # 默认用户ID |
| | | group_id = "1995379941721026561" # 默认课题组ID |
| | | group_name = "hyb课题组2" # 默认课题组名称 |
| | | |
| | | return { |
| | | "enterCageList": [ |
| | |
| | | }, |
| | | "cageStatus": "2", # 笼具状态 |
| | | "user": { |
| | | "id": "1995379969088860162" # 操作用户ID |
| | | "id": user_id # 用户ID |
| | | }, |
| | | "researchGroup": { |
| | | "id": "1995379941721026561", |
| | | "name": "hyb课题组2" # 课题组信息 |
| | | "id": group_id, |
| | | "name": group_name # 课题组信息 |
| | | }, |
| | | "femaleNum": random_femaleNum, # 雌性数量 |
| | | "maleNum": random_maleNum, # 雄性数量 |
| | |
| | | print("错误: 无法获取笼位信息,压测终止") |
| | | return |
| | | |
| | | # 从数据库获取用户和课题组信息 |
| | | print("正在从数据库获取用户和课题组信息...") |
| | | if not data_manager.load_user_and_group_data(): |
| | | print("警告: 无法获取用户和课题组信息,将使用默认值") |
| | | |
| | | print(f"获取到 {len(cage_list)} 个笼位,将进行{apiname}压测") |
| | | |
| | | # 动态加载报告生成器模块(支持中文文件名) |
| | |
| | | try: |
| | | import importlib.util |
| | | script_dir = os.path.dirname(os.path.abspath(__file__)) |
| | | report_path = os.path.join(script_dir, 'H:\\项目\\造数脚本\\Util\\stress_test_report_generator.py') |
| | | report_path = os.path.join(script_dir, 'H:\\项目\\archive\\测试组\\脚本\\造数脚本2\\Util\\stress_test_report_generator.py') |
| | | if os.path.exists(report_path): |
| | | spec = importlib.util.spec_from_file_location('report_module', report_path) |
| | | report_module = importlib.util.module_from_spec(spec) |