def from_string(cls, key_string):
    """Build a paramiko key object from private-key text.

    RSA is tried first, then DSS. ``cls`` is not used by the body.

    :param key_string: text of the private key
    :return: the first key object constructed without ``SSHException``,
        or ``None`` when both key types are rejected
    """
    for key_class in (paramiko.RSAKey, paramiko.DSSKey):
        try:
            # Each attempt gets its own buffer so it reads from the start.
            return key_class(file_obj=StringIO(key_string))
        except paramiko.SSHException:
            continue
    return None
python类RSAKey()的实例源码
def ssh_key_gen(length=2048, type='rsa', password=None, username='jumpserver', hostname=None):
    """Generate an SSH private/public key pair with paramiko.

    :param length: key size handed to paramiko's ``generate``
    :param type: ``'rsa'`` or ``'dsa'``
    :param password: optional passphrase used when writing the private key
    :param username: forwarded to ``ssh_pubkey_gen``
    :param hostname: forwarded to ``ssh_pubkey_gen``; defaults to
        ``os.uname()[1]`` when ``None``
    :return: ``(private_key, public_key)`` where the private key is the text
        produced by ``write_private_key`` and the public key is whatever
        ``ssh_pubkey_gen`` returns
    :raises IOError: for an unsupported ``type`` or when generation fails
    """
    # Validate up front: raised inside the try block below, this error would
    # be caught and replaced by the generic message, hiding the real cause.
    if type not in ('rsa', 'dsa'):
        raise IOError('SSH private key must be `rsa` or `dsa`')

    if hostname is None:
        hostname = os.uname()[1]

    f = StringIO()
    try:
        if type == 'rsa':
            private_key_obj = paramiko.RSAKey.generate(length)
        else:
            private_key_obj = paramiko.DSSKey.generate(length)
        private_key_obj.write_private_key(f, password=password)
        private_key = f.getvalue()
        public_key = ssh_pubkey_gen(private_key_obj, username=username, hostname=hostname)
        return private_key, public_key
    except IOError:
        raise IOError('These is error when generate ssh key.')
def test_4_auto_add_policy(self):
    """
    Check that SSHClient's AutoAddPolicy stores the server key on connect.
    """
    server_thread = threading.Thread(target=self._run)
    server_thread.start()

    private_host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
    expected_key = paramiko.RSAKey(data=private_host_key.asbytes())
    host_id = '[%s]:%d' % (self.addr, self.port)

    self.tc = paramiko.SSHClient()
    self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # Nothing is known before the first connection.
    self.assertEqual(0, len(self.tc.get_host_keys()))
    self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')

    self.event.wait(1.0)
    self.assertTrue(self.event.is_set())
    self.assertTrue(self.ts.is_active())
    self.assertEqual('slowdive', self.ts.get_username())
    self.assertEqual(True, self.ts.is_authenticated())

    # Exactly one entry was added, and it is the server's RSA key.
    known_hosts = self.tc.get_host_keys()
    self.assertEqual(1, len(known_hosts))
    stored_key = self.tc.get_host_keys()[host_id]['ssh-rsa']
    self.assertEqual(expected_key, stored_key)
def test_7_banner_timeout(self):
    """
    Check that SSHClient.connect honours its banner_timeout argument.
    """
    # The server side is started with delay=1.
    server_thread = threading.Thread(target=self._run, kwargs={'delay': 1})
    server_thread.start()

    private_host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
    expected_key = paramiko.RSAKey(data=private_host_key.asbytes())
    host_id = '[%s]:%d' % (self.addr, self.port)

    self.tc = paramiko.SSHClient()
    self.tc.get_host_keys().add(host_id, 'ssh-rsa', expected_key)

    # A 0.5 banner timeout is shorter than the server delay, so connecting
    # is expected to raise SSHException.
    with self.assertRaises(paramiko.SSHException):
        self.tc.connect(
            self.addr,
            self.port,
            username='slowdive',
            password='pygmalion',
            banner_timeout=0.5,
        )
