def __connect(self, wsURL):
    '''Connect to the websocket in a thread.

    Builds a WebSocketApp for ``wsURL`` wired to this object's callbacks,
    runs it on a daemon thread, then polls once a second (at most 5 times)
    until the socket reports connected or ``self._error`` is set.

    If the socket is still not connected after the wait, or an error was
    flagged, an error is logged, ``self.exit()`` is called and the process
    is terminated with ``sys.exit(1)``.
    '''
    self.logger.debug("Starting thread")

    # Hand the CA file reported by the ssl module to the websocket's TLS options.
    ssl_defaults = ssl.get_default_verify_paths()
    sslopt_ca_certs = {'ca_certs': ssl_defaults.cafile}
    self.ws = websocket.WebSocketApp(wsURL,
                                     on_message=self.__on_message,
                                     on_close=self.__on_close,
                                     on_open=self.__on_open,
                                     on_error=self.__on_error,
                                     header=self.__get_auth()
                                     )

    setup_custom_logger('websocket', log_level=settings.LOG_LEVEL)
    self.wst = threading.Thread(target=lambda: self.ws.run_forever(sslopt=sslopt_ca_certs))
    self.wst.daemon = True
    self.wst.start()
    self.logger.info("Started thread")

    def is_connected():
        # Read the socket once so it cannot change between the two checks.
        sock = self.ws.sock
        return bool(sock and sock.connected)

    # Wait for connect before continuing
    conn_timeout = 5
    while not is_connected() and conn_timeout and not self._error:
        sleep(1)
        conn_timeout -= 1

    # Decide on the socket state rather than on the counter: a connection
    # that completes during the last tick leaves conn_timeout at 0 and must
    # not be treated as a failure.
    if not is_connected() or self._error:
        self.logger.error("Couldn't connect to WS! Exiting.")
        self.exit()
        sys.exit(1)
# Example source code using Python's get_default_verify_paths()
def connect(self):
    """Log in through the API, then open the websocket and run its loop.

    Stores the account id and API token on the instance, configures the
    'websocket' logger according to ``self.logging``, picks the socket URL
    from ``self.host`` / ``self.secure`` / ``self.ptr``, and finally calls
    ``run_forever`` on the new WebSocketApp (through an HTTP proxy when
    ``self.settings['http_proxy']`` is set).
    """
    api_client = API(u=self.user, p=self.password, ptr=self.ptr,
                     host=self.host, secure=self.secure)
    account = api_client.me()
    self.user_id = account['_id']
    self.token = api_client.token

    # Either surface websocket traffic on a stream handler or silence it.
    ws_logger = logging.getLogger('websocket')
    if self.logging:
        ws_logger.addHandler(logging.StreamHandler())
        websocket.enableTrace(True)
    else:
        ws_logger.addHandler(logging.NullHandler())
        websocket.enableTrace(False)

    # A custom host wins; otherwise choose between the main and PTR endpoints.
    if self.host:
        scheme = 'wss://' if self.secure else 'ws://'
        url = scheme + self.host + '/socket/websocket'
    elif self.ptr:
        url = 'wss://screeps.com/ptr/socket/websocket'
    else:
        url = 'wss://screeps.com/socket/websocket'

    self.ws = websocket.WebSocketApp(url=url,
                                     on_message=self.on_message,
                                     on_error=self.on_error,
                                     on_close=self.on_close,
                                     on_open=self.on_open)

    # Options shared by both the proxied and the direct call.
    verify_paths = ssl.get_default_verify_paths()
    run_kwargs = {
        'ping_interval': 1,
        'sslopt': {'ca_certs': verify_paths.cafile},
    }

    cfg = self.settings
    if 'http_proxy' in cfg and cfg['http_proxy'] is not None:
        # Proxy port falls back to 8080 when the settings do not name one.
        run_kwargs['http_proxy_port'] = cfg['http_proxy_port'] if 'http_proxy_port' in cfg else 8080
        run_kwargs['http_proxy_host'] = cfg['http_proxy']

    self.ws.run_forever(**run_kwargs)
