internet.py

from __future__ import annotations

import asyncio

from aiohttp import ClientSession, ClientTimeout

try:
    from duckduckgo_search import DDGS
    from bs4 import BeautifulSoup
    has_requirements = True
except ImportError:
    has_requirements = False

from ...errors import MissingRequirementsError
from ... import debug

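# Holds the collected search entries together with a count of the words the
# rendered block consumed. __str__ produces the prompt-ready text, numbering
# each entry so the model can cite it with [[idx]](url) markers.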
class SearchResults:
    def __init__(self, results: list, used_words: int):
        self.results = results
        self.used_words = used_words

    def __iter__(self):
        yield from self.results

    def __str__(self):
        search = ""
        for idx, result in enumerate(self.results):
            if search:
                search += "\n\n\n"
            search += f"Title: {result.title}\n\n"
            if result.text:
                search += result.text
            else:
                search += result.snippet
            search += f"\n\nSource: [[{idx}]]({result.url})"
        return search

    def __len__(self) -> int:
        return len(self.results)

class SearchResultEntry:
    def __init__(self, title: str, url: str, snippet: str, text: str = None):
        self.title = title
        self.url = url
        self.snippet = snippet
        self.text = text

    def set_text(self, text: str):
        self.text = text

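# Extract readable text from an HTML page: narrow to the main content
# container when one is found, drop known boilerplate, then collect the
# text of paragraphs and headings up to an optional word budget.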
def scrape_text(html: str, max_words: int = None) -> str:
    soup = BeautifulSoup(html, "html.parser")
    for selector in [
        "main",
        ".main-content-wrapper",
        ".main-content",
        ".emt-container-inner",
        ".content-wrapper",
        "#content",
        "#mainContent",
    ]:
        select = soup.select_one(selector)
        if select:
            soup = select
            break
    # ZDNet places a disclosure banner inside the content container.
    for remove in [".c-globalDisclosure"]:
        select = soup.select_one(remove)
        if select:
            select.extract()
    clean_text = ""
    for paragraph in soup.select("p, h1, h2, h3, h4, h5, h6"):
        for line in paragraph.get_text().splitlines():
            words = [word for word in line.replace("\t", " ").split(" ") if word]
            if not words:
                continue
            # "if max_words:" would stop enforcing the budget once it hit
            # exactly zero, so compare against None explicitly and return
            # as soon as the budget is exhausted.
            if max_words is not None:
                max_words -= len(words)
                if max_words <= 0:
                    return clean_text
            if clean_text:
                clean_text += "\n"
            clean_text += " ".join(words)
    return clean_text

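# Download one page and reduce it to plain text. Any failure simply yields
# None so a single bad URL cannot break the whole batch.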
async def fetch_and_scrape(session: ClientSession, url: str, max_words: int = None) -> str:
    try:
        async with session.get(url) as response:
            if response.status == 200:
                html = await response.text()
                return scrape_text(html, max_words)
    except Exception:
        # A bare "except:" would also swallow task cancellation; Exception
        # keeps the original fail-soft behaviour without that side effect.
        return None
    return None

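# Query DuckDuckGo, optionally fetch the full text of each hit concurrently,
# then trim the combined results to roughly max_words.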
async def search(query: str, n_results: int = 5, max_words: int = 2500, add_text: bool = True) -> SearchResults:
    if not has_requirements:
        raise MissingRequirementsError('Install "duckduckgo-search" and "beautifulsoup4" package | pip install -U g4f[search]')
    with DDGS() as ddgs:
        results = []
        for result in ddgs.text(
            query,
            region="wt-wt",
            safesearch="moderate",
            timelimit="y",
            max_results=n_results,
            backend="html"
        ):
            results.append(SearchResultEntry(
                result["title"],
                result["href"],
                result["body"]
            ))
    if add_text:
        requests = []
        async with ClientSession(timeout=ClientTimeout(5)) as session:
            for entry in results:
                # max(..., 1) guards against ZeroDivisionError when n_results is 1.
                requests.append(fetch_and_scrape(session, entry.url, max_words // max(n_results - 1, 1)))
            texts = await asyncio.gather(*requests)
    formatted_results = []
    used_words = 0
    left_words = max_words
    for i, entry in enumerate(results):
        if add_text:
            entry.text = texts[i]
        if left_words > 0:
            # The extra 5 words are a rough allowance for the title and
            # source lines wrapped around each entry.
            left_words -= entry.title.count(" ") + 5
            if entry.text:
                left_words -= entry.text.count(" ")
            else:
                left_words -= entry.snippet.count(" ")
            if left_words < 0:
                break
        used_words = max_words - left_words
        formatted_results.append(entry)
    return SearchResults(formatted_results, used_words)

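# Synchronous entry point: run the search and wrap the results in an
# instruction block. On any failure the original prompt is returned unchanged.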
def get_search_message(prompt: str, n_results: int = 5, max_words: int = 2500) -> str:
    try:
        search_results = asyncio.run(search(prompt, n_results, max_words))
        message = f"""
{search_results}
Instruction: Using the provided web search results, write a comprehensive reply to the user request.
Make sure to cite sources using [[Number]](Url) notation after each reference. Example: [[0]](http://google.com)
User request:
{prompt}
"""
        debug.log(f"Web search: '{prompt.strip()[:50]}...' {search_results.used_words} words")
        return message
    except Exception as e:
        debug.log(f"Couldn't do web search: {e.__class__.__name__}: {e}")
        return prompt
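
# Minimal usage sketch. The import path below is an assumption based on the
# relative imports at the top of this file (three package levels up to g4f);
# adjust it to wherever this module actually lives in the package:
#
#     from g4f.gui.server.internet import get_search_message, search
#
#     results = asyncio.run(search("aiohttp ClientTimeout", n_results=3, max_words=1000))
#     print(len(results), "results,", results.used_words, "words used")
#
#     message = get_search_message("aiohttp ClientTimeout")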