def hash_host(hostname, salt=None):
    """
    Build the "hashed" form of a hostname, in the format OpenSSH uses
    when it stores hashed hostnames in the known_hosts file.

    :param str hostname: the hostname to hash
    :param str salt:
        optional salt.  Either a base64-encoded salt, or a complete
        ``|1|salt|hash`` entry from which the salt field is taken.  Once
        decoded it must be as long as a SHA-1 digest.  When omitted, a
        random salt of that length is generated.
    :return: the hashed hostname as a `str`, shaped ``|1|salt|hmac``
    """
    digest_len = sha1().digest_size
    if salt is None:
        raw_salt = os.urandom(digest_len)
    else:
        # A full hashed entry carries the salt in its third '|' field.
        encoded = salt.split('|')[2] if salt.startswith('|1|') else salt
        raw_salt = decodebytes(b(encoded))
    assert len(raw_salt) == digest_len
    mac = HMAC(raw_salt, b(hostname), sha1).digest()
    fields = (u(encodebytes(raw_salt)), u(encodebytes(mac)))
    # encodebytes() appends newlines; the stored form has none.
    return ('|1|%s|%s' % fields).replace('\n', '')
# Example source code using the Python class HMAC (python类HMAC的实例源码)
def hash_host(hostname, salt=None):
    """
    Build the "hashed" form of a hostname, in the format OpenSSH uses
    when it stores hashed hostnames in the known_hosts file.

    :param str hostname: the hostname to hash
    :param str salt:
        optional salt.  Either a base64-encoded salt, or a complete
        ``|1|salt|hash`` entry from which the salt field is taken.  Once
        decoded it must be as long as a SHA-1 digest.  When omitted, a
        random salt of that length is generated.
    :return: the hashed hostname as a `str`, shaped ``|1|salt|hmac``
    """
    digest_len = sha1().digest_size
    if salt is None:
        raw_salt = os.urandom(digest_len)
    else:
        # A full hashed entry carries the salt in its third '|' field.
        encoded = salt.split('|')[2] if salt.startswith('|1|') else salt
        raw_salt = decodebytes(b(encoded))
    assert len(raw_salt) == digest_len
    mac = HMAC(raw_salt, b(hostname), sha1).digest()
    fields = (u(encodebytes(raw_salt)), u(encodebytes(mac)))
    # encodebytes() appends newlines; the stored form has none.
    return ('|1|%s|%s' % fields).replace('\n', '')
def _hi(data, salt, iterations):
    """A simple implementation of PBKDF2.

    The MAC is an HMAC keyed with ``data`` using ``sha1``.  Only the
    first output block (block index 1) is derived, and the XOR of all
    iteration outputs is returned through ``_to_bytes(..., 20, 'big')``.
    """
    keyed = hmac.HMAC(data, None, sha1)

    def _prf(chunk):
        """Return the MAC of chunk without disturbing the keyed state."""
        clone = keyed.copy()
        clone.update(chunk)
        return clone.digest()

    as_int = _from_bytes
    as_bytes = _to_bytes

    block = _prf(salt + b'\x00\x00\x00\x01')
    folded = as_int(block, 'big')
    # The first round is already done above; run the remaining ones.
    for _ in range(1, iterations):
        block = _prf(block)
        folded ^= as_int(block, 'big')
    return as_bytes(folded, 20, 'big')
