import asyncio
|
import aiohttp
|
import traceback
|
import time
|
from Util.random_util import RandomUtil # 保证该模块中提供了生成随机数字和随机日期的函数
|
from Util.dingtalk_helper import DingTalkHelper
|
from tqdm import tqdm # 导入进度条库
|
|
# 钉钉机器人 access_token 和 secret
|
ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d'
|
SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19'
|
|
# 请求URL和请求头
|
apiname = "我的动物"
|
url = "http://tsinghua.baoyizn.com:9906/api/escrow/animal/escrowAnimal/save"
|
headers = {
|
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NDUwMzExNjAsInVzZXJuYW1lIjoiZ2x5In0.zra2vwwvl-NQKN_tyNzRbiF3kzfUQovXBuSmEGcXPFY",
|
"Content-Type": "application/json"
|
}
|
|
dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET)
|
|
gender = "1"
|
mother_id = ""
|
mother_wholeCode = ""
|
father_id = ""
|
father_wholeCode = ""
|
algebra = 1
|
preCode = "性能测试"
|
|
# 生成请求体
|
def create_animal_data():
|
random_code = RandomUtil.generate_random_number_string(0, 999999999)
|
random_date = RandomUtil.generate_random_date("2023-01-01", "2025-03-25")
|
return {
|
"bac": {"id": ""},
|
"cage": {"id": "", "code": ""},
|
"code": random_code,
|
"room": {"id": ""},
|
"user": {
|
"id": "1780483551554764801",
|
"name": "hyb实验人员"
|
},
|
"color": "4",
|
"ethic": {
|
"id": "1790288720729997312",
|
"apCode": "24- Hybktfzr23.G24-4"
|
},
|
"shelf": {
|
"id": "",
|
"variety": {"id": ""}
|
},
|
"father": {
|
"id": father_id,
|
"wholeCode": father_wholeCode
|
},
|
"gender": gender,
|
"mother": {
|
"id": mother_id,
|
"wholeCode": mother_wholeCode
|
},
|
"strain": {
|
"id": "1846491449918550017",
|
"bac": {"id": "", "name": "转基因"},
|
"name": "hyb无需鉴定基因型,求求勿动",
|
"type": "1",
|
"maxNum": 5,
|
"geneIds": {"id": "1810583565757919233,1810583743961313281,1810583743961313281,1810583743961313281,1860972233362567169"},
|
"variety": {
|
"id": "",
|
"name": "SPF级基因鼠(荧光蛋白标记小鼠)",
|
"isChip": "1"
|
}
|
},
|
"algebra": algebra,
|
"preCode": preCode ,
|
"variety": {"id": "1661975536839733250"},
|
"weaning": "2",
|
"chipCode": random_code,
|
"genotype": "hyb管理员创建共享,求求勿动 +/-/Tg/WT,hyb管理员创建使用者为课题负责人共享,求求勿动 +/-/Homo/Heter/WT",
|
"transfer": "0",
|
"birthDate": random_date,
|
"wholeCode": f"{preCode}-{random_code}",
|
"algebraWay": "1",
|
"animalStatus": "0",
|
"escrowStatus": "1",
|
"researchGroup": {"id": "1699404304041828353"}
|
}
|
|
|
# 异步发送单个请求,并返回是否成功
|
async def create_animal(session: aiohttp.ClientSession, index: int, max_retries: int = 3):
|
data = create_animal_data()
|
attempt = 0
|
while attempt < max_retries:
|
try:
|
async with session.post(url, json=data, headers=headers) as response:
|
resp_text = await response.text()
|
if response.status == 200:
|
print(f"[INFO] 第 {index} 个动物创建成功,响应:{resp_text}")
|
return True
|
else:
|
print(f"[ERROR] 第 {index} 个动物创建失败,状态码:{response.status},响应:{resp_text}")
|
return False
|
except Exception as e:
|
attempt += 1
|
status_code = getattr(e, 'status', 'N/A')
|
error_trace = traceback.format_exc()
|
full_log = (
|
f"请求URL: {url}\n"
|
f"请求头: {headers}\n"
|
f"请求数据: {data}\n"
|
f"状态码: {status_code}\n"
|
f"异常信息: {str(e)}\n"
|
f"堆栈跟踪:\n{error_trace}\n"
|
f"重试次数: {attempt}/{max_retries}"
|
)
|
print(f"[EXCEPTION] 第 {index} 个动物请求异常:\n{full_log}\n")
|
if attempt < max_retries:
|
# 重试前等待 1 秒
|
await asyncio.sleep(1)
|
else:
|
return False
|
|
|
# 批量异步创建动物信息,同时显示进度条
|
async def batch_create_animals(batch_size: int):
|
tasks = []
|
# 设置超时时间为60秒(可根据需要调整)
|
timeout = aiohttp.ClientTimeout(total=60)
|
async with aiohttp.ClientSession(timeout=timeout) as session:
|
for i in range(1, batch_size + 1):
|
task = asyncio.ensure_future(create_animal(session, i))
|
tasks.append(task)
|
|
success_count = 0
|
# 使用 asyncio.as_completed 逐个等待任务完成,并更新进度条
|
with tqdm(total=batch_size, desc="创建动物进度") as pbar:
|
for finished in asyncio.as_completed(tasks):
|
result = await finished
|
if result:
|
success_count += 1
|
pbar.update(1)
|
summary = f"\n[SUMMARY] 成功创建 {success_count}/{batch_size} 只动物"
|
print(summary)
|
dingtalk_helper.send_message(
|
f"你的批量创建{apiname}模块数据已完成:成功创建 {success_count}/{batch_size} 只动物")
|
|
|
# 主入口
|
if __name__ == '__main__':
|
n = 10 # 可根据需要调整批量创建数量
|
asyncio.run(batch_create_animals(n))
|