hyb
2025-12-31 6cdcd01f77e11b72c323603e27ebdb85b15223c9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import asyncio
import aiohttp
import traceback
import time
from Util.random_util import RandomUtil  # 保证该模块中提供了生成随机数字和随机日期的函数
from Util.dingtalk_helper import DingTalkHelper
from tqdm import tqdm  # 导入进度条库
 
# 钉钉机器人 access_token 和 secret
ACCESS_TOKEN = '4625f6690acd9347fae5b3a05af598be63e73d604b933a9b3902425b8f136d4d'
SECRET = 'SEC3b6937550bd297b5491855f6f40c2ff1b41bc8c495e118ba9848742b1ddf8f19'
 
# 请求URL和请求头
apiname = "我的动物"
url = "http://tsinghua.baoyizn.com:9906/api/escrow/animal/escrowAnimal/save"
headers = {
    "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NDUwMzExNjAsInVzZXJuYW1lIjoiZ2x5In0.zra2vwwvl-NQKN_tyNzRbiF3kzfUQovXBuSmEGcXPFY",
    "Content-Type": "application/json"
}
 
dingtalk_helper = DingTalkHelper(ACCESS_TOKEN, SECRET)
 
gender = "1"
mother_id = ""
mother_wholeCode = ""
father_id = ""
father_wholeCode = ""
algebra = 1
preCode = "性能测试"
 
# 生成请求体
def create_animal_data():
    random_code = RandomUtil.generate_random_number_string(0, 999999999)
    random_date = RandomUtil.generate_random_date("2023-01-01", "2025-03-25")
    return {
        "bac": {"id": ""},
        "cage": {"id": "", "code": ""},
        "code": random_code,
        "room": {"id": ""},
        "user": {
            "id": "1780483551554764801",
            "name": "hyb实验人员"
        },
        "color": "4",
        "ethic": {
            "id": "1790288720729997312",
            "apCode": "24- Hybktfzr23.G24-4"
        },
        "shelf": {
            "id": "",
            "variety": {"id": ""}
        },
        "father": {
            "id": father_id,
            "wholeCode": father_wholeCode
        },
        "gender": gender,
        "mother": {
            "id": mother_id,
            "wholeCode": mother_wholeCode
        },
        "strain": {
            "id": "1846491449918550017",
            "bac": {"id": "", "name": "转基因"},
            "name": "hyb无需鉴定基因型,求求勿动",
            "type": "1",
            "maxNum": 5,
            "geneIds": {"id": "1810583565757919233,1810583743961313281,1810583743961313281,1810583743961313281,1860972233362567169"},
            "variety": {
                "id": "",
                "name": "SPF级基因鼠(荧光蛋白标记小鼠)",
                "isChip": "1"
            }
        },
        "algebra": algebra,
        "preCode": preCode ,
        "variety": {"id": "1661975536839733250"},
        "weaning": "2",
        "chipCode": random_code,
        "genotype": "hyb管理员创建共享,求求勿动 +/-/Tg/WT,hyb管理员创建使用者为课题负责人共享,求求勿动 +/-/Homo/Heter/WT",
        "transfer": "0",
        "birthDate": random_date,
        "wholeCode": f"{preCode}-{random_code}",
        "algebraWay": "1",
        "animalStatus": "0",
        "escrowStatus": "1",
        "researchGroup": {"id": "1699404304041828353"}
    }
 
 
# 异步发送单个请求,并返回是否成功
async def create_animal(session: aiohttp.ClientSession, index: int, max_retries: int = 3):
    data = create_animal_data()
    attempt = 0
    while attempt < max_retries:
        try:
            async with session.post(url, json=data, headers=headers) as response:
                resp_text = await response.text()
                if response.status == 200:
                    print(f"[INFO] 第 {index} 个动物创建成功,响应:{resp_text}")
                    return True
                else:
                    print(f"[ERROR] 第 {index} 个动物创建失败,状态码:{response.status},响应:{resp_text}")
                    return False
        except Exception as e:
            attempt += 1
            status_code = getattr(e, 'status', 'N/A')
            error_trace = traceback.format_exc()
            full_log = (
                f"请求URL: {url}\n"
                f"请求头: {headers}\n"
                f"请求数据: {data}\n"
                f"状态码: {status_code}\n"
                f"异常信息: {str(e)}\n"
                f"堆栈跟踪:\n{error_trace}\n"
                f"重试次数: {attempt}/{max_retries}"
            )
            print(f"[EXCEPTION] 第 {index} 个动物请求异常:\n{full_log}\n")
            if attempt < max_retries:
                # 重试前等待 1 秒
                await asyncio.sleep(1)
            else:
                return False
 
 
# 批量异步创建动物信息,同时显示进度条
async def batch_create_animals(batch_size: int):
    tasks = []
    # 设置超时时间为60秒(可根据需要调整)
    timeout = aiohttp.ClientTimeout(total=60)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        for i in range(1, batch_size + 1):
            task = asyncio.ensure_future(create_animal(session, i))
            tasks.append(task)
 
        success_count = 0
        # 使用 asyncio.as_completed 逐个等待任务完成,并更新进度条
        with tqdm(total=batch_size, desc="创建动物进度") as pbar:
            for finished in asyncio.as_completed(tasks):
                result = await finished
                if result:
                    success_count += 1
                pbar.update(1)
        summary = f"\n[SUMMARY] 成功创建 {success_count}/{batch_size} 只动物"
        print(summary)
        dingtalk_helper.send_message(
            f"你的批量创建{apiname}模块数据已完成:成功创建 {success_count}/{batch_size} 只动物")
 
 
# 主入口
if __name__ == '__main__':
    n = 10  # 可根据需要调整批量创建数量
    asyncio.run(batch_create_animals(n))