def add_auth(self, req, **kwargs):
    """
    Attach an AWS3 ``X-Amzn-Authorization`` header to a request.

    The ``X-Amz-Date`` header is refreshed, the provider's security token
    (when set) is sent as ``X-Amz-Security-Token``, and the SHA-256 digest
    of the UTF-8 encoded string-to-sign is signed via ``sign_string``.

    :type req: :class:`boto.connection.HTTPRequest`
    :param req: The HTTPRequest object; its ``headers`` are modified in place.
    """
    auth_header = 'X-Amzn-Authorization'
    # On a retry the request still carries the header from the previous
    # attempt, so discard it before computing a new one.
    if auth_header in req.headers:
        del req.headers[auth_header]

    req.headers['X-Amz-Date'] = formatdate(usegmt=True)
    if self._provider.security_token:
        req.headers['X-Amz-Security-Token'] = self._provider.security_token

    to_sign, signed_headers = self.string_to_sign(req)
    boto.log.debug('StringToSign:\n%s' % to_sign)
    digest = sha256(to_sign.encode('utf-8')).digest()
    signature = self.sign_string(digest)

    fields = (
        "AWS3 AWSAccessKeyId=%s," % self._provider.access_key,
        "Algorithm=%s," % self.algorithm(),
        "SignedHeaders=%s," % ';'.join(signed_headers),
        "Signature=%s" % signature,
    )
    req.headers[auth_header] = ''.join(fields)
# python类digest()的实例源码 (Example source code using Python's digest())
def add_auth(self, req, **kwargs):
    """
    Attach an AWS3 ``X-Amzn-Authorization`` header to a request.

    The ``X-Amz-Date`` header is refreshed, the provider's security token
    (when set) is sent as ``X-Amz-Security-Token``, and the SHA-256 digest
    of the UTF-8 encoded string-to-sign is signed via ``sign_string``.

    :type req: :class:`boto.connection.HTTPRequest`
    :param req: The HTTPRequest object; its ``headers`` are modified in place.
    """
    auth_header = 'X-Amzn-Authorization'
    # On a retry the request still carries the header from the previous
    # attempt, so discard it before computing a new one.
    if auth_header in req.headers:
        del req.headers[auth_header]

    req.headers['X-Amz-Date'] = formatdate(usegmt=True)
    if self._provider.security_token:
        req.headers['X-Amz-Security-Token'] = self._provider.security_token

    to_sign, signed_headers = self.string_to_sign(req)
    boto.log.debug('StringToSign:\n%s' % to_sign)
    digest = sha256(to_sign.encode('utf-8')).digest()
    signature = self.sign_string(digest)

    fields = (
        "AWS3 AWSAccessKeyId=%s," % self._provider.access_key,
        "Algorithm=%s," % self.algorithm(),
        "SignedHeaders=%s," % ';'.join(signed_headers),
        "Signature=%s" % signature,
    )
    req.headers[auth_header] = ''.join(fields)
def calc_signature(self, args):
    """
    Compute an HmacSHA256 query-string signature for a request.

    ``args['params']`` is updated in place: ``SignatureMethod`` is set to
    ``'HmacSHA256'`` and ``SecurityToken`` is added when the credentials
    carry a token.  The parameters are then sorted by name,
    percent-encoded and signed together with the HTTP method, the host
    taken from ``args['url']`` and the fixed path ``'/'``.

    :param args: mapping with ``'url'``, ``'method'`` and ``'params'`` keys.
    :return: ``(qs, b64)`` -- the encoded query string and the base64
        HMAC digest decoded to text.
    """
    import logging

    # Only the host takes part in the signature; scheme and port are unused.
    _scheme, host, _port = requests.packages.urllib3.get_host(args['url'])
    signer = self.hmac.copy()

    params = args['params']
    params['SignatureMethod'] = 'HmacSHA256'
    if self.credentials.token:
        params['SecurityToken'] = self.credentials.token

    qs = '&'.join(
        quote(key, safe='') + '=' + quote(params[key], safe='-_~')
        for key in sorted(params)
    )
    string_to_sign = '%s\n%s\n%s\n' % (args['method'], host, '/') + qs

    # The string to sign can contain the session token, so it must not be
    # printed to stdout unconditionally; emit it at DEBUG level only.
    logging.getLogger(__name__).debug('string_to_sign: %s', string_to_sign)

    signer.update(string_to_sign.encode('utf-8'))
    b64 = base64.b64encode(signer.digest()).strip().decode('utf-8')
    return (qs, b64)
