def from_der(klass, der):
    """Create an object from a DER-encoded certificate.

    The DER bytes are re-encoded as PEM text with
    ``ssl.DER_cert_to_PEM_cert`` and the result is handed to
    ``klass.from_pem``, whose return value is passed straight back.

    :param klass: object exposing a ``from_pem`` callable.
    :param der: DER-encoded certificate bytes.
    :return: whatever ``klass.from_pem`` returns for the PEM text.
    """
    return klass.from_pem(ssl.DER_cert_to_PEM_cert(der))
Python ssl.DER_cert_to_PEM_cert() 函数的实例源码
def from_der(klass, der):
    """Build a ``klass`` object out of DER certificate bytes.

    ``der`` is converted to PEM text by ``ssl.DER_cert_to_PEM_cert`` and
    the text is forwarded to ``klass.from_pem``.

    :param klass: object exposing a ``from_pem`` callable.
    :param der: DER-encoded certificate bytes.
    :return: the value produced by ``klass.from_pem``.
    """
    return klass.from_pem(ssl.DER_cert_to_PEM_cert(der))
def from_der(klass, der):
    """Create an object from a DER-encoded certificate.

    The DER bytes are re-encoded as PEM text with
    ``ssl.DER_cert_to_PEM_cert`` and the result is handed to
    ``klass.from_pem``, whose return value is passed straight back.

    :param klass: object exposing a ``from_pem`` callable.
    :param der: DER-encoded certificate bytes.
    :return: whatever ``klass.from_pem`` returns for the PEM text.
    """
    return klass.from_pem(ssl.DER_cert_to_PEM_cert(der))
def _read_ssl_default_ca_certs():
# it's not guaranteed to return PEM formatted certs when `binary_form` is False
der_certs = ssl.create_default_context().get_ca_certs(binary_form=True)
pem_certs = [ssl.DER_cert_to_PEM_cert(der_cert_bytes) for der_cert_bytes in der_certs]
return '\n'.join(pem_certs)
def _read_ssl_default_ca_certs():
# it's not guaranteed to return PEM formatted certs when `binary_form` is False
der_certs = ssl.create_default_context().get_ca_certs(binary_form=True)
pem_certs = [ssl.DER_cert_to_PEM_cert(der_cert_bytes) for der_cert_bytes in der_certs]
return '\n'.join(pem_certs)
def _read_ssl_default_ca_certs():
# it's not guaranteed to return PEM formatted certs when `binary_form` is False
der_certs = ssl.create_default_context().get_ca_certs(binary_form=True)
pem_certs = [ssl.DER_cert_to_PEM_cert(der_cert_bytes) for der_cert_bytes in der_certs]
return '\n'.join(pem_certs)
def from_der(klass, der):
    """Build a ``klass`` object out of DER certificate bytes.

    ``der`` is converted to PEM text by ``ssl.DER_cert_to_PEM_cert`` and
    the text is forwarded to ``klass.from_pem``.

    :param klass: object exposing a ``from_pem`` callable.
    :param der: DER-encoded certificate bytes.
    :return: the value produced by ``klass.from_pem``.
    """
    return klass.from_pem(ssl.DER_cert_to_PEM_cert(der))
def from_der(klass, der):
    """Create an object from a DER-encoded certificate.

    The DER bytes are re-encoded as PEM text with
    ``ssl.DER_cert_to_PEM_cert`` and the result is handed to
    ``klass.from_pem``, whose return value is passed straight back.

    :param klass: object exposing a ``from_pem`` callable.
    :param der: DER-encoded certificate bytes.
    :return: whatever ``klass.from_pem`` returns for the PEM text.
    """
    return klass.from_pem(ssl.DER_cert_to_PEM_cert(der))
def save_PEMfile(self, certificate_path):
    """Write the held certificate to a file as PEM text.

    The destination is remembered in ``self._filepath``.  ``self._data``
    is converted with ``ssl.DER_cert_to_PEM_cert`` and the resulting text
    is written to that path, replacing any existing content.

    :param certificate_path: path of the file to write.
    """
    self._filepath = certificate_path

    # Convert before opening so a failed conversion never truncates the file.
    pem_text = ssl.DER_cert_to_PEM_cert(self._data)

    with open(self._filepath, "w") as pem_file:
        pem_file.write(pem_text)
