# NewComicDownloader/src/common/utils.py
import asyncio
import base64
import hashlib
import json
import logging
import math
import os
import re
import shutil
import time
from datetime import datetime
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import List, Optional, Callable, Dict, Any
from zipfile import ZipFile, ZIP_DEFLATED

import aiohttp
import xmlschema
from filelock import FileLock
from PIL import Image

from src.common.exceptions import DownloadError
from src.common.item import ImageItem, MangaItem, MangaInfo
from src.common.logging import setup_logging
from src.common.naming import DirectoryNaming, FileNaming, PREFIX_SCRAMBLE
from src.config import (
    DEFAULT_HEADERS, CONCURRENT_DOWNLOADS, TIMEOUT, RETRY_TIMES,
    CACHE_DIR, CACHE_IMAGE_DIR, RETRIES, COMIC_INFO_NAME, PROXY_URL,
    RETRY_PROXY, RETRY_PROXY_TIMES, BASE_IMAGES_DIR,
)

logger = setup_logging(__name__)
class Cache:
    """Cache for downloaded page content and images."""

    def __init__(self, cache_dir: Path = CACHE_DIR, expiration_time: int = 3600):
        self.cache_dir = cache_dir
        self.expiration_time = expiration_time
        self.cache_dir.mkdir(exist_ok=True)  # create the cache directory

    def _get_cache_file_path(self, url: str) -> Path:
        """Build the cache file path for a URL."""
        filename = FileNaming.fix_file_name(str(url))
        # Split the URL on "/" to derive sub-directories
        parts = str(url).replace("https://", "").replace("http://", "").split("/")
        subdir = parts[0] if len(parts) > 2 else "default"  # use the domain as the first-level sub-directory
        hash_dir = hashlib.md5(str(url).encode()).hexdigest()
        cache_subdir = self.cache_dir / subdir / hash_dir[0:2] / hash_dir[3:5]  # multi-level directory path
        cache_subdir.mkdir(parents=True, exist_ok=True)
        return cache_subdir / filename

    def get(self, url: str, type: str = "html") -> Optional[str]:
        """Return cached HTML content for a URL, or None when missing or expired."""
        cache_file = self._get_cache_file_path(url)
        if cache_file.exists():
            # Check whether the cache entry has expired
            if time.time() - cache_file.stat().st_mtime < self.expiration_time:
                with open(cache_file, 'r', encoding='utf-8') as f:
                    return f.read()
            elif type == "image":
                with open(cache_file, 'rb') as f:
                    return f.read()
            else:
                cache_file.unlink()  # remove the expired cache file
        return None

    def get_image(self, url: str) -> Optional[bytes]:
        """Return a cached image, or None when missing or corrupted."""
        cache_file = self._get_cache_file_path(url)
        if cache_file.exists():
            # Verify that the cached file is a valid image
            if MangaDownloader()._is_valid_image(cache_file):
                with open(cache_file, 'rb') as f:
                    return f.read()
            else:
                logger.error(f"Corrupted cached image: {cache_file}")
                os.remove(cache_file)
        return None

    def set(self, url: str, html: str) -> None:
        """Store HTML content in the cache."""
        cache_file = self._get_cache_file_path(url)
        with open(cache_file, 'w', encoding='utf-8') as f:
            f.write(html)

    def set_image(self, url: str, image: bytes) -> None:
        """Store an image in the cache."""
        cache_file = self._get_cache_file_path(url)
        with open(cache_file, 'wb') as f:
            f.write(image)
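# Usage sketch for Cache (illustrative only; the URLs below are placeholders):
# cache = Cache()
# cache.set("https://example.com/manga/123", "<html>...</html>")
# html = cache.get("https://example.com/manga/123")         # str while the entry is younger than expiration_time
# img = cache.get_image("https://example.com/img/001.jpg")  # bytes, or None when missing or corrupted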
class DownloadStatus:
    """Tracks the progress of a download batch."""

    def __init__(self, total: int):
        self.total = total
        self.success = 0
        self.failed = 0
        self.current = 0

    @property
    def is_completed(self) -> bool:
        """Whether every item has been processed."""
        return self.current >= self.total

    @property
    def progress(self) -> float:
        """Current progress as a fraction between 0 and 1."""
        return self.current / self.total if self.total > 0 else 0
class MangaDownloader:
    """Manga downloader responsible for fetching a manga and its related resources."""

    def __init__(self, base_dir: Path = BASE_IMAGES_DIR):
        self.connector = aiohttp.TCPConnector(limit_per_host=CONCURRENT_DOWNLOADS)
        self.base_dir = Path(base_dir)
        self.cache_dir = CACHE_IMAGE_DIR  # cache directory
        self.cache = Cache()
        DirectoryNaming.ensure_dir(self.base_dir)
        DirectoryNaming.ensure_dir(self.cache_dir)  # create the cache directory

    async def download_cover(self, manga_info: MangaInfo):
        """Download the cover image."""
        cover_item = manga_info.cover
        save_path = DirectoryNaming.manga_cover_dir(manga_info)
        DirectoryNaming.ensure_dir(save_path.parent)
        if os.path.exists(save_path):
            if FileNaming().file_update_by_date(save_path, day=30):
                # The cover needs refreshing: remove the existing file first
                os.remove(save_path)
            else:
                # No update needed: return early
                logger.debug(f"{save_path} is already up to date")
                return
        async with aiohttp.ClientSession(headers=DEFAULT_HEADERS, timeout=aiohttp.ClientTimeout(total=TIMEOUT, connect=TIMEOUT)) as session:
            await self.download_image(session, str(cover_item.url), save_path)
    async def download_chapter(
        self,
        manga_item: MangaItem,
        semaphore: Optional[asyncio.Semaphore] = None,
        status_callback: Optional[Callable[[DownloadStatus], None]] = None
    ) -> Dict[str, Any]:
        """
        Download every image of a chapter.

        :param manga_item: manga item holding the manga info, chapter and image list
        :param semaphore: semaphore limiting concurrent downloads
        :param status_callback: callback invoked with the current DownloadStatus
        :return: dictionary with download statistics
        """
        manga_info = manga_item.info
        chapter = manga_item.chapter
        image_items = manga_item.chapter_images
        if semaphore is None:
            semaphore = asyncio.Semaphore(CONCURRENT_DOWNLOADS)
        status = DownloadStatus(len(image_items))
        failed_items = []
        async with aiohttp.ClientSession(headers=DEFAULT_HEADERS, timeout=aiohttp.ClientTimeout(total=TIMEOUT, connect=TIMEOUT)) as session:
            tasks = []
            for image_item in image_items:
                url = str(image_item.url)
                save_path = DirectoryNaming.chapter_images_dir(manga_info, chapter, image_item.filename)
                DirectoryNaming.ensure_dir(save_path.parent)
                task = self._download_with_semaphore(semaphore, session, url, save_path, status, status_callback)
                tasks.append(task)
            results = await asyncio.gather(*tasks, return_exceptions=True)
        # Collect the results
        for idx, result in enumerate(results):
            if isinstance(result, Exception):
                status.failed += 1
                failed_items.append(image_items[idx])
                logger.error(f"Download failed {image_items[idx].url}: {str(result)}")
            elif result:
                status.success += 1
            else:
                status.failed += 1
                failed_items.append(image_items[idx])
        result = {
            'chapter': chapter,
            'total': len(image_items),
            'success': status.success,
            'failed': status.failed,
            'failed_items': failed_items
        }
        logger.debug(f"Chapter {chapter.title} finished: {status.success}/{len(image_items)} images downloaded")
        return result
    async def _download_with_semaphore(
        self,
        semaphore: asyncio.Semaphore,
        session: aiohttp.ClientSession,
        url: str,
        save_path: Path,
        status: DownloadStatus,
        callback: Optional[Callable] = None
    ) -> bool:
        async with semaphore:
            result = await self.download_image(session, url, save_path)
            status.current += 1
            if callback:
                callback(status)
            return result
    async def download_image(self, session: aiohttp.ClientSession, url: str, save_path: Path, retries: int = RETRIES, timeout: int = TIMEOUT, use_proxy: bool = RETRY_PROXY) -> bool:
        """Download a single image with retries, timeout handling and file caching."""
        # file_path = FileNaming.getFileScrambleImageSave(save_path)
        # if os.path.exists(file_path):  # skip if the file already exists
        #     if not FileNaming().file_update_by_date(file_path, remove=True):
        #         logger.info(f"File already exists, skipping download: {file_path}")
        #         return True
        # Try the image cache first
        cached_image = self.cache.get_image(url)
        if cached_image:
            with open(save_path, 'wb') as f:
                f.write(cached_image)
            return True
        for attempt in range(retries):
            try:
                timeout_obj = aiohttp.ClientTimeout(total=timeout)  # per-request timeout
                # Route the request through the proxy once enough attempts have failed
                if attempt > RETRY_PROXY_TIMES and use_proxy:
                    logger.info(f"Using proxy: {PROXY_URL}")
                    session_get = session.get(url, timeout=timeout_obj, proxy=PROXY_URL)
                else:
                    session_get = session.get(url, timeout=timeout_obj)
                async with session_get as response:
                    if response.status == 200:
                        data = await response.read()
                        with open(str(save_path) + ".downloads", 'wb') as f:
                            f.write(data)
                        # Verify that the downloaded file is a valid image
                        if self._is_valid_image(str(save_path) + ".downloads"):
                            logger.debug(f"Downloaded: {url}")
                            shutil.move(str(save_path) + ".downloads", save_path)
                            self.cache.set_image(url, data)
                            return True
                        else:
                            logger.error(f"Downloaded file is invalid: {save_path}")
                            return False
                    else:
                        logger.error(f"Download failed: {url}, status code: {response.status}")
                        return False
            except asyncio.TimeoutError:
                logger.error(f"Download timed out: {url}, attempt: {attempt + 1}")
            except Exception as e:
                logger.error(f"Error while downloading image: {url}, error: {str(e)}")
            if attempt < retries - 1:
                logger.info(f"Retrying download: {url}, attempt: {attempt + 2}")
                await asyncio.sleep(1)  # back off briefly before retrying
        return False
    def _is_valid_image(self, file_path: Path) -> bool:
        """Verify that a file is a valid image."""
        try:
            with Image.open(file_path) as img:
                img.verify()  # verify the image data
            return True
        except Exception as e:
            logger.error(f"Image validation failed: {file_path}, error: {str(e)}")
            return False
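# Usage sketch for MangaDownloader (illustrative only; assumes a MangaItem that has
# already been populated by one of the site parsers elsewhere in the project):
# async def _demo(manga_item: MangaItem):
#     downloader = MangaDownloader()
#     await downloader.download_cover(manga_item.info)
#     result = await downloader.download_chapter(
#         manga_item,
#         status_callback=lambda s: logger.info(f"progress {s.progress:.0%}"),
#     )
#     logger.info(f"{result['success']}/{result['total']} images downloaded")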
class CBZUtils:
    def __init__(self, cbz_path: Path = None):
        if cbz_path is not None:
            self.cbz_path = cbz_path

    def get_page_count(self):
        return self._comic_info_xml_page_count(self.cbz_path)

    def _comic_info_xml_page_count(self, zip_file: Path):
        """Read the <PageCount> value from ComicInfo.xml inside the archive."""
        with ZipFile(str(zip_file), 'r') as z:
            try:
                with z.open('ComicInfo.xml', 'r') as file:
                    # Parse the XML payload from the file stream
                    file_string = file.read().decode("utf-8")
                    # Extract the <PageCount> value with a regular expression
                    match = re.search(r"<PageCount>(\d+)</PageCount>", file_string)
                    if match:
                        page_count = match.group(1)
                        logger.debug(f"zip_file={zip_file} PageCount: {page_count}")
                        return page_count
            except Exception as e:
                raise SystemExit(f"Failed to read <PageCount> from ComicInfo.xml: {zip_file}, error: {str(e)}")
    def _check_zip_file(self, zip_file_path: Path):
        """Check that a ZIP file contains images and a consistent ComicInfo.xml."""
        result = False
        is_comic_info = False
        if not os.path.exists(zip_file_path):
            logger.debug(f"ZIP file does not exist: {zip_file_path}")
            return False
        try:
            with ZipFile(zip_file_path, 'r') as zip_file:
                file_list = zip_file.namelist()
                result = any(file_name.endswith('.jpg') for file_name in file_list)
                is_comic_info = any(file_name == COMIC_INFO_NAME for file_name in file_list)
            if is_comic_info:
                page_count = self._comic_info_xml_page_count(zip_file_path)
                if page_count is not None and len(file_list) == int(page_count) + 1:
                    logger.debug(f"ZIP file {zip_file_path} verified successfully")
                    result = True
                else:
                    logger.error(f"ZIP file {zip_file_path} failed verification: file count does not match <PageCount> in ComicInfo.xml")
                    os.remove(zip_file_path)
                    result = False
            if not result and os.path.exists(zip_file_path):
                logger.error("ZIP file contains no images")
                os.remove(zip_file_path)
            if not is_comic_info:
                logger.error("ZIP file contains no ComicInfo.xml")
                os.remove(zip_file_path)
        except FileNotFoundError:
            logger.error(f"ZIP file does not exist: {zip_file_path}")
        except Exception as e:
            logger.error(f"Failed to check ZIP file: {zip_file_path}, error: {str(e)}")
            if os.path.exists(zip_file_path):
                os.remove(zip_file_path)
        return result
    def _zip_compression(self, source_dir=None, target_file=None, remove=True):
        self._check_zip_file(target_file)
        if not os.path.exists(source_dir):
            raise FileNotFoundError(f"Source directory to pack does not exist: {source_dir}")
        # Check that the directory contains .jpg files
        if not any(file_name.endswith('.jpg') for file_name in os.listdir(source_dir)):
            logger.error(f"Source directory contains no images: {source_dir}")
            return False
        target_dir = os.path.dirname(target_file)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        if not os.path.exists(target_file) and source_dir is not None:
            try:
                count = 0
                # Sort the file names so pages are packed in order
                filenames = sorted(list(source_dir.glob("*.jpg")) + list(source_dir.glob(COMIC_INFO_NAME)), key=lambda f: f.name)
                with ZipFile(str(target_file), mode='w') as cbz:
                    for file in filenames:
                        # Entries are either JPG pages or ComicInfo.xml
                        count += 1
                        logger.debug(f"Packing: {count}/{len(filenames)} {os.path.join(source_dir, file.name)}")
                        cbz.write(file, arcname=file.name)
                logger.debug(f"Packing finished: {target_file}, {count} files")
            except Exception as e:
                logger.error(f"Packing failed: {target_file}, error: {str(e)}")
                if os.path.exists(target_file):
                    os.remove(target_file)
                raise e
        return self._check_zip_file(target_file)
    def _image_deScrambleByPath(self, chapter_dir: Path):
        if os.path.exists(chapter_dir):
            dirs = os.listdir(chapter_dir)
            for file in dirs:
                if file.startswith(PREFIX_SCRAMBLE):
                    try:
                        ImageUtils.deScrambleImagesByPath(os.path.join(chapter_dir, file))
                    except Exception as e:
                        logger.error(f"Error while descrambling {file}: {e}, skipped")
                        return False
    def create_cbz(self, chapter_dir: Path, clear_chapter: bool = False):
        if os.path.exists(chapter_dir):
            dirs = os.listdir(chapter_dir)
            for file in dirs:
                if file.startswith(PREFIX_SCRAMBLE):
                    try:
                        ImageUtils.deScrambleImagesByPath(os.path.join(chapter_dir, file))
                    except Exception as e:
                        logger.error(f"Error while descrambling {file}: {e}, skipped")
                        return False
        if self._zip_compression(source_dir=chapter_dir, target_file=self.cbz_path, remove=False):
            logger.info(f"Chapter {chapter_dir.name} packed: {self.cbz_path}")
            time.sleep(0.5)
            if clear_chapter:
                try:
                    shutil.rmtree(chapter_dir)
                    logger.debug(f"{chapter_dir} original chapter images removed")
                except Exception:
                    raise SystemExit(f"Failed to remove chapter directory {chapter_dir}")
        else:
            raise SystemExit(f"Failed to pack chapter {chapter_dir.name}: {self.cbz_path}")
    def update_zip_file(self, zip_path: str, update_files: dict):
        """
        Update files inside a ZIP archive without extracting the whole archive.

        Parameters:
        - zip_path: path of the ZIP file
        - update_files: files to update, as {internal path: new file path or bytes}

        Example:
            update_zip_file("data.zip", {"config.json": "new_config.json"})
        """
        # Create a temporary file next to the original archive
        temp_dir = os.path.dirname(zip_path)
        with NamedTemporaryFile(dir=temp_dir, delete=False) as tmp_file:
            temp_zip_path = tmp_file.name
        try:
            # Read the original ZIP and write a new one
            with ZipFile(zip_path, 'r') as orig_zip, \
                 ZipFile(temp_zip_path, 'w', ZIP_DEFLATED) as new_zip:
                # Walk the entries of the original ZIP
                for orig_info in orig_zip.infolist():
                    file_name = orig_info.filename
                    if file_name in update_files:
                        # Replace the target file
                        new_data = update_files[file_name]
                        if isinstance(new_data, bytes):
                            new_zip.writestr(file_name, new_data)
                        else:
                            new_zip.write(new_data, file_name)
                        # Keep the original timestamp
                        new_info = new_zip.getinfo(file_name)
                        new_info.date_time = orig_info.date_time
                    else:
                        # Copy unmodified files as-is
                        with orig_zip.open(orig_info) as orig_file:
                            new_zip.writestr(orig_info, orig_file.read())
            # Replace the original file
            shutil.move(temp_zip_path, zip_path)
        finally:
            if os.path.exists(temp_zip_path):
                os.remove(temp_zip_path)
    # Usage examples ------------------------------
    # if __name__ == "__main__":
    #     # Example 1: replace a file in the ZIP with a local file
    #     update_zip_file("archive.zip", {
    #         "docs/readme.txt": "new_readme.txt"  # local file path
    #     })
    #     # Example 2: write byte data directly
    #     new_config = b'{"version": 2.0, "active": true}'
    #     update_zip_file("data.zip", {
    #         "config.json": new_config  # byte data
    #     })
    def _clean_old_cbz(self, cbz_path):
        m_time = datetime.fromtimestamp(os.path.getmtime(cbz_path))
        str_strftime = '%Y%m%d'
        zip_time = m_time.strftime(str_strftime)
        with ZipFile(cbz_path, 'r') as zip_ref:
            old_img = 0
            for file_info in zip_ref.infolist():
                # date_time is a (year, month, day, hour, minute, second) tuple
                date_time = file_info.date_time
                # Convert the tuple to a datetime object
                dt = datetime(*date_time)
                # Format it as YYYYMMDD
                file_date_time = dt.strftime(str_strftime)
                # Count entries more than a week older than the archive; images within a week are skipped
                if int(zip_time) - int(file_date_time) > 7:
                    # print(f"Clear Filename: {file_info.filename}, zip: {cbz_path}")
                    old_img += 1
        if old_img > 0:
            # os.remove(cbz_path)
            print(f"remove cbz {cbz_path}")
    def update_cbz_with_new_xml(self, cbz_path, new_xml_content, output_path=None):
        """Write a freshly generated ComicInfo.xml into a CBZ file."""
        try:
            # Default to overwriting the original file
            if output_path is None:
                output_path = cbz_path
            # Move the original to a temporary file so it can be restored on failure
            with NamedTemporaryFile(delete=False) as tmp:
                tmp.close()
                shutil.move(cbz_path, tmp.name)
            # Read the backup and rewrite it, replacing ComicInfo.xml
            with ZipFile(tmp.name, 'r') as source_zip:
                with ZipFile(output_path, 'w') as new_zip:
                    # Copy the original entries (skipping the old XML)
                    for item in source_zip.infolist():
                        if item.filename.lower() != 'comicinfo.xml':
                            new_zip.writestr(item, source_zip.read(item.filename))
                    # Add the new XML
                    new_zip.writestr("ComicInfo.xml", new_xml_content.encode('utf-8'))
            os.remove(tmp.name)  # clean up the temporary file
            return True
        except Exception as e:
            print(f"Failed to update CBZ file: {e}")
            if os.path.exists(tmp.name):
                shutil.move(tmp.name, cbz_path)  # restore the backup
            raise SystemExit("Update failed")
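# Usage sketch for CBZUtils (illustrative only; the paths are placeholders): pack a
# downloaded chapter directory into a .cbz, then replace its ComicInfo.xml.
# cbz = CBZUtils(Path("downloads/SomeManga/ch_001.cbz"))
# cbz.create_cbz(Path("downloads/SomeManga/ch_001"), clear_chapter=True)
# cbz.update_cbz_with_new_xml(cbz.cbz_path, "<ComicInfo>...</ComicInfo>")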
class ImageUtils:
    @classmethod
    def descramble_images_by_dir(cls, chapter_dir):
        if os.path.isfile(chapter_dir):
            chapter_dir = os.path.dirname(chapter_dir)
        scramble_count = 0
        if os.path.exists(chapter_dir):  # chapter image directory
            # Keep going until no scrambled images are left in the directory
            while any(name.startswith(PREFIX_SCRAMBLE) for name in os.listdir(chapter_dir)):
                for img in os.listdir(chapter_dir):
                    if img.startswith(PREFIX_SCRAMBLE):
                        cls.encode_scramble_image(os.path.join(chapter_dir, img))
                        scramble_count += 1
            logger.debug(f"{PREFIX_SCRAMBLE} {scramble_count}")
        return scramble_count

    @classmethod
    def deScrambleImagesByPath(cls, img_path, img_save=None):
        if os.path.basename(img_path).startswith(PREFIX_SCRAMBLE) and os.path.exists(img_path):
            img_path = cls.encode_scramble_image(img_path, img_save)
        return img_path
    @classmethod
    def encodeImage(cls, str_en):
        # Decode the base64 string and derive the block count from its MD5 digest
        enc = base64.b64decode(str_en)
        m = hashlib.md5()
        m.update(enc)
        md5 = m.digest()
        d = md5[-1]
        try:
            blocks = d % 10 + 5
        except Exception:
            blocks = 0 % 10 + 5
        # print("blocks=", blocks)
        return blocks
    @classmethod
    def scrambleImage(cls, file_path):
        # A partially downloaded image: remove it and return None
        if str(file_path).endswith(".downloads"):
            os.remove(file_path)
            return None
        file_str = str(file_path).split("=")
        # e.g. 10_29.jpg
        base_dir = file_str[0].replace("scramble", "")
        base_name = file_str[-1]
        base_fn = base_name.split("_")
        save_name = base_fn[1]
        save_name_delesu = save_name.split(".")[0]
        blocks = int(base_fn[0])
        save_file_path = os.path.join(base_dir, save_name)
        print("save", save_file_path)
        if os.path.exists(save_file_path):
            logger.debug(f"Image already descrambled, skipping: {save_file_path}")
            return None
        image_su = str(file_path).split(".")[-1]
        try:
            img = Image.open(file_path)
        except Exception:
            print(f"error Image: {file_path}")
            return None
        width = img.width
        height = img.height
        # blocks = cls.encodeImage(enStr)
        print("blocks=", blocks)
        block_height = int(height / blocks)
        block_width = int(width / blocks)
        print("blockHeight=", block_height)
        suffix = str(file_path).split(".")[-1]
        split_path = os.path.join(base_dir, save_name_delesu + "split")
        if image_su == "downloads":
            return None
        is_split = cls.splitimage(file_path, blocks, 1, split_path)
        if is_split is not None:
            cls.image_compose(split_path, blocks, 1, save_file_path, block_height, width)
        else:
            if os.path.exists(split_path):
                shutil.rmtree(split_path)
            if os.path.exists(file_path):
                shutil.move(file_path, save_file_path)
        # Clean up once finished
        return file_path
    @classmethod
    def splitimage(cls, src, rownum, colnum, dstpath):
        img = Image.open(src)
        w, h = img.size
        if rownum <= h and colnum <= w:
            s = os.path.split(src)
            if dstpath == '':
                dstpath = s[0]
            if not os.path.exists(dstpath):
                os.makedirs(dstpath)
            fn = s[1].split('.')
            basename = fn[0]
            ext = fn[-1]
            num = 0
            rowheight = h // rownum
            colwidth = w // colnum
            for r in range(rownum):
                for c in range(colnum):
                    box = (c * colwidth, r * rowheight, (c + 1) * colwidth, (r + 1) * rowheight)
                    count_image = "{:0>3d}".format(num)
                    file_path = os.path.join(dstpath, str(count_image) + '.' + ext)
                    print("file_path=", file_path)
                    img.crop(box).save(file_path)
                    num = num + 1
            return "success"
        else:
            print('Invalid split parameters: more rows/columns than pixels!')
            return None
    @classmethod
    def image_compose(cls, src, row, column, save_path, image_height, image_width):
        image_size = image_height
        # image_height = 376
        # image_width = 720
        images_format = ['.png', '.jpg']
        # image_names = [name for name in os.listdir(src) for item in images_format if
        #                os.path.splitext(name)[1] == item][::-1]
        img_list = os.listdir(src)
        img_list.sort()
        img_list.sort(key=lambda x: int(x[:-4]))  # sort file names numerically
        img_nums = len(img_list)
        image_names = []
        for i in range(img_nums):
            img_name = os.path.join(src, img_list[i])
            image_names.append(img_name)
        # Reverse the order so the strips are reassembled correctly
        image_names = image_names[::-1]
        # Simple sanity check: the parameters must match the number of images
        if len(image_names) < row * column:
            raise ValueError("The compose parameters do not match the number of images!")
        to_image = Image.new('RGB', (column * image_width, row * image_height))  # create the output image
        # Paste each strip into its position in order
        for y in range(1, row + 1):
            for x in range(1, column + 1):
                # 1 * (row=1 - 1), col=1 - 1
                image_path = image_names[column * (y - 1) + x - 1]
                print("split_image=", image_path)
                from_image = Image.open(image_path)
                # Keep the original image size
                # .resize((image_size, image_size), Image.ANTIALIAS)
                to_image.paste(from_image, ((x - 1) * image_size, (y - 1) * image_size))
                from_image.close()
        to_image.save(save_path)  # save the composed image
        print("Image compose finished:", save_path)
        shutil.rmtree(src)
    @classmethod
    def getScrambleImage(cls, path):
        scramble_file_cache = cls.scrambleImage(path)
        if scramble_file_cache is not None and os.path.exists(scramble_file_cache):
            os.remove(scramble_file_cache)
    @classmethod
    def encode_scramble_image(cls, img_path, img_save=None):
        if not os.path.exists(img_path):
            return
        image = Image.open(img_path)
        w, h = image.size
        # image.show()
        file_str = str(img_path).split("=")
        # e.g. 10_29.jpg
        base_fn = file_str[-1].split("_")
        blocks = int(base_fn[0])
        if img_save is None:
            save_path = FileNaming.getFileScrambleImageSave(img_path)
        else:
            save_path = img_save
        if blocks:
            s = blocks  # number of slices
            l = h % s  # remainder left after splitting
            box_list = []
            hz = 0
            for i in range(s):
                c = math.floor(h / s)
                g = i * c
                hz += c
                h2 = h - c * (i + 1) - l
                if i == 0:
                    c += l
                    hz += l
                else:
                    g += l
                box_list.append((0, h2, w, h - g))
            # print(box_list, len(box_list))
            item_width = w
            # box_list.reverse()  # reversing the list would restore the original slice order
            newh = 0
            image_list = [image.crop(box) for box in box_list]
            # Paste the slices back top to bottom into a new image
            newimage = Image.new("RGB", (w, h))
            for image in image_list:
                # image.show()
                b_w, b_h = image.size
                newimage.paste(image, (0, newh))
                newh += b_h
            newimage.save(save_path)
            logger.debug(f"Descrambled successfully {save_path}")
        if os.path.exists(img_path):
            os.remove(img_path)
            logger.debug(f"remove {img_path}")
        return save_path
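# Usage sketch for ImageUtils (illustrative only; the paths are placeholders and the
# file name merely follows the "<PREFIX_SCRAMBLE>...=<blocks>_<page>.jpg" pattern assumed above):
# ImageUtils.descramble_images_by_dir("downloads/SomeManga/ch_001")
# ImageUtils.deScrambleImagesByPath("downloads/SomeManga/ch_001/scramble=10_029.jpg")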
class MangaUtils:
    """Manga metadata management."""

    def __init__(self, file_path: str = "mangas.json", project=None):
        if project is None:
            self.file_path = Path(BASE_IMAGES_DIR, file_path)
        else:
            self.file_path = Path(BASE_IMAGES_DIR, project, file_path)
        self.lock = FileLock(str(self.file_path) + ".lock")
        self.data: List[Dict] = []
        self._index: Dict[str, Dict] = {}  # index for fast lookups
        # Load existing data on construction
        self.load_data()

    def _build_index(self):
        """Build the in-memory index."""
        self._index = {manga['name']: manga for manga in self.data}

    def load_data(self):
        """Load data from the JSON file."""
        if not self.file_path.exists():
            self.data = []
            self._build_index()
            return
        try:
            with self.lock:
                with open(self.file_path, 'r', encoding='utf-8') as f:
                    self.data = json.load(f)
            # Convert timestamp strings into datetime objects if needed
            # for manga in self.data:
            #     manga['created_at'] = datetime.fromisoformat(manga['created_at'])
            self._build_index()
        except (json.JSONDecodeError, FileNotFoundError):
            self.data = []
            self._build_index()

    def _save_data(self):
        """Save the data atomically."""
        temp_path = self.file_path.with_suffix(".tmp")
        # Convert datetime values to strings
        save_data = []
        str_strftime = '%Y%m%d'
        for manga in self.data:
            updated_at = manga["updated_at"]
            last_updated = manga["last_updated"]
            if isinstance(updated_at, datetime):
                updated_at = updated_at.strftime(str_strftime)
            if isinstance(last_updated, datetime):
                last_updated = last_updated.strftime(str_strftime)
            save_data.append({"name": manga["name"], "updated_at": updated_at, "last_updated": last_updated})
        with self.lock:
            with open(temp_path, 'w', encoding='utf-8') as f:
                json.dump(save_data, f, indent=2, ensure_ascii=False)
            temp_path.replace(self.file_path)
    def add_manga(self, name: str, updated_at: str = None) -> bool:
        """Add a new manga entry."""
        if not self.validate_name(name):
            raise ValueError("Invalid manga name")
        if name in self._index:
            self.delete_manga(name)
        str_strftime = '%Y%m%d'
        now_time = datetime.now().strftime(str_strftime)
        new_manga = {
            "name": name.strip(),
            "updated_at": updated_at,
            "last_updated": now_time
        }
        self.data.append(new_manga)
        self._index[name] = new_manga
        self._save_data()
        return True

    def update_manga(self, old_name: str, new_name: str) -> bool:
        """Rename a manga entry."""
        if not self.validate_name(new_name):
            raise ValueError("Invalid new name")
        manga = self._index.get(old_name)
        if not manga:
            return False
        # Reject the rename when the new name already exists
        if new_name in self._index and new_name != old_name:
            return False
        # Apply the update
        manga['name'] = new_name.strip()
        del self._index[old_name]
        self._index[new_name] = manga
        self._save_data()
        return True

    def delete_manga(self, name: str) -> bool:
        """Delete a manga entry."""
        manga = self._index.get(name)
        if not manga:
            return False
        self.data = [m for m in self.data if m['name'] != name]
        del self._index[name]
        self._save_data()
        return True

    def search_manga(self, name: str) -> Optional[Dict]:
        """Exact-match lookup by name."""
        return self._index.get(name)

    def list_mangas(self, sort_by: str = "name") -> List[Dict]:
        """List all mangas, optionally sorted."""
        if sort_by == "name":
            return sorted(self.data, key=lambda x: x['name'])
        elif sort_by == "date":
            return sorted(self.data, key=lambda x: x['updated_at'])
        return self.data.copy()

    def validate_name(self, name: str) -> bool:
        """Validate a manga name."""
        name = name.strip()
        return 2 <= len(name) <= 50 and name not in ['', 'undefined']
    # ---------- Advanced features ----------
    def bulk_import(self, mangas: List[Dict]):
        """Import multiple mangas at once."""
        for manga in mangas:
            if self.validate_name(manga["name"]):
                self.add_manga(manga["name"], manga.get("updated_at"))

    def find_duplicates(self) -> List[str]:
        """Find likely duplicate entries (simple, case-insensitive)."""
        seen = set()
        duplicates = []
        for manga in self.data:
            lower_name = manga["name"].lower()
            if lower_name in seen:
                duplicates.append(manga["name"])
            else:
                seen.add(lower_name)
        return duplicates

    def cleanup_data(self):
        """Data cleanup: drop invalid entries."""
        original_count = len(self.data)
        self.data = [
            m for m in self.data
            if self.validate_name(m["name"])
        ]
        if len(self.data) != original_count:
            self._build_index()
            self._save_data()
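# Usage sketch for MangaUtils (illustrative only; names and dates are placeholders):
# mangas = MangaUtils(project="SomeSite")
# mangas.add_manga("Some Manga", updated_at="20250701")
# entry = mangas.search_manga("Some Manga")  # {"name": ..., "updated_at": ..., "last_updated": ...}
# for m in mangas.list_mangas(sort_by="name"):
#     print(m["name"], m["last_updated"])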
class KomgaAPI:
    import requests
    from requests.auth import HTTPBasicAuth

    # Configuration
    KOMGA_URL = "https://komga.caiwenxiu.cn"  # replace with your Komga address
    USERNAME = "caiwenxiu0806@163.com"  # administrator email
    PASSWORD = "cwx@komga"

    def search_series_id(self, search_name):
        """
        Search for a manga series.

        :param search_name: series title
        :return: the matching series id, or None
        """
        response = self.requests.get(
            f"{self.KOMGA_URL}/api/v1/series",
            params={"search": search_name},
            auth=self.HTTPBasicAuth(self.USERNAME, self.PASSWORD)
        )
        # Parse the response
        if response.status_code == 200:
            series_list = response.json()["content"]
            for series in series_list:
                if series['metadata']['title'] == search_name:
                    # Exact match found
                    print(f"Found series: {series['metadata']['title']}, ID: {series['id']}")
                    return series['id']
                print(f"Similar title: {series['metadata']['title']}, ID: {series['id']}")
        else:
            print(f"Search failed: {response.status_code}")
    def update_series_status(self, series_id, status):
        """Update the status of a series."""
        # Build the API endpoint
        endpoint = f"{self.KOMGA_URL}/api/v1/series/{series_id}/metadata"
        # Prepare the request payload
        payload = {
            "status": status
        }
        # Send the PATCH request
        response = self.requests.patch(
            endpoint,
            json=payload,
            auth=self.HTTPBasicAuth(self.USERNAME, self.PASSWORD),
            headers={"Content-Type": "application/json"}
        )
        # Check the response
        if response.status_code in (200, 204):
            print(f"Series {series_id} status updated to '{status}'")
            return True
        else:
            print(f"Update failed: {response.status_code} - {response.text}")
            return False

    def update_series_ended(self, series_name, series_status="ENDED"):
        """Mark a series as ended (or another given status)."""
        series_id = self.search_series_id(series_name)
        return self.update_series_status(series_id, series_status)
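# Usage sketch for KomgaAPI (illustrative only; the series title and id are placeholders):
# komga = KomgaAPI()
# komga.update_series_ended("Some Manga")              # looks up the id, then PATCHes the status
# komga.update_series_status("0ABC123XYZ", "ONGOING")  # hypothetical series id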