1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
"""
集成压测脚本(带压测报告生成并通过钉钉发送摘要)
 
说明:
 - 该脚本基于之前的稳定 worker/队列 实现,运行后会记录每条请求的时间、状态码和延迟。
 - 运行结束后会调用Util目录下的压测报告生成器(文件名: stress_test_report_generator.py),输出 HTML/JSON/CSV/(可选)DOCX 等文件。
 - 生成后会把关键统计摘要通过 DingTalk 机器人发送(调用 DingTalkHelper.send_message)。
 - 安装依赖:aiohttp, tqdm, numpy/pandas/matplotlib/python-docx(可选)
 - 确保 DingTalkHelper 类在你的 `Util.dingtalk_helper` 中可用,且 ACCESS_TOKEN/SECRET 正确。
"""
import sys
import os
import random
import pymysql
# 将上一级目录加入模块搜索路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
import aiohttp
import time
import traceback
import datetime
from tqdm import tqdm
from Util.random_util import RandomUtil
from Util.dingtalk_helper import DingTalkHelper
 
 
# --- Configuration ---
# NOTE(review): credentials below (DingTalk token/secret, API auth token, DB
# password) are hard-coded; move them to environment variables or a config
# file before sharing or committing this script.
ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d'  # DingTalk robot access token
SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19'  # DingTalk robot signing secret

apiname = "新建技术服务费"  # human-readable API name, used in report title and summary
url = "http://192.168.6.190:5561/api/finance/servicecost/financeServiceCost/save"  # endpoint under test
headers = {
    # Pre-issued JWT auth token; refresh when it expires.
    "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3Njc4NTg5MjcsInVzZXJuYW1lIjoiZ2x5In0.l2ulcY6VbJzYRgzlxwxrBsqQ9xrVnxaiUq703e_igng",
    "Content-Type": "application/json"
}

NUM_WORKERS = 100        # number of concurrent worker tasks
TOTAL_REQUESTS = 1000    # total requests to send
MAX_RETRIES = 3          # attempts per request on non-200 status or exception
REQUEST_TIMEOUT = 60     # aiohttp total timeout per request, seconds
OUTPUT_DIR = './load_test_report'  # directory where report files are written

# --- Database configuration ---
DB_CONFIG = {
    'host': '192.168.6.190',
    'port': 3306,
    'user': 'dev',
    'password': 'Hello@112',
    'database': 'srps_ecnu',
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor  # rows come back as dicts
}
 
class DataManager:
    """Loads user / research-group data from the database and hands out random picks.

    Each entry in ``user_data`` has the shape::

        {'user_id': ..., 'user_name': ...,
         'groups': [{'group_id': ..., 'group_name': ...}, ...]}
    """

    def __init__(self):
        # User id/name plus the research groups each user belongs to.
        self.user_data = []

    def load_database_data(self):
        """Populate ``self.user_data`` from sys_user / l_research_group.

        Returns:
            bool: True when at least one user with a resolvable research
            group was loaded, False on any database error or empty result.
        """
        try:
            conn = pymysql.connect(**DB_CONFIG)
            try:
                with conn.cursor() as cursor:
                    # Fetch users together with their (possibly comma-separated)
                    # research group id list.
                    cursor.execute("""
                        SELECT 
                            su.id as user_id, 
                            su.name as user_name, 
                            su.research_group_ids as research_group_ids
                        FROM sys_user su
                        WHERE su.research_group_ids IS NOT NULL AND su.research_group_ids != ''
                    """)

                    results = cursor.fetchall()

                    # Many users share groups; cache lookups so each group id is
                    # queried at most once instead of once per (user, group) pair.
                    group_cache = {}

                    for row in results:
                        # research_group_ids holds either a single id or a
                        # comma-separated list; normalize to a clean list.
                        raw_ids = row['research_group_ids'] or ''
                        group_ids = [gid.strip() for gid in raw_ids.split(',') if gid.strip()]
                        if not group_ids:
                            continue

                        group_info_list = []
                        for group_id in group_ids:
                            if group_id not in group_cache:
                                # Resolve the group name from l_research_group.
                                cursor.execute(
                                    "SELECT id, name FROM l_research_group WHERE id = %s",
                                    (group_id,)
                                )
                                group_cache[group_id] = cursor.fetchone()
                            group_info = group_cache[group_id]
                            if group_info:
                                group_info_list.append({
                                    'group_id': group_info['id'],
                                    'group_name': group_info['name']
                                })

                        # Keep only users with at least one resolvable group.
                        if group_info_list:
                            self.user_data.append({
                                'user_id': row['user_id'],
                                'user_name': row['user_name'],
                                'groups': group_info_list
                            })

                print(f"成功加载 {len(self.user_data)} 个有效用户数据")
            finally:
                # The original code leaked the connection when an exception was
                # raised mid-load; always close it.
                conn.close()
            return len(self.user_data) > 0

        except Exception as e:
            print(f"数据库连接失败: {e}")
            return False

    def get_random_user_and_group(self):
        """Pick a random user and one of that user's groups.

        Returns:
            tuple: (user_id, user_name, group_id, group_name), or
            (None, None, None, None) when no data has been loaded.
        """
        if not self.user_data:
            return None, None, None, None

        user = random.choice(self.user_data)
        group = random.choice(user['groups'])

        return user['user_id'], user['user_name'], group['group_id'], group['group_name']
 
