123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417 |
- # SPDX-License-Identifier: AGPL-3.0-or-later
- import typing
- import types
- import functools
- import itertools
- import threading
- from time import time
- from urllib.parse import urlparse
- import re
- from langdetect import detect_langs
- from langdetect.lang_detect_exception import LangDetectException
- import requests.exceptions
- from searx import poolrequests, logger
- from searx.results import ResultContainer
- from searx.search.models import SearchQuery, EngineRef
- from searx.search.processors import EngineProcessor
- logger = logger.getChild('searx.search.checker')
- HTML_TAGS = [
- 'embed', 'iframe', 'object', 'param', 'picture', 'source', 'svg', 'math', 'canvas', 'noscript', 'script',
- 'del', 'ins', 'area', 'audio', 'img', 'map', 'track', 'video', 'a', 'abbr', 'b', 'bdi', 'bdo', 'br', 'cite',
- 'code', 'data', 'dfn', 'em', 'i', 'kdb', 'mark', 'q', 'rb', 'rp', 'rt', 'rtc', 'ruby', 's', 'samp', 'small',
- 'span', 'strong', 'sub', 'sup', 'time', 'u', 'var', 'wbr', 'style', 'blockquote', 'dd', 'div', 'dl', 'dt',
- 'figcaption', 'figure', 'hr', 'li', 'ol', 'p', 'pre', 'ul', 'button', 'datalist', 'fieldset', 'form', 'input',
- 'label', 'legend', 'meter', 'optgroup', 'option', 'output', 'progress', 'select', 'textarea', 'applet',
- 'frame', 'frameset'
- ]
- def get_check_no_html():
- rep = ['<' + tag + '[^\>]*>' for tag in HTML_TAGS]
- rep += ['</' + tag + '>' for tag in HTML_TAGS]
- pattern = re.compile('|'.join(rep))
- def f(text):
- return pattern.search(text.lower()) is None
- return f
- _check_no_html = get_check_no_html()
- def _is_url(url):
- try:
- result = urlparse(url)
- except ValueError:
- return False
- if result.scheme not in ('http', 'https'):
- return False
- return True
- @functools.lru_cache(maxsize=8192)
- def _is_url_image(image_url):
- if not isinstance(image_url, str):
- return False
- if image_url.startswith('//'):
- image_url = 'https:' + image_url
- if image_url.startswith('data:'):
- return image_url.startswith('data:image/')
- if not _is_url(image_url):
- return False
- retry = 2
- while retry > 0:
- a = time()
- try:
- poolrequests.set_timeout_for_thread(10.0, time())
- r = poolrequests.get(image_url, timeout=10.0, allow_redirects=True, headers={
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0',
- 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
- 'Accept-Language': 'en-US;q=0.5,en;q=0.3',
- 'Accept-Encoding': 'gzip, deflate, br',
- 'DNT': '1',
- 'Connection': 'keep-alive',
- 'Upgrade-Insecure-Requests': '1',
- 'Sec-GPC': '1',
- 'Cache-Control': 'max-age=0'
- })
- if r.headers["content-type"].startswith('image/'):
- return True
- return False
- except requests.exceptions.Timeout:
- logger.error('Timeout for %s: %i', image_url, int(time() - a))
- retry -= 1
- except requests.exceptions.RequestException:
- logger.exception('Exception for %s', image_url)
- return False
- def _search_query_to_dict(search_query: SearchQuery) -> typing.Dict[str, typing.Any]:
- return {
- 'query': search_query.query,
- 'lang': search_query.lang,
- 'pageno': search_query.pageno,
- 'safesearch': search_query.safesearch,
- 'time_range': search_query.time_range,
- }
- def _search_query_diff(sq1: SearchQuery, sq2: SearchQuery)\
- -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, typing.Any]]:
- param1 = _search_query_to_dict(sq1)
- param2 = _search_query_to_dict(sq2)
- common = {}
- diff = {}
- for k, value1 in param1.items():
- value2 = param2[k]
- if value1 == value2:
- common[k] = value1
- else:
- diff[k] = (value1, value2)
- return (common, diff)
- class TestResults:
- __slots__ = 'errors', 'logs', 'languages'
- def __init__(self):
- self.errors: typing.Dict[str, typing.List[str]] = {}
- self.logs: typing.Dict[str, typing.List[typing.Any]] = {}
- self.languages: typing.Set[str] = set()
- def add_error(self, test, message, *args):
- # message to self.errors
- errors_for_test = self.errors.setdefault(test, [])
- if message not in errors_for_test:
- errors_for_test.append(message)
- # (message, *args) to self.logs
- logs_for_test = self.logs.setdefault(test, [])
- if (message, *args) not in logs_for_test:
- logs_for_test.append((message, *args))
- def add_language(self, language):
- self.languages.add(language)
- @property
- def successful(self):
- return len(self.errors) == 0
- def __iter__(self):
- for test_name, errors in self.errors.items():
- for error in sorted(errors):
- yield (test_name, error)
- class ResultContainerTests:
- __slots__ = 'test_name', 'search_query', 'result_container', 'languages', 'stop_test', 'test_results'
- def __init__(self,
- test_results: TestResults,
- test_name: str,
- search_query: SearchQuery,
- result_container: ResultContainer):
- self.test_name = test_name
- self.search_query = search_query
- self.result_container = result_container
- self.languages: typing.Set[str] = set()
- self.test_results = test_results
- self.stop_test = False
- @property
- def result_urls(self):
- results = self.result_container.get_ordered_results()
- return [result['url'] for result in results if 'url' in result]
- def _record_error(self, message: str, *args) -> None:
- sq = _search_query_to_dict(self.search_query)
- sqstr = ' '.join(['{}={!r}'.format(k, v) for k, v in sq.items()])
- self.test_results.add_error(self.test_name, message, *args, '(' + sqstr + ')')
- def _add_language(self, text: str) -> typing.Optional[str]:
- try:
- r = detect_langs(str(text)) # pylint: disable=E1101
- except LangDetectException:
- return None
- if len(r) > 0 and r[0].prob > 0.95:
- self.languages.add(r[0].lang)
- self.test_results.add_language(r[0].lang)
- return None
- def _check_result(self, result):
- if not _check_no_html(result.get('title', '')):
- self._record_error('HTML in title', repr(result.get('title', '')))
- if not _check_no_html(result.get('content', '')):
- self._record_error('HTML in content', repr(result.get('content', '')))
- if result.get('url') is None:
- self._record_error('url is None')
- self._add_language(result.get('title', ''))
- self._add_language(result.get('content', ''))
- template = result.get('template', 'default.html')
- if template == 'default.html':
- return
- if template == 'code.html':
- return
- if template == 'torrent.html':
- return
- if template == 'map.html':
- return
- if template == 'images.html':
- thumbnail_src = result.get('thumbnail_src')
- if thumbnail_src is not None:
- if not _is_url_image(thumbnail_src):
- self._record_error('thumbnail_src URL is invalid', thumbnail_src)
- elif not _is_url_image(result.get('img_src')):
- self._record_error('img_src URL is invalid', result.get('img_src'))
- if template == 'videos.html' and not _is_url_image(result.get('thumbnail')):
- self._record_error('thumbnail URL is invalid', result.get('img_src'))
- def _check_results(self, results: list):
- for result in results:
- self._check_result(result)
- def _check_answers(self, answers):
- for answer in answers:
- if not _check_no_html(answer):
- self._record_error('HTML in answer', answer)
- def _check_infoboxes(self, infoboxes):
- for infobox in infoboxes:
- if not _check_no_html(infobox.get('content', '')):
- self._record_error('HTML in infobox content', infobox.get('content', ''))
- self._add_language(infobox.get('content', ''))
- for attribute in infobox.get('attributes', {}):
- if not _check_no_html(attribute.get('value', '')):
- self._record_error('HTML in infobox attribute value', attribute.get('value', ''))
- def check_basic(self):
- if len(self.result_container.unresponsive_engines) > 0:
- for message in self.result_container.unresponsive_engines:
- self._record_error(message[1] + ' ' + (message[2] or ''))
- self.stop_test = True
- return
- results = self.result_container.get_ordered_results()
- if len(results) > 0:
- self._check_results(results)
- if len(self.result_container.answers) > 0:
- self._check_answers(self.result_container.answers)
- if len(self.result_container.infoboxes) > 0:
- self._check_infoboxes(self.result_container.infoboxes)
- def has_infobox(self):
- """Check the ResultContainer has at least one infobox"""
- if len(self.result_container.infoboxes) == 0:
- self._record_error('No infobox')
- def has_answer(self):
- """Check the ResultContainer has at least one answer"""
- if len(self.result_container.answers) == 0:
- self._record_error('No answer')
- def has_language(self, lang):
- """Check at least one title or content of the results is written in the `lang`.
- Detected using pycld3, may be not accurate"""
- if lang not in self.languages:
- self._record_error(lang + ' not found')
- def not_empty(self):
- """Check the ResultContainer has at least one answer or infobox or result"""
- result_types = set()
- results = self.result_container.get_ordered_results()
- if len(results) > 0:
- result_types.add('results')
- if len(self.result_container.answers) > 0:
- result_types.add('answers')
- if len(self.result_container.infoboxes) > 0:
- result_types.add('infoboxes')
- if len(result_types) == 0:
- self._record_error('No result')
- def one_title_contains(self, title: str):
- """Check one of the title contains `title` (case insensitive comparison)"""
- title = title.lower()
- for result in self.result_container.get_ordered_results():
- if title in result['title'].lower():
- return
- self._record_error(('{!r} not found in the title'.format(title)))
- class CheckerTests:
- __slots__ = 'test_results', 'test_name', 'result_container_tests_list'
- def __init__(self,
- test_results: TestResults,
- test_name: str,
- result_container_tests_list: typing.List[ResultContainerTests]):
- self.test_results = test_results
- self.test_name = test_name
- self.result_container_tests_list = result_container_tests_list
- def unique_results(self):
- """Check the results of each ResultContainer is unique"""
- urls_list = [rct.result_urls for rct in self.result_container_tests_list]
- if len(urls_list[0]) > 0:
- # results on the first page
- for i, urls_i in enumerate(urls_list):
- for j, urls_j in enumerate(urls_list):
- if i < j and urls_i == urls_j:
- common, diff = _search_query_diff(self.result_container_tests_list[i].search_query,
- self.result_container_tests_list[j].search_query)
- common_str = ' '.join(['{}={!r}'.format(k, v) for k, v in common.items()])
- diff1_str = ', ' .join(['{}={!r}'.format(k, v1) for (k, (v1, v2)) in diff.items()])
- diff2_str = ', ' .join(['{}={!r}'.format(k, v2) for (k, (v1, v2)) in diff.items()])
- self.test_results.add_error(self.test_name,
- 'results are identitical for {} and {} ({})'
- .format(diff1_str, diff2_str, common_str))
- class Checker:
- __slots__ = 'processor', 'tests', 'test_results'
- def __init__(self, processor: EngineProcessor):
- self.processor = processor
- self.tests = self.processor.get_tests()
- self.test_results = TestResults()
- @property
- def engineref_list(self):
- engine_name = self.processor.engine_name
- engine_category = self.processor.engine.categories[0]
- return [EngineRef(engine_name, engine_category)]
- @staticmethod
- def search_query_matrix_iterator(engineref_list, matrix):
- p = []
- for name, values in matrix.items():
- if isinstance(values, (tuple, list)):
- l = [(name, value) for value in values]
- else:
- l = [(name, values)]
- p.append(l)
- for kwargs in itertools.product(*p):
- kwargs = {k: v for k, v in kwargs}
- query = kwargs['query']
- params = dict(kwargs)
- del params['query']
- yield SearchQuery(query, engineref_list, **params)
- def call_test(self, obj, test_description):
- if isinstance(test_description, (tuple, list)):
- method, args = test_description[0], test_description[1:]
- else:
- method = test_description
- args = ()
- if isinstance(method, str) and hasattr(obj, method):
- getattr(obj, method)(*args)
- elif isinstance(method, types.FunctionType):
- method(*args)
- else:
- self.test_results.add_error(obj.test_name,
- 'method {!r} ({}) not found for {}'
- .format(method, method.__class__.__name__, obj.__class__.__name__))
- def call_tests(self, obj, test_descriptions):
- for test_description in test_descriptions:
- self.call_test(obj, test_description)
- def search(self, search_query: SearchQuery) -> ResultContainer:
- result_container = ResultContainer()
- engineref_category = search_query.engineref_list[0].category
- params = self.processor.get_params(search_query, engineref_category)
- if params is not None:
- with threading.RLock():
- self.processor.engine.stats['sent_search_count'] += 1
- self.processor.search(search_query.query, params, result_container, time(), 5)
- return result_container
- def get_result_container_tests(self, test_name: str, search_query: SearchQuery) -> ResultContainerTests:
- result_container = self.search(search_query)
- result_container_check = ResultContainerTests(self.test_results, test_name, search_query, result_container)
- result_container_check.check_basic()
- return result_container_check
- def run_test(self, test_name):
- test_parameters = self.tests[test_name]
- search_query_list = list(Checker.search_query_matrix_iterator(self.engineref_list, test_parameters['matrix']))
- rct_list = [self.get_result_container_tests(test_name, search_query) for search_query in search_query_list]
- stop_test = False
- if 'result_container' in test_parameters:
- for rct in rct_list:
- stop_test = stop_test or rct.stop_test
- if not rct.stop_test:
- self.call_tests(rct, test_parameters['result_container'])
- if not stop_test:
- if 'test' in test_parameters:
- checker_tests = CheckerTests(self.test_results, test_name, rct_list)
- self.call_tests(checker_tests, test_parameters['test'])
- def run(self):
- for test_name in self.tests:
- self.run_test(test_name)
|