hyb
2025-12-23 c980682a1fe205d8c21d349e9fc6b9e4951aea34
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
"""
pad笼位更新压测脚本(带压测报告生成并通过钉钉发送摘要)
 
说明:
 - 该脚本模拟多个饲养员同时对笼位进行频繁更新操作
 - 从userinfo.xlsx读取多个用户token实现多用户并发压测
 - 从数据库动态获取笼位ID、用户ID和课题组ID
 - 运行后会记录每条请求的时间、状态码和延迟
 - 运行结束后会生成压测报告并通过钉钉发送摘要
"""
 
import sys
import os
# 将上一级目录加入模块搜索路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
import aiohttp
import time
import traceback
import datetime
import pandas as pd
import pymysql
import random
from tqdm import tqdm
from Util.dingtalk_helper import DingTalkHelper
 
# NOTE: a second, duplicate sys.path.append(...) was removed here -- the
# parent directory is already added to the module search path right after
# the sys/os imports above.
 
# --- Configuration ---
# SECURITY NOTE(review): the DingTalk robot access token and signing secret
# are hard-coded below; consider moving them to environment variables.
ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d'
SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19'

apiname = "pda笼位更新"  # API display name, used in report and message titles
# Cage-update ("trainingCage") endpoint under test.
url = "http://qilu.baoyizn.com:9935/api/base/cage/cage/trainingCage"
# Mimics the PDA device's embedded WebView so the server treats requests as app traffic.
headers = {
    "user-agent": "Mozilla/5.0 (Linux; Android 10; DS-MDT301 Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/74.0.3729.186 Mobile Safari/537.36 uni-app Html5Plus/1.0 (Immersed/24.0)",
    "Content-Type": "application/json; charset=utf-8",
    "Accept-Encoding": "gzip"
}

NUM_WORKERS = 100        # number of concurrent worker coroutines
TOTAL_REQUESTS = 20000   # total requests issued across all workers
MAX_RETRIES = 3          # per-request retry budget (see perform_request)
REQUEST_TIMEOUT = 60     # aiohttp total timeout per request, in seconds
OUTPUT_DIR = './load_test_report'  # directory where generated reports are written
 
# --- Database configuration ---
# SECURITY NOTE(review): credentials are hard-coded; prefer environment
# variables or an ignored local config file.
# Previous (dev) environment connection, kept for reference:
# DB_CONFIG = {
#     'host': 'rm-wz97y18i4t16hfsk6qo.mysql.rds.aliyuncs.com',
#     'database': 'srps_qilu',
#     'user': 'dev',
#     'password': 'Hello@112',
#     'charset': 'utf8mb4'
# }
# Active connection: production database.
DB_CONFIG = {
    'host': 'rm-wz90i533p18631e685o.mysql.rds.aliyuncs.com',
    'database': 'srps_qilu_prod',
    'user': 'dev',
    'password': 'Hello@112',
    'charset': 'utf8mb4'
}
 
# --- Initialization ---
# DingTalk client used to push the final summary message after the run.
dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET)
 
