def inspect(host, port):
    """Probe ``https://host:port`` and classify the outcome as an ``ERR_*`` code.

    Returns:
        ERR_NONE: the HTTPS request completed without raising.
        ERR_HOST_NOT_MATCH: the SSL error text starts with ``hostname``.
        ERR_EXPIRED: the current UTC time is outside the certificate's
            notBefore/notAfter window.
        ERR_SELF_SIGNED: any other SSL failure.
        ERR_TIMEOUT: the request raised ConnectionError or Timeout.
        ERR_UNKNOWN: anything else, including failure to download or parse
            the certificate.
    """
    try:
        # Only the success/failure of the request matters; the response
        # itself is not used.
        requests.get('https://%s:%s' % (host, port), timeout=2)
    except SSLError as e:
        # NOTE(review): this handler must stay ahead of ConnectionError if
        # SSLError derives from it (presumably the requests exceptions -
        # confirm against the file's imports).
        # CHECK ERR_HOST_NOT_MATCH
        if str(e).startswith('hostname'):
            return ERR_HOST_NOT_MATCH
        try:
            raw_cert = ssl.get_server_certificate((host, port))
            # Parsing is inside the try so a malformed certificate yields an
            # error code instead of escaping the function.
            x509 = crypto.load_certificate(crypto.FILETYPE_PEM, raw_cert)
        except Exception:
            return ERR_UNKNOWN
        # CHECK ERR_EXPIRED
        # The timestamps carry a trailing "Z" (UTC), so compare them against
        # UTC "now" rather than local time.
        date_fmt = "%Y%m%d%H%M%SZ"
        now = datetime.utcnow()
        not_after = datetime.strptime(
            x509.get_notAfter().decode('utf-8'), date_fmt)
        not_before = datetime.strptime(
            x509.get_notBefore().decode('utf-8'), date_fmt)
        if now > not_after or now < not_before:
            return ERR_EXPIRED
        # otherwise ERR_SELF_SIGNED
        return ERR_SELF_SIGNED
    except (ConnectionError, Timeout):
        return ERR_TIMEOUT
    except Exception:
        return ERR_UNKNOWN
    return ERR_NONE
python类get_server_certificate()的实例源码
def check(hostname="your_server.com", port=443, num_days=7):
    """Print an alert when a server's certificate is about to expire.

    Args:
        hostname: Host whose certificate is downloaded.
        port: TCP port to connect to.
        num_days: Alert when this many days or fewer remain before the
            certificate's notAfter date.

    Raises:
        AssertionError: if the certificate has no notAfter value.
    """
    # No ssl_version is passed: the ssl module negotiates the protocol.
    # Pinning the deprecated PROTOCOL_TLSv1 fails against servers that have
    # TLS 1.0 disabled.
    cert = ssl.get_server_certificate((hostname, port))
    x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
    expiry_date = x509.get_notAfter()
    print(str(expiry_date))
    # Explicit raise (same exception type as the former assert) so the check
    # still runs under ``python -O``.
    if not expiry_date:
        raise AssertionError("Cert doesn't have an expiry date.")
    # Decode properly instead of slicing the b'...' repr.
    if isinstance(expiry_date, bytes):
        expiry_date = expiry_date.decode('ascii')
    ssl_date_fmt = r'%Y%m%d%H%M%SZ'
    expires = datetime.datetime.strptime(expiry_date, ssl_date_fmt)
    remaining = (expires - datetime.datetime.utcnow()).days
    if remaining <= num_days:
        print("ALERTING!")
    else:
        print("Not alerting. You have " + str(remaining) + " days")
def get_ssl_certificate(self, ssl_version=None):
    """
    Retrieve the endpoint's certificate as PEM text and parse it with OpenSSL.

    :param ssl_version: Name of an ``ssl`` module attribute (resolved with ``getattr``) to pass
    as the protocol version. If None, no version is passed and the ``ssl`` module's default is used.
    :return: A tuple containing (1) the PEM string returned by ``ssl.get_server_certificate`` and
    (2) the result of ``OpenSSL.crypto.load_certificate`` for that string.
    :raises SslCertificateRetrievalFailedError: If an ``ssl.SSLError`` is raised along the way.
    """
    try:
        endpoint = (self.address, self.port)
        options = {}
        if ssl_version is not None:
            options["ssl_version"] = getattr(ssl, ssl_version)
        pem_text = ssl.get_server_certificate(endpoint, **options)
        parsed = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem_text)
        return pem_text, parsed
    except ssl.SSLError as e:
        raise SslCertificateRetrievalFailedError(
            message="SSL error thrown when retrieving SSL certificate: %s." % (e,)
        )
