"""
|
压测报告生成器(中文报告)
|
|
依赖:
|
- numpy
|
- pandas (可选,用于更方便的表格导出)
|
- matplotlib (用于绘图)
|
- python-docx (可选,用于生成 Word 文档)
|
|
使用场景:在你的压测脚本中,逐条调用 `recorder.record_result(...)` 或者在压测结束后把结果一次性传入 `bulk_record`,
|
然后调用 `generate_report(output_dir, formats=['html','docx','json','csv'])`。
|
|
输出:HTML 报告(含统计与图表)、可选的 Word 报告、JSON/CSV 明细文件和图像文件。
|
|
"""
|
|
from __future__ import annotations

import datetime
import json
import math
import os
import statistics
import time
from typing import Any, Dict, List, Optional

# Optional third-party dependencies. Each one is imported defensively so the
# module still loads (with reduced functionality) when a library is missing;
# callers check the sentinel (`np` / `pd` / `plt` / `Document`) before use.
try:
    import numpy as np
except Exception:
    np = None

try:
    import pandas as pd
except Exception:
    pd = None

try:
    import matplotlib
    # Select the headless Agg backend BEFORE importing pyplot, so report
    # generation works on servers without a display.
    matplotlib.use('Agg')
    import matplotlib.pyplot as plt
    plt.rcParams['font.sans-serif'] = ['SimHei']  # CJK-capable font so Chinese labels render
    plt.rcParams['axes.unicode_minus'] = False    # render the minus sign correctly with CJK fonts
except Exception:
    plt = None

try:
    from docx import Document
    from docx.shared import Inches
except Exception:
    Document = None

class RequestRecord:
    """One request sample captured during a load test."""

    def __init__(self, index: int, timestamp: float, status_code: int, latency_ms: float,
                 response_size: Optional[int] = None, error: Optional[str] = None, extra: Optional[Dict] = None):
        self.index = index
        self.timestamp = timestamp  # unix timestamp, seconds
        self.status_code = status_code
        self.latency_ms = latency_ms
        self.response_size = response_size
        self.error = error
        self.extra = extra or {}

    def to_dict(self) -> Dict[str, Any]:
        """Flatten the record (including any `extra` fields) into a plain dict."""
        when = datetime.datetime.fromtimestamp(self.timestamp)
        payload: Dict[str, Any] = {
            'index': self.index,
            'timestamp': self.timestamp,
            'datetime': when.isoformat(sep=' ', timespec='seconds'),
            'status_code': self.status_code,
            'latency_ms': self.latency_ms,
            'response_size': self.response_size,
            'error': self.error,
        }
        # `extra` keys are merged last, matching the original `**extra` spread.
        payload.update(self.extra or {})
        return payload