class DataManager:
    """Loads and serves the load-test fixtures.

    Cage ids and (user id, research-group ids) pairs come from the MySQL
    database; user tokens come from an Excel sheet.  The ``get_random_*``
    helpers feed the payload builder with randomized combinations.
    """

    def __init__(self):
        self.cage_ids = []     # cage primary keys pulled from l_cage
        self.user_data = []    # dicts: {'user_id': ..., 'research_group_ids': [...]}
        self.user_tokens = []  # auth tokens read from the Excel sheet

    def load_user_tokens(self, excel_file='TestData/userinfo.xlsx'):
        """Read user tokens from *excel_file* (column ``token``).

        Returns True on success, False on any failure (missing file,
        missing column, unreadable sheet).
        """
        try:
            df = pd.read_excel(excel_file)
            # dropna() guards against blank rows producing NaN "tokens".
            self.user_tokens = df['token'].dropna().tolist()
            print(f"成功加载 {len(self.user_tokens)} 个用户token")
            return True
        except Exception as e:
            print(f"加载用户token失败: {e}")
            return False

    def load_database_data(self):
        """Load cage ids and user/research-group data from MySQL.

        Returns True only when both datasets are non-empty.
        """
        try:
            conn = pymysql.connect(**DB_CONFIG)
            try:
                # Cage ids.
                with conn.cursor() as cursor:
                    cursor.execute("SELECT id FROM l_cage WHERE id IS NOT NULL")
                    self.cage_ids = [row[0] for row in cursor.fetchall()]
                    print(f"成功加载 {len(self.cage_ids)} 个笼位ID")

                # User ids and their research-group ids.
                with conn.cursor() as cursor:
                    cursor.execute("SELECT id, research_group_ids FROM sys_user WHERE research_group_ids IS NOT NULL AND research_group_ids != ''")

                    for user_id, research_groups in cursor.fetchall():
                        # split(',') handles both single ids and comma-separated
                        # lists; blank fragments (trailing commas, whitespace-only
                        # values) are discarded so no user ends up with [''].
                        group_ids = [gid.strip() for gid in (research_groups or '').split(',') if gid.strip()]
                        if not group_ids:
                            continue

                        self.user_data.append({
                            'user_id': user_id,
                            'research_group_ids': group_ids
                        })

                    print(f"成功加载 {len(self.user_data)} 个有效用户数据")
            finally:
                # Close the connection even if a query raises (the original
                # leaked it on query failure).
                conn.close()

            return len(self.cage_ids) > 0 and len(self.user_data) > 0

        except Exception as e:
            print(f"数据库连接失败: {e}")
            return False

    def get_random_cage_id(self):
        """Return a random cage id, or None when none are loaded."""
        return random.choice(self.cage_ids) if self.cage_ids else None

    def get_random_user_data(self):
        """Return a random ``(user_id, research_group_id)`` pair, or ``(None, None)``."""
        if not self.user_data:
            return None, None

        user = random.choice(self.user_data)
        user_id = user['user_id']
        research_group_id = random.choice(user['research_group_ids'])

        return user_id, research_group_id

    def get_random_token(self):
        """Return a random user token, or None when none are loaded."""
        return random.choice(self.user_tokens) if self.user_tokens else None
 
# Global data manager shared by the payload builder and the workers.
data_manager = DataManager()
 
def create_cage_data():
    """Build one cage-update request payload from random fixtures.

    Returns the JSON-serializable dict expected by the trainingCage
    endpoint, or None when any of the required ids is unavailable.
    """
    chosen_cage = data_manager.get_random_cage_id()
    owner_id, group_id = data_manager.get_random_user_data()

    if chosen_cage and owner_id and group_id:
        return {
            "cage": {"id": str(chosen_cage)},
            "researchGroup": {"id": group_id},
            "user": {"id": owner_id}
        }
    return None
 
async def perform_request(session: aiohttp.ClientSession, index: int, max_retries: int = MAX_RETRIES):
    """Send one cage-update POST, retrying up to *max_retries* times.

    Returns a result dict with keys ``index``, ``timestamp``, ``status_code``,
    ``latency_ms``, ``response_size`` and ``error``.  A ``status_code`` of 0
    means the request never succeeded (no token, no fixture data, or all
    retries exhausted).
    """
    attempt = 0
    last_err = None
    # Initialized up front so the final-failure record is always well-defined
    # (the original relied on a fragile `'latency_ms' in locals()` check).
    latency_ms = 0.0
    token = data_manager.get_random_token()

    if not token:
        return {
            'index': index,
            'timestamp': time.time(),
            'status_code': 0,
            'latency_ms': 0,
            'response_size': None,
            'error': 'No available token'
        }

    # Per-request copy so the shared headers template is never mutated.
    current_headers = headers.copy()
    current_headers['token'] = token

    while attempt < max_retries:
        # A fresh random payload per attempt, matching real PDA traffic.
        data = create_cage_data()
        if not data:
            return {
                'index': index,
                'timestamp': time.time(),
                'status_code': 0,
                'latency_ms': 0,
                'response_size': None,
                'error': 'No available cage/user data'
            }

        start = time.time()
        try:
            async with session.post(url, json=data, headers=current_headers) as resp:
                text = await resp.text()
                latency_ms = (time.time() - start) * 1000.0
                status = resp.status
                if status == 200:
                    return {
                        'index': index,
                        'timestamp': time.time(),
                        'status_code': status,
                        'latency_ms': latency_ms,
                        'response_size': len(text) if text is not None else None,
                        'error': None
                    }
                last_err = f'status_{status}:{text}'
        except Exception as e:
            latency_ms = (time.time() - start) * 1000.0
            last_err = f'{type(e).__name__}:{str(e)}'

        # Shared retry bookkeeping for both the non-200 and the exception path:
        # exponential backoff capped at 10 seconds.
        attempt += 1
        await asyncio.sleep(min(10, 2 ** attempt))

    # All retries exhausted.
    return {
        'index': index,
        'timestamp': time.time(),
        'status_code': 0,
        'latency_ms': latency_ms,
        'response_size': None,
        'error': last_err
    }
 
