def calc_signature(self, args):
    """Build the canonical query string for a request and sign it.

    ``args`` is a mapping with at least ``'url'``, ``'method'`` and
    ``'params'``.  ``args['params']`` is modified in place:
    ``SignatureMethod`` is always set and ``SecurityToken`` is added when
    ``self.credentials.token`` is truthy.

    The string that gets signed is the method, the host extracted from
    ``args['url']``, the fixed path ``/`` and the sorted, percent-encoded
    query string, separated by newlines.

    Returns a ``(query_string, signature)`` tuple where ``signature`` is
    the base64 text of the HMAC digest.
    """
    # Only the host component of the URL takes part in the signature.
    _scheme, host, _port = requests.packages.urllib3.get_host(args['url'])
    header = '%s\n%s\n%s\n' % (args['method'], host, '/')
    # Work on a copy so the shared HMAC object is never advanced.
    signer = self.hmac.copy()
    params = args['params']
    params['SignatureMethod'] = 'HmacSHA256'
    if self.credentials.token:
        params['SecurityToken'] = self.credentials.token
    qs = '&'.join(
        quote(name, safe='') + '=' + quote(params[name], safe='-_~')
        for name in sorted(params)
    )
    string_to_sign = header + qs
    print('string_to_sign')
    print(string_to_sign)
    signer.update(string_to_sign.encode('utf-8'))
    digest_text = base64.b64encode(signer.digest()).strip().decode('utf-8')
    return (qs, digest_text)
# Example source code for Python's update() method
def _calc_signature(self, params, verb, path, server_name):
    """Build the sorted, percent-encoded query string and its HMAC signature.

    ``params`` is modified in place: ``SignatureMethod`` is set from
    ``self.algorithm()`` and ``SecurityToken`` is added when the provider
    has a security token.

    The signed string is ``verb``, the lower-cased ``server_name``,
    ``path`` and the query string, separated by newlines.

    Returns a ``(query_string, base64_digest)`` tuple.
    """
    boto.log.debug('using _calc_signature_2')
    header = '%s\n%s\n%s\n' % (verb, server_name.lower(), path)
    signer = self._get_hmac()
    params['SignatureMethod'] = self.algorithm()
    if self._provider.security_token:
        params['SecurityToken'] = self._provider.security_token
    encoded = []
    for name in sorted(params.keys()):
        raw = boto.utils.get_utf8_value(params[name])
        # Keys are fully escaped; values keep '-', '_' and '~' literal.
        encoded.append(urllib.quote(name, safe='') + '=' +
                       urllib.quote(raw, safe='-_~'))
    qs = '&'.join(encoded)
    boto.log.debug('query string: %s' % qs)
    string_to_sign = header + qs
    boto.log.debug('string_to_sign: %s' % string_to_sign)
    signer.update(string_to_sign)
    b64 = base64.b64encode(signer.digest())
    boto.log.debug('len(b64)=%d' % len(b64))
    boto.log.debug('base64 encoded digest: %s' % b64)
    return (qs, b64)
def calc_signature(self, args):
    """Build the canonical query string for a request and sign it.

    ``args`` is a mapping with at least ``'url'``, ``'method'`` and
    ``'params'``.  ``args['params']`` is modified in place:
    ``SignatureMethod`` is always set and ``SecurityToken`` is added when
    ``self.credentials.token`` is truthy.

    The string that gets signed is the method, the host extracted from
    ``args['url']``, the fixed path ``/`` and the sorted, percent-encoded
    query string, separated by newlines.

    Returns a ``(query_string, signature)`` tuple where ``signature`` is
    the base64 text of the HMAC digest.
    """
    # Only the host component of the URL takes part in the signature.
    _scheme, host, _port = requests.packages.urllib3.get_host(args['url'])
    header = '%s\n%s\n%s\n' % (args['method'], host, '/')
    # Work on a copy so the shared HMAC object is never advanced.
    signer = self.hmac.copy()
    params = args['params']
    params['SignatureMethod'] = 'HmacSHA256'
    if self.credentials.token:
        params['SecurityToken'] = self.credentials.token
    qs = '&'.join(
        quote(name, safe='') + '=' + quote(params[name], safe='-_~')
        for name in sorted(params)
    )
    string_to_sign = header + qs
    print('string_to_sign')
    print(string_to_sign)
    signer.update(string_to_sign.encode('utf-8'))
    digest_text = base64.b64encode(signer.digest()).strip().decode('utf-8')
    return (qs, digest_text)
