def configure_client_socket(self):
self.log.info("SSHProtocol: ccs: configuring client socket")
host_key = paramiko.RSAKey(filename='test_rsa.key')
self.sourcet = paramiko.Transport(self.source)
try:
self.sourcet.load_server_moduli()
except:
self.log.error("SSHProtocol: Failed to load moduli -- gex " \
"will be unsupported.)")
raise
self.sourcet.add_server_key(host_key)
# Initialize the SSH connection to the server
# BUGFIX: client must init before the server start. A race condition
# was created between the server and the client. If the client can
# get credentials to SSHServer before client socket config
# this method there will be an error.
self.log.info("SSHProtocol: initializing client to server SSH")
self.init_ssh_client()
# Start up the SSH server.
server = SSHServer(self)
try:
self.sourcet.start_server(server=server)
except paramiko.SSHException, x:
self.log.error("SSHProtocol: SSH negotiation failed.")
return
# wait for auth
chan = self.sourcet.accept(20)
if chan is None:
self.log.error("*** No channel.")
sys.exit(1)
self.log.info("SSHProtocol: User authenticated!")
server.event.wait(10)
if not server.event.isSet():
self.log.warn("SSHProtocol: client never asked for a anything.")
return
chan.send('\r\n\r\nThings just got real.\r\n\r\n')
self.sourceschan = chan
self.log.debug("SSHProtocol: finished configuring client socket")
self.destschan = self.destt.open_session()
self.destschan.get_pty()
self.destschan.invoke_shell()
# chan = t.open_session()
# chan.get_pty()
# chan.invoke_shell()
#chan.close()
#self.sourcet.close()
python类RSAKey()的实例源码
def get_user_auth_keys (self, username):
"""Parse the users's authorized_keys file if any to look for authorized keys"""
if username in self.users_keys:
return self.users_keys[username]
self.users_keys[username] = []
userdir = os.path.expanduser("~" + username)
if not userdir:
return self.users_keys[username]
keyfile = os.path.join(userdir, ".ssh/authorized_keys")
if not keyfile or not os.path.exists(keyfile):
return self.users_keys[username]
with open(keyfile) as f:
for line in f.readlines():
line = line.strip()
if not line or line.startswith("#"):
continue
values = [ x.strip() for x in line.split() ]
exp = None
try:
int(values[0]) # bits value?
except ValueError:
# Type 1 or type 2, type 1 is bits in second value
options_ktype = values[0]
try:
int(values[1]) # bits value?
except ValueError:
# type 2 with options
ktype = options_ktype
data = values[1]
else:
# Type 1 no options.
exp = int(values[1])
data = values[2]
else:
# Type 1 no options.
exp = int(values[1])
data = values[2]
# XXX For now skip type 1 keys
if exp is not None:
continue
if data:
import base64
if ktype == "ssh-rsa":
key = ssh.RSAKey(data=base64.decodebytes(data.encode('ascii')))
elif ktype == "ssh-dss":
key = ssh.DSSKey(data=base64.decodebytes(data.encode('ascii')))
else:
key = None
if key:
self.users_keys[username].append(key)
return self.users_keys[username]
def _test_connection(self, **kwargs):
"""
(Most) kwargs get passed directly into SSHClient.connect().
The exception is ``allowed_keys`` which is stripped and handed to the
``NullServer`` used for testing.
"""
run_kwargs = {'allowed_keys': kwargs.pop('allowed_keys', None)}
# Server setup
threading.Thread(target=self._run, kwargs=run_kwargs).start()
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
# Client setup
self.tc = paramiko.SSHClient()
self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
# Actual connection
self.tc.connect(self.addr, self.port, username='slowdive', **kwargs)
# Authentication successful?
self.event.wait(1.0)
self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
# Command execution functions?
stdin, stdout, stderr = self.tc.exec_command('yes')
schan = self.ts.accept(1.0)
schan.send('Hello there.\n')
schan.send_stderr('This is on stderr.\n')
schan.close()
self.assertEqual('Hello there.\n', stdout.readline())
self.assertEqual('', stdout.readline())
self.assertEqual('This is on stderr.\n', stderr.readline())
self.assertEqual('', stderr.readline())
# Cleanup
stdin.close()
stdout.close()
stderr.close()
def add_server_key(self, key):
"""
Add a host key to the list of keys used for server mode. When behaving
as a server, the host key is used to sign certain packets during the
SSH2 negotiation, so that the client can trust that we are who we say
we are. Because this is used for signing, the key must contain private
key info, not just the public half. Only one key of each type (RSA or
DSS) is kept.
@param key: the host key to add, usually an L{RSAKey <rsakey.RSAKey>} or
L{DSSKey <dsskey.DSSKey>}.
@type key: L{PKey <pkey.PKey>}
"""
self.server_key_dict[key.get_name()] = key