Compare commits

...

3 Commits

43 changed files with 2551 additions and 0 deletions

8
.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="FacetManager">
<facet type="django" name="Django">
<configuration>
<option name="rootFolder" value="$MODULE_DIR$" />
<option name="settingsModule" value="chuangliangProject/settings.py" />
<option name="manageScript" value="$MODULE_DIR$/manage.py" />
<option name="environment" value="&lt;map/&gt;" />
<option name="doNotUseTestRunner" value="false" />
<option name="trackFilePattern" value="migrations" />
</configuration>
</facet>
</component>
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="TemplatesService">
<option name="TEMPLATE_CONFIGURATION" value="Django" />
<option name="TEMPLATE_FOLDERS">
<list>
<option value="$MODULE_DIR$/../chuangliangProject\templates" />
</list>
</option>
</component>
</module>

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml Normal file
View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.12 (chuangliangProject)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 (chuangliangProject)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/chuangliangProject.iml" filepath="$PROJECT_DIR$/.idea/chuangliangProject.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

View File

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,16 @@
"""
ASGI config for chuangliangProject project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/5.2/howto/deployment/asgi/
"""
import os
from django.core.asgi import get_asgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'chuangliangProject.settings')
application = get_asgi_application()

View File

@ -0,0 +1,124 @@
"""
Django settings for chuangliangProject project.
Generated by 'django-admin startproject' using Django 5.2.4.
For more information on this file, see
https://docs.djangoproject.com/en/5.2/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/5.2/ref/settings/
"""
from pathlib import Path
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/5.2/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'django-insecure-*6+s74!7pkn%w23)!n(!mq^3h_trt%e1*kgq8aknwqco&kkfhk'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'chuanglinagBaidu'
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'chuangliangProject.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [BASE_DIR / 'templates']
,
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'chuangliangProject.wsgi.application'
# Database
# https://docs.djangoproject.com/en/5.2/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db.sqlite3',
}
}
# Password validation
# https://docs.djangoproject.com/en/5.2/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
# https://docs.djangoproject.com/en/5.2/topics/i18n/
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/5.2/howto/static-files/
STATIC_URL = 'static/'
# Default primary key field type
# https://docs.djangoproject.com/en/5.2/ref/settings/#default-auto-field
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'

View File

@ -0,0 +1,23 @@
"""
URL configuration for chuangliangProject project.
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/5.2/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: path('', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.contrib import admin
from django.urls import include, path
urlpatterns = [
path('admin/', admin.site.urls),
path('chuanglinagBaidu/', include('chuanglinagBaidu.urls')),
]

View File

@ -0,0 +1,16 @@
"""
WSGI config for chuangliangProject project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/5.2/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'chuangliangProject.settings')
application = get_wsgi_application()

View File