def add_auth(self, req, **kwargs):
    """
    Attach an AWS3 ``X-Amzn-Authorization`` header to a request.

    The ``X-Amz-Date`` header is refreshed, the provider's security token
    (when set) is sent as ``X-Amz-Security-Token``, and the SHA-256 digest
    of the string-to-sign is signed via ``sign_string``.  A text
    string-to-sign is encoded as UTF-8 before hashing; a byte string is
    hashed as-is.

    :type req: :class:`boto.connection.HTTPRequest`
    :param req: The HTTPRequest object; its ``headers`` are modified in place.
    """
    auth_header = 'X-Amzn-Authorization'
    # On a retry the request still carries the header from the previous
    # attempt, so discard it before computing a new one.
    if auth_header in req.headers:
        del req.headers[auth_header]

    req.headers['X-Amz-Date'] = formatdate(usegmt=True)
    if self._provider.security_token:
        req.headers['X-Amz-Security-Token'] = self._provider.security_token

    string_to_sign, headers_to_sign = self.string_to_sign(req)
    boto.log.debug('StringToSign:\n%s' % string_to_sign)

    # Hash functions operate on bytes: hashing text directly raises
    # TypeError on Python 3 and fails for non-ASCII unicode on Python 2.
    payload = string_to_sign
    if not isinstance(payload, bytes):
        payload = payload.encode('utf-8')
    hash_value = sha256(payload).digest()
    b64_hmac = self.sign_string(hash_value)

    fields = (
        "AWS3 AWSAccessKeyId=%s," % self._provider.access_key,
        "Algorithm=%s," % self.algorithm(),
        "SignedHeaders=%s," % ';'.join(headers_to_sign),
        "Signature=%s" % b64_hmac,
    )
    req.headers[auth_header] = ''.join(fields)
def _calc_signature(self, params, verb, path, server_name):
    """
    Build the sorted, percent-encoded query string and its base64 HMAC.

    ``params`` is modified in place: ``SignatureMethod`` is always set to
    ``self.algorithm()`` and ``SecurityToken`` is added when the provider
    has a security token.

    :param params: dict of request parameters to sign.
    :param verb: HTTP method placed on the first line of the string to sign.
    :param path: request path placed on the third line.
    :param server_name: host name; lower-cased before signing.
    :return: ``(qs, b64)`` -- the query string and the base64-encoded
        HMAC digest of the string to sign.
    """
    boto.log.debug('using _calc_signature_2')
    preamble = '%s\n%s\n%s\n' % (verb, server_name.lower(), path)
    signer = self._get_hmac()
    params['SignatureMethod'] = self.algorithm()
    if self._provider.security_token:
        params['SecurityToken'] = self._provider.security_token

    encoded = []
    for name in sorted(params.keys()):
        value = boto.utils.get_utf8_value(params[name])
        # Names are fully escaped; values keep '-', '_' and '~' literal.
        encoded.append(urllib.quote(name, safe='') + '=' +
                       urllib.quote(value, safe='-_~'))
    qs = '&'.join(encoded)
    boto.log.debug('query string: %s' % qs)

    string_to_sign = preamble + qs
    boto.log.debug('string_to_sign: %s' % string_to_sign)
    signer.update(string_to_sign)
    b64 = base64.b64encode(signer.digest())
    boto.log.debug('len(b64)=%d' % len(b64))
    boto.log.debug('base64 encoded digest: %s' % b64)
    return (qs, b64)
def calc_signature(self, args):
    """
    Compute an HmacSHA256 query-string signature for a request.

    ``args['params']`` is updated in place: ``SignatureMethod`` is set to
    ``'HmacSHA256'`` and ``SecurityToken`` is added when the credentials
    carry a token.  The parameters are then sorted by name,
    percent-encoded and signed together with the HTTP method, the host
    taken from ``args['url']`` and the fixed path ``'/'``.

    :param args: mapping with ``'url'``, ``'method'`` and ``'params'`` keys.
    :return: ``(qs, b64)`` -- the encoded query string and the base64
        HMAC digest decoded to text.
    """
    import logging

    # Only the host takes part in the signature; scheme and port are unused.
    _scheme, host, _port = requests.packages.urllib3.get_host(args['url'])
    signer = self.hmac.copy()

    params = args['params']
    params['SignatureMethod'] = 'HmacSHA256'
    if self.credentials.token:
        params['SecurityToken'] = self.credentials.token

    qs = '&'.join(
        quote(key, safe='') + '=' + quote(params[key], safe='-_~')
        for key in sorted(params)
    )
    string_to_sign = '%s\n%s\n%s\n' % (args['method'], host, '/') + qs

    # The string to sign can contain the session token, so it must not be
    # printed to stdout unconditionally; emit it at DEBUG level only.
    logging.getLogger(__name__).debug('string_to_sign: %s', string_to_sign)

    signer.update(string_to_sign.encode('utf-8'))
    b64 = base64.b64encode(signer.digest()).strip().decode('utf-8')
    return (qs, b64)
