- import ast
- import logging
- import time
- import json
- import random
- import string
- import uvicorn
- import nest_asyncio
- from fastapi import FastAPI, Response, Request
- from fastapi.responses import StreamingResponse
- from typing import List, Union, Any, Dict, AnyStr
- #from ._tokenizer import tokenize
- import g4f
- from .. import debug
- debug.logging = True
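- # This module wires g4f into a FastAPI app that mimics the OpenAI REST API:
- # /v1/models, /v1/models/{model_name}, and /v1/chat/completions (with
- # optional SSE streaming).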
- class Api:
-     def __init__(self, engine: g4f, debug: bool = True, sentry: bool = False,
-                  list_ignored_providers: List[str] = None) -> None:
-         self.engine = engine
-         self.debug = debug
-         self.sentry = sentry
-         self.list_ignored_providers = list_ignored_providers
-         self.app = FastAPI()
-         nest_asyncio.apply()
-         # Type aliases for loosely-typed JSON request bodies.
-         JSONObject = Dict[AnyStr, Any]
-         JSONArray = List[Any]
-         JSONStructure = Union[JSONArray, JSONObject]
-         @self.app.get("/")
-         async def read_root():
-             return Response(content=json.dumps({"info": "g4f API"}, indent=4), media_type="application/json")
-         @self.app.get("/v1")
-         async def read_root_v1():
-             return Response(content=json.dumps({"info": "Go to /v1/chat/completions or /v1/models."}, indent=4), media_type="application/json")
-         @self.app.get("/v1/models")
-         async def models():
-             model_list = []
-             for model in g4f.Model.__all__():
-                 model_info = g4f.ModelUtils.convert[model]
-                 model_list.append({
-                     'id': model,
-                     'object': 'model',
-                     'created': 0,
-                     'owned_by': model_info.base_provider
-                 })
-             return Response(content=json.dumps({
-                 'object': 'list',
-                 'data': model_list}, indent=4), media_type="application/json")
-         @self.app.get("/v1/models/{model_name}")
-         async def model_info(model_name: str):
-             try:
-                 model_info = g4f.ModelUtils.convert[model_name]
-                 return Response(content=json.dumps({
-                     'id': model_name,
-                     'object': 'model',
-                     'created': 0,
-                     'owned_by': model_info.base_provider
-                 }, indent=4), media_type="application/json")
-             except KeyError:
-                 # Unknown model: report it instead of swallowing every exception.
-                 return Response(content=json.dumps({"error": "The model does not exist."}, indent=4), status_code=404, media_type="application/json")
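-         # Example request against this endpoint (the host/port here is an
-         # assumption; use whatever address the server is started on):
-         #   curl http://127.0.0.1:1337/v1/models/gpt-3.5-turbo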
-         @self.app.post("/v1/chat/completions")
-         async def chat_completions(request: Request, item: JSONStructure = None):
-             item_data = {
-                 'model': 'gpt-3.5-turbo',
-                 'stream': False,
-             }
-             # item may contain byte keys, and every value is coerced to str
-             # here; non-string fields are parsed back below.
-             item_data.update({
-                 key.decode('utf-8') if isinstance(key, bytes) else key: str(value)
-                 for key, value in (item or {}).items()
-             })
-             # 'messages' was stringified above; parse it back into a list of dicts.
-             if isinstance(item_data.get('messages'), str):
-                 item_data['messages'] = ast.literal_eval(item_data.get('messages'))
-             model = item_data.get('model')
-             stream = item_data.get("stream") == "True"
-             messages = item_data.get('messages')
-             provider = item_data.get('provider', '').replace('g4f.Provider.', '')
-             provider = provider if provider and provider != "Auto" else None
-             try:
-                 response = g4f.ChatCompletion.create(
-                     model=model,
-                     stream=stream,
-                     messages=messages,
-                     provider=provider,
-                     ignored=self.list_ignored_providers
-                 )
-             except Exception as e:
-                 logging.exception(e)
-                 content = json.dumps({
-                     "error": {"message": f"An error occurred while generating the response:\n{e}"},
-                     "model": model,
-                     "provider": g4f.get_last_provider(True)
-                 })
-                 return Response(content=content, status_code=500, media_type="application/json")
-             completion_id = ''.join(random.choices(string.ascii_letters + string.digits, k=28))
-             completion_timestamp = int(time.time())
-             if not stream:
-                 #prompt_tokens, _ = tokenize(''.join([message['content'] for message in messages]))
-                 #completion_tokens, _ = tokenize(response)
-                 json_data = {
-                     'id': f'chatcmpl-{completion_id}',
-                     'object': 'chat.completion',
-                     'created': completion_timestamp,
-                     'model': model,
-                     'provider': g4f.get_last_provider(True),
-                     'choices': [
-                         {
-                             'index': 0,
-                             'message': {
-                                 'role': 'assistant',
-                                 'content': response,
-                             },
-                             'finish_reason': 'stop',
-                         }
-                     ],
-                     'usage': {
-                         'prompt_tokens': 0, #prompt_tokens,
-                         'completion_tokens': 0, #completion_tokens,
-                         'total_tokens': 0, #prompt_tokens + completion_tokens,
-                     },
-                 }
-                 return Response(content=json.dumps(json_data, indent=4), media_type="application/json")
-             def streaming():
-                 try:
-                     for chunk in response:
-                         completion_data = {
-                             'id': f'chatcmpl-{completion_id}',
-                             'object': 'chat.completion.chunk',
-                             'created': completion_timestamp,
-                             'model': model,
-                             'provider': g4f.get_last_provider(True),
-                             'choices': [
-                                 {
-                                     'index': 0,
-                                     'delta': {
-                                         'role': 'assistant',
-                                         'content': chunk,
-                                     },
-                                     'finish_reason': None,
-                                 }
-                             ],
-                         }
-                         yield f'data: {json.dumps(completion_data)}\n\n'
-                         time.sleep(0.03)
-                     # Terminal chunk: an empty delta with finish_reason 'stop'.
-                     end_completion_data = {
-                         'id': f'chatcmpl-{completion_id}',
-                         'object': 'chat.completion.chunk',
-                         'created': completion_timestamp,
-                         'model': model,
-                         'provider': g4f.get_last_provider(True),
-                         'choices': [
-                             {
-                                 'index': 0,
-                                 'delta': {},
-                                 'finish_reason': 'stop',
-                             }
-                         ],
-                     }
-                     yield f'data: {json.dumps(end_completion_data)}\n\n'
-                 except GeneratorExit:
-                     pass
-                 except Exception as e:
-                     logging.exception(e)
-                     content = json.dumps({
-                         "error": {"message": f"An error occurred while generating the response:\n{e}"},
-                         "model": model,
-                         "provider": g4f.get_last_provider(True),
-                     })
-                     yield f'data: {content}'
-             return StreamingResponse(streaming(), media_type="text/event-stream")
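-         # Example request (host/port is an assumption; the body mirrors the
-         # OpenAI chat format, with stream sent as the string "True" because
-         # request values are stringified above):
-         #   curl http://127.0.0.1:1337/v1/chat/completions \
-         #     -H "Content-Type: application/json" \
-         #     -d '{"model": "gpt-3.5-turbo", "stream": "True",
-         #          "messages": [{"role": "user", "content": "Hello"}]}'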
-         @self.app.post("/v1/completions")
-         async def completions():
-             return Response(content=json.dumps({'info': 'Not working yet.'}, indent=4), media_type="application/json")
-     def run(self, ip):
-         # Expects ip in "host:port" form, e.g. "127.0.0.1:1337".
-         split_ip = ip.split(":")
-         uvicorn.run(app=self.app, host=split_ip[0], port=int(split_ip[1]), use_colors=False)
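- # A minimal usage sketch (assumptions: this module is importable as g4f.api
- # and the g4f package is installed; the address is arbitrary):
- #
- #     import g4f
- #     from g4f.api import Api
- #
- #     Api(engine=g4f).run("127.0.0.1:1337")
- #     # Then: curl http://127.0.0.1:1337/v1/models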