def connect(self):
    """Open the connection, forwarding any extra SSL keyword arguments.

    When ``self._sslArgs`` is empty, or this object exposes neither
    ``source_address`` nor ``timeout``, the stock
    ``http_client.HTTPSConnection.connect`` is used unchanged.  Otherwise
    the TCP socket is created here and wrapped with ``ssl.wrap_socket`` so
    that ``self._sslArgs`` can be passed along as keyword arguments.
    """
    if not self._sslArgs:
        # No override requested.
        http_client.HTTPSConnection.connect(self)
        return

    # Big hack: the httplib connect function is copied for each Python
    # version in order to handle the extra ssl parameters.
    if hasattr(self, "source_address"):
        # Python 2.7
        raw_sock = socket.create_connection((self.host, self.port),
                                            self.timeout,
                                            self.source_address)
        if self._tunnel_host:
            # The tunnel is set up over the plain socket before wrapping.
            self.sock = raw_sock
            self._tunnel()
    elif hasattr(self, "timeout"):
        # Python 2.6
        raw_sock = socket.create_connection((self.host, self.port),
                                            self.timeout)
    else:
        # Unknown Python version: fall back to the stock implementation.
        http_client.HTTPSConnection.connect(self)
        return

    self.sock = ssl.wrap_socket(raw_sock, self.key_file, self.cert_file,
                                **self._sslArgs)

    # TODO: Additional verification of the peer cert if needed, i.e. when
    # self._sslArgs carries a "cert_reqs" other than ssl.CERT_NONE together
    # with "ca_certs" (the DER cert is available via self.sock.getpeercert).
## Stand-in for the HTTPSConnection class that will connect to a proxy and
## issue a CONNECT command to start an SSL tunnel.
def from_der(klass, der):
    """Build a ``klass`` object out of DER certificate bytes.

    ``der`` is converted to PEM text by ``ssl.DER_cert_to_PEM_cert`` and
    the text is forwarded to ``klass.from_pem``.

    :param klass: object exposing a ``from_pem`` callable.
    :param der: DER-encoded certificate bytes.
    :return: the value produced by ``klass.from_pem``.
    """
    return klass.from_pem(ssl.DER_cert_to_PEM_cert(der))
def from_der(klass, der):
    """Create an object from a DER-encoded certificate.

    The DER bytes are re-encoded as PEM text with
    ``ssl.DER_cert_to_PEM_cert`` and the result is handed to
    ``klass.from_pem``, whose return value is passed straight back.

    :param klass: object exposing a ``from_pem`` callable.
    :param der: DER-encoded certificate bytes.
    :return: whatever ``klass.from_pem`` returns for the PEM text.
    """
    return klass.from_pem(ssl.DER_cert_to_PEM_cert(der))
def from_der(klass, der):
    """Build a ``klass`` object out of DER certificate bytes.

    ``der`` is converted to PEM text by ``ssl.DER_cert_to_PEM_cert`` and
    the text is forwarded to ``klass.from_pem``.

    :param klass: object exposing a ``from_pem`` callable.
    :param der: DER-encoded certificate bytes.
    :return: the value produced by ``klass.from_pem``.
    """
    return klass.from_pem(ssl.DER_cert_to_PEM_cert(der))
def save_PEMfile(self, certificate_path):
    """Save the held certificate to ``certificate_path`` in PEM format.

    ``self._filepath`` is updated to the given path, ``self._data`` is
    turned into PEM text by ``ssl.DER_cert_to_PEM_cert``, and that text is
    written to the file, replacing any existing content.

    :param certificate_path: path of the file to write.
    """
    self._filepath = certificate_path

    # Convert before opening so a failed conversion never truncates the file.
    pem_text = ssl.DER_cert_to_PEM_cert(self._data)

    with open(self._filepath, "w") as pem_file:
        pem_file.write(pem_text)
