新增认证校验工具类,可实现将明文加密后,直接获取token,避免认证过期
已更新华东师范大学二期的压测脚本实现自动获取最新token功能,其他脚本后续更新
1 files added
5 files modified
186 ■■■■ changed files
测试组/脚本/造数脚本2/Util/__init__.py 4 ●●●● patch | view | raw | blame | history
测试组/脚本/造数脚本2/Util/token_validator.py 93 ●●●●● patch | view | raw | blame | history
测试组/脚本/造数脚本2/华东师范大学二期/并发入驻笼位.py 32 ●●●●● patch | view | raw | blame | history
测试组/脚本/造数脚本2/华东师范大学二期/并发创建笼架.py 18 ●●●● patch | view | raw | blame | history
测试组/脚本/造数脚本2/华东师范大学二期/并发笼位标记.py 19 ●●●● patch | view | raw | blame | history
测试组/脚本/造数脚本2/华东师范大学二期/新建技术服务费.py 20 ●●●● patch | view | raw | blame | history
测试组/脚本/造数脚本2/Util/__init__.py
@@ -0,0 +1,4 @@
from .token_validator import TokenValidator
from .dingtalk_helper import DingTalkHelper
from .random_util import RandomUtil
from .stress_test_report_generator import RequestRecord, LoadTestReportGenerator
测试组/脚本/造数脚本2/Util/token_validator.py
New file
@@ -0,0 +1,93 @@
import base64
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
class TokenValidator:
    def __init__(self):
        pass
    def encrypt(self, word: str, keyStr: str = "abcdefgh12345678", ivStr: str = "12345678abcdefgh") -> str:
        """
        加密函数说明:
          - word:待加密的文本(字符串)
          - keyStr:密钥字符串(默认使用 "abcdefgh12345678")
          - ivStr:初始向量字符串(默认使用 "12345678abcdefgh")
        加密过程:
          1. 将 keyStr 和 ivStr 使用 UTF-8 编码转换为字节
          2. 将 word 转为字节后进行 PKCS7 填充
          3. 使用 AES 的 CBC 模式加密数据
          4. 对加密结果进行 Base64 编码返回
        """
        # 将 key 与 iv 转为字节(默认使用 16 字节,即 AES-128)
        key = keyStr.encode('utf-8')
        iv = ivStr.encode('utf-8')
        # 明文转字节
        plaintext = word.encode('utf-8')
        # 使用 PKCS7 填充补全明文到 16 字节的整数倍
        padded_plaintext = pad(plaintext, AES.block_size)
        # 创建 AES-CBC 加密器
        cipher = AES.new(key, AES.MODE_CBC, iv)
        encrypted_bytes = cipher.encrypt(padded_plaintext)
        # 返回 Base64 编码的密文字符串
        return base64.b64encode(encrypted_bytes).decode('utf-8')
    def get_token(self, domain: str, username: str, password: str) -> str:
        """
        通过加密后的账号密码请求登录接口获取token
        :param domain: 域名,如 http://example.com
        :param username: 明文账号
        :param password: 明文密码
        :return: token字符串
        """
        # 加密账号密码
        encrypted_username = self.encrypt(username)
        encrypted_password = self.encrypt(password)
        # 构建登录接口URL
        login_url = f"{domain}/api/sys/login"
        # 构建请求头
        headers = {
            "Content-Type": "application/json; charset=UTF-8",
            "Cookie": f"x-domain={domain.split('://')[-1]}"
        }
        # 构建请求体
        data = {
            "username": encrypted_username,
            "password": encrypted_password,
            "code": ""
        }
        # 发送请求
        response = requests.post(login_url, headers=headers, json=data)
        response.raise_for_status()  # 检查请求是否成功
        # 解析响应,返回token
        result = response.json()
        return result.get("token", "")
# 示例用法
if __name__ == '__main__':
    token_validator = TokenValidator()
    # 测试加密方法
    username = "hyb实验人员"
    password = "Baoyi@1341"
    encrypted_username = token_validator.encrypt(username)
    encrypted_password = token_validator.encrypt(password)
    print(f"加密后的用户名:{encrypted_username}")
    print(f"加密后的密码:{encrypted_password}")
    # 测试获取token方法(需要实际域名才能运行)
    domain = "http://tsinghua.baoyizn.com:9906/"
    token = token_validator.get_token(domain, username, password)
    print(f"获取到的token:{token}")
