123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- from __future__ import annotations
- import os
- import logging
- from . import debug, version
- from .models import Model
- from .client import Client, AsyncClient
- from .typing import Messages, CreateResult, AsyncResult, Union
- from .errors import StreamNotSupportedError, ModelNotAllowedError
- from .cookies import get_cookies, set_cookies
- from .providers.types import ProviderType
- from .providers.base_provider import AsyncGeneratorProvider
- from .client.service import get_model_and_provider, get_last_provider
- #Configure "g4f" logger
- logger = logging.getLogger(__name__)
- log_handler = logging.StreamHandler()
- log_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
- logger.addHandler(log_handler)
- logger.setLevel(logging.ERROR)
- class ChatCompletion:
- @staticmethod
- def create(model : Union[Model, str],
- messages : Messages,
- provider : Union[ProviderType, str, None] = None,
- stream : bool = False,
- auth : Union[str, None] = None,
- ignored : list[str] = None,
- ignore_working: bool = False,
- ignore_stream: bool = False,
- patch_provider: callable = None,
- **kwargs) -> Union[CreateResult, str]:
- model, provider = get_model_and_provider(
- model, provider, stream,
- ignored, ignore_working,
- ignore_stream or kwargs.get("ignore_stream_and_auth")
- )
- if auth is not None:
- kwargs['auth'] = auth
-
- if "proxy" not in kwargs:
- proxy = os.environ.get("G4F_PROXY")
- if proxy:
- kwargs['proxy'] = proxy
- if patch_provider:
- provider = patch_provider(provider)
- result = provider.create_completion(model, messages, stream=stream, **kwargs)
- return result if stream else ''.join([str(chunk) for chunk in result])
- @staticmethod
- def create_async(model : Union[Model, str],
- messages : Messages,
- provider : Union[ProviderType, str, None] = None,
- stream : bool = False,
- ignored : list[str] = None,
- ignore_working: bool = False,
- patch_provider: callable = None,
- **kwargs) -> Union[AsyncResult, str]:
- model, provider = get_model_and_provider(model, provider, False, ignored, ignore_working)
- if stream:
- if isinstance(provider, type) and issubclass(provider, AsyncGeneratorProvider):
- return provider.create_async_generator(model, messages, **kwargs)
- raise StreamNotSupportedError(f'{provider.__name__} does not support "stream" argument in "create_async"')
- if patch_provider:
- provider = patch_provider(provider)
- return provider.create_async(model, messages, **kwargs)
- class Completion:
- @staticmethod
- def create(model : Union[Model, str],
- prompt : str,
- provider : Union[ProviderType, None] = None,
- stream : bool = False,
- ignored : list[str] = None, **kwargs) -> Union[CreateResult, str]:
- allowed_models = [
- 'code-davinci-002',
- 'text-ada-001',
- 'text-babbage-001',
- 'text-curie-001',
- 'text-davinci-002',
- 'text-davinci-003'
- ]
- if model not in allowed_models:
- raise ModelNotAllowedError(f'Can\'t use {model} with Completion.create()')
- model, provider = get_model_and_provider(model, provider, stream, ignored)
- result = provider.create_completion(model, [{"role": "user", "content": prompt}], stream=stream, **kwargs)
- return result if stream else ''.join(result)
|