import sys
|
import os
|
# 将上一级目录加入模块搜索路径
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
import asyncio
|
import aiohttp
|
import time
|
import traceback
|
import datetime
|
import pandas as pd
|
import pymysql
|
import random
|
from tqdm import tqdm
|
from Util.dingtalk_helper import DingTalkHelper
|
|
# 将上一级目录加入模块搜索路径
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
# --- Configuration ---
# NOTE(review): credentials (DingTalk token/secret, DB password below) are
# hardcoded in source — consider moving them to environment variables or a
# secrets store before sharing this script.

# DingTalk bot credentials used to push the final load-test summary.
ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d'
SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19'

# Human-readable name of the API under test (PDA cage-position update).
apiname = "pda笼位更新"

# Target endpoint for the cage-update POST requests.
url = "http://qilu.baoyizn.com:9935/api/base/cage/cage/trainingCage"

# Base request headers; a per-user 'token' header is added for each request.
headers = {
    "user-agent": "Mozilla/5.0 (Linux; Android 10; DS-MDT301 Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/74.0.3729.186 Mobile Safari/537.36 uni-app Html5Plus/1.0 (Immersed/24.0)",
    "Content-Type": "application/json; charset=utf-8",
    "Accept-Encoding": "gzip"
}

# Load-test knobs.
NUM_WORKERS = 100                   # concurrent worker coroutines
TOTAL_REQUESTS = 20000              # total requests to issue
MAX_RETRIES = 3                     # attempts per request before recording failure
REQUEST_TIMEOUT = 60                # aiohttp total timeout per request, seconds
OUTPUT_DIR = './load_test_report'   # directory for generated report files

# --- Database configuration ---
# (previous test-environment config, kept for reference)
# DB_CONFIG = {
#     'host': 'rm-wz97y18i4t16hfsk6qo.mysql.rds.aliyuncs.com',
#     'database': 'srps_qilu',
#     'user': 'dev',
#     'password': 'Hello@112',
#     'charset': 'utf8mb4'
# }
# Production DB used to sample cage IDs and user/research-group data.
DB_CONFIG = {
    'host': 'rm-wz90i533p18631e685o.mysql.rds.aliyuncs.com',
    'database': 'srps_qilu_prod',
    'user': 'dev',
    'password': 'Hello@112',
    'charset': 'utf8mb4'
}

# --- Initialization ---
dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET)
|
|
class DataManager:
    """Loads and serves the load-test fixtures.

    Cage IDs and (user, research-group) pairs come from the MySQL
    database described by ``DB_CONFIG``; per-user auth tokens come from
    an Excel sheet. The ``get_random_*`` helpers feed the request builder.
    """

    def __init__(self):
        self.cage_ids = []     # cage primary keys sampled from l_cage
        self.user_data = []    # [{'user_id': ..., 'research_group_ids': [...]}, ...]
        self.user_tokens = []  # auth tokens loaded from Excel

    def load_user_tokens(self, excel_file='TestData/userinfo.xlsx'):
        """Load user tokens from the 'token' column of *excel_file*.

        Returns True on success, False on any error (which is printed).
        """
        try:
            df = pd.read_excel(excel_file)
            self.user_tokens = df['token'].tolist()
            print(f"成功加载 {len(self.user_tokens)} 个用户token")
            return True
        except Exception as e:
            print(f"加载用户token失败: {e}")
            return False

    def load_database_data(self):
        """Load cage IDs and user/research-group pairs from the database.

        Returns True only when both data sets ended up non-empty.
        """
        conn = None
        try:
            conn = pymysql.connect(**DB_CONFIG)

            # Cage IDs.
            with conn.cursor() as cursor:
                cursor.execute("SELECT id FROM l_cage WHERE id IS NOT NULL")
                self.cage_ids = [row[0] for row in cursor.fetchall()]
            print(f"成功加载 {len(self.cage_ids)} 个笼位ID")

            # User IDs plus their research-group IDs.
            with conn.cursor() as cursor:
                cursor.execute("SELECT id, research_group_ids FROM sys_user WHERE research_group_ids IS NOT NULL AND research_group_ids != ''")
                for user_id, research_groups in cursor.fetchall():
                    # research_group_ids may hold a single ID or a
                    # comma-separated list; one split handles both and
                    # discards blank fragments.
                    group_ids = [gid.strip() for gid in research_groups.split(',') if gid.strip()]
                    if not group_ids:
                        continue
                    self.user_data.append({
                        'user_id': user_id,
                        'research_group_ids': group_ids
                    })
            print(f"成功加载 {len(self.user_data)} 个有效用户数据")

            return len(self.cage_ids) > 0 and len(self.user_data) > 0

        except Exception as e:
            print(f"数据库连接失败: {e}")
            return False
        finally:
            # Bug fix: the connection used to leak when an exception was
            # raised after connect(); always close it here.
            if conn is not None:
                conn.close()

    def get_random_cage_id(self):
        """Return a random cage ID, or None when none are loaded."""
        return random.choice(self.cage_ids) if self.cage_ids else None

    def get_random_user_data(self):
        """Return a random (user_id, research_group_id) pair, or (None, None)."""
        if not self.user_data:
            return None, None
        user = random.choice(self.user_data)
        return user['user_id'], random.choice(user['research_group_ids'])

    def get_random_token(self):
        """Return a random user token, or None when none are loaded."""
        return random.choice(self.user_tokens) if self.user_tokens else None
