168 lines
5.6 KiB
Python
168 lines
5.6 KiB
Python
|
from chuangliangTool.db_base import Bd_task_db, Async_create_ad_batch_db
|
||
|
from typing import List, Dict, Any, Optional
|
||
|
import json
|
||
|
import html
|
||
|
|
||
|
from django.db import migrations, models
|
||
|
|
||
|
|
||
|
class Migration(migrations.Migration): # 确保有 Migration 类
|
||
|
dependencies = [
|
||
|
# 声明依赖的迁移
|
||
|
]
|
||
|
|
||
|
operations = [
|
||
|
# 迁移操作列表
|
||
|
]
|
||
|
|
||
|
|
||
|
class AsyncTaskDataQuery:
|
||
|
def __init__(self, async_task_id: int):
|
||
|
"""
|
||
|
初始化异步任务数据查询
|
||
|
:param async_task_id: 异步任务ID (整数)
|
||
|
"""
|
||
|
self.async_task_id = async_task_id
|
||
|
self.batch_id = None
|
||
|
self.create_status = None
|
||
|
self.task_details = [] # 存储多个任务详情记录
|
||
|
|
||
|
def _query_first_database(self) -> bool:
|
||
|
"""
|
||
|
查询第一个数据库获取batch_id和create_status
|
||
|
:return: 查询是否成功
|
||
|
"""
|
||
|
sql_query = f"""
|
||
|
SELECT async_task_id, batch_id, create_status
|
||
|
FROM chuangliang_ad_task.async_create_ad_batch
|
||
|
WHERE async_task_id = {self.async_task_id}
|
||
|
"""
|
||
|
try:
|
||
|
results = Async_create_ad_batch_db.query_database(sql_query)
|
||
|
|
||
|
if not results:
|
||
|
print(f"警告: 未找到异步任务ID {self.async_task_id} 的记录")
|
||
|
return False
|
||
|
|
||
|
first_result = results[0]
|
||
|
if len(first_result) >= 3:
|
||
|
self.batch_id = first_result[1]
|
||
|
self.create_status = first_result[2]
|
||
|
return True
|
||
|
else:
|
||
|
print(f"错误: 查询结果字段不足, 预期3个字段, 实际{len(first_result)}个字段")
|
||
|
return False
|
||
|
|
||
|
except Exception as e:
|
||
|
print(f"查询第一个数据库时出错: {str(e)}")
|
||
|
return False
|
||
|
|
||
|
def _query_second_database(self) -> List[Dict[str, Any]]:
|
||
|
"""
|
||
|
使用batch_id查询第二个数据库获取任务详情
|
||
|
:return: 包含所有任务详情的字典列表
|
||
|
"""
|
||
|
if not self.batch_id:
|
||
|
print("错误: batch_id未初始化")
|
||
|
return []
|
||
|
|
||
|
sql_query = f"""
|
||
|
SELECT task_param, media_account_id, `status`, result_data
|
||
|
FROM chuangliang_ad_task_baidu.`task_log_baidu_batch_add`
|
||
|
WHERE batch_id = '{self.batch_id}'
|
||
|
"""
|
||
|
try:
|
||
|
results = Bd_task_db.query_database(sql_query)
|
||
|
|
||
|
if not results:
|
||
|
print(f"警告: 未找到batch_id {self.batch_id} 的记录")
|
||
|
return []
|
||
|
|
||
|
# 处理多个结果
|
||
|
task_logs = []
|
||
|
for i, row in enumerate(results, 1):
|
||
|
if len(row) < 4:
|
||
|
print(f"警告: 结果字段不足, 预期4个字段, 实际{len(row)}个字段 (记录#{i})")
|
||
|
continue
|
||
|
|
||
|
task_log = {
|
||
|
"log_index": i,
|
||
|
"task_param": row[0],
|
||
|
"media_account_id": row[1],
|
||
|
"operation_status": row[2],
|
||
|
"result_data": row[3]
|
||
|
}
|
||
|
task_logs.append(task_log)
|
||
|
|
||
|
print(f"找到 {len(task_logs)} 条任务日志记录")
|
||
|
return task_logs
|
||
|
|
||
|
except Exception as e:
|
||
|
print(f"查询第二个数据库时出错: {str(e)}")
|
||
|
return []
|
||
|
|
||
|
def get_task_details(self) -> Dict[str, Any]:
|
||
|
"""
|
||
|
获取完整的任务详情数据
|
||
|
:return: 包含所有所需字段的字典
|
||
|
"""
|
||
|
# 获取第一个数据库的结果
|
||
|
if not self._query_first_database():
|
||
|
return {
|
||
|
"async_task_id": self.async_task_id,
|
||
|
"error": "未找到批次信息"
|
||
|
}
|
||
|
|
||
|
# 获取第二个数据库的结果
|
||
|
task_logs = self._query_second_database()
|
||
|
|
||
|
# 组织返回结果
|
||
|
return {
|
||
|
"async_task_id": self.async_task_id,
|
||
|
"batch_id": self.batch_id,
|
||
|
"create_status": self.create_status,
|
||
|
"task_logs": task_logs,
|
||
|
"total_logs": len(task_logs)
|
||
|
}
|
||
|
|
||
|
@staticmethod
|
||
|
def format_output(data: Dict[str, Any]) -> str:
|
||
|
"""
|
||
|
格式化输出结果为带标识的可读字符串
|
||
|
"""
|
||
|
if not data or "error" in data:
|
||
|
error_msg = data.get("error", "未知错误") if data else "没有可用的数据"
|
||
|
return f"未能获取任务详情: {error_msg}"
|
||
|
|
||
|
# 标题和基本信息
|
||
|
output = [
|
||
|
f"\n========== 任务详情 [ID: {data['async_task_id']}] ==========",
|
||
|
f"• 批次ID: {data['batch_id']}",
|
||
|
f"• 创建状态: {data['create_status']}",
|
||
|
f"• 日志记录总数: {data['total_logs']}"
|
||
|
]
|
||
|
|
||
|
# 处理每个日志记录
|
||
|
for log in data.get("task_logs", []):
|
||
|
output.append(f"\n==== 任务日志 #{log['log_index']} ====")
|
||
|
output.append(f"• 媒体账户ID: {log.get('media_account_id', 'N/A')}")
|
||
|
output.append(f"• 操作状态: {log.get('operation_status', 'N/A')}")
|
||
|
output.append(f"\n[任务参数]\n{log.get('task_param', '无数据')}")
|
||
|
output.append(f"\n[结果数据]\n{log.get('result_data', '无数据')}")
|
||
|
output.append("-" * 40)
|
||
|
|
||
|
output.append("==========================================")
|
||
|
return "\n".join(output)
|
||
|
|
||
|
|
||
|
# 使用示例
|
||
|
if __name__ == "__main__":
|
||
|
# 创建查询实例 (使用实际任务ID)
|
||
|
task_query = AsyncTaskDataQuery(14038291158)
|
||
|
|
||
|
# 获取任务详情
|
||
|
task_details = task_query.get_task_details()
|
||
|
|
||
|
# 格式化输出结果
|
||
|
print(AsyncTaskDataQuery.format_output(task_details))
|