This commit is contained in:
caiwx86 2025-02-04 20:25:58 +08:00
parent d8be5a834d
commit 6bcaa91a2c
3 changed files with 198 additions and 6 deletions

View File

@ -6,6 +6,7 @@ from src.config import BASE_IMAGES_DIR,CBZ_DIR,OLD_CBZ_DIR
from src.common.item import MangaInfo,MangaItem
from typing import Generator, Union, List, Optional
from datetime import datetime
from opencc import OpenCC
PREFIX_SCRAMBLE = "scramble="
@ -237,6 +238,31 @@ class FileNaming:
return _scandir(folder_path)
@classmethod
def chinese_convert(cls, text, convert='t2s'):
    """Convert ``text`` between Chinese script variants using OpenCC.

    Args:
        text: Value to convert; it is passed through ``str()`` first, so
            non-string values are accepted.
        convert: Name of the OpenCC conversion configuration. Defaults to
            ``'t2s'`` (presumably Traditional-to-Simplified; confirm against
            the OpenCC configuration list).

    Returns:
        The string produced by ``OpenCC(convert).convert``.
    """
    converter = OpenCC(convert)
    source_text = str(text)
    return converter.convert(source_text)
# Sanitize a string so that it can be used as a file name.
@classmethod
def fix_file_name(cls, filename, replace=None):
    """Strip characters that are not wanted in a file name.

    Every occurrence of ``? * / | . : > <`` is replaced by ``replace`` and
    trailing spaces are then removed from the result.

    Args:
        filename: Text to sanitize. Non-string values are returned unchanged.
        replace: Replacement for each removed character; ``None`` means the
            characters are simply dropped.

    Returns:
        The sanitized name. It may be an empty string when the input
        consisted only of removed characters and/or spaces.
    """
    if not isinstance(filename, str):
        return filename
    # NOTE(review): inside the character class, backslash-pipe is an escaped
    # pipe, so a literal backslash is NOT removed by this pattern. Left as-is
    # so that names generated earlier keep matching; confirm whether
    # backslashes should be stripped as well.
    in_tab = r'[?*/\|.:><]'
    str_replace = "" if replace is None else replace
    filename = re.sub(in_tab, str_replace, filename)
    # rstrip handles empty and all-space results, where indexing from the
    # end of the string would raise IndexError.
    return filename.rstrip(" ")
@classmethod
def chinese_file_name(cls, name):
    """Convert ``name`` with ``chinese_convert`` and sanitize it as a file name.

    Args:
        name: Title to normalize.

    Returns:
        The result of ``fix_file_name`` applied to the converted text.
    """
    converted = cls.chinese_convert(name)
    return cls.fix_file_name(converted)
class NamingStrategy:
"""命名策略集合类"""

View File

