123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- import asyncio
- import time, json, os
- from aiohttp import ClientSession
- from bs4 import BeautifulSoup
- from urllib.parse import quote
- from typing import Generator
- from ...webdriver import WebDriver, get_driver_cookies, get_browser
- from ...Provider.helper import get_event_loop
- from ...base_provider import ProviderType
- from ...Provider.create_images import CreateImagesProvider
- BING_URL = "https://www.bing.com"
- def wait_for_login(driver: WebDriver, timeout: int = 1200) -> Generator:
- driver.get(f"{BING_URL}/")
- value = driver.get_cookie("_U")
- if value:
- return
- login_url = os.environ.get("G4F_LOGIN_URL")
- if login_url:
- yield f"Please login: [Bing]({login_url})\n\n"
- start_time = time.time()
- while True:
- if time.time() - start_time > timeout:
- raise RuntimeError("Timeout error")
- value = driver.get_cookie("_U")
- if value:
- return
- time.sleep(0.1)
- def create_session(cookies: dict) -> ClientSession:
- headers = {
- "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
- "accept-encoding": "gzip, deflate, br",
- "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh-TW;q=0.7,zh;q=0.6",
- "content-type": "application/x-www-form-urlencoded",
- "referrer-policy": "origin-when-cross-origin",
- "referrer": "https://www.bing.com/images/create/",
- "origin": "https://www.bing.com",
- "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.54",
- "sec-ch-ua": "\"Microsoft Edge\";v=\"111\", \"Not(A:Brand\";v=\"8\", \"Chromium\";v=\"111\"",
- "sec-ch-ua-mobile": "?0",
- "sec-fetch-dest": "document",
- "sec-fetch-mode": "navigate",
- "sec-fetch-site": "same-origin",
- "sec-fetch-user": "?1",
- "upgrade-insecure-requests": "1",
- }
- if cookies:
- headers["cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items())
- return ClientSession(headers=headers)
- async def create_images(session: ClientSession, prompt: str, proxy: str = None, timeout: int = 200) -> list:
- url_encoded_prompt = quote(prompt)
- payload = f"q={url_encoded_prompt}&rt=4&FORM=GENCRE"
- url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
- async with session.post(
- url,
- allow_redirects=False,
- data=payload,
- timeout=timeout,
- ) as response:
- response.raise_for_status()
- errors = [
- "this prompt is being reviewed",
- "this prompt has been blocked",
- "we're working hard to offer image creator in more languages"
- ]
- text = (await response.text()).lower()
- for error in errors:
- if error in text:
- raise RuntimeError(f"Create images failed: {error}")
- if response.status != 302:
- url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
- async with session.post(url, allow_redirects=False, proxy=proxy, timeout=timeout) as response:
- if response.status != 302:
- raise RuntimeError(f"Create images failed. Status Code: {response.status}")
- redirect_url = response.headers["Location"].replace("&nfy=1", "")
- redirect_url = f"{BING_URL}{redirect_url}"
- request_id = redirect_url.split("id=")[1]
- async with session.get(redirect_url) as response:
- response.raise_for_status()
- polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"
- start_time = time.time()
- while True:
- if time.time() - start_time > timeout:
- raise RuntimeError("Timeout error")
- async with session.get(polling_url) as response:
- if response.status != 200:
- raise RuntimeError(f"Polling images faild. Status Code: {response.status}")
- text = await response.text()
- if not text:
- await asyncio.sleep(1)
- else:
- break
- error = None
- try:
- error = json.loads(text).get("errorMessage")
- except:
- pass
- if error == "Pending":
- raise RuntimeError("Prompt is been blocked")
- elif error:
- raise RuntimeError(f"Error: {error}")
- return get_images(text)
- def format_images_markdown(images: list, prompt: str) -> str:
- images = [f"[![#{idx+1} {prompt}]({image}?w=200&h=200)]({image})" for idx, image in enumerate(images)]
- images = "\n".join(images)
- start_flag = "<!-- generated images start -->\n"
- end_flag = "<!-- generated images end -->\n"
- return f"\n\n<img data-prompt=\"{prompt}\">\n{start_flag}{images}\n{end_flag}\n"
- def get_images(text: str) -> list:
- html_soup = BeautifulSoup(text, "html.parser")
- tags = html_soup.find_all("img")
- image_links = [img["src"] for img in tags if "mimg" in img["class"]]
- images = [link.split("?w=")[0] for link in image_links]
- bad_images = [
- "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
- "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
- ]
- if any(im in bad_images for im in images):
- raise RuntimeError("Bad images found")
- if not images:
- raise RuntimeError("No images found")
- return images
- def create_completion(prompt: str, proxy: str = None) -> Generator:
- loop = get_event_loop()
- driver = get_browser(proxy=proxy)
- async def run_session():
- cookies = get_driver_cookies(driver)
- session = create_session(cookies)
- try:
- return await create_images(session, prompt, proxy)
- finally:
- await session.close()
- try:
- yield from wait_for_login(driver)
- images = loop.run_until_complete(run_session())
- yield format_images_markdown(images, prompt)
- finally:
- driver.quit()
- def patch_provider(provider: ProviderType) -> CreateImagesProvider:
- return CreateImagesProvider(provider, create_completion)
|