def _calc_signature(self, params, verb, path, server_name):
    """Build the sorted, percent-encoded query string and its HMAC signature.

    ``params`` is modified in place: ``SignatureMethod`` is set from
    ``self.algorithm()`` and ``SecurityToken`` is added when the provider
    has a security token.

    The signed string is ``verb``, the lower-cased ``server_name``,
    ``path`` and the query string, separated by newlines.

    Returns a ``(query_string, base64_digest)`` tuple.
    """
    boto.log.debug('using _calc_signature_2')
    header = '%s\n%s\n%s\n' % (verb, server_name.lower(), path)
    signer = self._get_hmac()
    params['SignatureMethod'] = self.algorithm()
    if self._provider.security_token:
        params['SecurityToken'] = self._provider.security_token
    encoded = []
    for name in sorted(params.keys()):
        raw = boto.utils.get_utf8_value(params[name])
        # Keys are fully escaped; values keep '-', '_' and '~' literal.
        encoded.append(urllib.quote(name, safe='') + '=' +
                       urllib.quote(raw, safe='-_~'))
    qs = '&'.join(encoded)
    boto.log.debug('query string: %s' % qs)
    string_to_sign = header + qs
    boto.log.debug('string_to_sign: %s' % string_to_sign)
    signer.update(string_to_sign)
    b64 = base64.b64encode(signer.digest())
    boto.log.debug('len(b64)=%d' % len(b64))
    boto.log.debug('base64 encoded digest: %s' % b64)
    return (qs, b64)
def calc_signature(self, args):
    """Build the canonical query string for a request and sign it.

    ``args`` is a mapping with at least ``'url'``, ``'method'`` and
    ``'params'``.  ``args['params']`` is modified in place:
    ``SignatureMethod`` is always set and ``SecurityToken`` is added when
    ``self.credentials.token`` is truthy.

    The string that gets signed is the method, the host extracted from
    ``args['url']``, the fixed path ``/`` and the sorted, percent-encoded
    query string, separated by newlines.

    Returns a ``(query_string, signature)`` tuple where ``signature`` is
    the base64 text of the HMAC digest.
    """
    # Only the host component of the URL takes part in the signature.
    _scheme, host, _port = requests.packages.urllib3.get_host(args['url'])
    header = '%s\n%s\n%s\n' % (args['method'], host, '/')
    # Work on a copy so the shared HMAC object is never advanced.
    signer = self.hmac.copy()
    params = args['params']
    params['SignatureMethod'] = 'HmacSHA256'
    if self.credentials.token:
        params['SecurityToken'] = self.credentials.token
    qs = '&'.join(
        quote(name, safe='') + '=' + quote(params[name], safe='-_~')
        for name in sorted(params)
    )
    string_to_sign = header + qs
    print('string_to_sign')
    print(string_to_sign)
    signer.update(string_to_sign.encode('utf-8'))
    digest_text = base64.b64encode(signer.digest()).strip().decode('utf-8')
    return (qs, digest_text)
def _calc_signature(self, params, verb, path, server_name):
    """Build the sorted, percent-encoded query string and its HMAC signature.

    ``params`` is modified in place: ``SignatureMethod`` is set from
    ``self.algorithm()`` and ``SecurityToken`` is added when the provider
    has a security token.

    The signed string is ``verb``, the lower-cased ``server_name``,
    ``path`` and the query string, separated by newlines.

    Returns a ``(query_string, base64_digest)`` tuple.
    """
    boto.log.debug('using _calc_signature_2')
    header = '%s\n%s\n%s\n' % (verb, server_name.lower(), path)
    signer = self._get_hmac()
    params['SignatureMethod'] = self.algorithm()
    if self._provider.security_token:
        params['SecurityToken'] = self._provider.security_token
    encoded = []
    for name in sorted(params.keys()):
        raw = boto.utils.get_utf8_value(params[name])
        # Keys are fully escaped; values keep '-', '_' and '~' literal.
        encoded.append(urllib.quote(name, safe='') + '=' +
                       urllib.quote(raw, safe='-_~'))
    qs = '&'.join(encoded)
    boto.log.debug('query string: %s' % qs)
    string_to_sign = header + qs
    boto.log.debug('string_to_sign: %s' % string_to_sign)
    signer.update(string_to_sign)
    b64 = base64.b64encode(signer.digest())
    boto.log.debug('len(b64)=%d' % len(b64))
    boto.log.debug('base64 encoded digest: %s' % b64)
    return (qs, b64)
def sign_string(self, string_to_sign):
    """Return the base64 text of the HMAC of ``string_to_sign``.

    A fresh HMAC object is taken from ``self._get_hmac()`` and fed the
    UTF-8 encoding of ``string_to_sign``.  Its raw digest is then base64
    encoded, and the whitespace ``encodebytes`` appends is stripped.
    """
    signer = self._get_hmac()
    payload = string_to_sign.encode('utf-8')
    signer.update(payload)
    encoded = encodebytes(signer.digest())
    return encoded.decode('utf-8').strip()
