def fix_s3_host(request, signature_version, region_name,
default_endpoint_url='s3.amazonaws.com', **kwargs):
"""
This handler looks at S3 requests just before they are signed.
If there is a bucket name on the path (true for everything except
ListAllBuckets) it checks to see if that bucket name conforms to
the DNS naming conventions. If it does, it alters the request to
use ``virtual hosting`` style addressing rather than ``path-style``
addressing. This allows us to avoid 301 redirects for all
bucket names that can be CNAME'd.
"""
# By default we do not use virtual hosted style addressing when
# signed with signature version 4.
if signature_version is not botocore.UNSIGNED and \
's3v4' in signature_version:
return
elif not _allowed_region(region_name):
return
try:
switch_to_virtual_host_style(
request, signature_version, default_endpoint_url)
except InvalidDNSNameError as e:
bucket_name = e.kwargs['bucket_name']
logger.debug('Not changing URI, bucket is not DNS compatible: %s',
bucket_name)
python类UNSIGNED的实例源码
def fix_s3_host(request, signature_version, region_name,
default_endpoint_url='s3.amazonaws.com', **kwargs):
"""
This handler looks at S3 requests just before they are signed.
If there is a bucket name on the path (true for everything except
ListAllBuckets) it checks to see if that bucket name conforms to
the DNS naming conventions. If it does, it alters the request to
use ``virtual hosting`` style addressing rather than ``path-style``
addressing. This allows us to avoid 301 redirects for all
bucket names that can be CNAME'd.
"""
# By default we do not use virtual hosted style addressing when
# signed with signature version 4.
if signature_version is not botocore.UNSIGNED and \
's3v4' in signature_version:
return
elif not _allowed_region(region_name):
return
try:
switch_to_virtual_host_style(
request, signature_version, default_endpoint_url)
except InvalidDNSNameError as e:
bucket_name = e.kwargs['bucket_name']
logger.debug('Not changing URI, bucket is not DNS compatible: %s',
bucket_name)
def fix_s3_host(request, signature_version, region_name,
default_endpoint_url='s3.amazonaws.com', **kwargs):
"""
This handler looks at S3 requests just before they are signed.
If there is a bucket name on the path (true for everything except
ListAllBuckets) it checks to see if that bucket name conforms to
the DNS naming conventions. If it does, it alters the request to
use ``virtual hosting`` style addressing rather than ``path-style``
addressing. This allows us to avoid 301 redirects for all
bucket names that can be CNAME'd.
"""
# By default we do not use virtual hosted style addressing when
# signed with signature version 4.
if signature_version is not botocore.UNSIGNED and \
's3v4' in signature_version:
return
elif not _allowed_region(region_name):
return
try:
switch_to_virtual_host_style(
request, signature_version, default_endpoint_url)
except InvalidDNSNameError as e:
bucket_name = e.kwargs['bucket_name']
logger.debug('Not changing URI, bucket is not DNS compatible: %s',
bucket_name)
def main(loop, inventory):
"""
Trigger to populate kinto with the last inventories.
"""
server_url = os.getenv('SERVER_URL', 'http://localhost:8888/v1')
bucket = os.getenv('BUCKET', 'build-hub')
collection = os.getenv('COLLECTION', 'releases')
kinto_auth = tuple(os.getenv('AUTH', 'user:pass').split(':'))
kinto_client = kinto_http.Client(server_url=server_url, auth=kinto_auth,
bucket=bucket, collection=collection,
retry=NB_RETRY_REQUEST)
# Create bucket/collection and schemas.
if INITIALIZE_SERVER:
await initialize_kinto(loop, kinto_client, bucket, collection)
# Download CSVs, deduce records and push to Kinto.
session = aiobotocore.get_session(loop=loop)
boto_config = botocore.config.Config(signature_version=botocore.UNSIGNED)
async with session.create_client('s3', region_name=REGION_NAME, config=boto_config) as client:
keys_stream = list_manifest_entries(loop, client, inventory)
csv_stream = download_csv(loop, client, keys_stream)
records_stream = csv_to_records(loop, csv_stream, skip_incomplete=True)
await to_kinto(loop, records_stream, kinto_client, skip_existing=True)
def fix_s3_host(request, signature_version, region_name,
default_endpoint_url='s3.amazonaws.com', **kwargs):
"""
This handler looks at S3 requests just before they are signed.
If there is a bucket name on the path (true for everything except
ListAllBuckets) it checks to see if that bucket name conforms to
the DNS naming conventions. If it does, it alters the request to
use ``virtual hosting`` style addressing rather than ``path-style``
addressing. This allows us to avoid 301 redirects for all
bucket names that can be CNAME'd.
"""
# By default we do not use virtual hosted style addressing when
# signed with signature version 4.
if signature_version is not botocore.UNSIGNED and \
's3v4' in signature_version:
return
elif not _allowed_region(region_name):
return
try:
switch_to_virtual_host_style(
request, signature_version, default_endpoint_url)
except InvalidDNSNameError as e:
bucket_name = e.kwargs['bucket_name']
logger.debug('Not changing URI, bucket is not DNS compatible: %s',
bucket_name)
def fix_s3_host(request, signature_version, region_name,
default_endpoint_url='s3.amazonaws.com', **kwargs):
"""
This handler looks at S3 requests just before they are signed.
If there is a bucket name on the path (true for everything except
ListAllBuckets) it checks to see if that bucket name conforms to
the DNS naming conventions. If it does, it alters the request to
use ``virtual hosting`` style addressing rather than ``path-style``
addressing. This allows us to avoid 301 redirects for all
bucket names that can be CNAME'd.
"""
# By default we do not use virtual hosted style addressing when
# signed with signature version 4.
if signature_version is not botocore.UNSIGNED and \
's3v4' in signature_version:
return
elif not _allowed_region(region_name):
return
try:
switch_to_virtual_host_style(
request, signature_version, default_endpoint_url)
except InvalidDNSNameError as e:
bucket_name = e.kwargs['bucket_name']
logger.debug('Not changing URI, bucket is not DNS compatible: %s',
bucket_name)
utils.py 文件源码
项目:tf_aws_ecs_instance_draining_on_scale_in
作者: terraform-community-modules
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def fix_s3_host(request, signature_version, region_name,
default_endpoint_url='s3.amazonaws.com', **kwargs):
"""
This handler looks at S3 requests just before they are signed.
If there is a bucket name on the path (true for everything except
ListAllBuckets) it checks to see if that bucket name conforms to
the DNS naming conventions. If it does, it alters the request to
use ``virtual hosting`` style addressing rather than ``path-style``
addressing. This allows us to avoid 301 redirects for all
bucket names that can be CNAME'd.
"""
# By default we do not use virtual hosted style addressing when
# signed with signature version 4.
if signature_version is not botocore.UNSIGNED and \
's3v4' in signature_version:
return
elif not _allowed_region(region_name):
return
try:
switch_to_virtual_host_style(
request, signature_version, default_endpoint_url)
except InvalidDNSNameError as e:
bucket_name = e.kwargs['bucket_name']
logger.debug('Not changing URI, bucket is not DNS compatible: %s',
bucket_name)
def _choose_signer(self, operation_name, signing_type):
"""
Allow setting the signature version via the choose-signer event.
A value of `botocore.UNSIGNED` means no signing will be performed.
:param operation_name: The operation to sign.
:param signing_type: The type of signing that the signer is to be used
for.
:return: The signature version to sign with.
"""
signing_type_suffix_map = {
'presign-post': '-presign-post',
'presign-url': '-query'
}
suffix = signing_type_suffix_map.get(signing_type, '')
signature_version = self._signature_version
if signature_version is not botocore.UNSIGNED and not \
signature_version.endswith(suffix):
signature_version += suffix
handler, response = self._event_emitter.emit_until_response(
'choose-signer.{0}.{1}'.format(self._service_name, operation_name),
signing_name=self._signing_name, region_name=self._region_name,
signature_version=signature_version)
if response is not None:
signature_version = response
# The suffix needs to be checked again in case we get an improper
# signature version from choose-signer.
if signature_version is not botocore.UNSIGNED and not \
signature_version.endswith(suffix):
signature_version += suffix
return signature_version
def __init__(self, identity_pool_id=None):
# region name for the base account
self.region_name = 'eu-west-1'
self.identity_pool_id = identity_pool_id or const.AWS_IDENTITY_POOL_ID
self.cognito_client = boto3.client('cognito-identity', region_name=self.region_name,
config=Config(signature_version=UNSIGNED))
self.lambda_client_no_auth = self.create_aws_lambda_client()
def set_operation_specific_signer(context, signing_name, **kwargs):
""" Choose the operation-specific signer.
Individual operations may have a different auth type than the service as a
whole. This will most often manifest as operations that should not be
authenticated at all, but can include other auth modes such as sigv4
without body signing.
"""
auth_type = context.get('auth_type')
# Auth type will be None if the operation doesn't have a configured auth
# type.
if not auth_type:
return
# Auth type will be the string value 'none' if the operation should not
# be signed at all.
if auth_type == 'none':
return botocore.UNSIGNED
if auth_type.startswith('v4'):
signature_version = 'v4'
if signing_name == 's3':
signature_version = 's3v4'
# If the operation needs an unsigned body, we set additional context
# allowing the signer to be aware of this.
if auth_type == 'v4-unsigned-body':
context['payload_signing_enabled'] = False
return signature_version
def _choose_signer(self, operation_name, signing_type, context):
"""
Allow setting the signature version via the choose-signer event.
A value of `botocore.UNSIGNED` means no signing will be performed.
:param operation_name: The operation to sign.
:param signing_type: The type of signing that the signer is to be used
for.
:return: The signature version to sign with.
"""
signing_type_suffix_map = {
'presign-post': '-presign-post',
'presign-url': '-query'
}
suffix = signing_type_suffix_map.get(signing_type, '')
signature_version = self._signature_version
if signature_version is not botocore.UNSIGNED and not \
signature_version.endswith(suffix):
signature_version += suffix
handler, response = self._event_emitter.emit_until_response(
'choose-signer.{0}.{1}'.format(self._service_name, operation_name),
signing_name=self._signing_name, region_name=self._region_name,
signature_version=signature_version, context=context)
if response is not None:
signature_version = response
# The suffix needs to be checked again in case we get an improper
# signature version from choose-signer.
if signature_version is not botocore.UNSIGNED and not \
signature_version.endswith(suffix):
signature_version += suffix
return signature_version
def assume_role(account_role, samlAssertion):
conn = boto3.client('sts', config=client.Config(signature_version=botocore.UNSIGNED))
aws_session_token = conn.assume_role_with_saml(
RoleArn=account_role.role_arn,
PrincipalArn=account_role.principal_arn,
SAMLAssertion=samlAssertion,
DurationSeconds=3600,
)
return aws_session_token
def _choose_signer(self, operation_name, signing_type):
"""
Allow setting the signature version via the choose-signer event.
A value of `botocore.UNSIGNED` means no signing will be performed.
:param operation_name: The operation to sign.
:param signing_type: The type of signing that the signer is to be used
for.
:return: The signature version to sign with.
"""
signing_type_suffix_map = {
'presign-post': '-presign-post',
'presign-url': '-query'
}
suffix = signing_type_suffix_map.get(signing_type, '')
signature_version = self._signature_version
if signature_version is not botocore.UNSIGNED and not \
signature_version.endswith(suffix):
signature_version += suffix
handler, response = self._event_emitter.emit_until_response(
'choose-signer.{0}.{1}'.format(self._service_name, operation_name),
signing_name=self._signing_name, region_name=self._region_name,
signature_version=signature_version)
if response is not None:
signature_version = response
# The suffix needs to be checked again in case we get an improper
# signature version from choose-signer.
if signature_version is not botocore.UNSIGNED and not \
signature_version.endswith(suffix):
signature_version += suffix
return signature_version
def _choose_signer(self, operation_name, signing_type):
"""
Allow setting the signature version via the choose-signer event.
A value of `botocore.UNSIGNED` means no signing will be performed.
:param operation_name: The operation to sign.
:param signing_type: The type of signing that the signer is to be used
for.
:return: The signature version to sign with.
"""
signing_type_suffix_map = {
'presign-post': '-presign-post',
'presign-url': '-query'
}
suffix = signing_type_suffix_map.get(signing_type, '')
signature_version = self._signature_version
if signature_version is not botocore.UNSIGNED and not \
signature_version.endswith(suffix):
signature_version += suffix
handler, response = self._event_emitter.emit_until_response(
'choose-signer.{0}.{1}'.format(self._service_name, operation_name),
signing_name=self._signing_name, region_name=self._region_name,
signature_version=signature_version)
if response is not None:
signature_version = response
# The suffix needs to be checked again in case we get an improper
# signature version from choose-signer.
if signature_version is not botocore.UNSIGNED and not \
signature_version.endswith(suffix):
signature_version += suffix
return signature_version
def set_operation_specific_signer(context, signing_name, **kwargs):
""" Choose the operation-specific signer.
Individual operations may have a different auth type than the service as a
whole. This will most often manifest as operations that should not be
authenticated at all, but can include other auth modes such as sigv4
without body signing.
"""
auth_type = context.get('auth_type')
# Auth type will be None if the operation doesn't have a configured auth
# type.
if not auth_type:
return
# Auth type will be the string value 'none' if the operation should not
# be signed at all.
if auth_type == 'none':
return botocore.UNSIGNED
if auth_type.startswith('v4'):
signature_version = 'v4'
if signing_name == 's3':
signature_version = 's3v4'
# If the operation needs an unsigned body, we set additional context
# allowing the signer to be aware of this.
if auth_type == 'v4-unsigned-body':
context['payload_signing_enabled'] = False
return signature_version
def _choose_signer(self, operation_name, signing_type, context):
"""
Allow setting the signature version via the choose-signer event.
A value of `botocore.UNSIGNED` means no signing will be performed.
:param operation_name: The operation to sign.
:param signing_type: The type of signing that the signer is to be used
for.
:return: The signature version to sign with.
"""
signing_type_suffix_map = {
'presign-post': '-presign-post',
'presign-url': '-query'
}
suffix = signing_type_suffix_map.get(signing_type, '')
signature_version = self._signature_version
if signature_version is not botocore.UNSIGNED and not \
signature_version.endswith(suffix):
signature_version += suffix
handler, response = self._event_emitter.emit_until_response(
'choose-signer.{0}.{1}'.format(self._service_name, operation_name),
signing_name=self._signing_name, region_name=self._region_name,
signature_version=signature_version, context=context)
if response is not None:
signature_version = response
# The suffix needs to be checked again in case we get an improper
# signature version from choose-signer.
if signature_version is not botocore.UNSIGNED and not \
signature_version.endswith(suffix):
signature_version += suffix
return signature_version
def connect(self, refresh=False):
"""
Establish S3 connection object.
Parameters
----------
refresh : bool (True)
Whether to use cached filelists, if already read
"""
anon, key, secret, kwargs, ckwargs, token, ssl = (
self.anon, self.key, self.secret, self.kwargs,
self.client_kwargs, self.token, self.use_ssl)
# Include the current PID in the connection key so that different
# SSL connections are made for each process.
tok = tokenize(anon, key, secret, kwargs, ckwargs, token,
ssl, os.getpid())
if refresh:
self._conn.pop(tok, None)
if tok not in self._conn:
logger.debug("Open S3 connection. Anonymous: %s", self.anon)
if self.anon:
from botocore import UNSIGNED
conf = Config(connect_timeout=self.connect_timeout,
read_timeout=self.read_timeout,
signature_version=UNSIGNED, **self.config_kwargs)
self.session = boto3.Session(**self.kwargs)
else:
conf = Config(connect_timeout=self.connect_timeout,
read_timeout=self.read_timeout,
**self.config_kwargs)
self.session = boto3.Session(self.key, self.secret, self.token,
**self.kwargs)
s3 = self.session.client('s3', config=conf, use_ssl=ssl,
**self.client_kwargs)
self._conn[tok] = (s3, self.session)
else:
s3, session = self._conn[tok]
self.session = session
return s3
signers.py 文件源码
项目:tf_aws_ecs_instance_draining_on_scale_in
作者: terraform-community-modules
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def _choose_signer(self, operation_name, signing_type):
"""
Allow setting the signature version via the choose-signer event.
A value of `botocore.UNSIGNED` means no signing will be performed.
:param operation_name: The operation to sign.
:param signing_type: The type of signing that the signer is to be used
for.
:return: The signature version to sign with.
"""
signing_type_suffix_map = {
'presign-post': '-presign-post',
'presign-url': '-query'
}
suffix = signing_type_suffix_map.get(signing_type, '')
signature_version = self._signature_version
if signature_version is not botocore.UNSIGNED and not \
signature_version.endswith(suffix):
signature_version += suffix
handler, response = self._event_emitter.emit_until_response(
'choose-signer.{0}.{1}'.format(self._service_name, operation_name),
signing_name=self._signing_name, region_name=self._region_name,
signature_version=signature_version)
if response is not None:
signature_version = response
# The suffix needs to be checked again in case we get an improper
# signature version from choose-signer.
if signature_version is not botocore.UNSIGNED and not \
signature_version.endswith(suffix):
signature_version += suffix
return signature_version
def sign(self, operation_name, request, region_name=None,
signing_type='standard', expires_in=None):
"""Sign a request before it goes out over the wire.
:type operation_name: string
:param operation_name: The name of the current operation, e.g.
``ListBuckets``.
:type request: AWSRequest
:param request: The request object to be sent over the wire.
:type region_name: str
:param region_name: The region to sign the request for.
:type signing_type: str
:param signing_type: The type of signing to perform. This can be one of
three possible values:
* 'standard' - This should be used for most requests.
* 'presign-url' - This should be used when pre-signing a request.
* 'presign-post' - This should be used when pre-signing an S3 post.
:type expires_in: int
:param expires_in: The number of seconds the presigned url is valid
for. This parameter is only valid for signing type 'presign-url'.
"""
if region_name is None:
region_name = self._region_name
signature_version = self._choose_signer(operation_name, signing_type)
# Allow mutating request before signing
self._event_emitter.emit(
'before-sign.{0}.{1}'.format(self._service_name, operation_name),
request=request, signing_name=self._signing_name,
region_name=self._region_name,
signature_version=signature_version, request_signer=self)
if signature_version != botocore.UNSIGNED:
kwargs = {
'signing_name': self._signing_name,
'region_name': region_name,
'signature_version': signature_version
}
if expires_in is not None:
kwargs['expires'] = expires_in
try:
auth = self.get_auth_instance(**kwargs)
except UnknownSignatureVersionError as e:
if signing_type != 'standard':
raise UnsupportedSignatureVersionError(
signature_version=signature_version)
else:
raise e
auth.add_auth(request)
def sign(self, operation_name, request, region_name=None,
signing_type='standard', expires_in=None):
"""Sign a request before it goes out over the wire.
:type operation_name: string
:param operation_name: The name of the current operation, e.g.
``ListBuckets``.
:type request: AWSRequest
:param request: The request object to be sent over the wire.
:type region_name: str
:param region_name: The region to sign the request for.
:type signing_type: str
:param signing_type: The type of signing to perform. This can be one of
three possible values:
* 'standard' - This should be used for most requests.
* 'presign-url' - This should be used when pre-signing a request.
* 'presign-post' - This should be used when pre-signing an S3 post.
:type expires_in: int
:param expires_in: The number of seconds the presigned url is valid
for. This parameter is only valid for signing type 'presign-url'.
"""
if region_name is None:
region_name = self._region_name
signature_version = self._choose_signer(operation_name, signing_type)
# Allow mutating request before signing
self._event_emitter.emit(
'before-sign.{0}.{1}'.format(self._service_name, operation_name),
request=request, signing_name=self._signing_name,
region_name=self._region_name,
signature_version=signature_version, request_signer=self)
if signature_version != botocore.UNSIGNED:
kwargs = {
'signing_name': self._signing_name,
'region_name': region_name,
'signature_version': signature_version
}
if expires_in is not None:
kwargs['expires'] = expires_in
try:
auth = self.get_auth_instance(**kwargs)
except UnknownSignatureVersionError as e:
if signing_type != 'standard':
raise UnsupportedSignatureVersionError(
signature_version=signature_version)
else:
raise e
auth.add_auth(request)
def process_warcs(self, id_, iterator):
s3pattern = re.compile('^s3://([^/]+)/(.+)')
base_dir = os.path.abspath(os.path.dirname(__file__))
# S3 client (not thread-safe, initialize outside parallelized loop)
no_sign_request = botocore.client.Config(
signature_version=botocore.UNSIGNED)
s3client = boto3.client('s3', config=no_sign_request)
for uri in iterator:
self.warc_input_processed.add(1)
if uri.startswith('s3://'):
self.get_logger().info('Reading from S3 {}'.format(uri))
s3match = s3pattern.match(uri)
if s3match is None:
self.get_logger().error("Invalid S3 URI: " + uri)
continue
bucketname = s3match.group(1)
path = s3match.group(2)
warctemp = TemporaryFile(mode='w+b',
dir=self.args.local_temp_dir)
try:
s3client.download_fileobj(bucketname, path, warctemp)
except botocore.client.ClientError as exception:
self.get_logger().error(
'Failed to download {}: {}'.format(uri, exception))
self.warc_input_failed.add(1)
continue
warctemp.seek(0)
stream = warctemp
elif uri.startswith('hdfs://'):
self.get_logger().error("HDFS input not implemented: " + uri)
continue
else:
self.get_logger().info('Reading local stream {}'.format(uri))
if uri.startswith('file:'):
uri = uri[5:]
uri = os.path.join(base_dir, uri)
try:
stream = open(uri, 'rb')
except IOError as exception:
self.get_logger().error(
'Failed to open {}: {}'.format(uri, exception))
self.warc_input_failed.add(1)
continue
no_parse = (not self.warc_parse_http_header)
try:
for record in ArchiveIterator(stream,
no_record_parse=no_parse):
for res in self.process_record(record):
yield res
self.records_processed.add(1)
except ArchiveLoadFailed as exception:
self.warc_input_failed.add(1)
self.get_logger().error(
'Invalid WARC: {} - {}'.format(uri, exception))
def sign(self, operation_name, request, region_name=None,
signing_type='standard', expires_in=None):
"""Sign a request before it goes out over the wire.
:type operation_name: string
:param operation_name: The name of the current operation, e.g.
``ListBuckets``.
:type request: AWSRequest
:param request: The request object to be sent over the wire.
:type region_name: str
:param region_name: The region to sign the request for.
:type signing_type: str
:param signing_type: The type of signing to perform. This can be one of
three possible values:
* 'standard' - This should be used for most requests.
* 'presign-url' - This should be used when pre-signing a request.
* 'presign-post' - This should be used when pre-signing an S3 post.
:type expires_in: int
:param expires_in: The number of seconds the presigned url is valid
for. This parameter is only valid for signing type 'presign-url'.
"""
if region_name is None:
region_name = self._region_name
signature_version = self._choose_signer(operation_name, signing_type)
# Allow mutating request before signing
self._event_emitter.emit(
'before-sign.{0}.{1}'.format(self._service_name, operation_name),
request=request, signing_name=self._signing_name,
region_name=self._region_name,
signature_version=signature_version, request_signer=self)
if signature_version != botocore.UNSIGNED:
kwargs = {
'signing_name': self._signing_name,
'region_name': region_name,
'signature_version': signature_version
}
if expires_in is not None:
kwargs['expires'] = expires_in
try:
auth = self.get_auth_instance(**kwargs)
except UnknownSignatureVersionError as e:
if signing_type != 'standard':
raise UnsupportedSignatureVersionError(
signature_version=signature_version)
else:
raise e
auth.add_auth(request)
def sign(self, operation_name, request, region_name=None,
signing_type='standard', expires_in=None):
"""Sign a request before it goes out over the wire.
:type operation_name: string
:param operation_name: The name of the current operation, e.g.
``ListBuckets``.
:type request: AWSRequest
:param request: The request object to be sent over the wire.
:type region_name: str
:param region_name: The region to sign the request for.
:type signing_type: str
:param signing_type: The type of signing to perform. This can be one of
three possible values:
* 'standard' - This should be used for most requests.
* 'presign-url' - This should be used when pre-signing a request.
* 'presign-post' - This should be used when pre-signing an S3 post.
:type expires_in: int
:param expires_in: The number of seconds the presigned url is valid
for. This parameter is only valid for signing type 'presign-url'.
"""
if region_name is None:
region_name = self._region_name
signature_version = self._choose_signer(
operation_name, signing_type, request.context)
# Allow mutating request before signing
self._event_emitter.emit(
'before-sign.{0}.{1}'.format(self._service_name, operation_name),
request=request, signing_name=self._signing_name,
region_name=self._region_name,
signature_version=signature_version, request_signer=self)
if signature_version != botocore.UNSIGNED:
kwargs = {
'signing_name': self._signing_name,
'region_name': region_name,
'signature_version': signature_version
}
if expires_in is not None:
kwargs['expires'] = expires_in
try:
auth = self.get_auth_instance(**kwargs)
except UnknownSignatureVersionError as e:
if signing_type != 'standard':
raise UnsupportedSignatureVersionError(
signature_version=signature_version)
else:
raise e
auth.add_auth(request)
signers.py 文件源码
项目:tf_aws_ecs_instance_draining_on_scale_in
作者: terraform-community-modules
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def sign(self, operation_name, request, region_name=None,
signing_type='standard', expires_in=None):
"""Sign a request before it goes out over the wire.
:type operation_name: string
:param operation_name: The name of the current operation, e.g.
``ListBuckets``.
:type request: AWSRequest
:param request: The request object to be sent over the wire.
:type region_name: str
:param region_name: The region to sign the request for.
:type signing_type: str
:param signing_type: The type of signing to perform. This can be one of
three possible values:
* 'standard' - This should be used for most requests.
* 'presign-url' - This should be used when pre-signing a request.
* 'presign-post' - This should be used when pre-signing an S3 post.
:type expires_in: int
:param expires_in: The number of seconds the presigned url is valid
for. This parameter is only valid for signing type 'presign-url'.
"""
if region_name is None:
region_name = self._region_name
signature_version = self._choose_signer(operation_name, signing_type)
# Allow mutating request before signing
self._event_emitter.emit(
'before-sign.{0}.{1}'.format(self._service_name, operation_name),
request=request, signing_name=self._signing_name,
region_name=self._region_name,
signature_version=signature_version, request_signer=self)
if signature_version != botocore.UNSIGNED:
kwargs = {
'signing_name': self._signing_name,
'region_name': region_name,
'signature_version': signature_version
}
if expires_in is not None:
kwargs['expires'] = expires_in
try:
auth = self.get_auth_instance(**kwargs)
except UnknownSignatureVersionError as e:
if signing_type != 'standard':
raise UnsupportedSignatureVersionError(
signature_version=signature_version)
else:
raise e
auth.add_auth(request)