def test_4_dict_set(self):
    """Check dict-style assignment of host entries on a HostKeys object."""
    hostdict = paramiko.HostKeys('hostfile.temp')
    rsa_key = paramiko.RSAKey(data=decodebytes(keyblob))
    dss_key = paramiko.DSSKey(data=decodebytes(keyblob_dss))

    # Assign a whole entry at once, then build another one key by key.
    hostdict['secure.example.com'] = {'ssh-rsa': rsa_key, 'ssh-dss': dss_key}
    hostdict['fake.example.com'] = {}
    hostdict['fake.example.com']['ssh-rsa'] = rsa_key

    self.assertEqual(3, len(hostdict))
    entries = list(hostdict.values())
    self.assertEqual(2, len(entries[0]))
    self.assertEqual(1, len(entries[1]))
    self.assertEqual(1, len(entries[2]))

    secure_entry = hostdict['secure.example.com']
    rsa_fingerprint = hexlify(secure_entry['ssh-rsa'].get_fingerprint()).upper()
    self.assertEqual(b'7EC91BB336CB6D810B124B1353C32396', rsa_fingerprint)
    secure_entry = hostdict['secure.example.com']
    dss_fingerprint = hexlify(secure_entry['ssh-dss'].get_fingerprint()).upper()
    self.assertEqual(b'4478F0B9A23CC5182009FF755BC1D26C', dss_fingerprint)
def validate_key(key_path):
    """ Load the RSA private key stored at a path

        :param key_path: path to a key to use for authentication; ``~`` is expanded
        :type key_path: str

        :return: key object used for authentication, or ``False`` when the
            expanded path is not an existing file
        :rtype: paramiko.RSAKey | bool
    """
    expanded_path = os.path.expanduser(key_path)
    if os.path.isfile(expanded_path):
        return paramiko.RSAKey.from_private_key_file(expanded_path)
    return False
def generate_key(bits):
    """Generate an RSA key with ``RSA.generate`` and wrap it in a paramiko RSAKey.

    :param bits: key size passed straight to ``RSA.generate``
    :return: a ``paramiko.RSAKey`` built from the public values ``(e, n)``
        with ``d``, ``p`` and ``q`` copied onto it
    """
    # NOTE(dims): pycryptodome has changed the signature of the RSA.generate
    # call. specifically progress_func has been dropped. paramiko still uses
    # pycrypto. However some projects like latest pysaml2 have switched from
    # pycrypto to pycryptodome as pycrypto seems to have been abandoned.
    # paramiko project has started transition to pycryptodome as well but
    # there is no release yet with that support. So at the moment depending on
    # which version of pysaml2 is installed, Nova is likely to break. So we
    # call "RSA.generate(bits)" which works on both pycrypto and pycryptodome
    # and then wrap it into a paramiko.RSAKey
    generated = RSA.generate(bits)
    public_values = (generated.e, generated.n)
    wrapped = paramiko.RSAKey(vals=public_values)
    # Copy the private components onto the paramiko object.
    for component in ('d', 'p', 'q'):
        setattr(wrapped, component, getattr(generated, component))
    return wrapped
