123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- from __future__ import annotations
- import asyncio
- import time
- import json
- from aiohttp import ClientSession, BaseConnector
- from urllib.parse import quote
- from typing import List, Dict
- try:
- from bs4 import BeautifulSoup
- has_requirements = True
- except ImportError:
- has_requirements = False
- from ..helper import get_connector
- from ...errors import MissingRequirementsError, RateLimitError
- from ...webdriver import WebDriver, get_driver_cookies, get_browser
- BING_URL = "https://www.bing.com"
- TIMEOUT_LOGIN = 1200
- TIMEOUT_IMAGE_CREATION = 300
- ERRORS = [
- "this prompt is being reviewed",
- "this prompt has been blocked",
- "we're working hard to offer image creator in more languages",
- "we can't create your images right now"
- ]
- BAD_IMAGES = [
- "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
- "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
- ]
- def wait_for_login(driver: WebDriver, timeout: int = TIMEOUT_LOGIN) -> None:
- """
- Waits for the user to log in within a given timeout period.
- Args:
- driver (WebDriver): Webdriver for browser automation.
- timeout (int): Maximum waiting time in seconds.
- Raises:
- RuntimeError: If the login process exceeds the timeout.
- """
- driver.get(f"{BING_URL}/")
- start_time = time.time()
- while not driver.get_cookie("_U"):
- if time.time() - start_time > timeout:
- raise RuntimeError("Timeout error")
- time.sleep(0.5)
- def get_cookies_from_browser(proxy: str = None) -> dict[str, str]:
- """
- Retrieves cookies from the browser using webdriver.
- Args:
- proxy (str, optional): Proxy configuration.
- Returns:
- dict[str, str]: Retrieved cookies.
- """
- with get_browser(proxy=proxy) as driver:
- wait_for_login(driver)
- time.sleep(1)
- return get_driver_cookies(driver)
- def create_session(cookies: Dict[str, str], proxy: str = None, connector: BaseConnector = None) -> ClientSession:
- """
- Creates a new client session with specified cookies and headers.
- Args:
- cookies (Dict[str, str]): Cookies to be used for the session.
- Returns:
- ClientSession: The created client session.
- """
- headers = {
- "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
- "accept-encoding": "gzip, deflate, br",
- "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh-TW;q=0.7,zh;q=0.6",
- "content-type": "application/x-www-form-urlencoded",
- "referrer-policy": "origin-when-cross-origin",
- "referrer": "https://www.bing.com/images/create/",
- "origin": "https://www.bing.com",
- "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.54",
- "sec-ch-ua": "\"Microsoft Edge\";v=\"111\", \"Not(A:Brand\";v=\"8\", \"Chromium\";v=\"111\"",
- "sec-ch-ua-mobile": "?0",
- "sec-fetch-dest": "document",
- "sec-fetch-mode": "navigate",
- "sec-fetch-site": "same-origin",
- "sec-fetch-user": "?1",
- "upgrade-insecure-requests": "1",
- }
- if cookies:
- headers["Cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items())
- return ClientSession(headers=headers, connector=get_connector(connector, proxy))
- async def create_images(session: ClientSession, prompt: str, timeout: int = TIMEOUT_IMAGE_CREATION) -> List[str]:
- """
- Creates images based on a given prompt using Bing's service.
- Args:
- session (ClientSession): Active client session.
- prompt (str): Prompt to generate images.
- proxy (str, optional): Proxy configuration.
- timeout (int): Timeout for the request.
- Returns:
- List[str]: A list of URLs to the created images.
- Raises:
- RuntimeError: If image creation fails or times out.
- """
- if not has_requirements:
- raise MissingRequirementsError('Install "beautifulsoup4" package')
- url_encoded_prompt = quote(prompt)
- payload = f"q={url_encoded_prompt}&rt=4&FORM=GENCRE"
- url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
- async with session.post(url, allow_redirects=False, data=payload, timeout=timeout) as response:
- response.raise_for_status()
- text = (await response.text()).lower()
- if "0 coins available" in text:
- raise RateLimitError("No coins left. Log in with a different account or wait a while")
- for error in ERRORS:
- if error in text:
- raise RuntimeError(f"Create images failed: {error}")
- if response.status != 302:
- url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
- async with session.post(url, allow_redirects=False, timeout=timeout) as response:
- if response.status != 302:
- raise RuntimeError(f"Create images failed. Code: {response.status}")
- redirect_url = response.headers["Location"].replace("&nfy=1", "")
- redirect_url = f"{BING_URL}{redirect_url}"
- request_id = redirect_url.split("id=")[1]
- async with session.get(redirect_url) as response:
- response.raise_for_status()
- polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"
- start_time = time.time()
- while True:
- if time.time() - start_time > timeout:
- raise RuntimeError(f"Timeout error after {timeout} sec")
- async with session.get(polling_url) as response:
- if response.status != 200:
- raise RuntimeError(f"Polling images faild. Code: {response.status}")
- text = await response.text()
- if not text or "GenerativeImagesStatusPage" in text:
- await asyncio.sleep(1)
- else:
- break
- error = None
- try:
- error = json.loads(text).get("errorMessage")
- except:
- pass
- if error == "Pending":
- raise RuntimeError("Prompt is been blocked")
- elif error:
- raise RuntimeError(error)
- return read_images(text)
- def read_images(html_content: str) -> List[str]:
- """
- Extracts image URLs from the HTML content.
- Args:
- html_content (str): HTML content containing image URLs.
- Returns:
- List[str]: A list of image URLs.
- """
- soup = BeautifulSoup(html_content, "html.parser")
- tags = soup.find_all("img", class_="mimg")
- if not tags:
- tags = soup.find_all("img", class_="gir_mmimg")
- images = [img["src"].split("?w=")[0] for img in tags]
- if any(im in BAD_IMAGES for im in images):
- raise RuntimeError("Bad images found")
- if not images:
- raise RuntimeError("No images found")
- return images
|