| | |
| | | """ |
| | | import sys |
| | | import os |
| | | # 将上一级目录加入模块搜索路径 |
| | | sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| | | import random |
| | | import pymysql |
| | | import asyncio |
| | | import aiohttp |
| | | import time |
| | | import traceback |
| | | import datetime |
| | | from tqdm import tqdm |
| | | from Util.random_util import RandomUtil |
| | | from Util.dingtalk_helper import DingTalkHelper |
| | | |
def get_parent_directory(file_path, levels=1):
    """Return the directory ``levels`` steps above ``file_path``.

    The path is made absolute first; each step then strips one trailing
    component with ``os.path.dirname``. ``levels=0`` yields the absolute
    path itself.

    Args:
        file_path: Path (relative or absolute) to start from.
        levels: Number of ``dirname`` steps to apply.

    Returns:
        The resulting absolute path as a string.
    """
    ancestor = os.path.abspath(file_path)
    for _step in range(levels):
        # Climb one directory per iteration.
        ancestor = os.path.dirname(ancestor)
    return ancestor
| | | parent_dir = get_parent_directory(__file__, 5) # 获取上五级目录 |
| | | sys.path.append(parent_dir) |
| | | |
| | | from 测试组.脚本.造数脚本2.Util.random_util import RandomUtil |
| | | from 测试组.脚本.造数脚本2.Util.dingtalk_helper import DingTalkHelper |
| | | |
| | | |
| | | # --- 配置 --- |
| | |
| | | } |
| | | |
# Load-test settings. The earlier `TOTAL_REQUESTS = 10` assignment was dead
# code (immediately overwritten by 1000) and has been removed.
NUM_WORKERS = 100
TOTAL_REQUESTS = 1000
MAX_RETRIES = 3
REQUEST_TIMEOUT = 60  # presumably seconds -- confirm against the HTTP client setup
OUTPUT_DIR = './load_test_report'
| | | |
| | | # --- 数据库配置 --- |
# Keyword arguments passed to ``pymysql.connect``. Each connection setting can
# be overridden through an SRPS_DB_* environment variable; the literals below
# are the previous hard-coded values, kept as defaults for compatibility.
# NOTE(review): the default password is still committed to source -- move it
# to a secret store / environment and drop the literal when possible.
DB_CONFIG = {
    'host': os.environ.get('SRPS_DB_HOST', '192.168.6.190'),
    'port': int(os.environ.get('SRPS_DB_PORT', '3306')),
    'user': os.environ.get('SRPS_DB_USER', 'dev'),
    'password': os.environ.get('SRPS_DB_PASSWORD', 'Hello@112'),
    'database': os.environ.get('SRPS_DB_NAME', 'srps_ecnu'),
    'charset': 'utf8mb4',
    # Rows come back as dicts, so callers index columns by name.
    'cursorclass': pymysql.cursors.DictCursor,
}
| | | |
class DataManager:
    """Load users and their research groups from the database.

    After a successful ``load_database_data`` call, ``user_data`` holds one
    dict per user that has at least one resolvable research group:
    ``{'user_id', 'user_name', 'groups': [{'group_id', 'group_name'}, ...]}``.
    """

    def __init__(self):
        # User id / name plus the research groups resolved for that user.
        self.user_data = []

    @staticmethod
    def _parse_group_ids(raw):
        """Split a comma-separated id string into stripped, non-empty ids."""
        if not raw:
            return []
        return [part.strip() for part in raw.split(',') if part.strip()]

    def load_database_data(self):
        """Query users and resolve their research groups.

        Users whose ``research_group_ids`` resolve to no existing group are
        skipped. ``user_data`` is replaced only when the whole load succeeds,
        so repeated calls do not accumulate duplicates and a failed load
        leaves the previous data untouched.

        Returns:
            True if at least one usable user was loaded; False when nothing
            was loaded or any exception occurred (the error is printed).
        """
        conn = None
        try:
            conn = pymysql.connect(**DB_CONFIG)
            loaded = []
            # group id -> row (or None); avoids re-querying the same group
            # for every user that belongs to it.
            group_cache = {}

            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT
                        su.id as user_id,
                        su.name as user_name,
                        su.research_group_ids as research_group_ids
                    FROM sys_user su
                    WHERE su.research_group_ids IS NOT NULL AND su.research_group_ids != ''
                """)
                rows = cursor.fetchall()

                for row in rows:
                    groups = []
                    for group_id in self._parse_group_ids(row['research_group_ids']):
                        if group_id not in group_cache:
                            cursor.execute(
                                "SELECT id, name FROM l_research_group WHERE id = %s",
                                (group_id,),
                            )
                            group_cache[group_id] = cursor.fetchone()
                        group_info = group_cache[group_id]
                        if group_info:
                            groups.append({
                                'group_id': group_info['id'],
                                'group_name': group_info['name'],
                            })

                    if groups:
                        loaded.append({
                            'user_id': row['user_id'],
                            'user_name': row['user_name'],
                            'groups': groups,
                        })

            self.user_data = loaded
            print(f"成功加载 {len(self.user_data)} 个有效用户数据")
            return len(self.user_data) > 0

        except Exception as e:
            # Best-effort boundary: the caller only checks the boolean result.
            print(f"数据库连接失败: {e}")
            return False
        finally:
            # Always release the connection, including on query errors.
            # Skip close() on an already-closed connection, which would raise.
            if conn is not None and conn.open:
                conn.close()

    def get_random_user_and_group(self):
        """Pick a random user and one of that user's research groups.

        Returns:
            ``(user_id, user_name, group_id, group_name)``, or four ``None``
            values when no user data has been loaded.
        """
        if not self.user_data:
            return None, None, None, None

        user = random.choice(self.user_data)
        group = random.choice(user['groups'])
        return user['user_id'], user['user_name'], group['group_id'], group['group_name']
