NewComicDownloader/test.py
2025-07-11 09:04:51 +08:00

440 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from src.common.naming import FileNaming
from src.common.ComicInfo import ImageInfo, ComicInfo as ci, ComicPageInfo
from zipfile import ZipFile
from datetime import datetime
import time, shutil,re, xxhash, json
from typing import Any
import os,hashlib
import xml.etree.ElementTree as ET
from PIL import Image
from io import BytesIO
from tempfile import NamedTemporaryFile
from xml.dom import minidom
from src.common.ComicInfo import ComicInfoXml
from lxml import etree
from collections import defaultdict
from typing import List, Dict, Tuple
class test:
    """Maintenance helpers that prune CBZ archives below a comic library directory."""

    def clean_min_cbz(self, dir_path="/mnt/Comics/CBZ/rm_comic", min_size=3000):
        """Delete CBZ files smaller than ``min_size`` bytes.

        Every sub-directory of ``dir_path`` is scanned through
        ``FileNaming.get_filenames_optimized`` (project helper; presumably it
        yields file paths filtered by extension - confirm against its source).

        Args:
            dir_path: Library root; each sub-directory is treated as one comic.
            min_size: Size threshold in bytes; smaller archives are removed.
        """
        for entry in os.listdir(dir_path):
            c_dir = os.path.join(dir_path, entry)
            if not os.path.isdir(c_dir):
                continue
            # Snapshot the listing before deleting anything from the directory.
            files = list(FileNaming.get_filenames_optimized(c_dir, ext_filter=['.CBZ']))
            for file in files:
                if os.path.getsize(file) < min_size:
                    os.remove(file)
                    print(f"已删除{file}")

    def _clean_old_cbz(self, cbz_path, max_age_days=7):
        """Report a CBZ whose members are much older than the archive itself.

        The archive's modification date is compared with the timestamp stored
        for every member. Real calendar arithmetic is used, so the comparison
        stays correct across month and year boundaries. Nothing is deleted:
        the archive is only reported on stdout.

        Args:
            cbz_path: Path of the archive to inspect.
            max_age_days: A member counts as old when it is more than this
                many days older than the archive.
        """
        archive_date = datetime.fromtimestamp(os.path.getmtime(cbz_path)).date()
        with ZipFile(cbz_path, 'r') as zip_ref:
            old_img = 0
            for file_info in zip_ref.infolist():
                # date_time is (year, month, day, hour, minute, second).
                member_date = datetime(*file_info.date_time).date()
                if (archive_date - member_date).days > max_age_days:
                    old_img += 1
        if old_img > 0:
            print(f"remove cbz {cbz_path}")

    def clean_old_cbz(self, dir_path="/mnt/Comics/CBZ/rm_comic"):
        """Run ``_clean_old_cbz`` on every CBZ found in the sub-directories of ``dir_path``.

        Args:
            dir_path: Library root; each sub-directory is treated as one comic.
        """
        for entry in os.listdir(dir_path):
            c_dir = os.path.join(dir_path, entry)
            if not os.path.isdir(c_dir):
                continue
            for file in FileNaming.get_filenames_optimized(c_dir, ext_filter=['.CBZ']):
                self._clean_old_cbz(file)
