添加魔剪批次查询接口添加百度媒体广告查询

This commit is contained in:
xiaohai 2025-10-28 15:34:21 +08:00
parent 7140734107
commit 5acf004937
9 changed files with 569 additions and 2 deletions

View File

@ -0,0 +1,75 @@
import requests
import json
from typing import Union
from get_baidu_token import GetBaiduToken
def get_baidu_adgroup_request(identifier: Union[str, int], body: dict) -> dict:
"""
向百度API发送请求
:param identifier: 广告主名称或媒体账户ID
:param body: 请求体内容
:return: API响应结果
"""
# 获取header
token_getter = GetBaiduToken(identifier)
header = token_getter.get_header()
if not header:
return {"error": "Failed to get valid header"}
# 构建完整请求负载
payload = {
"header": header,
"body": body
}
# API端点
url = "https://api.baidu.com/json/feed/v1/AdgroupFeedService/getAdgroupFeed"
# 请求头
http_headers = {
"Accept-Encoding": "gzip, deflate",
"Content-Type": "application/json",
"Accept": "application/json"
}
try:
# 发送请求
response = requests.post(
url,
data=json.dumps(payload),
headers=http_headers
)
# 返回JSON响应
return response.json()
except Exception as e:
return {"error": f"API request failed: {str(e)}"}
# 使用示例
if __name__ == "__main__":
# 定义请求体
request_body = {
"adgroupFeedFields": [
"adgroupFeedId",
"campaignFeedId",
"adgroupFeedName",
"pause",
"bid",
"producttypes",
"ftypes",
"status",
"bidtype",
"ocpc",
"atpFeedId",
"deliveryType",
"productSetId",
"ftypeSelection",
"bidSource",
"atpName"
],
"ids": [1413521619], # 广告计划ID
"idType": 1, # ID类型1-计划ID2-单元ID
}
# 使用广告主名称查询
# response = make_baidu_api_request("原生-SLG-乱世-安卓12A20KA00006", request_body)
# 使用媒体账户ID查询
response = get_baidu_adgroup_request("12466757256", request_body)
print(json.dumps(response, ensure_ascii=False))

View File

@ -0,0 +1,92 @@
import requests
import json
from typing import Union
from get_baidu_token import GetBaiduToken
def get_baidu_campaign_request(identifier: Union[str, int], body: dict) -> dict:
"""
向百度API发送请求
:param identifier: 广告主名称或媒体账户ID
:param body: 请求体内容
:return: API响应结果
"""
# 获取header
token_getter = GetBaiduToken(identifier)
header = token_getter.get_header()
if not header:
return {"error": "Failed to get valid header"}
# 构建完整请求负载
payload = {
"header": header,
"body": body
}
# API端点
url = "https://api.baidu.com/json/feed/v1/CampaignFeedService/getCampaignFeed"
# 请求头
http_headers = {
"Accept-Encoding": "gzip, deflate",
"Content-Type": "application/json",
"Accept": "application/json"
}
try:
# 发送请求
response = requests.post(
url,
data=json.dumps(payload),
headers=http_headers
)
# 返回JSON响应
return response.json()
except Exception as e:
return {"error": f"API request failed: {str(e)}"}
# 使用示例
if __name__ == "__main__":
# 定义请求体
request_body = {
"campaignFeedFields": [ # 需要查询的字段列表
"campaignFeedId",
"campaignFeedName",
"subject",
"appinfo",
"budget",
"starttime",
"endtime",
"schedule",
"pause",
"status",
"bstype",
"addtime",
"eshopType",
"rtaStatus",
"bid",
"bidtype",
"ftypes",
"ocpc",
"bmcUserId",
"catalogId",
"projectFeedId",
"productType",
"appSubType",
"deliveryType",
"miniProgramType",
"bidMode",
"productIds",
"saleType"
],
"campaignFeedIds": [1413521619], # 广告计划ID
"campaignFeedFilter": {
"bstype": [1] # 广告类型 1-普通计划 3-商品计划 7-原生RTA
}
}
# 使用广告主名称查询
# response = make_baidu_api_request("原生-SLG-乱世-安卓12A20KA00006", request_body)
# 使用媒体账户ID查询
response = get_baidu_campaign_request("12466757256", request_body)
print(json.dumps(response, ensure_ascii=False))