|
|
# Global data manager shared by all workers and the request builder.
data_manager = DataManager()
|
|
def create_cage_data():
    """Build one random payload for the cage-update API.

    Samples a cage ID and a (user, research-group) pair from the global
    data manager. Returns the request dict, or None when any sampled
    value is missing/falsy.
    """
    cage = data_manager.get_random_cage_id()
    uid, group = data_manager.get_random_user_data()

    if not (cage and uid and group):
        return None

    payload = {
        "cage": {"id": str(cage)},
        "researchGroup": {"id": group},
        "user": {"id": uid},
    }
    return payload
|
|
async def perform_request(session: aiohttp.ClientSession, index: int, max_retries: int = MAX_RETRIES):
    """Issue one cage-update POST, retrying failures up to *max_retries* times.

    Returns a result dict with keys: 'index', 'timestamp', 'status_code'
    (0 when no successful HTTP response was obtained), 'latency_ms',
    'response_size' and 'error' (None on success).
    """
    last_err = None
    # Fix: initialise up front instead of the fragile
    # `'latency_ms' in locals()` recovery in the failure return below.
    latency_ms = 0.0
    token = data_manager.get_random_token()

    if not token:
        return {
            'index': index,
            'timestamp': time.time(),
            'status_code': 0,
            'latency_ms': 0,
            'response_size': None,
            'error': 'No available token'
        }

    # Per-request headers: shared base headers plus this user's token.
    current_headers = headers.copy()
    current_headers['token'] = token

    for attempt in range(1, max_retries + 1):
        # Fresh random payload for every attempt.
        data = create_cage_data()
        if not data:
            return {
                'index': index,
                'timestamp': time.time(),
                'status_code': 0,
                'latency_ms': 0,
                'response_size': None,
                'error': 'No available cage/user data'
            }

        start = time.time()
        try:
            async with session.post(url, json=data, headers=current_headers) as resp:
                text = await resp.text()
                latency_ms = (time.time() - start) * 1000.0
                status = resp.status
                if status == 200:
                    return {
                        'index': index,
                        'timestamp': time.time(),
                        'status_code': status,
                        'latency_ms': latency_ms,
                        'response_size': len(text) if text is not None else None,
                        'error': None
                    }
                last_err = f'status_{status}:{text}'
        except Exception as e:
            latency_ms = (time.time() - start) * 1000.0
            last_err = f'{type(e).__name__}:{str(e)}'

        # Exponential backoff capped at 10 s. Fix: skip the sleep after the
        # final attempt — previously up to 8 s were wasted before returning
        # the failure result.
        if attempt < max_retries:
            await asyncio.sleep(min(10, 2 ** attempt))

    # All attempts exhausted — report the last recorded error.
    return {
        'index': index,
        'timestamp': time.time(),
        'status_code': 0,
        'latency_ms': latency_ms,
        'response_size': None,
        'error': last_err
    }