测试组/脚本/造数脚本2/华东师范大学二期/并发入驻笼位.py
@@ -10,6 +10,15 @@
"""
import sys
import os
import asyncio
import aiohttp
import time
import traceback
import datetime
from tqdm import tqdm
import pymysql
import pymysql.cursors
import random
def get_parent_directory(file_path, levels=1):
    """获取指定层级的父目录"""
    path = os.path.abspath(file_path)
@@ -18,16 +27,7 @@
    return path
parent_dir = get_parent_directory(__file__, 5)  # 获取上五级目录
sys.path.append(parent_dir)
import asyncio
import aiohttp
import time
import traceback
import datetime
from tqdm import tqdm
from 测试组.脚本.造数脚本2.Util.random_util import RandomUtil
from 测试组.脚本.造数脚本2.Util.dingtalk_helper import DingTalkHelper
import pymysql
import random
from 测试组.脚本.造数脚本2.Util import TokenValidator, DingTalkHelper, RandomUtil, RequestRecord, LoadTestReportGenerator
# --- 配置 ---
@@ -44,11 +44,19 @@
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor
}
# 账号密码配置
username = "gly"
password = "Baoyi@1341"
# 创建获取token实例
token_validator = TokenValidator()
domain = "http://192.168.6.190:5561"
token = token_validator.get_token(domain, username, password)
apiname = "入驻笼位"
url = "http://192.168.6.190:5561/api/base/cage/cage/enterCage"
url = f"{domain}/api/base/cage/cage/enterCage"
headers = {
    "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NjgwMzY0MzksInVzZXJuYW1lIjoiZ2x5In0.-LYYNbSJ-zb5RKaiBiPjntgUfnGRfvajA2B1N2v7a-o",
    "token": token,
    "Content-Type": "application/json"
}
测试组/脚本/造数脚本2/华东师范大学二期/并发创建笼架.py
@@ -26,23 +26,31 @@
parent_dir = get_parent_directory(__file__, 5)  # 获取上五级目录
sys.path.append(parent_dir)
from 测试组.脚本.造数脚本2.Util.random_util import RandomUtil
from 测试组.脚本.造数脚本2.Util.dingtalk_helper import DingTalkHelper
from 测试组.脚本.造数脚本2.Util import TokenValidator, DingTalkHelper, RandomUtil, RequestRecord, LoadTestReportGenerator
# --- 配置 ---
ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d'
SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19'
# 账号密码配置
username = "gly"
password = "Baoyi@1341"
# 创建获取token实例
token_validator = TokenValidator()
domain = "http://192.168.6.190:5561"
token = token_validator.get_token(domain, username, password)
apiname = "创建动物房笼架"
url = "http://192.168.6.190:5561/api/base/room/shelf/save"
url = f"{domain}/api/base/room/shelf/save"
headers = {
    "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NjYyMTk5MzIsInVzZXJuYW1lIjoiZ2x5In0.fN-yJ0SVrEd_GyIvPmaBL2nx1M3a1jloSOsXyu3E5_w",
    "token": token,
    "Content-Type": "application/json"
}
NUM_WORKERS = 100
TOTAL_REQUESTS = 40000
TOTAL_REQUESTS = 2
MAX_RETRIES = 3
REQUEST_TIMEOUT = 60
OUTPUT_DIR = './load_test_report'
测试组/脚本/造数脚本2/华东师范大学二期/并发笼位标记.py
@@ -17,6 +17,7 @@
import datetime
from tqdm import tqdm
import pymysql
import pymysql.cursors
import random
def get_parent_directory(file_path, levels=1):
@@ -28,8 +29,7 @@
parent_dir = get_parent_directory(__file__, 5)  # 获取上五级目录
sys.path.append(parent_dir)
from 测试组.脚本.造数脚本2.Util.random_util import RandomUtil
from 测试组.脚本.造数脚本2.Util.dingtalk_helper import DingTalkHelper
from 测试组.脚本.造数脚本2.Util import TokenValidator, DingTalkHelper, RandomUtil, RequestRecord, LoadTestReportGenerator
@@ -48,15 +48,24 @@
    'cursorclass': pymysql.cursors.DictCursor
}
# 账号密码配置
username = "gly"
password = "Baoyi@1341"
# 创建获取token实例
token_validator = TokenValidator()
domain = "http://192.168.6.190:5561"
token = token_validator.get_token(domain, username, password)
apiname = "笼位标记"
url = "http://192.168.6.190:5561/api/base/cage/cage/tagCage"
url = f"{domain}/api/base/cage/cage/tagCage"
headers = {
    "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3Njc5NTE3ODksInVzZXJuYW1lIjoiZ2x5In0.Btsm0AAxyyCID0ctiMFg95Wgb-KFdQJ1Lt0Va8dZpqI",
    "token": token,
    "Content-Type": "application/json"
}
NUM_WORKERS = 200
TOTAL_REQUESTS = 500000
TOTAL_REQUESTS = 2
MAX_RETRIES = 3
REQUEST_TIMEOUT = 60
OUTPUT_DIR = './load_test_report'
测试组/脚本/造数脚本2/华东师范大学二期/新建技术服务费.py
@@ -19,6 +19,7 @@
import datetime
from tqdm import tqdm
def get_parent_directory(file_path, levels=1):
    """获取指定层级的父目录"""
    path = os.path.abspath(file_path)
@@ -27,24 +28,31 @@
    return path
parent_dir = get_parent_directory(__file__, 5)  # 获取上五级目录
sys.path.append(parent_dir)
from 测试组.脚本.造数脚本2.Util.random_util import RandomUtil
from 测试组.脚本.造数脚本2.Util.dingtalk_helper import DingTalkHelper
from 测试组.脚本.造数脚本2.Util import TokenValidator, DingTalkHelper, RandomUtil, RequestRecord, LoadTestReportGenerator
# --- 配置 ---
ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d'
SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19'
# 账号密码配置
username = "gly"
password = "Baoyi@1341"
# 创建获取token实例
token_validator = TokenValidator()
domain = "http://192.168.6.190:5561"
token = token_validator.get_token(domain, username, password)
apiname = "新建技术服务费"
url = "http://192.168.6.190:5561/api/finance/servicecost/financeServiceCost/save"
url = f"{domain}/api/finance/servicecost/financeServiceCost/save"
headers = {
    "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3Njc4NTg5MjcsInVzZXJuYW1lIjoiZ2x5In0.l2ulcY6VbJzYRgzlxwxrBsqQ9xrVnxaiUq703e_igng",
    "token": token,
    "Content-Type": "application/json"
}
NUM_WORKERS = 100
TOTAL_REQUESTS = 1000
TOTAL_REQUESTS = 10
MAX_RETRIES = 3
REQUEST_TIMEOUT = 60
OUTPUT_DIR = './load_test_report'