chuangliangProject/chuangliangBaidu/migrations/BaidutaskQuery.py

169 lines
5.6 KiB
Python
Raw Permalink Normal View History

2025-08-19 02:51:33 +00:00
# -*- coding: utf-8 -*-
"""
@Time : 2025/8/15 19:29
@Auth : 九月的海
@IDE : PyCharm
@Motto : Catch as catch can....
"""
2025-08-13 03:11:46 +00:00
from chuangliangTool.db_base import Bd_task_db, Async_create_ad_batch_db
from typing import List, Dict, Any, Optional
from django.db import migrations, models
class Migration(migrations.Migration): # 确保有 Migration 类
dependencies = [
# 声明依赖的迁移
]
operations = [
# 迁移操作列表
]
class AsyncTaskDataQuery:
def __init__(self, async_task_id: int):
"""
初始化异步任务数据查询
:param async_task_id: 异步任务ID (整数)
"""
self.async_task_id = async_task_id
self.batch_id = None
self.create_status = None
self.task_details = [] # 存储多个任务详情记录
def _query_first_database(self) -> bool:
"""
查询第一个数据库获取batch_id和create_status
:return: 查询是否成功
"""
sql_query = f"""
SELECT async_task_id, batch_id, create_status
FROM chuangliang_ad_task.async_create_ad_batch
WHERE async_task_id = {self.async_task_id}
"""
try:
results = Async_create_ad_batch_db.query_database(sql_query)
if not results:
print(f"警告: 未找到异步任务ID {self.async_task_id} 的记录")
return False
first_result = results[0]
if len(first_result) >= 3:
self.batch_id = first_result[1]
self.create_status = first_result[2]
return True
else:
print(f"错误: 查询结果字段不足, 预期3个字段, 实际{len(first_result)}个字段")
return False
except Exception as e:
print(f"查询第一个数据库时出错: {str(e)}")
return False
def _query_second_database(self) -> List[Dict[str, Any]]:
"""
使用batch_id查询第二个数据库获取任务详情
:return: 包含所有任务详情的字典列表
"""
if not self.batch_id:
print("错误: batch_id未初始化")
return []
sql_query = f"""
SELECT task_param, media_account_id, `status`, result_data
FROM chuangliang_ad_task_baidu.`task_log_baidu_batch_add`
WHERE batch_id = '{self.batch_id}'
"""
try:
results = Bd_task_db.query_database(sql_query)
if not results:
print(f"警告: 未找到batch_id {self.batch_id} 的记录")
return []
# 处理多个结果
task_logs = []
for i, row in enumerate(results, 1):
if len(row) < 4:
print(f"警告: 结果字段不足, 预期4个字段, 实际{len(row)}个字段 (记录#{i})")
continue
task_log = {
"log_index": i,
"task_param": row[0],
"media_account_id": row[1],
"operation_status": row[2],
"result_data": row[3]
}
task_logs.append(task_log)
return task_logs
except Exception as e:
print(f"查询第二个数据库时出错: {str(e)}")
return []
def get_task_details(self) -> Dict[str, Any]:
"""
获取完整的任务详情数据
:return: 包含所有所需字段的字典
"""
# 获取第一个数据库的结果
if not self._query_first_database():
return {
"async_task_id": self.async_task_id,
"error": "未找到批次信息"
}
# 获取第二个数据库的结果
task_logs = self._query_second_database()
# 组织返回结果
return {
"async_task_id": self.async_task_id,
"batch_id": self.batch_id,
"create_status": self.create_status,
"task_logs": task_logs,
"total_logs": len(task_logs)
}
@staticmethod
def format_output(data: Dict[str, Any]) -> str:
"""
格式化输出结果为带标识的可读字符串
"""
if not data or "error" in data:
error_msg = data.get("error", "未知错误") if data else "没有可用的数据"
return f"未能获取任务详情: {error_msg}"
# 标题和基本信息
output = [
f"\n========== 任务详情 [ID: {data['async_task_id']}] ==========",
f"• 批次ID: {data['batch_id']}",
f"• 创建状态: {data['create_status']}",
f"• 日志记录总数: {data['total_logs']}"
]
# 处理每个日志记录
for log in data.get("task_logs", []):
output.append(f"\n==== 任务日志 #{log['log_index']} ====")
output.append(f"• 媒体账户ID: {log.get('media_account_id', 'N/A')}")
output.append(f"• 操作状态: {log.get('operation_status', 'N/A')}")
output.append(f"\n[任务参数]\n{log.get('task_param', '无数据')}")
output.append(f"\n[结果数据]\n{log.get('result_data', '无数据')}")
output.append("-" * 40)
output.append("==========================================")
return "\n".join(output)
# 使用示例
if __name__ == "__main__":
# 创建查询实例 (使用实际任务ID)
task_query = AsyncTaskDataQuery(14038291158)
# 获取任务详情
task_details = task_query.get_task_details()
# 格式化输出结果
print(AsyncTaskDataQuery.format_output(task_details))