# Global data manager shared by create_animal_data() and the load routine.
data_manager = DataManager()

# --- Initialization ---
dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET)

# NOTE(review): LARGE_CONTENT and FILES_PATH do not appear to be referenced
# anywhere in this file — likely leftovers from an earlier payload shape;
# confirm and remove.
LARGE_CONTENT = "备注造数据" * 5
FILES_PATH = "/userfiles/1463828311460319233/程序附件//baoyi/individual/individualrecord/2025/10/cs.jpg"
 
 
def create_animal_data(idx: int):
    """Build one request payload for the "save service cost" endpoint.

    Args:
        idx: request index (currently unused in the payload).

    Returns:
        dict payload ready for JSON posting, or None when no user/group
        data is available from the data manager.
    """
    # Draw the random pieces first (same order as before so the random
    # stream is consumed identically).
    serial = RandomUtil.generate_random_number_string(0, 10000)
    status_value = RandomUtil.generate_random_number_string(1, 12)
    total_amount = RandomUtil.generate_random_number_string(100, 10000)
    charge_date = RandomUtil.generate_random_date("2023-01-01", "2025-10-16")

    # Pick a (user, research group) pair loaded from the database.
    user_id, user_name, group_id, group_name = data_manager.get_random_user_and_group()
    if not (user_id and user_name and group_id and group_name):
        return None

    return {
        "id": "",
        "researchGroupId": group_id,
        "researchGroupName": group_name,
        "userId": user_id,
        "userName": user_name,
        "chargeDate": charge_date,
        "name": f"服务名称{serial}",
        "totalAmount": total_amount,
        "status": status_value,
        "attachment": "/userfiles/1588133301094375425/程序附件/finance/servicecost/financeServiceCost/2026/1/cs(2).jpg",
        "remarks": f"备注备注备注备注{serial}"
    }
 
 
async def perform_request(session: aiohttp.ClientSession, index: int, max_retries: int = MAX_RETRIES):
    """POST one generated payload, retrying on non-200 status or exception.

    Uses exponential backoff between attempts (2s, 4s, 8s, capped at 10s).

    Args:
        session: shared aiohttp session.
        index: request index, echoed back in the result.
        max_retries: maximum number of attempts before giving up.

    Returns:
        dict with keys: index, timestamp, status_code (0 on failure),
        latency_ms, response_size, error (None on success).
    """
    attempt = 0
    last_err = None
    # Latency of the most recent attempt; initialized up front instead of the
    # previous fragile `'latency_ms' in locals()` check at the bottom.
    latency_ms = 0.0
    while attempt < max_retries:
        data = create_animal_data(index)
        if not data:
            # No user/group rows were loaded -> nothing to send.
            return {
                'index': index,
                'timestamp': time.time(),
                'status_code': 0,
                'latency_ms': 0,
                'response_size': None,
                'error': 'No available user/group data'
            }
        start = time.time()
        try:
            async with session.post(url, json=data, headers=headers) as resp:
                text = await resp.text()
                latency_ms = (time.time() - start) * 1000.0
                status = resp.status
                if status == 200:
                    return {
                        'index': index,
                        'timestamp': time.time(),
                        'status_code': status,
                        'latency_ms': latency_ms,
                        'response_size': len(text) if text is not None else None,
                        'error': None
                    }
                # Non-200: remember the error and back off before retrying.
                last_err = f'status_{status}:{text}'
                attempt += 1
                await asyncio.sleep(min(10, 2 ** attempt))
        except Exception as e:
            latency_ms = (time.time() - start) * 1000.0
            last_err = f'{type(e).__name__}:{str(e)}'
            attempt += 1
            await asyncio.sleep(min(10, 2 ** attempt))
    # All retries exhausted.
    return {
        'index': index,
        'timestamp': time.time(),
        'status_code': 0,
        'latency_ms': latency_ms,
        'response_size': None,
        'error': last_err
    }
 
 
async def worker(name: int, queue: asyncio.Queue, session: aiohttp.ClientSession, gen, pbar, success_counter: dict, failed_list: list, lock: asyncio.Lock):
    """Consume request indices from the queue until a None sentinel arrives.

    Each result is recorded into the report generator; shared counters and
    the progress bar are updated under the lock.
    """
    while True:
        task_index = await queue.get()
        if task_index is None:
            # Sentinel: this worker is done.
            queue.task_done()
            return
        try:
            result = await perform_request(session, task_index)
            # Feed the raw measurement into the report generator.
            gen.record_result(
                index=result['index'],
                timestamp=result['timestamp'],
                status_code=int(result['status_code']),
                latency_ms=float(result['latency_ms']),
                response_size=result.get('response_size'),
                error=result.get('error')
            )
            status = result['status_code']
            is_success = bool(status) and 200 <= status < 300
            async with lock:
                if is_success:
                    success_counter['count'] += 1
                else:
                    failed_list.append((result['index'], result.get('error')))
                pbar.update(1)
        except Exception as exc:
            async with lock:
                failed_list.append((task_index, f'Worker异常:{type(exc).__name__}:{exc}'))
                pbar.update(1)
        finally:
            queue.task_done()
 
 
