From 7657e1b2fa251a2ea372710ad75cb395a3c0e374 Mon Sep 17 00:00:00 2001
From: hyb <kk_huangyangbo@163.com>
Date: Fri, 30 Jan 2026 07:15:55 +0000
Subject: [PATCH] feat: 新增将两种加密方式合并一个脚本、新增批量修改邮箱功能
---
测试组/脚本/造数脚本2/华东师范大学二期/新建技术服务费.py | 156 ++++++++++++++++++++++++++++++++++++++++++++++++----
1 files changed, 144 insertions(+), 12 deletions(-)
diff --git "a/\346\265\213\350\257\225\347\273\204/\350\204\232\346\234\254/\351\200\240\346\225\260\350\204\232\346\234\2542/\345\215\216\344\270\234\345\270\210\350\214\203\345\244\247\345\255\246\344\272\214\346\234\237/\346\226\260\345\273\272\346\212\200\346\234\257\346\234\215\345\212\241\350\264\271.py" "b/\346\265\213\350\257\225\347\273\204/\350\204\232\346\234\254/\351\200\240\346\225\260\350\204\232\346\234\2542/\345\215\216\344\270\234\345\270\210\350\214\203\345\244\247\345\255\246\344\272\214\346\234\237/\346\226\260\345\273\272\346\212\200\346\234\257\346\234\215\345\212\241\350\264\271.py"
index 537eadf..2874016 100644
--- "a/\346\265\213\350\257\225\347\273\204/\350\204\232\346\234\254/\351\200\240\346\225\260\350\204\232\346\234\2542/\345\215\216\344\270\234\345\270\210\350\214\203\345\244\247\345\255\246\344\272\214\346\234\237/\346\226\260\345\273\272\346\212\200\346\234\257\346\234\215\345\212\241\350\264\271.py"
+++ "b/\346\265\213\350\257\225\347\273\204/\350\204\232\346\234\254/\351\200\240\346\225\260\350\204\232\346\234\2542/\345\215\216\344\270\234\345\270\210\350\214\203\345\244\247\345\255\246\344\272\214\346\234\237/\346\226\260\345\273\272\346\212\200\346\234\257\346\234\215\345\212\241\350\264\271.py"
@@ -10,26 +10,44 @@
"""
import sys
import os
-# 将上一级目录加入模块搜索路径
-sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+import random
+import pymysql
import asyncio
import aiohttp
import time
import traceback
import datetime
from tqdm import tqdm
-from Util.random_util import RandomUtil
-from Util.dingtalk_helper import DingTalkHelper
+
+
+def get_parent_directory(file_path, levels=1):
+ """获取指定层级的父目录"""
+ path = os.path.abspath(file_path)
+ for _ in range(levels):
+ path = os.path.dirname(path)
+ return path
+parent_dir = get_parent_directory(__file__, 5) # 获取上五级目录
+sys.path.append(parent_dir)
+from 测试组.脚本.造数脚本2.Util import TokenValidator, DingTalkHelper, RandomUtil, RequestRecord, LoadTestReportGenerator
# --- 配置 ---
ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d'
SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19'
+# 账号密码配置
+username = "gly"
+password = "Baoyi@1341"
+# 创建获取token实例
+token_validator = TokenValidator()
+
+domain = "http://192.168.6.190:5561"
+token = token_validator.get_token(domain, username, password)
+
apiname = "新建技术服务费"
-url = "http://192.168.6.190:5561/api/finance/servicecost/financeServiceCost/save"
+url = f"{domain}/api/finance/servicecost/financeServiceCost/save"
headers = {
- "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3Njc4NTg5MjcsInVzZXJuYW1lIjoiZ2x5In0.l2ulcY6VbJzYRgzlxwxrBsqQ9xrVnxaiUq703e_igng",
+ "token": token,
"Content-Type": "application/json"
}
@@ -38,6 +56,98 @@
MAX_RETRIES = 3
REQUEST_TIMEOUT = 60
OUTPUT_DIR = './load_test_report'
+
+# --- 数据库配置 ---
+DB_CONFIG = {
+ 'host': '192.168.6.190',
+ 'port': 3306,
+ 'user': 'dev',
+ 'password': 'Hello@112',
+ 'database': 'srps_ecnu',
+ 'charset': 'utf8mb4',
+ 'cursorclass': pymysql.cursors.DictCursor
+}
+
+class DataManager:
+ """数据管理器,负责从数据库加载用户和课题组信息"""
+
+ def __init__(self):
+ self.user_data = [] # 存储用户ID、用户名、课题组ID和课题组名称
+
+ def load_database_data(self):
+ """从数据库加载用户和课题组信息"""
+ try:
+ conn = pymysql.connect(**DB_CONFIG)
+
+ # 获取用户ID、用户名和对应的课题组ID
+ with conn.cursor() as cursor:
+ cursor.execute("""
+ SELECT
+ su.id as user_id,
+ su.name as user_name,
+ su.research_group_ids as research_group_ids
+ FROM sys_user su
+ WHERE su.research_group_ids IS NOT NULL AND su.research_group_ids != ''
+ """)
+
+ results = cursor.fetchall()
+
+ # 处理查询结果
+ for row in results:
+ # 处理多个课题组ID(用逗号分隔的情况)
+ if row['research_group_ids'] and ',' in row['research_group_ids']:
+ group_ids = [gid.strip() for gid in row['research_group_ids'].split(',') if gid.strip()]
+ elif row['research_group_ids']:
+ group_ids = [row['research_group_ids'].strip()]
+ else:
+ continue
+
+ # 获取所有相关的课题组信息
+ group_info_list = []
+ for group_id in group_ids:
+ # 查找对应的课题组名称,使用正确的表名 l_research_group
+ cursor.execute("SELECT id, name FROM l_research_group WHERE id = %s", (group_id,))
+ group_info = cursor.fetchone()
+ if group_info:
+ group_info_list.append({
+ 'group_id': group_info['id'],
+ 'group_name': group_info['name']
+ })
+
+ if group_info_list:
+ self.user_data.append({
+ 'user_id': row['user_id'],
+ 'user_name': row['user_name'],
+ 'groups': group_info_list
+ })
+
+ print(f"成功加载 {len(self.user_data)} 个有效用户数据")
+ conn.close()
+ return len(self.user_data) > 0
+
+ except Exception as e:
+ print(f"数据库连接失败: {e}")
+ return False
+
+ def get_random_user_and_group(self):
+ """随机获取一个用户和对应的课题组信息"""
+ if not self.user_data:
+ return None, None, None, None
+
+ # 随机选择一个用户
+ user = random.choice(self.user_data)
+ user_id = user['user_id']
+ user_name = user['user_name']
+
+ # 随机选择一个课题组
+ group = random.choice(user['groups'])
+ group_id = group['group_id']
+ group_name = group['group_name']
+
+ return user_id, user_name, group_id, group_name
+
+# 全局数据管理器
+data_manager = DataManager()
# --- 初始化 ---
dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET)
@@ -49,16 +159,24 @@
def create_animal_data(idx: int):
random_code = RandomUtil.generate_random_number_string(0, 10000)
random_code_zt = RandomUtil.generate_random_number_string(1, 12)
+ random_code_totalAmount = RandomUtil.generate_random_number_string(100, 10000)
random_date = RandomUtil.generate_random_date("2023-01-01", "2025-10-16")
+
+ # 从数据管理器获取随机的用户和课题组信息
+ user_id, user_name, group_id, group_name = data_manager.get_random_user_and_group()
+
+ if not user_id or not user_name or not group_id or not group_name:
+ return None
+
return {
"id": "",
- "researchGroupId": "1951155355026763778",
- "researchGroupName": "hyb课题组",
- "userId": "1951155353399373826",
- "userName": "hyb课题负责人",
+ "researchGroupId": group_id,
+ "researchGroupName": group_name,
+ "userId": user_id,
+ "userName": user_name,
"chargeDate": random_date,
"name": f"服务名称{random_code}",
- "totalAmount": 9.99,
+ "totalAmount": random_code_totalAmount,
"status": random_code_zt,
"attachment": "/userfiles/1588133301094375425/程序附件/finance/servicecost/financeServiceCost/2026/1/cs(2).jpg",
"remarks": f"备注备注备注备注{random_code}"
@@ -70,6 +188,15 @@
last_err = None
while attempt < max_retries:
data = create_animal_data(index)
+ if not data:
+ return {
+ 'index': index,
+ 'timestamp': time.time(),
+ 'status_code': 0,
+ 'latency_ms': 0,
+ 'response_size': None,
+ 'error': 'No available user/group data'
+ }
start = time.time()
try:
async with session.post(url, json=data, headers=headers) as resp:
@@ -137,12 +264,17 @@
async def batch_create_animals(total: int, num_workers: int):
+ # 加载数据库数据
+ if not data_manager.load_database_data():
+ print("加载数据库数据失败,退出压测")
+ return
+
# 动态加载报告生成器模块(支持中文文件名)
gen = None
try:
import importlib.util
script_dir = os.path.dirname(os.path.abspath(__file__))
- report_path = os.path.join(script_dir, 'H:\\项目\\造数脚本\\Util\\stress_test_report_generator.py')
+ report_path = os.path.join(script_dir, r'H:\\项目\\archive\\测试组\\脚本\\造数脚本2\\Util\\stress_test_report_generator.py')
if os.path.exists(report_path):
spec = importlib.util.spec_from_file_location('report_module', report_path)
report_module = importlib.util.module_from_spec(spec)
--
Gitblit v1.9.1