PyComicPackRouMan/utils/NetUtils.py
2023-04-05 23:10:46 +08:00

from __future__ import print_function

import concurrent.futures
import json
import os
import re
import shutil
import time
from queue import Queue

import requests
from fake_useragent import UserAgent
from lxml import html
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from common.ComicInfo import Comic
from common.ComicInfo import ComicInfo as ci
from common.ComicInfo import ComicInfoUtils as ciUtils
from common.Constant import ComicPath, pathStr
from utils.FileUtils import fileUtils as fu
class htmlUtils:
    headers = {'User-Agent': UserAgent().random}
    url_data = {}
    temp_url = None  # last URL passed to xpathData(); reused when url is omitted
    @classmethod
    def parseExec(cls, data, exec):
        # Walk a dotted key path (e.g. "result.list") through a JSON object;
        # returns None when data, the path, or an intermediate key is missing.
        if data is not None and exec is not None:
            dots = str(exec).split(".")
            if not isinstance(data, dict):
                data = json.loads(data)
            for dot in dots:
                if not isinstance(data, dict):
                    return None
                data = data.get(dot)
            return data
        return None
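    # A minimal usage sketch (hypothetical JSON payload, not from this
    # project) of how parseExec walks a dotted key path:
    #   htmlUtils.parseExec('{"result": {"list": [1, 2]}}', "result.list")
    #   # -> [1, 2]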
    @classmethod
    def getXpathData(cls, c_xpath, url=None, num=None, not_eq=None, update=False):
        return cls.xpathData(c_xpath=c_xpath, url=url, num=num, not_eq=not_eq, update=update)
    @classmethod
    def setXpathData(cls, url, xpath, exec, num=None, result_type=None, type=None, start_add=None, update=False):
        # Fetch a page, run an XPath query, then walk `exec` through the result.
        # With result_type="list", collect attribute `type` from every element,
        # optionally prefixing each value with start_add.
        result = cls.parseExec(cls.xpathData(xpath, url=url, num=num, update=update), exec)
        if result is None:
            return None
        if result_type == "list" and type is not None:
            data = []
            for item in result:
                value = item.get(type)
                data.append(start_add + value if start_add is not None else value)
            return data
        return result
    @classmethod
    def getPathSaveHtml(cls, url, type=None):
        # Map a URL to its cache-file path by stripping characters that are
        # illegal in file names:  / \ : * ? " < > | .
        rstr = r"[\/\\\:\*\?\"\<\>\|\.]"
        try:
            file_url = re.sub(rstr, "", url)
        except Exception:
            file_url = "error_cache"
        file_path = os.path.join(pathStr.base_html_cache(), file_url)
        if type == "new":
            return file_path
        if os.path.exists(file_path):
            if type == "read":
                with open(file_path, "r", encoding="utf-8") as fs:
                    return fs.read()
            return file_path
        return None
    @classmethod
    def saveHtml(cls, url, data, type=None):
        # Write the fetched page (or a JSON payload) to the HTML cache.
        file_path = cls.getPathSaveHtml(url, type="new")
        dir_name = os.path.dirname(file_path)
        if not os.path.exists(dir_name):
            os.makedirs(dir_name)
        with open(file_path, "w", encoding="utf-8") as fs:
            if type == "json":
                data = json.dumps(data)
            fs.write(str(data))
    @classmethod
    def remove_HtmlCache(cls, url):
        file_path = cls.getPathSaveHtml(url, type="new")
        if os.path.exists(file_path):
            try:
                os.remove(file_path)
                print(f"Cache removed: {file_path}")
            except OSError as e:
                print(f"Failed to remove cache {file_path}: {e}")
    @classmethod
    def getHTML(cls, curl, type=None, update=False):
        # Read-through cache: return the cached page when present, otherwise
        # download it (retrying up to 5 times) and cache the response text.
        url_text = None
        if update:
            cls.remove_HtmlCache(curl)
        retries = Retry(total=1, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
        s = requests.Session()
        s.keep_alive = False
        s.mount('http://', HTTPAdapter(max_retries=retries))
        s.mount('https://', HTTPAdapter(max_retries=retries))
        # Serve from the cache when it already holds this URL.
        try:
            url_text = cls.getPathSaveHtml(curl, "read")
        except Exception:
            url_text = None
        if url_text is not None and not update:
            if type == "json":
                return json.loads(url_text)
            return html.fromstring(url_text)
        url_text = None
        repeat = 0
        while url_text is None and repeat <= 5:
            try:
                print(f"Requesting: {curl}")
                res = s.get(curl, stream=True, headers=cls.headers, timeout=10, allow_redirects=True)
                if type == "bytes":
                    # Caller is responsible for reading and closing the response.
                    return res
                if type == "json":
                    cls.saveHtml(curl, res.text)
                    return json.loads(res.text)
                if type is None:
                    url_text = html.fromstring(res.text)
                    cls.saveHtml(curl, res.text)
                res.close()
            except Exception as e:
                repeat += 1
                print(f"Request failed: {e} {curl}")
        return url_text
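    # Usage sketch (hypothetical URL): the first call downloads and caches the
    # page; the second is served from the cache; update=True forces a re-fetch.
    #   tree = htmlUtils.getHTML("https://example.com/comic/123")
    #   tree = htmlUtils.getHTML("https://example.com/comic/123")              # cache hit
    #   tree = htmlUtils.getHTML("https://example.com/comic/123", update=True) # re-download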
    @classmethod
    def getBytes(cls, url):
        return cls.getHTML(url, type="bytes")
    @classmethod
    def getJSON(cls, url, update=False):
        return cls.getHTML(url, type="json", update=update)
    @classmethod
    def xpathData(cls, c_xpath, url=None, num=None, not_eq=None, update=False):
        # Run an XPath query against a (cached) page. When url is omitted the
        # previously used URL is reused; num picks a single match and not_eq
        # filters out matches equal to the given value.
        if url is None:
            url = cls.temp_url
        else:
            cls.temp_url = url
        result = []
        if update:
            html_cache_path = cls.getPathSaveHtml(url, "new")
            if os.path.exists(html_cache_path):
                try:
                    os.remove(html_cache_path)
                    print(f"html_cache refreshed: {html_cache_path}")
                except OSError:
                    print(f"html_cache refresh failed: {html_cache_path}")
        # Fetch the parsed HTML document.
        et = cls.getHTML(url)
        if et is None:
            return None
        # Collect matches, filtering out the not_eq value.
        for x in et.xpath(c_xpath):
            if x != not_eq:
                result.append(x)
        if num is not None:
            try:
                result = result[num]
            except IndexError:
                result = None
        return result
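    # Usage sketch (hypothetical URL and XPath): the first call fetches and
    # remembers the URL; the second reuses it via cls.temp_url.
    #   titles = htmlUtils.xpathData('//h2[@class="title"]/text()',
    #                                url="https://example.com/comic/123")
    #   first_link = htmlUtils.xpathData('//a/@href', num=0)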
class downloadUtils:
    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Proxy-Connection": "keep-alive",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36",
        "Accept-Encoding": "gzip, deflate, sdch",
        # 'Connection': 'close',
    }
    down_queue = Queue()
    @classmethod
    def common_download(cls, file_name, image_url, dst_dir, timeout=10, proxy=None, proxy_type=None):
        # Download one image to dst_dir/file_name, re-requesting non-200
        # responses up to 5 times and re-queueing the task when the downloaded
        # payload is not a valid image.
        proxies = None
        if proxy_type is not None:
            proxies = {
                "http": proxy_type + "://" + proxy,
                "https": proxy_type + "://" + proxy}
        file_path = os.path.join(dst_dir, file_name)
        if os.path.exists(file_path):
            print("download_image: file exists, skipped =", file_path)
            return None
        temp_path = os.path.join(dst_dir, file_name + ".downloads")
        repair_count = 1
        response = requests.get(
            image_url, headers=cls.headers, timeout=timeout, proxies=proxies)
        while response.status_code != 200 and repair_count <= 5:
            time.sleep(0.7)
            print(f"Retry {repair_count}: {image_url}")
            response = requests.get(
                image_url, headers=cls.headers, timeout=timeout, proxies=proxies)
            repair_count += 1
        with open(temp_path, 'wb') as f:
            f.write(response.content)
        response.close()
        # Verify the payload really is an image before moving it into place.
        if fu.ver_file(temp_path, type="image"):
            shutil.move(temp_path, file_path)
            print("## OK: {} {}".format(file_path, image_url))
        else:
            print("## Fail: {} {}".format(image_url, "corrupt image"))
            cls.down_queue.put([file_name, image_url, dst_dir])
    @classmethod
    def download_image(cls, timeout=20, proxy_type=None, proxy=None, type="image"):
        # Drain the shared download queue, handling up to 10 tasks per call.
        # (type is kept for backward compatibility; common_download does not
        # accept it, so it is no longer forwarded.)
        repeat = 1
        while not cls.down_queue.empty() and repeat <= 10:
            file_name, image_url, dst_dir = cls.down_queue.get(False)
            cls.common_download(file_name, image_url, dst_dir, timeout=timeout,
                                proxy=proxy, proxy_type=proxy_type)
            repeat += 1
    @classmethod
    def download_images(cls, image_urls, dst_dir, concurrency=None, timeout=20, proxy_type=None, proxy=None, files_name=None):
        """
        Download images from the given urls, renaming each to the matching entry in files_name.
        :param image_urls: list of image urls
        :param dst_dir: output the downloaded images to dst_dir
        :param concurrency: number of requests processed simultaneously (defaults to len(image_urls))
        :param timeout: per-request timeout in seconds
        :param proxy: proxy address
        :param proxy_type: proxy scheme, e.g. "http"
        :param files_name: list of target file names, same length and order as image_urls
        :return: none
        """
        if concurrency is None:
            concurrency = len(image_urls)
        if not os.path.exists(dst_dir):
            os.makedirs(dst_dir)
        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
            future_list = []
            for count, image_url in enumerate(image_urls):
                cls.down_queue.put([files_name[count], image_url, dst_dir])
                future_list.append(executor.submit(
                    cls.download_image, timeout, proxy_type, proxy))
            concurrent.futures.wait(future_list, timeout)
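    # Usage sketch (hypothetical URLs and paths): files_name must pair with
    # image_urls one-to-one, since entries are matched by index.
    #   downloadUtils.download_images(
    #       ["https://example.com/p1.jpg", "https://example.com/p2.jpg"],
    #       "/tmp/comic/ch01",
    #       files_name=["001.jpg", "002.jpg"],
    #       timeout=30)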
    @classmethod
    def download_comic_icon(cls, is_new=ciUtils.IS_NEW_ICON):
        icon_url = Comic.getIcon()
        if icon_url is None:
            print("icon missing, skipped")
            return None
        icon_suffix = str(icon_url).split(".")[-1]
        # Skip the download when comicname/cover.jpg already exists.
        path_comic_icon = ComicPath.getPathConfComicIcon(suffix=icon_suffix)
        if not ciUtils.equIcon() and fu.exists(path_comic_icon):
            os.remove(path_comic_icon)
        if fu.notExists(path_comic_icon):
            cls.download_images([icon_url], ComicPath.getDirConfComic(),
                                files_name=[ComicPath.COMIC_ICON_NAME + "." + icon_suffix], timeout=30)
        save_path = ComicPath.getPathCBZComicChapterIcon(icon_suffix)
        if is_new:
            # Drop the icon left behind by older versions.
            if os.path.exists(save_path):
                os.remove(save_path)
            if os.path.exists(path_comic_icon):
                base_dir = ComicPath.getDirComicChapter()
                if not os.path.exists(base_dir):
                    os.makedirs(base_dir)
                shutil.copy(path_comic_icon,
                            os.path.join(base_dir, ComicPath.COMIC_ICON_NAME + "." + icon_suffix))
        else:
            if fu.notExists(ComicPath.getDirCBZComic()):
                os.makedirs(ComicPath.getDirCBZComic())
            if fu.notExists(save_path):
                shutil.copy(path_comic_icon, save_path)
                print(f"{path_comic_icon} copied to: {save_path}")
        # Persist icon metadata.
        ciUtils.iconDB()
        ciUtils.setProgressCBZ()
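    # Usage sketch: the icon URL comes from the Comic object configured
    # elsewhere in this project, so the call takes no URL; is_new switches
    # between the chapter-directory and CBZ-directory layouts.
    #   downloadUtils.download_comic_icon()
    #   downloadUtils.download_comic_icon(is_new=False)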