def _load_report_generator():
    """Locate and load the LoadTestReportGenerator class.

    Tries, in order: the Util directory next to this script's parent
    directory (matching the module docstring), the script's own directory,
    and the original hard-coded absolute path. Falls back to a plain import
    when no file is found.

    NOTE: the original code did ``os.path.join(script_dir, r'H:\\...')``,
    which silently discards ``script_dir`` because the second argument is
    absolute — the lookup was always machine-specific.
    """
    import importlib.util
    script_dir = os.path.dirname(os.path.abspath(__file__))
    candidates = [
        os.path.join(os.path.dirname(script_dir), 'Util', 'stress_test_report_generator.py'),
        os.path.join(script_dir, 'stress_test_report_generator.py'),
        # Legacy absolute path kept for backward compatibility.
        r'H:\项目\造数脚本\Util\stress_test_report_generator.py',
    ]
    for report_path in candidates:
        if os.path.exists(report_path):
            spec = importlib.util.spec_from_file_location('report_module', report_path)
            report_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(report_module)
            return getattr(report_module, 'LoadTestReportGenerator')
    # Fallback: direct import (if the file was renamed to an ascii name).
    from report_generator import LoadTestReportGenerator  # type: ignore
    return LoadTestReportGenerator


def _build_summary_message(stats: dict, outputs: dict) -> str:
    """Format the Chinese DingTalk summary text from stats and output paths."""
    now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    msg = [f'【{apiname} 压测报告】', f'生成时间:{now_str}']
    msg.append(f"总请求数:{stats.get('total_requests',0)},成功:{stats.get('success_count',0)},失败:{stats.get('fail_count',0)},成功率:{stats.get('success_rate',0):.2%}")
    msg.append(f"总耗时(s):{stats.get('duration_seconds',0):.2f},平均吞吐(req/s):{stats.get('throughput_rps',0):.2f}")
    lat = stats.get('latency_ms', {})
    msg.append(f"延迟(ms) - 平均:{lat.get('avg',0):.2f},P90:{lat.get('p90',0):.2f},P95:{lat.get('p95',0):.2f},P99:{lat.get('p99',0):.2f}")
    # List every generated report file (charts are nested one level deeper).
    msg.append('生成文件:')
    for key, value in outputs.items():
        if key == 'charts':
            for chart_path in value.values():
                msg.append(os.path.abspath(chart_path))
        else:
            msg.append(os.path.abspath(value))
    return '\n'.join(msg)


async def batch_create_animals(total: int, num_workers: int):
    """Run the load test end-to-end.

    Loads DB data, fans ``total`` requests out to ``num_workers`` async
    workers, then generates the report files and sends a DingTalk summary.
    """
    if not data_manager.load_database_data():
        print("加载数据库数据失败,退出压测")
        return

    try:
        LoadTestReportGenerator = _load_report_generator()
        gen = LoadTestReportGenerator(test_name=f'{apiname}压测任务', report_title='压测详细报告')
    except Exception as e:
        print('无法加载压测报告生成器,请确认stress_test_report_generator.py 文件位置正确。\n', e)
        raise

    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
    connector = aiohttp.TCPConnector(limit=num_workers, limit_per_host=num_workers, force_close=False)
    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        queue = asyncio.Queue()
        for i in range(1, total + 1):
            await queue.put(i)
        # One sentinel per worker so every worker terminates.
        for _ in range(num_workers):
            await queue.put(None)

        success_counter = {'count': 0}
        failed_list = []
        lock = asyncio.Lock()

        with tqdm(total=total, desc='创建进度') as pbar:
            tasks = [
                asyncio.create_task(worker(i, queue, session, gen, pbar, success_counter, failed_list, lock))
                for i in range(num_workers)
            ]
            await asyncio.gather(*tasks)

        # All requests done — generate the report files.
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        outputs = gen.generate_report(OUTPUT_DIR, formats=['html', 'json', 'csv', 'docx'])
        stats = gen.compute_stats()

        final_msg = _build_summary_message(stats, outputs)

        # Best-effort DingTalk notification — a send failure must not kill the run.
        try:
            dingtalk_helper.send_message(final_msg)
        except Exception as e:
            print('发送钉钉消息失败:', e)

        print('\n[SUMMARY] 已生成报告并发送钉钉摘要。')
        print('成功数:', success_counter['count'], ' 失败数:', len(failed_list))
        if failed_list:
            print('失败示例(最多显示50条):')
            for idx, err in failed_list[:50]:
                print(f'  #{idx} => {err}')
 
 
if __name__ == '__main__':
    # Tip: try a small TOTAL_REQUESTS / NUM_WORKERS first against a new
    # environment. (The original no-op self-assignments were removed.)
    asyncio.run(batch_create_animals(TOTAL_REQUESTS, NUM_WORKERS))