def handleEvent(self, event):
    """Fetch the SSL certificate of the host named by ``event`` and report on it.

    For ``LINKED_URL_INTERNAL`` events the host name is extracted from the
    URL with ``self.sf.urlFQDN``; for every other event type the event data
    is used as the host name directly.  Each host is processed at most once
    (tracked in ``self.results``).  The certificate is always fetched from
    port 443, parsed with M2Crypto, published as an ``SSL_CERTIFICATE_RAW``
    event and then passed to the ``getIssued`` / ``getIssuer`` /
    ``checkHostMatch`` / ``checkExpiry`` helpers.

    :param event: object providing ``eventType``, ``module`` and ``data``.
    :return: always ``None``.
    """
    eventName = event.eventType
    srcModuleName = event.module
    eventData = event.data

    self.sf.debug("Received event, " + eventName + ", from " + srcModuleName)

    if eventName == "LINKED_URL_INTERNAL":
        fqdn = self.sf.urlFQDN(eventData.lower())
    else:
        fqdn = eventData

    # Only look at each host once.
    # NOTE(review): the host is recorded before the https/tryhttp filter
    # below, so a skipped non-https event also suppresses later https
    # events for the same host -- confirm this is intended.
    if fqdn in self.results:
        return None
    self.results[fqdn] = True

    if not eventData.lower().startswith("https://") and not self.opts['tryhttp']:
        return None

    self.sf.debug("Testing SSL for: " + eventData)

    # Re-fetch the certificate from the site and process
    s = None
    sock = None
    try:
        s = socket.socket()
        s.settimeout(int(self.opts['ssltimeout']))
        s.connect((fqdn, 443))
        sock = ssl.wrap_socket(s)
        sock.do_handshake()
        rawcert = sock.getpeercert(True)
        cert = ssl.DER_cert_to_PEM_cert(rawcert)
        m2cert = M2Crypto.X509.load_cert_string(str(cert).replace('\r', ''))
    except BaseException as x:
        self.sf.info("Unable to SSL-connect to " + fqdn + ": " + str(x))
        return None
    finally:
        # The connection is only needed to read the certificate; release
        # it on both the success and the failure path instead of leaving
        # the sockets open.
        for handle in (sock, s):
            if handle is None:
                continue
            try:
                handle.close()
            except Exception:
                # Closing is best effort; a failure here must not mask
                # the outcome of the fetch itself.
                pass

    # Generate the event for the raw cert (in text form)
    # Cert raw data text contains a lot of gems..
    rawevt = SpiderFootEvent("SSL_CERTIFICATE_RAW",
                             m2cert.as_text().encode('raw_unicode_escape'),
                             self.__name__, event)
    self.notifyListeners(rawevt)

    # Generate events for other cert aspects
    self.getIssued(m2cert, event)
    self.getIssuer(m2cert, event)
    self.checkHostMatch(m2cert, fqdn, event)
    self.checkExpiry(m2cert, event)
# Report back who the certificate was issued to
def build_environ(self, sock_file, conn):
    """Build the WSGI execution environment for one request.

    Starts from a copy of ``self.base_environ``, adds every request header
    under an ``HTTP_``-prefixed key, fills in the CGI variables from the
    parsed request line and from ``conn``, and selects the ``wsgi.input``
    stream.  The request method is also stored on ``self.request_method``.

    :param sock_file: request stream; passed to ``self.read_request_line``
        and ``self.read_headers`` and used as ``wsgi.input`` (wrapped in
        ``ChunkedReader`` when the transfer encoding is ``chunked``).
    :param conn: connection object providing ``ssl``, ``server_port``,
        ``client_port``, ``client_addr`` and, for SSL, ``socket``.
    :return: the populated environ mapping.
    """
    # The request line is consumed first, then the headers.
    request = self.read_request_line(sock_file)
    environ = self.base_environ.copy()

    for name, value in iteritems(self.read_headers(sock_file)):
        environ[str('HTTP_' + name)] = value

    # CGI variables
    environ.update({
        'REQUEST_METHOD': request['method'],
        'PATH_INFO': request['path'],
        'SERVER_PROTOCOL': request['protocol'],
        'SERVER_PORT': str(conn.server_port),
        'REMOTE_PORT': str(conn.client_port),
        'REMOTE_ADDR': str(conn.client_addr),
        'QUERY_STRING': request['query_string'],
    })

    # These two headers are also exposed without the HTTP_ prefix.
    for cgi_key in ('CONTENT_LENGTH', 'CONTENT_TYPE'):
        header_key = 'HTTP_' + cgi_key
        if header_key in environ:
            environ[cgi_key] = environ[header_key]

    # Remember the request method for later use.
    self.request_method = environ['REQUEST_METHOD']

    # Dynamic WSGI variables
    if not conn.ssl:
        environ['wsgi.url_scheme'] = 'http'
    else:
        environ['wsgi.url_scheme'] = 'https'
        environ['HTTPS'] = 'on'
        try:
            peercert = conn.socket.getpeercert(binary_form=True)
            # A falsy peercert (no client certificate) is stored as-is.
            environ['SSL_CLIENT_RAW_CERT'] = \
                peercert and to_native(ssl.DER_cert_to_PEM_cert(peercert))
        except Exception:
            # Best effort: the failure is printed and the key is left unset.
            print(sys.exc_info()[1])

    is_chunked = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
    environ['wsgi.input'] = ChunkedReader(sock_file) if is_chunked else sock_file

    return environ
