import base64, hashlib, os, shutil, pathlib
import math, time, json, logging
import re, requests, xmlschema
from datetime import date, datetime
from Comics import settings
from opencc import OpenCC
from PIL import Image
from pathlib import Path
from zipfile import ZipFile
from Comics.settings import COMIC_INFO_XML_FILE, OUTPUT_DIR, PROJECT_KEY
import yaml
from Comics.loader import ComicLoader
from tinydb import TinyDB, Query


# Configuration helper
class Conf():
    # Read a spider's YAML configuration.
    # @project: project name; the file read is Comics/spiders/<project>.yml
    # @key: optional top-level key whose value should be returned
    def get_config_value(self, project, key=None):
        # Build the path with pathlib
        config_path = Path(os.path.join("Comics", "spiders", project) + ".yml")
        # Bail out if the project file does not exist
        if not config_path.is_file():
            return None
        # Open the file and load the configuration data
        try:
            with config_path.open('r') as f:
                data = yaml.safe_load(f)
        except yaml.YAMLError as e:
            print(f"Error loading YAML file: {e}")
            return None
        # Return the requested key if it is present
        if data and key is not None and key in data:
            return data[key]
        else:
            return None
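
    # Sketch of the per-project YAML layout that get_config_value() expects.
    # This is an assumption inferred from comic() below, not a documented schema:
    # every entry under a top-level key is either a plain XPath string or a
    # mapping with optional xpath / index / value / sexec fields, e.g.
    #
    #   data:
    #     name: '//h1/text()'
    #     author:
    #       xpath: '//div[@class="author"]/text()'
    #       index: 0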

    # Populate a ComicLoader with the xpath rules read from the project config
    def comic(self, project, item: ComicLoader, child_data='data', val=None):
        item.project_name(project)
        data = self.get_config_value(project, child_data)
        # No config section found: return the loader unchanged
        if not data:
            return item
        for key, xpath_data in data.items():
            if isinstance(xpath_data, str): xpath_data = {'xpath': xpath_data}
            xpath = xpath_data.get('xpath', None)
            index = xpath_data.get('index', None)
            value = xpath_data.get('value', None) if val is None else val
            sexec = xpath_data.get('sexec', None)
            item.set_properties(name=key, value=value, xpath=xpath, index=index, sexec=sexec)
        return item

    def parse_chapter(self, item: ComicLoader, value):
        return self.comic(item.get_project_name(), item, "parse_chapter", value)

    def parse_chapter_api(self, item: ComicLoader, value):
        return self.comic(item.get_project_name(), item, "parse_chapter_api", value)
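
# Hypothetical usage sketch for Conf (the project name, the item fields and an
# argument-less ComicLoader() constructor are assumptions, not part of this module):
#
#   loader = ComicLoader()
#   loader = Conf().comic("example_project", loader)        # fills xpath rules from example_project.yml
#   chapter_loader = Conf().parse_chapter(loader, value=response_text)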


# File utilities
class fileUtils:

    # Does the path exist?
    @classmethod
    def exists(cls, path): return os.path.exists(path)

    # Join path components
    @classmethod
    def join(cls, path, *paths): return os.path.join(path, *paths)

    # Directory part of a path
    @classmethod
    def dirname(cls, path): return os.path.dirname(path)

    # File name part of a path
    @classmethod
    def basename(cls, path): return os.path.basename(path)

    # Save data to a file as plain text, JSON or YAML
    @classmethod
    def write(cls, path, data, type='file'):
        root_dir = os.path.dirname(path)
        if not os.path.exists(root_dir): os.makedirs(root_dir)
        with open(path, 'w', encoding='utf-8') as fs:
            if type == 'file':
                fs.write(data)
            elif type == 'json':
                fs.write(json.dumps(data, ensure_ascii=False, indent=4))
            elif type == 'yaml':
                fs.write(yaml.dump(data, allow_unicode=True, default_flow_style=False, indent=4))

    # Return the path after making sure its parent directory exists
    @classmethod
    def path(cls, file):
        base_dir = os.path.dirname(file)
        if not os.path.exists(base_dir): os.makedirs(base_dir)
        return file

    # Compare two files by size
    @classmethod
    def compare_size(cls, dst, file):
        if cls.exists(dst) and cls.exists(file):
            return os.stat(dst).st_size == os.stat(file).st_size
        else:
            return None

    # Read a file as plain text, JSON or YAML
    @classmethod
    def read(cls, file, type='file'):
        if os.path.exists(file):
            with open(file, "r", encoding="utf-8") as fs:
                if type == "json":
                    return json.loads(fs.read())
                elif type == 'yaml':
                    return yaml.safe_load(fs.read())
                elif type == 'file':
                    return fs.read()
        else:
            return []

    # MD5 digest of a file, read in 4 KiB chunks
    @classmethod
    def get_file_md5(cls, file_path):
        md5 = hashlib.md5()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                md5.update(chunk)
        return md5.hexdigest()

    @classmethod
    def write_json(cls, path, data): cls.write(path, data, type="json")

    @classmethod
    def read_json(cls, file): return cls.read(file, type="json")
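
    # Minimal usage sketch (the paths are illustrative only):
    #
    #   fileUtils.write_json("output/demo.json", {"name": "demo"})
    #   data = fileUtils.read_json("output/demo.json")        # -> {'name': 'demo'}
    #   digest = fileUtils.get_file_md5("output/demo.json")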
"""
|
||
图像编号 image-1.jpg
|
||
如:存在image.png 返回 image-1.png 反之 image.png
|
||
"""
|
||
@classmethod
|
||
def file_check(cls, file, result="file", count=0):
|
||
# 初始化临时文件名 文件信息 文件名列表
|
||
temp_file_name, files_data, files_name = [file, {}, []]
|
||
# 默认文件名不存在
|
||
if not cls.exists(temp_file_name) and temp_file_name == file: count = 1
|
||
while count or count == 0:
|
||
# 获取格式化文件名
|
||
temp_file_name = ComicPath().images_icon(file=file, count=count)
|
||
# 格式化文件名存在
|
||
if cls.exists(temp_file_name):
|
||
# 保存存在的文件名
|
||
files_name.append(temp_file_name)
|
||
file_size = os.path.getsize(temp_file_name)
|
||
# 保存文件名和大小数据
|
||
files_data[file_size] = {"name": temp_file_name, "size": file_size}
|
||
# 格式化文件名
|
||
# temp_file_name = ComicPath().images_icon(file=file, count=count)
|
||
count += 1
|
||
else:
|
||
# 检测是否有重复数据
|
||
# 提取重复并需删除的文件名
|
||
diff_names = {value["name"] for value in files_data.values()}
|
||
# 不存在则返回原文件名
|
||
if len(diff_names) == 0:
|
||
# 默认为空
|
||
if result == "size": return diff_names
|
||
# 原文件名
|
||
else: return file
|
||
for file_name in files_name:
|
||
if file_name not in diff_names:
|
||
logging.info(f"删除文件:{file_name}")
|
||
os.remove(file_name)
|
||
|
||
# 判断是否存在初始文件和多个文件名
|
||
if len(diff_names) == 1 and file not in diff_names:
|
||
for diff_file in diff_names:
|
||
logging.info(f"移动文件{diff_file}到 {file}")
|
||
shutil.move(diff_file, file)
|
||
if file in diff_names and len(diff_names) > 1:
|
||
move_file = ComicPath().images_icon(file=file, count=count)
|
||
logging.info(f"移动文件{file}到 {move_file}")
|
||
shutil.move(file, move_file)
|
||
cls.file_check(file=file,result=result,count=0)
|
||
# 去重后文件名数与存在的文件名数不存在则证明文件存在重复,重新运行本方法
|
||
if len(set(diff_names)) != len(set(files_name)): cls.file_check(file, result=result,count=0)
|
||
|
||
if result == "size":
|
||
return {value["size"] for value in files_data.values()}
|
||
else:
|
||
return temp_file_name
|
||
|
||
|
||
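
    # How file_check() names candidates (derived from ComicPath.images_icon below):
    # count 0 keeps the original name, count N appends "-N" before the suffix, so
    # for "icon.jpg" it probes icon.jpg, icon-1.jpg, icon-2.jpg, ... and removes
    # the numbered copies whose size duplicates one that is already kept.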

    # Has the file changed? Compares the old file's size against existing copies of the new one
    @classmethod
    def file_update(cls, old_file, new_file):
        is_update = False
        if os.path.exists(old_file): is_update = os.path.getsize(old_file) not in cls.file_check(new_file, result="size")
        return is_update

    # Copy the cover image to the save path if it is new or has changed
    @classmethod
    def update_icon(cls, image_path, save_path):
        if cls.file_update(image_path, save_path):
            save_dir = os.path.dirname(save_path)
            if not os.path.exists(save_dir): os.makedirs(save_dir)
            logging.info(f"update icon ... {image_path} ===> {cls.file_check(save_path)}")
            shutil.copyfile(image_path, cls.file_check(save_path))


# Common helpers
class CommonUtils:
    # Walk a dotted path (e.g. "a.b.c") through a dict, JSON-decoding strings first
    @classmethod
    def parseExec(cls, data, exec):
        if data is not None and exec is not None:
            dots = str(exec).split(".")
            if not isinstance(data, dict): data = json.loads(data)
            for dot in dots:
                data = data.get(dot)
            return data
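
    # Usage sketch: parseExec walks a dotted path through nested dicts, parsing
    # JSON strings first when needed.
    #
    #   CommonUtils.parseExec('{"data": {"total": 3}}', "data.total")   # -> 3
    #   CommonUtils.parseExec({"data": {"total": 3}}, "data.total")     # -> 3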

    @classmethod
    def _validate_xml(cls, xml_file, xsd_file):
        # Load the XSD schema
        xsd = xmlschema.XMLSchema(xsd_file)

        # Validate the XML file against it
        is_valid = xsd.is_valid(xml_file)

        if is_valid:
            print("XML file passed XSD validation.")
        else:
            print("XML file failed XSD validation. Validation errors:")
            for error in xsd.iter_errors(xml_file):
                print(error)

    @classmethod
    def validate_comicinfo_xml(cls, xml_file):
        cls._validate_xml(xml_file, "ComicInfo.xsd")


# Image helpers
class imageUtils:

    @classmethod
    def descramble_images_by_dir(cls, chapter_dir):
        if os.path.isfile(chapter_dir):
            chapter_dir = os.path.dirname(chapter_dir)
        scramble_count = 0
        if os.path.exists(chapter_dir):  # walk the chapter's image directory
            while any(img.startswith(ComicPath.PREFIX_SCRAMBLE) for img in os.listdir(chapter_dir)):
                for img in os.listdir(chapter_dir):
                    if img.startswith(ComicPath.PREFIX_SCRAMBLE):
                        imageUtils.encode_scramble_image(os.path.join(chapter_dir, img))
                        scramble_count += 1
            logging.debug(f"{ComicPath.PREFIX_SCRAMBLE} {scramble_count}")
        return scramble_count

    @classmethod
    def deScrambleImagesByPath(cls, img_path, img_save=None):
        if os.path.basename(img_path).startswith(ComicPath.PREFIX_SCRAMBLE) and os.path.exists(img_path):
            img_path = imageUtils.encode_scramble_image(img_path, img_save)
        return img_path
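
    # Usage sketch (the directory layout is an assumption): after a chapter has
    # been downloaded, descramble every "scramble=<blocks>_<page>.jpg" file in place.
    #
    #   imageUtils.descramble_images_by_dir("output/project/images/name/chapter-01")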

    @classmethod
    def encodeImage(cls, str_en):
        # Derive the strip count from the MD5 of the base64-decoded payload
        enc = base64.b64decode(str_en)
        m = hashlib.md5()
        m.update(enc)
        md5 = m.digest()
        d = md5[-1]
        try:
            blocks = d % 10 + 5
        except Exception:
            blocks = 0 % 10 + 5
        return blocks
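
    # Worked example of the block-count rule above: if the last byte of the MD5
    # digest is 0x2A (42 decimal), the image was cut into 42 % 10 + 5 = 7 strips.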

    @classmethod
    def scrambleImage(cls, file_path):
        # A partially downloaded image was detected: remove it and bail out
        if str(file_path).endswith(".downloads"):
            os.remove(file_path)
            return None
        file_str = str(file_path).split("=")
        # The part after "=" looks like 10_29.jpg: <blocks>_<page>.<ext>
        base_dir = file_str[0].replace("scramble", "")
        base_name = file_str[-1]
        base_fn = base_name.split("_")
        save_name = base_fn[1]
        save_name_delesu = save_name.split(".")[0]
        blocks = int(base_fn[0])
        save_file_path = os.path.join(base_dir, save_name)
        print("save", save_file_path)
        if os.path.exists(save_file_path):
            print("Image already descrambled, skipping:", save_file_path)
            return None
        image_su = str(file_path).split(".")[-1]
        try:
            img = Image.open(file_path)
        except Exception:
            print(f"error Image: {file_path}")
            return None
        width = img.width
        height = img.height
        print("blocks=", blocks)
        block_height = int(height / blocks)
        block_width = int(width / blocks)
        print("blockHeight=", block_height)
        suffix = str(file_path).split(".")[-1]
        split_path = os.path.join(base_dir, save_name_delesu + "split")
        if image_su == "downloads":
            return None
        is_split = cls.splitimage(file_path, blocks, 1, split_path)
        if is_split is not None:
            cls.image_compose(split_path, blocks, 1, save_file_path, block_height, width)
        else:
            if os.path.exists(split_path):
                shutil.rmtree(split_path)
            if os.path.exists(file_path):
                shutil.move(file_path, save_file_path)
        # Return the source path so the caller can clean it up afterwards
        return file_path

    @classmethod
    def splitimage(cls, src, rownum, colnum, dstpath):
        # Cut the source image into rownum x colnum tiles saved as 000.ext, 001.ext, ...
        img = Image.open(src)
        w, h = img.size
        if rownum <= h and colnum <= w:
            s = os.path.split(src)
            if dstpath == '':
                dstpath = s[0]
            if not os.path.exists(dstpath):
                os.makedirs(dstpath)
            fn = s[1].split('.')
            basename = fn[0]
            ext = fn[-1]
            num = 0
            rowheight = h // rownum
            colwidth = w // colnum
            for r in range(rownum):
                for c in range(colnum):
                    box = (c * colwidth, r * rowheight, (c + 1) * colwidth, (r + 1) * rowheight)
                    count_image = "{:0>3d}".format(num)
                    file_path = os.path.join(dstpath, str(count_image) + '.' + ext)
                    print("file_path=", file_path)
                    img.crop(box).save(file_path)
                    num = num + 1
            return "success"
        else:
            print('Invalid split: row/column count exceeds the image size!')
            return None

    @classmethod
    def image_compose(cls, src, row, column, save_path, image_height, image_width):
        # Stitch the tiles in src back into a single image of row x column cells
        image_size = image_height
        images_format = ['.png', '.jpg']

        # Sort the tile file names numerically (000.jpg, 001.jpg, ...)
        img_list = os.listdir(src)
        img_list.sort()
        img_list.sort(key=lambda x: int(x[:-4]))
        img_nums = len(img_list)
        image_names = []
        for i in range(img_nums):
            img_name = os.path.join(src, img_list[i])
            image_names.append(img_name)
        # Paste in reverse order to undo the scramble
        image_names = image_names[::-1]
        # Sanity check: the grid parameters must not exceed the number of tiles
        if len(image_names) < row * column:
            raise ValueError("Not enough tiles for the requested rows x columns!")

        to_image = Image.new('RGB', (column * image_width, row * image_height))  # the output canvas
        # Paste every tile at its grid position
        for y in range(1, row + 1):
            for x in range(1, column + 1):
                image_path = image_names[column * (y - 1) + x - 1]
                print("split_image=", image_path)
                from_image = Image.open(image_path)
                to_image.paste(from_image, ((x - 1) * image_size, (y - 1) * image_size))
                from_image.close()
        to_image.save(save_path)
        print("Image re-assembled:", save_path)
        # Remove the temporary tile directory
        shutil.rmtree(src)

    @classmethod
    def getScrambleImage(cls, path):
        scramble_file_cache = cls.scrambleImage(path)
        if scramble_file_cache is not None and os.path.exists(scramble_file_cache): os.remove(scramble_file_cache)

    @classmethod
    def encode_scramble_image(cls, img_path, img_save=None):
        if not os.path.exists(img_path):
            return
        image = Image.open(img_path)
        w, h = image.size
        file_str = str(img_path).split("=")
        # The part after "=" looks like 10_29.jpg: <blocks>_<page>.<ext>
        base_fn = file_str[-1].split("_")
        blocks = int(base_fn[0])
        if img_save is None:
            save_path = os.path.join(os.path.dirname(img_path), ComicPath.getFileScrambleImageSave(img_path))
        else: save_path = img_save
        if blocks:
            s = blocks  # strip count
            l = h % s   # leftover rows after an even split
            box_list = []
            hz = 0
            for i in range(s):
                c = math.floor(h / s)
                g = i * c
                hz += c
                h2 = h - c * (i + 1) - l
                if i == 0:
                    c += l
                    hz += l
                else:
                    g += l
                box_list.append((0, h2, w, h - g))

            item_width = w
            # box_list could instead be reversed to restore the original order
            newh = 0
            image_list = [image.crop(box) for box in box_list]
            newimage = Image.new("RGB", (w, h))
            for strip in image_list:
                b_w, b_h = strip.size
                newimage.paste(strip, (0, newh))
                newh += b_h
            newimage.save(save_path)
            logging.info(f"Descrambled successfully: {save_path}")
            if os.path.exists(img_path):
                os.remove(img_path)
                logging.debug(f"remove {img_path}")
            return save_path
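
# Note on the scrambled-image convention used by imageUtils (inferred from the
# code above, not an external spec): downloads are first stored as
# "scramble=<blocks>_<page>.jpg"; encode_scramble_image() reads <blocks> from the
# name, cuts the image into that many horizontal strips, re-stacks them and saves
# the result under the plain "<page>.jpg" name before deleting the source file.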


# CBZ archive helpers
class CBZUtils:

    @classmethod
    def readDirsOrFiles(cls, dir, type):
        data = []
        files = os.listdir(dir)
        for file in files:
            path = os.path.join(dir, file)
            if type == "files" and os.path.isfile(path):
                data.append(path)
            if type == "dirs" and os.path.isdir(path):
                data.append(path)
        return data

    @classmethod
    def zip_compression(cls, source_dir=None, target_file=None, remove=True):
        target_dir = os.path.dirname(target_file)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        if not os.path.exists(target_file) and source_dir is not None:
            with ZipFile(target_file, mode='w') as zf:
                for path, dir_names, filenames in os.walk(source_dir):
                    path = Path(path)
                    arc_dir = path.relative_to(source_dir)
                    y = 0
                    for filename in filenames:
                        y = y + 1
                        print("Packing: " + str(y) + "/" + str(len(filenames)), os.path.join(source_dir, filename))
                        zf.write(path.joinpath(filename), arc_dir.joinpath(filename))
            logging.info(f"Packing finished: {target_file}")

    @classmethod
    def packComicChapterCBZ(cls, src_dir, dts_path, comic_info_images, remove=True):
        if os.path.exists(src_dir):
            dirs = os.listdir(src_dir)
            for file in dirs:
                if file.startswith(ComicPath.PREFIX_SCRAMBLE):
                    try:
                        imageUtils.deScrambleImagesByPath(os.path.join(src_dir, file))
                    except Exception as e:
                        print(f"Error while descrambling {file}: {e}, skipped")
                        return False
            cls.zip_compression(src_dir, dts_path)
            time.sleep(0.1)
            if remove: shutil.rmtree(src_dir)
            # validation
            return cls.cbz_validate(dts_path, comic_info_images)
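
    # Usage sketch (the paths and the image list are illustrative): pack one
    # chapter directory into a .CBZ and verify the archive holds the expected pages.
    #
    #   ok = CBZUtils.packComicChapterCBZ(
    #       "output/project/images/name/chapter-01",
    #       "export/project/name/chapter-01.CBZ",
    #       comic_info_images=["001.jpg", "002.jpg"])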

    @classmethod
    def replaceZip(cls, filepath, unpack_dir=None):
        if not cls.compareFileDate(filepath): return None
        if unpack_dir is None:
            unpack_dir = str(filepath).split(".")[0]
        with ZipFile(filepath, 'r') as fz:
            for file in fz.namelist():
                if file.endswith(".jpg"):
                    data = fz.read(file)
                    if len(data) < 500 and os.path.exists(filepath):
                        os.remove(filepath)
                        print(f"Incomplete data, removed: {filepath}")
        if cls.compareFileDate(filepath):
            os.utime(filepath)
            print(f"File timestamp refreshed: {filepath}")
        if os.path.exists(unpack_dir):
            shutil.rmtree(unpack_dir)
        # TODO: delete the stale main.ftl, copy the replacement into the directory
        # to be packed, then call cls.zip_compression(); only run when the file
        # date check passes.

    @classmethod
    def compareFileDate(cls, filepath):
        # True if the file's modification time is older than the cut-off below
        if os.path.exists(filepath):
            ctime = os.path.getmtime(filepath)
            str_ctime = datetime.fromtimestamp(int(ctime))
            file_ctime = str_ctime.strftime("%Y%m%d%H")
            c_ctime = 2023011603
        else:
            return False
        if int(file_ctime) < c_ctime:
            return True
        return False

    @classmethod
    def zip_info(cls, path, filter=True):
        # List archive members, optionally hiding the ComicInfo.xml entry
        result = None
        try:
            with ZipFile(path, "r") as zip_file:
                result = zip_file.namelist()
                if filter:
                    result.remove(COMIC_INFO_XML_FILE)
        except Exception as e:
            print(e)
        return result

    @classmethod
    def cbz_validate(cls, zip_path, comic_info_images):
        # The archive is valid when it holds exactly the expected page list
        cbz_info = cls.zip_info(zip_path)
        if len(cbz_info) == len(comic_info_images):
            ntfy.gotify(title="Comic download finished", message=f"validating successfully === {zip_path}")
            return True
        else:
            os.remove(zip_path)
            ntfy.gotify(title="Comic validation failed", message=f"validating fail === {zip_path}, cbz_info={cbz_info},zip_info={comic_info_images}")
            return False


# Error-tracking helpers
class checkUtils:

    def read(self, item):
        file = os.path.join(OUTPUT_DIR, ComicLoader(item=item).get_project_name(), "error_comics.json")
        return fileUtils.read(file)

    # Record a failed chapter so repeated failures can be detected later
    def export_error(self, item):
        if not self.is_error(item):
            file = os.path.join(OUTPUT_DIR, ComicLoader(item=item).get_project_name(), "error_comics.json")
            try:
                error_comic = json.loads(self.read(item))
            except Exception:
                error_comic = []
            error_comic.append({"name": ComicPath.new_file_name(item['name']),
                                "chapter": ComicPath.new_file_name(item['chapter']),
                                "date": ComicPath().getYearMonthDay()})
            fileUtils.write_json(file, error_comic)

    # Has this comic/chapter combination already been recorded as an error?
    def is_error(self, item):
        try:
            for error_c in json.loads(self.read(item)):
                (name, chapter, date) = [error_c['name'], error_c['chapter'], error_c['date']]
                if ComicPath.new_file_name(item['name']) == ComicPath.new_file_name(name) and ComicPath.new_file_name(item['chapter']) == ComicPath.new_file_name(chapter):
                    return True
            return False
        except Exception:
            return False
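
# Shape of the error_comics.json records written by checkUtils (taken from
# export_error above):
#
#   [{"name": "<comic name>", "chapter": "<chapter name>", "date": "20240101"}]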


# Comic path helper
class ComicPath:
    ci, project, name, chapter = [None, "", "", ""]

    def __init__(self, item=None):
        if item is None: return
        else:
            self.ci: ComicLoader = ComicLoader(item=item)
            if self.ci.get_project_name() is not None: self.project = self.fix_file_name(self.ci.get_project_name())
            if self.ci.get_name() is not None: self.name = self.fix_file_name(self.ci.get_name())
            if self.ci.get_chapter() is not None: self.chapter = self.fix_file_name(self.ci.get_chapter())

    PREFIX_SCRAMBLE = "scramble="

    MAPPING_IMAGE = "image"
    MAPPING_COMIC_INFO = "comic_info"
    MAPPING_CBZ_ICON = "cbz_icon"
    MAPPING_DOWN_ICON = "down_icon"
    MAPPING_DOWN_CACHE_ICON = "down_cache_icon"
    MAPPING_ICON = "icon"
    MAPPING_ICON_CACHE = "icon_cache"
    MAPPING_CBZ = "cbz"
    MAPPING_CBZ_DIR = "cbz_dir"
    MAPPING_OLD_CBZ_DIR = "old_cbz_dir"
    MAPPING_IMAGES_DIR = "images_dir"
    MAPPING_COMIC_JSON = "comic_json"

    def PATH_MAPPING(self):
        if self.project is None or self.name is None or self.chapter is None: return None
        return {
            self.MAPPING_IMAGE: os.path.join(self.project, "images", self.name, self.chapter),
            self.MAPPING_COMIC_INFO: os.path.join(self.project, "images", self.name, self.chapter),
            self.MAPPING_CBZ_ICON: os.path.join(settings.CBZ_EXPORT_PATH, self.project, self.name, self.chapter + ".jpg"),
            self.MAPPING_DOWN_ICON: os.path.join(settings.IMAGES_STORE, self.project, "icons", self.name, self.name + ".jpg"),
            self.MAPPING_DOWN_CACHE_ICON: os.path.join(settings.IMAGES_STORE, self.project, "icons", ".cache", self.name + ".jpg"),
            self.MAPPING_ICON: os.path.join(self.project, "icons", self.name, self.name + ".jpg"),
            self.MAPPING_ICON_CACHE: os.path.join(self.project, "icons", ".cache", self.name + ".jpg"),
            self.MAPPING_CBZ: os.path.join(settings.CBZ_EXPORT_PATH, self.project, self.name, self.chapter + ".CBZ"),
            self.MAPPING_IMAGES_DIR: os.path.join(settings.IMAGES_STORE, self.project, "images", self.name, self.chapter),
            self.MAPPING_COMIC_JSON: os.path.join(settings.IMAGES_STORE, self.project, "json", self.name, self.chapter + ".json"),
            self.MAPPING_CBZ_DIR: os.path.join(settings.CBZ_EXPORT_PATH, self.project, self.name),
            self.MAPPING_OLD_CBZ_DIR: os.path.join(settings.OLD_CBZ_EXPORT_PATH, self.project, self.name)
        }

    def PATH_CBZ(self, result_type=MAPPING_CBZ): return self.file_path(result_type=result_type)

    def getDirJosnComicChapter(self, result_type=MAPPING_COMIC_JSON): return self.file_path(result_type=result_type)

    def file_path(self, result_type=MAPPING_IMAGE, file=None, convert=True, chapter=None):
        if chapter is not None: self.chapter = chapter
        path = self.PATH_MAPPING().get(result_type, None)
        if result_type == self.MAPPING_IMAGE and file is not None and os.path.sep not in file:
            return os.path.join(path, file)
        if convert:
            path = self.chinese_convert(path)
        return path
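
    # Usage sketch (the item fields are assumptions about ComicLoader's dict shape):
    #
    #   cp = ComicPath(item={"project": "demo", "name": "Some Comic", "chapter": "01"})
    #   cp.PATH_CBZ()                                     # CBZ_EXPORT_PATH/demo/Some Comic/01.CBZ
    #   cp.file_path(ComicPath.MAPPING_IMAGE, "001.jpg")  # demo/images/Some Comic/01/001.jpg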

    @classmethod
    def getYearMonthDay(cls):
        today = date.today()
        # Format as YYYYMMDD
        return today.strftime("%Y%m%d")

    @classmethod
    def getFileScrambleImageName(cls, count, block, suffix=".jpg"): return cls.PREFIX_SCRAMBLE + str(block) + "_" + str(count) + suffix

    @classmethod
    def getFileScrambleImageSave(cls, file, relative=False, is_prefix=True):
        file_name = str(file).split("_")[-1]
        if relative:
            file_name = os.path.basename(file_name)
        if relative == "fullpath":
            file_name = os.path.join(os.path.dirname(file), file_name)
        if not is_prefix:
            return file_name.split(".")[0]
        else:
            return file_name

    # Convert Traditional Chinese to Simplified Chinese
    @classmethod
    def chinese_convert(cls, text, convert='t2s'): return OpenCC(convert).convert(str(text))

    # Normalize a string into a legal file name
    @classmethod
    def fix_file_name(cls, filename, replace=None):
        if not isinstance(filename, str):
            return filename
        in_tab = r'[?*/\|.:><]'
        str_replace = ""
        if replace is not None:
            str_replace = replace
        filename = re.sub(in_tab, str_replace, filename)
        # Drop trailing spaces
        filename = filename.rstrip(" ")
        return filename

    @classmethod
    def new_file_name(cls, name): return cls.fix_file_name(cls.chinese_convert(name))

    # Numbered variant of a file name: count 0 keeps the name, count N appends "-N"
    @classmethod
    def images_icon(cls, file, count):
        if count == 0: return file
        name, suffix = os.path.splitext(file)
        return name + "-" + str(count) + suffix
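
# Example of new_file_name(): text is first converted from Traditional to
# Simplified Chinese via OpenCC ("t2s"), then characters that are unsafe in file
# names (? * / | . : > <) and trailing spaces are removed, e.g.
# ComicPath.new_file_name("漫畫 Vol.1: 測試 ") -> "漫画 Vol1 测试".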


# Notification helpers
class ntfy:
    @classmethod
    def sendMsg(cls, msg, alert=False, sleep=None, error=None):
        try:
            print(f"#ntfy: {msg}")
            if alert:
                requests.post("https://ntfy.caiwenxiu.cn/PyComic",
                              data=msg.encode(encoding='utf-8'))
        except Exception:
            print(f"#ntfy error: {msg}")
        if sleep is not None:
            logging.info(f'Waiting {sleep} seconds before the next stage')
            time.sleep(int(sleep))
        if error is not None:
            print(f"#ntfy Error: {error}")
            return False
        else:
            return True

    @classmethod
    def gotify(cls, title, message):
        # Replace with your Gotify server URL
        GOTIFY_URL = 'https://gotify.caiwenxiu.cn'
        # Replace with your Gotify application token
        GOTIFY_TOKEN = 'Aq3balsIgPjwD6D'
        # Notification payload to send
        payload = {
            'title': title,
            'message': message,
            'priority': 5
        }
        # Send the POST request to the Gotify server
        response = requests.post(
            f'{GOTIFY_URL}/message',
            headers={'X-Gotify-Key': GOTIFY_TOKEN},
            json=payload
        )

        # Check the response status code
        if response.status_code == 200:
            print("Notification sent successfully!")
        else:
            print(f"Failed to send notification. Status code: {response.status_code}")
            print(response.json())


class logger:
    def log_image_download(self, image_path):
        if image_path == "":
            logging.info(f"icon file exists: IMAGE_STORE {image_path}")
        else:
            logging.info(f"file exists: IMAGE_STORE {image_path}")

    def log_image_not_downloaded(self, image_path):
        logging.info(f"file does not exist: IMAGE_STORE {image_path}")


class ItemIconUtils:
    def __init__(self, path, icon_item=None):
        if icon_item is not None: self.icon_item = icon_item
        else: self.icon_item = []
        self.path = path
        if path is not None:
            self.read_icon_json()

    def add(self, name, path, size, md5, url=None):
        self.icon_item.append({'name': name, 'path': path, 'size': size, 'md5': md5, 'url': url})

    def toString(self):
        return self.icon_item

    def _get_values(self, key): return [item.get(key) for item in self.icon_item]

    def get_names(self): return self._get_values("name")

    def get_file_count_by_md5(self, md5):
        md5_list = self._get_values("md5")
        md5_count = md5_list.count(md5)
        if md5_count > 1:
            logging.info(f" {md5_list} == duplicate md5 entries found")
        return md5_count

    def get_file_by_key_value(self, key, value):
        list_item = []
        for item in self.icon_item:
            if item.get(key) == value: list_item.append(item)
        return list_item

    def get_file_by_md5(self, md5):
        list_item = []
        for item in self.icon_item:
            if item.get("md5") == md5 and self.get_file_count_by_md5(md5) == 1: list_item.append(item)
        return list_item

    def get_file_by_url(self, url): return self.get_file_by_key_value('url', url)

    def update_file_by_md5_to_url(self, md5, url):
        file_item = self.get_file_by_md5(md5)
        if len(file_item) > 1:
            logging.info(f" {file_item} == more than one match")
        if len(file_item) == 1:
            file_item = file_item[0]
            self.icon_item.remove(file_item)
            file_item['url'] = url
            self.icon_item.append(file_item)
        return self.write_icon_json()

    def get_files_info(self, directory, filter_suffix=None):
        for root, dirs, files in os.walk(directory):
            for file in files:
                name, suffix = os.path.splitext(file)
                if suffix != filter_suffix:
                    file_path = os.path.join(root, file)
                    file_size = os.path.getsize(file_path)
                    self.add(name=name, path=file_path, size=file_size, md5=fileUtils.get_file_md5(file_path))
        return self.toString()

    def write_icon_json(self):
        if self.path is not None:
            self.get_files_info(self.path, filter_suffix='.json')
            fileUtils.write_json(os.path.join(self.path, "icons.json"), self.toString())

    def read_icon_json(self):
        if self.path is not None: self.icon_item = fileUtils.read_json(os.path.join(self.path, "icons.json"))
        return self.icon_item
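
# Shape of the icons.json entries maintained by ItemIconUtils (from add() above):
#
#   [{"name": "...", "path": "...", "size": 12345, "md5": "...", "url": null}]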


class DBUtils:

    def __init__(self, db_name, suffix="json"):
        self.db = TinyDB(ComicPath().getDirConfDefault(db_name, suffix=suffix))

    @classmethod
    def init_db(cls, db_name):
        return TinyDB(ComicPath.getDirConfDefault(db_name, suffix="json"))

    @classmethod
    def set(cls, name, progress, db_name):
        if name is None or progress is None or db_name is None:
            print("dbUtils set: data is empty")
            return False
        db = cls.init_db(db_name)
        comic = Query()
        if len(db.search(comic.name == name)) == 0: db.insert({"name": name, "progress": progress})
        else: db.update({"progress": progress}, comic.name == name)
        msg = "failed"
        if cls.query(name, progress, db_name): msg = "succeeded"
        logging.debug(f"set {msg}, name={name} value={progress} db={db_name}")

    @classmethod
    def query(cls, name, progress=None, db_name=None):
        result = False
        db = cls.init_db(db_name)
        if db is None: return None
        data = db.search(Query().name == name)
        logging.debug(f"result query= {data}")
        if progress is not None:
            try:
                if len(db.search((Query().name == name) & (Query().progress == progress))) != 0: result = True
            except Exception as e:
                print(e)
        return result

    @classmethod
    def remove(cls, name, db_name=None):
        db = cls.init_db(db_name)
        db.remove(Query().name == name)
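
# Usage sketch for DBUtils (the db file name and values are illustrative;
# ComicPath.getDirConfDefault is assumed to be defined elsewhere in the project):
#
#   DBUtils.set("Some Comic", progress="chapter-10", db_name="progress")
#   DBUtils.query("Some Comic", progress="chapter-10", db_name="progress")   # -> True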


class oldUtils:
    # Names from `files` that are not yet present in `folder`; falls back to the
    # full normalized name list when the folder does not exist
    def new_files(self, files, folder, suffix="CBZ", result_type="new"):
        result_files = self.old_files(files=files, folder=folder, suffix=suffix, result_type=result_type)
        new_files = []
        if result_files is None:
            if isinstance(files, str): new_files.append(ComicPath.chinese_convert(ComicPath.fix_file_name(files)))
            else:
                for file in files: new_files.append(ComicPath.chinese_convert(ComicPath.fix_file_name(file)))
            return new_files
        else: return result_files

    def old_files(self, files, folder, suffix="CBZ", result_type="old"):
        result = None
        # Use pathlib's iterdir to list the files in the folder
        # (only the names are needed, not absolute paths)
        if os.path.exists(folder):
            file_names = [f.name for f in pathlib.Path(folder).iterdir() if f.is_file()]
        else:
            return None
        old_item = []
        for file_name in file_names:
            file_split = file_name.split(".")
            file_suffix = file_split[-1]
            file_prefix = file_split[0]
            if file_suffix == suffix:
                old_item.append(file_prefix)

        new_item = []
        if isinstance(files, str): new_item.append(ComicPath.chinese_convert(ComicPath.fix_file_name(files)))
        else:
            for file in files: new_item.append(ComicPath.chinese_convert(ComicPath.fix_file_name(file)))
        only_in_new_item = [item for item in new_item if item not in old_item]
        only_in_old_item = [item for item in old_item if item not in new_item]
        in_new_item_and_old_item = [item for item in new_item if item in old_item]

        logging.debug(f"only in new_item: {only_in_new_item}")
        logging.debug(f"only in old_item: {only_in_old_item}")
        logging.debug(f"in both new_item and old_item: {in_new_item_and_old_item}")
        if result_type == "old": result = only_in_old_item
        if result_type == "new": result = only_in_new_item
        return result

    def clean_old_files(self, files, folder, move_folder, suffix="CBZ"):
        # Move files that only exist on disk (and are no longer wanted) into move_folder
        only_in_old_item = self.old_files(files=files, folder=folder, suffix=suffix)

        def move_file():
            """Move the stale files and their covers into move_folder."""
            if not os.path.exists(move_folder): os.makedirs(move_folder)
            for old_file in only_in_old_item:
                try:
                    suffixs = [suffix, "jpg"]
                    for suf in suffixs:
                        new_move_file = os.path.join(folder, old_file) + "." + suf
                        old_move_file = os.path.join(move_folder, old_file) + "." + suf
                        if os.path.exists(new_move_file):
                            shutil.move(new_move_file, old_move_file)
                            print(f"move old_file={new_move_file} --> {old_move_file}")
                except Exception:
                    print(f"Error: move old_file={new_move_file} --> {old_move_file}")

        if only_in_old_item is not None: move_file()