ocspclient.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. #!/usr/bin/python
  2. #
import sys, hashlib
from contextlib import closing
try:
    import urllib2
except ImportError:
    import urllib.request as urllib2

from pyasn1.codec.der import decoder, encoder
from pyasn1.type import univ
from pyasn1_modules import rfc2560, rfc2459, pem
  11. sha1oid = univ.ObjectIdentifier((1, 3, 14, 3, 2, 26))
  12. class ValueOnlyBitStringEncoder(encoder.encoder.BitStringEncoder):
  13. # These methods just do not encode tag and length fields of TLV
  14. def encodeTag(self, *args): return ''
  15. def encodeLength(self, *args): return ''
  16. def encodeValue(*args):
  17. substrate, isConstructed = encoder.encoder.BitStringEncoder.encodeValue(*args)
  18. # OCSP-specific hack follows: cut off the "unused bit count"
  19. # encoded bit-string value.
  20. return substrate[1:], isConstructed
  21. def __call__(self, bitStringValue):
  22. return self.encode(None, bitStringValue, defMode=1, maxChunkSize=0)
  23. valueOnlyBitStringEncoder = ValueOnlyBitStringEncoder()
  24. def mkOcspRequest(issuerCert, userCert):
  25. issuerTbsCertificate = issuerCert.getComponentByName('tbsCertificate')
  26. issuerSubject = issuerTbsCertificate.getComponentByName('subject')
  27. userTbsCertificate = userCert.getComponentByName('tbsCertificate')
  28. userIssuer = userTbsCertificate.getComponentByName('issuer')
  29. assert issuerSubject == userIssuer, '%s\n%s' % (
  30. issuerSubject.prettyPrint(), userIssuer.prettyPrint()
  31. )
  32. userIssuerHash = hashlib.sha1(
  33. encoder.encode(userIssuer)
  34. ).digest()
  35. issuerSubjectPublicKey = issuerTbsCertificate.getComponentByName('subjectPublicKeyInfo').getComponentByName('subjectPublicKey')
  36. issuerKeyHash = hashlib.sha1(
  37. valueOnlyBitStringEncoder(issuerSubjectPublicKey)
  38. ).digest()
  39. userSerialNumber = userTbsCertificate.getComponentByName('serialNumber')
  40. # Build request object
  41. request = rfc2560.Request()
  42. reqCert = request.setComponentByName('reqCert').getComponentByName('reqCert')
  43. hashAlgorithm = reqCert.setComponentByName('hashAlgorithm').getComponentByName('hashAlgorithm')
  44. hashAlgorithm.setComponentByName('algorithm', sha1oid)
  45. reqCert.setComponentByName('issuerNameHash', userIssuerHash)
  46. reqCert.setComponentByName('issuerKeyHash', issuerKeyHash)
  47. reqCert.setComponentByName('serialNumber', userSerialNumber)
  48. ocspRequest = rfc2560.OCSPRequest()
  49. tbsRequest = ocspRequest.setComponentByName('tbsRequest').getComponentByName('tbsRequest')
  50. tbsRequest.setComponentByName('version', 'v1')
  51. requestList = tbsRequest.setComponentByName('requestList').getComponentByName('requestList')
  52. requestList.setComponentByPosition(0, request)
  53. return ocspRequest
  54. def parseOcspResponse(ocspResponse):
  55. responseStatus = ocspResponse.getComponentByName('responseStatus')
  56. assert responseStatus == rfc2560.OCSPResponseStatus('successful'), responseStatus.prettyPrint()
  57. responseBytes = ocspResponse.getComponentByName('responseBytes')
  58. responseType = responseBytes.getComponentByName('responseType')
  59. assert responseType == id_pkix_ocsp_basic, responseType.prettyPrint()
  60. response = responseBytes.getComponentByName('response')
  61. basicOCSPResponse, _ = decoder.decode(
  62. response, asn1Spec=rfc2560.BasicOCSPResponse()
  63. )
  64. tbsResponseData = basicOCSPResponse.getComponentByName('tbsResponseData')
  65. response0 = tbsResponseData.getComponentByName('responses').getComponentByPosition(0)
  66. return (
  67. tbsResponseData.getComponentByName('producedAt'),
  68. response0.getComponentByName('certID'),
  69. response0.getComponentByName('certStatus').getName(),
  70. response0.getComponentByName('thisUpdate')
  71. )
  72. if len(sys.argv) != 2:
  73. print("""Usage:
  74. $ cat CACertificate.pem userCertificate.pem | %s <ocsp-responder-url>""" % sys.argv[0])
  75. sys.exit(-1)
  76. else:
  77. ocspUrl = sys.argv[1]
  78. # Parse CA and user certificates
  79. issuerCert, _ = decoder.decode(
  80. pem.readPemBlocksFromFile(
  81. sys.stdin, ('-----BEGIN CERTIFICATE-----', '-----END CERTIFICATE-----')
  82. )[1],
  83. asn1Spec=rfc2459.Certificate()
  84. )
  85. userCert, _ = decoder.decode(
  86. pem.readPemBlocksFromFile(
  87. sys.stdin, ('-----BEGIN CERTIFICATE-----', '-----END CERTIFICATE-----')
  88. )[1],
  89. asn1Spec=rfc2459.Certificate()
  90. )
  91. # Build OCSP request
  92. ocspReq = mkOcspRequest(issuerCert, userCert)
  93. # Use HTTP POST to get response (see Appendix A of RFC 2560)
  94. # In case you need proxies, set the http_proxy env variable
  95. httpReq = urllib2.Request(
  96. ocspUrl,
  97. encoder.encode(ocspReq),
  98. { 'Content-Type': 'application/ocsp-request' }
  99. )
  100. httpRsp = urllib2.urlopen(httpReq).read()
  101. # Process OCSP response
  102. ocspRsp, _ = decoder.decode(httpRsp, asn1Spec=rfc2560.OCSPResponse())
  103. producedAt, certId, certStatus, thisUpdate = parseOcspResponse(ocspRsp)
  104. print('Certificate ID %s is %s at %s till %s\n' % (
  105. certId.getComponentByName('serialNumber'),
  106. certStatus,
  107. producedAt,
  108. thisUpdate))