def setup_travis():
    """Create an SSH key pair and authorize it when running as user ``travis``.

    Does nothing beyond configuring logging and printing a banner unless the
    current user (``$USER`` if set, otherwise ``getpass.getuser()``) is
    ``travis``. When ``$HOME/.ssh/id_rsa`` already exists it is left alone.
    Otherwise a 1024-bit RSA key is generated, stored in the module-level
    ``private_key``, written to ``id_rsa``, and its public part appended to
    ``authorized_keys``.

    :raises OSError: when the ``.ssh`` directory cannot be created
    """
    import errno
    import getpass
    import sys
    global private_key  # pylint: disable=W0603
    logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)
    print("Setup called.")

    if 'USER' in os.environ:
        current_user = os.environ['USER']
    else:
        current_user = getpass.getuser()
    if current_user != "travis":
        return

    print("Executing under Travis-CI")
    ssh_dir = "{}/.ssh".format(os.environ['HOME'])
    priv_filename = os.path.join(ssh_dir, "id_rsa")
    if os.path.exists(priv_filename):
        logger.error("Found private keyfile")
        print("Found private keyfile")
        return

    logger.error("Creating ssh dir " + ssh_dir)
    print("Creating ssh dir " + ssh_dir)
    # Create the directory directly instead of via a shell "mkdir -p": no
    # quoting problems with unusual paths, and a failure is reported here
    # rather than ignored.
    try:
        os.makedirs(ssh_dir)
    except OSError as exc:
        # An already existing directory is fine, matching "mkdir -p".
        if exc.errno != errno.EEXIST or not os.path.isdir(ssh_dir):
            raise

    priv = ssh.RSAKey.generate(bits=1024)
    private_key = priv
    logger.error("Generating private keyfile " + priv_filename)
    print("Generating private keyfile " + priv_filename)
    priv.write_private_key_file(filename=priv_filename)

    # Re-read the key from disk to obtain the public line to authorize.
    pub = ssh.RSAKey(filename=priv_filename)
    auth_filename = os.path.join(ssh_dir, "authorized_keys")
    logger.error("Adding keys to authorized_keys file " + auth_filename)
    print("Adding keys to authorized_keys file " + auth_filename)
    with open(auth_filename, "a") as authfile:
        authfile.write("{} {}\n".format(pub.get_name(), pub.get_base64()))
    logger.error("Done generating keys")
    print("Done generating keys")
def ssh_key_string_to_obj(text):
    """Parse private-key text into a paramiko key object.

    RSA is tried first, then DSS.

    :param text: private key text
    :return: the parsed key, or ``None`` when neither parser accepts the text
    """
    for key_class in (paramiko.RSAKey, paramiko.DSSKey):
        try:
            # Use a fresh buffer for every attempt: a shared one is already
            # at end-of-file after the first parser has read it, so the
            # second parser would never see the key data.
            return key_class.from_private_key(StringIO(text))
        except paramiko.SSHException:
            continue
    return None
def get_host_key(cls):
    """Return the RSA host key stored at ``cls.host_key_path``.

    Calls ``cls.host_key_gen()`` first when that path is not an existing file.
    """
    logger.debug("Get ssh server host key")
    key_file_present = os.path.isfile(cls.host_key_path)
    if not key_file_present:
        cls.host_key_gen()
    host_key = paramiko.RSAKey(filename=cls.host_key_path)
    return host_key
def __init__(self, server, client_conn):
    """Wrap an accepted client connection in a paramiko Transport.

    :param server: stored on the instance as ``self.server``
    :param client_conn: two-item sequence whose first item is passed to
        ``paramiko.Transport``; the second item is ignored
    """
    self.server = server
    self.thread = None
    self.command_queues = {}

    client_sock, _ignored = client_conn
    transport = paramiko.Transport(client_sock)
    self.transport = transport

    # Register the server key and the SFTP subsystem handler.
    server_key = paramiko.RSAKey(filename=SERVER_KEY_PATH)
    transport.add_server_key(server_key)
    transport.set_subsystem_handler("sftp", sftp.SFTPServer)
def add_user(self, uid, private_key_path):
    """Register ``uid`` with the RSA key loaded from ``private_key_path``.

    Stores ``(private_key_path, key)`` under ``uid`` in ``self._users``.
    """
    loaded_key = paramiko.RSAKey.from_private_key_file(private_key_path)
    user_record = (private_key_path, loaded_key)
    self._users[uid] = user_record
