response.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. from __future__ import annotations
  2. import re
  3. from typing import Union
  4. from abc import abstractmethod
  5. from urllib.parse import quote_plus, unquote_plus
  6. def quote_url(url: str) -> str:
  7. url = unquote_plus(url)
  8. url = url.split("//", maxsplit=1)
  9. # If there is no "//" in the URL, then it is a relative URL
  10. if len(url) == 1:
  11. return quote_plus(url[0], '/?&=#')
  12. url[1] = url[1].split("/", maxsplit=1)
  13. # If there is no "/" after the domain, then it is a domain URL
  14. if len(url[1]) == 1:
  15. return url[0] + "//" + url[1][0]
  16. return url[0] + "//" + url[1][0] + "/" + quote_plus(url[1][1], '/?&=#')
  17. def quote_title(title: str) -> str:
  18. if title:
  19. title = title.strip()
  20. title = " ".join(title.split())
  21. return title.replace('[', '').replace(']', '')
  22. return ""
  23. def format_link(url: str, title: str = None) -> str:
  24. if title is None:
  25. title = unquote_plus(url.split("//", maxsplit=1)[1].split("?")[0].replace("www.", ""))
  26. return f"[{quote_title(title)}]({quote_url(url)})"
  27. def format_image(image: str, alt: str, preview: str = None) -> str:
  28. """
  29. Formats the given image as a markdown string.
  30. Args:
  31. image: The image to format.
  32. alt (str): The alt for the image.
  33. preview (str, optional): The preview URL format. Defaults to "{image}?w=200&h=200".
  34. Returns:
  35. str: The formatted markdown string.
  36. """
  37. return f"[![{quote_title(alt)}]({quote_url(preview.replace('{image}', image) if preview else image)})]({quote_url(image)})"
  38. def format_images_markdown(images: Union[str, list], alt: str, preview: Union[str, list] = None) -> str:
  39. """
  40. Formats the given images as a markdown string.
  41. Args:
  42. images: The images to format.
  43. alt (str): The alt for the images.
  44. preview (str, optional): The preview URL format. Defaults to "{image}?w=200&h=200".
  45. Returns:
  46. str: The formatted markdown string.
  47. """
  48. if isinstance(images, list) and len(images) == 1:
  49. images = images[0]
  50. if isinstance(images, str):
  51. result = format_image(images, alt, preview)
  52. else:
  53. result = "\n".join(
  54. format_image(image, f"#{idx+1} {alt}", preview[idx] if isinstance(preview, list) else preview)
  55. for idx, image in enumerate(images)
  56. )
  57. start_flag = "<!-- generated images start -->\n"
  58. end_flag = "<!-- generated images end -->\n"
  59. return f"\n{start_flag}{result}\n{end_flag}\n"
  60. class ResponseType:
  61. @abstractmethod
  62. def __str__(self) -> str:
  63. raise NotImplementedError
  64. class JsonMixin:
  65. def __init__(self, **kwargs) -> None:
  66. for key, value in kwargs.items():
  67. setattr(self, key, value)
  68. def get_dict(self):
  69. return {
  70. key: value
  71. for key, value in self.__dict__.items()
  72. if not key.startswith("__")
  73. }
  74. def reset(self):
  75. self.__dict__ = {}
  76. class RawResponse(ResponseType, JsonMixin):
  77. pass
  78. class HiddenResponse(ResponseType):
  79. def __str__(self) -> str:
  80. return ""
  81. class FinishReason(JsonMixin, HiddenResponse):
  82. def __init__(self, reason: str) -> None:
  83. self.reason = reason
  84. class ToolCalls(HiddenResponse):
  85. def __init__(self, list: list):
  86. self.list = list
  87. def get_list(self) -> list:
  88. return self.list
  89. class Usage(JsonMixin, HiddenResponse):
  90. pass
  91. class AuthResult(JsonMixin, HiddenResponse):
  92. pass
  93. class TitleGeneration(HiddenResponse):
  94. def __init__(self, title: str) -> None:
  95. self.title = title
  96. class DebugResponse(HiddenResponse):
  97. def __init__(self, log: str) -> None:
  98. self.log = log
  99. class Reasoning(ResponseType):
  100. def __init__(
  101. self,
  102. token: str = None,
  103. status: str = None,
  104. is_thinking: str = None
  105. ) -> None:
  106. self.token = token
  107. self.status = status
  108. self.is_thinking = is_thinking
  109. def __str__(self) -> str:
  110. if self.is_thinking is not None:
  111. return self.is_thinking
  112. if self.token is not None:
  113. return self.token
  114. if self.status is not None:
  115. return f"{self.status}\n"
  116. return ""
  117. def get_dict(self):
  118. if self.is_thinking is None:
  119. if self.status is None:
  120. return {"token": self.token}
  121. {"token": self.token, "status": self.status}
  122. return {"token": self.token, "status": self.status, "is_thinking": self.is_thinking}
  123. class Sources(ResponseType):
  124. def __init__(self, sources: list[dict[str, str]]) -> None:
  125. self.list = []
  126. for source in sources:
  127. self.add_source(source)
  128. def add_source(self, source: dict[str, str]):
  129. url = source.get("url", source.get("link", None))
  130. if url is not None:
  131. url = re.sub(r"[&?]utm_source=.+", "", url)
  132. source["url"] = url
  133. self.list.append(source)
  134. def __str__(self) -> str:
  135. return "\n\n" + ("\n".join([
  136. f"{idx+1}. {format_link(link['url'], link.get('title', None))}"
  137. for idx, link in enumerate(self.list)
  138. ]))
  139. class BaseConversation(ResponseType):
  140. def __str__(self) -> str:
  141. return ""
  142. class JsonConversation(BaseConversation, JsonMixin):
  143. pass
  144. class SynthesizeData(HiddenResponse, JsonMixin):
  145. def __init__(self, provider: str, data: dict):
  146. self.provider = provider
  147. self.data = data
  148. class RequestLogin(ResponseType):
  149. def __init__(self, label: str, login_url: str) -> None:
  150. self.label = label
  151. self.login_url = login_url
  152. def __str__(self) -> str:
  153. return format_link(self.login_url, f"[Login to {self.label}]") + "\n\n"
  154. class ImageResponse(ResponseType):
  155. def __init__(
  156. self,
  157. images: Union[str, list],
  158. alt: str,
  159. options: dict = {}
  160. ):
  161. self.images = images
  162. self.alt = alt
  163. self.options = options
  164. def __str__(self) -> str:
  165. return format_images_markdown(self.images, self.alt, self.get("preview"))
  166. def get(self, key: str):
  167. return self.options.get(key)
  168. def get_list(self) -> list[str]:
  169. return [self.images] if isinstance(self.images, str) else self.images
  170. class ImagePreview(ImageResponse):
  171. def __str__(self):
  172. return ""
  173. def to_string(self):
  174. return super().__str__()
  175. class PreviewResponse(HiddenResponse):
  176. def __init__(self, data: str):
  177. self.data = data
  178. def to_string(self):
  179. return self.data
  180. class Parameters(ResponseType, JsonMixin):
  181. def __str__(self):
  182. return ""
  183. class ProviderInfo(JsonMixin, HiddenResponse):
  184. pass