ca.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. #!/usr/bin/env python3
  2. #
  3. # Implementation of OpenSSH certificate creation. Used in
  4. # cryptsuite.py to construct certificates for test purposes.
  5. #
  6. # Can also be run standalone to function as an actual CA, though I
  7. # don't currently know of any reason you'd want to use it in place of
  8. # ssh-keygen. In that mode, it depends on having an SSH agent
  9. # available to do the signing.
  10. import argparse
  11. import base64
  12. import enum
  13. import hashlib
  14. import io
  15. import os
  16. import ssh
  17. class Container:
  18. pass
  19. class CertType(enum.Enum):
  20. user = 1
  21. host = 2
  22. def maybe_encode(s):
  23. if isinstance(s, bytes):
  24. return s
  25. return s.encode('UTF-8')
  26. def make_signature_preimage(
  27. key_to_certify, ca_key, certtype, keyid, serial, principals,
  28. valid_after=0, valid_before=0xFFFFFFFFFFFFFFFF,
  29. critical_options={}, extensions={},
  30. reserved=b'', nonce=None):
  31. alg, pubkeydata = ssh.ssh_decode_string(key_to_certify, True)
  32. if nonce is None:
  33. nonce = os.urandom(32)
  34. buf = io.BytesIO()
  35. buf.write(ssh.ssh_string(alg + b"-cert-v01@openssh.com"))
  36. buf.write(ssh.ssh_string(nonce))
  37. buf.write(pubkeydata)
  38. buf.write(ssh.ssh_uint64(serial))
  39. buf.write(ssh.ssh_uint32(certtype.value if isinstance(certtype, CertType)
  40. else certtype))
  41. buf.write(ssh.ssh_string(maybe_encode(keyid)))
  42. buf.write(ssh.ssh_string(b''.join(
  43. ssh.ssh_string(maybe_encode(principal))
  44. for principal in principals)))
  45. buf.write(ssh.ssh_uint64(valid_after))
  46. buf.write(ssh.ssh_uint64(valid_before))
  47. buf.write(ssh.ssh_string(b''.join(
  48. ssh.ssh_string(opt) + ssh.ssh_string(val)
  49. for opt, val in sorted([(maybe_encode(opt), maybe_encode(val))
  50. for opt, val in critical_options.items()]))))
  51. buf.write(ssh.ssh_string(b''.join(
  52. ssh.ssh_string(opt) + ssh.ssh_string(val)
  53. for opt, val in sorted([(maybe_encode(opt), maybe_encode(val))
  54. for opt, val in extensions.items()]))))
  55. buf.write(ssh.ssh_string(reserved))
  56. # The CA key here can be a raw 'bytes', or an ssh_key object
  57. # exposed via testcrypt
  58. if type(ca_key) != bytes:
  59. ca_key = ca_key.public_blob()
  60. buf.write(ssh.ssh_string(ca_key))
  61. return buf.getvalue()
  62. def make_full_cert(preimage, signature):
  63. return preimage + ssh.ssh_string(signature)
  64. def sign_cert_via_testcrypt(preimage, ca_key, signflags=None):
  65. # Expects ca_key to be a testcrypt ssh_key object
  66. signature = ca_key.sign(preimage, 0 if signflags is None else signflags)
  67. return make_full_cert(preimage, signature)
  68. def sign_cert_via_agent(preimage, ca_key, signflags=None):
  69. # Expects ca_key to be a binary public key blob, and for a
  70. # currently running SSH agent to contain the corresponding private
  71. # key.
  72. import agenttest
  73. sign_request = (ssh.ssh_byte(ssh.SSH2_AGENTC_SIGN_REQUEST) +
  74. ssh.ssh_string(ca_key) + ssh.ssh_string(preimage))
  75. if signflags is not None:
  76. sign_request += ssh.ssh_uint32(signflags)
  77. sign_response = agenttest.agent_query(sign_request)
  78. msgtype, sign_response = ssh.ssh_decode_byte(sign_response, True)
  79. if msgtype == ssh.SSH2_AGENT_SIGN_RESPONSE:
  80. signature, sign_response = ssh.ssh_decode_string(sign_response, True)
  81. return make_full_cert(preimage, signature)
  82. elif msgtype == ssh.SSH2_AGENT_FAILURE:
  83. raise IOError("Agent refused to return a signature")
  84. else:
  85. raise IOError("Agent returned unexpecteed message type {:d}"
  86. .format(msgtype))
  87. def read_pubkey_file(fh):
  88. b64buf = io.StringIO()
  89. comment = None
  90. lines = (line.rstrip("\r\n") for line in iter(fh.readline, ""))
  91. line = next(lines)
  92. if line == "---- BEGIN SSH2 PUBLIC KEY ----":
  93. # RFC 4716 public key. Read headers like Comment:
  94. line = next(lines)
  95. while ":" in line:
  96. key, val = line.split(":", 1)
  97. if key == "Comment":
  98. comment = val.strip("\r\n")
  99. line = next(lines)
  100. # Now expect lines of base64 data.
  101. while line != "---- END SSH2 PUBLIC KEY ----":
  102. b64buf.write(line)
  103. line = next(lines)
  104. else:
  105. # OpenSSH public key. Expect the b64buf blob to be the second word.
  106. fields = line.split(" ", 2)
  107. b64buf.write(fields[1])
  108. if len(fields) > 1:
  109. comment = fields[2]
  110. return base64.b64decode(b64buf.getvalue()), comment
  111. def write_pubkey_file(fh, key, comment=None):
  112. alg = ssh.ssh_decode_string(key)
  113. fh.write(alg.decode('ASCII'))
  114. fh.write(" " + base64.b64encode(key).decode('ASCII'))
  115. if comment is not None:
  116. fh.write(" " + comment)
  117. fh.write("\n")
  118. def default_signflags(key):
  119. alg = ssh.ssh_decode_string(key)
  120. if alg == b'ssh-rsa':
  121. return 4 # RSA-SHA-512
  122. def main():
  123. parser = argparse.ArgumentParser(
  124. description='Create and sign OpenSSH certificates.')
  125. parser.add_argument("key_to_certify", help="Public key to be certified.")
  126. parser.add_argument("--ca-key", required=True,
  127. help="Public key of the CA. Must be present in a "
  128. "currently accessible SSH agent.")
  129. parser.add_argument("-o", "--output", required=True,
  130. help="File to write output OpenSSH key to.")
  131. parser.add_argument("--type", required=True, choices={'user', 'host'},
  132. help="Type of certificate to make.")
  133. parser.add_argument("--principal", "--user", "--host",
  134. required=True, action="append",
  135. help="User names or host names to authorise.")
  136. parser.add_argument("--key-id", "--keyid", required=True,
  137. help="Human-readable key ID string for log files.")
  138. parser.add_argument("--serial", type=int, required=True,
  139. help="Serial number to write into certificate.")
  140. parser.add_argument("--signflags", type=int, help="Signature flags "
  141. "(e.g. 2 = RSA-SHA-256, 4 = RSA-SHA-512).")
  142. args = parser.parse_args()
  143. with open(args.key_to_certify) as fh:
  144. key_to_certify, comment = read_pubkey_file(fh)
  145. with open(args.ca_key) as fh:
  146. ca_key, _ = read_pubkey_file(fh)
  147. extensions = {
  148. 'permit-X11-forwarding': '',
  149. 'permit-agent-forwarding': '',
  150. 'permit-port-forwarding': '',
  151. 'permit-pty': '',
  152. 'permit-user-rc': '',
  153. }
  154. # FIXME: for a full-featured command-line CA we'd need to add
  155. # command-line options for crit opts, extensions and validity
  156. # period
  157. preimage = make_signature_preimage(
  158. key_to_certify = key_to_certify,
  159. ca_key = ca_key,
  160. certtype = getattr(CertType, args.type),
  161. keyid = args.key_id,
  162. serial = args.serial,
  163. principals = args.principal,
  164. extensions = extensions)
  165. signflags = (args.signflags if args.signflags is not None
  166. else default_signflags(ca_key))
  167. cert = sign_cert_via_agent(preimage, ca_key, signflags)
  168. with open(args.output, "w") as fh:
  169. write_pubkey_file(fh, cert, comment)
  170. if __name__ == '__main__':
  171. main()