from src.common.naming import FileNaming from src.common.ComicInfo import ImageInfo from zipfile import ZipFile from datetime import datetime import time, shutil,re, xxhash, json from typing import Any import os,hashlib import xml.etree.ElementTree as ET from PIL import Image from io import BytesIO from tempfile import NamedTemporaryFile from xml.dom import minidom from src.common.ComicInfo import ComicInfoXml from lxml import etree from collections import defaultdict from typing import List, Dict, Tuple class test: def clean_min_cbz(self): """ 清理3KB以下CBZ文件 """ dir_path = "/mnt/Comics/CBZ/rm_comic" for dir in os.listdir(dir_path): c_dir = os.path.join(dir_path, dir) if os.path.isdir(c_dir): files = list(FileNaming.get_filenames_optimized(c_dir, ext_filter=['.CBZ'])) for file in files: size = os.path.getsize(file) if size < 3000: os.remove(file) print(f"已删除{file}") def _clean_old_cbz(self, cbz_path): m_time = datetime.fromtimestamp(os.path.getmtime(cbz_path)) str_strftime = '%Y%m%d' zip_time = m_time.strftime(str_strftime) with ZipFile(cbz_path, 'r') as zip_ref: old_img = 0 for file_info in zip_ref.infolist(): # 获取日期时间信息,格式为 (year, month, day, hour, minute, second) date_time = file_info.date_time # 将日期时间元组转换为datetime对象 dt = datetime(*date_time) # 格式化输出日期时间,例如:YYYY-MM-DD HH:MM:SS file_date_time = dt.strftime(str_strftime) # 一周内的图片跳过 if int(zip_time) - int(file_date_time) > 7: #print(f"Clear Filename: {file_info.filename}, zip: {cbz_path}") old_img += 1 if old_img > 0: #os.remove(cbz_path) print(f"remove cbz {cbz_path}") def clean_old_cbz(self): dir_path = "/mnt/Comics/CBZ/rm_comic" for dir in os.listdir(dir_path): c_dir = os.path.join(dir_path, dir) if os.path.isdir(c_dir): files = list(FileNaming.get_filenames_optimized(c_dir, ext_filter=['.CBZ'])) for file in files: self._clean_old_cbz(file) class comicInfo: def find_actual_path(self, zip_ref, target_path): """不区分大小写查找压缩包内的实际文件路径""" target_lower = target_path.lower() for name in zip_ref.namelist(): if name.lower() == target_lower: return name return None def process_cbz(self, cbz_path): try: with ZipFile(cbz_path, 'r') as cbz: # ============================================ # 第一部分:读取 ComicInfo.xml 的元数据字段 # ============================================ xml_files = [f for f in cbz.namelist() if f.lower() == 'comicinfo.xml'] if not xml_files: print("未找到 ComicInfo.xml") return None xml_file_name = xml_files[0] # 解析 XML 元数据 metadata = {} with cbz.open(xml_file_name) as xml_file: xml_content = xml_file.read().decode('utf-8') root = ET.fromstring(xml_content) # 定义需要提取的元数据字段(用户自定义的字段列表) metadata_fields = [ "Title", "Series", "Number", "Summary", "Writer", "Genre", "Tags", "PageCount", "AgeRating" ] for field in metadata_fields: element = root.find(field) metadata[field] = element.text if element is not None else None # ============================================ # 第二部分:读取 Page 标签的图片信息 # ============================================ pages_info = [] with cbz.open(xml_file_name) as xml_file: xml_content = xml_file.read().decode('utf-8') root = ET.fromstring(xml_content) # 提取所有 Page 标签 pages = root.find('Pages') if pages is None: print("XML 中缺少 Pages 标签") #return {"metadata": metadata, "pages": None} if pages != None: page_list = pages.findall('Page') #if not page_list: # print("Pages 标签下无 Page 元素") # return {"metadata": metadata, "pages": None} # 收集图片路径 if pages != None: image_paths = [page.get('Image') for page in page_list if page.get('Image')] else: image_paths = [] if len(image_paths) == 0: print(f"{cbz_path} {image_paths} 为空") img_count = 1 while 1: actual_path = self.find_actual_path(cbz, "{:0>3d}".format(img_count)+".jpg") if actual_path: image_paths.append("{:0>3d}".format(img_count)) img_count += 1 else: break print(f"生成新的paths{image_paths}") # 处理每个图片文件 for img_path in image_paths: actual_path = self.find_actual_path(cbz, img_path+".jpg") if not actual_path: print(f"警告:图片 '{img_path}' 不存在于压缩包中") continue with cbz.open(actual_path) as img_file: content = img_file.read() # 计算 MD5 和文件大小 file_md5 = hashlib.md5(content).hexdigest() file_size = len(content) # 读取图片尺寸 img_width, img_height = None, None try: with Image.open(BytesIO(content)) as img: img_width, img_height = img.size except Exception as e: print(f"无法读取图片尺寸:{actual_path},错误:{e}") # 存储图片信息 pages_info.append({ "name": os.path.basename(actual_path).split(".")[0], "size": file_size, "key": file_md5, "width": img_width, "height": img_height }) return { "metadata": metadata, "pages": pages_info } except Exception as e: print(f"处理 CBZ 文件时出错: {e}") raise exit(f"处理CBZ出错") def generate_comic_info_xml(self, metadata, pages_info): """根据元数据和页面信息生成 ComicInfo.xml 内容""" # 创建根节点 root = ET.Element('ComicInfo') root.set('xmlns:xsd', 'http://www.w3.org/2001/XMLSchema') root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance') # 添加元数据字段 for field, value in metadata.items(): if value is not None: elem = ET.SubElement(root, field) elem.text = str(value) # 添加 Pages 节点 if pages_info: pages_elem = ET.SubElement(root, "Pages") for page in pages_info: # 示例中保留关键属性,可根据需要扩展其他属性 page_elem = ET.SubElement(pages_elem, "Page", attrib={ "Image": page.get("name", ""), "ImageSize": str(page.get("size", 0)), "Key": str(page.get("key", 0)), "ImageWidth": str(page.get("width", 0)), "ImageHeight": str(page.get("height", 0)) }) # 生成 XML 字符串 #tree = ET.ElementTree(root) #xml_content = BytesIO() #tree.write(xml_content, encoding="utf-8", xml_declaration=True) # Create a formatted XML string xml_str = ET.tostring(root, encoding='utf-8', method='xml') parsed_xml = minidom.parseString(xml_str) formatted_xml = parsed_xml.toprettyxml(indent=" ", encoding="utf-8") # Adjust the number of spaces for indentation as needed # Convert bytes to string and add XML declaration return formatted_xml.decode('utf-8') #return xml_content.getvalue() def update_cbz_with_new_xml(self, cbz_path, new_xml_content, output_path=None): """将新生成的 ComicInfo.xml 更新到 CBZ 文件中""" try: # 默认输出路径为原文件路径(覆盖原文件) if output_path is None: output_path = cbz_path # 创建临时文件处理覆盖操作 with NamedTemporaryFile(delete=False) as tmp: tmp.close() shutil.move(cbz_path, tmp.name) # 读取原文件并替换 ComicInfo.xml with ZipFile(tmp.name, 'r') as source_zip: with ZipFile(output_path, 'w') as new_zip: # 复制原文件(跳过旧 XML) for item in source_zip.infolist(): if item.filename.lower() != 'comicinfo.xml': new_zip.writestr(item, source_zip.read(item.filename)) # 添加新 XML new_zip.writestr("ComicInfo.xml", new_xml_content) os.remove(tmp.name) # 清理临时文件 return True except Exception as e: print(f"更新 CBZ 文件失败: {e}") if os.path.exists(tmp.name): shutil.move(tmp.name, cbz_path) # 恢复备份 raise exit(f"更新失败") def update_comicinfo_cbz(self, cbz_path): """更新CBZ中的ComicInfo.xml Args: cbz_path (_type_): _description_ """ data = self.process_cbz(cbz_path) metadata = data["metadata"] author = data["metadata"].get("Writer", "") tags = data["metadata"].get("Tags", "") (list_value, value) = [[], str(author).replace("&", " ")] for val in set(str(value).split(" ")): list_value.append(val) author = FileNaming.chinese_file_name(",".join(list_value)) data["metadata"]["Writer"] = author # 生成 XML 内容 new_xml = self.generate_comic_info_xml(data["metadata"], data["pages"]) xml_file = "NewComicInfo.xml" # 测试:保存 XML 到本地查看 with open(xml_file, "w", encoding="utf-8") as f: f.write(new_xml) print(f"已生成 {xml_file}") ComicInfoXml()._validate_xml_with_xsd_file(xml_file=xml_file ,xsd_file="src/assets/ComicInfo_2.1.xsd") # 更新 CBZ 文件(示例路径,实际操作前请备份) success = comicInfo().update_cbz_with_new_xml(cbz_path, new_xml) # if success: # print("CBZ 文件更新成功") os.remove(xml_file) def ver_comicinfo_xml(self, cbz_path): try: with ZipFile(cbz_path, 'r') as cbz: # ============================================ # 第一部分:读取 ComicInfo.xml 的元数据字段 # ============================================ xml_files = [f for f in cbz.namelist() if f.lower() == 'comicinfo.xml'] if not xml_files: print(f" {cbz_path} 未找到 ComicInfo.xml") #os.remove(cbz_path) except: raise exit(f"ver_comicinfo_xml 错误") def _comic_info_xml_pages(self, zip_file): """获取 ComicInfo.xml 文件中的 标签值""" # data = { "file" : os.path.basename(zip_file), "path" : str(zip_file) } # data = { "file" : os.path.basename(zip_file)} data = { "file" : str(zip_file) } list_page = [] # 打开ZIP文件 with ZipFile(str(zip_file), 'r') as z: try: # 假设ZIP中的文件名是'text.txt' with z.open('ComicInfo.xml', 'r') as file: # 从文件流中解析 XML 数据 file_string = file.read().decode("utf-8") # 解析字符串 root = etree.fromstring(file_string.encode()) # 需要字节数据,或直接传字符串(Python 3.8+) # 方法 2:遍历所有 Page 元素(适用于多个 Page) for page in root.findall('Pages/Page'): page_attrib = page.attrib list_page.append(page_attrib) #image_name = page.get('Image') #print(f"遍历提取 Image 值: {image_name}") #image_size = page.get('ImageSize') #print(f"遍历提取 ImageSize 值: {image_size}") #key_value = page.get('Key') #print(f"遍历提取 Key 值: {key_value}") #image_width = page.get('ImageWidth') #print(f"遍历提取 ImageWidth 值: {image_width}") #image_height = page.get('ImageHeight') #print(f"遍历提取 ImageHeight 值: {image_height}") except Exception as e: raise exit(f"获取 ComicInfo.xml 文件中的 标签值失败: {zip_file},错误: {str(e)}") # data["list_page"] = list_page data["list_hash"] = self.generate_xxhash(list_page) return data def generate_xxhash(self, data: Any) -> str: """ 使用 xxhash 生成更快的哈希值 特点: - 比 MD5 快 2-5 倍 - 产生 64 位或 128 位哈希 """ # 创建哈希对象 hasher = xxhash.xxh64() # 使用 JSON 序列化 serialized = json.dumps(str(data), sort_keys=True, ensure_ascii=False) # 更新哈希 hasher.update(serialized.encode('utf-8')) # 返回十六进制摘要 return hasher.hexdigest() def extract_duplicate_files(self, data: List[Dict[str, str]]) -> Dict[str, List[str]]: """ 提取具有重复 list_hash 的文件名 参数: data: 包含字典的列表,每个字典包含 'file' 和 'list_hash' 键 返回: 字典: {重复的list_hash: [重复的文件名列表]} """ # 第一步: 创建哈希映射表 hash_map = defaultdict(list) # 第二步: 填充映射表 (O(n) 时间复杂度) for item in data: file_name = item['file'] list_hash = item['list_hash'] hash_map[list_hash].append(file_name) # 第三步: 过滤出重复项 (O(m) 时间复杂度,m 是唯一哈希数) duplicates = { hash_val: files for hash_val, files in hash_map.items() if len(files) > 1 } return duplicates if __name__ == "__main1__": # 清除3KB以下CBZ文件 # comicInfo().update_comicinfo_cbz("") #cbz_path = "/Users/cc/Documents/Dev/WorkSpace/VSCodeProjects/NewComicDownloader/CBZ/rm_comic/福利女姊姊/第1话 福利女姊姊.CBZ" dir_path = "CBZ/rm_comic" #dir_path = "/mnt/Comics/CBZ/rm_comic" for dir in os.listdir(dir_path): c_dir = os.path.join(dir_path, dir) if os.path.isdir(c_dir): files = list(FileNaming.get_filenames_optimized(c_dir, ext_filter=['.CBZ'])) for file in files: # 获取文件的创建时间(仅在Linux/MacOS中可用) # 修改时间 create_time = time.localtime(os.utime(file)) # 注意:st_birthtime 在Linux/MacOS中可用,但不是所有系统都支持 # 格式化时间 formatted_time = time.strftime('%Y%m%d%H', create_time) if int(formatted_time) > 2025020401: print(f"{file} 文件创建时间:", formatted_time) # 更新ComicInfoxml # comicInfo().update_comicinfo_cbz(file) # 检查CBZ是否存在ComicInfo.xml comicInfo().ver_comicinfo_xml(file) #if size < 3000: # os.remove(file) # print(f"已删除{file}") if __name__ == "__main__": # 批量删除漫画下的重复图片章节 # comicInfo()._comic_info_xml_pages("/Users/cc/Documents/Dev/WorkSpace/VSCodeProjects/NewComicDownloader/CBZ/rm_comic/和朋友的妈妈做朋友/第36话 36.CBZ") dir_path = "CBZ/rm_comic" #dir_path = "/mnt/Comics/CBZ/rm_comic" for dir in os.listdir(dir_path): c_dir = os.path.join(dir_path, dir) if os.path.isdir(c_dir): comic_pages = [] files = list(FileNaming.get_filenames_optimized(c_dir, ext_filter=['.CBZ'])) for file in files: page_data = comicInfo()._comic_info_xml_pages(file) comic_pages.append(page_data) #print(page_data) # 一本漫画读取完毕 #print(comic_pages) duplicates = comicInfo().extract_duplicate_files(comic_pages) for hash_val, delete_files in duplicates.items(): # 删除重复文件 for file_path in delete_files: try: # os.remove(file_path) print(f"已删除: {file_path}") except Exception as e: print(f"删除失败 {file_path}: {e}")