ComicScrapy/Comics/pipelines.py
2024-07-22 23:36:42 +08:00

148 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# useful for handling different item types with a single interface
import os,scrapy,logging
from Comics import settings
from Comics.items import ComicItem
from Comics.loader import ComicLoader
from Comics.utils import CBZUtils,fileUtils as fu
from Comics.utils import ComicPath
from Comics.utils import checkUtils,oldUtils
from Comics.exporters import JsonExport,ItemExporter
from scrapy.pipelines.images import ImagesPipeline
from Comics._utils.ComicInfo import ComicInfoXml
class ComicsPipeline():
    """Item pipeline that exports scraped comic chapter items.

    A ``ComicItem`` whose CBZ path already exists is passed through
    ``ItemExporter().export_obj``; otherwise the loaded item is written with
    ``JsonExport`` to the path given by ``getDirJosnComicChapter()``.
    """

    def process_item(self, item: ComicItem, spider):
        """Export ``item`` and hand the result to the next pipeline stage.

        Args:
            item: The object yielded by the spider.
            spider: The spider that produced the item (unused here).

        Returns:
            The exporter's result for a ``ComicItem``; any other item is
            returned unchanged.
        """
        # Scrapy forwards whatever process_item returns to the next pipeline,
        # so items this pipeline does not handle must be passed through
        # instead of being replaced by None.
        if not isinstance(item, ComicItem):
            return item

        # The chapter's CBZ already exists: run the conversion exporter.
        if fu.exists(ComicPath(item).PATH_CBZ()):
            return ItemExporter().export_obj(item)

        # No CBZ yet: dump the loaded item as JSON
        # (target looks like 'output/rm_comic/json/<comic>/<chapter>').
        result_item = JsonExport(file=ComicPath(item).getDirJosnComicChapter()).export_json(
            ComicLoader(item).load_item(), if_return=True)
        # Disabled clean-up of stale CBZ files:
        # oldUtils().clean_old_files(files=result_item["chapters"], folder=ComicPath(item).file_path(result_type=ComicPath.MAPPING_CBZ_DIR), move_folder=ComicPath(item).file_path(result_type=ComicPath.MAPPING_OLD_CBZ_DIR))
        return result_item
class BaseImagesPipeline(ImagesPipeline):
    """Helpers shared by the icon and chapter-image download pipelines."""

    def image_scramble_exits(self, item, image_path):
        """Return whether the scramble-save file for ``image_path`` exists.

        ``image_path`` is mapped through ``getFileScrambleImageSave`` and the
        resulting file path is looked up under ``settings.IMAGES_STORE``.
        """
        scrambled_name = ComicPath(item).getFileScrambleImageSave(image_path, relative="fullpath")
        stored_path = self.get_file_path(item, scrambled_name)
        return fu.exists(fu.join(settings.IMAGES_STORE, stored_path))

    def get_file_path(self, item, file=None, result_type="image"):
        """Resolve a path for ``item`` by delegating to ``ComicPath.file_path``."""
        path_builder = ComicPath(item)
        return path_builder.file_path(file=file, result_type=result_type)

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the storage path carried in ``request.meta['path']``."""
        return request.meta['path']

    def update_icon(self, item):
        """Hand the cached cover and its final destination to ``fu.update_icon``."""
        # Cover as cached right after download.
        cached_cover = self.get_file_path(item, result_type="down_cache_icon")
        # Final location the cover is saved to.
        final_cover = self.get_file_path(item=item, result_type="down_icon")
        fu.update_icon(cached_cover, final_cover)

    def success_completed(self, item, results):
        """Return True only if ``results`` is non-empty and every entry succeeded.

        Each entry is indexed as a sequence whose first element is the
        success flag.
        """
        return len(results) != 0 and all(result[0] for result in results)
# Cover (icon) download pipeline
class IconDownloadPipeline(BaseImagesPipeline):
    """Downloads a comic's cover image unless a cached copy already exists."""

    def get_media_requests(self, item, info):
        """Yield one request for the cover, or nothing if it is already cached."""
        loader = ComicLoader(item=item)
        # Cover URL and the path the cover is cached under.
        icon_url = loader.get_icon()
        icon_cache_path = super().get_file_path(item, result_type="icon_cache")
        if fu.exists(icon_cache_path):
            # Cover already present. This is a generator, so the returned
            # value only ends iteration; no request is produced.
            # NOTE(review): unlike image_scramble_exits, this check does not
            # join settings.IMAGES_STORE -- confirm "icon_cache" is resolvable as-is.
            return False
        yield scrapy.Request(url=icon_url, meta={'path': icon_cache_path })

    def item_completed(self, results, item, info):
        """Promote the downloaded cover when every download result succeeded.

        Returns:
            The item, unchanged, for the next pipeline stage.
        """
        if not super().success_completed(item, results):
            return item
        print(" icon download success")
        # Move the fresh cover into the Icon folder.
        super().update_icon(item)
        return item