View File

@ -0,0 +1,97 @@
from chuangliangTool.db_base import Chuangliang_ad
from typing import Optional, Union, Dict
class GetBaiduToken:
# 定义两种查询方式的SQL模板
TOKEN_QUERY_BY_NAME = """
SELECT advertiser_name, access_token
FROM chuangliang_ad.media_account
FORCE INDEX(idx_media_type_advertiser_status_is_active)
WHERE media_type = 'baidu'
AND is_delete = 0
AND advertiser_name = %s
AND product_version = 0
LIMIT 1
"""
TOKEN_QUERY_BY_ID = """
SELECT advertiser_name, access_token
FROM chuangliang_ad.media_account
FORCE INDEX(idx_media_type_advertiser_status_is_active)
WHERE media_type = 'baidu'
AND is_delete = 0
AND media_account_id = %s
AND product_version = 0
LIMIT 1
"""
def __init__(self, identifier: Union[str, int]):
"""
初始化方法
:param identifier: 可以是广告主名称(str)或媒体账户ID(int/str)
"""
self.identifier = identifier
self.header = None # 缓存header数据避免重复查询
def get_header(self) -> Optional[Dict[str, str]]:
"""获取包含用户名和令牌的header字典优先返回缓存值"""
if self.header is not None:
return self.header
try:
result = self._query_token()
if result:
advertiser_name, access_token = result[0]
self.header = {
"userName": advertiser_name,
"accessToken": access_token
}
return self.header
else:
print(f"No active token found for: {self.identifier}")
return None
except Exception as e:
print(f"Error fetching token: {str(e)}")
return None
def _query_token(self) -> list:
"""根据标识符类型执行参数化SQL查询"""
# 判断标识符类型并选择相应的SQL模板
if self._is_media_account_id():
sql = self.TOKEN_QUERY_BY_ID
param = int(self.identifier) # 确保转换为整数
else:
sql = self.TOKEN_QUERY_BY_NAME
param = self.identifier
return Chuangliang_ad.query_params(sql, (param,))
def _is_media_account_id(self) -> bool:
"""判断标识符是否符合media_account_id的特征"""
# 检查是否为11位纯数字
if isinstance(self.identifier, int):
return len(str(self.identifier)) == 11
if isinstance(self.identifier, str):
return self.identifier.isdigit() and len(self.identifier) == 11
return False
# 使用示例
if __name__ == "__main__":
# 使用广告主名称查询
# token_getter1 = GetBaiduToken("原生-SLG-乱世-安卓12A20KA00006")
# header1 = token_getter1.get_header()
# print(header1)
# 使用媒体账户ID查询字符串形式
token_getter2 = GetBaiduToken("12466757256")
header2 = token_getter2.get_header()
print(header2)
#
# # 使用媒体账户ID查询整数形式
# token_getter3 = GetBaiduToken(12466757256)
# header3 = token_getter3.get_header()
# print(header3)

View File

@ -12,5 +12,6 @@ from . import views
urlpatterns = [ urlpatterns = [
# path('datetime/', views.get_datetime, name='get_datetime'), # path('datetime/', views.get_datetime, name='get_datetime'),
path('api/task_details/', views.get_task_details, name='task_details'), path('api/task_details/', views.get_task_details, name='task_details'),
path('api/magic_task/', views.get_magic_task_info, name='magic_task_info') path('api/magic_task/', views.get_magic_task_info, name='magic_task_info'),
path('api/task_list/', views.get_task_data_by_rule_name, name='task_data_by_rule_name')
] ]

View File

