import sys
|
import os
|
import asyncio
|
import aiohttp
|
import traceback
|
from tqdm import tqdm # 导入进度条库
|
import time
|
|
def get_parent_directory(file_path, levels=1):
|
"""获取指定层级的父目录"""
|
path = os.path.abspath(file_path)
|
for _ in range(levels):
|
path = os.path.dirname(path)
|
return path
|
parent_dir = get_parent_directory(__file__, 5) # 获取上五级目录
|
sys.path.append(parent_dir)
|
|
from 测试组.脚本.造数脚本2.Util.random_util import RandomUtil
|
from 测试组.脚本.造数脚本2.Util.dingtalk_helper import DingTalkHelper
|
|
|
# 钉钉机器人 access_token 和 secret
|
ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d'
|
SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19'
|
|
# 请求URL和请求头
|
apiname = "设备维保通知"
|
url = "http://192.168.6.168:5537/api/notify/save"
|
headers = {
|
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NjA2MDczMTAsInVzZXJuYW1lIjoiZ2x5In0.3DC4-4oUMuIpSDT-dYn8bv9JvuPadIssUQfIKAnjAL4",
|
"Content-Type": "application/json"
|
}
|
|
dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET)
|
|
gender = "1"
|
mother_id = ""
|
mother_wholeCode = ""
|
father_id = ""
|
father_wholeCode = ""
|
algebra = 1
|
preCode = "性能测试"
|
|
# 生成请求体
|
def create_animal_data():
|
random_code = RandomUtil.generate_random_number_string(0, 999999999)
|
# random_date = RandomUtil.generate_random_date("2023-01-01", "2025-03-25")
|
return {
|
"id": "",
|
"type": "6",
|
"title": f"压测中{random_code}",
|
"content": (
|
"压测中" * 500
|
),
|
"files": "/userfiles/1588133301094375425/程序附件/notify/notify/2025/10/15/173933/cs.jpg",
|
"status": "1",
|
"notifyRecordIds": "",
|
"isNotice": "0",
|
"receivingRange": "1"
|
}
|
|
|
# 异步发送单个请求,并返回是否成功
|
async def create_animal(session: aiohttp.ClientSession, index: int, max_retries: int = 3):
|
data = create_animal_data()
|
attempt = 0
|
while attempt < max_retries:
|
try:
|
async with session.post(url, json=data, headers=headers) as response:
|
resp_text = await response.text()
|
if response.status == 200:
|
print(f"[INFO] 第 {index} 个{apiname}创建成功,响应:{resp_text}")
|
return True
|
else:
|
print(f"[ERROR] 第 {index} 个{apiname}创建失败,状态码:{response.status},响应:{resp_text}")
|
return False
|
except Exception as e:
|
attempt += 1
|
status_code = getattr(e, 'status', 'N/A')
|
error_trace = traceback.format_exc()
|
full_log = (
|
f"请求URL: {url}\n"
|
f"请求头: {headers}\n"
|
f"请求数据: {data}\n"
|
f"状态码: {status_code}\n"
|
f"异常信息: {str(e)}\n"
|
f"堆栈跟踪:\n{error_trace}\n"
|
f"重试次数: {attempt}/{max_retries}"
|
)
|
print(f"[EXCEPTION] 第 {index} 个{apiname}请求异常:\n{full_log}\n")
|
if attempt < max_retries:
|
# 重试前等待 1 秒
|
await asyncio.sleep(1)
|
else:
|
return False
|
|
|
# 批量异步创建信息,同时显示进度条
|
async def batch_create_animals(batch_size: int):
|
tasks = []
|
# 设置超时时间为60秒(可根据需要调整)
|
timeout = aiohttp.ClientTimeout(total=60)
|
async with aiohttp.ClientSession(timeout=timeout) as session:
|
for i in range(1, batch_size + 1):
|
task = asyncio.ensure_future(create_animal(session, i))
|
tasks.append(task)
|
|
success_count = 0
|
# 使用 asyncio.as_completed 逐个等待任务完成,并更新进度条
|
with tqdm(total=batch_size, desc="创建进度") as pbar:
|
for finished in asyncio.as_completed(tasks):
|
result = await finished
|
if result:
|
success_count += 1
|
pbar.update(1)
|
summary = f"\n[SUMMARY] 成功创建 {success_count}/{batch_size} 个{apiname}"
|
print(summary)
|
dingtalk_helper.send_message(
|
f"你的批量创建{apiname}模块数据已完成:成功创建 {success_count}/{batch_size} 个{apiname},可前往系统查看!")
|
|
|
# 主入口
|
if __name__ == '__main__':
|
n = 100000 # 可根据需要调整批量创建数量
|
asyncio.run(batch_create_animals(n))
|