@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/7/9 17:36
@Auth : 九月的海
@File : __init__.py.py
@IDE : PyCharm
@Motto : Catch as catch can....
"""

Binary file not shown.

Binary file not shown.

126
chuangliangTool/db_base.py Normal file
View File

@ -0,0 +1,126 @@
from sshtunnel import SSHTunnelForwarder
import pymysql
class JumpHostDatabaseConnector:
def __init__(self, config):
self.config = config
def connect_to_jump_host(self):
server = SSHTunnelForwarder(
(self.config['jump_host_ip'], self.config['jump_host_port']),
ssh_username=self.config['jump_host_user'],
ssh_password=self.config['jump_host_password'],
remote_bind_address=(self.config['database_ip'], self.config['database_port'])
)
server.start()
return server
def connect_to_database(self, server):
conn = pymysql.connect(
host='127.0.0.1', # 这里必须填127.0.0.1
port=server.local_bind_port,
user=self.config['database_user'],
password=self.config['database_password'],
db=self.config['database_name']
)
return conn
def query_database(self, query):
with self.connect_to_jump_host() as server:
with self.connect_to_database(server) as conn:
with conn.cursor() as cursor:
cursor.execute(query)
results = cursor.fetchall()
return results
# 配置字典
config_common = {
'jump_host_ip': "180.184.103.38",
'jump_host_port': 2222,
'jump_host_user': "cl_ad",
'jump_host_password': "4CGbdPW2zkbewcp^",
'database_ip': "mysqlc684e4271035.rds.ivolces.com",
'database_port': 3306,
'database_user': "cl_readOnly",
'database_password': "Chuangliang@2023",
'database_name': "chuangliang_ad_ucx_common"
}
config_task = {
'jump_host_ip': "180.184.103.38",
'jump_host_port': 2222,
'jump_host_user': "cl_ad",
'jump_host_password': "4CGbdPW2zkbewcp^",
'database_ip': "mysql94eab4cb64ee.rds.ivolces.com",
'database_port': 3306,
'database_user': "cl_readOnly",
'database_password': "Chuangliang@2023",
'database_name': "chuangliang_ad_task_ucx"
}
config_baidu_task = {
'jump_host_ip': "180.184.103.38",
'jump_host_port': 2222,
'jump_host_user': "cl_ad",
'jump_host_password': "4CGbdPW2zkbewcp^",
'database_ip': "mysqlf546d0debe54.rds.ivolces.com",
'database_port': 3306,
'database_user': "cl_readOnly",
'database_password': "Chuangliang@2023",
'database_name': "chuangliang_ad_task_baidu"
}
config_material1 = {
'jump_host_ip': "180.184.103.38",
'jump_host_port': 2222,
'jump_host_user': "cl_ad",
'jump_host_password': "4CGbdPW2zkbewcp^",
'database_ip': "mysql7f735bf46c55.rds.ivolces.com",
'database_port': 3306,
'database_user': "cl_readOnly",
'database_password': "Chuangliang@2023",
'database_name': "chuangliang_ad_material"
}
config_material2 = {
'jump_host_ip': "180.184.103.38",
'jump_host_port': 2222,
'jump_host_user': "cl_ad",
'jump_host_password': "4CGbdPW2zkbewcp^",
'database_ip': "mysql58c5730efa71.rds.ivolces.com",
'database_port': 3306,
'database_user': "cl_readOnly",
'database_password': "Chuangliang@2023",
'database_name': "chuangliang_ad_material"
}
config_async_create_ad_batch = {
'jump_host_ip': "180.184.103.38",
'jump_host_port': 2222,
'jump_host_user': "cl_ad",
'jump_host_password': "4CGbdPW2zkbewcp^",
'database_ip': "mysqlf43ac72e50a0.rds.ivolces.com",
'database_port': 3306,
'database_user': "cl_readOnly",
'database_password': "Chuangliang@2023",
'database_name': "chuangliang_ad_task"
}
# 实例化数据库连接器
UC_common_db = JumpHostDatabaseConnector(config_common)
UC_task_db = JumpHostDatabaseConnector(config_task)
Bd_task_db = JumpHostDatabaseConnector(config_baidu_task)
Material1_db = JumpHostDatabaseConnector(config_material1)
Material2_db = JumpHostDatabaseConnector(config_material2)
Async_create_ad_batch_db = JumpHostDatabaseConnector(config_async_create_ad_batch)
if __name__ == '__main__':
sql = "SELECT * FROM `task_log_ucx_batch_add_ad` where batch_id = '20241110104432b71b64129f0d11efb0d200163e0d187b'"
baidu_task_batchid_sql = "SELECT batch_id, async_task_id,create_status FROM `async_create_ad_batch` where " \
"async_task_id = 14027844245 "
# 执行查询
results = Async_create_ad_batch_db.query_database(baidu_task_batchid_sql)
print(results)

View File

@ -0,0 +1,275 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/6/16 14:12
@Auth : 九月的海
@File : suCaiDownload.py
@IDE : PyCharm
@Motto : September_sea
"""
import os
import time
import requests
import traceback
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor
from db_base import Material1_db # 导入数据库模块
class MaterialDownloader:
def __init__(self, base_url="https://tos.mobgi.com"):
self.base_url = base_url
self.download_dir = "D:\\创量\\A素材\\数据库素材批量下载"
# 确保下载目录存在
self.ensure_directory_exists(self.download_dir)
def set_download_dir(self, directory):
"""设置下载目录并确保它存在"""
self.download_dir = directory
return self.ensure_directory_exists(directory)
def ensure_directory_exists(self, directory):
"""确保目录存在,如果不存在则创建"""
if not os.path.exists(directory):
try:
os.makedirs(directory)
print(f"创建下载目录: {directory}")
return True
except Exception as e:
print(f"无法创建目录 {directory}: {str(e)}")
return False
return True
def sanitize_filename(self, filename):
"""清理文件名中的无效字符"""
invalid_chars = ['<', '>', ':', '"', '/', '\\', '|', '?', '*']
for char in invalid_chars:
filename = filename.replace(char, '_')
return filename.strip()
def download_file(self, full_url, save_path):
"""下载单个文件到本地"""
try:
start_time = time.time()
# 确保文件所在目录存在
save_dir = os.path.dirname(save_path)
if not self.ensure_directory_exists(save_dir):
return False, f"无法创建文件目录: {save_dir}"
response = requests.get(full_url, stream=True, timeout=30) # 增加超时时间到30秒
response.raise_for_status()
file_size = int(response.headers.get('Content-Length', 0))
downloaded = 0
last_percent = 0
# 使用临时文件名下载,完成后重命名为目标文件名
temp_path = f"{save_path}.download"
with open(temp_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# 显示下载进度(当文件大小已知时)
if file_size > 0:
percent = int(downloaded * 100 / file_size)
# 每10%的进度更新一次显示
if percent - last_percent >= 10 or percent == 100:
print(
f"已下载: {percent}% ({downloaded / 1024 / 1024:.2f}MB/{file_size / 1024 / 1024:.2f}MB)")
last_percent = percent
# 下载完成后重命名
os.rename(temp_path, save_path)
elapsed = time.time() - start_time
size_mb = os.path.getsize(save_path) / (1024 * 1024)
speed = size_mb / elapsed if elapsed > 0 else 0
return True, f"下载成功! 大小: {size_mb:.2f}MB, 耗时: {elapsed:.1f}秒, 速度: {speed:.1f}MB/s"
except Exception as e:
# 清理临时文件
if 'temp_path' in locals() and os.path.exists(temp_path):
os.remove(temp_path)
return False, f"下载失败: {str(e)}"
def process_material(self, material):
"""处理单个素材记录"""
material_id, material_name, file_uri = material
# 生成完整URL
full_url = urljoin(self.base_url, file_uri)
# 清理文件名
safe_name = self.sanitize_filename(material_name)
# 提取文件扩展名
extension = os.path.splitext(file_uri)[1] or '.bin' # 如果没有扩展名默认.bin
# 生成保存路径 (文件名格式: 素材ID_清理后的素材名.扩展名)
# 确保文件名中不包含路径分隔符
filename = f"{material_id}_{safe_name}{extension}"
save_path = os.path.join(self.download_dir, filename)
# 检查文件是否已存在
if os.path.exists(save_path):
return {
'material_id': material_id,
'material_name': material_name,
'url': full_url,
'save_path': save_path,
'success': True,
'message': f"文件已存在 (大小: {os.path.getsize(save_path) / (1024 * 1024):.2f}MB)"
}
# 下载文件
success, message = self.download_file(full_url, save_path)
return {
'material_id': material_id,
'material_name': material_name,
'url': full_url,
'save_path': save_path,
'success': success,
'message': message
}
def batch_download(self, sql_query=None, max_workers=5):
"""
批量下载素材
:param sql_query: 可选的自定义SQL查询
:param max_workers: 并发下载线程数
:return: 下载结果列表
"""
# 默认查询语句
if not sql_query:
sql_query = """
SELECT material_id, material_name, file_uri
FROM `material`
USE INDEX (idx_cTime_mainUserId)
WHERE main_user_id = '12400078500'
AND create_time BETWEEN '2025-04-29 17:53:37' AND '2025-04-30 17:53:37'
AND create_time >= '2025-04-27'
AND material_type = 'video'
AND file_uri LIKE 'tos_beijing/%'
LIMIT 50
"""
# 使用正确的数据库查询方式
try:
print(f"正在执行SQL查询...")
# 通过模块方法查询数据库
materials = Material1_db.query_database(sql_query)
if not materials:
print("未查询到符合条件的素材")
return []
print(f"\n查询到 {len(materials)} 个素材, 开始下载...\n")
# 保存开始时间
start_time = time.time()
# 使用线程池并发下载
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(self.process_material, mat) for mat in materials]
# 使用tqdm显示进度
try:
from tqdm import tqdm
for future in tqdm(futures, desc="下载进度", unit="文件"):
result = future.result()
results.append(result)
# 立即显示结果
self.print_result(result)
except ImportError:
# 如果没有安装tqdm使用普通循环
for future in futures:
result = future.result()
results.append(result)
# 立即显示结果
self.print_result(result)
# 计算总耗时
total_time = time.time() - start_time
# 打印统计信息
success_count = sum(1 for r in results if r['success'])
print(f"\n{'=' * 60}")
print(f"下载完成! 成功: {success_count}/{len(results)}")
print(f"总耗时: {total_time:.2f}")
print(f"下载目录: {self.download_dir}")
print(f"{'=' * 60}")
# 列出所有成功下载的文件
if success_count > 0:
print("\n成功下载的文件:")
for result in results:
if result['success']:
print(f"- {os.path.basename(result['save_path'])}")
# 如果有失败的下载,创建失败列表
failed_downloads = [r for r in results if not r['success']]
if failed_downloads:
print("\n下载失败的文件:")
for result in failed_downloads:
print(f"- {result['material_name']} (ID: {result['material_id']}) - 原因: {result['message']}")
return results
except Exception as e:
# 避免使用特殊Unicode字符
print(f"\n[错误] 数据库查询失败: {str(e)}")
traceback.print_exc() # 打印详细的错误堆栈
return []
def print_result(self, result):
"""打印单个下载结果"""
material_info = f"{result['material_id']} - {result['material_name']}"
save_path = result['save_path']
message = result['message']
# 下载成功的特殊格式
if result['success']:
print(f"\n[成功] {material_info} - {message}")
else:
print(f"\n[失败] {material_info} - {message}")
# 输出保存路径
print(f" 保存位置: {save_path}")
print(f" 下载地址: {result['url']}")
if __name__ == "__main__":
downloader = MaterialDownloader()
# 自定义下载目录
custom_dir = "D:\\创量\\A素材\\1500个素材10~100s批量小牛24年12月份素材"
# 确保自定义目录存在
if not downloader.set_download_dir(custom_dir):
print(f"无法创建下载目录: {custom_dir}")
exit(1)
print("+" * 60)
print("开始素材批量下载程序")
print(f"下载目录: {downloader.download_dir}")
print("+" * 60)
print()
# 使用自定义查询下载
custom_sql = """
SELECT material_id, material_name, file_uri
FROM `material`
USE INDEX (idx_cTime_mainUserId)
WHERE main_user_id = '9687'
AND create_time BETWEEN '2024-12-01 17:53:37' AND '2024-12-30 17:53:37'
AND video_duration BETWEEN 10 AND 100
AND material_type = 'video'
AND file_uri LIKE 'tos_beijing/%'
LIMIT 1500;
"""
# 设置并行下载数量
max_workers = 5
print(f"使用 {max_workers} 个并发线程下载")
results = downloader.batch_download(sql_query=custom_sql, max_workers=max_workers)

View File