def client(self, uid):
    """Return an SSHClient connected to this server as ``uid``.

    The key at ``SERVER_KEY_PATH`` is registered as the known host key and
    unknown hosts are rejected. Authentication uses only the key file
    recorded for ``uid`` (no agent, no key discovery).
    """
    private_key_path, _unused_key = self._users[uid]

    ssh_client = paramiko.SSHClient()
    known_hosts = ssh_client.get_host_keys()
    server_key = paramiko.RSAKey.from_private_key_file(SERVER_KEY_PATH)
    # Register the key under both the bare host and the "[host]:port" form.
    for host_id in (self.host, "[%s]:%d" % (self.host, self.port)):
        known_hosts.add(host_id, "ssh-rsa", server_key)
    ssh_client.set_missing_host_key_policy(paramiko.RejectPolicy())

    connect_options = dict(
        hostname=self.host,
        port=self.port,
        username=uid,
        key_filename=private_key_path,
        allow_agent=False,
        look_for_keys=False,
    )
    ssh_client.connect(**connect_options)
    return ssh_client
def generate_keys():
    """Generate a 1024-bit RSA key pair.

    :return: dict with ``'private'`` (text written by ``write_private_key``)
        and ``'public'`` (the key's base64 form)
    """
    rsa_key = paramiko.RSAKey.generate(1024)

    buf = StringIO.StringIO()
    rsa_key.write_private_key(buf)
    private_text = buf.getvalue()
    buf.close()

    return {'private': private_text, 'public': rsa_key.get_base64()}
def load_keyfile(file_path):
    """Read an RSA private key file.

    :param file_path: path of the private key file
    :return: dict with ``'private'`` (the file's text) and ``'public'``
        (base64 form of the parsed key)
    """
    with open(file_path, 'r') as key_handle:
        private_text = key_handle.read()
    parsed_key = paramiko.RSAKey(file_obj=StringIO.StringIO(private_text))
    return {'private': private_text, 'public': parsed_key.get_base64()}
def dump_keyfile(file_path, key):
    """Write the private key held in ``key['private']`` to ``file_path``.

    :param file_path: destination path of the private key file
    :param key: mapping whose ``'private'`` entry is RSA private-key text
    """
    rsa_key = paramiko.RSAKey(file_obj=StringIO.StringIO(key['private']))
    rsa_key.write_private_key_file(file_path)
    # Keep the private key readable by its owner only; group/world-readable
    # private keys expose the secret and are rejected by OpenSSH clients.
    os.chmod(file_path, 0o600)
def _run(self, allowed_keys=None, delay=0):
    """Accept one connection on ``self.sockl`` and start an SSH server on it.

    :param allowed_keys: passed to ``NullServer``; defaults to
        ``FINGERPRINTS.keys()`` when ``None``
    :param delay: when truthy, sleep this long before starting the server
    """
    permitted_keys = FINGERPRINTS.keys() if allowed_keys is None else allowed_keys

    self.socks, _peer_addr = self.sockl.accept()
    self.ts = paramiko.Transport(self.socks)
    self.ts.add_server_key(
        paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
    )

    null_server = NullServer(allowed_keys=permitted_keys)
    if delay:
        time.sleep(delay)
    self.ts.start_server(self.event, null_server)
def test_5_save_host_keys(self):
    """
    verify that SSHClient correctly saves a known_hosts file.
    """
    warnings.filterwarnings('ignore', 'tempnam.*')

    host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
    public_host_key = paramiko.RSAKey(data=host_key.asbytes())
    fd, localname = mkstemp()
    os.close(fd)

    # Remove the temporary file even when an assertion below fails.
    try:
        client = paramiko.SSHClient()
        self.assertEqual(0, len(client.get_host_keys()))

        host_id = '[%s]:%d' % (self.addr, self.port)

        client.get_host_keys().add(host_id, 'ssh-rsa', public_host_key)
        self.assertEqual(1, len(client.get_host_keys()))
        self.assertEqual(public_host_key, client.get_host_keys()[host_id]['ssh-rsa'])

        client.save_host_keys(localname)

        # assertIn, unlike a bare assert, still runs under "python -O".
        with open(localname) as saved_file:
            self.assertIn(host_id, saved_file.read())
    finally:
        os.unlink(localname)