|
|
async def worker(name: int, queue: asyncio.Queue, session: aiohttp.ClientSession, gen, pbar, success_counter: dict, failed_list: list, lock: asyncio.Lock):
    """Consumer coroutine: pulls request indices from *queue* and runs them.

    Stops when it receives a None sentinel. Records each result into the
    optional report generator *gen*, tallies successes/failures under
    *lock*, and advances the progress bar *pbar*.
    """
    while True:
        idx = await queue.get()
        if idx is None:
            # Shutdown sentinel: acknowledge it and exit this worker.
            queue.task_done()
            break
        try:
            result = await perform_request(session, idx)

            # Feed the per-request record to the report generator, if any.
            if gen:
                gen.record_result(
                    index=result['index'],
                    timestamp=result['timestamp'],
                    status_code=int(result['status_code']),
                    latency_ms=float(result['latency_ms']),
                    response_size=result.get('response_size'),
                    error=result.get('error'),
                )

            status = result['status_code']
            is_success = bool(status) and 200 <= status < 300
            async with lock:
                if is_success:
                    success_counter['count'] += 1
                else:
                    failed_list.append((result['index'], result.get('error')))
            pbar.update(1)
        except Exception as e:
            async with lock:
                failed_list.append((idx, f'Worker异常:{type(e).__name__}:{e}'))
            pbar.update(1)
        finally:
            queue.task_done()
|
|
def _load_report_generator():
    """Best-effort load of LoadTestReportGenerator from known file paths.

    Tries the absolute path first, then a path relative to this script.
    Returns a configured generator instance, or None when the module is
    missing or fails to load (the test then runs without detailed reports).
    """
    try:
        import importlib.util

        script_dir = os.path.dirname(os.path.abspath(__file__))
        candidate_paths = [
            # Primary: absolute path on the authoring machine.
            'H:\\项目\\造数脚本\\Util\\stress_test_report_generator.py',
            # Fallback: relative to this script's directory.
            os.path.join(script_dir, 'Util', 'stress_test_report_generator.py'),
        ]
        for report_path in candidate_paths:
            if not os.path.exists(report_path):
                continue
            spec = importlib.util.spec_from_file_location('report_module', report_path)
            report_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(report_module)
            LoadTestReportGenerator = getattr(report_module, 'LoadTestReportGenerator')
            gen = LoadTestReportGenerator(test_name=f'{apiname}压测任务', report_title='压测详细报告')
            print(f"成功加载压测报告生成器: {report_path}")
            return gen
        print('警告: 未找到压测报告生成器,将跳过报告生成')
    except Exception as e:
        print(f'加载压测报告生成器失败: {e},将跳过报告生成')
    return None


