def _get_cipher(self, name, key, iv):
if name not in self._cipher_info:
raise SSHException('Unknown client cipher ' + name)
if name in ('arcfour128', 'arcfour256'):
# arcfour cipher
cipher = self._cipher_info[name]['class'].new(key)
# as per RFC 4345, the first 1536 bytes of keystream
# generated by the cipher MUST be discarded
cipher.encrypt(" " * 1536)
return cipher
elif name.endswith("-ctr"):
# CTR modes, we need a counter
counter = Counter.new(nbits=self._cipher_info[name]['block-size'] * 8, initial_value=util.inflate_long(iv, True))
return self._cipher_info[name]['class'].new(key, self._cipher_info[name]['mode'], iv, counter)
else:
return self._cipher_info[name]['class'].new(key, self._cipher_info[name]['mode'], iv)
python类util()的实例源码
def _get_cipher(self, name, key, iv):
if name not in self._cipher_info:
raise SSHException('Unknown client cipher ' + name)
if name in ('arcfour128', 'arcfour256'):
# arcfour cipher
cipher = self._cipher_info[name]['class'].new(key)
# as per RFC 4345, the first 1536 bytes of keystream
# generated by the cipher MUST be discarded
cipher.encrypt(" " * 1536)
return cipher
elif name.endswith("-ctr"):
# CTR modes, we need a counter
counter = Counter.new(nbits=self._cipher_info[name]['block-size'] * 8, initial_value=util.inflate_long(iv, True))
return self._cipher_info[name]['class'].new(key, self._cipher_info[name]['mode'], iv, counter)
else:
return self._cipher_info[name]['class'].new(key, self._cipher_info[name]['mode'], iv)
def _get_cipher(self, name, key, iv):
if name not in self._cipher_info:
raise SSHException('Unknown client cipher ' + name)
if name in ('arcfour128', 'arcfour256'):
# arcfour cipher
cipher = self._cipher_info[name]['class'].new(key)
# as per RFC 4345, the first 1536 bytes of keystream
# generated by the cipher MUST be discarded
cipher.encrypt(" " * 1536)
return cipher
elif name.endswith("-ctr"):
# CTR modes, we need a counter
counter = Counter.new(nbits=self._cipher_info[name]['block-size'] * 8, initial_value=util.inflate_long(iv, True))
return self._cipher_info[name]['class'].new(key, self._cipher_info[name]['mode'], iv, counter)
else:
return self._cipher_info[name]['class'].new(key, self._cipher_info[name]['mode'], iv)
def test_7_host_config_expose_issue_33(self):
test_config_file = """
Host www13.*
Port 22
Host *.example.com
Port 2222
Host *
Port 3333
"""
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
host = 'www13.example.com'
self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
{'hostname': host, 'port': '22'}
)
def test_8_eintr_retry(self):
self.assertEqual('foo', paramiko.util.retry_on_signal(lambda: 'foo'))
# Variables that are set by raises_intr
intr_errors_remaining = [3]
call_count = [0]
def raises_intr():
call_count[0] += 1
if intr_errors_remaining[0] > 0:
intr_errors_remaining[0] -= 1
raise IOError(errno.EINTR, 'file', 'interrupted system call')
self.assertTrue(paramiko.util.retry_on_signal(raises_intr) is None)
self.assertEqual(0, intr_errors_remaining[0])
self.assertEqual(4, call_count[0])
def raises_ioerror_not_eintr():
raise IOError(errno.ENOENT, 'file', 'file not found')
self.assertRaises(IOError,
lambda: paramiko.util.retry_on_signal(raises_ioerror_not_eintr))
def raises_other_exception():
raise AssertionError('foo')
self.assertRaises(AssertionError,
lambda: paramiko.util.retry_on_signal(raises_other_exception))
def test_9_proxycommand_config_equals_parsing(self):
"""
ProxyCommand should not split on equals signs within the value.
"""
conf = """
Host space-delimited
ProxyCommand foo bar=biz baz
Host equals-delimited
ProxyCommand=foo bar=biz baz
"""
f = StringIO(conf)
config = paramiko.util.parse_ssh_config(f)
for host in ('space-delimited', 'equals-delimited'):
self.assertEqual(
host_config(host, config)['proxycommand'],
'foo bar=biz baz'
)
def test_11_host_config_test_negation(self):
test_config_file = """
Host www13.* !*.example.com
Port 22
Host *.example.com !www13.*
Port 2222
Host www13.*
Port 8080
Host *
Port 3333
"""
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
host = 'www13.example.com'
self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
{'hostname': host, 'port': '8080'}
)
def test_proxycommand_none_issue_418(self):
test_config_file = """
Host proxycommand-standard-none
ProxyCommand None
Host proxycommand-with-equals-none
ProxyCommand=None
"""
for host, values in {
'proxycommand-standard-none': {'hostname': 'proxycommand-standard-none'},
'proxycommand-with-equals-none': {'hostname': 'proxycommand-with-equals-none'}
}.items():
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
values
)
def _get_cipher(self, name, key, iv):
if name not in self._cipher_info:
raise SSHException('Unknown client cipher ' + name)
if name in ('arcfour128', 'arcfour256'):
# arcfour cipher
cipher = self._cipher_info[name]['class'].new(key)
# as per RFC 4345, the first 1536 bytes of keystream
# generated by the cipher MUST be discarded
cipher.encrypt(" " * 1536)
return cipher
elif name.endswith("-ctr"):
# CTR modes, we need a counter
counter = Counter.new(nbits=self._cipher_info[name]['block-size'] * 8, initial_value=util.inflate_long(iv, True))
return self._cipher_info[name]['class'].new(key, self._cipher_info[name]['mode'], iv, counter)
else:
return self._cipher_info[name]['class'].new(key, self._cipher_info[name]['mode'], iv)
def set_log_channel(self, name):
"""
Set the channel for this transport's logging. The default is
C{"paramiko.transport"} but it can be set to anything you want.
(See the C{logging} module for more info.) SSH Channels will log
to a sub-channel of the one specified.
@param name: new channel name for logging
@type name: str
@since: 1.1
"""
self.log_name = name
self.logger = util.get_logger(name)
self.packetizer.set_log(self.logger)
def _parse_debug(self, m):
always_display = m.get_boolean()
msg = m.get_string()
lang = m.get_string()
self._log(DEBUG, 'Debug msg: ' + util.safe_string(msg))
def set_log_channel(self, name):
"""
Set the channel for this transport's logging. The default is
``"paramiko.transport"`` but it can be set to anything you want. (See
the `.logging` module for more info.) SSH Channels will log to a
sub-channel of the one specified.
:param str name: new channel name for logging
.. versionadded:: 1.1
"""
self.log_name = name
self.logger = util.get_logger(name)
self.packetizer.set_log(self.logger)
def _parse_debug(self, m):
always_display = m.get_boolean()
msg = m.get_string()
lang = m.get_string()
self._log(DEBUG, 'Debug msg: {0}'.format(util.safe_string(msg)))
def set_log_channel(self, name):
"""
Set the channel for this transport's logging. The default is
``"paramiko.transport"`` but it can be set to anything you want. (See
the `.logging` module for more info.) SSH Channels will log to a
sub-channel of the one specified.
:param str name: new channel name for logging
.. versionadded:: 1.1
"""
self.log_name = name
self.logger = util.get_logger(name)
self.packetizer.set_log(self.logger)
def _parse_debug(self, m):
always_display = m.get_boolean()
msg = m.get_string()
lang = m.get_string()
self._log(DEBUG, 'Debug msg: {0}'.format(util.safe_string(msg)))
def test_1_import(self):
"""
verify that all the classes can be imported from paramiko.
"""
symbols = list(globals().keys())
self.assertTrue('Transport' in symbols)
self.assertTrue('SSHClient' in symbols)
self.assertTrue('MissingHostKeyPolicy' in symbols)
self.assertTrue('AutoAddPolicy' in symbols)
self.assertTrue('RejectPolicy' in symbols)
self.assertTrue('WarningPolicy' in symbols)
self.assertTrue('SecurityOptions' in symbols)
self.assertTrue('SubsystemHandler' in symbols)
self.assertTrue('Channel' in symbols)
self.assertTrue('RSAKey' in symbols)
self.assertTrue('DSSKey' in symbols)
self.assertTrue('Message' in symbols)
self.assertTrue('SSHException' in symbols)
self.assertTrue('AuthenticationException' in symbols)
self.assertTrue('PasswordRequiredException' in symbols)
self.assertTrue('BadAuthenticationType' in symbols)
self.assertTrue('ChannelException' in symbols)
self.assertTrue('SFTP' in symbols)
self.assertTrue('SFTPFile' in symbols)
self.assertTrue('SFTPHandle' in symbols)
self.assertTrue('SFTPClient' in symbols)
self.assertTrue('SFTPServer' in symbols)
self.assertTrue('SFTPError' in symbols)
self.assertTrue('SFTPAttributes' in symbols)
self.assertTrue('SFTPServerInterface' in symbols)
self.assertTrue('ServerInterface' in symbols)
self.assertTrue('BufferedFile' in symbols)
self.assertTrue('Agent' in symbols)
self.assertTrue('AgentKey' in symbols)
self.assertTrue('HostKeys' in symbols)
self.assertTrue('SSHConfig' in symbols)
self.assertTrue('util' in symbols)
def test_2_parse_config(self):
global test_config_file
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
self.assertEqual(config._config,
[{'host': ['*'], 'config': {}}, {'host': ['*'], 'config': {'identityfile': ['~/.ssh/id_rsa'], 'user': 'robey'}},
{'host': ['*.example.com'], 'config': {'user': 'bjork', 'port': '3333'}},
{'host': ['*'], 'config': {'crazy': 'something dumb '}},
{'host': ['spoo.example.com'], 'config': {'crazy': 'something else'}}])
def test_4_generate_key_bytes(self):
x = paramiko.util.generate_key_bytes(sha1, b'ABCDEFGH', 'This is my secret passphrase.', 64)
hex = ''.join(['%02x' % byte_ord(c) for c in x])
self.assertEqual(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b')
def test_5_host_keys(self):
with open('hostfile.temp', 'w') as f:
f.write(test_hosts_file)
try:
hostdict = paramiko.util.load_host_keys('hostfile.temp')
self.assertEqual(2, len(hostdict))
self.assertEqual(1, len(list(hostdict.values())[0]))
self.assertEqual(1, len(list(hostdict.values())[1]))
fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
self.assertEqual(b'E6684DB30E109B67B70FF1DC5C7F1363', fp)
finally:
os.unlink('hostfile.temp')
def test_12_host_config_test_proxycommand(self):
test_config_file = """
Host proxy-with-equal-divisor-and-space
ProxyCommand = foo=bar
Host proxy-with-equal-divisor-and-no-space
ProxyCommand=foo=bar
Host proxy-without-equal-divisor
ProxyCommand foo=bar:%h-%p
"""
for host, values in {
'proxy-with-equal-divisor-and-space' :{'hostname': 'proxy-with-equal-divisor-and-space',
'proxycommand': 'foo=bar'},
'proxy-with-equal-divisor-and-no-space':{'hostname': 'proxy-with-equal-divisor-and-no-space',
'proxycommand': 'foo=bar'},
'proxy-without-equal-divisor' :{'hostname': 'proxy-without-equal-divisor',
'proxycommand':
'foo=bar:proxy-without-equal-divisor-22'}
}.items():
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
values
)
def test_11_host_config_test_identityfile(self):
test_config_file = """
IdentityFile id_dsa0
Host *
IdentityFile id_dsa1
Host dsa2
IdentityFile id_dsa2
Host dsa2*
IdentityFile id_dsa22
"""
for host, values in {
'foo' :{'hostname': 'foo',
'identityfile': ['id_dsa0', 'id_dsa1']},
'dsa2' :{'hostname': 'dsa2',
'identityfile': ['id_dsa0', 'id_dsa1', 'id_dsa2', 'id_dsa22']},
'dsa22' :{'hostname': 'dsa22',
'identityfile': ['id_dsa0', 'id_dsa1', 'id_dsa22']}
}.items():
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
values
)
def test_12_config_addressfamily_and_lazy_fqdn(self):
"""
Ensure the code path honoring non-'all' AddressFamily doesn't asplode
"""
test_config = """
AddressFamily inet
IdentityFile something_%l_using_fqdn
"""
config = paramiko.util.parse_ssh_config(StringIO(test_config))
assert config.lookup('meh') # will die during lookup() if bug regresses
def test_clamp_value(self):
self.assertEqual(32768, paramiko.util.clamp_value(32767, 32768, 32769))
self.assertEqual(32767, paramiko.util.clamp_value(32767, 32765, 32769))
self.assertEqual(32769, paramiko.util.clamp_value(32767, 32770, 32769))
def test_quoted_host_names(self):
test_config_file = """\
Host "param pam" param "pam"
Port 1111
Host "param2"
Port 2222
Host param3 parara
Port 3333
Host param4 "p a r" "p" "par" para
Port 4444
"""
res = {
'param pam': {'hostname': 'param pam', 'port': '1111'},
'param': {'hostname': 'param', 'port': '1111'},
'pam': {'hostname': 'pam', 'port': '1111'},
'param2': {'hostname': 'param2', 'port': '2222'},
'param3': {'hostname': 'param3', 'port': '3333'},
'parara': {'hostname': 'parara', 'port': '3333'},
'param4': {'hostname': 'param4', 'port': '4444'},
'p a r': {'hostname': 'p a r', 'port': '4444'},
'p': {'hostname': 'p', 'port': '4444'},
'par': {'hostname': 'par', 'port': '4444'},
'para': {'hostname': 'para', 'port': '4444'},
}
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
for host, values in res.items():
self.assertEquals(
paramiko.util.lookup_ssh_host_config(host, config),
values
)
def test_quoted_params_in_config(self):
test_config_file = """\
Host "param pam" param "pam"
IdentityFile id_rsa
Host "param2"
IdentityFile "test rsa key"
Host param3 parara
IdentityFile id_rsa
IdentityFile "test rsa key"
"""
res = {
'param pam': {'hostname': 'param pam', 'identityfile': ['id_rsa']},
'param': {'hostname': 'param', 'identityfile': ['id_rsa']},
'pam': {'hostname': 'pam', 'identityfile': ['id_rsa']},
'param2': {'hostname': 'param2', 'identityfile': ['test rsa key']},
'param3': {'hostname': 'param3', 'identityfile': ['id_rsa', 'test rsa key']},
'parara': {'hostname': 'parara', 'identityfile': ['id_rsa', 'test rsa key']},
}
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
for host, values in res.items():
self.assertEquals(
paramiko.util.lookup_ssh_host_config(host, config),
values
)
def test_N_file_with_percent(self):
"""
verify that we can create a file with a '%' in the filename.
( it needs to be properly escaped by _log() )
"""
self.assertTrue( paramiko.util.get_logger("paramiko").handlers, "This unit test requires logging to be enabled" )
f = sftp.open(FOLDER + '/test%file', 'w')
try:
self.assertEqual(f.stat().st_size, 0)
finally:
f.close()
sftp.remove(FOLDER + '/test%file')
def set_log_channel(self, name):
"""
Set the channel for this transport's logging. The default is
``"paramiko.transport"`` but it can be set to anything you want. (See
the `.logging` module for more info.) SSH Channels will log to a
sub-channel of the one specified.
:param str name: new channel name for logging
.. versionadded:: 1.1
"""
self.log_name = name
self.logger = util.get_logger(name)
self.packetizer.set_log(self.logger)
def _parse_debug(self, m):
always_display = m.get_boolean()
msg = m.get_string()
lang = m.get_string()
self._log(DEBUG, 'Debug msg: {0}'.format(util.safe_string(msg)))
def set_log_channel(self, name):
"""
Set the channel for this transport's logging. The default is
``"paramiko.transport"`` but it can be set to anything you want. (See
the `.logging` module for more info.) SSH Channels will log to a
sub-channel of the one specified.
:param str name: new channel name for logging
.. versionadded:: 1.1
"""
self.log_name = name
self.logger = util.get_logger(name)
self.packetizer.set_log(self.logger)
def _parse_debug(self, m):
m.get_boolean() # always_display
msg = m.get_string()
m.get_string() # language
self._log(DEBUG, 'Debug msg: {0}'.format(util.safe_string(msg)))