def add_auth(self, req, **kwargs):
    """
    Attach an AWS3 ``X-Amzn-Authorization`` header to a request.

    The ``X-Amz-Date`` header is refreshed, the provider's security token
    (when set) is sent as ``X-Amz-Security-Token``, and the SHA-256 digest
    of the string-to-sign is signed via ``sign_string``.  A text
    string-to-sign is encoded as UTF-8 before hashing; a byte string is
    hashed as-is.

    :type req: :class:`boto.connection.HTTPRequest`
    :param req: The HTTPRequest object; its ``headers`` are modified in place.
    """
    auth_header = 'X-Amzn-Authorization'
    # On a retry the request still carries the header from the previous
    # attempt, so discard it before computing a new one.
    if auth_header in req.headers:
        del req.headers[auth_header]

    req.headers['X-Amz-Date'] = formatdate(usegmt=True)
    if self._provider.security_token:
        req.headers['X-Amz-Security-Token'] = self._provider.security_token

    string_to_sign, headers_to_sign = self.string_to_sign(req)
    boto.log.debug('StringToSign:\n%s' % string_to_sign)

    # Hash functions operate on bytes: hashing text directly raises
    # TypeError on Python 3 and fails for non-ASCII unicode on Python 2.
    payload = string_to_sign
    if not isinstance(payload, bytes):
        payload = payload.encode('utf-8')
    hash_value = sha256(payload).digest()
    b64_hmac = self.sign_string(hash_value)

    fields = (
        "AWS3 AWSAccessKeyId=%s," % self._provider.access_key,
        "Algorithm=%s," % self.algorithm(),
        "SignedHeaders=%s," % ';'.join(headers_to_sign),
        "Signature=%s" % b64_hmac,
    )
    req.headers[auth_header] = ''.join(fields)
def _calc_signature(self, params, verb, path, server_name):
    """
    Build the sorted, percent-encoded query string and its base64 HMAC.

    ``params`` is modified in place: ``SignatureMethod`` is always set to
    ``self.algorithm()`` and ``SecurityToken`` is added when the provider
    has a security token.

    :param params: dict of request parameters to sign.
    :param verb: HTTP method placed on the first line of the string to sign.
    :param path: request path placed on the third line.
    :param server_name: host name; lower-cased before signing.
    :return: ``(qs, b64)`` -- the query string and the base64-encoded
        HMAC digest of the string to sign.
    """
    boto.log.debug('using _calc_signature_2')
    preamble = '%s\n%s\n%s\n' % (verb, server_name.lower(), path)
    signer = self._get_hmac()
    params['SignatureMethod'] = self.algorithm()
    if self._provider.security_token:
        params['SecurityToken'] = self._provider.security_token

    encoded = []
    for name in sorted(params.keys()):
        value = boto.utils.get_utf8_value(params[name])
        # Names are fully escaped; values keep '-', '_' and '~' literal.
        encoded.append(urllib.quote(name, safe='') + '=' +
                       urllib.quote(value, safe='-_~'))
    qs = '&'.join(encoded)
    boto.log.debug('query string: %s' % qs)

    string_to_sign = preamble + qs
    boto.log.debug('string_to_sign: %s' % string_to_sign)
    signer.update(string_to_sign)
    b64 = base64.b64encode(signer.digest())
    boto.log.debug('len(b64)=%d' % len(b64))
    boto.log.debug('base64 encoded digest: %s' % b64)
    return (qs, b64)
