You.py 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. from __future__ import annotations
  2. import json
  3. from ..requests import StreamSession
  4. from ..typing import AsyncGenerator, Messages
  5. from .base_provider import AsyncGeneratorProvider, format_prompt
  6. class You(AsyncGeneratorProvider):
  7. url = "https://you.com"
  8. working = True
  9. supports_gpt_35_turbo = True
  10. @classmethod
  11. async def create_async_generator(
  12. cls,
  13. model: str,
  14. messages: Messages,
  15. proxy: str = None,
  16. timeout: int = 120,
  17. **kwargs,
  18. ) -> AsyncGenerator:
  19. async with StreamSession(proxies={"https": proxy}, impersonate="chrome107", timeout=timeout) as session:
  20. headers = {
  21. "Accept": "text/event-stream",
  22. "Referer": f"{cls.url}/search?fromSearchBar=true&tbm=youchat",
  23. }
  24. data = {"q": format_prompt(messages), "domain": "youchat", "chat": ""}
  25. async with session.get(
  26. f"{cls.url}/api/streamingSearch",
  27. params=data,
  28. headers=headers
  29. ) as response:
  30. response.raise_for_status()
  31. start = b'data: {"youChatToken": '
  32. async for line in response.iter_lines():
  33. if line.startswith(start):
  34. yield json.loads(line[len(start):-1])