ssl_peer_verification.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. """
  2. """
  3. __author__ = "P J Kershaw"
  4. __date__ = "02/06/05"
  5. __copyright__ = "(C) 2010 Science and Technology Facilities Council"
  6. __license__ = """BSD - See LICENSE file in top-level directory"""
  7. __contact__ = "Philip.Kershaw@stfc.ac.uk"
  8. __revision__ = '$Id: client.py 7928 2011-08-12 13:16:26Z pjkersha $'
  9. import logging
  10. log = logging.getLogger(__name__)
  11. import re
  12. class ServerSSLCertVerification(object):
  13. """Check server identity. If hostname doesn't match, allow match of
  14. host's Distinguished Name against server DN setting"""
  15. DN_LUT = {
  16. 'commonName': 'CN',
  17. 'organisationalUnitName': 'OU',
  18. 'organisation': 'O',
  19. 'countryName': 'C',
  20. 'emailAddress': 'EMAILADDRESS',
  21. 'localityName': 'L',
  22. 'stateOrProvinceName': 'ST',
  23. 'streetAddress': 'STREET',
  24. 'domainComponent': 'DC',
  25. 'userid': 'UID'
  26. }
  27. PARSER_RE_STR = '/(%s)=' % '|'.join(DN_LUT.keys() + DN_LUT.values())
  28. PARSER_RE = re.compile(PARSER_RE_STR)
  29. __slots__ = ('__hostname', '__certDN', '__serverCNPrefixes')
  30. def __init__(self, certDN=None, hostname=None, serverCNPrefixes=None):
  31. """Override parent class __init__ to enable setting of certDN
  32. setting
  33. @type certDN: string
  34. @param certDN: Set the expected Distinguished Name of the
  35. server to avoid errors matching hostnames. This is useful
  36. where the hostname is not fully qualified
  37. """
  38. self.__certDN = None
  39. self.__hostname = None
  40. self.__serverCNPrefixes = None
  41. if certDN is not None:
  42. self.certDN = certDN
  43. if hostname is not None:
  44. self.hostname = hostname
  45. if serverCNPrefixes is not None:
  46. self.serverCNPrefixes = serverCNPrefixes
  47. else:
  48. self.serverCNPrefixes = ['']
  49. def __call__(self, connection, peerCert, errorStatus, errorDepth,
  50. preverifyOK):
  51. """Verify server certificate
  52. @type connection: OpenSSL.SSL.Connection
  53. @param connection: SSL connection object
  54. @type peerCert: basestring
  55. @param peerCert: server host certificate as OpenSSL.crypto.X509
  56. instance
  57. @type errorStatus: int
  58. @param errorStatus: error status passed from caller. This is the value
  59. returned by the OpenSSL C function X509_STORE_CTX_get_error(). Look-up
  60. x509_vfy.h in the OpenSSL source to get the meanings of the different
  61. codes. PyOpenSSL doesn't help you!
  62. @type errorDepth: int
  63. @param errorDepth: a non-negative integer representing where in the
  64. certificate chain the error occurred. If it is zero it occured in the
  65. end entity certificate, one if it is the certificate which signed the
  66. end entity certificate and so on.
  67. @type preverifyOK: int
  68. @param preverifyOK: the error status - 0 = Error, 1 = OK of the current
  69. SSL context irrespective of any verification checks done here. If this
  70. function yields an OK status, it should enforce the preverifyOK value
  71. so that any error set upstream overrides and is honoured.
  72. @rtype: int
  73. @return: status code - 0/False = Error, 1/True = OK
  74. """
  75. if peerCert.has_expired():
  76. # Any expired certificate in the chain should result in an error
  77. log.error('Certificate %r in peer certificate chain has expired',
  78. peerCert.get_subject())
  79. return False
  80. elif errorDepth == 0:
  81. # Only interested in DN of last certificate in the chain - this must
  82. # match the expected Server DN setting
  83. peerCertSubj = peerCert.get_subject()
  84. peerCertDN = peerCertSubj.get_components()
  85. peerCertDN.sort()
  86. if self.certDN is None:
  87. # Check hostname against peer certificate CN field instead:
  88. if self.hostname is None:
  89. log.error('No "hostname" or "certDN" set to check peer '
  90. 'certificate against')
  91. return False
  92. acceptableCNs = [pfx + self.hostname
  93. for pfx in self.serverCNPrefixes]
  94. if peerCertSubj.commonName in acceptableCNs:
  95. return preverifyOK
  96. else:
  97. log.error('Peer certificate CN %r doesn\'t match the '
  98. 'expected CN %r', peerCertSubj.commonName,
  99. acceptableCNs)
  100. return False
  101. else:
  102. if peerCertDN == self.certDN:
  103. return preverifyOK
  104. else:
  105. log.error('Peer certificate DN %r doesn\'t match the '
  106. 'expected DN %r', peerCertDN, self.certDN)
  107. return False
  108. else:
  109. return preverifyOK
  110. def _getCertDN(self):
  111. return self.__certDN
  112. def _setCertDN(self, val):
  113. if isinstance(val, basestring):
  114. # Allow for quoted DN
  115. certDN = val.strip('"')
  116. dnFields = self.__class__.PARSER_RE.split(certDN)
  117. if len(dnFields) < 2:
  118. raise TypeError('Error parsing DN string: "%s"' % certDN)
  119. self.__certDN = zip(dnFields[1::2], dnFields[2::2])
  120. self.__certDN.sort()
  121. elif not isinstance(val, list):
  122. for i in val:
  123. if not len(i) == 2:
  124. raise TypeError('Expecting list of two element DN field, '
  125. 'DN field value pairs for "certDN" '
  126. 'attribute')
  127. self.__certDN = val
  128. else:
  129. raise TypeError('Expecting list or string type for "certDN" '
  130. 'attribute')
  131. certDN = property(fget=_getCertDN,
  132. fset=_setCertDN,
  133. doc="Distinguished Name for Server Certificate")
  134. # Get/Set Property methods
  135. def _getHostname(self):
  136. return self.__hostname
  137. def _setHostname(self, val):
  138. if not isinstance(val, basestring):
  139. raise TypeError("Expecting string type for hostname "
  140. "attribute")
  141. self.__hostname = val
  142. hostname = property(fget=_getHostname,
  143. fset=_setHostname,
  144. doc="hostname of server")
  145. def _getServerCNPrefixes(self):
  146. return self.__serverCNPrefixes
  147. def _setServerCNPrefixes(self, val):
  148. if not isinstance(val, list):
  149. raise TypeError("Expecting string type for ServerCNPrefixes "
  150. "attribute")
  151. self.__serverCNPrefixes = val
  152. serverCNPrefixes = property(fget=_getServerCNPrefixes,
  153. fset=_setServerCNPrefixes,
  154. doc="Server CN Prefixes")