def get_aws_metadata(headers, provider=None):
    """Extract provider metadata entries from ``headers``.

    Every header whose lowercased name starts with the provider's
    ``metadata_prefix`` is URL-unquoted, stored in the result under the
    header name with the prefix removed, and deleted from ``headers``.

    :param headers: mutable mapping of header names to values; matching
        entries are removed from it in place.
    :param provider: object exposing ``metadata_prefix``; when falsy,
        ``boto.provider.get_default()`` is used.
    :return: dict mapping the un-prefixed header names to their values.
    """
    if not provider:
        provider = boto.provider.get_default()
    metadata_prefix = provider.metadata_prefix
    prefix_len = len(metadata_prefix)
    metadata = {}
    # Iterate over a snapshot of the keys: entries are deleted inside the
    # loop, and mutating a dict while iterating its live key view raises
    # RuntimeError on Python 3.
    for hkey in list(headers.keys()):
        if not hkey.lower().startswith(metadata_prefix):
            continue
        val = urllib.parse.unquote(headers[hkey])
        if isinstance(val, bytes):
            try:
                val = val.decode('utf-8')
            except UnicodeDecodeError:
                # Just leave the value as-is
                pass
        metadata[hkey[prefix_len:]] = val
        del headers[hkey]
    return metadata
python类provider()的实例源码
def get_aws_metadata(headers, provider=None):
    """Pull the provider's metadata headers out of ``headers``.

    A header counts as metadata when its lowercased name starts with the
    provider's ``metadata_prefix``.  Each such header is URL-unquoted,
    added to the returned dict keyed by the name without the prefix, and
    removed from ``headers``.

    :param headers: mutable mapping of header names to values; it is
        modified in place (metadata entries are deleted).
    :param provider: object exposing ``metadata_prefix``; when falsy,
        ``boto.provider.get_default()`` is used.
    :return: dict of un-prefixed metadata names to values.
    """
    if not provider:
        provider = boto.provider.get_default()
    metadata_prefix = provider.metadata_prefix
    prefix_len = len(metadata_prefix)
    metadata = {}
    # Snapshot the keys first; deleting from a dict while iterating its
    # live ``keys()`` view raises RuntimeError on Python 3.
    for hkey in list(headers.keys()):
        if not hkey.lower().startswith(metadata_prefix):
            continue
        val = urllib.parse.unquote(headers[hkey])
        if isinstance(val, bytes):
            try:
                val = val.decode('utf-8')
            except UnicodeDecodeError:
                # Just leave the value as-is
                pass
        metadata[hkey[prefix_len:]] = val
        del headers[hkey]
    return metadata
def aws_access_key_id(self):
    """Return the ``access_key`` attribute of this object's provider."""
    current_provider = self.provider
    return current_provider.access_key
def aws_secret_access_key(self):
    """Return the ``secret_key`` attribute of this object's provider."""
    current_provider = self.provider
    return current_provider.secret_key
def profile_name(self):
    """Return the ``profile_name`` attribute of this object's provider."""
    current_provider = self.provider
    return current_provider.profile_name
def merge_meta(headers, metadata, provider=None):
    """Return a copy of ``headers`` extended with the entries of ``metadata``.

    A metadata name whose lowercase form is in
    ``boto.s3.key.Key.base_user_settable_fields`` is stored under its own
    name; any other name is stored with the provider's ``metadata_prefix``
    prepended.  The work is done on ``headers.copy()``.

    :param headers: mapping supporting ``copy()``.
    :param metadata: mapping of metadata names to values.
    :param provider: object exposing ``metadata_prefix``; when falsy,
        ``boto.provider.get_default()`` is used.
    :return: the merged copy of ``headers``.
    """
    provider = provider or boto.provider.get_default()
    prefix = provider.metadata_prefix
    merged = headers.copy()
    for name in metadata.keys():
        settable = name.lower() in boto.s3.key.Key.base_user_settable_fields
        target = name if settable else prefix + name
        merged[target] = metadata[name]
    return merged