def mangle_path_and_params(self, req):
    """
    Return a shallow copy of ``req`` with the query string moved off
    ``auth_path`` and merged into ``params``.

    The original request is left untouched: its ``auth_path`` is not
    modified and its ``params`` dictionary is copied before being updated.
    """
    modified_req = copy.copy(req)
    # In S3 the query parameters do not live only in ``req.params``:
    # ``query_args`` may already have put a query string on the
    # ``path/auth_path``.  Split it off so that the path and the
    # parameters can each be signed correctly.
    split_path = urllib.parse.urlparse(modified_req.auth_path)
    modified_req.auth_path = split_path.path
    # ``copy.copy`` is shallow, so the copy still points at the original
    # params dict; give it a dict of its own before changing anything.
    modified_req.params = (
        {} if modified_req.params is None else req.params.copy()
    )
    path_params = urllib.parse.parse_qs(
        split_path.query,
        keep_blank_values=True
    )
    # ``parse_qs`` wraps every value in a list.  Unwrap the ones holding a
    # single item; genuine multi-value parameters stay as lists.
    for name in path_params:
        values = path_params[name]
        if isinstance(values, (list, tuple)) and len(values) == 1:
            path_params[name] = values[0]
    modified_req.params.update(path_params)
    return modified_req
def _calc_signature(self, params, *args):
    """Sign ``Action + Timestamp`` and build the request query string.

    Only the concatenation of ``params['Action']`` and
    ``params['Timestamp']`` is fed to the HMAC.  The query string holds
    every parameter, ordered case-insensitively by key, with the values
    percent-encoded and the keys left as they are.

    Extra positional arguments are accepted and ignored.

    Returns a ``(query_string, base64_digest)`` tuple.
    Raises ``KeyError`` if ``Action`` or ``Timestamp`` is missing.
    """
    boto.log.debug('using _calc_signature_0')
    hmac = self._get_hmac()
    s = params['Action'] + params['Timestamp']
    hmac.update(s.encode('utf-8'))
    # ``dict.keys()`` has no ``sort`` method on Python 3 and the ``cmp=``
    # argument no longer exists; a key function gives the same
    # case-insensitive, stable ordering.
    keys = sorted(params.keys(), key=lambda name: name.lower())
    pairs = []
    for key in keys:
        val = boto.utils.get_utf8_value(params[key])
        pairs.append(key + '=' + urllib.parse.quote(val))
    qs = '&'.join(pairs)
    return (qs, base64.b64encode(hmac.digest()))
def _calc_signature(self, params, *args):
    """Sign every key/value pair and build the request query string.

    Keys are visited in case-insensitive order.  For each one the UTF-8
    encoded key and then the value, as returned by
    ``boto.utils.get_utf8_value``, are fed to the HMAC.  The query string
    lists the same pairs in the same order, with the values
    percent-encoded and the keys left as they are.

    Extra positional arguments are accepted and ignored.

    Returns a ``(query_string, base64_digest)`` tuple.
    """
    boto.log.debug('using _calc_signature_1')
    hmac = self._get_hmac()
    # ``dict.keys()`` has no ``sort`` method on Python 3 and the ``cmp=``
    # argument no longer exists; a key function gives the same
    # case-insensitive, stable ordering.
    keys = sorted(params.keys(), key=lambda name: name.lower())
    pairs = []
    for key in keys:
        hmac.update(key.encode('utf-8'))
        val = boto.utils.get_utf8_value(params[key])
        hmac.update(val)
        pairs.append(key + '=' + urllib.parse.quote(val))
    qs = '&'.join(pairs)
    return (qs, base64.b64encode(hmac.digest()))
def sign_string(self, string_to_sign):
    """Return the base64 text of the HMAC of ``string_to_sign``.

    A fresh HMAC object is taken from ``self._get_hmac()`` and fed the
    UTF-8 encoding of ``string_to_sign``.  Its raw digest is then base64
    encoded, and the whitespace ``encodebytes`` appends is stripped.
    """
    signer = self._get_hmac()
    payload = string_to_sign.encode('utf-8')
    signer.update(payload)
    encoded = encodebytes(signer.digest())
    return encoded.decode('utf-8').strip()
def mangle_path_and_params(self, req):
    """
    Return a shallow copy of ``req`` with the query string moved off
    ``auth_path`` and merged into ``params``.

    The original request is left untouched: its ``auth_path`` is not
    modified and its ``params`` dictionary is copied before being updated.
    """
    modified_req = copy.copy(req)
    # In S3 the query parameters do not live only in ``req.params``:
    # ``query_args`` may already have put a query string on the
    # ``path/auth_path``.  Split it off so that the path and the
    # parameters can each be signed correctly.
    split_path = urllib.parse.urlparse(modified_req.auth_path)
    modified_req.auth_path = split_path.path
    # ``copy.copy`` is shallow, so the copy still points at the original
    # params dict; give it a dict of its own before changing anything.
    modified_req.params = (
        {} if modified_req.params is None else req.params.copy()
    )
    path_params = urllib.parse.parse_qs(
        split_path.query,
        keep_blank_values=True
    )
    # ``parse_qs`` wraps every value in a list.  Unwrap the ones holding a
    # single item; genuine multi-value parameters stay as lists.
    for name in path_params:
        values = path_params[name]
        if isinstance(values, (list, tuple)) and len(values) == 1:
            path_params[name] = values[0]
    modified_req.params.update(path_params)
    return modified_req
