commit 03c578f183d0ed24625b7b415a253151917e5b05 Author: caiwx86 Date: Tue Feb 4 01:12:15 2025 +0800 inital commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8033124 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +.scrapy/* +.vscode/* +.cache/* +.DS_Store +CBZ/* +output/* +downloads/* +/**/__pycache__ \ No newline at end of file diff --git a/run.py b/run.py new file mode 100644 index 0000000..07d2f60 --- /dev/null +++ b/run.py @@ -0,0 +1,19 @@ +import asyncio +from pathlib import Path +from src.sites.manager import MangaManager +from src.common.logging import setup_logging + +logger = setup_logging(__name__) + +async def main(): + # 配置下载参数 + #manga_url = "https://rouman5.com/books/cm693tf2z0170dr07ve0hpa7s" + manga_list_url = "https://rouman5.com/books?continued=true" + + # 开始下载 + #await MangaManager().download_manga(manga_url) + for i in range(0,70): + await MangaManager().download_list_manga(f"{manga_list_url}&page={i}") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/src/assets/ComicInfo_2.1.xsd b/src/assets/ComicInfo_2.1.xsd new file mode 100644 index 0000000..a2dd90f --- /dev/null +++ b/src/assets/ComicInfo_2.1.xsd @@ -0,0 +1,127 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/common/ComicInfo.py b/src/common/ComicInfo.py new file mode 100644 index 0000000..d608280 --- /dev/null +++ b/src/common/ComicInfo.py @@ -0,0 +1,407 @@ +import xml.etree.ElementTree as ET +from xml.dom import minidom +from typing import List +import os +from lxml import etree +from src.config import XSD_FILE +from src.common.logging import setup_logging +import logging +from zipfile import ZipFile +from pathlib import Path +import re +import requests +from urllib.parse import urlparse +from PIL import Image +from concurrent.futures import ThreadPoolExecutor +import hashlib +from io import BytesIO + + +logger = setup_logging(__name__) + +class ImageInfo: + from src.config import BASE_DIR + def _image_path(self, comicinfo, filename): + """生成章节目录""" + if filename: + return os.path.join(self.BASE_DIR,"images",f"{comicinfo.name}", comicinfo.chapter, filename) + + def get_image_size(self, image_path: str, human_readable: bool = False) -> str: + """ + 获取图片的字节大小(支持本地路径和网络URL) + + 参数: + - image_path: 图片路径或URL + - human_readable: 是否返回可读格式(如 KB/MB) + + 返回: + - 字符串形式的字节大小(或可读格式) + + 示例: + >>> get_image_size("photo.jpg") + '245.76 KB' + >>> get_image_size("http://example.com/image.png", human_readable=False) + '1024000' + """ + def convert_size(size_bytes: int) -> str: + """将字节转换为可读格式""" + for unit in ['B', 'KB', 'MB', 'GB']: + if size_bytes < 1024.0: + return f"{size_bytes:.2f} {unit}" + size_bytes /= 1024.0 + return f"{size_bytes:.2f} TB" + + try: + # 判断是否为网络资源 + if urlparse(str(image_path)).scheme in ('http', 'https'): + # 方法1:通过HEAD请求获取大小(可能不准确) + response = requests.head(image_path, timeout=5) + if 'Content-Length' in response.headers: + size = int(response.headers['Content-Length']) + + # 方法2:完整下载获取准确大小(推荐) + else: + response = requests.get(image_path, stream=True, timeout=10) + response.raise_for_status() + size = len(response.content) + else: + # 本地文件处理 + file_path = Path(image_path) + if not file_path.exists(): + raise FileNotFoundError(f"文件不存在: {image_path}") + size = 
os.path.getsize(file_path) + + return convert_size(size) if human_readable else str(size) + + except requests.exceptions.RequestException as e: + raise ValueError(f"网络请求失败: {str(e)}") + except Exception as e: + raise RuntimeError(f"获取大小失败: {str(e)}") + + def get_image_hash_advanced(self, + source: str, + hash_type: str = "md5", + is_url: bool = False + ) -> str: + """ + 高级版图片哈希生成(支持多种输入源) + + 参数: + - source: 输入源(文件路径/URL/二进制数据/BytesIO) + - hash_type: 哈希类型(md5/sha1/sha256) + - is_url: 当 source 为 URL 字符串时需设置为 True + + 返回: + - 十六进制字符串形式的哈希值 + """ + hash_type = hash_type.lower() + valid_hashes = ["md5", "sha1", "sha256"] + if hash_type not in valid_hashes: + raise ValueError(f"不支持的哈希类型,可选值:{valid_hashes}") + + hash_func = hashlib.new(hash_type) + + # 处理不同输入类型 + if isinstance(source, bytes): + hash_func.update(source) + elif isinstance(source, BytesIO): + source.seek(0) + while chunk := source.read(4096): + hash_func.update(chunk) + elif is_url: + response = requests.get(source, stream=True) + response.raise_for_status() + for chunk in response.iter_content(4096): + hash_func.update(chunk) + else: # 视为文件路径 + with open(source, "rb") as f: + while chunk := f.read(4096): + hash_func.update(chunk) + + return hash_func.hexdigest() + + def get_image_metadata(self,image_path: str): + """获取完整图片信息""" + page = ComicPageInfo() + image_name = os.path.basename(image_path) + size = self.get_image_size(image_path) + page.Image = image_name.split(".")[0].split("_")[-1] + page.ImageSize = size + page.Key = self.get_image_hash_advanced(image_path) + + try: + with Image.open(image_path) as img: + ImageWidth, ImageHeight = zip(img.size) + page.ImageWidth, page.ImageHeight = [ImageWidth[0], ImageHeight[0]] + #return { + # "format": img.format, + # "mode": img.mode, + # "size_px": img.size, # (width, height) + # "file_size": size + #} + return page + except Exception as e: + raise RuntimeError(f"读取图片信息失败: {str(e)}") + + def get_image_metadata_from_zip(self, zip_path: str, chunk_size: int = 4096) -> list: + """ + 从 ZIP 文件中读取图片的元数据(无需解压整个文件) + + 参数: + - zip_path: ZIP 文件路径 + - chunk_size: 读取的字节数(用于解析图片头部信息) + + 返回: + - 包含图片元数据的列表,每个元素格式: + { + "filename": 文件名, + "compressed_size": 压缩后大小(字节), + "original_size": 原始大小(字节), + "format": 图片格式, + "width": 宽度(像素), + "height": 高度(像素) + } + """ + pages = [] + + with ZipFile(zip_path, 'r') as zf: + for file_info in zf.infolist(): + # 仅处理常见图片格式 + if not file_info.filename.lower().endswith( + ('.png', '.jpg', '.jpeg', '.gif', '.bmp') + ): + continue + + try: + with zf.open(file_info) as file: + # 读取前 chunk_size 字节用于解析元数据 + img_header = file.read(chunk_size) + + # 将数据包装为文件流 + img_buffer = BytesIO(img_header) + + page = ComicPageInfo() + page.Key = self.get_image_hash_advanced(img_buffer) + + # 使用 Pillow 解析图像信息 + with Image.open(img_buffer) as img: + page.Image = file_info.filename.split(".")[0] + page.ImageSize = file_info.file_size + ImageWidth, ImageHeight = zip(img.size) + page.ImageWidth, page.ImageHeight = [ImageWidth[0], ImageHeight[0]] + #metadata = { + # "filename": file_info.filename, + # "compressed_size": file_info.compress_size, + # "original_size": file_info.file_size, + # "format": img.format, + # "width": img.width, + # "height": img.height + #} + pages.append(page) + + except Exception as e: + print(f"解析失败 [{file_info.filename}]: {str(e)}") + + return pages + +# Define the ComicInfo and ComicPageInfo classes +class ComicInfo: + def __init__(self): + self.Title: str = "" + """标题""" + self.Series: str = "" + self.Number: str = "" + self.Count: int = -1 + self.Volume: 
int = -1 + self.AlternateSeries: str = "" + self.AlternateNumber: str = "" + self.AlternateCount: int = -1 + self.Summary: str = "" + self.Notes: str = "" + self.Year: int = -1 + self.Month: int = -1 + self.Day: int = -1 + self.Writer: str = "" + self.Penciller: str = "" + self.Inker: str = "" + self.Colorist: str = "" + self.Letterer: str = "" + self.CoverArtist: str = "" + self.Editor: str = "" + self.Publisher: str = "" + self.Imprint: str = "" + self.Genre: str = "" + self.Tags: str = "" + self.Web: str = "" + self.PageCount: int = -1 + self.LanguageISO: str = "" + self.Format: str = "" + self.BlackAndWhite: str = "" + self.Manga: str = "" + self.Characters: str = "" + self.Teams: str = "" + self.Locations: str = "" + self.ScanInformation: str = "" + self.StoryArc: str = "" + self.SeriesGroup: str = "" + self.AgeRating: str = "" + self.Pages: List[ComicPageInfo] = [] + +class ComicPageInfo: + def __init__(self): + self.Image: int = -1 + self.Type: str = "Story" + self.DoublePage: bool = False + self.ImageSize: int = -1 + self.Key: str = "" + self.Bookmark: str = "" + self.ImageWidth: int = -1 + self.ImageHeight: int = -1 + + def toString(self): + data = {} + def add(key, value): + if value != -1 and value != "": data[key] = str(value) + add("Image", self.Image) + add("ImageSize", self.ImageSize) + add("Key", self.Key) + add("ImageWidth", self.ImageWidth) + add("ImageHeight", self.ImageHeight) + return data + +class ComicInfoXml: + def _save_xml_to_file(self, xml_string, filename): + """ + Save the XML string to a file + """ + base_dir = os.path.dirname(filename) + if not os.path.exists(base_dir): os.makedirs(base_dir) + with open(filename, "w", encoding="utf-8") as file: + file.write(xml_string) + logger.info(f"ComicInfo.xml 生成成功 {filename}") + + def _validate_xml_with_xsd_file(self, xml_file, xsd_file, remove=True): + """ + Validate the XML file against the XSD file + """ + xml_doc = etree.parse(xml_file) + with open(xsd_file, 'r', encoding="utf-8") as file: + xsd_doc = etree.XMLSchema(etree.parse(file)) + try: + xsd_doc.assertValid(xml_doc) + logger.info(f"ComicInfo.xml 通过 XSD 验证成功 {xml_file}") + except etree.DocumentInvalid as e: + logger.error(f"ComicInfo.xml 通过 XSD 验证失败 {xml_file}") + if remove: + os.remove(xml_file) + + def get_page_count(self, zip_file: Path): + """获取 ComicInfo.xml 文件中的 标签值""" + # 打开ZIP文件 + with ZipFile(str(zip_file), 'r') as z: + # 假设ZIP中的文件名是'text.txt' + with z.open('ComicInfo.xml', 'r') as file: + # 从文件流中解析 XML 数据 + file_string = file.read().decode("utf-8") + # 使用正则表达式提取 标签中的值 + match = re.search(r"(\d+)", file_string) + if match: + page_count = match.group(1) + logger.info(f"zip_file={zip_file} PageCount: {page_count}") + return page_count + + def _parse_comicinfo(self, comic: ComicInfo, save_dir=None, xml_filename="ComicInfo.xml", xsd_filename="ComicInfo.xsd"): + """_summary_ + + Args: + comic (ComicInfo): _description_ + save_dir (_type_, optional): _description_. Defaults to None. + xml_filename (str, optional): _description_. Defaults to "ComicInfo.xml". + xsd_filename (str, optional): _description_. Defaults to "ComicInfo_2.1.xsd". 
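+        Builds ComicInfo.xml from the populated ComicInfo object, writes it to save_dir, and validates the result against the bundled XSD (invalid files are removed).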
+ """ + # Serialize to XML with formatted output + def serialize_comic_info(comic: ComicInfo) -> str: + # Create root element with XML declaration and namespaces + comic_elem = ET.Element('ComicInfo') + comic_elem.set('xmlns:xsd', 'http://www.w3.org/2001/XMLSchema') + comic_elem.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance') + # Add subelements and attributes based on presence and requirements + for attr, value in comic.__dict__.items(): + # if value or (attr in ['Volume', 'Year', 'Month', 'Day', 'PageCount'] and (value == -1 or value == "" ) ): # Check required attributes + if value == -1 or value == "" or value == None or value == "[]" or value == []: + if attr in self._required_attributes(): + raise exit(f"{xml_filename} 缺少必要属性: {attr}") + else: + continue + else: + if attr == 'Pages': + pages_elem = ET.SubElement(comic_elem, 'Pages') + for page in value: + cpi = ComicPageInfo() + cpi.Image = page.Image + cpi.ImageSize = page.ImageSize + cpi.Key = page.Key + cpi.ImageWidth = page.ImageWidth + cpi.ImageHeight = page.ImageHeight + page_elem = ET.SubElement(pages_elem, 'Page', cpi.toString()) + else: + ET.SubElement(comic_elem, attr).text = str(value) + + # Create a formatted XML string + xml_str = ET.tostring(comic_elem, encoding='utf-8', method='xml') + parsed_xml = minidom.parseString(xml_str) + formatted_xml = parsed_xml.toprettyxml(indent=" ", encoding="utf-8") # Adjust the number of spaces for indentation as needed + + # Convert bytes to string and add XML declaration + return formatted_xml.decode('utf-8') + + # Serialize the ComicInfo object + serialized_xml = serialize_comic_info(comic) + + # 保存数据XML到文件 + if save_dir != None: xml_filename = os.path.join(save_dir, xml_filename) + self._save_xml_to_file(serialized_xml, xml_filename) + self._validate_xml_with_xsd_file(xml_filename, xsd_filename) # 将 JSON 转换为 XML + #xml_data = json_to_xml_with_declaration(json_data) + #print(xml_data) + + def _required_attributes(self): + return ["Title", "Series", "Number", "PageCount", "Writer"] + + def _gen_pageinfo(self, image_names, save_dir): + pages = [] + # Adding pages to the comic + for image_name in image_names: + image_name = image_name.split(".")[0].split("_")[-1]+".jpg" + image_path = os.path.join(save_dir, image_name) + page = ImageInfo().get_image_metadata(image_path) + # 图像属性 文件名 大小 长 + pages.append(page) + return pages + + def scrapy_xml_by_json(self, json_data, save_dir=None, xsd_file=XSD_FILE): + comic = ComicInfo() + comic.Title = json_data.get("chapter", "") + comic.Series = json_data.get("name", "") + comic.Writer = json_data.get("author", "") + comic.AgeRating = json_data.get("age_rating", "") + comic.Tags = json_data.get("tags", "") + comic.Summary = json_data.get("description", "") + comic.Genre = json_data.get("genre", "") + comic.Number = json_data.get("number", "") + comic.PageCount = json_data.get("page_count", "") + comic.Writer = json_data.get("author", "") + image_names = json_data.get("images", "") + #pages = [] + + pages = self._gen_pageinfo(image_names=image_names, save_dir=save_dir) + for page in pages: + comic.Pages.append(page) + # Adding pages to the comic + #for image_name in image_names: + # page = ComicPageInfo() + # page.Image = image_name.split(".")[0].split("_")[-1] + # pages.append(page.Image) + # comic.Pages.append(page) + self._parse_comicinfo(comic, save_dir=save_dir, xsd_filename=xsd_file) + return pages diff --git a/src/common/exceptions.py b/src/common/exceptions.py new file mode 100644 index 0000000..cb483db --- /dev/null +++ 
b/src/common/exceptions.py @@ -0,0 +1,25 @@ +"""异常定义""" + +class MangaException(Exception): + """漫画下载相关异常的基类""" + pass + +class NetworkError(MangaException): + """网络相关错误""" + pass + +class ParseError(MangaException): + """解析错误""" + pass + +class ConfigError(MangaException): + """配置错误""" + pass + +class DownloadError(MangaException): + """下载错误""" + pass + +class SiteError(MangaException): + """网站特定错误""" + pass \ No newline at end of file diff --git a/src/common/extractor.py b/src/common/extractor.py new file mode 100644 index 0000000..607e6ff --- /dev/null +++ b/src/common/extractor.py @@ -0,0 +1,220 @@ +"""数据提取工具""" +from typing import Any, Dict, List, Optional, Union +import re +from lxml import etree +from src.common.exceptions import ParseError +from src.common.loader import SiteConfig +from src.common.item import MangaItem,ListManga, MangaInfo, Chapter # 导入模型 + +class SelectorProcessor: + """选择器处理器""" + + @staticmethod + def select(tree: etree._Element, selector: str, index: int = -1) -> List[etree._Element]: + """XPath选择器""" + elements = tree.xpath(selector) + len_elements = len(elements) + try: + if len_elements == 0: + raise ParseError(f"无法找到元素: {selector}") + elif len_elements == 1: + return elements[0] + elif len_elements > 1 and index > -1: + return elements[index] + else: + return elements + except Exception as e: + return None + + @staticmethod + def select_one(tree: etree._Element, selector: str) -> Optional[etree._Element]: + """XPath选择器(单个)""" + elements = tree.xpath(selector) + return elements[0] if elements else None + + @staticmethod + def get_text(text: str): + """获取文本""" + return text.strip() if text is not None else '' + + @staticmethod + def get_attribute(element: etree._Element, attr: str) -> str: + """获取属性""" + result = element.get(attr, '') + if isinstance(result, str): + return element.get(attr, '').strip() if element is not None else '' + return result + + @staticmethod + def join_base_url(url: str, base_url: str) -> str: + """拼接基础URL""" + if url.startswith('http'): + return url + return f"{base_url.rstrip('/')}/{url.lstrip('/')}" + + @staticmethod + def extract_pattern(text: str, pattern: str) -> Optional[str]: + """提取正则匹配""" + match = re.search(pattern, text) + return match.group(1) if match else None + +class Extractor: + """数据提取器""" + + def __init__(self, config: SiteConfig): + self.config = config + self.processor = SelectorProcessor() + + def extract_manga_list(self, tree: etree._Element) -> ListManga: + """提取漫画信息并返回 MangaInfo 实例""" + selectors = self.config.get_selector('manga_list') + info_data = {} + for key, selector in selectors.items(): + if isinstance(selector, str): + element = self.processor.select(tree, selector) + if element: + if isinstance(element, str): + info_data[key] = self.processor.get_text(element) + else: + info_data[key] = element + return ListManga(**info_data) + + def extract_manga_info(self, tree: etree._Element) -> MangaInfo: + """提取漫画信息并返回 MangaInfo 实例""" + selectors = self.config.get_selector('manga_info') + info_data = {} + info_data['project'] = self.config.project + info_data['base_url'] = self.config.base_url + + for key, selector in selectors.items(): + if isinstance(selector, str): + element = self.processor.select(tree, selector) + if element: + if isinstance(element, str): + info_data[key] = self.processor.get_text(element) + else: + info_data[key] = element + elif isinstance(selector, dict): + if 'value' in selector: + info_data[key] = selector.get('value') + continue + element = self.processor.select(tree, 
selector['selector'], selector.get('index', -1)) + if element: + if 'attribute' in selector: + value = self.processor.get_attribute(element, selector['attribute']) + else: + value = self.processor.get_text(element) + + if 'process' in selector: + if selector['process'] == 'join_base_url': + value = self.processor.join_base_url(value, self.config.base_url) + + info_data[key] = value + + # 创建 MangaInfo 实例 + return MangaInfo(**info_data) # 使用解包操作符将字典传递给模型 + + def extract_chapter_list(self, tree: etree._Element) -> List[Chapter]: + """提取章节列表并返回 Chapter 实例列表""" + selector_config = self.config.get_selector('chapter_list') + elements = self.processor.select(tree, selector_config['container']) + urls = self.processor.select(tree, selector_config['attribute']) + + chapters = [] + result = {elements[i]: urls[i] for i in range(len(elements))} + + for element in elements: + chapter_data = {} + if selector_config['title'] == 'text': + chapter_data['title'] = self.processor.get_text(element) + + url_config = selector_config['url'] + url = self.processor.get_attribute(element, url_config['attribute']) + if url_config.get('process') == 'join_base_url': + url = self.processor.join_base_url(url, self.config.base_url) + chapter_data['url'] = url + + # 创建 Chapter 实例 + chapters.append(Chapter(**chapter_data)) # 使用解包操作符将字典传递给模型 + + return chapters + + def extract_chapter_images(self, html: str) -> List[str]: + """提取章节图片""" + config = self.config.get_selector('chapter') + data = self._extract_encrypted_data(html, config['image_data']) + return self._build_image_urls(data, config['image_url_template']) + + def _extract_data(self, tree: etree._Element, selectors: Dict) -> Dict[str, str]: + """通用数据提取""" + data = {} + for key, selector in selectors.items(): + if isinstance(selector, str): + element = tree.xpath(selector) + if element: + data[key] = element[0].text.strip() + elif isinstance(selector, dict): + data[key] = self._process_complex_selector(tree, selector) + return data + + def _extract_list(self, tree: etree._Element, config: Dict) -> List[Dict[str, str]]: + """提取列表数据""" + items = [] + elements = tree.xpath(config['container']) + seen_titles = set() # 用于跟踪已提取的标题 + for element in elements: + item = {} + if config['title'] == 'text': + title = element.text.strip() + if title not in seen_titles: # 检查标题是否已存在 + item['title'] = title + seen_titles.add(title) # 标记为已提取 + + url = element.get(config['url']['attribute'], '') + if config['url'].get('process') == 'join_base_url': + url = self._join_url(url) + item['url'] = url + + items.append(item) + return items + + def _extract_encrypted_data(self, html: str, config: Dict) -> Any: + """提取并解密数据""" + pattern = config['pattern'] + match = re.search(pattern, html) + if not match: + raise ParseError("无法找到数据") + + data = match.group(1) + if config.get('decrypt'): + data = self._decrypt_data(data, config['process']) + return data + + def _decrypt_data(self, data: str, steps: List[str]) -> Any: + """数据解密""" + import base64 + import zlib + import json + + result = data + for step in steps: + if step == 'base64_decode': + result = base64.b64decode(result) + elif step == 'zlib_decompress': + result = zlib.decompress(result).decode('utf-8') + elif step == 'json_parse': + result = json.loads(result) + return result + + def _join_url(self, path: str) -> str: + """拼接URL""" + if path.startswith('http'): + return path + return f"{self.config.base_url.rstrip('/')}/{path.lstrip('/')}" + + def _build_image_urls(self, data: Dict, template: str) -> List[str]: + """构建图片URL列表""" + urls = 
[] + for file in data.get('files', []): + urls.append(template.format(path=file)) + return urls \ No newline at end of file diff --git a/src/common/item.py b/src/common/item.py new file mode 100644 index 0000000..6c7ca2d --- /dev/null +++ b/src/common/item.py @@ -0,0 +1,234 @@ +from pydantic import BaseModel, HttpUrl, field_validator, model_validator +from typing import List, Optional +from opencc import OpenCC +import re,os +from src.common.ComicInfo import ImageInfo + +class FileNaming: + + @classmethod + def chinese_convert(cls, text,convert='t2s'): return OpenCC(convert).convert(str(text)) + + #处理成符合规定的文件名 + @classmethod + def fix_file_name(cls, filename, replace=None): + if not isinstance(filename, str): + return filename + in_tab = r'[?*/\|.:><]' + str_replace = "" + if replace is not None: + str_replace = replace + filename = re.sub(in_tab, str_replace, filename) + count = 1 + while True: + str_file = filename[0-count] + if str_file == " ": + count += 1 + else: + filename = filename[0:len(filename)+1-count] + break + return filename + + @classmethod + def chinese_file_name(cls, name): return cls.fix_file_name(cls.chinese_convert(name)) + +class ImageItem(BaseModel): + url: HttpUrl + scramble: bool + filename: str + +class CoverItem(BaseModel): + name: Optional[str] = "" + """文件名""" + url: HttpUrl = "" + """下载链接""" + path: Optional[str] = "" + """文件路径""" + size: Optional[int] = 0 + """文件大小""" + md5: Optional[str] = "" + """文件MD5""" + + @model_validator(mode="after") + def validate(self): + if self.path != "": + self.name = os.path.basename(self.path) + self.md5 = ImageInfo().get_image_hash_advanced(self.path) + self.size = ImageInfo().get_image_size(self.path) + return self + +class Chapter(BaseModel): + + title: str + + @field_validator('title', mode='before') + def validate_url(cls, v): + return FileNaming.chinese_file_name(v) + + url: HttpUrl + + # downloaded + status: Optional[str] = "" + #images: List[ImageItem] = [] + +class ListManga(BaseModel): + title: List[str] + + url: List[HttpUrl] + + @field_validator('url', mode='before') + def validate_url(cls, v): + list_url = [] + for url in v: + if isinstance(url, str) and not url.startswith('http'): + list_url.append(HttpUrl("https://rouman5.com" + url)) + return list_url + +class MangaInfo(BaseModel): + project: str + """漫画项目名称""" + + base_url: str = "" + """漫画网站域名""" + @field_validator('base_url', mode='before') + def validate_base_url(cls, v): + cls.base_url = v + return v + + title: str + """漫画名称""" + @field_validator('title', mode='before') + def validate_title(cls, v): + return FileNaming.chinese_file_name(v) + + author: str + """漫画作者""" + @field_validator('author', mode='before') + def validate_author(cls, v): + (list_value, value) = [[], str(v).replace("&", " ")] + for val in set(str(value).split(" ")): + list_value.append(val) + return FileNaming.chinese_file_name(",".join(list_value)) + + description: Optional[str] = None + """漫画描述""" + @field_validator('description', mode='before') + def validate_description(cls, v): + return FileNaming.chinese_file_name(v) + + cover: CoverItem + """漫画封面""" + @field_validator('cover', mode='before') + def validate_cover(cls, v): + cover_info = {} + if isinstance(v, str) and not v.startswith('http'): + cover_info['url'] = HttpUrl(cls.base_url + v) + return CoverItem(**cover_info) + + tags: str = [] + """漫画标签""" + + @field_validator('tags', mode='before') + def validate_tags(cls, v): + return FileNaming.chinese_file_name(v) + + # date: str + genre: str + """漫画类型""" + + age_rating: str + 
"""漫画年龄分级""" + + chapter_link: List[HttpUrl] + """章节链接""" + + chapters_name: List[str] + """章节名称""" + + #list_chapter: dict[Chapter] + #status: str + #tags: List[str] + + @field_validator('chapter_link', mode='before') + def validate_chapter_link(cls, v): + if isinstance(v, str) and not v.startswith('http'): + return [HttpUrl(cls.base_url + v)] + elif isinstance(v, list): + if not v[0].startswith('http'): + return [HttpUrl(cls.base_url + chapter) for chapter in v] + return v + + def get_list_chapter(cls): + chapters_name = cls.chapters_name + chapter_link = cls.chapter_link + chapters = [] + for name, link in zip(chapters_name, chapter_link): + chapters.append(Chapter(title=name, url=link)) + return chapters + #@field_validator('list_chapter', mode='before') + #def validate_list_chapter(cls, v): + # s = cls.chapters_name + # c = cls.chapter_link + # return v + #if isinstance(v, list): + # return [Chapter(**chapter) for chapter in v] + #return v + #@validator('tags', pre=True) + #def validate_tags(cls, v): + # if not isinstance(v, list): + # raise ValueError('tags must be a list') + # return v + +class MangaItem(BaseModel): + info: MangaInfo + covers: List[CoverItem] = [] + chapter: Chapter = [] + chapter_images: List[ImageItem] = [] + chapters: List[Chapter] = [] + number: int = 0 + pages: int = 0 + + #@field_validator('chapter', mode='before') + #def fix_file_name(cls, v): + # return FileNaming.chinese_file_name(v) + + @field_validator('chapters', mode='before') + def validate_chapters(cls, v): + if not isinstance(v, list) or not all(isinstance(chapter, Chapter) for chapter in v): + raise ValueError('chapters must be a list of Chapter instances') + return v + + def get_item(cls): + # number 转换 + if len(cls.chapters) > 0: + count = 1 + for chapter in cls.chapters: + if chapter.title == cls.chapter.title and chapter.url == cls.chapter.url: + cls.number = count + break + count += 1 + # pages 转换 + if len(cls.chapter_images) > 0: cls.pages = len(cls.chapter_images) + + return cls + + def get_comic_info_json(cls): + cls.get_item() + filename_list = [] + for image in cls.chapter_images: + filename_list.append(image.filename) + + return { + "name": cls.info.title, + "chapter": cls.chapter.title, + "author": cls.info.author, + "tags": cls.info.tags, + "images": filename_list, + "description": cls.info.description, + "genre": cls.info.genre, + "age_rating": cls.info.age_rating, + "series": cls.info.title, + "number": cls.number, + "page_count": cls.pages, + } + diff --git a/src/common/loader.py b/src/common/loader.py new file mode 100644 index 0000000..9d63ed6 --- /dev/null +++ b/src/common/loader.py @@ -0,0 +1,49 @@ +from pathlib import Path +from typing import Dict, Any +import yaml +import importlib.resources as pkg_resources +from src.common.exceptions import ConfigError + +class SiteConfig: + """网站配置类""" + def __init__(self, config_data: Dict[str, Any]): + self.project = config_data['project'] + self.name = config_data['name'] + self.domain = config_data['domain'] + self.base_url = config_data['base_url'] + self.headers = config_data.get('headers', {}) + self.selectors = config_data['selectors'] + + def get_selector(self, *keys) -> Any: + """获取选择器配置""" + value = self.selectors + for key in keys: + if not isinstance(value, dict) or key not in value: + raise ConfigError(f"无效的选择器路径: {'.'.join(keys)}") + value = value[key] + return value + + def get_base_url(self): + return self.base_url + +class ConfigLoader: + """配置加载器""" + _configs: Dict[str, SiteConfig] = {} + + @classmethod + def 
load_config(cls, site_name: str) -> SiteConfig: + """加载网站配置""" + if site_name in cls._configs: + return cls._configs[site_name] + + try: + # 从包资源中读取配置文件 + config_text = Path('src/sites/configs', f'{site_name}.yml').read_text() + config_data = yaml.safe_load(config_text) + + config = SiteConfig(config_data) + cls._configs[site_name] = config + return config + + except Exception as e: + raise ConfigError(f"加载配置文件失败 {site_name}: {str(e)}") \ No newline at end of file diff --git a/src/common/logging.py b/src/common/logging.py new file mode 100644 index 0000000..574b429 --- /dev/null +++ b/src/common/logging.py @@ -0,0 +1,11 @@ +"""日志配置""" +import logging + +# 日志格式 +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' +LOG_LEVEL = logging.INFO + +def setup_logging(name: str = None) -> logging.Logger: + """配置日志""" + logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT) + return logging.getLogger(name or __name__) \ No newline at end of file diff --git a/src/common/naming.py b/src/common/naming.py new file mode 100644 index 0000000..1ca6d8c --- /dev/null +++ b/src/common/naming.py @@ -0,0 +1,256 @@ +from pathlib import Path +from datetime import datetime +from typing import Callable +import base64,hashlib,os,re +from src.config import BASE_DIR,CBZ_DIR,OLD_CBZ_DIR +from src.common.item import MangaInfo,MangaItem +from typing import Generator, Union, List, Optional + +PREFIX_SCRAMBLE = "scramble=" + +class DirectoryNaming: + """目录命名策略类""" + def ensure_dir(directory: Path): + """确保目录存在""" + directory.mkdir(parents=True, exist_ok=True) + + @classmethod + def chapter_images_dir(cls, manga_info: MangaInfo, chapter: str, filename: str = None) -> Path: + """生成章节目录""" + if filename: + return Path(BASE_DIR,f"{manga_info.project}","images",f"{manga_info.title}",chapter.title, filename) + else: + return Path(BASE_DIR,f"{manga_info.project}","images",f"{manga_info.title}",chapter.title) + + @classmethod + def chapter_cbz_dir(cls, manga_info: MangaInfo) -> Path: + """生成章节CBZ文件目录""" + return Path(CBZ_DIR,f"{manga_info.project}",f"{manga_info.title}") + + @classmethod + def manga_cover_dir(cls, manga_item: MangaItem) -> Path: + """生成漫画封面目录""" + return Path(BASE_DIR,f"{manga_item.info.project}","icons",f"{manga_item.info.title}",f"{manga_item.info.title}.jpg") + + @classmethod + def manga_cover_dir(cls, manga_info: MangaInfo, cache: bool = True, is_dir: bool = False) -> Path: + """生成漫画封面目录""" + path = "" + if cache: + path = Path(BASE_DIR,f"{manga_info.project}","icons",".cache") + else: + path = Path(BASE_DIR,f"{manga_info.project}","icons",f"{manga_info.title}") + if not is_dir: + path = os.path.join(path, f"{manga_info.title}.jpg") + return Path(path) + +class FileNaming: + """文件命名策略类""" + PREFIX_SCRAMBLE = "scramble=" + ext = ".jpg" + + @classmethod + def chapter_cbz(cls, manga_info: MangaInfo, chapter: str) -> Path: + """生成章节CBZ文件目录""" + return Path(CBZ_DIR,f"{manga_info.project}",f"{manga_info.title}",f"{chapter.title}.cbz") + + @classmethod + def old_chapter_cbz(cls, manga_info: MangaInfo, chapter: str) -> Path: + """生成章节CBZ文件目录""" + return Path(OLD_CBZ_DIR,f"{manga_info.project}",f"{manga_info.title}",f"{chapter.title}.cbz") + + #处理成符合规定的文件名 + @classmethod + def fix_file_name(cls, filename, replace=None): + if not isinstance(filename, str): + return filename + in_tab = r'[?*/\|.:><]' + str_replace = "" + if replace is not None: + str_replace = replace + filename = re.sub(in_tab, str_replace, filename) + count = 1 + while True: + str_file = filename[0-count] + if str_file == " ": + count += 1 + else: 
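+                # non-space character reached: strip the trailing spaces counted so far and stop scanning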
+ filename = filename[0:len(filename)+1-count] + break + return filename + + @classmethod + def default_filename(cls,url: str, idx: int) -> str: + """默认文件名生成器:使用数字序号""" + #from ..utils import get_file_extension + #ext = get_file_extension(url) + return f"{idx:03d}{cls.ext}" + + @staticmethod + def default_path(base_dir: Path, chapter_name: str, filename: str) -> Path: + """默认路径生成器:直接在章节目录下""" + return base_dir / chapter_name / filename + + @classmethod + def getFileScrambleImageName(cls,count,block=None,suffix=".jpg"): + if block: + return cls.PREFIX_SCRAMBLE+str(block)+"_"+"{:0>3d}".format(count)+suffix + else: + return "{:0>3d}".format(count)+suffix + + @classmethod + def getFileScrambleImageSave(cls,img_path): + base_dir = os.path.dirname(img_path) + file_name = os.path.basename(img_path) + if file_name.startswith(cls.PREFIX_SCRAMBLE): + file_name = file_name.split("_")[-1] + return os.path.join(base_dir,file_name) + + # 解密切片 + @classmethod + def encodeImage(cls,str_en): + #print("en",str_en) + enc = base64.b64decode(str_en) + #print("解密:",enc) + m = hashlib.md5() + m.update(enc) + md5 = m.digest() + d = md5[-1] + #print(md5) + try: + blocks = d % 10 + 5 + except: + blocks = 0 %10 + 5 + #print("blocks=",blocks) + return blocks + + @classmethod + def cover_format_path(cls, path, count=0): + if count != 0: + name, suffix = os.path.splitext(path) + new_path = name+"-"+str(count)+suffix + return new_path + if not os.path.exists(path): return path + count = 1 + while count: + name, suffix = os.path.splitext(path) + new_path = name+"-"+str(count)+suffix + if not os.path.exists(new_path): return new_path + else: count += 1 + + @classmethod + def get_filenames_optimized(cls, + folder_path: Union[str, Path], + recursive: bool = False, + ext_filter: Optional[List[str]] = None, + include_hidden: bool = False, + full_path: bool = True, + min_size: Optional[int] = None, + max_size: Optional[int] = None + ) -> Generator[str, None, None]: + """ + 高性能文件名获取函数(优化版) + + :param folder_path: 目标文件夹路径 + :param recursive: 是否递归子目录 + :param ext_filter: 扩展名过滤列表(如 ['.jpg', '.png']),不区分大小写 + :param include_hidden: 是否包含隐藏文件 + :param full_path: 是否返回完整路径 + :param min_size: 最小文件大小(单位:字节) + :param max_size: 最大文件大小(单位:字节) + + :return: 生成器,按需生成符合条件的文件路径 + """ + # 路径标准化处理 + folder_path = Path(folder_path).resolve() + if not folder_path.is_dir(): + raise ValueError(f"无效的目录路径: {folder_path}") + + # 预处理扩展名过滤条件 + ext_tuple = tuple(ext.lower() for ext in ext_filter) if ext_filter else None + + # 主扫描逻辑 + def _scandir(path: Path): + with os.scandir(path) as entries: + for entry in entries: + # 跳过无效条目 + if not entry.name: + continue + + # 处理目录 + if entry.is_dir(): + if recursive: + # 隐藏目录处理 + if not include_hidden and entry.name.startswith('.'): + continue + yield from _scandir(Path(entry.path)) + continue + + # 处理文件 + if not entry.is_file(): + continue + + # 过滤隐藏文件 + if not include_hidden: + if entry.name.startswith('.') or (os.name == 'nt' and entry.is_system()): + continue + + # 扩展名过滤 + if ext_tuple: + file_ext = Path(entry.name).suffix.lower() + if file_ext not in ext_tuple: + continue + + # 文件大小过滤 + try: + stat = entry.stat(follow_symlinks=False) + except OSError: + continue + + if min_size is not None and stat.st_size < min_size: + continue + if max_size is not None and stat.st_size > max_size: + continue + + # 生成结果 + yield entry.path if full_path else entry.name + + return _scandir(folder_path) + +class NamingStrategy: + """命名策略集合类""" + + @staticmethod + def original_filename(url: str, idx: int) -> str: + """保留原始文件名的生成器""" 
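+        # Note: get_file_extension is imported lazily from ..utils and is assumed to return the
+        # file suffix parsed from the URL (e.g. ".jpg"); it is not defined anywhere in this commit.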
+ from ..utils import get_file_extension + ext = get_file_extension(url) + return f"image_{idx}_original{ext}" + + @staticmethod + def date_based_path(base_dir: Path, chapter_name: str, filename: str) -> Path: + """按日期组织的路径生成器""" + today = datetime.now() + return base_dir / str(today.year) / f"{today.month:02d}" / chapter_name / filename + + @staticmethod + def manga_volume_path( + manga_name: str, + volume_num: int + ) -> Callable[[Path, str, str], Path]: + """生成按漫画名和卷号组织的路径生成器""" + def path_generator(base_dir: Path, chapter_name: str, filename: str) -> Path: + return base_dir / manga_name / f"第{volume_num:02d}卷" / chapter_name / filename + return path_generator + + @staticmethod + def custom_manga_filename( + prefix: str = "page", + digits: int = 4 + ) -> Callable[[str, int], str]: + """生成自定义漫画页面文件名生成器""" + def filename_generator(url: str, idx: int) -> str: + from ..utils import get_file_extension + ext = get_file_extension(url) + return f"{prefix}_{idx:0{digits}d}{ext}" + return filename_generator \ No newline at end of file diff --git a/src/common/utils.py b/src/common/utils.py new file mode 100644 index 0000000..579013d --- /dev/null +++ b/src/common/utils.py @@ -0,0 +1,646 @@ +import asyncio +import aiohttp +import base64,hashlib,os,shutil,os.path,math +from PIL import Image +import logging,time,os,shutil,re,xmlschema +from pathlib import Path +from typing import List, Optional, Callable, Dict, Any +from src.common.naming import DirectoryNaming +from src.common.naming import FileNaming,PREFIX_SCRAMBLE +from src.config import DEFAULT_HEADERS, CONCURRENT_DOWNLOADS, TIMEOUT, RETRY_TIMES, CACHE_DIR, CACHE_IMAGE_DIR +from src.config import RETRIES, COMIC_INFO_NAME, PROXY_URL, RETRY_PROXY, RETRY_PROXY_TIMES, XSD_FILE, BASE_DIR +from src.common.exceptions import DownloadError +from src.common.item import ImageItem, MangaItem, MangaInfo +from zipfile import ZipFile, ZIP_DEFLATED +from src.common.logging import setup_logging +import logging +from tempfile import NamedTemporaryFile + + +logger = setup_logging(__name__) + +class Cache: + """缓存类,用于存储和管理网页内容的缓存""" + + def __init__(self, cache_dir: Path = CACHE_DIR, expiration_time: int = 3600): + self.cache_dir = cache_dir + self.expiration_time = expiration_time + self.cache_dir.mkdir(exist_ok=True) # 创建缓存目录 + + def _get_cache_file_path(self, url: str) -> Path: + """根据 URL 生成缓存文件路径""" + filename = FileNaming.fix_file_name(str(url)) + # 以网站 "/" 分离目录 + parts = str(url).replace("https://", "").replace("http://", "").split("/") # 按照 "/" 分离 URL + subdir = parts[0] if len(parts) > 2 else "default" # 使用域名作为第一层子目录 + hash_dir = hashlib.md5(str(url).encode()).hexdigest() + dir = self.cache_dir / subdir / hash_dir[0:2] / hash_dir[3:5] # 返回多级目录路径 + dir.mkdir(parents=True, exist_ok=True) + return dir / filename + + + def get(self, url: str, type: str = "html") -> str: + """从缓存中获取 HTML 内容""" + cache_file = self._get_cache_file_path(url) + if cache_file.exists(): + # 检查缓存是否过期 + if time.time() - cache_file.stat().st_mtime < self.expiration_time: + with open(cache_file, 'r', encoding='utf-8') as f: + return f.read() + elif type == "image": + with open(cache_file, 'rb') as f: + return f.read() + else: + cache_file.unlink() # 删除过期的缓存文件 + return None + + def get_image(self, url: str) -> bytes: + """从缓存中获取图片""" + cache_file = self._get_cache_file_path(url) + if cache_file.exists(): + # 验证下载的文件是否为有效的图片 + if MangaDownloader()._is_valid_image(cache_file): + with open(cache_file, 'rb') as f: + return f.read() + else: + logger.error(f"图像已损坏: {cache_file}") + 
os.remove(cache_file) + return None + + def set(self, url: str, html: str) -> None: + """将 HTML 内容保存到缓存""" + cache_file = self._get_cache_file_path(url) + with open(cache_file, 'w', encoding='utf-8') as f: + f.write(html) + + def set_image(self, url: str, image: bytes) -> None: + """将图片保存到缓存""" + cache_file = self._get_cache_file_path(url) + with open(cache_file, 'wb') as f: + f.write(image) + +class DownloadStatus: + """下载状态跟踪类,用于记录下载进度""" + + def __init__(self, total: int): + self.total = total + self.success = 0 + self.failed = 0 + self.current = 0 + + @property + def is_completed(self) -> bool: + """检查下载是否完成""" + return self.current >= self.total + + @property + def progress(self) -> float: + """计算当前下载进度""" + return self.current / self.total if self.total > 0 else 0 + +class MangaDownloader: + """漫画下载器类,负责下载漫画及其相关资源""" + + def __init__(self, base_dir: Path = BASE_DIR): + self.connector = aiohttp.TCPConnector(limit_per_host=CONCURRENT_DOWNLOADS) + self.base_dir = Path(base_dir) + self.cache_dir = CACHE_IMAGE_DIR # 缓存目录 + self.cache = Cache() + DirectoryNaming.ensure_dir(self.base_dir) + DirectoryNaming.ensure_dir(self.cache_dir) # 创建缓存目录 + + async def download_cover(self, manga_info: MangaInfo): + """下载封面""" + cover_item = manga_info.cover + save_path = DirectoryNaming.manga_cover_dir(manga_info) + DirectoryNaming.ensure_dir(save_path.parent) + if os.path.exists(save_path): + print("f".format(save_path)) + async with aiohttp.ClientSession(headers=DEFAULT_HEADERS, timeout=aiohttp.ClientTimeout(total=TIMEOUT, connect=TIMEOUT)) as session: + await self.download_image(session,str(cover_item.url), save_path) + + async def download_chapter( + self, + manga_item: MangaItem, + semaphore: Optional[asyncio.Semaphore] = None, + status_callback: Optional[Callable[[DownloadStatus], None]] = None + ) -> Dict[str, Any]: + """ + 下载整个章节的图片 + :param image_items: 要下载的图片项列表 + :param chapter_name: 章节名称 + :param manga_info: 漫画信息 + :param semaphore: 限制并发下载的信号量 + :param status_callback: 下载状态回调函数 + :return: 下载结果统计字典 + """ + manga_info = manga_item.info + chapter = manga_item.chapter + image_items = manga_item.chapter_images + + if semaphore is None: + semaphore = asyncio.Semaphore(CONCURRENT_DOWNLOADS) + + status = DownloadStatus(len(image_items)) + failed_items = [] + + async with aiohttp.ClientSession(headers=DEFAULT_HEADERS, timeout=aiohttp.ClientTimeout(total=TIMEOUT, connect=TIMEOUT)) as session: + tasks = [] + for image_item in image_items: + url = str(image_item.url) + save_path = DirectoryNaming.chapter_images_dir(manga_info, chapter, image_item.filename) + DirectoryNaming.ensure_dir(save_path.parent) + + task = self._download_with_semaphore(semaphore, session, url, save_path, status, status_callback) + tasks.append(task) + + results = await asyncio.gather(*tasks, return_exceptions=True) + + # 处理结果 + for idx, result in enumerate(results): + if isinstance(result, Exception): + status.failed += 1 + failed_items.append(image_items[idx]) + logger.error(f"下载失败 {image_items[idx].url}: {str(result)}") + elif result: + status.success += 1 + else: + status.failed += 1 + failed_items.append(image_items[idx]) + + result = { + 'chapter': chapter, + 'total': len(image_items), + 'success': status.success, + 'failed': status.failed, + 'failed_items': failed_items + } + + logger.info(f"章节 {chapter.title} 下载完成: {status.success}/{len(image_items)} 张图片成功下载") + + return result + + async def _download_with_semaphore( + self, + semaphore: asyncio.Semaphore, + session: aiohttp.ClientSession, + url: str, + save_path: Path, + 
status: DownloadStatus, + callback: Optional[Callable] = None + ) -> bool: + async with semaphore: + result = await self.download_image(session, url, save_path) + status.current += 1 + if callback: + callback(status) + return result + + async def download_image(self, session: aiohttp.ClientSession, url: str, save_path: Path, retries: int = RETRIES, timeout: int = TIMEOUT, use_proxy: bool = RETRY_PROXY) -> bool: + """下载单个图片,增加重试机制、超时等待和文件缓存机制""" + if os.path.exists(FileNaming.getFileScrambleImageSave(save_path)): # 检查文件是否已存在 + logger.info(f"文件已存在,跳过下载: {save_path}") + return True + # 从缓存中获取图片 + cached_images = self.cache.get_image(url) + if cached_images: + with open(save_path, 'wb') as f: + f.write(cached_images) + return True + + for attempt in range(retries): + try: + timeout_obj = aiohttp.ClientTimeout(total=timeout) # 设置超时 + # 如果使用代理,设置代理 URL + if attempt > RETRY_PROXY_TIMES and use_proxy: + logger.info(f"使用代理: {PROXY_URL}") + session_get = session.get(url, timeout=timeout_obj, proxy=PROXY_URL) + else: + session_get = session.get(url, timeout=timeout_obj) + async with session_get as response: + if response.status == 200: + with open(str(save_path)+".downloads", 'wb') as f: + f.write(await response.read()) + # 验证下载的文件是否为有效的图片 + if self._is_valid_image(str(save_path)+".downloads"): + logger.info(f"成功下载: {url}") + shutil.move(str(save_path)+".downloads", save_path) + self.cache.set_image(url, await response.read()) + return True + else: + logger.error(f"下载的文件无效: {save_path}") + return False + else: + logger.error(f"下载失败: {url},状态码: {response.status}") + return False + except asyncio.TimeoutError: + logger.error(f"下载超时: {url},尝试次数: {attempt + 1}") + except Exception as e: + logger.error(f"下载图片时出错: {url},错误: {str(e)}") + + if attempt < retries - 1: + logger.info(f"重试下载: {url},尝试次数: {attempt + 2}") + await asyncio.sleep(1) # 等待一段时间再重试 + + return False + + def _is_valid_image(self, file_path: Path) -> bool: + """验证文件是否为有效的图片""" + try: + from PIL import Image + with Image.open(file_path) as img: + img.verify() # 验证图片 + return True + except Exception as e: + logger.error(f"图片验证失败: {file_path},错误: {str(e)}") + return False + +class CBZUtils: + def __init__(self, cbz_path: Path): + self.cbz_path = cbz_path + + def get_page_count(self): + return self._comic_info_xml_page_count(self.cbz_path) + + def _comic_info_xml_page_count(self, zip_file: Path): + """获取 ComicInfo.xml 文件中的 标签值""" + # 打开ZIP文件 + with ZipFile(str(zip_file), 'r') as z: + try: + # 假设ZIP中的文件名是'text.txt' + with z.open('ComicInfo.xml', 'r') as file: + # 从文件流中解析 XML 数据 + file_string = file.read().decode("utf-8") + # 使用正则表达式提取 标签中的值 + match = re.search(r"(\d+)", file_string) + if match: + page_count = match.group(1) + logger.info(f"zip_file={zip_file} PageCount: {page_count}") + return page_count + except Exception as e: + raise exit(f"获取 ComicInfo.xml 文件中的 标签值失败: {zip_file},错误: {str(e)}") + + def _check_zip_file(self, zip_file_path: Path): + """检查 ZIP 文件是否包含图片""" + result = False + is_comic_info = False + if not os.path.exists(zip_file_path): + logger.info(f"ZIP 文件不存在: {zip_file_path}") + return False + try: + with ZipFile(zip_file_path, 'r') as zip_file: + file_list = zip_file.namelist() + result = any(file_name.endswith('.jpg') for file_name in file_list) + is_comic_info = any(file_name == COMIC_INFO_NAME for file_name in file_list) + if is_comic_info: + page_count = self._comic_info_xml_page_count(zip_file_path) + if len(file_list) == int(page_count) + 1: + logger.info(f"ZIP 文件 {zip_file_path} 验证成功") + result = True + else: + 
logger.error(f"ZIP 文件 {zip_file_path} 验证失败,文件数量与 ComicInfo.xml 中的 不一致") + os.remove(zip_file_path) + if not result and os.path.exists(zip_file_path): + logger.error("ZIP 文件中没有图片") + os.remove(zip_file_path) + if not is_comic_info: + logger.error("ZIP 文件中没有 ComicInfo.xml") + os.remove(zip_file_path) + except FileNotFoundError: + logger.info(f"ZIP 文件不存在: {zip_file_path}") + except Exception as e: + logger.error(f"检查 ZIP 文件失败: {zip_file_path},错误: {str(e)}") + if os.path.exists(zip_file_path): + os.remove(zip_file_path) + return result + + def _zip_compression(cls, source_dir=None, target_file=None, remove=True): + cls._check_zip_file(target_file) + if not os.path.exists(source_dir): + raise FileNotFoundError(f"打包目标目录不存在: {source_dir}") + # 检查目录中是否存在 .jpg 文件 + if not any(file_name.endswith('.jpg') for file_name in os.listdir(source_dir)): + logger.error(f"打包目标目录中不存在图片: {source_dir}") + return False + target_dir = os.path.dirname(target_file) + if not os.path.exists(target_dir): os.makedirs(target_dir) + if not os.path.exists(target_file) and source_dir is not None: + try: + count = 0 + filenames = sorted(list(source_dir.glob("*.jpg")) + list(source_dir.glob(COMIC_INFO_NAME)), key=lambda f: f.name) # 对文件名进行排序 + with ZipFile(str(target_file), mode='w') as cbz: + for file in filenames: + # 假设图片格式为 JPG 或 ComicInfo.xml + count += 1 + print("打包中:" + str(count) + "/" + str(len(filenames)), os.path.join(source_dir, file.name)) + cbz.write(file, arcname=file.name) + cbz.close() + logger.info(f"打包完成:{target_file} 共 {count} 个文件") + except Exception as e: + logger.error(f"打包失败: {target_file},错误: {str(e)}") + if os.path.exists(target_file): + os.remove(target_file) + raise e + return cls._check_zip_file(target_file) + + def _image_deScrambleByPath(self, chapter_dir: Path): + if os.path.exists(chapter_dir): + dirs = os.listdir(chapter_dir) + for file in dirs: + if file.startswith(PREFIX_SCRAMBLE): + try: + ImageUtils.deScrambleImagesByPath(os.path.join(chapter_dir,file)) + except Exception as e: + print(f"删除 {file} 发生错误 {e},已跳过") + return False + + def create_cbz(self, chapter_dir: Path): + if os.path.exists(chapter_dir): + dirs = os.listdir(chapter_dir) + for file in dirs: + if file.startswith(PREFIX_SCRAMBLE): + try: + ImageUtils.deScrambleImagesByPath(os.path.join(chapter_dir,file)) + except Exception as e: + print(f"删除 {file} 发生错误 {e},已跳过") + return False + if self._zip_compression(source_dir=chapter_dir, target_file=self.cbz_path, remove=False): + logger.info(f"章节 {chapter_dir.name} 打包完成: {self.cbz_path}") + else: + raise exit(f"章节 {chapter_dir.name} 打包失败: {self.cbz_path}") + + def update_zip_file(self,zip_path: str, update_files: dict): + """ + 不整体解压的情况下更新 ZIP 中的文件 + + 参数: + - zip_path: ZIP文件路径 + - update_files: 需更新的文件字典 {内部路径: 新文件路径或bytes} + + 示例: + update_zip_file("data.zip", {"config.json": "new_config.json"}) + """ + # 创建临时文件 + temp_dir = os.path.dirname(zip_path) + with NamedTemporaryFile(dir=temp_dir, delete=False) as tmp_file: + temp_zip_path = tmp_file.name + + try: + # 读取原始 ZIP 并创建新 ZIP + with ZipFile(zip_path, 'r') as orig_zip, \ + ZipFile(temp_zip_path, 'w', ZIP_DEFLATED) as new_zip: + + # 遍历原始 ZIP 中的文件 + for orig_info in orig_zip.infolist(): + file_name = orig_info.filename + + if file_name in update_files: + # 替换目标文件 + new_data = update_files[file_name] + if isinstance(new_data, bytes): + new_zip.writestr(file_name, new_data) + else: + new_zip.write(new_data, file_name) + + # 保留原始时间戳 + new_info = new_zip.getinfo(file_name) + new_info.date_time = orig_info.date_time + else: + # 复制未修改文件 + 
with orig_zip.open(orig_info) as orig_file: + new_zip.writestr(orig_info, orig_file.read()) + + # 替换原文件 + shutil.move(temp_zip_path, zip_path) + + finally: + if os.path.exists(temp_zip_path): + os.remove(temp_zip_path) + + # 使用示例 ------------------------------ + #if __name__ == "__main__": + # 示例1:用本地文件替换 ZIP 中的文件 + # update_zip_file("archive.zip", { + # "docs/readme.txt": "new_readme.txt" # 本地文件路径 + # }) + + # # 示例2:直接写入字节数据 + # new_config = b'{"version": 2.0, "active": true}' + # update_zip_file("data.zip", { + # "config.json": new_config # 字节数据 + # }) + +class ImageUtils: + + @classmethod + def descramble_images_by_dir(cls, chapter_dir): + if os.path.isfile(chapter_dir): + chapter_dir = os.path.dirname(chapter_dir) + scramble_count = 0 + if os.path.exists(chapter_dir): #获取章节图片路径 + while PREFIX_SCRAMBLE in os.listdir(chapter_dir): + for img in os.listdir(chapter_dir): + if img.startswith(PREFIX_SCRAMBLE): + cls.encode_scramble_image(os.path.join(chapter_dir, img)) + scramble_count += 1 + logging.debug(f"{PREFIX_SCRAMBLE} {scramble_count}") + return scramble_count + + @classmethod + def deScrambleImagesByPath(cls, img_path, img_save=None): + if os.path.basename(img_path).\ + startswith(PREFIX_SCRAMBLE) and os.path.exists(img_path): + img_path = cls.encode_scramble_image(img_path, img_save) + return img_path + + @classmethod + def encodeImage(cls,str_en): + #print("en",str_en) + enc = base64.b64decode(str_en) + #print("解密:",enc) + m = hashlib.md5() + m.update(enc) + md5 = m.digest() + d = md5[-1] + #print(md5) + try: + blocks = d % 10 + 5 + except: + blocks = 0 %10 + 5 + #print("blocks=",blocks) + return blocks + + @classmethod + def scrambleImage(cls,file_path): + #检测到未下载完的图像 直接返回None + if str(file_path).endswith(".downloads"): + os.remove(file_path) + return None + file_str = str(file_path).split("=") + #10_29.jpg + base_dir = file_str[0].replace("scramble","") + base_name = file_str[-1] + base_fn = base_name.split("_") + save_name = base_fn[1] + save_name_delesu = save_name.split(".")[0] + blocks = int(base_fn[0]) + save_file_path = os.path.join(base_dir,save_name) + print("sva",save_file_path) + if os.path.exists(save_file_path): + print("图片已解密,已跳过:", save_file_path) + return None + image_su = str(file_path).split(".")[-1] + try: + img = Image.open(file_path) + except: + print(f"error Image: {file_path}") + width = img.width + height = img.height + #blocks = cls.encodeImage(enStr) + print("blocks=",blocks) + block_height = int(height / blocks) + block_width = int(width / blocks) + print("blockHeight=",block_height) + suffix = str(file_path).split(".")[-1] + split_path = os.path.join(base_dir,save_name_delesu+"split") + if image_su == "downloads": + return None + is_split = cls.splitimage(file_path,blocks,1,split_path) + if is_split != None: + cls.image_compose(split_path,blocks,1,save_file_path,block_height,width) + else: + if os.path.exists(split_path): + shutil.rmtree(split_path) + if os.path.exists(file_path): + shutil.move(file_path, save_file_path) + #完成后清空 + return file_path + + @classmethod + def splitimage(cls,src,rownum,colnum,dstpath): + img=Image.open(src) + w,h=img.size + if rownum<= h and colnum<=w: + s=os.path.split(src) + if dstpath=='': + dstpath = s[0] + if not os.path.exists(dstpath): + os.makedirs(dstpath) + fn=s[1].split('.') + basename=fn[0] + ext=fn[-1] + num=0 + rowheight=h//rownum + colwidth=w//colnum + for r in range(rownum): + for c in range(colnum): + box=(c*colwidth,r*rowheight,(c+1)*colwidth,(r+1)*rowheight) + count_image = "{:0>3d}".format(num) + file_path 
= os.path.join(dstpath,str(count_image)+'.'+ext) + print("file_path=",file_path) + img.crop(box).save(file_path) + num=num+1 + return "成功" + else: + print('不数!') + return None + + @classmethod + def image_compose(cls,src,row,column,save_path,image_height,image_width): + image_size = image_height + #image_height = 376 + #image_width = 720 + images_format = ['.png','.jpg'] + + #image_names = [name for name in os.listdir(src) for item in images_format if + # os.path.splitext(name)[1] == item][::-1] + img_list=os.listdir(src) + img_list.sort() + img_list.sort(key=lambda x: int(x[:-4])) + ##文件名按数字排序 + img_nums=len(img_list) + image_names = [] + for i in range(img_nums): + img_name=os.path.join(src,img_list[i]) + image_names.append(img_name) + #使用倒序 + image_names = image_names[::-1] + # 简单的对于参数的设定和实际图片集的大小进行数量判断 + if len(image_names) < row * column: + raise ValueError("合成图片的参数和要求的数量不能匹配!") + + to_image = Image.new('RGB', (column * image_width, row * image_height)) #创建一个新图 + # 循环遍历,把每张图片按顺序粘贴到对应位置上 + for y in range(1, row + 1): + for x in range(1, column + 1): + #1 * (row=1 -1) col=1 -1 + image_path = image_names[column * (y - 1) + x - 1] + print("split_image=",image_path) + from_image = Image.open(image_path) + #保持原图片大小 + #.resize( + # (image_size, image_size),Image.ANTIALIAS) + to_image.paste(from_image, ((x - 1) * image_size, (y - 1) * image_size)) + from_image.close() + to_image.save(save_path) + print("图片合并完成:", save_path) + shutil.rmtree(src) + # 保存新图 + + @classmethod + def getScrambleImage(cls,path): + scramble_file_cache = cls.scrambleImage(path) + if scramble_file_cache != None and os.path.exists(scramble_file_cache): os.remove(scramble_file_cache) + + @classmethod + def encode_scramble_image(cls, img_path, img_save=None): + if not os.path.exists(img_path): + return + image = Image.open(img_path) + w, h = image.size + #image.show() + file_str = str(img_path).split("=") + #10_29.jpg + base_fn = file_str[-1].split("_") + blocks = int(base_fn[0]) + if img_save == None: + save_path = FileNaming.getFileScrambleImageSave(img_path) + else: save_path = img_save + # print(type(aid),type(img_name)) + if blocks: + s = blocks # 随机值 + # print(s) + l = h % s # 切割最后多余的值 + box_list = [] + hz = 0 + for i in range(s): + c = math.floor(h / s) + g = i * c + hz += c + h2 = h - c * (i + 1) - l + if i == 0: + c += l;hz += l + else: + g += l + box_list.append((0, h2, w, h - g)) + + # print(box_list,len(box_list)) + item_width = w + # box_list.reverse() #还原切图可以倒序列表 + # print(box_list, len(box_list)) + newh = 0 + image_list = [image.crop(box) for box in box_list] + # print(box_list) + newimage = Image.new("RGB", (w, h)) + for image in image_list: + # image.show() + b_w, b_h = image.size + newimage.paste(image, (0, newh)) + + newh += b_h + newimage.save(save_path) + logging.info(f"解密成功 {save_path}") + if os.path.exists(img_path): + os.remove(img_path) + logging.debug(f"remove {img_path}") + return save_path diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..960b90d --- /dev/null +++ b/src/config.py @@ -0,0 +1,37 @@ +import logging +from pathlib import Path +from fake_useragent import UserAgent + +# 基础配置 +BASE_DIR = Path("output") +CACHE_DIR = Path(".cache") +CACHE_IMAGE_DIR = CACHE_DIR / "images" +CBZ_DIR = Path("CBZ") +OLD_CBZ_DIR = Path("OldCBZ") +# DEFAULT_SAVE_DIR = Path("output") +CONCURRENT_DOWNLOADS = 10 +RETRY_TIMES = 10 +RETRY_PROXY = False +# 在下载失败后,重试次数 +RETRY_PROXY_TIMES = 1 +RETRIES = 15 +TIMEOUT = 60 +COMIC_INFO_NAME = "ComicInfo.xml" +XSD_FILE = 
"src/assets/ComicInfo_2.1.xsd" +# 代理配置 +PROXY_URL = "http://47.98.225.49:9890" + +# 日志配置 +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' +LOG_LEVEL = logging.INFO + +# HTTP配置 +USER_AGENT = UserAgent().random + +DEFAULT_HEADERS = { + 'User-Agent': USER_AGENT +} + +# 文件类型 +IMAGES_NAME_FORMAT = "{:0>3d}" +DEFAULT_IMAGE_EXT = '.jpg' \ No newline at end of file diff --git a/src/sites/base.py b/src/sites/base.py new file mode 100644 index 0000000..26ab0dd --- /dev/null +++ b/src/sites/base.py @@ -0,0 +1,215 @@ +from abc import ABC, abstractmethod +from typing import List, Dict, Optional, AsyncGenerator +from pathlib import Path +import aiohttp,os,shutil +import asyncio +import logging +from src.config import DEFAULT_HEADERS, TIMEOUT, RETRIES, PROXY_URL, RETRY_PROXY +from lxml import etree +from src.common.utils import Cache # 导入缓存类 +from src.common.item import Chapter, MangaItem, MangaInfo,CoverItem +from src.common.exceptions import SiteError, NetworkError, ParseError +from src.common.logging import setup_logging +from src.common.naming import DirectoryNaming,FileNaming +from src.common.ComicInfo import ComicInfo, ImageInfo + +logger = setup_logging(__name__) + +class BaseSite(ABC): + """漫画网站基类""" + def __init__(self): + self.session: Optional[aiohttp.ClientSession] = None + self.headers = DEFAULT_HEADERS.copy() + self.cache = Cache() # 初始化缓存 + + async def __aenter__(self): + self.session = aiohttp.ClientSession(headers=self.headers) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if self.session: + await self.session.close() + + async def _get(self, url: str, retries: int = RETRIES, PROXY: bool = RETRY_PROXY) -> str: + """发送GET请求并处理错误""" + # 尝试从缓存中获取 HTML 内容 + cached_html = self.cache.get(url) + if cached_html: + logger.info(f"从缓存中获取 HTML 内容: {url}") + return cached_html + + for attempt in range(retries): + try: + if PROXY: + proxy = PROXY_URL + else: + proxy = None + async with self.session.get(str(url), proxy=proxy) as response: + if response.status == 200: + html = await response.text() + self.cache.set(url, html) # 将 HTML 内容保存到缓存 + return html + elif response.status == 404: + raise SiteError(f"页面不存在: {url}") + elif response.status == 403: + raise SiteError(f"访问被拒绝: {url}") + else: + raise NetworkError(f"HTTP错误 {response.status}: {url}") + except aiohttp.ClientError as e: + if attempt == retries - 1: + raise NetworkError(f"网络错误: {str(e)}") + logger.info(f"第 {attempt + 2} 次重试, 网站: {url}") + await asyncio.sleep(2 * (attempt + 1)) + + @abstractmethod + async def get_chapter_images(self, chapter_url: str) -> List[str]: + """获取章节所有图片URL""" + pass + + #@abstractmethod + async def get_manga_info(self, manga_url: str) -> Dict[str, str]: + """获取漫画信息""" + try: + html = await self._get(manga_url) + tree = etree.HTML(html) + return self.extractor.extract_manga_info(tree) + except Exception as e: + if isinstance(e, (ParseError, SiteError)): + raise exit(f"解析漫画信息失败: {str(e)}") + raise ParseError(f"解析漫画信息失败: {str(e)}") + + #@abstractmethod + #async def get_chapter_list(self, info: MangaInfo) -> List[Dict[str, str]]: + # """获取漫画章节列表""" + # pass + + async def get_chapter_list(self, manga_info: MangaInfo) -> List[Dict[str, str]]: + """获取章节列表""" + try: + # result_type list[Chapter] + list_chapter = manga_info.get_list_chapter() + down_chapter = [] + for chapter in list_chapter: + cbz_path = FileNaming.chapter_cbz(manga_info=manga_info,chapter=chapter) + old_cbz_path = FileNaming.old_chapter_cbz(manga_info=manga_info,chapter=chapter) + if os.path.exists(cbz_path): + 
diff --git a/src/sites/base.py b/src/sites/base.py
new file mode 100644
index 0000000..26ab0dd
--- /dev/null
+++ b/src/sites/base.py
@@ -0,0 +1,215 @@
+from abc import ABC, abstractmethod
+from typing import List, Dict, Optional, AsyncGenerator
+from pathlib import Path
+import aiohttp, os, shutil
+import asyncio
+import logging
+from src.config import DEFAULT_HEADERS, TIMEOUT, RETRIES, PROXY_URL, RETRY_PROXY
+from lxml import etree
+from src.common.utils import Cache  # HTML cache helper
+from src.common.item import Chapter, MangaItem, MangaInfo, CoverItem
+from src.common.exceptions import SiteError, NetworkError, ParseError
+from src.common.logging import setup_logging
+from src.common.naming import DirectoryNaming, FileNaming
+from src.common.ComicInfo import ComicInfo, ImageInfo
+
+logger = setup_logging(__name__)
+
+class BaseSite(ABC):
+    """Base class for manga sites"""
+    def __init__(self):
+        self.session: Optional[aiohttp.ClientSession] = None
+        self.headers = DEFAULT_HEADERS.copy()
+        self.cache = Cache()  # initialise the HTML cache
+
+    async def __aenter__(self):
+        self.session = aiohttp.ClientSession(headers=self.headers)
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        if self.session:
+            await self.session.close()
+
+    async def _get(self, url: str, retries: int = RETRIES, PROXY: bool = RETRY_PROXY) -> str:
+        """Send a GET request with caching, retries and error handling"""
+        # Try the cache first
+        cached_html = self.cache.get(url)
+        if cached_html:
+            logger.info(f"HTML served from cache: {url}")
+            return cached_html
+
+        for attempt in range(retries):
+            try:
+                proxy = PROXY_URL if PROXY else None
+                async with self.session.get(str(url), proxy=proxy) as response:
+                    if response.status == 200:
+                        html = await response.text()
+                        self.cache.set(url, html)  # store the HTML in the cache
+                        return html
+                    elif response.status == 404:
+                        raise SiteError(f"Page not found: {url}")
+                    elif response.status == 403:
+                        raise SiteError(f"Access denied: {url}")
+                    else:
+                        raise NetworkError(f"HTTP error {response.status}: {url}")
+            except aiohttp.ClientError as e:
+                if attempt == retries - 1:
+                    raise NetworkError(f"Network error: {str(e)}")
+                logger.info(f"Retrying (attempt {attempt + 2}), site: {url}")
+                await asyncio.sleep(2 * (attempt + 1))
+
+    @abstractmethod
+    async def get_chapter_images(self, chapter_url: str) -> List[str]:
+        """Return all image URLs of a chapter"""
+        pass
+
+    #@abstractmethod
+    async def get_manga_info(self, manga_url: str) -> Dict[str, str]:
+        """Fetch manga information"""
+        try:
+            html = await self._get(manga_url)
+            tree = etree.HTML(html)
+            return self.extractor.extract_manga_info(tree)
+        except Exception as e:
+            if isinstance(e, (ParseError, SiteError)):
+                raise SystemExit(f"Failed to parse manga info: {str(e)}")
+            raise ParseError(f"Failed to parse manga info: {str(e)}")
+
+    async def get_chapter_list(self, manga_info: MangaInfo) -> List[Chapter]:
+        """Build the chapter list and mark chapters that are already downloaded"""
+        try:
+            # result type: list[Chapter]
+            list_chapter = manga_info.get_list_chapter()
+            down_chapter = []
+            for chapter in list_chapter:
+                cbz_path = FileNaming.chapter_cbz(manga_info=manga_info, chapter=chapter)
+                old_cbz_path = FileNaming.old_chapter_cbz(manga_info=manga_info, chapter=chapter)
+                if os.path.exists(cbz_path):
+                    logger.info(f"{chapter.title} chapter already exists")
+                    chapter.status = "downloaded"
+                if os.path.exists(old_cbz_path):
+                    logger.info(f"{chapter.title} old chapter exists")
+                    # create the parent directory of the CBZ, not a directory named like the file
+                    if not os.path.exists(os.path.dirname(cbz_path)):
+                        os.makedirs(os.path.dirname(cbz_path))
+                    shutil.copy(old_cbz_path, cbz_path)
+                    logger.info(f"{old_cbz_path} ==> {cbz_path} copied")
+                    chapter.status = "downloaded"
+                down_chapter.append(chapter)
+            return down_chapter
+        except Exception as e:
+            if isinstance(e, (ParseError, SiteError)):
+                raise
+            raise ParseError(f"Failed to parse chapter list: {str(e)}")
+
+    async def update_covers(self, manga_info: MangaInfo) -> List[CoverItem]:
+        """Refresh the covers inside the Icons directory"""
+        cache_cover = {'path': str(DirectoryNaming.manga_cover_dir(manga_info, cache=True))}
+        cover_img = {'path': str(DirectoryNaming.manga_cover_dir(manga_info, cache=False))}
+        cache_cover_item = CoverItem(**cache_cover)
+        icons_dir = os.path.dirname(cover_img['path'])
+        if not os.path.exists(icons_dir):
+            os.makedirs(icons_dir)
+        list_cover = []
+        is_update = 0
+        try:
+            for file in os.listdir(icons_dir):
+                if file.lower().endswith(".jpg"):
+                    file_cover = {'path': os.path.join(icons_dir, file)}
+                    f_item = CoverItem(**file_cover)
+                    list_cover.append(f_item)
+                    if f_item.md5 == cache_cover_item.md5:
+                        is_update += 1
+            # Only add the cached cover when no existing cover matches it
+            if is_update == 0:
+                new_cover = {'path': FileNaming.cover_format_path(cover_img["path"])}
+                shutil.copy(cache_cover["path"], new_cover["path"])
+                list_cover.append(CoverItem(**new_cover))
+        except Exception:
+            raise SystemExit("Cover check failed")
+        return list_cover
+
+    async def update_cbz_covers(self, manga_info: MangaInfo):
+        """Refresh the covers stored next to the CBZ files"""
+        cbz_dir = DirectoryNaming().chapter_cbz_dir(manga_info=manga_info)
+        list_cbz = list(FileNaming().get_filenames_optimized(cbz_dir, ext_filter=[".cbz"]))
+
+        list_cover = await self.update_covers(manga_info)
+        for cbz_path in list_cbz:
+            first_cover_path = str(cbz_path).split(".")[0] + ".jpg"
+            if len(list_cover) == 1:
+                shutil.copy(list_cover[0].path, first_cover_path)
+                logger.info(f"{list_cover[0].path} ==> {first_cover_path} copied")
+                continue
+            cover_count = 1
+            for cover in list_cover:
+                cover_path = cover.path
+                if os.path.exists(first_cover_path):
+                    os.remove(first_cover_path)
+                new_cover_path = FileNaming().cover_format_path(str(cbz_path).split(".")[0] + ".jpg", count=cover_count)
+                shutil.copy(cover_path, new_cover_path)
+                logger.info(f"{cover_path} ==> {new_cover_path} copied")
+                cover_count += 1
+
+    async def download_manga(self, manga_url: str) -> AsyncGenerator[Dict, None]:
+        """Download a whole manga, yielding progress events"""
+        try:
+            # Fetch manga info
+            info = await self.get_manga_info(manga_url)
+            yield {'type': 'info', 'data': info, 'item': info}
+
+            # Fetch the chapter list
+            chapters = await self.get_chapter_list(info)
+            yield {'type': 'chapters', 'data': chapters, 'item': info}
+
+            # Download the cover
+            yield {'type': 'cover', 'item': info}
+            covers = await self.update_covers(info)
+
+            # Download every chapter
+            for chapter in chapters:
+                try:
+                    if chapter.status == "downloaded":
+                        logger.info(f"{chapter.title} chapter already downloaded")
+                        continue
+                    images = await self.get_chapter_images(chapter.url)
+                    manga_item = MangaItem(
+                        info=info,
+                        covers=covers,
+                        chapter=chapter,
+                        chapter_images=images,
+                        chapters=chapters
+                    ).get_item()
+
+                    yield {
+                        'type': 'chapter',
+                        'chapter': str(chapter.title),
+                        'images': images,
+                        'item': manga_item
+                    }
+                except Exception as e:
+                    yield {
+                        'type': 'error',
+                        'chapter': chapter,
+                        'error': str(e)
+                    }
+                    continue
+
+            # Runs once every chapter has been processed
+            await self.update_cbz_covers(info)
+
+        except Exception as e:
+            yield {'type': 'error', 'error': str(e)}
+
+    async def get_manga_list(self, manga_url: str) -> List[Dict[str, str]]:
+        """Fetch a manga listing page"""
+        try:
+            html = await self._get(manga_url)
+            tree = etree.HTML(html)
+            return self.extractor.extract_manga_list(tree)
+        except Exception as e:
+            if isinstance(e, (ParseError, SiteError)):
+                raise SystemExit(f"Failed to parse manga list: {str(e)}")
+            raise ParseError(f"Failed to parse manga list: {str(e)}")
\ No newline at end of file
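BaseSite leaves only get_chapter_images abstract; the cached _get, chapter bookkeeping, cover handling and the download_manga event stream all come from the base class. A minimal sketch of what another site could look like — the domain and XPath below are placeholders, and a real subclass would also load a config and build an Extractor the way RoumanSite (next file) does so that get_manga_info works:

# Hypothetical subclass; "example-manga.test" and the XPath are made up.
from typing import List
from lxml import etree
from src.sites.base import BaseSite

class ExampleSite(BaseSite):
    async def get_chapter_images(self, chapter_url: str) -> List[str]:
        html = await self._get(chapter_url)   # cached GET with retries from BaseSite
        tree = etree.HTML(html)
        return tree.xpath('//div[@id="reader"]//img/@src')

# To be reachable it would also be registered in MangaManager.SITE_MAP,
# e.g. 'example-manga.test': ExampleSite (see src/sites/manager.py below).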
diff --git a/src/sites/configs/rouman.py b/src/sites/configs/rouman.py
new file mode 100644
index 0000000..a2d913c
--- /dev/null
+++ b/src/sites/configs/rouman.py
@@ -0,0 +1,58 @@
+import base64, re
+import zlib
+import json
+from typing import List, Dict
+from lxml import etree
+from src.sites.base import BaseSite
+from src.common.loader import ConfigLoader
+from src.common.extractor import Extractor
+from src.common.exceptions import ParseError, SiteError
+from src.common.item import Chapter, MangaInfo, ImageItem
+from src.common.naming import FileNaming
+
+class RoumanSite(BaseSite):
+    def __init__(self):
+        super().__init__()
+        self.config = ConfigLoader.load_config('rouman')
+        self.headers.update(self.config.headers)
+        self.extractor = Extractor(self.config)
+
+    async def get_chapter_images(self, chapter_url: str) -> List[str]:
+        """Return the list of image items for a chapter"""
+        try:
+            html = await self._get(chapter_url)
+            tree = etree.HTML(html)
+            image_urls_str = []
+            # The image URLs are embedded in an inline <script> payload
+            for data_json in tree.xpath('//script/text()'):
+                data_json = data_json.replace('\\', '')
+                if "imageUrl" in data_json:
+                    image_urls_str = re.findall(r'"imageUrl":"(https?://[^"]+)"', data_json)
+            # Each link carries an "sr:<n>" marker that tells whether the image is scrambled
+            image_urls = []
+            count = 0
+            for link in image_urls_str:
+                count += 1
+                sr_value = re.search(r'sr:(\d+)', link)
+                if sr_value:
+                    sr = sr_value.group(1)  # group(1) is the first capture group, i.e. the digits
+                else:
+                    print("No match found")
+                    sr = "0"  # treat a missing marker as "not scrambled"
+                if str(sr) == "1":
+                    de_str = str(link).split("/")[-1].split(".")[0] + "=="
+                    blocks_num = FileNaming.encodeImage(de_str)
+                    image_urls.append(ImageItem(url=link, scramble=sr.replace("0", "False").replace("1", "True"), filename=FileNaming.getFileScrambleImageName(count, blocks_num)))
+                else:
+                    image_urls.append(ImageItem(url=link, scramble=sr.replace("0", "False").replace("1", "True"), filename=FileNaming.getFileScrambleImageName(count)))
+
+            if not image_urls:
+                raise ParseError("No image URLs found")
+
+            return image_urls
+
+        except Exception as e:
+            if isinstance(e, (ParseError, SiteError)):
+                raise
+            raise ParseError(f"Failed to parse chapter: {str(e)}")
\ No newline at end of file
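The parser above pulls every "imageUrl" value out of an inline script and reads the sr: marker embedded in each URL to decide whether the image needs descrambling. A worked example on a made-up payload; only the imageUrl key and the sr:<digit> marker come from the real parser, the URLs are placeholders:

import re

payload = '{"imageUrl":"https://img.example.test/abc/10_29.jpg?x=1&sr:1",' \
          '"imageUrl":"https://img.example.test/abc/000_01.jpg?x=2&sr:0"}'

links = re.findall(r'"imageUrl":"(https?://[^"]+)"', payload)
for link in links:
    m = re.search(r'sr:(\d+)', link)
    scrambled = bool(m) and m.group(1) == "1"
    print(link, "scrambled" if scrambled else "plain")
# https://img.example.test/abc/10_29.jpg?x=1&sr:1 scrambled
# https://img.example.test/abc/000_01.jpg?x=2&sr:0 plain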
diff --git a/src/sites/configs/rouman.yml b/src/sites/configs/rouman.yml
new file mode 100644
index 0000000..99597b4
--- /dev/null
+++ b/src/sites/configs/rouman.yml
@@ -0,0 +1,46 @@
+project: rm_comic
+name: 肉漫屋
+domain: rouman5.com
+base_url: https://rouman5.com
+
+selectors:
+  manga_list:
+    title: '//div[@class="truncate text-foreground"]/text()'
+    url: '//main//div[@class="grid grid-cols-1 sm:grid-cols-4 md:grid-cols-6 gap-2 sm:gap-4"]//a/@href'
+  manga_info:
+    title: '//div[@class="basis-3/5 text-sm sm:text-base"]//div[@class="text-xl text-foreground"]/text()'
+    author:
+      selector: '//div[@class="basis-3/5 text-sm sm:text-base"]//span[@class="text-foreground"]/text()'
+      index: 0
+    description:
+      selector: '//div[@class="my-2 text-foreground text-sm sm:text-base"]/p/text()'
+      index: 1
+    cover: '//div[@class="flex flex-row gap-3 sm:gap-4"]//div[@class="basis-2/5"]/img[@class="rounded"]/@src'
+    #status: .book-detail dl dt:contains("状态") + dd
+    tags:
+      selector: '//div[@class="basis-3/5 text-sm sm:text-base"]//span[@class="text-foreground"]/text()'
+      index: 3
+    # date: '//div[@class="text-gray-500 text-sm mt-2"]/div/text()'
+    genre:
+      value: "韩漫"
+    age_rating:
+      value: "R18+"
+    chapter_link: '//div[@class="grid grid-cols-1 sm:grid-cols-2 md:grid-cols-3 gap-2 px-2 py-4"]//a/@href'
+    chapters_name: '//main//div[@class="text truncate bg-muted p-2 hover:bg-primary/10"]/text()'
+
+  chapter_list:
+    container: '//main//div[@class="text truncate bg-muted p-2 hover:bg-primary/10"]/text()'
+    title: text
+    url:
+      attribute: '//div[@class="grid grid-cols-1 sm:grid-cols-2 md:grid-cols-3 gap-2 px-2 py-4"]//a/@href'
+      process: join_base_url
+
+  chapter:
+    image_data:
+      pattern: window\[".*?"\]\s*=\s*"([^"]+)"
+      decrypt: true
+      process:
+        - base64_decode
+        - zlib_decompress
+        - json_parse
+    image_url_template: https://i.hamreus.com{path}
\ No newline at end of file
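ConfigLoader and Extractor live in src/common/ and are not part of this section, so the following is only a sketch of how a YAML-driven extractor might apply these selectors — bare XPath strings, {selector, index} mappings, and fixed {value} entries. The function names here are illustrative, not the repo's actual API:

import yaml
from lxml import etree

def extract_field(tree, field):
    # field is a bare XPath string, a {selector, index} mapping, or a fixed {value}
    if isinstance(field, str):
        values = tree.xpath(field)
        return values[0] if values else None
    if "value" in field:
        return field["value"]
    values = tree.xpath(field["selector"])
    idx = field.get("index", 0)
    return values[idx] if len(values) > idx else None

def sketch_manga_info(html: str) -> dict:
    cfg = yaml.safe_load(open("src/sites/configs/rouman.yml", encoding="utf-8"))
    sel = cfg["selectors"]["manga_info"]
    tree = etree.HTML(html)
    return {
        "title": extract_field(tree, sel["title"]),
        "author": extract_field(tree, sel["author"]),  # span list, index 0
        "genre": extract_field(tree, sel["genre"]),    # fixed value "韩漫"
    }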
diff --git a/src/sites/manager.py b/src/sites/manager.py
new file mode 100644
index 0000000..465032c
--- /dev/null
+++ b/src/sites/manager.py
@@ -0,0 +1,199 @@
+from pathlib import Path
+from typing import Dict, Type, Optional
+import logging
+from src.config import BASE_DIR
+from src.sites.base import BaseSite
+from src.sites.configs.rouman import RoumanSite
+from src.common.utils import MangaDownloader, CBZUtils
+from src.common.naming import DirectoryNaming, FileNaming
+from src.common.exceptions import MangaException
+from src.common.item import MangaItem, MangaInfo
+from src.common.logging import setup_logging
+from src.common.ComicInfo import ComicInfoXml
+
+logger = setup_logging(__name__)
+
+class MangaManager:
+    """Manga download manager"""
+
+    SITE_MAP: Dict[str, Type[BaseSite]] = {
+#        'manhuagui.com': ManhuaguiSite,
+        'roum20.xyz': RoumanSite,
+        'rouman5.com': RoumanSite,
+        # Add more supported sites here
+    }
+
+    def __init__(self, base_dir: Path = BASE_DIR):
+        self.downloader = MangaDownloader(base_dir)
+
+    def get_site_handler(self, url: str) -> Optional[Type[BaseSite]]:
+        """Pick the site handler whose domain appears in the URL"""
+        for domain, handler in self.SITE_MAP.items():
+            if domain in url:
+                return handler
+        return None
+
+    async def process_manga(
+        self,
+        url: str,
+        volume_num: int = 1,
+        status_callback=None
+    ):
+        """Run the download pipeline for one manga"""
+        # Resolve the site handler
+        site_handler = self.get_site_handler(url)
+        if not site_handler:
+            raise MangaException(f"Unsupported site: {url}")
+
+        async with site_handler() as site:
+            # Download the whole manga
+            async for result in site.download_manga(url):
+                if result['type'] == 'info':
+                    manga_info = result['data']
+                    logger.info(f"Manga info: {manga_info}")
+
+                    # Keep the data on a MangaItem
+                    manga_item = MangaItem(info=manga_info, chapters=[])
+                    manga_name = manga_info.title
+
+                elif result['type'] == 'chapters':
+                    chapters = result['data']
+                    total = 0
+                    for chapter in chapters:
+                        if not chapter.status == "downloaded":
+                            total += 1
+                    total_chapters = total
+                    logger.info(f"Found {total_chapters} chapters to download")
+                    manga_item.chapters.extend(chapters)  # attach chapters to the MangaItem
+                    yield {
+                        'type': 'progress',
+                        'total_chapters': total_chapters
+                    }
+
+                elif result['type'] == 'cover':
+                    await self.downloader.download_cover(manga_info)
+                    yield {
+                        'type': 'cover_complete',
+                        'item': manga_item
+                    }
+
+                elif result['type'] == 'chapter':
+                    manga_item = result['item']
+                    chapter = manga_item.chapter
+                    # Create the working directory for the chapter images
+                    chapter_dir = DirectoryNaming.chapter_images_dir(manga_info, chapter)
+                    DirectoryNaming.ensure_dir(chapter_dir)
+
+                    try:
+                        # Download the chapter
+                        download_result = await self.downloader.download_chapter(
+                            manga_item,
+                            status_callback=status_callback
+                        )
+
+                        # Post-processing: only when every image succeeded,
+                        # descramble, write ComicInfo.xml and pack the CBZ
+                        if int(download_result['success']) == int(download_result['total']):
+                            cbz_path = FileNaming.chapter_cbz(manga_info, chapter)
+                            # Descramble the images
+                            CBZUtils(cbz_path)._image_deScrambleByPath(chapter_dir)
+                            ComicInfoXml().scrapy_xml_by_json(manga_item.get_comic_info_json(), chapter_dir)
+                            # Pack the chapter into a CBZ file
+                            CBZUtils(cbz_path).create_cbz(chapter_dir)
+
+                        yield {
+                            'type': 'chapter_complete',
+                            'chapter': chapter,
+                            'result': download_result
+                        }
+
+                    except Exception as e:
+                        logger.error(f"Failed to download chapter {chapter.title}: {str(e)}")
+                        yield {
+                            'type': 'chapter_error',
+                            'chapter': chapter,
+                            'error': str(e)
+                        }
+
+                elif result['type'] == 'error':
+                    logger.error(f"Error: {result['error']}")
+                    yield {
+                        'type': 'error',
+                        'error': result['error']
+                    }
+
+    @staticmethod
+    def print_progress(status):
+        """Print a simple download progress bar"""
+        progress_bar_length = 30  # length of the progress bar
+        progress = int(status.progress * progress_bar_length)
+        bar = '#' * progress + '-' * (progress_bar_length - progress)
+        print(f"\rProgress: |{bar}| {status.current}/{status.total} "
+              f"({status.progress:.1%})", end="")
+
+    async def download_list_manga(self, manga_url: str):
+        """Download every manga found on a listing page"""
+        # Resolve the site handler
+        list_site_handler = self.get_site_handler(manga_url)
+        if not list_site_handler:
+            raise MangaException(f"Unsupported site: {manga_url}")
+        async with list_site_handler() as site:
+            manga_list = await site.get_manga_list(manga_url)
+            for title, url in zip(manga_list.title, manga_list.url):
+                logger.info(f"Start downloading manga: {title}")
+                logger.info(f"{url}")
+                await self.download_manga(str(url))
+
+    @classmethod
+    async def download_manga(cls, url: str, save_dir: Path = BASE_DIR):
+        """Download a manga and report per-chapter progress"""
+        manager = MangaManager(save_dir)
+
+        try:
+            total_chapters = 0
+            completed_chapters = 0
+
+            async for result in manager.process_manga(url, status_callback=cls.print_progress):
+                if result['type'] == 'progress':
+                    total_chapters = result['total_chapters']
+                    logger.info(f"Starting download, {total_chapters} chapters in total")
+
+                elif result['type'] == 'chapter_complete':
+                    completed_chapters += 1
+                    chapter_result = result['result']
+
+                    if chapter_result['failed']:
+                        logger.warning(
+                            f"Chapter {result['chapter']} finished: "
+                            f"{chapter_result['success']}/{chapter_result['total']} images succeeded, "
+                            f"{chapter_result['failed']} failed"
+                        )
+                    else:
+                        logger.info(f"Chapter {result['chapter']} finished")
+
+                    print(f"\nOverall progress: {completed_chapters}/{total_chapters} chapters")
+
+                elif result['type'] == 'chapter_error':
+                    logger.error(f"Chapter {result['chapter']} failed: {result['error']}")
+
+                elif result['type'] == 'error':
+                    logger.error(f"Download error: {result['error']}")
+
+        except MangaException as e:
+            logger.error(f"Download failed: {str(e)}")
+        except Exception as e:
+            logger.error(f"Unknown error: {str(e)}")
\ No newline at end of file
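run.py drives the downloader through MangaManager.download_manga; the sketch below instead consumes the process_manga() event stream directly, reacting to the event types yielded above. The book URL is a placeholder, not a real book id:

import asyncio
from src.sites.manager import MangaManager

async def main():
    manager = MangaManager()
    async for event in manager.process_manga("https://rouman5.com/books/<book-id>",
                                             status_callback=MangaManager.print_progress):
        if event['type'] == 'progress':
            print(f"chapters to fetch: {event['total_chapters']}")
        elif event['type'] == 'chapter_complete':
            print(f"done: {event['chapter'].title}")
        elif event['type'] in ('chapter_error', 'error'):
            print(f"error: {event.get('error')}")

if __name__ == "__main__":
    asyncio.run(main())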