123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- from __future__ import annotations
- import re
- import json
- from urllib import parse
- from datetime import datetime
- from ...typing import AsyncResult, Messages
- from ..base_provider import AsyncGeneratorProvider
- from ...requests import StreamSession
- class Phind(AsyncGeneratorProvider):
- url = "https://www.phind.com"
- working = False
- lockdown = True
- supports_stream = True
- supports_message_history = True
- @classmethod
- async def create_async_generator(
- cls,
- model: str,
- messages: Messages,
- proxy: str = None,
- timeout: int = 120,
- creative_mode: bool = False,
- **kwargs
- ) -> AsyncResult:
- headers = {
- "Accept": "*/*",
- "Origin": cls.url,
- "Referer": f"{cls.url}/search",
- "Sec-Fetch-Dest": "empty",
- "Sec-Fetch-Mode": "cors",
- "Sec-Fetch-Site": "same-origin",
- }
- async with StreamSession(
- headers=headers,
- impersonate="chrome",
- proxies={"https": proxy},
- timeout=timeout
- ) as session:
- url = "https://www.phind.com/search?home=true"
- async with session.get(url) as response:
- text = await response.text()
- match = re.search(r'<script id="__NEXT_DATA__" type="application/json">(?P<json>[\S\s]+?)</script>', text)
- data = json.loads(match.group("json"))
- challenge_seeds = data["props"]["pageProps"]["challengeSeeds"]
-
- prompt = messages[-1]["content"]
- data = {
- "question": prompt,
- "question_history": [
- message["content"] for message in messages[:-1] if message["role"] == "user"
- ],
- "answer_history": [
- message["content"] for message in messages if message["role"] == "assistant"
- ],
- "webResults": [],
- "options": {
- "date": datetime.now().strftime("%d.%m.%Y"),
- "language": "en-US",
- "detailed": True,
- "anonUserId": "",
- "answerModel": "GPT-4" if model.startswith("gpt-4") else "Phind-34B",
- "creativeMode": creative_mode,
- "customLinks": []
- },
- "context": "\n".join([message["content"] for message in messages if message["role"] == "system"]),
- }
- data["challenge"] = generate_challenge(data, **challenge_seeds)
- async with session.post(f"https://https.api.phind.com/infer/", headers=headers, json=data) as response:
- new_line = False
- async for line in response.iter_lines():
- if line.startswith(b"data: "):
- chunk = line[6:]
- if chunk.startswith(b'<PHIND_DONE/>'):
- break
- if chunk.startswith(b'<PHIND_BACKEND_ERROR>'):
- raise RuntimeError(f"Response: {chunk.decode()}")
- if chunk.startswith(b'<PHIND_WEBRESULTS>') or chunk.startswith(b'<PHIND_FOLLOWUP>'):
- pass
- elif chunk.startswith(b"<PHIND_METADATA>") or chunk.startswith(b"<PHIND_INDICATOR>"):
- pass
- elif chunk.startswith(b"<PHIND_SPAN_BEGIN>") or chunk.startswith(b"<PHIND_SPAN_END>"):
- pass
- elif chunk:
- yield chunk.decode()
- elif new_line:
- yield "\n"
- new_line = False
- else:
- new_line = True
- def deterministic_stringify(obj):
- def handle_value(value):
- if isinstance(value, (dict, list)):
- if isinstance(value, list):
- return '[' + ','.join(sorted(map(handle_value, value))) + ']'
- else: # It's a dict
- return '{' + deterministic_stringify(value) + '}'
- elif isinstance(value, bool):
- return 'true' if value else 'false'
- elif isinstance(value, (int, float)):
- return format(value, '.8f').rstrip('0').rstrip('.')
- elif isinstance(value, str):
- return f'"{value}"'
- else:
- return 'null'
- items = sorted(obj.items(), key=lambda x: x[0])
- return ','.join([f'{k}:{handle_value(v)}' for k, v in items if handle_value(v) is not None])
- def prng_general(seed, multiplier, addend, modulus):
- a = seed * multiplier + addend
- if a < 0:
- return ((a%modulus)-modulus)/modulus
- else:
- return a%modulus/modulus
- def generate_challenge_seed(l):
- I = deterministic_stringify(l)
- d = parse.quote(I, safe='')
- return simple_hash(d)
- def simple_hash(s):
- d = 0
- for char in s:
- if len(char) > 1 or ord(char) >= 256:
- continue
- d = ((d << 5) - d + ord(char[0])) & 0xFFFFFFFF
- if d > 0x7FFFFFFF: # 2147483647
- d -= 0x100000000 # Subtract 2**32
- return d
- def generate_challenge(obj, **kwargs):
- return prng_general(
- seed=generate_challenge_seed(obj),
- **kwargs
- )
|