def _calc_signature(self, params, *args):
    """Sign ``Action + Timestamp`` and build the request query string.

    Only the concatenation of ``params['Action']`` and
    ``params['Timestamp']`` is fed to the HMAC.  The query string holds
    every parameter, ordered case-insensitively by key, with the values
    percent-encoded and the keys left as they are.

    Extra positional arguments are accepted and ignored.

    Returns a ``(query_string, base64_digest)`` tuple.
    Raises ``KeyError`` if ``Action`` or ``Timestamp`` is missing.
    """
    boto.log.debug('using _calc_signature_0')
    hmac = self._get_hmac()
    s = params['Action'] + params['Timestamp']
    hmac.update(s.encode('utf-8'))
    # ``dict.keys()`` has no ``sort`` method on Python 3 and the ``cmp=``
    # argument no longer exists; a key function gives the same
    # case-insensitive, stable ordering.
    keys = sorted(params.keys(), key=lambda name: name.lower())
    pairs = []
    for key in keys:
        val = boto.utils.get_utf8_value(params[key])
        pairs.append(key + '=' + urllib.parse.quote(val))
    qs = '&'.join(pairs)
    return (qs, base64.b64encode(hmac.digest()))
def _calc_signature(self, params, *args):
    """Sign every key/value pair and build the request query string.

    Keys are visited in case-insensitive order.  For each one the UTF-8
    encoded key and then the value, as returned by
    ``boto.utils.get_utf8_value``, are fed to the HMAC.  The query string
    lists the same pairs in the same order, with the values
    percent-encoded and the keys left as they are.

    Extra positional arguments are accepted and ignored.

    Returns a ``(query_string, base64_digest)`` tuple.
    """
    boto.log.debug('using _calc_signature_1')
    signer = self._get_hmac()
    fragments = []
    for name in sorted(params.keys(), key=lambda k: k.lower()):
        signer.update(name.encode('utf-8'))
        raw = boto.utils.get_utf8_value(params[name])
        signer.update(raw)
        fragments.append(name + '=' + urllib.parse.quote(raw))
    query_string = '&'.join(fragments)
    return (query_string, base64.b64encode(signer.digest()))
def sign_string(self, string_to_sign):
    """Return the base64 text of the HMAC of ``string_to_sign``.

    ``string_to_sign`` may be text or bytes; text is encoded as UTF-8
    before it is fed to a fresh HMAC object from ``self._get_hmac()``.
    The result is returned as text with surrounding whitespace stripped.
    """
    new_hmac = self._get_hmac()
    if not isinstance(string_to_sign, bytes):
        # HMAC objects only accept bytes on Python 3.
        string_to_sign = string_to_sign.encode('utf-8')
    new_hmac.update(string_to_sign)
    # ``base64.encodestring`` was removed in Python 3.9; ``encodebytes``
    # is the same encoder under its current name.
    return base64.encodebytes(new_hmac.digest()).decode('utf-8').strip()
def _calc_signature(self, params, *args):
    """Sign ``Action + Timestamp`` and build the request query string.

    Only the concatenation of ``params['Action']`` and
    ``params['Timestamp']`` is fed to the HMAC.  The query string holds
    every parameter, ordered case-insensitively by key, with the values
    percent-encoded and the keys left as they are.

    Extra positional arguments are accepted and ignored.

    Returns a ``(query_string, base64_digest)`` tuple.
    Raises ``KeyError`` if ``Action`` or ``Timestamp`` is missing.
    """
    # Imported locally so the block does not rely on a module-level
    # ``import urllib.parse``; ``urllib.quote`` only exists on Python 2.
    import urllib.parse

    boto.log.debug('using _calc_signature_0')
    hmac = self._get_hmac()
    s = params['Action'] + params['Timestamp']
    # HMAC objects only accept bytes on Python 3.
    hmac.update(s.encode('utf-8'))
    # ``dict.keys()`` has no ``sort`` method on Python 3 and the ``cmp=``
    # argument no longer exists; a key function gives the same
    # case-insensitive, stable ordering.
    keys = sorted(params.keys(), key=lambda name: name.lower())
    pairs = []
    for key in keys:
        val = boto.utils.get_utf8_value(params[key])
        pairs.append(key + '=' + urllib.parse.quote(val))
    qs = '&'.join(pairs)
    return (qs, base64.b64encode(hmac.digest()))
