def _get_verifying_key(self, host_key_blob):
# Parse the received data from the host_key_blob
index, host_key_type = parse_string(host_key_blob, 0)
index, curve_name = parse_string(host_key_blob, index)
index, host_public_key = parse_string(host_key_blob, index)
# Find the expected host key in ~/.ssh/known_hosts
expected_host_key_type = None
expected_host_public_key = None
known_hosts_filename = os.path.expanduser('~/.ssh/known_hosts')
for line in open(known_hosts_filename, 'r'):
if len(line.strip()) > 0:
current_hostname, current_key_type, current_key = line.split(' ')
if current_hostname == self.hostname:
expected_host_key_type = current_key_type
expected_host_public_key = current_key.decode('base64')
break
# If we *did* find the host key (i.e. we've already connected to this server), check that
# everything matches
if expected_host_key_type is not None:
assert host_key_type == expected_host_key_type, 'Unexpected host key type: %s' % host_key_type
assert curve_name == 'nistp256', 'Unknown curve name: %s' % curve_name
assert host_key_blob == expected_host_public_key, \
'Unexpected host public key: %s' % repr(host_key_blob)
# Otherwise, if we haven't seen the host key before, prompt the user to see if they're okay with
# that
else:
assert host_key_type == 'ecdsa-sha2-nistp256', 'Unknown host key type: %s' % host_key_type
key_fingerprint = hashlib.sha256(host_key_blob).digest().encode('base64')
# Remove the base64-added new lines, and the padding '=' characters
key_fingerprint = key_fingerprint.replace('\n', '').rstrip('=')
print "The authenticity of host '%s' can't be established." % self.hostname
print "ECDSA key fingerprint is SHA256:%s." % key_fingerprint
answer = raw_input("Are you sure you want to continue connecting (yes/no)?\n").strip()
while answer not in ['yes', 'no', '']:
answer = raw_input("Please type 'yes' or 'no': ").strip()
# Add key to ~/.ssh/known_hosts
if answer == 'yes':
with open(known_hosts_filename, 'a') as known_hosts_file:
host_key_base64 = host_key_blob.encode('base64').replace('\n', '')
known_hosts_file.write('%s %s %s\n' % (self.hostname, host_key_type, host_key_base64))
else:
assert False, 'Host key verification failed.'
# NFI why we need to skip a byte here - I can't find this format documented anywhere. I assume
# this is some kind of type indicator.
assert host_public_key[0] == '\x04'
return ecdsa.VerifyingKey.from_string(host_public_key[1:], curve=ecdsa.NIST256p)
评论列表
文章目录