| | | |
| | | # 全局数据管理器 |
| | | data_manager = DataManager() |
| | | |
| | | # --- 初始化 --- |
| | | dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET) |
| | |
| | | def create_animal_data(idx: int): |
| | | random_code = RandomUtil.generate_random_number_string(0, 10000) |
| | | random_code_zt = RandomUtil.generate_random_number_string(1, 12) |
| | | random_code_totalAmount = RandomUtil.generate_random_number_string(100, 10000) |
| | | random_date = RandomUtil.generate_random_date("2023-01-01", "2025-10-16") |
| | | |
| | | # 从数据管理器获取随机的用户和课题组信息 |
| | | user_id, user_name, group_id, group_name = data_manager.get_random_user_and_group() |
| | | |
| | | if not user_id or not user_name or not group_id or not group_name: |
| | | return None |
| | | |
| | | return { |
| | | "id": "", |
| | | "researchGroupId": "1951155355026763778", |
| | | "researchGroupName": "hyb课题组", |
| | | "userId": "1951155353399373826", |
| | | "userName": "hyb课题负责人", |
| | | "researchGroupId": group_id, |
| | | "researchGroupName": group_name, |
| | | "userId": user_id, |
| | | "userName": user_name, |
| | | "chargeDate": random_date, |
| | | "name": f"服务名称{random_code}", |
| | | "totalAmount": 9.99, |
| | | "totalAmount": random_code_totalAmount, |
| | | "status": random_code_zt, |
| | | "attachment": "/userfiles/1588133301094375425/程序附件/finance/servicecost/financeServiceCost/2026/1/cs(2).jpg", |
| | | "remarks": f"备注备注备注备注{random_code}" |
| | |
| | | last_err = None |
| | | while attempt < max_retries: |
| | | data = create_animal_data(index) |
| | | if not data: |
| | | return { |
| | | 'index': index, |
| | | 'timestamp': time.time(), |
| | | 'status_code': 0, |
| | | 'latency_ms': 0, |
| | | 'response_size': None, |
| | | 'error': 'No available user/group data' |
| | | } |
| | | start = time.time() |
| | | try: |
| | | async with session.post(url, json=data, headers=headers) as resp: |
| | |
| | | |
| | | |
| | | async def batch_create_animals(total: int, num_workers: int): |
| | | # 加载数据库数据 |
| | | if not data_manager.load_database_data(): |
| | | print("加载数据库数据失败,退出压测") |
| | | return |
| | | |
| | | # 动态加载报告生成器模块(支持中文文件名) |
| | | gen = None |
| | | try: |
| | | import importlib.util |
| | | script_dir = os.path.dirname(os.path.abspath(__file__)) |
| | | report_path = os.path.join(script_dir, 'H:\\项目\\造数脚本\\Util\\stress_test_report_generator.py') |
| | | report_path = os.path.join(script_dir, r'H:\\项目\\archive\\测试组\\脚本\\造数脚本2\\Util\\stress_test_report_generator.py') |
| | | if os.path.exists(report_path): |
| | | spec = importlib.util.spec_from_file_location('report_module', report_path) |
| | | report_module = importlib.util.module_from_spec(spec) |