create_images.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. import asyncio
  2. import time, json, os
  3. from aiohttp import ClientSession
  4. from bs4 import BeautifulSoup
  5. from urllib.parse import quote
  6. from typing import Generator
  7. from ...webdriver import WebDriver, get_driver_cookies, get_browser
  8. from ...Provider.helper import get_event_loop
  9. from ...base_provider import ProviderType
  10. from ...Provider.create_images import CreateImagesProvider
  11. BING_URL = "https://www.bing.com"
  12. def wait_for_login(driver: WebDriver, timeout: int = 1200) -> Generator:
  13. driver.get(f"{BING_URL}/")
  14. value = driver.get_cookie("_U")
  15. if value:
  16. return
  17. login_url = os.environ.get("G4F_LOGIN_URL")
  18. if login_url:
  19. yield f"Please login: [Bing]({login_url})\n\n"
  20. start_time = time.time()
  21. while True:
  22. if time.time() - start_time > timeout:
  23. raise RuntimeError("Timeout error")
  24. value = driver.get_cookie("_U")
  25. if value:
  26. return
  27. time.sleep(0.1)
  28. def create_session(cookies: dict) -> ClientSession:
  29. headers = {
  30. "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
  31. "accept-encoding": "gzip, deflate, br",
  32. "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh-TW;q=0.7,zh;q=0.6",
  33. "content-type": "application/x-www-form-urlencoded",
  34. "referrer-policy": "origin-when-cross-origin",
  35. "referrer": "https://www.bing.com/images/create/",
  36. "origin": "https://www.bing.com",
  37. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.54",
  38. "sec-ch-ua": "\"Microsoft Edge\";v=\"111\", \"Not(A:Brand\";v=\"8\", \"Chromium\";v=\"111\"",
  39. "sec-ch-ua-mobile": "?0",
  40. "sec-fetch-dest": "document",
  41. "sec-fetch-mode": "navigate",
  42. "sec-fetch-site": "same-origin",
  43. "sec-fetch-user": "?1",
  44. "upgrade-insecure-requests": "1",
  45. }
  46. if cookies:
  47. headers["cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items())
  48. return ClientSession(headers=headers)
  49. async def create_images(session: ClientSession, prompt: str, proxy: str = None, timeout: int = 200) -> list:
  50. url_encoded_prompt = quote(prompt)
  51. payload = f"q={url_encoded_prompt}&rt=4&FORM=GENCRE"
  52. url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
  53. async with session.post(
  54. url,
  55. allow_redirects=False,
  56. data=payload,
  57. timeout=timeout,
  58. ) as response:
  59. response.raise_for_status()
  60. errors = [
  61. "this prompt is being reviewed",
  62. "this prompt has been blocked",
  63. "we're working hard to offer image creator in more languages"
  64. ]
  65. text = (await response.text()).lower()
  66. for error in errors:
  67. if error in text:
  68. raise RuntimeError(f"Create images failed: {error}")
  69. if response.status != 302:
  70. url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
  71. async with session.post(url, allow_redirects=False, proxy=proxy, timeout=timeout) as response:
  72. if response.status != 302:
  73. raise RuntimeError(f"Create images failed. Status Code: {response.status}")
  74. redirect_url = response.headers["Location"].replace("&nfy=1", "")
  75. redirect_url = f"{BING_URL}{redirect_url}"
  76. request_id = redirect_url.split("id=")[1]
  77. async with session.get(redirect_url) as response:
  78. response.raise_for_status()
  79. polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"
  80. start_time = time.time()
  81. while True:
  82. if time.time() - start_time > timeout:
  83. raise RuntimeError("Timeout error")
  84. async with session.get(polling_url) as response:
  85. if response.status != 200:
  86. raise RuntimeError(f"Polling images faild. Status Code: {response.status}")
  87. text = await response.text()
  88. if not text:
  89. await asyncio.sleep(1)
  90. else:
  91. break
  92. error = None
  93. try:
  94. error = json.loads(text).get("errorMessage")
  95. except:
  96. pass
  97. if error == "Pending":
  98. raise RuntimeError("Prompt is been blocked")
  99. elif error:
  100. raise RuntimeError(f"Error: {error}")
  101. return get_images(text)
  102. def format_images_markdown(images: list, prompt: str) -> str:
  103. images = [f"[![#{idx+1} {prompt}]({image}?w=200&h=200)]({image})" for idx, image in enumerate(images)]
  104. images = "\n".join(images)
  105. start_flag = "<!-- generated images start -->\n"
  106. end_flag = "<!-- generated images end -->\n"
  107. return f"\n\n<img data-prompt=\"{prompt}\">\n{start_flag}{images}\n{end_flag}\n"
  108. def get_images(text: str) -> list:
  109. html_soup = BeautifulSoup(text, "html.parser")
  110. tags = html_soup.find_all("img")
  111. image_links = [img["src"] for img in tags if "mimg" in img["class"]]
  112. images = [link.split("?w=")[0] for link in image_links]
  113. bad_images = [
  114. "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
  115. "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
  116. ]
  117. if any(im in bad_images for im in images):
  118. raise RuntimeError("Bad images found")
  119. if not images:
  120. raise RuntimeError("No images found")
  121. return images
  122. def create_completion(prompt: str, proxy: str = None) -> Generator:
  123. loop = get_event_loop()
  124. driver = get_browser(proxy=proxy)
  125. async def run_session():
  126. cookies = get_driver_cookies(driver)
  127. session = create_session(cookies)
  128. try:
  129. return await create_images(session, prompt, proxy)
  130. finally:
  131. await session.close()
  132. try:
  133. yield from wait_for_login(driver)
  134. images = loop.run_until_complete(run_session())
  135. yield format_images_markdown(images, prompt)
  136. finally:
  137. driver.quit()
  138. def patch_provider(provider: ProviderType) -> CreateImagesProvider:
  139. return CreateImagesProvider(provider, create_completion)