ComicScrapy/Comics/pipelines.py
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# useful for handling different item types with a single interface
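#
# A minimal registration sketch for settings.py (the priority numbers below
# are assumptions for illustration, not taken from this project's settings):
#
# ITEM_PIPELINES = {
#     "Comics.pipelines.ComicsPipeline": 300,
#     "Comics.pipelines.ImgDownloadPipeline": 310,
# }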
import logging
import os
import shutil

import scrapy

from Comics import settings
from Comics.settings import IMAGES_STORE
from Comics._utils.items import ComicItem
from Comics._utils.loader import ComicLoader
from Comics._utils.utils import CBZUtils, fileUtils as fu
from Comics._utils.utils import ComicPath
from Comics._utils.utils import oldUtils
from Comics._utils.exporters import JsonExport, ItemExporter
from scrapy.pipelines.images import ImagesPipeline
from Comics._utils.ComicInfo import ComicInfoXml
from Comics._utils.downloader import download_images


class ComicsPipeline:
    # Files smaller than 500 KiB are treated as broken downloads
    def remove_min_file(self, file_path, min_size=500 * 1024):
        try:
            # os.path.getsize() returns the file size in bytes
            file_size = os.path.getsize(file_path)
            if file_size < min_size:
                os.remove(file_path)
                logging.info(f"Removed broken file: {file_path}")
        except Exception as e:
            print(f"Failed to get file size: {e}")

    # `item` is the object yielded by the spider
    def process_item(self, item: ComicItem, spider):
        """Parse the item passed in from the spider, serialize it, and
        pass the result on."""
        if isinstance(item, ComicItem):
            result_item = None
            # The comic's CBZ file already exists: export the item directly
            if fu.exists(ComicPath(item).PATH_CBZ()):
                result_item = ItemExporter().export_obj(item)
            # No CBZ file yet: serialize the loaded item to the chapter's JSON file
            else:
                result_item = JsonExport(
                    file=ComicPath(item).getDirJosnComicChapter()
                ).export_json(ComicLoader(item).load_item(), if_return=True)
            cbz_path = ComicPath(item=item).PATH_CBZ()
            if not fu.exists(cbz_path):
                return result_item
            # A CBZ exists: remove it if it is too small to be valid
            else:
                self.remove_min_file(cbz_path)
        return None


class BaseImagesPipeline(ImagesPipeline):
    def image_scramble_exits(self, item, image_path):
        en_image_path = ComicPath(item).getFileScrambleImageSave(image_path, relative="fullpath")
        return fu.exists(fu.join(settings.IMAGES_STORE, self.get_file_path(item, en_image_path)))

    def get_file_path(self, item, file=None, result_type="image"):
        return ComicPath(item).file_path(file=file, result_type=result_type)
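
    # Scrapy calls file_path() once per queued download request; the target
    # path is precomputed in get_media_requests() and carried in
    # request.meta['path'].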
    # Cover path
    def file_path(self, request, response=None, info=None, *, item=None):
        return request.meta['path']

    # Decide whether the cover needs to be updated
    def update_icon(self, item):
        # Cached cover path after download
        image_path = self.get_file_path(item, result_type="down_cache_icon")
        # Final cover save path
        save_path = self.get_file_path(item=item, result_type="down_icon")
        fu.update_icon(image_path, save_path)
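
    # `results` is the list Scrapy's media pipeline hands to item_completed():
    # one (success, info) two-tuple per media request.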
    def success_completed(self, item, results):
        is_success = False
        fail_data = []
        for result in results:
            # Collect the failed downloads
            if not result[0]:
                fail_data.append(result[1])
        if len(fail_data) == 0 and len(results) != 0:
            is_success = True
        return is_success


class ImgDownloadPipeline(BaseImagesPipeline):
    def get_media_requests(self, item, info):
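        # Scrapy calls this once per item; every Request yielded here is
        # downloaded by the images pipeline and stored at the path returned
        # by file_path() above.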
        comic = ComicLoader(item=item)
        # Get the images that need to be parsed and downloaded
        images_item = comic.parse_images()
        downloaded_images = []
        for image_item in images_item:
            image_url, image_path = image_item["image_url"], image_item["image_path"]
            if image_item["image_type"] == "Icon":
                image_path = super().get_file_path(item, result_type="icon_cache")
            # The image (including its scrambled variant) already exists
            # if super().image_scramble_exits(item, image_path):
            if os.path.exists(image_path):
                logging.info(f"file exists: IMAGE_STORE {image_path}")
                downloaded_images.append(image_path)
                logging.info(f"images count= {len(images_item)} downloaded_images_count= {len(downloaded_images)}")
            else:
                logging.info(f"downloading {image_url} --> IMAGE_STORE {image_path}")
                yield scrapy.Request(url=image_url, meta={'path': image_path})

    # Pack the CBZ cover
    def pack_icon(self, item):
        cbz_icon = super().get_file_path(item=item, result_type="cbz_icon")
        dwn_icon = super().get_file_path(item=item, result_type="down_icon")
        base_dir = fu.dirname(dwn_icon)
        name = fu.basename(dwn_icon).split(".")[0]
        for dirname in os.listdir(base_dir):
            path = fu.join(base_dir, dirname)
            if os.path.isfile(path) and dirname.startswith(name):
                fu.update_icon(path, cbz_icon)

    def download_validate(self, item):
        """Return True if every expected image already exists on disk.

        Args:
            item: the ComicItem to validate.

        Returns:
            bool: True when all images are present, otherwise False.
        """
        comic = ComicLoader(item=item)
        # Get the images that need to be parsed and downloaded
        images_item = comic.parse_images()
        exist_images = []
        for image_item in images_item:
            image_path = os.path.join(IMAGES_STORE, image_item["image_path"])
            if image_item["image_type"] == "Image" and os.path.exists(image_path):
                exist_images.append(image_path)
        if len(comic.get_image_urls()) == len(exist_images) and len(exist_images) != 0:
            return True
        return False

    def download_done(self, item):
        # super().update_icon(item)
        cbz_path = super().get_file_path(item, result_type="cbz")
        chapter_dir = ComicPath(item=item).file_path(result_type=ComicPath().MAPPING_IMAGES_DIR)
        # images_file = oldUtils().old_images(folder=chapter_dir)
        # images_urls = ComicLoader(item=item).get_image_urls()
        # Verify the data is correct
        # if len(images_file) != len(images_urls) or len(images_urls) == 0: return
        super().update_icon(item)
        # Does the CBZ file already exist?
        if fu.exists(cbz_path):
            logging.info(f"file exists {cbz_path}")
            # self.update_icon(item)
            chapter = os.path.basename(cbz_path).split(".")[0]
            if os.path.exists(chapter_dir) and chapter in chapter_dir:
                print(f"Removing leftover chapter directory {chapter_dir}")
                shutil.rmtree(chapter_dir)
            self.pack_icon(item)
        else:
            # Generate the ComicInfo.xml metadata
            ComicInfoXml().scrapy_xml_by_json(item, save_dir=chapter_dir)
            if CBZUtils.comicChapterCBZPack(src_dir=chapter_dir, dts_path=cbz_path, remove=True):
                super().update_icon(item)
                self.pack_icon(item)

    def down_image(self, item):
        """Download any missing chapter images so CBZ packing can start.

        No data-completeness check is done here; CBZUtils validates the
        data itself later on. Already-downloaded images are skipped.

        Args:
            item: the Comic item data.
        """
        comic = ComicLoader(item=item)
        # Get the images that need to be parsed and downloaded
        images_item = comic.parse_images()
        downloaded_images = []
        down_queue = []
        for image_item in images_item:
            image_url, image_path = image_item["image_url"], image_item["image_path"]
            if image_item["image_type"] == "Image":
                is_next = not super().image_scramble_exits(item, image_path)
                # The image (including its scrambled variant) already exists
                if not is_next:
                    logging.info(f"file exists: IMAGE_STORE {image_path}")
                    downloaded_images.append(image_path)
                    logging.info(f"images count= {len(images_item)} downloaded_images_count= {len(downloaded_images)}")
                # If every image is already downloaded, skip straight to CBZ packing
                # if len(downloaded_images) == len(images_item):
                #     logging.info(f"len(downloaded_images) == len(images_item)")
                #     self.download_done(item)
                if is_next:
                    # logging.info(f"downloading {image_url} --> IMAGE_STORE {image_path}")
                    down_queue.append((image_url, os.path.join(IMAGES_STORE, image_path)))
        if len(down_queue) > 0:
            download_images(down_queue, max_retries=3)

    def item_completed(self, results, item, info):
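        # Scrapy calls item_completed() once all media requests for the item
        # have finished; `results` is unused here because down_image()
        # re-checks which files exist on disk instead.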
        cbz_path = super().get_file_path(item, result_type="cbz")
        if not fu.exists(cbz_path):
            self.down_image(item)
        # Retry if there are still images that have not been downloaded
        self.download_done(item)
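

# Overall flow, as implemented above: item_completed() runs down_image() to
# fetch any missing pages, then download_done() either cleans up after an
# existing CBZ or generates ComicInfo.xml, packs the chapter into a CBZ via
# CBZUtils, and refreshes the cover through update_icon() and pack_icon().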