diff --git a/chuangliangBaidu/baiduMaterialProjectSelect/__init__.py b/chuangliangBaidu/baiduMaterialProjectSelect/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chuangliangBaidu/baiduMaterialProjectSelect/baidu_getAdgroupFeed.py b/chuangliangBaidu/baiduMaterialProjectSelect/baidu_getAdgroupFeed.py new file mode 100644 index 0000000..4569cdf --- /dev/null +++ b/chuangliangBaidu/baiduMaterialProjectSelect/baidu_getAdgroupFeed.py @@ -0,0 +1,75 @@ +import requests +import json +from typing import Union +from get_baidu_token import GetBaiduToken + + +def get_baidu_adgroup_request(identifier: Union[str, int], body: dict) -> dict: + """ + 向百度API发送请求 + :param identifier: 广告主名称或媒体账户ID + :param body: 请求体内容 + :return: API响应结果 + """ + # 获取header + token_getter = GetBaiduToken(identifier) + header = token_getter.get_header() + if not header: + return {"error": "Failed to get valid header"} + # 构建完整请求负载 + payload = { + "header": header, + "body": body + } + # API端点 + url = "https://api.baidu.com/json/feed/v1/AdgroupFeedService/getAdgroupFeed" + # 请求头 + http_headers = { + "Accept-Encoding": "gzip, deflate", + "Content-Type": "application/json", + "Accept": "application/json" + } + try: + # 发送请求 + response = requests.post( + url, + data=json.dumps(payload), + headers=http_headers + ) + + # 返回JSON响应 + return response.json() + except Exception as e: + return {"error": f"API request failed: {str(e)}"} + + +# 使用示例 +if __name__ == "__main__": + # 定义请求体 + request_body = { + "adgroupFeedFields": [ + "adgroupFeedId", + "campaignFeedId", + "adgroupFeedName", + "pause", + "bid", + "producttypes", + "ftypes", + "status", + "bidtype", + "ocpc", + "atpFeedId", + "deliveryType", + "productSetId", + "ftypeSelection", + "bidSource", + "atpName" + ], + "ids": [1413521619], # 广告计划ID + "idType": 1, # ID类型,1-计划ID,2-单元ID + } + # 使用广告主名称查询 + # response = make_baidu_api_request("原生-SLG-乱世-安卓12A20KA00006", request_body) + # 使用媒体账户ID查询 + response = get_baidu_adgroup_request("12466757256", request_body) + print(json.dumps(response, ensure_ascii=False)) diff --git a/chuangliangBaidu/baiduMaterialProjectSelect/baidu_getCampaignFeed.py b/chuangliangBaidu/baiduMaterialProjectSelect/baidu_getCampaignFeed.py new file mode 100644 index 0000000..51a5627 --- /dev/null +++ b/chuangliangBaidu/baiduMaterialProjectSelect/baidu_getCampaignFeed.py @@ -0,0 +1,92 @@ +import requests +import json +from typing import Union +from get_baidu_token import GetBaiduToken + + +def get_baidu_campaign_request(identifier: Union[str, int], body: dict) -> dict: + """ + 向百度API发送请求 + :param identifier: 广告主名称或媒体账户ID + :param body: 请求体内容 + :return: API响应结果 + """ + # 获取header + token_getter = GetBaiduToken(identifier) + header = token_getter.get_header() + if not header: + return {"error": "Failed to get valid header"} + # 构建完整请求负载 + payload = { + "header": header, + "body": body + } + # API端点 + url = "https://api.baidu.com/json/feed/v1/CampaignFeedService/getCampaignFeed" + # 请求头 + http_headers = { + "Accept-Encoding": "gzip, deflate", + "Content-Type": "application/json", + "Accept": "application/json" + } + try: + # 发送请求 + response = requests.post( + url, + data=json.dumps(payload), + headers=http_headers + ) + + # 返回JSON响应 + return response.json() + except Exception as e: + return {"error": f"API request failed: {str(e)}"} + + +# 使用示例 +if __name__ == "__main__": + # 定义请求体 + request_body = { + "campaignFeedFields": [ # 需要查询的字段列表 + "campaignFeedId", + "campaignFeedName", + "subject", + "appinfo", + "budget", + "starttime", + "endtime", + "schedule", + "pause", + "status", + "bstype", + "addtime", + "eshopType", + "rtaStatus", + "bid", + "bidtype", + "ftypes", + "ocpc", + "bmcUserId", + "catalogId", + "projectFeedId", + "productType", + "appSubType", + "deliveryType", + "miniProgramType", + "bidMode", + "productIds", + "saleType" + ], + "campaignFeedIds": [1413521619], # 广告计划ID + "campaignFeedFilter": { + "bstype": [1] # 广告类型 1-普通计划 3-商品计划 7-原生RTA + } + } + + # 使用广告主名称查询 + # response = make_baidu_api_request("原生-SLG-乱世-安卓12A20KA00006", request_body) + + # 使用媒体账户ID查询 + response = get_baidu_campaign_request("12466757256", request_body) + + print(json.dumps(response, ensure_ascii=False)) diff --git a/chuangliangBaidu/baiduMaterialProjectSelect/get_baidu_token.py b/chuangliangBaidu/baiduMaterialProjectSelect/get_baidu_token.py new file mode 100644 index 0000000..52f4ca3 --- /dev/null +++ b/chuangliangBaidu/baiduMaterialProjectSelect/get_baidu_token.py @@ -0,0 +1,97 @@ +from chuangliangTool.db_base import Chuangliang_ad +from typing import Optional, Union, Dict + + +class GetBaiduToken: + # 定义两种查询方式的SQL模板 + TOKEN_QUERY_BY_NAME = """ + SELECT advertiser_name, access_token + FROM chuangliang_ad.media_account + FORCE INDEX(idx_media_type_advertiser_status_is_active) + WHERE media_type = 'baidu' + AND is_delete = 0 + AND advertiser_name = %s + AND product_version = 0 + LIMIT 1 + """ + + TOKEN_QUERY_BY_ID = """ + SELECT advertiser_name, access_token + FROM chuangliang_ad.media_account + FORCE INDEX(idx_media_type_advertiser_status_is_active) + WHERE media_type = 'baidu' + AND is_delete = 0 + AND media_account_id = %s + AND product_version = 0 + LIMIT 1 + """ + + def __init__(self, identifier: Union[str, int]): + """ + 初始化方法 + :param identifier: 可以是广告主名称(str)或媒体账户ID(int/str) + """ + self.identifier = identifier + self.header = None # 缓存header数据避免重复查询 + + def get_header(self) -> Optional[Dict[str, str]]: + """获取包含用户名和令牌的header字典,优先返回缓存值""" + if self.header is not None: + return self.header + + try: + result = self._query_token() + if result: + advertiser_name, access_token = result[0] + self.header = { + "userName": advertiser_name, + "accessToken": access_token + } + return self.header + else: + print(f"No active token found for: {self.identifier}") + return None + except Exception as e: + print(f"Error fetching token: {str(e)}") + return None + + def _query_token(self) -> list: + """根据标识符类型执行参数化SQL查询""" + # 判断标识符类型并选择相应的SQL模板 + if self._is_media_account_id(): + sql = self.TOKEN_QUERY_BY_ID + param = int(self.identifier) # 确保转换为整数 + else: + sql = self.TOKEN_QUERY_BY_NAME + param = self.identifier + + return Chuangliang_ad.query_params(sql, (param,)) + + def _is_media_account_id(self) -> bool: + """判断标识符是否符合media_account_id的特征""" + # 检查是否为11位纯数字 + if isinstance(self.identifier, int): + return len(str(self.identifier)) == 11 + + if isinstance(self.identifier, str): + return self.identifier.isdigit() and len(self.identifier) == 11 + + return False + + +# 使用示例 +if __name__ == "__main__": + # 使用广告主名称查询 + # token_getter1 = GetBaiduToken("原生-SLG-乱世-安卓12A20KA00006") + # header1 = token_getter1.get_header() + # print(header1) + + # 使用媒体账户ID查询(字符串形式) + token_getter2 = GetBaiduToken("12466757256") + header2 = token_getter2.get_header() + print(header2) + # + # # 使用媒体账户ID查询(整数形式) + # token_getter3 = GetBaiduToken(12466757256) + # header3 = token_getter3.get_header() + # print(header3) \ No newline at end of file diff --git a/chuangliangBaidu/urls.py b/chuangliangBaidu/urls.py index af0519a..ade6e65 100644 --- a/chuangliangBaidu/urls.py +++ b/chuangliangBaidu/urls.py @@ -12,5 +12,6 @@ from . import views urlpatterns = [ # path('datetime/', views.get_datetime, name='get_datetime'), path('api/task_details/', views.get_task_details, name='task_details'), - path('api/magic_task/', views.get_magic_task_info, name='magic_task_info') + path('api/magic_task/', views.get_magic_task_info, name='magic_task_info'), + path('api/task_list/', views.get_task_data_by_rule_name, name='task_data_by_rule_name') ] \ No newline at end of file diff --git a/chuangliangBaidu/views.py b/chuangliangBaidu/views.py index 4bdccc7..dd1deec 100644 --- a/chuangliangBaidu/views.py +++ b/chuangliangBaidu/views.py @@ -15,6 +15,8 @@ from django.utils import timezone from django.shortcuts import render import json import logging +from django.views.decorators.csrf import csrf_exempt +from chuangliangProject.magic_cat.rule_task import RuleTaskQuery logger = logging.getLogger('chuangliang') @@ -172,4 +174,37 @@ def get_magic_task_info(request): return JsonResponse({ 'status': 'error', 'message': f'查询失败: {str(e)}' + }, status=500, json_dumps_params={'ensure_ascii': False}) + + +@csrf_exempt # 如果不需要CSRF保护,可以加上这个装饰器 +@require_GET # 确保只接受GET请求 +def get_task_data_by_rule_name(request): + """ + 通过rule_name参数查询任务数据的接口 + """ + # 获取请求参数 + rule_name = request.GET.get('rule_name') + + # 检查参数是否提供 + if not rule_name: + return JsonResponse({ + 'success': False, + 'message': '缺少必要参数: rule_name', + 'data': [] + }, status=400, json_dumps_params={'ensure_ascii': False}) + + try: + # 创建查询对象并获取数据 + rule_query = RuleTaskQuery(rule_name) + result = rule_query.get_comprehensive_data() + # 返回成功响应 + return JsonResponse(result, status=200, json_dumps_params={'ensure_ascii': False}) + + except Exception as e: + # 处理异常情况 + return JsonResponse({ + 'success': False, + 'message': f'查询过程中发生错误: {str(e)}', + 'data': [] }, status=500, json_dumps_params={'ensure_ascii': False}) \ No newline at end of file diff --git a/chuangliangProject/magic_cat/rule_task.py b/chuangliangProject/magic_cat/rule_task.py new file mode 100644 index 0000000..d415e62 --- /dev/null +++ b/chuangliangProject/magic_cat/rule_task.py @@ -0,0 +1,246 @@ +import json +import pandas as pd +from typing import List, Dict, Any, Optional, Union +from chuangliangTool.db_base import Chuangliang_ad_magic_task_skit, Chuangliang_ad_magic_cut +import requests +from functools import lru_cache + + +def get_task_param(task_id: int) -> Union[Dict[str, Any], str]: + """获取任务参数""" + try: + # 使用字符串格式化而不是参数化查询 + task_param_sql = f""" + SELECT status, task_param + FROM chuangliang_ad_task_skit.task_log_client_mix_shear + WHERE task_id = {task_id} + """ + task_param_result = Chuangliang_ad_magic_task_skit.query_database_select_to_dataframe(task_param_sql) + + # 检查结果是否为空 + if task_param_result and len(task_param_result) > 0: + row = task_param_result[0] # 列表中的第一个元素 + status, task_param = row['status'], row['task_param'] + + if status == 'success': + full_url = f"https://tos.mobgi.com/{task_param}" + response = requests.get(full_url, timeout=10) + response.raise_for_status() + return response.json() + return task_param + return "No task parameter found for the given task_id" + except Exception as e: + return f"Error fetching task param: {str(e)}" + + +class RuleTaskQuery: + def __init__(self, rule_name: str): + self.rule_name = rule_name + self._rule_id = None + self._rule_info = None + self._number_man_rule = None + + @lru_cache(maxsize=None) + def get_rule_id(self) -> Optional[int]: + """获取规则ID,使用缓存避免重复查询""" + # 使用字符串格式化而不是参数化查询 + rule_id_sql = f""" + SELECT id + FROM chuangliang_ad_magic_cut.magic_fission_rule + WHERE name = '{self.rule_name}' + """ + result = Chuangliang_ad_magic_cut.query_database_select_to_dataframe(rule_id_sql) + + # 检查结果是否为空 + if result and len(result) > 0: + return result[0]['id'] # 列表中的第一个元素 + print(f"未找到规则 '{self.rule_name}' 对应的ID") + return None + + def get_task_list(self) -> List[int]: + """获取任务ID列表""" + rule_id = self.get_rule_id() + if not rule_id: + return [] + + # 使用字符串格式化而不是参数化查询 + task_list_sql = f""" + SELECT task_id + FROM chuangliang_ad_magic_cut.magic_rule_result + WHERE rule_id = {rule_id} + """ + task_list = Chuangliang_ad_magic_cut.query_database_select_to_dataframe(task_list_sql) + + # 提取任务ID列表 + if task_list and len(task_list) > 0: + return [task['task_id'] for task in task_list] + return [] + + def _parse_json_field(self, json_str: str) -> Union[Dict[str, Any], str]: + """安全解析JSON字符串""" + try: + return json.loads(json_str) + except (json.JSONDecodeError, TypeError): + return json_str + + def _process_task_result(self, result: Dict[str, Any]) -> Dict[str, Any]: + """处理单个任务结果""" + if 'result_data' in result: + result['result_data'] = self._parse_json_field(result['result_data']) + return result + + def _process_number_man_result(self, result: Dict[str, Any]) -> Dict[str, Any]: + """处理数字人合成任务结果""" + if 'task_param' in result: + result['task_param'] = self._parse_json_field(result['task_param']) + + if 'result_data' in result: + result_data = self._parse_json_field(result['result_data']) + if isinstance(result_data, dict) and 'result' in result_data: + result_data['result'] = self._parse_json_field(result_data['result']) + result['result_data'] = result_data + + return result + + def get_task_list_result(self) -> List[Dict[str, Any]]: + """获取任务结果列表""" + task_ids = self.get_task_list() + if not task_ids: + return [] + + # 分批处理避免SQL过长 + batch_size = 1000 + results = [] + for i in range(0, len(task_ids), batch_size): + batch_ids = task_ids[i:i + batch_size] + # 使用字符串格式化而不是参数化查询 + task_ids_str = ",".join(map(str, batch_ids)) + + task_result_sql = f""" + SELECT task_id, status, result_data + FROM chuangliang_ad_task_skit.task_log_client_mix_shear + WHERE task_id IN ({task_ids_str}) + """ + batch_results = Chuangliang_ad_magic_task_skit.query_database_select_to_dataframe(task_result_sql) + + if batch_results and len(batch_results) > 0: + results.extend(batch_results) + + return [self._process_task_result(r) for r in results] if results else [] + + def get_number_man_task_list_result(self) -> List[Dict[str, Any]]: + """获取数字人合成任务结果列表""" + task_ids = self.get_task_list() + if not task_ids: + return [] + + # 分批处理避免SQL过长 + batch_size = 1000 + results = [] + for i in range(0, len(task_ids), batch_size): + batch_ids = task_ids[i:i + batch_size] + # 使用字符串格式化而不是参数化查询 + task_ids_str = ",".join(map(str, batch_ids)) + + number_man_sql = f""" + SELECT task_id, status, task_param, result_data + FROM chuangliang_ad_task_skit.task_log_client_number_man + WHERE task_id IN ({task_ids_str}) + """ + batch_results = Chuangliang_ad_magic_task_skit.query_database_select_to_dataframe(number_man_sql) + + if batch_results and len(batch_results) > 0: + results.extend(batch_results) + + return [self._process_number_man_result(r) for r in results] if results else [] + + def get_rule_info(self) -> Union[Dict[str, Any], str]: + """获取规则批次提交信息""" + if self._rule_info is not None: + return self._rule_info + + rule_id = self.get_rule_id() + if not rule_id: + return {} + + # 使用字符串格式化而不是参数化查询 + rule_info_sql = f""" + SELECT value + FROM chuangliang_ad_magic_cut.magic_fission_rule_extend + WHERE fission_rule_id = {rule_id} AND field = 'preview' + """ + result = Chuangliang_ad_magic_cut.query_database_select_to_dataframe(rule_info_sql) + + if result and len(result) > 0: + value = result[0]['value'] # 列表中的第一个元素 + self._rule_info = self._parse_json_field(value) + return self._rule_info + + print(f"未找到规则ID {rule_id} 的批次提交信息") + return {} + + def get_number_man_rule(self) -> Union[Dict[str, Any], str]: + """获取数字人合成批次提交信息""" + if self._number_man_rule is not None: + return self._number_man_rule + + rule_id = self.get_rule_id() + if not rule_id: + return {} + + # 使用字符串格式化而不是参数化查询 + number_man_sql = f""" + SELECT value + FROM chuangliang_ad_magic_cut.magic_fission_rule_extend + WHERE fission_rule_id = {rule_id} AND field = 'content' + """ + result = Chuangliang_ad_magic_cut.query_database_select_to_dataframe(number_man_sql) + + if result and len(result) > 0: + value = result[0]['value'] # 列表中的第一个元素 + self._number_man_rule = self._parse_json_field(value) + return self._number_man_rule + + print(f"未找到数字人合成ID {rule_id} 的批次提交信息") + return {} + + def get_comprehensive_data(self) -> Dict[str, Any]: + """获取完整的规则数据""" + rule_id = self.get_rule_id() + if not rule_id: + return { + "rule_name": self.rule_name, + "error": "Rule ID not found" + } + + rule_info = self.get_rule_info() + use_fallback = not rule_info + + rule_data = self.get_number_man_rule() if use_fallback else rule_info + task_results = self.get_number_man_task_list_result() if use_fallback else self.get_task_list_result() + + return { + "rule_name": self.rule_name, + "rule_id": rule_id, + "info_task_count": len(rule_data) if isinstance(rule_data, dict) else 0, + "rule_info": rule_data, + "task_count": len(task_results), + "tasks": task_results + } + + +def main(): + """主函数示例""" + rule_name = '数字人成片-20251015-114418' + rule_query = RuleTaskQuery(rule_name) + + # 获取完整数据 + result = rule_query.get_comprehensive_data() + man_result = rule_query.get_number_man_task_list_result() + # result = rule_query.get_task_list_result() + # result = get_task_param(32743070) + print(json.dumps(man_result, ensure_ascii=False)) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/chuangliangProject/magic_cat/task_client_magic.py b/chuangliangProject/magic_cat/task_client_magic.py index a5b7b21..19a6d94 100644 --- a/chuangliangProject/magic_cat/task_client_magic.py +++ b/chuangliangProject/magic_cat/task_client_magic.py @@ -152,7 +152,7 @@ class MagicTaskQuery: def main(): """主函数示例""" - task_id = 31544930 + task_id = 32616647 task_query = MagicTaskQuery(task_id) result = task_query.get_task_info() diff --git a/chuangliangTool/db_base.py b/chuangliangTool/db_base.py index 82c9c9b..fbe54ae 100644 --- a/chuangliangTool/db_base.py +++ b/chuangliangTool/db_base.py @@ -35,6 +35,14 @@ class JumpHostDatabaseConnector: results = cursor.fetchall() return results + def query_params(self, query: str, params=None): + with self.connect_to_jump_host() as server: + with self.connect_to_database(server) as conn: + with conn.cursor() as cursor: + cursor.execute(query, params) + results = cursor.fetchall() + return results + def query_database_select_field_names(self, query): with self.connect_to_jump_host() as server: with self.connect_to_database(server) as conn: @@ -147,6 +155,18 @@ config_chuangliang_ad_magic_task_skit = { 'database_password': "Chuangliang@2023", 'database_name': "chuangliang_ad_task_skit" } +# 生产魔剪任务库配置 +config_chuangliang_ad = { + 'jump_host_ip': "180.184.103.38", + 'jump_host_port': 2222, + 'jump_host_user': "cl_ad", + 'jump_host_password': "4CGbdPW2zkbewcp^", + 'database_ip': "mysqla825647ee78f.rds.ivolces.com", + 'database_port': 3306, + 'database_user': "cl_readOnly", + 'database_password': "Chuangliang@2023", + 'database_name': "chuangliang_ad" +} # 实例化数据库连接器 UC_common_db = JumpHostDatabaseConnector(config_common) @@ -157,6 +177,7 @@ Material2_db = JumpHostDatabaseConnector(config_material2) Async_create_ad_batch_db = JumpHostDatabaseConnector(config_async_create_ad_batch) Chuangliang_ad_magic_cut = JumpHostDatabaseConnector(config_chuangliang_ad_magic_cut) Chuangliang_ad_magic_task_skit = JumpHostDatabaseConnector(config_chuangliang_ad_magic_task_skit) +Chuangliang_ad = JumpHostDatabaseConnector(config_chuangliang_ad) if __name__ == '__main__': sql = "SELECT * FROM chuangliang_ad_magic_cut.config_apollo WHERE `key` = 'client_task_num_day_limit'"