def add_auth(self, req, **kwargs):
    """
    Attach an AWS3 ``X-Amzn-Authorization`` header to a request.

    The ``X-Amz-Date`` header is refreshed, the provider's security token
    (when set) is sent as ``X-Amz-Security-Token``, and the SHA-256 digest
    of the UTF-8 encoded string-to-sign is signed via ``sign_string``.

    :type req: :class:`boto.connection.HTTPRequest`
    :param req: The HTTPRequest object; its ``headers`` are modified in place.
    """
    auth_header = 'X-Amzn-Authorization'
    # On a retry the request still carries the header from the previous
    # attempt, so discard it before computing a new one.
    if auth_header in req.headers:
        del req.headers[auth_header]

    req.headers['X-Amz-Date'] = formatdate(usegmt=True)
    if self._provider.security_token:
        req.headers['X-Amz-Security-Token'] = self._provider.security_token

    to_sign, signed_headers = self.string_to_sign(req)
    boto.log.debug('StringToSign:\n%s' % to_sign)
    digest = sha256(to_sign.encode('utf-8')).digest()
    signature = self.sign_string(digest)

    fields = (
        "AWS3 AWSAccessKeyId=%s," % self._provider.access_key,
        "Algorithm=%s," % self.algorithm(),
        "SignedHeaders=%s," % ';'.join(signed_headers),
        "Signature=%s" % signature,
    )
    req.headers[auth_header] = ''.join(fields)
def calc_signature(self, args):
    """
    Compute an HmacSHA256 query-string signature for a request.

    ``args['params']`` is updated in place: ``SignatureMethod`` is set to
    ``'HmacSHA256'`` and ``SecurityToken`` is added when the credentials
    carry a token.  The parameters are then sorted by name,
    percent-encoded and signed together with the HTTP method, the host
    taken from ``args['url']`` and the fixed path ``'/'``.

    :param args: mapping with ``'url'``, ``'method'`` and ``'params'`` keys.
    :return: ``(qs, b64)`` -- the encoded query string and the base64
        HMAC digest decoded to text.
    """
    import logging

    # Only the host takes part in the signature; scheme and port are unused.
    _scheme, host, _port = requests.packages.urllib3.get_host(args['url'])
    signer = self.hmac.copy()

    params = args['params']
    params['SignatureMethod'] = 'HmacSHA256'
    if self.credentials.token:
        params['SecurityToken'] = self.credentials.token

    qs = '&'.join(
        quote(key, safe='') + '=' + quote(params[key], safe='-_~')
        for key in sorted(params)
    )
    string_to_sign = '%s\n%s\n%s\n' % (args['method'], host, '/') + qs

    # The string to sign can contain the session token, so it must not be
    # printed to stdout unconditionally; emit it at DEBUG level only.
    logging.getLogger(__name__).debug('string_to_sign: %s', string_to_sign)

    signer.update(string_to_sign.encode('utf-8'))
    b64 = base64.b64encode(signer.digest()).strip().decode('utf-8')
    return (qs, b64)
def add_auth(self, req, **kwargs):
    """
    Attach an AWS3 ``X-Amzn-Authorization`` header to a request.

    The ``X-Amz-Date`` header is refreshed, the provider's security token
    (when set) is sent as ``X-Amz-Security-Token``, and the SHA-256 digest
    of the string-to-sign is signed via ``sign_string``.  A text
    string-to-sign is encoded as UTF-8 before hashing; a byte string is
    hashed as-is.

    :type req: :class:`boto.connection.HTTPRequest`
    :param req: The HTTPRequest object; its ``headers`` are modified in place.
    """
    auth_header = 'X-Amzn-Authorization'
    # On a retry the request still carries the header from the previous
    # attempt, so discard it before computing a new one.
    if auth_header in req.headers:
        del req.headers[auth_header]

    req.headers['X-Amz-Date'] = formatdate(usegmt=True)
    if self._provider.security_token:
        req.headers['X-Amz-Security-Token'] = self._provider.security_token

    string_to_sign, headers_to_sign = self.string_to_sign(req)
    boto.log.debug('StringToSign:\n%s' % string_to_sign)

    # Hash functions operate on bytes: hashing text directly raises
    # TypeError on Python 3 and fails for non-ASCII unicode on Python 2.
    payload = string_to_sign
    if not isinstance(payload, bytes):
        payload = payload.encode('utf-8')
    hash_value = sha256(payload).digest()
    b64_hmac = self.sign_string(hash_value)

    fields = (
        "AWS3 AWSAccessKeyId=%s," % self._provider.access_key,
        "Algorithm=%s," % self.algorithm(),
        "SignedHeaders=%s," % ';'.join(headers_to_sign),
        "Signature=%s" % b64_hmac,
    )
    req.headers[auth_header] = ''.join(fields)