@ -1,5 +1,5 @@
import asyncio
import aiohttp
import aiohttp, json
import base64,hashlib,os,shutil,os.path,math
from PIL import Image
import logging,time,os,shutil,re,xmlschema
@ -16,6 +16,7 @@ from src.common.logging import setup_logging
import logging
from tempfile import NamedTemporaryFile
from datetime import datetime
from filelock import FileLock
logger = setup_logging(__name__)
@ -681,3 +682,161 @@ class ImageUtils:
os.remove(img_path)
logging.debug(f"remove {img_path}")
return save_path
class MangaUtils:
    """Registry of manga entries persisted to a JSON file.

    Each entry is a dict with a ``name`` and a ``created_at`` value. Entries
    live in ``self.data`` and are mirrored in ``self._index`` (name -> entry)
    for exact-name lookups. Every mutating method writes the registry back to
    disk through ``_save_data``.
    """

    # strftime pattern used whenever a datetime is stored as ``created_at``.
    _DATE_FORMAT = '%Y%m%d'

    def __init__(self, file_path: str = "mangas.json", project=None):
        """Resolve the storage path, create the file lock and load the data.

        Args:
            file_path: File name of the JSON registry, placed below
                ``BASE_IMAGES_DIR`` (or below the project directory).
            project: Optional sub-directory of ``BASE_IMAGES_DIR`` that holds
                the registry file.
        """
        if project is None:
            self.file_path = Path(BASE_IMAGES_DIR, file_path)
        else:
            self.file_path = Path(BASE_IMAGES_DIR, project, file_path)
        self.lock = FileLock(str(self.file_path) + ".lock")
        self.data: List[Dict] = []
        self._index: Dict[str, Dict] = {}  # name -> entry, speeds up lookups
        # Load any existing data as soon as the object is created.
        self.load_data()

    def _build_index(self):
        """Rebuild the in-memory name -> entry index from ``self.data``."""
        self._index = {manga['name']: manga for manga in self.data}

    def load_data(self):
        """Load the registry from disk.

        A missing file, undecodable JSON, or JSON that is not a list yields
        an empty registry. List items that are not dicts with a ``name`` key
        are dropped so that the index can always be built.
        """
        if not self.file_path.exists():
            self.data = []
            self._build_index()
            return
        try:
            with self.lock:
                with open(self.file_path, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
        except (json.JSONDecodeError, FileNotFoundError):
            loaded = []
        if not isinstance(loaded, list):
            loaded = []
        # ``created_at`` is kept exactly as stored (no datetime parsing).
        self.data = [
            entry for entry in loaded
            if isinstance(entry, dict) and 'name' in entry
        ]
        self._build_index()

    def _save_data(self):
        """Write the registry to disk via a temporary file and a rename.

        Only ``name`` and ``created_at`` are persisted; datetime values are
        serialized with ``_DATE_FORMAT``.
        """
        temp_path = self.file_path.with_suffix(".tmp")
        save_data = []
        for manga in self.data:
            # Tolerate entries without a timestamp instead of raising KeyError.
            created_at = manga.get("created_at")
            if isinstance(created_at, datetime):
                created_at = created_at.strftime(self._DATE_FORMAT)
            save_data.append({"name": manga["name"], "created_at": created_at})
        # The target (project) directory may not exist yet on first save.
        self.file_path.parent.mkdir(parents=True, exist_ok=True)
        with self.lock:
            with open(temp_path, 'w', encoding='utf-8') as f:
                json.dump(save_data, f, indent=2, ensure_ascii=False)
            temp_path.replace(self.file_path)

    def add_manga(self, name: str, created_at: Optional[str] = None) -> bool:
        """Add a manga, replacing any existing entry with the same name.

        Args:
            name: Manga name; surrounding whitespace is stripped before it is
                stored and indexed.
            created_at: Timestamp string or ``datetime``; defaults to the
                current time. Datetimes are stored formatted as ``%Y%m%d``.

        Returns:
            ``True`` once the entry has been stored.

        Raises:
            ValueError: If ``name`` does not pass ``validate_name``.
        """
        if not self.validate_name(name):
            raise ValueError("无效的漫画名称")
        # Normalize once so the stored name and the index key always agree.
        name = name.strip()
        stamp = created_at or datetime.now()
        if isinstance(stamp, datetime):
            stamp = stamp.strftime(self._DATE_FORMAT)
        new_manga = {
            "name": name,
            "created_at": stamp,
        }
        if name in self._index:
            # Drop the stale entry in memory; the single save below persists
            # the replacement (avoids writing the file twice).
            self.data = [m for m in self.data if m['name'] != name]
        self.data.append(new_manga)
        self._index[name] = new_manga
        self._save_data()
        return True

    def update_manga(self, old_name: str, new_name: str) -> bool:
        """Rename a manga.

        Args:
            old_name: Exact name of the existing entry.
            new_name: New name; surrounding whitespace is stripped.

        Returns:
            ``True`` when the entry was renamed, ``False`` when ``old_name``
            is unknown or ``new_name`` already belongs to another entry.

        Raises:
            ValueError: If ``new_name`` does not pass ``validate_name``.
        """
        if not self.validate_name(new_name):
            raise ValueError("无效的新名称")
        new_name = new_name.strip()
        manga = self._index.get(old_name)
        if manga is None:
            return False
        # Refuse to overwrite a different entry that already uses the name.
        if new_name in self._index and new_name != old_name:
            return False
        manga['name'] = new_name
        del self._index[old_name]
        self._index[new_name] = manga
        self._save_data()
        return True

    def delete_manga(self, name: str) -> bool:
        """Delete the entry called ``name``.

        Returns:
            ``True`` when an entry was removed, ``False`` when none matched.
        """
        if name not in self._index:
            return False
        self.data = [m for m in self.data if m['name'] != name]
        del self._index[name]
        self._save_data()
        return True

    def search_manga(self, name: str) -> Optional[Dict]:
        """Return the entry whose name equals ``name`` exactly, or ``None``."""
        return self._index.get(name)

    def list_mangas(self, sort_by: str = "name") -> List[Dict]:
        """Return the entries as a new list.

        Args:
            sort_by: ``"name"`` sorts by name, ``"date"`` sorts by
                ``created_at``; any other value keeps the stored order.
        """
        if sort_by == "name":
            return sorted(self.data, key=lambda x: x['name'])
        elif sort_by == "date":
            # Entries without a timestamp sort first instead of failing.
            return sorted(self.data, key=lambda x: x.get('created_at') or '')
        return self.data.copy()

    def validate_name(self, name: str) -> bool:
        """Check that ``name`` is usable as a manga name.

        A valid name is a string whose stripped form is 2 to 50 characters
        long and is not the literal ``'undefined'``.
        """
        if not isinstance(name, str):
            return False
        name = name.strip()
        return 2 <= len(name) <= 50 and name != 'undefined'

    # ---------- Advanced features ----------
    def bulk_import(self, mangas: List[Dict]):
        """Add every item of ``mangas`` that has a valid ``name``.

        Items with a missing or invalid name are skipped.
        """
        for manga in mangas:
            name = manga.get("name")
            if self.validate_name(name):
                self.add_manga(name, manga.get("created_at"))

    def find_duplicates(self) -> List[str]:
        """Return names that repeat an earlier entry, ignoring case."""
        seen = set()
        duplicates = []
        for manga in self.data:
            lower_name = manga["name"].lower()
            if lower_name in seen:
                duplicates.append(manga["name"])
            else:
                seen.add(lower_name)
        return duplicates

    def cleanup_data(self):
        """Remove entries whose name is invalid; save only if any were removed."""
        original_count = len(self.data)
        self.data = [
            m for m in self.data
            if self.validate_name(m["name"])
        ]
        if len(self.data) != original_count:
            self._build_index()
            self._save_data()

View File

@ -4,7 +4,7 @@ import logging
from src.config import BASE_IMAGES_DIR
from src.sites.base import BaseSite
from src.sites.configs.rouman import RoumanSite
from src.common.utils import MangaDownloader, CBZUtils
from src.common.utils import MangaDownloader, CBZUtils, MangaUtils
from src.common.naming import DirectoryNaming, FileNaming
from src.common.exceptions import MangaException
from src.common.item import MangaItem, MangaInfo
@ -73,6 +73,9 @@ class MangaManager:
if not chapter.status == "downloaded":
total += 1
total_chapters = total
# 找到0个章节则证明已全部下载跳过
if total == 0: MangaUtils().add_manga(manga_name)
else: MangaUtils().delete_manga(manga_name)
logger.info(f"找到 {total_chapters} 个章节")
manga_item.chapters.extend(chapters) # 添加章节到 MangaItem
yield {
@ -153,10 +156,14 @@ class MangaManager:
async with list_site_handler() as site:
manga_list = await site.get_manga_list(manga_url)
for title,url in zip(manga_list.title, manga_list.url):
print(title,url)
logger.info(f"开始下载 漫画: {title}")
logger.info(f"{url}")
await self.download_manga(str(url))
title = FileNaming.chinese_file_name(title)
save_manga = MangaUtils().search_manga(title)
if save_manga != None:
logger.info(f"{save_manga} 已存在")
else:
logger.info(f"开始下载 漫画: {title}")
logger.info(f"{url}")
await self.download_manga(str(url))
@classmethod
async def download_manga(cls, url: str, save_dir: Path = BASE_IMAGES_DIR):