def _calc_signature(self, params, *args):
    """Sign every key/value pair and build the request query string.

    Keys are visited in case-insensitive order.  For each one the UTF-8
    encoded key and then the value, as returned by
    ``boto.utils.get_utf8_value``, are fed to the HMAC.  The query string
    lists the same pairs in the same order, with the values
    percent-encoded and the keys left as they are.

    Extra positional arguments are accepted and ignored.

    Returns a ``(query_string, base64_digest)`` tuple.
    """
    # Imported locally so the block does not rely on a module-level
    # ``import urllib.parse``; ``urllib.quote`` only exists on Python 2.
    import urllib.parse

    boto.log.debug('using _calc_signature_1')
    hmac = self._get_hmac()
    # ``dict.keys()`` has no ``sort`` method on Python 3 and the ``cmp=``
    # argument no longer exists; a key function gives the same
    # case-insensitive, stable ordering.
    keys = sorted(params.keys(), key=lambda name: name.lower())
    pairs = []
    for key in keys:
        # HMAC objects only accept bytes on Python 3.
        hmac.update(key.encode('utf-8'))
        val = boto.utils.get_utf8_value(params[key])
        hmac.update(val)
        pairs.append(key + '=' + urllib.parse.quote(val))
    qs = '&'.join(pairs)
    return (qs, base64.b64encode(hmac.digest()))
def sign_string(self, string_to_sign):
    """Return the base64 text of the HMAC of ``string_to_sign``.

    ``string_to_sign`` may be text or bytes; text is encoded as UTF-8
    before it is fed to a fresh HMAC object from ``self._get_hmac()``.
    The result is returned as text with surrounding whitespace stripped.
    """
    new_hmac = self._get_hmac()
    if not isinstance(string_to_sign, bytes):
        # HMAC objects only accept bytes on Python 3.
        string_to_sign = string_to_sign.encode('utf-8')
    new_hmac.update(string_to_sign)
    # ``base64.encodestring`` was removed in Python 3.9; ``encodebytes``
    # is the same encoder under its current name.
    return base64.encodebytes(new_hmac.digest()).decode('utf-8').strip()
def _calc_signature(self, params, *args):
    """Sign ``Action + Timestamp`` and build the request query string.

    Only the concatenation of ``params['Action']`` and
    ``params['Timestamp']`` is fed to the HMAC.  The query string holds
    every parameter, ordered case-insensitively by key, with the values
    percent-encoded and the keys left as they are.

    Extra positional arguments are accepted and ignored.

    Returns a ``(query_string, base64_digest)`` tuple.
    Raises ``KeyError`` if ``Action`` or ``Timestamp`` is missing.
    """
    # Imported locally so the block does not rely on a module-level
    # ``import urllib.parse``; ``urllib.quote`` only exists on Python 2.
    import urllib.parse

    boto.log.debug('using _calc_signature_0')
    hmac = self._get_hmac()
    s = params['Action'] + params['Timestamp']
    # HMAC objects only accept bytes on Python 3.
    hmac.update(s.encode('utf-8'))
    # ``dict.keys()`` has no ``sort`` method on Python 3 and the ``cmp=``
    # argument no longer exists; a key function gives the same
    # case-insensitive, stable ordering.
    keys = sorted(params.keys(), key=lambda name: name.lower())
    pairs = []
    for key in keys:
        val = boto.utils.get_utf8_value(params[key])
        pairs.append(key + '=' + urllib.parse.quote(val))
    qs = '&'.join(pairs)
    return (qs, base64.b64encode(hmac.digest()))
def _calc_signature(self, params, *args):
    """Sign every key/value pair and build the request query string.

    Keys are visited in case-insensitive order.  For each one the UTF-8
    encoded key and then the value, as returned by
    ``boto.utils.get_utf8_value``, are fed to the HMAC.  The query string
    lists the same pairs in the same order, with the values
    percent-encoded and the keys left as they are.

    Extra positional arguments are accepted and ignored.

    Returns a ``(query_string, base64_digest)`` tuple.
    """
    # Imported locally so the block does not rely on a module-level
    # ``import urllib.parse``; ``urllib.quote`` only exists on Python 2.
    import urllib.parse

    boto.log.debug('using _calc_signature_1')
    hmac = self._get_hmac()
    # ``dict.keys()`` has no ``sort`` method on Python 3 and the ``cmp=``
    # argument no longer exists; a key function gives the same
    # case-insensitive, stable ordering.
    keys = sorted(params.keys(), key=lambda name: name.lower())
    pairs = []
    for key in keys:
        # HMAC objects only accept bytes on Python 3.
        hmac.update(key.encode('utf-8'))
        val = boto.utils.get_utf8_value(params[key])
        hmac.update(val)
        pairs.append(key + '=' + urllib.parse.quote(val))
    qs = '&'.join(pairs)
    return (qs, base64.b64encode(hmac.digest()))