def _authenticate_cram_md5(credentials, sock_info):
    """Authenticate using CRAM-MD5 (RFC 2195).

    Sends ``saslStart`` with an empty payload, then answers with the
    username, a space and the hex HMAC (digest ``md5``) of the payload
    found in the reply, wrapped in a ``saslContinue`` command.  Both
    commands are issued through ``sock_info.command`` against
    ``credentials.source``.
    """
    source = credentials.source
    username = credentials.username
    password = credentials.password
    # The MAC key is the same password digest that MONGODB-CR uses.
    mac_key = _password_digest(username, password)
    start_cmd = SON([('saslStart', 1),
                     ('mechanism', 'CRAM-MD5'),
                     ('payload', Binary(b'')),
                     ('autoAuthorize', 1)])
    reply = sock_info.command(source, start_cmd)
    # Name the digest explicitly: MD5 as the implicit default for
    # digestmod is deprecated in python 3.4.
    signer = hmac.HMAC(key=mac_key.encode('utf-8'), digestmod=md5)
    signer.update(reply['payload'])
    answer = username.encode('utf-8') + b' ' + b(signer.hexdigest())
    continue_cmd = SON([('saslContinue', 1),
                        ('conversationId', reply['conversationId']),
                        ('payload', Binary(answer))])
    sock_info.command(source, continue_cmd)
def get_disqus_sso_payload(user):
    """Return the Disqus single-sign-on payload for user.

    The result is the tuple ``(message, timestamp, sig, public_key)``:
    the base64-encoded JSON profile, the signing timestamp, the hex
    HMAC-SHA1 of ``"message timestamp"`` keyed with the secret key, and
    the public key.  A falsy ``user`` yields an empty JSON object as the
    profile.  When either DISQUS key is missing from the app config the
    result is four ``None`` values.
    """
    public_key = current_app.config.get('DISQUS_PUBLIC_KEY')
    secret_key = current_app.config.get('DISQUS_SECRET_KEY')
    if not (public_key and secret_key):
        return None, None, None, None

    profile = {}
    if user:
        profile = {
            'id': user.id,
            'username': user.name,
            'email': user.email_addr,
        }
    # NOTE(review): b64encode() and HMAC() receive text values here,
    # which looks Python 2 specific -- confirm the target interpreter.
    message = base64.b64encode(simplejson.dumps(profile))
    # The timestamp is signed together with the message.
    timestamp = int(time.time())
    to_sign = '%s %s' % (message, timestamp)
    sig = hmac.HMAC(secret_key, to_sign, hashlib.sha1).hexdigest()
    return message, timestamp, sig, public_key
def _authenticate_cram_md5(credentials, sock_info, cmd_func):
    """Authenticate using CRAM-MD5 (RFC 2195).

    ``credentials`` unpacks to ``(source, username, password)``.  A
    ``saslStart`` command with an empty payload is sent through
    ``cmd_func``; the payload of its reply is MACed (digest ``_DMOD``)
    and returned as ``username + ' ' + hexdigest`` in ``saslContinue``.
    """
    source, username, password = credentials
    # The MAC key is the same password digest that MONGODB-CR uses.
    mac_key = _password_digest(username, password)
    start_cmd = SON([('saslStart', 1),
                     ('mechanism', 'CRAM-MD5'),
                     ('payload', Binary(b(''))),
                     ('autoAuthorize', 1)])
    reply, _ = cmd_func(sock_info, source, start_cmd)
    # Name the digest explicitly: MD5 as the implicit default for
    # digestmod is deprecated in python 3.4.
    signer = hmac.HMAC(key=mac_key.encode('utf-8'), digestmod=_DMOD)
    signer.update(reply['payload'])
    answer = username.encode('utf-8') + b(' ') + b(signer.hexdigest())
    continue_cmd = SON([('saslContinue', 1),
                        ('conversationId', reply['conversationId']),
                        ('payload', Binary(answer))])
    cmd_func(sock_info, source, continue_cmd)
