1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
"""
集成压测脚本(带压测报告生成并通过钉钉发送摘要)
 
说明:
 - 该脚本基于之前的稳定 worker/队列 实现,运行后会记录每条请求的时间、状态码和延迟。
 - 运行结束后会调用Util目录下的压测报告生成器(文件名: stress_test_report_generator.py),输出 HTML/JSON/CSV/(可选)DOCX 等文件。
 - 生成后会把关键统计摘要通过 DingTalk 机器人发送(调用 DingTalkHelper.send_message)。
 - 安装依赖:aiohttp, tqdm, numpy/pandas/matplotlib/python-docx(可选)
 - 确保 DingTalkHelper 类在你的 `Util.dingtalk_helper` 中可用,且 ACCESS_TOKEN/SECRET 正确。
"""
import sys
import os
# 将上一级目录加入模块搜索路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
import aiohttp
import time
import traceback
import datetime
from tqdm import tqdm
from Util.random_util import RandomUtil
from Util.dingtalk_helper import DingTalkHelper
 
 
# --- 配置 ---
ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d'
SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19'
 
apiname = "个体信息"
url = "http://192.168.6.168:5534/api/individual/individualrecord/individualRecord/save"
headers = {
    "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NjA2Nzk1MDksInVzZXJuYW1lIjoiZ2x5In0.aApZ-cXC_pIw6gdPZHg3TlsgZIaCPRrmlMDM3-zEhtg",
    "Content-Type": "application/json"
}
 
NUM_WORKERS = 10
TOTAL_REQUESTS = 100
MAX_RETRIES = 3
REQUEST_TIMEOUT = 60
OUTPUT_DIR = './load_test_report'
 
# --- 初始化 ---
dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET)
 
LARGE_CONTENT = "备注压测中" * 10
FILES_PATH = "/userfiles/1463828311460319233/程序附件//baoyi/individual/individualrecord/2025/10/cs.jpg"
 
 
def create_animal_data(idx: int):
    random_code = RandomUtil.generate_random_number_string(0, 10000)
    random_code_grade = RandomUtil.generate_random_number_string(1, 2)
    random_code_sex = RandomUtil.generate_random_number_string(1, 2)
    random_date = RandomUtil.generate_random_date("2023-01-01", "2025-10-16")
    return {
        "individualExperimentRecordDTOList": [
            {
                "fileUrl": "",
                "id": "",
                "individualId": "",
                "experimentDate": random_date,
                "name": "3",
                "otherName": "",
                "remarks": "",
                "ultrasoundScanDate": "",
                "pregnancyStatus": "",
                "otherSituation": ""
            },
            {
                "fileUrl": "",
                "id": "",
                "individualId": "",
                "experimentDate": random_date,
                "name": "5",
                "otherName": "",
                "remarks": "",
                "ultrasoundScanDate": "",
                "pregnancyStatus": "",
                "otherSituation": ""
            }
        ],
        "id": "",
        "rfid": "random_code",
        "code": f"hyb压测模拟万人新建动物信息{random_code}",
        "variety": "食蟹猴",
        "birthDate": f"{random_date} 00:00:00",
        "deathDate": "",
        "grade": random_code_grade,
        "gender": random_code_sex,
        "father": {"id": ""},
        "mother": {"id": ""},
        "birthPlace": "",
        "room": {"id": "", "name": ""},
        "cage": {"id": "", "code": ""},
        "status": "5,2",
        "weight": "",
        "weightUnit": "",
        "weightUnitValue": "",
        "researchGroup": {"id": "1491301397488844801"},
        "isConfirmBirth": "1",
        "birthYear": "2025",
        "weightDate": "",
        "remarks": LARGE_CONTENT,
        "relevanceId": "",
        "version": "",
        "transferTime": "",
        "transferRemarks": "",
        "transferList": [],
        "otherVariety": "",
        "varietyType": "",
        "animalSource": "1",
        "enterTime": f"{random_date} 00:00:00",
        "animalModel": "",
        "project": {"id": "1825457771270230018"},
        "projectManager": {"id": "1810602921408172033"},
        "reminderTime": random_date,
        "reminderUser": {"id": "1463828311460319233"},
        "fileUrl": FILES_PATH,
        "groupChagneDate": random_date,
        "infantId": "",
        "statusStr": "5,2"
        }
 
 
async def perform_request(session: aiohttp.ClientSession, index: int, max_retries: int = MAX_RETRIES):
    attempt = 0
    last_err = None
    while attempt < max_retries:
        data = create_animal_data(index)
        start = time.time()
        try:
            async with session.post(url, json=data, headers=headers) as resp:
                text = await resp.text()
                latency_ms = (time.time() - start) * 1000.0
                status = resp.status
                if status == 200:
                    return {
                        'index': index,
                        'timestamp': time.time(),
                        'status_code': status,
                        'latency_ms': latency_ms,
                        'response_size': len(text) if text is not None else None,
                        'error': None
                    }
                else:
                    last_err = f'status_{status}:{text}'
                    attempt += 1
                    await asyncio.sleep(min(10, 2 ** attempt))
        except Exception as e:
            latency_ms = (time.time() - start) * 1000.0
            last_err = f'{type(e).__name__}:{str(e)}'
            attempt += 1
            await asyncio.sleep(min(10, 2 ** attempt))
    # 最终失败
    return {
        'index': index,
        'timestamp': time.time(),
        'status_code': 0,
        'latency_ms': latency_ms if 'latency_ms' in locals() else 0,
        'response_size': None,
        'error': last_err
    }
 
 
