def add_auth(self, http_request, **kwargs):
headers = http_request.headers
params = http_request.params
params['AWSAccessKeyId'] = self._provider.access_key
params['SignatureVersion'] = self.SignatureVersion
params['Timestamp'] = boto.utils.get_ts()
qs, signature = self._calc_signature(
http_request.params, http_request.method,
http_request.auth_path, http_request.host)
boto.log.debug('query_string: %s Signature: %s' % (qs, signature))
if http_request.method == 'POST':
headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
http_request.body = qs + '&Signature=' + urllib.parse.quote_plus(signature)
http_request.headers['Content-Length'] = str(len(http_request.body))
else:
http_request.body = ''
# if this is a retried request, the qs from the previous try will
# already be there, we need to get rid of that and rebuild it
http_request.path = http_request.path.split('?')[0]
http_request.path = (http_request.path + '?' + qs +
'&Signature=' + urllib.parse.quote_plus(signature))
python类utils()的实例源码
def _calc_signature(self, params, verb, path, server_name):
boto.log.debug('using _calc_signature_2')
string_to_sign = '%s\n%s\n%s\n' % (verb, server_name.lower(), path)
hmac = self._get_hmac()
params['SignatureMethod'] = self.algorithm()
if self._provider.security_token:
params['SecurityToken'] = self._provider.security_token
keys = sorted(params.keys())
pairs = []
for key in keys:
val = boto.utils.get_utf8_value(params[key])
pairs.append(urllib.parse.quote(key, safe='') + '=' +
urllib.parse.quote(val, safe='-_~'))
qs = '&'.join(pairs)
boto.log.debug('query string: %s' % qs)
string_to_sign += qs
boto.log.debug('string_to_sign: %s' % string_to_sign)
hmac.update(string_to_sign.encode('utf-8'))
b64 = base64.b64encode(hmac.digest())
boto.log.debug('len(b64)=%d' % len(b64))
boto.log.debug('base64 encoded digest: %s' % b64)
return (qs, b64)
def add_auth(self, req, **kwargs):
req.params['AWSAccessKeyId'] = self._provider.access_key
req.params['SignatureVersion'] = self.SignatureVersion
req.params['Timestamp'] = boto.utils.get_ts()
qs, signature = self._calc_signature(req.params, req.method,
req.auth_path, req.host)
boto.log.debug('query_string: %s Signature: %s' % (qs, signature))
if req.method == 'POST':
req.headers['Content-Length'] = str(len(req.body))
req.headers['Content-Type'] = req.headers.get('Content-Type',
'text/plain')
else:
req.body = ''
# if this is a retried req, the qs from the previous try will
# already be there, we need to get rid of that and rebuild it
req.path = req.path.split('?')[0]
req.path = (req.path + '?' + qs +
'&Signature=' + urllib.parse.quote_plus(signature))
def _populate_keys_from_metadata_server(self):
# get_instance_metadata is imported here because of a circular
# dependency.
boto.log.debug("Retrieving credentials from metadata server.")
from boto.utils import get_instance_metadata
timeout = config.getfloat('Boto', 'metadata_service_timeout', 1.0)
metadata = get_instance_metadata(timeout=timeout, num_retries=1)
# I'm assuming there's only one role on the instance profile.
if metadata and 'iam' in metadata:
security = metadata['iam']['security-credentials'].values()[0]
self._access_key = security['AccessKeyId']
self._secret_key = self._convert_key_to_str(security['SecretAccessKey'])
self._security_token = security['Token']
expires_at = security['Expiration']
self._credential_expiry_time = datetime.strptime(
expires_at, "%Y-%m-%dT%H:%M:%SZ")
boto.log.debug("Retrieved credentials will expire in %s at: %s",
self._credential_expiry_time - datetime.now(), expires_at)
def set_cors_xml(self, cors_xml, headers=None):
"""
Set the CORS (Cross-Origin Resource Sharing) for a bucket.
:type cors_xml: str
:param cors_xml: The XML document describing your desired
CORS configuration. See the S3 documentation for details
of the exact syntax required.
"""
fp = StringIO.StringIO(cors_xml)
md5 = boto.utils.compute_md5(fp)
if headers is None:
headers = {}
headers['Content-MD5'] = md5[1]
headers['Content-Type'] = 'text/xml'
response = self.connection.make_request('PUT', self.name,
data=fp.getvalue(),
query_args='cors',
headers=headers)
body = response.read()
if response.status == 200:
return True
else:
raise self.connection.provider.storage_response_error(
response.status, response.reason, body)
def add_auth(self, http_request, **kwargs):
headers = http_request.headers
method = http_request.method
auth_path = http_request.auth_path
if 'Date' not in headers:
headers['Date'] = formatdate(usegmt=True)
if self._provider.security_token:
key = self._provider.security_token_header
headers[key] = self._provider.security_token
string_to_sign = boto.utils.canonical_string(method, auth_path,
headers, None,
self._provider)
boto.log.debug('StringToSign:\n%s' % string_to_sign)
b64_hmac = self.sign_string(string_to_sign)
auth_hdr = self._provider.auth_header
headers['Authorization'] = ("%s %s:%s" %
(auth_hdr,
self._provider.access_key, b64_hmac))
def add_auth(self, http_request, **kwargs):
headers = http_request.headers
params = http_request.params
params['AWSAccessKeyId'] = self._provider.access_key
params['SignatureVersion'] = self.SignatureVersion
params['Timestamp'] = boto.utils.get_ts()
qs, signature = self._calc_signature(
http_request.params, http_request.method,
http_request.auth_path, http_request.host)
boto.log.debug('query_string: %s Signature: %s' % (qs, signature))
if http_request.method == 'POST':
headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
http_request.body = qs + '&Signature=' + urllib.quote_plus(signature)
http_request.headers['Content-Length'] = str(len(http_request.body))
else:
http_request.body = ''
# if this is a retried request, the qs from the previous try will
# already be there, we need to get rid of that and rebuild it
http_request.path = http_request.path.split('?')[0]
http_request.path = (http_request.path + '?' + qs +
'&Signature=' + urllib.quote_plus(signature))
def add_auth(self, req, **kwargs):
req.params['AWSAccessKeyId'] = self._provider.access_key
req.params['SignatureVersion'] = self.SignatureVersion
req.params['Timestamp'] = boto.utils.get_ts()
qs, signature = self._calc_signature(req.params, req.method,
req.auth_path, req.host)
boto.log.debug('query_string: %s Signature: %s' % (qs, signature))
if req.method == 'POST':
req.headers['Content-Length'] = str(len(req.body))
req.headers['Content-Type'] = req.headers.get('Content-Type',
'text/plain')
else:
req.body = ''
# if this is a retried req, the qs from the previous try will
# already be there, we need to get rid of that and rebuild it
req.path = req.path.split('?')[0]
req.path = (req.path + '?' + qs +
'&Signature=' + urllib.quote_plus(signature))
def get_aws_instance_meta(utils):
identity = utils.get_instance_identity()
meta = utils.get_instance_metadata()
return {
'instance-id': meta['instance-id'],
'region': identity['document']['region'],
'av-zone': meta['placement']['availability-zone'],
}
def detach_ebs(aws_access_key_id, aws_secret_access_key, devices):
import boto
import boto.ec2
meta = get_aws_instance_meta(boto.utils)
connection = boto.ec2.connect_to_region(
meta['region'],
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key
)
bd_mapping = get_aws_block_device_mapping(
connection, meta['instance-id']
)
for key, bd in bd_mapping.iteritems():
if key in devices:
connection.detach_volume(bd.volume_id)
def test_model(self,hashfunc=None):
from boto.utils import Password
from boto.sdb.db.model import Model
from boto.sdb.db.property import PasswordProperty
import hashlib
class MyModel(Model):
password=PasswordProperty(hashfunc=hashfunc)
return MyModel
def test_custom_password_class(self):
from boto.utils import Password
from boto.sdb.db.model import Model
from boto.sdb.db.property import PasswordProperty
import hmac, hashlib
myhashfunc = hashlib.md5
## Define a new Password class
class MyPassword(Password):
hashfunc = myhashfunc #hashlib.md5 #lambda cls,msg: hmac.new('mysecret',msg)
## Define a custom password property using the new Password class
class MyPasswordProperty(PasswordProperty):
data_type=MyPassword
type_name=MyPassword.__name__
## Define a model using the new password property
class MyModel(Model):
password=MyPasswordProperty()#hashfunc=hashlib.md5)
obj = MyModel()
obj.password = 'bar'
expected = myhashfunc('bar').hexdigest() #hmac.new('mysecret','bar').hexdigest()
log.debug("\npassword=%s\nexpected=%s" % (obj.password, expected))
self.assertTrue(obj.password == 'bar' )
obj.save()
id= obj.id
time.sleep(5)
obj = MyModel.get_by_id(id)
self.assertEquals(obj.password, 'bar')
self.assertEquals(str(obj.password), expected)
#hmac.new('mysecret','bar').hexdigest())
def build_base_http_request(self, method, path, auth_path,
params=None, headers=None, data='', host=None):
path = self.get_path(path)
if auth_path is not None:
auth_path = self.get_path(auth_path)
if params is None:
params = {}
else:
params = params.copy()
if headers is None:
headers = {}
else:
headers = headers.copy()
if self.host_header and not boto.utils.find_matching_headers('host', headers):
headers['host'] = self.host_header
host = host or self.host
if self.use_proxy:
if not auth_path:
auth_path = path
path = self.prefix_proxy_to_path(path, host)
if self.proxy_user and self.proxy_pass and not self.is_secure:
# If is_secure, we don't have to set the proxy authentication
# header here, we did that in the CONNECT to the proxy.
headers.update(self.get_proxy_auth_header())
return HTTPRequest(method, self.protocol, host, self.port,
path, auth_path, params, headers, data)
def get_utf8_value(self, value):
return boto.utils.get_utf8_value(value)
def _get_key_internal(self, key_name, headers, query_args_l):
query_args = '&'.join(query_args_l) or None
response = self.connection.make_request('HEAD', self.name, key_name,
headers=headers,
query_args=query_args)
response.read()
# Allow any success status (2xx) - for example this lets us
# support Range gets, which return status 206:
if response.status / 100 == 2:
k = self.key_class(self)
provider = self.connection.provider
k.metadata = boto.utils.get_aws_metadata(response.msg, provider)
for field in Key.base_fields:
k.__dict__[field.lower().replace('-', '_')] = \
response.getheader(field)
# the following machinations are a workaround to the fact that
# apache/fastcgi omits the content-length header on HEAD
# requests when the content-length is zero.
# See http://goo.gl/0Tdax for more details.
clen = response.getheader('content-length')
if clen:
k.size = int(response.getheader('content-length'))
else:
k.size = 0
k.name = key_name
k.handle_version_headers(response)
k.handle_encryption_headers(response)
k.handle_restore_headers(response)
k.handle_addl_headers(response.getheaders())
return k, response
else:
if response.status == 404:
return None, response
else:
raise self.connection.provider.storage_response_error(
response.status, response.reason, '')
def configure_lifecycle(self, lifecycle_config, headers=None):
"""
Configure lifecycle for this bucket.
:type lifecycle_config: :class:`boto.s3.lifecycle.Lifecycle`
:param lifecycle_config: The lifecycle configuration you want
to configure for this bucket.
"""
xml = lifecycle_config.to_xml()
#xml = xml.encode('utf-8')
fp = StringIO(xml)
md5 = boto.utils.compute_md5(fp)
if headers is None:
headers = {}
headers['Content-MD5'] = md5[1]
headers['Content-Type'] = 'text/xml'
response = self.connection.make_request('PUT', self.name,
data=fp.getvalue(),
query_args='lifecycle',
headers=headers)
body = response.read()
if response.status == 200:
return True
else:
raise self.connection.provider.storage_response_error(
response.status, response.reason, body)
def query_string(self, http_request):
parameter_names = sorted(http_request.params.keys())
pairs = []
for pname in parameter_names:
pval = boto.utils.get_utf8_value(http_request.params[pname])
pairs.append(urllib.parse.quote(pname, safe='') + '=' +
urllib.parse.quote(pval, safe='-_~'))
return '&'.join(pairs)
def canonical_query_string(self, http_request):
# POST requests pass parameters in through the
# http_request.body field.
if http_request.method == 'POST':
return ""
l = []
for param in sorted(http_request.params):
value = boto.utils.get_utf8_value(http_request.params[param])
l.append('%s=%s' % (urllib.parse.quote(param, safe='-_.~'),
urllib.parse.quote(value, safe='-_.~')))
return '&'.join(l)
def payload(self, http_request):
body = http_request.body
# If the body is a file like object, we can use
# boto.utils.compute_hash, which will avoid reading
# the entire body into memory.
if hasattr(body, 'seek') and hasattr(body, 'read'):
return boto.utils.compute_hash(body, hash_algorithm=sha256)[0]
elif not isinstance(body, bytes):
body = body.encode('utf-8')
return sha256(body).hexdigest()
def canonical_query_string(self, http_request):
# Note that we just do not return an empty string for
# POST request. Query strings in url are included in canonical
# query string.
l = []
for param in sorted(http_request.params):
value = boto.utils.get_utf8_value(http_request.params[param])
l.append('%s=%s' % (urllib.parse.quote(param, safe='-_.~'),
urllib.parse.quote(value, safe='-_.~')))
return '&'.join(l)