def _authenticate_cram_md5(credentials, sock_info, cmd_func):
    """Authenticate using CRAM-MD5 (RFC 2195).

    ``credentials`` unpacks to ``(source, username, password)``.  A
    ``saslStart`` command with an empty payload is sent through
    ``cmd_func``; the payload of its reply is MACed (digest ``_DMOD``)
    and returned as ``username + ' ' + hexdigest`` in ``saslContinue``.
    """
    source, username, password = credentials
    # The MAC key is the same password digest that MONGODB-CR uses.
    mac_key = _password_digest(username, password)
    start_cmd = SON([('saslStart', 1),
                     ('mechanism', 'CRAM-MD5'),
                     ('payload', Binary(b(''))),
                     ('autoAuthorize', 1)])
    reply, _ = cmd_func(sock_info, source, start_cmd)
    # Name the digest explicitly: MD5 as the implicit default for
    # digestmod is deprecated in python 3.4.
    signer = hmac.HMAC(key=mac_key.encode('utf-8'), digestmod=_DMOD)
    signer.update(reply['payload'])
    answer = username.encode('utf-8') + b(' ') + b(signer.hexdigest())
    continue_cmd = SON([('saslContinue', 1),
                        ('conversationId', reply['conversationId']),
                        ('payload', Binary(answer))])
    cmd_func(sock_info, source, continue_cmd)
def _authenticate_cram_md5(credentials, sock_info, cmd_func):
    """Authenticate using CRAM-MD5 (RFC 2195).

    ``credentials`` unpacks to ``(source, username, password)``.  A
    ``saslStart`` command with an empty payload is sent through
    ``cmd_func``; the payload of its reply is MACed (digest ``_DMOD``)
    and returned as ``username + ' ' + hexdigest`` in ``saslContinue``.
    """
    source, username, password = credentials
    # The MAC key is the same password digest that MONGODB-CR uses.
    mac_key = _password_digest(username, password)
    start_cmd = SON([('saslStart', 1),
                     ('mechanism', 'CRAM-MD5'),
                     ('payload', Binary(b(''))),
                     ('autoAuthorize', 1)])
    reply, _ = cmd_func(sock_info, source, start_cmd)
    # Name the digest explicitly: MD5 as the implicit default for
    # digestmod is deprecated in python 3.4.
    signer = hmac.HMAC(key=mac_key.encode('utf-8'), digestmod=_DMOD)
    signer.update(reply['payload'])
    answer = username.encode('utf-8') + b(' ') + b(signer.hexdigest())
    continue_cmd = SON([('saslContinue', 1),
                        ('conversationId', reply['conversationId']),
                        ('payload', Binary(answer))])
    cmd_func(sock_info, source, continue_cmd)
def _authenticate_cram_md5(credentials, sock_info, cmd_func):
    """Authenticate using CRAM-MD5 (RFC 2195).

    ``credentials`` unpacks to ``(source, username, password)``.  A
    ``saslStart`` command with an empty payload is sent through
    ``cmd_func``; the payload of its reply is MACed (digest ``_DMOD``)
    and returned as ``username + ' ' + hexdigest`` in ``saslContinue``.
    """
    source, username, password = credentials
    # The MAC key is the same password digest that MONGODB-CR uses.
    mac_key = _password_digest(username, password)
    start_cmd = SON([('saslStart', 1),
                     ('mechanism', 'CRAM-MD5'),
                     ('payload', Binary(b(''))),
                     ('autoAuthorize', 1)])
    reply, _ = cmd_func(sock_info, source, start_cmd)
    # Name the digest explicitly: MD5 as the implicit default for
    # digestmod is deprecated in python 3.4.
    signer = hmac.HMAC(key=mac_key.encode('utf-8'), digestmod=_DMOD)
    signer.update(reply['payload'])
    answer = username.encode('utf-8') + b(' ') + b(signer.hexdigest())
    continue_cmd = SON([('saslContinue', 1),
                        ('conversationId', reply['conversationId']),
                        ('payload', Binary(answer))])
    cmd_func(sock_info, source, continue_cmd)