class LoadTestReportGenerator:
    """Load-test report generator.

    Collect per-request results via :meth:`record_result` / :meth:`bulk_record`,
    then render a Chinese-language report (HTML, optional Word document,
    JSON/CSV detail files and PNG charts) via :meth:`generate_report`.
    """

    def __init__(self, test_name: str = '压测任务', report_title: Optional[str] = None):
        self.test_name = test_name
        self.report_title = report_title or f"{test_name} 报告"
        self.records: List[RequestRecord] = []
        self._started_at: Optional[float] = None  # earliest observed timestamp
        self._ended_at: Optional[float] = None    # latest observed timestamp

    # ---------- recording ----------
    def record_result(self, index: int, timestamp: float, status_code: int, latency_ms: float,
                      response_size: Optional[int] = None, error: Optional[str] = None, extra: Optional[Dict] = None):
        """Record one request result. ``timestamp`` is a unix time in seconds."""
        rec = RequestRecord(index, timestamp, status_code, latency_ms, response_size, error, extra)
        self.records.append(rec)
        # Track the observed time window with `is None` (not truthiness) so a
        # legitimate timestamp of 0 is handled correctly.
        if self._started_at is None or timestamp < self._started_at:
            self._started_at = timestamp
        if self._ended_at is None or timestamp > self._ended_at:
            self._ended_at = timestamp

    def bulk_record(self, results: List[Dict[str, Any]]):
        """Bulk-import results. Each item is a dict that should contain
        ``index``, ``timestamp``, ``status_code``, ``latency_ms`` and so on."""
        for r in results:
            self.record_result(
                index=r.get('index', len(self.records) + 1),
                timestamp=r['timestamp'],
                status_code=int(r.get('status_code', 0)),
                latency_ms=float(r.get('latency_ms', 0)),
                response_size=r.get('response_size'),
                error=r.get('error'),
                extra=r.get('extra')
            )

    # ---------- statistics ----------
    def compute_stats(self) -> Dict[str, Any]:
        """Aggregate the recorded results into a summary dict.

        Returns an empty dict when nothing has been recorded.
        """
        if not self.records:
            return {}

        latencies = [r.latency_ms for r in self.records if r.latency_ms is not None]
        statuses = [r.status_code for r in self.records]
        errors = [r for r in self.records if r.error]

        total = len(self.records)
        success_count = sum(1 for s in statuses if 200 <= s < 300)
        fail_count = total - success_count

        # Use `is not None` rather than truthiness so a start timestamp of 0
        # (the epoch) does not zero out the duration.
        if (self._started_at is not None and self._ended_at is not None
                and self._ended_at > self._started_at):
            duration = float(self._ended_at - self._started_at)
        else:
            duration = 0.0

        throughput = (total / duration) if duration > 0 else 0.0

        # Percentiles via numpy when available; otherwise a pure-python fallback.
        if np is not None:
            p50 = float(np.percentile(latencies, 50)) if latencies else 0
            p90 = float(np.percentile(latencies, 90)) if latencies else 0
            p95 = float(np.percentile(latencies, 95)) if latencies else 0
            p99 = float(np.percentile(latencies, 99)) if latencies else 0
        else:
            lat_sorted = sorted(latencies)

            def percentile(lst, q):
                # Linear interpolation between the two nearest ranks, matching
                # numpy's default percentile behaviour.
                if not lst:
                    return 0
                k = (len(lst) - 1) * (q / 100)
                f = math.floor(k)
                c = math.ceil(k)
                if f == c:
                    return lst[int(k)]
                d0 = lst[int(f)] * (c - k)
                d1 = lst[int(c)] * (k - f)
                return d0 + d1

            p50 = percentile(lat_sorted, 50)
            p90 = percentile(lat_sorted, 90)
            p95 = percentile(lat_sorted, 95)
            p99 = percentile(lat_sorted, 99)

        status_groups: Dict[int, int] = {}
        for s in statuses:
            status_groups[s] = status_groups.get(s, 0) + 1

        # Requests-per-second time series keyed by whole unix second.
        rps_series: Dict[int, int] = {}
        for r in self.records:
            sec = int(r.timestamp)
            rps_series[sec] = rps_series.get(sec, 0) + 1

        # Simple error aggregation keyed by the error message.
        error_summary: Dict[str, int] = {}
        for r in errors:
            key = r.error if r.error else f'status_{r.status_code}'
            error_summary[key] = error_summary.get(key, 0) + 1

        return {
            'total_requests': total,
            'success_count': success_count,
            'fail_count': fail_count,
            'success_rate': success_count / total if total else 0,
            'duration_seconds': duration,
            'throughput_rps': throughput,
            'latency_ms': {
                'min': min(latencies) if latencies else 0,
                'max': max(latencies) if latencies else 0,
                'avg': statistics.mean(latencies) if latencies else 0,
                'median': p50,
                'p90': p90,
                'p95': p95,
                'p99': p99,
            },
            'status_groups': status_groups,
            'error_summary': error_summary,
            'rps_series': sorted(rps_series.items()),  # [(sec, count), ...]
        }

    # ---------- report output ----------
    def _ensure_dir(self, path: str):
        """Create *path* (and parents) if needed; race-free via exist_ok."""
        os.makedirs(path, exist_ok=True)

    def generate_report(self, output_dir: str = './load_test_report', formats: Optional[List[str]] = None):
        """Generate the report files.

        formats: list of any of 'html', 'docx', 'json', 'csv'.
        Returns a dict mapping format name to generated file path.
        """
        formats = formats or ['html', 'json']
        self._ensure_dir(output_dir)

        stats = self.compute_stats()
        timestamp_str = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        base_name = f"{self.test_name.replace(' ', '_')}_{timestamp_str}"

        outputs = {}

        # 1) JSON summary + full per-request detail
        if 'json' in formats:
            json_path = os.path.join(output_dir, base_name + '.summary.json')
            with open(json_path, 'w', encoding='utf-8') as f:
                json.dump({'stats': stats, 'records': [r.to_dict() for r in self.records]},
                          f, ensure_ascii=False, indent=2)
            outputs['json'] = json_path

        # 2) CSV detail (pandas when available, manual writer otherwise)
        if 'csv' in formats:
            csv_path = os.path.join(output_dir, base_name + '.details.csv')
            if pd:
                df = pd.DataFrame([r.to_dict() for r in self.records])
                df.to_csv(csv_path, index=False, encoding='utf-8-sig')
            else:
                import csv
                # Collect the union of keys across all records so records with
                # differing `extra` fields (or an empty record list) don't make
                # DictWriter raise.
                fieldnames: List[str] = []
                for r in self.records:
                    for k in r.to_dict():
                        if k not in fieldnames:
                            fieldnames.append(k)
                with open(csv_path, 'w', newline='', encoding='utf-8') as f:
                    writer = csv.DictWriter(f, fieldnames=fieldnames or ['index'])
                    writer.writeheader()
                    for r in self.records:
                        writer.writerow(r.to_dict())
            outputs['csv'] = csv_path

        # 3) Charts (latency histogram and RPS), saved as PNG
        charts = {}
        if plt:
            try:
                # Latency histogram
                latencies = [r.latency_ms for r in self.records if r.latency_ms is not None]
                if latencies:
                    plt.figure()
                    plt.hist(latencies, bins=50)
                    plt.title('响应时间分布 (ms)')
                    plt.xlabel('延迟 (ms)')
                    plt.ylabel('请求数量')
                    hist_path = os.path.join(output_dir, base_name + '_latency_hist.png')
                    plt.tight_layout()
                    plt.savefig(hist_path)
                    plt.close()
                    charts['latency_hist'] = hist_path

                # RPS chart
                times = [int(r.timestamp) for r in self.records]
                if times:
                    from collections import Counter
                    cnt = Counter(times)
                    xs = sorted(cnt.keys())
                    ys = [cnt[x] for x in xs]
                    plt.figure()
                    plt.plot(xs, ys)
                    plt.title('每秒请求数 (RPS)')
                    plt.xlabel('Unix 秒')
                    plt.ylabel('请求数')
                    rps_path = os.path.join(output_dir, base_name + '_rps.png')
                    plt.tight_layout()
                    plt.savefig(rps_path)
                    plt.close()
                    charts['rps'] = rps_path
            except Exception as e:
                # Charts are best-effort: a plotting failure must not abort
                # the rest of the report.
                print('生成图表时出错:', e)

        outputs['charts'] = charts

        # 4) HTML report
        if 'html' in formats:
            html_path = os.path.join(output_dir, base_name + '.html')
            html_content = self._build_html_report(stats, charts)
            with open(html_path, 'w', encoding='utf-8') as f:
                f.write(html_content)
            outputs['html'] = html_path

        # 5) Word report (optional, requires python-docx)
        if 'docx' in formats and Document:
            try:
                docx_path = os.path.join(output_dir, base_name + '.docx')
                self._build_docx_report(stats, charts, docx_path)
                outputs['docx'] = docx_path
            except Exception as e:
                print('生成 docx 报告时出错:', e)

        return outputs

    def _build_html_report(self, stats: Dict[str, Any], charts: Dict[str, str]) -> str:
        """Build the Chinese HTML report body (simple inline-styled template)."""
        title = self.report_title
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        summary_html = f"""
        <h2>摘要</h2>
        <ul>
        <li>报告名称:{title}</li>
        <li>生成时间:{now}</li>
        <li>总请求数:{stats.get('total_requests', 0)}</li>
        <li>成功数:{stats.get('success_count',0)},失败数:{stats.get('fail_count',0)},成功率:{stats.get('success_rate',0):.2%}</li>
        <li>总耗时(秒):{stats.get('duration_seconds',0):.2f}</li>
        <li>平均吞吐(req/s):{stats.get('throughput_rps',0):.2f}</li>
        </ul>
        """

        latency = stats.get('latency_ms', {})
        latency_html = f"""
        <h2>响应时间统计 (ms)</h2>
        <ul>
        <li>最小:{latency.get('min',0):.2f}</li>
        <li>最大:{latency.get('max',0):.2f}</li>
        <li>平均:{latency.get('avg',0):.2f}</li>
        <li>中位数(P50):{latency.get('median',0):.2f}</li>
        <li>P90:{latency.get('p90',0):.2f},P95:{latency.get('p95',0):.2f},P99:{latency.get('p99',0):.2f}</li>
        </ul>
        """

        status_html = '<h2>状态码分布</h2><ul>'
        for k, v in stats.get('status_groups', {}).items():
            status_html += f'<li>{k}: {v}</li>'
        status_html += '</ul>'

        error_html = '<h2>错误汇总</h2>'
        if stats.get('error_summary'):
            error_html += '<ul>'
            for k, v in stats.get('error_summary', {}).items():
                error_html += f'<li>{k}: {v}</li>'
            error_html += '</ul>'
        else:
            error_html += '<p>无错误记录</p>'

        charts_html = '<h2>图表</h2>'
        # Charts live next to the HTML file, so refer to them by basename.
        for name, path in charts.items():
            charts_html += f'<div><h3>{name}</h3><img src="{os.path.basename(path)}" alt="{name}" style="max-width:100%;height:auto;"/></div>'

        # Detail table capped at the first 100 requests to keep the page small.
        details = [r.to_dict() for r in self.records[:100]]
        detail_rows = ''.join([f"<tr><td>{d['index']}</td><td>{d['datetime']}</td><td>{d['status_code']}</td><td>{d['latency_ms']}</td><td>{d['response_size']}</td><td>{d['error'] or ''}</td></tr>" for d in details])
        details_html = f"""
        <h2>请求明细(仅显示前100条)</h2>
        <table border="1" cellpadding="4" cellspacing="0">
        <tr><th>#</th><th>时间</th><th>状态码</th><th>延迟(ms)</th><th>响应大小</th><th>错误</th></tr>
        {detail_rows}
        </table>
        """

        html = f"""
        <!doctype html>
        <html lang="zh-CN">
        <head>
        <meta charset="utf-8">
        <title>{title}</title>
        <style>
        body{{font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial; padding:20px}}
        h2{{color:#2c3e50}}
        table{{border-collapse:collapse; width:100%}}
        th,td{{padding:6px; text-align:left}}
        </style>
        </head>
        <body>
        <h1>{title}</h1>
        {summary_html}
        {latency_html}
        {status_html}
        {error_html}
        {charts_html}
        {details_html}
        <p>注:如需查看所有请求明细,请下载同目录下的 CSV/JSON 文件。</p>
        </body>
        </html>
        """

        return html

    def _build_docx_report(self, stats: Dict[str, Any], charts: Dict[str, str], docx_path: str):
        """Write the Word version of the report to *docx_path*.

        Raises RuntimeError when python-docx is not installed.
        """
        if not Document:
            raise RuntimeError('缺少 python-docx 库,无法生成 docx。')
        doc = Document()
        doc.add_heading(self.report_title, level=1)
        doc.add_paragraph(f"生成时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        doc.add_heading('摘要', level=2)
        doc.add_paragraph(f"总请求数:{stats.get('total_requests',0)} 成功:{stats.get('success_count',0)} 失败:{stats.get('fail_count',0)} 成功率:{stats.get('success_rate',0):.2%}")
        doc.add_paragraph(f"总耗时(秒):{stats.get('duration_seconds',0):.2f} 平均吞吐(req/s):{stats.get('throughput_rps',0):.2f}")

        doc.add_heading('响应时间统计 (ms)', level=2)
        lat = stats.get('latency_ms', {})
        doc.add_paragraph(f"最小:{lat.get('min',0):.2f} 最大:{lat.get('max',0):.2f} 平均:{lat.get('avg',0):.2f}")
        doc.add_paragraph(f"P50:{lat.get('median',0):.2f} P90:{lat.get('p90',0):.2f} P95:{lat.get('p95',0):.2f} P99:{lat.get('p99',0):.2f}")

        doc.add_heading('状态码分布', level=2)
        for k, v in stats.get('status_groups', {}).items():
            doc.add_paragraph(f"{k}: {v}")

        doc.add_heading('错误汇总', level=2)
        if stats.get('error_summary'):
            for k, v in stats.get('error_summary', {}).items():
                doc.add_paragraph(f"{k}: {v}")
        else:
            doc.add_paragraph('无错误记录')

        # Embed chart images (best-effort: skip any that fail to insert).
        for name, path in charts.items():
            if os.path.exists(path):
                doc.add_heading(name, level=2)
                try:
                    doc.add_picture(path, width=Inches(6))
                except Exception:
                    doc.add_paragraph(f'无法插入图片:{path}')

        # Append the first 100 request details.
        doc.add_heading('请求明细(前100条)', level=2)
        table = doc.add_table(rows=1, cols=6)
        hdr_cells = table.rows[0].cells
        hdr_cells[0].text = '#'
        hdr_cells[1].text = '时间'
        hdr_cells[2].text = '状态码'
        hdr_cells[3].text = '延迟(ms)'
        hdr_cells[4].text = '响应大小'
        hdr_cells[5].text = '错误'

        for r in self.records[:100]:
            row_cells = table.add_row().cells
            d = r.to_dict()
            row_cells[0].text = str(d['index'])
            row_cells[1].text = d['datetime']
            row_cells[2].text = str(d['status_code'])
            row_cells[3].text = f"{d['latency_ms']}"
            row_cells[4].text = str(d.get('response_size', ''))
            row_cells[5].text = d.get('error') or ''

        doc.save(docx_path)

# ---------------- Usage example ----------------
if __name__ == '__main__':
    # Minimal demo: synthesize some request results and render a report.
    gen = LoadTestReportGenerator(test_name='示例压测', report_title='示例压测详细报告')
    now = time.time()
    # Simulate 100 requests, roughly 5 per second.
    import random
    for idx in range(1, 101):
        stamp = now + (idx // 5)
        latency = max(1.0, random.gauss(200, 50))
        code = 200 if random.random() > 0.05 else 500
        failure = None if code == 200 else '500 Internal Server Error'
        gen.record_result(idx, stamp, code, latency,
                          response_size=random.randint(500, 5000), error=failure)

    outputs = gen.generate_report('./example_report', formats=['html', 'json', 'csv', 'docx'])
    print('已生成报告:', outputs)