async def worker(name: int, queue: asyncio.Queue, session: aiohttp.ClientSession, gen, pbar, success_counter: dict, failed_list: list, lock: asyncio.Lock):
    while True:
        idx = await queue.get()
        if idx is None:
            queue.task_done()
            break
        try:
            res = await perform_request(session, idx)
            # 记录到报告生成器
            gen.record_result(
                index=res['index'],
                timestamp=res['timestamp'],
                status_code=int(res['status_code']),
                latency_ms=float(res['latency_ms']),
                response_size=res.get('response_size'),
                error=res.get('error')
            )
            async with lock:
                if res['status_code'] and 200 <= res['status_code'] < 300:
                    success_counter['count'] += 1
                else:
                    failed_list.append((res['index'], res.get('error')))
                pbar.update(1)
        except Exception as e:
            async with lock:
                failed_list.append((idx, f'Worker异常:{type(e).__name__}:{e}'))
                pbar.update(1)
        finally:
            queue.task_done()
 
 
async def batch_create_animals(total: int, num_workers: int):
    # 动态加载报告生成器模块(支持中文文件名)
    gen = None
    try:
        import importlib.util
        script_dir = os.path.dirname(os.path.abspath(__file__))
        report_path = os.path.join(script_dir, 'H:\\项目\\造数脚本\\Util\\stress_test_report_generator.py')
        if os.path.exists(report_path):
            spec = importlib.util.spec_from_file_location('report_module', report_path)
            report_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(report_module)
            LoadTestReportGenerator = getattr(report_module, 'LoadTestReportGenerator')
        else:
            # 备用:尝试直接导入模块名(若你的文件名已改为 ascii)
            from report_generator import LoadTestReportGenerator  # type: ignore
        gen = LoadTestReportGenerator(test_name=f'{apiname}压测任务', report_title='压测详细报告')
    except Exception as e:
        print('无法加载压测报告生成器,请确认stress_test_report_generator.py 文件位置正确。\n', e)
        raise
 
    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
    connector = aiohttp.TCPConnector(limit=num_workers, limit_per_host=num_workers, force_close=False)
    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        queue = asyncio.Queue()
        for i in range(1, total + 1):
            await queue.put(i)
        for _ in range(num_workers):
            await queue.put(None)
 
        success_counter = {'count': 0}
        failed_list = []
        lock = asyncio.Lock()
 
        with tqdm(total=total, desc='创建进度') as pbar:
            workers = [
                asyncio.create_task(worker(i, queue, session, gen, pbar, success_counter, failed_list, lock))
                for i in range(num_workers)
            ]
            await asyncio.gather(*workers)
 
        # 任务完成,生成报告
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        outputs = gen.generate_report(OUTPUT_DIR, formats=['html', 'json', 'csv', 'docx'])
 
        stats = gen.compute_stats()
 
        # 构造钉钉摘要消息(中文)
        now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        msg = [f'【{apiname} 压测报告】', f'生成时间:{now_str}']
        msg.append(f"总请求数:{stats.get('total_requests',0)},成功:{stats.get('success_count',0)},失败:{stats.get('fail_count',0)},成功率:{stats.get('success_rate',0):.2%}")
        msg.append(f"总耗时(s):{stats.get('duration_seconds',0):.2f},平均吞吐(req/s):{stats.get('throughput_rps',0):.2f}")
        lat = stats.get('latency_ms', {})
        msg.append(f"延迟(ms) - 平均:{lat.get('avg',0):.2f},P90:{lat.get('p90',0):.2f},P95:{lat.get('p95',0):.2f},P99:{lat.get('p99',0):.2f}")
 
        # 列出生成的报告文件
        file_list = []
        for k, v in outputs.items():
            if k == 'charts':
                for cname, cpath in v.items():
                    file_list.append(os.path.abspath(cpath))
            else:
                file_list.append(os.path.abspath(v))
        msg.append('生成文件:')
        for p in file_list:
            msg.append(p)
 
        final_msg = '\n'.join(msg)
 
        # 发送钉钉消息
        try:
            dingtalk_helper.send_message(final_msg)
        except Exception as e:
            print('发送钉钉消息失败:', e)
 
        print('\n[SUMMARY] 已生成报告并发送钉钉摘要。')
        print('成功数:', success_counter['count'], ' 失败数:', len(failed_list))
        if failed_list:
            print('失败示例(最多显示50条):')
            for idx, err in failed_list[:50]:
                print(f'  #{idx} => {err}')
 
 
if __name__ == '__main__':
    # 运行前建议先用小规模测试
    TOTAL_REQUESTS = TOTAL_REQUESTS
    NUM_WORKERS = NUM_WORKERS
    asyncio.run(batch_create_animals(TOTAL_REQUESTS, NUM_WORKERS))