async def batch_update_cages(total: int, num_workers: int):
    """Run the cage-update load test.

    Issues *total* requests across *num_workers* concurrent workers,
    optionally records per-request results through the report generator,
    then pushes a DingTalk summary and prints a failure digest.
    """
    # Refactor: the importlib loading logic was duplicated verbatim for the
    # two candidate paths; it now lives in _load_report_generator().
    gen = _load_report_generator()

    # Load fixtures; abort early if either source is unusable.
    if not data_manager.load_user_tokens():
        print("加载用户token失败,退出压测")
        return
    if not data_manager.load_database_data():
        print("加载数据库数据失败,退出压测")
        return

    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
    connector = aiohttp.TCPConnector(limit=num_workers, limit_per_host=num_workers, force_close=False)

    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        # Work queue: one index per request, plus one None sentinel per
        # worker so every worker shuts down cleanly.
        queue = asyncio.Queue()
        for i in range(1, total + 1):
            await queue.put(i)
        for _ in range(num_workers):
            await queue.put(None)

        success_counter = {'count': 0}
        failed_list = []
        lock = asyncio.Lock()

        with tqdm(total=total, desc='压测进度') as pbar:
            workers = [
                asyncio.create_task(worker(i, queue, session, gen, pbar, success_counter, failed_list, lock))
                for i in range(num_workers)
            ]
            await asyncio.gather(*workers)

    # --- Reporting ---
    outputs = None
    if gen:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        outputs = gen.generate_report(OUTPUT_DIR, formats=['html', 'json', 'csv', 'docx'])
        stats = gen.compute_stats()
    else:
        # No report generator: compute basic stats locally.
        stats = {
            'total_requests': total,
            'success_count': success_counter['count'],
            'fail_count': len(failed_list),
            'success_rate': success_counter['count'] / total if total > 0 else 0
        }

    # Build the DingTalk summary. Refactor: this assembly was duplicated in
    # both the gen/no-gen branches; the duration/latency lines only appear
    # when the generator's stats provide those keys.
    now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    msg = [f'【{apiname} 压测报告】', f'生成时间:{now_str}']
    msg.append(f"总请求数:{stats.get('total_requests',0)},成功:{stats.get('success_count',0)},失败:{stats.get('fail_count',0)},成功率:{stats.get('success_rate',0):.2%}")

    if 'duration_seconds' in stats:
        msg.append(f"总耗时(s):{stats.get('duration_seconds',0):.2f},平均吞吐(req/s):{stats.get('throughput_rps',0):.2f}")

    if 'latency_ms' in stats:
        lat = stats.get('latency_ms', {})
        msg.append(f"延迟(ms) - 平均:{lat.get('avg',0):.2f},P90:{lat.get('p90',0):.2f},P95:{lat.get('p95',0):.2f},P99:{lat.get('p99',0):.2f}")

    msg.append(f"使用用户数:{len(data_manager.user_tokens)}")
    msg.append(f"可用笼位数:{len(data_manager.cage_ids)}")
    msg.append(f"有效用户数:{len(data_manager.user_data)}")

    # List generated report files (chart outputs are nested one level deeper).
    if outputs:
        file_list = []
        for kind, value in outputs.items():
            if kind == 'charts':
                for _chart_name, chart_path in value.items():
                    file_list.append(os.path.abspath(chart_path))
            else:
                file_list.append(os.path.abspath(value))
        msg.append('生成文件:')
        for p in file_list:
            msg.append(p)

    final_msg = '\n'.join(msg)

    # Push the summary to DingTalk (best effort — failure is only printed).
    try:
        dingtalk_helper.send_message(final_msg)
        print('钉钉消息发送成功')
    except Exception as e:
        print('发送钉钉消息失败:', e)

    print('\n[压测摘要]')
    print('成功数:', success_counter['count'], ' 失败数:', len(failed_list))
    if failed_list:
        print('失败示例(最多显示20条):')
        for idx, err in failed_list[:20]:
            print(f' #{idx} => {err}')
|
|
if __name__ == '__main__':
    # Pre-flight dependency check.
    # NOTE(review): pymysql and pandas are already imported at module top,
    # so a missing dependency would fail before this block ever runs — the
    # check as written is effectively redundant.
    try:
        import pymysql
        import pandas
        print("依赖检查通过,开始压测...")
    except ImportError as e:
        print(f"缺少依赖: {e}")
        print("请安装: pip install pymysql pandas")
        exit(1)

    # Kick off the load test on the asyncio event loop.
    asyncio.run(batch_update_cages(TOTAL_REQUESTS, NUM_WORKERS))
|