ComicScrapy/Comics/utils.py
2024-02-20 21:08:13 +08:00

690 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64,hashlib,os,shutil,os.path
import math,time,json,datetime,logging
import re,requests,time,xmlschema
from datetime import date
from Comics import settings
from opencc import OpenCC
from PIL import Image
from pathlib import Path
from zipfile import ZipFile
from Comics.settings import COMIC_INFO_XML_FILE,OUTPUT_DIR,PROJECT_KEY
import yaml
from Comics.loader import ComicLoader
# 配置类
class Conf():
    """Read per-project spider settings from ``Comics/spiders/<project>.yml``."""

    def get_config_value(self, project, key=None):
        """Return one top-level section of a project's YAML configuration.

        Args:
            project: Project name; the file read is
                ``Comics/spiders/<project>.yml`` relative to the working
                directory.
            key: Name of the top-level entry to return.

        Returns:
            The value stored under ``key``. ``None`` is returned when the file
            does not exist, cannot be parsed, does not contain a mapping, or
            when ``key`` is ``None`` or absent.
        """
        config_path = Path(os.path.join("Comics", "spiders", project) + ".yml")
        if not config_path.is_file():
            return None
        try:
            # Read as UTF-8 explicitly so the result does not depend on the
            # platform's default locale encoding.
            with config_path.open('r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
        except yaml.YAMLError as e:
            print(f"Error loading YAML file: {e}")
            return None
        # An empty file loads as None and a top-level list is not a mapping;
        # neither can hold a named section.
        if key is None or not isinstance(data, dict):
            return None
        return data.get(key)

    def comic(self, project, item: "ComicLoader", child_data='data', val=None):
        """Feed every field of a config section into ``item``.

        Each entry of the section is either a plain string (treated as an
        xpath) or a mapping that may contain ``xpath``, ``index``, ``value``
        and ``sexec``.

        Args:
            project: Project name, also stored on ``item`` via
                ``item.project_name``.
            item: Loader that receives the properties.
            child_data: Name of the config section to read.
            val: When not ``None``, used as the value of every field instead
                of the per-field ``value`` from the config.

        Returns:
            ``item``. It is returned unchanged (apart from the project name)
            when the section is missing or is not a mapping.
        """
        item.project_name(project)
        data = self.get_config_value(project, child_data)
        if not isinstance(data, dict):
            logging.warning(f"config section '{child_data}' not found for project '{project}'")
            return item
        for key, xpath_data in data.items():
            if isinstance(xpath_data, str):
                xpath_data = {'xpath': xpath_data}
            item.set_properties(
                name=key,
                value=xpath_data.get('value', None) if val is None else val,
                xpath=xpath_data.get('xpath', None),
                index=xpath_data.get('index', None),
                sexec=xpath_data.get('sexec', None),
            )
        return item

    def parse_chapter(self, item: "ComicLoader", value):
        """Apply the ``parse_chapter`` config section to ``item`` using ``value``."""
        return self.comic(item.get_project_name(), item, "parse_chapter", value)
# 文件操作类
class fileUtils:
    """Small file-system helpers used by the download pipeline."""

    @classmethod
    def exists(cls, path):
        """Return True when ``path`` exists."""
        return os.path.exists(path)

    @classmethod
    def join(cls, path, *paths):
        """Join path components (thin wrapper around ``os.path.join``)."""
        return os.path.join(path, *paths)

    @classmethod
    def dirname(cls, path):
        """Return the directory part of ``path``."""
        return os.path.dirname(path)

    @classmethod
    def basename(cls, path):
        """Return the final component of ``path``."""
        return os.path.basename(path)

    @classmethod
    def save_file(cls, path, data):
        """Write ``str(data)`` to ``path`` as UTF-8, creating parent directories."""
        root_dir = os.path.dirname(path)
        # A bare file name has no directory part; os.makedirs("") would raise.
        if root_dir:
            os.makedirs(root_dir, exist_ok=True)
        with open(path, 'w', encoding='utf-8') as fs:
            fs.write(str(data))

    @classmethod
    def path(cls, file):
        """Ensure the parent directory of ``file`` exists and return ``file``."""
        base_dir = os.path.dirname(file)
        if base_dir:
            os.makedirs(base_dir, exist_ok=True)
        return file

    @classmethod
    def compare_size(cls, dst, file):
        """Compare the sizes of two files.

        Returns:
            True/False when both files exist, ``None`` when either is missing.
        """
        if cls.exists(dst) and cls.exists(file):
            return os.stat(dst).st_size == os.stat(file).st_size
        return None

    @classmethod
    def read(cls, file):
        """Return the UTF-8 text of ``file``, or an empty list when it is missing.

        The empty-list result for a missing file is kept as-is because callers
        may rely on it.
        """
        if os.path.exists(file):
            with open(file, "r", encoding="utf-8") as fs:
                return fs.read()
        return []

    @classmethod
    def file_check(cls, file, result="file", count=0):
        """Scan the numbered variants of ``file`` and pick the next free name.

        Variants are named ``image.jpg``, ``image-1.jpg``, ``image-2.jpg``, ...
        (see ``ComicPath.images_icon``). The scan stops at the first name that
        does not exist. While scanning, variants are grouped by byte size;
        for every size only the last variant seen is kept and the others are
        deleted. If the un-numbered ``file`` is among the kept ones it is
        moved to the first free numbered name.

        Args:
            file: Base file path.
            result: ``"size"`` to get the set of sizes seen, anything else to
                get a file name.
            count: Variant number at which to start scanning.

        Returns:
            For ``result == "size"``: the set of sizes of the variants found
            (an empty set when none exist). Otherwise: ``file`` when no
            variant exists, else the first variant name that was free when
            the scan stopped.
        """
        files_size, files_name = {}, []
        # Without the un-numbered file, numbering starts at "-1".
        if not cls.exists(file):
            count = 1
        while True:
            temp_file_name = ComicPath.images_icon(file=file, count=count)
            if cls.exists(temp_file_name):
                files_name.append(temp_file_name)
                file_size = os.path.getsize(temp_file_name)
                # Keyed by size: a later variant with the same size replaces
                # the earlier one, which is then treated as a duplicate.
                files_size[file_size] = {"name": temp_file_name, "size": file_size}
                count += 1
                continue
            diff_names = {value["name"] for value in files_size.values()}
            if not diff_names:
                # Nothing on disk. The "size" mode must still yield a set so
                # that membership tests on the result work.
                return set() if result == "size" else file
            for file_name in files_name:
                if file_name not in diff_names:
                    logging.info(f"删除文件:{file_name}")
                    os.remove(file_name)
            if file in diff_names:
                move_file = ComicPath.images_icon(file=file, count=count)
                logging.info(f"移动文件{file}{move_file}")
                shutil.move(file, move_file)
                cls.file_check(file=file, result=result, count=0)
            # Duplicates were removed above: run once more on the new layout.
            if len(set(diff_names)) != len(set(files_name)):
                cls.file_check(file, result=result, count=0)
            if result == "size":
                return {value["size"] for value in files_size.values()}
            return temp_file_name

    @classmethod
    def file_update(cls, old_file, new_file):
        """Return True when ``old_file`` exists and its size matches no variant of ``new_file``."""
        if not os.path.exists(old_file):
            return False
        return os.path.getsize(old_file) not in cls.file_check(new_file, result="size")

    @classmethod
    def update_icon(cls, image_path, save_path):
        """Copy ``image_path`` to the next free variant of ``save_path`` when it changed."""
        if cls.file_update(image_path, save_path):
            save_dir = os.path.dirname(save_path)
            if save_dir:
                os.makedirs(save_dir, exist_ok=True)
            # file_check has side effects (it may delete or move files), so
            # resolve the destination once and use it for both log and copy.
            target = cls.file_check(save_path)
            logging.info(f"update icon ... {image_path} ===> {target}")
            shutil.copyfile(image_path, target)
# 公共工具类
class CommonUtils:
    """Miscellaneous helpers: dotted-path lookup and XML schema validation."""

    @classmethod
    def parseExec(cls, data, exec):
        """Follow a dotted key path through a (JSON) mapping.

        Args:
            data: A dict, or a JSON document (text) that is parsed first.
            exec: Dotted path such as ``"a.b.c"``.

        Returns:
            The value found at the path, or ``None`` when a segment is missing
            or an intermediate value is not a mapping. When ``data`` or
            ``exec`` is ``None``, ``data`` is returned untouched.
        """
        if data is not None and exec is not None:
            if not isinstance(data, dict):
                data = json.loads(data)
            for dot in str(exec).split("."):
                # A missing segment yields None; stop instead of calling
                # .get() on a non-mapping.
                if not isinstance(data, dict):
                    return None
                data = data.get(dot)
        return data

    @classmethod
    def _validate_xml(cls, xml_file, xsd_file):
        """Validate ``xml_file`` against ``xsd_file`` and print the outcome."""
        xsd = xmlschema.XMLSchema(xsd_file)
        if xsd.is_valid(xml_file):
            print("XML文件通过XSD验证成功")
        else:
            print("XML文件未通过XSD验证。以下是验证错误信息")
            # iter_errors lazily yields every validation error of the document.
            for error in xsd.iter_errors(xml_file):
                print(error)

    @classmethod
    def validate_comicinfo_xml(cls, xml_file):
        """Validate ``xml_file`` against ``ComicInfo.xsd`` in the working directory."""
        cls._validate_xml(xml_file, "ComicInfo.xsd")
# 图片处理类
class imageUtils:
    """Restore ("descramble") downloaded images that were cut into horizontal strips.

    Scrambled files are named ``scramble=<blocks>_<name>.<ext>`` where
    ``<blocks>`` is the number of strips.
    """

    @classmethod
    def descramble_images_by_dir(cls, chapter_dir):
        """Descramble every prefixed image in a chapter directory.

        Args:
            chapter_dir: Directory to scan; when a file path is given its
                parent directory is used.

        Returns:
            Number of files that were passed to ``encode_scramble_image``.
        """
        if os.path.isfile(chapter_dir):
            chapter_dir = os.path.dirname(chapter_dir)
        scramble_count = 0
        if os.path.exists(chapter_dir):
            # One pass over a snapshot of the listing is enough: each
            # processed file is renamed or removed by encode_scramble_image.
            for img in os.listdir(chapter_dir):
                if img.startswith(ComicPath.PREFIX_SCRAMBLE):
                    cls.encode_scramble_image(os.path.join(chapter_dir, img))
                    scramble_count += 1
        logging.debug(f"{ComicPath.PREFIX_SCRAMBLE} {scramble_count}")
        return scramble_count

    @classmethod
    def deScrambleImagesByPath(cls, img_path, img_save=None):
        """Descramble ``img_path`` if it carries the scramble prefix.

        Returns:
            The path of the restored image, or ``img_path`` unchanged when the
            file is not prefixed or does not exist.
        """
        if os.path.basename(img_path).startswith(ComicPath.PREFIX_SCRAMBLE) \
                and os.path.exists(img_path):
            img_path = cls.encode_scramble_image(img_path, img_save)
        return img_path

    @classmethod
    def encodeImage(cls, str_en):
        """Derive a block count (5..14) from the MD5 of a base64-encoded value."""
        digest = hashlib.md5(base64.b64decode(str_en)).digest()
        # Last digest byte is an int in 0..255, so the result is always 5..14.
        return digest[-1] % 10 + 5

    @classmethod
    def scrambleImage(cls, file_path):
        """Restore one scrambled image via split-and-recompose.

        The file name is parsed as ``...scramble=<blocks>_<name>.<ext>``; the
        result is written next to it as ``<name>.<ext>``.

        Returns:
            ``file_path`` after processing, or ``None`` when the file is an
            unfinished ``.downloads`` file (which is deleted), the target
            already exists, or the image cannot be opened.
        """
        # Unfinished download: discard it.
        if str(file_path).endswith(".downloads"):
            os.remove(file_path)
            return None
        file_str = str(file_path).split("=")
        base_dir = file_str[0].replace("scramble", "")
        base_fn = file_str[-1].split("_")  # e.g. ["10", "29.jpg"]
        save_name = base_fn[1]
        save_name_delesu = save_name.split(".")[0]
        blocks = int(base_fn[0])
        save_file_path = os.path.join(base_dir, save_name)
        print("sva", save_file_path)
        if os.path.exists(save_file_path):
            print("图片已解密,已跳过:", save_file_path)
            return None
        try:
            with Image.open(file_path) as img:
                width, height = img.size
        except Exception:
            # Unreadable image: nothing can be restored.
            print(f"error Image: {file_path}")
            return None
        print("blocks=", blocks)
        split_path = os.path.join(base_dir, save_name_delesu + "split")
        is_split = cls.splitimage(file_path, blocks, 1, split_path)
        if is_split is not None:
            block_height = height // blocks
            print("blockHeight=", block_height)
            cls.image_compose(split_path, blocks, 1, save_file_path, block_height, width)
        else:
            # Splitting was not possible: keep the picture as it is.
            if os.path.exists(split_path):
                shutil.rmtree(split_path)
            if os.path.exists(file_path):
                shutil.move(file_path, save_file_path)
        return file_path

    @classmethod
    def splitimage(cls, src, rownum, colnum, dstpath):
        """Cut ``src`` into ``rownum`` x ``colnum`` tiles saved as ``000.<ext>``, ``001.<ext>``, ...

        Args:
            src: Source image path.
            rownum: Number of rows.
            colnum: Number of columns.
            dstpath: Output directory; ``''`` means the directory of ``src``.

        Returns:
            ``"成功"`` on success, ``None`` when the grid does not fit the image.
        """
        with Image.open(src) as img:
            w, h = img.size
            if rownum < 1 or colnum < 1 or rownum > h or colnum > w:
                print('不数!')
                return None
            src_dir, src_name = os.path.split(src)
            if dstpath == '':
                dstpath = src_dir
            if not os.path.exists(dstpath):
                os.makedirs(dstpath)
            ext = src_name.split('.')[-1]
            rowheight = h // rownum
            colwidth = w // colnum
            num = 0
            for r in range(rownum):
                for c in range(colnum):
                    box = (c * colwidth, r * rowheight, (c + 1) * colwidth, (r + 1) * rowheight)
                    file_path = os.path.join(dstpath, "{:0>3d}".format(num) + '.' + ext)
                    print("file_path=", file_path)
                    img.crop(box).save(file_path)
                    num += 1
        return "成功"

    @classmethod
    def image_compose(cls, src, row, column, save_path, image_height, image_width):
        """Stitch the numbered tiles in ``src`` together in reverse order.

        Args:
            src: Directory containing tiles named ``<number>.<ext>``; it is
                removed afterwards.
            row: Rows in the output grid.
            column: Columns in the output grid.
            save_path: Output image path.
            image_height: Height of one tile.
            image_width: Width of one tile.

        Raises:
            ValueError: When fewer than ``row * column`` tiles are present.
        """
        # Numeric order of the file stem, name as tie-breaker.
        img_list = sorted(os.listdir(src), key=lambda n: (int(os.path.splitext(n)[0]), n))
        # Tiles are consumed last-to-first.
        image_names = [os.path.join(src, n) for n in reversed(img_list)]
        if len(image_names) < row * column:
            raise ValueError("合成图片的参数和要求的数量不能匹配!")
        to_image = Image.new('RGB', (column * image_width, row * image_height))
        for y in range(row):
            for x in range(column):
                image_path = image_names[column * y + x]
                print("split_image=", image_path)
                with Image.open(image_path) as from_image:
                    # Tiles keep their size; x advances by tile width, y by tile height.
                    to_image.paste(from_image, (x * image_width, y * image_height))
        to_image.save(save_path)
        print("图片合并完成:", save_path)
        shutil.rmtree(src)

    @classmethod
    def getScrambleImage(cls, path):
        """Run ``scrambleImage`` on ``path`` and delete the source file if it is still there."""
        scramble_file_cache = cls.scrambleImage(path)
        if scramble_file_cache is not None and os.path.exists(scramble_file_cache):
            os.remove(scramble_file_cache)

    @staticmethod
    def _descramble_boxes(w, h, blocks):
        """Return crop boxes (left, upper, right, lower) from the bottom strip upwards.

        The ``h % blocks`` leftover rows belong to the first (bottom) strip.
        """
        base = h // blocks
        rem = h % blocks
        boxes = []
        for i in range(blocks):
            upper = h - base * (i + 1) - rem
            lower = h if i == 0 else h - base * i - rem
            boxes.append((0, upper, w, lower))
        return boxes

    @classmethod
    def encode_scramble_image(cls, img_path, img_save=None):
        """Restore a scrambled image by re-stacking its strips bottom-to-top.

        Args:
            img_path: Path of a file named ``...scramble=<blocks>_<name>``.
            img_save: Output path; defaults to ``<name>`` next to ``img_path``.

        Returns:
            The output path, or ``None`` when ``img_path`` does not exist.
            The source file is removed after a successful save. A block
            count of zero or less means there is nothing to reorder, so the
            file is simply moved to the output path.
        """
        if not os.path.exists(img_path):
            return
        base_fn = str(img_path).split("=")[-1].split("_")  # e.g. ["10", "29.jpg"]
        blocks = int(base_fn[0])
        if img_save is None:
            save_path = os.path.join(os.path.dirname(img_path),
                                     ComicPath.getFileScrambleImageSave(img_path))
        else:
            save_path = img_save
        if blocks <= 0:
            shutil.move(img_path, save_path)
            return save_path
        # Close the source before deleting it; removing an open file fails on
        # some platforms.
        with Image.open(img_path) as image:
            w, h = image.size
            newimage = Image.new("RGB", (w, h))
            newh = 0
            for box in cls._descramble_boxes(w, h, blocks):
                strip = image.crop(box)
                newimage.paste(strip, (0, newh))
                newh += strip.size[1]
        newimage.save(save_path)
        logging.info(f"解密成功 {save_path}")
        if os.path.exists(img_path):
            os.remove(img_path)
            logging.debug(f"remove {img_path}")
        return save_path
# 压缩工具类
class CBZUtils:
    """Create, inspect and validate CBZ (zip) archives of comic chapters."""

    # Archives last modified before this local time (year, month, day, hour)
    # are considered legacy by compareFileDate.
    _LEGACY_CUTOFF = (2023, 1, 16, 3)

    @classmethod
    def readDirsOrFiles(cls, dir, type):
        """List the direct children of ``dir``.

        Args:
            dir: Directory to list.
            type: ``"files"`` for regular files, ``"dirs"`` for directories.

        Returns:
            List of joined paths; empty for any other ``type``.
        """
        data = []
        for file in os.listdir(dir):
            path = os.path.join(dir, file)
            if type == "files" and os.path.isfile(path):
                data.append(path)
            if type == "dirs" and os.path.isdir(path):
                data.append(path)
        return data

    @classmethod
    def zip_compression(cls, source_dir=None, target_file=None, remove=True):
        """Pack ``source_dir`` recursively into ``target_file``.

        Nothing is written when ``target_file`` already exists or
        ``source_dir`` is ``None``. ``remove`` is accepted for compatibility
        and is not used here.
        """
        target_dir = os.path.dirname(target_file)
        # A bare file name has no directory part to create.
        if target_dir and not os.path.exists(target_dir):
            os.makedirs(target_dir)
        if not os.path.exists(target_file) and source_dir is not None:
            with ZipFile(target_file, mode='w') as zf:
                for path, dir_names, filenames in os.walk(source_dir):
                    path = Path(path)
                    arc_dir = path.relative_to(source_dir)
                    for y, filename in enumerate(filenames, start=1):
                        print("打包中:" + str(y) + "/" + str(len(filenames)), str(path.joinpath(filename)))
                        zf.write(path.joinpath(filename), arc_dir.joinpath(filename))
            logging.info(f"打包完成:{target_file}")

    @classmethod
    def packComicChapterCBZ(cls, src_dir, dts_path, comic_info_images, remove=True):
        """Descramble, pack and validate one chapter directory.

        Args:
            src_dir: Directory holding the chapter images.
            dts_path: Target CBZ path.
            comic_info_images: Expected image entries, used for validation.
            remove: Delete ``src_dir`` after packing.

        Returns:
            True when the archive validates; False when ``src_dir`` is
            missing, descrambling fails, or validation fails.
        """
        if not os.path.exists(src_dir):
            return False
        for file in os.listdir(src_dir):
            if file.startswith(ComicPath.PREFIX_SCRAMBLE):
                try:
                    imageUtils.deScrambleImagesByPath(os.path.join(src_dir, file))
                except Exception as e:
                    print(f"删除 {file} 发生错误 {e},已跳过")
                    return False
        cls.zip_compression(src_dir, dts_path)
        time.sleep(0.1)
        if remove:
            shutil.rmtree(src_dir)
        return cls.cbz_validate(dts_path, comic_info_images)

    @classmethod
    def replaceZip(cls, filepath, unpack_dir=None):
        """Clean up a legacy archive (see ``compareFileDate``).

        The archive is deleted when any ``.jpg`` entry is smaller than 500
        bytes; otherwise its modification time is refreshed. ``unpack_dir``
        (default: ``filepath`` without its extension) is removed if present.
        """
        if not cls.compareFileDate(filepath):
            return None
        if unpack_dir is None:
            # Strip only the extension; cutting at the first dot would turn
            # "a.b/c.cbz" into the unrelated directory "a".
            unpack_dir = os.path.splitext(str(filepath))[0]
        # Close the archive before it is possibly deleted.
        with ZipFile(filepath, 'r') as fz:
            incomplete = any(
                len(fz.read(name)) < 500
                for name in fz.namelist() if name.endswith(".jpg")
            )
        if incomplete and os.path.exists(filepath):
            os.remove(filepath)
            print(f"数据不完整,已删除:{filepath}")
        if cls.compareFileDate(filepath):
            os.utime(filepath)
            print(f"已更新文件时间 {filepath}")
        if os.path.exists(unpack_dir):
            shutil.rmtree(unpack_dir)

    @classmethod
    def compareFileDate(cls, filepath):
        """Return True when ``filepath`` exists and was last modified before the legacy cutoff."""
        if not os.path.exists(filepath):
            return False
        # `datetime` is the module here, so the class must be named explicitly.
        modified = datetime.datetime.fromtimestamp(int(os.path.getmtime(filepath)))
        return modified < datetime.datetime(*cls._LEGACY_CUTOFF)

    @classmethod
    def zip_info(cls, path, filter=True):
        """Return the entry names of an archive.

        Args:
            path: Archive path.
            filter: Drop the ComicInfo XML entry from the result.

        Returns:
            List of entry names, or ``None`` when the archive cannot be read.
        """
        try:
            with ZipFile(path, "r") as zip_file:
                result = zip_file.namelist()
        except Exception as e:
            # Best effort: a missing or corrupt archive is reported, not raised.
            print(e)
            return None
        if filter and COMIC_INFO_XML_FILE in result:
            result.remove(COMIC_INFO_XML_FILE)
        return result

    @classmethod
    def cbz_validate(cls, zip_path, comic_info_images):
        """Check that the archive holds as many entries as ``comic_info_images``.

        On mismatch (or an unreadable archive) the archive is deleted.

        Returns:
            True when the counts match, False otherwise.
        """
        names = cls.zip_info(zip_path)
        if names is not None and len(names) == len(comic_info_images):
            ntfy.sendMsg(f"validating successfully === {zip_path}", alert=True)
            return True
        if os.path.exists(zip_path):
            os.remove(zip_path)
        logging.error(f"validating fail === {zip_path}")
        return False
# 检测工具类
class checkUtils:
    """Track chapters that failed, in ``<OUTPUT_DIR>/<project>/error_comics.json``."""

    def _error_file(self, item):
        """Return the path of the project's error record file."""
        return os.path.join(OUTPUT_DIR, ComicLoader(item=item).get_project_name(), "error_comics.json")

    def _load_errors(self, item):
        """Return the recorded errors as a list (empty when missing or unreadable)."""
        raw = self.read(item)
        try:
            # The file is written with json.dumps, so parse it as JSON
            # rather than evaluating it as Python code.
            records = json.loads(raw)
        except (TypeError, ValueError):
            try:
                # Fallback for content that is a Python literal but not JSON.
                import ast
                records = ast.literal_eval(raw)
            except (TypeError, ValueError, SyntaxError):
                return []
        return records if isinstance(records, list) else []

    def read(self, item):
        """Return the raw content of the error record file (see ``fileUtils.read``)."""
        return fileUtils.read(self._error_file(item))

    def export_error(self, item):
        """Append ``item``'s name/chapter/date to the error file unless already recorded."""
        if self.is_error(item):
            return
        error_comic = self._load_errors(item)
        error_comic.append({"name": ComicPath.new_file_name(item['name']),
                            "chapter": ComicPath.new_file_name(item['chapter']),
                            "date": ComicPath.getYearMonthDay()})
        fileUtils.save_file(self._error_file(item), json.dumps(error_comic))

    def is_error(self, item):
        """Return True when any recorded error matches ``item``'s name and chapter."""
        try:
            name = ComicPath.new_file_name(item['name'])
            chapter = ComicPath.new_file_name(item['chapter'])
            # Every record is checked, not only the first one.
            return any(
                name == ComicPath.new_file_name(error_c['name'])
                and chapter == ComicPath.new_file_name(error_c['chapter'])
                for error_c in self._load_errors(item)
            )
        except (KeyError, TypeError):
            # Malformed record or item: treat as "not recorded".
            return False
# Comic路径类
class ComicPath:
    """Build file names and paths for comics, chapters, icons and archives."""

    PREFIX_SCRAMBLE = "scramble="
    # OpenCC converters keyed by configuration name; building one is costly,
    # so each is created once and reused.
    _converters = {}

    @classmethod
    def getYearMonthDay(cls):
        """Return today's date as ``YYYYMMDD``."""
        return date.today().strftime("%Y%m%d")

    @classmethod
    def getDirComicChapter(cls, item, categorize=""):
        """Return ``<OUTPUT_DIR>/<project>/<categorize>/<name>/<chapter>`` for ``item``."""
        comic = ComicLoader(item=item)
        return os.path.join(OUTPUT_DIR, comic.get_project_name(), categorize,
                            comic.get_name(), comic.get_chapter())

    @classmethod
    def getDirJosnComicChapter(cls, item):
        """Return the chapter directory under the ``json`` category."""
        return cls.getDirComicChapter(item=item, categorize="json")

    @classmethod
    def getFileScrambleImageName(cls, count, block, suffix=".jpg"):
        """Return ``scramble=<block>_<count><suffix>``."""
        return cls.PREFIX_SCRAMBLE + str(block) + "_" + str(count) + suffix

    @classmethod
    def getFileScrambleImageSave(cls, file, relative=False, is_prefix=True):
        """Return the descrambled file name for a scrambled image path.

        The name is whatever follows the last ``_`` in ``file``.

        Args:
            file: Scrambled image path.
            relative: Truthy to reduce the name to its base name;
                ``"fullpath"`` to additionally prepend the directory of
                ``file``.
            is_prefix: When False, the extension is removed from the result.
        """
        file_name = str(file).split("_")[-1]
        if relative:
            file_name = os.path.basename(file_name)
        if relative == "fullpath":
            file_name = os.path.join(os.path.dirname(file), file_name)
        if is_prefix:
            return file_name
        # Cut the extension off the last path component only, so dots in
        # directory names do not truncate the path.
        tail = os.path.basename(file_name)
        return file_name[:len(file_name) - len(tail)] + tail.split(".")[0]

    @classmethod
    def chinese_convert(cls, text, convert='t2s'):
        """Convert ``text`` with OpenCC (default: Traditional to Simplified Chinese)."""
        converter = cls._converters.get(convert)
        if converter is None:
            converter = cls._converters[convert] = OpenCC(convert)
        return converter.convert(str(text))

    @classmethod
    def fix_file_name(cls, filename, replace=None):
        """Make ``filename`` safe to use as a file name.

        The characters ``? * / | . : > <`` are replaced by ``replace`` (removed
        when ``replace`` is ``None``) and trailing spaces are stripped.
        Non-string input is returned unchanged.
        """
        if not isinstance(filename, str):
            return filename
        in_tab = r'[?*/\|.:><]'
        str_replace = "" if replace is None else replace
        # rstrip also copes with empty and all-space names.
        return re.sub(in_tab, str_replace, filename).rstrip(" ")

    @classmethod
    def new_file_name(cls, name):
        """Convert ``name`` to Simplified Chinese and sanitise it as a file name."""
        return cls.fix_file_name(cls.chinese_convert(name))

    @classmethod
    def get_file_path(cls, item, result_type="image", file=None, convert=False, chapter=None):
        """Build the path for one of the artefacts of ``item``.

        Args:
            item: Mapping with at least ``name`` and ``chapter``.
            result_type: One of ``image``, ``comic_info``, ``cbz_icon``,
                ``down_icon``, ``down_cache_icon``, ``icon``, ``icon_cache``,
                ``cbz``, ``images_dir``.
            file: Image file name; required for ``result_type == "image"``. A
                value that already contains a path separator is returned
                unchanged.
            convert: Convert name and chapter to sanitised Simplified Chinese.
            chapter: Overrides the chapter taken from ``item``.

        Raises:
            ValueError: For an unknown ``result_type``.
        """
        PROJECT = ComicLoader(item=item).get_project_name()
        if not convert:
            name = item['name']
            if chapter is None:
                chapter = item['chapter']
        else:
            name = cls.new_file_name(item['name'])
            if chapter is None:
                chapter = cls.new_file_name(item['chapter'])
        if result_type == "image":
            if os.path.sep not in file:
                file = os.path.join(PROJECT, "images", name, chapter, file)
        elif result_type == "comic_info":
            file = os.path.join(PROJECT, "images", name, chapter)
        elif result_type == "cbz_icon":
            file = os.path.join(settings.CBZ_EXPORT_PATH, PROJECT, name, chapter + ".jpg")
        elif result_type == "down_icon":
            file = os.path.join(settings.IMAGES_STORE, cls.get_file_path(item=item, result_type="icon"))
        elif result_type == "down_cache_icon":
            file = os.path.join(settings.IMAGES_STORE, cls.get_file_path(item=item, result_type="icon_cache"))
        elif result_type == "icon":
            file = os.path.join(PROJECT, "icons", name, name + ".jpg")
        elif result_type == "icon_cache":
            file = os.path.join(PROJECT, "icons", ".cache", name + ".jpg")
        elif result_type == "cbz":
            file = os.path.join(settings.CBZ_EXPORT_PATH, PROJECT, name, chapter + ".CBZ")
        elif result_type == "images_dir":
            file = os.path.join(settings.IMAGES_STORE, PROJECT, "images", name, chapter)
        else:
            raise ValueError(f"Unsupported result_type: {result_type}")
        return file

    @classmethod
    def path_cbz(cls, item):
        """Return the CBZ path of ``item`` using converted name and chapter."""
        return cls.get_file_path(item, result_type="cbz", convert=True)

    @classmethod
    def images_icon(cls, file, count):
        """Return ``file`` for ``count == 0``, else ``<stem>-<count><ext>``."""
        if count == 0:
            return file
        name, suffix = os.path.splitext(file)
        return name + "-" + str(count) + suffix
# 通知类
class ntfy:
    """Console and push notifications through an ntfy topic."""

    NTFY_URL = "https://ntfy.caiwenxiu.cn/PyComic"
    # Seconds to wait for the ntfy server; without a limit an unreachable
    # server would block the caller indefinitely.
    REQUEST_TIMEOUT = 10

    @classmethod
    def sendMsg(cls, msg, alert=False, sleep=None, error=None):
        """Print ``msg`` and optionally push it to the ntfy topic.

        Sending is best effort: a delivery failure is printed, never raised.

        Args:
            msg: Message text.
            alert: Also POST the message to the ntfy topic.
            sleep: Seconds to pause after sending (``None`` for no pause).
            error: Error detail; when given it is printed and False is returned.

        Returns:
            False when ``error`` is given, True otherwise.
        """
        try:
            print(f"#ntfy: {msg}")
            if alert:
                requests.post(cls.NTFY_URL,
                              data=str(msg).encode(encoding='utf-8'),
                              timeout=cls.REQUEST_TIMEOUT)
        except Exception as exc:
            print(f"#ntfy error: {msg}")
            logging.debug(f"ntfy delivery failed: {exc}")
        if sleep is not None:
            logging.info(f'等待{sleep}秒后进入下一阶段')
            time.sleep(int(sleep))
        if error is not None:
            print(f"#ntfy Error: {error}")
            return False
        return True