diff --git a/chuangliangProject/magic_cat/rule_task.py b/chuangliangProject/magic_cat/rule_task.py index d415e62..04beebe 100644 --- a/chuangliangProject/magic_cat/rule_task.py +++ b/chuangliangProject/magic_cat/rule_task.py @@ -231,15 +231,16 @@ class RuleTaskQuery: def main(): """主函数示例""" - rule_name = '数字人成片-20251015-114418' + rule_name = 'lyy-20251110-191909' rule_query = RuleTaskQuery(rule_name) # 获取完整数据 - result = rule_query.get_comprehensive_data() + # result = rule_query.get_comprehensive_data() man_result = rule_query.get_number_man_task_list_result() - # result = rule_query.get_task_list_result() + result = rule_query.get_task_list_result() # result = get_task_param(32743070) - print(json.dumps(man_result, ensure_ascii=False)) + print(len(result)) + print(json.dumps(result, ensure_ascii=False)) if __name__ == "__main__": diff --git a/chuangliangProject/magic_cat/task_client_magic.py b/chuangliangProject/magic_cat/task_client_magic.py index 19a6d94..fda23e7 100644 --- a/chuangliangProject/magic_cat/task_client_magic.py +++ b/chuangliangProject/magic_cat/task_client_magic.py @@ -1,8 +1,11 @@ +import concurrent import requests import json -import pandas as pd +import concurrent.futures +from typing import List from typing import List, Dict, Any, Optional -from chuangliangTool.db_base import Chuangliang_ad_magic_task_skit, Chuangliang_ad_magic_cut, Material1_db, Material2_db +from chuangliangTool.db_base import Chuangliang_ad_magic_task_skit, Chuangliang_ad_magic_cut, Material1_db, \ + Material2_db, Material3_db, Material4_db class MagicTaskQuery: @@ -42,7 +45,7 @@ class MagicTaskQuery: except json.JSONDecodeError: return {"error_data": json_str} - def _get_material_uri(self, material_id: int) -> Optional[str]: + def _get_material_uris(self, material_id: int) -> Optional[str]: """获取素材URI""" material_sql = f"SELECT file_uri FROM chuangliang_ad_material.material WHERE material_id = {material_id}" # 先查询第一个库表 @@ -64,6 +67,60 @@ class MagicTaskQuery: return f"{self.base_url}{file_uri}" except Exception as e: print(f"查询material_id {material_id} 时Material2_db出错: {e}") + + # 第二个库表无结果或出错时查第三个库表 + try: + result = Material3_db.query_database_select_to_dataframe(material_sql) + if result and len(result) > 0: + file_uri = result[0].get('file_uri') + if file_uri: + return f"{self.base_url}{file_uri}" + except Exception as e: + print(f"查询material_id {material_id} 时Material3_db出错: {e}") + + # 第三个库表无结果或出错时查第四个库表 + try: + result = Material4_db.query_database_select_to_dataframe(material_sql) + if result and len(result) > 0: + file_uri = result[0].get('file_uri') + if file_uri: + return f"{self.base_url}{file_uri}" + except Exception as e: + print(f"查询material_id {material_id} 时Material4_db出错: {e}") + return None + + def _get_material_uri(self, material_id: int) -> Optional[str]: + """获取素材URI - 并行查询优化""" + material_sql = f"SELECT file_uri FROM chuangliang_ad_material.material WHERE material_id = {material_id}" + + databases = [Material1_db, Material2_db, Material3_db, Material4_db] + db_names = ["Material1_db", "Material2_db", "Material3_db", "Material4_db"] + + def query_single_db(db, db_name): + try: + result = db.query_database_select_to_dataframe(material_sql) + if result and len(result) > 0: + file_uri = result[0].get('file_uri') + if file_uri: + return f"{self.base_url}{file_uri}" + except Exception as e: + print(f"查询material_id {material_id} 时{db_name}出错: {e}") + return None + + # 使用线程池并行查询 + with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: + futures = { + executor.submit(query_single_db, db, name): (db, name) + for db, name in zip(databases, db_names) + } + for future in concurrent.futures.as_completed(futures): + result = future.result() + if result: + # 取消其他未完成的查询 + for f in futures: + if not f.done(): + f.cancel() + return result return None def _process_material_data(self, query_results: List[Dict[str, Any]]) -> Dict[str, Any]: @@ -97,6 +154,79 @@ class MagicTaskQuery: "material_info": material_info } + def _get_video_materials(self, task_param): + """ + 从task_param中提取视频素材列表 + Args: + task_param: 包含视频配置数据的字典 + + Returns: + list: 包含素材ID和名称的字典列表 + """ + video_materials = [] + + # 检查mixed_shear_config是否存在 + if not task_param.get('mixed_shear_config'): + return video_materials + + # 获取video_config_list + video_config_list = task_param['mixed_shear_config'].get('video_config_list', []) + + # 遍历每个视频配置 + for video_config in video_config_list: + # 获取每个配置中的video_list + video_list = video_config.get('video_list', []) + + # 遍历每个视频 + for video in video_list: + material_info = { + 'material_id': video.get('material_id'), + 'name': video.get('name'), + 'file_uri': self._get_material_uri(video.get('material_id')) + # 'video_id': video.get('video_id'), + # 'duration': video.get('video_duration'), + # 'start_time': video.get('start_time') + } + video_materials.append(material_info) + + return video_materials + + def _process_audio_config(self, task_param: Dict[str, Any]) -> Dict[str, Any]: + """处理音频配置""" + result = [] + try: + mixed_shear_config = task_param.get("mixed_shear_config") + if not mixed_shear_config: + print("mixed_shear_config为空") + return result + audio_config = mixed_shear_config.get("audio_config") + if not audio_config: + print("audio_config为空") + return result + for audio_info in audio_config: + if not audio_info: + continue + for audio_item in audio_info: + if not isinstance(audio_item, dict): + continue + # 提取material_id和name + material_id = audio_item.get('material_id') + name = audio_item.get('name') + if material_id is None: + print(f"音频项缺少material_id: {audio_item}") + continue + uri_file = self._get_material_uri(material_id) + result_item = { + "material_id": material_id, + "name": name, + "uri_file": uri_file + } + result.append(result_item) + return result + except Exception as e: + print(f"处理音频配置时出错: {e}") + return result + def get_task_info(self) -> Dict[str, Any]: """获取任务信息并组合成最终结果""" # 获取主任务数据 @@ -124,8 +254,8 @@ class MagicTaskQuery: # 获取素材信息 material_info = Chuangliang_ad_magic_cut.query_database_select_to_dataframe(self.material_sql) - material_detail = self._process_material_data(material_info) - + # material_detail = self._process_material_data(material_info) + material_detail = self._get_video_materials(task_param) # 构建最终结果 return { "task_info": { @@ -146,20 +276,20 @@ class MagicTaskQuery: "finish_task_time": str(detail_data.get('finish_task_time', '')), "main_user_id": detail_data['main_user_id'] }, - "material_detail": material_detail + "material_detail": material_detail, + "audio_config": self._process_audio_config(task_param) } def main(): """主函数示例""" - task_id = 32616647 + task_id = 33580458 task_query = MagicTaskQuery(task_id) result = task_query.get_task_info() - # 格式化输出JSON formatted_result = json.dumps(result, ensure_ascii=False) print(formatted_result) if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/chuangliangTool/db_base.py b/chuangliangTool/db_base.py index fbe54ae..7f35e08 100644 --- a/chuangliangTool/db_base.py +++ b/chuangliangTool/db_base.py @@ -120,6 +120,30 @@ config_material2 = { 'database_name': "chuangliang_ad_material" } +config_material3 = { + 'jump_host_ip': "180.184.103.38", + 'jump_host_port': 2222, + 'jump_host_user': "cl_ad", + 'jump_host_password': "4CGbdPW2zkbewcp^", + 'database_ip': "mysqle7d5aa8a089f.rds.ivolces.com", + 'database_port': 3306, + 'database_user': "cl_readOnly", + 'database_password': "Chuangliang@2023", + 'database_name': "chuangliang_ad_material" +} + +config_material4 = { + 'jump_host_ip': "180.184.103.38", + 'jump_host_port': 2222, + 'jump_host_user': "cl_ad", + 'jump_host_password': "4CGbdPW2zkbewcp^", + 'database_ip': "mysqle492a502e180.rds.ivolces.com", + 'database_port': 3306, + 'database_user': "cl_readOnly", + 'database_password': "Chuangliang@2023", + 'database_name': "chuangliang_ad_material" +} + config_async_create_ad_batch = { 'jump_host_ip': "180.184.103.38", 'jump_host_port': 2222, @@ -174,6 +198,8 @@ UC_task_db = JumpHostDatabaseConnector(config_task) Bd_task_db = JumpHostDatabaseConnector(config_baidu_task) Material1_db = JumpHostDatabaseConnector(config_material1) Material2_db = JumpHostDatabaseConnector(config_material2) +Material3_db = JumpHostDatabaseConnector(config_material3) +Material4_db = JumpHostDatabaseConnector(config_material4) Async_create_ad_batch_db = JumpHostDatabaseConnector(config_async_create_ad_batch) Chuangliang_ad_magic_cut = JumpHostDatabaseConnector(config_chuangliang_ad_magic_cut) Chuangliang_ad_magic_task_skit = JumpHostDatabaseConnector(config_chuangliang_ad_magic_task_skit) diff --git a/chuangliangTool/get_material.py b/chuangliangTool/get_material.py new file mode 100644 index 0000000..8805637 --- /dev/null +++ b/chuangliangTool/get_material.py @@ -0,0 +1,48 @@ +from typing import Optional, Union +from chuangliangTool.db_base import Material1_db, Material2_db, Material3_db, Material4_db + + +def get_material(material_param: Optional[Union[int, str]] = None) -> Optional[str]: + """ + 获取素材 + 参数: + material_param: 可以是 material_id(int), material_name(str) 或 file_md5(str) + 返回: + 素材 + """ + if material_param is None: + raise ValueError("必须提供 material_id, material_name 或 file_md5 参数") + + # 根据参数类型构建查询条件 + if isinstance(material_param, int): + # 如果是整数,认为是material_id + sql_template = f"SELECT * FROM chuangliang_ad_material.material WHERE material_id = '{material_param}'" + else: + # 如果是字符串,需要判断是material_name还是file_md5 + # 这里假设file_md5是32位的MD5值,material_name不是32位 + if len(material_param) == 32 and all(c in '0123456789abcdefABCDEF' for c in material_param): + # 如果是32位且只包含十六进制字符,认为是file_md5 + sql_template = f"SELECT * FROM chuangliang_ad_material.material WHERE file_md5 = '{material_param}'" + else: + # 否则认为是material_name + sql_template = f"SELECT * FROM chuangliang_ad_material.material WHERE material_name = '{material_param}'" + + material_dbs = [Material1_db, Material2_db, Material3_db, Material4_db] + db_names = ["Material1_db", "Material2_db", "Material3_db", "Material4_db"] + + for db, db_name in zip(material_dbs, db_names): + try: + result = db.query_database_select_to_dataframe(sql_template) + if result is not None and len(result) > 0: + # 返回整个DataFrame,让调用者处理 + return result + except Exception as e: + print(f"查询{db_name}出错: {e}") + + return None + + +if __name__ == '__main__': + # 测试不同的参数类型 + material_data = get_material("豪品素材-城市生活情感街拍短视频 (86)") + print("查询结果:", material_data) diff --git a/chuangliangTool/send_task_to_feishu.py b/chuangliangTool/send_task_to_feishu.py new file mode 100644 index 0000000..7a5e5ba --- /dev/null +++ b/chuangliangTool/send_task_to_feishu.py @@ -0,0 +1,262 @@ +import requests +import json +import datetime +from typing import Dict, Any + +from chuangliangProject.magic_cat.task_client_magic import MagicTaskQuery + + +def quick_send_task_update(task_id: int, webhook_url: str): + """快速发送任务更新到飞书 - 修复版""" + try: + # 获取任务数据 + task_query = MagicTaskQuery(task_id) + result = task_query.get_task_info() + + print(f"原始结果类型: {type(result)}") + print(f"原始结果内容: {result}") + + # 处理结果数据 - 根据实际类型进行调整 + if isinstance(result, str): + # 如果是字符串,尝试解析为JSON + try: + result_data = json.loads(result) + except: + result_data = {"raw_data": result} + elif hasattr(result, '__dict__'): + # 如果是对象,转换为字典 + result_data = result.__dict__ + elif isinstance(result, dict): + # 如果是字典,直接使用 + result_data = result + else: + # 其他类型,转换为字符串 + result_data = {"data": str(result)} + + # 构建飞书消息 - 修复msg_type错误 + message = { + "msg_type": "text", # 确保有这个字段 + "content": { + "text": f"任务查询报告 - 任务ID: {task_id}\n" + f"查询时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" + f"原始数据: {json.dumps(result_data, ensure_ascii=False, indent=2)}" + } + } + + # 发送请求 + headers = {'Content-Type': 'application/json'} + response = requests.post(webhook_url, headers=headers, data=json.dumps(message)) + + # 检查响应 + if response.status_code == 200: + feishu_response = response.json() + if feishu_response.get('code') == 0: + print("✓ 消息发送成功") + return True + else: + print(f"✗ 飞书API返回错误: {feishu_response}") + return False + else: + print(f"✗ HTTP请求失败: {response.status_code}") + return False + + except Exception as e: + print(f"✗ 执行过程中发生错误: {str(e)}") + return False + + +# 更安全的版本,处理各种数据类型 +def safe_send_task_update(task_id: int, webhook_url: str, bug: str = None): + """安全版本,处理各种数据类型""" + try: + # 获取任务数据 + task_query = MagicTaskQuery(task_id) + raw_result = task_query.get_task_info() + + # 安全地提取数据 + task_param = "N/A" + result_data = "N/A" + task_info = "N/A" + material_detail = "N/A" + + # 根据不同类型安全访问 + if isinstance(raw_result, dict): + task_param = raw_result.get('task_param', 'N/A') + result_data = raw_result.get('result_data', raw_result) + task_info = raw_result.get('task_info', 'N/A') + material_detail = raw_result.get('material_detail', 'N/A') + elif hasattr(raw_result, 'task_param'): + task_param = getattr(raw_result, 'task_param', 'N/A') + result_data = getattr(raw_result, 'result_data', str(raw_result)) + task_info = getattr(raw_result, 'task_info', 'N/A') + material_detail = getattr(raw_result, 'material_detail', 'N/A') + else: + # 如果是其他类型,直接转换为字符串 + result_data = str(raw_result) + + # 构建更友好的消息格式 + message = { + "msg_type": "interactive", + "card": { + "header": { + "title": { + "tag": "plain_text", + "content": f"BUG - {bug or '看bug表'}" + }, + "template": "red" + }, + "elements": [ + { + "tag": "div", + "text": { + "tag": "lark_md", + "content": f"**任务ID:** {task_id} | **查询时间:** {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | **状态:** {task_info.get('status', 'N/A')}" + } + }, + { + "tag": "hr" + }, + { + "tag": "div", + "text": { + "tag": "lark_md", + "content": "**请求参数**" + } + }, + { + "tag": "div", + "text": { + "tag": "plain_text", + "content": json.dumps(task_param, ensure_ascii=False) + } + }, + { + "tag": "hr" + }, + { + "tag": "div", + "text": { + "tag": "lark_md", + "content": "**响应结果**" + } + }, + { + "tag": "div", + "text": { + "tag": "plain_text", + "content": json.dumps(result_data, ensure_ascii=False) + } + }, + { + "tag": "hr" + }, + { + "tag": "div", + "text": { + "tag": "lark_md", + "content": "**素材数据**" + } + }, + { + "tag": "div", + "text": { + "tag": "plain_text", + "content": json.dumps(material_detail, ensure_ascii=False) + } + } + ] + } + } + + # 发送请求 + headers = {'Content-Type': 'application/json'} + response = requests.post(webhook_url, headers=headers, data=json.dumps(message)) + + # 检查响应 + if response.status_code == 200: + feishu_response = response.json() + if feishu_response.get('code') == 0: + print("✓ 消息发送成功") + return True + else: + print(f"✗ 飞书API返回错误: {feishu_response}") + # 如果是消息格式问题,尝试使用文本格式 + return fallback_text_send(task_id, raw_result, webhook_url) + else: + print(f"✗ HTTP请求失败: {response.status_code}") + return False + + except Exception as e: + print(f"✗ 执行过程中发生错误: {str(e)}") + return False + + +def fallback_text_send(task_id: int, result, webhook_url: str): + """备用方案:使用纯文本格式发送""" + try: + message = { + "msg_type": "text", + "content": { + "text": f"任务ID: {task_id}\n结果: {str(result)[:500]}..." # 限制长度 + } + } + + headers = {'Content-Type': 'application/json'} + response = requests.post(webhook_url, headers=headers, data=json.dumps(message)) + + if response.status_code == 200: + feishu_response = response.json() + if feishu_response.get('code') == 0: + print("✓ 备用文本消息发送成功") + return True + return False + except: + return False + + +# 调试函数:查看数据结构 +def debug_task_data(task_id: int): + """调试函数,查看任务数据的结构""" + try: + task_query = MagicTaskQuery(task_id) + result = task_query.get_task_info() + + print("=== 调试信息 ===") + print(f"数据类型: {type(result)}") + print(f"数据内容: {result}") + + if hasattr(result, '__dict__'): + print("对象属性:") + for key, value in result.__dict__.items(): + print(f" {key}: {value}") + + if isinstance(result, dict): + print("字典键值:") + for key, value in result.items(): + print(f" {key}: {value}") + + return result + except Exception as e: + print(f"调试失败: {str(e)}") + return None + + +# 使用示例 +if __name__ == "__main__": + # 请替换为您的实际webhook链接 + WEBHOOK_URL = "https://open.feishu.cn/open-apis/bot/v2/hook/c0a6dfdf-0a4b-4293-b997-a3eabd2597cf" + TASK_ID = 33945388 + BUG = "画中画字段缺失" + # WEBHOOK_URL = "https://open.feishu.cn/open-apis/bot/v2/hook/3317c0db-c30c-4386-97dd-e8d5d427b611" # 魔剪测试群 + + # 先调试查看数据结构 + # print("=== 调试数据结构 ===") + # debug_task_data(TASK_ID) + + print("\n=== 发送消息 ===") + # 使用安全版本发送 + success = safe_send_task_update(TASK_ID, WEBHOOK_URL, BUG) + + if not success: + print("尝试使用简单文本格式...") + success = quick_send_task_update(TASK_ID, WEBHOOK_URL) \ No newline at end of file