chuangliangProject/chuangliangTool/suCaiDownload.py

275 lines
10 KiB
Python
Raw Permalink Normal View History

2025-08-13 03:11:46 +00:00
# -*- coding: utf-8 -*-
"""
@Time : 2025/6/16 14:12
@Auth : 九月的海
@File : suCaiDownload.py
@IDE : PyCharm
@Motto : September_sea
"""
import os
import time
import requests
import traceback
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor
from db_base import Material1_db # 导入数据库模块
class MaterialDownloader:
def __init__(self, base_url="https://tos.mobgi.com"):
self.base_url = base_url
self.download_dir = "D:\\创量\\A素材\\数据库素材批量下载"
# 确保下载目录存在
self.ensure_directory_exists(self.download_dir)
def set_download_dir(self, directory):
"""设置下载目录并确保它存在"""
self.download_dir = directory
return self.ensure_directory_exists(directory)
def ensure_directory_exists(self, directory):
"""确保目录存在,如果不存在则创建"""
if not os.path.exists(directory):
try:
os.makedirs(directory)
print(f"创建下载目录: {directory}")
return True
except Exception as e:
print(f"无法创建目录 {directory}: {str(e)}")
return False
return True
def sanitize_filename(self, filename):
"""清理文件名中的无效字符"""
invalid_chars = ['<', '>', ':', '"', '/', '\\', '|', '?', '*']
for char in invalid_chars:
filename = filename.replace(char, '_')
return filename.strip()
def download_file(self, full_url, save_path):
"""下载单个文件到本地"""
try:
start_time = time.time()
# 确保文件所在目录存在
save_dir = os.path.dirname(save_path)
if not self.ensure_directory_exists(save_dir):
return False, f"无法创建文件目录: {save_dir}"
response = requests.get(full_url, stream=True, timeout=30) # 增加超时时间到30秒
response.raise_for_status()
file_size = int(response.headers.get('Content-Length', 0))
downloaded = 0
last_percent = 0
# 使用临时文件名下载,完成后重命名为目标文件名
temp_path = f"{save_path}.download"
with open(temp_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# 显示下载进度(当文件大小已知时)
if file_size > 0:
percent = int(downloaded * 100 / file_size)
# 每10%的进度更新一次显示
if percent - last_percent >= 10 or percent == 100:
print(
f"已下载: {percent}% ({downloaded / 1024 / 1024:.2f}MB/{file_size / 1024 / 1024:.2f}MB)")
last_percent = percent
# 下载完成后重命名
os.rename(temp_path, save_path)
elapsed = time.time() - start_time
size_mb = os.path.getsize(save_path) / (1024 * 1024)
speed = size_mb / elapsed if elapsed > 0 else 0
return True, f"下载成功! 大小: {size_mb:.2f}MB, 耗时: {elapsed:.1f}秒, 速度: {speed:.1f}MB/s"
except Exception as e:
# 清理临时文件
if 'temp_path' in locals() and os.path.exists(temp_path):
os.remove(temp_path)
return False, f"下载失败: {str(e)}"
def process_material(self, material):
"""处理单个素材记录"""
material_id, material_name, file_uri = material
# 生成完整URL
full_url = urljoin(self.base_url, file_uri)
# 清理文件名
safe_name = self.sanitize_filename(material_name)
# 提取文件扩展名
extension = os.path.splitext(file_uri)[1] or '.bin' # 如果没有扩展名默认.bin
# 生成保存路径 (文件名格式: 素材ID_清理后的素材名.扩展名)
# 确保文件名中不包含路径分隔符
filename = f"{material_id}_{safe_name}{extension}"
save_path = os.path.join(self.download_dir, filename)
# 检查文件是否已存在
if os.path.exists(save_path):
return {
'material_id': material_id,
'material_name': material_name,
'url': full_url,
'save_path': save_path,
'success': True,
'message': f"文件已存在 (大小: {os.path.getsize(save_path) / (1024 * 1024):.2f}MB)"
}
# 下载文件
success, message = self.download_file(full_url, save_path)
return {
'material_id': material_id,
'material_name': material_name,
'url': full_url,
'save_path': save_path,
'success': success,
'message': message
}
def batch_download(self, sql_query=None, max_workers=5):
"""
批量下载素材
:param sql_query: 可选的自定义SQL查询
:param max_workers: 并发下载线程数
:return: 下载结果列表
"""
# 默认查询语句
if not sql_query:
sql_query = """
SELECT material_id, material_name, file_uri
FROM `material`
USE INDEX (idx_cTime_mainUserId)
WHERE main_user_id = '12400078500'
AND create_time BETWEEN '2025-04-29 17:53:37' AND '2025-04-30 17:53:37'
AND create_time >= '2025-04-27'
AND material_type = 'video'
AND file_uri LIKE 'tos_beijing/%'
LIMIT 50
"""
# 使用正确的数据库查询方式
try:
print(f"正在执行SQL查询...")
# 通过模块方法查询数据库
materials = Material1_db.query_database(sql_query)
if not materials:
print("未查询到符合条件的素材")
return []
print(f"\n查询到 {len(materials)} 个素材, 开始下载...\n")
# 保存开始时间
start_time = time.time()
# 使用线程池并发下载
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(self.process_material, mat) for mat in materials]
# 使用tqdm显示进度
try:
from tqdm import tqdm
for future in tqdm(futures, desc="下载进度", unit="文件"):
result = future.result()
results.append(result)
# 立即显示结果
self.print_result(result)
except ImportError:
# 如果没有安装tqdm使用普通循环
for future in futures:
result = future.result()
results.append(result)
# 立即显示结果
self.print_result(result)
# 计算总耗时
total_time = time.time() - start_time
# 打印统计信息
success_count = sum(1 for r in results if r['success'])
print(f"\n{'=' * 60}")
print(f"下载完成! 成功: {success_count}/{len(results)}")
print(f"总耗时: {total_time:.2f}")
print(f"下载目录: {self.download_dir}")
print(f"{'=' * 60}")
# 列出所有成功下载的文件
if success_count > 0:
print("\n成功下载的文件:")
for result in results:
if result['success']:
print(f"- {os.path.basename(result['save_path'])}")
# 如果有失败的下载,创建失败列表
failed_downloads = [r for r in results if not r['success']]
if failed_downloads:
print("\n下载失败的文件:")
for result in failed_downloads:
print(f"- {result['material_name']} (ID: {result['material_id']}) - 原因: {result['message']}")
return results
except Exception as e:
# 避免使用特殊Unicode字符
print(f"\n[错误] 数据库查询失败: {str(e)}")
traceback.print_exc() # 打印详细的错误堆栈
return []
def print_result(self, result):
"""打印单个下载结果"""
material_info = f"{result['material_id']} - {result['material_name']}"
save_path = result['save_path']
message = result['message']
# 下载成功的特殊格式
if result['success']:
print(f"\n[成功] {material_info} - {message}")
else:
print(f"\n[失败] {material_info} - {message}")
# 输出保存路径
print(f" 保存位置: {save_path}")
print(f" 下载地址: {result['url']}")
if __name__ == "__main__":
downloader = MaterialDownloader()
# 自定义下载目录
custom_dir = "D:\\创量\\A素材\\1500个素材10~100s批量小牛24年12月份素材"
# 确保自定义目录存在
if not downloader.set_download_dir(custom_dir):
print(f"无法创建下载目录: {custom_dir}")
exit(1)
print("+" * 60)
print("开始素材批量下载程序")
print(f"下载目录: {downloader.download_dir}")
print("+" * 60)
print()
# 使用自定义查询下载
custom_sql = """
SELECT material_id, material_name, file_uri
FROM `material`
USE INDEX (idx_cTime_mainUserId)
WHERE main_user_id = '9687'
AND create_time BETWEEN '2024-12-01 17:53:37' AND '2024-12-30 17:53:37'
AND video_duration BETWEEN 10 AND 100
AND material_type = 'video'
AND file_uri LIKE 'tos_beijing/%'
LIMIT 1500;
"""
# 设置并行下载数量
max_workers = 5
print(f"使用 {max_workers} 个并发线程下载")
results = downloader.batch_download(sql_query=custom_sql, max_workers=max_workers)