# Protected Methods
# Private Methods
def _certificate_required(cls, hostname, port=XCLI_DEFAULT_PORT,
                          ca_certs=None, validate=None):
    '''
    Tell whether the connection should verify the server certificate.

    Returns False when ca_certs is falsy. Otherwise the server certificate
    is downloaded; with a validate callable the result is
    ``not validate(certificate)``, and without one the result is True.
    '''
    if ca_certs:
        xlog.debug("CONNECT SSL %s:%s, cert_file=%s",
                   hostname, port, ca_certs)
        server_cert = ssl.get_server_certificate((hostname, port),
                                                 ca_certs=None)
        # handle XIV pre-defined certifications:
        # a caller-supplied validate function gets to inspect the
        # certificate itself. When it returns True the caller has already
        # accepted the certificate, so no further verification is needed
        # and the answer is False.
        if validate:
            return not validate(server_cert)
        return True
    return False
def _get_cert_from_endpoint(server, port=443):
    """Return the certificate served at ``server:port``, or None.

    Any exception raised while fetching is logged through ``log.error`` and
    results in None; an empty result is also reported as None.
    """
    try:
        pem = ssl.get_server_certificate((server, port))
    except Exception:
        log.error('Unable to retrieve certificate from {0}'.format(server))
        return None
    return pem or None
def _fetch_and_store_cert(self, server, port, path):
    """Download the certificate served at server:port and save it at path.

    The certificate text is written with ``self._file_put_contents`` and
    returned. Any exception raised while downloading is re-raised as
    ``cfg.Error`` carrying the server name and the error text.
    """
    try:
        pem = ssl.get_server_certificate((server, port))
    except Exception as e:
        template = _('Could not retrieve initial '
                     'certificate from controller %(server)s. '
                     'Error details: %(error)s')
        raise cfg.Error(template % {'server': server, 'error': str(e)})

    notice = _LW("Storing to certificate for host %(server)s "
                 "at %(path)s")
    LOG.warning(notice % {'server': server, 'path': path})
    self._file_put_contents(path, pem)
    return pem
def save_certificate(self):
    """Download the server's certificate and write it to ``self.cafile``.

    The host and port come from ``self.url`` (port 443 when the URL has
    none). The directory part of ``self.cafile`` is created when missing.
    With ``self.debug`` set, a line describing the operation is printed.
    """
    port = self.url.port if self.url.port else 443
    adr = (self.url.hostname, port)
    if self.debug:
        print("save_certificate of %s to %s" % (str(adr), self.cafile))
    capath = os.path.dirname(self.cafile)
    # dirname() is '' for a bare file name; os.makedirs('') raises, so only
    # create a directory when there is a directory component.
    if capath and not os.path.exists(capath):
        os.makedirs(capath)
    cert = ssl.get_server_certificate(adr)
    with open(self.cafile, "w") as outfile:
        outfile.write(cert)
def get_vc_fingerprint(vcenter_ip):
    """Return the SHA-1 fingerprint of the certificate at vcenter_ip.

    The certificate is downloaded from ``(vcenter_ip, VCENTER_PORT)`` and
    its fingerprint is returned as two-character groups joined by ``:``.
    """
    # No ssl_version is passed: the ssl module negotiates the protocol.
    # Pinning the deprecated PROTOCOL_TLSv1 fails against servers that have
    # TLS 1.0 disabled.
    cert_pem = ssl.get_server_certificate((vcenter_ip, VCENTER_PORT))
    x509 = X509.load_cert_string(cert_pem, X509.FORMAT_PEM)
    fp = x509.get_fingerprint('sha1')
    # Pair characters 0-1, 2-3, ...; assumes fp is an even-length hex
    # string - TODO confirm (an odd trailing character would be dropped).
    return ':'.join(a + b for a, b in zip(fp[::2], fp[1::2]))
