diff --git a/chuangliangBaidu/urls.py b/chuangliangBaidu/urls.py index 9ca14b0..af0519a 100644 --- a/chuangliangBaidu/urls.py +++ b/chuangliangBaidu/urls.py @@ -11,5 +11,6 @@ from . import views urlpatterns = [ # path('datetime/', views.get_datetime, name='get_datetime'), - path('api/task_details/', views.get_task_details, name='task_details') + path('api/task_details/', views.get_task_details, name='task_details'), + path('api/magic_task/', views.get_magic_task_info, name='magic_task_info') ] \ No newline at end of file diff --git a/chuangliangBaidu/views.py b/chuangliangBaidu/views.py index 9a0f212..4bdccc7 100644 --- a/chuangliangBaidu/views.py +++ b/chuangliangBaidu/views.py @@ -7,8 +7,10 @@ @Motto : Catch as catch can.... """ from chuangliangBaidu.migrations.BaidutaskQuery import AsyncTaskDataQuery +from chuangliangProject.magic_cat.task_client_magic import MagicTaskQuery from django.http import JsonResponse from django.views.decorators.http import require_GET +from django.views.decorators.http import require_http_methods from django.utils import timezone from django.shortcuts import render import json @@ -128,3 +130,46 @@ def get_task_details(request): }) logger.error(error_data) return JsonResponse(error_data, status=500, json_dumps_params={'ensure_ascii': False}) + + +# @csrf_exempt # 如果不需要CSRF保护,可以添加此装饰器 +@require_http_methods(["GET"]) # 限制只允许GET请求 +def get_magic_task_info(request): + """ + 获取魔剪任务信息的GET接口 + 接收参数:task - 任务ID + """ + task_id = request.GET.get('taskid') + # 验证参数 + if not task_id: + return JsonResponse({ + 'status': 'error', + 'message': '缺少必要参数: task' + }, status=400, json_dumps_params={'ensure_ascii': False}) + + try: + # 将参数转换为整数 + task_id = int(task_id) + except ValueError: + return JsonResponse({ + 'status': 'error', + 'message': '参数格式错误: task必须是整数' + }, status=400, json_dumps_params={'ensure_ascii': False}) + + try: + # 创建查询实例并获取任务信息 + task_query = MagicTaskQuery(task_id) + result = task_query.get_task_info() + + # 返回成功响应 + return JsonResponse(result, status=200, json_dumps_params={'ensure_ascii': False}) + + except Exception as e: + # 记录错误日志(实际项目中应该使用日志模块) + print(f"查询魔剪任务信息失败: {str(e)}") + + # 返回错误响应 + return JsonResponse({ + 'status': 'error', + 'message': f'查询失败: {str(e)}' + }, status=500, json_dumps_params={'ensure_ascii': False}) \ No newline at end of file diff --git a/chuangliangProject/magic_cat/__init__.py b/chuangliangProject/magic_cat/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chuangliangProject/magic_cat/task_client_magic.py b/chuangliangProject/magic_cat/task_client_magic.py new file mode 100644 index 0000000..a5b7b21 --- /dev/null +++ b/chuangliangProject/magic_cat/task_client_magic.py @@ -0,0 +1,165 @@ +import requests +import json +import pandas as pd +from typing import List, Dict, Any, Optional +from chuangliangTool.db_base import Chuangliang_ad_magic_task_skit, Chuangliang_ad_magic_cut, Material1_db, Material2_db + + +class MagicTaskQuery: + def __init__(self, task_id: int): + self.task_id = task_id + self.base_url = "https://tos.mobgi.com/" + self.sql = f"SELECT task_id, batch_id, status, task_param, result_data, main_user_id, create_user_id " \ + f"FROM chuangliang_ad_task_skit.task_log_client_mix_shear " \ + f"WHERE task_id = {self.task_id}" + self.detail_sql = f"SELECT rule_id, fission_mode, video_duration, create_task_time, " \ + f"procssing_task_time, finish_task_time, main_user_id " \ + f"FROM chuangliang_ad_task_skit.task_log_detail " \ + f"WHERE task_id = {self.task_id}" + self.material_sql = f"SELECT new_material_id, material_id, material_id_start, material_id_start_end, " \ + f"start_time, end_time, use_duration, is_effect " \ + f"FROM chuangliang_ad_magic_cut.client_material_relation " \ + f"WHERE task_id = {self.task_id}" + + def _fetch_url_content(self, path: str) -> Optional[Dict[str, Any]]: + """获取指定URL的内容并转换为JSON格式""" + try: + full_url = self.base_url + path + response = requests.get(full_url, timeout=10) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + print(f"HTTP请求失败: {str(e)}") + return None + except json.JSONDecodeError as e: + print(f"JSON解析失败: {str(e)}") + return None + + def _parse_json_string(self, json_str: str) -> Dict[str, Any]: + """解析JSON字符串""" + try: + return json.loads(json_str) + except json.JSONDecodeError: + return {"error_data": json_str} + + def _get_material_uri(self, material_id: int) -> Optional[str]: + """获取素材URI""" + material_sql = f"SELECT file_uri FROM chuangliang_ad_material.material WHERE material_id = {material_id}" + # 先查询第一个库表 + try: + result = Material1_db.query_database_select_to_dataframe(material_sql) + if result and len(result) > 0: + file_uri = result[0].get('file_uri') + if file_uri: + return f"{self.base_url}{file_uri}" + except Exception as e: + print(f"查询material_id {material_id} 时Material1_db出错: {e}") + + # 第一个库表无结果或出错时查询第二个库表 + try: + result = Material2_db.query_database_select_to_dataframe(material_sql) + if result and len(result) > 0: + file_uri = result[0].get('file_uri') + if file_uri: + return f"{self.base_url}{file_uri}" + except Exception as e: + print(f"查询material_id {material_id} 时Material2_db出错: {e}") + return None + + def _process_material_data(self, query_results: List[Dict[str, Any]]) -> Dict[str, Any]: + """处理素材关联表查询结果,生成指定JSON结构""" + if not query_results: + return {"new_material_id": None, "sum_use_duration": 0, "material_info": []} + + new_material_id = query_results[0]["new_material_id"] + sum_use_duration = sum(record.get("use_duration", 0) for record in query_results) + + material_info = [] + for record in query_results: + material_id = record["material_id"] + material_uri = self._get_material_uri(material_id) + + material_record = { + "material_id": material_id, + "material_id_start": record.get("material_id_start"), + "material_id_start_end": record.get("material_id_start_end"), + "start_time": record.get("start_time"), + "end_time": record.get("end_time"), + "use_duration": record.get("use_duration"), + "is_effect": record.get("is_effect"), + "material_uri": material_uri + } + material_info.append(material_record) + + return { + "new_material_id": new_material_id, + "sum_use_duration": sum_use_duration, + "material_info": material_info + } + + def get_task_info(self) -> Dict[str, Any]: + """获取任务信息并组合成最终结果""" + # 获取主任务数据 + task_data_list = Chuangliang_ad_magic_task_skit.query_database_select_to_dataframe(self.sql) + if not task_data_list: + return {"error": f"未找到task_id为{self.task_id}的任务"} + + task_data = task_data_list[0] + + # 获取任务详情 + task_detail_list = Chuangliang_ad_magic_task_skit.query_database_select_to_dataframe(self.detail_sql) + if not task_detail_list: + return {"error": f"未找到task_id为{self.task_id}的任务详情"} + + detail_data = task_detail_list[0] + + # 获取任务参数 + if task_data['status'] != 'success': + task_param = self._parse_json_string(task_data['task_param']) + else: + task_param = self._fetch_url_content(task_data['task_param']) + + # 处理结果数据 + result_data = self._parse_json_string(task_data['result_data']) + + # 获取素材信息 + material_info = Chuangliang_ad_magic_cut.query_database_select_to_dataframe(self.material_sql) + material_detail = self._process_material_data(material_info) + + # 构建最终结果 + return { + "task_info": { + "task_id": task_data['task_id'], + "batch_id": task_data['batch_id'], + "status": task_data['status'], + "main_user_id": task_data['main_user_id'], + "create_user_id": task_data['create_user_id'] + }, + "task_param": task_param, + "result_data": result_data, + "task_detail": { + "rule_id": detail_data['rule_id'], + "fission_mode": detail_data['fission_mode'], + "video_duration": detail_data['video_duration'], + "create_task_time": str(detail_data.get('create_task_time', '')), + "procssing_task_time": str(detail_data.get('procssing_task_time', '')), + "finish_task_time": str(detail_data.get('finish_task_time', '')), + "main_user_id": detail_data['main_user_id'] + }, + "material_detail": material_detail + } + + +def main(): + """主函数示例""" + task_id = 31544930 + task_query = MagicTaskQuery(task_id) + result = task_query.get_task_info() + + # 格式化输出JSON + formatted_result = json.dumps(result, ensure_ascii=False) + print(formatted_result) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/chuangliangTool/db_base.py b/chuangliangTool/db_base.py index 08e65d5..82c9c9b 100644 --- a/chuangliangTool/db_base.py +++ b/chuangliangTool/db_base.py @@ -136,7 +136,7 @@ config_chuangliang_ad_magic_cut = { 'database_name': "chuangliang_ad_magic_cut" } # 生产魔剪任务库配置 -config_chuangliang_ad_task_skit = { +config_chuangliang_ad_magic_task_skit = { 'jump_host_ip': "180.184.103.38", 'jump_host_port': 2222, 'jump_host_user': "cl_ad", @@ -156,11 +156,12 @@ Material1_db = JumpHostDatabaseConnector(config_material1) Material2_db = JumpHostDatabaseConnector(config_material2) Async_create_ad_batch_db = JumpHostDatabaseConnector(config_async_create_ad_batch) Chuangliang_ad_magic_cut = JumpHostDatabaseConnector(config_chuangliang_ad_magic_cut) -Chuangliang_ad_task_skit = JumpHostDatabaseConnector(config_chuangliang_ad_task_skit) +Chuangliang_ad_magic_task_skit = JumpHostDatabaseConnector(config_chuangliang_ad_magic_task_skit) if __name__ == '__main__': sql = "SELECT * FROM chuangliang_ad_magic_cut.config_apollo WHERE `key` = 'client_task_num_day_limit'" - magic_sql = "SELECT task_id,batch_id,task_param,result_data,main_user_id,create_user_id FROM chuangliang_ad_task_skit.task_log_client_mix_shear WHERE task_id = 19110555" + magic_sql = "SELECT task_id,batch_id,task_param,result_data,main_user_id,create_user_id FROM chuangliang_ad_task_skit.task_log_client_mix_shear WHERE task_id = 30821620" + material_sql = "SELECT new_material_id,material_id,material_id_start,material_id_start_end,start_time,end_time,use_duration,rule_id,task_id,is_effect FROM chuangliang_ad_magic_cut.client_material_relation WHERE task_id = 30821691" # 执行查询 - results = Chuangliang_ad_task_skit.query_database_select_to_dataframe(magic_sql) + results = Chuangliang_ad_magic_cut.query_database_select_to_dataframe(material_sql) print(results)