from src.common.naming import FileNaming
from src.common.ComicInfo import ImageInfo, ComicInfo as ci, ComicPageInfo, ComicInfoXml
from zipfile import ZipFile
from datetime import datetime
import time
import shutil
import xxhash
import json
import os
import hashlib
import xml.etree.ElementTree as ET
from PIL import Image
from io import BytesIO
from tempfile import NamedTemporaryFile
from xml.dom import minidom
from lxml import etree
from collections import defaultdict
from typing import Any, Dict, List


class test:

    def clean_min_cbz(self):
        """Delete CBZ files smaller than 3 KB."""
        dir_path = "/mnt/Comics/CBZ/rm_comic"
        for entry in os.listdir(dir_path):
            c_dir = os.path.join(dir_path, entry)
            if os.path.isdir(c_dir):
                files = list(FileNaming.get_filenames_optimized(c_dir, ext_filter=['.CBZ']))
                for file in files:
                    size = os.path.getsize(file)
                    if size < 3000:
                        os.remove(file)
                        print(f"Deleted {file}")

    def _clean_old_cbz(self, cbz_path):
        """Report CBZ archives whose images are more than a week older than the archive itself."""
        m_time = datetime.fromtimestamp(os.path.getmtime(cbz_path))

        with ZipFile(cbz_path, 'r') as zip_ref:
            old_img = 0
            for file_info in zip_ref.infolist():
                # date_time is a (year, month, day, hour, minute, second) tuple
                dt = datetime(*file_info.date_time)
                # Skip images dated within a week of the archive's mtime.
                # Comparing datetimes directly avoids the month-boundary bug
                # of subtracting YYYYMMDD integers.
                if (m_time - dt).days > 7:
                    #print(f"Clear Filename: {file_info.filename}, zip: {cbz_path}")
                    old_img += 1

        if old_img > 0:
            #os.remove(cbz_path)
            print(f"remove cbz {cbz_path}")

    def clean_old_cbz(self):
        dir_path = "/mnt/Comics/CBZ/rm_comic"
        for entry in os.listdir(dir_path):
            c_dir = os.path.join(dir_path, entry)
            if os.path.isdir(c_dir):
                files = list(FileNaming.get_filenames_optimized(c_dir, ext_filter=['.CBZ']))
                for file in files:
                    self._clean_old_cbz(file)
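
# Usage sketch for the helpers above (illustrative; assumes the
# /mnt/Comics/CBZ/rm_comic layout exists):
#   t = test()
#   t.clean_min_cbz()   # delete CBZ files under 3 KB
#   t.clean_old_cbz()   # report archives containing week-old images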


class comicInfo:

    def find_actual_path(self, zip_ref, target_path):
        """Case-insensitively locate a file inside the archive and return its actual name."""
        target_lower = target_path.lower()
        for name in zip_ref.namelist():
            if name.lower() == target_lower:
                return name
        return None
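
    # e.g. find_actual_path(cbz, "comicinfo.xml") returns "ComicInfo.XML" when
    # the archive stores the entry with different casing (illustrative).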

    def process_cbz(self, cbz_path):
        try:
            with ZipFile(cbz_path, 'r') as cbz:
                # ============================================
                # Part 1: read the metadata fields from ComicInfo.xml
                # ============================================
                xml_files = [f for f in cbz.namelist() if f.lower() == 'comicinfo.xml']
                if not xml_files:
                    print("ComicInfo.xml not found")
                    return None
                xml_file_name = xml_files[0]

                # Parse the XML once and reuse the tree for metadata and pages
                with cbz.open(xml_file_name) as xml_file:
                    xml_content = xml_file.read().decode('utf-8')
                root = ET.fromstring(xml_content)

                # Metadata fields to extract (user-defined field list)
                metadata_fields = [
                    "Title", "Series", "Number", "Summary", "Writer",
                    "Genre", "Tags", "PageCount", "AgeRating"
                ]
                metadata = {}
                for field in metadata_fields:
                    element = root.find(field)
                    metadata[field] = element.text if element is not None else None

                # ============================================
                # Part 2: read the image info from the Page tags
                # ============================================
                pages_info = []
                pages = root.find('Pages')
                if pages is None:
                    print("The XML has no Pages tag")
                    #return {"metadata": metadata, "pages": None}

                # Collect the image paths listed in the Page tags
                if pages is not None:
                    page_list = pages.findall('Page')
                    image_paths = [page.get('Image') for page in page_list if page.get('Image')]
                else:
                    image_paths = []

                if not image_paths:
                    print(f"{cbz_path}: page list is empty")
                    # Fall back to probing for sequentially numbered images
                    # (001.jpg, 002.jpg, ...) until one is missing
                    img_count = 1
                    while True:
                        actual_path = self.find_actual_path(cbz, f"{img_count:03d}.jpg")
                        if actual_path:
                            image_paths.append(f"{img_count:03d}")
                            img_count += 1
                        else:
                            break
                    print(f"Generated new paths: {image_paths}")

                # Process each image file
                for img_path in image_paths:
                    actual_path = self.find_actual_path(cbz, img_path + ".jpg")
                    if not actual_path:
                        print(f"Warning: image '{img_path}' not present in the archive")
                        continue

                    with cbz.open(actual_path) as img_file:
                        content = img_file.read()

                    # Compute MD5 and file size
                    file_md5 = hashlib.md5(content).hexdigest()
                    file_size = len(content)

                    # Read the image dimensions
                    img_width, img_height = None, None
                    try:
                        with Image.open(BytesIO(content)) as img:
                            img_width, img_height = img.size
                    except Exception as e:
                        print(f"Could not read image size: {actual_path}, error: {e}")

                    # Store the page info
                    pages_info.append({
                        "name": os.path.basename(actual_path).split(".")[0],
                        "size": file_size,
                        "key": file_md5,
                        "width": img_width,
                        "height": img_height
                    })

                return {
                    "metadata": metadata,
                    "pages": pages_info
                }
        except Exception as e:
            print(f"Error while processing the CBZ file: {e}")
            raise SystemExit("CBZ processing failed")
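
    # process_cbz returns a dict of this shape (illustrative placeholder
    # values, not real data):
    #   {"metadata": {"Title": ..., "Series": ..., "Writer": ..., ...},
    #    "pages": [{"name": "001", "size": 12345, "key": "<md5 hex>",
    #               "width": 800, "height": 1200}, ...]}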

    def generate_comic_info_xml(self, metadata, pages_info):
        """Build the ComicInfo.xml content from metadata and page info."""
        # Create the root node
        root = ET.Element('ComicInfo')
        root.set('xmlns:xsd', 'http://www.w3.org/2001/XMLSchema')
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')

        # Add the metadata fields
        for field, value in metadata.items():
            if value is not None:
                elem = ET.SubElement(root, field)
                elem.text = str(value)

        # Add the Pages node
        if pages_info:
            pages_elem = ET.SubElement(root, "Pages")
            for page in pages_info:
                # Only the key attributes are kept here; extend as needed
                ET.SubElement(pages_elem, "Page", attrib={
                    "Image": page.get("name", ""),
                    "ImageSize": str(page.get("size", 0)),
                    "Key": str(page.get("key", 0)),
                    "ImageWidth": str(page.get("width", 0)),
                    "ImageHeight": str(page.get("height", 0))
                })

        # Pretty-print the XML (adjust the indent width as needed)
        xml_str = ET.tostring(root, encoding='utf-8', method='xml')
        formatted_xml = minidom.parseString(xml_str).toprettyxml(indent=" ", encoding="utf-8")

        # Convert bytes to str; toprettyxml already adds the XML declaration
        return formatted_xml.decode('utf-8')
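
    # The generated document has this general shape (illustrative):
    #   <?xml version="1.0" encoding="utf-8"?>
    #   <ComicInfo xmlns:xsd="http://www.w3.org/2001/XMLSchema"
    #              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    #    <Title>...</Title>
    #    <Pages>
    #     <Page Image="001" ImageSize="12345" Key="..." ImageWidth="800" ImageHeight="1200"/>
    #    </Pages>
    #   </ComicInfo>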

    def update_cbz_with_new_xml(self, cbz_path, new_xml_content, output_path=None):
        """Write the newly generated ComicInfo.xml into the CBZ file."""
        # Default output path is the source path (overwrite in place)
        if output_path is None:
            output_path = cbz_path

        # Move the original archive to a temporary backup so a failed
        # overwrite can be rolled back; create the backup before the try
        # block so `tmp` is always bound in the except handler
        with NamedTemporaryFile(delete=False) as tmp:
            pass
        shutil.move(cbz_path, tmp.name)
        try:
            # Copy the original archive, replacing ComicInfo.xml
            with ZipFile(tmp.name, 'r') as source_zip:
                with ZipFile(output_path, 'w') as new_zip:
                    # Copy every member except the old XML
                    for item in source_zip.infolist():
                        if item.filename.lower() != 'comicinfo.xml':
                            new_zip.writestr(item, source_zip.read(item.filename))

                    # Add the new XML
                    new_zip.writestr("ComicInfo.xml", new_xml_content)

            os.remove(tmp.name)  # clean up the backup
            return True
        except Exception as e:
            print(f"Failed to update the CBZ file: {e}")
            if os.path.exists(tmp.name):
                shutil.move(tmp.name, cbz_path)  # restore the backup
            raise SystemExit("Update failed")
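
    # Usage sketch (hypothetical paths):
    #   comicInfo().update_cbz_with_new_xml("comic.cbz", new_xml)             # overwrite in place
    #   comicInfo().update_cbz_with_new_xml("comic.cbz", new_xml, "out.cbz")  # write a copy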

    def update_comicinfo_cbz(self, cbz_path):
        """Regenerate and replace the ComicInfo.xml inside a CBZ file.

        Args:
            cbz_path: path to the CBZ archive to update.
        """
        data = self.process_cbz(cbz_path)
        author = data["metadata"].get("Writer", "")

        # Normalise the Writer field: split on whitespace, deduplicate, rejoin
        values = set(str(author).replace("&", " ").split(" "))
        author = FileNaming.chinese_file_name(",".join(values))
        data["metadata"]["Writer"] = author

        # Generate the XML content
        new_xml = self.generate_comic_info_xml(data["metadata"], data["pages"])
        xml_file = "NewComicInfo.xml"
        # For inspection: save the XML locally
        with open(xml_file, "w", encoding="utf-8") as f:
            f.write(new_xml)
        print(f"Generated {xml_file}")
        ComicInfoXml()._validate_xml_with_xsd_file(xml_file=xml_file, xsd_file="src/assets/ComicInfo_2.1.xsd")
        # Update the CBZ file (back up the original before running this for real)
        success = self.update_cbz_with_new_xml(cbz_path, new_xml)
        # if success:
        #     print("CBZ updated successfully")
        os.remove(xml_file)

    def ver_comicinfo_xml(self, cbz_path):
        """Check whether the CBZ contains a ComicInfo.xml."""
        try:
            with ZipFile(cbz_path, 'r') as cbz:
                xml_files = [f for f in cbz.namelist() if f.lower() == 'comicinfo.xml']
                if not xml_files:
                    print(f"{cbz_path}: ComicInfo.xml not found")
                    #os.remove(cbz_path)
        except Exception as e:
            raise SystemExit(f"ver_comicinfo_xml failed: {e}")

    def clear_cbz(self):
        # Verify CBZ files modified after a cutoff timestamp
        # comicInfo().update_comicinfo_cbz("")
        #cbz_path = "/Users/cc/Documents/Dev/WorkSpace/VSCodeProjects/NewComicDownloader/CBZ/rm_comic/福利女姊姊/第1话 福利女姊姊.CBZ"

        dir_path = "CBZ/rm_comic"
        #dir_path = "/mnt/Comics/CBZ/rm_comic"
        for entry in os.listdir(dir_path):
            c_dir = os.path.join(dir_path, entry)
            if os.path.isdir(c_dir):
                files = list(FileNaming.get_filenames_optimized(c_dir, ext_filter=['.CBZ']))
                for file in files:
                    # Use the modification time; os.utime() *sets* the times and
                    # returns None, so it cannot be used to read them. (The
                    # creation time, st_birthtime, is only available on some
                    # platforms such as macOS.)
                    m_time = time.localtime(os.stat(file).st_mtime)
                    formatted_time = time.strftime('%Y%m%d%H', m_time)
                    if int(formatted_time) > 2025020401:
                        print(f"{file} modified at:", formatted_time)
                        # Refresh the ComicInfo.xml
                        # comicInfo().update_comicinfo_cbz(file)
                        # Check that the CBZ contains a ComicInfo.xml
                        comicInfo().ver_comicinfo_xml(file)
                        #if size < 3000:
                        #    os.remove(file)
                        #    print(f"Deleted {file}")

    def _comic_info_xml_pages(self, zip_file):
        """Collect the attributes of every <Page> element in ComicInfo.xml."""
        data = {"file": str(zip_file)}
        list_page = []
        # Open the ZIP file
        with ZipFile(str(zip_file), 'r') as z:
            try:
                with z.open('ComicInfo.xml', 'r') as file:
                    # Parse the XML from the archive member
                    file_string = file.read().decode("utf-8")
                    root = etree.fromstring(file_string.encode())  # lxml expects bytes here
                    # Walk every Page element and keep its attributes
                    # (converted to a plain dict so it can be JSON-serialized)
                    for page in root.findall('Pages/Page'):
                        list_page.append(dict(page.attrib))
            except Exception as e:
                raise SystemExit(f"Failed to read the Page elements of ComicInfo.xml: {zip_file}, error: {e}")
        data["list_hash"] = self._generate_xxhash(list_page)
        return data
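
    # _comic_info_xml_pages returns a dict of this shape (illustrative):
    #   {"file": "CBZ/rm_comic/<comic>/<chapter>.CBZ", "list_hash": "<xxh64 hex>"}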

    def _generate_xxhash(self, data: Any) -> str:
        """
        Generate a fast hash with xxhash.

        Characteristics:
        - 2-5x faster than MD5
        - produces a 64- or 128-bit digest
        """
        # Create the hash object
        hasher = xxhash.xxh64()

        # Serialize via JSON; sort_keys keeps the digest stable for equal
        # dicts regardless of key order (hashing str(data) would not)
        serialized = json.dumps(data, sort_keys=True, ensure_ascii=False, default=str)

        # Update the hash and return the hex digest
        hasher.update(serialized.encode('utf-8'))
        return hasher.hexdigest()
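
    # Sketch: the digest is deterministic for equal inputs, which is what the
    # duplicate detection below relies on. Illustrative only:
    #   h1 = comicInfo()._generate_xxhash([{"Image": "001"}])
    #   h2 = comicInfo()._generate_xxhash([{"Image": "001"}])
    #   assert h1 == h2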

    def _extract_duplicate_files(self, data: List[Dict[str, str]]) -> Dict[str, List[str]]:
        """
        Extract the file names that share a duplicate list_hash.

        Args:
            data: list of dicts, each with 'file' and 'list_hash' keys

        Returns:
            dict mapping each duplicated list_hash to its list of file names
        """
        # Step 1: build the hash lookup table
        hash_map = defaultdict(list)

        # Step 2: fill the table (O(n) time)
        for item in data:
            hash_map[item['list_hash']].append(item['file'])

        # Step 3: keep only the duplicated entries
        # (O(m) time, m = number of unique hashes)
        duplicates = {
            hash_val: files
            for hash_val, files in hash_map.items()
            if len(files) > 1
        }

        return duplicates
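
    # Usage sketch with made-up entries (illustrative):
    #   data = [{"file": "ch1.CBZ", "list_hash": "x"},
    #           {"file": "ch2.CBZ", "list_hash": "x"},
    #           {"file": "ch3.CBZ", "list_hash": "y"}]
    #   comicInfo()._extract_duplicate_files(data)
    #   # -> {"x": ["ch1.CBZ", "ch2.CBZ"]}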

    def delete_repeat_file(self, cbz_path) -> None:
        """Delete duplicate image chapters across the CBZ files of each comic.

        Args:
            cbz_path: currently unused; the method scans dir_path instead.
        """
        dir_path = "CBZ/rm_comic"
        #dir_path = "/mnt/Comics/CBZ/rm_comic"
        for entry in os.listdir(dir_path):
            c_dir = os.path.join(dir_path, entry)
            if os.path.isdir(c_dir):
                comic_pages = []
                files = list(FileNaming.get_filenames_optimized(c_dir, ext_filter=['.CBZ']))
                for file in files:
                    page_data = self._comic_info_xml_pages(file)
                    comic_pages.append(page_data)
                # One comic fully read; find chapters with identical page lists
                duplicates = self._extract_duplicate_files(comic_pages)
                for hash_val, dup_files in duplicates.items():
                    # Keep the first copy and delete the rest; deleting every
                    # entry in the group would remove all copies, including the
                    # one that should survive
                    for file_path in dup_files[1:]:
                        try:
                            # os.remove(file_path)
                            print(f"Deleted: {file_path}")
                        except Exception as e:
                            print(f"Failed to delete {file_path}: {e}")


if __name__ == "__main__":
    print("Start processing")
    # ComicInfoXml()._xml_file_to_comicinfo("/Users/cc/Documents/Dev/WorkSpace/VSCodeProjects/NewComicDownloader/CBZ/rm_comic/和朋友的妈妈做朋友/第37话 37.CBZ")
    cbz_path = "/Users/cc/Documents/Dev/WorkSpace/VSCodeProjects/NewComicDownloader/CBZ/rm_comic/和朋友的妈妈做朋友/第37话 37.CBZ"
    xml_path = ComicInfoXml().update_comicinfo_count(37, cbz_path)
    comicInfo().update_cbz_with_new_xml(cbz_path, xml_path.read_text(encoding="utf-8"))
    #items = ci().__dict__.keys()
    #print(items)