def build_environ(self, sock_file, conn):
    """Build the WSGI execution environment for one request.

    Starts from a copy of ``self.base_environ``, adds every request header
    under an ``HTTP_``-prefixed key, fills in the CGI variables from the
    parsed request line and from ``conn``, and selects the ``wsgi.input``
    stream.  The request method is also stored on ``self.request_method``.

    :param sock_file: request stream; passed to ``self.read_request_line``
        and ``self.read_headers`` and used as ``wsgi.input`` (wrapped in
        ``ChunkedReader`` when the transfer encoding is ``chunked``).
    :param conn: connection object providing ``ssl``, ``server_port``,
        ``client_port``, ``client_addr`` and, for SSL, ``socket``.
    :return: the populated environ mapping.
    """
    # Grab the request line
    request = self.read_request_line(sock_file)

    # Copy the Base Environment
    environ = self.base_environ.copy()

    # Grab the headers.  ``iteritems`` only exists on Python 2 mappings, so
    # fall back to ``items`` to keep this working on Python 3 as well.
    headers = self.read_headers(sock_file)
    if hasattr(headers, 'iteritems'):
        header_pairs = headers.iteritems()
    else:
        header_pairs = headers.items()
    for k, v in header_pairs:
        environ[str('HTTP_' + k)] = v

    # Add CGI Variables
    environ['REQUEST_METHOD'] = request['method']
    environ['PATH_INFO'] = request['path']
    environ['SERVER_PROTOCOL'] = request['protocol']
    environ['SERVER_PORT'] = str(conn.server_port)
    environ['REMOTE_PORT'] = str(conn.client_port)
    environ['REMOTE_ADDR'] = str(conn.client_addr)
    environ['QUERY_STRING'] = request['query_string']
    if 'HTTP_CONTENT_LENGTH' in environ:
        environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH']
    if 'HTTP_CONTENT_TYPE' in environ:
        environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE']

    # Save the request method for later
    self.request_method = environ['REQUEST_METHOD']

    # Add Dynamic WSGI Variables
    if conn.ssl:
        environ['wsgi.url_scheme'] = 'https'
        environ['HTTPS'] = 'on'
        try:
            peercert = conn.socket.getpeercert(binary_form=True)
            # A falsy peercert (no client certificate) is stored as-is.
            environ['SSL_CLIENT_RAW_CERT'] = \
                peercert and ssl.DER_cert_to_PEM_cert(peercert)
        except Exception:
            # Best effort: the failure is printed and the key is left
            # unset.  The parenthesised form prints the same text on
            # Python 2 and is valid syntax on Python 3.
            print(sys.exc_info()[1])
    else:
        environ['wsgi.url_scheme'] = 'http'

    if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked':
        environ['wsgi.input'] = ChunkedReader(sock_file)
    else:
        environ['wsgi.input'] = sock_file

    return environ
def build_environ(self, sock_file, conn):
    """Build the WSGI execution environment for one request.

    Starts from a copy of ``self.base_environ``, adds every request header
    under an ``HTTP_``-prefixed key, fills in the CGI variables from the
    parsed request line and from ``conn``, and selects the ``wsgi.input``
    stream.  The request method is also stored on ``self.request_method``.

    :param sock_file: request stream; passed to ``self.read_request_line``
        and ``self.read_headers`` and used as ``wsgi.input`` (wrapped in
        ``ChunkedReader`` when the transfer encoding is ``chunked``).
    :param conn: connection object providing ``ssl``, ``server_port``,
        ``client_port``, ``client_addr`` and, for SSL, ``socket``.
    :return: the populated environ mapping.
    """
    # Grab the request line
    request = self.read_request_line(sock_file)

    # Copy the Base Environment
    environ = self.base_environ.copy()

    # Grab the headers.  ``iteritems`` only exists on Python 2 mappings, so
    # fall back to ``items`` to keep this working on Python 3 as well.
    headers = self.read_headers(sock_file)
    if hasattr(headers, 'iteritems'):
        header_pairs = headers.iteritems()
    else:
        header_pairs = headers.items()
    for k, v in header_pairs:
        environ[str('HTTP_' + k)] = v

    # Add CGI Variables
    environ['REQUEST_METHOD'] = request['method']
    environ['PATH_INFO'] = request['path']
    environ['SERVER_PROTOCOL'] = request['protocol']
    environ['SERVER_PORT'] = str(conn.server_port)
    environ['REMOTE_PORT'] = str(conn.client_port)
    environ['REMOTE_ADDR'] = str(conn.client_addr)
    environ['QUERY_STRING'] = request['query_string']
    if 'HTTP_CONTENT_LENGTH' in environ:
        environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH']
    if 'HTTP_CONTENT_TYPE' in environ:
        environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE']

    # Save the request method for later
    self.request_method = environ['REQUEST_METHOD']

    # Add Dynamic WSGI Variables
    if conn.ssl:
        environ['wsgi.url_scheme'] = 'https'
        environ['HTTPS'] = 'on'
        try:
            peercert = conn.socket.getpeercert(binary_form=True)
            # A falsy peercert (no client certificate) is stored as-is.
            environ['SSL_CLIENT_RAW_CERT'] = \
                peercert and ssl.DER_cert_to_PEM_cert(peercert)
        except Exception:
            # Best effort: the failure is printed and the key is left
            # unset.  The parenthesised form prints the same text on
            # Python 2 and is valid syntax on Python 3.
            print(sys.exc_info()[1])
    else:
        environ['wsgi.url_scheme'] = 'http'

    if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked':
        environ['wsgi.input'] = ChunkedReader(sock_file)
    else:
        environ['wsgi.input'] = sock_file

    return environ
