# hyb
# 2025-12-23 c980682a1fe205d8c21d349e9fc6b9e4951aea34
"""
压测报告生成器(中文报告)
 
依赖:
  - numpy
  - pandas (可选,用于更方便的表格导出)
  - matplotlib (用于绘图)
  - python-docx (可选,用于生成 Word 文档)
 
使用场景:在你的压测脚本中,逐条调用 `recorder.record_result(...)` 或者在压测结束后把结果一次性传入 `bulk_record`,
然后调用 `generate_report(output_dir, formats=['html','docx','json','csv'])`。
 
输出:HTML 报告(含统计与图表)、可选的 Word 报告、JSON/CSV 明细文件和图像文件。
 
"""
 
from __future__ import annotations
import os
import json
import math
import time
import statistics
import datetime
from typing import List, Optional, Dict, Any
# Optional third-party dependencies. Each name is bound to None when its
# package cannot be imported; the code below checks for None before use.
try:
    import numpy as np
except Exception:
    np = None

try:
    import pandas as pd
except Exception:
    pd = None

try:
    import matplotlib
    # Select the non-interactive Agg backend before pyplot is imported.
    matplotlib.use('Agg')
    import matplotlib.pyplot as plt
    # Font configuration lives inside the guarded block so that a missing
    # matplotlib degrades to "no charts" instead of failing at import time.
    plt.rcParams['font.sans-serif'] = ['SimHei']  # SimHei (a Chinese font) for the Chinese chart labels
    plt.rcParams['axes.unicode_minus'] = False    # render the minus sign correctly
except Exception:
    plt = None
 
try:
    from docx import Document
    from docx.shared import Inches
except Exception:
    Document = None
 
 
