""" 压测报告生成器(中文报告) 依赖: - numpy - pandas (可选,用于更方便的表格导出) - matplotlib (用于绘图) - python-docx (可选,用于生成 Word 文档) 使用场景:在你的压测脚本中,逐条调用 `recorder.record_result(...)` 或者在压测结束后把结果一次性传入 `bulk_record`, 然后调用 `generate_report(output_dir, formats=['html','docx','json','csv'])`。 输出:HTML 报告(含统计与图表)、可选的 Word 报告、JSON/CSV 明细文件和图像文件。 """ from __future__ import annotations import os import json import math import time import statistics import datetime from typing import List, Optional, Dict, Any import matplotlib.pyplot as plt plt.rcParams['font.sans-serif'] = ['SimHei'] # 设置中文字体为黑体 plt.rcParams['axes.unicode_minus'] = False # 正常显示负号 try: import numpy as np except Exception: np = None try: import pandas as pd except Exception: pd = None try: import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt except Exception: plt = None try: from docx import Document from docx.shared import Inches except Exception: Document = None class RequestRecord: """单条请求记录数据结构。""" def __init__(self, index: int, timestamp: float, status_code: int, latency_ms: float, response_size: Optional[int] = None, error: Optional[str] = None, extra: Optional[Dict] = None): self.index = index self.timestamp = timestamp # unix 时间戳,秒 self.status_code = status_code self.latency_ms = latency_ms self.response_size = response_size self.error = error self.extra = extra or {} def to_dict(self) -> Dict[str, Any]: return { 'index': self.index, 'timestamp': self.timestamp, 'datetime': datetime.datetime.fromtimestamp(self.timestamp).isoformat(sep=' ', timespec='seconds'), 'status_code': self.status_code, 'latency_ms': self.latency_ms, 'response_size': self.response_size, 'error': self.error, **(self.extra or {}) } class LoadTestReportGenerator: """压测报告生成器。将请求结果记录并生成中文报告。""" def __init__(self, test_name: str = '压测任务', report_title: Optional[str] = None): self.test_name = test_name self.report_title = report_title or f"{test_name} 报告" self.records: List[RequestRecord] = [] self._started_at: Optional[float] = None self._ended_at: Optional[float] = None # ---------- 记录方法 ---------- def record_result(self, index: int, timestamp: float, status_code: int, latency_ms: float, response_size: Optional[int] = None, error: Optional[str] = None, extra: Optional[Dict] = None): """记录单条请求结果。timestamp 为 unix 时间戳(秒)。""" rec = RequestRecord(index, timestamp, status_code, latency_ms, response_size, error, extra) self.records.append(rec) if self._started_at is None or timestamp < self._started_at: self._started_at = timestamp if self._ended_at is None or timestamp > self._ended_at: self._ended_at = timestamp def bulk_record(self, results: List[Dict[str, Any]]): """一次性批量导入结果。每一项是 dict,需包含 index,timestamp,status_code,latency_ms 等字段。""" for r in results: self.record_result( index=r.get('index', len(self.records) + 1), timestamp=r['timestamp'], status_code=int(r.get('status_code', 0)), latency_ms=float(r.get('latency_ms', 0)), response_size=r.get('response_size'), error=r.get('error'), extra=r.get('extra') ) # ---------- 统计方法 ---------- def compute_stats(self) -> Dict[str, Any]: if not self.records: return {} latencies = [r.latency_ms for r in self.records if r.latency_ms is not None] statuses = [r.status_code for r in self.records] errors = [r for r in self.records if r.error] total = len(self.records) success_count = sum(1 for s in statuses if 200 <= s < 300) fail_count = total - success_count duration = (self._ended_at - self._started_at) if (self._started_at and self._ended_at and self._ended_at > self._started_at) else None duration = float(duration) if duration else 0.0 throughput = (total / duration) if duration > 0 else 0.0 # 使用 numpy 计算分位数(如果可用),否则用纯 python if np: p50 = float(np.percentile(latencies, 50)) if latencies else 0 p90 = float(np.percentile(latencies, 90)) if latencies else 0 p95 = float(np.percentile(latencies, 95)) if latencies else 0 p99 = float(np.percentile(latencies, 99)) if latencies else 0 else: lat_sorted = sorted(latencies) def percentile(lst, q): if not lst: return 0 k = (len(lst)-1) * (q/100) f = math.floor(k) c = math.ceil(k) if f == c: return lst[int(k)] d0 = lst[int(f)] * (c-k) d1 = lst[int(c)] * (k-f) return d0 + d1 p50 = percentile(lat_sorted, 50) p90 = percentile(lat_sorted, 90) p95 = percentile(lat_sorted, 95) p99 = percentile(lat_sorted, 99) status_groups: Dict[int, int] = {} for s in statuses: status_groups[s] = status_groups.get(s, 0) + 1 # 每秒请求数(RPS)时间序列 rps_series = {} for r in self.records: sec = int(r.timestamp) rps_series[sec] = rps_series.get(sec, 0) + 1 # 简单错误聚合 error_summary: Dict[str, int] = {} for r in errors: key = r.error if r.error else f'status_{r.status_code}' error_summary[key] = error_summary.get(key, 0) + 1 stats = { 'total_requests': total, 'success_count': success_count, 'fail_count': fail_count, 'success_rate': success_count / total if total else 0, 'duration_seconds': duration, 'throughput_rps': throughput, 'latency_ms': { 'min': min(latencies) if latencies else 0, 'max': max(latencies) if latencies else 0, 'avg': statistics.mean(latencies) if latencies else 0, 'median': p50, 'p90': p90, 'p95': p95, 'p99': p99, }, 'status_groups': status_groups, 'error_summary': error_summary, 'rps_series': sorted(list(rps_series.items())), # [(sec, count), ...] } return stats # ---------- 报告输出 ---------- def _ensure_dir(self, path: str): if not os.path.exists(path): os.makedirs(path, exist_ok=True) def generate_report(self, output_dir: str = './load_test_report', formats: Optional[List[str]] = None): """ 生成报告。 formats: 列表,支持 'html','docx','json','csv'。 返回生成的文件路径字典。 """ formats = formats or ['html', 'json'] self._ensure_dir(output_dir) stats = self.compute_stats() timestamp_str = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') base_name = f"{self.test_name.replace(' ', '_')}_{timestamp_str}" outputs = {} # 1) 输出 JSON 统计与明细 if 'json' in formats: json_path = os.path.join(output_dir, base_name + '.summary.json') with open(json_path, 'w', encoding='utf-8') as f: json.dump({'stats': stats, 'records': [r.to_dict() for r in self.records]}, f, ensure_ascii=False, indent=2) outputs['json'] = json_path # 2) 输出 CSV 明细(如果 pandas 可用则用 pandas,否则手写) if 'csv' in formats: csv_path = os.path.join(output_dir, base_name + '.details.csv') if pd: df = pd.DataFrame([r.to_dict() for r in self.records]) df.to_csv(csv_path, index=False, encoding='utf-8-sig') else: # 手动写入 import csv with open(csv_path, 'w', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=list(self.records[0].to_dict().keys())) writer.writeheader() for r in self.records: writer.writerow(r.to_dict()) outputs['csv'] = csv_path # 3) 生成图表(延迟分布与RPS),保存为 PNG charts = {} if plt: try: # 延迟直方图 latencies = [r.latency_ms for r in self.records if r.latency_ms is not None] if latencies: plt.figure() plt.hist(latencies, bins=50) plt.title('响应时间分布 (ms)') plt.xlabel('延迟 (ms)') plt.ylabel('请求数量') hist_path = os.path.join(output_dir, base_name + '_latency_hist.png') plt.tight_layout() plt.savefig(hist_path) plt.close() charts['latency_hist'] = hist_path # RPS 图 times = [int(r.timestamp) for r in self.records] if times: from collections import Counter cnt = Counter(times) xs = sorted(cnt.keys()) ys = [cnt[x] for x in xs] plt.figure() plt.plot(xs, ys) plt.title('每秒请求数 (RPS)') plt.xlabel('Unix 秒') plt.ylabel('请求数') rps_path = os.path.join(output_dir, base_name + '_rps.png') plt.tight_layout() plt.savefig(rps_path) plt.close() charts['rps'] = rps_path except Exception as e: print('生成图表时出错:', e) outputs['charts'] = charts # 4) 生成 HTML 报告 if 'html' in formats: html_path = os.path.join(output_dir, base_name + '.html') html_content = self._build_html_report(stats, charts) with open(html_path, 'w', encoding='utf-8') as f: f.write(html_content) outputs['html'] = html_path # 5) 生成 Word 报告(可选) if 'docx' in formats and Document: try: docx_path = os.path.join(output_dir, base_name + '.docx') self._build_docx_report(stats, charts, docx_path) outputs['docx'] = docx_path except Exception as e: print('生成 docx 报告时出错:', e) return outputs def _build_html_report(self, stats: Dict[str, Any], charts: Dict[str, str]) -> str: # 这里构建一份中文的 HTML 模板(简洁风格) title = self.report_title now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') summary_html = f"""

