Initial commit

caiwx86 2025-02-04 01:12:15 +08:00
commit 03c578f183
16 changed files with 2557 additions and 0 deletions

8
.gitignore vendored Normal file

@@ -0,0 +1,8 @@
.scrapy/*
.vscode/*
.cache/*
.DS_Store
CBZ/*
output/*
downloads/*
/**/__pycache__

19
run.py Normal file

@@ -0,0 +1,19 @@
import asyncio
from pathlib import Path
from src.sites.manager import MangaManager
from src.common.logging import setup_logging
logger = setup_logging(__name__)
async def main():
# 配置下载参数
#manga_url = "https://rouman5.com/books/cm693tf2z0170dr07ve0hpa7s"
manga_list_url = "https://rouman5.com/books?continued=true"
# 开始下载
#await MangaManager().download_manga(manga_url)
for i in range(0,70):
await MangaManager().download_list_manga(f"{manga_list_url}&page={i}")
if __name__ == "__main__":
asyncio.run(main())

127
src/assets/ComicInfo_2.1.xsd Normal file

@@ -0,0 +1,127 @@
<?xml version="1.0" encoding="utf-8"?>
<xs:schema elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="ComicInfo" nillable="true" type="ComicInfo"/>
<xs:complexType name="ComicInfo">
<xs:sequence>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Title" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Series" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Number" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="-1" name="Count" type="xs:int"/>
<xs:element minOccurs="0" maxOccurs="1" default="-1" name="Volume" type="xs:int"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="AlternateSeries" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="AlternateNumber" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="-1" name="AlternateCount" type="xs:int"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Summary" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Notes" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="-1" name="Year" type="xs:int"/>
<xs:element minOccurs="0" maxOccurs="1" default="-1" name="Month" type="xs:int"/>
<xs:element minOccurs="0" maxOccurs="1" default="-1" name="Day" type="xs:int"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Writer" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Penciller" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Inker" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Colorist" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Letterer" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="CoverArtist" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Editor" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Translator" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Publisher" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Imprint" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Genre" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Tags" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Web" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="0" name="PageCount" type="xs:int"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="LanguageISO" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Format" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="Unknown" name="BlackAndWhite" type="YesNo"/>
<xs:element minOccurs="0" maxOccurs="1" default="Unknown" name="Manga" type="Manga"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Characters" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Teams" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Locations" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="ScanInformation" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="StoryArc" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="StoryArcNumber" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="SeriesGroup" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="Unknown" name="AgeRating" type="AgeRating"/>
<xs:element minOccurs="0" maxOccurs="1" name="Pages" type="ArrayOfComicPageInfo"/>
<xs:element minOccurs="0" maxOccurs="1" name="CommunityRating" type="Rating"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="MainCharacterOrTeam" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="Review" type="xs:string"/>
<xs:element minOccurs="0" maxOccurs="1" default="" name="GTIN" type="xs:string"/>
</xs:sequence>
</xs:complexType>
<xs:simpleType name="YesNo">
<xs:restriction base="xs:string">
<xs:enumeration value="Unknown"/>
<xs:enumeration value="No"/>
<xs:enumeration value="Yes"/>
</xs:restriction>
</xs:simpleType>
<xs:simpleType name="Manga">
<xs:restriction base="xs:string">
<xs:enumeration value="Unknown"/>
<xs:enumeration value="No"/>
<xs:enumeration value="Yes"/>
<xs:enumeration value="YesAndRightToLeft"/>
</xs:restriction>
</xs:simpleType>
<xs:simpleType name="Rating">
<xs:restriction base="xs:decimal">
<xs:minInclusive value="0"/>
<xs:maxInclusive value="5"/>
<xs:fractionDigits value="1"/>
</xs:restriction>
</xs:simpleType>
<xs:simpleType name="AgeRating">
<xs:restriction base="xs:string">
<xs:enumeration value="Unknown"/>
<xs:enumeration value="Adults Only 18+"/>
<xs:enumeration value="Early Childhood"/>
<xs:enumeration value="Everyone"/>
<xs:enumeration value="Everyone 10+"/>
<xs:enumeration value="G"/>
<xs:enumeration value="Kids to Adults"/>
<xs:enumeration value="M"/>
<xs:enumeration value="MA15+"/>
<xs:enumeration value="Mature 17+"/>
<xs:enumeration value="PG"/>
<xs:enumeration value="R18+"/>
<xs:enumeration value="Rating Pending"/>
<xs:enumeration value="Teen"/>
<xs:enumeration value="X18+"/>
</xs:restriction>
</xs:simpleType>
<xs:complexType name="ArrayOfComicPageInfo">
<xs:sequence>
<xs:element minOccurs="0" maxOccurs="unbounded" name="Page" nillable="true" type="ComicPageInfo"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="ComicPageInfo">
<xs:attribute name="Image" type="xs:int" use="required"/>
<xs:attribute default="Story" name="Type" type="ComicPageType"/>
<xs:attribute default="false" name="DoublePage" type="xs:boolean"/>
<xs:attribute default="0" name="ImageSize" type="xs:long"/>
<xs:attribute default="" name="Key" type="xs:string"/>
<xs:attribute default="" name="Bookmark" type="xs:string"/>
<xs:attribute default="-1" name="ImageWidth" type="xs:int"/>
<xs:attribute default="-1" name="ImageHeight" type="xs:int"/>
</xs:complexType>
<xs:simpleType name="ComicPageType">
<xs:list>
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:enumeration value="FrontCover"/>
<xs:enumeration value="InnerCover"/>
<xs:enumeration value="Roundup"/>
<xs:enumeration value="Story"/>
<xs:enumeration value="Advertisement"/>
<xs:enumeration value="Editorial"/>
<xs:enumeration value="Letters"/>
<xs:enumeration value="Preview"/>
<xs:enumeration value="BackCover"/>
<xs:enumeration value="Other"/>
<xs:enumeration value="Deleted"/>
</xs:restriction>
</xs:simpleType>
</xs:list>
</xs:simpleType>
</xs:schema>
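
A minimal validation sketch (not part of this commit): it checks a hand-written ComicInfo fragment against the schema above with lxml, the same library and assertValid call used later in ComicInfoXml._validate_xml_with_xsd_file. The XSD path is an assumption taken from XSD_FILE in src/config.py, and the sample field values are hypothetical.

from lxml import etree

xsd_path = "src/assets/ComicInfo_2.1.xsd"  # assumed location, see src/config.py
schema = etree.XMLSchema(etree.parse(xsd_path))

sample = b"""<?xml version="1.0" encoding="utf-8"?>
<ComicInfo>
  <Title>Chapter 1</Title>
  <Series>Example Series</Series>
  <Number>1</Number>
  <Writer>Unknown</Writer>
  <PageCount>12</PageCount>
</ComicInfo>"""

# raises etree.DocumentInvalid if the fragment violates the schema
schema.assertValid(etree.fromstring(sample))
print("sample ComicInfo.xml is valid")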

407
src/common/ComicInfo.py Normal file

@@ -0,0 +1,407 @@
import xml.etree.ElementTree as ET
from xml.dom import minidom
from typing import List
import os
from lxml import etree
from src.config import XSD_FILE
from src.common.logging import setup_logging
import logging
from zipfile import ZipFile
from pathlib import Path
import re
import requests
from urllib.parse import urlparse
from PIL import Image
from concurrent.futures import ThreadPoolExecutor
import hashlib
from io import BytesIO
logger = setup_logging(__name__)
class ImageInfo:
from src.config import BASE_DIR
def _image_path(self, comicinfo, filename):
"""生成章节目录"""
if filename:
return os.path.join(self.BASE_DIR,"images",f"{comicinfo.name}", comicinfo.chapter, filename)
def get_image_size(self, image_path: str, human_readable: bool = False) -> str:
"""
获取图片的字节大小支持本地路径和网络URL
参数
- image_path: 图片路径或URL
- human_readable: 是否返回可读格式 KB/MB
返回
- 字符串形式的字节大小或可读格式
示例
>>> get_image_size("photo.jpg")
'245.76 KB'
>>> get_image_size("http://example.com/image.png", human_readable=False)
'1024000'
"""
def convert_size(size_bytes: int) -> str:
"""将字节转换为可读格式"""
for unit in ['B', 'KB', 'MB', 'GB']:
if size_bytes < 1024.0:
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.2f} TB"
try:
# 判断是否为网络资源
if urlparse(str(image_path)).scheme in ('http', 'https'):
# 方法1通过HEAD请求获取大小可能不准确
response = requests.head(image_path, timeout=5)
if 'Content-Length' in response.headers:
size = int(response.headers['Content-Length'])
# 方法2完整下载获取准确大小推荐
else:
response = requests.get(image_path, stream=True, timeout=10)
response.raise_for_status()
size = len(response.content)
else:
# 本地文件处理
file_path = Path(image_path)
if not file_path.exists():
raise FileNotFoundError(f"文件不存在: {image_path}")
size = os.path.getsize(file_path)
return convert_size(size) if human_readable else str(size)
except requests.exceptions.RequestException as e:
raise ValueError(f"网络请求失败: {str(e)}")
except Exception as e:
raise RuntimeError(f"获取大小失败: {str(e)}")
def get_image_hash_advanced(self,
source: str,
hash_type: str = "md5",
is_url: bool = False
) -> str:
"""
高级版图片哈希生成支持多种输入源
参数
- source: 输入源文件路径/URL/二进制数据/BytesIO
- hash_type: 哈希类型md5/sha1/sha256
- is_url: source URL 字符串时需设置为 True
返回
- 十六进制字符串形式的哈希值
"""
hash_type = hash_type.lower()
valid_hashes = ["md5", "sha1", "sha256"]
if hash_type not in valid_hashes:
raise ValueError(f"不支持的哈希类型,可选值:{valid_hashes}")
hash_func = hashlib.new(hash_type)
# 处理不同输入类型
if isinstance(source, bytes):
hash_func.update(source)
elif isinstance(source, BytesIO):
source.seek(0)
while chunk := source.read(4096):
hash_func.update(chunk)
elif is_url:
response = requests.get(source, stream=True)
response.raise_for_status()
for chunk in response.iter_content(4096):
hash_func.update(chunk)
else: # 视为文件路径
with open(source, "rb") as f:
while chunk := f.read(4096):
hash_func.update(chunk)
return hash_func.hexdigest()
def get_image_metadata(self,image_path: str):
"""获取完整图片信息"""
page = ComicPageInfo()
image_name = os.path.basename(image_path)
size = self.get_image_size(image_path)
page.Image = image_name.split(".")[0].split("_")[-1]
page.ImageSize = size
page.Key = self.get_image_hash_advanced(image_path)
try:
with Image.open(image_path) as img:
page.ImageWidth, page.ImageHeight = img.size
#return {
# "format": img.format,
# "mode": img.mode,
# "size_px": img.size, # (width, height)
# "file_size": size
#}
return page
except Exception as e:
raise RuntimeError(f"读取图片信息失败: {str(e)}")
def get_image_metadata_from_zip(self, zip_path: str, chunk_size: int = 4096) -> list:
"""
ZIP 文件中读取图片的元数据无需解压整个文件
参数
- zip_path: ZIP 文件路径
- chunk_size: 读取的字节数用于解析图片头部信息
返回
- 包含图片元数据的列表每个元素格式
{
"filename": 文件名,
"compressed_size": 压缩后大小字节,
"original_size": 原始大小字节,
"format": 图片格式,
"width": 宽度像素,
"height": 高度像素
}
"""
pages = []
with ZipFile(zip_path, 'r') as zf:
for file_info in zf.infolist():
# 仅处理常见图片格式
if not file_info.filename.lower().endswith(
('.png', '.jpg', '.jpeg', '.gif', '.bmp')
):
continue
try:
with zf.open(file_info) as file:
# 读取前 chunk_size 字节用于解析元数据
img_header = file.read(chunk_size)
# 将数据包装为文件流
img_buffer = BytesIO(img_header)
page = ComicPageInfo()
page.Key = self.get_image_hash_advanced(img_buffer)
# 使用 Pillow 解析图像信息
with Image.open(img_buffer) as img:
page.Image = file_info.filename.split(".")[0]
page.ImageSize = file_info.file_size
page.ImageWidth, page.ImageHeight = img.size
#metadata = {
# "filename": file_info.filename,
# "compressed_size": file_info.compress_size,
# "original_size": file_info.file_size,
# "format": img.format,
# "width": img.width,
# "height": img.height
#}
pages.append(page)
except Exception as e:
print(f"解析失败 [{file_info.filename}]: {str(e)}")
return pages
# Define the ComicInfo and ComicPageInfo classes
class ComicInfo:
def __init__(self):
self.Title: str = ""
"""标题"""
self.Series: str = ""
self.Number: str = ""
self.Count: int = -1
self.Volume: int = -1
self.AlternateSeries: str = ""
self.AlternateNumber: str = ""
self.AlternateCount: int = -1
self.Summary: str = ""
self.Notes: str = ""
self.Year: int = -1
self.Month: int = -1
self.Day: int = -1
self.Writer: str = ""
self.Penciller: str = ""
self.Inker: str = ""
self.Colorist: str = ""
self.Letterer: str = ""
self.CoverArtist: str = ""
self.Editor: str = ""
self.Publisher: str = ""
self.Imprint: str = ""
self.Genre: str = ""
self.Tags: str = ""
self.Web: str = ""
self.PageCount: int = -1
self.LanguageISO: str = ""
self.Format: str = ""
self.BlackAndWhite: str = ""
self.Manga: str = ""
self.Characters: str = ""
self.Teams: str = ""
self.Locations: str = ""
self.ScanInformation: str = ""
self.StoryArc: str = ""
self.SeriesGroup: str = ""
self.AgeRating: str = ""
self.Pages: List[ComicPageInfo] = []
class ComicPageInfo:
def __init__(self):
self.Image: int = -1
self.Type: str = "Story"
self.DoublePage: bool = False
self.ImageSize: int = -1
self.Key: str = ""
self.Bookmark: str = ""
self.ImageWidth: int = -1
self.ImageHeight: int = -1
def toString(self):
data = {}
def add(key, value):
if value != -1 and value != "": data[key] = str(value)
add("Image", self.Image)
add("ImageSize", self.ImageSize)
add("Key", self.Key)
add("ImageWidth", self.ImageWidth)
add("ImageHeight", self.ImageHeight)
return data
class ComicInfoXml:
def _save_xml_to_file(self, xml_string, filename):
"""
Save the XML string to a file
"""
base_dir = os.path.dirname(filename)
if not os.path.exists(base_dir): os.makedirs(base_dir)
with open(filename, "w", encoding="utf-8") as file:
file.write(xml_string)
logger.info(f"ComicInfo.xml 生成成功 {filename}")
def _validate_xml_with_xsd_file(self, xml_file, xsd_file, remove=True):
"""
Validate the XML file against the XSD file
"""
xml_doc = etree.parse(xml_file)
with open(xsd_file, 'r', encoding="utf-8") as file:
xsd_doc = etree.XMLSchema(etree.parse(file))
try:
xsd_doc.assertValid(xml_doc)
logger.info(f"ComicInfo.xml 通过 XSD 验证成功 {xml_file}")
except etree.DocumentInvalid as e:
logger.error(f"ComicInfo.xml 通过 XSD 验证失败 {xml_file}")
if remove:
os.remove(xml_file)
def get_page_count(self, zip_file: Path):
"""获取 ComicInfo.xml 文件中的 <PageCount> 标签值"""
# 打开ZIP文件
with ZipFile(str(zip_file), 'r') as z:
# 假设ZIP中的文件名是'text.txt'
with z.open('ComicInfo.xml', 'r') as file:
# 从文件流中解析 XML 数据
file_string = file.read().decode("utf-8")
# 使用正则表达式提取 <PageCount> 标签中的值
match = re.search(r"<PageCount>(\d+)</PageCount>", file_string)
if match:
page_count = match.group(1)
logger.info(f"zip_file={zip_file} PageCount: {page_count}")
return page_count
def _parse_comicinfo(self, comic: ComicInfo, save_dir=None, xml_filename="ComicInfo.xml", xsd_filename="ComicInfo.xsd"):
"""_summary_
Args:
comic (ComicInfo): _description_
save_dir (_type_, optional): _description_. Defaults to None.
xml_filename (str, optional): _description_. Defaults to "ComicInfo.xml".
xsd_filename (str, optional): _description_. Defaults to "ComicInfo_2.1.xsd".
"""
# Serialize to XML with formatted output
def serialize_comic_info(comic: ComicInfo) -> str:
# Create root element with XML declaration and namespaces
comic_elem = ET.Element('ComicInfo')
comic_elem.set('xmlns:xsd', 'http://www.w3.org/2001/XMLSchema')
comic_elem.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
# Add subelements and attributes based on presence and requirements
for attr, value in comic.__dict__.items():
# if value or (attr in ['Volume', 'Year', 'Month', 'Day', 'PageCount'] and (value == -1 or value == "" ) ): # Check required attributes
if value == -1 or value == "" or value is None or value == "[]" or value == []:
if attr in self._required_attributes():
raise SystemExit(f"{xml_filename} 缺少必要属性: {attr}")
else:
continue
else:
if attr == 'Pages':
pages_elem = ET.SubElement(comic_elem, 'Pages')
for page in value:
cpi = ComicPageInfo()
cpi.Image = page.Image
cpi.ImageSize = page.ImageSize
cpi.Key = page.Key
cpi.ImageWidth = page.ImageWidth
cpi.ImageHeight = page.ImageHeight
page_elem = ET.SubElement(pages_elem, 'Page', cpi.toString())
else:
ET.SubElement(comic_elem, attr).text = str(value)
# Create a formatted XML string
xml_str = ET.tostring(comic_elem, encoding='utf-8', method='xml')
parsed_xml = minidom.parseString(xml_str)
formatted_xml = parsed_xml.toprettyxml(indent=" ", encoding="utf-8") # Adjust the number of spaces for indentation as needed
# Convert bytes to string and add XML declaration
return formatted_xml.decode('utf-8')
# Serialize the ComicInfo object
serialized_xml = serialize_comic_info(comic)
# 保存数据XML到文件
if save_dir != None: xml_filename = os.path.join(save_dir, xml_filename)
self._save_xml_to_file(serialized_xml, xml_filename)
self._validate_xml_with_xsd_file(xml_filename, xsd_filename) # 将 JSON 转换为 XML
#xml_data = json_to_xml_with_declaration(json_data)
#print(xml_data)
def _required_attributes(self):
return ["Title", "Series", "Number", "PageCount", "Writer"]
def _gen_pageinfo(self, image_names, save_dir):
pages = []
# Adding pages to the comic
for image_name in image_names:
image_name = image_name.split(".")[0].split("_")[-1]+".jpg"
image_path = os.path.join(save_dir, image_name)
page = ImageInfo().get_image_metadata(image_path)
# 图像属性 文件名 大小 长
pages.append(page)
return pages
def scrapy_xml_by_json(self, json_data, save_dir=None, xsd_file=XSD_FILE):
comic = ComicInfo()
comic.Title = json_data.get("chapter", "")
comic.Series = json_data.get("name", "")
comic.Writer = json_data.get("author", "")
comic.AgeRating = json_data.get("age_rating", "")
comic.Tags = json_data.get("tags", "")
comic.Summary = json_data.get("description", "")
comic.Genre = json_data.get("genre", "")
comic.Number = json_data.get("number", "")
comic.PageCount = json_data.get("page_count", "")
comic.Writer = json_data.get("author", "")
image_names = json_data.get("images", "")
#pages = []
pages = self._gen_pageinfo(image_names=image_names, save_dir=save_dir)
for page in pages:
comic.Pages.append(page)
# Adding pages to the comic
#for image_name in image_names:
# page = ComicPageInfo()
# page.Image = image_name.split(".")[0].split("_")[-1]
# pages.append(page.Image)
# comic.Pages.append(page)
self._parse_comicinfo(comic, save_dir=save_dir, xsd_filename=xsd_file)
return pages
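
A usage sketch (not part of this commit) of the entry point above: scrapy_xml_by_json takes the chapter metadata dictionary assembled elsewhere (see MangaItem.get_comic_info_json) and writes a validated ComicInfo.xml next to the downloaded pages. It assumes the listed .jpg files already exist in save_dir; every literal value here is hypothetical.

from src.common.ComicInfo import ComicInfoXml

chapter_json = {
    "name": "Example Manga",
    "chapter": "Chapter 1",
    "author": "Unknown",
    "tags": "Action",
    "genre": "Manga",
    "age_rating": "Everyone",
    "description": "Demo entry",
    "number": 1,
    "page_count": 2,
    "images": ["001.jpg", "002.jpg"],
}

# Writes and XSD-validates <save_dir>/ComicInfo.xml, returning the page metadata list.
pages = ComicInfoXml().scrapy_xml_by_json(
    chapter_json,
    save_dir="output/demo/images/Example Manga/Chapter 1",
)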

25
src/common/exceptions.py Normal file

@@ -0,0 +1,25 @@
"""异常定义"""
class MangaException(Exception):
"""漫画下载相关异常的基类"""
pass
class NetworkError(MangaException):
"""网络相关错误"""
pass
class ParseError(MangaException):
"""解析错误"""
pass
class ConfigError(MangaException):
"""配置错误"""
pass
class DownloadError(MangaException):
"""下载错误"""
pass
class SiteError(MangaException):
"""网站特定错误"""
pass

220
src/common/extractor.py Normal file

@@ -0,0 +1,220 @@
"""数据提取工具"""
from typing import Any, Dict, List, Optional, Union
import re
from lxml import etree
from src.common.exceptions import ParseError
from src.common.loader import SiteConfig
from src.common.item import MangaItem,ListManga, MangaInfo, Chapter # 导入模型
class SelectorProcessor:
"""选择器处理器"""
@staticmethod
def select(tree: etree._Element, selector: str, index: int = -1) -> List[etree._Element]:
"""XPath选择器"""
elements = tree.xpath(selector)
len_elements = len(elements)
try:
if len_elements == 0:
raise ParseError(f"无法找到元素: {selector}")
elif len_elements == 1:
return elements[0]
elif len_elements > 1 and index > -1:
return elements[index]
else:
return elements
except Exception as e:
return None
@staticmethod
def select_one(tree: etree._Element, selector: str) -> Optional[etree._Element]:
"""XPath选择器(单个)"""
elements = tree.xpath(selector)
return elements[0] if elements else None
@staticmethod
def get_text(text: str):
"""获取文本"""
return text.strip() if text is not None else ''
@staticmethod
def get_attribute(element: etree._Element, attr: str) -> str:
"""获取属性"""
result = element.get(attr, '')
if isinstance(result, str):
return element.get(attr, '').strip() if element is not None else ''
return result
@staticmethod
def join_base_url(url: str, base_url: str) -> str:
"""拼接基础URL"""
if url.startswith('http'):
return url
return f"{base_url.rstrip('/')}/{url.lstrip('/')}"
@staticmethod
def extract_pattern(text: str, pattern: str) -> Optional[str]:
"""提取正则匹配"""
match = re.search(pattern, text)
return match.group(1) if match else None
class Extractor:
"""数据提取器"""
def __init__(self, config: SiteConfig):
self.config = config
self.processor = SelectorProcessor()
def extract_manga_list(self, tree: etree._Element) -> ListManga:
"""提取漫画信息并返回 MangaInfo 实例"""
selectors = self.config.get_selector('manga_list')
info_data = {}
for key, selector in selectors.items():
if isinstance(selector, str):
element = self.processor.select(tree, selector)
if element:
if isinstance(element, str):
info_data[key] = self.processor.get_text(element)
else:
info_data[key] = element
return ListManga(**info_data)
def extract_manga_info(self, tree: etree._Element) -> MangaInfo:
"""提取漫画信息并返回 MangaInfo 实例"""
selectors = self.config.get_selector('manga_info')
info_data = {}
info_data['project'] = self.config.project
info_data['base_url'] = self.config.base_url
for key, selector in selectors.items():
if isinstance(selector, str):
element = self.processor.select(tree, selector)
if element:
if isinstance(element, str):
info_data[key] = self.processor.get_text(element)
else:
info_data[key] = element
elif isinstance(selector, dict):
if 'value' in selector:
info_data[key] = selector.get('value')
continue
element = self.processor.select(tree, selector['selector'], selector.get('index', -1))
if element:
if 'attribute' in selector:
value = self.processor.get_attribute(element, selector['attribute'])
else:
value = self.processor.get_text(element)
if 'process' in selector:
if selector['process'] == 'join_base_url':
value = self.processor.join_base_url(value, self.config.base_url)
info_data[key] = value
# 创建 MangaInfo 实例
return MangaInfo(**info_data) # 使用解包操作符将字典传递给模型
def extract_chapter_list(self, tree: etree._Element) -> List[Chapter]:
"""提取章节列表并返回 Chapter 实例列表"""
selector_config = self.config.get_selector('chapter_list')
elements = self.processor.select(tree, selector_config['container'])
urls = self.processor.select(tree, selector_config['attribute'])
chapters = []
result = {elements[i]: urls[i] for i in range(len(elements))}
for element in elements:
chapter_data = {}
if selector_config['title'] == 'text':
chapter_data['title'] = self.processor.get_text(element)
url_config = selector_config['url']
url = self.processor.get_attribute(element, url_config['attribute'])
if url_config.get('process') == 'join_base_url':
url = self.processor.join_base_url(url, self.config.base_url)
chapter_data['url'] = url
# 创建 Chapter 实例
chapters.append(Chapter(**chapter_data)) # 使用解包操作符将字典传递给模型
return chapters
def extract_chapter_images(self, html: str) -> List[str]:
"""提取章节图片"""
config = self.config.get_selector('chapter')
data = self._extract_encrypted_data(html, config['image_data'])
return self._build_image_urls(data, config['image_url_template'])
def _extract_data(self, tree: etree._Element, selectors: Dict) -> Dict[str, str]:
"""通用数据提取"""
data = {}
for key, selector in selectors.items():
if isinstance(selector, str):
element = tree.xpath(selector)
if element:
data[key] = element[0].text.strip()
elif isinstance(selector, dict):
data[key] = self._process_complex_selector(tree, selector)
return data
def _extract_list(self, tree: etree._Element, config: Dict) -> List[Dict[str, str]]:
"""提取列表数据"""
items = []
elements = tree.xpath(config['container'])
seen_titles = set() # 用于跟踪已提取的标题
for element in elements:
item = {}
if config['title'] == 'text':
title = element.text.strip()
if title not in seen_titles: # 检查标题是否已存在
item['title'] = title
seen_titles.add(title) # 标记为已提取
url = element.get(config['url']['attribute'], '')
if config['url'].get('process') == 'join_base_url':
url = self._join_url(url)
item['url'] = url
items.append(item)
return items
def _extract_encrypted_data(self, html: str, config: Dict) -> Any:
"""提取并解密数据"""
pattern = config['pattern']
match = re.search(pattern, html)
if not match:
raise ParseError("无法找到数据")
data = match.group(1)
if config.get('decrypt'):
data = self._decrypt_data(data, config['process'])
return data
def _decrypt_data(self, data: str, steps: List[str]) -> Any:
"""数据解密"""
import base64
import zlib
import json
result = data
for step in steps:
if step == 'base64_decode':
result = base64.b64decode(result)
elif step == 'zlib_decompress':
result = zlib.decompress(result).decode('utf-8')
elif step == 'json_parse':
result = json.loads(result)
return result
def _join_url(self, path: str) -> str:
"""拼接URL"""
if path.startswith('http'):
return path
return f"{self.config.base_url.rstrip('/')}/{path.lstrip('/')}"
def _build_image_urls(self, data: Dict, template: str) -> List[str]:
"""构建图片URL列表"""
urls = []
for file in data.get('files', []):
urls.append(template.format(path=file))
return urls
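
A self-contained sketch (not part of this commit) of the decryption pipeline that _extract_encrypted_data and _decrypt_data implement: the embedded page payload is treated as JSON that was zlib-compressed and then base64-encoded, with the site config listing the reverse steps in order. The round trip below uses made-up data to show the three steps.

import base64, json, zlib

original = {"files": ["a/001.webp", "a/002.webp"]}
encrypted = base64.b64encode(zlib.compress(json.dumps(original).encode("utf-8")))

data = encrypted
for step in ["base64_decode", "zlib_decompress", "json_parse"]:
    if step == "base64_decode":
        data = base64.b64decode(data)
    elif step == "zlib_decompress":
        data = zlib.decompress(data).decode("utf-8")
    elif step == "json_parse":
        data = json.loads(data)

assert data == original  # the decoded payload matches the source data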

234
src/common/item.py Normal file

@@ -0,0 +1,234 @@
from pydantic import BaseModel, HttpUrl, field_validator, model_validator
from typing import List, Optional
from opencc import OpenCC
import re,os
from src.common.ComicInfo import ImageInfo
class FileNaming:
@classmethod
def chinese_convert(cls, text,convert='t2s'): return OpenCC(convert).convert(str(text))
#处理成符合规定的文件名
@classmethod
def fix_file_name(cls, filename, replace=None):
if not isinstance(filename, str):
return filename
in_tab = r'[?*/\|.:><]'
str_replace = ""
if replace is not None:
str_replace = replace
filename = re.sub(in_tab, str_replace, filename)
filename = filename.rstrip(" ")
return filename
@classmethod
def chinese_file_name(cls, name): return cls.fix_file_name(cls.chinese_convert(name))
class ImageItem(BaseModel):
url: HttpUrl
scramble: bool
filename: str
class CoverItem(BaseModel):
name: Optional[str] = ""
"""文件名"""
url: HttpUrl = ""
"""下载链接"""
path: Optional[str] = ""
"""文件路径"""
size: Optional[int] = 0
"""文件大小"""
md5: Optional[str] = ""
"""文件MD5"""
@model_validator(mode="after")
def validate(self):
if self.path != "":
self.name = os.path.basename(self.path)
self.md5 = ImageInfo().get_image_hash_advanced(self.path)
self.size = ImageInfo().get_image_size(self.path)
return self
class Chapter(BaseModel):
title: str
@field_validator('title', mode='before')
def validate_url(cls, v):
return FileNaming.chinese_file_name(v)
url: HttpUrl
# downloaded
status: Optional[str] = ""
#images: List[ImageItem] = []
class ListManga(BaseModel):
title: List[str]
url: List[HttpUrl]
@field_validator('url', mode='before')
def validate_url(cls, v):
list_url = []
for url in v:
if isinstance(url, str) and not url.startswith('http'):
list_url.append(HttpUrl("https://rouman5.com" + url))
else:
list_url.append(url)
return list_url
class MangaInfo(BaseModel):
project: str
"""漫画项目名称"""
base_url: str = ""
"""漫画网站域名"""
@field_validator('base_url', mode='before')
def validate_base_url(cls, v):
cls.base_url = v
return v
title: str
"""漫画名称"""
@field_validator('title', mode='before')
def validate_title(cls, v):
return FileNaming.chinese_file_name(v)
author: str
"""漫画作者"""
@field_validator('author', mode='before')
def validate_author(cls, v):
(list_value, value) = [[], str(v).replace("&", " ")]
for val in set(str(value).split(" ")):
list_value.append(val)
return FileNaming.chinese_file_name(",".join(list_value))
description: Optional[str] = None
"""漫画描述"""
@field_validator('description', mode='before')
def validate_description(cls, v):
return FileNaming.chinese_file_name(v)
cover: CoverItem
"""漫画封面"""
@field_validator('cover', mode='before')
def validate_cover(cls, v):
cover_info = {}
if isinstance(v, str):
if not v.startswith('http'):
v = cls.base_url + v
cover_info['url'] = HttpUrl(v)
return CoverItem(**cover_info)
tags: str = ""
"""漫画标签"""
@field_validator('tags', mode='before')
def validate_tags(cls, v):
return FileNaming.chinese_file_name(v)
# date: str
genre: str
"""漫画类型"""
age_rating: str
"""漫画年龄分级"""
chapter_link: List[HttpUrl]
"""章节链接"""
chapters_name: List[str]
"""章节名称"""
#list_chapter: dict[Chapter]
#status: str
#tags: List[str]
@field_validator('chapter_link', mode='before')
def validate_chapter_link(cls, v):
if isinstance(v, str) and not v.startswith('http'):
return [HttpUrl(cls.base_url + v)]
elif isinstance(v, list):
if not v[0].startswith('http'):
return [HttpUrl(cls.base_url + chapter) for chapter in v]
return v
def get_list_chapter(cls):
chapters_name = cls.chapters_name
chapter_link = cls.chapter_link
chapters = []
for name, link in zip(chapters_name, chapter_link):
chapters.append(Chapter(title=name, url=link))
return chapters
#@field_validator('list_chapter', mode='before')
#def validate_list_chapter(cls, v):
# s = cls.chapters_name
# c = cls.chapter_link
# return v
#if isinstance(v, list):
# return [Chapter(**chapter) for chapter in v]
#return v
#@validator('tags', pre=True)
#def validate_tags(cls, v):
# if not isinstance(v, list):
# raise ValueError('tags must be a list')
# return v
class MangaItem(BaseModel):
info: MangaInfo
covers: List[CoverItem] = []
chapter: Chapter = []
chapter_images: List[ImageItem] = []
chapters: List[Chapter] = []
number: int = 0
pages: int = 0
#@field_validator('chapter', mode='before')
#def fix_file_name(cls, v):
# return FileNaming.chinese_file_name(v)
@field_validator('chapters', mode='before')
def validate_chapters(cls, v):
if not isinstance(v, list) or not all(isinstance(chapter, Chapter) for chapter in v):
raise ValueError('chapters must be a list of Chapter instances')
return v
def get_item(cls):
# number 转换
if len(cls.chapters) > 0:
count = 1
for chapter in cls.chapters:
if chapter.title == cls.chapter.title and chapter.url == cls.chapter.url:
cls.number = count
break
count += 1
# pages 转换
if len(cls.chapter_images) > 0: cls.pages = len(cls.chapter_images)
return cls
def get_comic_info_json(cls):
cls.get_item()
filename_list = []
for image in cls.chapter_images:
filename_list.append(image.filename)
return {
"name": cls.info.title,
"chapter": cls.chapter.title,
"author": cls.info.author,
"tags": cls.info.tags,
"images": filename_list,
"description": cls.info.description,
"genre": cls.info.genre,
"age_rating": cls.info.age_rating,
"series": cls.info.title,
"number": cls.number,
"page_count": cls.pages,
}
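
A small sketch (not part of this commit) of the title normalization used by the validators above: FileNaming.chinese_file_name converts Traditional to Simplified Chinese with OpenCC and strips characters that are not allowed in file names. Whether OpenCC("t2s") accepts that config name depends on the installed opencc package (some builds expect "t2s.json"); the sample string is hypothetical.

from src.common.item import FileNaming

title = "漫畫標題: 第1話?"
# Simplified conversion plus removal of ? * / \ | . : > < and trailing spaces,
# e.g. "漫画标题 第1话"
print(FileNaming.chinese_file_name(title))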

49
src/common/loader.py Normal file

@@ -0,0 +1,49 @@
from pathlib import Path
from typing import Dict, Any
import yaml
import importlib.resources as pkg_resources
from src.common.exceptions import ConfigError
class SiteConfig:
"""网站配置类"""
def __init__(self, config_data: Dict[str, Any]):
self.project = config_data['project']
self.name = config_data['name']
self.domain = config_data['domain']
self.base_url = config_data['base_url']
self.headers = config_data.get('headers', {})
self.selectors = config_data['selectors']
def get_selector(self, *keys) -> Any:
"""获取选择器配置"""
value = self.selectors
for key in keys:
if not isinstance(value, dict) or key not in value:
raise ConfigError(f"无效的选择器路径: {'.'.join(keys)}")
value = value[key]
return value
def get_base_url(self):
return self.base_url
class ConfigLoader:
"""配置加载器"""
_configs: Dict[str, SiteConfig] = {}
@classmethod
def load_config(cls, site_name: str) -> SiteConfig:
"""加载网站配置"""
if site_name in cls._configs:
return cls._configs[site_name]
try:
# 从包资源中读取配置文件
config_text = Path('src/sites/configs', f'{site_name}.yml').read_text()
config_data = yaml.safe_load(config_text)
config = SiteConfig(config_data)
cls._configs[site_name] = config
return config
except Exception as e:
raise ConfigError(f"加载配置文件失败 {site_name}: {str(e)}")
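
A hedged sketch (not part of this commit) of what a site config under src/sites/configs/<site>.yml is expected to look like, based only on the keys read by SiteConfig and Extractor (project, name, domain, base_url, optional headers, and a selectors tree). The selector expressions below are placeholders, not the real rouman5 configuration.

import yaml
from src.common.loader import SiteConfig

sample_yaml = """
project: demo
name: Demo Site
domain: example.com
base_url: https://example.com
headers:
  Referer: https://example.com
selectors:
  manga_info:
    title: //h1/text()
    author: //a[@class='author']/text()
  chapter_list:
    container: //ul[@id='chapters']/li/a
    title: text
    url:
      attribute: href
      process: join_base_url
"""

config = SiteConfig(yaml.safe_load(sample_yaml))
print(config.get_selector("chapter_list", "url", "attribute"))  # -> href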

11
src/common/logging.py Normal file

@@ -0,0 +1,11 @@
"""日志配置"""
import logging
# 日志格式
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_LEVEL = logging.INFO
def setup_logging(name: str = None) -> logging.Logger:
"""配置日志"""
logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT)
return logging.getLogger(name or __name__)

256
src/common/naming.py Normal file

@@ -0,0 +1,256 @@
from pathlib import Path
from datetime import datetime
from typing import Callable
import base64,hashlib,os,re
from src.config import BASE_DIR,CBZ_DIR,OLD_CBZ_DIR
from src.common.item import MangaInfo,MangaItem
from typing import Generator, Union, List, Optional
PREFIX_SCRAMBLE = "scramble="
class DirectoryNaming:
"""目录命名策略类"""
def ensure_dir(directory: Path):
"""确保目录存在"""
directory.mkdir(parents=True, exist_ok=True)
@classmethod
def chapter_images_dir(cls, manga_info: MangaInfo, chapter: str, filename: str = None) -> Path:
"""生成章节目录"""
if filename:
return Path(BASE_DIR,f"{manga_info.project}","images",f"{manga_info.title}",chapter.title, filename)
else:
return Path(BASE_DIR,f"{manga_info.project}","images",f"{manga_info.title}",chapter.title)
@classmethod
def chapter_cbz_dir(cls, manga_info: MangaInfo) -> Path:
"""生成章节CBZ文件目录"""
return Path(CBZ_DIR,f"{manga_info.project}",f"{manga_info.title}")
@classmethod
def manga_cover_dir(cls, manga_item: MangaItem) -> Path:
"""生成漫画封面目录"""
return Path(BASE_DIR,f"{manga_item.info.project}","icons",f"{manga_item.info.title}",f"{manga_item.info.title}.jpg")
@classmethod
def manga_cover_dir(cls, manga_info: MangaInfo, cache: bool = True, is_dir: bool = False) -> Path:
"""生成漫画封面目录"""
path = ""
if cache:
path = Path(BASE_DIR,f"{manga_info.project}","icons",".cache")
else:
path = Path(BASE_DIR,f"{manga_info.project}","icons",f"{manga_info.title}")
if not is_dir:
path = os.path.join(path, f"{manga_info.title}.jpg")
return Path(path)
class FileNaming:
"""文件命名策略类"""
PREFIX_SCRAMBLE = "scramble="
ext = ".jpg"
@classmethod
def chapter_cbz(cls, manga_info: MangaInfo, chapter: str) -> Path:
"""生成章节CBZ文件目录"""
return Path(CBZ_DIR,f"{manga_info.project}",f"{manga_info.title}",f"{chapter.title}.cbz")
@classmethod
def old_chapter_cbz(cls, manga_info: MangaInfo, chapter: str) -> Path:
"""生成章节CBZ文件目录"""
return Path(OLD_CBZ_DIR,f"{manga_info.project}",f"{manga_info.title}",f"{chapter.title}.cbz")
#处理成符合规定的文件名
@classmethod
def fix_file_name(cls, filename, replace=None):
if not isinstance(filename, str):
return filename
in_tab = r'[?*/\|.:><]'
str_replace = ""
if replace is not None:
str_replace = replace
filename = re.sub(in_tab, str_replace, filename)
filename = filename.rstrip(" ")
return filename
@classmethod
def default_filename(cls,url: str, idx: int) -> str:
"""默认文件名生成器:使用数字序号"""
#from ..utils import get_file_extension
#ext = get_file_extension(url)
return f"{idx:03d}{cls.ext}"
@staticmethod
def default_path(base_dir: Path, chapter_name: str, filename: str) -> Path:
"""默认路径生成器:直接在章节目录下"""
return base_dir / chapter_name / filename
@classmethod
def getFileScrambleImageName(cls,count,block=None,suffix=".jpg"):
if block:
return cls.PREFIX_SCRAMBLE+str(block)+"_"+"{:0>3d}".format(count)+suffix
else:
return "{:0>3d}".format(count)+suffix
@classmethod
def getFileScrambleImageSave(cls,img_path):
base_dir = os.path.dirname(img_path)
file_name = os.path.basename(img_path)
if file_name.startswith(cls.PREFIX_SCRAMBLE):
file_name = file_name.split("_")[-1]
return os.path.join(base_dir,file_name)
# 解密切片
@classmethod
def encodeImage(cls,str_en):
#print("en",str_en)
enc = base64.b64decode(str_en)
#print("解密:",enc)
m = hashlib.md5()
m.update(enc)
md5 = m.digest()
d = md5[-1]
#print(md5)
try:
blocks = d % 10 + 5
except:
blocks = 0 %10 + 5
#print("blocks=",blocks)
return blocks
@classmethod
def cover_format_path(cls, path, count=0):
if count != 0:
name, suffix = os.path.splitext(path)
new_path = name+"-"+str(count)+suffix
return new_path
if not os.path.exists(path): return path
count = 1
while count:
name, suffix = os.path.splitext(path)
new_path = name+"-"+str(count)+suffix
if not os.path.exists(new_path): return new_path
else: count += 1
@classmethod
def get_filenames_optimized(cls,
folder_path: Union[str, Path],
recursive: bool = False,
ext_filter: Optional[List[str]] = None,
include_hidden: bool = False,
full_path: bool = True,
min_size: Optional[int] = None,
max_size: Optional[int] = None
) -> Generator[str, None, None]:
"""
高性能文件名获取函数优化版
:param folder_path: 目标文件夹路径
:param recursive: 是否递归子目录
:param ext_filter: 扩展名过滤列表 ['.jpg', '.png']不区分大小写
:param include_hidden: 是否包含隐藏文件
:param full_path: 是否返回完整路径
:param min_size: 最小文件大小单位字节
:param max_size: 最大文件大小单位字节
:return: 生成器按需生成符合条件的文件路径
"""
# 路径标准化处理
folder_path = Path(folder_path).resolve()
if not folder_path.is_dir():
raise ValueError(f"无效的目录路径: {folder_path}")
# 预处理扩展名过滤条件
ext_tuple = tuple(ext.lower() for ext in ext_filter) if ext_filter else None
# 主扫描逻辑
def _scandir(path: Path):
with os.scandir(path) as entries:
for entry in entries:
# 跳过无效条目
if not entry.name:
continue
# 处理目录
if entry.is_dir():
if recursive:
# 隐藏目录处理
if not include_hidden and entry.name.startswith('.'):
continue
yield from _scandir(Path(entry.path))
continue
# 处理文件
if not entry.is_file():
continue
# 过滤隐藏文件
if not include_hidden:
if entry.name.startswith('.') or (os.name == 'nt' and entry.is_system()):
continue
# 扩展名过滤
if ext_tuple:
file_ext = Path(entry.name).suffix.lower()
if file_ext not in ext_tuple:
continue
# 文件大小过滤
try:
stat = entry.stat(follow_symlinks=False)
except OSError:
continue
if min_size is not None and stat.st_size < min_size:
continue
if max_size is not None and stat.st_size > max_size:
continue
# 生成结果
yield entry.path if full_path else entry.name
return _scandir(folder_path)
class NamingStrategy:
"""命名策略集合类"""
@staticmethod
def original_filename(url: str, idx: int) -> str:
"""保留原始文件名的生成器"""
from ..utils import get_file_extension
ext = get_file_extension(url)
return f"image_{idx}_original{ext}"
@staticmethod
def date_based_path(base_dir: Path, chapter_name: str, filename: str) -> Path:
"""按日期组织的路径生成器"""
today = datetime.now()
return base_dir / str(today.year) / f"{today.month:02d}" / chapter_name / filename
@staticmethod
def manga_volume_path(
manga_name: str,
volume_num: int
) -> Callable[[Path, str, str], Path]:
"""生成按漫画名和卷号组织的路径生成器"""
def path_generator(base_dir: Path, chapter_name: str, filename: str) -> Path:
return base_dir / manga_name / f"{volume_num:02d}" / chapter_name / filename
return path_generator
@staticmethod
def custom_manga_filename(
prefix: str = "page",
digits: int = 4
) -> Callable[[str, int], str]:
"""生成自定义漫画页面文件名生成器"""
def filename_generator(url: str, idx: int) -> str:
from ..utils import get_file_extension
ext = get_file_extension(url)
return f"{prefix}_{idx:0{digits}d}{ext}"
return filename_generator
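
A brief sketch (not part of this commit) of the scrambled-image naming convention used across the downloader: scrambled pages are first saved as "scramble=<blocks>_<index>.jpg" and renamed to a plain "<index>.jpg" after descrambling. The directory in the example is hypothetical.

from src.common.naming import FileNaming

tmp_name = FileNaming.getFileScrambleImageName(7, block=10)  # 'scramble=10_007.jpg'
plain = FileNaming.getFileScrambleImageName(7)               # '007.jpg'
final = FileNaming.getFileScrambleImageSave(f"/tmp/ch1/{tmp_name}")  # '/tmp/ch1/007.jpg'
print(tmp_name, plain, final)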

646
src/common/utils.py Normal file

@@ -0,0 +1,646 @@
import asyncio
import aiohttp
import base64,hashlib,os,shutil,os.path,math
from PIL import Image
import logging,time,os,shutil,re,xmlschema
from pathlib import Path
from typing import List, Optional, Callable, Dict, Any
from src.common.naming import DirectoryNaming
from src.common.naming import FileNaming,PREFIX_SCRAMBLE
from src.config import DEFAULT_HEADERS, CONCURRENT_DOWNLOADS, TIMEOUT, RETRY_TIMES, CACHE_DIR, CACHE_IMAGE_DIR
from src.config import RETRIES, COMIC_INFO_NAME, PROXY_URL, RETRY_PROXY, RETRY_PROXY_TIMES, XSD_FILE, BASE_DIR
from src.common.exceptions import DownloadError
from src.common.item import ImageItem, MangaItem, MangaInfo
from zipfile import ZipFile, ZIP_DEFLATED
from src.common.logging import setup_logging
import logging
from tempfile import NamedTemporaryFile
logger = setup_logging(__name__)
class Cache:
"""缓存类,用于存储和管理网页内容的缓存"""
def __init__(self, cache_dir: Path = CACHE_DIR, expiration_time: int = 3600):
self.cache_dir = cache_dir
self.expiration_time = expiration_time
self.cache_dir.mkdir(exist_ok=True) # 创建缓存目录
def _get_cache_file_path(self, url: str) -> Path:
"""根据 URL 生成缓存文件路径"""
filename = FileNaming.fix_file_name(str(url))
# 以网站 "/" 分离目录
parts = str(url).replace("https://", "").replace("http://", "").split("/") # 按照 "/" 分离 URL
subdir = parts[0] if len(parts) > 2 else "default" # 使用域名作为第一层子目录
hash_dir = hashlib.md5(str(url).encode()).hexdigest()
dir = self.cache_dir / subdir / hash_dir[0:2] / hash_dir[3:5] # 返回多级目录路径
dir.mkdir(parents=True, exist_ok=True)
return dir / filename
def get(self, url: str, type: str = "html") -> str:
"""从缓存中获取 HTML 内容"""
cache_file = self._get_cache_file_path(url)
if cache_file.exists():
# 检查缓存是否过期
if time.time() - cache_file.stat().st_mtime < self.expiration_time:
with open(cache_file, 'r', encoding='utf-8') as f:
return f.read()
elif type == "image":
with open(cache_file, 'rb') as f:
return f.read()
else:
cache_file.unlink() # 删除过期的缓存文件
return None
def get_image(self, url: str) -> bytes:
"""从缓存中获取图片"""
cache_file = self._get_cache_file_path(url)
if cache_file.exists():
# 验证下载的文件是否为有效的图片
if MangaDownloader()._is_valid_image(cache_file):
with open(cache_file, 'rb') as f:
return f.read()
else:
logger.error(f"图像已损坏: {cache_file}")
os.remove(cache_file)
return None
def set(self, url: str, html: str) -> None:
"""将 HTML 内容保存到缓存"""
cache_file = self._get_cache_file_path(url)
with open(cache_file, 'w', encoding='utf-8') as f:
f.write(html)
def set_image(self, url: str, image: bytes) -> None:
"""将图片保存到缓存"""
cache_file = self._get_cache_file_path(url)
with open(cache_file, 'wb') as f:
f.write(image)
class DownloadStatus:
"""下载状态跟踪类,用于记录下载进度"""
def __init__(self, total: int):
self.total = total
self.success = 0
self.failed = 0
self.current = 0
@property
def is_completed(self) -> bool:
"""检查下载是否完成"""
return self.current >= self.total
@property
def progress(self) -> float:
"""计算当前下载进度"""
return self.current / self.total if self.total > 0 else 0
class MangaDownloader:
"""漫画下载器类,负责下载漫画及其相关资源"""
def __init__(self, base_dir: Path = BASE_DIR):
self.connector = aiohttp.TCPConnector(limit_per_host=CONCURRENT_DOWNLOADS)
self.base_dir = Path(base_dir)
self.cache_dir = CACHE_IMAGE_DIR # 缓存目录
self.cache = Cache()
DirectoryNaming.ensure_dir(self.base_dir)
DirectoryNaming.ensure_dir(self.cache_dir) # 创建缓存目录
async def download_cover(self, manga_info: MangaInfo):
"""下载封面"""
cover_item = manga_info.cover
save_path = DirectoryNaming.manga_cover_dir(manga_info)
DirectoryNaming.ensure_dir(save_path.parent)
if os.path.exists(save_path):
print("f".format(save_path))
async with aiohttp.ClientSession(headers=DEFAULT_HEADERS, timeout=aiohttp.ClientTimeout(total=TIMEOUT, connect=TIMEOUT)) as session:
await self.download_image(session,str(cover_item.url), save_path)
async def download_chapter(
self,
manga_item: MangaItem,
semaphore: Optional[asyncio.Semaphore] = None,
status_callback: Optional[Callable[[DownloadStatus], None]] = None
) -> Dict[str, Any]:
"""
下载整个章节的图片
:param image_items: 要下载的图片项列表
:param chapter_name: 章节名称
:param manga_info: 漫画信息
:param semaphore: 限制并发下载的信号量
:param status_callback: 下载状态回调函数
:return: 下载结果统计字典
"""
manga_info = manga_item.info
chapter = manga_item.chapter
image_items = manga_item.chapter_images
if semaphore is None:
semaphore = asyncio.Semaphore(CONCURRENT_DOWNLOADS)
status = DownloadStatus(len(image_items))
failed_items = []
async with aiohttp.ClientSession(headers=DEFAULT_HEADERS, timeout=aiohttp.ClientTimeout(total=TIMEOUT, connect=TIMEOUT)) as session:
tasks = []
for image_item in image_items:
url = str(image_item.url)
save_path = DirectoryNaming.chapter_images_dir(manga_info, chapter, image_item.filename)
DirectoryNaming.ensure_dir(save_path.parent)
task = self._download_with_semaphore(semaphore, session, url, save_path, status, status_callback)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
for idx, result in enumerate(results):
if isinstance(result, Exception):
status.failed += 1
failed_items.append(image_items[idx])
logger.error(f"下载失败 {image_items[idx].url}: {str(result)}")
elif result:
status.success += 1
else:
status.failed += 1
failed_items.append(image_items[idx])
result = {
'chapter': chapter,
'total': len(image_items),
'success': status.success,
'failed': status.failed,
'failed_items': failed_items
}
logger.info(f"章节 {chapter.title} 下载完成: {status.success}/{len(image_items)} 张图片成功下载")
return result
async def _download_with_semaphore(
self,
semaphore: asyncio.Semaphore,
session: aiohttp.ClientSession,
url: str,
save_path: Path,
status: DownloadStatus,
callback: Optional[Callable] = None
) -> bool:
async with semaphore:
result = await self.download_image(session, url, save_path)
status.current += 1
if callback:
callback(status)
return result
async def download_image(self, session: aiohttp.ClientSession, url: str, save_path: Path, retries: int = RETRIES, timeout: int = TIMEOUT, use_proxy: bool = RETRY_PROXY) -> bool:
"""下载单个图片,增加重试机制、超时等待和文件缓存机制"""
if os.path.exists(FileNaming.getFileScrambleImageSave(save_path)): # 检查文件是否已存在
logger.info(f"文件已存在,跳过下载: {save_path}")
return True
# 从缓存中获取图片
cached_images = self.cache.get_image(url)
if cached_images:
with open(save_path, 'wb') as f:
f.write(cached_images)
return True
for attempt in range(retries):
try:
timeout_obj = aiohttp.ClientTimeout(total=timeout) # 设置超时
# 如果使用代理,设置代理 URL
if attempt > RETRY_PROXY_TIMES and use_proxy:
logger.info(f"使用代理: {PROXY_URL}")
session_get = session.get(url, timeout=timeout_obj, proxy=PROXY_URL)
else:
session_get = session.get(url, timeout=timeout_obj)
async with session_get as response:
if response.status == 200:
with open(str(save_path)+".downloads", 'wb') as f:
f.write(await response.read())
# 验证下载的文件是否为有效的图片
if self._is_valid_image(str(save_path)+".downloads"):
logger.info(f"成功下载: {url}")
shutil.move(str(save_path)+".downloads", save_path)
self.cache.set_image(url, await response.read())
return True
else:
logger.error(f"下载的文件无效: {save_path}")
return False
else:
logger.error(f"下载失败: {url},状态码: {response.status}")
return False
except asyncio.TimeoutError:
logger.error(f"下载超时: {url},尝试次数: {attempt + 1}")
except Exception as e:
logger.error(f"下载图片时出错: {url},错误: {str(e)}")
if attempt < retries - 1:
logger.info(f"重试下载: {url},尝试次数: {attempt + 2}")
await asyncio.sleep(1) # 等待一段时间再重试
return False
def _is_valid_image(self, file_path: Path) -> bool:
"""验证文件是否为有效的图片"""
try:
from PIL import Image
with Image.open(file_path) as img:
img.verify() # 验证图片
return True
except Exception as e:
logger.error(f"图片验证失败: {file_path},错误: {str(e)}")
return False
class CBZUtils:
def __init__(self, cbz_path: Path):
self.cbz_path = cbz_path
def get_page_count(self):
return self._comic_info_xml_page_count(self.cbz_path)
def _comic_info_xml_page_count(self, zip_file: Path):
"""获取 ComicInfo.xml 文件中的 <PageCount> 标签值"""
# 打开ZIP文件
with ZipFile(str(zip_file), 'r') as z:
try:
# 假设ZIP中的文件名是'text.txt'
with z.open('ComicInfo.xml', 'r') as file:
# 从文件流中解析 XML 数据
file_string = file.read().decode("utf-8")
# 使用正则表达式提取 <PageCount> 标签中的值
match = re.search(r"<PageCount>(\d+)</PageCount>", file_string)
if match:
page_count = match.group(1)
logger.info(f"zip_file={zip_file} PageCount: {page_count}")
return page_count
except Exception as e:
raise exit(f"获取 ComicInfo.xml 文件中的 <PageCount> 标签值失败: {zip_file},错误: {str(e)}")
def _check_zip_file(self, zip_file_path: Path):
"""检查 ZIP 文件是否包含图片"""
result = False
is_comic_info = False
if not os.path.exists(zip_file_path):
logger.info(f"ZIP 文件不存在: {zip_file_path}")
return False
try:
with ZipFile(zip_file_path, 'r') as zip_file:
file_list = zip_file.namelist()
result = any(file_name.endswith('.jpg') for file_name in file_list)
is_comic_info = any(file_name == COMIC_INFO_NAME for file_name in file_list)
if is_comic_info:
page_count = self._comic_info_xml_page_count(zip_file_path)
if len(file_list) == int(page_count) + 1:
logger.info(f"ZIP 文件 {zip_file_path} 验证成功")
result = True
else:
logger.error(f"ZIP 文件 {zip_file_path} 验证失败,文件数量与 ComicInfo.xml 中的 <PageCount> 不一致")
os.remove(zip_file_path)
if not result and os.path.exists(zip_file_path):
logger.error("ZIP 文件中没有图片")
os.remove(zip_file_path)
if not is_comic_info:
logger.error("ZIP 文件中没有 ComicInfo.xml")
os.remove(zip_file_path)
except FileNotFoundError:
logger.info(f"ZIP 文件不存在: {zip_file_path}")
except Exception as e:
logger.error(f"检查 ZIP 文件失败: {zip_file_path},错误: {str(e)}")
if os.path.exists(zip_file_path):
os.remove(zip_file_path)
return result
def _zip_compression(cls, source_dir=None, target_file=None, remove=True):
cls._check_zip_file(target_file)
if not os.path.exists(source_dir):
raise FileNotFoundError(f"打包目标目录不存在: {source_dir}")
# 检查目录中是否存在 .jpg 文件
if not any(file_name.endswith('.jpg') for file_name in os.listdir(source_dir)):
logger.error(f"打包目标目录中不存在图片: {source_dir}")
return False
target_dir = os.path.dirname(target_file)
if not os.path.exists(target_dir): os.makedirs(target_dir)
if not os.path.exists(target_file) and source_dir is not None:
try:
count = 0
filenames = sorted(list(source_dir.glob("*.jpg")) + list(source_dir.glob(COMIC_INFO_NAME)), key=lambda f: f.name) # 对文件名进行排序
with ZipFile(str(target_file), mode='w') as cbz:
for file in filenames:
# 假设图片格式为 JPG 或 ComicInfo.xml
count += 1
print("打包中:" + str(count) + "/" + str(len(filenames)), os.path.join(source_dir, file.name))
cbz.write(file, arcname=file.name)
cbz.close()
logger.info(f"打包完成:{target_file}{count} 个文件")
except Exception as e:
logger.error(f"打包失败: {target_file},错误: {str(e)}")
if os.path.exists(target_file):
os.remove(target_file)
raise e
return cls._check_zip_file(target_file)
def _image_deScrambleByPath(self, chapter_dir: Path):
if os.path.exists(chapter_dir):
dirs = os.listdir(chapter_dir)
for file in dirs:
if file.startswith(PREFIX_SCRAMBLE):
try:
ImageUtils.deScrambleImagesByPath(os.path.join(chapter_dir,file))
except Exception as e:
print(f"删除 {file} 发生错误 {e},已跳过")
return False
def create_cbz(self, chapter_dir: Path):
if os.path.exists(chapter_dir):
dirs = os.listdir(chapter_dir)
for file in dirs:
if file.startswith(PREFIX_SCRAMBLE):
try:
ImageUtils.deScrambleImagesByPath(os.path.join(chapter_dir,file))
except Exception as e:
print(f"删除 {file} 发生错误 {e},已跳过")
return False
if self._zip_compression(source_dir=chapter_dir, target_file=self.cbz_path, remove=False):
logger.info(f"章节 {chapter_dir.name} 打包完成: {self.cbz_path}")
else:
raise exit(f"章节 {chapter_dir.name} 打包失败: {self.cbz_path}")
def update_zip_file(self,zip_path: str, update_files: dict):
"""
不整体解压的情况下更新 ZIP 中的文件
参数
- zip_path: ZIP文件路径
- update_files: 需更新的文件字典 {内部路径: 新文件路径或bytes}
示例
update_zip_file("data.zip", {"config.json": "new_config.json"})
"""
# 创建临时文件
temp_dir = os.path.dirname(zip_path)
with NamedTemporaryFile(dir=temp_dir, delete=False) as tmp_file:
temp_zip_path = tmp_file.name
try:
# 读取原始 ZIP 并创建新 ZIP
with ZipFile(zip_path, 'r') as orig_zip, \
ZipFile(temp_zip_path, 'w', ZIP_DEFLATED) as new_zip:
# 遍历原始 ZIP 中的文件
for orig_info in orig_zip.infolist():
file_name = orig_info.filename
if file_name in update_files:
# 替换目标文件
new_data = update_files[file_name]
if isinstance(new_data, bytes):
new_zip.writestr(file_name, new_data)
else:
new_zip.write(new_data, file_name)
# 保留原始时间戳
new_info = new_zip.getinfo(file_name)
new_info.date_time = orig_info.date_time
else:
# 复制未修改文件
with orig_zip.open(orig_info) as orig_file:
new_zip.writestr(orig_info, orig_file.read())
# 替换原文件
shutil.move(temp_zip_path, zip_path)
finally:
if os.path.exists(temp_zip_path):
os.remove(temp_zip_path)
# 使用示例 ------------------------------
#if __name__ == "__main__":
# 示例1用本地文件替换 ZIP 中的文件
# update_zip_file("archive.zip", {
# "docs/readme.txt": "new_readme.txt" # 本地文件路径
# })
# # 示例2直接写入字节数据
# new_config = b'{"version": 2.0, "active": true}'
# update_zip_file("data.zip", {
# "config.json": new_config # 字节数据
# })
class ImageUtils:
@classmethod
def descramble_images_by_dir(cls, chapter_dir):
if os.path.isfile(chapter_dir):
chapter_dir = os.path.dirname(chapter_dir)
scramble_count = 0
if os.path.exists(chapter_dir): #获取章节图片路径
while any(f.startswith(PREFIX_SCRAMBLE) for f in os.listdir(chapter_dir)):
for img in os.listdir(chapter_dir):
if img.startswith(PREFIX_SCRAMBLE):
cls.encode_scramble_image(os.path.join(chapter_dir, img))
scramble_count += 1
logging.debug(f"{PREFIX_SCRAMBLE} {scramble_count}")
return scramble_count
@classmethod
def deScrambleImagesByPath(cls, img_path, img_save=None):
if os.path.basename(img_path).\
startswith(PREFIX_SCRAMBLE) and os.path.exists(img_path):
img_path = cls.encode_scramble_image(img_path, img_save)
return img_path
@classmethod
def encodeImage(cls,str_en):
#print("en",str_en)
enc = base64.b64decode(str_en)
#print("解密:",enc)
m = hashlib.md5()
m.update(enc)
md5 = m.digest()
d = md5[-1]
#print(md5)
try:
blocks = d % 10 + 5
except:
blocks = 0 %10 + 5
#print("blocks=",blocks)
return blocks
@classmethod
def scrambleImage(cls,file_path):
#检测到未下载完的图像 直接返回None
if str(file_path).endswith(".downloads"):
os.remove(file_path)
return None
file_str = str(file_path).split("=")
#10_29.jpg
base_dir = file_str[0].replace("scramble","")
base_name = file_str[-1]
base_fn = base_name.split("_")
save_name = base_fn[1]
save_name_delesu = save_name.split(".")[0]
blocks = int(base_fn[0])
save_file_path = os.path.join(base_dir,save_name)
print("sva",save_file_path)
if os.path.exists(save_file_path):
print("图片已解密,已跳过:", save_file_path)
return None
image_su = str(file_path).split(".")[-1]
try:
img = Image.open(file_path)
except Exception:
print(f"error Image: {file_path}")
return None
width = img.width
height = img.height
#blocks = cls.encodeImage(enStr)
print("blocks=",blocks)
block_height = int(height / blocks)
block_width = int(width / blocks)
print("blockHeight=",block_height)
suffix = str(file_path).split(".")[-1]
split_path = os.path.join(base_dir,save_name_delesu+"split")
if image_su == "downloads":
return None
is_split = cls.splitimage(file_path,blocks,1,split_path)
if is_split != None:
cls.image_compose(split_path,blocks,1,save_file_path,block_height,width)
else:
if os.path.exists(split_path):
shutil.rmtree(split_path)
if os.path.exists(file_path):
shutil.move(file_path, save_file_path)
#完成后清空
return file_path
@classmethod
def splitimage(cls,src,rownum,colnum,dstpath):
img=Image.open(src)
w,h=img.size
if rownum<= h and colnum<=w:
s=os.path.split(src)
if dstpath=='':
dstpath = s[0]
if not os.path.exists(dstpath):
os.makedirs(dstpath)
fn=s[1].split('.')
basename=fn[0]
ext=fn[-1]
num=0
rowheight=h//rownum
colwidth=w//colnum
for r in range(rownum):
for c in range(colnum):
box=(c*colwidth,r*rowheight,(c+1)*colwidth,(r+1)*rowheight)
count_image = "{:0>3d}".format(num)
file_path = os.path.join(dstpath,str(count_image)+'.'+ext)
print("file_path=",file_path)
img.crop(box).save(file_path)
num=num+1
return "成功"
else:
print('切割行列数超过图片尺寸,无法切割!')
return None
@classmethod
def image_compose(cls,src,row,column,save_path,image_height,image_width):
image_size = image_height
#image_height = 376
#image_width = 720
images_format = ['.png','.jpg']
#image_names = [name for name in os.listdir(src) for item in images_format if
# os.path.splitext(name)[1] == item][::-1]
img_list=os.listdir(src)
img_list.sort()
img_list.sort(key=lambda x: int(x[:-4]))
##文件名按数字排序
img_nums=len(img_list)
image_names = []
for i in range(img_nums):
img_name=os.path.join(src,img_list[i])
image_names.append(img_name)
#使用倒序
image_names = image_names[::-1]
# 简单的对于参数的设定和实际图片集的大小进行数量判断
if len(image_names) < row * column:
raise ValueError("合成图片的参数和要求的数量不能匹配!")
to_image = Image.new('RGB', (column * image_width, row * image_height)) #创建一个新图
# 循环遍历,把每张图片按顺序粘贴到对应位置上
for y in range(1, row + 1):
for x in range(1, column + 1):
#1 * (row=1 -1) col=1 -1
image_path = image_names[column * (y - 1) + x - 1]
print("split_image=",image_path)
from_image = Image.open(image_path)
#保持原图片大小
#.resize(
# (image_size, image_size),Image.ANTIALIAS)
to_image.paste(from_image, ((x - 1) * image_size, (y - 1) * image_size))
from_image.close()
to_image.save(save_path)
print("图片合并完成:", save_path)
shutil.rmtree(src)
# 保存新图
@classmethod
def getScrambleImage(cls,path):
scramble_file_cache = cls.scrambleImage(path)
if scramble_file_cache != None and os.path.exists(scramble_file_cache): os.remove(scramble_file_cache)
@classmethod
def encode_scramble_image(cls, img_path, img_save=None):
if not os.path.exists(img_path):
return
image = Image.open(img_path)
w, h = image.size
#image.show()
file_str = str(img_path).split("=")
#10_29.jpg
base_fn = file_str[-1].split("_")
blocks = int(base_fn[0])
if img_save == None:
save_path = FileNaming.getFileScrambleImageSave(img_path)
else: save_path = img_save
# print(type(aid),type(img_name))
if blocks:
s = blocks # 随机值
# print(s)
l = h % s # 切割最后多余的值
box_list = []
hz = 0
for i in range(s):
c = math.floor(h / s)
g = i * c
hz += c
h2 = h - c * (i + 1) - l
if i == 0:
c += l;hz += l
else:
g += l
box_list.append((0, h2, w, h - g))
# print(box_list,len(box_list))
item_width = w
# box_list.reverse() #还原切图可以倒序列表
# print(box_list, len(box_list))
newh = 0
image_list = [image.crop(box) for box in box_list]
# print(box_list)
newimage = Image.new("RGB", (w, h))
for image in image_list:
# image.show()
b_w, b_h = image.size
newimage.paste(image, (0, newh))
newh += b_h
newimage.save(save_path)
logging.info(f"解密成功 {save_path}")
if os.path.exists(img_path):
os.remove(img_path)
logging.debug(f"remove {img_path}")
return save_path
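The slicing arithmetic above is the heart of the descramble step: the height is divided into "blocks" strips of floor(h / s) pixels, the h % s remainder goes to the first strip, and the crop boxes walk the scrambled image from the bottom up before being pasted back top-down. A minimal, PIL-free sketch (not part of this commit) that reproduces only the box computation for a hypothetical 720x1003 image makes the numbers easy to check:

import math

def descramble_boxes(w, h, s):
    """Crop boxes equivalent to the loop in encode_scramble_image above."""
    l = h % s  # leftover pixels that do not divide evenly into s strips
    box_list = []
    for i in range(s):
        c = math.floor(h / s)               # base strip height
        h2 = h - c * (i + 1) - l            # upper edge of strip i in the scrambled image
        g = i * c if i == 0 else i * c + l  # offset of the lower edge from the bottom
        box_list.append((0, h2, w, h - g))  # (left, upper, right, lower), PIL crop order
    return box_list

if __name__ == "__main__":
    # hypothetical 720x1003 image cut into 10 blocks: the first box is 103 px tall
    # (100 + the 3 leftover pixels), the remaining nine are exactly 100 px each
    for box in descramble_boxes(720, 1003, 10):
        print(box, "height:", box[3] - box[1])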

37
src/config.py Normal file
View File

@ -0,0 +1,37 @@
import logging
from pathlib import Path
from fake_useragent import UserAgent
# base settings
BASE_DIR = Path("output")
CACHE_DIR = Path(".cache")
CACHE_IMAGE_DIR = CACHE_DIR / "images"
CBZ_DIR = Path("CBZ")
OLD_CBZ_DIR = Path("OldCBZ")
# DEFAULT_SAVE_DIR = Path("output")
CONCURRENT_DOWNLOADS = 10
RETRY_TIMES = 10
RETRY_PROXY = False
# number of retries after a download failure
RETRY_PROXY_TIMES = 1
RETRIES = 15
TIMEOUT = 60
COMIC_INFO_NAME = "ComicInfo.xml"
XSD_FILE = "src/assets/ComicInfo_2.1.xsd"
# proxy settings
PROXY_URL = "http://47.98.225.49:9890"
# logging settings
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_LEVEL = logging.INFO
# HTTP settings
USER_AGENT = UserAgent().random
DEFAULT_HEADERS = {
'User-Agent': USER_AGENT
}
# file naming / types
IMAGES_NAME_FORMAT = "{:0>3d}"
DEFAULT_IMAGE_EXT = '.jpg'
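Two notes on this module: USER_AGENT is drawn once at import time, so every request in a single run reuses the same random agent, and IMAGES_NAME_FORMAT together with DEFAULT_IMAGE_EXT describes how page files are named. A tiny illustration of that composition (page_filename is a hypothetical helper, the real logic lives in the naming utilities under src/common):

IMAGES_NAME_FORMAT = "{:0>3d}"  # same format string as above
DEFAULT_IMAGE_EXT = '.jpg'

def page_filename(index: int) -> str:
    # zero-pad the page index to three digits: 1 -> "001.jpg", 42 -> "042.jpg"
    return IMAGES_NAME_FORMAT.format(index) + DEFAULT_IMAGE_EXT

assert page_filename(1) == "001.jpg"
assert page_filename(42) == "042.jpg"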

215
src/sites/base.py Normal file
View File

@ -0,0 +1,215 @@
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, AsyncGenerator
from pathlib import Path
import aiohttp,os,shutil
import asyncio
import logging
from src.config import DEFAULT_HEADERS, TIMEOUT, RETRIES, PROXY_URL, RETRY_PROXY
from lxml import etree
from src.common.utils import Cache # 导入缓存类
from src.common.item import Chapter, MangaItem, MangaInfo,CoverItem
from src.common.exceptions import SiteError, NetworkError, ParseError
from src.common.logging import setup_logging
from src.common.naming import DirectoryNaming,FileNaming
from src.common.ComicInfo import ComicInfo, ImageInfo
logger = setup_logging(__name__)
class BaseSite(ABC):
"""漫画网站基类"""
def __init__(self):
self.session: Optional[aiohttp.ClientSession] = None
self.headers = DEFAULT_HEADERS.copy()
self.cache = Cache() # initialise the cache
async def __aenter__(self):
self.session = aiohttp.ClientSession(headers=self.headers)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def _get(self, url: str, retries: int = RETRIES, PROXY: bool = RETRY_PROXY) -> str:
"""发送GET请求并处理错误"""
# 尝试从缓存中获取 HTML 内容
cached_html = self.cache.get(url)
if cached_html:
logger.info(f"从缓存中获取 HTML 内容: {url}")
return cached_html
for attempt in range(retries):
try:
if PROXY:
proxy = PROXY_URL
else:
proxy = None
async with self.session.get(str(url), proxy=proxy) as response:
if response.status == 200:
html = await response.text()
self.cache.set(url, html) # store the HTML in the cache
return html
elif response.status == 404:
raise SiteError(f"页面不存在: {url}")
elif response.status == 403:
raise SiteError(f"访问被拒绝: {url}")
else:
raise NetworkError(f"HTTP错误 {response.status}: {url}")
except aiohttp.ClientError as e:
if attempt == retries - 1:
raise NetworkError(f"网络错误: {str(e)}")
logger.info(f"{attempt + 2} 次重试, 网站: {url}")
await asyncio.sleep(2 * (attempt + 1))
@abstractmethod
async def get_chapter_images(self, chapter_url: str) -> List[str]:
"""获取章节所有图片URL"""
pass
#@abstractmethod
async def get_manga_info(self, manga_url: str) -> Dict[str, str]:
"""获取漫画信息"""
try:
html = await self._get(manga_url)
tree = etree.HTML(html)
return self.extractor.extract_manga_info(tree)
except Exception as e:
if isinstance(e, (ParseError, SiteError)):
raise exit(f"解析漫画信息失败: {str(e)}")
raise ParseError(f"解析漫画信息失败: {str(e)}")
#@abstractmethod
#async def get_chapter_list(self, info: MangaInfo) -> List[Dict[str, str]]:
# """获取漫画章节列表"""
# pass
async def get_chapter_list(self, manga_info: MangaInfo) -> List[Dict[str, str]]:
"""获取章节列表"""
try:
# result_type list[Chapter]
list_chapter = manga_info.get_list_chapter()
down_chapter = []
for chapter in list_chapter:
cbz_path = FileNaming.chapter_cbz(manga_info=manga_info,chapter=chapter)
old_cbz_path = FileNaming.old_chapter_cbz(manga_info=manga_info,chapter=chapter)
if os.path.exists(cbz_path):
logger.info(f"{chapter.title} 章节已存在")
chapter.status = "downloaded"
if os.path.exists(old_cbz_path):
logger.info(f"{chapter.title} Old章节存在")
if not os.path.exists(os.path.dirname(cbz_path)): os.makedirs(os.path.dirname(cbz_path))
shutil.copy(old_cbz_path, cbz_path)
logger.info(f"{old_cbz_path} ==> {cbz_path} 已复制")
chapter.status = "downloaded"
down_chapter.append(chapter)
return down_chapter
except Exception as e:
if isinstance(e, (ParseError, SiteError)):
raise
raise ParseError(f"解析章节列表失败: {str(e)}")
async def update_covers(self, manga_info : MangaInfo) -> List[CoverItem]:
"""Keep the covers in the Icons folder in sync with the cached cover"""
cache_cover = { 'path' : str(DirectoryNaming.manga_cover_dir(manga_info, cache=True)) }
cover_img = { 'path' : str(DirectoryNaming.manga_cover_dir(manga_info, cache=False)) }
cache_cover_item = CoverItem(**cache_cover)
icons_dir = os.path.dirname(cover_img['path'])
if not os.path.exists(icons_dir): os.makedirs(icons_dir)
list_cover = []
is_update = 0
try:
for file in os.listdir(icons_dir):
if file.lower().endswith(".jpg"):
file_cover = {'path' : os.path.join(icons_dir, file)}
f_item = CoverItem(**file_cover)
list_cover.append(f_item)
if f_item.md5 == cache_cover_item.md5: is_update += 1
if is_update == 0:
new_cover = { 'path' : FileNaming.cover_format_path(cover_img["path"]) }
shutil.copy(cache_cover["path"], new_cover["path"])
list_cover.append(CoverItem(**new_cover))
except Exception:
raise exit("Cover 检测异常")
return list_cover
async def update_cbz_covers(self, manga_info : MangaInfo):
"""更新CBZ漫画的Cover"""
cbz_dir = DirectoryNaming().chapter_cbz_dir(manga_info=manga_info)
list_cbz = list(FileNaming().get_filenames_optimized(cbz_dir, ext_filter=[".cbz"]))
list_cover = await self.update_covers(manga_info)
for cbz_path in list_cbz:
first_cover_path = str(cbz_path).split(".")[0]+".jpg"
if len(list_cover) == 1:
shutil.copy(list_cover[0].path, first_cover_path)
logger.info(f"{list_cover[0].path} ==> {first_cover_path} 已复制")
continue
cover_count = 1
for cover in list_cover:
cover_path = cover.path
if os.path.exists(first_cover_path): os.remove(first_cover_path)
new_cover_path = FileNaming().cover_format_path(str(cbz_path).split(".")[0]+".jpg", count=cover_count)
shutil.copy(cover_path, new_cover_path)
logger.info(f"{cover_path} ==> {new_cover_path} 已复制")
cover_count += 1
async def download_manga(self, manga_url: str) -> AsyncGenerator[Dict, None]:
"""下载整部漫画"""
try:
# fetch the manga information
info = await self.get_manga_info(manga_url)
yield {'type': 'info', 'data': info, 'item': info}
# fetch the chapter list
chapters = await self.get_chapter_list(info)
yield {'type': 'chapters', 'data': chapters, 'item': info}
# download the cover
yield {'type': 'cover', 'item': info}
covers = await self.update_covers(info)
# download every chapter
for chapter in chapters:
try:
if chapter.status == "downloaded":
logger.info(f"{chapter.title} 章节已下载")
continue
images = await self.get_chapter_images(chapter.url)
manga_item = MangaItem(
info=info,
covers=covers,
chapter=chapter,
chapter_images=images,
chapters=chapters
).get_item()
yield {
'type': 'chapter',
'chapter': str(chapter.title),
'images': images,
'item': manga_item
}
except Exception as e:
yield {
'type': 'error',
'chapter': chapter,
'error': str(e)
}
continue
# runs once all chapters have been processed
await self.update_cbz_covers(info)
except Exception as e:
yield {'type': 'error', 'error': str(e)}
async def get_manga_list(self, manga_url: str) -> List[Dict[str, str]]:
"""获取漫画列表"""
try:
html = await self._get(manga_url)
tree = etree.HTML(html)
return self.extractor.extract_manga_list(tree)
except Exception as e:
if isinstance(e, (ParseError, SiteError)):
raise exit(f"解析漫画信息失败: {str(e)}")
raise ParseError(f"解析漫画信息失败: {str(e)}")

58
src/sites/configs/rouman.py Normal file
View File

@ -0,0 +1,58 @@
import base64,re
import zlib
import json
from typing import List, Dict
from lxml import etree
from src.sites.base import BaseSite
from src.common.loader import ConfigLoader
from src.common.extractor import Extractor
from src.common.exceptions import ParseError, SiteError
from src.common.item import Chapter,MangaInfo,ImageItem
from src.common.naming import FileNaming
class RoumanSite(BaseSite):
def __init__(self):
super().__init__()
self.config = ConfigLoader.load_config('rouman')
self.headers.update(self.config.headers)
self.extractor = Extractor(self.config)
async def get_chapter_images(self, chapter_url: str) -> List[str]:
"""获取章节图片URL列表"""
try:
html = await self._get(chapter_url)
tree = etree.HTML(html)
image_urls_str = []
for data_json in tree.xpath('//script/text()'):
data_json = data_json.replace('\\', '')
if "imageUrl" in data_json:
image_urls_str = re.findall(r'"imageUrl":"(https?://[^"]+)"', data_json)
# parse the data obtained from the page scripts and collect the image entries
# the regular expression matches the image links
# build the image item list from the extracted links
image_urls = []
count = 0
for link in image_urls_str:
count += 1
sr_value = re.search(r'sr:(\d+)', link)
# read the sr flag extracted from the link (1 means the image is scrambled)
if sr_value:
sr = sr_value.group(1) # group(1) returns the first capture group, i.e. the digits
else:
print("No match found")
sr = "0" # no sr flag found, assume the image is not scrambled
if str(sr) == "1":
de_str = str(link).split("/")[-1].split(".")[0]+"=="
blocks_num = FileNaming.encodeImage(de_str)
image_urls.append(ImageItem(url=link, scramble=sr.replace("0", "False").replace("1", "True"), filename=FileNaming.getFileScrambleImageName(count,blocks_num)))
else:
image_urls.append(ImageItem(url=link, scramble=sr.replace("0", "False").replace("1", "True"), filename=FileNaming.getFileScrambleImageName(count)))
if not image_urls:
raise ParseError("未找到图片URL")
return image_urls
except Exception as e:
if isinstance(e, (ParseError, SiteError)):
raise
raise ParseError(f"解析章节失败: {str(e)}")

View File

@ -0,0 +1,46 @@
project: rm_comic
name: 肉漫屋
domain: rouman5.com
base_url: https://rouman5.com
selectors:
manga_list:
title: '//div[@class="truncate text-foreground"]/text()'
url: '//main//div[@class="grid grid-cols-1 sm:grid-cols-4 md:grid-cols-6 gap-2 sm:gap-4"]//a/@href'
manga_info:
title: '//div[@class="basis-3/5 text-sm sm:text-base"]//div[@class="text-xl text-foreground"]/text()'
author:
selector: '//div[@class="basis-3/5 text-sm sm:text-base"]//span[@class="text-foreground"]/text()'
index: 0
description:
selector: '//div[@class="my-2 text-foreground text-sm sm:text-base"]/p/text()'
index: 1
cover: '//div[@class="flex flex-row gap-3 sm:gap-4"]//div[@class="basis-2/5"]/img[@class="rounded"]/@src'
#status: .book-detail dl dt:contains("状态") + dd
tags:
selector: '//div[@class="basis-3/5 text-sm sm:text-base"]//span[@class="text-foreground"]/text()'
index: 3
# date: '//div[@class="text-gray-500 text-sm mt-2"]/div/text()'
genre:
value: "韩漫"
age_rating:
value: "R18+"
chapter_link: '//div[@class="grid grid-cols-1 sm:grid-cols-2 md:grid-cols-3 gap-2 px-2 py-4"]//a/@href'
chapters_name: '//main//div[@class="text truncate bg-muted p-2 hover:bg-primary/10"]/text()'
chapter_list:
container: '//main//div[@class="text truncate bg-muted p-2 hover:bg-primary/10"]/text()'
title: text
url:
attribute: '//div[@class="grid grid-cols-1 sm:grid-cols-2 md:grid-cols-3 gap-2 px-2 py-4"]//a/@href'
process: join_base_url
chapter:
image_data:
pattern: window\[".*?"\]\s*=\s*"([^"]+)"
decrypt: true
process:
- base64_decode
- zlib_decompress
- json_parse
image_url_template: https://i.hamreus.com{path}
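The ConfigLoader and Extractor that consume this file live in src/common and are not shown in this excerpt; conceptually each selector is an XPath evaluated against the parsed page, with join_base_url prefixing base_url onto relative links. A rough sketch of what the manga_list selectors amount to with plain lxml (the function is illustrative, not the real Extractor API):

from lxml import etree

def extract_manga_list(html: str):
    tree = etree.HTML(html)
    titles = tree.xpath('//div[@class="truncate text-foreground"]/text()')
    urls = tree.xpath('//main//div[@class="grid grid-cols-1 sm:grid-cols-4 '
                      'md:grid-cols-6 gap-2 sm:gap-4"]//a/@href')
    # join_base_url: relative hrefs such as /books/... get the base_url prefixed
    return [(t, "https://rouman5.com" + u) for t, u in zip(titles, urls)]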

199
src/sites/manager.py Normal file
View File

@ -0,0 +1,199 @@
from pathlib import Path
from typing import Dict, Type, Optional
import logging
from src.config import BASE_DIR
from src.sites.base import BaseSite
from src.sites.configs.rouman import RoumanSite
from src.common.utils import MangaDownloader, CBZUtils
from src.common.naming import DirectoryNaming, FileNaming
from src.common.exceptions import MangaException
from src.common.item import MangaItem, MangaInfo
from src.common.logging import setup_logging
from src.common.ComicInfo import ComicInfoXml
logger = setup_logging(__name__)
class MangaManager:
"""漫画下载管理器"""
SITE_MAP: Dict[str, Type[BaseSite]] = {
# 'manhuagui.com': ManhuaguiSite,
'roum20.xyz': RoumanSite,
'rouman5.com': RoumanSite,
# add support for more sites here
}
def __init__(self, base_dir: Path = BASE_DIR):
self.downloader = MangaDownloader(base_dir)
def get_site_handler(self, url: str) -> Optional[Type[BaseSite]]:
"""根据URL获取对应的网站处理器"""
for domain, handler in self.SITE_MAP.items():
if domain in url:
return handler
return None
async def process_manga(
self,
url: str,
volume_num: int = 1,
status_callback = None
):
"""处理漫画下载"""
# 获取网站处理器
site_handler = self.get_site_handler(url)
if not site_handler:
raise MangaException(f"不支持的网站: {url}")
async with site_handler() as site:
# download the whole manga
async for result in site.download_manga(url):
if result['type'] == 'info':
manga_info = result['data']
logger.info(f"漫画信息: {manga_info}")
# store the data in a MangaItem
manga_item = MangaItem(info=manga_info, chapters=[])
manga_name = manga_info.title
# create the naming strategy (currently disabled)
#self.manga_path = NamingStrategy.manga_volume_path(
# manga_name,
# volume_num=volume_num
#)
#self.manga_filename = NamingStrategy.custom_manga_filename(
# prefix="page",
# digits=3
#)
elif result['type'] == 'chapters':
chapters = result['data']
total = 0
for chapter in chapters:
if not chapter.status == "downloaded":
total += 1
total_chapters = total
logger.info(f"找到 {total_chapters} 个章节")
manga_item.chapters.extend(chapters) # 添加章节到 MangaItem
yield {
'type': 'progress',
'total_chapters': total_chapters
}
elif result['type'] == 'cover':
await self.downloader.download_cover(manga_info)
yield {
'type': 'cover_complete',
'item': manga_item
}
elif result['type'] == 'chapter':
manga_item = result['item']
chapter = manga_item.chapter
# create the working directory for the chapter images
chapter_dir = DirectoryNaming.chapter_images_dir(manga_info, chapter)
DirectoryNaming.ensure_dir(chapter_dir)
try:
# download the chapter
download_result = await self.downloader.download_chapter(
manga_item,
#filename_generator=self.manga_filename,
#path_generator=self.manga_path,
status_callback=status_callback
)
# post-download processing for the chapter: start
# generate ComicInfo.xml once all images have downloaded successfully
if int(download_result['success']) == int(download_result['total']):
cbz_path = FileNaming.chapter_cbz(manga_info, chapter)
# descramble the images
CBZUtils(cbz_path)._image_deScrambleByPath(chapter_dir)
ComicInfoXml().scrapy_xml_by_json(manga_item.get_comic_info_json(), chapter_dir)
# pack the chapter into a CBZ file
CBZUtils(cbz_path).create_cbz(chapter_dir)
# post-download processing for the chapter: end
yield {
'type': 'chapter_complete',
'chapter': chapter,
'result': download_result
}
except Exception as e:
logger.error(f"下载章节 {chapter['title']} 失败: {str(e)}")
yield {
'type': 'chapter_error',
'chapter': chapter,
'error': str(e)
}
elif result['type'] == 'error':
logger.error(f"错误: {result['error']}")
yield {
'type': 'error',
'error': result['error']
}
@staticmethod
def print_progress(status):
"""打印下载进度"""
progress_bar_length = 30 # 进度条长度
progress = int(status.progress * progress_bar_length)
bar = '#' * progress + '-' * (progress_bar_length - progress)
print(f"\r下载进度: |{bar}| {status.current}/{status.total} "
f"({status.progress:.1%})", end="")
async def download_list_manga(self, manga_url: str):
# resolve the site handler for this URL
list_site_handler = self.get_site_handler(manga_url)
if not list_site_handler:
raise MangaException(f"不支持的网站: {manga_url}")
async with list_site_handler() as site:
manga_list = await site.get_manga_list(manga_url)
for title,url in zip(manga_list.title, manga_list.url):
print(title,url)
logger.info(f"开始下载 漫画: {title}")
logger.info(f"{url}")
await self.download_manga(str(url))
@classmethod
async def download_manga(cls, url: str, save_dir: Path = BASE_DIR):
"""下载漫画"""
manager = MangaManager(save_dir)
try:
total_chapters = 0
completed_chapters = 0
async for result in manager.process_manga(url, status_callback=cls.print_progress):
if result['type'] == 'progress':
total_chapters = result['total_chapters']
logger.info(f"开始下载,共 {total_chapters}")
elif result['type'] == 'chapter_complete':
completed_chapters += 1
chapter_result = result['result']
if chapter_result['failed']:
logger.warning(
f"章节 {result['chapter']} 完成: "
f"{chapter_result['success']}/{chapter_result['total']} 张图片成功, "
f"{chapter_result['failed']} 张失败"
)
else:
logger.info(f"章节 {result['chapter']} 完成")
print(f"\n总进度: {completed_chapters}/{total_chapters}")
elif result['type'] == 'chapter_error':
logger.error(f"章节 {result['chapter']} 下载失败: {result['error']}")
elif result['type'] == 'error':
logger.error(f"下载出错: {result['error']}")
except MangaException as e:
logger.error(f"下载失败: {str(e)}")
except Exception as e:
logger.error(f"未知错误: {str(e)}")