@ -0,0 +1,368 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/6/16 14:12
@Auth : 九月的海
@File : suCaiDownload.py
@IDE : PyCharm
@Motto : September_sea
"""
import os
import time
import requests
import traceback
import random
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor
from db_base import Material1_db # 导入数据库模块
class MaterialDownloader:
def __init__(self, base_url="https://tos.mobgi.com"):
"""
素材下载器初始化
:param base_url: 素材基础URL前缀
"""
self.base_url = base_url
self.download_dir = "D:\\创量\\A素材\\数据库素材批量下载" # 使用双反斜杠避免转义问题
self.base_save_directory = None
self.subfolder_count = 0
self.files_per_folder = 0
self.current_file_counts = {}
# 确保下载目录存在
self.ensure_directory_exists(self.download_dir)
def set_download_dir(self, directory):
"""设置下载目录并确保它存在"""
self.download_dir = directory
return self.ensure_directory_exists(directory)
def setup_subfolder_download(self, base_directory, subfolder_count=12, files_per_folder=40):
"""
设置分层文件夹下载结构
:param base_directory: 基础目录路径
:param subfolder_count: 子文件夹数量
:param files_per_folder: 每个子文件夹最大文件数量
:return: True表示设置成功False表示失败
"""
self.base_save_directory = base_directory
self.subfolder_count = subfolder_count
self.files_per_folder = files_per_folder
# 创建基础目录
if not self.ensure_directory_exists(self.base_save_directory):
return False
# 创建子文件夹1到subfolder_count
for i in range(1, self.subfolder_count + 1):
folder_path = os.path.join(self.base_save_directory, str(i))
if not self.ensure_directory_exists(folder_path):
return False
# 初始化文件夹文件计数器
self.current_file_counts = {str(i): 0 for i in range(1, self.subfolder_count + 1)}
print(f"已创建子文件夹: 1-{subfolder_count},每个文件夹最多保存 {files_per_folder} 个文件")
return True
def get_next_subfolder(self):
"""获取下一个有空间的子文件夹"""
if not self.current_file_counts:
return "1"
# 找出文件数量最少的文件夹
min_count = min(self.current_file_counts.values())
min_folders = [folder for folder, count in self.current_file_counts.items() if count == min_count]
# 随机选择一个最小文件的文件夹(确保分布均匀)
folder = random.choice(min_folders)
# 增加该文件夹的计数
self.current_file_counts[folder] += 1
return folder
def ensure_directory_exists(self, directory):
"""确保目录存在,如果不存在则创建"""
if not os.path.exists(directory):
try:
os.makedirs(directory)
print(f"创建下载目录: {directory}")
return True
except Exception as e:
print(f"无法创建目录 {directory}: {str(e)}")
return False
return True
def sanitize_filename(self, filename):
"""清理文件名中的无效字符"""
invalid_chars = ['<', '>', ':', '"', '/', '\\', '|', '?', '*']
for char in invalid_chars:
filename = filename.replace(char, '_')
return filename.strip()
def download_file(self, full_url, save_path):
"""下载单个文件到本地"""
try:
start_time = time.time()
# 确保文件所在目录存在
save_dir = os.path.dirname(save_path)
if not self.ensure_directory_exists(save_dir):
return False, f"无法创建文件目录: {save_dir}"
response = requests.get(full_url, stream=True, timeout=30) # 增加超时时间到30秒
response.raise_for_status()
file_size = int(response.headers.get('Content-Length', 0))
downloaded = 0
last_percent = 0
# 使用临时文件名下载,完成后重命名为目标文件名
temp_path = f"{save_path}.download"
with open(temp_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# 显示下载进度(当文件大小已知时)
if file_size > 0:
percent = int(downloaded * 100 / file_size)
# 每10%的进度更新一次显示
if percent - last_percent >= 10 or percent == 100:
print(
f"已下载: {percent}% ({downloaded / 1024 / 1024:.2f}MB/{file_size / 1024 / 1024:.2f}MB)")
last_percent = percent
# 下载完成后重命名
os.rename(temp_path, save_path)
elapsed = time.time() - start_time
size_mb = os.path.getsize(save_path) / (1024 * 1024)
speed = size_mb / elapsed if elapsed > 0 else 0
return True, f"下载成功! 大小: {size_mb:.2f}MB, 耗时: {elapsed:.1f}秒, 速度: {speed:.1f}MB/s"
except Exception as e:
# 清理临时文件
if 'temp_path' in locals() and os.path.exists(temp_path):
os.remove(temp_path)
return False, f"下载失败: {str(e)}"
def process_material(self, material):
"""处理单个素材记录"""
# 确保我们有正确的三个字段
if len(material) < 3:
return {
'material_id': 'unknown',
'material_name': 'invalid_data',
'url': 'invalid_data',
'save_path': 'invalid_data',
'success': False,
'message': f"数据库记录字段不足预期3个实际得到{len(material)}个: {material}"
}
material_id = material[0]
material_name = material[1] if len(material) > 1 else 'unknown'
file_uri = material[2] if len(material) > 2 else 'unknown'
# 生成完整URL
full_url = urljoin(self.base_url, file_uri) if file_uri != 'unknown' else 'unknown'
# 清理文件名
safe_name = self.sanitize_filename(material_name)
# 提取文件扩展名
extension = os.path.splitext(file_uri)[1] if file_uri != 'unknown' else '.bin'
# 生成保存路径 (文件名格式: 素材ID_清理后的素材名.扩展名)
# 确保文件名中不包含路径分隔符
filename = f"{material_id}_{safe_name}{extension}"
# 确定保存位置 - 如果有设置子文件夹结构,则使用;否则使用统一目录
if self.base_save_directory and self.subfolder_count > 0:
# 获取下一个有空间的子文件夹
folder_name = self.get_next_subfolder()
# 生成路径:基础路径/文件夹名/文件名
save_path = os.path.join(self.base_save_directory, folder_name, filename)
else:
save_path = os.path.join(self.download_dir, filename)
# 检查文件是否已存在
if os.path.exists(save_path):
return {
'material_id': material_id,
'material_name': material_name,
'url': full_url,
'save_path': save_path,
'success': True,
'message': f"文件已存在 (大小: {os.path.getsize(save_path) / (1024 * 1024):.2f}MB)"
}
# 如果URL无效直接返回失败
if full_url == 'unknown':
return {
'material_id': material_id,
'material_name': material_name,
'url': full_url,
'save_path': save_path,
'success': False,
'message': "无效的文件URI"
}
# 下载文件
success, message = self.download_file(full_url, save_path)
return {
'material_id': material_id,
'material_name': material_name,
'url': full_url,
'save_path': save_path,
'success': success,
'message': message
}
def batch_download(self, sql_query=None, max_workers=5):
"""
批量下载素材
:param sql_query: 可选的自定义SQL查询
:param max_workers: 并发下载线程数
:return: 下载结果列表
"""
# 默认查询语句
if not sql_query:
sql_query = """
SELECT material_id, material_name, file_uri
FROM `material`
USE INDEX (idx_cTime_mainUserId)
WHERE main_user_id = '12400078500'
AND create_time BETWEEN '2025-04-29 17:53:37' AND '2025-04-30 17:53:37'
AND create_time >= '2025-04-27'
AND material_type = 'video'
AND file_uri LIKE 'tos_beijing/%'
LIMIT 50
"""
# 使用正确的数据库查询方式
try:
print(f"正在执行SQL查询: {sql_query}")
# 通过模块方法查询数据库
materials = Material1_db.query_database(sql_query)
if not materials:
print("未查询到符合条件的素材")
return []
# 打印查询结果中的第一条记录
if materials:
print(f"查询到的第一条记录: {materials[0]}")
print(f"\n查询到 {len(materials)} 个素材, 开始下载...\n")
# 保存开始时间
start_time = time.time()
# 使用线程池并发下载
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(self.process_material, mat) for mat in materials]
# 使用tqdm显示进度可选
try:
from tqdm import tqdm
for future in tqdm(futures, desc="下载进度", unit="文件"):
result = future.result()
results.append(result)
# 立即显示结果
self.print_result(result)
except ImportError:
# 如果没有安装tqdm使用普通循环
for future in futures:
result = future.result()
results.append(result)
# 立即显示结果
self.print_result(result)
# 计算总耗时
total_time = time.time() - start_time
# 打印统计信息
success_count = sum(1 for r in results if r['success'])
print(f"\n{'=' * 60}")
print(f"下载完成! 成功: {success_count}/{len(results)}")
print(f"总耗时: {total_time:.2f}")
if self.base_save_directory:
print(f"下载目录: {self.base_save_directory} (分为{self.subfolder_count}个子文件夹)")
else:
print(f"下载目录: {self.download_dir}")
print(f"{'=' * 60}")
# 列出所有成功下载的文件
if success_count > 0:
print("\n成功下载的文件:")
for result in results:
if result['success']:
print(f"- {os.path.basename(result['save_path'])}")
# 如果有失败的下载,创建失败列表
failed_downloads = [r for r in results if not r['success']]
if failed_downloads:
print("\n下载失败的文件:")
for result in failed_downloads:
print(f"- {result['material_name']} (ID: {result['material_id']}) - 原因: {result['message']}")
return results
except Exception as e:
# 避免使用特殊Unicode字符
print(f"\n[错误] 批量下载失败: {str(e)}")
traceback.print_exc() # 打印详细的错误堆栈
return []
def print_result(self, result):
"""打印单个下载结果"""
material_info = f"{result['material_id']} - {result['material_name']}"
save_path = result['save_path']
message = result['message']
# 下载成功的特殊格式
if result['success']:
print(f"\n[成功] {material_info} - {message}")
else:
print(f"\n[失败] {material_info} - {message}")
# 输出保存路径
print(f" 保存位置: {save_path}")
if 'url' in result and result['url'] != 'unknown':
print(f" 下载地址: {result['url']}")
# 使用示例
if __name__ == "__main__":
downloader = MaterialDownloader()
# 设置分层文件夹下载结构
# 在该目录下创建30个子文件夹1-30每个文件夹最多存放2个文件
if not downloader.setup_subfolder_download(
base_directory="D:\\创量\\A素材\\80个文件夹素材",
subfolder_count=80,
files_per_folder=1
):
print("无法创建下载目录结构")
exit(1)
print("+" * 60)
print("开始素材批量下载程序")
print(f"下载基础目录: {downloader.base_save_directory}")
print(f"子文件夹: 1-{downloader.subfolder_count}")
print(f"每个文件夹最多存放: {downloader.files_per_folder}个文件")
print("+" * 60)
print()
# 使用自定义查询下载
custom_sql = """
SELECT material_id, material_name,file_uri
FROM `material`
USE INDEX (idx_cTime_mainUserId)
WHERE main_user_id = '12000016859'
AND create_time BETWEEN '2025-07-14 17:53:37' AND '2025-07-18 17:53:37' -- 限定时间范围
AND video_duration BETWEEN 10 AND 30
AND material_type = 'video'
AND file_uri LIKE 'tos_beijing/%'
AND material_name NOT LIKE '%-衍生'
LIMIT 80;
"""
# 设置并行下载数量
max_workers = 5
print(f"使用 {max_workers} 个并发线程下载")
results = downloader.batch_download(sql_query=custom_sql, max_workers=max_workers)

View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

6
chuanglinagBaidu/apps.py Normal file
View File

@ -0,0 +1,6 @@
from django.apps import AppConfig
class ChuanglinagbaiduConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'chuanglinagBaidu'

View File

@ -0,0 +1,167 @@
from chuangliangTool.db_base import Bd_task_db, Async_create_ad_batch_db
from typing import List, Dict, Any, Optional
import json
import html
from django.db import migrations, models
class Migration(migrations.Migration): # 确保有 Migration 类
dependencies = [
# 声明依赖的迁移
]
operations = [
# 迁移操作列表
]
class AsyncTaskDataQuery:
def __init__(self, async_task_id: int):
"""
初始化异步任务数据查询
:param async_task_id: 异步任务ID (整数)
"""
self.async_task_id = async_task_id
self.batch_id = None
self.create_status = None
self.task_details = [] # 存储多个任务详情记录
def _query_first_database(self) -> bool:
"""
查询第一个数据库获取batch_id和create_status
:return: 查询是否成功
"""
sql_query = f"""
SELECT async_task_id, batch_id, create_status
FROM chuangliang_ad_task.async_create_ad_batch
WHERE async_task_id = {self.async_task_id}
"""
try:
results = Async_create_ad_batch_db.query_database(sql_query)
if not results:
print(f"警告: 未找到异步任务ID {self.async_task_id} 的记录")
return False
first_result = results[0]
if len(first_result) >= 3:
self.batch_id = first_result[1]
self.create_status = first_result[2]
return True
else:
print(f"错误: 查询结果字段不足, 预期3个字段, 实际{len(first_result)}个字段")
return False
except Exception as e:
print(f"查询第一个数据库时出错: {str(e)}")
return False
def _query_second_database(self) -> List[Dict[str, Any]]:
"""
使用batch_id查询第二个数据库获取任务详情
:return: 包含所有任务详情的字典列表
"""
if not self.batch_id:
print("错误: batch_id未初始化")
return []
sql_query = f"""
SELECT task_param, media_account_id, `status`, result_data
FROM chuangliang_ad_task_baidu.`task_log_baidu_batch_add`
WHERE batch_id = '{self.batch_id}'
"""
try:
results = Bd_task_db.query_database(sql_query)
if not results:
print(f"警告: 未找到batch_id {self.batch_id} 的记录")
return []
# 处理多个结果
task_logs = []
for i, row in enumerate(results, 1):
if len(row) < 4:
print(f"警告: 结果字段不足, 预期4个字段, 实际{len(row)}个字段 (记录#{i})")
continue
task_log = {
"log_index": i,
"task_param": row[0],
"media_account_id": row[1],
"operation_status": row[2],
"result_data": row[3]
}
task_logs.append(task_log)
print(f"找到 {len(task_logs)} 条任务日志记录")
return task_logs
except Exception as e:
print(f"查询第二个数据库时出错: {str(e)}")
return []
def get_task_details(self) -> Dict[str, Any]:
"""
获取完整的任务详情数据
:return: 包含所有所需字段的字典
"""
# 获取第一个数据库的结果
if not self._query_first_database():
return {
"async_task_id": self.async_task_id,
"error": "未找到批次信息"
}
# 获取第二个数据库的结果
task_logs = self._query_second_database()
# 组织返回结果
return {
"async_task_id": self.async_task_id,
"batch_id": self.batch_id,
"create_status": self.create_status,
"task_logs": task_logs,
"total_logs": len(task_logs)
}
@staticmethod
def format_output(data: Dict[str, Any]) -> str:
"""
格式化输出结果为带标识的可读字符串
"""
if not data or "error" in data:
error_msg = data.get("error", "未知错误") if data else "没有可用的数据"
return f"未能获取任务详情: {error_msg}"
# 标题和基本信息
output = [
f"\n========== 任务详情 [ID: {data['async_task_id']}] ==========",
f"• 批次ID: {data['batch_id']}",
f"• 创建状态: {data['create_status']}",
f"• 日志记录总数: {data['total_logs']}"
]
# 处理每个日志记录
for log in data.get("task_logs", []):
output.append(f"\n==== 任务日志 #{log['log_index']} ====")
output.append(f"• 媒体账户ID: {log.get('media_account_id', 'N/A')}")
output.append(f"• 操作状态: {log.get('operation_status', 'N/A')}")
output.append(f"\n[任务参数]\n{log.get('task_param', '无数据')}")
output.append(f"\n[结果数据]\n{log.get('result_data', '无数据')}")
output.append("-" * 40)
output.append("==========================================")
return "\n".join(output)
# 使用示例
if __name__ == "__main__":
# 创建查询实例 (使用实际任务ID)
task_query = AsyncTaskDataQuery(14038291158)
# 获取任务详情
task_details = task_query.get_task_details()
# 格式化输出结果
print(AsyncTaskDataQuery.format_output(task_details))

View File

View File

@ -0,0 +1,3 @@
from django.db import models
# Create your models here.

View File

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

15
chuanglinagBaidu/urls.py Normal file
View File

@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/7/9 17:24
@Auth : 九月的海
@File : urls.py
@IDE : PyCharm
@Motto : Catch as catch can....
"""
from django.urls import path
from . import views
urlpatterns = [
# path('datetime/', views.get_datetime, name='get_datetime'),
path('api/task_details/', views.get_task_details, name='task_details')
]

