PyComicPackRouMan/utils/NetUtils.py

from __future__ import print_function
from queue import Queue
from fake_useragent import UserAgent
import shutil
import imghdr
import concurrent.futures
import requests
import os
import json
import time
import re
from lxml import html
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
from common.Constant import pathStr
from common.ComicInfo import ComicInfoUtils as ciUtils
from common.ComicInfo import ComicInfo as ci
from common.ComicInfo import Comic
from common.Constant import ComicPath
from utils.FileUtils import fileUtils as fu
from utils.Logger import logger
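
# htmlUtils: thin wrapper around requests + lxml that caches raw responses on disk
# (under pathStr.base_html_cache()) and exposes xpath / JSON helpers over that cache.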
class htmlUtils:
    headers = {'User-Agent': UserAgent().random}
    url_data = {}
    # Shared per-class state for the most recently requested domain/URL.
    temp_url = None  # last URL passed to xpathData(); reused when url is omitted
@classmethod
    def parseExec(cls, data, exec):
        # Walk a dotted key path (e.g. "a.b.c") through a dict or JSON string.
        if data is not None and exec is not None:
            dots = str(exec).split(".")
            if not isinstance(data, dict):
                data = json.loads(data)
            for dot in dots:
                data = data.get(dot)
        return data
@classmethod
def getXpathData(cls,c_xpath,url=None,num=None,not_eq=None,update=False):
return htmlUtils.xpathData(c_xpath=c_xpath,url=url,num=num,not_eq=not_eq,update=update)
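    # setXpathData: run an xpath query, walk the dotted "exec" path into the result,
    # and, when result_type == "list", collect the attribute named by `type` from every
    # matched element (optionally prefixing each value with start_add).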
@classmethod
    def setXpathData(cls, url, xpath, exec, num=None, result_type=None, type=None, start_add=None, update=False):
        result = cls.parseExec(htmlUtils.xpathData(xpath, url=url, num=num, update=update), exec)
        if result is None:
            return None
        if result_type == "list" and type is not None:
            data = []
            for item in result:
                if start_add is not None:
                    data.append(start_add + item.get(type))
                else:
                    data.append(item.get(type))
            return data
        return result
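    # Cache-file helpers: a URL is mapped to a cache file name by stripping characters
    # that are invalid on disk; type="new" returns the path, type="read" returns the content.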
@classmethod
def getPathSaveHtml(cls,url,type=None):
rstr = r"[\/\\\:\*\?\"\<\>\|\.]" #  '/ \ : * ? " < > |'
try:
file_url = re.sub(rstr, "", url)
except:
file_url = "error_cache"
file_path = os.path.join(pathStr.base_html_cache(),file_url)
if type == "new":
return file_path
if os.path.exists(file_path):
if type == "read":
with open(file_path,"r",encoding="utf-8") as fs: return fs.read()
return file_path
else:
return None
@classmethod
def saveHtml(cls,url,data,type=None):
file_path = cls.getPathSaveHtml(url,type="new")
dir_name = os.path.dirname(file_path)
if not os.path.exists(dir_name):
os.makedirs(dir_name)
        with open(file_path, "w", encoding="utf-8") as fs:
            if type == "json" and not isinstance(data, str):
                # Response text is already a JSON string; only serialize other payloads.
                data = json.dumps(data)
            fs.write(str(data))
@classmethod
def remove_HtmlCache(cls,url):
file_path = cls.getPathSaveHtml(url,type="new")
if os.path.exists(file_path):
try:
os.remove(file_path)
print("已删除")
except:
print()
@classmethod
def getHTML(cls, curl,type=None,update=False):
url_text = None
if update: cls.remove_HtmlCache(curl)
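        # Retry transient 5xx responses once with a short backoff, on both http and https.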
retries = Retry(total=1, backoff_factor=0.5, status_forcelist=[ 500, 502, 503, 504 ])
s = requests.Session()
s.keep_alive = False
s.mount('http://', HTTPAdapter(max_retries=retries))
s.mount('https://', HTTPAdapter(max_retries=retries))
        # Serve from the on-disk cache when possible; otherwise fall through and fetch.
        try:
            url_text = cls.getPathSaveHtml(curl, "read")
        except Exception:
            url_text = None
        if url_text is not None and not update:
            if type == "json":
                # Cached JSON is decoded rather than parsed as HTML.
                return json.loads(url_text)
            return html.fromstring(url_text)
        url_text = None
repeat = 0
        while url_text is None and repeat <= 5:
            try:
                print(f"Requesting: {curl}")
                res = s.get(curl, stream=True, headers=cls.headers, timeout=10, allow_redirects=True)
                if type == "bytes":
                    # Hand back the raw response; the caller reads and closes it.
                    return res
                if type == "json":
                    cls.saveHtml(curl, res.text, type="json")
                    res.close()
                    return json.loads(res.text)
                # Default: parse as HTML and cache the raw text.
                url_text = html.fromstring(res.text)
                cls.saveHtml(curl, res.text)
                res.close()
            except Exception as e:
                repeat += 1
                print(f"Request failed: {e} {curl}")
return url_text
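    # Example usage (illustrative only; the URL and xpath below are placeholders):
    #   doc = htmlUtils.getHTML("https://example.com/comic/1")
    #   title = htmlUtils.getXpathData("//h1/text()", url="https://example.com/comic/1", num=0)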
@classmethod
def getBytes(cls, url):
return cls.getHTML(url,type="bytes")
@classmethod
def getJSON(cls,url,update=False):
return cls.getHTML(url,type="json",update=update)
@classmethod
def xpathData(cls,c_xpath,url=None,num=None,not_eq=None,update=False):
        if url is None:
            # Fall back to the last URL used, so follow-up xpath calls can omit it.
            url = cls.temp_url
        else:
            cls.temp_url = url
result = []
if update:
html_cache_path = cls.getPathSaveHtml(url,"new")
if os.path.exists(html_cache_path):
try:
os.remove(html_cache_path)
logger.info(f"html_cache更新成功 {html_cache_path}")
except:
logger.info(f"html_cache更新失败 {html_cache_path}")
        # Fetch the parsed HTML document.
        et = cls.getHTML(url)
        if et is None:
            return None
        # Collect every xpath match, skipping values equal to not_eq.
        for x in et.xpath(c_xpath):
            if x != not_eq:
                result.append(x)
        if num is not None:
            # num selects a single match by index; out-of-range yields None.
            try:
                result = result[num]
            except (IndexError, TypeError):
                result = None
return result
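
# downloadUtils: queue-based file downloader. Producers enqueue [url, dir, file, type]
# items and start_downloads() drains the queue with a thread pool, retrying failures.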
class downloadUtils:
QUEUE_DOWN = Queue()
TYPE_IMG = "image"
TYPE_ICON = "icon"
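    # TYPE_IMG / TYPE_ICON are the file_type values passed through to fu.ver_file()
    # when a finished download is validated.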
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Proxy-Connection": "keep-alive",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36",
"Accept-Encoding": "gzip, deflate, sdch",
# 'Connection': 'close',
}
@classmethod
def downQueueClear(cls): cls.QUEUE_DOWN = Queue()
@classmethod
def putDownUrlDirFileType(cls,url,dir,file,type): cls.QUEUE_DOWN.put([url,dir,file,type])
@classmethod
def getDownUrlDirFileType(cls):
if not cls.QUEUE_DOWN.empty(): return cls.QUEUE_DOWN.get(False)
else: return None
@classmethod
def putDownImageUrlDirFile(cls,url,dir,file): cls.putDownUrlDirFileType(url,dir,file,cls.TYPE_IMG)
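    # Example (illustrative; the URL and paths are placeholders):
    #   downloadUtils.putDownImageUrlDirFile("https://example.com/p/001.jpg", "./comic", "001.jpg")
    #   downloadUtils.start_downloads(concurrency=4)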
@classmethod
def common_download(cls,file_url,dir,file,file_type,repair_max=15,timeout=10,proxy=None,proxy_type=None):
logger.debug(f"file_url={file_url}, dir={dir} , file={file}, file_type={file_type}")
en_scrabmle_file = ComicPath.getFileScrambleImageSave(file)
en_scrabmle_path = os.path.join(dir,en_scrabmle_file)
save_path = os.path.join(dir,file)
logger.debug(f"save_path= {save_path}")
if os.path.exists(en_scrabmle_path):
logger.info(f"文件已存在,跳过中... {en_scrabmle_path}")
return True
if file_url == None:
logger.error("common_down file_url 为空")
raise NameError("common_down file_url为空")
proxies = None
if proxy_type is not None:
proxies = {
"http": proxy_type + "://" + proxy,
"https": proxy_type + "://" + proxy }
response = None
logger.debug(f"save_path {save_path}")
if not os.path.exists(dir): os.makedirs(dir)
temp_path = save_path+".downloads"
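        # Download into "<file>.downloads" first; the temp file is promoted to save_path
        # only after fu.ver_file() accepts it, so partial downloads never shadow good files.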
repair_count = 1
while not os.path.exists(save_path) and repair_count <= repair_max:
try:
response = requests.get(
file_url, headers=cls.headers, timeout=timeout, proxies=proxies)
                if response.status_code != 200:
                    logger.warning(f"download failed with HTTP {response.status_code}")
                    raise NameError(f"download failed with HTTP {response.status_code}")
with open(temp_path, 'wb') as f:
f.write(response.content)
time.sleep(0.7)
response.close()
                # Verify the payload really is an image before moving it into place.
                if fu.ver_file(temp_path, type=file_type):
                    shutil.move(temp_path, save_path)
                    logger.info("## OK: {} {}".format(save_path, file_url))
                else:
                    logger.warning("## Fail: {} corrupted image".format(file_url))
                    raise NameError("## Fail: {} corrupted image".format(file_url))
except Exception as e:
                logger.warning(f"Retry {repair_count}: {e} {file_url}")
repair_count += 1
@classmethod
def start_downloads(cls,repair_max=20,concurrency=None,timeout=20,proxy_type=None, proxy=None):
"""
Download image according to given urls and automatically rename them in order.
:param timeout:
:param proxy:
:param proxy_type:
:param image_urls: list of image urls
:param dst_dir: output the downloaded images to dst_dir
:param file_suffix: if set to "img", files will be in format "img_xxx.jpg"
:param concurrency: number of requests process simultaneously
:return: none
"""
        if concurrency is None:
            # One worker per queued item, but at least 1 so ThreadPoolExecutor is valid.
            concurrency = max(cls.QUEUE_DOWN.qsize(), 1)
        logger.debug(f"concurrency= {concurrency}")
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
future_list = list()
while True:
result = cls.getDownUrlDirFileType()
                if result is not None:
                    (file_url, dir, file, file_type) = result
                    future_list.append(executor.submit(cls.common_download,
                                                       file_url, dir, file, file_type,
                                                       repair_max=repair_max, timeout=timeout,
                                                       proxy_type=proxy_type, proxy=proxy))
else:
break
concurrent.futures.wait(future_list, timeout)
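
# Minimal end-to-end sketch (for reference only; the URL, xpath and paths are placeholders):
#   page_url = "https://example.com/comic/1"
#   for img_url in htmlUtils.getXpathData("//img/@src", url=page_url) or []:
#       downloadUtils.putDownImageUrlDirFile(img_url, "./downloads", os.path.basename(img_url))
#   downloadUtils.start_downloads(concurrency=4, timeout=20)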