def sign_string(self, string_to_sign):
    """Return the base64 text of the HMAC of ``string_to_sign``.

    A fresh HMAC object is taken from ``self._get_hmac()`` and fed the
    UTF-8 encoding of ``string_to_sign``.  Its raw digest is then base64
    encoded, and the whitespace ``encodebytes`` appends is stripped.
    """
    signer = self._get_hmac()
    payload = string_to_sign.encode('utf-8')
    signer.update(payload)
    encoded = encodebytes(signer.digest())
    return encoded.decode('utf-8').strip()
def mangle_path_and_params(self, req):
    """
    Return a shallow copy of ``req`` with the query string moved off
    ``auth_path`` and merged into ``params``.

    The original request is left untouched: its ``auth_path`` is not
    modified and its ``params`` dictionary is copied before being updated.
    """
    modified_req = copy.copy(req)
    # In S3 the query parameters do not live only in ``req.params``:
    # ``query_args`` may already have put a query string on the
    # ``path/auth_path``.  Split it off so that the path and the
    # parameters can each be signed correctly.
    split_path = urllib.parse.urlparse(modified_req.auth_path)
    modified_req.auth_path = split_path.path
    # ``copy.copy`` is shallow, so the copy still points at the original
    # params dict; give it a dict of its own before changing anything.
    modified_req.params = (
        {} if modified_req.params is None else req.params.copy()
    )
    path_params = parse_qs_safe(
        split_path.query,
        keep_blank_values=True
    )
    # The parser returns lists.  Unwrap the ones holding a single item;
    # genuine multi-value parameters stay as lists.
    for name in path_params:
        values = path_params[name]
        if isinstance(values, (list, tuple)) and len(values) == 1:
            path_params[name] = values[0]
    modified_req.params.update(path_params)
    return modified_req
def _calc_signature(self, params, *args):
    """Sign ``Action + Timestamp`` and build the request query string.

    Only the concatenation of ``params['Action']`` and
    ``params['Timestamp']`` is fed to the HMAC.  The query string holds
    every parameter, ordered case-insensitively by key, with the values
    percent-encoded and the keys left as they are.

    Extra positional arguments are accepted and ignored.

    Returns a ``(query_string, base64_digest)`` tuple.
    Raises ``KeyError`` if ``Action`` or ``Timestamp`` is missing.
    """
    boto.log.debug('using _calc_signature_0')
    hmac = self._get_hmac()
    s = params['Action'] + params['Timestamp']
    hmac.update(s.encode('utf-8'))
    # ``dict.keys()`` has no ``sort`` method on Python 3 and the ``cmp=``
    # argument no longer exists; a key function gives the same
    # case-insensitive, stable ordering.
    keys = sorted(params.keys(), key=lambda name: name.lower())
    pairs = []
    for key in keys:
        val = boto.utils.get_utf8_value(params[key])
        pairs.append(key + '=' + urllib.parse.quote(val))
    qs = '&'.join(pairs)
    return (qs, base64.b64encode(hmac.digest()))
def _calc_signature(self, params, *args):
    """Sign every key/value pair and build the request query string.

    Keys are visited in case-insensitive order.  For each one the UTF-8
    encoded key and then the value, as returned by
    ``boto.utils.get_utf8_value``, are fed to the HMAC.  The query string
    lists the same pairs in the same order, with the values
    percent-encoded and the keys left as they are.

    Extra positional arguments are accepted and ignored.

    Returns a ``(query_string, base64_digest)`` tuple.
    """
    boto.log.debug('using _calc_signature_1')
    signer = self._get_hmac()
    fragments = []
    for name in sorted(params.keys(), key=lambda k: k.lower()):
        signer.update(name.encode('utf-8'))
        raw = boto.utils.get_utf8_value(params[name])
        signer.update(raw)
        fragments.append(name + '=' + urllib.parse.quote(raw))
    query_string = '&'.join(fragments)
    return (query_string, base64.b64encode(signer.digest()))
def sign_string(self, string_to_sign):
    """Return the base64 text of the HMAC of ``string_to_sign``.

    ``string_to_sign`` may be text or bytes; text is encoded as UTF-8
    before it is fed to a fresh HMAC object from ``self._get_hmac()``.
    The result is returned as text with surrounding whitespace stripped.
    """
    new_hmac = self._get_hmac()
    if not isinstance(string_to_sign, bytes):
        # HMAC objects only accept bytes on Python 3.
        string_to_sign = string_to_sign.encode('utf-8')
    new_hmac.update(string_to_sign)
    # ``base64.encodestring`` was removed in Python 3.9; ``encodebytes``
    # is the same encoder under its current name.
    return base64.encodebytes(new_hmac.digest()).decode('utf-8').strip()
