262 lines
9.1 KiB
Python
262 lines
9.1 KiB
Python
import requests
|
||
import json
|
||
import datetime
|
||
from typing import Dict, Any
|
||
|
||
from chuangliangProject.magic_cat.task_client_magic import MagicTaskQuery
|
||
|
||
|
||
def quick_send_task_update(task_id: int, webhook_url: str):
|
||
"""快速发送任务更新到飞书 - 修复版"""
|
||
try:
|
||
# 获取任务数据
|
||
task_query = MagicTaskQuery(task_id)
|
||
result = task_query.get_task_info()
|
||
|
||
print(f"原始结果类型: {type(result)}")
|
||
print(f"原始结果内容: {result}")
|
||
|
||
# 处理结果数据 - 根据实际类型进行调整
|
||
if isinstance(result, str):
|
||
# 如果是字符串,尝试解析为JSON
|
||
try:
|
||
result_data = json.loads(result)
|
||
except:
|
||
result_data = {"raw_data": result}
|
||
elif hasattr(result, '__dict__'):
|
||
# 如果是对象,转换为字典
|
||
result_data = result.__dict__
|
||
elif isinstance(result, dict):
|
||
# 如果是字典,直接使用
|
||
result_data = result
|
||
else:
|
||
# 其他类型,转换为字符串
|
||
result_data = {"data": str(result)}
|
||
|
||
# 构建飞书消息 - 修复msg_type错误
|
||
message = {
|
||
"msg_type": "text", # 确保有这个字段
|
||
"content": {
|
||
"text": f"任务查询报告 - 任务ID: {task_id}\n"
|
||
f"查询时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
|
||
f"原始数据: {json.dumps(result_data, ensure_ascii=False, indent=2)}"
|
||
}
|
||
}
|
||
|
||
# 发送请求
|
||
headers = {'Content-Type': 'application/json'}
|
||
response = requests.post(webhook_url, headers=headers, data=json.dumps(message))
|
||
|
||
# 检查响应
|
||
if response.status_code == 200:
|
||
feishu_response = response.json()
|
||
if feishu_response.get('code') == 0:
|
||
print("✓ 消息发送成功")
|
||
return True
|
||
else:
|
||
print(f"✗ 飞书API返回错误: {feishu_response}")
|
||
return False
|
||
else:
|
||
print(f"✗ HTTP请求失败: {response.status_code}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"✗ 执行过程中发生错误: {str(e)}")
|
||
return False
|
||
|
||
|
||
# 更安全的版本,处理各种数据类型
|
||
def safe_send_task_update(task_id: int, webhook_url: str, bug: str = None):
|
||
"""安全版本,处理各种数据类型"""
|
||
try:
|
||
# 获取任务数据
|
||
task_query = MagicTaskQuery(task_id)
|
||
raw_result = task_query.get_task_info()
|
||
|
||
# 安全地提取数据
|
||
task_param = "N/A"
|
||
result_data = "N/A"
|
||
task_info = "N/A"
|
||
material_detail = "N/A"
|
||
|
||
# 根据不同类型安全访问
|
||
if isinstance(raw_result, dict):
|
||
task_param = raw_result.get('task_param', 'N/A')
|
||
result_data = raw_result.get('result_data', raw_result)
|
||
task_info = raw_result.get('task_info', 'N/A')
|
||
material_detail = raw_result.get('material_detail', 'N/A')
|
||
elif hasattr(raw_result, 'task_param'):
|
||
task_param = getattr(raw_result, 'task_param', 'N/A')
|
||
result_data = getattr(raw_result, 'result_data', str(raw_result))
|
||
task_info = getattr(raw_result, 'task_info', 'N/A')
|
||
material_detail = getattr(raw_result, 'material_detail', 'N/A')
|
||
else:
|
||
# 如果是其他类型,直接转换为字符串
|
||
result_data = str(raw_result)
|
||
|
||
# 构建更友好的消息格式
|
||
message = {
|
||
"msg_type": "interactive",
|
||
"card": {
|
||
"header": {
|
||
"title": {
|
||
"tag": "plain_text",
|
||
"content": f"BUG - {bug or '看bug表'}"
|
||
},
|
||
"template": "red"
|
||
},
|
||
"elements": [
|
||
{
|
||
"tag": "div",
|
||
"text": {
|
||
"tag": "lark_md",
|
||
"content": f"**任务ID:** {task_id} | **查询时间:** {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | **状态:** {task_info.get('status', 'N/A')}"
|
||
}
|
||
},
|
||
{
|
||
"tag": "hr"
|
||
},
|
||
{
|
||
"tag": "div",
|
||
"text": {
|
||
"tag": "lark_md",
|
||
"content": "**请求参数**"
|
||
}
|
||
},
|
||
{
|
||
"tag": "div",
|
||
"text": {
|
||
"tag": "plain_text",
|
||
"content": json.dumps(task_param, ensure_ascii=False)
|
||
}
|
||
},
|
||
{
|
||
"tag": "hr"
|
||
},
|
||
{
|
||
"tag": "div",
|
||
"text": {
|
||
"tag": "lark_md",
|
||
"content": "**响应结果**"
|
||
}
|
||
},
|
||
{
|
||
"tag": "div",
|
||
"text": {
|
||
"tag": "plain_text",
|
||
"content": json.dumps(result_data, ensure_ascii=False)
|
||
}
|
||
},
|
||
{
|
||
"tag": "hr"
|
||
},
|
||
{
|
||
"tag": "div",
|
||
"text": {
|
||
"tag": "lark_md",
|
||
"content": "**素材数据**"
|
||
}
|
||
},
|
||
{
|
||
"tag": "div",
|
||
"text": {
|
||
"tag": "plain_text",
|
||
"content": json.dumps(material_detail, ensure_ascii=False)
|
||
}
|
||
}
|
||
]
|
||
}
|
||
}
|
||
|
||
# 发送请求
|
||
headers = {'Content-Type': 'application/json'}
|
||
response = requests.post(webhook_url, headers=headers, data=json.dumps(message))
|
||
|
||
# 检查响应
|
||
if response.status_code == 200:
|
||
feishu_response = response.json()
|
||
if feishu_response.get('code') == 0:
|
||
print("✓ 消息发送成功")
|
||
return True
|
||
else:
|
||
print(f"✗ 飞书API返回错误: {feishu_response}")
|
||
# 如果是消息格式问题,尝试使用文本格式
|
||
return fallback_text_send(task_id, raw_result, webhook_url)
|
||
else:
|
||
print(f"✗ HTTP请求失败: {response.status_code}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"✗ 执行过程中发生错误: {str(e)}")
|
||
return False
|
||
|
||
|
||
def fallback_text_send(task_id: int, result, webhook_url: str):
|
||
"""备用方案:使用纯文本格式发送"""
|
||
try:
|
||
message = {
|
||
"msg_type": "text",
|
||
"content": {
|
||
"text": f"任务ID: {task_id}\n结果: {str(result)[:500]}..." # 限制长度
|
||
}
|
||
}
|
||
|
||
headers = {'Content-Type': 'application/json'}
|
||
response = requests.post(webhook_url, headers=headers, data=json.dumps(message))
|
||
|
||
if response.status_code == 200:
|
||
feishu_response = response.json()
|
||
if feishu_response.get('code') == 0:
|
||
print("✓ 备用文本消息发送成功")
|
||
return True
|
||
return False
|
||
except:
|
||
return False
|
||
|
||
|
||
# 调试函数:查看数据结构
|
||
def debug_task_data(task_id: int):
|
||
"""调试函数,查看任务数据的结构"""
|
||
try:
|
||
task_query = MagicTaskQuery(task_id)
|
||
result = task_query.get_task_info()
|
||
|
||
print("=== 调试信息 ===")
|
||
print(f"数据类型: {type(result)}")
|
||
print(f"数据内容: {result}")
|
||
|
||
if hasattr(result, '__dict__'):
|
||
print("对象属性:")
|
||
for key, value in result.__dict__.items():
|
||
print(f" {key}: {value}")
|
||
|
||
if isinstance(result, dict):
|
||
print("字典键值:")
|
||
for key, value in result.items():
|
||
print(f" {key}: {value}")
|
||
|
||
return result
|
||
except Exception as e:
|
||
print(f"调试失败: {str(e)}")
|
||
return None
|
||
|
||
|
||
# 使用示例
|
||
if __name__ == "__main__":
|
||
# 请替换为您的实际webhook链接
|
||
WEBHOOK_URL = "https://open.feishu.cn/open-apis/bot/v2/hook/c0a6dfdf-0a4b-4293-b997-a3eabd2597cf"
|
||
TASK_ID = 33945388
|
||
BUG = "画中画字段缺失"
|
||
# WEBHOOK_URL = "https://open.feishu.cn/open-apis/bot/v2/hook/3317c0db-c30c-4386-97dd-e8d5d427b611" # 魔剪测试群
|
||
|
||
# 先调试查看数据结构
|
||
# print("=== 调试数据结构 ===")
|
||
# debug_task_data(TASK_ID)
|
||
|
||
print("\n=== 发送消息 ===")
|
||
# 使用安全版本发送
|
||
success = safe_send_task_update(TASK_ID, WEBHOOK_URL, BUG)
|
||
|
||
if not success:
|
||
print("尝试使用简单文本格式...")
|
||
success = quick_send_task_update(TASK_ID, WEBHOOK_URL) |