def _get_session_token(self):
    """Replace ``self.provider`` with a new ``Provider`` and notify the auth handler.

    The new provider is built from ``self._provider_type`` and then passed
    to ``self._auth_handler.update_provider``.
    """
    provider_type = self._provider_type
    self.provider = Provider(provider_type)
    self._auth_handler.update_provider(self.provider)
def aws_access_key_id(self):
    """Return the ``access_key`` attribute of this object's provider."""
    current_provider = self.provider
    return current_provider.access_key
def aws_secret_access_key(self):
    """Return the ``secret_key`` attribute of this object's provider."""
    current_provider = self.provider
    return current_provider.secret_key
def profile_name(self):
    """Return the ``profile_name`` attribute of this object's provider."""
    current_provider = self.provider
    return current_provider.profile_name
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
             is_secure=True, port=None, proxy=None, proxy_port=None,
             proxy_user=None, proxy_pass=None, host=None, debug=0,
             https_connection_factory=None, path='/', security_token=None,
             validate_certs=True, profile_name=None, provider='aws'):
    """Initialize the connection by delegating to the parent initializer.

    All arguments are forwarded unchanged.  Note that ``host`` is passed
    first positionally even though it appears later in this signature;
    ``security_token``, ``validate_certs``, ``profile_name`` and
    ``provider`` are forwarded as keyword arguments.
    """
    # Positional order expected by the parent call: host leads, then the
    # credentials and the transport settings.
    positional = (
        host, aws_access_key_id, aws_secret_access_key,
        is_secure, port, proxy,
        proxy_port, proxy_user, proxy_pass,
        debug, https_connection_factory, path,
    )
    keyword_options = {
        'security_token': security_token,
        'validate_certs': validate_certs,
        'profile_name': profile_name,
        'provider': provider,
    }
    super(AWSQueryConnection, self).__init__(*positional, **keyword_options)
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
             is_secure=True, port=None, proxy=None, proxy_port=None,
             proxy_user=None, proxy_pass=None, debug=0,
             https_connection_factory=None, region=None, path='/',
             converter=None, validate_certs=True, anon=False,
             security_token=None, profile_name=None):
    """
    Set up the region, the anonymous flag and a mutex, then initialize
    the parent connection against the region's endpoint.

    ``converter`` is accepted but not used in this initializer.

    :type anon: boolean
    :param anon: If this parameter is True, the ``STSConnection`` object
        will make anonymous requests, and it will not use AWS
        Credentials or even search for AWS Credentials to make these
        requests.
    """
    # Fall back to the class-level default region when none is supplied.
    region = region or RegionInfo(self, self.DefaultRegionName,
                                  self.DefaultRegionEndpoint,
                                  connection_cls=STSConnection)
    self.region = region
    self.anon = anon
    self._mutex = threading.Semaphore()
    if self.anon:
        # If an anonymous request is sent, do not try to look for
        # credentials.  Dummy values stand in for the access key id,
        # secret access key and session token; they are never real
        # because the request is anonymous.
        provider = Provider('aws', NO_CREDENTIALS_PROVIDED,
                            NO_CREDENTIALS_PROVIDED,
                            NO_CREDENTIALS_PROVIDED)
    else:
        provider = 'aws'
    positional = (
        aws_access_key_id, aws_secret_access_key,
        is_secure, port, proxy, proxy_port,
        proxy_user, proxy_pass,
        self.region.endpoint, debug,
        https_connection_factory, path,
    )
    keyword_options = {
        'validate_certs': validate_certs,
        'security_token': security_token,
        'profile_name': profile_name,
        'provider': provider,
    }
    super(STSConnection, self).__init__(*positional, **keyword_options)