def _calc_signature(self, params, *args):
    """Sign ``Action + Timestamp`` and build the request query string.

    Only the concatenation of ``params['Action']`` and
    ``params['Timestamp']`` is fed to the HMAC.  The query string holds
    every parameter, ordered case-insensitively by key, with the values
    percent-encoded and the keys left as they are.

    Extra positional arguments are accepted and ignored.

    Returns a ``(query_string, base64_digest)`` tuple.
    Raises ``KeyError`` if ``Action`` or ``Timestamp`` is missing.
    """
    # Imported locally so the block does not rely on a module-level
    # ``import urllib.parse``; ``urllib.quote`` only exists on Python 2.
    import urllib.parse

    boto.log.debug('using _calc_signature_0')
    hmac = self._get_hmac()
    s = params['Action'] + params['Timestamp']
    # HMAC objects only accept bytes on Python 3.
    hmac.update(s.encode('utf-8'))
    # ``dict.keys()`` has no ``sort`` method on Python 3 and the ``cmp=``
    # argument no longer exists; a key function gives the same
    # case-insensitive, stable ordering.
    keys = sorted(params.keys(), key=lambda name: name.lower())
    pairs = []
    for key in keys:
        val = boto.utils.get_utf8_value(params[key])
        pairs.append(key + '=' + urllib.parse.quote(val))
    qs = '&'.join(pairs)
    return (qs, base64.b64encode(hmac.digest()))
def _calc_signature(self, params, *args):
    """Sign every key/value pair and build the request query string.

    Keys are visited in case-insensitive order.  For each one the UTF-8
    encoded key and then the value, as returned by
    ``boto.utils.get_utf8_value``, are fed to the HMAC.  The query string
    lists the same pairs in the same order, with the values
    percent-encoded and the keys left as they are.

    Extra positional arguments are accepted and ignored.

    Returns a ``(query_string, base64_digest)`` tuple.
    """
    # Imported locally so the block does not rely on a module-level
    # ``import urllib.parse``; ``urllib.quote`` only exists on Python 2.
    import urllib.parse

    boto.log.debug('using _calc_signature_1')
    hmac = self._get_hmac()
    # ``dict.keys()`` has no ``sort`` method on Python 3 and the ``cmp=``
    # argument no longer exists; a key function gives the same
    # case-insensitive, stable ordering.
    keys = sorted(params.keys(), key=lambda name: name.lower())
    pairs = []
    for key in keys:
        # HMAC objects only accept bytes on Python 3.
        hmac.update(key.encode('utf-8'))
        val = boto.utils.get_utf8_value(params[key])
        hmac.update(val)
        pairs.append(key + '=' + urllib.parse.quote(val))
    qs = '&'.join(pairs)
    return (qs, base64.b64encode(hmac.digest()))
def presign(self, req, expires, iso_date=None):
    """
    Build a presigned ``https`` URL for ``req`` using SigV4 query
    parameters.

    ``expires`` is the validity period in seconds.  ``iso_date`` is the
    signing timestamp in ``YYYYMMDDTHHMMSSZ`` form; the current UTC time
    is used when it is omitted.

    ``req`` is modified: the ``X-Amz-*`` parameters and the final
    signature are added to ``req.params`` and ``X-Amz-Date`` is set in
    ``req.headers``.

    http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
    """
    if iso_date is None:
        iso_date = datetime.datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')

    region = self.determine_region_name(req.host)
    service = self.determine_service_name(req.host)
    # access key / date (YYYYMMDD) / region / service
    credential_parts = (
        self._provider.access_key,
        iso_date[:8],
        region,
        service,
    )

    params = {
        'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
        'X-Amz-Credential': '%s/%s/%s/%s/aws4_request' % credential_parts,
        'X-Amz-Date': iso_date,
        'X-Amz-Expires': expires,
        'X-Amz-SignedHeaders': 'host'
    }

    if self._provider.security_token:
        params['X-Amz-Security-Token'] = self._provider.security_token

    # Replace the placeholder with the names of the headers that are
    # actually signed, lower-cased and sorted.
    signed_names = sorted(
        '%s' % name.lower().strip() for name in self.headers_to_sign(req)
    )
    params['X-Amz-SignedHeaders'] = ';'.join(signed_names)

    req.params.update(params)

    canonical = self.canonical_request(req)
    # The last line of the canonical request is the payload hash; swap it
    # for the constant used by presigned URLs.
    canonical = canonical.rpartition('\n')[0] + '\nUNSIGNED-PAYLOAD'

    # string_to_sign expects the date header, nothing else uses it.
    req.headers['X-Amz-Date'] = iso_date

    sts = self.string_to_sign(req, canonical)
    # The signature goes into the parameters last, once it is known.
    req.params['X-Amz-Signature'] = self.signature(req, sts)

    query = urllib.parse.urlencode(req.params)
    return 'https://%s%s?%s' % (req.host, req.path, query)