337
chuanglinagBaidu/views.py Normal file
View File

@ -0,0 +1,337 @@
from django.http import JsonResponse
from django.views.decorators.http import require_GET
from django.utils import timezone # 使用 Django 的时区支持
import json
# 确保导入你的 AsyncTaskDataQuery 类
from chuanglinagBaidu.migrations.BaidutaskQuery import AsyncTaskDataQuery # 替换 your_module 为实际模块名
#
#
# # @require_GET
# def get_task_details(request):
# """获取任务详情的API接口"""
# # 获取任务ID参数
# task_id = request.GET.get('task_id')
# current_time = timezone.now().strftime("%Y-%m-%d %H:%M:%S")
#
# if not task_id:
# return JsonResponse({
# "code": 400,
# "message": "缺少参数: task_id",
# "time": current_time
# }, status=400)
#
# try:
# # 转换为整数
# task_id = int(task_id)
# except ValueError:
# return JsonResponse({
# "code": 400,
# "message": "task_id 必须是整数",
# "time": current_time
# }, status=400)
#
# # 创建查询实例并获取数据
# task_query = AsyncTaskDataQuery(task_id)
# result = task_query.get_task_details()
# # 获取任务日志并处理JSON嵌套
# task_logs = result.get("task_logs", [])
#
# # 处理可能的嵌套JSON字符串
# for log in task_logs:
# # 处理 task_param
# if isinstance(log.get('task_param'), str):
# try:
# log['task_param'] = json.loads(log['task_param'])
# except json.JSONDecodeError:
# # 可选的日志记录
# # logger.debug(f"无法解析 task_param: {log['task_param']}")
# pass
#
# # 处理 result_data
# if isinstance(log.get('result_data'), str):
# try:
# log['result_data'] = json.loads(log['result_data'])
# except json.JSONDecodeError:
# pass
#
# # 准备响应数据
# response_data = {
# "code": 200,
# "message": "成功",
# "time": timezone.now().strftime("%Y-%m-%d %H:%M:%S"),
# "task_id": task_id,
# "task_logs": task_logs,
# "batch_id": result.get("batch_id"),
# "create_status": result.get("create_status"),
# "total_logs": len(task_logs)
# }
#
# # 返回时禁用ASCII转义
# return JsonResponse(
# response_data,
# json_dumps_params={'ensure_ascii': False}
# )
# from django.http import JsonResponse, HttpResponse
# from django.views.decorators.http import require_GET
# from django.utils import timezone
# from django.shortcuts import render
# import json
# import re
# # 确保导入你的 AsyncTaskDataQuery 类
# from chuanglinagBaidu.migrations.BaidutaskQuery import AsyncTaskDataQuery # 替换为实际模块名
#
#
# def render_json_pretty(data, status=200):
# """渲染美观的JSON HTML视图"""
# # 准备响应数据 - 格式化JSON用于展示
# formatted_json = json.dumps(data, indent=4, ensure_ascii=False)
#
# # 生成HTML内容
# html_content = f"""
#
# """
# return HttpResponse(html_content, status=status, content_type='text/html')
#
#
# @require_GET
# def get_task_details(request):
# """获取任务详情的API接口"""
# # 获取任务ID参数
# task_id = request.GET.get('task_id')
# current_time = timezone.now().strftime("%Y-%m-%d %H:%M:%S")
# host = request.get_host()
#
# # 参数验证
# if not task_id:
# error_data = {
# "code": 400,
# "message": "缺少参数: task_id",
# "time": current_time,
# "path": request.path,
# "method": request.method,
# "host": host
# }
#
# # 检查是否为浏览器请求
# if 'text/html' in request.META.get('HTTP_ACCEPT', ''):
# return render_json_pretty(error_data, 400)
#
# return JsonResponse(
# error_data,
# status=400,
# json_dumps_params={'ensure_ascii': False}
# )
#
# try:
# # 转换为整数
# task_id = int(task_id)
# except ValueError:
# error_data = {
# "code": 400,
# "message": "task_id 必须是整数",
# "time": current_time,
# "provided_value": task_id,
# "path": request.path,
# "host": host
# }
#
# if 'text/html' in request.META.get('HTTP_ACCEPT', ''):
# return render_json_pretty(error_data, 400)
#
# return JsonResponse(
# error_data,
# status=400,
# json_dumps_params={'ensure_ascii': False}
# )
#
# # 创建查询实例并获取数据
# try:
# task_query = AsyncTaskDataQuery(task_id)
# result = task_query.get_task_details()
#
# # 处理任务日志并解析嵌套JSON
# task_logs = result.get("task_logs", [])
# for log in task_logs:
# # 处理 task_param
# if isinstance(log.get('task_param'), str):
# try:
# log['task_param'] = json.loads(log['task_param'])
# except json.JSONDecodeError:
# # 保留原始字符串并添加解析错误标记
# log['task_param'] = {
# "__parse_error": True,
# "raw_value": log['task_param']
# }
#
# # 处理 result_data
# if isinstance(log.get('result_data'), str):
# try:
# log['result_data'] = json.loads(log['result_data'])
# except json.JSONDecodeError:
# log['result_data'] = {
# "__parse_error": True,
# "raw_value": log['result_data']
# }
#
# # 准备响应数据
# response_data = {
# "code": 200,
# "message": "成功",
# "time": current_time,
# "task_id": task_id,
# "task_logs": task_logs,
# "batch_id": result.get("batch_id"),
# "create_status": result.get("create_status"),
# "total_logs": len(task_logs),
# "api_details": {
# "method": request.method,
# "path": request.path,
# "host": host
# }
# }
#
# # 检查是否为浏览器请求
# if 'text/html' in request.META.get('HTTP_ACCEPT', ''):
# return render_json_pretty(response_data)
#
# # 为JSON响应添加元数据
# response_data['api_details']['rendered_format'] = 'json'
# return JsonResponse(
# response_data,
# json_dumps_params={
# 'ensure_ascii': False,
# 'indent': 2 if 'pretty' in request.GET else None
# }
# )
#
# except Exception as e:
# # 记录异常信息
# error_data = {
# "code": 500,
# "message": "服务器内部错误",
# "error_detail": str(e),
# "time": current_time,
# "task_id": task_id,
# "host": host,
# "path": request.path
# }
#
# if 'text/html' in request.META.get('HTTP_ACCEPT', ''):
# return render_json_pretty(error_data, 500)
#
# return JsonResponse(
# error_data,
# status=500,
# json_dumps_params={'ensure_ascii': False}
# )
from django.http import JsonResponse
from django.views.decorators.http import require_GET
from django.utils import timezone
from django.shortcuts import render
import json
def get_task_details(request):
"""获取任务详情的API接口优化版"""
# 获取任务ID参数
task_id = request.GET.get('task_id')
current_time = timezone.now().strftime("%Y-%m-%d %H:%M:%S")
# 处理浏览器请求的Accept头
is_browser_request = 'text/html' in request.META.get('HTTP_ACCEPT', '')
# 参数验证
if not task_id:
error_data = {
"code": 400,
"message": "缺少参数: task_id",
"time": current_time,
"endpoint": request.path,
"method": request.method
}
if is_browser_request:
return render(request, 'json_formatter.html', {
"json_data": json.dumps(error_data, ensure_ascii=False),
"status": 400
})
return JsonResponse(error_data, status=400, json_dumps_params={'ensure_ascii': False})
try:
# 转换为整数
task_id = int(task_id)
except ValueError:
error_data = {
"code": 400,
"message": "task_id 必须是整数",
"time": current_time,
"provided_value": task_id
}
if is_browser_request:
return render(request, 'json_formatter.html', {
"json_data": json.dumps(error_data, ensure_ascii=False),
"status": 400
})
return JsonResponse(error_data, status=400, json_dumps_params={'ensure_ascii': False})
# 创建查询实例并获取数据
try:
task_query = AsyncTaskDataQuery(task_id)
result = task_query.get_task_details()
# 处理任务日志并解码中文
task_logs = result.get("task_logs", [])
for log in task_logs:
# 处理 task_param
if isinstance(log.get('task_param'), str):
try:
log['task_param'] = json.loads(log['task_param'])
except json.JSONDecodeError:
pass
# 处理 result_data
if isinstance(log.get('result_data'), str):
try:
log['result_data'] = json.loads(log['result_data'])
except json.JSONDecodeError:
pass
# 准备响应数据
response_data = {
"code": 200,
"message": "成功",
"time": current_time,
"task_id": task_id,
"task_logs": task_logs,
"batch_id": result.get("batch_id", ""),
"create_status": result.get("create_status", ""),
"total_logs": len(task_logs)
}
# 如果是浏览器请求返回HTML视图
if is_browser_request:
return render(request, 'json_formatter3.html', {
"json_data": response_data,
# "json_data": json.dumps(response_data, ensure_ascii=False, indent=2),
"status": 200,
"task_id": task_id
})
# 返回JSON响应
return JsonResponse(
response_data,
json_dumps_params={'ensure_ascii': False}
)
except Exception as e:
error_data = {
"code": 500,
"message": "服务器内部错误",
"error": str(e),
"time": current_time,
"task_id": task_id
}
print(e)
if is_browser_request:
return render(request, 'json_formatter.html', {
"json_data": json.dumps(error_data, ensure_ascii=False),
"status": 500
})
return JsonResponse(error_data, status=500, json_dumps_params={'ensure_ascii': False})

