# -*- coding: utf-8 -*-
"""
Batch downloader for material files referenced by rows of the material database.

@Time    : 2025/6/16 14:12
@Auth    : 九月的海
@File    : suCaiDownload.py
@IDE     : PyCharm
@Motto   : September_sea
"""
import os
import random
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin

import requests

from db_base import Material1_db  # project-local database access module

# Characters that may not appear in a file name (Windows rules): the reserved
# punctuation plus the ASCII control characters. All are replaced by "_".
_INVALID_FILENAME_CHARS = '<>:"/\\|?*' + ''.join(map(chr, range(32)))
_FILENAME_TRANSLATION = str.maketrans(
    _INVALID_FILENAME_CHARS, '_' * len(_INVALID_FILENAME_CHARS)
)


class MaterialDownloader:
    """Download material files listed in the database to local disk.

    Files are saved either into a single directory (``download_dir``) or,
    after :meth:`setup_subfolder_download`, spread over numbered subfolders
    of ``base_save_directory``.
    """

    def __init__(self, base_url="https://tos.mobgi.com"):
        """
        Initialise the downloader and make sure the default directory exists.

        :param base_url: URL prefix that each record's ``file_uri`` is joined to
        """
        self.base_url = base_url
        self.download_dir = "D:\\创量\\A素材\\数据库素材批量下载"
        self.base_save_directory = None
        self.subfolder_count = 0
        self.files_per_folder = 0
        self.current_file_counts = {}
        # process_material runs on worker threads and updates the per-folder
        # counters, so folder selection has to be serialised.
        self._folder_lock = threading.Lock()

        self.ensure_directory_exists(self.download_dir)

    def set_download_dir(self, directory):
        """
        Set the flat download directory and create it if necessary.

        :param directory: directory used when no subfolder layout is configured
        :return: True if the directory exists or was created, False otherwise
        """
        self.download_dir = directory
        return self.ensure_directory_exists(directory)

    def setup_subfolder_download(self, base_directory, subfolder_count=12, files_per_folder=40):
        """
        Configure the numbered-subfolder layout and create the folders.

        NOTE: ``files_per_folder`` is stored and reported but is not enforced
        anywhere in this class; files are simply balanced across the folders.

        :param base_directory: directory that will contain the subfolders
        :param subfolder_count: number of subfolders, named "1".."subfolder_count"
        :param files_per_folder: intended maximum number of files per subfolder
        :return: True on success, False if any directory could not be created
        """
        self.base_save_directory = base_directory
        self.subfolder_count = subfolder_count
        self.files_per_folder = files_per_folder

        if not self.ensure_directory_exists(self.base_save_directory):
            return False

        folder_names = [str(i) for i in range(1, self.subfolder_count + 1)]
        for name in folder_names:
            if not self.ensure_directory_exists(os.path.join(self.base_save_directory, name)):
                return False

        # Per-folder counters used by get_next_subfolder to balance files.
        self.current_file_counts = {name: 0 for name in folder_names}

        print(f"已创建子文件夹: 1-{subfolder_count},每个文件夹最多保存 {files_per_folder} 个文件")
        return True

    def get_next_subfolder(self):
        """
        Reserve a slot in one of the least-filled subfolders.

        The choice among equally filled folders is random so files spread
        evenly. The selection and the counter increment happen under a lock
        because this method is called from several worker threads.

        :return: name of the chosen subfolder ("1" when no layout is configured)
        """
        with self._folder_lock:
            if not self.current_file_counts:
                return "1"

            lowest = min(self.current_file_counts.values())
            candidates = [
                name for name, count in self.current_file_counts.items() if count == lowest
            ]
            chosen = random.choice(candidates)
            self.current_file_counts[chosen] += 1
            return chosen

    def ensure_directory_exists(self, directory):
        """
        Create ``directory`` (including parents) unless it already exists.

        :param directory: directory path to create
        :return: True if the directory exists afterwards, False on failure
                 (including the case where the path is an existing file)
        """
        if os.path.isdir(directory):
            return True
        try:
            # exist_ok avoids a spurious failure when another worker thread
            # creates the same directory between the check and this call.
            os.makedirs(directory, exist_ok=True)
        except (OSError, ValueError) as e:
            print(f"无法创建目录 {directory}: {str(e)}")
            return False
        print(f"创建下载目录: {directory}")
        return True

    def sanitize_filename(self, filename):
        """
        Replace characters that are invalid in file names with "_".

        Surrounding whitespace is stripped. Non-string values (for example a
        numeric id or ``None`` coming from the database) are converted with
        ``str()`` first instead of raising.

        :param filename: raw name component
        :return: cleaned name component
        """
        return str(filename).strip().translate(_FILENAME_TRANSLATION)

    def download_file(self, full_url, save_path):
        """
        Download one URL to ``save_path``.

        The data is written to ``<save_path>.download`` and moved into place
        only after the transfer finished, so a partial file never appears
        under the final name.

        :param full_url: absolute URL to fetch
        :param save_path: final path of the downloaded file
        :return: tuple ``(success, message)``; errors are reported through the
                 message rather than raised
        """
        temp_path = f"{save_path}.download"
        try:
            start_time = time.time()

            # A bare file name has an empty dirname, meaning "current
            # directory"; there is nothing to create in that case.
            save_dir = os.path.dirname(save_path)
            if save_dir and not self.ensure_directory_exists(save_dir):
                return False, f"无法创建文件目录: {save_dir}"

            # The context manager releases the connection even on errors.
            with requests.get(full_url, stream=True, timeout=30) as response:
                response.raise_for_status()

                try:
                    file_size = int(response.headers.get('Content-Length', 0))
                except (TypeError, ValueError):
                    file_size = 0  # unknown size: download without progress output

                downloaded = 0
                last_percent = 0
                with open(temp_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if not chunk:
                            continue
                        f.write(chunk)
                        downloaded += len(chunk)

                        if file_size <= 0:
                            continue
                        percent = int(downloaded * 100 / file_size)
                        # Report roughly every 10% and at completion.
                        if percent - last_percent >= 10 or percent == 100:
                            print(
                                f"已下载: {percent}% ({downloaded / 1024 / 1024:.2f}MB/{file_size / 1024 / 1024:.2f}MB)")
                            last_percent = percent

            # os.replace overwrites an existing target on every platform,
            # whereas os.rename fails on Windows if the target exists.
            os.replace(temp_path, save_path)

            elapsed = time.time() - start_time
            size_mb = os.path.getsize(save_path) / (1024 * 1024)
            speed = size_mb / elapsed if elapsed > 0 else 0
            return True, f"下载成功! 大小: {size_mb:.2f}MB, 耗时: {elapsed:.1f}秒, 速度: {speed:.1f}MB/s"
        except Exception as e:
            # Boundary of a worker task: report the failure instead of raising.
            try:
                if os.path.exists(temp_path):
                    os.remove(temp_path)
            except OSError:
                pass  # best-effort cleanup; the original error is what matters
            return False, f"下载失败: {str(e)}"

    @staticmethod
    def _build_result(material_id, material_name, url, save_path, success, message):
        """Assemble the result dictionary returned for one material."""
        return {
            'material_id': material_id,
            'material_name': material_name,
            'url': url,
            'save_path': save_path,
            'success': success,
            'message': message
        }

    def _find_existing_file(self, filename):
        """Return the path of ``filename`` in any configured subfolder, else None."""
        for folder in list(self.current_file_counts):
            candidate = os.path.join(self.base_save_directory, folder, filename)
            if os.path.exists(candidate):
                return candidate
        return None

    def process_material(self, material):
        """
        Download the file described by one database record.

        :param material: sequence ``(material_id, material_name, file_uri)``
        :return: dict with keys ``material_id``, ``material_name``, ``url``,
                 ``save_path``, ``success`` and ``message``
        """
        if len(material) < 3:
            return self._build_result(
                'unknown', 'invalid_data', 'invalid_data', 'invalid_data', False,
                f"数据库记录字段不足,预期3个,实际得到{len(material)}个: {material}"
            )

        material_id, material_name, file_uri = material[0], material[1], material[2]

        # A missing or blank URI cannot be joined to the base URL.
        if not isinstance(file_uri, str) or not file_uri.strip():
            return self._build_result(
                material_id, material_name, 'unknown', 'invalid_data', False, "无效的文件URI"
            )

        full_url = urljoin(self.base_url, file_uri)

        # File name format: <material id>_<cleaned name><extension>. Every
        # component is cleaned so the result never contains a path separator.
        safe_id = self.sanitize_filename(material_id)
        safe_name = self.sanitize_filename(material_name)
        extension = self.sanitize_filename(os.path.splitext(file_uri)[1])
        filename = f"{safe_id}_{safe_name}{extension}"

        if self.base_save_directory and self.subfolder_count > 0:
            # Look in every subfolder first: the folder is picked at random,
            # so a file from an earlier run may live in a different one.
            save_path = self._find_existing_file(filename)
            if save_path is None:
                save_path = os.path.join(
                    self.base_save_directory, self.get_next_subfolder(), filename
                )
        else:
            save_path = os.path.join(self.download_dir, filename)

        if os.path.exists(save_path):
            return self._build_result(
                material_id, material_name, full_url, save_path, True,
                f"文件已存在 (大小: {os.path.getsize(save_path) / (1024 * 1024):.2f}MB)"
            )

        success, message = self.download_file(full_url, save_path)
        return self._build_result(
            material_id, material_name, full_url, save_path, success, message
        )

    def _result_from_future(self, future, material):
        """Return the future's result, turning an exception into a failed result."""
        try:
            return future.result()
        except Exception as error:
            try:
                material_id, material_name = material[0], material[1]
            except (TypeError, IndexError, KeyError):
                material_id, material_name = 'unknown', 'invalid_data'
            return self._build_result(
                material_id, material_name, 'unknown', 'invalid_data', False,
                f"处理素材时发生异常: {error}"
            )

    def batch_download(self, sql_query=None, max_workers=5):
        """
        Query the database and download every returned material concurrently.

        A failure of a single material is recorded in its result and does not
        stop the remaining downloads.

        :param sql_query: optional SQL returning (material_id, material_name,
                          file_uri) rows; a built-in query is used when empty
        :param max_workers: number of concurrent download threads
        :return: list of result dicts (see :meth:`process_material`); an empty
                 list when nothing matched or the batch could not be started
        """
        if not sql_query:
            sql_query = """
            SELECT material_id, material_name, file_uri
            FROM `material` USE INDEX (idx_cTime_mainUserId)
            WHERE main_user_id = '12400078500'
              AND create_time BETWEEN '2025-04-29 17:53:37' AND '2025-04-30 17:53:37'
              AND create_time >= '2025-04-27'
              AND material_type = 'video'
              AND file_uri LIKE 'tos_beijing/%'
            LIMIT 50
            """

        try:
            print(f"正在执行SQL查询: {sql_query}")
            materials = Material1_db.query_database(sql_query)

            if not materials:
                print("未查询到符合条件的素材")
                return []

            print(f"查询到的第一条记录: {materials[0]}")
            print(f"\n查询到 {len(materials)} 个素材, 开始下载...\n")

            start_time = time.time()

            # tqdm is optional; only the import itself is guarded so that an
            # ImportError raised by a worker is not mistaken for "no tqdm".
            try:
                from tqdm import tqdm
            except ImportError:
                tqdm = None

            results = []
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                pending = [
                    (executor.submit(self.process_material, mat), mat) for mat in materials
                ]
                if tqdm is not None:
                    pending = tqdm(pending, desc="下载进度", unit="文件")

                for future, mat in pending:
                    result = self._result_from_future(future, mat)
                    results.append(result)
                    self.print_result(result)

            total_time = time.time() - start_time

            success_count = sum(1 for r in results if r['success'])
            print(f"\n{'=' * 60}")
            print(f"下载完成! 成功: {success_count}/{len(results)}")
            print(f"总耗时: {total_time:.2f}秒")

            if self.base_save_directory:
                print(f"下载目录: {self.base_save_directory} (分为{self.subfolder_count}个子文件夹)")
            else:
                print(f"下载目录: {self.download_dir}")

            print(f"{'=' * 60}")

            if success_count > 0:
                print("\n成功下载的文件:")
                for result in results:
                    if result['success']:
                        print(f"- {os.path.basename(result['save_path'])}")

            failed_downloads = [r for r in results if not r['success']]
            if failed_downloads:
                print("\n下载失败的文件:")
                for result in failed_downloads:
                    print(f"- {result['material_name']} (ID: {result['material_id']}) - 原因: {result['message']}")

            return results

        except Exception as e:
            # Top-level boundary: report the problem with its stack trace.
            print(f"\n[错误] 批量下载失败: {str(e)}")
            traceback.print_exc()
            return []

    def print_result(self, result):
        """
        Print the outcome of one download.

        :param result: result dict as produced by :meth:`process_material`
        """
        material_info = f"{result['material_id']} - {result['material_name']}"
        save_path = result['save_path']
        message = result['message']

        if result['success']:
            print(f"\n[成功] {material_info} - {message}")
        else:
            print(f"\n[失败] {material_info} - {message}")

        print(f"  保存位置: {save_path}")

        if 'url' in result and result['url'] != 'unknown':
            print(f"  下载地址: {result['url']}")


def main():
    """Example run: download up to 80 videos into 80 numbered subfolders."""
    downloader = MaterialDownloader()

    # Create 80 subfolders (1-80) under the base directory, one file each.
    if not downloader.setup_subfolder_download(
        base_directory="D:\\创量\\A素材\\80个文件夹素材",
        subfolder_count=80,
        files_per_folder=1
    ):
        print("无法创建下载目录结构")
        # SystemExit works even where the site-provided exit() is unavailable.
        raise SystemExit(1)

    print("+" * 60)
    print("开始素材批量下载程序")
    print(f"下载基础目录: {downloader.base_save_directory}")
    print(f"子文件夹: 1-{downloader.subfolder_count}")
    print(f"每个文件夹最多存放: {downloader.files_per_folder}个文件")
    print("+" * 60)
    print()

    # The "--" SQL comment runs to the end of its line, so the line break
    # after it must be kept or the rest of the query is commented out.
    custom_sql = """
    SELECT material_id, material_name,file_uri
    FROM `material` USE INDEX (idx_cTime_mainUserId)
    WHERE main_user_id = '12000016859'
      AND create_time BETWEEN '2025-07-14 17:53:37' AND '2025-07-18 17:53:37' -- 限定时间范围
      AND video_duration BETWEEN 10 AND 30
      AND material_type = 'video'
      AND file_uri LIKE 'tos_beijing/%'
      AND material_name NOT LIKE '%-衍生'
    LIMIT 80;
    """

    max_workers = 5
    print(f"使用 {max_workers} 个并发线程下载")

    return downloader.batch_download(sql_query=custom_sql, max_workers=max_workers)


if __name__ == "__main__":
    main()