hyb
2026-01-07 c7f60dc7e9a36596f0e0d1787bd0cca4e9b57bcb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
"""
集成压测脚本(带压测报告生成并通过钉钉发送摘要)
 
说明:
 - 该脚本基于之前的稳定 worker/队列 实现,运行后会记录每条请求的时间、状态码和延迟。
 - 运行结束后会调用Util目录下的压测报告生成器(文件名: stress_test_report_generator.py),输出 HTML/JSON/CSV/(可选)DOCX 等文件。
 - 生成后会把关键统计摘要通过 DingTalk 机器人发送(调用 DingTalkHelper.send_message)。
 - 安装依赖:aiohttp, tqdm, numpy/pandas/matplotlib/python-docx(可选)
 - 确保 DingTalkHelper 类在你的 `Util.dingtalk_helper` 中可用,且 ACCESS_TOKEN/SECRET 正确。
"""
import sys
import os
# 将上一级目录加入模块搜索路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
import aiohttp
import time
import traceback
import datetime
from tqdm import tqdm
from Util.random_util import RandomUtil
from Util.dingtalk_helper import DingTalkHelper
import pymysql
import random
 
 
# --- 配置 ---
ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d'
SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19'
 
# 数据库配置
DB_CONFIG = {
    'host': '192.168.6.190',
    'port': 3306,
    'user': 'dev',
    'password': 'Hello@112',
    'database': 'srps_ecnu',
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor
}
 
apiname = "入驻笼位"
url = "http://192.168.6.190:5561/api/base/cage/cage/enterCage"
headers = {
    "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NjcyMzc1MDgsInVzZXJuYW1lIjoiZ2x5In0.2N0rQ7Oy1B-Wg_fnywOrcDelYnCe5JOpd7-vwu_2H6U",
    "Content-Type": "application/json"
}
 
NUM_WORKERS = 100
TOTAL_REQUESTS = 244481
MAX_RETRIES = 3
REQUEST_TIMEOUT = 60
OUTPUT_DIR = './load_test_report'
 
# --- 初始化 ---
dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET)
 
LARGE_CONTENT = "压测中" * 500
FILES_PATH = "/userfiles/1588133301094375425/程序附件/notify/notify/2025/10/15/173933/cs.jpg"
 
# 全局变量,存储从数据库获取的笼位列表
cage_list = []
 
 
def fetch_cages_from_db():
    """从数据库获取笼位信息"""
    global cage_list
    try:
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            # 查询所有笼位的id和code,可以添加where条件过滤需要测试的笼位
            sql = "SELECT id, code FROM l_cage WHERE status = 1"  # 假设有status字段,可以根据实际情况调整
            cursor.execute(sql)
            results = cursor.fetchall()
 
            if not results:
                # 如果没有查询条件,查询所有笼位
                sql = "SELECT id, code FROM l_cage"
                cursor.execute(sql)
                results = cursor.fetchall()
 
            cage_list = results
            print(f"从数据库获取到 {len(cage_list)} 个笼位")
 
            # 如果笼位数量少于请求数,打印警告
            if len(cage_list) < TOTAL_REQUESTS:
                print(f"警告: 笼位数量({len(cage_list)})少于请求数({TOTAL_REQUESTS}),将循环使用笼位")
 
        connection.close()
        return cage_list
    except Exception as e:
        print(f"数据库连接失败: {e}")
        # 如果数据库连接失败,使用默认的笼位作为后备
        cage_list = [{"id": "10386", "code": "hyb压测笼架0001-1-A"}]
        print(f"使用默认笼位: {cage_list[0]}")
        return cage_list
 
 
def get_random_cage(index: int):
    """根据索引获取笼位,如果笼位数量不足则循环使用"""
    if not cage_list:
        # 如果列表为空,尝试重新获取
        fetch_cages_from_db()
 
    if not cage_list:
        # 如果还是为空,返回默认值
        return {"id": "10386", "code": "hyb压测笼架0001-1-A"}
 
    # 使用索引取模来循环获取笼位
    cage_index = index % len(cage_list)
    return cage_list[cage_index]
 
 