def merge_meta(headers, metadata, provider=None):
    """Return a copy of ``headers`` extended with the entries of ``metadata``.

    A metadata name whose lowercase form is in
    ``boto.s3.key.Key.base_user_settable_fields`` is stored under its own
    name; any other name is stored with the provider's ``metadata_prefix``
    prepended.  The work is done on ``headers.copy()``.

    :param headers: mapping supporting ``copy()``.
    :param metadata: mapping of metadata names to values.
    :param provider: object exposing ``metadata_prefix``; when falsy,
        ``boto.provider.get_default()`` is used.
    :return: the merged copy of ``headers``.
    """
    provider = provider or boto.provider.get_default()
    prefix = provider.metadata_prefix
    merged = headers.copy()
    for name in metadata.keys():
        settable = name.lower() in boto.s3.key.Key.base_user_settable_fields
        target = name if settable else prefix + name
        merged[target] = metadata[name]
    return merged
def _get_session_token(self):
    """Replace ``self.provider`` with a new ``Provider`` and notify the auth handler.

    The new provider is built from ``self._provider_type`` and then passed
    to ``self._auth_handler.update_provider``.
    """
    provider_type = self._provider_type
    self.provider = Provider(provider_type)
    self._auth_handler.update_provider(self.provider)
def aws_access_key_id(self):
    """Return the ``access_key`` attribute of this object's provider."""
    current_provider = self.provider
    return current_provider.access_key
def aws_secret_access_key(self):
    """Return the ``secret_key`` attribute of this object's provider."""
    current_provider = self.provider
    return current_provider.secret_key
def merge_meta(headers, metadata, provider=None):
    """Return a copy of ``headers`` extended with the entries of ``metadata``.

    Metadata names that match (case-insensitively) one of a fixed set of
    standard header names are stored under their own name; every other
    name is stored with the provider's ``metadata_prefix`` prepended.
    The work is done on ``headers.copy()``.

    :param headers: mapping supporting ``copy()``.
    :param metadata: mapping of metadata names to values.
    :param provider: object exposing ``metadata_prefix``; when falsy,
        ``boto.provider.get_default()`` is used.
    :return: the merged copy of ``headers``.
    """
    provider = provider or boto.provider.get_default()
    prefix = provider.metadata_prefix
    # Header names that are copied through without the metadata prefix.
    passthrough = ('cache-control', 'content-md5', 'content-type',
                   'content-encoding', 'content-disposition',
                   'date', 'expires')
    merged = headers.copy()
    for name in metadata.keys():
        target = name if name.lower() in passthrough else prefix + name
        merged[target] = metadata[name]
    return merged
def get_aws_metadata(headers, provider=None):
    """Extract provider metadata entries from ``headers``.

    Every header whose lowercased name starts with the provider's
    ``metadata_prefix`` is URL-unquoted (``+`` is treated as a space),
    stored in the result under the header name with the prefix removed,
    and deleted from ``headers``.  Byte-string values are decoded as
    UTF-8 when possible and left unchanged otherwise.

    :param headers: mutable mapping of header names to values; matching
        entries are removed from it in place.
    :param provider: object exposing ``metadata_prefix``; when falsy,
        ``boto.provider.get_default()`` is used.
    :return: dict mapping the un-prefixed header names to their values.
    """
    # ``urllib.unquote_plus`` only exists on Python 2; import locally so
    # the function works on both major versions.
    try:
        from urllib.parse import unquote_plus
    except ImportError:  # Python 2
        from urllib import unquote_plus
    if not provider:
        provider = boto.provider.get_default()
    metadata_prefix = provider.metadata_prefix
    prefix_len = len(metadata_prefix)
    metadata = {}
    # Iterate over a snapshot of the keys: entries are deleted inside the
    # loop, which a live Python 3 key view does not tolerate.
    for hkey in list(headers.keys()):
        if not hkey.lower().startswith(metadata_prefix):
            continue
        val = unquote_plus(headers[hkey])
        # Only byte strings need decoding; ``bytes`` is ``str`` on
        # Python 2, so this matches the former ``unicode(val, 'utf-8')``
        # without relying on the Python-2-only ``unicode`` builtin.
        if isinstance(val, bytes):
            try:
                val = val.decode('utf-8')
            except UnicodeDecodeError:
                # Keep the undecodable value unchanged.
                pass
        metadata[hkey[prefix_len:]] = val
        del headers[hkey]
    return metadata