def _calc_signature(self, params, verb, path, server_name):
    """
    Build the sorted, percent-encoded query string and its base64 HMAC.

    ``params`` is modified in place: ``SignatureMethod`` is always set to
    ``self.algorithm()`` and ``SecurityToken`` is added when the provider
    has a security token.

    :param params: dict of request parameters to sign.
    :param verb: HTTP method placed on the first line of the string to sign.
    :param path: request path placed on the third line.
    :param server_name: host name; lower-cased before signing.
    :return: ``(qs, b64)`` -- the query string and the base64-encoded
        HMAC digest of the string to sign.
    """
    boto.log.debug('using _calc_signature_2')
    preamble = '%s\n%s\n%s\n' % (verb, server_name.lower(), path)
    signer = self._get_hmac()
    params['SignatureMethod'] = self.algorithm()
    if self._provider.security_token:
        params['SecurityToken'] = self._provider.security_token

    encoded = []
    for name in sorted(params.keys()):
        value = boto.utils.get_utf8_value(params[name])
        # Names are fully escaped; values keep '-', '_' and '~' literal.
        encoded.append(urllib.quote(name, safe='') + '=' +
                       urllib.quote(value, safe='-_~'))
    qs = '&'.join(encoded)
    boto.log.debug('query string: %s' % qs)

    string_to_sign = preamble + qs
    boto.log.debug('string_to_sign: %s' % string_to_sign)
    signer.update(string_to_sign)
    b64 = base64.b64encode(signer.digest())
    boto.log.debug('len(b64)=%d' % len(b64))
    boto.log.debug('base64 encoded digest: %s' % b64)
    return (qs, b64)
def sign_string(self, string_to_sign):
    """
    Return the base64-encoded HMAC of ``string_to_sign`` as text.

    :param string_to_sign: text, which is encoded as UTF-8 before signing,
        or a byte string (for example a raw hash digest), which is signed
        as-is.
    :return: the base64 HMAC digest decoded to ``str``, with the trailing
        newline added by ``encodebytes`` stripped.
    """
    # Callers may hand over an already-binary value such as a sha256
    # digest; bytes have no ``encode`` method, so only encode text.
    if not isinstance(string_to_sign, bytes):
        string_to_sign = string_to_sign.encode('utf-8')
    new_hmac = self._get_hmac()
    new_hmac.update(string_to_sign)
    return encodebytes(new_hmac.digest()).decode('utf-8').strip()
def _sign(self, key, msg, hex=False):
    """
    Compute the HMAC-SHA256 of ``msg`` under ``key``.

    :param key: signing key; a non-bytes key is encoded as UTF-8.
    :param msg: message text, encoded as UTF-8 before signing.
    :param hex: when true return the hexadecimal digest string,
        otherwise the raw digest bytes.
    :return: the digest in the requested form.
    """
    raw_key = key if isinstance(key, bytes) else key.encode('utf-8')
    mac = hmac.new(raw_key, msg.encode('utf-8'), sha256)
    return mac.hexdigest() if hex else mac.digest()
def _calc_signature(self, params, *args):
    """
    Sign ``Action`` + ``Timestamp`` and build the request query string.

    Only the concatenation of ``params['Action']`` and
    ``params['Timestamp']`` is fed to the HMAC.  The query string lists
    every parameter, ordered case-insensitively by name, with values
    percent-encoded.

    :param params: dict of request parameters; must contain ``'Action'``
        and ``'Timestamp'``.
    :param args: accepted and ignored.
    :return: ``(qs, signature)`` -- the query string and the
        base64-encoded HMAC digest.
    """
    boto.log.debug('using _calc_signature_0')
    signer = self._get_hmac()
    signed_text = params['Action'] + params['Timestamp']
    signer.update(signed_text.encode('utf-8'))

    # dict views cannot be sorted in place and ``cmp=`` no longer exists
    # on Python 3; a lower-casing key function gives the same ordering.
    keys = sorted(params.keys(), key=lambda name: name.lower())
    pairs = []
    for key in keys:
        val = boto.utils.get_utf8_value(params[key])
        pairs.append(key + '=' + urllib.parse.quote(val))
    qs = '&'.join(pairs)
    return (qs, base64.b64encode(signer.digest()))
