# -*- coding: utf-8 -*- """ @Time : 2025/8/15 19:29 @Auth : 九月的海 @IDE : PyCharm @Motto : Catch as catch can.... """ from chuangliangTool.db_base import Bd_task_db, Async_create_ad_batch_db from typing import List, Dict, Any, Optional from django.db import migrations, models class Migration(migrations.Migration): # 确保有 Migration 类 dependencies = [ # 声明依赖的迁移 ] operations = [ # 迁移操作列表 ] class AsyncTaskDataQuery: def __init__(self, async_task_id: int): """ 初始化异步任务数据查询 :param async_task_id: 异步任务ID (整数) """ self.async_task_id = async_task_id self.batch_id = None self.create_status = None self.task_details = [] # 存储多个任务详情记录 def _query_first_database(self) -> bool: """ 查询第一个数据库获取batch_id和create_status :return: 查询是否成功 """ sql_query = f""" SELECT async_task_id, batch_id, create_status FROM chuangliang_ad_task.async_create_ad_batch WHERE async_task_id = {self.async_task_id} """ try: results = Async_create_ad_batch_db.query_database(sql_query) if not results: print(f"警告: 未找到异步任务ID {self.async_task_id} 的记录") return False first_result = results[0] if len(first_result) >= 3: self.batch_id = first_result[1] self.create_status = first_result[2] return True else: print(f"错误: 查询结果字段不足, 预期3个字段, 实际{len(first_result)}个字段") return False except Exception as e: print(f"查询第一个数据库时出错: {str(e)}") return False def _query_second_database(self) -> List[Dict[str, Any]]: """ 使用batch_id查询第二个数据库获取任务详情 :return: 包含所有任务详情的字典列表 """ if not self.batch_id: print("错误: batch_id未初始化") return [] sql_query = f""" SELECT task_param, media_account_id, `status`, result_data FROM chuangliang_ad_task_baidu.`task_log_baidu_batch_add` WHERE batch_id = '{self.batch_id}' """ try: results = Bd_task_db.query_database(sql_query) if not results: print(f"警告: 未找到batch_id {self.batch_id} 的记录") return [] # 处理多个结果 task_logs = [] for i, row in enumerate(results, 1): if len(row) < 4: print(f"警告: 结果字段不足, 预期4个字段, 实际{len(row)}个字段 (记录#{i})") continue task_log = { "log_index": i, "task_param": row[0], "media_account_id": row[1], "operation_status": row[2], "result_data": row[3] } task_logs.append(task_log) return task_logs except Exception as e: print(f"查询第二个数据库时出错: {str(e)}") return [] def get_task_details(self) -> Dict[str, Any]: """ 获取完整的任务详情数据 :return: 包含所有所需字段的字典 """ # 获取第一个数据库的结果 if not self._query_first_database(): return { "async_task_id": self.async_task_id, "error": "未找到批次信息" } # 获取第二个数据库的结果 task_logs = self._query_second_database() # 组织返回结果 return { "async_task_id": self.async_task_id, "batch_id": self.batch_id, "create_status": self.create_status, "task_logs": task_logs, "total_logs": len(task_logs) } @staticmethod def format_output(data: Dict[str, Any]) -> str: """ 格式化输出结果为带标识的可读字符串 """ if not data or "error" in data: error_msg = data.get("error", "未知错误") if data else "没有可用的数据" return f"未能获取任务详情: {error_msg}" # 标题和基本信息 output = [ f"\n========== 任务详情 [ID: {data['async_task_id']}] ==========", f"• 批次ID: {data['batch_id']}", f"• 创建状态: {data['create_status']}", f"• 日志记录总数: {data['total_logs']}" ] # 处理每个日志记录 for log in data.get("task_logs", []): output.append(f"\n==== 任务日志 #{log['log_index']} ====") output.append(f"• 媒体账户ID: {log.get('media_account_id', 'N/A')}") output.append(f"• 操作状态: {log.get('operation_status', 'N/A')}") output.append(f"\n[任务参数]\n{log.get('task_param', '无数据')}") output.append(f"\n[结果数据]\n{log.get('result_data', '无数据')}") output.append("-" * 40) output.append("==========================================") return "\n".join(output) # 使用示例 if __name__ == "__main__": # 创建查询实例 (使用实际任务ID) task_query = AsyncTaskDataQuery(14038291158) # 获取任务详情 task_details = task_query.get_task_details() # 格式化输出结果 print(AsyncTaskDataQuery.format_output(task_details))