ComicScrapy/Comics/utils.py
2024-07-22 00:52:50 +08:00

907 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64,hashlib,os,shutil,os.path,pathlib
import math,time,json,datetime,logging
import re,requests,time,xmlschema
from datetime import date
from Comics import settings
from opencc import OpenCC
from PIL import Image
from pathlib import Path
from zipfile import ZipFile
from Comics.settings import COMIC_INFO_XML_FILE,OUTPUT_DIR,PROJECT_KEY
import yaml
from Comics.loader import ComicLoader
from tinydb import TinyDB, Query
# 配置类
class Conf():
# 读取yml文件配置
# @project 根据工程名读取配置 project.yml
# @key 读取key内的字典的数据(默认为空)
#def init(self, project, key=None):
# data = None
# if project == None: project = "config"
# with open(os.path.join("Comics","spiders", project)+".yml") as f:
# data = yaml.load(f, Loader=yaml.FullLoader)
# if key != None and data != None:
# return data[key]
def get_config_value(self, project, key=None):
# 使用Path类来处理文件路径
config_path = Path(os.path.join("Comics","spiders", project)+".yml")
#Path("Comics") / "spiders" / (project + ".yml")
# 检查项目是否存在
if not config_path.is_file():
return None
# 打开文件并加载配置数据
try:
with config_path.open('r') as f:
data = yaml.safe_load(f)
except yaml.YAMLError as e:
print(f"Error loading YAML file: {e}")
return None
# 检查key是否存在
if key is not None and key in data:
return data[key]
else:
return None
# 根据读取的配置数据导入到ComicLoader中
def comic(self, project, item: ComicLoader, child_data='data', val=None):
item.project_name(project)
data = self.get_config_value(project, child_data)
for key, xpath_data in data.items():
if isinstance(xpath_data, str): xpath_data = {'xpath': xpath_data}
xpath = xpath_data.get('xpath', None)
index = xpath_data.get('index', None)
value = xpath_data.get('value', None) if val is None else val
sexec = xpath_data.get('sexec', None)
item.set_properties(name=key, value=value, xpath=xpath, index=index, sexec=sexec)
return item
def parse_chapter(self,item: ComicLoader, value):
return self.comic(item.get_project_name(), item, "parse_chapter", value)
def parse_chapter_api(self, item: ComicLoader, value):
return self.comic(item.get_project_name(), item, "parse_chapter_api", value)
# 文件操作类
class fileUtils:
# 文件是否存在
@classmethod
def exists(cls, path): return os.path.exists(path)
# 文件路径拼接
@classmethod
def join(cls, path, *paths): return os.path.join(path, *paths);
# 文件夹名
@classmethod
def dirname(cls, path): return os.path.dirname(path);
# 文件名
@classmethod
def basename(cls, path): return os.path.basename(path);
# 保存文件
@classmethod
def write(cls,path,data,type='file'):
root_dir = os.path.dirname(path)
if not os.path.exists(root_dir): os.makedirs(root_dir)
with open(path,'w',encoding='utf-8') as fs:
if type == 'file':
fs.write(data)
elif type == 'json':
fs.write(json.dumps(data,ensure_ascii=False,indent=4))
elif type == 'yaml':
fs.write(yaml.dump(data, allow_unicode=True, default_flow_style=False, indent=4))
# 返回校验后的文件路径
@classmethod
def path(cls, file):
base_dir = os.path.dirname(file)
if not os.path.exists(base_dir): os.makedirs(base_dir)
return file
# 比较文件大小
@classmethod
def compare_size(cls, dst, file):
if cls.exists(dst) and cls.exists(file):
return os.stat(dst).st_size == os.stat(file).st_size
else:
return None
# 读取文件
@classmethod
def read(cls, file, type='file'):
if os.path.exists(file):
with open(file, "r", encoding="utf-8") as fs:
if type == "json":
return json.loads(fs.read())
elif type == 'yaml':
return yaml.safe_load(fs.read())
elif type == 'file':
return fs.read()
else:
return []
@classmethod
def get_file_md5(cls, file_path):
md5 = hashlib.md5()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b''):
md5.update(chunk)
return md5.hexdigest()
@classmethod
def write_json(cls, path, data): cls.write(path, data , type="json")
@classmethod
def read_json(cls, file): return cls.read(file, type="json")
"""
图像编号 image-1.jpg
存在image.png 返回 image-1.png 反之 image.png
"""
@classmethod
def file_check(cls, file, result="file", count=0):
# 初始化临时文件名 文件信息 文件名列表
temp_file_name, files_data, files_name = [file, {}, []]
# 默认文件名不存在
if not cls.exists(temp_file_name) and temp_file_name == file: count = 1
while count or count == 0:
# 获取格式化文件名
temp_file_name = ComicPath().images_icon(file=file, count=count)
# 格式化文件名存在
if cls.exists(temp_file_name):
# 保存存在的文件名
files_name.append(temp_file_name)
file_size = os.path.getsize(temp_file_name)
# 保存文件名和大小数据
files_data[file_size] = {"name": temp_file_name, "size": file_size}
# 格式化文件名
# temp_file_name = ComicPath().images_icon(file=file, count=count)
count += 1
else:
# 检测是否有重复数据
# 提取重复并需删除的文件名
diff_names = {value["name"] for value in files_data.values()}
# 不存在则返回原文件名
if len(diff_names) == 0:
# 默认为空
if result == "size": return diff_names
# 原文件名
else: return file
for file_name in files_name:
if file_name not in diff_names:
logging.info(f"删除文件:{file_name}")
os.remove(file_name)
# 判断是否存在初始文件和多个文件名
if len(diff_names) == 1 and file not in diff_names:
for diff_file in diff_names:
logging.info(f"移动文件{diff_file}{file}")
shutil.move(diff_file, file)
if file in diff_names and len(diff_names) > 1:
move_file = ComicPath().images_icon(file=file, count=count)
logging.info(f"移动文件{file}{move_file}")
shutil.move(file, move_file)
cls.file_check(file=file,result=result,count=0)
# 去重后文件名数与存在的文件名数不存在则证明文件存在重复,重新运行本方法
if len(set(diff_names)) != len(set(files_name)): cls.file_check(file, result=result,count=0)
if result == "size":
return {value["size"] for value in files_data.values()}
else:
return temp_file_name
# 判断文件是否更新
@classmethod
def file_update(cls, old_file, new_file):
is_update = False
if os.path.exists(old_file): is_update = os.path.getsize(old_file) not in cls.file_check(new_file, result="size")
return is_update
# 判断是否需要更新封面
@classmethod
def update_icon(cls, image_path, save_path):
# 不存在则更新
if cls.file_update(image_path, save_path):
save_dir = os.path.dirname(save_path)
if not os.path.exists(save_dir): os.makedirs(save_dir)
logging.info(f"update icon ... {image_path} ===> {cls.file_check(save_path)}")
shutil.copyfile(image_path, cls.file_check(save_path))
# 公共工具类
class CommonUtils:
@classmethod
def parseExec(cls,data,exec):
if data !=None and exec != None:
dots = str(exec).split(".")
if not isinstance(data,dict): data = json.loads(data)
for dot in dots:
data = data.get(dot)
return data
@classmethod
def _validate_xml(cls,xml_file, xsd_file):
# 读取XSD文件
xsd = xmlschema.XMLSchema(xsd_file)
# 验证XML
is_valid = xsd.is_valid(xml_file)
if is_valid:
print("XML文件通过XSD验证成功")
else:
print("XML文件未通过XSD验证。以下是验证错误信息")
validation_errors = xsd.to_errors(xml_file)
for error in validation_errors:
print(error)
@classmethod
def validate_comicinfo_xml(cls, xml_file):
cls._validate_xml(xml_file, "ComicInfo.xsd")
# 图片处理类
class imageUtils:
@classmethod
def descramble_images_by_dir(cls, chapter_dir):
if os.path.isfile(chapter_dir):
chapter_dir = os.path.dirname(chapter_dir)
scramble_count = 0
if os.path.exists(chapter_dir): #获取章节图片路径
while ComicPath.PREFIX_SCRAMBLE in os.listdir(chapter_dir):
for img in os.listdir(chapter_dir):
if img.startswith(ComicPath.PREFIX_SCRAMBLE):
imageUtils.encode_scramble_image(os.path.join(chapter_dir, img))
scramble_count += 1
logging.debug(f"{ComicPath.PREFIX_SCRAMBLE} {scramble_count}")
return scramble_count
@classmethod
def deScrambleImagesByPath(cls, img_path, img_save=None):
if os.path.basename(img_path).\
startswith(ComicPath.PREFIX_SCRAMBLE) and os.path.exists(img_path):
img_path = imageUtils.encode_scramble_image(img_path, img_save)
return img_path
@classmethod
def encodeImage(cls,str_en):
#print("en",str_en)
enc = base64.b64decode(str_en)
#print("解密:",enc)
m = hashlib.md5()
m.update(enc)
md5 = m.digest()
d = md5[-1]
#print(md5)
try:
blocks = d % 10 + 5
except:
blocks = 0 %10 + 5
#print("blocks=",blocks)
return blocks
@classmethod
def scrambleImage(cls,file_path):
#检测到未下载完的图像 直接返回None
if str(file_path).endswith(".downloads"):
os.remove(file_path)
return None
file_str = str(file_path).split("=")
#10_29.jpg
base_dir = file_str[0].replace("scramble","")
base_name = file_str[-1]
base_fn = base_name.split("_")
save_name = base_fn[1]
save_name_delesu = save_name.split(".")[0]
blocks = int(base_fn[0])
save_file_path = os.path.join(base_dir,save_name)
print("sva",save_file_path)
if os.path.exists(save_file_path):
print("图片已解密,已跳过:", save_file_path)
return None
image_su = str(file_path).split(".")[-1]
try:
img = Image.open(file_path)
except:
print(f"error Image: {file_path}")
width = img.width
height = img.height
#blocks = cls.encodeImage(enStr)
print("blocks=",blocks)
block_height = int(height / blocks)
block_width = int(width / blocks)
print("blockHeight=",block_height)
suffix = str(file_path).split(".")[-1]
split_path = os.path.join(base_dir,save_name_delesu+"split")
if image_su == "downloads":
return None
is_split = cls.splitimage(file_path,blocks,1,split_path)
if is_split != None:
cls.image_compose(split_path,blocks,1,save_file_path,block_height,width)
else:
if os.path.exists(split_path):
shutil.rmtree(split_path)
if os.path.exists(file_path):
shutil.move(file_path, save_file_path)
#完成后清空
return file_path
@classmethod
def splitimage(cls,src,rownum,colnum,dstpath):
img=Image.open(src)
w,h=img.size
if rownum<= h and colnum<=w:
s=os.path.split(src)
if dstpath=='':
dstpath = s[0]
if not os.path.exists(dstpath):
os.makedirs(dstpath)
fn=s[1].split('.')
basename=fn[0]
ext=fn[-1]
num=0
rowheight=h//rownum
colwidth=w//colnum
for r in range(rownum):
for c in range(colnum):
box=(c*colwidth,r*rowheight,(c+1)*colwidth,(r+1)*rowheight)
count_image = "{:0>3d}".format(num)
file_path = os.path.join(dstpath,str(count_image)+'.'+ext)
print("file_path=",file_path)
img.crop(box).save(file_path)
num=num+1
return "成功"
else:
print('不数!')
return None
@classmethod
def image_compose(cls,src,row,column,save_path,image_height,image_width):
image_size = image_height
#image_height = 376
#image_width = 720
images_format = ['.png','.jpg']
#image_names = [name for name in os.listdir(src) for item in images_format if
# os.path.splitext(name)[1] == item][::-1]
img_list=os.listdir(src)
img_list.sort()
img_list.sort(key=lambda x: int(x[:-4]))
##文件名按数字排序
img_nums=len(img_list)
image_names = []
for i in range(img_nums):
img_name=os.path.join(src,img_list[i])
image_names.append(img_name)
#使用倒序
image_names = image_names[::-1]
# 简单的对于参数的设定和实际图片集的大小进行数量判断
if len(image_names) < row * column:
raise ValueError("合成图片的参数和要求的数量不能匹配!")
to_image = Image.new('RGB', (column * image_width, row * image_height)) #创建一个新图
# 循环遍历,把每张图片按顺序粘贴到对应位置上
for y in range(1, row + 1):
for x in range(1, column + 1):
#1 * (row=1 -1) col=1 -1
image_path = image_names[column * (y - 1) + x - 1]
print("split_image=",image_path)
from_image = Image.open(image_path)
#保持原图片大小
#.resize(
# (image_size, image_size),Image.ANTIALIAS)
to_image.paste(from_image, ((x - 1) * image_size, (y - 1) * image_size))
from_image.close()
to_image.save(save_path)
print("图片合并完成:", save_path)
shutil.rmtree(src)
# 保存新图
@classmethod
def getScrambleImage(cls,path):
scramble_file_cache = cls.scrambleImage(path)
if scramble_file_cache != None and os.path.exists(scramble_file_cache): os.remove(scramble_file_cache)
@classmethod
def encode_scramble_image(cls, img_path, img_save=None):
if not os.path.exists(img_path):
return
image = Image.open(img_path)
w, h = image.size
#image.show()
file_str = str(img_path).split("=")
#10_29.jpg
base_fn = file_str[-1].split("_")
blocks = int(base_fn[0])
if img_save == None:
save_path = os.path.join(os.path.dirname(img_path),ComicPath.getFileScrambleImageSave(img_path))
else: save_path = img_save
# print(type(aid),type(img_name))
if blocks:
s = blocks # 随机值
# print(s)
l = h % s # 切割最后多余的值
box_list = []
hz = 0
for i in range(s):
c = math.floor(h / s)
g = i * c
hz += c
h2 = h - c * (i + 1) - l
if i == 0:
c += l;hz += l
else:
g += l
box_list.append((0, h2, w, h - g))
# print(box_list,len(box_list))
item_width = w
# box_list.reverse() #还原切图可以倒序列表
# print(box_list, len(box_list))
newh = 0
image_list = [image.crop(box) for box in box_list]
# print(box_list)
newimage = Image.new("RGB", (w, h))
for image in image_list:
# image.show()
b_w, b_h = image.size
newimage.paste(image, (0, newh))
newh += b_h
newimage.save(save_path)
logging.info(f"解密成功 {save_path}")
if os.path.exists(img_path):
os.remove(img_path)
logging.debug(f"remove {img_path}")
return save_path
# 压缩工具类
class CBZUtils:
@classmethod
def readDirsOrFiles(cls, dir, type):
data = []
files = os.listdir(dir)
for file in files:
path = os.path.join(dir, file)
if type == "files" and os.path.isfile(path):
data.append(path)
if type == "dirs" and os.path.isdir(path):
data.append(path)
return data
@classmethod
def zip_compression(cls, source_dir=None, target_file=None, remove=True):
target_dir = os.path.dirname(target_file)
if not os.path.exists(target_dir):
os.makedirs(target_dir)
if not os.path.exists(target_file) and source_dir is not None:
with ZipFile(target_file, mode='w') as zf:
for path, dir_names, filenames in os.walk(source_dir):
path = Path(path)
arc_dir = path.relative_to(source_dir)
y = 0
for filename in filenames:
y = y + 1
print("打包中:" + str(y) + "/" + str(len(filenames)), os.path.join(source_dir, filename))
zf.write(path.joinpath(filename), arc_dir.joinpath(filename))
zf.close()
logging.info(f"打包完成:{target_file}")
@classmethod
def packComicChapterCBZ(cls, src_dir, dts_path, comic_info_images, remove=True):
if os.path.exists(src_dir):
dirs = os.listdir(src_dir)
for file in dirs:
if file.startswith(ComicPath.PREFIX_SCRAMBLE):
try:
imageUtils.deScrambleImagesByPath(os.path.join(src_dir,file))
except Exception as e:
print(f"删除 {file} 发生错误 {e},已跳过")
return False
cls.zip_compression(src_dir, dts_path)
time.sleep(0.1)
if remove: shutil.rmtree(src_dir)
# validation
return cls.cbz_validate(dts_path, comic_info_images)
@classmethod
def replaceZip(cls, filepath, unpack_dir=None):
if not cls.compareFileDate(filepath): return None
if unpack_dir == None:
unpack_dir = str(filepath).split(".")[0]
fz = ZipFile(filepath, 'r')
for file in fz.namelist():
if file.endswith(".jpg"):
data = fz.read(file)
if len(data) < 500 and os.path.exists(filepath):
os.remove(filepath)
print(f"数据不完整,已删除:{filepath}")
if cls.compareFileDate(filepath):
os.utime(filepath)
print(f"已更新文件时间 {filepath}")
if os.path.exists(unpack_dir):
shutil.rmtree(unpack_dir)
# 删除删除main.ftl文件
# delete_filename = ''
# if os.path.exists(delete_filename):
# os.remove(delete_filename)
# time.sleep(60)
# shutil.copy(文件的路径,另一个目录);拷贝main.ftl到准备压缩的目录下
# cls.zip_compression()
# 小于则运行
@classmethod
def compareFileDate(cls, filepath):
if os.path.exists(filepath):
ctime = os.path.getmtime(filepath)
str_ctime = datetime.fromtimestamp(int(ctime))
file_ctime = str(str_ctime.year) + "{:0>2d}".format(str_ctime.month) + "{:0>2d}".format(
str_ctime.day) + "{:0>2d}".format(str_ctime.hour)
c_ctime = 2023011603
else:
return False
if int(file_ctime) < c_ctime:
return True
return False
@classmethod
def zip_info(cls, path, filter=True):
result = None
try:
with ZipFile(path, "r") as zip_file:
result = zip_file.namelist()
if filter:
result.remove(COMIC_INFO_XML_FILE)
except Exception as e:
print(e)
return result
@classmethod
def cbz_validate(cls, zip_path, comic_info_images):
cbz_info = cls.zip_info(zip_path)
if len(cbz_info) == len(comic_info_images):
# logging.info(f"validating successfully === {zip_path}")
ntfy.sendMsg(f"validating successfully === {zip_path}", alert=True)
return True
else:
os.remove(zip_path)
ntfy.sendMsg(f"validating fail === {zip_path}, cbz_info={cbz_info},zip_info={comic_info_images}", alert=True)
return False
# 检测工具类
class checkUtils:
def read(self, item):
file = os.path.join(OUTPUT_DIR, ComicLoader(item=item).get_project_name(), "error_comics.json")
return fileUtils.read(file)
#
# 检测某一章节是否连续错误
def export_error(self, item):
if not self.is_error(item):
file = os.path.join(OUTPUT_DIR, ComicLoader(item=item).get_project_name(), "error_comics.json")
try:
error_comic = eval(self.read(item))
except:
error_comic = []
error_comic.append({ "name" : ComicPath.new_file_name(item['name']),
"chapter" : ComicPath.new_file_name(item['chapter']),
"date" : ComicPath().getYearMonthDay()})
fileUtils.save_file(file, json.dumps(error_comic))
def is_error(self, item):
try:
for error_c in eval(self.read(item)):
(name, chatper, date) = [error_c['name'], error_c['chapter'], error_c['date']]
if ComicPath.new_file_name(item['name']) == ComicPath.new_file_name(name) and ComicPath.new_file_name(item['chapter']) == ComicPath.new_file_name(chatper):
return True
else:
return False
except:
return False
# Comic路径类
class ComicPath:
ci, project, name, chapter = [ None, "", "", ""]
def __init__(self, item = None):
if item == None: return
else:
self.ci : ComicLoader = ComicLoader(item=item)
if self.ci.get_project_name() != None: self.project = self.fix_file_name(self.ci.get_project_name())
if self.ci.get_name() != None: self.name = self.fix_file_name(self.ci.get_name())
if self.ci.get_chapter() != None: self.chapter = self.fix_file_name(self.ci.get_chapter())
PREFIX_SCRAMBLE = "scramble="
MAPPING_IMAGE = "image"
MAPPING_COMIC_INFO = "comic_info"
MAPPING_CBZ_ICON = "cbz_icon"
MAPPING_DOWN_ICON = "down_icon"
MAPPING_DOWN_CACHE_ICON = "down_cache_icon"
MAPPING_ICON = "icon"
MAPPING_ICON_CACHE = "icon_cache"
MAPPING_CBZ = "cbz"
MAPPING_CBZ_DIR = "cbz_dir"
MAPPING_OLD_CBZ_DIR = "old_cbz_dir"
MAPPING_IMAGES_DIR = "images_dir"
MAPPING_COMIC_JSON = "comic_json"
def PATH_MAPPING(self):
if self.project == None or self.name == None or self.chapter == None: return None
return {
self.MAPPING_IMAGE: os.path.join(self.project, "images", self.name, self.chapter),
self.MAPPING_COMIC_INFO: os.path.join(self.project, "images", self.name, self.chapter),
self.MAPPING_CBZ_ICON: os.path.join(settings.CBZ_EXPORT_PATH, self.project, self.name, self.chapter+".jpg"),
self.MAPPING_DOWN_ICON: os.path.join(settings.IMAGES_STORE, self.project, "icons", self.name, self.name+".jpg"),
self.MAPPING_DOWN_CACHE_ICON: os.path.join(settings.IMAGES_STORE, self.project, "icons", ".cache", self.name+".jpg"),
self.MAPPING_ICON: os.path.join(self.project, "icons", self.name, self.name+".jpg"),
self.MAPPING_ICON_CACHE: os.path.join(self.project, "icons", ".cache", self.name+".jpg"),
self.MAPPING_CBZ: os.path.join(settings.CBZ_EXPORT_PATH, self.project, self.name, self.chapter+".CBZ"),
self.MAPPING_IMAGES_DIR: os.path.join(settings.IMAGES_STORE, self.project, "images", self.name, self.chapter),
self.MAPPING_COMIC_JSON: os.path.join(settings.IMAGES_STORE, self.project, "json", self.name, self.chapter+".json"),
self.MAPPING_CBZ_DIR: os.path.join(settings.CBZ_EXPORT_PATH, self.project, self.name),
self.MAPPING_OLD_CBZ_DIR: os.path.join(settings.OLD_CBZ_EXPORT_PATH, self.project, self.name)
}
def PATH_CBZ(self, result_type=MAPPING_CBZ): return self.file_path(result_type=result_type)
def getDirJosnComicChapter(self, result_type=MAPPING_COMIC_JSON): return self.file_path(result_type=result_type)
def file_path(self, result_type=MAPPING_IMAGE, file=None, convert=True, chapter=None):
if chapter != None: self.chapter = chapter
path = self.PATH_MAPPING().get(result_type, None)
if result_type == self.MAPPING_IMAGE and os.path.sep not in file:
return os.path.join(path, file)
if convert:
path = self.chinese_convert(path)
return path
@classmethod
def getYearMonthDay(cls):
today = date.today()
# 格式化为年-月-日
return today.strftime("%Y%m%d")
#@classmethod
#def getDirComicChapter(cls, result_type=): return cls.file_path(result_type=result_type)
@classmethod
def getFileScrambleImageName(cls,count,block,suffix=".jpg"): return cls.PREFIX_SCRAMBLE+str(block)+"_"+str(count)+suffix
@classmethod
def getFileScrambleImageSave(cls,file,relative=False, is_prefix=True):
file_name = str(file).split("_")[-1]
if relative:
file_name = os.path.basename(file_name)
if relative == "fullpath":
file_name = os.path.join(os.path.dirname(file), file_name)
if not is_prefix:
return file_name.split(".")[0]
else:
return file_name
#繁体中文转简体中文
@classmethod
def chinese_convert(cls, text,convert='t2s'): return OpenCC(convert).convert(str(text))
#处理成符合规定的文件名
@classmethod
def fix_file_name(cls, filename, replace=None):
if isinstance(filename, list):
for file in filename: cls.fix_file_name(file)
if not isinstance(filename, str):
return filename
in_tab = r'[?*/\|.:><]'
str_replace = ""
if replace is not None:
str_replace = replace
filename = re.sub(in_tab, str_replace, filename)
count = 1
while True:
str_file = filename[0-count]
if str_file == " ":
count += 1
else:
filename = filename[0:len(filename)+1-count]
break
return filename
@classmethod
def new_file_name(cls, name): return cls.fix_file_name(cls.chinese_convert(name))
@classmethod
def images_icon(cls, file, count):
if count == 0: return file
name, suffix = os.path.splitext(file)
return name+"-"+str(count)+suffix
# 通知类
class ntfy:
@classmethod
def sendMsg(cls, msg,alert=False,sleep=None,error=None):
try:
print(f"#ntfy: {msg}")
if alert:
requests.post("https://ntfy.caiwenxiu.cn/PyComic",
data=msg.encode(encoding='utf-8'))
except:
print(f"#ntfy error: {msg}")
if sleep != None:
logging.info(f'等待{sleep}秒后进入下一阶段')
time.sleep(int(sleep))
if error != None:
print(f"#ntfy Error: {error}")
return False
else:
return True
class logger:
def log_image_download(self, image_path):
if image_path == "":
logging.info(f"icon file exists: IMAGE_STORE {image_path}")
else:
logging.info(f"file exists: IMAGE_STORE {image_path}")
def log_image_not_downloaded(self, image_path):
logging.info(f"file does not exist: IMAGE_STORE {image_path}")
class ItemIconUtils:
def __init__(self, path, icon_item=None):
if icon_item != None: self.icon_item = icon_item
else: self.icon_item = []
self.path = path
if path != None:
self.read_icon_json()
def add(self, name, path, size, md5, url=None):
self.icon_item.append({ 'name': name, 'path': path, 'size': size, 'md5':md5, 'url': url })
def toString(self):
return self.icon_item
def _get_values(self, key): return [ item.get(key) for item in self.icon_item ]
def get_names(self): return self._get_values("name")
def get_file_count_by_md5(self, md5):
md5_list = self._get_values("md5")
md5_count = md5_list.count(md5)
if md5_count > 1:
logging.info(f" {md5_list} == 存在md5重复文件")
return md5_count
def get_file_by_key_value(self, key, value):
list_item = []
for item in self.icon_item:
if item.get(key) == value: return list_item.append(item)
return list_item
def get_file_by_md5(self, md5):
list_item = []
for item in self.icon_item:
if item.get("md5") == md5 and self.get_file_count_by_md5(md5) == 1: list_item.append(item)
return list_item
def get_file_by_url(self, url): return self.get_file_by_key_value('url', url)
def update_file_by_md5_to_url(self, md5, url):
file_item = self.get_file_by_md5(md5)
if len(file_item) > 1 :
logging.info(f" {file_item} == 存在多个结果")
if len(file_item) == 1:
file_item = file_item[0]
self.icon_item.remove(file_item)
file_item['url'] = url
self.icon_item.append(file_item)
return self.write_icon_json()
def get_files_info(self, directory, filter_suffix=None):
for root, dirs, files in os.walk(directory):
for file in files:
name, suffix = os.path.splitext(file)
if suffix != filter_suffix:
file_path = os.path.join(root, file)
file_size = os.path.getsize(file_path)
self.add(name= name, path=file_path, size=file_size, md5=fileUtils.get_file_md5(file_path))
return self.toString()
def write_icon_json(self):
if self.path != None:
self.get_files_info(self.path, filter_suffix='.json')
fileUtils.write_json(os.path.join(self.path,"icons.json"), self.toString())
def read_icon_json(self):
if self.path != None: self.icon_item = fileUtils.read_json(os.path.join(self.path, "icons.json"))
return self.icon_item
class DBUtils:
def __init__(self, db_name, suffix="json"):
self.db = TinyDB(ComicPath().getDirConfDefault(db_name,suffix=suffix))
@classmethod
def init_db(cls,db_name):
return TinyDB(ComicPath.getDirConfDefault(db_name,suffix="json"))
@classmethod
def set(cls,name,progress,db_name):
if name == None or progress == None or db_name == None:
print("dbUtils set 数据为空")
return False
db = cls.init_db(db_name)
comic = Query()
if len(db.search(comic.name == name)) == 0: db.insert({"name":name,"progress":progress})
else: db.update({"progress":progress},comic.name== name)
msg = "失败"
if cls.query(name,progress,db_name): msg = "成功"
logger.debug(f"设置{msg}, name={name} value={progress} db={db_name}")
@classmethod
def query(cls,name,progress=None,db_name=None):
result = False
db = cls.init_db(db_name)
if db == None: return None
data = db.search(Query().name == name)
logger.debug(f"result query= {data}")
if progress != None:
try:
if len(db.search((Query().name == name) & (Query().progress == progress))) != 0: result = True
except Exception as e:
print(e)
return result
@classmethod
def remove(cls,name,db_name=None):
db = cls.init_db(db_name)
db.remove(Query().name == name)
class oldUtils:
def clean_old_files(self, files, folder, move_folder, suffix="CBZ"):
# 方法三使用pathlib模块的iterdir方法获取文件夹下的所有文件和文件夹
# 如果只需要文件名而不是文件的绝对路径可以使用name属性获取文件名
if os.path.exists(folder):
file_names = [f.name for f in pathlib.Path(folder).iterdir() if f.is_file()]
else:
return None
old_item = []
for file_name in file_names:
file_split = file_name.split(".")
file_suffix = file_split[-1]
file_prefix = file_split[0]
if file_suffix == suffix:
old_item.append(file_prefix)
new_item = ComicPath.fix_file_name(files)
only_in_new_item = [item for item in new_item if item not in old_item]
only_in_old_item = [item for item in old_item if item not in new_item]
in_new_item_and_old_item = [item for item in new_item if item in old_item]
print(f"只在new_item中: {only_in_new_item}")
print(f"只在old_item中: {only_in_old_item}")
print(f"在new_item和old_item中都有: {in_new_item_and_old_item}")
def move_file():
"""移动文件
"""
if not os.path.exists(move_folder): os.makedirs(move_folder)
for old_file in only_in_old_item:
try:
suffixs = [ suffix, "jpg" ]
for suf in suffixs:
new_move_file = os.path.join(folder, old_file)+"."+suf
old_move_file = os.path.join(move_folder, old_file)+"."+suf
if os.path.exists(new_move_file):
shutil.move(new_move_file, old_move_file)
print(f"move old_file={new_move_file} --> {old_move_file}")
except:
print(f"Error: move old_file={new_move_file} --> {old_move_file}")
move_file()