123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- from __future__ import annotations
- from aiohttp import ClientSession, ClientTimeout
- try:
- from duckduckgo_search import DDGS
- from bs4 import BeautifulSoup
- has_requirements = True
- except ImportError:
- has_requirements = False
- from ...errors import MissingRequirementsError
-
- import asyncio
class SearchResults():
    """Ordered collection of search result entries.

    Iterating yields the entries in order. ``str()`` renders every entry as a
    titled text block followed by a numbered ``[[idx]](url)`` source link.
    """

    def __init__(self, results: list):
        """Store *results*; each item must expose title, url, snippet and text."""
        self.results = results

    def __iter__(self):
        """Yield the stored entries in their original order."""
        yield from self.results

    def __str__(self):
        """Render all entries, separated from each other by two blank lines.

        An entry's scraped ``text`` is preferred; when it is empty the
        ``snippet`` is used instead. An entry with neither renders an empty
        body rather than raising.
        """
        blocks = []
        for idx, result in enumerate(self.results):
            body = result.text or result.snippet or ""
            blocks.append(
                f"Title: {result.title}\n\n"
                f"{body}"
                f"\n\nSource: [[{idx}]]({result.url})"
            )
        return "\n\n\n".join(blocks)
-
class SearchResultEntry():
    """One search hit: its title, URL, short snippet and optional page text."""

    def __init__(self, title: str, url: str, snippet: str, text: str | None = None):
        """Store the fields of a hit; *text* stays ``None`` until it is set."""
        self.title = title
        self.url = url
        self.snippet = snippet
        self.text = text

    def set_text(self, text: str | None):
        """Replace the stored page text with *text*."""
        self.text = text

    def __repr__(self):
        """Return a short debugging representation (title and URL only)."""
        return f"{type(self).__name__}(title={self.title!r}, url={self.url!r})"
def scrape_text(html: str, max_words: int | None = None) -> str:
    """Extract the paragraph text of an HTML page as newline-separated lines.

    ``<script>`` and ``<style>`` elements are removed, the document is narrowed
    to the first matching main-content container (if any), and the text of
    every ``<p>`` element is collected line by line with runs of spaces and
    tabs collapsed to single spaces.

    Args:
        html: The HTML document to parse.
        max_words: Word budget. ``None`` or ``0`` means no limit. Otherwise
            each non-empty line's word count is subtracted from the budget,
            and collection stops at the first line that brings the budget to
            zero or below; that line is not included.

    Returns:
        The collected lines joined with ``"\\n"``; may be an empty string.
    """
    soup = BeautifulSoup(html, "html.parser")
    for tag in soup(["script", "style"]):
        tag.extract()

    # Restrict parsing to the first known main-content container, if present.
    for selector in (
        "main",
        ".main-content-wrapper",
        ".main-content",
        ".emt-container-inner",
        ".content-wrapper",
        "#content",
        "#mainContent",
    ):
        container = soup.select_one(selector)
        if container:
            soup = container
            break

    # Zdnet
    for selector in (".c-globalDisclosure",):
        unwanted = soup.select_one(selector)
        if unwanted:
            unwanted.extract()

    # Decide once whether a limit applies. Re-testing the shrinking counter
    # for truthiness would switch the limit off when it reaches exactly 0.
    limited = bool(max_words)
    remaining = max_words
    lines = []
    for paragraph in soup.select("p"):
        for line in paragraph.get_text().splitlines():
            words = [word for word in line.replace("\t", " ").split(" ") if word]
            if not words:
                continue
            if limited:
                remaining -= len(words)
                if remaining <= 0:
                    # Budget exhausted: stop scanning the whole document, not
                    # just the current paragraph.
                    return "\n".join(lines)
            lines.append(" ".join(words))
    return "\n".join(lines)
