add comicinfo.xml update cbz

parent 97449dca0b
commit cfae12f9b2

run.py
@@ -1,5 +1,4 @@
 import asyncio
-from pathlib import Path
 from src.sites.manager import MangaManager
 from src.common.logging import setup_logging
 
@@ -69,6 +69,7 @@ class Extractor:
         """Extract manga info and return a MangaInfo instance"""
         selectors = self.config.get_selector('manga_list')
         info_data = {}
+        info_data['base_url'] = self.config.base_url
         for key, selector in selectors.items():
             if isinstance(selector, str):
                 element = self.processor.select(tree, selector)
@@ -72,6 +72,14 @@ class Chapter(BaseModel):
     #images: List[ImageItem] = []
 
 class ListManga(BaseModel):
+
+    base_url: str
+    """Manga site domain"""
+    @field_validator('base_url', mode='before')
+    def validate_base_url(cls, v):
+        cls.base_url = v
+        return v
+
     title: List[str]
 
     url: List[HttpUrl]
@@ -81,10 +89,12 @@ class ListManga(BaseModel):
         list_url = []
         for url in v:
             if isinstance(url, str) and not url.startswith('http'):
-                list_url.append(HttpUrl("https://rouman5.com" + url))
+                list_url.append(HttpUrl(cls.base_url + url))
         return list_url
 
-    created_at: List[str] = []
+    updated_at: List[str] = []
+
+    last_updated: List[str] = []
 
 class MangaInfo(BaseModel):
     project: str
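Note: a minimal standalone sketch (assuming pydantic v2, with ListManga trimmed to the two fields involved) of how the new base_url hook and the rewritten url validator are meant to work together: fields validate in declaration order, so the base_url validator runs first and stashes the domain on the class, and the url validator then prefixes relative paths with it. The @classmethod decorators and the else branch for already-absolute URLs are additions for the sketch, not part of this commit.

from typing import List
from pydantic import BaseModel, HttpUrl, field_validator

class ListManga(BaseModel):
    base_url: str

    @field_validator('base_url', mode='before')
    @classmethod
    def validate_base_url(cls, v):
        cls.base_url = v  # stash the domain on the class so later validators can read it
        return v

    url: List[HttpUrl]

    @field_validator('url', mode='before')
    @classmethod
    def validate_url(cls, v):
        list_url = []
        for url in v:
            if isinstance(url, str) and not url.startswith('http'):
                list_url.append(HttpUrl(cls.base_url + url))  # absolutize relative paths
            else:
                list_url.append(url)
        return list_url

manga = ListManga(base_url="https://rouman5.com", url=["/books/abc"])
print(manga.url)  # the relative path comes back prefixed with the base_url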
@@ -730,18 +730,22 @@ class MangaUtils:
         save_data = []
 
         for manga in self.data:
-            created_at = manga["created_at"]
-            if isinstance(created_at, datetime):
+            updated_at = manga["updated_at"]
+            last_updated = manga["last_updated"]
+            if isinstance(updated_at, datetime):
                 str_strftime = '%Y%m%d'
-                created_at = created_at.strftime(str_strftime)
-            save_data.append({"name" : manga["name"] , "created_at" : created_at})
+                updated_at = updated_at.strftime(str_strftime)
+            if isinstance(last_updated, datetime):
+                str_strftime = '%Y%m%d'
+                last_updated = last_updated.strftime(str_strftime)
+            save_data.append({"name" : manga["name"] , "updated_at" : updated_at , "last_updated" : last_updated})
 
         with self.lock:
             with open(temp_path, 'w', encoding='utf-8') as f:
                 json.dump(save_data, f, indent=2, ensure_ascii=False)
             temp_path.replace(self.file_path)
 
-    def add_manga(self, name: str, created_at: str = None) -> bool:
+    def add_manga(self, name: str, updated_at: str = None) -> bool:
         """Add a new manga"""
         if not self.validate_name(name):
             raise ValueError("Invalid manga name")
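Note: a self-contained sketch of the persistence shape this hunk moves to — each record now carries the site's update date (updated_at) and the local download date (last_updated), both normalized to '%Y%m%d' strings before the JSON is swapped in atomically through a temp file. The file names and the lock here are illustrative, not taken from the repository.

import json
from datetime import datetime
from pathlib import Path
from threading import Lock

lock = Lock()
file_path = Path("manga.json")
temp_path = file_path.with_suffix(".tmp")

data = [{"name": "some manga", "updated_at": datetime(2025, 2, 1), "last_updated": datetime.now()}]

save_data = []
for manga in data:
    updated_at = manga["updated_at"]
    last_updated = manga["last_updated"]
    if isinstance(updated_at, datetime):
        updated_at = updated_at.strftime('%Y%m%d')
    if isinstance(last_updated, datetime):
        last_updated = last_updated.strftime('%Y%m%d')
    save_data.append({"name": manga["name"], "updated_at": updated_at, "last_updated": last_updated})

with lock:
    with open(temp_path, 'w', encoding='utf-8') as f:
        json.dump(save_data, f, indent=2, ensure_ascii=False)
    temp_path.replace(file_path)  # atomic swap so readers never see a half-written file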
@@ -750,13 +754,14 @@ class MangaUtils:
             self.delete_manga(name)
 
         str_strftime = '%Y%m%d'
-        time = created_at or datetime.now()
-        if isinstance(time , datetime):
-            time = time.strftime(str_strftime)
+        now_time = datetime.now()
+        if isinstance(now_time , datetime):
+            now_time = now_time.strftime(str_strftime)
 
         new_manga = {
             "name": name.strip(),
-            "created_at": time
+            "updated_at": updated_at,
+            "last_updated": now_time
         }
 
         self.data.append(new_manga)
@@ -804,7 +809,7 @@ class MangaUtils:
         if sort_by == "name":
             return sorted(self.data, key=lambda x: x['name'])
         elif sort_by == "date":
-            return sorted(self.data, key=lambda x: x['created_at'])
+            return sorted(self.data, key=lambda x: x['updated_at'])
         return self.data.copy()
 
     def validate_name(self, name: str) -> bool:
@@ -817,7 +822,7 @@ class MangaUtils:
         """Bulk import manga"""
         for manga in mangas:
             if self.validate_name(manga["name"]):
-                self.add_manga(manga["name"], manga.get("created_at"))
+                self.add_manga(manga["name"], manga.get("updated_at"))
 
     def find_duplicates(self) -> List[str]:
         """Find possible duplicate entries (simple version)"""
@@ -151,21 +151,21 @@ class MangaManager:
             raise MangaException(f"Unsupported site: {manga_url}")
         async with list_site_handler() as site:
             manga_list = await site.get_manga_list(manga_url)
-            for title,url,created_at in zip(manga_list.title, manga_list.url, manga_list.created_at):
+            for title,url,updated_at in zip(manga_list.title, manga_list.url, manga_list.updated_at):
                 title = FileNaming.chinese_file_name(title)
                 save_manga = MangaUtils().search_manga(title)
-                created = None
-                if save_manga != None: created = save_manga.get('created_at', None)
-                if created != None and created_at == created:
-                    created = save_manga.get('created_at', None)
+                updated = None
+                if save_manga != None: updated = save_manga.get('updated_at', None)
+                if updated != None and updated_at == updated:
+                    updated = save_manga.get('updated_at', None)
                     logger.info(f"{save_manga} already exists")
                 else:
                     logger.info(f"Starting manga download: {title}")
                     logger.info(f"{url}")
-                    await self.download_manga(str(url), title = title, created_at = created_at)
+                    await self.download_manga(str(url), title = title, updated_at = updated_at )
 
     @classmethod
-    async def download_manga(cls, url: str, title: str = None, created_at: str = None, save_dir: Path = BASE_IMAGES_DIR):
+    async def download_manga(cls, url: str, title: str = None, updated_at: str = None, save_dir: Path = BASE_IMAGES_DIR):
         """Download a manga"""
         manager = MangaManager(save_dir)
 
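Note: a hedged sketch of the skip check this hunk switches to — a title is re-downloaded only when the scraped updated_at differs from the value stored locally. The record shape follows the MangaUtils changes above; should_download is a hypothetical helper used purely for illustration.

def should_download(scraped_updated_at, saved_record):
    # no local record, or the site's update date moved on: download again
    saved = saved_record.get('updated_at') if saved_record is not None else None
    return saved is None or saved != scraped_updated_at

print(should_download("20250204", {"name": "x", "updated_at": "20250204"}))  # False: already current
print(should_download("20250205", {"name": "x", "updated_at": "20250204"}))  # True: site has a newer update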
@@ -204,9 +204,9 @@ class MangaManager:
                     logger.error(f"Download error: {result['error']}")
 
             # All chapters downloaded
-            if int(total_chapters) == int(success_chapters) and title != None and created_at != None:
-                MangaUtils().add_manga(title, created_at=created_at)
-                logger.info(f"All done {title}, {created_at}")
+            if int(total_chapters) == int(success_chapters) and title != None and updated_at != None:
+                MangaUtils().add_manga(title, updated_at=updated_at)
+                logger.info(f"All done {title}, {updated_at}")
         except MangaException as e:
             logger.error(f"Download failed: {str(e)}")
         except Exception as e:
test.py
@@ -2,13 +2,14 @@ from src.common.naming import FileNaming
 from src.common.ComicInfo import ImageInfo
 from zipfile import ZipFile
 from datetime import datetime
+import time
 import os,hashlib
 import xml.etree.ElementTree as ET
 from PIL import Image
 from io import BytesIO
 from tempfile import NamedTemporaryFile
 from xml.dom import minidom
+from src.common.ComicInfo import ComicInfoXml
 
 class test:
 
@@ -90,7 +91,7 @@ class comicInfo:
         # Metadata fields to extract (user-defined field list)
         metadata_fields = [
             "Title", "Series", "Number", "Summary", "Writer",
-            "Genre", "PageCount", "AgeRating"
+            "Genre", "Tags", "PageCount", "AgeRating"
         ]
 
         for field in metadata_fields:
@@ -156,12 +157,14 @@ class comicInfo:
             }
         except Exception as e:
             print(f"Error processing CBZ file: {e}")
-            return None
+            raise exit(f"Error processing CBZ")
 
     def generate_comic_info_xml(self, metadata, pages_info):
         """Generate ComicInfo.xml content from metadata and page info"""
         # Create the root node
-        root = ET.Element("ComicInfo", xmlns="http://comicrack.cyolito.com/comicinfo")
+        root = ET.Element('ComicInfo')
+        root.set('xmlns:xsd', 'http://www.w3.org/2001/XMLSchema')
+        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
 
         # Add metadata fields
         for field, value in metadata.items():
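Note: a minimal sketch of the root-element change in generate_comic_info_xml — the ComicRack default namespace is dropped and only the xsd/xsi prefixes are declared, which matches how ComicInfo.xml files are commonly written and keeps element names un-namespaced for the XSD check added below. The metadata values here are made up.

import xml.etree.ElementTree as ET
from xml.dom import minidom

root = ET.Element('ComicInfo')
root.set('xmlns:xsd', 'http://www.w3.org/2001/XMLSchema')
root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
for field, value in {"Title": "Example", "PageCount": "12"}.items():
    ET.SubElement(root, field).text = value  # each metadata field becomes a child element

pretty = minidom.parseString(ET.tostring(root, encoding='unicode')).toprettyxml(indent="  ")
print(pretty)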
@@ -235,17 +238,50 @@ class comicInfo:
             cbz_path (_type_): _description_
         """
         data = self.process_cbz(cbz_path)
+        metadata = data["metadata"]
+        author = data["metadata"].get("Writer", "")
+        tags = data["metadata"].get("Tags", "")
+
+        (list_value, value) = [[], str(author).replace("&", " ")]
+        for val in set(str(value).split(" ")):
+            list_value.append(val)
+        author = FileNaming.chinese_file_name(",".join(list_value))
+        data["metadata"]["Writer"] = author
         # Generate the XML content
         new_xml = self.generate_comic_info_xml(data["metadata"], data["pages"])
+        xml_file = "NewComicInfo.xml"
         # Test: save the XML locally for inspection
-        with open("NewComicInfo.xml", "w", encoding="utf-8") as f:
+        with open(xml_file, "w", encoding="utf-8") as f:
             f.write(new_xml)
-        print("Generated NewComicInfo.xml")
+        print(f"Generated {xml_file}")
+        ComicInfoXml()._validate_xml_with_xsd_file(xml_file=xml_file ,xsd_file="src/assets/ComicInfo_2.1.xsd")
         # Update the CBZ file (example path; back up before operating on real files)
-        success = comicInfo().update_cbz_with_new_xml("example.cbz", new_xml, "example_updated.cbz")
+        success = comicInfo().update_cbz_with_new_xml(cbz_path, new_xml)
         # if success:
         #     print("CBZ file updated successfully")
+        os.remove(xml_file)
 
 if __name__ == "__main__":
     # Remove CBZ files smaller than 3KB
-    test().clean_min_cbz()
+    # comicInfo().update_comicinfo_cbz("")
+    #cbz_path = "/Users/cc/Documents/Dev/WorkSpace/VSCodeProjects/NewComicDownloader/CBZ/rm_comic/福利女姊姊/第1话 福利女姊姊.CBZ"
+
+    dir_path = "/mnt/Comics/CBZ/rm_comic"
+    for dir in os.listdir(dir_path):
+        c_dir = os.path.join(dir_path, dir)
+        if os.path.isdir(c_dir):
+            files = list(FileNaming.get_filenames_optimized(c_dir, ext_filter=['.CBZ']))
+            for file in files:
+                #size = os.path.getsize(file)
+                # Get the file's stat info
+                file_stat = os.stat(file)
+                # Get the file creation time (only available on Linux/macOS)
+                create_time = time.localtime(file_stat.st_birthtime) # Note: st_birthtime is available on Linux/macOS, but not supported on all systems
+                # Format the timestamp
+                formatted_time = time.strftime('%Y%m%d', create_time)
+                if int(formatted_time) < 20250204:
+                    print(f"{file} creation time:", formatted_time)
+                    comicInfo().update_comicinfo_cbz(file)
+                #if size < 3000:
+                #    os.remove(file)
+                #    print(f"Deleted {file}")
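Note: a standalone sketch of the Writer clean-up performed in update_comicinfo_cbz above — authors joined with "&" are split, de-duplicated, and re-joined with commas before being written back into the metadata. FileNaming.chinese_file_name comes from the repository; the sketch sorts the names instead so the output is deterministic.

author = "Alice & Bob & Alice"
value = str(author).replace("&", " ")
list_value = []
for val in set(str(value).split(" ")):
    if val:                       # drop the empty strings left by the double spaces
        list_value.append(val)
author = ",".join(sorted(list_value))
print(author)  # Alice,Bob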