def validate_parameters(params, shape):
    """Check user-supplied parameters against a schema, raising on failure.

    Convenience wrapper around ``ParamValidator``: it creates a validator,
    runs it once, and turns a report that contains errors into an exception.
    Instantiate ``ParamValidator`` directly when more control is needed.

    :param params: The user provided input parameters.

    :type shape: botocore.model.Shape
    :param shape: The schema which the input parameters should
        adhere to.

    :raise: ParamValidationError if the validation report has errors.
    :return: None when the parameters pass validation.
    """
    outcome = ParamValidator().validate(params, shape)
    if not outcome.has_errors():
        return
    # The exception carries the rendered report text.
    raise ParamValidationError(report=outcome.generate_report())
# Example source code using the Python class ParamValidationError()
def retry(attempts=3):
    """Build a decorator that retries a call on botocore client errors.

    The decorated function is called until it returns. When it raises
    ``ClientError`` or ``ParamValidationError`` and attempts remain, the
    error is printed and the call is repeated; once the budget is used up
    the exception propagates. Any other exception propagates immediately.

    :param attempts: Attempt budget; the function is always called at
        least once.
    :return: A decorator wrapping the target function.
    """
    def wrapper(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            remaining = attempts
            while True:
                remaining -= 1
                try:
                    outcome = func(*args, **kwargs)
                except (ClientError, ParamValidationError) as error:
                    if remaining <= 0:
                        # Budget exhausted: surface the last error unchanged.
                        raise
                    print('[ssha] {}'.format(error))
                else:
                    return outcome
        return wrapped
    return wrapper
def validate_parameters(params, shape):
    """Check user-supplied parameters against a schema, raising on failure.

    Convenience wrapper around ``ParamValidator``: it creates a validator,
    runs it once, and turns a report that contains errors into an exception.
    Instantiate ``ParamValidator`` directly when more control is needed.

    :param params: The user provided input parameters.

    :type shape: botocore.model.Shape
    :param shape: The schema which the input parameters should
        adhere to.

    :raise: ParamValidationError if the validation report has errors.
    :return: None when the parameters pass validation.
    """
    outcome = ParamValidator().validate(params, shape)
    if not outcome.has_errors():
        return
    # The exception carries the rendered report text.
    raise ParamValidationError(report=outcome.generate_report())
def start_query_execution(self, db, query):
    """Submit ``query`` to Athena and return its query execution id.

    Results are written to ``self.bucket``; when ``self.encryption`` is
    truthy the result configuration requests ``SSE_S3`` encryption. A fresh
    UUID is sent as the client request token on every call.

    :param db: Database name used as the query execution context; must be
        truthy.
    :param query: The query string to execute.
    :return: The ``QueryExecutionId`` from the response, or None when a
        ``ClientError``, ``ParamValidationError`` or ``ValueError`` occurred
        (the error is printed instead of raised).
    """
    try:
        if not db:
            raise ValueError('Schema must be specified when session schema is not set')

        output_settings = {'OutputLocation': self.bucket}
        if self.encryption:
            output_settings['EncryptionConfiguration'] = {'EncryptionOption': 'SSE_S3'}

        response = self.athena.start_query_execution(
            QueryString=query,
            ClientRequestToken=str(uuid.uuid4()),
            QueryExecutionContext={'Database': db},
            ResultConfiguration=output_settings,
        )
        return response['QueryExecutionId']
    except (ClientError, ParamValidationError, ValueError) as e:
        # Best-effort API: report the problem and signal failure with None.
        print(e)
        return None
def get_lambda_alias(module, aws):
    """
    Returns the lambda function alias if it exists.

    :param module: Ansible module reference
    :param aws: AWS client connection
    :return: The ``get_alias`` response, or None when the service answers
        with ``ResourceNotFoundException``. Any other error is reported
        through ``module.fail_json``.
    """

    client = aws.client('lambda')

    # set API parameters
    api_params = set_api_params(module, ('function_name', 'name'))

    # Pre-bind so the final return is well defined on every path.
    results = None

    # check if alias exists and get facts
    try:
        results = client.get_alias(**api_params)
    except ClientError as e:
        # Only ClientError carries a service ``response`` payload to inspect.
        if e.response['Error']['Code'] != 'ResourceNotFoundException':
            module.fail_json(msg='Error retrieving function alias: {0}'.format(e))
    except (ParamValidationError, MissingParametersError) as e:
        # Client-side validation errors have no ``response`` attribute, so
        # report them directly instead of probing for an error code.
        module.fail_json(msg='Error retrieving function alias: {0}'.format(e))

    return results
def validate_parameters(params, shape):
    """Check user-supplied parameters against a schema, raising on failure.

    Convenience wrapper around ``ParamValidator``: it creates a validator,
    runs it once, and turns a report that contains errors into an exception.
    Instantiate ``ParamValidator`` directly when more control is needed.

    :param params: The user provided input parameters.

    :type shape: botocore.model.Shape
    :param shape: The schema which the input parameters should
        adhere to.

    :raise: ParamValidationError if the validation report has errors.
    :return: None when the parameters pass validation.
    """
    outcome = ParamValidator().validate(params, shape)
    if not outcome.has_errors():
        return
    # The exception carries the rendered report text.
    raise ParamValidationError(report=outcome.generate_report())
def validate_parameters(params, shape):
    """Check user-supplied parameters against a schema, raising on failure.

    Convenience wrapper around ``ParamValidator``: it creates a validator,
    runs it once, and turns a report that contains errors into an exception.
    Instantiate ``ParamValidator`` directly when more control is needed.

    :param params: The user provided input parameters.

    :type shape: botocore.model.Shape
    :param shape: The schema which the input parameters should
        adhere to.

    :raise: ParamValidationError if the validation report has errors.
    :return: None when the parameters pass validation.
    """
    outcome = ParamValidator().validate(params, shape)
    if not outcome.has_errors():
        return
    # The exception carries the rendered report text.
    raise ParamValidationError(report=outcome.generate_report())
def validate_parameters(params, shape):
    """Check user-supplied parameters against a schema, raising on failure.

    Convenience wrapper around ``ParamValidator``: it creates a validator,
    runs it once, and turns a report that contains errors into an exception.
    Instantiate ``ParamValidator`` directly when more control is needed.

    :param params: The user provided input parameters.

    :type shape: botocore.model.Shape
    :param shape: The schema which the input parameters should
        adhere to.

    :raise: ParamValidationError if the validation report has errors.
    :return: None when the parameters pass validation.
    """
    outcome = ParamValidator().validate(params, shape)
    if not outcome.has_errors():
        return
    # The exception carries the rendered report text.
    raise ParamValidationError(report=outcome.generate_report())
def info(self, path, refresh=False, **kwargs):
    """ Detail on the specific file pointed to by path.

    Gets details only for a specific key, directories/buckets cannot be
    used with info.

    The parent listing is consulted first. If it does not contain exactly
    one non-directory, non-bucket entry whose ``Key`` equals ``path``, a
    ``head_object`` request is issued and its response is reshaped into a
    listing-style dict.

    :param path: Full path of the key.
    :param refresh: Forwarded to ``self._lsdir``.
    :param kwargs: Forwarded to ``self._call_s3`` for the ``head_object``
        request.
    :raise: FileNotFoundError when the ``head_object`` lookup raises
        ``ClientError`` or ``ParamValidationError``.
    """
    parent = path.rsplit('/', 1)[0]
    listing = self._lsdir(parent, refresh=refresh)
    matches = [
        entry for entry in listing
        if entry['Key'] == path
        and entry['StorageClass'] not in ['DIRECTORY', 'BUCKET']
    ]
    if len(matches) == 1:
        return matches[0]

    # Not resolvable from the listing: ask S3 about the object directly.
    try:
        bucket, key = split_path(path)
        head = self._call_s3(self.s3.head_object,
                             kwargs, Bucket=bucket, Key=key, **self.req_kw)
        return {
            'ETag': head['ETag'],
            'Key': '/'.join([bucket, key]),
            'LastModified': head['LastModified'],
            'Size': head['ContentLength'],
            'StorageClass': "STANDARD",
        }
    except (ClientError, ParamValidationError):
        raise FileNotFoundError(path)
def touch(self, path, acl="", **kwargs):
    """
    Create empty key

    If path is a bucket only, attempt to create bucket.

    :param path: Path that ``split_path`` separates into bucket and key.
    :param acl: ACL to apply; when empty, falls back to the ``ACL`` entry of
        ``self.s3_additional_kwargs`` (default ``''``).
    :param kwargs: Forwarded to ``self._call_s3`` for the ``put_object``
        request.
    :raise: ValueError when a non-empty ACL is not in ``key_acls`` (key
        paths) or ``buck_acls`` (bucket-only paths).
    :raise: IOError when bucket creation raises ``ClientError`` or
        ``ParamValidationError``.
    """
    bucket, key = split_path(path)
    acl = acl or self.s3_additional_kwargs.get('ACL', '')
    if key:
        if acl and acl not in key_acls:
            # Interpolate explicitly: ValueError does not apply %-formatting
            # to its arguments. The 1-tuple keeps this safe for any
            # container type.
            raise ValueError('ACL not in %s' % (key_acls,))
        self._call_s3(
            self.s3.put_object, kwargs,
            Bucket=bucket, Key=key, ACL=acl)
        self.invalidate_cache(path)
    else:
        if acl and acl not in buck_acls:
            raise ValueError('ACL not in %s' % (buck_acls,))
        try:
            self.s3.create_bucket(Bucket=bucket, ACL=acl)
            # Both the root listing and the bucket listing are invalidated.
            self.invalidate_cache('')
            self.invalidate_cache(bucket)
        except (ClientError, ParamValidationError):
            raise IOError('Bucket create failed', path)
def create_rest_api(client, module, title, description):
    """
    Creates a new REST API through ``client.create_rest_api``.

    :param client: API Gateway client exposing ``create_rest_api``
    :param module: Ansible module reference, used to report failures
    :param title: Passed as the API ``name``
    :param description: Passed as the API ``description``
    :return: API Id (the ``id`` entry of the response)
    """
    created = None

    try:
        created = client.create_rest_api(name=title, description=description)
    except (ClientError, ParamValidationError, MissingParametersError) as e:
        failure = 'Error creating REST API: {0}'.format(e)
        module.fail_json(msg=failure)

    return created['id']
def delete_rest_api(client, module, rest_api_id):
    """
    Deletes the REST API identified by ``rest_api_id``.

    :param client: API Gateway client exposing ``delete_rest_api``
    :param module: Ansible module reference, used to report failures
    :param rest_api_id: Sent as ``restApiId``
    :return: None
    """
    try:
        client.delete_rest_api(restApiId=rest_api_id)
    except (ClientError, ParamValidationError, MissingParametersError) as e:
        failure = 'Error deleting REST API: {0}'.format(e)
        module.fail_json(msg=failure)

    return None
def put_integration_response(client, module, rest_api_id, resource_id, http_method, selection_pattern, integration_response):
    """
    Sends one integration response definition to API Gateway.

    :param client: API Gateway client exposing ``put_integration_response``
    :param module: Ansible module reference, used to report failures
    :param rest_api_id: Sent as ``restApiId``
    :param resource_id: Sent as ``resourceId``
    :param http_method: Sent as ``httpMethod``
    :param selection_pattern: Sent as ``selectionPattern``
    :param integration_response: Mapping that must contain ``statusCode``;
        ``responseParameters`` and ``responseTemplates`` are forwarded only
        when present
    :return: The client response, or None if the call failed and
        ``module.fail_json`` returned
    """
    request = {
        'restApiId': rest_api_id,
        'resourceId': resource_id,
        'httpMethod': http_method,
        'statusCode': integration_response['statusCode'],
        'selectionPattern': selection_pattern,
    }

    # Copy across only the optional settings the caller actually supplied.
    request.update(
        (field, integration_response[field])
        for field in ('responseParameters', 'responseTemplates')
        if field in integration_response
    )

    outcome = None
    try:
        outcome = client.put_integration_response(**request)
    except (ClientError, ParamValidationError, MissingParametersError) as e:
        module.fail_json(msg="Error creating integration response '{0}' for method '{1}', rid: {2}: {3}".format(selection_pattern, http_method, resource_id, e))

    return outcome
# Source file: validate.py
# Project: tf_aws_ecs_instance_draining_on_scale_in
# Author: terraform-community-modules
# Project source
# File source
# Views: 25
# Favorites: 0
# Likes: 0
# Comments: 0
def validate_parameters(params, shape):
    """Check user-supplied parameters against a schema, raising on failure.

    Convenience wrapper around ``ParamValidator``: it creates a validator,
    runs it once, and turns a report that contains errors into an exception.
    Instantiate ``ParamValidator`` directly when more control is needed.

    :param params: The user provided input parameters.

    :type shape: botocore.model.Shape
    :param shape: The schema which the input parameters should
        adhere to.

    :raise: ParamValidationError if the validation report has errors.
    :return: None when the parameters pass validation.
    """
    outcome = ParamValidator().validate(params, shape)
    if not outcome.has_errors():
        return
    # The exception carries the rendered report text.
    raise ParamValidationError(report=outcome.generate_report())
def validate_bucket_name(params, **kwargs):
    """Reject a ``Bucket`` parameter that does not match ``VALID_BUCKET``.

    Nothing is checked when ``params`` has no ``Bucket`` entry.

    :param params: Request parameters, possibly containing ``Bucket``.
    :param kwargs: Accepted and ignored.
    :raise: ParamValidationError when the bucket name fails the regex search.
    """
    if 'Bucket' in params:
        name = params['Bucket']
        if VALID_BUCKET.search(name) is None:
            raise ParamValidationError(report=(
                'Invalid bucket name "%s": Bucket name must match '
                'the regex "%s"' % (name, VALID_BUCKET.pattern)))
def _quote_source_header_from_dict(source_dict):
    """Render a copy-source dict as a ``bucket/key[?versionId=...]`` string.

    The key is percent-encoded with ``/`` added to the safe characters; the
    bucket is used as given.

    :param source_dict: Mapping with required ``Bucket`` and ``Key`` entries
        and an optional ``VersionId``.
    :raise: ParamValidationError when a KeyError is raised while reading the
        required entries.
    :return: The formatted header value.
    """
    try:
        bucket = source_dict['Bucket']
        key = percent_encode(source_dict['Key'], safe=SAFE_CHARS + '/')
        version_id = source_dict.get('VersionId')
    except KeyError as e:
        raise ParamValidationError(
            report='Missing required parameter: %s' % str(e))

    header = '%s/%s' % (bucket, key)
    if version_id is None:
        return header
    return header + '?versionId=%s' % version_id
def validate_ascii_metadata(params, **kwargs):
    """Verify S3 Metadata only contains ascii characters.

    From: http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html

    "Amazon S3 stores user-defined metadata in lowercase. Each name, value pair
    must conform to US-ASCII when using REST and UTF-8 when using SOAP or
    browser-based uploads via POST."

    :param params: Request parameters, possibly containing ``Metadata``.
    :param kwargs: Accepted and ignored.
    :raise: ParamValidationError when a metadata key or value cannot be
        encoded as ASCII.
    """
    metadata = params.get('Metadata')
    if not (metadata and isinstance(metadata, dict)):
        # This handler runs before param validation, so the only safe check
        # here is that the metadata is a dict. Anything else is left for the
        # param validator, which gives a descriptive error message.
        # A post-param validation event might be needed for this.
        return

    for name, content in metadata.items():
        try:
            name.encode('ascii')
            content.encode('ascii')
        except UnicodeEncodeError:
            raise ParamValidationError(report=(
                'Non ascii characters found in S3 metadata '
                'for key "%s", value: "%s".  \nS3 metadata can only '
                'contain ASCII characters. ' % (name, content)
            ))
def serialize_to_request(self, parameters, operation_model):
    """Validate ``parameters`` for the operation, then serialize them.

    Validation is skipped when the operation model has no input shape.

    :param parameters: The user provided input parameters.
    :param operation_model: Model exposing ``input_shape``.
    :raise: ParamValidationError when the validation report has errors.
    :return: Whatever ``self._serializer.serialize_to_request`` returns.
    """
    if operation_model.input_shape is not None:
        outcome = self._param_validator.validate(
            parameters, operation_model.input_shape)
        if outcome.has_errors():
            raise ParamValidationError(report=outcome.generate_report())

    serializer = self._serializer
    return serializer.serialize_to_request(parameters, operation_model)
def validate_bucket_name(params, **kwargs):
    """Reject a ``Bucket`` parameter that does not match ``VALID_BUCKET``.

    Nothing is checked when ``params`` has no ``Bucket`` entry.

    :param params: Request parameters, possibly containing ``Bucket``.
    :param kwargs: Accepted and ignored.
    :raise: ParamValidationError when the bucket name fails the regex search.
    """
    if 'Bucket' in params:
        name = params['Bucket']
        if VALID_BUCKET.search(name) is None:
            raise ParamValidationError(report=(
                'Invalid bucket name "%s": Bucket name must match '
                'the regex "%s"' % (name, VALID_BUCKET.pattern)))
def _quote_source_header_from_dict(source_dict):
    """Render a copy-source dict as a ``bucket/key[?versionId=...]`` string.

    The key is percent-encoded with ``/`` added to the safe characters; the
    bucket is used as given.

    :param source_dict: Mapping with required ``Bucket`` and ``Key`` entries
        and an optional ``VersionId``.
    :raise: ParamValidationError when a KeyError is raised while reading the
        required entries.
    :return: The formatted header value.
    """
    try:
        bucket = source_dict['Bucket']
        key = percent_encode(source_dict['Key'], safe=SAFE_CHARS + '/')
        version_id = source_dict.get('VersionId')
    except KeyError as e:
        raise ParamValidationError(
            report='Missing required parameter: %s' % str(e))

    header = '%s/%s' % (bucket, key)
    if version_id is None:
        return header
    return header + '?versionId=%s' % version_id
def validate_ascii_metadata(params, **kwargs):
    """Verify S3 Metadata only contains ascii characters.

    From: http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html

    "Amazon S3 stores user-defined metadata in lowercase. Each name, value pair
    must conform to US-ASCII when using REST and UTF-8 when using SOAP or
    browser-based uploads via POST."

    :param params: Request parameters, possibly containing ``Metadata``.
    :param kwargs: Accepted and ignored.
    :raise: ParamValidationError when a metadata key or value cannot be
        encoded as ASCII.
    """
    metadata = params.get('Metadata')
    if not (metadata and isinstance(metadata, dict)):
        # This handler runs before param validation, so the only safe check
        # here is that the metadata is a dict. Anything else is left for the
        # param validator, which gives a descriptive error message.
        # A post-param validation event might be needed for this.
        return

    for name, content in metadata.items():
        try:
            name.encode('ascii')
            content.encode('ascii')
        except UnicodeEncodeError:
            raise ParamValidationError(report=(
                'Non ascii characters found in S3 metadata '
                'for key "%s", value: "%s".  \nS3 metadata can only '
                'contain ASCII characters. ' % (name, content)
            ))
def serialize_to_request(self, parameters, operation_model):
    """Validate ``parameters`` for the operation, then serialize them.

    Validation is skipped when the operation model has no input shape.

    :param parameters: The user provided input parameters.
    :param operation_model: Model exposing ``input_shape``.
    :raise: ParamValidationError when the validation report has errors.
    :return: Whatever ``self._serializer.serialize_to_request`` returns.
    """
    if operation_model.input_shape is not None:
        outcome = self._param_validator.validate(
            parameters, operation_model.input_shape)
        if outcome.has_errors():
            raise ParamValidationError(report=outcome.generate_report())

    serializer = self._serializer
    return serializer.serialize_to_request(parameters, operation_model)
def can_retry(self, ex):
    """
    Tests if a retry can be done based on the exception of an earlier call

    Parameter validation failures are never retried; every other exception
    is delegated to ``AwsApiServiceRetry.can_retry``.

    :param ex: Exception raised by earlier call of the boto3 method
    :return: False for a ParamValidationError (including subclasses),
        otherwise the result of ``AwsApiServiceRetry.can_retry``
    """
    # isinstance (rather than an exact type comparison) also covers
    # subclasses of ParamValidationError.
    if isinstance(ex, ParamValidationError):
        return False
    return AwsApiServiceRetry.can_retry(self, ex)
def __init__(self, ansible_obj, resources, boto3=True):
    """Create one client per requested resource and look up the account id.

    :param ansible_obj: Ansible module reference; used for connection info
        and for ``fail_json`` on connection errors.
    :param resources: Iterable of resource names to create clients for.
        When empty or None, ``['lambda']`` is used. An ``'iam'`` client is
        always added.
    :param boto3: Forwarded to ``get_aws_connection_info``.
    """
    try:
        self.region, self.endpoint, aws_connect_kwargs = get_aws_connection_info(ansible_obj, boto3=boto3)

        self.resource_client = dict()
        # Work on a private copy so appending 'iam' never mutates the list
        # object the caller passed in.
        resources = list(resources) if resources else ['lambda']
        resources.append('iam')

        for resource in resources:
            aws_connect_kwargs.update(dict(region=self.region,
                                           endpoint=self.endpoint,
                                           conn_type='client',
                                           resource=resource
                                           ))
            self.resource_client[resource] = boto3_conn(ansible_obj, **aws_connect_kwargs)

        # if region is not provided, then get default profile/session region
        # NOTE(review): this reads the 'lambda' client, so it assumes
        # 'lambda' is among the requested resources - confirm with callers.
        if not self.region:
            self.region = self.resource_client['lambda'].meta.region_name

    except (ClientError, ParamValidationError, MissingParametersError) as e:
        ansible_obj.fail_json(msg="Unable to connect, authorize or access resource: {0}".format(e))

    # The account id is the fifth ':'-separated field of the user ARN; fall
    # back to an empty string when it cannot be determined.
    try:
        self.account_id = self.resource_client['iam'].get_user()['User']['Arn'].split(':')[4]
    except (ClientError, ValueError, KeyError, IndexError):
        self.account_id = ''
def __init__(self, ansible_obj, resources, use_boto3=True):
    """Create one client per requested resource and look up the account id.

    :param ansible_obj: Ansible module reference; used for connection info
        and for ``fail_json`` on connection errors.
    :param resources: Iterable of resource names to create clients for.
        When empty or None, ``['lambda']`` is used. An ``'iam'`` client is
        always added.
    :param use_boto3: Forwarded to ``get_aws_connection_info`` as ``boto3``.
    """
    try:
        self.region, self.endpoint, aws_connect_kwargs = get_aws_connection_info(ansible_obj, boto3=use_boto3)

        self.resource_client = dict()
        # Work on a private copy so appending 'iam' never mutates the list
        # object the caller passed in.
        resources = list(resources) if resources else ['lambda']
        resources.append('iam')

        for resource in resources:
            aws_connect_kwargs.update(dict(region=self.region,
                                           endpoint=self.endpoint,
                                           conn_type='client',
                                           resource=resource
                                           ))
            self.resource_client[resource] = boto3_conn(ansible_obj, **aws_connect_kwargs)

        # if region is not provided, then get default profile/session region
        # NOTE(review): this reads the 'lambda' client, so it assumes
        # 'lambda' is among the requested resources - confirm with callers.
        if not self.region:
            self.region = self.resource_client['lambda'].meta.region_name

    except (ClientError, ParamValidationError, MissingParametersError) as e:
        ansible_obj.fail_json(msg="Unable to connect, authorize or access resource: {0}".format(e))

    # set account ID
    # (fifth ':'-separated field of the user ARN; empty string on failure)
    try:
        self.account_id = self.resource_client['iam'].get_user()['User']['Arn'].split(':')[4]
    except (ClientError, ValueError, KeyError, IndexError):
        self.account_id = ''
def validate_bucket_name(params, **kwargs):
    """Reject a ``Bucket`` parameter that does not match ``VALID_BUCKET``.

    Nothing is checked when ``params`` has no ``Bucket`` entry.

    :param params: Request parameters, possibly containing ``Bucket``.
    :param kwargs: Accepted and ignored.
    :raise: ParamValidationError when the bucket name fails the regex search.
    """
    if 'Bucket' in params:
        name = params['Bucket']
        if VALID_BUCKET.search(name) is None:
            raise ParamValidationError(report=(
                'Invalid bucket name "%s": Bucket name must match '
                'the regex "%s"' % (name, VALID_BUCKET.pattern)))
def _quote_source_header_from_dict(source_dict):
    """Render a copy-source dict as a ``bucket/key[?versionId=...]`` string.

    The key is percent-encoded with ``/`` added to the safe characters; the
    bucket is used as given.

    :param source_dict: Mapping with required ``Bucket`` and ``Key`` entries
        and an optional ``VersionId``.
    :raise: ParamValidationError when a KeyError is raised while reading the
        required entries.
    :return: The formatted header value.
    """
    try:
        bucket = source_dict['Bucket']
        key = percent_encode(source_dict['Key'], safe=SAFE_CHARS + '/')
        version_id = source_dict.get('VersionId')
    except KeyError as e:
        raise ParamValidationError(
            report='Missing required parameter: %s' % str(e))

    header = '%s/%s' % (bucket, key)
    if version_id is None:
        return header
    return header + '?versionId=%s' % version_id
def validate_ascii_metadata(params, **kwargs):
    """Verify S3 Metadata only contains ascii characters.

    From: http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html

    "Amazon S3 stores user-defined metadata in lowercase. Each name, value pair
    must conform to US-ASCII when using REST and UTF-8 when using SOAP or
    browser-based uploads via POST."

    :param params: Request parameters, possibly containing ``Metadata``.
    :param kwargs: Accepted and ignored.
    :raise: ParamValidationError when a metadata key or value cannot be
        encoded as ASCII.
    """
    metadata = params.get('Metadata')
    if not (metadata and isinstance(metadata, dict)):
        # This handler runs before param validation, so the only safe check
        # here is that the metadata is a dict. Anything else is left for the
        # param validator, which gives a descriptive error message.
        # A post-param validation event might be needed for this.
        return

    for name, content in metadata.items():
        try:
            name.encode('ascii')
            content.encode('ascii')
        except UnicodeEncodeError:
            raise ParamValidationError(report=(
                'Non ascii characters found in S3 metadata '
                'for key "%s", value: "%s".  \nS3 metadata can only '
                'contain ASCII characters. ' % (name, content)
            ))
def serialize_to_request(self, parameters, operation_model):
    """Validate ``parameters`` for the operation, then serialize them.

    Validation is skipped when the operation model has no input shape.

    :param parameters: The user provided input parameters.
    :param operation_model: Model exposing ``input_shape``.
    :raise: ParamValidationError when the validation report has errors.
    :return: Whatever ``self._serializer.serialize_to_request`` returns.
    """
    if operation_model.input_shape is not None:
        outcome = self._param_validator.validate(
            parameters, operation_model.input_shape)
        if outcome.has_errors():
            raise ParamValidationError(report=outcome.generate_report())

    serializer = self._serializer
    return serializer.serialize_to_request(parameters, operation_model)
def destroy_role(connection, module):
    """Delete the IAM role named by the module's ``name`` parameter.

    When the role exists it is first removed from its instance profiles and
    its attached policies are detached, then the role itself is deleted and
    the module exits with ``changed=True``. When the role does not exist the
    module exits with ``changed=False``. Any AWS error is reported through
    ``module.fail_json``.

    :param connection: IAM client connection
    :param module: Ansible module reference
    """
    def _fail(error):
        # str(error) replaces ``error.message``, which Python 3 exceptions
        # do not have. ``response`` exists only on ClientError, so fall back
        # to an empty dict for ParamValidationError.
        response = getattr(error, 'response', None) or {}
        module.fail_json(msg=str(error), **camel_dict_to_snake_dict(response))

    params = dict()
    params['RoleName'] = module.params.get('name')

    if get_role(connection, params['RoleName']):

        # We need to remove any instance profiles from the role before we delete it
        try:
            instance_profiles = connection.list_instance_profiles_for_role(RoleName=params['RoleName'])['InstanceProfiles']
        except ClientError as e:
            _fail(e)

        # Now remove the role from the instance profile(s)
        for profile in instance_profiles:
            try:
                connection.remove_role_from_instance_profile(InstanceProfileName=profile['InstanceProfileName'], RoleName=params['RoleName'])
            except ClientError as e:
                _fail(e)

        # Now remove any attached policies otherwise deletion fails
        try:
            for policy in get_attached_policy_list(connection, params['RoleName']):
                connection.detach_role_policy(RoleName=params['RoleName'], PolicyArn=policy['PolicyArn'])
        except (ClientError, ParamValidationError) as e:
            _fail(e)

        try:
            connection.delete_role(**params)
        except ClientError as e:
            _fail(e)
    else:
        module.exit_json(changed=False)

    module.exit_json(changed=True)