def testValidLogin(self):
    """Log in with valid CRAM-MD5 credentials and expect success.

    Drives a POP3 protocol instance through CAPA and AUTH CRAM-MD5,
    answers the challenge for 'testuser', then checks that a mailbox was
    attached and that the last line written contains "+OK".
    """
    server = pop3.POP3()
    server.factory = TestServerFactory()
    server.factory.challengers = {'CRAM-MD5': cred.credentials.CramMD5Credentials}
    server.portal = cred.portal.Portal(TestRealm())
    checker = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
    checker.addUser('testuser', 'testpassword')
    server.portal.registerChecker(checker)

    # Everything the server writes is captured in this buffer.
    output = StringIO.StringIO()
    server.transport = internet.protocol.FileWrapper(output)
    server.connectionMade()

    server.lineReceived("CAPA")
    self.failUnless("SASL CRAM-MD5" in output.getvalue())

    server.lineReceived("AUTH CRAM-MD5")
    # The challenge is the last line written, minus its 2-char prefix.
    last_line = output.getvalue().splitlines()[-1]
    challenge = base64.decodestring(last_line[2:])
    digest = hmac.HMAC('testpassword', challenge).hexdigest()
    answer = base64.encodestring('testuser ' + digest).rstrip('\n')
    server.lineReceived(answer)

    self.failUnless(server.mbox)
    self.failUnless("+OK" in output.getvalue().splitlines()[-1])
    server.connectionLost(failure.Failure(Exception("Test harness disconnect")))
def test_legacy_block_size_warnings(self):
class MockCrazyHash(object):
"""Ain't no block_size attribute here."""
def __init__(self, *args):
self._x = hashlib.sha1(*args)
self.digest_size = self._x.digest_size
def update(self, v):
self._x.update(v)
def digest(self):
return self._x.digest()
with warnings.catch_warnings():
warnings.simplefilter('error', RuntimeWarning)
with self.assertRaises(RuntimeWarning):
hmac.HMAC(b'a', b'b', digestmod=MockCrazyHash)
self.fail('Expected warning about missing block_size')
MockCrazyHash.block_size = 1
with self.assertRaises(RuntimeWarning):
hmac.HMAC(b'a', b'b', digestmod=MockCrazyHash)
self.fail('Expected warning about small block_size')
def _buildIoTHubSasToken(self, deviceId, usage):
if usage==self.USAGE_CREATE_DEVICE:
keyValue=self.initialKeyValue
elif usage==self.USAGE_DEVICE_SENDS_MESSAGE:
keyValue=self.currentDeviceKey
else:
keyVale
resourceUri = '%s/devices/%s' % (self.iotHost, deviceId)
targetUri = resourceUri.lower()
expiryTime = self._buildExpiryOn()
toSign = '%s\n%s' % (targetUri, expiryTime)
key = base64.b64decode(keyValue.encode('utf-8'))
signature = urllib.request.pathname2url(
base64.b64encode(
hmac.HMAC(key, toSign.encode('utf-8'), hashlib.sha256).digest()
)
).replace('/', '%2F')
if usage==self.USAGE_CREATE_DEVICE:
return self.TOKEN_FORMAT_WITH_POLICY % (signature, expiryTime, self.initialKeyName, targetUri)
elif usage==self.USAGE_DEVICE_SENDS_MESSAGE:
return self.TOKEN_FORMAT_NO_POLICY % (signature, expiryTime, targetUri)
else:
return None
def test_legacy_block_size_warnings(self):
class MockCrazyHash(object):
"""Ain't no block_size attribute here."""
def __init__(self, *args):
self._x = hashlib.sha1(*args)
self.digest_size = self._x.digest_size
def update(self, v):
self._x.update(v)
def digest(self):
return self._x.digest()
with warnings.catch_warnings():
warnings.simplefilter('error', RuntimeWarning)
with self.assertRaises(RuntimeWarning):
hmac.HMAC('a', 'b', digestmod=MockCrazyHash)
self.fail('Expected warning about missing block_size')
MockCrazyHash.block_size = 1
with self.assertRaises(RuntimeWarning):
hmac.HMAC('a', 'b', digestmod=MockCrazyHash)
self.fail('Expected warning about small block_size')
def test_legacy_block_size_warnings(self):
class MockCrazyHash(object):
"""Ain't no block_size attribute here."""
def __init__(self, *args):
self._x = hashlib.sha1(*args)
self.digest_size = self._x.digest_size
def update(self, v):
self._x.update(v)
def digest(self):
return self._x.digest()
with warnings.catch_warnings():
warnings.simplefilter('error', RuntimeWarning)
with self.assertRaises(RuntimeWarning):
hmac.HMAC('a', 'b', digestmod=MockCrazyHash)
self.fail('Expected warning about missing block_size')
MockCrazyHash.block_size = 1
with self.assertRaises(RuntimeWarning):
hmac.HMAC('a', 'b', digestmod=MockCrazyHash)
self.fail('Expected warning about small block_size')
def testValidLogin(self):
    """Log in with valid CRAM-MD5 credentials and expect success.

    Drives a POP3 protocol instance through CAPA and AUTH CRAM-MD5,
    answers the challenge for 'testuser', then checks that a mailbox was
    attached and that the last line written contains "+OK".
    """
    server = pop3.POP3()
    server.factory = TestServerFactory()
    server.factory.challengers = {'CRAM-MD5': cred.credentials.CramMD5Credentials}
    server.portal = cred.portal.Portal(TestRealm())
    checker = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
    checker.addUser('testuser', 'testpassword')
    server.portal.registerChecker(checker)

    # Everything the server writes is captured in this buffer.
    output = StringIO.StringIO()
    server.transport = internet.protocol.FileWrapper(output)
    server.connectionMade()

    server.lineReceived("CAPA")
    self.failUnless("SASL CRAM-MD5" in output.getvalue())

    server.lineReceived("AUTH CRAM-MD5")
    # The challenge is the last line written, minus its 2-char prefix.
    last_line = output.getvalue().splitlines()[-1]
    challenge = base64.decodestring(last_line[2:])
    digest = hmac.HMAC('testpassword', challenge).hexdigest()
    answer = base64.encodestring('testuser ' + digest).rstrip('\n')
    server.lineReceived(answer)

    self.failUnless(server.mbox)
    self.failUnless("+OK" in output.getvalue().splitlines()[-1])
    server.connectionLost(failure.Failure(Exception("Test harness disconnect")))
