From efcfbf596bfd9f5f623e2e05a92e140fd27f1838 Mon Sep 17 00:00:00 2001
From: hyb <kk_huangyangbo@163.com>
Date: Thu, 08 Jan 2026 08:57:48 +0000
Subject: [PATCH] 新增认证校验工具类,可实现将明文加密后,直接获取token,避免认证过期 已更新华东师范大学二期的压测脚本实现自动获取最新token功能,其他脚本后续更新

---
 测试组/脚本/造数脚本2/华东师范大学二期/并发入驻笼位.py |  136 +++++++++++++++++++++++++++++++++++++++++----
 1 files changed, 124 insertions(+), 12 deletions(-)

diff --git "a/\346\265\213\350\257\225\347\273\204/\350\204\232\346\234\254/\351\200\240\346\225\260\350\204\232\346\234\2542/\345\215\216\344\270\234\345\270\210\350\214\203\345\244\247\345\255\246\344\272\214\346\234\237/\345\271\266\345\217\221\345\205\245\351\251\273\347\254\274\344\275\215.py" "b/\346\265\213\350\257\225\347\273\204/\350\204\232\346\234\254/\351\200\240\346\225\260\350\204\232\346\234\2542/\345\215\216\344\270\234\345\270\210\350\214\203\345\244\247\345\255\246\344\272\214\346\234\237/\345\271\266\345\217\221\345\205\245\351\251\273\347\254\274\344\275\215.py"
index 4e46a8d..a8c6add 100644
--- "a/\346\265\213\350\257\225\347\273\204/\350\204\232\346\234\254/\351\200\240\346\225\260\350\204\232\346\234\2542/\345\215\216\344\270\234\345\270\210\350\214\203\345\244\247\345\255\246\344\272\214\346\234\237/\345\271\266\345\217\221\345\205\245\351\251\273\347\254\274\344\275\215.py"
+++ "b/\346\265\213\350\257\225\347\273\204/\350\204\232\346\234\254/\351\200\240\346\225\260\350\204\232\346\234\2542/\345\215\216\344\270\234\345\270\210\350\214\203\345\244\247\345\255\246\344\272\214\346\234\237/\345\271\266\345\217\221\345\205\245\351\251\273\347\254\274\344\275\215.py"
@@ -10,18 +10,24 @@
 """
 import sys
 import os
-# 将上一级目录加入模块搜索路径
-sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 import asyncio
 import aiohttp
 import time
 import traceback
 import datetime
 from tqdm import tqdm
-from Util.random_util import RandomUtil
-from Util.dingtalk_helper import DingTalkHelper
 import pymysql
+import pymysql.cursors
 import random
+def get_parent_directory(file_path, levels=1):
+    """获取指定层级的父目录"""
+    path = os.path.abspath(file_path)
+    for _ in range(levels):
+        path = os.path.dirname(path)
+    return path
+parent_dir = get_parent_directory(__file__, 5)  # 获取上五级目录
+sys.path.append(parent_dir)
+from 测试组.脚本.造数脚本2.Util import TokenValidator, DingTalkHelper, RandomUtil, RequestRecord, LoadTestReportGenerator
 
 
 # --- 配置 ---
@@ -38,16 +44,24 @@
     'charset': 'utf8mb4',
     'cursorclass': pymysql.cursors.DictCursor
 }
+# 账号密码配置
+username = "gly"
+password = "Baoyi@1341"
+# 创建获取token实例
+token_validator = TokenValidator()
+
+domain = "http://192.168.6.190:5561"
+token = token_validator.get_token(domain, username, password)
 
 apiname = "入驻笼位"
-url = "http://192.168.6.190:5561/api/base/cage/cage/enterCage"
+url = f"{domain}/api/base/cage/cage/enterCage"
 headers = {
-    "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NjcwODE5NzMsInVzZXJuYW1lIjoiZ2x5In0.Gk5C1A26dmC3Q-deDUQtwS5Ssj0DSzQ7PcUNSJKl2Mw",
+    "token": token,
     "Content-Type": "application/json"
 }
 
 NUM_WORKERS = 100
-TOTAL_REQUESTS = 244469
+TOTAL_REQUESTS = 1
 MAX_RETRIES = 3
 REQUEST_TIMEOUT = 60
 OUTPUT_DIR = './load_test_report'
@@ -60,6 +74,90 @@
 
 # 全局变量,存储从数据库获取的笼位列表
 cage_list = []
+
+
+class DataManager:
+    """数据管理器,负责从数据库加载用户和课题组信息"""
+
+    def __init__(self):
+        self.user_data = []  # 存储用户ID、用户名、课题组ID和课题组名称
+
+    def load_user_and_group_data(self):
+        """从数据库加载用户和课题组信息"""
+        try:
+            conn = pymysql.connect(**DB_CONFIG)
+            
+            # 获取用户ID、用户名和对应的课题组ID
+            with conn.cursor() as cursor:
+                cursor.execute("""
+                    SELECT 
+                        su.id as user_id, 
+                        su.name as user_name, 
+                        su.research_group_ids as research_group_ids
+                    FROM sys_user su
+                    WHERE su.research_group_ids IS NOT NULL AND su.research_group_ids != ''
+                """)
+                
+                results = cursor.fetchall()
+                
+                # 处理查询结果
+                for row in results:
+                    # 处理多个课题组ID(用逗号分隔的情况)
+                    if row['research_group_ids'] and ',' in row['research_group_ids']:
+                        group_ids = [gid.strip() for gid in row['research_group_ids'].split(',') if gid.strip()]
+                    elif row['research_group_ids']:
+                        group_ids = [row['research_group_ids'].strip()]
+                    else:
+                        continue
+                    
+                    # 获取所有相关的课题组信息
+                    group_info_list = []
+                    for group_id in group_ids:
+                        # 查找对应的课题组名称,使用正确的表名 l_research_group
+                        cursor.execute("SELECT id, name FROM l_research_group WHERE id = %s", (group_id,))
+                        group_info = cursor.fetchone()
+                        if group_info:
+                            group_info_list.append({
+                                'group_id': group_info['id'],
+                                'group_name': group_info['name']
+                            })
+                    
+                    if group_info_list:
+                        self.user_data.append({
+                            'user_id': row['user_id'],
+                            'user_name': row['user_name'],
+                            'groups': group_info_list
+                        })
+                
+            print(f"成功加载 {len(self.user_data)} 个有效用户数据")
+            conn.close()
+            return len(self.user_data) > 0
+            
+        except Exception as e:
+            print(f"数据库连接失败: {e}")
+            return False
+    
+    def get_random_user_and_group(self):
+        """随机获取一个用户和对应的课题组信息"""
+        if not self.user_data:
+            return None, None, None, None
+        
+        # 随机选择一个用户
+        user = random.choice(self.user_data)
+        user_id = user['user_id']
+        user_name = user['user_name']
+        
+        # 随机选择一个课题组
+        group = random.choice(user['groups'])
+        group_id = group['group_id']
+        group_name = group['group_name']
+        
+        return user_id, user_name, group_id, group_name
+
+
+# 创建数据管理器实例
+data_manager = DataManager()
+
 
 
 def fetch_cages_from_db():
@@ -112,7 +210,7 @@
 
 
 def create_animal_data(idx: int):
-    """创建动物数据,使用动态获取的笼位信息"""
+    """创建动物数据,使用动态获取的笼位、用户和课题组信息"""
     random_code = RandomUtil.generate_random_number_string(0, 999999999)
     random_femaleNum = RandomUtil.generate_random_number_string(1, 5)
     random_maleNum = RandomUtil.generate_random_number_string(1, 5)
@@ -120,6 +218,15 @@
 
     # 获取笼位信息
     cage_info = get_random_cage(idx)
+    
+    # 从数据管理器获取随机的用户和课题组信息
+    user_id, user_name, group_id, group_name = data_manager.get_random_user_and_group()
+    
+    # 如果无法获取用户和课题组信息,使用默认值
+    if not user_id or not group_id:
+        user_id = "1995379969088860162"  # 默认用户ID
+        group_id = "1995379941721026561"  # 默认课题组ID
+        group_name = "hyb课题组2"  # 默认课题组名称
 
     return {
         "enterCageList": [
@@ -130,11 +237,11 @@
                 },
                 "cageStatus": "2",  # 笼具状态
                 "user": {
-                    "id": "1995379969088860162"  # 操作用户ID
+                    "id": user_id  # 用户ID
                 },
                 "researchGroup": {
-                    "id": "1995379941721026561",
-                    "name": "hyb课题组2"  # 课题组信息
+                    "id": group_id,
+                    "name": group_name  # 课题组信息
                 },
                 "femaleNum": random_femaleNum,  # 雌性数量
                 "maleNum": random_maleNum,    # 雄性数量
@@ -253,6 +360,11 @@
         print("错误: 无法获取笼位信息,压测终止")
         return
 
+    # 从数据库获取用户和课题组信息
+    print("正在从数据库获取用户和课题组信息...")
+    if not data_manager.load_user_and_group_data():
+        print("警告: 无法获取用户和课题组信息,将使用默认值")
+    
     print(f"获取到 {len(cage_list)} 个笼位,将进行{apiname}压测")
 
     # 动态加载报告生成器模块(支持中文文件名)
@@ -260,7 +372,7 @@
     try:
         import importlib.util
         script_dir = os.path.dirname(os.path.abspath(__file__))
-        report_path = os.path.join(script_dir, 'H:\\项目\\造数脚本\\Util\\stress_test_report_generator.py')
+        report_path = os.path.join(script_dir, 'H:\\项目\\archive\\测试组\\脚本\\造数脚本2\\Util\\stress_test_report_generator.py')
         if os.path.exists(report_path):
             spec = importlib.util.spec_from_file_location('report_module', report_path)
             report_module = importlib.util.module_from_spec(spec)

--
Gitblit v1.9.1