def from_string(cls, key_string):
try:
pkey = paramiko.RSAKey(file_obj=StringIO(key_string))
return pkey
except paramiko.SSHException:
try:
pkey = paramiko.DSSKey(file_obj=StringIO(key_string))
return pkey
except paramiko.SSHException:
return None
python类DSSKey()的实例源码
def ssh_key_gen(length=2048, type='rsa', password=None, username='jumpserver', hostname=None):
"""Generate user ssh private and public key
Use paramiko RSAKey generate it.
:return private key str and public key str
"""
if hostname is None:
hostname = os.uname()[1]
f = StringIO()
try:
if type == 'rsa':
private_key_obj = paramiko.RSAKey.generate(length)
elif type == 'dsa':
private_key_obj = paramiko.DSSKey.generate(length)
else:
raise IOError('SSH private key must be `rsa` or `dsa`')
private_key_obj.write_private_key(f, password=password)
private_key = f.getvalue()
public_key = ssh_pubkey_gen(private_key_obj, username=username, hostname=hostname)
return private_key, public_key
except IOError:
raise IOError('These is error when generate ssh key.')
def test_4_dict_set(self):
hostdict = paramiko.HostKeys('hostfile.temp')
key = paramiko.RSAKey(data=decodebytes(keyblob))
key_dss = paramiko.DSSKey(data=decodebytes(keyblob_dss))
hostdict['secure.example.com'] = {
'ssh-rsa': key,
'ssh-dss': key_dss
}
hostdict['fake.example.com'] = {}
hostdict['fake.example.com']['ssh-rsa'] = key
self.assertEqual(3, len(hostdict))
self.assertEqual(2, len(list(hostdict.values())[0]))
self.assertEqual(1, len(list(hostdict.values())[1]))
self.assertEqual(1, len(list(hostdict.values())[2]))
fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
self.assertEqual(b'7EC91BB336CB6D810B124B1353C32396', fp)
fp = hexlify(hostdict['secure.example.com']['ssh-dss'].get_fingerprint()).upper()
self.assertEqual(b'4478F0B9A23CC5182009FF755BC1D26C', fp)
def ssh_key_string_to_obj(text):
key_f = StringIO(text)
key = None
try:
key = paramiko.RSAKey.from_private_key(key_f)
except paramiko.SSHException:
pass
try:
key = paramiko.DSSKey.from_private_key(key_f)
except paramiko.SSHException:
pass
return key
def handle(self):
transport = paramiko.Transport(self.request)
rsafile = self.server.cfg.get("ssh", "private_rsa")
dsafile = self.server.cfg.get("ssh", "private_dsa")
rsakey = paramiko.RSAKey(filename=rsafile)
dsakey = paramiko.DSSKey(filename=dsafile)
transport.add_server_key(rsakey)
transport.add_server_key(dsakey)
transport.local_version = self.server.cfg.get("ssh", "banner")
transport.set_subsystem_handler('sftp', paramiko.SFTPServer,
sftpServer.sftp_server)
nw = network.network(self.client_address[0],
self.server.cfg.get("wetland", "docker_addr"))
nw.create()
sServer = sshServer.ssh_server(transport=transport, network=nw)
try:
transport.start_server(server=sServer)
except paramiko.SSHException:
return
except Exception as e:
print e
nw.delete()
sServer.docker_trans.close()
return
try:
while True:
chann = transport.accept(60)
# no channel left
if not transport._channels.values():
break
except Exception as e:
print e
finally:
nw.delete()
sServer.docker_trans.close()
def get_private_key(self, password=None):
if self.private_key_file is not None:
with open(self.private_key_file, "r") as f:
key_head = f.readline()
if 'DSA' in key_head:
key_type = paramiko.DSSKey
elif 'RSA' in key_head:
key_type = paramiko.RSAKey
else:
raise ValueError("can't identify private key type")
with open(self.private_key_file, "r") as f:
return key_type.from_private_key(f, password=password)
def get_user_auth_keys (self, username):
"""Parse the users's authorized_keys file if any to look for authorized keys"""
if username in self.users_keys:
return self.users_keys[username]
self.users_keys[username] = []
userdir = os.path.expanduser("~" + username)
if not userdir:
return self.users_keys[username]
keyfile = os.path.join(userdir, ".ssh/authorized_keys")
if not keyfile or not os.path.exists(keyfile):
return self.users_keys[username]
with open(keyfile) as f:
for line in f.readlines():
line = line.strip()
if not line or line.startswith("#"):
continue
values = [ x.strip() for x in line.split() ]
exp = None
try:
int(values[0]) # bits value?
except ValueError:
# Type 1 or type 2, type 1 is bits in second value
options_ktype = values[0]
try:
int(values[1]) # bits value?
except ValueError:
# type 2 with options
ktype = options_ktype
data = values[1]
else:
# Type 1 no options.
exp = int(values[1])
data = values[2]
else:
# Type 1 no options.
exp = int(values[1])
data = values[2]
# XXX For now skip type 1 keys
if exp is not None:
continue
if data:
import base64
if ktype == "ssh-rsa":
key = ssh.RSAKey(data=base64.decodebytes(data.encode('ascii')))
elif ktype == "ssh-dss":
key = ssh.DSSKey(data=base64.decodebytes(data.encode('ascii')))
else:
key = None
if key:
self.users_keys[username].append(key)
return self.users_keys[username]
def add_server_key(self, key):
"""
Add a host key to the list of keys used for server mode. When behaving
as a server, the host key is used to sign certain packets during the
SSH2 negotiation, so that the client can trust that we are who we say
we are. Because this is used for signing, the key must contain private
key info, not just the public half. Only one key of each type (RSA or
DSS) is kept.
@param key: the host key to add, usually an L{RSAKey <rsakey.RSAKey>} or
L{DSSKey <dsskey.DSSKey>}.
@type key: L{PKey <pkey.PKey>}
"""
self.server_key_dict[key.get_name()] = key