authplugin-example.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. #!/usr/bin/env python3
  2. # This is a demonstration example of how to write a
  3. # keyboard-interactive authentication helper plugin using PuTTY's
  4. # protocol for involving it in SSH connection setup.
  5. # The protocol, and the purpose of an authentication plugin, is
  6. # fully documented in an appendix to the PuTTY manual.
  7. import io
  8. import os
  9. import struct
  10. import sys
  11. # Exception class we'll use to get a clean exit on EOF.
  12. class PluginEOF(Exception): pass
  13. # ----------------------------------------------------------------------
  14. #
  15. # Marshalling and unmarshalling routines to write and read the
  16. # necessary SSH data types to/from a binary file handle (which can
  17. # include an io.BytesIO if you need to encode/decode in-process).
  18. #
  19. # Error handling is a totally ad-hoc mixture of 'assert' and just
  20. # assuming things will have the right type, or be the right length of
  21. # tuple, or be valid UTF-8. So it should be _robust_, in the sense
  22. # that you'll get a Python exception if anything fails. But no
  23. # sensible error reporting or recovery is implemented.
  24. #
  25. # That should be good enough, because PuTTY will log the plugin's
  26. # standard error in its Event Log, so if the plugin crashes, you'll be
  27. # able to retrieve the traceback.
  28. def wr_byte(fh, b):
  29. assert 0 <= b < 0x100
  30. fh.write(bytes([b]))
  31. def wr_boolean(fh, b):
  32. wr_byte(fh, 1 if b else 0)
  33. def wr_uint32(fh, u):
  34. assert 0 <= u < 0x100000000
  35. fh.write(struct.pack(">I", u))
  36. def wr_string(fh, s):
  37. wr_uint32(fh, len(s))
  38. fh.write(s)
  39. def wr_string_utf8(fh, s):
  40. wr_string(fh, s.encode("UTF-8"))
  41. def rd_n(fh, n):
  42. data = fh.read(n)
  43. if len(data) < n:
  44. raise PluginEOF()
  45. return data
  46. def rd_byte(fh):
  47. return rd_n(fh, 1)[0]
  48. def rd_boolean(fh):
  49. return rd_byte(fh) != 0
  50. def rd_uint32(fh):
  51. return struct.unpack(">I", rd_n(fh, 4))[0]
  52. def rd_string(fh):
  53. length = rd_uint32(fh)
  54. return rd_n(fh, length)
  55. def rd_string_utf8(fh):
  56. return rd_string(fh).decode("UTF-8")
  57. # ----------------------------------------------------------------------
  58. #
  59. # Protocol definitions.
  60. our_max_version = 2
  61. PLUGIN_INIT = 1
  62. PLUGIN_INIT_RESPONSE = 2
  63. PLUGIN_PROTOCOL = 3
  64. PLUGIN_PROTOCOL_ACCEPT = 4
  65. PLUGIN_PROTOCOL_REJECT = 5
  66. PLUGIN_AUTH_SUCCESS = 6
  67. PLUGIN_AUTH_FAILURE = 7
  68. PLUGIN_INIT_FAILURE = 8
  69. PLUGIN_KI_SERVER_REQUEST = 20
  70. PLUGIN_KI_SERVER_RESPONSE = 21
  71. PLUGIN_KI_USER_REQUEST = 22
  72. PLUGIN_KI_USER_RESPONSE = 23
  73. # ----------------------------------------------------------------------
  74. #
  75. # Classes to make it easy to construct and receive messages.
  76. #
  77. # OutMessage is constructed with the message type; then you use the
  78. # wr_foo() routines to add fields to it, and finally call its send()
  79. # method.
  80. #
  81. # InMessage is constructed via the expect() class method, to which you
  82. # give a list of message types you expect to see one of at this stage.
  83. # Once you've got one, you can rd_foo() fields from it.
  84. class OutMessage:
  85. def __init__(self, msgtype):
  86. self.buf = io.BytesIO()
  87. wr_byte(self.buf, msgtype)
  88. self.write = self.buf.write
  89. def send(self, fh=sys.stdout.buffer):
  90. wr_string(fh, self.buf.getvalue())
  91. fh.flush()
  92. class InMessage:
  93. @classmethod
  94. def expect(cls, expected_types, fh=sys.stdin.buffer):
  95. self = cls()
  96. self.buf = io.BytesIO(rd_string(fh))
  97. self.msgtype = rd_byte(self.buf)
  98. self.read = self.buf.read
  99. if self.msgtype not in expected_types:
  100. raise ValueError("received packet type {:d}, expected {}".format(
  101. self.msgtype, ",".join(map("{:d}".format,
  102. sorted(expected_types)))))
  103. return self
  104. # ----------------------------------------------------------------------
  105. #
  106. # The main implementation of the protocol.
  107. def protocol():
  108. # Start by expecting PLUGIN_INIT.
  109. msg = InMessage.expect({PLUGIN_INIT})
  110. their_version = rd_uint32(msg)
  111. hostname = rd_string_utf8(msg)
  112. port = rd_uint32(msg)
  113. username = rd_string_utf8(msg)
  114. print(f"Got hostname {hostname!r}, port {port!r}", file=sys.stderr,
  115. flush=True)
  116. # Decide which protocol version we're speaking.
  117. version = min(their_version, our_max_version)
  118. assert version != 0, "Protocol version 0 does not exist"
  119. if "TESTPLUGIN_INIT_FAIL" in os.environ:
  120. # Test the plugin failing at startup time.
  121. msg = OutMessage(PLUGIN_INIT_FAILURE)
  122. wr_string_utf8(msg, os.environ["TESTPLUGIN_INIT_FAIL"])
  123. msg.send()
  124. return
  125. # Send INIT_RESPONSE, with our protocol version and an overridden
  126. # username.
  127. #
  128. # By default this test plugin doesn't override the username, but
  129. # you can make it do so by setting TESTPLUGIN_USERNAME in the
  130. # environment.
  131. msg = OutMessage(PLUGIN_INIT_RESPONSE)
  132. wr_uint32(msg, version)
  133. wr_string_utf8(msg, os.environ.get("TESTPLUGIN_USERNAME", ""))
  134. msg.send()
  135. # Outer loop run once per authentication protocol.
  136. while True:
  137. # Expect a message telling us what the protocol is.
  138. msg = InMessage.expect({PLUGIN_PROTOCOL})
  139. method = rd_string(msg)
  140. if "TESTPLUGIN_PROTO_REJECT" in os.environ:
  141. # Test the plugin failing at PLUGIN_PROTOCOL time.
  142. msg = OutMessage(PLUGIN_PROTOCOL_REJECT)
  143. wr_string_utf8(msg, os.environ["TESTPLUGIN_PROTO_REJECT"])
  144. msg.send()
  145. continue
  146. # We only support keyboard-interactive. If we supported other
  147. # auth methods, this would be the place to add further clauses
  148. # to this if statement for them.
  149. if method == b"keyboard-interactive":
  150. msg = OutMessage(PLUGIN_PROTOCOL_ACCEPT)
  151. msg.send()
  152. # Inner loop run once per keyboard-interactive exchange
  153. # with the SSH server.
  154. while True:
  155. # Expect a set of prompts from the server, or
  156. # terminate the loop on SUCCESS or FAILURE.
  157. #
  158. # (We could also respond to SUCCESS or FAILURE by
  159. # updating caches of our own, if we had any that were
  160. # useful.)
  161. msg = InMessage.expect({PLUGIN_KI_SERVER_REQUEST,
  162. PLUGIN_AUTH_SUCCESS,
  163. PLUGIN_AUTH_FAILURE})
  164. if (msg.msgtype == PLUGIN_AUTH_SUCCESS or
  165. msg.msgtype == PLUGIN_AUTH_FAILURE):
  166. break
  167. # If we didn't just break, we're sitting on a
  168. # PLUGIN_KI_SERVER_REQUEST message. Get all its bits
  169. # and pieces out.
  170. name = rd_string_utf8(msg)
  171. instructions = rd_string_utf8(msg)
  172. language = rd_string(msg)
  173. nprompts = rd_uint32(msg)
  174. prompts = []
  175. for i in range(nprompts):
  176. prompt = rd_string_utf8(msg)
  177. echo = rd_boolean(msg)
  178. prompts.append((prompt, echo))
  179. # Actually make up some answers for the prompts. This
  180. # is the part that a non-example implementation would
  181. # do very differently, of course!
  182. #
  183. # Here, we answer "foo" to every prompt, except that
  184. # if there are exactly two prompts in the packet then
  185. # we answer "stoat" to the first and "weasel" to the
  186. # second.
  187. #
  188. # (These answers are consistent with the ones required
  189. # by PuTTY's test SSH server Uppity in its own
  190. # keyboard-interactive test implementation: that
  191. # presents a two-prompt packet and expects
  192. # "stoat","weasel" as the answers, and then presents a
  193. # zero-prompt packet. So this test plugin will get you
  194. # through Uppity's k-i in a one-touch manner. The
  195. # "foo" in this code isn't used by Uppity at all; I
  196. # just include it because I had to have _some_
  197. # handling for the else clause.)
  198. #
  199. # If TESTPLUGIN_PROMPTS is set in the environment, we
  200. # ask the user questions of our own by sending them
  201. # back to PuTTY as USER_REQUEST messages.
  202. if nprompts == 2:
  203. if "TESTPLUGIN_PROMPTS" in os.environ:
  204. for i in range(2):
  205. # Make up some questions to ask.
  206. msg = OutMessage(PLUGIN_KI_USER_REQUEST)
  207. wr_string_utf8(
  208. msg, "Plugin request #{:d} (name)".format(i))
  209. wr_string_utf8(
  210. msg, "Plugin request #{:d} (instructions)"
  211. .format(i))
  212. wr_string(msg, b"")
  213. wr_uint32(msg, 2)
  214. wr_string_utf8(msg, "Prompt 1 of 2 (echo): ")
  215. wr_boolean(msg, True)
  216. wr_string_utf8(msg, "Prompt 2 of 2 (no echo): ")
  217. wr_boolean(msg, False)
  218. msg.send()
  219. # Expect the answers.
  220. msg = InMessage.expect({PLUGIN_KI_USER_RESPONSE})
  221. user_nprompts = rd_uint32(msg)
  222. assert user_nprompts == 2, (
  223. "Should match what we just sent")
  224. for i in range(nprompts):
  225. user_response = rd_string_utf8(msg)
  226. # We don't actually check these
  227. # responses for anything.
  228. answers = ["stoat", "weasel"]
  229. else:
  230. answers = ["foo"] * nprompts
  231. # Send the answers to the SSH server's questions.
  232. msg = OutMessage(PLUGIN_KI_SERVER_RESPONSE)
  233. wr_uint32(msg, len(answers))
  234. for answer in answers:
  235. wr_string_utf8(msg, answer)
  236. msg.send()
  237. else:
  238. # Default handler if we don't speak the offered protocol
  239. # at all.
  240. msg = OutMessage(PLUGIN_PROTOCOL_REJECT)
  241. wr_string_utf8(msg, "")
  242. msg.send()
  243. # Demonstration write to stderr, to prove that it shows up in PuTTY's
  244. # Event Log.
  245. print("Hello from test plugin's stderr", file=sys.stderr, flush=True)
  246. try:
  247. protocol()
  248. except PluginEOF:
  249. pass