#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File  : htmlParser.py
# Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
# Date  : 2022/8/25
# Update: 2022/11/17 support "--" element exclusion, multiple excludes allowed
import ujson
from pyquery import PyQuery as pq
from urllib.parse import urljoin
import re
from jsonpath import jsonpath

PARSE_CACHE = True  # cache the parsed document between calls
NOADD_INDEX = ':eq|:lt|:gt|:first|:last|^body$|^#'  # selectors that should NOT get an automatic :eq index
URLJOIN_ATTR = '(url|src|href|-original|-src|-play|-url|style)$'  # attributes that should be urljoin-ed automatically
SPECIAL_URL = '^(ftp|magnet|thunder|ws):'  # special links that must bypass urljoin


class jsoup:
    def __init__(self, MY_URL=''):
        self.MY_URL = MY_URL
        self.pdfh_html = ''
        self.pdfa_html = ''
        self.pdfh_doc = None
        self.pdfa_doc = None

    def test(self, text: str, string: str):
        """
        Regex "contains" check on a string, mimicking JavaScript's RegExp.test().
        :param text: regex pattern
        :param string: string to search
        :return: True if the pattern is found
        """
        searchObj = re.search(rf'{text}', string, re.M | re.I)
        return bool(searchObj)

    def contains(self, text: str, match: str):
        # return match in text
        return text.find(match) > -1

    def parseHikerToJq(self, parse, first=False):
        """
        Convert a Hiker (海阔) parse expression to a native pyquery expression,
        auto-appending :eq(0). If first is True, the last segment also gets :eq(0).
        :param parse: Hiker-style expression, segments joined by &&
        :param first: whether the final segment is also limited to the first match
        :return: space-joined pyquery selector
        """
        if self.contains(parse, '&&'):
            parse = parse.split('&&')  # re-join the && separated segments below
            new_parses = []  # build the list of new selector segments
            for i in range(len(parse)):
                ps = parse[i].split(' ')[-1]  # if a segment contains spaces, inspect its last token
                if not self.test(NOADD_INDEX, ps):
                    if not first and i >= len(parse) - 1:  # without first, the last segment gets no :eq(0)
                        new_parses.append(parse[i])
                    else:
                        new_parses.append(f'{parse[i]}:eq(0)')
                else:
                    new_parses.append(parse[i])
            parse = ' '.join(new_parses)
        else:
            ps = parse.split(' ')[-1]  # if the expression contains spaces, inspect its last token
            if not self.test(NOADD_INDEX, ps) and first:
                parse = f'{parse}:eq(0)'
        return parse
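
    # Illustrative conversions of the logic above (the selector names are
    # made-up examples, not taken from any real site rule):
    #   parseHikerToJq('.module-list&&.module-item')     -> '.module-list:eq(0) .module-item'
    #   parseHikerToJq('.module-info&&h1', first=True)   -> '.module-info:eq(0) h1:eq(0)'
    #   parseHikerToJq('body&&.box')                     -> 'body .box'   (body is in NOADD_INDEX)
    #   parseHikerToJq('h1.title', first=True)           -> 'h1.title:eq(0)'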

    def getParseInfo(self, nparse):
        """
        From a single rule, extract the parse rule, the index position and the
        exclude list -- the excludes strip elements, multiple excludes are
        supported, by tag, by id, etc.
        :param nparse: a single rule expression
        :return: (rule, index, excludes)
        """
        excludes = []  # exclude list, empty by default
        nparse_index = 0  # index position, 0 by default
        nparse_rule = nparse  # rule defaults to the input itself
        if self.contains(nparse, ':eq'):
            nparse_rule = nparse.split(':eq')[0]
            nparse_pos = nparse.split(':eq')[1]
            # print(nparse_rule)
            if self.contains(nparse_rule, '--'):
                excludes = nparse_rule.split('--')[1:]
                nparse_rule = nparse_rule.split('--')[0]
            elif self.contains(nparse_pos, '--'):
                excludes = nparse_pos.split('--')[1:]
                nparse_pos = nparse_pos.split('--')[0]
            try:
                nparse_index = int(nparse_pos.split('(')[1].split(')')[0])
            except Exception:
                pass
        elif self.contains(nparse, '--'):
            nparse_rule = nparse.split('--')[0]
            excludes = nparse.split('--')[1:]
        # if nparse_index > 0:
        #     print(f'nparse_rule:{nparse_rule},nparse_index:{nparse_index},excludes:{excludes}')
        return nparse_rule, nparse_index, excludes
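
    # Illustrative results of getParseInfo (the inputs are made-up examples):
    #   getParseInfo('div:eq(2)--span--a')       -> ('div', 2, ['span', 'a'])
    #   getParseInfo('.content--script--style')  -> ('.content', 0, ['script', 'style'])
    #   getParseInfo('p')                        -> ('p', 0, [])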

    def parseOneRule(self, doc, nparse, ret=None):
        """
        Apply one record of the space-separated native expression, handling the
        :eq index correctly, and return the resulting object.
        :param doc: the pq object produced by pq(html)
        :param nparse: the current single parse expression
        :param ret: pyquery result from the previous step
        :return: pyquery object
        """
        nparse_rule, nparse_index, excludes = self.getParseInfo(nparse)
        if not ret:
            ret = doc(nparse_rule)
        else:
            ret = ret(nparse_rule)
        # print(f'nparse_rule:{nparse_rule},nparse_index:{nparse_index},excludes:{excludes},ret:{ret}')
        if self.contains(nparse, ':eq'):
            ret = ret.eq(nparse_index)
        # if nparse_index > 4:
        #     print('nparse_index',ret,not ret)
        if excludes and ret:
            # print(excludes)
            ret = ret.clone()  # work on a clone so remove() does not pollute the cached doc
            for exclude in excludes:
                # ret.remove(exclude)
                ret(exclude).remove()
        return ret
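
    # Example of one step (made-up selector): parseOneRule(doc, 'div:eq(1)--a')
    # selects doc('div'), keeps the element at index 1, then removes any nested
    # <a> elements from a clone of that element.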

    def pdfa(self, html, parse: str):
        # Reading the official docs is what solved this:
        # https://pyquery.readthedocs.io/en/latest/api.html
        if not all([html, parse]):
            return []
        parse = self.parseHikerToJq(parse)
        print(f'pdfa:{parse}')
        if PARSE_CACHE:
            if self.pdfa_html != html:
                self.pdfa_html = html
                self.pdfa_doc = pq(html)
            doc = self.pdfa_doc
        else:
            doc = pq(html)
        parses = parse.split(' ')
        # print(parses)
        ret = None
        for nparse in parses:
            ret = self.parseOneRule(doc, nparse, ret)
            if not ret:  # an :eq step may have selected nothing; pdfa returns an empty list
                return []
        res = [item.outerHtml() for item in ret.items()]
        return res
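
    # Usage sketch (the HTML snippet is made-up illustration data):
    #   jsoup().pdfa('<ul><li>a</li><li>b</li></ul>', 'ul&&li')
    #   -> ['<li>a</li>', '<li>b</li>']   (outerHtml of every matched element)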

    def pdfh(self, html, parse: str, base_url: str = ''):
        if not all([html, parse]):
            return ''
        if PARSE_CACHE:
            if self.pdfh_html != html:
                self.pdfh_html = html
                self.pdfh_doc = pq(html)
            doc = self.pdfh_doc
        else:
            doc = pq(html)
        if parse == 'body&&Text' or parse == 'Text':
            text = doc.text()
            return text
        elif parse == 'body&&Html' or parse == 'Html':
            return doc.html()
        option = None
        if self.contains(parse, '&&'):
            option = parse.split('&&')[-1]
            parse = '&&'.join(parse.split('&&')[:-1])
        parse = self.parseHikerToJq(parse, True)
        # print(f'pdfh:{parse},option:{option}')
        parses = parse.split(' ')
        # print(parses)
        ret = None
        for nparse in parses:
            ret = self.parseOneRule(doc, nparse, ret)
            # print(nparse,ret)
            if not ret:  # an :eq step may have selected nothing; pdfh returns an empty string
                return ''
        if option:
            if option == 'Text':
                ret = ret.text()
            elif option == 'Html':
                ret = ret.html()
            else:
                ret = ret.attr(option) or ''
                if self.contains(option.lower(), 'style') and self.contains(ret, 'url('):
                    try:
                        ret = re.search(r'url\((.*?)\)', ret, re.M | re.S).groups()[0]
                        # added 2023/07/28: strip surrounding single/double quotes from URLs taken from style
                        ret = re.sub(r"^['\"]|['\"]$", '', ret)
                    except Exception:
                        pass
                if ret and base_url:
                    # need_add = re.search(URLJOIN_ATTR, option, re.M | re.I)
                    need_add = self.test(URLJOIN_ATTR, option) and not self.test(SPECIAL_URL, ret)
                    if need_add:
                        if 'http' in ret:
                            ret = ret[ret.find('http'):]
                        else:
                            ret = urljoin(base_url, ret)
        else:
            ret = ret.outerHtml()
        return ret
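
    # Usage sketch (the HTML and base_url are made-up illustration data):
    #   j = jsoup('https://example.com/')
    #   j.pdfh('<div class="list"><a href="/p/1.html">EP1</a></div>', '.list&&a&&Text')
    #   -> 'EP1'
    #   j.pdfh('<div class="list"><a href="/p/1.html">EP1</a></div>', '.list&&a&&href', 'https://example.com/')
    #   -> 'https://example.com/p/1.html'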

    def pd(self, html, parse: str, base_url: str = ''):
        if not base_url:
            base_url = self.MY_URL
        return self.pdfh(html, parse, base_url)

    def pq(self, html: str):
        return pq(html)

    def pjfh(self, html, parse: str, add_url=False):
        if not all([html, parse]):
            return ''
        if isinstance(html, str):
            # print(html)
            try:
                html = ujson.loads(html)
                # html = eval(html)
            except Exception:
                print('failed to parse string as json')
                return ''
        if not parse.startswith('$.'):
            parse = f'$.{parse}'
        ret = ''
        for ps in parse.split('||'):
            ret = jsonpath(html, ps)
            if isinstance(ret, list):
                ret = str(ret[0]) if ret[0] else ''
            else:
                ret = str(ret) if ret else ''
            if add_url and ret:
                ret = urljoin(self.MY_URL, ret)
            if ret:
                break
        # print(ret)
        return ret
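
    # Usage sketch (the JSON string is made-up illustration data):
    #   jsoup('https://example.com/').pjfh('{"data":{"pic":"/img/1.jpg"}}', 'data.pic')
    #   -> '/img/1.jpg'   (pj() does the same but urljoins the result with MY_URL)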

    def pj(self, html, parse: str):
        return self.pjfh(html, parse, True)

    def pjfa(self, html, parse: str):
        if not all([html, parse]):
            return []
        if isinstance(html, str):
            try:
                html = ujson.loads(html)
            except Exception:
                return []
        if not parse.startswith('$.'):
            parse = f'$.{parse}'
        # print(html)
        # print(parse)
        ret = jsonpath(html, parse)
        # print(ret)
        # print(type(ret))
        # print(type(ret[0]))
        # print(len(ret))
        if isinstance(ret, list) and len(ret) == 1 and isinstance(ret[0], list):
            # print('auto unwrap')
            ret = ret[0]  # auto-unwrap the single nested list
        return ret or []
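
    # Usage sketch (the JSON string is made-up illustration data):
    #   jsoup().pjfa('{"data":{"list":[1, 2, 3]}}', 'data.list')
    #   -> [1, 2, 3]   (the single [[1, 2, 3]] jsonpath result is auto-unwrapped)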


if __name__ == '__main__':
    pass
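    # Hedged usage sketch (illustration only; the sample HTML/JSON and the
    # example.com base URL below are made-up data, not real site rules):
    sample_html = '<div class="list"><a href="/play/1.html">EP1</a><a href="/play/2.html">EP2</a></div>'
    jsp = jsoup('https://example.com/')
    print(jsp.pdfa(sample_html, '.list&&a'))                      # every <a> as outerHtml
    print(jsp.pdfh(sample_html, '.list&&a&&Text'))                # 'EP1'
    print(jsp.pd(sample_html, '.list&&a&&href'))                  # 'https://example.com/play/1.html'
    print(jsp.pjfh('{"data":{"pic":"/img/1.jpg"}}', 'data.pic'))  # '/img/1.jpg'
    print(jsp.pjfa('{"data":{"list":[1,2,3]}}', 'data.list'))     # [1, 2, 3]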