def _calc_signature(self, params, *args):
    """
    Sign every parameter name and value and build the query string.

    Parameters are visited in case-insensitive name order; each name
    (UTF-8 encoded) and its UTF-8 value are fed to the HMAC in turn, and
    ``name=quoted-value`` is appended to the query string.

    :param params: dict of request parameters.
    :param args: accepted and ignored.
    :return: ``(qs, signature)`` -- the query string and the
        base64-encoded HMAC digest.
    """
    boto.log.debug('using _calc_signature_1')
    signer = self._get_hmac()

    # dict views cannot be sorted in place and ``cmp=`` no longer exists
    # on Python 3; a lower-casing key function gives the same ordering.
    keys = sorted(params.keys(), key=lambda name: name.lower())
    pairs = []
    for key in keys:
        signer.update(key.encode('utf-8'))
        val = boto.utils.get_utf8_value(params[key])
        signer.update(val)
        pairs.append(key + '=' + urllib.parse.quote(val))
    qs = '&'.join(pairs)
    return (qs, base64.b64encode(signer.digest()))
def sign_string(self, string_to_sign):
    """
    Return the base64-encoded HMAC of ``string_to_sign`` as text.

    :param string_to_sign: text, which is encoded as UTF-8 before signing,
        or a byte string (for example a raw hash digest), which is signed
        as-is.
    :return: the base64 HMAC digest decoded to ``str``, with the trailing
        newline added by ``encodebytes`` stripped.
    """
    # Callers may hand over an already-binary value such as a sha256
    # digest; bytes have no ``encode`` method, so only encode text.
    if not isinstance(string_to_sign, bytes):
        string_to_sign = string_to_sign.encode('utf-8')
    new_hmac = self._get_hmac()
    new_hmac.update(string_to_sign)
    return encodebytes(new_hmac.digest()).decode('utf-8').strip()
def _sign(self, key, msg, hex=False):
    """
    Compute the HMAC-SHA256 of ``msg`` under ``key``.

    :param key: signing key; a non-bytes key is encoded as UTF-8.
    :param msg: message text, encoded as UTF-8 before signing.
    :param hex: when true return the hexadecimal digest string,
        otherwise the raw digest bytes.
    :return: the digest in the requested form.
    """
    raw_key = key if isinstance(key, bytes) else key.encode('utf-8')
    mac = hmac.new(raw_key, msg.encode('utf-8'), sha256)
    return mac.hexdigest() if hex else mac.digest()
def _calc_signature(self, params, *args):
    """
    Sign ``Action`` + ``Timestamp`` and build the request query string.

    Only the concatenation of ``params['Action']`` and
    ``params['Timestamp']`` is fed to the HMAC.  The query string lists
    every parameter, ordered case-insensitively by name, with values
    percent-encoded.

    :param params: dict of request parameters; must contain ``'Action'``
        and ``'Timestamp'``.
    :param args: accepted and ignored.
    :return: ``(qs, signature)`` -- the query string and the
        base64-encoded HMAC digest.
    """
    boto.log.debug('using _calc_signature_0')
    signer = self._get_hmac()
    signed_text = params['Action'] + params['Timestamp']
    signer.update(signed_text.encode('utf-8'))

    # dict views cannot be sorted in place and ``cmp=`` no longer exists
    # on Python 3; a lower-casing key function gives the same ordering.
    keys = sorted(params.keys(), key=lambda name: name.lower())
    pairs = []
    for key in keys:
        val = boto.utils.get_utf8_value(params[key])
        pairs.append(key + '=' + urllib.parse.quote(val))
    qs = '&'.join(pairs)
    return (qs, base64.b64encode(signer.digest()))
def _calc_signature(self, params, *args):
    """
    Sign every parameter name and value and build the query string.

    Parameters are visited in case-insensitive name order; each name
    (UTF-8 encoded) and its UTF-8 value are fed to the HMAC in turn, and
    ``name=quoted-value`` is appended to the query string.

    :param params: dict of request parameters.
    :param args: accepted and ignored.
    :return: ``(qs, signature)`` -- the query string and the
        base64-encoded HMAC digest.
    """
    boto.log.debug('using _calc_signature_1')
    signer = self._get_hmac()
    ordered_names = sorted(params.keys(), key=lambda name: name.lower())

    fragments = []
    for name in ordered_names:
        signer.update(name.encode('utf-8'))
        value = boto.utils.get_utf8_value(params[name])
        signer.update(value)
        fragments.append(name + '=' + urllib.parse.quote(value))

    return ('&'.join(fragments), base64.b64encode(signer.digest()))