def presign(self, req, expires, iso_date=None):
    """
    Build a presigned ``https`` URL for ``req`` using SigV4 query
    parameters.

    ``expires`` is the validity period in seconds.  ``iso_date`` is the
    signing timestamp in ``YYYYMMDDTHHMMSSZ`` form; the current UTC time
    is used when it is omitted.

    ``req`` is modified: the ``X-Amz-*`` parameters and the final
    signature are added to ``req.params`` and ``X-Amz-Date`` is set in
    ``req.headers``.

    http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
    """
    if iso_date is None:
        iso_date = datetime.datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')

    region = self.determine_region_name(req.host)
    service = self.determine_service_name(req.host)
    # access key / date (YYYYMMDD) / region / service
    credential_parts = (
        self._provider.access_key,
        iso_date[:8],
        region,
        service,
    )

    params = {
        'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
        'X-Amz-Credential': '%s/%s/%s/%s/aws4_request' % credential_parts,
        'X-Amz-Date': iso_date,
        'X-Amz-Expires': expires,
        'X-Amz-SignedHeaders': 'host'
    }

    if self._provider.security_token:
        params['X-Amz-Security-Token'] = self._provider.security_token

    # Replace the placeholder with the names of the headers that are
    # actually signed, lower-cased and sorted.
    signed_names = sorted(
        '%s' % name.lower().strip() for name in self.headers_to_sign(req)
    )
    params['X-Amz-SignedHeaders'] = ';'.join(signed_names)

    req.params.update(params)

    canonical = self.canonical_request(req)
    # The last line of the canonical request is the payload hash; swap it
    # for the constant used by presigned URLs.
    canonical = canonical.rpartition('\n')[0] + '\nUNSIGNED-PAYLOAD'

    # string_to_sign expects the date header, nothing else uses it.
    req.headers['X-Amz-Date'] = iso_date

    sts = self.string_to_sign(req, canonical)
    # The signature goes into the parameters last, once it is known.
    req.params['X-Amz-Signature'] = self.signature(req, sts)

    query = urllib.parse.urlencode(req.params)
    return 'https://%s%s?%s' % (req.host, req.path, query)
def getSignedUrl(base_url, params, timestamp = None):
    """Return ``base_url`` with a signed query string built from ``params``.

    ``params`` maps parameter names to string values and is modified in
    place: a ``Timestamp`` entry is added, or overwritten, with
    ``timestamp`` if given and otherwise with the current GMT time.

    The signature is computed with a copy of the module-level ``HMAC``
    object over a ``GET`` request line, the host and the first two path
    segments of ``base_url``, and the sorted parameter string.

    Raises ``IndexError`` if ``base_url`` has fewer than two path segments
    after the host, i.e. is not shaped like ``scheme://host/a/b``.
    """
    # Sign with a copy so the shared HMAC object is never advanced.
    hmac = HMAC.copy()

    # Add an ISO 8601 compliant timestamp (in GMT).
    if timestamp:
        params['Timestamp'] = timestamp
    else:
        params['Timestamp'] = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime())

    # Rebuild the parameters sorted by key.  ``sorted`` is used because
    # ``dict.keys()`` cannot be sorted in place on Python 3.
    url_string = '&'.join(key + '=' + params[key] for key in sorted(params.keys()))
    url_string = url_string.replace('+', "%20")
    url_string = url_string.replace(':', "%3A")

    # Construct the string to sign: verb, host, path, parameters.
    urlparts = base_url.split('/')
    string_to_sign = "GET\n%s\n/%s/%s\n%s" % (
        urlparts[2], urlparts[3], urlparts[4], url_string)

    # ';' is escaped only in the emitted URL, after the string to sign has
    # been built.  '%3B' is what ``quote(';')`` returns.
    url_string = url_string.replace(';', '%3B')

    # Sign the request.  HMAC objects only accept bytes on Python 3.
    if not isinstance(string_to_sign, bytes):
        string_to_sign = string_to_sign.encode('utf-8')
    hmac.update(string_to_sign)

    # Base64 encode the signature.  ``base64.encodestring`` was removed in
    # Python 3.9; ``b64encode`` yields the same text without line breaks.
    signature = base64.b64encode(hmac.digest()).decode('ascii')

    # Make the signature URL safe.
    signature = signature.replace('+', '%2B')
    signature = signature.replace('=', '%3D')
    signature = signature.replace('/', '%2F')

    url_string += "&Signature=%s" % signature
    return "%s?%s" % (base_url, url_string)
## main functions