def _hi(data, salt, iterations):
    """A simple implementation of PBKDF2.

    The MAC is an HMAC keyed with ``data`` using ``sha1``.  Only the
    first output block (block index 1) is derived, and the XOR of all
    iteration outputs is returned through ``_to_bytes(..., 20, 'big')``.
    """
    keyed = hmac.HMAC(data, None, sha1)

    def _prf(chunk):
        """Return the MAC of chunk without disturbing the keyed state."""
        clone = keyed.copy()
        clone.update(chunk)
        return clone.digest()

    as_int = _from_bytes
    as_bytes = _to_bytes

    block = _prf(salt + b'\x00\x00\x00\x01')
    folded = as_int(block, 'big')
    # The first round is already done above; run the remaining ones.
    for _ in range(1, iterations):
        block = _prf(block)
        folded ^= as_int(block, 'big')
    return as_bytes(folded, 20, 'big')
def _authenticate_cram_md5(credentials, sock_info):
    """Authenticate using CRAM-MD5 (RFC 2195).

    Sends ``saslStart`` with an empty payload, then answers with the
    username, a space and the hex HMAC (digest ``md5``) of the payload
    found in the reply, wrapped in a ``saslContinue`` command.  Both
    commands are issued through ``sock_info.command`` against
    ``credentials.source``.
    """
    source = credentials.source
    username = credentials.username
    password = credentials.password
    # The MAC key is the same password digest that MONGODB-CR uses.
    mac_key = _password_digest(username, password)
    start_cmd = SON([('saslStart', 1),
                     ('mechanism', 'CRAM-MD5'),
                     ('payload', Binary(b'')),
                     ('autoAuthorize', 1)])
    reply = sock_info.command(source, start_cmd)
    # Name the digest explicitly: MD5 as the implicit default for
    # digestmod is deprecated in python 3.4.
    signer = hmac.HMAC(key=mac_key.encode('utf-8'), digestmod=md5)
    signer.update(reply['payload'])
    answer = username.encode('utf-8') + b' ' + b(signer.hexdigest())
    continue_cmd = SON([('saslContinue', 1),
                        ('conversationId', reply['conversationId']),
                        ('payload', Binary(answer))])
    sock_info.command(source, continue_cmd)