def aws_access_key_id(self):
    """Return the ``access_key`` attribute of this object's provider."""
    current_provider = self.provider
    return current_provider.access_key
def aws_secret_access_key(self):
    """Return the ``secret_key`` attribute of this object's provider."""
    current_provider = self.provider
    return current_provider.secret_key
def merge_meta(headers, metadata, provider=None):
    """Return a copy of ``headers`` extended with the entries of ``metadata``.

    Metadata names that match (case-insensitively) one of a fixed set of
    standard header names are stored under their own name; every other
    name is stored with the provider's ``metadata_prefix`` prepended.
    The work is done on ``headers.copy()``.

    :param headers: mapping supporting ``copy()``.
    :param metadata: mapping of metadata names to values.
    :param provider: object exposing ``metadata_prefix``; when falsy,
        ``boto.provider.get_default()`` is used.
    :return: the merged copy of ``headers``.
    """
    provider = provider or boto.provider.get_default()
    prefix = provider.metadata_prefix
    # Header names that are copied through without the metadata prefix.
    passthrough = ('cache-control', 'content-md5', 'content-type',
                   'content-encoding', 'content-disposition',
                   'date', 'expires')
    merged = headers.copy()
    for name in metadata.keys():
        target = name if name.lower() in passthrough else prefix + name
        merged[target] = metadata[name]
    return merged
def get_aws_metadata(headers, provider=None):
    """Pull the provider's metadata headers out of ``headers``.

    A header counts as metadata when its lowercased name starts with the
    provider's ``metadata_prefix``.  Each such header is URL-unquoted
    (``+`` is treated as a space), added to the returned dict keyed by
    the name without the prefix, and removed from ``headers``.
    Byte-string values are decoded as UTF-8 when possible and left
    unchanged otherwise.

    :param headers: mutable mapping of header names to values; it is
        modified in place (metadata entries are deleted).
    :param provider: object exposing ``metadata_prefix``; when falsy,
        ``boto.provider.get_default()`` is used.
    :return: dict of un-prefixed metadata names to values.
    """
    # ``urllib.unquote_plus`` only exists on Python 2; import locally so
    # the function works on both major versions.
    try:
        from urllib.parse import unquote_plus
    except ImportError:  # Python 2
        from urllib import unquote_plus
    if not provider:
        provider = boto.provider.get_default()
    metadata_prefix = provider.metadata_prefix
    prefix_len = len(metadata_prefix)
    metadata = {}
    # Snapshot the keys first; deleting from a dict while iterating its
    # live ``keys()`` view raises RuntimeError on Python 3.
    for hkey in list(headers.keys()):
        if not hkey.lower().startswith(metadata_prefix):
            continue
        val = unquote_plus(headers[hkey])
        # Only byte strings need decoding; ``bytes`` is ``str`` on
        # Python 2, so this matches the former ``unicode(val, 'utf-8')``
        # without relying on the Python-2-only ``unicode`` builtin.
        if isinstance(val, bytes):
            try:
                val = val.decode('utf-8')
            except UnicodeDecodeError:
                # Keep the undecodable value unchanged.
                pass
        metadata[hkey[prefix_len:]] = val
        del headers[hkey]
    return metadata
def aws_access_key_id(self):
    """Return the ``access_key`` attribute of this object's provider."""
    current_provider = self.provider
    return current_provider.access_key
def aws_secret_access_key(self):
    """Return the ``secret_key`` attribute of this object's provider."""
    current_provider = self.provider
    return current_provider.secret_key