def process(self, profile, state, vertex):
    """Collect the server certificate for a "domain" vertex.

    Returns ``{'error': ...}`` when the vertex has no 'type' key. Otherwise
    returns a dict with "properties" and an empty "neighbors" list. For a
    vertex of type "domain" the properties hold the certificate fetched from
    ``(vertex['id'], self.ssl_port)`` under 'content' and the elapsed time
    under 'time', or the exception text under 'error' when the fetch fails.
    """
    if 'type' in vertex:
        props = {}
        if vertex['type'] == "domain":
            try:
                started = time.time()
                endpoint = (vertex['id'], self.ssl_port)
                props['content'] = ssl.get_server_certificate(endpoint)
                props['time'] = time.time() - started
            except Exception as exc:
                props['error'] = str(exc)
        return {"properties": props, "neighbors": []}
    return {'error': "No vertex type defined!"}
def check_certificate(self, domain):
    """
    Download the TLS certificate served by domain on port 443 and log it.

    When ``self.output`` is set, the PEM data is also written to
    ``cert.pem`` inside that directory. The subject, validity dates, issuer,
    serial number and selected extensions are logged through ``self.log``.
    """
    pem = ssl.get_server_certificate((domain, 443))
    # Normalise to bytes once: the file is opened in binary mode and the
    # parser is given the same bytes. On Python 3 the downloaded PEM is text.
    pem_bytes = pem if isinstance(pem, bytes) else pem.encode('ascii')
    if self.output:
        with open(os.path.join(self.output, 'cert.pem'), 'wb') as f:
            f.write(pem_bytes)
    cert = x509.load_pem_x509_certificate(pem_bytes, default_backend())
    self.log.critical("\tCertificate:")
    self.log.critical("\t\tDomain: %s", ",".join(attr.value for attr in cert.subject))
    self.log.critical("\t\tNot After: %s", str(cert.not_valid_after))
    self.log.critical("\t\tNot Before: %s", str(cert.not_valid_before))
    self.log.critical("\t\tCA Issuer: %s", ", ".join(attr.value for attr in cert.issuer))
    self.log.critical("\t\tSerial: %s", cert.serial_number)
    for ext in cert.extensions:
        if ext.oid._name == 'basicConstraints':
            # Only logged when the certificate is marked as a CA.
            if ext.value.ca:
                self.log.critical("\t\tBasic Constraints: True")
        elif ext.oid._name == 'subjectAltName':
            self.log.critical("\t\tAlternate names: %s", ", ".join(ext.value.get_values_for_type(x509.DNSName)))
def _get_cert_from_endpoint(server, port=443):
    """Fetch the certificate from ``server:port``; None when unavailable.

    A failed fetch is logged with ``log.error``. Both a failure and an empty
    certificate produce None.
    """
    pem = None
    try:
        pem = ssl.get_server_certificate((server, port))
    except Exception:
        log.error('Unable to retrieve certificate from {0}'.format(server))
    return pem if pem else None
def _get_cert_from_endpoint(server, port=443):
    """Download the certificate presented by ``server`` on ``port``.

    Returns the certificate when one was obtained. Returns None when the
    download raises (the failure is logged via ``log.error``) or when the
    result is empty.
    """
    try:
        pem = ssl.get_server_certificate((server, port))
    except Exception:
        log.error('Unable to retrieve certificate from {0}'.format(server))
    else:
        if pem:
            return pem
    return None
def tls(ctx, domain_name):
    """Get the TLS certificate for a domain.
    Example:
    $ lokey fetch tls gliderlabs.com
    """
    # NOTE(review): the docstring wording and line layout are left untouched
    # because it is presumably rendered by click as the command's help text
    # - confirm before editing it.
    #
    # Fetches the certificate from port 443 of domain_name and echoes it.
    # Raises click.ClickException when the certificate cannot be fetched.
    try:
        cert = stdlib_ssl.get_server_certificate((domain_name, 443))
    except Exception:
        # ``except Exception`` rather than a bare except, so Ctrl-C and
        # SystemExit are not reported as a TLS configuration problem.
        msg = ("Unable to fetch key from {}, "
               "is that domain configured for TLS?").format(domain_name)
        raise click.ClickException(msg)
    # Echo outside the try so an output failure is not mislabelled as a
    # fetch failure.
    click.echo(cert)
