# htmlUtils — cached HTTP fetching helpers (download pages/JSON, cache to disk, query with XPath)
import json
import os
import re
import time
import traceback

import requests
from fake_useragent import UserAgent
from lxml import html
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from utils.Ntfy import ntfy
from utils.comic.PathStr import pathStr

class htmlUtils:
    """Cached HTTP fetching helpers.

    Downloads pages or JSON with retry support, caches raw response text
    on disk (keyed by a sanitized form of the URL), and offers an XPath
    convenience wrapper over the cached/fetched page.
    """

    # One random User-Agent picked at import time and reused for every request.
    headers = {'User-Agent': UserAgent().random}
    # Kept for backward compatibility; not used by the methods below.
    url_data = {}

    @classmethod
    def getPathSaveHtml(cls, url, type=None):
        """Map *url* to its cache-file path (or contents).

        type=None   -> path if the cache file exists, else None
        type="new"  -> path regardless of existence
        type="read" -> file contents (str) if it exists, else None
        """
        # Strip every character that is illegal in a file name so the URL
        # itself can serve as the cache key.
        rstr = r"[\/\\\:\*\?\"\<\>\|\.]"  # '/ \ : * ? " < > |'
        try:
            file_url = re.sub(rstr, "", url)
        except TypeError:
            # Non-string url (e.g. None) — fall back to a shared error slot.
            file_url = "error_cache"
        file_path = os.path.join(pathStr.base_html_cache(), file_url)
        if type == "new":
            return file_path
        if not os.path.exists(file_path):
            return None
        if type == "read":
            with open(file_path, "r", encoding="utf-8") as fs:
                return fs.read()
        return file_path

    @classmethod
    def saveHtml(cls, url, data, type=None):
        """Write *data* to the cache file for *url*.

        type="json" JSON-encodes *data* first; otherwise str(data) is
        written. The cache directory is created on demand.
        """
        file_path = cls.getPathSaveHtml(url, type="new")
        # exist_ok avoids the check-then-create race of the original code.
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        if type == "json":
            data = json.dumps(data)
        with open(file_path, "w", encoding="utf-8") as fs:
            fs.write(str(data))

    @classmethod
    def remove_HtmlCache(cls, url):
        """Best-effort delete of the cached file for *url*.

        Failures are logged (instead of the former silent ``print()``)
        but never raised.
        """
        file_path = cls.getPathSaveHtml(url, type="new")
        if os.path.exists(file_path):
            try:
                os.remove(file_path)
                print("已删除")
            except OSError:
                # Leave a trace for debugging rather than swallowing silently.
                traceback.print_exc()

    @classmethod
    def getHTML(cls, curl, type=None, update=False):
        """Fetch *curl* with retries, serving from the on-disk cache when possible.

        type=None     -> lxml element tree (response text cached)
        type="json"   -> parsed JSON object (response text cached)
        type="bytes"  -> the raw ``requests.Response`` (never cached)
        update=True   -> drop any cached copy before fetching

        Returns None when every attempt fails.
        """
        if update:
            cls.remove_HtmlCache(curl)

        # Serve a cache hit without touching the network. "bytes" requests
        # are excluded: only response text is cached and a Response object
        # cannot be rebuilt from it.
        if not update and type != "bytes":
            try:
                cached = cls.getPathSaveHtml(curl, "read")
            except Exception:
                cached = None
            if cached is not None:
                if type == "json":
                    # Bug fix: cached JSON used to come back as an lxml
                    # tree. Decode it; tolerate legacy double-encoded
                    # cache files (json.dumps of an already-JSON string).
                    obj = json.loads(cached)
                    if isinstance(obj, str):
                        obj = json.loads(obj)
                    return obj
                return html.fromstring(cached)

        retries = Retry(total=5, backoff_factor=0.1,
                        status_forcelist=[500, 502, 503, 504])
        url_text = None
        repeat = 0
        # Close the session when done — the original leaked it.
        with requests.Session() as s:
            s.mount('http://', HTTPAdapter(max_retries=retries))
            s.mount('https://', HTTPAdapter(max_retries=retries))
            while url_text is None and repeat <= 5:
                try:
                    print(f"请求地址:{curl}")
                    res = s.get(curl, stream=True, headers=cls.headers,
                                timeout=5, allow_redirects=True)
                    if type == "bytes":
                        url_text = res
                    elif type == "json":
                        # Store the raw JSON text (no re-encoding) so the
                        # cache-hit path above can json.loads it directly.
                        cls.saveHtml(curl, res.text)
                        return json.loads(res.text)
                    elif type is None:
                        url_text = html.fromstring(res.text)
                        cls.saveHtml(curl, res.text)
                    else:
                        # Unknown type: the original spun forever here.
                        break
                except Exception:
                    repeat += 1
                    ntfy.sendMsg(f"请求失败:{curl}", sleep=1)
        return url_text

    @classmethod
    def getBytes(cls, url):
        """Fetch *url* and return the raw ``requests.Response`` (uncached)."""
        return cls.getHTML(url, type="bytes")

    @classmethod
    def getJSON(cls, url, update=False):
        """Fetch *url* and return the parsed JSON object (cached on disk)."""
        return cls.getHTML(url, type="json", update=update)

    @classmethod
    def xpathData(cls, c_xpath, url=None, num=None, not_eq=None, update=False):
        """Evaluate XPath *c_xpath* against the page at *url*.

        url=None reuses the last url passed to this method (``cls.temp_url``);
        not_eq filters out results equal to that value; num selects a single
        result by index (None when out of range); update drops the cached
        page before fetching.

        Returns the filtered list, a single element when num is given, or
        None when the page could not be fetched.
        """
        if url is None:
            # Reuse the previously queried url. NOTE(review): raises
            # AttributeError if no url was ever supplied — presumably
            # callers always pass url on the first call; confirm.
            url = cls.temp_url
        else:
            cls.temp_url = url

        if update:
            html_cache_path = cls.getPathSaveHtml(url, "new")
            if os.path.exists(html_cache_path):
                try:
                    os.remove(html_cache_path)
                    ntfy.sendMsg(f"html_cache更新成功 {html_cache_path}")
                except OSError:
                    ntfy.sendMsg(f"html_cache更新失败 {html_cache_path}")

        # Fetch (or load from cache) the parsed html tree.
        et = cls.getHTML(url)
        if et is None:
            return None

        # Filter out unwanted values; the unused `count` local is gone.
        result = [x for x in et.xpath(c_xpath) if x != not_eq]
        if num is not None:
            try:
                result = result[num]
            except (IndexError, TypeError):
                result = None
        return result