def time_code(self, moment: int=None):
    """
    Compute the authentication token that is valid at a given time, which
    is the value a user must supply to authenticate.

    The time is reduced to a 30-second step counter, MACed with
    HMAC-SHA1 under ``self._secret``, and truncated to six decimal
    digits.

    :param moment: A time value, defaulting to now.
    :type moment: int
    :return: A 6-digit authentication token
    :rtype: str
    """
    timestamp = time.time() if moment is None else moment
    counter = struct.pack('>q', int(timestamp // 30))
    mac = hmac.HMAC(self._secret, counter, hashlib.sha1).digest()
    # The low nibble of the last byte picks where the 4-byte window starts.
    start = mac[-1] & 0x0F
    (value,) = struct.unpack('>L', mac[start:start + 4])
    # Drop the top bit, then keep the last six decimal digits.
    return '%06d' % ((value & 0x7FFFFFFF) % 1000000)
def _authenticate_cram_md5(credentials, sock_info, cmd_func):
    """Authenticate using CRAM-MD5 (RFC 2195).

    ``credentials`` unpacks to ``(source, username, password)``.  A
    ``saslStart`` command with an empty payload is sent through
    ``cmd_func``; the payload of its reply is MACed (digest ``_DMOD``)
    and returned as ``username + ' ' + hexdigest`` in ``saslContinue``.
    """
    source, username, password = credentials
    # The MAC key is the same password digest that MONGODB-CR uses.
    mac_key = _password_digest(username, password)
    start_cmd = SON([('saslStart', 1),
                     ('mechanism', 'CRAM-MD5'),
                     ('payload', Binary(b(''))),
                     ('autoAuthorize', 1)])
    reply, _ = cmd_func(sock_info, source, start_cmd)
    # Name the digest explicitly: MD5 as the implicit default for
    # digestmod is deprecated in python 3.4.
    signer = hmac.HMAC(key=mac_key.encode('utf-8'), digestmod=_DMOD)
    signer.update(reply['payload'])
    answer = username.encode('utf-8') + b(' ') + b(signer.hexdigest())
    continue_cmd = SON([('saslContinue', 1),
                        ('conversationId', reply['conversationId']),
                        ('payload', Binary(answer))])
    cmd_func(sock_info, source, continue_cmd)
def test_legacy_block_size_warnings(self):
class MockCrazyHash(object):
"""Ain't no block_size attribute here."""
def __init__(self, *args):
self._x = hashlib.sha1(*args)
self.digest_size = self._x.digest_size
def update(self, v):
self._x.update(v)
def digest(self):
return self._x.digest()
with warnings.catch_warnings():
warnings.simplefilter('error', RuntimeWarning)
with self.assertRaises(RuntimeWarning):
hmac.HMAC(b'a', b'b', digestmod=MockCrazyHash)
self.fail('Expected warning about missing block_size')
MockCrazyHash.block_size = 1
with self.assertRaises(RuntimeWarning):
hmac.HMAC(b'a', b'b', digestmod=MockCrazyHash)
self.fail('Expected warning about small block_size')
def valid_tokens(domain, secret, validity=86400):
    """Return the currently acceptable tokens for a domain.

    Each token is the hex HMAC-SHA256, keyed with ``secret``, of the
    domain followed by a time-bucket number.  Text arguments are encoded
    first (``domain`` with the ``idna`` codec).

    :param domain: domain name, as text or bytes.
    :param secret: HMAC key, as text or bytes.
    :param validity: token lifetime in seconds; buckets are half of it.
    :return: list of hex tokens for the three most recent half-validity
        points before now, oldest first.
    """
    key = secret.encode() if isinstance(secret, str) else secret
    name = domain.encode('idna') if isinstance(domain, str) else domain

    # We're not totally strict on validity, but want to avoid the worst
    # case of handing out a token that expires immediately, so up to
    # three half-validity intervals in the past are accepted.
    now = int(time.time())
    half = int(validity / 2)

    def token_at(when):
        # Buckets are half-validity wide, the same step used below.
        mac = hmac.HMAC(key, digestmod=hashlib.sha256)
        mac.update(name)
        mac.update(str(int(when / half)).encode())
        return mac.hexdigest()

    oldest = now - 3 * half
    return [token_at(moment) for moment in range(oldest, now, half)]
def _iotHubSasToken(self, uri):
"""Create the Azure IOT Hub SAS token
Args:
uri (str): Resource URI
Returns:
Token string
"""
expiry = str(int((time.time() + self.TOKEN_VALID_SECS)))
key = base64.b64decode(self.keyvalue.encode('utf-8'))
sig = '{}\n{}'.format(uri, expiry).encode('utf-8')
signature = urllib.quote(
base64.b64encode(hmac.HMAC(key, sig, hashlib.sha256).digest())
).replace('/', '%2F')
token = 'SharedAccessSignature sig={}&se={}&skn={}&sr={}'.format(
signature, expiry, self.keyname, uri.lower())
return token
def _CRAM_MD5_AUTH(self, challenge):
""" Authobject to use with CRAM-MD5 authentication. """
import hmac
return self.user + " " + hmac.HMAC(self.password, challenge).hexdigest()
def test_gensignature():
    """gensignature() must return the value produced by HMAC.digest()."""
    fake_digest = 'digest_output'
    call_args = (
        ID_WITH_VALID_LENGTH,  # api_key
        'date',
        'content_type',
        'method',              # request_method
        'body',                # request_body
        'path',                # query_path
    )
    # Stub the digest so the result does not depend on the real HMAC.
    with patch.object(hmac.HMAC, 'digest', return_value=fake_digest):
        signature = api_utils.gensignature(*call_args)
        assert signature == 'digest_output'
def _hash_internal(method, salt, password):
"""Internal password hash helper. Supports plaintext without salt,
unsalted and salted passwords. In case salted passwords are used
hmac is used.
"""
if method == 'plain':
return password, method
if isinstance(password, text_type):
password = password.encode('utf-8')
if method.startswith('pbkdf2:'):
args = method[7:].split(':')
if len(args) not in (1, 2):
raise ValueError('Invalid number of arguments for PBKDF2')
method = args.pop(0)
iterations = args and int(args[0] or 0) or DEFAULT_PBKDF2_ITERATIONS
is_pbkdf2 = True
actual_method = 'pbkdf2:%s:%d' % (method, iterations)
else:
is_pbkdf2 = False
actual_method = method
hash_func = _hash_funcs.get(method)
if hash_func is None:
raise TypeError('invalid method %r' % method)
if is_pbkdf2:
if not salt:
raise ValueError('Salt is required for PBKDF2')
rv = pbkdf2_hex(password, salt, iterations,
hashfunc=hash_func)
elif salt:
if isinstance(salt, text_type):
salt = salt.encode('utf-8')
rv = hmac.HMAC(salt, password, hash_func).hexdigest()
else:
h = hash_func()
h.update(password)
rv = h.hexdigest()
return rv, actual_method
def _hash_internal(method, salt, password):
"""Internal password hash helper. Supports plaintext without salt,
unsalted and salted passwords. In case salted passwords are used
hmac is used.
"""
if method == 'plain':
return password, method
if isinstance(password, text_type):
password = password.encode('utf-8')
if method.startswith('pbkdf2:'):
args = method[7:].split(':')
if len(args) not in (1, 2):
raise ValueError('Invalid number of arguments for PBKDF2')
method = args.pop(0)
iterations = args and int(args[0] or 0) or DEFAULT_PBKDF2_ITERATIONS
is_pbkdf2 = True
actual_method = 'pbkdf2:%s:%d' % (method, iterations)
else:
is_pbkdf2 = False
actual_method = method
hash_func = _hash_funcs.get(method)
if hash_func is None:
raise TypeError('invalid method %r' % method)
if is_pbkdf2:
if not salt:
raise ValueError('Salt is required for PBKDF2')
rv = pbkdf2_hex(password, salt, iterations,
hashfunc=hash_func)
elif salt:
if isinstance(salt, text_type):
salt = salt.encode('utf-8')
rv = hmac.HMAC(salt, password, hash_func).hexdigest()
else:
h = hash_func()
h.update(password)
rv = h.hexdigest()
return rv, actual_method
def _CRAM_MD5_AUTH(self, challenge):
"""Authobject to use with CRAM-MD5 authentication."""
import hmac
return self.user + " " + hmac.HMAC(self.password, challenge).hexdigest()
def _hash_internal(method, salt, password):
"""Internal password hash helper. Supports plaintext without salt,
unsalted and salted passwords. In case salted passwords are used
hmac is used.
"""
if method == 'plain':
return password, method
if isinstance(password, text_type):
password = password.encode('utf-8')
if method.startswith('pbkdf2:'):
args = method[7:].split(':')
if len(args) not in (1, 2):
raise ValueError('Invalid number of arguments for PBKDF2')
method = args.pop(0)
iterations = args and int(args[0] or 0) or DEFAULT_PBKDF2_ITERATIONS
is_pbkdf2 = True
actual_method = 'pbkdf2:%s:%d' % (method, iterations)
else:
is_pbkdf2 = False
actual_method = method
hash_func = _hash_funcs.get(method)
if hash_func is None:
raise TypeError('invalid method %r' % method)
if is_pbkdf2:
if not salt:
raise ValueError('Salt is required for PBKDF2')
rv = pbkdf2_hex(password, salt, iterations,
hashfunc=hash_func)
elif salt:
if isinstance(salt, text_type):
salt = salt.encode('utf-8')
rv = hmac.HMAC(salt, password, hash_func).hexdigest()
else:
h = hash_func()
h.update(password)
rv = h.hexdigest()
return rv, actual_method
def keyed_md5(secret, challenge):
    """Create the keyed MD5 string for the given secret and challenge.

    Deprecated: every call emits a DeprecationWarning pointing callers
    at the stdlib ``hmac`` module.

    Returns the hex digest of the HMAC-MD5 of ``challenge`` keyed with
    ``secret``.
    """
    import warnings
    from hashlib import md5

    warnings.warn(
        "keyed_md5() is deprecated. Use the stdlib module hmac instead.",
        DeprecationWarning, stacklevel=2
    )
    # MD5 is named explicitly: HMAC no longer falls back to it when
    # digestmod is omitted, and the function's contract is MD5.
    return hmac.HMAC(secret, challenge, md5).hexdigest()
def testCheckPassword(self):
    """A response built from the right secret must pass checkPassword()."""
    creds = credentials.CramMD5Credentials()
    challenge = creds.getChallenge()
    # The expected response is the hex HMAC of the issued challenge.
    creds.response = hmac.HMAC('secret', challenge).hexdigest()
    accepted = creds.checkPassword('secret')
    self.failUnless(accepted)