async def fetch_and_scrape(session: ClientSession, url: str, max_words: int | None = None) -> str | None:
    """Download *url* with *session* and return its scraped paragraph text.

    This is a best-effort helper: a response status other than 200, or any
    ``Exception`` raised while requesting, decoding or scraping, yields
    ``None`` instead of propagating.

    Args:
        session: Session object used to issue the GET request.
        url: Address of the page to download.
        max_words: Word budget passed through to ``scrape_text``.

    Returns:
        The scraped text, or ``None`` when the page could not be fetched or
        scraped.
    """
    try:
        async with session.get(url) as response:
            if response.status == 200:
                html = await response.text()
                return scrape_text(html, max_words)
    except Exception:
        # Deliberately broad: one failing page must not abort the whole search.
        # Unlike a bare ``except``, this lets task cancellation and interpreter
        # exit signals (BaseException subclasses) propagate.
        return None
    return None
async def search(query: str, n_results: int = 5, max_words: int = 2500, add_text: bool = True) -> SearchResults:
    """Run a DuckDuckGo text search and optionally attach scraped page text.

    Args:
        query: The search query.
        n_results: Collection of hits stops once this many have been gathered.
        max_words: Approximate total budget for the returned results, measured
            by counting spaces. ``0`` disables the limit.
        add_text: When true, every hit's page is downloaded concurrently and
            its scraped text is stored on the entry.

    Returns:
        A ``SearchResults`` holding the leading hits that fit into the budget.

    Raises:
        MissingRequirementsError: If the optional search/scraping packages
            could not be imported.
    """
    if not has_requirements:
        raise MissingRequirementsError('Install "duckduckgo-search" and "beautifulsoup4" package')

    results = []
    # NOTE(review): this is a plain (non-async) iteration, so it runs without
    # yielding to the event loop until the hits are collected.
    with DDGS() as ddgs:
        for result in ddgs.text(
            query,
            region="wt-wt",
            safesearch="moderate",
            timelimit="y",
        ):
            results.append(SearchResultEntry(
                result["title"],
                result["href"],
                result["body"]
            ))
            if len(results) >= n_results:
                break

    if add_text and results:
        # Each page's share of the budget is max_words / (n_results - 1).
        # Clamp the divisor so n_results == 1 does not divide by zero.
        words_per_page = int(max_words / max(n_results - 1, 1))
        async with ClientSession(timeout=ClientTimeout(5)) as session:
            texts = await asyncio.gather(*(
                fetch_and_scrape(session, entry.url, words_per_page)
                for entry in results
            ))
        # A failed download leaves the entry's text as None, so the snippet is
        # used for it below.
        for entry, text in zip(results, texts):
            entry.text = text

    # Decide once whether the budget applies. Testing the running counter for
    # truthiness would disable the limit when it reaches exactly 0.
    limited = bool(max_words)
    left_words = max_words
    formatted_results = []
    for entry in results:
        if limited:
            # The extra 5 is a fixed per-entry allowance on top of the title.
            left_words -= entry.title.count(" ") + 5
            body = entry.text if entry.text else entry.snippet
            left_words -= body.count(" ")
            if left_words < 0:
                break
        formatted_results.append(entry)
    return SearchResults(formatted_results)
def get_search_message(prompt) -> str:
    """Return *prompt* wrapped with web search results and citation instructions.

    ``search(prompt)`` is run to completion with ``asyncio.run``. If the search
    or the formatting of its results raises an ``Exception``, the failure is
    printed and the unmodified prompt is returned instead.
    """
    try:
        found = asyncio.run(search(prompt))
        # Formatting stays inside the try block so that an error while
        # rendering the results also falls back to the plain prompt.
        return (
            f"\n{found}\n"
            "Instruction: Using the provided web search results, to write a comprehensive reply to the user request.\n"
            "Make sure to add the sources of cites using [[Number]](Url) notation after the reference. Example: [[0]](http://google.com)\n"
            "If the provided search results refer to multiple subjects with the same name, write separate answers for each subject.\n"
            "User request:\n"
            f"{prompt}\n"
        )
    except Exception as error:
        print("Couldn't do web search:", error)
    return prompt
|