@ -15,6 +15,8 @@ from django.utils import timezone
from django.shortcuts import render from django.shortcuts import render
import json import json
import logging import logging
from django.views.decorators.csrf import csrf_exempt
from chuangliangProject.magic_cat.rule_task import RuleTaskQuery
logger = logging.getLogger('chuangliang') logger = logging.getLogger('chuangliang')
@ -172,4 +174,37 @@ def get_magic_task_info(request):
return JsonResponse({ return JsonResponse({
'status': 'error', 'status': 'error',
'message': f'查询失败: {str(e)}' 'message': f'查询失败: {str(e)}'
}, status=500, json_dumps_params={'ensure_ascii': False})
@csrf_exempt # 如果不需要CSRF保护可以加上这个装饰器
@require_GET # 确保只接受GET请求
def get_task_data_by_rule_name(request):
"""
通过rule_name参数查询任务数据的接口
"""
# 获取请求参数
rule_name = request.GET.get('rule_name')
# 检查参数是否提供
if not rule_name:
return JsonResponse({
'success': False,
'message': '缺少必要参数: rule_name',
'data': []
}, status=400, json_dumps_params={'ensure_ascii': False})
try:
# 创建查询对象并获取数据
rule_query = RuleTaskQuery(rule_name)
result = rule_query.get_comprehensive_data()
# 返回成功响应
return JsonResponse(result, status=200, json_dumps_params={'ensure_ascii': False})
except Exception as e:
# 处理异常情况
return JsonResponse({
'success': False,
'message': f'查询过程中发生错误: {str(e)}',
'data': []
}, status=500, json_dumps_params={'ensure_ascii': False}) }, status=500, json_dumps_params={'ensure_ascii': False})

View File