def build_environ(self, sock_file, conn):
    """Build the WSGI execution environment for one request.

    Starts from a copy of ``self.base_environ``, adds every request header
    under an ``HTTP_``-prefixed key, fills in the CGI variables from the
    parsed request line and from ``conn``, and selects the ``wsgi.input``
    stream.  The request method is also stored on ``self.request_method``.

    :param sock_file: request stream; passed to ``self.read_request_line``
        and ``self.read_headers`` and used as ``wsgi.input`` (wrapped in
        ``ChunkedReader`` when the transfer encoding is ``chunked``).
    :param conn: connection object providing ``ssl``, ``server_port``,
        ``client_port``, ``client_addr`` and, for SSL, ``socket``.
    :return: the populated environ mapping.
    """
    # The request line is consumed first, then the headers.
    request = self.read_request_line(sock_file)
    environ = self.base_environ.copy()

    for name, value in iteritems(self.read_headers(sock_file)):
        environ[str('HTTP_' + name)] = value

    # CGI variables
    environ.update({
        'REQUEST_METHOD': request['method'],
        'PATH_INFO': request['path'],
        'SERVER_PROTOCOL': request['protocol'],
        'SERVER_PORT': str(conn.server_port),
        'REMOTE_PORT': str(conn.client_port),
        'REMOTE_ADDR': str(conn.client_addr),
        'QUERY_STRING': request['query_string'],
    })

    # These two headers are also exposed without the HTTP_ prefix.
    for cgi_key in ('CONTENT_LENGTH', 'CONTENT_TYPE'):
        header_key = 'HTTP_' + cgi_key
        if header_key in environ:
            environ[cgi_key] = environ[header_key]

    # Remember the request method for later use.
    self.request_method = environ['REQUEST_METHOD']

    # Dynamic WSGI variables
    if not conn.ssl:
        environ['wsgi.url_scheme'] = 'http'
    else:
        environ['wsgi.url_scheme'] = 'https'
        environ['HTTPS'] = 'on'
        try:
            peercert = conn.socket.getpeercert(binary_form=True)
            # A falsy peercert (no client certificate) is stored as-is.
            environ['SSL_CLIENT_RAW_CERT'] = \
                peercert and to_native(ssl.DER_cert_to_PEM_cert(peercert))
        except Exception:
            # Best effort: the failure is printed and the key is left unset.
            print(sys.exc_info()[1])

    is_chunked = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
    environ['wsgi.input'] = ChunkedReader(sock_file) if is_chunked else sock_file

    return environ
