import asyncio
import logging
import os
import sys
import time
import traceback

import aiohttp
from tqdm import tqdm
|
def get_parent_directory(file_path, levels=1):
    """Return the directory *levels* levels above *file_path*.

    Args:
        file_path: Any path; it is normalized with os.path.abspath first.
        levels: How many times to step up to the parent (default 1).

    Returns:
        The absolute path of the requested ancestor directory. Stepping past
        the filesystem root simply stays at the root (os.path.dirname('/')
        is '/').
    """
    current = os.path.abspath(file_path)
    remaining = levels
    while remaining > 0:
        current = os.path.dirname(current)
        remaining -= 1
    return current
|
# Make the project root importable so the package imports below resolve when
# this script is run directly (rather than as part of an installed package).
parent_dir = get_parent_directory(__file__, 5)  # five directory levels up
sys.path.append(parent_dir)

from 测试组.脚本.造数脚本2.Util.random_util import RandomUtil
from 测试组.脚本.造数脚本2.Util.dingtalk_helper import DingTalkHelper
|
|
|
# DingTalk robot access_token and secret.
# NOTE(review): credentials and the auth token below are hard-coded in source —
# consider loading them from environment variables or a config file instead.
ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d'
SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19'

# Human-readable name of the API under test (used in summary/notification text).
apiname = "设备维保通知"
# Target endpoint for the load test.
url = "http://192.168.6.168:5537/api/notify/save"
# Shared request headers (auth token + JSON content type) sent with every POST.
headers = {
    "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NjA2MDczMTAsInVzZXJuYW1lIjoiZ2x5In0.3DC4-4oUMuIpSDT-dYn8bv9JvuPadIssUQfIKAnjAL4",
    "Content-Type": "application/json"
}

# Number of concurrent workers (controls in-flight requests); tune to local
# machine capacity and the target service's limits, e.g. 50 or 100.
NUM_WORKERS = 100

# Total number of requests (do not start with a very high value).
TOTAL_REQUESTS = 2000000

# Maximum retry attempts per request.
MAX_RETRIES = 3

# aiohttp connection/timeout settings.
REQUEST_TIMEOUT = 60  # seconds
# ------------------------

# Helper used to push a completion summary to a DingTalk group robot.
dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET)

# Pre-generate the invariant large body fields once to cut per-request
# construction cost during the load test.
LARGE_CONTENT = "压测中" * 500
FILES_PATH = "/userfiles/1588133301094375425/程序附件/notify/notify/2025/10/15/173933/cs.jpg"
|
|
|
def create_animal_data(idx: int):
    """Build the JSON payload for one notify/save request.

    Args:
        idx: Request index; accepted for interface compatibility but not used
            in the payload — uniqueness comes from the random code in the
            title instead.

    Returns:
        A dict ready to be sent as the request's JSON body.
    """
    unique_code = RandomUtil.generate_random_number_string(0, 999999999)
    return dict(
        id="",
        type="6",
        title=f"压测中{unique_code}",
        content=LARGE_CONTENT,
        files=FILES_PATH,
        status="1",
        notifyRecordIds="",
        isNotice="0",
        receivingRange="1",
    )
|
|
|
async def create_animal(session: aiohttp.ClientSession, index: int, max_retries: int = MAX_RETRIES):
    """
    Send one create request, retrying with capped exponential backoff.

    Args:
        session: Shared aiohttp client session.
        index: Logical index of this request (reported back to the caller).
        max_retries: Maximum number of attempts before giving up.

    Returns:
        Tuple ``(success: bool, index: int, errmsg: str | None)``; ``errmsg``
        is ``None`` on success and contains the last observed error otherwise.
    """
    attempt = 0
    # Fixed: the original captured failure details into `errmsg`/`err_msg`
    # but then discarded them; keep the last error so it can be reported.
    last_error = None
    while attempt < max_retries:
        # Rebuilt each attempt so the title's random code changes per retry.
        data = create_animal_data(index)
        try:
            async with session.post(url, json=data, headers=headers) as resp:
                text = await resp.text()
                if resp.status == 200:
                    # Success is not printed per-request; the caller aggregates progress.
                    return True, index, None
                # Any non-200 counts as a failure (customize per-status handling here).
                last_error = f"状态码 {resp.status}, body: {text}"
        except Exception as e:
            # Keep the exception summary short — full stack traces would flood the console.
            last_error = f"{type(e).__name__}: {str(e)}"
        attempt += 1
        # Exponential backoff (capped at 10s) to avoid hammering a struggling target.
        await asyncio.sleep(min(10, 2 ** attempt))
    # All retries exhausted — include the last error instead of dropping it.
    return False, index, f"重试 {max_retries} 次后失败,最后错误: {last_error}"