@ -0,0 +1,246 @@
import json
import pandas as pd
from typing import List, Dict, Any, Optional, Union
from chuangliangTool.db_base import Chuangliang_ad_magic_task_skit, Chuangliang_ad_magic_cut
import requests
from functools import lru_cache
def get_task_param(task_id: int) -> Union[Dict[str, Any], str]:
"""获取任务参数"""
try:
# 使用字符串格式化而不是参数化查询
task_param_sql = f"""
SELECT status, task_param
FROM chuangliang_ad_task_skit.task_log_client_mix_shear
WHERE task_id = {task_id}
"""
task_param_result = Chuangliang_ad_magic_task_skit.query_database_select_to_dataframe(task_param_sql)
# 检查结果是否为空
if task_param_result and len(task_param_result) > 0:
row = task_param_result[0] # 列表中的第一个元素
status, task_param = row['status'], row['task_param']
if status == 'success':
full_url = f"https://tos.mobgi.com/{task_param}"
response = requests.get(full_url, timeout=10)
response.raise_for_status()
return response.json()
return task_param
return "No task parameter found for the given task_id"
except Exception as e:
return f"Error fetching task param: {str(e)}"
class RuleTaskQuery:
def __init__(self, rule_name: str):
self.rule_name = rule_name
self._rule_id = None
self._rule_info = None
self._number_man_rule = None
@lru_cache(maxsize=None)
def get_rule_id(self) -> Optional[int]:
"""获取规则ID使用缓存避免重复查询"""
# 使用字符串格式化而不是参数化查询
rule_id_sql = f"""
SELECT id
FROM chuangliang_ad_magic_cut.magic_fission_rule
WHERE name = '{self.rule_name}'
"""
result = Chuangliang_ad_magic_cut.query_database_select_to_dataframe(rule_id_sql)
# 检查结果是否为空
if result and len(result) > 0:
return result[0]['id'] # 列表中的第一个元素
print(f"未找到规则 '{self.rule_name}' 对应的ID")
return None
def get_task_list(self) -> List[int]:
"""获取任务ID列表"""
rule_id = self.get_rule_id()
if not rule_id:
return []
# 使用字符串格式化而不是参数化查询
task_list_sql = f"""
SELECT task_id
FROM chuangliang_ad_magic_cut.magic_rule_result
WHERE rule_id = {rule_id}
"""
task_list = Chuangliang_ad_magic_cut.query_database_select_to_dataframe(task_list_sql)
# 提取任务ID列表
if task_list and len(task_list) > 0:
return [task['task_id'] for task in task_list]
return []
def _parse_json_field(self, json_str: str) -> Union[Dict[str, Any], str]:
"""安全解析JSON字符串"""
try:
return json.loads(json_str)
except (json.JSONDecodeError, TypeError):
return json_str
def _process_task_result(self, result: Dict[str, Any]) -> Dict[str, Any]:
"""处理单个任务结果"""
if 'result_data' in result:
result['result_data'] = self._parse_json_field(result['result_data'])
return result
def _process_number_man_result(self, result: Dict[str, Any]) -> Dict[str, Any]:
"""处理数字人合成任务结果"""
if 'task_param' in result:
result['task_param'] = self._parse_json_field(result['task_param'])
if 'result_data' in result:
result_data = self._parse_json_field(result['result_data'])
if isinstance(result_data, dict) and 'result' in result_data:
result_data['result'] = self._parse_json_field(result_data['result'])
result['result_data'] = result_data
return result
def get_task_list_result(self) -> List[Dict[str, Any]]:
"""获取任务结果列表"""
task_ids = self.get_task_list()
if not task_ids:
return []
# 分批处理避免SQL过长
batch_size = 1000
results = []
for i in range(0, len(task_ids), batch_size):
batch_ids = task_ids[i:i + batch_size]
# 使用字符串格式化而不是参数化查询
task_ids_str = ",".join(map(str, batch_ids))
task_result_sql = f"""
SELECT task_id, status, result_data
FROM chuangliang_ad_task_skit.task_log_client_mix_shear
WHERE task_id IN ({task_ids_str})
"""
batch_results = Chuangliang_ad_magic_task_skit.query_database_select_to_dataframe(task_result_sql)
if batch_results and len(batch_results) > 0:
results.extend(batch_results)
return [self._process_task_result(r) for r in results] if results else []
def get_number_man_task_list_result(self) -> List[Dict[str, Any]]:
"""获取数字人合成任务结果列表"""
task_ids = self.get_task_list()
if not task_ids:
return []
# 分批处理避免SQL过长
batch_size = 1000
results = []
for i in range(0, len(task_ids), batch_size):
batch_ids = task_ids[i:i + batch_size]
# 使用字符串格式化而不是参数化查询
task_ids_str = ",".join(map(str, batch_ids))
number_man_sql = f"""
SELECT task_id, status, task_param, result_data
FROM chuangliang_ad_task_skit.task_log_client_number_man
WHERE task_id IN ({task_ids_str})
"""
batch_results = Chuangliang_ad_magic_task_skit.query_database_select_to_dataframe(number_man_sql)
if batch_results and len(batch_results) > 0:
results.extend(batch_results)
return [self._process_number_man_result(r) for r in results] if results else []
def get_rule_info(self) -> Union[Dict[str, Any], str]:
"""获取规则批次提交信息"""
if self._rule_info is not None:
return self._rule_info
rule_id = self.get_rule_id()
if not rule_id:
return {}
# 使用字符串格式化而不是参数化查询
rule_info_sql = f"""
SELECT value
FROM chuangliang_ad_magic_cut.magic_fission_rule_extend
WHERE fission_rule_id = {rule_id} AND field = 'preview'
"""
result = Chuangliang_ad_magic_cut.query_database_select_to_dataframe(rule_info_sql)
if result and len(result) > 0:
value = result[0]['value'] # 列表中的第一个元素
self._rule_info = self._parse_json_field(value)
return self._rule_info
print(f"未找到规则ID {rule_id} 的批次提交信息")
return {}
def get_number_man_rule(self) -> Union[Dict[str, Any], str]:
"""获取数字人合成批次提交信息"""
if self._number_man_rule is not None:
return self._number_man_rule
rule_id = self.get_rule_id()
if not rule_id:
return {}
# 使用字符串格式化而不是参数化查询
number_man_sql = f"""
SELECT value
FROM chuangliang_ad_magic_cut.magic_fission_rule_extend
WHERE fission_rule_id = {rule_id} AND field = 'content'
"""
result = Chuangliang_ad_magic_cut.query_database_select_to_dataframe(number_man_sql)
if result and len(result) > 0:
value = result[0]['value'] # 列表中的第一个元素
self._number_man_rule = self._parse_json_field(value)
return self._number_man_rule
print(f"未找到数字人合成ID {rule_id} 的批次提交信息")
return {}
def get_comprehensive_data(self) -> Dict[str, Any]:
"""获取完整的规则数据"""
rule_id = self.get_rule_id()
if not rule_id:
return {
"rule_name": self.rule_name,
"error": "Rule ID not found"
}
rule_info = self.get_rule_info()
use_fallback = not rule_info
rule_data = self.get_number_man_rule() if use_fallback else rule_info
task_results = self.get_number_man_task_list_result() if use_fallback else self.get_task_list_result()
return {
"rule_name": self.rule_name,
"rule_id": rule_id,
"info_task_count": len(rule_data) if isinstance(rule_data, dict) else 0,
"rule_info": rule_data,
"task_count": len(task_results),
"tasks": task_results
}
def main():
"""主函数示例"""
rule_name = '数字人成片-20251015-114418'
rule_query = RuleTaskQuery(rule_name)
# 获取完整数据
result = rule_query.get_comprehensive_data()
man_result = rule_query.get_number_man_task_list_result()
# result = rule_query.get_task_list_result()
# result = get_task_param(32743070)
print(json.dumps(man_result, ensure_ascii=False))
if __name__ == "__main__":
main()

