123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- from __future__ import annotations
- import re
- import os
- import time
- import random
- import string
- from .stubs import ChatCompletion, ChatCompletionChunk, Image, ImagesResponse
- from .typing import Union, Iterator, Messages, ImageType
- from .providers.types import BaseProvider, ProviderType
- from .image import ImageResponse as ImageProviderResponse
- from .Provider.BingCreateImages import BingCreateImages
- from .Provider.needs_auth import Gemini, OpenaiChat
- from .errors import NoImageResponseError
- from . import get_model_and_provider, get_last_provider
- ImageProvider = Union[BaseProvider, object]
- Proxies = Union[dict, str]
- IterResponse = Iterator[Union[ChatCompletion, ChatCompletionChunk]]
- def read_json(text: str) -> dict:
- """
- Parses JSON code block from a string.
- Args:
- text (str): A string containing a JSON code block.
- Returns:
- dict: A dictionary parsed from the JSON code block.
- """
- match = re.search(r"```(json|)\n(?P<code>[\S\s]+?)\n```", text)
- if match:
- return match.group("code")
- return text
- def iter_response(
- response: iter[str],
- stream: bool,
- response_format: dict = None,
- max_tokens: int = None,
- stop: list = None
- ) -> IterResponse:
- content = ""
- finish_reason = None
- completion_id = ''.join(random.choices(string.ascii_letters + string.digits, k=28))
- for idx, chunk in enumerate(response):
- content += str(chunk)
- if max_tokens is not None and idx + 1 >= max_tokens:
- finish_reason = "length"
- first = -1
- word = None
- if stop is not None:
- for word in list(stop):
- first = content.find(word)
- if first != -1:
- content = content[:first]
- break
- if stream and first != -1:
- first = chunk.find(word)
- if first != -1:
- chunk = chunk[:first]
- else:
- first = 0
- if first != -1:
- finish_reason = "stop"
- if stream:
- yield ChatCompletionChunk(chunk, None, completion_id, int(time.time()))
- if finish_reason is not None:
- break
- finish_reason = "stop" if finish_reason is None else finish_reason
- if stream:
- yield ChatCompletionChunk(None, finish_reason, completion_id, int(time.time()))
- else:
- if response_format is not None and "type" in response_format:
- if response_format["type"] == "json_object":
- content = read_json(content)
- yield ChatCompletion(content, finish_reason, completion_id, int(time.time()))
- def iter_append_model_and_provider(response: IterResponse) -> IterResponse:
- last_provider = None
- for chunk in response:
- last_provider = get_last_provider(True) if last_provider is None else last_provider
- chunk.model = last_provider.get("model")
- chunk.provider = last_provider.get("name")
- yield chunk
- class Client():
- def __init__(
- self,
- api_key: str = None,
- proxies: Proxies = None,
- provider: ProviderType = None,
- image_provider: ImageProvider = None,
- **kwargs
- ) -> None:
- self.api_key: str = api_key
- self.proxies: Proxies = proxies
- self.chat: Chat = Chat(self, provider)
- self.images: Images = Images(self, image_provider)
- def get_proxy(self) -> Union[str, None]:
- if isinstance(self.proxies, str):
- return self.proxies
- elif self.proxies is None:
- return os.environ.get("G4F_PROXY")
- elif "all" in self.proxies:
- return self.proxies["all"]
- elif "https" in self.proxies:
- return self.proxies["https"]
- def filter_none(**kwargs):
- for key in list(kwargs.keys()):
- if kwargs[key] is None:
- del kwargs[key]
- return kwargs
- class Completions():
- def __init__(self, client: Client, provider: ProviderType = None):
- self.client: Client = client
- self.provider: ProviderType = provider
- def create(
- self,
- messages: Messages,
- model: str,
- provider: ProviderType = None,
- stream: bool = False,
- response_format: dict = None,
- max_tokens: int = None,
- stop: Union[list[str], str] = None,
- api_key: str = None,
- **kwargs
- ) -> Union[ChatCompletion, Iterator[ChatCompletionChunk]]:
- model, provider = get_model_and_provider(
- model,
- self.provider if provider is None else provider,
- stream,
- **kwargs
- )
- stop = [stop] if isinstance(stop, str) else stop
- response = provider.create_completion(
- model, messages, stream,
- **filter_none(
- proxy=self.client.get_proxy(),
- max_tokens=max_tokens,
- stop=stop,
- api_key=self.client.api_key if api_key is None else api_key
- ),
- **kwargs
- )
- response = iter_response(response, stream, response_format, max_tokens, stop)
- response = iter_append_model_and_provider(response)
- return response if stream else next(response)
- class Chat():
- completions: Completions
- def __init__(self, client: Client, provider: ProviderType = None):
- self.completions = Completions(client, provider)
- class ImageModels():
- gemini = Gemini
- openai = OpenaiChat
- def __init__(self, client: Client) -> None:
- self.client = client
- self.default = BingCreateImages(proxy=self.client.get_proxy())
- def get(self, name: str, default: ImageProvider = None) -> ImageProvider:
- return getattr(self, name) if hasattr(self, name) else default or self.default
- class Images():
- def __init__(self, client: Client, provider: ImageProvider = None):
- self.client: Client = client
- self.provider: ImageProvider = provider
- self.models: ImageModels = ImageModels(client)
- def generate(self, prompt, model: str = None, **kwargs):
- provider = self.models.get(model, self.provider)
- if isinstance(provider, BaseProvider) or isinstance(provider, type) and issubclass(provider, BaseProvider):
- prompt = f"create a image: {prompt}"
- response = provider.create_completion(
- "",
- [{"role": "user", "content": prompt}],
- True,
- proxy=self.client.get_proxy(),
- **kwargs
- )
- else:
- response = provider.create(prompt)
- for chunk in response:
- if isinstance(chunk, ImageProviderResponse):
- images = [chunk.images] if isinstance(chunk.images, str) else chunk.images
- return ImagesResponse([Image(image) for image in images])
- raise NoImageResponseError()
- def create_variation(self, image: ImageType, model: str = None, **kwargs):
- provider = self.models.get(model, self.provider)
- result = None
- if isinstance(provider, type) and issubclass(provider, BaseProvider):
- response = provider.create_completion(
- "",
- [{"role": "user", "content": "create a image like this"}],
- True,
- image=image,
- proxy=self.client.get_proxy(),
- **kwargs
- )
- for chunk in response:
- if isinstance(chunk, ImageProviderResponse):
- result = ([chunk.images] if isinstance(chunk.images, str) else chunk.images)
- result = ImagesResponse([Image(image)for image in result])
- if result is None:
- raise NoImageResponseError()
- return result
|