def create_animal_data(idx: int):
    """创建动物数据,使用动态获取的笼位信息"""
    random_code = RandomUtil.generate_random_number_string(0, 999999999)
    random_femaleNum = RandomUtil.generate_random_number_string(1, 5)
    random_maleNum = RandomUtil.generate_random_number_string(1, 5)
    random_data = RandomUtil.generate_random_date("2023-01-01", "2025-12-19")
 
    # 获取笼位信息
    cage_info = get_random_cage(idx)
 
    return {
        "enterCageList": [
            {
                "cage": {
                    "id": cage_info["id"],
                    "code": cage_info["code"]
                },
                "cageStatus": "2",  # 笼具状态
                "user": {
                    "id": "1995379969088860162"  # 操作用户ID
                },
                "researchGroup": {
                    "id": "1995379941721026561",
                    "name": "hyb课题组2"  # 课题组信息
                },
                "femaleNum": random_femaleNum,  # 雌性数量
                "maleNum": random_maleNum,    # 雄性数量
                "enterTime": random_data,  # 入笼时间
                "expectLeaveTime": random_data,  # 预计离笼时间
                "leaveCenterTime": "",  # 离开中心时间(为空)
                "animalVariety": {
                    "id": "1595669319989637121",
                    "name": "小鼠"  # 动物品种
                },
                "animalStrain": {
                    "id": "1595669618858962945",
                    "name": "BALB/c"  # 动物品系
                },
                "specifications": f"规格规格{random_code}",  # 规格说明
                "animalList": [],  # 动物列表(空数组)
                "groupUser": [],   # 组内用户(空数组)
                "ethicCode": f"伦理编号伦理编号{random_code}"  # 伦理编号
            }
        ]
    }
 
 
async def perform_request(session: aiohttp.ClientSession, index: int, max_retries: int = MAX_RETRIES):
    attempt = 0
    last_err = None
    while attempt < max_retries:
        data = create_animal_data(index)
        start = time.time()
        try:
            async with session.post(url, json=data, headers=headers) as resp:
                text = await resp.text()
                latency_ms = (time.time() - start) * 1000.0
                status = resp.status
                if status == 200:
                    return {
                        'index': index,
                        'timestamp': time.time(),
                        'status_code': status,
                        'latency_ms': latency_ms,
                        'response_size': len(text) if text is not None else None,
                        'error': None,
                        'cage_id': data['enterCageList'][0]['cage']['id'],  # 记录使用的笼位ID
                        'cage_code': data['enterCageList'][0]['cage']['code']  # 记录使用的笼位编码
                    }
                else:
                    last_err = f'status_{status}:{text}'
                    attempt += 1
                    await asyncio.sleep(min(10, 2 ** attempt))
        except Exception as e:
            latency_ms = (time.time() - start) * 1000.0
            last_err = f'{type(e).__name__}:{str(e)}'
            attempt += 1
            await asyncio.sleep(min(10, 2 ** attempt))
 
    # 获取当前请求使用的笼位信息用于记录
    data = create_animal_data(index)
 
    # 最终失败
    return {
        'index': index,
        'timestamp': time.time(),
        'status_code': 0,
        'latency_ms': latency_ms if 'latency_ms' in locals() else 0,
        'response_size': None,
        'error': last_err,
        'cage_id': data['enterCageList'][0]['cage']['id'],
        'cage_code': data['enterCageList'][0]['cage']['code']
    }
 
 
async def worker(name: int, queue: asyncio.Queue, session: aiohttp.ClientSession, gen, pbar, success_counter: dict, failed_list: list, lock: asyncio.Lock):
    while True:
        idx = await queue.get()
        if idx is None:
            queue.task_done()
            break
        try:
            res = await perform_request(session, idx)
            # 记录到报告生成器
            gen.record_result(
                index=res['index'],
                timestamp=res['timestamp'],
                status_code=int(res['status_code']),
                latency_ms=float(res['latency_ms']),
                response_size=res.get('response_size'),
                error=res.get('error')
            )
            async with lock:
                if res['status_code'] and 200 <= res['status_code'] < 300:
                    success_counter['count'] += 1
                    # 记录成功的笼位信息
                    if 'cage_id' in res and 'cage_code' in res:
                        if 'success_cages' not in success_counter:
                            success_counter['success_cages'] = []
                        cage_info = f"{res['cage_code']}({res['cage_id']})"
                        if cage_info not in success_counter['success_cages']:
                            success_counter['success_cages'].append(cage_info)
                else:
                    failed_list.append((res['index'], res.get('error'), res.get('cage_code', '未知笼位')))
                pbar.update(1)
        except Exception as e:
            async with lock:
                failed_list.append((idx, f'Worker异常:{type(e).__name__}:{e}', '未知笼位'))
                pbar.update(1)
        finally:
            queue.task_done()
 
 
