123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- from __future__ import annotations
- import string
- import random
- import json
- import re
- import io
- import base64
- import numpy as np
- from PIL import Image
- from aiohttp import ClientSession
- async def upload_image(
- session: ClientSession,
- image: str,
- tone: str,
- proxy: str = None
- ):
- try:
- image_config = {
- "maxImagePixels": 360000,
- "imageCompressionRate": 0.7,
- "enableFaceBlurDebug": 0,
- }
- is_data_uri_an_image(image)
- img_binary_data = extract_data_uri(image)
- is_accepted_format(img_binary_data)
- img = Image.open(io.BytesIO(img_binary_data))
- width, height = img.size
- max_image_pixels = image_config['maxImagePixels']
- if max_image_pixels / (width * height) < 1:
- new_width = int(width * np.sqrt(max_image_pixels / (width * height)))
- new_height = int(height * np.sqrt(max_image_pixels / (width * height)))
- else:
- new_width = width
- new_height = height
- try:
- orientation = get_orientation(img)
- except Exception:
- orientation = None
- new_img = process_image(orientation, img, new_width, new_height)
- new_img_binary_data = compress_image_to_base64(new_img, image_config['imageCompressionRate'])
- data, boundary = build_image_upload_api_payload(new_img_binary_data, tone)
- headers = session.headers.copy()
- headers["content-type"] = f'multipart/form-data; boundary={boundary}'
- headers["referer"] = 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx'
- headers["origin"] = 'https://www.bing.com'
- async with session.post("https://www.bing.com/images/kblob", data=data, headers=headers, proxy=proxy) as response:
- if response.status != 200:
- raise RuntimeError("Failed to upload image.")
- image_info = await response.json()
- if not image_info.get('blobId'):
- raise RuntimeError("Failed to parse image info.")
- result = {'bcid': image_info.get('blobId', "")}
- result['blurredBcid'] = image_info.get('processedBlobId', "")
- if result['blurredBcid'] != "":
- result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['blurredBcid']
- elif result['bcid'] != "":
- result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['bcid']
- result['originalImageUrl'] = (
- "https://www.bing.com/images/blob?bcid="
- + result['blurredBcid']
- if image_config["enableFaceBlurDebug"]
- else "https://www.bing.com/images/blob?bcid="
- + result['bcid']
- )
- return result
- except Exception as e:
- raise RuntimeError(f"Upload image failed: {e}")
-
- def build_image_upload_api_payload(image_bin: str, tone: str):
- payload = {
- 'invokedSkills': ["ImageById"],
- 'subscriptionId': "Bing.Chat.Multimodal",
- 'invokedSkillsRequestData': {
- 'enableFaceBlur': True
- },
- 'convoData': {
- 'convoid': "",
- 'convotone': tone
- }
- }
- knowledge_request = {
- 'imageInfo': {},
- 'knowledgeRequest': payload
- }
- boundary="----WebKitFormBoundary" + ''.join(random.choices(string.ascii_letters + string.digits, k=16))
- data = (
- f'--{boundary}'
- + '\r\nContent-Disposition: form-data; name="knowledgeRequest"\r\n\r\n'
- + json.dumps(knowledge_request, ensure_ascii=False)
- + "\r\n--"
- + boundary
- + '\r\nContent-Disposition: form-data; name="imageBase64"\r\n\r\n'
- + image_bin
- + "\r\n--"
- + boundary
- + "--\r\n"
- )
- return data, boundary
- def is_data_uri_an_image(data_uri: str):
- # Check if the data URI starts with 'data:image' and contains an image format (e.g., jpeg, png, gif)
- if not re.match(r'data:image/(\w+);base64,', data_uri):
- raise ValueError("Invalid data URI image.")
- # Extract the image format from the data URI
- image_format = re.match(r'data:image/(\w+);base64,', data_uri).group(1)
- # Check if the image format is one of the allowed formats (jpg, jpeg, png, gif)
- if image_format.lower() not in ['jpeg', 'jpg', 'png', 'gif']:
- raise ValueError("Invalid image format (from mime file type).")
- def is_accepted_format(binary_data: bytes) -> bool:
- if binary_data.startswith(b'\xFF\xD8\xFF'):
- pass # It's a JPEG image
- elif binary_data.startswith(b'\x89PNG\r\n\x1a\n'):
- pass # It's a PNG image
- elif binary_data.startswith(b'GIF87a') or binary_data.startswith(b'GIF89a'):
- pass # It's a GIF image
- elif binary_data.startswith(b'\x89JFIF') or binary_data.startswith(b'JFIF\x00'):
- pass # It's a JPEG image
- elif binary_data.startswith(b'\xFF\xD8'):
- pass # It's a JPEG image
- elif binary_data.startswith(b'RIFF') and binary_data[8:12] == b'WEBP':
- pass # It's a WebP image
- else:
- raise ValueError("Invalid image format (from magic code).")
-
- def extract_data_uri(data_uri: str) -> bytes:
- data = data_uri.split(",")[1]
- data = base64.b64decode(data)
- return data
- def get_orientation(data: bytes) -> int:
- if data[:2] != b'\xFF\xD8':
- raise Exception('NotJpeg')
- with Image.open(data) as img:
- exif_data = img._getexif()
- if exif_data is not None:
- orientation = exif_data.get(274) # 274 corresponds to the orientation tag in EXIF
- if orientation is not None:
- return orientation
- def process_image(orientation: int, img: Image.Image, new_width: int, new_height: int) -> Image.Image:
- # Initialize the canvas
- new_img = Image.new("RGB", (new_width, new_height), color="#FFFFFF")
- if orientation:
- if orientation > 4:
- img = img.transpose(Image.FLIP_LEFT_RIGHT)
- if orientation in [3, 4]:
- img = img.transpose(Image.ROTATE_180)
- if orientation in [5, 6]:
- img = img.transpose(Image.ROTATE_270)
- if orientation in [7, 8]:
- img = img.transpose(Image.ROTATE_90)
- new_img.paste(img, (0, 0))
- return new_img
-
- def compress_image_to_base64(image: Image.Image, compression_rate: float) -> str:
- output_buffer = io.BytesIO()
- image.save(output_buffer, format="JPEG", quality=int(compression_rate * 100))
- return base64.b64encode(output_buffer.getvalue()).decode('utf-8')
|