View File

@ -152,7 +152,7 @@ class MagicTaskQuery:
def main(): def main():
"""主函数示例""" """主函数示例"""
task_id = 31544930 task_id = 32616647
task_query = MagicTaskQuery(task_id) task_query = MagicTaskQuery(task_id)
result = task_query.get_task_info() result = task_query.get_task_info()

View File

@ -35,6 +35,14 @@ class JumpHostDatabaseConnector:
results = cursor.fetchall() results = cursor.fetchall()
return results return results
def query_params(self, query: str, params=None):
with self.connect_to_jump_host() as server:
with self.connect_to_database(server) as conn:
with conn.cursor() as cursor:
cursor.execute(query, params)
results = cursor.fetchall()
return results
def query_database_select_field_names(self, query): def query_database_select_field_names(self, query):
with self.connect_to_jump_host() as server: with self.connect_to_jump_host() as server:
with self.connect_to_database(server) as conn: with self.connect_to_database(server) as conn:
@ -147,6 +155,18 @@ config_chuangliang_ad_magic_task_skit = {
'database_password': "Chuangliang@2023", 'database_password': "Chuangliang@2023",
'database_name': "chuangliang_ad_task_skit" 'database_name': "chuangliang_ad_task_skit"
} }
# 生产魔剪任务库配置
config_chuangliang_ad = {
'jump_host_ip': "180.184.103.38",
'jump_host_port': 2222,
'jump_host_user': "cl_ad",
'jump_host_password': "4CGbdPW2zkbewcp^",
'database_ip': "mysqla825647ee78f.rds.ivolces.com",
'database_port': 3306,
'database_user': "cl_readOnly",
'database_password': "Chuangliang@2023",
'database_name': "chuangliang_ad"
}
# 实例化数据库连接器 # 实例化数据库连接器
UC_common_db = JumpHostDatabaseConnector(config_common) UC_common_db = JumpHostDatabaseConnector(config_common)
@ -157,6 +177,7 @@ Material2_db = JumpHostDatabaseConnector(config_material2)
Async_create_ad_batch_db = JumpHostDatabaseConnector(config_async_create_ad_batch) Async_create_ad_batch_db = JumpHostDatabaseConnector(config_async_create_ad_batch)
Chuangliang_ad_magic_cut = JumpHostDatabaseConnector(config_chuangliang_ad_magic_cut) Chuangliang_ad_magic_cut = JumpHostDatabaseConnector(config_chuangliang_ad_magic_cut)
Chuangliang_ad_magic_task_skit = JumpHostDatabaseConnector(config_chuangliang_ad_magic_task_skit) Chuangliang_ad_magic_task_skit = JumpHostDatabaseConnector(config_chuangliang_ad_magic_task_skit)
Chuangliang_ad = JumpHostDatabaseConnector(config_chuangliang_ad)
if __name__ == '__main__': if __name__ == '__main__':
sql = "SELECT * FROM chuangliang_ad_magic_cut.config_apollo WHERE `key` = 'client_task_num_day_limit'" sql = "SELECT * FROM chuangliang_ad_magic_cut.config_apollo WHERE `key` = 'client_task_num_day_limit'"