def build_environ(self, sock_file, conn):
    """Build the WSGI execution environment for one request.

    Starts from a copy of ``self.base_environ``, adds every request header
    under an ``HTTP_``-prefixed key, fills in the CGI variables from the
    parsed request line and from ``conn``, and selects the ``wsgi.input``
    stream.  The request method is also stored on ``self.request_method``.

    :param sock_file: request stream; passed to ``self.read_request_line``
        and ``self.read_headers`` and used as ``wsgi.input`` (wrapped in
        ``ChunkedReader`` when the transfer encoding is ``chunked``).
    :param conn: connection object providing ``ssl``, ``server_port``,
        ``client_port``, ``client_addr`` and, for SSL, ``socket``.
    :return: the populated environ mapping.
    """
    # Grab the request line
    request = self.read_request_line(sock_file)

    # Copy the Base Environment
    environ = self.base_environ.copy()

    # Grab the headers.  ``iteritems`` only exists on Python 2 mappings, so
    # fall back to ``items`` to keep this working on Python 3 as well.
    headers = self.read_headers(sock_file)
    if hasattr(headers, 'iteritems'):
        header_pairs = headers.iteritems()
    else:
        header_pairs = headers.items()
    for k, v in header_pairs:
        environ[str('HTTP_' + k)] = v

    # Add CGI Variables
    environ['REQUEST_METHOD'] = request['method']
    environ['PATH_INFO'] = request['path']
    environ['SERVER_PROTOCOL'] = request['protocol']
    environ['SERVER_PORT'] = str(conn.server_port)
    environ['REMOTE_PORT'] = str(conn.client_port)
    environ['REMOTE_ADDR'] = str(conn.client_addr)
    environ['QUERY_STRING'] = request['query_string']
    if 'HTTP_CONTENT_LENGTH' in environ:
        environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH']
    if 'HTTP_CONTENT_TYPE' in environ:
        environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE']

    # Save the request method for later
    self.request_method = environ['REQUEST_METHOD']

    # Add Dynamic WSGI Variables
    if conn.ssl:
        environ['wsgi.url_scheme'] = 'https'
        environ['HTTPS'] = 'on'
        try:
            peercert = conn.socket.getpeercert(binary_form=True)
            # A falsy peercert (no client certificate) is stored as-is.
            environ['SSL_CLIENT_RAW_CERT'] = \
                peercert and ssl.DER_cert_to_PEM_cert(peercert)
        except Exception:
            # Best effort: the failure is printed and the key is left
            # unset.  The parenthesised form prints the same text on
            # Python 2 and is valid syntax on Python 3.
            print(sys.exc_info()[1])
    else:
        environ['wsgi.url_scheme'] = 'http'

    if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked':
        environ['wsgi.input'] = ChunkedReader(sock_file)
    else:
        environ['wsgi.input'] = sock_file

    return environ
def build_environ(self, sock_file, conn):
    """Build the WSGI execution environment for one request.

    Starts from a copy of ``self.base_environ``, adds every request header
    under an ``HTTP_``-prefixed key, fills in the CGI variables from the
    parsed request line and from ``conn``, and selects the ``wsgi.input``
    stream.  The request method is also stored on ``self.request_method``.

    :param sock_file: request stream; passed to ``self.read_request_line``
        and ``self.read_headers`` and used as ``wsgi.input`` (wrapped in
        ``ChunkedReader`` when the transfer encoding is ``chunked``).
    :param conn: connection object providing ``ssl``, ``server_port``,
        ``client_port``, ``client_addr`` and, for SSL, ``socket``.
    :return: the populated environ mapping.
    """
    # Grab the request line
    request = self.read_request_line(sock_file)

    # Copy the Base Environment
    environ = self.base_environ.copy()

    # Grab the headers.  ``iteritems`` only exists on Python 2 mappings, so
    # fall back to ``items`` to keep this working on Python 3 as well.
    headers = self.read_headers(sock_file)
    if hasattr(headers, 'iteritems'):
        header_pairs = headers.iteritems()
    else:
        header_pairs = headers.items()
    for k, v in header_pairs:
        environ[str('HTTP_' + k)] = v

    # Add CGI Variables
    environ['REQUEST_METHOD'] = request['method']
    environ['PATH_INFO'] = request['path']
    environ['SERVER_PROTOCOL'] = request['protocol']
    environ['SERVER_PORT'] = str(conn.server_port)
    environ['REMOTE_PORT'] = str(conn.client_port)
    environ['REMOTE_ADDR'] = str(conn.client_addr)
    environ['QUERY_STRING'] = request['query_string']
    if 'HTTP_CONTENT_LENGTH' in environ:
        environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH']
    if 'HTTP_CONTENT_TYPE' in environ:
        environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE']

    # Save the request method for later
    self.request_method = environ['REQUEST_METHOD']

    # Add Dynamic WSGI Variables
    if conn.ssl:
        environ['wsgi.url_scheme'] = 'https'
        environ['HTTPS'] = 'on'
        try:
            peercert = conn.socket.getpeercert(binary_form=True)
            # A falsy peercert (no client certificate) is stored as-is.
            environ['SSL_CLIENT_RAW_CERT'] = \
                peercert and ssl.DER_cert_to_PEM_cert(peercert)
        except Exception:
            # Best effort: the failure is printed and the key is left
            # unset.  The parenthesised form prints the same text on
            # Python 2 and is valid syntax on Python 3.
            print(sys.exc_info()[1])
    else:
        environ['wsgi.url_scheme'] = 'http'

    if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked':
        environ['wsgi.input'] = ChunkedReader(sock_file)
    else:
        environ['wsgi.input'] = sock_file

    return environ
