368 lines
14 KiB
Python
368 lines
14 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
@Time : 2025/6/16 14:12
|
||
@Auth : 九月的海
|
||
@File : suCaiDownload.py
|
||
@IDE : PyCharm
|
||
@Motto : September_sea
|
||
"""
|
||
import os
import random
import sys
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin, urlsplit

import requests

from db_base import Material1_db  # database module
|
||
|
||
|
||
class MaterialDownloader:
    """Download material files listed in the database to local disk.

    Files are saved either flat into ``download_dir`` or, after
    :meth:`setup_subfolder_download` has been called, spread over numbered
    subfolders ``base_save_directory/1 .. base_save_directory/N``.
    """

    # Fallback lock for instances created without __init__ (e.g. via __new__);
    # __init__ installs a per-instance lock.
    _counter_lock = threading.Lock()

    # Characters Windows does not allow in file names.
    _INVALID_FILENAME_CHARS = '<>:"/\\|?*'

    def __init__(self, base_url="https://tos.mobgi.com"):
        """
        Initialise the downloader.

        :param base_url: URL prefix that relative ``file_uri`` values are joined onto
        """
        self.base_url = base_url
        self.download_dir = "D:\\创量\\A素材\\数据库素材批量下载"  # double backslashes avoid escape issues
        self.base_save_directory = None
        self.subfolder_count = 0
        self.files_per_folder = 0
        self.current_file_counts = {}
        # Guards current_file_counts, which worker threads update concurrently.
        self._counter_lock = threading.Lock()

        # Make sure the default download directory exists.
        self.ensure_directory_exists(self.download_dir)

    def set_download_dir(self, directory):
        """Set the flat download directory and make sure it exists.

        :return: True if the directory exists afterwards, False otherwise
        """
        self.download_dir = directory
        return self.ensure_directory_exists(directory)

    def setup_subfolder_download(self, base_directory, subfolder_count=12, files_per_folder=40):
        """
        Configure the layered (numbered-subfolder) download layout.

        :param base_directory: base directory path
        :param subfolder_count: number of subfolders, named "1" .. str(subfolder_count)
        :param files_per_folder: maximum number of files per subfolder within one run
                                 (a value <= 0 means unlimited)
        :return: True on success, False if a directory could not be created
        """
        self.base_save_directory = base_directory
        self.subfolder_count = subfolder_count
        self.files_per_folder = files_per_folder

        # Create the base directory.
        if not self.ensure_directory_exists(self.base_save_directory):
            return False

        # Create subfolders 1 .. subfolder_count.
        for i in range(1, self.subfolder_count + 1):
            folder_path = os.path.join(self.base_save_directory, str(i))
            if not self.ensure_directory_exists(folder_path):
                return False

        # Reset the per-folder file counters.
        self.current_file_counts = {str(i): 0 for i in range(1, self.subfolder_count + 1)}
        print(f"已创建子文件夹: 1-{subfolder_count},每个文件夹最多保存 {files_per_folder} 个文件")
        return True

    def get_next_subfolder(self):
        """Reserve a slot in the least-filled subfolder and return its name.

        Among the folders holding the fewest files, one is chosen at random so
        files spread evenly. The pick and the increment happen under a lock
        because this method is called from worker threads.

        :return: the folder name; ``"1"`` when no counters are initialised; or
                 ``None`` when every folder already holds ``files_per_folder``
                 files (a limit <= 0 means unlimited)
        """
        if not self.current_file_counts:
            return "1"

        with self._counter_lock:
            min_count = min(self.current_file_counts.values())
            if 0 < self.files_per_folder <= min_count:
                return None  # every folder is at capacity
            min_folders = [name for name, count in self.current_file_counts.items() if count == min_count]
            folder = random.choice(min_folders)
            self.current_file_counts[folder] += 1
            return folder

    def _release_subfolder_slot(self, folder):
        """Give back a slot reserved by get_next_subfolder (e.g. after a failed download)."""
        with self._counter_lock:
            if self.current_file_counts.get(folder, 0) > 0:
                self.current_file_counts[folder] -= 1

    def ensure_directory_exists(self, directory):
        """Create *directory* (including parents) if it does not exist yet.

        :return: True if *directory* is a directory afterwards, False otherwise
        """
        if os.path.isdir(directory):
            return True
        try:
            # exist_ok: another worker thread may create the same directory
            # between the isdir() check and this call.
            os.makedirs(directory, exist_ok=True)
        except OSError as e:
            # Also covers "path exists but is a regular file".
            print(f"无法创建目录 {directory}: {str(e)}")
            return False
        print(f"创建下载目录: {directory}")
        return True

    def sanitize_filename(self, filename):
        """Make *filename* safe to use as a file name.

        Leading/trailing whitespace is stripped, then characters Windows forbids
        in file names and ASCII control characters are replaced with ``_``.
        ``None`` becomes ``""`` and other non-strings are converted with ``str``.
        """
        text = "" if filename is None else str(filename)
        # Stripping first is equivalent to the old replace-then-strip for the
        # forbidden set (none of them is whitespace), and keeps a trailing
        # newline from turning into "_".
        return "".join(
            "_" if (ch in self._INVALID_FILENAME_CHARS or ord(ch) < 32) else ch
            for ch in text.strip()
        )

    def download_file(self, full_url, save_path):
        """Stream *full_url* into *save_path*.

        Data is written to ``save_path + ".download"`` first and moved into place
        only after the complete body has arrived, so an interrupted download
        never leaves a truncated file under the final name.

        :return: ``(success, message)``
        """
        start_time = time.time()
        temp_path = f"{save_path}.download"
        try:
            # A bare file name has no directory part; os.makedirs("") would fail.
            save_dir = os.path.dirname(save_path)
            if save_dir and not self.ensure_directory_exists(save_dir):
                return False, f"无法创建文件目录: {save_dir}"

            # The `with` block returns the pooled connection even if streaming
            # is aborted half-way. Timeout is 30 seconds.
            with requests.get(full_url, stream=True, timeout=30) as response:
                response.raise_for_status()
                file_size = int(response.headers.get('Content-Length', 0))
                downloaded = 0
                last_percent = 0

                with open(temp_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if not chunk:
                            continue
                        f.write(chunk)
                        downloaded += len(chunk)
                        # Report progress in 10% steps when the size is known.
                        if file_size > 0:
                            percent = int(downloaded * 100 / file_size)
                            if percent - last_percent >= 10 or percent == 100:
                                print(
                                    f"已下载: {percent}% ({downloaded / 1024 / 1024:.2f}MB/{file_size / 1024 / 1024:.2f}MB)")
                                last_percent = percent

                # Content-Length counts encoded bytes while iter_content yields
                # decoded ones, so only compare for bodies with no content encoding.
                if (0 < file_size and downloaded < file_size
                        and not response.headers.get('Content-Encoding')):
                    raise OSError(f"数据不完整: 仅收到 {downloaded}/{file_size} 字节")

            # Unlike os.rename, os.replace also overwrites an existing target on Windows.
            os.replace(temp_path, save_path)

            elapsed = time.time() - start_time
            size_mb = os.path.getsize(save_path) / (1024 * 1024)
            speed = size_mb / elapsed if elapsed > 0 else 0
            return True, f"下载成功! 大小: {size_mb:.2f}MB, 耗时: {elapsed:.1f}秒, 速度: {speed:.1f}MB/s"

        except Exception as e:
            # Remove the partial temp file. A cleanup error must not hide the
            # original failure.
            try:
                if os.path.exists(temp_path):
                    os.remove(temp_path)
            except OSError:
                pass
            return False, f"下载失败: {str(e)}"

    @staticmethod
    def _result(material_id, material_name, url, save_path, success, message):
        """Build the per-material result dict consumed by print_result and batch_download."""
        return {
            'material_id': material_id,
            'material_name': material_name,
            'url': url,
            'save_path': save_path,
            'success': success,
            'message': message,
        }

    def _find_existing_file(self, filename):
        """Return the path of an already-downloaded *filename*, or None.

        In subfolder mode every numbered subfolder is searched, because the
        target folder is picked at random on each run.
        """
        if self.base_save_directory and self.subfolder_count > 0:
            candidates = (os.path.join(self.base_save_directory, str(i), filename)
                          for i in range(1, self.subfolder_count + 1))
        else:
            candidates = (os.path.join(self.download_dir, filename),)
        return next((path for path in candidates if os.path.isfile(path)), None)

    def process_material(self, material):
        """Download one database row ``(material_id, material_name, file_uri, ...)``.

        The file is saved as ``<material_id>_<sanitized name><extension>``. In
        subfolder mode, a folder slot is reserved only when a download is
        actually attempted, and it is released again if the download fails.

        :return: dict with keys material_id, material_name, url, save_path,
                 success, message
        """
        # We need at least the three fields id, name and uri.
        if len(material) < 3:
            return self._result(
                'unknown', 'invalid_data', 'invalid_data', 'invalid_data', False,
                f"数据库记录字段不足,预期3个,实际得到{len(material)}个: {material}")

        material_id, material_name, file_uri = material[0], material[1], material[2]
        if material_name is None:
            material_name = 'unknown'
        if not file_uri:
            # None/empty would make urljoin() return the bare base URL.
            file_uri = 'unknown'
        has_uri = file_uri != 'unknown'

        full_url = urljoin(self.base_url, file_uri) if has_uri else 'unknown'
        # Take the extension from the URI path only, so a query string such as
        # "?sign=..." cannot leak into the file name.
        extension = os.path.splitext(urlsplit(file_uri).path)[1] if has_uri else '.bin'
        filename = f"{material_id}_{self.sanitize_filename(material_name)}{self.sanitize_filename(extension)}"

        # Skip files already present; no folder slot is consumed for them.
        existing = self._find_existing_file(filename)
        if existing:
            return self._result(
                material_id, material_name, full_url, existing, True,
                f"文件已存在 (大小: {os.path.getsize(existing) / (1024 * 1024):.2f}MB)")

        subfolder_mode = bool(self.base_save_directory) and self.subfolder_count > 0
        base_dir = self.base_save_directory if subfolder_mode else self.download_dir

        # An invalid URL fails immediately and reserves no slot.
        if not has_uri:
            return self._result(material_id, material_name, full_url,
                                os.path.join(base_dir, filename), False, "无效的文件URI")

        folder_name = None
        if subfolder_mode:
            folder_name = self.get_next_subfolder()
            if folder_name is None:
                return self._result(
                    material_id, material_name, full_url, os.path.join(base_dir, filename), False,
                    f"所有子文件夹已满 (每个文件夹最多 {self.files_per_folder} 个文件)")
            save_path = os.path.join(base_dir, folder_name, filename)
        else:
            save_path = os.path.join(base_dir, filename)

        success, message = self.download_file(full_url, save_path)
        if not success and folder_name is not None:
            # A failed download must not use up folder capacity.
            self._release_subfolder_slot(folder_name)
        return self._result(material_id, material_name, full_url, save_path, success, message)

    def _failed_result(self, material, error):
        """Turn an unexpected exception from process_material into a failed result."""
        fields = list(material) if isinstance(material, (list, tuple)) else []
        fields += ['unknown'] * (2 - len(fields))
        return self._result(fields[0], fields[1], 'unknown', 'unknown', False,
                            f"处理失败: {error}")

    def _run_downloads(self, materials, max_workers):
        """Process *materials* concurrently and return the results in input order.

        An exception in one worker becomes a failed result instead of aborting
        the whole batch. Progress uses tqdm when it is installed.
        """
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            jobs = [(mat, executor.submit(self.process_material, mat)) for mat in materials]
            try:
                from tqdm import tqdm
                jobs = tqdm(jobs, desc="下载进度", unit="文件")
            except ImportError:
                pass  # tqdm is optional; fall back to a plain loop

            for mat, future in jobs:
                try:
                    result = future.result()
                except Exception as e:
                    result = self._failed_result(mat, e)
                results.append(result)
                # Show each result as soon as it is available.
                self.print_result(result)
        return results

    def _print_summary(self, results, total_time):
        """Print totals plus the lists of succeeded and failed downloads."""
        success_count = sum(1 for r in results if r['success'])
        print(f"\n{'=' * 60}")
        print(f"下载完成! 成功: {success_count}/{len(results)}")
        print(f"总耗时: {total_time:.2f}秒")
        if self.base_save_directory:
            print(f"下载目录: {self.base_save_directory} (分为{self.subfolder_count}个子文件夹)")
        else:
            print(f"下载目录: {self.download_dir}")
        print(f"{'=' * 60}")

        if success_count > 0:
            print("\n成功下载的文件:")
            for result in results:
                if result['success']:
                    print(f"- {os.path.basename(result['save_path'])}")

        failed_downloads = [r for r in results if not r['success']]
        if failed_downloads:
            print("\n下载失败的文件:")
            for result in failed_downloads:
                print(f"- {result['material_name']} (ID: {result['material_id']}) - 原因: {result['message']}")

    def batch_download(self, sql_query=None, max_workers=5):
        """
        Query materials from the database and download them concurrently.

        :param sql_query: optional custom SQL; it must select
                          material_id, material_name, file_uri in that order
        :param max_workers: number of concurrent download threads
        :return: list of result dicts (empty if the query fails or returns nothing)
        """
        # Default query.
        if not sql_query:
            sql_query = """
            SELECT material_id, material_name, file_uri
            FROM `material`
            USE INDEX (idx_cTime_mainUserId)
            WHERE main_user_id = '12400078500'
            AND create_time BETWEEN '2025-04-29 17:53:37' AND '2025-04-30 17:53:37'
            AND create_time >= '2025-04-27'
            AND material_type = 'video'
            AND file_uri LIKE 'tos_beijing/%'
            LIMIT 50
            """

        try:
            print(f"正在执行SQL查询: {sql_query}")
            materials = Material1_db.query_database(sql_query)
            if not materials:
                print("未查询到符合条件的素材")
                return []

            # Show the first row so the column order can be checked.
            print(f"查询到的第一条记录: {materials[0]}")
            print(f"\n查询到 {len(materials)} 个素材, 开始下载...\n")

            start_time = time.time()
            results = self._run_downloads(materials, max_workers)
            self._print_summary(results, time.time() - start_time)
            return results

        except Exception as e:
            print(f"\n[错误] 批量下载失败: {str(e)}")
            traceback.print_exc()  # full stack trace for debugging
            return []

    def print_result(self, result):
        """Print one result dict as produced by process_material."""
        material_info = f"{result['material_id']} - {result['material_name']}"
        save_path = result['save_path']
        message = result['message']

        if result['success']:
            print(f"\n[成功] {material_info} - {message}")
        else:
            print(f"\n[失败] {material_info} - {message}")

        print(f" 保存位置: {save_path}")
        if 'url' in result and result['url'] != 'unknown':
            print(f" 下载地址: {result['url']}")
|
||
|
||
|
||
# 使用示例
|
||
if __name__ == "__main__":
    downloader = MaterialDownloader()

    # Layered download layout: create 80 subfolders (1-80) under the base
    # directory, each holding at most 1 file per run.
    if not downloader.setup_subfolder_download(
        base_directory="D:\\创量\\A素材\\80个文件夹素材",
        subfolder_count=80,
        files_per_folder=1,
    ):
        print("无法创建下载目录结构")
        # sys.exit is always available; the bare exit() builtin is injected by
        # the site module and is missing under `python -S` or in frozen builds.
        sys.exit(1)

    print("+" * 60)
    print("开始素材批量下载程序")
    print(f"下载基础目录: {downloader.base_save_directory}")
    print(f"子文件夹: 1-{downloader.subfolder_count}")
    print(f"每个文件夹最多存放: {downloader.files_per_folder}个文件")
    print("+" * 60)
    print()

    # Custom query. LIMIT 80 matches the capacity configured above (80 folders x 1 file).
    custom_sql = """
    SELECT material_id, material_name,file_uri
    FROM `material`
    USE INDEX (idx_cTime_mainUserId)
    WHERE main_user_id = '12000016859'
    AND create_time BETWEEN '2025-07-14 17:53:37' AND '2025-07-18 17:53:37' -- 限定时间范围
    AND video_duration BETWEEN 10 AND 30
    AND material_type = 'video'
    AND file_uri LIKE 'tos_beijing/%'
    AND material_name NOT LIKE '%-衍生'
    LIMIT 80;
    """

    # Number of parallel download threads.
    max_workers = 5
    print(f"使用 {max_workers} 个并发线程下载")

    results = downloader.batch_download(sql_query=custom_sql, max_workers=max_workers)