async def worker(name: int, queue: asyncio.Queue, session: aiohttp.ClientSession, gen, pbar, success_counter: dict, failed_list: list, lock: asyncio.Lock):
    """Worker coroutine: consume request indices from *queue* until a None sentinel.

    Each completed request is recorded into the report generator (when
    available) and tallied into the shared success counter / failure list
    under *lock*; the progress bar advances once per processed index.
    """
    while True:
        task_index = await queue.get()
        if task_index is None:
            # Sentinel: acknowledge it and shut this worker down.
            queue.task_done()
            break
        try:
            result = await perform_request(session, task_index)
            # Feed the detailed report, if a generator was loaded.
            if gen:
                gen.record_result(
                    index=result['index'],
                    timestamp=result['timestamp'],
                    status_code=int(result['status_code']),
                    latency_ms=float(result['latency_ms']),
                    response_size=result.get('response_size'),
                    error=result.get('error')
                )
            status = result['status_code']
            succeeded = bool(status) and 200 <= status < 300
            async with lock:
                if succeeded:
                    success_counter['count'] += 1
                else:
                    failed_list.append((result['index'], result.get('error')))
                pbar.update(1)
        except Exception as exc:
            # Unexpected worker-level failure: record it as a failed request.
            async with lock:
                failed_list.append((task_index, f'Worker异常:{type(exc).__name__}:{exc}'))
                pbar.update(1)
        finally:
            queue.task_done()
 
def _load_report_generator():
    """Locate and instantiate LoadTestReportGenerator, or return None.

    Tries a known absolute path first, then a path relative to this script
    (the original duplicated this loading code verbatim for both paths).
    Import/instantiation errors propagate to the caller.
    """
    import importlib.util

    script_dir = os.path.dirname(os.path.abspath(__file__))
    candidate_paths = [
        # Absolute path from the original author's machine.
        'H:\\项目\\造数脚本\\Util\\stress_test_report_generator.py',
        # Fallback: path relative to this script.
        os.path.join(script_dir, 'Util', 'stress_test_report_generator.py'),
    ]
    for report_path in candidate_paths:
        if os.path.exists(report_path):
            spec = importlib.util.spec_from_file_location('report_module', report_path)
            report_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(report_module)
            LoadTestReportGenerator = getattr(report_module, 'LoadTestReportGenerator')
            gen = LoadTestReportGenerator(test_name=f'{apiname}压测任务', report_title='压测详细报告')
            print(f"成功加载压测报告生成器: {report_path}")
            return gen
    print('警告: 未找到压测报告生成器,将跳过报告生成')
    return None