def build_environ(self, sock_file, conn):
    """Build the WSGI execution environment for one request.

    Starts from a copy of ``self.base_environ``, adds every request header
    under an ``HTTP_``-prefixed key, fills in the CGI variables from the
    parsed request line and from ``conn``, and selects the ``wsgi.input``
    stream.  The request method is also stored on ``self.request_method``.

    :param sock_file: request stream; passed to ``self.read_request_line``
        and ``self.read_headers`` and used as ``wsgi.input`` (wrapped in
        ``ChunkedReader`` when the transfer encoding is ``chunked``).
    :param conn: connection object providing ``ssl``, ``server_port``,
        ``client_port``, ``client_addr`` and, for SSL, ``socket``.
    :return: the populated environ mapping.
    """
    # Grab the request line
    request = self.read_request_line(sock_file)

    # Copy the Base Environment
    environ = self.base_environ.copy()

    # Grab the headers.  ``iteritems`` only exists on Python 2 mappings, so
    # fall back to ``items`` to keep this working on Python 3 as well.
    headers = self.read_headers(sock_file)
    if hasattr(headers, 'iteritems'):
        header_pairs = headers.iteritems()
    else:
        header_pairs = headers.items()
    for k, v in header_pairs:
        environ[str('HTTP_' + k)] = v

    # Add CGI Variables
    environ['REQUEST_METHOD'] = request['method']
    environ['PATH_INFO'] = request['path']
    environ['SERVER_PROTOCOL'] = request['protocol']
    environ['SERVER_PORT'] = str(conn.server_port)
    environ['REMOTE_PORT'] = str(conn.client_port)
    environ['REMOTE_ADDR'] = str(conn.client_addr)
    environ['QUERY_STRING'] = request['query_string']
    if 'HTTP_CONTENT_LENGTH' in environ:
        environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH']
    if 'HTTP_CONTENT_TYPE' in environ:
        environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE']

    # Save the request method for later
    self.request_method = environ['REQUEST_METHOD']

    # Add Dynamic WSGI Variables
    if conn.ssl:
        environ['wsgi.url_scheme'] = 'https'
        environ['HTTPS'] = 'on'
        try:
            peercert = conn.socket.getpeercert(binary_form=True)
            # A falsy peercert (no client certificate) is stored as-is.
            environ['SSL_CLIENT_RAW_CERT'] = \
                peercert and ssl.DER_cert_to_PEM_cert(peercert)
        except Exception:
            # Best effort: the failure is printed and the key is left
            # unset.  The parenthesised form prints the same text on
            # Python 2 and is valid syntax on Python 3.
            print(sys.exc_info()[1])
    else:
        environ['wsgi.url_scheme'] = 'http'

    if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked':
        environ['wsgi.input'] = ChunkedReader(sock_file)
    else:
        environ['wsgi.input'] = sock_file

    return environ