class ImgDownloadPipeline(BaseImagesPipeline):
    """Downloads a chapter's page images and packs them into a CBZ archive."""

    def get_media_requests(self, item, info):
        """Yield a download request for every page image not already stored.

        Args:
            item: Comic item whose image URLs and image names are read via
                ``ComicLoader``.
            info: Media pipeline bookkeeping object (unused here).

        Yields:
            ``scrapy.Request`` objects whose ``meta['path']`` is the target
            path of the image.
        """
        loader = ComicLoader(item=item)
        # Still stored on the instance so any external reader of these
        # attributes keeps working.
        self.image_urls, self.images = loader.get_image_urls(), loader.get_images()
        # Disabled: add the cover to the download list
        # self.add_download_icon(item)
        # NOTE(review): zip() silently stops at the shorter list if the two differ in length.
        for image_url, image in zip(self.image_urls, self.images):
            image_path = super().get_file_path(item, image)
            # Image (including its scrambled variant) is already on disk: skip it.
            if super().image_scramble_exits(item, image_path):
                logging.info(f"file exists: IMAGE_STORE {image_path}")
                continue
            logging.info(f"downloading {image_url} --> IMAGE_STORE {image_path}")
            yield scrapy.Request(url=image_url, meta={'path': image_path})

    def pack_icon(self, item):
        """Pass every file that shares the downloaded cover's name stem to ``fu.update_icon``.

        The stem is the part of the ``down_icon`` file name before its first
        dot; each regular file in the same directory whose name starts with
        that stem is handed to ``fu.update_icon`` with the ``cbz_icon`` path.
        """
        cbz_icon = super().get_file_path(item=item, result_type="cbz_icon")
        dwn_icon = super().get_file_path(item=item, result_type="down_icon")
        base_dir = fu.dirname(dwn_icon)
        # NOTE(review): a cover name containing extra dots is cut at the first one.
        name = fu.basename(dwn_icon).split(".")[0]
        # NOTE(review): os.listdir raises if base_dir does not exist yet.
        for entry in os.listdir(base_dir):
            path = fu.join(base_dir, entry)
            if os.path.isfile(path) and entry.startswith(name):
                fu.update_icon(path, cbz_icon)

    def item_completed(self, results, item, info):
        """Pack the chapter into a CBZ once its image downloads have finished.

        Args:
            results: Images downloaded in this run; images that were already
                present were skipped and are not listed.
            item: Comic item data.
            info: Media pipeline bookkeeping object (unused here).

        Returns:
            The item, so that later pipeline stages receive it instead of None.
        """
        cbz_path = super().get_file_path(item, result_type="cbz")
        # Disabled completeness check on the downloaded images:
        # chapter_dir = ComicPath(item=item).file_path(result_type=ComicPath().MAPPING_IMAGES_DIR)
        # images_file = oldUtils().old_images(folder=chapter_dir)
        # if len(images_file) != len(ComicLoader(item=item).get_image_urls()): return
        if fu.exists(cbz_path):
            # Archive already exists: only refresh its cover.
            # self.update_icon(item)
            self.pack_icon(item)
            return item

        # Generate the ComicInfo XML; its result is passed on as the page list.
        # comic_info = ComicInfoXmlItemExporter(dir=super().get_file_path(item=item, result_type="comic_info")).export_xml(item)
        comic_pages = ComicInfoXml().scrapy_xml_by_json(
            item, save_dir=super().get_file_path(item=item, result_type="images_dir"))
        packed = CBZUtils.packComicChapterCBZ(
            src_dir=super().get_file_path(item, result_type="images_dir"),
            dts_path=cbz_path,
            comic_info_images=comic_pages,
            remove=True,
        )
        if packed:
            super().update_icon(item)
            self.pack_icon(item)
        else:
            # CBZ verification failed: record the item as an error.
            checkUtils().export_error(item)
        # Disabled: random 3-15 second pause before the next chapter
        # sleep_time = random.randint(3,15)
        # time.sleep(int(sleep_time))
        return item