async def get_server_certificate(*args, **kwargs):
    """Awaitable wrapper around ``_ssl.get_server_certificate``.

    All positional and keyword arguments are forwarded unchanged. The call
    is handed to ``run_in_thread`` and its awaited result is returned.
    Declared ``async`` because the body uses ``await``, which is a syntax
    error inside a plain ``def``.
    """
    return await run_in_thread(partial(_ssl.get_server_certificate, *args, **kwargs))
# Small wrapper class to make sure the wrap_socket() method returns the right type
def onHtml(self, event):
# Open a web console for a virtual machine in the browser.
# Steps visible in this block: log the entries of self.listadoVM, look the VM
# up by UUID, read the vCenter FQDN from the settings, acquire a clone ticket,
# take the SHA-1 digest of the vCenter certificate, then build a
# version-specific console URL and open it with webbrowser.
# `event` is not used in the lines shown here.
# NOTE(review): indentation was lost in this copy, so the nesting of the
# statements below cannot be confirmed from this view.
fila = self.listadoVM
for i in range(len(fila)):
if logger != None: logger.info(fila[i])
# The third element is the IP and the 9th is the UUID
vm = conexion.searchIndex.FindByUuid(None,fila[8], True)
vm_name = vm.summary.config.name
vm_moid = vm._moId
# NOTE(review): the format string has no {} placeholder, so vm_moid never
# appears in the logged message.
if logger != None: logger.info('void= '.format(vm_moid))
vcenter_data = conexion.setting
vcenter_settings = vcenter_data.setting
console_port = '9443'
puerto_vcenter= '443'
# Scan the settings for the vCenter FQDN.
# NOTE(review): vcenter_fqdn stays unbound when no item has that key.
for item in vcenter_settings:
key = getattr(item, 'key')
#print ('key: ' + key + ' =>'+ str(getattr(item, 'value')))
if key == 'VirtualCenter.FQDN':
vcenter_fqdn = getattr(item, 'value')
#if key == 'WebService.Ports.https':
#console_port = str(getattr(item, 'value'))
host = vcenter_fqdn
session_manager = conexion.sessionManager
session = session_manager.AcquireCloneTicket()
# Download the certificate served on host:443 and take its SHA-1 digest,
# which is appended to the console URL below.
vc_cert = ssl.get_server_certificate((host, int(puerto_vcenter)))
vc_pem = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,vc_cert)
vc_fingerprint = vc_pem.digest('sha1')
# NOTE(review): the adjacent literals join as "...the session" + "will be
# terminated" with no space between them.
if logger != None: logger.info("Open the following URL in your browser to access the " \
"Remote Console.\n" \
"You have 60 seconds to open the URL, or the session" \
"will be terminated.\n")
print(str(vcenter_data))
# Read the vCenter version from conexion.about to choose the console URL.
object_about = conexion.about
#For version vcenter 5.5
if object_about.version == '5.5.0':
console_portv5 = '7331'
URL5 = "http://" + host + ":" + console_portv5 + "/console/?vmId=" \
+ str(vm_moid) + "&vmName=" + vm_name + "&host=" + vcenter_fqdn \
+ "&sessionTicket=" + session + "&thumbprint=" + vc_fingerprint.decode('utf8')
webbrowser.open(URL5, new=1, autoraise=True)
#For version vcenter 6.0 and 6.5
# NOTE(review): this URL uses "&thumbprint.info=" while the 5.5 URL uses
# "&thumbprint=" - confirm which parameter name is intended.
if object_about.version == '6.0.0' or object_about.version == '6.5.0':
URL = "https://" + host + ":" + console_port + "/vsphere-client/webconsole.html?vmId=" \
+ str(vm_moid) + "&vmName=" + vm_name + "&host=" + vcenter_fqdn \
+ "&sessionTicket=" + session + "&thumbprint.info=" + vc_fingerprint.decode('utf-8')
if logger != None: logger.info(URL)
webbrowser.open(URL, new=1, autoraise=True)
if logger != None: logger.info ("Waiting for 60 seconds, then exit")