miniirc_idc.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. #!/usr/bin/env python3
  2. #
  3. # This is very horrible and quickly written
  4. # But it works
  5. #
  6. # Copyright © 2022 by luk3yx
  7. #
  8. # Permission is hereby granted, free of charge, to any person obtaining a copy
  9. # of this software and associated documentation files (the "Software"), to deal
  10. # in the Software without restriction, including without limitation the rights
  11. # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. # copies of the Software, and to permit persons to whom the Software is
  13. # furnished to do so, subject to the following conditions:
  14. #
  15. # The above copyright notice and this permission notice shall be included in
  16. # all copies or substantial portions of the Software.
  17. #
  18. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  24. # SOFTWARE.
  25. #
  26. from __future__ import annotations
  27. from collections.abc import Iterator, Mapping, Sequence
  28. from typing import Optional
  29. import datetime, miniirc, re, traceback # type: ignore
  30. assert miniirc.ver >= (1, 8, 1)
  31. _LEADING_COLON = "" if miniirc.ver[0] > 2 else ":"
  32. _esc_re = re.compile(r"\\(.)")
  33. # Backslash must be first
  34. _idc_escapes = {"\\": "\\", "r": "\r", "n": "\n", "t": "\t"}
  35. def _get_idc_args(
  36. command: str, kwargs: Mapping[str, Optional[str | float]]
  37. ) -> Iterator[str]:
  38. yield command
  39. for key, value in kwargs.items():
  40. if value is not None:
  41. value = str(value)
  42. for escape_char, char in _idc_escapes.items():
  43. value = value.replace(char, "\\" + escape_char)
  44. yield f"{key.upper()}={value}"
  45. def _parse_join(
  46. irc: IDC,
  47. hostmask: tuple[str, str, str],
  48. tags: Mapping[str, str],
  49. args: list[str],
  50. ) -> None:
  51. users = tags.get("=idc-join-users")
  52. if isinstance(users, str):
  53. irc._dispatch(
  54. "353", "", [irc.current_nick, "=", args[0], users]
  55. )
  56. irc._dispatch(
  57. "366", "", [irc.current_nick, args[0], "End of /NAMES list"]
  58. )
  59. class IDC(miniirc.IRC):
  60. if miniirc.ver[0] >= 2:
  61. def _dispatch(
  62. self, command: str, user: str, args: list[str]
  63. ) -> None:
  64. self.handle_msg(
  65. miniirc.IRCMessage(
  66. command,
  67. (user, "~u", f"idc/{user}")
  68. if user
  69. else ("", "", ""),
  70. {},
  71. args,
  72. )
  73. )
  74. else:
  75. def _dispatch(
  76. self, command: str, user: str, args: list[str]
  77. ) -> None:
  78. if args:
  79. args[-1] = _LEADING_COLON + args[-1]
  80. self._handle(
  81. command,
  82. (user, "~u", f"idc/{user}") if user else ("", "", ""),
  83. {},
  84. args,
  85. )
  86. def __init__(self, *args, **kwargs) -> None:
  87. super().__init__(*args, **kwargs)
  88. self.Handler("JOIN", colon=False, ircv3=True)(_parse_join)
  89. def _idc_message_parser_no_exc(
  90. self, msg: str
  91. ) -> Optional[
  92. tuple[str, tuple[str, str, str], dict[str, str], list[str]]
  93. ]:
  94. try:
  95. return self.idc_message_parser(msg)
  96. except Exception:
  97. traceback.print_exc()
  98. return None
  99. def idc_message_parser(
  100. self, msg: str
  101. ) -> Optional[
  102. tuple[str, tuple[str, str, str], dict[str, str], list[str]]
  103. ]:
  104. idc_cmd = None
  105. idc_args = {}
  106. for arg in msg.split("\t"):
  107. if "=" in arg:
  108. key, value = arg.split("=", 1)
  109. idc_args[key] = _esc_re.sub(
  110. lambda m: _idc_escapes.get(m.group(1), "\ufffd"),
  111. value,
  112. )
  113. else:
  114. idc_cmd = arg
  115. # Translate IDC keyword arguments into IRC positional ones
  116. tags = {}
  117. if idc_cmd == "PRIVMSG":
  118. command = "PRIVMSG"
  119. args = [self.current_nick, idc_args["MESSAGE"]]
  120. elif idc_cmd == "CHANMSG":
  121. command = "PRIVMSG"
  122. args = ["#" + idc_args["TARGET"], idc_args["MESSAGE"]]
  123. elif idc_cmd == "LOGIN_GOOD":
  124. command = "001"
  125. args = [
  126. self.current_nick,
  127. f"Welcome to IDC {self.current_nick}",
  128. ]
  129. elif idc_cmd == "PONG":
  130. command = "PONG"
  131. args = [self.ip, idc_args.get("COOKIE", "")]
  132. elif idc_cmd == "JOIN":
  133. command = "JOIN"
  134. idc_args["SOURCE"] = self.current_nick
  135. args = ["#" + idc_args["CHANNEL"]]
  136. # HACK: Add a message tag and fire other events later rather than
  137. # firing events from the parser function which feels worse.
  138. # The tag name starts with = so that it doesn't conflict with any
  139. # actual IRC tags.
  140. tags["=idc-join-users"] = idc_args["USERS"]
  141. else:
  142. return None
  143. # Add generic parameters
  144. if "SOURCE" in idc_args:
  145. user = idc_args["SOURCE"]
  146. hostmask = (user, "~u", f"idc/{user}")
  147. tags["account"] = user
  148. else:
  149. hostmask = ("", "", "")
  150. if command == "PRIVMSG":
  151. # If echo-message wasn't requested then don't send self messages
  152. if (
  153. hostmask[0] == self.current_nick
  154. and "echo-message" not in self.active_caps
  155. ):
  156. return None
  157. # Parse the message type
  158. msg_type = idc_args.get("TYPE", "").upper()
  159. if msg_type == "NOTICE":
  160. command = "NOTICE"
  161. elif msg_type == "ACTION":
  162. args[1] = f"\x01ACTION {args[1]}\x01"
  163. if "TS" in idc_args:
  164. dt = datetime.datetime.utcfromtimestamp(
  165. float(idc_args["TS"])
  166. )
  167. tags["time"] = dt.isoformat() + "Z"
  168. if "LABEL" in idc_args:
  169. tags["label"] = idc_args["LABEL"]
  170. if miniirc.ver[0] >= 2:
  171. return miniirc.IRCMessage(command, hostmask, tags, args)
  172. else:
  173. if args:
  174. args[-1] = _LEADING_COLON + args[-1]
  175. return command, hostmask, tags, args
  176. # Send raw messages
  177. def idc_send(self, command: str, **kwargs: Optional[str | float]):
  178. super().quote(
  179. "\t".join(_get_idc_args(command, kwargs)), force=True
  180. )
  181. def quote(
  182. self,
  183. *msg: str,
  184. force: Optional[bool] = None,
  185. tags: Optional[Mapping[str, str | bool]] = None,
  186. ) -> None:
  187. cmd, _, tags2, args = miniirc.ircv3_message_parser(
  188. " ".join(msg)
  189. )
  190. if miniirc.ver[0] < 2 and args and args[-1].startswith(":"):
  191. args[-1] = args[-1][1:]
  192. self.send(cmd, *args, force=force, tags=tags or tags2)
  193. def _get_idc_account(self) -> Sequence[str]:
  194. if isinstance(self.ns_identity, tuple):
  195. return self.ns_identity
  196. else:
  197. return self.ns_identity.split(" ", 1)
  198. @property
  199. def current_nick(self) -> str:
  200. return self._get_idc_account()[0]
  201. def send(
  202. self,
  203. cmd: str,
  204. *args: str,
  205. force: Optional[bool] = None,
  206. tags: Optional[Mapping[str, str | bool]] = None,
  207. ) -> None:
  208. cmd = cmd.upper()
  209. label = tags.get("label") if tags else None
  210. if cmd in ("PRIVMSG", "NOTICE"):
  211. target = args[0]
  212. # TODO: Make miniirc think that SASL worked PMs to NickServ don't
  213. # have to be blocked.
  214. if target == "NickServ":
  215. return
  216. msg = args[1]
  217. msg_type: Optional[str]
  218. if cmd == "NOTICE":
  219. msg_type = "NOTICE"
  220. elif msg.startswith("\x01ACTION"):
  221. msg = msg[8:].rstrip("\x01")
  222. msg_type = "ACTION"
  223. else:
  224. msg_type = None
  225. if target.startswith("#"):
  226. idc_cmd = "CHANMSG"
  227. target = target[1:]
  228. else:
  229. idc_cmd = "PRIVMSG"
  230. self.idc_send(
  231. idc_cmd,
  232. target=target,
  233. type=msg_type,
  234. message=msg,
  235. label=label,
  236. )
  237. elif cmd == "PING":
  238. self.idc_send("PING", cookie=args[0], label=label)
  239. elif cmd == "USER":
  240. user, password = self._get_idc_account()
  241. self.idc_send(
  242. "LOGIN", username=user, password=password, label=label
  243. )
  244. self.active_caps = self.ircv3_caps & {
  245. "account-tag",
  246. "echo-message",
  247. "labeled-response",
  248. }
  249. # Override the message parser to change the default parser.
  250. def change_parser(self, parser=None):
  251. super().change_parser(parser or self._idc_message_parser_no_exc)