# Aibn.py (1.4 KB)
from __future__ import annotations

import codecs
import hashlib
import time

from ...typing import AsyncResult, Messages
from ...requests import StreamSession
from ..base_provider import AsyncGeneratorProvider
  7. class Aibn(AsyncGeneratorProvider):
  8. url = "https://aibn.cc"
  9. working = False
  10. supports_message_history = True
  11. supports_gpt_35_turbo = True
  12. @classmethod
  13. async def create_async_generator(
  14. cls,
  15. model: str,
  16. messages: Messages,
  17. proxy: str = None,
  18. timeout: int = 120,
  19. **kwargs
  20. ) -> AsyncResult:
  21. async with StreamSession(
  22. impersonate="chrome107",
  23. proxies={"https": proxy},
  24. timeout=timeout
  25. ) as session:
  26. timestamp = int(time.time())
  27. data = {
  28. "messages": messages,
  29. "pass": None,
  30. "sign": generate_signature(timestamp, messages[-1]["content"]),
  31. "time": timestamp
  32. }
  33. async with session.post(f"{cls.url}/api/generate", json=data) as response:
  34. response.raise_for_status()
  35. async for chunk in response.iter_content():
  36. yield chunk.decode()
  37. def generate_signature(timestamp: int, message: str, secret: str = "undefined"):
  38. data = f"{timestamp}:{message}:{secret}"
  39. return hashlib.sha256(data.encode()).hexdigest()