0
db.sqlite3 Normal file
View File

22
manage.py Normal file
View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""
import os
import sys
def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'chuangliangProject.settings')
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,197 @@
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>JSON数据查看器</title>
<style>
:root {
--bg-color: #f8f9fa;
--panel-bg: #ffffff;
--border-color: #dee2e6;
--text-color: #212529;
--key-color: #d73a49;
--string-color: #032f62;
--number-color: #e36209;
--boolean-color: #0d6efd;
--null-color: #6f42c1;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif;
background-color: var(--bg-color);
color: var(--text-color);
margin: 0;
padding: 20px;
line-height: 1.5;
}
.container {
max-width: 98%;
margin: 0 auto;
}
.json-display {
background-color: var(--panel-bg);
border-radius: 6px;
padding: 15px;
overflow-x: auto;
box-shadow: 0 0.125rem 0.25rem rgba(0, 0, 0, 0.075);
border: 1px solid var(--border-color);
font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, monospace;
font-size: 14px;
line-height: 1.4;
white-space: pre-wrap;
margin-top: 10px;
word-wrap: break-word;
overflow-wrap: break-word;
}
.json-line {
display: block;
margin: 3px 0;
}
.json-key {
color: var(--key-color);
font-weight: bold;
}
.json-string { color: var(--string-color); }
.json-number { color: var(--number-color); }
.json-boolean { color: var(--boolean-color); }
.json-null { color: var(--null-color); }
.json-indent {
padding-left: 20px;
}
.header {
text-align: center;
margin-bottom: 15px;
padding-bottom: 10px;
border-bottom: 1px solid var(--border-color);
}
.json-brace {
color: #999;
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>JSON数据查看器</h1>
</div>
<div class="json-display" id="jsonContainer"></div>
</div>
{{ json_data|json_script:"external-json-data" }}
<script>
document.addEventListener('DOMContentLoaded', function() {
// 示例JSON数据
const jsonData = JSON.parse(document.getElementById('external-json-data').textContent);
// 渲染JSON数据
renderJson(jsonData);
function renderJson(data) {
const container = document.getElementById('jsonContainer');
container.innerHTML = '';
const formattedJson = formatJson(data, 0);
container.appendChild(formattedJson);
}
function formatJson(data, depth) {
const container = document.createElement('pre');
const type = typeof data;
const isArray = Array.isArray(data);
if (data === null) {
const line = document.createElement('span');
line.textContent = 'null';
line.classList.add('json-null');
container.appendChild(line);
return container;
}
if (type === 'string') {
const line = document.createElement('span');
line.textContent = `"${data}"`;
line.classList.add('json-string');
container.appendChild(line);
return container;
}
if (type === 'number') {
const line = document.createElement('span');
line.textContent = data;
line.classList.add('json-number');
container.appendChild(line);
return container;
}
if (type === 'boolean') {
const line = document.createElement('span');
line.textContent = data ? 'true' : 'false';
line.classList.add('json-boolean');
container.appendChild(line);
return container;
}
if (type === 'object') {
// 开括号
const openBrace = document.createElement('span');
openBrace.textContent = isArray ? '[' : '{';
openBrace.classList.add('json-brace');
container.appendChild(openBrace);
// 递归处理对象/数组的键值对
const entries = isArray ? data : Object.entries(data);
let count = 0;
const total = entries.length;
for (let key in entries) {
if (!isArray) {
key = entries[key][0];
}
const value = isArray ? data[key] : data[key];
const line = document.createElement('div');
line.classList.add('json-line');
line.classList.add('json-indent');
const keyElement = document.createElement('span');
if (!isArray) {
keyElement.textContent = `"${key}": `;
keyElement.classList.add('json-key');
} else {
// 数组索引
const indexSpan = document.createElement('span');
indexSpan.textContent = `${key}: `;
keyElement.appendChild(indexSpan);
}
// 值处理
const valueContainer = formatJson(value, depth + 1);
valueContainer.style.display = 'inline';
line.appendChild(keyElement);
line.appendChild(valueContainer);
// 添加逗号(最后一个元素不加)
if (++count < total) {
line.appendChild(document.createTextNode(','));
}
container.appendChild(line);
}
// 闭括号
const closeBrace = document.createElement('span');
closeBrace.textContent = isArray ? ']' : '}';
closeBrace.classList.add('json-brace');
container.appendChild(closeBrace);
}
return container;
}
});
</script>
</body>
</html>

View File

@ -0,0 +1,376 @@
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>JSON数据查看器 | 优化修复版</title>
<style>
/* 保持不变 */
:root {
--bg-color: #f8f9fa;
--panel-bg: #ffffff;
--border-color: #dee2e6;
--text-color: #212529;
--key-color: #d73a49;
--string-color: #032f62;
--number-color: #e36209;
--boolean-color: #0d6efd;
--null-color: #6f42c1;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif;
background-color: var(--bg-color);
color: var(--text-color);
margin: 0;
padding: 20px;
line-height: 1.5;
}
.container {
max-width: 98%;
margin: 0 auto;
}
.json-display {
background-color: var(--panel-bg);
border-radius: 6px;
padding: 15px;
overflow-x: auto;
box-shadow: 0 0.125rem 0.25rem rgba(0, 0, 0, 0.075);
border: 1px solid var(--border-color);
font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, monospace;
font-size: 14px;
line-height: 1.4;
margin-top: 10px;
word-wrap: break-word;
overflow-wrap: break-word;
}
.json-line {
display: block;
margin: 3px 0;
}
.json-key {
color: var(--key-color);
font-weight: bold;
}
.json-string { color: var(--string-color); }
.json-number { color: var(--number-color); }
.json-boolean { color: var(--boolean-color); }
.json-null { color: var(--null-color); }
.json-indent {
padding-left: 20px;
}
.header {
text-align: center;
margin-bottom: 15px;
padding-bottom: 10px;
border-bottom: 1px solid var(--border-color);
}
.json-brace {
color: #999;
/* 保持闭合括号在同一行 */
display: inline;
}
/* 修复: 确保闭合括号与内容在同一行 */
.json-close-container {
padding-left: 0;
display: inline;
}
/* 长字符串样式保持不变 */
.long-string-indicator {
background-color: #fff8e1;
padding: 2px 6px;
border-radius: 3px;
font-size: 11px;
color: #e36209;
display: inline-block;
margin-left: 5px;
}
.long-string-toggle {
color: #032f62;
font-size: 11px;
margin-left: 8px;
cursor: pointer;
user-select: none;
font-weight: bold;
}
.long-string-container {
padding: 5px 10px;
background-color: #f6f8fa;
border-left: 2px solid #e36209;
margin: 5px 0;
border-radius: 0 4px 4px 0;
position: relative;
overflow: hidden;
display: inline-block;
vertical-align: top;
}
.long-string-content {
padding-top: 5px;
padding-left: 15px;
}
.long-string-container.collapsed .long-string-content {
display: none;
}
.long-string-container::after {
content: "";
position: absolute;
bottom: 0;
left: 0;
right: 0;
height: 20px;
background: linear-gradient(to bottom, rgba(246, 248, 250, 0.2), #f6f8fa);
opacity: 1;
transition: opacity 0.3s ease;
pointer-events: none;
}
.long-string-container.collapsed::after {
opacity: 1;
}
.long-string-container:not(.collapsed)::after {
opacity: 0;
}
.indicator-wrapper {
display: inline-flex;
align-items: center;
}
/* 修复: 保持内联元素在一行 */
.inline-value {
display: inline;
}
/* 修复: 优化逗号显示位置 */
.json-comma {
display: inline;
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>百度任务数据查看器 | 优化版</h1>
</div>
<div class="json-display" id="jsonContainer"></div>
</div>
{{ json_data|json_script:"external-json-data" }}
<script>
document.addEventListener('DOMContentLoaded', function() {
// 测试数据 - 包括各种数据类型和嵌套结构
const jsonData = JSON.parse(document.getElementById('external-json-data').textContent);
// 渲染JSON数据
renderJson(jsonData);
function renderJson(data) {
const container = document.getElementById('jsonContainer');
container.innerHTML = '';
const formattedJson = formatJson(data, 0);
container.appendChild(formattedJson);
}
function formatJson(data, depth) {
const container = document.createElement('div');
container.style.position = 'relative';
const type = typeof data;
const isArray = Array.isArray(data);
if (data === null) {
const line = document.createElement('span');
line.textContent = 'null';
line.classList.add('json-null');
container.appendChild(line);
return container;
}
if (type === 'string') {
// 特殊处理长字符串尝试自动解析为JSON
if (data.length > 80 && (data.includes('{') || data.includes('['))) {
return createLongStringElement(data, depth);
}
const line = document.createElement('span');
line.textContent = `"${data}"`;
line.classList.add('json-string');
container.appendChild(line);
return container;
}
if (type === 'number') {
const line = document.createElement('span');
line.textContent = data;
line.classList.add('json-number');
container.appendChild(line);
return container;
}
if (type === 'boolean') {
const line = document.createElement('span');
line.textContent = data ? 'true' : 'false';
line.classList.add('json-boolean');
container.appendChild(line);
return container;
}
if (type === 'object') {
// 优化:处理空数组和空对象
const isEmptyArray = isArray && data.length === 0;
const isEmptyObject = !isArray && Object.keys(data).length === 0;
if (isEmptyArray || isEmptyObject) {
const span = document.createElement('span');
span.textContent = isEmptyArray ? '[]' : '{}';
span.classList.add(isEmptyArray ? 'empty-array' : 'empty-object');
return span;
}
// 开括号
const openBrace = document.createElement('span');
openBrace.textContent = isArray ? '[' : '{';
openBrace.classList.add('json-brace');
container.appendChild(openBrace);
// 递归处理对象/数组的键值对
const entries = isArray ? data : Object.entries(data);
let count = 0;
const total = entries.length;
for (let i = 0; i < entries.length; i++) {
const key = isArray ? i : entries[i][0];
const value = isArray ? entries[i] : entries[i][1];
const line = document.createElement('div');
line.classList.add('json-line', 'json-indent');
if (!isArray) {
const keyElement = document.createElement('span');
keyElement.textContent = `"${key}": `;
keyElement.classList.add('json-key');
line.appendChild(keyElement);
}
// 值处理
const valueContainer = formatJson(value, depth + 1);
// 修复: 确保值容器内联显示
valueContainer.classList.add('inline-value');
line.appendChild(valueContainer);
// 添加逗号(最后一个元素不加)
if (i < total - 1) {
const comma = document.createElement('span');
comma.textContent = ',';
comma.classList.add('json-comma');
line.appendChild(comma);
}
container.appendChild(line);
}
// 闭括号 - 固定在同一行
const closeContainer = document.createElement('div');
closeContainer.classList.add('json-line', 'json-close-container');
const closeBrace = document.createElement('span');
closeBrace.textContent = isArray ? ']' : '}';
closeBrace.classList.add('json-brace');
closeContainer.appendChild(closeBrace);
container.appendChild(closeContainer);
}
return container;
}
/******************************************************
* 自动解析JSON数据的核心功能 - 保持不变
******************************************************/
function createLongStringElement(value, depth) {
const container = document.createElement('div');
container.classList.add('long-string-container');
// 创建指示器包装器
const wrapper = document.createElement('div');
wrapper.classList.add('indicator-wrapper');
const quoteSpan = document.createElement('span');
quoteSpan.textContent = '"';
quoteSpan.classList.add('json-string');
wrapper.appendChild(quoteSpan);
// 添加解析指示器
const indicator = document.createElement('span');
indicator.textContent = '已解析为JSON';
indicator.classList.add('long-string-indicator');
wrapper.appendChild(indicator);
// 添加折叠控制
const toggle = document.createElement('span');
toggle.textContent = '[折叠]';
toggle.classList.add('long-string-toggle');
// 初始为非折叠状态
let isCollapsed = false;
toggle.addEventListener('click', function(e) {
isCollapsed = !isCollapsed;
toggle.textContent = isCollapsed ? '[展开]' : '[折叠]';
// 切换折叠状态类
container.classList.toggle('collapsed', isCollapsed);
e.stopPropagation();
});
wrapper.appendChild(toggle);
container.appendChild(wrapper);
// 尝试解析长字符串为JSON对象
try {
const parsed = JSON.parse(value);
const contentContainer = document.createElement('div');
contentContainer.classList.add('long-string-content');
const textSpan = document.createElement('div');
textSpan.appendChild(formatJson(parsed, depth));
contentContainer.appendChild(textSpan);
container.appendChild(contentContainer);
return container;
} catch (e) {
// 解析失败,作为普通字符串显示
const contentContainer = document.createElement('div');
contentContainer.classList.add('long-string-content');
const text = document.createElement('span');
text.textContent = value;
text.classList.add('json-string');
contentContainer.appendChild(text);
container.appendChild(contentContainer);
return container;
}
}
});
</script>
</body>
</html>

View File

@ -0,0 +1,401 @@
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>JSON数据查看器 | 优化修复版</title>
<style>
/* 更新背景色为浅蓝色 */
:root {
--bg-color: #e6f7ff; /* 浅蓝色背景 */
--panel-bg: #ffffff;
--border-color: #b3e0ff;
--text-color: #003366;
--key-color: #d73a49;
--string-color: #0d47a1;
--number-color: #e36209;
--boolean-color: #0d6efd;
--null-color: #6f42c1;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif;
background-color: var(--bg-color);
color: var(--text-color);
margin: 0;
padding: 20px;
line-height: 1.5;
background-image: radial-gradient(#b8e1ff 1px, transparent 1px);
background-size: 20px 20px;
}
.container {
max-width: 98%;
margin: 0 auto;
}
/* 更新Header样式 */
.header {
text-align: center;
margin-bottom: 12px; /* 原25px → 减半 */
padding: 15px 20px; /* 原25px → 减少10px */
background: linear-gradient(135deg, #1a2980, #26d0ce);
color: white;
border-radius: 8px; /* 原10px → 微调更协调 */
box-shadow: 0 4px 12px rgba(0, 50, 93, 0.2); /* 阴影微调 */
position: relative;
overflow: hidden;
}
.header::before {
content: "";
position: absolute;
top: -5px; /* 原-10px → 缩小模糊范围 */
left: -5px;
right: -5px;
bottom: -5px;
background: inherit;
filter: blur(12px); /* 原20px → 减少模糊强度 */
z-index: -1;
opacity: 0.6; /* 原0.7 → 微调透明度 */
}
.header h1 {
font-size: 1.6rem; /* 原2.2rem → 减少30% */
margin: 0 0 5px; /* 原10px → 减半 */
font-weight: 600;
text-shadow: 0 1px 3px rgba(0,0,0,0.2); /* 阴影微调 */
line-height: 1.3; /* 新增行高控制 */
}
.json-display {
background-color: var(--panel-bg);
border-radius: 8px;
padding: 20px;
overflow-x: auto;
box-shadow: 0 8px 20px rgba(0, 66, 117, 0.1);
border: 1px solid var(--border-color);
font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, monospace;
font-size: 14px;
line-height: 1.4;
margin-top: 10px;
word-wrap: break-word;
overflow-wrap: break-word;
}
.json-line {
display: block;
margin: 4px 0;
}
.json-key {
color: var(--key-color);
font-weight: bold;
}
.json-string { color: var(--string-color); }
.json-number { color: var(--number-color); }
.json-boolean { color: var(--boolean-color); }
.json-null { color: var(--null-color); }
.json-indent {
padding-left: 22px;
}
.json-brace {
color: #666;
font-weight: bold;
}
.json-close-container {
padding-left: 0;
display: inline;
}
.long-string-indicator {
background-color: #fff8e1;
padding: 3px 8px;
border-radius: 4px;
font-size: 11px;
color: #e36209;
display: inline-block;
margin-left: 6px;
}
.long-string-toggle {
color: #032f62;
font-size: 11px;
margin-left: 8px;
cursor: pointer;
user-select: none;
font-weight: bold;
}
.long-string-container {
padding: 8px 12px;
background-color: #f0f8ff;
border-left: 3px solid #e36209;
margin: 6px 0;
border-radius: 0 6px 6px 0;
position: relative;
overflow: hidden;
display: inline-block;
vertical-align: top;
}
.long-string-content {
padding-top: 6px;
padding-left: 15px;
}
.long-string-container.collapsed .long-string-content {
display: none;
}
.long-string-container::after {
content: "";
position: absolute;
bottom: 0;
left: 0;
right: 0;
height: 20px;
background: linear-gradient(to bottom, rgba(240, 248, 255, 0.3), #f0f8ff);
opacity: 1;
transition: opacity 0.3s ease;
pointer-events: none;
}
.long-string-container.collapsed::after {
opacity: 1;
}
.long-string-container:not(.collapsed)::after {
opacity: 0;
}
.indicator-wrapper {
display: inline-flex;
align-items: center;
}
.inline-value {
display: inline;
}
.json-comma {
display: inline;
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>百度任务数据查看器 | 优化版</h1>
<p>高效、直观的JSON数据可视化工具</p>
</div>
<div class="json-display" id="jsonContainer"></div>
</div>
{{ json_data|json_script:"external-json-data" }}
<script>
document.addEventListener('DOMContentLoaded', function() {
// 测试数据 - 包括各种数据类型和嵌套结构
const jsonData = JSON.parse(document.getElementById('external-json-data').textContent);
// 渲染JSON数据
renderJson(jsonData);
function renderJson(data) {
const container = document.getElementById('jsonContainer');
container.innerHTML = '';
const formattedJson = formatJson(data, 0);
container.appendChild(formattedJson);
}
function formatJson(data, depth) {
const container = document.createElement('div');
container.style.position = 'relative';
const type = typeof data;
const isArray = Array.isArray(data);
if (data === null) {
const line = document.createElement('span');
line.textContent = 'null';
line.classList.add('json-null');
container.appendChild(line);
return container;
}
if (type === 'string') {
// 特殊处理长字符串尝试自动解析为JSON
if (data.length > 80 && (data.includes('{') || data.includes('['))) {
return createLongStringElement(data, depth);
}
const line = document.createElement('span');
line.textContent = `"${data}"`;
line.classList.add('json-string');
container.appendChild(line);
return container;
}
if (type === 'number') {
const line = document.createElement('span');
line.textContent = data;
line.classList.add('json-number');
container.appendChild(line);
return container;
}
if (type === 'boolean') {
const line = document.createElement('span');
line.textContent = data ? 'true' : 'false';
line.classList.add('json-boolean');
container.appendChild(line);
return container;
}
if (type === 'object') {
// 优化:处理空数组和空对象
const isEmptyArray = isArray && data.length === 0;
const isEmptyObject = !isArray && Object.keys(data).length === 0;
if (isEmptyArray || isEmptyObject) {
const span = document.createElement('span');
span.textContent = isEmptyArray ? '[]' : '{}';
span.classList.add(isEmptyArray ? 'empty-array' : 'empty-object');
return span;
}
// 开括号
const openBrace = document.createElement('span');
openBrace.textContent = isArray ? '[' : '{';
openBrace.classList.add('json-brace');
container.appendChild(openBrace);
// 递归处理对象/数组的键值对
const entries = isArray ? data : Object.entries(data);
let count = 0;
const total = entries.length;
for (let i = 0; i < entries.length; i++) {
const key = isArray ? i : entries[i][0];
const value = isArray ? entries[i] : entries[i][1];
const line = document.createElement('div');
line.classList.add('json-line', 'json-indent');
if (!isArray) {
const keyElement = document.createElement('span');
keyElement.textContent = `"${key}": `;
keyElement.classList.add('json-key');
line.appendChild(keyElement);
}
// 值处理
const valueContainer = formatJson(value, depth + 1);
// 修复: 确保值容器内联显示
valueContainer.classList.add('inline-value');
line.appendChild(valueContainer);
// 添加逗号(最后一个元素不加)
if (i < total - 1) {
const comma = document.createElement('span');
comma.textContent = ',';
comma.classList.add('json-comma');
line.appendChild(comma);
}
container.appendChild(line);
}
// 闭括号 - 固定在同一行
const closeContainer = document.createElement('div');
closeContainer.classList.add('json-line', 'json-close-container');
const closeBrace = document.createElement('span');
closeBrace.textContent = isArray ? ']' : '}';
closeBrace.classList.add('json-brace');
closeContainer.appendChild(closeBrace);
container.appendChild(closeContainer);
}
return container;
}
/******************************************************
* 自动解析JSON数据的核心功能 - 保持不变
******************************************************/
function createLongStringElement(value, depth) {
const container = document.createElement('div');
container.classList.add('long-string-container');
// 创建指示器包装器
const wrapper = document.createElement('div');
wrapper.classList.add('indicator-wrapper');
const quoteSpan = document.createElement('span');
{#quoteSpan.textContent = '"';#}
quoteSpan.classList.add('json-string');
wrapper.appendChild(quoteSpan);
// 添加解析指示器
{#const indicator = document.createElement('span');#}
{#indicator.textContent = '已解析为JSON';#}
{#indicator.classList.add('long-string-indicator');#}
{#wrapper.appendChild(indicator);#}
// 添加折叠控制
const toggle = document.createElement('span');
toggle.textContent = '[折叠]';
toggle.classList.add('long-string-toggle');
// 初始为非折叠状态
let isCollapsed = false;
toggle.addEventListener('click', function(e) {
isCollapsed = !isCollapsed;
toggle.textContent = isCollapsed ? '[展开]' : '[折叠]';
// 切换折叠状态类
container.classList.toggle('collapsed', isCollapsed);
e.stopPropagation();
});
wrapper.appendChild(toggle);
container.appendChild(wrapper);
// 尝试解析长字符串为JSON对象
try {
const parsed = JSON.parse(value);
const contentContainer = document.createElement('div');
contentContainer.classList.add('long-string-content');
const textSpan = document.createElement('div');
textSpan.appendChild(formatJson(parsed, depth));
contentContainer.appendChild(textSpan);
container.appendChild(contentContainer);
return container;
} catch (e) {
// 解析失败,作为普通字符串显示
const contentContainer = document.createElement('div');
contentContainer.classList.add('long-string-content');
const text = document.createElement('span');
text.textContent = value;
text.classList.add('json-string');
contentContainer.appendChild(text);
container.appendChild(contentContainer);
return container;
}
}
});
</script>
</body>
</html>