fake-cisco-server.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. #!/usr/bin/env python3
  2. #
  3. # Copyright © 2021 Daniel Lenski
  4. # Copyright © 2021 Tom Carroll
  5. #
  6. # This file is part of openconnect.
  7. #
  8. # This is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU Lesser General Public License
  10. # as published by the Free Software Foundation; either version 2.1 of
  11. # the License, or (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful, but
  14. # WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  16. # Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License
  19. # along with this program. If not, see <http://www.gnu.org/licenses/>
  20. import argparse
  21. import ssl
  22. from base64 import b64decode
  23. from flask import Flask, request, session
  24. from textwrap import dedent
  25. import xmltodict
  26. import OpenSSL
  27. from OpenSSL.crypto import _lib, X509
  28. from OpenSSL.crypto import load_certificate, X509Store, X509StoreContext
  29. app = Flask(__name__)
  30. app.config.update(SECRET_KEY=b'fake', DEBUG=True, SESSION_COOKIE_NAME='fake')
  31. ########################################
  32. def is_ca_cert(cert):
  33. for ext in (cert.get_extension(i) for i in range(cert.get_extension_count())):
  34. if (ext.get_short_name() == b'basicConstraints' and str(ext).find('CA:TRUE') > -1):
  35. return True
  36. return False
  37. def get_certs(p7):
  38. # client-cert is a PKCS7-encoded set of certificates.
  39. # GnuTLS and OpenSSL order the certificates differently.
  40. # GnuTLS provides the certificates in 'canonical order',
  41. # while OpenSSL provides it in the order the programmer
  42. # added it to the PKCS#7 structure.
  43. #
  44. # Testing shows that Cisco servers can handle any order
  45. if p7.type_is_signed():
  46. certs = p7._pkcs7.d.sign.cert
  47. elif p7.type_is_signedAndEnveloped():
  48. certs = p7._pkcs7.d.signed_and_enveloped.cert
  49. else:
  50. return ()
  51. # Ensure that we have exactly one usercert, and that
  52. # all the rest are (possibly-intermediate) CA certs
  53. usercert = None
  54. extracerts = []
  55. for i in range(_lib.sk_X509_num(certs)):
  56. cert = _lib.X509_dup(_lib.sk_X509_value(certs, i))
  57. pycert = X509._from_raw_x509_ptr(cert)
  58. if is_ca_cert(pycert):
  59. extracerts.append(pycert)
  60. else:
  61. assert usercert is None
  62. usercert = pycert
  63. assert usercert
  64. # Build a path from the usercert to the root
  65. path = [usercert]
  66. # Verify that there are no duplicates in the set
  67. issuers = {}
  68. for c in extracerts:
  69. subject = c.get_subject().der()
  70. assert subject not in issuers
  71. issuers[subject] = c
  72. while True:
  73. try:
  74. path.append(issuers.pop(path[-1].get_issuer().der()))
  75. except KeyError:
  76. break
  77. # Verify that there are no remaining (unused) certificates
  78. assert len(issuers) == 0
  79. return tuple(path)
  80. def verify_certs(certs, ca_certs):
  81. # Initialize trust store with CA certificates
  82. store = X509Store()
  83. for cert in ca_certs:
  84. store.add_cert(cert)
  85. # Incrementally build up trust by first checking intermedaries
  86. for cert in reversed(certs):
  87. store_ctx = X509StoreContext(store, cert)
  88. store_ctx.verify_certificate()
  89. # Add intermediary to trust
  90. store.add_cert(cert)
  91. ########################################
  92. ALLOWED_HASH_ALGORITHMS = ('sha256', 'sha384', 'sha512')
  93. INITIAL_RESPONSE = dedent('''
  94. <?xml version="1.0" encoding="UTF-8"?>
  95. <config-auth client="vpn" type="auth-request" aggregate-auth-version="2">
  96. <multiple-client-cert-request>{}</multiple-client-cert-request>
  97. <cert-authenticated></cert-authenticated>
  98. </config-auth>'''.format(''.join(
  99. '<hash-algorithm>%s</hash-algorithm>' % algo for algo in ALLOWED_HASH_ALGORITHMS)))
  100. AUTH_COMPLETE_RESPONSE = dedent('''
  101. <?xml version="1.0" encoding="UTF-8"?>
  102. <config-auth client="vpn" type="complete" aggregate-auth-version="2">
  103. <session-id>123456789</session-id>
  104. <session-token>1234567890ABCDEF1234567890ABCDEF1234567890ABCDEF1234567890ABCD</session-token>
  105. <auth id="success"/>
  106. <config/>
  107. </config-auth>''')
  108. # Respond to XML/POST auth requests
  109. @app.route('/', methods=('POST',))
  110. def handle_xmlpost(usergroup=None):
  111. dict_req = xmltodict.parse(request.data)
  112. assert 'config-auth' in dict_req
  113. assert '@client' in dict_req['config-auth'] and dict_req['config-auth']['@client'] == 'vpn'
  114. assert '@type' in dict_req['config-auth']
  115. step = dict_req['config-auth']['@type']
  116. session.update(step=step, authid='main')
  117. if step == 'init':
  118. return initial_request(dict_req)
  119. elif step == 'auth-reply':
  120. return auth_reply(dict_req)
  121. else:
  122. raise AssertionError('Unexpected config-auth/@type %r' % step)
  123. def initial_request(dict_req):
  124. config_auth = dict_req['config-auth']
  125. # Expected:
  126. # <config-auth client="vpn" type="init">
  127. # <capabilities>
  128. # <auth-method>single-sign-on</auth-method>
  129. # <auth-method>single-sign-on-v2</auth-method>
  130. # <auth-method>...</auth-method>
  131. # <auth-method>multiple-cert</auth-method>
  132. # </capabilities>
  133. # </config-auth>
  134. assert 'multiple-cert' in config_auth['capabilities']['auth-method']
  135. return INITIAL_RESPONSE
  136. def auth_reply(dict_req):
  137. # Expected:
  138. # <config-auth type="auth-reply">
  139. # <auth>
  140. # <client-cert-chain cert-store="1M">
  141. # <client-cert-sent-via-protocol/>
  142. # </client-cert-chain>
  143. # <client-cert-chain cert-store="1U">
  144. # <client-cert cert-format="pkcs7">${certs_pkcs7}</client-cert>
  145. # <client-cert-auth-signature hash-algorithm-chosen="${algo}">${signature}</client-cert-auth-signature>
  146. # </client-cert-chain>
  147. # </auth>
  148. # </config-auth>
  149. config_auth = dict_req['config-auth']
  150. assert 'client-cert-chain' in config_auth['auth']
  151. client_cert_chain = config_auth['auth']['client-cert-chain']
  152. assert client_cert_chain[0]['@cert-store'] == '1M'
  153. assert client_cert_chain[0]['client-cert-sent-via-protocol'] is None # empty tag
  154. assert client_cert_chain[1]['@cert-store'] == '1U'
  155. assert client_cert_chain[1]['client-cert']['@cert-format'] == 'pkcs7'
  156. certs_pkcs7 = b64decode(client_cert_chain[1]['client-cert']['#text'])
  157. signature = b64decode(client_cert_chain[1]['client-cert-auth-signature']['#text'])
  158. algo = client_cert_chain[1]['client-cert-auth-signature']['@hash-algorithm-chosen']
  159. assert algo in ALLOWED_HASH_ALGORITHMS
  160. certs = get_certs(OpenSSL.crypto.load_pkcs7_data(OpenSSL.crypto.FILETYPE_ASN1, certs_pkcs7))
  161. assert 1 <= len(certs) <= 10
  162. if app.config['ca_certs']:
  163. verify_certs(certs, app.config['ca_certs'])
  164. # Verify that the client has signed the INITIAL_RESPONSE using the private key corresponding to
  165. # the appropriate certificate (rooted in one of the ca_certs), and using the chosen hash algorithm.
  166. OpenSSL.crypto.verify(certs[0], signature, INITIAL_RESPONSE.encode(), algo)
  167. return AUTH_COMPLETE_RESPONSE
  168. def main(args):
  169. context = ssl.SSLContext()
  170. # Verify that TLS requests include the appropriate client certificate
  171. context.load_cert_chain(args.cert, args.key)
  172. # Read cafile, parsing each certificate found
  173. ca_certs = []
  174. if args.cafile:
  175. with open(args.cafile, 'r') as f:
  176. root_cas_pem = f.read()
  177. delimiter = '-----BEGIN CERTIFICATE-----\n'
  178. offset = 0
  179. while True:
  180. offset = root_cas_pem.find(delimiter, offset)
  181. if offset < 0:
  182. break
  183. cert = load_certificate(OpenSSL.crypto.FILETYPE_PEM, root_cas_pem[offset:])
  184. ca_certs.append(cert)
  185. offset += len(delimiter)
  186. assert ca_certs
  187. app.config['ca_certs'] = ca_certs
  188. app.run(host=args.host, port=args.port, debug=True, ssl_context=context)
  189. if __name__ == '__main__':
  190. parser = argparse.ArgumentParser(description='Cisco AnyConnect server stub')
  191. parser.add_argument('--enable-multicert', action='store_true',
  192. help='Enable multiple-certificate authentication')
  193. parser.add_argument('--cafile', help='Path to CA file')
  194. parser.add_argument('host', help='Bind address')
  195. parser.add_argument('port', type=int, help='Bind port')
  196. parser.add_argument('cert', help='TLS user certificate to validate')
  197. parser.add_argument('key', nargs='?', help='Key of TLS user certificate to validate')
  198. args = parser.parse_args()
  199. if not args.enable_multicert:
  200. parser.error("This server stub is solely implemented to exercise "
  201. "multiple-certificate authentication.")
  202. main(args)