def check_auth_publickey(self, username, key):
    """Authenticate a client's public key against the upstream's authorized keys.

    The upstream for ``username`` is looked up, a connection to it is
    opened, and the file named by ``upstream.upstream_authorized_keys`` is
    read over SFTP.  The client is accepted only when an entry of that file
    begins with the key type and base64 blob reported by ``key``.

    Args:
        username: Login name sent by the client; used to find the upstream.
        key: Client key; must provide ``get_name()`` and ``get_base64()``.

    Returns:
        ``paramiko.AUTH_FAILED`` when no upstream is found, when the
        upstream has no ``upstream_key``, or when the key is not found in
        the authorized-keys file; otherwise the result of a fresh
        ``self._connectToUpstream(upstream, publickey=True)`` call.
    """
    self.logger.info('%s:%d: trying publickey authentication for "%s"' % (self.client_address + (username,)))
    # Check the username
    upstream = self._findUpstream(username)
    if not upstream:
        return paramiko.AUTH_FAILED
    # Public-key authentication requires a key configured for the upstream
    if not upstream.upstream_key:
        self.logger.warning('%s:%d: publickey authentication is disabled for "%s"' % (self.client_address + (username,)))
        return paramiko.AUTH_FAILED

    # Look for the client key in upstream's authorized_keys file
    authenticated = False
    if self._connectToUpstream(upstream, publickey=True) == paramiko.AUTH_SUCCESSFUL:
        try:
            wanted = (key.get_name(), key.get_base64())
            sftp = self.client.open_sftp()
            try:
                with sftp.file(upstream.upstream_authorized_keys, 'r') as keys_file:
                    for entry in keys_file:
                        # split() with no argument drops the trailing newline
                        # and tolerates tabs / repeated spaces, so entries
                        # without a comment field still match.  Entries that
                        # start with an options field are deliberately not
                        # matched (same as before).
                        fields = entry.split()
                        if tuple(fields[:2]) == wanted:
                            authenticated = True
                            break
            finally:
                # Release the SFTP session even when reading fails
                sftp.close()
        except Exception:
            self.logger.info('%s:%d: an error occurred while looking for the public key of "%s" in upstream\'s "%s" file' % (self.client_address + (username, upstream.upstream_authorized_keys)))
            self.logger.debug('Catched exception', exc_info=True)
        # Close all connections used for the lookup; the final return below
        # opens a new upstream connection.
        self.upstream = None
        self.shellchannel.close()
        self.client.close()

    if not authenticated:
        self.logger.critical('%s:%d: authentication of "%s" with publickey failed' % (self.client_address + (username,)))
        self.upstream = None
        return paramiko.AUTH_FAILED
    # Connect to the upstream
    return self._connectToUpstream(upstream, publickey=True)
# Comment list
# Table of contents