def test_6_cleanup(self):
    """
    Check that once an SSHClient is closed and dropped, the packetizer of
    its transport is garbage collected.
    """
    # Unclear why this is borked on Py3, but it is, and does not seem worth
    # pursuing at the moment.
    # XXX: It's the release of the references to e.g packetizer that fails
    # in py3...
    if not PY2:
        return
    import gc

    server_thread = threading.Thread(target=self._run)
    server_thread.start()
    private_host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
    public_host_key = paramiko.RSAKey(data=private_host_key.asbytes())

    self.tc = paramiko.SSHClient()
    self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    self.assertEqual(0, len(self.tc.get_host_keys()))
    self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')

    self.event.wait(1.0)
    self.assertTrue(self.event.is_set())
    self.assertTrue(self.ts.is_active())

    # Hold only a weak reference so the test itself does not keep the
    # packetizer alive.
    packetizer_ref = weakref.ref(self.tc._transport.packetizer)
    self.assertTrue(packetizer_ref() is not None)
    self.tc.close()
    del self.tc

    # hrm, sometimes p isn't cleared right away.  why is that?
    #st = time.time()
    #while (time.time() - st < 5.0) and (p() is not None):
    #    time.sleep(0.1)

    # instead of dumbly waiting for the GC to collect, force a collection
    # to see whether the SSHClient object is deallocated correctly
    gc.collect()

    self.assertTrue(packetizer_ref() is None)
def _run(self):
    """Accept one connection on ``self.sockl`` and start an SSH server on it."""
    self.socks, _peer_addr = self.sockl.accept()
    self.ts = paramiko.Transport(self.socks)
    self.ts.add_server_key(
        paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
    )
    null_server = NullServer()
    self.ts.start_server(self.event, null_server)
def test_1_gss_auth(self):
    """
    Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication
    (gssapi-with-mic) in client and server mode.
    """
    host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
    public_host_key = paramiko.RSAKey(data=host_key.asbytes())

    self.tc = paramiko.SSHClient()
    self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port),
                                'ssh-rsa', public_host_key)
    self.tc.connect(self.hostname, self.port, username=self.username,
                    gss_auth=True)

    # assertTrue/assertEqual replace the deprecated assert_/assertEquals
    # aliases, which newer unittest versions no longer provide.
    self.event.wait(1.0)
    self.assertTrue(self.event.is_set())
    self.assertTrue(self.ts.is_active())
    self.assertEqual(self.username, self.ts.get_username())
    self.assertEqual(True, self.ts.is_authenticated())

    stdin, stdout, stderr = self.tc.exec_command('yes')
    schan = self.ts.accept(1.0)

    # The server side writes one line on each stream and closes the channel.
    schan.send('Hello there.\n')
    schan.send_stderr('This is on stderr.\n')
    schan.close()

    self.assertEqual('Hello there.\n', stdout.readline())
    self.assertEqual('', stdout.readline())
    self.assertEqual('This is on stderr.\n', stderr.readline())
    self.assertEqual('', stderr.readline())

    stdin.close()
    stdout.close()
    stderr.close()
def test_2_add(self):
    """Check that a hashed-hostname entry added to HostKeys can be looked up."""
    hostdict = paramiko.HostKeys('hostfile.temp')
    hashed_host = '|1|BMsIC6cUIP2zBuXR3t2LRcJYjzM=|hpkJMysjTk/+zzUUzxQEa2ieq6c='
    rsa_key = paramiko.RSAKey(data=decodebytes(keyblob))

    hostdict.add(hashed_host, 'ssh-rsa', rsa_key)
    entry_count = len(list(hostdict))
    self.assertEqual(3, entry_count)

    # The entry added under the hashed name is found via 'foo.example.com'.
    entry = hostdict['foo.example.com']
    fingerprint = hexlify(entry['ssh-rsa'].get_fingerprint()).upper()
    self.assertEqual(b'7EC91BB336CB6D810B124B1353C32396', fingerprint)
    self.assertTrue(hostdict.check('foo.example.com', rsa_key))
