def options(self, *authorities):
    """
    Behaves like L{twisted.internet.ssl.PrivateCertificate.options}().

    @param authorities: Zero or more certificate authority objects.  Each one
        must expose its underlying certificate as C{.original}.

    @return: When C{self.client} is false, whatever C{options(*authorities)}
        returns on a freshly generated temporary self-signed certificate.
        Otherwise a L{CertificateOptions} which, if any C{authorities} were
        given, is configured with C{verify}, C{requireCertificate} and
        C{caCerts}.
    """
    if self.client:
        # Only ask for peer verification when there is something to verify
        # against; with no authorities the defaults are used.
        settings = {}
        if authorities:
            settings['verify'] = True
            settings['requireCertificate'] = True
            settings['caCerts'] = [ca.original for ca in authorities]
        return CertificateOptions(**settings)

    # Not a client: do some crud with sslverify to generate a temporary
    # self-signed certificate.  This is SLOOOWWWWW so it is only done in the
    # absolute worst, most naive case.  We have to do this because OpenSSL
    # will not let both the server and client be anonymous.
    temporaryDN = DN(CN='TEMPORARY CERTIFICATE')
    keyPair = KeyPair.generate()
    request = keyPair.certificateRequest(temporaryDN)
    signedData = keyPair.signCertificateRequest(
        temporaryDN, request, lambda dn: True, 1)
    temporaryCert = keyPair.newCertificate(signedData)
    return temporaryCert.options(*authorities)
python类CertificateOptions()的实例源码
def options(self, *authorities):
    """
    Behaves like L{twisted.internet.ssl.PrivateCertificate.options}().

    @param authorities: Certificate authorities to trust, if any.  Only their
        C{.original} attribute is read here.

    @return: For a non-client (C{self.client} false), the options produced by
        a throwaway self-signed certificate.  For a client, a
        L{CertificateOptions} that requires and verifies a peer certificate
        only when C{authorities} is non-empty.
    """
    if not self.client:
        # do some crud with sslverify to generate a temporary self-signed
        # certificate.  This is SLOOOWWWWW so it is only in the absolute
        # worst, most naive case.
        # We have to do this because OpenSSL will not let both the server
        # and client be anonymous.
        dn = DN(CN='TEMPORARY CERTIFICATE')
        pair = KeyPair.generate()
        signed = pair.signCertificateRequest(
            dn, pair.certificateRequest(dn), lambda dn: True, 1)
        return pair.newCertificate(signed).options(*authorities)

    verification = {}
    if authorities:
        trusted = [authority.original for authority in authorities]
        verification.update(
            verify=True, requireCertificate=True, caCerts=trusted)
    return CertificateOptions(**verification)
def _check_native_endpoint(self, endpoint):
    """
    Validate the shape of a native endpoint configuration.

    @param endpoint: Either an L{IStreamClientEndpoint} provider, or a
        C{dict}.  If the dict has a C{u'tls'} key, its value must be a
        C{dict}, a C{bool}, an L{IOpenSSLClientConnectionCreator} provider or
        a L{CertificateOptions} instance.

    @raise ValueError: If C{endpoint} is neither a dict nor an
        L{IStreamClientEndpoint} provider, or if its C{u'tls'} entry is of an
        unsupported type.

    @return: C{None}; this method only validates.
    """
    # A ready-made endpoint object needs no further inspection.
    if IStreamClientEndpoint.providedBy(endpoint):
        return

    if not isinstance(endpoint, dict):
        raise ValueError(
            "'endpoint' configuration must be a dict or IStreamClientEndpoint"
            " provider"
        )

    # The 'tls' entry is optional; nothing else in the dict is checked here.
    if u'tls' not in endpoint:
        return

    tls_config = endpoint[u'tls']
    acceptable = (
        isinstance(tls_config, (dict, bool))
        or IOpenSSLClientConnectionCreator.providedBy(tls_config)
        or isinstance(tls_config, CertificateOptions)
    )
    if not acceptable:
        raise ValueError(
            "'tls' configuration must be a dict, CertificateOptions or"
            " IOpenSSLClientConnectionCreator provider"
        )