def profile_name(self):
    """Return the ``profile_name`` attribute of this object's provider."""
    current_provider = self.provider
    return current_provider.profile_name
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
             is_secure=True, port=None, proxy=None, proxy_port=None,
             proxy_user=None, proxy_pass=None, host=None, debug=0,
             https_connection_factory=None, path='/', security_token=None,
             validate_certs=True, profile_name=None, provider='aws'):
    """Initialize the connection by delegating to the parent initializer.

    All arguments are forwarded unchanged.  Note that ``host`` is passed
    first positionally even though it appears later in this signature;
    ``security_token``, ``validate_certs``, ``profile_name`` and
    ``provider`` are forwarded as keyword arguments.
    """
    # Positional order expected by the parent call: host leads, then the
    # credentials and the transport settings.
    positional = (
        host, aws_access_key_id, aws_secret_access_key,
        is_secure, port, proxy,
        proxy_port, proxy_user, proxy_pass,
        debug, https_connection_factory, path,
    )
    keyword_options = {
        'security_token': security_token,
        'validate_certs': validate_certs,
        'profile_name': profile_name,
        'provider': provider,
    }
    super(AWSQueryConnection, self).__init__(*positional, **keyword_options)
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
             is_secure=True, port=None, proxy=None, proxy_port=None,
             proxy_user=None, proxy_pass=None, debug=0,
             https_connection_factory=None, region=None, path='/',
             converter=None, validate_certs=True, anon=False,
             security_token=None, profile_name=None):
    """
    Set up the region, the anonymous flag and a mutex, then initialize
    the parent connection against the region's endpoint.

    ``converter`` is accepted but not used in this initializer.

    :type anon: boolean
    :param anon: If this parameter is True, the ``STSConnection`` object
        will make anonymous requests, and it will not use AWS
        Credentials or even search for AWS Credentials to make these
        requests.
    """
    # Fall back to the class-level default region when none is supplied.
    region = region or RegionInfo(self, self.DefaultRegionName,
                                  self.DefaultRegionEndpoint,
                                  connection_cls=STSConnection)
    self.region = region
    self.anon = anon
    self._mutex = threading.Semaphore()
    if self.anon:
        # If an anonymous request is sent, do not try to look for
        # credentials.  Dummy values stand in for the access key id,
        # secret access key and session token; they are never real
        # because the request is anonymous.
        provider = Provider('aws', NO_CREDENTIALS_PROVIDED,
                            NO_CREDENTIALS_PROVIDED,
                            NO_CREDENTIALS_PROVIDED)
    else:
        provider = 'aws'
    positional = (
        aws_access_key_id, aws_secret_access_key,
        is_secure, port, proxy, proxy_port,
        proxy_user, proxy_pass,
        self.region.endpoint, debug,
        https_connection_factory, path,
    )
    keyword_options = {
        'validate_certs': validate_certs,
        'security_token': security_token,
        'profile_name': profile_name,
        'provider': provider,
    }
    super(STSConnection, self).__init__(*positional, **keyword_options)
def merge_meta(headers, metadata, provider=None):
    """Return a copy of ``headers`` extended with the entries of ``metadata``.

    A metadata name whose lowercase form is in
    ``boto.s3.key.Key.base_user_settable_fields`` is stored under its own
    name; any other name is stored with the provider's ``metadata_prefix``
    prepended.  The work is done on ``headers.copy()``.

    :param headers: mapping supporting ``copy()``.
    :param metadata: mapping of metadata names to values.
    :param provider: object exposing ``metadata_prefix``; when falsy,
        ``boto.provider.get_default()`` is used.
    :return: the merged copy of ``headers``.
    """
    provider = provider or boto.provider.get_default()
    prefix = provider.metadata_prefix
    merged = headers.copy()
    for name in metadata.keys():
        settable = name.lower() in boto.s3.key.Key.base_user_settable_fields
        target = name if settable else prefix + name
        merged[target] = metadata[name]
    return merged
def _get_session_token(self):
    """Replace ``self.provider`` with a new ``Provider`` and notify the auth handler.

    The new provider is built from ``self._provider_type`` and then passed
    to ``self._auth_handler.update_provider``.
    """
    provider_type = self._provider_type
    self.provider = Provider(provider_type)
    self._auth_handler.update_provider(self.provider)
def aws_access_key_id(self):
    """Return the ``access_key`` attribute of this object's provider."""
    current_provider = self.provider
    return current_provider.access_key