def _compose_summary(stats, outputs=None):
    """Build the DingTalk summary text from *stats* and optional report *outputs*.

    The duration/latency sections appear only when the report generator
    supplied those keys, so the same function serves both the full-report
    branch and the basic-counters fallback.
    """
    now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    msg = [f'【{apiname} 压测报告】', f'生成时间:{now_str}']
    msg.append(f"总请求数:{stats.get('total_requests',0)},成功:{stats.get('success_count',0)},失败:{stats.get('fail_count',0)},成功率:{stats.get('success_rate',0):.2%}")

    if 'duration_seconds' in stats:
        msg.append(f"总耗时(s):{stats.get('duration_seconds',0):.2f},平均吞吐(req/s):{stats.get('throughput_rps',0):.2f}")

    if 'latency_ms' in stats:
        lat = stats.get('latency_ms', {})
        msg.append(f"延迟(ms) - 平均:{lat.get('avg',0):.2f},P90:{lat.get('p90',0):.2f},P95:{lat.get('p95',0):.2f},P99:{lat.get('p99',0):.2f}")

    msg.append(f"使用用户数:{len(data_manager.user_tokens)}")
    msg.append(f"可用笼位数:{len(data_manager.cage_ids)}")
    msg.append(f"有效用户数:{len(data_manager.user_data)}")

    # List the generated report files (the 'charts' entry maps names to paths).
    if outputs:
        file_list = []
        for key, value in outputs.items():
            if key == 'charts':
                file_list.extend(os.path.abspath(cpath) for cpath in value.values())
            else:
                file_list.append(os.path.abspath(value))
        msg.append('生成文件:')
        msg.extend(file_list)

    return '\n'.join(msg)


async def batch_update_cages(total: int, num_workers: int):
    """Run the cage-update load test: *total* requests across *num_workers* workers.

    Loads the fixtures, fans requests out over an asyncio queue, generates a
    report when the generator module is available, and pushes a summary to
    DingTalk.  Returns early (after printing a message) when fixture loading
    fails.
    """
    gen = None
    try:
        gen = _load_report_generator()
    except Exception as e:
        print(f'加载压测报告生成器失败: {e},将跳过报告生成')

    # Load fixtures; abort if either source fails.
    if not data_manager.load_user_tokens():
        print("加载用户token失败,退出压测")
        return

    if not data_manager.load_database_data():
        print("加载数据库数据失败,退出压测")
        return

    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
    connector = aiohttp.TCPConnector(limit=num_workers, limit_per_host=num_workers, force_close=False)

    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        # Queue of request indices followed by one None sentinel per worker.
        queue = asyncio.Queue()
        for i in range(1, total + 1):
            await queue.put(i)
        for _ in range(num_workers):
            await queue.put(None)

        success_counter = {'count': 0}
        failed_list = []
        lock = asyncio.Lock()

        with tqdm(total=total, desc='压测进度') as pbar:
            workers = [
                asyncio.create_task(worker(i, queue, session, gen, pbar, success_counter, failed_list, lock))
                for i in range(num_workers)
            ]
            await asyncio.gather(*workers)

        # All requests processed: build the summary message.
        if gen:
            os.makedirs(OUTPUT_DIR, exist_ok=True)
            outputs = gen.generate_report(OUTPUT_DIR, formats=['html', 'json', 'csv', 'docx'])
            stats = gen.compute_stats()
            final_msg = _compose_summary(stats, outputs)
        else:
            # No report generator: fall back to the basic counters the workers kept.
            stats = {
                'total_requests': total,
                'success_count': success_counter['count'],
                'fail_count': len(failed_list),
                'success_rate': success_counter['count'] / total if total > 0 else 0
            }
            final_msg = _compose_summary(stats)

        # Push the summary to DingTalk; a delivery failure must not crash the run.
        try:
            dingtalk_helper.send_message(final_msg)
            print('钉钉消息发送成功')
        except Exception as e:
            print('发送钉钉消息失败:', e)

        print('\n[压测摘要]')
        print('成功数:', success_counter['count'], ' 失败数:', len(failed_list))
        if failed_list:
            print('失败示例(最多显示20条):')
            for idx, err in failed_list[:20]:
                print(f'  #{idx} => {err}')
 
if __name__ == '__main__':
    # Sanity-check third-party dependencies before starting.
    # NOTE(review): pymysql/pandas are already imported at module level, so a
    # missing package would normally have failed earlier; this guard mainly
    # documents the requirement and gives an actionable install hint.
    try:
        import pymysql
        import pandas
        print("依赖检查通过,开始压测...")
    except ImportError as e:
        print(f"缺少依赖: {e}")
        print("请安装: pip install pymysql pandas")
        # sys.exit is guaranteed to exist in scripts, unlike the site-provided exit().
        sys.exit(1)

    # Run the load test.
    asyncio.run(batch_update_cages(TOTAL_REQUESTS, NUM_WORKERS))