def onProceed(self, obj):
    """
    Proceed with TLS negotiation and reset the XML stream.

    The C{'/failure'} observer is removed, the transport is switched to TLS
    using a default L{ssl.CertificateOptions}, the stream is reset and its
    header re-sent, and finally C{self._deferred} is fired with L{Reset}.

    @param obj: Not used by this method.
    """
    stream = self.xmlstream
    stream.removeObserver('/failure', self.onFailure)

    tlsOptions = ssl.CertificateOptions()
    stream.transport.startTLS(tlsOptions)

    # The stream starts over once TLS is in place.
    stream.reset()
    stream.sendHeader()
    self._deferred.callback(Reset)
def createServer(self, address, portNumber, factory):
    """
    Start listening for SSL connections via C{reactor.listenSSL}.

    @param address: Passed to C{listenSSL} as C{interface}.
    @param portNumber: The port number to listen on.
    @param factory: The protocol factory handed to C{listenSSL}.

    @return: Whatever C{reactor.listenSSL} returns.
    """
    # A default-constructed CertificateOptions is used as the context factory.
    return reactor.listenSSL(
        portNumber,
        factory,
        ssl.CertificateOptions(),
        interface=address,
    )
def connectClient(self, address, portNumber, clientCreator):
    """
    Connect an SSL client through C{clientCreator.connectSSL}.

    @param address: The address to connect to.
    @param portNumber: The port number to connect to.
    @param clientCreator: An object providing C{connectSSL}.

    @return: Whatever C{clientCreator.connectSSL} returns.
    """
    # A default-constructed CertificateOptions is used as the context factory.
    defaultOptions = ssl.CertificateOptions()
    return clientCreator.connectSSL(
        address, portNumber, defaultOptions)
def onProceed(self, obj):
    """
    Proceed with TLS negotiation and reset the XML stream.

    Steps, in order: stop observing C{'/failure'}, start TLS on the stream's
    transport with a default L{ssl.CertificateOptions}, reset the stream,
    send a new header, then call back C{self._deferred} with L{Reset}.

    @param obj: Not used by this method.
    """
    self.xmlstream.removeObserver('/failure', self.onFailure)

    # Default options: no arguments are passed to CertificateOptions.
    contextFactory = ssl.CertificateOptions()
    transport = self.xmlstream.transport
    transport.startTLS(contextFactory)

    self.xmlstream.reset()
    self.xmlstream.sendHeader()

    self._deferred.callback(Reset)
def createServer(self, address, portNumber, factory):
    """
    Create an SSL server by calling C{reactor.listenSSL}.

    @param address: The interface to bind, passed as C{interface}.
    @param portNumber: The port number to listen on.
    @param factory: The protocol factory to serve.

    @return: The result of C{reactor.listenSSL}.
    """
    defaultOptions = ssl.CertificateOptions()
    listeningPort = reactor.listenSSL(
        portNumber, factory, defaultOptions, interface=address)
    return listeningPort
def connectClient(self, address, portNumber, clientCreator):
    """
    Create an SSL client by calling C{clientCreator.connectSSL}.

    @param address: The address to connect to.
    @param portNumber: The port number to connect to.
    @param clientCreator: The object whose C{connectSSL} is invoked.

    @return: The result of C{clientCreator.connectSSL}.
    """
    options = ssl.CertificateOptions()
    connecting = clientCreator.connectSSL(address, portNumber, options)
    return connecting
def onProceed(self, obj):
    """
    Proceed with TLS negotiation and reset the XML stream.

    After removing the C{'/failure'} observer, TLS is started on the
    transport with a default L{ssl.CertificateOptions}.  The stream is then
    reset, its header is sent again, and C{self._deferred} fires with
    L{Reset}.

    @param obj: Not used by this method.
    """
    xs = self.xmlstream
    xs.removeObserver('/failure', self.onFailure)

    options = ssl.CertificateOptions()
    xs.transport.startTLS(options)

    # Restart the stream now that TLS has been started.
    for restartStep in (xs.reset, xs.sendHeader):
        restartStep()

    self._deferred.callback(Reset)
