ComicScrapy/Comics/utils/FileUtils.py
2023-06-20 02:52:51 +08:00

361 lines
14 KiB
Python

import base64,hashlib,os,shutil
import math,time,json,datetime,logging
from PIL import Image
from Comics.utils.Constant import ComicPath
from pathlib import Path
from zipfile import ZipFile
from Comics.settings import COMIC_INFO_XML_FILE,CBZ_EXPORT_PATH,IMAGES_STORE
class fileUtils:
@classmethod
def save_file(cls,path,data):
dir = os.path.dirname(path)
if not os.path.exists(dir):
os.makedirs(dir)
with open(path,'w',encoding='utf-8') as fs:
fs.write(str(data))
fs.close()
@classmethod
def path(cls, file):
base_dir = os.path.dirname(file)
if not os.path.exists(base_dir): os.makedirs(base_dir)
return file
class CommonUtils:
@classmethod
def parseExec(cls,data,exec):
if data !=None and exec != None:
dots = str(exec).split(".")
if not isinstance(data,dict): data = json.loads(data)
for dot in dots:
data = data.get(dot)
return data
class imageUtils:
@classmethod
def descramble_images_by_dir(cls, chapter_dir):
if os.path.isfile(chapter_dir):
chapter_dir = os.path.dirname(chapter_dir)
scramble_count = 0
if os.path.exists(chapter_dir): #获取章节图片路径
while ComicPath.PREFIX_SCRAMBLE in os.listdir(chapter_dir):
for img in os.listdir(chapter_dir):
if img.startswith(ComicPath.PREFIX_SCRAMBLE):
imageUtils.encode_scramble_image(os.path.join(chapter_dir, img))
scramble_count += 1
logging.debug(f"{ComicPath.PREFIX_SCRAMBLE} {scramble_count}")
return scramble_count
@classmethod
def deScrambleImagesByPath(cls, img_path, img_save=None):
if os.path.basename(img_path).\
startswith(ComicPath.PREFIX_SCRAMBLE) and os.path.exists(img_path):
img_path = imageUtils.encode_scramble_image(img_path, img_save)
return img_path
@classmethod
def encodeImage(cls,str_en):
#print("en",str_en)
enc = base64.b64decode(str_en)
#print("解密:",enc)
m = hashlib.md5()
m.update(enc)
md5 = m.digest()
d = md5[-1]
#print(md5)
try:
blocks = d % 10 + 5
except:
blocks = 0 %10 + 5
#print("blocks=",blocks)
return blocks
@classmethod
def scrambleImage(cls,file_path):
#检测到未下载完的图像 直接返回None
if str(file_path).endswith(".downloads"):
os.remove(file_path)
return None
file_str = str(file_path).split("=")
#10_29.jpg
base_dir = file_str[0].replace("scramble","")
base_name = file_str[-1]
base_fn = base_name.split("_")
save_name = base_fn[1]
save_name_delesu = save_name.split(".")[0]
blocks = int(base_fn[0])
save_file_path = os.path.join(base_dir,save_name)
print("sva",save_file_path)
if os.path.exists(save_file_path):
print("图片已解密,已跳过:", save_file_path)
return None
image_su = str(file_path).split(".")[-1]
try:
img = Image.open(file_path)
except:
print(f"error Image: {file_path}")
width = img.width
height = img.height
#blocks = cls.encodeImage(enStr)
print("blocks=",blocks)
block_height = int(height / blocks)
block_width = int(width / blocks)
print("blockHeight=",block_height)
suffix = str(file_path).split(".")[-1]
split_path = os.path.join(base_dir,save_name_delesu+"split")
if image_su == "downloads":
return None
is_split = cls.splitimage(file_path,blocks,1,split_path)
if is_split != None:
cls.image_compose(split_path,blocks,1,save_file_path,block_height,width)
else:
if os.path.exists(split_path):
shutil.rmtree(split_path)
if os.path.exists(file_path):
shutil.move(file_path, save_file_path)
#完成后清空
return file_path
@classmethod
def splitimage(cls,src,rownum,colnum,dstpath):
img=Image.open(src)
w,h=img.size
if rownum<= h and colnum<=w:
s=os.path.split(src)
if dstpath=='':
dstpath = s[0]
if not os.path.exists(dstpath):
os.makedirs(dstpath)
fn=s[1].split('.')
basename=fn[0]
ext=fn[-1]
num=0
rowheight=h//rownum
colwidth=w//colnum
for r in range(rownum):
for c in range(colnum):
box=(c*colwidth,r*rowheight,(c+1)*colwidth,(r+1)*rowheight)
count_image = "{:0>3d}".format(num)
file_path = os.path.join(dstpath,str(count_image)+'.'+ext)
print("file_path=",file_path)
img.crop(box).save(file_path)
num=num+1
return "成功"
else:
print('不数!')
return None
@classmethod
def image_compose(cls,src,row,column,save_path,image_height,image_width):
image_size = image_height
#image_height = 376
#image_width = 720
images_format = ['.png','.jpg']
#image_names = [name for name in os.listdir(src) for item in images_format if
# os.path.splitext(name)[1] == item][::-1]
img_list=os.listdir(src)
img_list.sort()
img_list.sort(key=lambda x: int(x[:-4]))
##文件名按数字排序
img_nums=len(img_list)
image_names = []
for i in range(img_nums):
img_name=os.path.join(src,img_list[i])
image_names.append(img_name)
#使用倒序
image_names = image_names[::-1]
# 简单的对于参数的设定和实际图片集的大小进行数量判断
if len(image_names) < row * column:
raise ValueError("合成图片的参数和要求的数量不能匹配!")
to_image = Image.new('RGB', (column * image_width, row * image_height)) #创建一个新图
# 循环遍历,把每张图片按顺序粘贴到对应位置上
for y in range(1, row + 1):
for x in range(1, column + 1):
#1 * (row=1 -1) col=1 -1
image_path = image_names[column * (y - 1) + x - 1]
print("split_image=",image_path)
from_image = Image.open(image_path)
#保持原图片大小
#.resize(
# (image_size, image_size),Image.ANTIALIAS)
to_image.paste(from_image, ((x - 1) * image_size, (y - 1) * image_size))
from_image.close()
to_image.save(save_path)
print("图片合并完成:", save_path)
shutil.rmtree(src)
# 保存新图
@classmethod
def getScrambleImage(cls,path):
scramble_file_cache = cls.scrambleImage(path)
if scramble_file_cache != None and os.path.exists(scramble_file_cache): os.remove(scramble_file_cache)
@classmethod
def encode_scramble_image(cls, img_path, img_save=None):
if not os.path.exists(img_path):
return
image = Image.open(img_path)
w, h = image.size
#image.show()
file_str = str(img_path).split("=")
#10_29.jpg
base_fn = file_str[-1].split("_")
blocks = int(base_fn[0])
if img_save == None:
save_path = os.path.join(os.path.dirname(img_path),ComicPath.getFileScrambleImageSave(img_path))
else: save_path = img_save
# print(type(aid),type(img_name))
if blocks:
s = blocks # 随机值
# print(s)
l = h % s # 切割最后多余的值
box_list = []
hz = 0
for i in range(s):
c = math.floor(h / s)
g = i * c
hz += c
h2 = h - c * (i + 1) - l
if i == 0:
c += l;hz += l
else:
g += l
box_list.append((0, h2, w, h - g))
# print(box_list,len(box_list))
item_width = w
# box_list.reverse() #还原切图可以倒序列表
# print(box_list, len(box_list))
newh = 0
image_list = [image.crop(box) for box in box_list]
# print(box_list)
newimage = Image.new("RGB", (w, h))
for image in image_list:
# image.show()
b_w, b_h = image.size
newimage.paste(image, (0, newh))
newh += b_h
newimage.save(save_path)
logging.info(f"解密成功 {save_path}")
if os.path.exists(img_path):
os.remove(img_path)
logging.debug(f"remove {img_path}")
return save_path
class CBZUtils:
@classmethod
def readDirsOrFiles(cls, dir, type):
data = []
files = os.listdir(dir)
for file in files:
path = os.path.join(dir, file)
if type == "files" and os.path.isfile(path):
data.append(path)
if type == "dirs" and os.path.isdir(path):
data.append(path)
return data
@classmethod
def zip_compression(cls, source_dir=None, target_file=None, remove=True):
target_dir = os.path.dirname(target_file)
if not os.path.exists(target_dir):
os.makedirs(target_dir)
if not os.path.exists(target_file) and source_dir is not None:
with ZipFile(target_file, mode='w') as zf:
for path, dir_names, filenames in os.walk(source_dir):
path = Path(path)
arc_dir = path.relative_to(source_dir)
y = 0
for filename in filenames:
y = y + 1
print("打包中:" + str(y) + "/" + str(len(filenames)), os.path.join(source_dir, filename))
zf.write(path.joinpath(filename), arc_dir.joinpath(filename))
zf.close()
logging.info(f"打包完成:{target_file}")
@classmethod
def packComicChapterCBZ(cls, comic, chapter, comic_info_images, remove=True):
images_chapter_path = os.path.join(IMAGES_STORE, comic, chapter)
cbz_chapter_path = os.path.join(CBZ_EXPORT_PATH, comic, chapter) + ".CBZ"
if os.path.exists(images_chapter_path):
dirs = os.listdir(images_chapter_path)
for file in dirs:
if file.startswith(ComicPath.PREFIX_SCRAMBLE):
try:
imageUtils.deScrambleImagesByPath(os.path.join(images_chapter_path,file))
except Exception as e:
print(f"删除 {file} 发生错误 {e},已跳过")
return False
cls.zip_compression(images_chapter_path, cbz_chapter_path)
time.sleep(0.1)
if remove: shutil.rmtree(images_chapter_path)
# validation
cls.cbz_validate(cbz_chapter_path, comic_info_images)
return True
@classmethod
def replaceZip(cls, filepath, unpack_dir=None):
if not cls.compareFileDate(filepath): return None
if unpack_dir == None:
unpack_dir = str(filepath).split(".")[0]
fz = ZipFile(filepath, 'r')
for file in fz.namelist():
if file.endswith(".jpg"):
data = fz.read(file)
if len(data) < 500 and os.path.exists(filepath):
os.remove(filepath)
print(f"数据不完整,已删除:{filepath}")
if cls.compareFileDate(filepath):
os.utime(filepath)
print(f"已更新文件时间 {filepath}")
if os.path.exists(unpack_dir):
shutil.rmtree(unpack_dir)
# 删除删除main.ftl文件
# delete_filename = ''
# if os.path.exists(delete_filename):
# os.remove(delete_filename)
# time.sleep(60)
# shutil.copy(文件的路径,另一个目录);拷贝main.ftl到准备压缩的目录下
# cls.zip_compression()
# 小于则运行
@classmethod
def compareFileDate(cls, filepath):
if os.path.exists(filepath):
ctime = os.path.getmtime(filepath)
str_ctime = datetime.fromtimestamp(int(ctime))
file_ctime = str(str_ctime.year) + "{:0>2d}".format(str_ctime.month) + "{:0>2d}".format(
str_ctime.day) + "{:0>2d}".format(str_ctime.hour)
c_ctime = 2023011603
else:
return False
if int(file_ctime) < c_ctime:
return True
return False
@classmethod
def zip_info(cls, path, filter=True):
result = None
try:
with ZipFile(path, "r") as zip_file:
result = zip_file.namelist()
if filter:
result.remove(COMIC_INFO_XML_FILE)
except Exception as e:
print(e)
return result
@classmethod
def cbz_validate(cls, zip_path, comic_info_images):
if len(cls.zip_info(zip_path)) == len(comic_info_images):
logging.info(f"validating successfully === {zip_path}")
else:
os.remove(zip_path)
logging.error(f"validating fail === {zip_path}")