摘要

""" latency = stats.get('latency_ms', {}) latency_html = f"""

响应时间统计 (ms)

""" status_html = '

状态码分布

' error_html = '

错误汇总

' if stats.get('error_summary'): error_html += '' else: error_html += '

无错误记录

' charts_html = '

图表

' for name, path in charts.items(): charts_html += f'

{name}

{name}
' # 详细请求表(默认仅包含前 100 条,避免页面过大) details = [r.to_dict() for r in self.records[:100]] detail_rows = ''.join([f"{d['index']}{d['datetime']}{d['status_code']}{d['latency_ms']}{d['response_size']}{d['error'] or ''}" for d in details]) details_html = f"""

请求明细(仅显示前100条)

{detail_rows}
#时间状态码延迟(ms)响应大小错误
""" html = f""" {title}

{title}

{summary_html} {latency_html} {status_html} {error_html} {charts_html} {details_html}

注:如需查看所有请求明细,请下载同目录下的 CSV/JSON 文件。

""" # 若有图表,将图表文件拷贝到同目录(图表已保存在 output 目录),HTML 中用相对路径引用 basename return html def _build_docx_report(self, stats: Dict[str, Any], charts: Dict[str, str], docx_path: str): if not Document: raise RuntimeError('缺少 python-docx 库,无法生成 docx。') doc = Document() doc.add_heading(self.report_title, level=1) doc.add_paragraph(f"生成时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") doc.add_heading('摘要', level=2) doc.add_paragraph(f"总请求数:{stats.get('total_requests',0)} 成功:{stats.get('success_count',0)} 失败:{stats.get('fail_count',0)} 成功率:{stats.get('success_rate',0):.2%}") doc.add_paragraph(f"总耗时(秒):{stats.get('duration_seconds',0):.2f} 平均吞吐(req/s):{stats.get('throughput_rps',0):.2f}") doc.add_heading('响应时间统计 (ms)', level=2) lat = stats.get('latency_ms', {}) doc.add_paragraph(f"最小:{lat.get('min',0):.2f} 最大:{lat.get('max',0):.2f} 平均:{lat.get('avg',0):.2f}") doc.add_paragraph(f"P50:{lat.get('median',0):.2f} P90:{lat.get('p90',0):.2f} P95:{lat.get('p95',0):.2f} P99:{lat.get('p99',0):.2f}") doc.add_heading('状态码分布', level=2) for k, v in stats.get('status_groups', {}).items(): doc.add_paragraph(f"{k}: {v}") doc.add_heading('错误汇总', level=2) if stats.get('error_summary'): for k, v in stats.get('error_summary', {}).items(): doc.add_paragraph(f"{k}: {v}") else: doc.add_paragraph('无错误记录') # 插入图表 for name, path in charts.items(): if os.path.exists(path): doc.add_heading(name, level=2) try: doc.add_picture(path, width=Inches(6)) except Exception: doc.add_paragraph(f'无法插入图片:{path}') # 附加前 100 条请求明细 doc.add_heading('请求明细(前100条)', level=2) table = doc.add_table(rows=1, cols=6) hdr_cells = table.rows[0].cells hdr_cells[0].text = '#' hdr_cells[1].text = '时间' hdr_cells[2].text = '状态码' hdr_cells[3].text = '延迟(ms)' hdr_cells[4].text = '响应大小' hdr_cells[5].text = '错误' for r in self.records[:100]: row_cells = table.add_row().cells d = r.to_dict() row_cells[0].text = str(d['index']) row_cells[1].text = d['datetime'] row_cells[2].text = str(d['status_code']) row_cells[3].text = f"{d['latency_ms']}" row_cells[4].text = str(d.get('response_size', '')) row_cells[5].text = d.get('error') or '' doc.save(docx_path) # ---------------- 使用示例 ---------------- if __name__ == '__main__': # 简单示例:模拟一些请求结果并生成报告 gen = LoadTestReportGenerator(test_name='示例压测', report_title='示例压测详细报告') now = time.time() # 模拟 100 条请求 import random for i in range(1, 101): ts = now + (i // 5) # 每秒 5 个请求(模拟) lat = max(1.0, random.gauss(200, 50)) status = 200 if random.random() > 0.05 else 500 err = None if status == 200 else '500 Internal Server Error' gen.record_result(i, ts, status, lat, response_size=random.randint(500, 5000), error=err) outputs = gen.generate_report('./example_report', formats=['html', 'json', 'csv', 'docx']) print('已生成报告:', outputs)