def _parseClientSSLOptions(kwargs):
    """
    Parse common arguments for SSL endpoints, creating an L{CertificateOptions}
    instance.

    The keys C{hostname}, C{certKey}, C{privateKey} and C{caCertsDir} are
    removed from C{kwargs} (which is modified in place) and replaced by a
    single C{sslContextFactory} entry.

    @param kwargs: A dict of keyword arguments to be parsed, potentially
        containing keys C{certKey}, C{privateKey}, C{caCertsDir}, and
        C{hostname}.  See L{_parseClientSSL}.
    @type kwargs: L{dict}

    @return: The remaining arguments, including a new key C{sslContextFactory}.
    """
    hostname = kwargs.pop('hostname', None)
    certPath = kwargs.pop('certKey', None)
    keyPath = kwargs.pop('privateKey', None)
    clientCertificate = _privateCertFromPaths(certPath, keyPath)
    trustRoot = _parseTrustRootPath(kwargs.pop('caCertsDir', None))

    if hostname is None:
        # _really_ though, you should specify a hostname.
        if clientCertificate is None:
            privateKeyOpenSSL = None
            certificateOpenSSL = None
        else:
            privateKeyOpenSSL = clientCertificate.privateKey.original
            certificateOpenSSL = clientCertificate.original
        contextFactory = CertificateOptions(
            trustRoot=trustRoot,
            privateKey=privateKeyOpenSSL,
            certificate=certificateOpenSSL,
        )
    else:
        # With a hostname available, let optionsForClientTLS build the
        # configuration for that name.
        contextFactory = optionsForClientTLS(
            _idnaText(hostname),
            trustRoot=trustRoot,
            clientCertificate=clientCertificate,
        )

    kwargs['sslContextFactory'] = contextFactory
    return kwargs
def connectClient(self, address, portNumber, clientCreator):
    """
    Create an SSL client using L{IReactorSSL.connectSSL}.

    @param address: The address to connect to.
    @param portNumber: The port number to connect to.
    @param clientCreator: The object whose C{connectSSL} method is called.

    @return: Whatever C{clientCreator.connectSSL} returns.
    """
    # The context factory is a CertificateOptions built with no arguments.
    return clientCreator.connectSSL(
        address,
        portNumber,
        ssl.CertificateOptions(),
    )
def start_responding(self, server_name, challenge, response):
    """
    Put a context into the mapping.

    A certificate and key are generated for the ASCII-decoded
    C{response.z_domain}.  A L{CertificateOptions} wrapping them is stored in
    C{self._challenge_options}, keyed by that name encoded as UTF-8.

    @param server_name: Ignored; the value is replaced by the name taken from
        C{response.z_domain}.
    @param challenge: Not used by this method.
    @param response: An object whose C{z_domain} attribute supports
        C{.decode('ascii')}.
    """
    z_domain = response.z_domain.decode('ascii')
    certificate, private_key = generate_tls_sni_01_cert(
        z_domain, _generate_private_key=self._generate_private_key)

    # The mapping is keyed by the encoded form of the name.
    mapping_key = z_domain.encode('utf-8')
    context = CertificateOptions(
        certificate=cert_cryptography_to_pyopenssl(certificate),
        privateKey=key_cryptography_to_pyopenssl(private_key))
    self._challenge_options[mapping_key] = context
def __invariant__(self):
    """
    Check that C{self.key} and C{self.chain} can be used together.

    Every certificate in C{self.chain.certificates} is loaded from PEM, the
    key is loaded from PEM, and a context is built from them.  Any exception
    raised while loading or while building the context propagates to the
    caller.

    @return: The tuple C{(True, "")} when no exception was raised.
    """
    loaded = [
        ssl.Certificate.loadPEM(pem.as_bytes())
        for pem in self.chain.certificates
    ]
    keyPair = ssl.KeyPair.load(self.key.as_bytes(), FILETYPE_PEM)

    privateKey = keyPair.original
    # The first certificate is passed as the certificate itself; the rest
    # are passed as the extra chain.
    leaf = loaded[0].original
    extraChain = [extra.original for extra in loaded[1:]]

    # Invoke CertificateOptions' key/certificate match checking logic.
    candidate = ssl.CertificateOptions(
        privateKey=privateKey,
        certificate=leaf,
        extraCertChain=extraChain,
    )
    candidate.getContext()
    return (True, "")