json_engine.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. from collections.abc import Iterable
  3. from json import loads
  4. from urllib.parse import urlencode
  5. from searx.utils import to_string, html_to_text
  6. search_url = None
  7. url_query = None
  8. content_query = None
  9. title_query = None
  10. content_html_to_text = False
  11. title_html_to_text = False
  12. paging = False
  13. suggestion_query = ''
  14. results_query = ''
  15. # parameters for engines with paging support
  16. #
  17. # number of results on each page
  18. # (only needed if the site requires not a page number, but an offset)
  19. page_size = 1
  20. # number of the first page (usually 0 or 1)
  21. first_page_num = 1
  22. def iterate(iterable):
  23. if type(iterable) == dict:
  24. it = iterable.items()
  25. else:
  26. it = enumerate(iterable)
  27. for index, value in it:
  28. yield str(index), value
  29. def is_iterable(obj):
  30. if type(obj) == str:
  31. return False
  32. return isinstance(obj, Iterable)
  33. def parse(query):
  34. q = []
  35. for part in query.split('/'):
  36. if part == '':
  37. continue
  38. else:
  39. q.append(part)
  40. return q
  41. def do_query(data, q):
  42. ret = []
  43. if not q:
  44. return ret
  45. qkey = q[0]
  46. for key, value in iterate(data):
  47. if len(q) == 1:
  48. if key == qkey:
  49. ret.append(value)
  50. elif is_iterable(value):
  51. ret.extend(do_query(value, q))
  52. else:
  53. if not is_iterable(value):
  54. continue
  55. if key == qkey:
  56. ret.extend(do_query(value, q[1:]))
  57. else:
  58. ret.extend(do_query(value, q))
  59. return ret
  60. def query(data, query_string):
  61. q = parse(query_string)
  62. return do_query(data, q)
  63. def request(query, params):
  64. query = urlencode({'q': query})[2:]
  65. fp = {'query': query}
  66. if paging and search_url.find('{pageno}') >= 0:
  67. fp['pageno'] = (params['pageno'] - 1) * page_size + first_page_num
  68. params['url'] = search_url.format(**fp)
  69. params['query'] = query
  70. return params
  71. def identity(arg):
  72. return arg
  73. def response(resp):
  74. results = []
  75. json = loads(resp.text)
  76. title_filter = html_to_text if title_html_to_text else identity
  77. content_filter = html_to_text if content_html_to_text else identity
  78. if results_query:
  79. rs = query(json, results_query)
  80. if not len(rs):
  81. return results
  82. for result in rs[0]:
  83. try:
  84. url = query(result, url_query)[0]
  85. title = query(result, title_query)[0]
  86. except:
  87. continue
  88. try:
  89. content = query(result, content_query)[0]
  90. except:
  91. content = ""
  92. results.append({
  93. 'url': to_string(url),
  94. 'title': title_filter(to_string(title)),
  95. 'content': content_filter(to_string(content)),
  96. })
  97. else:
  98. for url, title, content in zip(
  99. query(json, url_query),
  100. query(json, title_query),
  101. query(json, content_query)
  102. ):
  103. results.append({
  104. 'url': to_string(url),
  105. 'title': title_filter(to_string(title)),
  106. 'content': content_filter(to_string(content)),
  107. })
  108. if not suggestion_query:
  109. return results
  110. for suggestion in query(json, suggestion_query):
  111. results.append({'suggestion': suggestion})
  112. return results