import time
|
import hmac
|
import hashlib
|
import base64
|
import urllib.parse
|
import requests
|
import json
|
|
class DingTalkHelper:
    """Send messages through a DingTalk custom-robot webhook protected by a signature.

    Each request is signed with the robot's secret (HMAC-SHA256 over
    ``"<timestamp>\\n<secret>"``) and the resulting ``timestamp`` and ``sign``
    are passed as query parameters next to the ``access_token``.

    The send methods are best-effort: they print the outcome, never raise,
    and return ``True`` only when the API confirmed delivery.
    """

    # Endpoint of the DingTalk custom-robot "send" API.
    _WEBHOOK_ENDPOINT = 'https://oapi.dingtalk.com/robot/send'

    def __init__(self, access_token, secret, timeout=10):
        """Store the robot credentials.

        Args:
            access_token: The ``access_token`` value of the robot's webhook URL.
            secret: The signing secret of the robot.
            timeout: Seconds to wait for the HTTP request before giving up.
                Without a timeout a stalled connection would block forever.
        """
        self.access_token = access_token
        self.secret = secret
        self.timeout = timeout

    def calculate_sign(self):
        """Compute a fresh timestamp and the matching request signature.

        Returns:
            A ``(timestamp, sign)`` tuple. ``timestamp`` is the current time in
            milliseconds as a string; ``sign`` is the Base64-encoded
            HMAC-SHA256 digest, already URL-encoded and ready to be placed in
            a query string.
        """
        # Current time in milliseconds.
        timestamp = str(round(time.time() * 1000))

        # The secret is both the HMAC key and part of the signed string.
        key = self.secret.encode('utf-8')
        string_to_sign = f'{timestamp}\n{self.secret}'.encode('utf-8')
        digest = hmac.new(key, string_to_sign, digestmod=hashlib.sha256).digest()

        # Base64 output may contain '+', '/' and '=', so it must be URL-encoded.
        sign = urllib.parse.quote_plus(base64.b64encode(digest))
        return timestamp, sign

    def _build_webhook_url(self):
        """Return the webhook URL carrying the token, a fresh timestamp and its sign."""
        timestamp, sign = self.calculate_sign()
        return (
            f'{self._WEBHOOK_ENDPOINT}?access_token={self.access_token}'
            f'&timestamp={timestamp}&sign={sign}'
        )

    def _post(self, payload):
        """POST *payload* as JSON to the signed webhook URL and return the response."""
        # ``json=`` makes requests set the JSON Content-Type header itself.
        return requests.post(self._build_webhook_url(), json=payload, timeout=self.timeout)

    @staticmethod
    def _api_error(response):
        """Extract ``(errcode, errmsg)`` from the JSON body of *response*.

        Returns ``(None, None)`` when the body is not a JSON object, so that
        callers comparing ``errcode`` with ``0`` treat it as a failure.
        """
        try:
            body = response.json()
        except ValueError:
            return None, None
        if not isinstance(body, dict):
            return None, None
        return body.get('errcode'), body.get('errmsg')

    def send_markdown(self, title, text, is_at_all=False):
        """Send a markdown message.

        Args:
            title: Title of the markdown message.
            text: Markdown body of the message.
            is_at_all: Whether to @-mention everyone in the group.

        Returns:
            ``True`` if the API accepted the message, ``False`` otherwise.
        """
        payload = {
            "msgtype": "markdown",
            "markdown": {
                "title": title,
                "text": text,
            },
            "at": {
                "isAtAll": is_at_all,
            },
        }

        try:
            response = self._post(payload)
            if response.status_code != 200:
                print(f"发送失败,状态码:{response.status_code}")
                return False

            # HTTP 200 alone is not proof of delivery: the API reports
            # rejections through ``errcode`` in the response body.
            errcode, errmsg = self._api_error(response)
            if errcode != 0:
                print(f"发送失败,错误码:{errcode},错误信息:{errmsg}")
                return False

            print("Markdown消息发送成功!")
            return True
        except Exception as e:
            # Deliberately broad: notification failures must not crash the caller.
            print(f"发送异常:{e}")
            return False

    def send_message(self, message):
        """Send a plain-text message.

        Args:
            message: Text content of the message.

        Returns:
            ``True`` if the API accepted the message, ``False`` otherwise.
        """
        payload = {
            "msgtype": "text",
            "text": {
                "content": message,
            },
        }

        try:
            response = self._post(payload)

            # Echo the raw status and body to help diagnose rejections.
            print(f"Response status: {response.status_code}")
            print(f"Response text: {response.text}")

            # Delivery is confirmed only by HTTP 200 together with errcode 0.
            delivered = response.status_code == 200 and self._api_error(response)[0] == 0
            if delivered:
                print("Message sent successfully!")
            else:
                print("Failed to send message.")
            return delivered
        except Exception as e:
            # Deliberately broad: notification failures must not crash the caller.
            print(f"Error occurred: {e}")
            return False