chuangliangProject/chuangliangTool/suCaiDownload.py

275 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
@Time : 2025/6/16 14:12
@Auth : 九月的海
@File : suCaiDownload.py
@IDE : PyCharm
@Motto : September_sea
"""
import os
import time
import requests
import traceback
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor
from db_base import Material1_db # 导入数据库模块
class MaterialDownloader:
def __init__(self, base_url="https://tos.mobgi.com"):
self.base_url = base_url
self.download_dir = "D:\\创量\\A素材\\数据库素材批量下载"
# 确保下载目录存在
self.ensure_directory_exists(self.download_dir)
def set_download_dir(self, directory):
"""设置下载目录并确保它存在"""
self.download_dir = directory
return self.ensure_directory_exists(directory)
def ensure_directory_exists(self, directory):
"""确保目录存在,如果不存在则创建"""
if not os.path.exists(directory):
try:
os.makedirs(directory)
print(f"创建下载目录: {directory}")
return True
except Exception as e:
print(f"无法创建目录 {directory}: {str(e)}")
return False
return True
def sanitize_filename(self, filename):
"""清理文件名中的无效字符"""
invalid_chars = ['<', '>', ':', '"', '/', '\\', '|', '?', '*']
for char in invalid_chars:
filename = filename.replace(char, '_')
return filename.strip()
def download_file(self, full_url, save_path):
"""下载单个文件到本地"""
try:
start_time = time.time()
# 确保文件所在目录存在
save_dir = os.path.dirname(save_path)
if not self.ensure_directory_exists(save_dir):
return False, f"无法创建文件目录: {save_dir}"
response = requests.get(full_url, stream=True, timeout=30) # 增加超时时间到30秒
response.raise_for_status()
file_size = int(response.headers.get('Content-Length', 0))
downloaded = 0
last_percent = 0
# 使用临时文件名下载,完成后重命名为目标文件名
temp_path = f"{save_path}.download"
with open(temp_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# 显示下载进度(当文件大小已知时)
if file_size > 0:
percent = int(downloaded * 100 / file_size)
# 每10%的进度更新一次显示
if percent - last_percent >= 10 or percent == 100:
print(
f"已下载: {percent}% ({downloaded / 1024 / 1024:.2f}MB/{file_size / 1024 / 1024:.2f}MB)")
last_percent = percent
# 下载完成后重命名
os.rename(temp_path, save_path)
elapsed = time.time() - start_time
size_mb = os.path.getsize(save_path) / (1024 * 1024)
speed = size_mb / elapsed if elapsed > 0 else 0
return True, f"下载成功! 大小: {size_mb:.2f}MB, 耗时: {elapsed:.1f}秒, 速度: {speed:.1f}MB/s"
except Exception as e:
# 清理临时文件
if 'temp_path' in locals() and os.path.exists(temp_path):
os.remove(temp_path)
return False, f"下载失败: {str(e)}"
def process_material(self, material):
"""处理单个素材记录"""
material_id, material_name, file_uri = material
# 生成完整URL
full_url = urljoin(self.base_url, file_uri)
# 清理文件名
safe_name = self.sanitize_filename(material_name)
# 提取文件扩展名
extension = os.path.splitext(file_uri)[1] or '.bin' # 如果没有扩展名默认.bin
# 生成保存路径 (文件名格式: 素材ID_清理后的素材名.扩展名)
# 确保文件名中不包含路径分隔符
filename = f"{material_id}_{safe_name}{extension}"
save_path = os.path.join(self.download_dir, filename)
# 检查文件是否已存在
if os.path.exists(save_path):
return {
'material_id': material_id,
'material_name': material_name,
'url': full_url,
'save_path': save_path,
'success': True,
'message': f"文件已存在 (大小: {os.path.getsize(save_path) / (1024 * 1024):.2f}MB)"
}
# 下载文件
success, message = self.download_file(full_url, save_path)
return {
'material_id': material_id,
'material_name': material_name,
'url': full_url,
'save_path': save_path,
'success': success,
'message': message
}
def batch_download(self, sql_query=None, max_workers=5):
"""
批量下载素材
:param sql_query: 可选的自定义SQL查询
:param max_workers: 并发下载线程数
:return: 下载结果列表
"""
# 默认查询语句
if not sql_query:
sql_query = """
SELECT material_id, material_name, file_uri
FROM `material`
USE INDEX (idx_cTime_mainUserId)
WHERE main_user_id = '12400078500'
AND create_time BETWEEN '2025-04-29 17:53:37' AND '2025-04-30 17:53:37'
AND create_time >= '2025-04-27'
AND material_type = 'video'
AND file_uri LIKE 'tos_beijing/%'
LIMIT 50
"""
# 使用正确的数据库查询方式
try:
print(f"正在执行SQL查询...")
# 通过模块方法查询数据库
materials = Material1_db.query_database(sql_query)
if not materials:
print("未查询到符合条件的素材")
return []
print(f"\n查询到 {len(materials)} 个素材, 开始下载...\n")
# 保存开始时间
start_time = time.time()
# 使用线程池并发下载
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(self.process_material, mat) for mat in materials]
# 使用tqdm显示进度
try:
from tqdm import tqdm
for future in tqdm(futures, desc="下载进度", unit="文件"):
result = future.result()
results.append(result)
# 立即显示结果
self.print_result(result)
except ImportError:
# 如果没有安装tqdm使用普通循环
for future in futures:
result = future.result()
results.append(result)
# 立即显示结果
self.print_result(result)
# 计算总耗时
total_time = time.time() - start_time
# 打印统计信息
success_count = sum(1 for r in results if r['success'])
print(f"\n{'=' * 60}")
print(f"下载完成! 成功: {success_count}/{len(results)}")
print(f"总耗时: {total_time:.2f}")
print(f"下载目录: {self.download_dir}")
print(f"{'=' * 60}")
# 列出所有成功下载的文件
if success_count > 0:
print("\n成功下载的文件:")
for result in results:
if result['success']:
print(f"- {os.path.basename(result['save_path'])}")
# 如果有失败的下载,创建失败列表
failed_downloads = [r for r in results if not r['success']]
if failed_downloads:
print("\n下载失败的文件:")
for result in failed_downloads:
print(f"- {result['material_name']} (ID: {result['material_id']}) - 原因: {result['message']}")
return results
except Exception as e:
# 避免使用特殊Unicode字符
print(f"\n[错误] 数据库查询失败: {str(e)}")
traceback.print_exc() # 打印详细的错误堆栈
return []
def print_result(self, result):
"""打印单个下载结果"""
material_info = f"{result['material_id']} - {result['material_name']}"
save_path = result['save_path']
message = result['message']
# 下载成功的特殊格式
if result['success']:
print(f"\n[成功] {material_info} - {message}")
else:
print(f"\n[失败] {material_info} - {message}")
# 输出保存路径
print(f" 保存位置: {save_path}")
print(f" 下载地址: {result['url']}")
if __name__ == "__main__":
downloader = MaterialDownloader()
# 自定义下载目录
custom_dir = "D:\\创量\\A素材\\1500个素材10~100s批量小牛24年12月份素材"
# 确保自定义目录存在
if not downloader.set_download_dir(custom_dir):
print(f"无法创建下载目录: {custom_dir}")
exit(1)
print("+" * 60)
print("开始素材批量下载程序")
print(f"下载目录: {downloader.download_dir}")
print("+" * 60)
print()
# 使用自定义查询下载
custom_sql = """
SELECT material_id, material_name, file_uri
FROM `material`
USE INDEX (idx_cTime_mainUserId)
WHERE main_user_id = '9687'
AND create_time BETWEEN '2024-12-01 17:53:37' AND '2024-12-30 17:53:37'
AND video_duration BETWEEN 10 AND 100
AND material_type = 'video'
AND file_uri LIKE 'tos_beijing/%'
LIMIT 1500;
"""
# 设置并行下载数量
max_workers = 5
print(f"使用 {max_workers} 个并发线程下载")
results = downloader.batch_download(sql_query=custom_sql, max_workers=max_workers)