class RequestRecord:
    """Result of a single request captured during a load test."""

    def __init__(self, index: int, timestamp: float, status_code: int, latency_ms: float,
                 response_size: Optional[int] = None, error: Optional[str] = None, extra: Optional[Dict] = None):
        # `timestamp` is a Unix timestamp in seconds.
        self.index, self.timestamp = index, timestamp
        self.status_code, self.latency_ms = status_code, latency_ms
        self.response_size, self.error = response_size, error
        # A falsy `extra` (None or empty) is replaced by a fresh dict.
        self.extra = extra if extra else {}

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a flat dict.

        The core fields come first, including a human-readable local-time
        ``datetime`` string; the entries of ``extra`` are merged in last, so
        an extra key that matches a core field name replaces its value.
        """
        moment = datetime.datetime.fromtimestamp(self.timestamp)
        row: Dict[str, Any] = {}
        row['index'] = self.index
        row['timestamp'] = self.timestamp
        row['datetime'] = moment.isoformat(sep=' ', timespec='seconds')
        row['status_code'] = self.status_code
        row['latency_ms'] = self.latency_ms
        row['response_size'] = self.response_size
        row['error'] = self.error
        row.update(self.extra or {})
        return row
 
 
class LoadTestReportGenerator:
    """压测报告生成器。将请求结果记录并生成中文报告。"""
 
    def __init__(self, test_name: str = '压测任务', report_title: Optional[str] = None):
        self.test_name = test_name
        self.report_title = report_title or f"{test_name} 报告"
        self.records: List[RequestRecord] = []
        self._started_at: Optional[float] = None
        self._ended_at: Optional[float] = None
 
    # ---------- 记录方法 ----------
    def record_result(self, index: int, timestamp: float, status_code: int, latency_ms: float,
                      response_size: Optional[int] = None, error: Optional[str] = None, extra: Optional[Dict] = None):
        """记录单条请求结果。timestamp 为 unix 时间戳(秒)。"""
        rec = RequestRecord(index, timestamp, status_code, latency_ms, response_size, error, extra)
        self.records.append(rec)
        if self._started_at is None or timestamp < self._started_at:
            self._started_at = timestamp
        if self._ended_at is None or timestamp > self._ended_at:
            self._ended_at = timestamp
 
    def bulk_record(self, results: List[Dict[str, Any]]):
        """一次性批量导入结果。每一项是 dict,需包含 index,timestamp,status_code,latency_ms 等字段。"""
        for r in results:
            self.record_result(
                index=r.get('index', len(self.records) + 1),
                timestamp=r['timestamp'],
                status_code=int(r.get('status_code', 0)),
                latency_ms=float(r.get('latency_ms', 0)),
                response_size=r.get('response_size'),
                error=r.get('error'),
                extra=r.get('extra')
            )
 
    # ---------- 统计方法 ----------
    def compute_stats(self) -> Dict[str, Any]:
        if not self.records:
            return {}
 
        latencies = [r.latency_ms for r in self.records if r.latency_ms is not None]
        statuses = [r.status_code for r in self.records]
        errors = [r for r in self.records if r.error]
 
        total = len(self.records)
        success_count = sum(1 for s in statuses if 200 <= s < 300)
        fail_count = total - success_count
 
        duration = (self._ended_at - self._started_at) if (self._started_at and self._ended_at and self._ended_at > self._started_at) else None
        duration = float(duration) if duration else 0.0
 
        throughput = (total / duration) if duration > 0 else 0.0
 
        # 使用 numpy 计算分位数(如果可用),否则用纯 python
        if np:
            p50 = float(np.percentile(latencies, 50)) if latencies else 0
            p90 = float(np.percentile(latencies, 90)) if latencies else 0
            p95 = float(np.percentile(latencies, 95)) if latencies else 0
            p99 = float(np.percentile(latencies, 99)) if latencies else 0
        else:
            lat_sorted = sorted(latencies)
            def percentile(lst, q):
                if not lst:
                    return 0
                k = (len(lst)-1) * (q/100)
                f = math.floor(k)
                c = math.ceil(k)
                if f == c:
                    return lst[int(k)]
                d0 = lst[int(f)] * (c-k)
                d1 = lst[int(c)] * (k-f)
                return d0 + d1
            p50 = percentile(lat_sorted, 50)
            p90 = percentile(lat_sorted, 90)
            p95 = percentile(lat_sorted, 95)
            p99 = percentile(lat_sorted, 99)
 
        status_groups: Dict[int, int] = {}
        for s in statuses:
            status_groups[s] = status_groups.get(s, 0) + 1
 
        # 每秒请求数(RPS)时间序列
        rps_series = {}
        for r in self.records:
            sec = int(r.timestamp)
            rps_series[sec] = rps_series.get(sec, 0) + 1
 
        # 简单错误聚合
        error_summary: Dict[str, int] = {}
        for r in errors:
            key = r.error if r.error else f'status_{r.status_code}'
            error_summary[key] = error_summary.get(key, 0) + 1
 
        stats = {
            'total_requests': total,
            'success_count': success_count,
            'fail_count': fail_count,
            'success_rate': success_count / total if total else 0,
            'duration_seconds': duration,
            'throughput_rps': throughput,
            'latency_ms': {
                'min': min(latencies) if latencies else 0,
                'max': max(latencies) if latencies else 0,
                'avg': statistics.mean(latencies) if latencies else 0,
                'median': p50,
                'p90': p90,
                'p95': p95,
                'p99': p99,
            },
            'status_groups': status_groups,
            'error_summary': error_summary,
            'rps_series': sorted(list(rps_series.items())),  # [(sec, count), ...]
        }
        return stats
 
    # ---------- 报告输出 ----------
    def _ensure_dir(self, path: str):
        if not os.path.exists(path):
            os.makedirs(path, exist_ok=True)
 
    def generate_report(self, output_dir: str = './load_test_report', formats: Optional[List[str]] = None):
        """
        生成报告。
        formats: 列表,支持 'html','docx','json','csv'。
        返回生成的文件路径字典。
        """
        formats = formats or ['html', 'json']
        self._ensure_dir(output_dir)
 
        stats = self.compute_stats()
        timestamp_str = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        base_name = f"{self.test_name.replace(' ', '_')}_{timestamp_str}"
 
        outputs = {}
 
        # 1) 输出 JSON 统计与明细
        if 'json' in formats:
            json_path = os.path.join(output_dir, base_name + '.summary.json')
            with open(json_path, 'w', encoding='utf-8') as f:
                json.dump({'stats': stats, 'records': [r.to_dict() for r in self.records]}, f, ensure_ascii=False, indent=2)
            outputs['json'] = json_path
 
        # 2) 输出 CSV 明细(如果 pandas 可用则用 pandas,否则手写)
        if 'csv' in formats:
            csv_path = os.path.join(output_dir, base_name + '.details.csv')
            if pd:
                df = pd.DataFrame([r.to_dict() for r in self.records])
                df.to_csv(csv_path, index=False, encoding='utf-8-sig')
            else:
                # 手动写入
                import csv
                with open(csv_path, 'w', newline='', encoding='utf-8') as f:
                    writer = csv.DictWriter(f, fieldnames=list(self.records[0].to_dict().keys()))
                    writer.writeheader()
                    for r in self.records:
                        writer.writerow(r.to_dict())
            outputs['csv'] = csv_path
 
        # 3) 生成图表(延迟分布与RPS),保存为 PNG
        charts = {}
        if plt:
            try:
                # 延迟直方图
                latencies = [r.latency_ms for r in self.records if r.latency_ms is not None]
                if latencies:
                    plt.figure()
                    plt.hist(latencies, bins=50)
                    plt.title('响应时间分布 (ms)')
                    plt.xlabel('延迟 (ms)')
                    plt.ylabel('请求数量')
                    hist_path = os.path.join(output_dir, base_name + '_latency_hist.png')
                    plt.tight_layout()
                    plt.savefig(hist_path)
                    plt.close()
                    charts['latency_hist'] = hist_path
 
                # RPS 图
                times = [int(r.timestamp) for r in self.records]
                if times:
                    from collections import Counter
                    cnt = Counter(times)
                    xs = sorted(cnt.keys())
                    ys = [cnt[x] for x in xs]
                    plt.figure()
                    plt.plot(xs, ys)
                    plt.title('每秒请求数 (RPS)')
                    plt.xlabel('Unix 秒')
                    plt.ylabel('请求数')
                    rps_path = os.path.join(output_dir, base_name + '_rps.png')
                    plt.tight_layout()
                    plt.savefig(rps_path)
                    plt.close()
                    charts['rps'] = rps_path
            except Exception as e:
                print('生成图表时出错:', e)
 
        outputs['charts'] = charts
 
        # 4) 生成 HTML 报告
        if 'html' in formats:
            html_path = os.path.join(output_dir, base_name + '.html')
            html_content = self._build_html_report(stats, charts)
            with open(html_path, 'w', encoding='utf-8') as f:
                f.write(html_content)
            outputs['html'] = html_path
 
        # 5) 生成 Word 报告(可选)
        if 'docx' in formats and Document:
            try:
                docx_path = os.path.join(output_dir, base_name + '.docx')
                self._build_docx_report(stats, charts, docx_path)
                outputs['docx'] = docx_path
            except Exception as e:
                print('生成 docx 报告时出错:', e)
 
        return outputs
 
    def _build_html_report(self, stats: Dict[str, Any], charts: Dict[str, str]) -> str:
        # 这里构建一份中文的 HTML 模板(简洁风格)
        title = self.report_title
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        summary_html = f"""
        <h2>摘要</h2>
        <ul>
          <li>报告名称:{title}</li>
          <li>生成时间:{now}</li>
          <li>总请求数:{stats.get('total_requests', 0)}</li>
          <li>成功数:{stats.get('success_count',0)},失败数:{stats.get('fail_count',0)},成功率:{stats.get('success_rate',0):.2%}</li>
          <li>总耗时(秒):{stats.get('duration_seconds',0):.2f}</li>
          <li>平均吞吐(req/s):{stats.get('throughput_rps',0):.2f}</li>
        </ul>
        """
 
        latency = stats.get('latency_ms', {})
        latency_html = f"""
        <h2>响应时间统计 (ms)</h2>
        <ul>
          <li>最小:{latency.get('min',0):.2f}</li>
          <li>最大:{latency.get('max',0):.2f}</li>
          <li>平均:{latency.get('avg',0):.2f}</li>
          <li>中位数(P50):{latency.get('median',0):.2f}</li>
          <li>P90:{latency.get('p90',0):.2f},P95:{latency.get('p95',0):.2f},P99:{latency.get('p99',0):.2f}</li>
        </ul>
        """
 
        status_html = '<h2>状态码分布</h2><ul>'
        for k, v in stats.get('status_groups', {}).items():
            status_html += f'<li>{k}: {v}</li>'
        status_html += '</ul>'
 
        error_html = '<h2>错误汇总</h2>'
        if stats.get('error_summary'):
            error_html += '<ul>'
            for k, v in stats.get('error_summary', {}).items():
                error_html += f'<li>{k}: {v}</li>'
            error_html += '</ul>'
        else:
            error_html += '<p>无错误记录</p>'
 
        charts_html = '<h2>图表</h2>'
        for name, path in charts.items():
            charts_html += f'<div><h3>{name}</h3><img src="{os.path.basename(path)}" alt="{name}" style="max-width:100%;height:auto;"/></div>'
 
        # 详细请求表(默认仅包含前 100 条,避免页面过大)
        details = [r.to_dict() for r in self.records[:100]]
        detail_rows = ''.join([f"<tr><td>{d['index']}</td><td>{d['datetime']}</td><td>{d['status_code']}</td><td>{d['latency_ms']}</td><td>{d['response_size']}</td><td>{d['error'] or ''}</td></tr>" for d in details])
        details_html = f"""
        <h2>请求明细(仅显示前100条)</h2>
        <table border="1" cellpadding="4" cellspacing="0">
          <tr><th>#</th><th>时间</th><th>状态码</th><th>延迟(ms)</th><th>响应大小</th><th>错误</th></tr>
          {detail_rows}
        </table>
        """
 
        html = f"""
        <!doctype html>
        <html lang="zh-CN">
        <head>
          <meta charset="utf-8">
          <title>{title}</title>
          <style>
            body{{font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial; padding:20px}}
            h2{{color:#2c3e50}}
            table{{border-collapse:collapse; width:100%}}
            th,td{{padding:6px; text-align:left}}
          </style>
        </head>
        <body>
          <h1>{title}</h1>
          {summary_html}
          {latency_html}
          {status_html}
          {error_html}
          {charts_html}
          {details_html}
          <p>注:如需查看所有请求明细,请下载同目录下的 CSV/JSON 文件。</p>
        </body>
        </html>
        """
 
        # 若有图表,将图表文件拷贝到同目录(图表已保存在 output 目录),HTML 中用相对路径引用 basename
        return html
 
    def _build_docx_report(self, stats: Dict[str, Any], charts: Dict[str, str], docx_path: str):
        if not Document:
            raise RuntimeError('缺少 python-docx 库,无法生成 docx。')
        doc = Document()
        doc.add_heading(self.report_title, level=1)
        doc.add_paragraph(f"生成时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
 
        doc.add_heading('摘要', level=2)
        doc.add_paragraph(f"总请求数:{stats.get('total_requests',0)}  成功:{stats.get('success_count',0)}  失败:{stats.get('fail_count',0)}  成功率:{stats.get('success_rate',0):.2%}")
        doc.add_paragraph(f"总耗时(秒):{stats.get('duration_seconds',0):.2f}  平均吞吐(req/s):{stats.get('throughput_rps',0):.2f}")
 
        doc.add_heading('响应时间统计 (ms)', level=2)
        lat = stats.get('latency_ms', {})
        doc.add_paragraph(f"最小:{lat.get('min',0):.2f}  最大:{lat.get('max',0):.2f}  平均:{lat.get('avg',0):.2f}")
        doc.add_paragraph(f"P50:{lat.get('median',0):.2f}  P90:{lat.get('p90',0):.2f}  P95:{lat.get('p95',0):.2f}  P99:{lat.get('p99',0):.2f}")
 
        doc.add_heading('状态码分布', level=2)
        for k, v in stats.get('status_groups', {}).items():
            doc.add_paragraph(f"{k}: {v}")
 
        doc.add_heading('错误汇总', level=2)
        if stats.get('error_summary'):
            for k, v in stats.get('error_summary', {}).items():
                doc.add_paragraph(f"{k}: {v}")
        else:
            doc.add_paragraph('无错误记录')
 
        # 插入图表
        for name, path in charts.items():
            if os.path.exists(path):
                doc.add_heading(name, level=2)
                try:
                    doc.add_picture(path, width=Inches(6))
                except Exception:
                    doc.add_paragraph(f'无法插入图片:{path}')
 
        # 附加前 100 条请求明细
        doc.add_heading('请求明细(前100条)', level=2)
        table = doc.add_table(rows=1, cols=6)
        hdr_cells = table.rows[0].cells
        hdr_cells[0].text = '#'
        hdr_cells[1].text = '时间'
        hdr_cells[2].text = '状态码'
        hdr_cells[3].text = '延迟(ms)'
        hdr_cells[4].text = '响应大小'
        hdr_cells[5].text = '错误'
 
        for r in self.records[:100]:
            row_cells = table.add_row().cells
            d = r.to_dict()
            row_cells[0].text = str(d['index'])
            row_cells[1].text = d['datetime']
            row_cells[2].text = str(d['status_code'])
            row_cells[3].text = f"{d['latency_ms']}"
            row_cells[4].text = str(d.get('response_size', ''))
            row_cells[5].text = d.get('error') or ''
 
        doc.save(docx_path)
 
 
# ---------------- 使用示例 ----------------
if __name__ == '__main__':
    # Demo: simulate 100 request results and generate every report format.
    import random

    demo = LoadTestReportGenerator(test_name='示例压测', report_title='示例压测详细报告')
    base_ts = time.time()
    for seq in range(1, 101):
        # Five consecutive requests share the same timestamp (about 5 requests per second).
        request_ts = base_ts + (seq // 5)
        latency = max(1.0, random.gauss(200, 50))
        # About 5% of the simulated requests fail with HTTP 500.
        if random.random() > 0.05:
            status, err = 200, None
        else:
            status, err = 500, '500 Internal Server Error'
        demo.record_result(seq, request_ts, status, latency,
                           response_size=random.randint(500, 5000), error=err)

    outputs = demo.generate_report('./example_report', formats=['html', 'json', 'csv', 'docx'])
    print('已生成报告:', outputs)