internet.py

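"""Web search helper: queries DuckDuckGo, optionally scrapes each result
page, and formats the results into a prompt block with numbered source
citations."""
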
from __future__ import annotations

import asyncio

from aiohttp import ClientSession, ClientTimeout

try:
    from duckduckgo_search.duckduckgo_search_async import AsyncDDGS
    from bs4 import BeautifulSoup
    has_requirements = True
except ImportError:
    has_requirements = False

from ...errors import MissingRequirementsError

class SearchResults:
    def __init__(self, results: list):
        self.results = results

    def __iter__(self):
        yield from self.results

    def __str__(self):
        search = ""
        for idx, result in enumerate(self.results):
            if search:
                search += "\n\n\n"
            search += f"Title: {result.title}\n\n"
            if result.text:
                search += result.text
            else:
                search += result.snippet
            search += f"\n\nSource: [[{idx}]]({result.url})"
        return search

    def __len__(self) -> int:
        return len(self.results)
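
# Each entry renders via __str__ as a titled block followed by a numbered
# source link, e.g. (the title and URL here are illustrative, not from the
# module):
#
#   Title: Example Domain
#
#   ...scraped page text, or the search snippet as a fallback...
#
#   Source: [[0]](https://example.com/)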

class SearchResultEntry:
    def __init__(self, title: str, url: str, snippet: str, text: str = None):
        self.title = title
        self.url = url
        self.snippet = snippet
        self.text = text

    def set_text(self, text: str):
        self.text = text
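
# scrape_text reduces an HTML document to its visible paragraph text,
# preferring a recognised main-content container when one exists and
# trimming the output to an optional word budget.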
def scrape_text(html: str, max_words: int = None) -> str:
    soup = BeautifulSoup(html, "html.parser")
    for exclude in soup(["script", "style"]):
        exclude.extract()
    # Narrow the soup down to a known main-content container, if present.
    for selector in [
        "main",
        ".main-content-wrapper",
        ".main-content",
        ".emt-container-inner",
        ".content-wrapper",
        "#content",
        "#mainContent",
    ]:
        select = soup.select_one(selector)
        if select:
            soup = select
            break
    # ZDNet
    for remove in [".c-globalDisclosure"]:
        select = soup.select_one(remove)
        if select:
            select.extract()
    clean_text = ""
    for paragraph in soup.select("p"):
        text = paragraph.get_text()
        for line in text.splitlines():
            words = [word for word in line.replace("\t", " ").split(" ") if word]
            if not words:
                continue
            # Compare against None, not truthiness: a budget that reaches
            # exactly 0 must still stop the loop.
            if max_words is not None:
                max_words -= len(words)
                if max_words <= 0:
                    break
            if clean_text:
                clean_text += "\n"
            clean_text += " ".join(words)
    return clean_text
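
# fetch_and_scrape downloads one page and reduces it to clean text; any
# network failure or non-200 status yields None, so the caller falls back
# to the DuckDuckGo snippet.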
async def fetch_and_scrape(session: ClientSession, url: str, max_words: int = None) -> str:
    try:
        async with session.get(url) as response:
            if response.status == 200:
                html = await response.text()
                return scrape_text(html, max_words)
    except Exception:
        return None
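
# search runs the DuckDuckGo query, fetches the result pages concurrently
# (when add_text is set), and trims the combined output to max_words.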
async def search(query: str, n_results: int = 5, max_words: int = 2500, add_text: bool = True) -> SearchResults:
    if not has_requirements:
        raise MissingRequirementsError('Install the "duckduckgo-search" and "beautifulsoup4" packages')
    async with AsyncDDGS() as ddgs:
        results = []
        for result in await ddgs.atext(
            query,
            region="wt-wt",
            safesearch="moderate",
            timelimit="y",
            max_results=n_results
        ):
            results.append(SearchResultEntry(
                result["title"],
                result["href"],
                result["body"]
            ))

        texts = []
        if add_text:
            requests = []
            async with ClientSession(timeout=ClientTimeout(5)) as session:
                for entry in results:
                    # Split the word budget across the fetched pages; guard the
                    # divisor so n_results=1 cannot raise ZeroDivisionError.
                    requests.append(fetch_and_scrape(session, entry.url, max_words // max(n_results - 1, 1)))
                texts = await asyncio.gather(*requests)

        formatted_results = []
        left_words = max_words
        for i, entry in enumerate(results):
            if add_text:
                entry.text = texts[i]
            if left_words:
                left_words -= entry.title.count(" ") + 5
                if entry.text:
                    left_words -= entry.text.count(" ")
                else:
                    left_words -= entry.snippet.count(" ")
                if left_words < 0:
                    break
            formatted_results.append(entry)
        return SearchResults(formatted_results)
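
# get_search_message wraps the rendered search results and the original
# prompt into a single instruction message; on any failure the bare prompt
# is returned unchanged.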
def get_search_message(prompt: str) -> str:
    try:
        search_results = asyncio.run(search(prompt))
        message = f"""
{search_results}

Instruction: Using the provided web search results, write a comprehensive reply to the user request.
Make sure to cite sources after each reference, using [[Number]](Url) notation. Example: [[0]](http://google.com)

User request:
{prompt}
"""
        return message
    except Exception as e:
        print("Couldn't do web search:", e)
        return prompt
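
# Minimal usage sketch (not part of the original module): runs the whole
# pipeline from the command line. Assumes the optional dependencies are
# installed; because of the relative "...errors" import above, this must be
# executed in package context (python -m), not as a loose script. The
# fallback query string is purely illustrative.
if __name__ == "__main__":
    import sys

    query = " ".join(sys.argv[1:]) or "python asyncio tutorial"
    print(get_search_message(query))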