class comicInfo:
    """Read, regenerate and verify the ``ComicInfo.xml`` stored inside CBZ archives."""

    def find_actual_path(self, zip_ref, target_path):
        """Return the archive member matching ``target_path`` ignoring case.

        Args:
            zip_ref: An open ``ZipFile`` (anything exposing ``namelist()``).
            target_path: Member name to look for.

        Returns:
            The first member name that matches case-insensitively, or ``None``.
        """
        target_lower = target_path.lower()
        return next(
            (name for name in zip_ref.namelist() if name.lower() == target_lower),
            None,
        )

    def process_cbz(self, cbz_path):
        """Collect metadata and per-image details from a CBZ archive.

        The ``Image`` attribute of every ``<Page>`` is used as an image name
        without extension; ``.jpg`` is appended to find the member. When the
        XML lists no pages, members named ``001.jpg``, ``002.jpg``, ... are
        probed instead until the first gap.

        Args:
            cbz_path: Path of the archive to read.

        Returns:
            ``{"metadata": {...}, "pages": [...]}``, or ``None`` when the
            archive contains no ``ComicInfo.xml``. Each page entry holds
            ``name``, ``size``, ``key`` (MD5 hex digest), ``width`` and
            ``height``; the dimensions are ``None`` when the image cannot be
            decoded.

        Raises:
            SystemExit: On any error while reading the archive.
        """
        metadata_fields = [
            "Title", "Series", "Number", "Summary", "Writer",
            "Genre", "Tags", "PageCount", "AgeRating"
        ]
        try:
            with ZipFile(cbz_path, 'r') as cbz:
                # Lower-cased name -> real member name (first occurrence wins),
                # so each case-insensitive lookup is a dict access rather than
                # a scan of the whole member list.
                members = {}
                for name in cbz.namelist():
                    members.setdefault(name.lower(), name)

                xml_file_name = members.get('comicinfo.xml')
                if xml_file_name is None:
                    print("未找到 ComicInfo.xml")
                    return None

                # Parse once; raw bytes let the parser honour the XML
                # declaration and a possible byte-order mark.
                root = ET.fromstring(cbz.read(xml_file_name))

                metadata = {}
                for field in metadata_fields:
                    element = root.find(field)
                    metadata[field] = element.text if element is not None else None

                pages = root.find('Pages')
                if pages is None:
                    print("XML 中缺少 Pages 标签")
                    image_paths = []
                else:
                    image_paths = [
                        page.get('Image')
                        for page in pages.findall('Page')
                        if page.get('Image')
                    ]

                if not image_paths:
                    print(f"{cbz_path} {image_paths} 为空")
                    # Fall back to sequentially numbered images.
                    img_count = 1
                    while "{:0>3d}.jpg".format(img_count) in members:
                        image_paths.append("{:0>3d}".format(img_count))
                        img_count += 1
                    print(f"生成新的paths{image_paths}")

                pages_info = []
                for img_path in image_paths:
                    actual_path = members.get((img_path + ".jpg").lower())
                    if not actual_path:
                        print(f"警告:图片 '{img_path}' 不存在于压缩包中")
                        continue
                    content = cbz.read(actual_path)
                    img_width, img_height = None, None
                    try:
                        with Image.open(BytesIO(content)) as img:
                            img_width, img_height = img.size
                    except Exception as e:
                        # Best effort: an undecodable image keeps None dimensions.
                        print(f"无法读取图片尺寸:{actual_path},错误:{e}")
                    pages_info.append({
                        "name": os.path.basename(actual_path).split(".")[0],
                        "size": len(content),
                        "key": hashlib.md5(content).hexdigest(),
                        "width": img_width,
                        "height": img_height
                    })
                return {
                    "metadata": metadata,
                    "pages": pages_info
                }
        except Exception as e:
            print(f"处理 CBZ 文件时出错: {e}")
            raise SystemExit("处理CBZ出错") from e

    def generate_comic_info_xml(self, metadata, pages_info):
        """Build pretty-printed ``ComicInfo.xml`` text.

        Args:
            metadata: Mapping of element name to value; ``None`` values are
                skipped.
            pages_info: Iterable of page dicts with the keys ``name``,
                ``size``, ``key``, ``width`` and ``height``. A missing or
                ``None`` value falls back to ``""`` for ``name`` and ``0``
                for the others, so the literal text ``None`` is never written.

        Returns:
            The XML document as ``str``, including the XML declaration.
        """
        def attr(page, key, default):
            value = page.get(key)
            return str(default if value is None else value)

        root = ET.Element('ComicInfo')
        root.set('xmlns:xsd', 'http://www.w3.org/2001/XMLSchema')
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
        for field, value in metadata.items():
            if value is not None:
                elem = ET.SubElement(root, field)
                elem.text = str(value)
        if pages_info:
            pages_elem = ET.SubElement(root, "Pages")
            for page in pages_info:
                ET.SubElement(pages_elem, "Page", attrib={
                    "Image": attr(page, "name", ""),
                    "ImageSize": attr(page, "size", 0),
                    "Key": attr(page, "key", 0),
                    "ImageWidth": attr(page, "width", 0),
                    "ImageHeight": attr(page, "height", 0)
                })
        xml_str = ET.tostring(root, encoding='utf-8', method='xml')
        # toprettyxml returns bytes when an encoding is given.
        formatted_xml = minidom.parseString(xml_str).toprettyxml(indent=" ", encoding="utf-8")
        return formatted_xml.decode('utf-8')

    def update_cbz_with_new_xml(self, cbz_path, new_xml_content, output_path=None):
        """Write a copy of the archive whose ``ComicInfo.xml`` is ``new_xml_content``.

        The new archive is assembled in a temporary file next to
        ``output_path`` and moved into place with ``os.replace`` only after it
        has been written completely, so a failure leaves the source archive
        untouched. When ``output_path`` differs from ``cbz_path`` the source
        archive is kept.

        Args:
            cbz_path: Archive to read.
            new_xml_content: Text (or bytes) stored as ``ComicInfo.xml``.
            output_path: Destination; defaults to ``cbz_path`` (overwrite).

        Returns:
            ``True`` on success.

        Raises:
            SystemExit: When the archive cannot be rewritten.
        """
        if output_path is None:
            output_path = cbz_path
        tmp_path = None
        try:
            # Same directory as the destination keeps os.replace on one filesystem.
            out_dir = os.path.dirname(os.path.abspath(output_path))
            with NamedTemporaryFile(delete=False, dir=out_dir, suffix=".tmp") as tmp:
                tmp_path = tmp.name
            with ZipFile(cbz_path, 'r') as source_zip, ZipFile(tmp_path, 'w') as new_zip:
                for item in source_zip.infolist():
                    # Copy every member except the old XML (any letter case).
                    if item.filename.lower() != 'comicinfo.xml':
                        new_zip.writestr(item, source_zip.read(item.filename))
                new_zip.writestr("ComicInfo.xml", new_xml_content)
            # Temporary files are created owner-only; carry over the source's mode.
            shutil.copymode(cbz_path, tmp_path)
            os.replace(tmp_path, output_path)
            return True
        except Exception as e:
            print(f"更新 CBZ 文件失败: {e}")
            if tmp_path is not None and os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise SystemExit("更新失败") from e

    def update_comicinfo_cbz(self, cbz_path):
        """Regenerate the ``ComicInfo.xml`` of an archive and write it back.

        The Writer field is split on ``&`` and spaces, de-duplicated in order
        of first appearance, joined with commas and passed through
        ``FileNaming.chinese_file_name``. The XML is written to
        ``NewComicInfo.xml`` in the working directory for XSD validation and
        that file is removed afterwards, also on failure. Nothing happens when
        the archive has no ``ComicInfo.xml``.

        Args:
            cbz_path: Archive to update in place.
        """
        data = self.process_cbz(cbz_path)
        if data is None:
            # process_cbz has already reported the missing ComicInfo.xml.
            return
        metadata = data["metadata"]
        author = metadata.get("Writer")
        if author:
            parts = str(author).replace("&", " ").split(" ")
            unique_names = dict.fromkeys(part for part in parts if part)
            metadata["Writer"] = FileNaming.chinese_file_name(",".join(unique_names))
        new_xml = self.generate_comic_info_xml(metadata, data["pages"])
        xml_file = "NewComicInfo.xml"
        try:
            with open(xml_file, "w", encoding="utf-8") as f:
                f.write(new_xml)
            print(f"已生成 {xml_file}")
            ComicInfoXml()._validate_xml_with_xsd_file(xml_file=xml_file, xsd_file="src/assets/ComicInfo_2.1.xsd")
            self.update_cbz_with_new_xml(cbz_path, new_xml)
        finally:
            if os.path.exists(xml_file):
                os.remove(xml_file)

    def ver_comicinfo_xml(self, cbz_path):
        """Print a notice when the archive has no ``ComicInfo.xml`` (any letter case).

        Args:
            cbz_path: Archive to check.

        Raises:
            SystemExit: When the archive cannot be opened or read.
        """
        try:
            with ZipFile(cbz_path, 'r') as cbz:
                if not any(f.lower() == 'comicinfo.xml' for f in cbz.namelist()):
                    print(f" {cbz_path} 未找到 ComicInfo.xml")
        except Exception as e:
            raise SystemExit("ver_comicinfo_xml 错误") from e

    def clear_cbz(self, dir_path="CBZ/rm_comic", since=2025020401):
        """Check recently modified archives for a ``ComicInfo.xml``.

        Args:
            dir_path: Library root; each sub-directory is treated as one comic.
            since: Threshold as a ``YYYYMMDDHH`` integer; only archives whose
                local modification time is later are checked.
        """
        for entry in os.listdir(dir_path):
            c_dir = os.path.join(dir_path, entry)
            if not os.path.isdir(c_dir):
                continue
            for file in FileNaming.get_filenames_optimized(c_dir, ext_filter=['.CBZ']):
                # Read the modification time. os.utime would *set* the
                # timestamps to "now" and return None.
                modified = time.localtime(os.path.getmtime(file))
                formatted_time = time.strftime('%Y%m%d%H', modified)
                if int(formatted_time) > since:
                    print(f"{file} 文件创建时间:", formatted_time)
                    self.ver_comicinfo_xml(file)

    def _comic_info_xml_pages(self, zip_file):
        """Hash the ``<Page>`` attributes listed in an archive's ``ComicInfo.xml``.

        Args:
            zip_file: Path (or path-like object) of the archive.

        Returns:
            A dict with ``file`` (the path as ``str``), ``list_hash`` (xxhash
            hex digest of all page attributes) and ``page_count`` (number of
            ``<Page>`` elements).

        Raises:
            SystemExit: When the XML is missing or cannot be parsed.
        """
        data = {"file": str(zip_file)}
        with ZipFile(str(zip_file), 'r') as z:
            try:
                xml_name = self.find_actual_path(z, 'ComicInfo.xml')
                if xml_name is None:
                    raise KeyError("ComicInfo.xml not found in archive")
                root = etree.fromstring(z.read(xml_name))
                list_page = [dict(page.attrib) for page in root.findall('Pages/Page')]
            except Exception as e:
                raise SystemExit(f"获取 ComicInfo.xml 文件中的 <PageCount> 标签值失败: {zip_file},错误: {str(e)}") from e
        data["list_hash"] = self._generate_xxhash(list_page)
        data["page_count"] = len(list_page)
        return data

    def _generate_xxhash(self, data: Any) -> str:
        """Return the 64-bit xxhash hex digest of ``str(data)``.

        The text is passed through ``json.dumps`` before hashing; because a
        plain string is serialised, key sorting would have no effect and is
        not requested.
        """
        hasher = xxhash.xxh64()
        serialized = json.dumps(str(data), ensure_ascii=False)
        hasher.update(serialized.encode('utf-8'))
        return hasher.hexdigest()

    def _extract_duplicate_files(self, data: List[Dict[str, str]]) -> Dict[str, List[str]]:
        """Group file names by ``list_hash`` and keep the groups with more than one file.

        Args:
            data: Dicts containing at least the keys ``file`` and ``list_hash``.

        Returns:
            ``{list_hash: [file, ...]}`` for every hash shared by two or more
            files, with files in input order.
        """
        hash_map = defaultdict(list)
        for item in data:
            hash_map[item['list_hash']].append(item['file'])
        return {
            hash_val: files
            for hash_val, files in hash_map.items()
            if len(files) > 1
        }

    def delete_repeat_file(self, cbz_path, dir_path="CBZ/rm_comic", dry_run=True) -> None:
        """Find chapters of one comic whose page lists are identical.

        Within every duplicate group the first archive is kept and the rest
        are reported. Archives without any ``<Page>`` entry are ignored,
        because their empty page lists would all hash to the same value.

        Args:
            cbz_path: Unused; kept so existing callers keep working.
            dir_path: Library root; each sub-directory is treated as one comic.
            dry_run: When ``True`` (default) duplicates are only printed;
                when ``False`` they are removed from disk.
        """
        for entry in os.listdir(dir_path):
            c_dir = os.path.join(dir_path, entry)
            if not os.path.isdir(c_dir):
                continue
            comic_pages = []
            for file in FileNaming.get_filenames_optimized(c_dir, ext_filter=['.CBZ']):
                page_data = self._comic_info_xml_pages(file)
                if page_data.get("page_count"):
                    comic_pages.append(page_data)
            duplicates = self._extract_duplicate_files(comic_pages)
            for delete_files in duplicates.values():
                # Keep the first copy of every group.
                for file_path in delete_files[1:]:
                    try:
                        if not dry_run:
                            os.remove(file_path)
                        print(f"已删除: {file_path}")
                    except OSError as e:
                        print(f"删除失败 {file_path}: {e}")
if __name__ == "__main__":
    print("开始处理")
    # Chapter archive whose ComicInfo.xml is refreshed below.
    target_cbz = "/Users/cc/Documents/Dev/WorkSpace/VSCodeProjects/NewComicDownloader/CBZ/rm_comic/和朋友的妈妈做朋友/第37话 37.CBZ"
    # update_comicinfo_count lives in src.common.ComicInfo; its result is used
    # here as a path-like object offering read_text().
    refreshed_xml = ComicInfoXml().update_comicinfo_count(37, target_cbz)
    xml_text = refreshed_xml.read_text(encoding="utf-8")
    comicInfo().update_cbz_with_new_xml(target_cbz, xml_text)