async def batch_create_animals(total: int, num_workers: int):
    # 首先从数据库获取笼位信息
    print("正在从数据库获取笼位信息...")
    fetch_cages_from_db()
 
    if not cage_list:
        print("错误: 无法获取笼位信息,压测终止")
        return
 
    print(f"获取到 {len(cage_list)} 个笼位,将进行{apiname}压测")
 
    # 动态加载报告生成器模块(支持中文文件名)
    gen = None
    try:
        import importlib.util
        script_dir = os.path.dirname(os.path.abspath(__file__))
        report_path = os.path.join(script_dir, 'H:\\项目\\造数脚本\\Util\\stress_test_report_generator.py')
        if os.path.exists(report_path):
            spec = importlib.util.spec_from_file_location('report_module', report_path)
            report_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(report_module)
            LoadTestReportGenerator = getattr(report_module, 'LoadTestReportGenerator')
        else:
            # 备用:尝试直接导入模块名(若你的文件名已改为 ascii)
            from report_generator import LoadTestReportGenerator  # type: ignore
        gen = LoadTestReportGenerator(test_name='压测任务', report_title='压测详细报告')
    except Exception as e:
        print('无法加载压测报告生成器,请确认stress_test_report_generator.py 文件位置正确。\n', e)
        raise
 
    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
    connector = aiohttp.TCPConnector(limit=num_workers, limit_per_host=num_workers, force_close=False)
    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        queue = asyncio.Queue()
        for i in range(1, total + 1):
            await queue.put(i)
        for _ in range(num_workers):
            await queue.put(None)
 
        success_counter = {'count': 0}
        failed_list = []
        lock = asyncio.Lock()
 
        with tqdm(total=total, desc='创建进度') as pbar:
            workers = [
                asyncio.create_task(worker(i, queue, session, gen, pbar, success_counter, failed_list, lock))
                for i in range(num_workers)
            ]
            await asyncio.gather(*workers)
 
        # 任务完成,生成报告
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        outputs = gen.generate_report(OUTPUT_DIR, formats=['html', 'json', 'csv', 'docx'])
 
        stats = gen.compute_stats()
 
        # 构造钉钉摘要消息(中文)
        now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        msg = [f'【{apiname} 压测报告】', f'生成时间:{now_str}']
        msg.append(f"总请求数:{stats.get('total_requests',0)},成功:{stats.get('success_count',0)},失败:{stats.get('fail_count',0)},成功率:{stats.get('success_rate',0):.2%}")
        msg.append(f"总耗时(s):{stats.get('duration_seconds',0):.2f},平均吞吐(req/s):{stats.get('throughput_rps',0):.2f}")
        lat = stats.get('latency_ms', {})
        msg.append(f"延迟(ms) - 平均:{lat.get('avg',0):.2f},P90:{lat.get('p90',0):.2f},P95:{lat.get('p95',0):.2f},P99:{lat.get('p99',0):.2f}")
 
        # 添加笼位信息统计
        msg.append(f"数据库笼位总数:{len(cage_list)}")
        if 'success_cages' in success_counter:
            msg.append(f"成功使用的笼位数:{len(success_counter['success_cages'])}")
            # 显示部分成功笼位示例
            if len(success_counter['success_cages']) > 0:
                sample_cages = success_counter['success_cages'][:5]  # 显示前5个
                msg.append(f"成功笼位示例:{', '.join(sample_cages)}")
                if len(success_counter['success_cages']) > 5:
                    msg.append(f"...等{len(success_counter['success_cages'])}个笼位")
 
        # 列出生成的报告文件
        file_list = []
        for k, v in outputs.items():
            if k == 'charts':
                for cname, cpath in v.items():
                    file_list.append(os.path.abspath(cpath))
            else:
                file_list.append(os.path.abspath(v))
        msg.append('生成文件:')
        for p in file_list:
            msg.append(p)
 
        final_msg = '\n'.join(msg)
 
        # 发送钉钉消息
        try:
            dingtalk_helper.send_message(final_msg)
        except Exception as e:
            print('发送钉钉消息失败:', e)
 
        print('\n[SUMMARY] 已生成报告并发送钉钉摘要。')
        print('成功数:', success_counter['count'], ' 失败数:', len(failed_list))
        print(f'数据库笼位总数:{len(cage_list)}')
        if 'success_cages' in success_counter:
            print(f'成功使用的笼位数:{len(success_counter["success_cages"])}')
 
        if failed_list:
            print('失败示例(最多显示50条):')
            for idx, err, cage_code in failed_list[:50]:
                print(f'  #{idx} 笼位[{cage_code}] => {err}')
 
 
if __name__ == '__main__':
    # 检查数据库连接
    try:
        test_conn = pymysql.connect(**DB_CONFIG)
        test_conn.close()
        print("数据库连接测试成功")
    except Exception as e:
        print(f"数据库连接测试失败: {e}")
        print("将使用默认笼位进行压测")
 
    # 运行前建议先用小规模测试
    TOTAL_REQUESTS = TOTAL_REQUESTS
    NUM_WORKERS = NUM_WORKERS
    asyncio.run(batch_create_animals(TOTAL_REQUESTS, NUM_WORKERS))