client.py 19 KB


  1. from __future__ import annotations
  2. import os
  3. import time
  4. import random
  5. import string
  6. import threading
  7. import asyncio
  8. import base64
  9. import aiohttp
  10. import queue
  11. from typing import Union, AsyncIterator, Iterator
  12. from ..providers.base_provider import AsyncGeneratorProvider
  13. from ..image import ImageResponse, to_image, to_data_uri
  14. from ..typing import Messages, ImageType
  15. from ..providers.types import BaseProvider, ProviderType, FinishReason
  16. from ..providers.conversation import BaseConversation
  17. from ..image import ImageResponse as ImageProviderResponse
  18. from ..errors import NoImageResponseError
  19. from .stubs import ChatCompletion, ChatCompletionChunk, Image, ImagesResponse
  20. from .image_models import ImageModels
  21. from .types import IterResponse, ImageProvider
  22. from .types import Client as BaseClient
  23. from .service import get_model_and_provider, get_last_provider
  24. from .helper import find_stop, filter_json, filter_none
  25. from ..models import ModelUtils
  26. from ..Provider import IterListProvider
  27. # Helper function to convert an async generator to a synchronous iterator
  28. def to_sync_iter(async_gen: AsyncIterator) -> Iterator:
  29. q = queue.Queue()
  30. loop = asyncio.new_event_loop()
  31. done = object()
  32. def _run():
  33. asyncio.set_event_loop(loop)
  34. async def iterate():
  35. try:
  36. async for item in async_gen:
  37. q.put(item)
  38. finally:
  39. q.put(done)
  40. loop.run_until_complete(iterate())
  41. loop.close()
  42. threading.Thread(target=_run).start()
  43. while True:
  44. item = q.get()
  45. if item is done:
  46. break
  47. yield item
  48. # Helper function to convert a synchronous iterator to an async iterator
  49. async def to_async_iterator(iterator):
  50. for item in iterator:
  51. yield item
  52. # Synchronous iter_response function
  53. def iter_response(
  54. response: Union[Iterator[str], AsyncIterator[str]],
  55. stream: bool,
  56. response_format: dict = None,
  57. max_tokens: int = None,
  58. stop: list = None
  59. ) -> Iterator[Union[ChatCompletion, ChatCompletionChunk]]:
  60. content = ""
  61. finish_reason = None
  62. completion_id = ''.join(random.choices(string.ascii_letters + string.digits, k=28))
  63. idx = 0
  64. if hasattr(response, '__aiter__'):
  65. # It's an async iterator, wrap it into a sync iterator
  66. response = to_sync_iter(response)
  67. for chunk in response:
  68. if isinstance(chunk, FinishReason):
  69. finish_reason = chunk.reason
  70. break
  71. elif isinstance(chunk, BaseConversation):
  72. yield chunk
  73. continue
  74. content += str(chunk)
  75. if max_tokens is not None and idx + 1 >= max_tokens:
  76. finish_reason = "length"
  77. first, content, chunk = find_stop(stop, content, chunk if stream else None)
  78. if first != -1:
  79. finish_reason = "stop"
  80. if stream:
  81. yield ChatCompletionChunk(chunk, None, completion_id, int(time.time()))
  82. if finish_reason is not None:
  83. break
  84. idx += 1
  85. finish_reason = "stop" if finish_reason is None else finish_reason
  86. if stream:
  87. yield ChatCompletionChunk(None, finish_reason, completion_id, int(time.time()))
  88. else:
  89. if response_format is not None and "type" in response_format:
  90. if response_format["type"] == "json_object":
  91. content = filter_json(content)
  92. yield ChatCompletion(content, finish_reason, completion_id, int(time.time()))
  93. # Synchronous iter_append_model_and_provider function
  94. def iter_append_model_and_provider(response: Iterator) -> Iterator:
  95. last_provider = None
  96. for chunk in response:
  97. last_provider = get_last_provider(True) if last_provider is None else last_provider
  98. chunk.model = last_provider.get("model")
  99. chunk.provider = last_provider.get("name")
  100. yield chunk
  101. class Client(BaseClient):
  102. def __init__(
  103. self,
  104. provider: ProviderType = None,
  105. image_provider: ImageProvider = None,
  106. **kwargs
  107. ) -> None:
  108. super().__init__(**kwargs)
  109. self.chat: Chat = Chat(self, provider)
  110. self._images: Images = Images(self, image_provider)
  111. @property
  112. def images(self) -> Images:
  113. return self._images
  114. async def async_images(self) -> Images:
  115. return self._images
  116. # For backwards compatibility and legacy purposes, use Client instead
  117. class AsyncClient(Client):
  118. """Legacy AsyncClient that redirects to the main Client class.
  119. This class exists for backwards compatibility."""
  120. def __init__(self, *args, **kwargs):
  121. import warnings
  122. warnings.warn(
  123. "AsyncClient is deprecated and will be removed in future versions."
  124. "Use Client instead, which now supports both sync and async operations.",
  125. DeprecationWarning,
  126. stacklevel=2
  127. )
  128. super().__init__(*args, **kwargs)
  129. async def async_create(self, *args, **kwargs):
  130. """Asynchronous create method that calls the synchronous method."""
  131. return await super().async_create(*args, **kwargs)
  132. async def async_generate(self, *args, **kwargs):
  133. """Asynchronous image generation method."""
  134. return await super().async_generate(*args, **kwargs)
  135. async def async_images(self) -> Images:
  136. """Asynchronous access to images."""
  137. return await super().async_images()
  138. async def async_fetch_image(self, url: str) -> bytes:
  139. """Asynchronous fetching of an image by URL."""
  140. return await self._fetch_image(url)
  141. class Completions:
  142. def __init__(self, client: Client, provider: ProviderType = None):
  143. self.client: Client = client
  144. self.provider: ProviderType = provider
  145. def create(
  146. self,
  147. messages: Messages,
  148. model: str,
  149. provider: ProviderType = None,
  150. stream: bool = False,
  151. proxy: str = None,
  152. response_format: dict = None,
  153. max_tokens: int = None,
  154. stop: Union[list[str], str] = None,
  155. api_key: str = None,
  156. ignored: list[str] = None,
  157. ignore_working: bool = False,
  158. ignore_stream: bool = False,
  159. **kwargs
  160. ) -> Union[ChatCompletion, Iterator[ChatCompletionChunk]]:
  161. model, provider = get_model_and_provider(
  162. model,
  163. self.provider if provider is None else provider,
  164. stream,
  165. ignored,
  166. ignore_working,
  167. ignore_stream,
  168. )
  169. stop = [stop] if isinstance(stop, str) else stop
  170. if asyncio.iscoroutinefunction(provider.create_completion):
  171. # Run the asynchronous function in an event loop
  172. response = asyncio.run(provider.create_completion(
  173. model,
  174. messages,
  175. stream=stream,
  176. **filter_none(
  177. proxy=self.client.get_proxy() if proxy is None else proxy,
  178. max_tokens=max_tokens,
  179. stop=stop,
  180. api_key=self.client.api_key if api_key is None else api_key
  181. ),
  182. **kwargs
  183. ))
  184. else:
  185. response = provider.create_completion(
  186. model,
  187. messages,
  188. stream=stream,
  189. **filter_none(
  190. proxy=self.client.get_proxy() if proxy is None else proxy,
  191. max_tokens=max_tokens,
  192. stop=stop,
  193. api_key=self.client.api_key if api_key is None else api_key
  194. ),
  195. **kwargs
  196. )
  197. if stream:
  198. if hasattr(response, '__aiter__'):
  199. # It's an async generator, wrap it into a sync iterator
  200. response = to_sync_iter(response)
  201. # Now 'response' is an iterator
  202. response = iter_response(response, stream, response_format, max_tokens, stop)
  203. response = iter_append_model_and_provider(response)
  204. return response
  205. else:
  206. if hasattr(response, '__aiter__'):
  207. # If response is an async generator, collect it into a list
  208. response = list(to_sync_iter(response))
  209. response = iter_response(response, stream, response_format, max_tokens, stop)
  210. response = iter_append_model_and_provider(response)
  211. return next(response)
  212. async def async_create(
  213. self,
  214. messages: Messages,
  215. model: str,
  216. provider: ProviderType = None,
  217. stream: bool = False,
  218. proxy: str = None,
  219. response_format: dict = None,
  220. max_tokens: int = None,
  221. stop: Union[list[str], str] = None,
  222. api_key: str = None,
  223. ignored: list[str] = None,
  224. ignore_working: bool = False,
  225. ignore_stream: bool = False,
  226. **kwargs
  227. ) -> Union[ChatCompletion, AsyncIterator[ChatCompletionChunk]]:
  228. model, provider = get_model_and_provider(
  229. model,
  230. self.provider if provider is None else provider,
  231. stream,
  232. ignored,
  233. ignore_working,
  234. ignore_stream,
  235. )
  236. stop = [stop] if isinstance(stop, str) else stop
  237. if asyncio.iscoroutinefunction(provider.create_completion):
  238. response = await provider.create_completion(
  239. model,
  240. messages,
  241. stream=stream,
  242. **filter_none(
  243. proxy=self.client.get_proxy() if proxy is None else proxy,
  244. max_tokens=max_tokens,
  245. stop=stop,
  246. api_key=self.client.api_key if api_key is None else api_key
  247. ),
  248. **kwargs
  249. )
  250. else:
  251. response = provider.create_completion(
  252. model,
  253. messages,
  254. stream=stream,
  255. **filter_none(
  256. proxy=self.client.get_proxy() if proxy is None else proxy,
  257. max_tokens=max_tokens,
  258. stop=stop,
  259. api_key=self.client.api_key if api_key is None else api_key
  260. ),
  261. **kwargs
  262. )
  263. # Removed 'await' here since 'async_iter_response' returns an async generator
  264. response = async_iter_response(response, stream, response_format, max_tokens, stop)
  265. response = async_iter_append_model_and_provider(response)
  266. if stream:
  267. return response
  268. else:
  269. async for result in response:
  270. return result
  271. class Chat:
  272. completions: Completions
  273. def __init__(self, client: Client, provider: ProviderType = None):
  274. self.completions = Completions(client, provider)
  275. # Asynchronous versions of the helper functions
  276. async def async_iter_response(
  277. response: Union[AsyncIterator[str], Iterator[str]],
  278. stream: bool,
  279. response_format: dict = None,
  280. max_tokens: int = None,
  281. stop: list = None
  282. ) -> AsyncIterator[Union[ChatCompletion, ChatCompletionChunk]]:
  283. content = ""
  284. finish_reason = None
  285. completion_id = ''.join(random.choices(string.ascii_letters + string.digits, k=28))
  286. idx = 0
  287. if not hasattr(response, '__aiter__'):
  288. response = to_async_iterator(response)
  289. async for chunk in response:
  290. if isinstance(chunk, FinishReason):
  291. finish_reason = chunk.reason
  292. break
  293. elif isinstance(chunk, BaseConversation):
  294. yield chunk
  295. continue
  296. content += str(chunk)
  297. if max_tokens is not None and idx + 1 >= max_tokens:
  298. finish_reason = "length"
  299. first, content, chunk = find_stop(stop, content, chunk if stream else None)
  300. if first != -1:
  301. finish_reason = "stop"
  302. if stream:
  303. yield ChatCompletionChunk(chunk, None, completion_id, int(time.time()))
  304. if finish_reason is not None:
  305. break
  306. idx += 1
  307. finish_reason = "stop" if finish_reason is None else finish_reason
  308. if stream:
  309. yield ChatCompletionChunk(None, finish_reason, completion_id, int(time.time()))
  310. else:
  311. if response_format is not None and "type" in response_format:
  312. if response_format["type"] == "json_object":
  313. content = filter_json(content)
  314. yield ChatCompletion(content, finish_reason, completion_id, int(time.time()))
  315. async def async_iter_append_model_and_provider(response: AsyncIterator) -> AsyncIterator:
  316. last_provider = None
  317. if not hasattr(response, '__aiter__'):
  318. response = to_async_iterator(response)
  319. async for chunk in response:
  320. last_provider = get_last_provider(True) if last_provider is None else last_provider
  321. chunk.model = last_provider.get("model")
  322. chunk.provider = last_provider.get("name")
  323. yield chunk
  324. async def iter_image_response(response: AsyncIterator) -> Union[ImagesResponse, None]:
  325. response_list = []
  326. async for chunk in response:
  327. if isinstance(chunk, ImageProviderResponse):
  328. response_list.extend(chunk.get_list())
  329. elif isinstance(chunk, str):
  330. response_list.append(chunk)
  331. if response_list:
  332. return ImagesResponse([Image(image) for image in response_list])
  333. return None
  334. async def create_image(client: Client, provider: ProviderType, prompt: str, model: str = "", **kwargs) -> AsyncIterator:
  335. if isinstance(provider, type) and provider.__name__ == "You":
  336. kwargs["chat_mode"] = "create"
  337. else:
  338. prompt = f"create an image with: {prompt}"
  339. if asyncio.iscoroutinefunction(provider.create_completion):
  340. response = await provider.create_completion(
  341. model,
  342. [{"role": "user", "content": prompt}],
  343. stream=True,
  344. proxy=client.get_proxy(),
  345. **kwargs
  346. )
  347. else:
  348. response = provider.create_completion(
  349. model,
  350. [{"role": "user", "content": prompt}],
  351. stream=True,
  352. proxy=client.get_proxy(),
  353. **kwargs
  354. )
  355. # Wrap synchronous iterator into async iterator if necessary
  356. if not hasattr(response, '__aiter__'):
  357. response = to_async_iterator(response)
  358. return response
  359. class Image:
  360. def __init__(self, url: str = None, b64_json: str = None):
  361. self.url = url
  362. self.b64_json = b64_json
  363. def __repr__(self):
  364. return f"Image(url={self.url}, b64_json={'<base64 data>' if self.b64_json else None})"
  365. class ImagesResponse:
  366. def __init__(self, data: list[Image]):
  367. self.data = data
  368. def __repr__(self):
  369. return f"ImagesResponse(data={self.data})"
  370. class Images:
  371. def __init__(self, client: 'Client', provider: 'ImageProvider' = None):
  372. self.client: 'Client' = client
  373. self.provider: 'ImageProvider' = provider
  374. self.models: ImageModels = ImageModels(client)
  375. def generate(self, prompt: str, model: str = None, response_format: str = "url", **kwargs) -> ImagesResponse:
  376. """
  377. Synchronous generate method that runs the async_generate method in an event loop.
  378. """
  379. return asyncio.run(self.async_generate(prompt, model, response_format=response_format, **kwargs))
  380. async def async_generate(self, prompt: str, model: str = None, response_format: str = "url", **kwargs) -> ImagesResponse:
  381. provider = self.models.get(model, self.provider)
  382. if provider is None:
  383. raise ValueError(f"Unknown model: {model}")
  384. if isinstance(provider, IterListProvider):
  385. if provider.providers:
  386. provider = provider.providers[0]
  387. else:
  388. raise ValueError(f"IterListProvider for model {model} has no providers")
  389. if isinstance(provider, type) and issubclass(provider, AsyncGeneratorProvider):
  390. messages = [{"role": "user", "content": prompt}]
  391. async for response in provider.create_async_generator(model, messages, **kwargs):
  392. if isinstance(response, ImageResponse):
  393. return await self._process_image_response(response, response_format)
  394. elif isinstance(response, str):
  395. image_response = ImageResponse([response], prompt)
  396. return await self._process_image_response(image_response, response_format)
  397. elif hasattr(provider, 'create'):
  398. if asyncio.iscoroutinefunction(provider.create):
  399. response = await provider.create(prompt)
  400. else:
  401. response = provider.create(prompt)
  402. if isinstance(response, ImageResponse):
  403. return await self._process_image_response(response, response_format)
  404. elif isinstance(response, str):
  405. image_response = ImageResponse([response], prompt)
  406. return await self._process_image_response(image_response, response_format)
  407. else:
  408. raise ValueError(f"Provider {provider} does not support image generation")
  409. raise NoImageResponseError(f"Unexpected response type: {type(response)}")
  410. async def _process_image_response(self, response: ImageResponse, response_format: str) -> ImagesResponse:
  411. processed_images = []
  412. for image_data in response.get_list():
  413. if image_data.startswith('http://') or image_data.startswith('https://'):
  414. if response_format == "url":
  415. processed_images.append(Image(url=image_data))
  416. elif response_format == "b64_json":
  417. # Fetch the image data and convert it to base64
  418. image_content = await self._fetch_image(image_data)
  419. b64_json = base64.b64encode(image_content).decode('utf-8')
  420. processed_images.append(Image(b64_json=b64_json))
  421. else:
  422. # Assume image_data is base64 data or binary
  423. if response_format == "url":
  424. if image_data.startswith('data:image'):
  425. # Remove the data URL scheme and get the base64 data
  426. header, base64_data = image_data.split(',', 1)
  427. else:
  428. base64_data = image_data
  429. # Decode the base64 data
  430. image_data_bytes = base64.b64decode(base64_data)
  431. # Convert bytes to an image
  432. image = to_image(image_data_bytes)
  433. file_name = self._save_image(image)
  434. processed_images.append(Image(url=file_name))
  435. elif response_format == "b64_json":
  436. if isinstance(image_data, bytes):
  437. b64_json = base64.b64encode(image_data).decode('utf-8')
  438. else:
  439. b64_json = image_data # If already base64-encoded string
  440. processed_images.append(Image(b64_json=b64_json))
  441. return ImagesResponse(processed_images)
  442. async def _fetch_image(self, url: str) -> bytes:
  443. # Asynchronously fetch image data from the URL
  444. async with aiohttp.ClientSession() as session:
  445. async with session.get(url) as resp:
  446. if resp.status == 200:
  447. return await resp.read()
  448. else:
  449. raise Exception(f"Failed to fetch image from {url}, status code {resp.status}")
  450. def _save_image(self, image: 'PILImage') -> str:
  451. os.makedirs('generated_images', exist_ok=True)
  452. file_name = f"generated_images/image_{int(time.time())}_{random.randint(0, 10000)}.png"
  453. image.save(file_name)
  454. return file_name
  455. async def create_variation(self, image: Union[str, bytes], model: str = None, response_format: str = "url", **kwargs):
  456. # Existing implementation, adjust if you want to support b64_json here as well
  457. pass