def sign_string(self, string_to_sign):
    """
    Return the base64-encoded HMAC of ``string_to_sign``.

    :param string_to_sign: a byte string, signed as-is, or text, which is
        encoded as UTF-8 first.
    :return: the base64 HMAC digest as a native ``str`` with surrounding
        whitespace (the encoder's trailing newline) stripped.
    """
    # HMAC objects only accept bytes; encode text rather than failing.
    if not isinstance(string_to_sign, bytes):
        string_to_sign = string_to_sign.encode('utf-8')
    new_hmac = self._get_hmac()
    new_hmac.update(string_to_sign)

    # base64.encodestring was removed in Python 3.9; encodebytes is the
    # same encoder under its current name (absent on Python 2).
    encode = getattr(base64, 'encodebytes', None) or base64.encodestring
    signature = encode(new_hmac.digest()).strip()
    # On Python 3 the encoder yields bytes; callers interpolate the
    # result into header text, so hand back a native string.
    if not isinstance(signature, str):
        signature = signature.decode('utf-8')
    return signature
def _sign(self, key, msg, hex=False):
    """
    Compute the HMAC-SHA256 of ``msg`` under ``key``.

    :param key: signing key; a non-bytes key is encoded as UTF-8 because
        ``hmac.new`` rejects text keys on Python 3.
    :param msg: message text, encoded as UTF-8 before signing.
    :param hex: when true return the hexadecimal digest string,
        otherwise the raw digest bytes.
    :return: the digest in the requested form.
    """
    if not isinstance(key, bytes):
        key = key.encode('utf-8')
    mac = hmac.new(key, msg.encode('utf-8'), sha256)
    return mac.hexdigest() if hex else mac.digest()
def _calc_signature(self, params, *args):
    """
    Sign ``Action`` + ``Timestamp`` and build the request query string.

    Only the concatenation of ``params['Action']`` and
    ``params['Timestamp']`` is fed to the HMAC.  The query string lists
    every parameter, ordered case-insensitively by name, with values
    percent-encoded.

    :param params: dict of request parameters; must contain ``'Action'``
        and ``'Timestamp'``.
    :param args: accepted and ignored.
    :return: ``(qs, signature)`` -- the query string and the
        base64-encoded HMAC digest.
    """
    boto.log.debug('using _calc_signature_0')
    signer = self._get_hmac()
    signer.update(params['Action'] + params['Timestamp'])

    # Case-insensitive ordering by parameter name.
    ordered_names = sorted(params.keys(), key=lambda name: name.lower())
    fragments = []
    for name in ordered_names:
        value = boto.utils.get_utf8_value(params[name])
        fragments.append(name + '=' + urllib.quote(value))

    return ('&'.join(fragments), base64.b64encode(signer.digest()))
def sign_string(self, string_to_sign):
    """
    Return the base64-encoded HMAC of ``string_to_sign``.

    :param string_to_sign: a byte string, signed as-is, or text, which is
        encoded as UTF-8 first.
    :return: the base64 HMAC digest as a native ``str`` with surrounding
        whitespace (the encoder's trailing newline) stripped.
    """
    # HMAC objects only accept bytes; encode text rather than failing.
    if not isinstance(string_to_sign, bytes):
        string_to_sign = string_to_sign.encode('utf-8')
    new_hmac = self._get_hmac()
    new_hmac.update(string_to_sign)

    # base64.encodestring was removed in Python 3.9; encodebytes is the
    # same encoder under its current name (absent on Python 2).
    encode = getattr(base64, 'encodebytes', None) or base64.encodestring
    signature = encode(new_hmac.digest()).strip()
    # On Python 3 the encoder yields bytes; callers interpolate the
    # result into header text, so hand back a native string.
    if not isinstance(signature, str):
        signature = signature.decode('utf-8')
    return signature
def _sign(self, key, msg, hex=False):
    """
    Compute the HMAC-SHA256 of ``msg`` under ``key``.

    :param key: signing key; a non-bytes key is encoded as UTF-8 because
        ``hmac.new`` rejects text keys on Python 3.
    :param msg: message text, encoded as UTF-8 before signing.
    :param hex: when true return the hexadecimal digest string,
        otherwise the raw digest bytes.
    :return: the digest in the requested form.
    """
    if not isinstance(key, bytes):
        key = key.encode('utf-8')
    mac = hmac.new(key, msg.encode('utf-8'), sha256)
    return mac.hexdigest() if hex else mac.digest()
