ssl_peer_verification.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. """ndg_httpsclient - module containing SSL peer verification class.
  2. """
  3. __author__ = "P J Kershaw (STFC)"
  4. __date__ = "09/12/11"
  5. __copyright__ = "(C) 2012 Science and Technology Facilities Council"
  6. __license__ = "BSD - see LICENSE file in top-level directory"
  7. __contact__ = "Philip.Kershaw@stfc.ac.uk"
  8. __revision__ = '$Id$'
  9. import re
  10. import logging
  11. log = logging.getLogger(__name__)
  12. try:
  13. from ndg.httpsclient.subj_alt_name import SubjectAltName
  14. from pyasn1.codec.der import decoder as der_decoder
  15. SUBJ_ALT_NAME_SUPPORT = True
  16. except ImportError, e:
  17. SUBJ_ALT_NAME_SUPPORT = False
  18. SUBJ_ALT_NAME_SUPPORT_MSG = (
  19. 'SubjectAltName support is disabled - check pyasn1 package '
  20. 'installation to enable'
  21. )
  22. import warnings
  23. warnings.warn(SUBJ_ALT_NAME_SUPPORT_MSG)
  24. class ServerSSLCertVerification(object):
  25. """Check server identity. If hostname doesn't match, allow match of
  26. host's Distinguished Name against server DN setting"""
  27. DN_LUT = {
  28. 'commonName': 'CN',
  29. 'organisationalUnitName': 'OU',
  30. 'organisation': 'O',
  31. 'countryName': 'C',
  32. 'emailAddress': 'EMAILADDRESS',
  33. 'localityName': 'L',
  34. 'stateOrProvinceName': 'ST',
  35. 'streetAddress': 'STREET',
  36. 'domainComponent': 'DC',
  37. 'userid': 'UID'
  38. }
  39. SUBJ_ALT_NAME_EXT_NAME = 'subjectAltName'
  40. PARSER_RE_STR = '/(%s)=' % '|'.join(DN_LUT.keys() + DN_LUT.values())
  41. PARSER_RE = re.compile(PARSER_RE_STR)
  42. __slots__ = ('__hostname', '__certDN', '__subj_alt_name_match')
  43. def __init__(self, certDN=None, hostname=None, subj_alt_name_match=True):
  44. """Override parent class __init__ to enable setting of certDN
  45. setting
  46. @type certDN: string
  47. @param certDN: Set the expected Distinguished Name of the
  48. server to avoid errors matching hostnames. This is useful
  49. where the hostname is not fully qualified
  50. @type hostname: string
  51. @param hostname: hostname to match against peer certificate
  52. subjectAltNames or subject common name
  53. @type subj_alt_name_match: bool
  54. @param subj_alt_name_match: flag to enable/disable matching of hostname
  55. against peer certificate subjectAltNames. Nb. A setting of True will
  56. be ignored if the pyasn1 package is not installed
  57. """
  58. self.__certDN = None
  59. self.__hostname = None
  60. if certDN is not None:
  61. self.certDN = certDN
  62. if hostname is not None:
  63. self.hostname = hostname
  64. if subj_alt_name_match:
  65. if not SUBJ_ALT_NAME_SUPPORT:
  66. log.warning('Overriding "subj_alt_name_match" keyword setting: '
  67. 'peer verification with subjectAltNames is disabled')
  68. self.__subj_alt_name_match = False
  69. else:
  70. self.__subj_alt_name_match = True
  71. else:
  72. log.debug('Disabling peer verification with subject '
  73. 'subjectAltNames!')
  74. self.__subj_alt_name_match = False
  75. def __call__(self, connection, peerCert, errorStatus, errorDepth,
  76. preverifyOK):
  77. """Verify server certificate
  78. @type connection: OpenSSL.SSL.Connection
  79. @param connection: SSL connection object
  80. @type peerCert: basestring
  81. @param peerCert: server host certificate as OpenSSL.crypto.X509
  82. instance
  83. @type errorStatus: int
  84. @param errorStatus: error status passed from caller. This is the value
  85. returned by the OpenSSL C function X509_STORE_CTX_get_error(). Look-up
  86. x509_vfy.h in the OpenSSL source to get the meanings of the different
  87. codes. PyOpenSSL doesn't help you!
  88. @type errorDepth: int
  89. @param errorDepth: a non-negative integer representing where in the
  90. certificate chain the error occurred. If it is zero it occured in the
  91. end entity certificate, one if it is the certificate which signed the
  92. end entity certificate and so on.
  93. @type preverifyOK: int
  94. @param preverifyOK: the error status - 0 = Error, 1 = OK of the current
  95. SSL context irrespective of any verification checks done here. If this
  96. function yields an OK status, it should enforce the preverifyOK value
  97. so that any error set upstream overrides and is honoured.
  98. @rtype: int
  99. @return: status code - 0/False = Error, 1/True = OK
  100. """
  101. if peerCert.has_expired():
  102. # Any expired certificate in the chain should result in an error
  103. log.error('Certificate %r in peer certificate chain has expired',
  104. peerCert.get_subject())
  105. return False
  106. elif errorDepth == 0:
  107. # Only interested in DN of last certificate in the chain - this must
  108. # match the expected Server DN setting
  109. peerCertSubj = peerCert.get_subject()
  110. peerCertDN = peerCertSubj.get_components()
  111. peerCertDN.sort()
  112. if self.certDN is None:
  113. # Check hostname against peer certificate CN field instead:
  114. if self.hostname is None:
  115. log.error('No "hostname" or "certDN" set to check peer '
  116. 'certificate against')
  117. return False
  118. # Check for subject alternative names
  119. if self.__subj_alt_name_match:
  120. dns_names = self._get_subj_alt_name(peerCert)
  121. if self.hostname in dns_names:
  122. return preverifyOK
  123. # If no subjectAltNames, default to check of subject Common Name
  124. if peerCertSubj.commonName == self.hostname:
  125. return preverifyOK
  126. else:
  127. log.error('Peer certificate CN %r doesn\'t match the '
  128. 'expected CN %r', peerCertSubj.commonName,
  129. self.hostname)
  130. return False
  131. else:
  132. if peerCertDN == self.certDN:
  133. return preverifyOK
  134. else:
  135. log.error('Peer certificate DN %r doesn\'t match the '
  136. 'expected DN %r', peerCertDN, self.certDN)
  137. return False
  138. else:
  139. return preverifyOK
  140. @classmethod
  141. def _get_subj_alt_name(cls, peer_cert):
  142. '''Extract subjectAltName DNS name settings from certificate extensions
  143. @param peer_cert: peer certificate in SSL connection. subjectAltName
  144. settings if any will be extracted from this
  145. @type peer_cert: OpenSSL.crypto.X509
  146. '''
  147. # Search through extensions
  148. dns_name = []
  149. general_names = SubjectAltName()
  150. for i in range(peer_cert.get_extension_count()):
  151. ext = peer_cert.get_extension(i)
  152. ext_name = ext.get_short_name()
  153. if ext_name == cls.SUBJ_ALT_NAME_EXT_NAME:
  154. # PyOpenSSL returns extension data in ASN.1 encoded form
  155. ext_dat = ext.get_data()
  156. decoded_dat = der_decoder.decode(ext_dat,
  157. asn1Spec=general_names)
  158. for name in decoded_dat:
  159. if isinstance(name, SubjectAltName):
  160. for entry in range(len(name)):
  161. component = name.getComponentByPosition(entry)
  162. dns_name.append(str(component.getComponent()))
  163. return dns_name
  164. def _getCertDN(self):
  165. return self.__certDN
  166. def _setCertDN(self, val):
  167. if isinstance(val, basestring):
  168. # Allow for quoted DN
  169. certDN = val.strip('"')
  170. dnFields = self.__class__.PARSER_RE.split(certDN)
  171. if len(dnFields) < 2:
  172. raise TypeError('Error parsing DN string: "%s"' % certDN)
  173. self.__certDN = zip(dnFields[1::2], dnFields[2::2])
  174. self.__certDN.sort()
  175. elif not isinstance(val, list):
  176. for i in val:
  177. if not len(i) == 2:
  178. raise TypeError('Expecting list of two element DN field, '
  179. 'DN field value pairs for "certDN" '
  180. 'attribute')
  181. self.__certDN = val
  182. else:
  183. raise TypeError('Expecting list or string type for "certDN" '
  184. 'attribute')
  185. certDN = property(fget=_getCertDN,
  186. fset=_setCertDN,
  187. doc="Distinguished Name for Server Certificate")
  188. # Get/Set Property methods
  189. def _getHostname(self):
  190. return self.__hostname
  191. def _setHostname(self, val):
  192. if not isinstance(val, basestring):
  193. raise TypeError("Expecting string type for hostname "
  194. "attribute")
  195. self.__hostname = val
  196. hostname = property(fget=_getHostname,
  197. fset=_setHostname,
  198. doc="hostname of server")