123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- """
- """
- __author__ = "P J Kershaw"
- __date__ = "02/06/05"
- __copyright__ = "(C) 2010 Science and Technology Facilities Council"
- __license__ = """BSD - See LICENSE file in top-level directory"""
- __contact__ = "Philip.Kershaw@stfc.ac.uk"
- __revision__ = '$Id: client.py 7928 2011-08-12 13:16:26Z pjkersha $'
- import logging
- log = logging.getLogger(__name__)
- import re
- class ServerSSLCertVerification(object):
- """Check server identity. If hostname doesn't match, allow match of
- host's Distinguished Name against server DN setting"""
- DN_LUT = {
- 'commonName': 'CN',
- 'organisationalUnitName': 'OU',
- 'organisation': 'O',
- 'countryName': 'C',
- 'emailAddress': 'EMAILADDRESS',
- 'localityName': 'L',
- 'stateOrProvinceName': 'ST',
- 'streetAddress': 'STREET',
- 'domainComponent': 'DC',
- 'userid': 'UID'
- }
- PARSER_RE_STR = '/(%s)=' % '|'.join(DN_LUT.keys() + DN_LUT.values())
- PARSER_RE = re.compile(PARSER_RE_STR)
- __slots__ = ('__hostname', '__certDN', '__serverCNPrefixes')
- def __init__(self, certDN=None, hostname=None, serverCNPrefixes=None):
- """Override parent class __init__ to enable setting of certDN
- setting
- @type certDN: string
- @param certDN: Set the expected Distinguished Name of the
- server to avoid errors matching hostnames. This is useful
- where the hostname is not fully qualified
- """
- self.__certDN = None
- self.__hostname = None
- self.__serverCNPrefixes = None
- if certDN is not None:
- self.certDN = certDN
- if hostname is not None:
- self.hostname = hostname
- if serverCNPrefixes is not None:
- self.serverCNPrefixes = serverCNPrefixes
- else:
- self.serverCNPrefixes = ['']
- def __call__(self, connection, peerCert, errorStatus, errorDepth,
- preverifyOK):
- """Verify server certificate
- @type connection: OpenSSL.SSL.Connection
- @param connection: SSL connection object
- @type peerCert: basestring
- @param peerCert: server host certificate as OpenSSL.crypto.X509
- instance
- @type errorStatus: int
- @param errorStatus: error status passed from caller. This is the value
- returned by the OpenSSL C function X509_STORE_CTX_get_error(). Look-up
- x509_vfy.h in the OpenSSL source to get the meanings of the different
- codes. PyOpenSSL doesn't help you!
- @type errorDepth: int
- @param errorDepth: a non-negative integer representing where in the
- certificate chain the error occurred. If it is zero it occured in the
- end entity certificate, one if it is the certificate which signed the
- end entity certificate and so on.
- @type preverifyOK: int
- @param preverifyOK: the error status - 0 = Error, 1 = OK of the current
- SSL context irrespective of any verification checks done here. If this
- function yields an OK status, it should enforce the preverifyOK value
- so that any error set upstream overrides and is honoured.
- @rtype: int
- @return: status code - 0/False = Error, 1/True = OK
- """
- if peerCert.has_expired():
- # Any expired certificate in the chain should result in an error
- log.error('Certificate %r in peer certificate chain has expired',
- peerCert.get_subject())
- return False
- elif errorDepth == 0:
- # Only interested in DN of last certificate in the chain - this must
- # match the expected Server DN setting
- peerCertSubj = peerCert.get_subject()
- peerCertDN = peerCertSubj.get_components()
- peerCertDN.sort()
- if self.certDN is None:
- # Check hostname against peer certificate CN field instead:
- if self.hostname is None:
- log.error('No "hostname" or "certDN" set to check peer '
- 'certificate against')
- return False
- acceptableCNs = [pfx + self.hostname
- for pfx in self.serverCNPrefixes]
- if peerCertSubj.commonName in acceptableCNs:
- return preverifyOK
- else:
- log.error('Peer certificate CN %r doesn\'t match the '
- 'expected CN %r', peerCertSubj.commonName,
- acceptableCNs)
- return False
- else:
- if peerCertDN == self.certDN:
- return preverifyOK
- else:
- log.error('Peer certificate DN %r doesn\'t match the '
- 'expected DN %r', peerCertDN, self.certDN)
- return False
- else:
- return preverifyOK
- def _getCertDN(self):
- return self.__certDN
- def _setCertDN(self, val):
- if isinstance(val, basestring):
- # Allow for quoted DN
- certDN = val.strip('"')
- dnFields = self.__class__.PARSER_RE.split(certDN)
- if len(dnFields) < 2:
- raise TypeError('Error parsing DN string: "%s"' % certDN)
- self.__certDN = zip(dnFields[1::2], dnFields[2::2])
- self.__certDN.sort()
- elif not isinstance(val, list):
- for i in val:
- if not len(i) == 2:
- raise TypeError('Expecting list of two element DN field, '
- 'DN field value pairs for "certDN" '
- 'attribute')
- self.__certDN = val
- else:
- raise TypeError('Expecting list or string type for "certDN" '
- 'attribute')
- certDN = property(fget=_getCertDN,
- fset=_setCertDN,
- doc="Distinguished Name for Server Certificate")
- # Get/Set Property methods
- def _getHostname(self):
- return self.__hostname
- def _setHostname(self, val):
- if not isinstance(val, basestring):
- raise TypeError("Expecting string type for hostname "
- "attribute")
- self.__hostname = val
- hostname = property(fget=_getHostname,
- fset=_setHostname,
- doc="hostname of server")
- def _getServerCNPrefixes(self):
- return self.__serverCNPrefixes
- def _setServerCNPrefixes(self, val):
- if not isinstance(val, list):
- raise TypeError("Expecting string type for ServerCNPrefixes "
- "attribute")
- self.__serverCNPrefixes = val
- serverCNPrefixes = property(fget=_getServerCNPrefixes,
- fset=_setServerCNPrefixes,
- doc="Server CN Prefixes")
|