def _calc_signature(self, params, *args):
    """
    Sign ``Action`` + ``Timestamp`` and build the request query string.

    Only the concatenation of ``params['Action']`` and
    ``params['Timestamp']`` is fed to the HMAC.  The query string lists
    every parameter, ordered case-insensitively by name, with values
    percent-encoded.

    :param params: dict of request parameters; must contain ``'Action'``
        and ``'Timestamp'``.
    :param args: accepted and ignored.
    :return: ``(qs, signature)`` -- the query string and the
        base64-encoded HMAC digest.
    """
    boto.log.debug('using _calc_signature_0')
    signer = self._get_hmac()
    signer.update(params['Action'] + params['Timestamp'])

    # Case-insensitive ordering by parameter name.
    ordered_names = sorted(params.keys(), key=lambda name: name.lower())
    fragments = []
    for name in ordered_names:
        value = boto.utils.get_utf8_value(params[name])
        fragments.append(name + '=' + urllib.quote(value))

    return ('&'.join(fragments), base64.b64encode(signer.digest()))
def generate_hmac(self, secret_key, counter):
    """Compute the HMAC-SHA1 of an 8-byte counter under a shared secret.

    Args:
        secret_key: byte string holding the secret shared by the client
            and the server (at least 20 bytes is recommended).
        counter: the counter value as an 8-byte string, most significant
            byte first.

    Returns:
        The HMAC digest as a byte string, 20 bytes (160 bits) long.

    Raises:
        TypeError: if ``secret_key`` or ``counter`` is not a byte string.
        ValueError: if ``counter`` is not exactly 8 bytes long.
    """
    import hashlib
    import hmac as hmac_module

    # Validate in the same order the errors are documented: secret type,
    # counter type, then counter length.
    if not isinstance(secret_key, bytes):
        raise TypeError('secret_key must be a byte string')
    if not isinstance(counter, bytes):
        raise TypeError('counter must be a byte string')
    if len(counter) != 8:
        raise ValueError('counter must be 8 bytes')

    return hmac_module.new(secret_key, counter, hashlib.sha1).digest()
def sign_string(self, string_to_sign):
    """
    Return the base64-encoded HMAC of ``string_to_sign`` as text.

    :param string_to_sign: text, which is encoded as UTF-8 before signing,
        or a byte string (for example a raw hash digest), which is signed
        as-is.
    :return: the base64 HMAC digest decoded to ``str``, with the trailing
        newline added by ``encodebytes`` stripped.
    """
    # Callers may hand over an already-binary value such as a sha256
    # digest; bytes have no ``encode`` method, so only encode text.
    if not isinstance(string_to_sign, bytes):
        string_to_sign = string_to_sign.encode('utf-8')
    new_hmac = self._get_hmac()
    new_hmac.update(string_to_sign)
    return encodebytes(new_hmac.digest()).decode('utf-8').strip()
def _sign(self, key, msg, hex=False):
    """
    Compute the HMAC-SHA256 of ``msg`` under ``key``.

    :param key: signing key; a non-bytes key is encoded as UTF-8.
    :param msg: message text, encoded as UTF-8 before signing.
    :param hex: when true return the hexadecimal digest string,
        otherwise the raw digest bytes.
    :return: the digest in the requested form.
    """
    raw_key = key if isinstance(key, bytes) else key.encode('utf-8')
    mac = hmac.new(raw_key, msg.encode('utf-8'), sha256)
    return mac.hexdigest() if hex else mac.digest()
def _calc_signature(self, params, *args):
    """
    Sign ``Action`` + ``Timestamp`` and build the request query string.

    Only the concatenation of ``params['Action']`` and
    ``params['Timestamp']`` is fed to the HMAC.  The query string lists
    every parameter, ordered case-insensitively by name, with values
    percent-encoded.

    :param params: dict of request parameters; must contain ``'Action'``
        and ``'Timestamp'``.
    :param args: accepted and ignored.
    :return: ``(qs, signature)`` -- the query string and the
        base64-encoded HMAC digest.
    """
    boto.log.debug('using _calc_signature_0')
    signer = self._get_hmac()
    signed_text = params['Action'] + params['Timestamp']
    signer.update(signed_text.encode('utf-8'))

    # dict views cannot be sorted in place and ``cmp=`` no longer exists
    # on Python 3; a lower-casing key function gives the same ordering.
    keys = sorted(params.keys(), key=lambda name: name.lower())
    pairs = []
    for key in keys:
        val = boto.utils.get_utf8_value(params[key])
        pairs.append(key + '=' + urllib.parse.quote(val))
    qs = '&'.join(pairs)
    return (qs, base64.b64encode(signer.digest()))