def _configure_ssl(self, properties):
# Build and return a proton.SSL object for this connection's transport from
# the SSL-related entries of `properties`. `properties` is used as a mapping:
# it is iterated for its keys and read with .get().
#
# Returns None when `properties` is empty/falsy or has no key in common with
# self._SSL_PROPS.
# Raises proton.SSLException when 'x-ssl-verify-mode' is not a key of
# self._VERIFY_MODES, or when the selected verify mode is missing the CA file
# and/or peer name it requires.
if (not properties or
not self._SSL_PROPS.intersection(set(iter(properties)))):
return None
# Client mode unless 'x-ssl-server' (or, failing that, 'x-server') is truthy.
mode = proton.SSLDomain.MODE_CLIENT
if properties.get('x-ssl-server', properties.get('x-server')):
mode = proton.SSLDomain.MODE_SERVER
identity = properties.get('x-ssl-identity')
ca_file = properties.get('x-ssl-ca-file')
# No explicit CA file but 'x-ssl' is truthy: fall back to the cafile reported
# by the ssl module, when this Python's ssl module offers that function.
if (not ca_file and properties.get('x-ssl') and
hasattr(ssl, 'get_default_verify_paths')):
ca_file = ssl.get_default_verify_paths().cafile
# Peer name: 'x-ssl-peer-name' if present, otherwise 'hostname'.
hostname = properties.get('x-ssl-peer-name',
properties.get('hostname'))
# default to most secure level of certificate validation
if not ca_file:
vdefault = 'no-verify'
elif not hostname:
vdefault = 'verify-cert'
else:
vdefault = 'verify-peer'
# An explicit 'x-ssl-verify-mode' overrides the default chosen above; the
# name is then translated through self._VERIFY_MODES.
vmode = properties.get('x-ssl-verify-mode', vdefault)
try:
vmode = self._VERIFY_MODES[vmode]
except KeyError:
raise proton.SSLException("bad value for x-ssl-verify-mode: '%s'" %
vmode)
# Reject mode/input combinations that cannot work.
# NOTE(review): judging by the messages, 'verify-peer' presumably maps to
# VERIFY_PEER_NAME and 'verify-cert' to VERIFY_PEER; _VERIFY_MODES is not
# visible here, so confirm.
if vmode == proton.SSLDomain.VERIFY_PEER_NAME:
if not hostname or not ca_file:
raise proton.SSLException("verify-peer needs x-ssl-peer-name"
" and x-ssl-ca-file")
elif vmode == proton.SSLDomain.VERIFY_PEER:
if not ca_file:
raise proton.SSLException("verify-cert needs x-ssl-ca-file")
# This will throw proton.SSLUnavailable if SSL support is not installed
domain = proton.SSLDomain(mode)
if identity:
# our identity:
# `identity` is indexed 0..2 and passed as three positional arguments;
# presumably (certificate file, key file, password) - confirm against proton.
domain.set_credentials(identity[0], identity[1], identity[2])
if ca_file:
# how we verify peers:
domain.set_trusted_ca_db(ca_file)
# NOTE(review): indentation was lost in this listing, so it is unclear whether
# the next call belongs inside `if ca_file:` or runs unconditionally - confirm
# against the upstream source before reformatting.
domain.set_peer_authentication(vmode, ca_file)
# Only a server may additionally accept clients that do not use SSL.
if mode == proton.SSLDomain.MODE_SERVER:
if properties.get('x-ssl-allow-cleartext'):
domain.allow_unsecured_client()
pn_ssl = proton.SSL(self._pn_transport, domain)
if hostname:
pn_ssl.peer_hostname = hostname
LOG.debug("SSL configured for connection %s", self._name)
return pn_ssl