def test_1_gsskex_and_auth(self):
    """
    Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated
    Diffie-Hellman Key Exchange and user authentication with the GSS-API
    context created during key exchange.
    """
    host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
    public_host_key = paramiko.RSAKey(data=host_key.asbytes())

    self.tc = paramiko.SSHClient()
    self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port),
                                'ssh-rsa', public_host_key)
    self.tc.connect(self.hostname, self.port, username=self.username,
                    gss_auth=True, gss_kex=True)

    # assertTrue/assertEqual replace the deprecated assert_/assertEquals
    # aliases, which newer unittest versions no longer provide.
    self.event.wait(1.0)
    self.assertTrue(self.event.is_set())
    self.assertTrue(self.ts.is_active())
    self.assertEqual(self.username, self.ts.get_username())
    self.assertEqual(True, self.ts.is_authenticated())

    stdin, stdout, stderr = self.tc.exec_command('yes')
    schan = self.ts.accept(1.0)

    # The server side writes one line on each stream and closes the channel.
    schan.send('Hello there.\n')
    schan.send_stderr('This is on stderr.\n')
    schan.close()

    self.assertEqual('Hello there.\n', stdout.readline())
    self.assertEqual('', stdout.readline())
    self.assertEqual('This is on stderr.\n', stderr.readline())
    self.assertEqual('', stderr.readline())

    stdin.close()
    stdout.close()
    stderr.close()
def handle(self):
    """Serve one incoming connection as an SSH server session.

    Builds a paramiko Transport on ``self.request``, loads the RSA and DSA
    server keys and the banner from the ``ssh`` config section, registers
    the SFTP subsystem, creates the network object for the client address,
    and then accepts channels until the transport has none left. The
    network and the docker transport are released on every exit path.
    """
    transport = paramiko.Transport(self.request)

    rsafile = self.server.cfg.get("ssh", "private_rsa")
    dsafile = self.server.cfg.get("ssh", "private_dsa")
    rsakey = paramiko.RSAKey(filename=rsafile)
    dsakey = paramiko.DSSKey(filename=dsafile)
    transport.add_server_key(rsakey)
    transport.add_server_key(dsakey)

    transport.local_version = self.server.cfg.get("ssh", "banner")

    transport.set_subsystem_handler('sftp', paramiko.SFTPServer,
                                    sftpServer.sftp_server)

    nw = network.network(self.client_address[0],
                         self.server.cfg.get("wetland", "docker_addr"))
    nw.create()

    sServer = sshServer.ssh_server(transport=transport, network=nw)

    # A single finally guarantees cleanup on every path, including a failed
    # SSH negotiation, which would otherwise leave the network in place.
    try:
        try:
            transport.start_server(server=sServer)
        except paramiko.SSHException:
            return
        except Exception as e:
            print(e)
            return

        try:
            while True:
                transport.accept(60)
                # no channel left
                if not transport._channels.values():
                    break
        except Exception as e:
            print(e)
    finally:
        nw.delete()
        sServer.docker_trans.close()
def get_transport(host, username, key):
    """ Create a transport object

        When ``host`` is the master IP the transport targets it directly.
        Otherwise a transport to the master is started and authenticated,
        and the returned transport runs over a ``direct-tcpip`` channel
        from the master to ``host`` port 22.

        :param host: the hostname to connect to
        :type host: str
        :param username: SSH username
        :type username: str
        :param key: key object used for authentication
        :type key: paramiko.RSAKey

        :return: a transport object, or ``False`` when authenticating to the
            master or opening the channel fails
        :rtype: paramiko.Transport | bool
    """
    master_ip = shakedown.master_ip()

    if host == master_ip:
        return paramiko.Transport(host)

    transport_master = paramiko.Transport(master_ip)
    transport_master = start_transport(transport_master, username, key)

    if not transport_master.is_authenticated():
        # Report the key that was passed in; no ``key_path`` name exists in
        # this function, so referencing it would raise NameError here.
        print("error: unable to authenticate {}@{} with key {}".format(username, master_ip, key))
        return False

    try:
        channel = transport_master.open_channel('direct-tcpip', (host, 22), ('127.0.0.1', 0))
    except paramiko.SSHException:
        print("error: unable to connect to {}".format(host))
        return False

    return paramiko.Transport(channel)