|
|
|
async def worker(name: int, queue: asyncio.Queue, session: aiohttp.ClientSession,
                 pbar: tqdm, success_counter: dict, failed_list: list, lock: asyncio.Lock):
    """
    Consume request indices from *queue* and run create_animal for each.

    A ``None`` item is the shutdown sentinel: the worker acknowledges it and
    exits. Shared state (success counter, failure list, progress bar) is only
    touched while holding *lock*.
    """
    while True:
        job = await queue.get()
        if job is None:
            # Shutdown sentinel — acknowledge it and stop this worker.
            queue.task_done()
            return
        try:
            ok, done_index, errmsg = await create_animal(session, job)
            async with lock:
                if ok:
                    success_counter["count"] += 1
                else:
                    failed_list.append((done_index, errmsg))
                pbar.update(1)
        except Exception as e:
            # Unexpected failure outside create_animal's own retry loop:
            # record it and keep the worker alive for the next job.
            async with lock:
                failed_list.append((job, f"Worker异常: {type(e).__name__}:{e}"))
                pbar.update(1)
        finally:
            queue.task_done()
|
|
|
async def batch_create_animals(total: int, num_workers: int):
    """Run the load test: fan *total* requests out over *num_workers* workers,
    then print a summary and push it to DingTalk."""
    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
    connector = aiohttp.TCPConnector(limit=num_workers, limit_per_host=num_workers, force_close=False)
    # One shared session avoids per-request connection/header setup cost;
    # headers could also be set on the session instead of per-post.
    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        queue = asyncio.Queue()
        for i in range(1, total + 1):
            await queue.put(i)

        # One None sentinel per worker signals shutdown.
        for _ in range(num_workers):
            await queue.put(None)

        success_counter = {"count": 0}
        failed_list = []
        lock = asyncio.Lock()

        # tqdm progress bar is created in the main thread.
        with tqdm(total=total, desc="创建进度") as pbar:
            workers = [
                asyncio.create_task(worker(i, queue, session, pbar, success_counter, failed_list, lock))
                for i in range(num_workers)
            ]
            # Wait for every worker to finish (each exits on its sentinel).
            await asyncio.gather(*workers)

        # Summarize results.
        total_success = success_counter["count"]
        total_failed = len(failed_list)
        summary = f"\n[SUMMARY] 成功创建 {total_success}/{total} 个 {apiname},失败 {total_failed} 个"
        print(summary)

        # Send only a short message to DingTalk (avoid overly long payloads).
        dingtalk_helper.send_message(
            f"你的批量创建{apiname}模块数据已完成:成功创建 {total_success}/{total} 个,失败 {total_failed} 个(详见日志)。"
        )

        # Print at most the first 50 failure details to avoid flooding the console.
        if total_failed:
            print("失败示例(最多显示50条):")
            for idx, err in failed_list[:50]:
                print(f"  #{idx} => {err}")
|
|
|
if __name__ == '__main__':
    # Before running:
    # 1) smoke-test with a small total first (e.g. 100 or 1000);
    # 2) watch the target server and local machine load before raising TOTAL_REQUESTS;
    # 3) tune NUM_WORKERS and REQUEST_TIMEOUT for the best throughput/stability.
    # (Fixed: removed the no-op self-assignments NUM_WORKERS = NUM_WORKERS and
    # TOTAL_REQUESTS = TOTAL_REQUESTS — they had no effect.)

    # Minimal logging setup: WARNING and above only, to keep console output quiet.
    logging.basicConfig(level=logging.WARNING, format='%(asctime)s %(levelname)s %(message)s')

    asyncio.run(batch_create_animals(TOTAL_REQUESTS, NUM_WORKERS))
|