def handleEvent(self, event):
    """Fetch the SSL certificate of the host named by ``event`` and report on it.

    For ``LINKED_URL_INTERNAL`` events the host name is extracted from the
    URL with ``self.sf.urlFQDN``; for every other event type the event data
    is used as the host name directly.  Each host is processed at most once
    (tracked in ``self.results``).  The certificate is always fetched from
    port 443, parsed with M2Crypto, published as an ``SSL_CERTIFICATE_RAW``
    event and then passed to the ``getIssued`` / ``getIssuer`` /
    ``checkHostMatch`` / ``checkExpiry`` helpers.

    :param event: object providing ``eventType``, ``module`` and ``data``.
    :return: always ``None``.
    """
    eventName = event.eventType
    srcModuleName = event.module
    eventData = event.data

    self.sf.debug("Received event, " + eventName + ", from " + srcModuleName)

    if eventName == "LINKED_URL_INTERNAL":
        fqdn = self.sf.urlFQDN(eventData.lower())
    else:
        fqdn = eventData

    # Only look at each host once.
    # NOTE(review): the host is recorded before the https/tryhttp filter
    # below, so a skipped non-https event also suppresses later https
    # events for the same host -- confirm this is intended.
    if fqdn in self.results:
        return None
    self.results[fqdn] = True

    if not eventData.lower().startswith("https://") and not self.opts['tryhttp']:
        return None

    self.sf.debug("Testing SSL for: " + eventData)

    # Re-fetch the certificate from the site and process
    s = None
    sock = None
    try:
        s = socket.socket()
        s.settimeout(int(self.opts['ssltimeout']))
        s.connect((fqdn, 443))
        sock = ssl.wrap_socket(s)
        sock.do_handshake()
        rawcert = sock.getpeercert(True)
        cert = ssl.DER_cert_to_PEM_cert(rawcert)
        m2cert = M2Crypto.X509.load_cert_string(str(cert).replace('\r', ''))
    except BaseException as x:
        self.sf.info("Unable to SSL-connect to " + fqdn + ": " + str(x))
        return None
    finally:
        # The connection is only needed to read the certificate; release
        # it on both the success and the failure path instead of leaving
        # the sockets open.
        for handle in (sock, s):
            if handle is None:
                continue
            try:
                handle.close()
            except Exception:
                # Closing is best effort; a failure here must not mask
                # the outcome of the fetch itself.
                pass

    # Generate the event for the raw cert (in text form)
    # Cert raw data text contains a lot of gems..
    rawevt = SpiderFootEvent("SSL_CERTIFICATE_RAW",
                             m2cert.as_text().encode('raw_unicode_escape'),
                             self.__name__, event)
    self.notifyListeners(rawevt)

    # Generate events for other cert aspects
    self.getIssued(m2cert, event)
    self.getIssuer(m2cert, event)
    self.checkHostMatch(m2cert, fqdn, event)
    self.checkExpiry(m2cert, event)
# Report back who the certificate was issued to
def build_environ(self, sock_file, conn):
    """Build the WSGI execution environment for one request.

    Starts from a copy of ``self.base_environ``, adds every request header
    under an ``HTTP_``-prefixed key, fills in the CGI variables from the
    parsed request line and from ``conn``, and selects the ``wsgi.input``
    stream.  The request method is also stored on ``self.request_method``.

    :param sock_file: request stream; passed to ``self.read_request_line``
        and ``self.read_headers`` and used as ``wsgi.input`` (wrapped in
        ``ChunkedReader`` when the transfer encoding is ``chunked``).
    :param conn: connection object providing ``ssl``, ``server_port``,
        ``client_port``, ``client_addr`` and, for SSL, ``socket``.
    :return: the populated environ mapping.
    """
    # Grab the request line
    request = self.read_request_line(sock_file)

    # Copy the Base Environment
    environ = self.base_environ.copy()

    # Grab the headers.  ``iteritems`` only exists on Python 2 mappings, so
    # fall back to ``items`` to keep this working on Python 3 as well.
    headers = self.read_headers(sock_file)
    if hasattr(headers, 'iteritems'):
        header_pairs = headers.iteritems()
    else:
        header_pairs = headers.items()
    for k, v in header_pairs:
        environ[str('HTTP_' + k)] = v

    # Add CGI Variables
    environ['REQUEST_METHOD'] = request['method']
    environ['PATH_INFO'] = request['path']
    environ['SERVER_PROTOCOL'] = request['protocol']
    environ['SERVER_PORT'] = str(conn.server_port)
    environ['REMOTE_PORT'] = str(conn.client_port)
    environ['REMOTE_ADDR'] = str(conn.client_addr)
    environ['QUERY_STRING'] = request['query_string']
    if 'HTTP_CONTENT_LENGTH' in environ:
        environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH']
    if 'HTTP_CONTENT_TYPE' in environ:
        environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE']

    # Save the request method for later
    self.request_method = environ['REQUEST_METHOD']

    # Add Dynamic WSGI Variables
    if conn.ssl:
        environ['wsgi.url_scheme'] = 'https'
        environ['HTTPS'] = 'on'
        try:
            peercert = conn.socket.getpeercert(binary_form=True)
            # A falsy peercert (no client certificate) is stored as-is.
            environ['SSL_CLIENT_RAW_CERT'] = \
                peercert and ssl.DER_cert_to_PEM_cert(peercert)
        except Exception:
            # Best effort: the failure is printed and the key is left
            # unset.  The parenthesised form prints the same text on
            # Python 2 and is valid syntax on Python 3.
            print(sys.exc_info()[1])
    else:
        environ['wsgi.url_scheme'] = 'http'

    if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked':
        environ['wsgi.input'] = ChunkedReader(sock_file)
    else:
        environ['wsgi.input'] = sock_file

    return environ