def start_transport(transport, username, key):
    """ Start a transport in client mode and authenticate it with a public key

        The supplied ``key`` (when truthy) is tried first, followed by the
        keys reported by the SSH agent; the first key accepted ends the search.

        :param transport: the transport object to start
        :type transport: paramiko.Transport
        :param username: SSH username
        :type username: str
        :param key: key object used for authentication
        :type key: paramiko.RSAKey

        :return: the transport object passed
        :rtype: paramiko.Transport
        :raises ValueError: when every candidate key is rejected
    """
    transport.start_client()

    agent = paramiko.agent.Agent()
    supplied = (key,) if key else ()
    candidates = itertools.chain(supplied, agent.get_keys())

    authenticated = False
    for candidate in candidates:
        try:
            transport.auth_publickey(username, candidate)
        except paramiko.AuthenticationException:
            # Rejected key: move on to the next candidate.
            continue
        authenticated = True
        break

    if not authenticated:
        raise ValueError('No valid key supplied')

    return transport
# SSH connection will be auto-terminated at the conclusion of this operation, causing
# a race condition; the try/except block attempts to close the channel and/or transport
# but does not issue a failure if it has already been closed.
def get_private_key(self, password=None):
    """Load the private key named by ``self.private_key_file``.

    The key type is chosen from the first line of the file: a line
    containing ``DSA`` selects ``paramiko.DSSKey``, otherwise one containing
    ``RSA`` selects ``paramiko.RSAKey``.

    :param password: passed to ``from_private_key``
    :return: the loaded key, or ``None`` when no key file is configured
    :raises ValueError: when the first line names neither key type
    """
    if self.private_key_file is None:
        return None

    with open(self.private_key_file, "r") as header_file:
        first_line = header_file.readline()

    if 'DSA' in first_line:
        key_class = paramiko.DSSKey
    elif 'RSA' in first_line:
        key_class = paramiko.RSAKey
    else:
        raise ValueError("can't identify private key type")

    # Reopen so the parser reads the file from its beginning.
    with open(self.private_key_file, "r") as key_file:
        return key_class.from_private_key(key_file, password=password)
def load_remote_rsa_key(self, remote_filename):
    """
    Returns paramiko.RSAKey object for an RSA key located on the remote
    machine

    The remote file is opened with ``self.remote_file`` in read mode and is
    closed again even when parsing the key raises.
    """
    rfile = self.remote_file(remote_filename, 'r')
    try:
        return get_rsa_key(key_file_obj=rfile)
    finally:
        rfile.close()
def get_rsa_key(key_location=None, key_file_obj=None, passphrase=None,
                use_pycrypto=False):
    """Load an RSA private key from a path or an open file object.

    :param key_location: path of the key file; used when ``key_file_obj``
        is not given
    :param key_file_obj: already-open file object holding the key; it is
        left open for the caller
    :param passphrase: passphrase for an encrypted key
    :param use_pycrypto: when true, load with ``RSA.importKey`` instead of
        ``paramiko.RSAKey.from_private_key``
    :return: the loaded key object
    :raises exception.SSHError: when the loader raises ``SSHException`` or
        ``ValueError``
    """
    key_fobj = key_file_obj or open(key_location)
    # Only a file opened here is closed here; a caller-supplied object
    # remains the caller's responsibility.
    opened_here = key_fobj is not key_file_obj
    try:
        if use_pycrypto:
            key = RSA.importKey(key_fobj, passphrase=passphrase)
        else:
            key = paramiko.RSAKey.from_private_key(key_fobj,
                                                   password=passphrase)
        return key
    except (paramiko.SSHException, ValueError):
        raise exception.SSHError(
            "Invalid RSA private key file or missing passphrase: %s" %
            key_location)
    finally:
        if opened_here:
            key_fobj.close()
def generate_rsa_key():
    """Return a newly generated 2048-bit paramiko RSA key."""
    key_bits = 2048
    return paramiko.RSAKey.generate(key_bits)