upload_image.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. from __future__ import annotations
  2. import string
  3. import random
  4. import json
  5. import re
  6. import io
  7. import base64
  8. import numpy as np
  9. from PIL import Image
  10. from aiohttp import ClientSession
  11. async def upload_image(
  12. session: ClientSession,
  13. image: str,
  14. tone: str,
  15. proxy: str = None
  16. ):
  17. try:
  18. image_config = {
  19. "maxImagePixels": 360000,
  20. "imageCompressionRate": 0.7,
  21. "enableFaceBlurDebug": 0,
  22. }
  23. is_data_uri_an_image(image)
  24. img_binary_data = extract_data_uri(image)
  25. is_accepted_format(img_binary_data)
  26. img = Image.open(io.BytesIO(img_binary_data))
  27. width, height = img.size
  28. max_image_pixels = image_config['maxImagePixels']
  29. if max_image_pixels / (width * height) < 1:
  30. new_width = int(width * np.sqrt(max_image_pixels / (width * height)))
  31. new_height = int(height * np.sqrt(max_image_pixels / (width * height)))
  32. else:
  33. new_width = width
  34. new_height = height
  35. try:
  36. orientation = get_orientation(img)
  37. except Exception:
  38. orientation = None
  39. new_img = process_image(orientation, img, new_width, new_height)
  40. new_img_binary_data = compress_image_to_base64(new_img, image_config['imageCompressionRate'])
  41. data, boundary = build_image_upload_api_payload(new_img_binary_data, tone)
  42. headers = session.headers.copy()
  43. headers["content-type"] = f'multipart/form-data; boundary={boundary}'
  44. headers["referer"] = 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx'
  45. headers["origin"] = 'https://www.bing.com'
  46. async with session.post("https://www.bing.com/images/kblob", data=data, headers=headers, proxy=proxy) as response:
  47. if response.status != 200:
  48. raise RuntimeError("Failed to upload image.")
  49. image_info = await response.json()
  50. if not image_info.get('blobId'):
  51. raise RuntimeError("Failed to parse image info.")
  52. result = {'bcid': image_info.get('blobId', "")}
  53. result['blurredBcid'] = image_info.get('processedBlobId', "")
  54. if result['blurredBcid'] != "":
  55. result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['blurredBcid']
  56. elif result['bcid'] != "":
  57. result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['bcid']
  58. result['originalImageUrl'] = (
  59. "https://www.bing.com/images/blob?bcid="
  60. + result['blurredBcid']
  61. if image_config["enableFaceBlurDebug"]
  62. else "https://www.bing.com/images/blob?bcid="
  63. + result['bcid']
  64. )
  65. return result
  66. except Exception as e:
  67. raise RuntimeError(f"Upload image failed: {e}")
  68. def build_image_upload_api_payload(image_bin: str, tone: str):
  69. payload = {
  70. 'invokedSkills': ["ImageById"],
  71. 'subscriptionId': "Bing.Chat.Multimodal",
  72. 'invokedSkillsRequestData': {
  73. 'enableFaceBlur': True
  74. },
  75. 'convoData': {
  76. 'convoid': "",
  77. 'convotone': tone
  78. }
  79. }
  80. knowledge_request = {
  81. 'imageInfo': {},
  82. 'knowledgeRequest': payload
  83. }
  84. boundary="----WebKitFormBoundary" + ''.join(random.choices(string.ascii_letters + string.digits, k=16))
  85. data = (
  86. f'--{boundary}'
  87. + '\r\nContent-Disposition: form-data; name="knowledgeRequest"\r\n\r\n'
  88. + json.dumps(knowledge_request, ensure_ascii=False)
  89. + "\r\n--"
  90. + boundary
  91. + '\r\nContent-Disposition: form-data; name="imageBase64"\r\n\r\n'
  92. + image_bin
  93. + "\r\n--"
  94. + boundary
  95. + "--\r\n"
  96. )
  97. return data, boundary
  98. def is_data_uri_an_image(data_uri: str):
  99. # Check if the data URI starts with 'data:image' and contains an image format (e.g., jpeg, png, gif)
  100. if not re.match(r'data:image/(\w+);base64,', data_uri):
  101. raise ValueError("Invalid data URI image.")
  102. # Extract the image format from the data URI
  103. image_format = re.match(r'data:image/(\w+);base64,', data_uri).group(1)
  104. # Check if the image format is one of the allowed formats (jpg, jpeg, png, gif)
  105. if image_format.lower() not in ['jpeg', 'jpg', 'png', 'gif']:
  106. raise ValueError("Invalid image format (from mime file type).")
  107. def is_accepted_format(binary_data: bytes) -> bool:
  108. if binary_data.startswith(b'\xFF\xD8\xFF'):
  109. pass # It's a JPEG image
  110. elif binary_data.startswith(b'\x89PNG\r\n\x1a\n'):
  111. pass # It's a PNG image
  112. elif binary_data.startswith(b'GIF87a') or binary_data.startswith(b'GIF89a'):
  113. pass # It's a GIF image
  114. elif binary_data.startswith(b'\x89JFIF') or binary_data.startswith(b'JFIF\x00'):
  115. pass # It's a JPEG image
  116. elif binary_data.startswith(b'\xFF\xD8'):
  117. pass # It's a JPEG image
  118. elif binary_data.startswith(b'RIFF') and binary_data[8:12] == b'WEBP':
  119. pass # It's a WebP image
  120. else:
  121. raise ValueError("Invalid image format (from magic code).")
  122. def extract_data_uri(data_uri: str) -> bytes:
  123. data = data_uri.split(",")[1]
  124. data = base64.b64decode(data)
  125. return data
  126. def get_orientation(data: bytes) -> int:
  127. if data[:2] != b'\xFF\xD8':
  128. raise Exception('NotJpeg')
  129. with Image.open(data) as img:
  130. exif_data = img._getexif()
  131. if exif_data is not None:
  132. orientation = exif_data.get(274) # 274 corresponds to the orientation tag in EXIF
  133. if orientation is not None:
  134. return orientation
  135. def process_image(orientation: int, img: Image.Image, new_width: int, new_height: int) -> Image.Image:
  136. # Initialize the canvas
  137. new_img = Image.new("RGB", (new_width, new_height), color="#FFFFFF")
  138. if orientation:
  139. if orientation > 4:
  140. img = img.transpose(Image.FLIP_LEFT_RIGHT)
  141. if orientation in [3, 4]:
  142. img = img.transpose(Image.ROTATE_180)
  143. if orientation in [5, 6]:
  144. img = img.transpose(Image.ROTATE_270)
  145. if orientation in [7, 8]:
  146. img = img.transpose(Image.ROTATE_90)
  147. new_img.paste(img, (0, 0))
  148. return new_img
  149. def compress_image_to_base64(image: Image.Image, compression_rate: float) -> str:
  150. output_buffer = io.BytesIO()
  151. image.save(output_buffer, format="JPEG", quality=int(compression_rate * 100))
  152. return base64.b64encode(output_buffer.getvalue()).decode('utf-8')