def _create_ssh(self, **kwargs):
    """Build a configured paramiko SSH client and store it on ``self.ssh``.

    The client is only set up with its host keys and its missing-host-key
    policy; this method does not call ``connect``.

    Recognised keyword arguments:

    known_hosts_file -- path of the host key file to load. When omitted or
        None, the system host keys are loaded instead. The file is created
        if it does not exist yet.
    missing_key_policy -- either a policy object, which is handed to
        paramiko unchanged, or the class name (as a string) of one of
        paramiko's AutoAddPolicy, RejectPolicy or WarningPolicy.
        Defaults to an AutoAddPolicy instance when omitted or None.

    Raises paramiko.SSHException for any failure, including an
    unrecognised policy name; the message is logged through
    ``self._logger`` before the exception is raised.
    """
    # ``basestring`` only exists on Python 2 (or where the module defines
    # a compatibility alias); fall back to ``str`` so that a policy given
    # by name is still recognised on Python 3 instead of failing with a
    # NameError.
    try:
        string_types = basestring
    except NameError:
        string_types = str

    try:
        ssh = paramiko.SSHClient()

        known_hosts_file = kwargs.get('known_hosts_file')
        if known_hosts_file is None:
            ssh.load_system_host_keys()
        else:
            # Opening for append creates the file when it is missing
            # (needed when CI tests run with no existing key file)
            # without altering the contents of an existing one.
            with open(known_hosts_file, 'a'):
                pass
            ssh.load_host_keys(known_hosts_file)

        missing_key_policy = kwargs.get('missing_key_policy')
        if missing_key_policy is None:
            missing_key_policy = paramiko.AutoAddPolicy()
        elif isinstance(missing_key_policy, string_types):
            # To make it configurable, allow a class name to be mapped to
            # a policy object. The names come from the classes themselves,
            # so no throwaway instances are needed for the comparison.
            policy_classes = (
                paramiko.AutoAddPolicy,
                paramiko.RejectPolicy,
                paramiko.WarningPolicy,
            )
            policies_by_name = dict(
                (cls.__name__, cls) for cls in policy_classes
            )
            policy_class = policies_by_name.get(missing_key_policy)
            if policy_class is None:
                raise exceptions.SSHException(
                    "Invalid missing_key_policy: %s" % missing_key_policy
                )
            missing_key_policy = policy_class()
        # Any other value is assumed to already be a policy object and is
        # passed through as-is.

        ssh.set_missing_host_key_policy(missing_key_policy)
        self.ssh = ssh
    except Exception as e:
        # Every failure, including the invalid-policy error raised above,
        # is logged and surfaced to callers as a paramiko.SSHException.
        msg = "Error connecting via ssh: %s" % e
        self._logger.error(msg)
        raise paramiko.SSHException(msg)
评论列表
文章目录