def secure_copy(user, host, src, dest, key_filename=None, allow_agent=True):
keys = _load_keys(key_filename, allow_agent)
pkey = keys[0]
ssh = paramiko.SSHClient()
proxy = None
ssh_config_file = os.path.expanduser("~/.ssh/config")
if os.path.exists(ssh_config_file):
conf = paramiko.SSHConfig()
with open(ssh_config_file) as f:
conf.parse(f)
host_config = conf.lookup(host)
if 'proxycommand' in host_config:
proxy = paramiko.ProxyCommand(host_config['proxycommand'])
ssh.load_system_host_keys()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(host, username=user, pkey=pkey, sock=proxy)
scp = SCPClient(ssh.get_transport())
scp.get(src, dest)
scp.close()
python类SSHConfig()的实例源码
def get_a_ssh_config(box_name):
"""Gives back a map of all the machine's ssh configurations"""
output = subprocess.check_output(["vagrant", "ssh-config", box_name])
config = SSHConfig()
config.parse(StringIO(output))
host_config = config.lookup(box_name)
# man 5 ssh_config:
# > It is possible to have multiple identity files ...
# > all these identities will be tried in sequence.
for id in host_config['identityfile']:
if os.path.isfile(id):
host_config['identityfile'] = id
return dict((v, host_config[k]) for k, v in _ssh_to_ansible)
# List out servers that vagrant has running
# ------------------------------
def get_ssh_key_for_host(host):
ssh_config = paramiko.SSHConfig()
user_config_file = os.path.expanduser("~/.ssh/config")
if os.path.exists(user_config_file):
with open(user_config_file) as f:
ssh_config.parse(f)
user_config = ssh_config.lookup(host)
if 'identityfile' in user_config:
path = os.path.expanduser(user_config['identityfile'][0])
if not os.path.exists(path):
raise Exception("Specified IdentityFile "+path
+ " for " + host + " in ~/.ssh/config not existing anymore.")
return path
def load_from_system_ssh_config(host):
import paramiko
paramiko_SSHClient_proxy_config = paramiko.SSHConfig()
merged_config = StringIO()
for conf_file in [os.path.join(os.path.sep, 'etc', 'ssh', 'ssh_config'), os.path.join(os.getenv('HOME','/tmp'), '.ssh', 'config')]:
try:
config = open(conf_file)
merged_config.write(config.read())
merged_config.write('\n')
config.close()
del config
except IOError as e:
pass
merged_config.seek(0,0)
paramiko_SSHClient_proxy_config.parse(merged_config)
del merged_config
return PluginSSHConfig.__rescurive_load_from_system_ssh_config(paramiko_SSHClient_proxy_config, host)
def get_config():
if not os.path.isfile(os.path.expanduser(SSH_CONF)):
return {}
with open(os.path.expanduser(SSH_CONF)) as f:
cfg = paramiko.SSHConfig()
cfg.parse(f)
ret_dict = {}
hostnames = cfg.get_hostnames()
for hostname in hostnames:
if ('?' in hostname) or ('*' in hostname):
continue
options = cfg.lookup(hostname)
# We want the ssh config to point to the real hostname, but we dont want to
# set ansible_ssh_host to the real name, but the ssh_config alias
if options['hostname'] == 'localhost':
options['hostname'] = hostname
ret_dict[hostname] = options
return ret_dict
def dummytest():
"""
Code posted on the github issue regarding the SSH Banner Error on the paramiko github.
https://github.com/paramiko/paramiko/issues/673
"""
import os
import paramiko
import logging
logging.basicConfig(level=logging.DEBUG)
# Loading ssh configuration to get the IP and user of the desired host (here 'bastion')
cfg = paramiko.SSHConfig()
with open(os.path.expanduser("~/.ssh/config")) as f:
cfg.parse(f)
host_cfg = cfg.lookup('bastion')
sock = paramiko.ProxyCommand("ssh {}@{} nc 10.8.9.160 22".format(host_cfg.get('user'), host_cfg.get('hostname')))
sock.settimeout(30)
# Sock stays open until a client tries to use it (or the program exits)
# Client Setup
client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# Connect and execute command
# The following line hangs for 15 seconds (increase with banner_timeout argument) and raises an SSHError
client.connect("10.8.9.160", username='root', sock=sock)
(stdin, stdout, stderr) = client.exec_command("echo 'Hello World !'")
for line in stdout.readlines():
print(line)
client.close()
def paramiko_connect(host):
client = paramiko.SSHClient()
client._policy = paramiko.WarningPolicy()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_config = paramiko.SSHConfig()
user_config_file = os.path.expanduser("~/.ssh/config")
try:
with open(user_config_file) as f:
ssh_config.parse(f)
except FileNotFoundError:
print("{} file could not be found. Aborting.".format(user_config_file))
sys.exit(1)
cfg = {'hostname': options['hostname'], 'username': options["username"]}
user_config = ssh_config.lookup(cfg['hostname'])
for k in ('hostname', 'username', 'port'):
if k in user_config:
cfg[k] = user_config[k]
if 'proxycommand' in user_config:
cfg['sock'] = paramiko.ProxyCommand(user_config['proxycommand'])
return client.connect(**cfg)
def _get_ssh_config(config_path='~/.ssh/config'):
"""Extract the configuration located at ``config_path``.
Returns:
paramiko.SSHConfig: the configuration instance.
"""
ssh_config = paramiko.SSHConfig()
try:
with open(os.path.realpath(os.path.expanduser(config_path))) as f:
ssh_config.parse(f)
except IOError:
pass
return ssh_config
def __init__(self):
super(_SSHClientBuilder, self).__init__()
self._hostname = None
self._port = 22
self._username = None
self._password = None
self._config = paramiko.SSHConfig()
self._client_class = paramiko.SSHClient
self._missing_host_key_policy = paramiko.AutoAddPolicy
self._timeout = DEFAULT_TIMEOUT
self._banner_timeout = DEFAULT_TIMEOUT
self._allow_agent = None
self._proxy_command = None
self._sock = None
def read_ssh_config():
ssh_config_file = os.path.join(
os.environ['HOME'],
'.ssh',
'config')
ssh_cfg = None
try:
with open(ssh_config_file) as ssh_cfg_fh:
ssh_cfg = SSHConfig()
ssh_cfg.parse(ssh_cfg_fh)
except Exception as err: # pylint: disable=broad-except
LOG.warning("Default SSH configuration could not be read: {}".format(err))
return ssh_cfg
def gethostconfig(self, file, host):
import paramiko
file = os.path.expanduser(file)
if not os.path.isfile(file):
return {}
sshconfig = paramiko.SSHConfig()
try:
sshconfig.parse(open(file))
except Exception as e:
raise BackendException("could not load '%s', maybe corrupt?" % (file))
return sshconfig.lookup(host)
def through(self, host, user='root'):
"""
Defines a proxy command to rebound on the host.
This method is actually unsafe to use and will cause an SSH Banner Error. This library can't work through a
proxy
:param string host: Either a valid name stored in ~/.ssh/config or a valid IP address.
:param string user: A valid user for the host. Default : 'root'.
:return: This instance. Allows to chain methods.
:rtype: CloudApp
"""
ssh_conf_file = os.path.expanduser("~/.ssh/config")
try:
ssh_conf = paramiko.SSHConfig()
with open(ssh_conf_file) as f:
ssh_conf.parse(f)
if host in ssh_conf.get_hostnames():
host_conf = ssh_conf.lookup(host)
user = host_conf.get('user', 'root')
host = host_conf.get('hostname')
else:
print("Could not find host {} in {}. Using raw hostname.".format(host, ssh_conf_file))
except FileNotFoundError as e:
print("Could not load {} : {}. Using raw hostname.".format(ssh_conf_file, e))
# "ssh -W %h:%p {}@{}" doesn't create an entry in the logs. "ssh {}@{} nc %h %p"
# Actually connects to name (causing an entry in the logs)
print("ssh {}@{} nc {} 22".format(user, host, self.instance.private_ip_address))
self.ssh_sock = paramiko.ProxyCommand("ssh {}@{} nc {} 22".format(user, host, self.instance.private_ip_address))
return self
def ssh_connection(self):
"""Reusable :obj:`paramiko.client.SSHClient`."""
if hasattr(self, "_ssh_connection"):
return self._ssh_connection
# TODO configurable
ssh_config_file = os.path.join(os.path.expanduser("~"), ".ssh", "config")
client = paramiko.SSHClient()
client._policy = paramiko.WarningPolicy()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_config = paramiko.SSHConfig()
if os.path.exists(ssh_config_file):
with open(ssh_config_file) as f:
ssh_config.parse(f)
parameters = {
"hostname": self.host,
"username": self.username,
"password": self.password,
"port": self.ssh_port,
}
user_config = ssh_config.lookup(self.host)
for k in ('hostname', 'username', 'port'):
if k in user_config:
parameters[k] = user_config[k]
if 'proxycommand' in user_config:
parameters['sock'] = paramiko.ProxyCommand(user_config['proxycommand'])
# TODO configurable
# if ssh_key_file:
# parameters['key_filename'] = ssh_key_file
if 'identityfile' in user_config:
parameters['key_filename'] = user_config['identityfile']
client.connect(**parameters)
self._ssh_connection = client
return client
def ssh(host, forward_agent=False, sudoable=False, max_attempts=1, max_timeout=5):
"""Manages a SSH connection to the desired host.
Will leverage your ssh config at ~/.ssh/config if available
:param host: the server to connect to
:type host: str
:param forward_agent: forward the local agents
:type forward_agent: bool
:param sudoable: allow sudo commands
:type sudoable: bool
:param max_attempts: the maximum attempts to connect to the desired host
:type max_attempts: int
:param max_timeout: the maximum timeout in seconds to sleep between attempts
:type max_timeout: int
:returns a SSH connection to the desired host
:rtype: Connection
:raises MaxConnectionAttemptsError: Exceeded the maximum attempts
to establish the SSH connection.
"""
with closing(SSHClient()) as client:
client.set_missing_host_key_policy(AutoAddPolicy())
cfg = {
"hostname": host,
"timeout": max_timeout,
}
ssh_config = SSHConfig()
user_config_file = os.path.expanduser("~/.ssh/config")
if os.path.exists(user_config_file):
with open(user_config_file) as f:
ssh_config.parse(f)
host_config = ssh_config.lookup(host)
if "user" in host_config:
cfg["username"] = host_config["user"]
if "proxycommand" in host_config:
cfg["sock"] = ProxyCommand(host_config["proxycommand"])
if "identityfile" in host_config:
cfg['key_filename'] = host_config['identityfile']
if "port" in host_config:
cfg["port"] = int(host_config["port"])
attempts = 0
while attempts < max_attempts:
try:
attempts += 1
client.connect(**cfg)
break
except socket.error:
if attempts < max_attempts:
time.sleep(max_timeout)
else:
raise MaxConnectionAttemptsError(
"Exceeded max attempts to connect to host: {0}".format(max_attempts)
)
yield Connection(client, forward_agent, sudoable)