def test_scan_invalid(self):
db = await self.client.db
db.scan = asynctest.CoroutineMock(side_effect=ParamValidationError(report="Exception raised"))
res = await self.client.scan()
db.scan.assert_called_once_with(Limit=5, Select="ALL_ATTRIBUTES", TableName="livebridge_test")
assert res == []
python类ParamValidationError()的实例源码
def test_insert_post(self):
api_res = {'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'd92c4314-439a-4bc7-90f5-125a615dfaa2'}}
db = await self.client.db
db.put_item = asynctest.CoroutineMock(return_value=api_res)
date = datetime.utcnow()
date_str = datetime.strftime(date, self.client.date_fmt)
kwargs = {"target_id": "target-id",
"post_id": "post-id",
"source_id": "source-id",
"text": "Text",
"sticky": True,
"created": date,
"updated": date,
"target_doc": {"foo": "doc"}}
res = await self.client.insert_post(**kwargs)
assert res is True
assert type(res) == bool
db.put_item.assert_called_once_with(
Item={
'source_id': {'S': 'source-id'},
'text': {'S': 'Text'},
'target_id': {'S': 'target-id'},
'post_id': {'S': 'post-id'},
'created': {'S': date_str},
'updated': {'S': date_str},
'target_doc': {'S': '{"foo": "doc"}'},
'sticky': {'N': '1'}},
TableName='livebridge_test')
# insert_post failing(self):
db.put_item = asynctest.CoroutineMock(side_effect=ParamValidationError(report="Exception raised"))
res = await self.client.insert_post(**kwargs)
assert res is False
assert type(res) == bool
def _validate_keys(dynamodb_data):
"""Helper method to check if query key empty or duplicated"""
result = []
if not dynamodb_data:
err_msg = {'Error': {'Code': 403, 'Message': 'Empty query keys'}}
raise ParamValidationError(report=err_msg)
deserializer = TypeDeserializer()
for raw_data in dynamodb_data:
for _, val in raw_data.iteritems():
python_data = deserializer.deserialize(val).lower()
if not python_data or python_data in result:
err_msg = {'Error': {'Code': 403, 'Message': 'Parameter Validation Error'}}
raise ParamValidationError(report=err_msg)
result.append(python_data)
def invoke(cls, **kwargs):
"""Mocked invoke function that returns a reponse mimicking boto3's reponse
Keyword Arguments:
FuncitonName (str): The AWS Lambda function name being invoked
InvocationType (str): Type of invocation (typically 'Event')
Payload (str): Payload in string or file format to send to lambda
Qualifier (str): Alias for fully qualified AWS ARN
Returns:
dict: Response dictionary containing a fake RequestId
"""
if cls._raise_exception:
# Turn of the raise exception boolean so we don't do this next time
cls._raise_exception = not cls._raise_exception
err = {'Error': {'Code': 400, 'Message': 'raising test exception'}}
raise ClientError(err, 'invoke')
req_keywords = {'FunctionName', 'InvocationType', 'Payload'}
key_diff = req_keywords.difference(set(kwargs))
if key_diff:
message = 'required keyword missing: {}'.format(', '.join(key_diff))
err = {'Error': {'Code': 400, 'Message': message}}
raise ClientError(err, 'invoke')
if not isinstance(kwargs['Payload'], (str, bytearray)):
if not hasattr(kwargs['Payload'], 'read'):
err = ('Invalid type for parameter Payload, value: {}, type: {}, '
'valid types: <type \'str\'>, <type \'bytearray\'>, '
'file-like object').format(kwargs['Payload'], type(kwargs['Payload']))
raise ParamValidationError(response=err)
return {'ResponseMetadata': {'RequestId': '9af88643-7b3c-43cd-baae-addb73bb4d27'}}
def validate_bucket_name(params, **kwargs):
if 'Bucket' not in params:
return
bucket = params['Bucket']
if VALID_BUCKET.search(bucket) is None:
error_msg = (
'Invalid bucket name "%s": Bucket name must match '
'the regex "%s"' % (bucket, VALID_BUCKET.pattern))
raise ParamValidationError(report=error_msg)
def _quote_source_header_from_dict(source_dict):
try:
bucket = source_dict['Bucket']
key = percent_encode(source_dict['Key'], safe=SAFE_CHARS + '/')
version_id = source_dict.get('VersionId')
except KeyError as e:
raise ParamValidationError(
report='Missing required parameter: %s' % str(e))
final = '%s/%s' % (bucket, key)
if version_id is not None:
final += '?versionId=%s' % version_id
return final
def validate_ascii_metadata(params, **kwargs):
"""Verify S3 Metadata only contains ascii characters.
From: http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
"Amazon S3 stores user-defined metadata in lowercase. Each name, value pair
must conform to US-ASCII when using REST and UTF-8 when using SOAP or
browser-based uploads via POST."
"""
metadata = params.get('Metadata')
if not metadata or not isinstance(metadata, dict):
# We have to at least type check the metadata as a dict type
# because this handler is called before param validation.
# We'll go ahead and return because the param validator will
# give a descriptive error message for us.
# We might need a post-param validation event.
return
for key, value in metadata.items():
try:
key.encode('ascii')
value.encode('ascii')
except UnicodeEncodeError as e:
error_msg = (
'Non ascii characters found in S3 metadata '
'for key "%s", value: "%s". \nS3 metadata can only '
'contain ASCII characters. ' % (key, value)
)
raise ParamValidationError(
report=error_msg)
def serialize_to_request(self, parameters, operation_model):
input_shape = operation_model.input_shape
if input_shape is not None:
report = self._param_validator.validate(parameters,
operation_model.input_shape)
if report.has_errors():
raise ParamValidationError(report=report.generate_report())
return self._serializer.serialize_to_request(parameters,
operation_model)
def validate_bucket_name(params, **kwargs):
if 'Bucket' not in params:
return
bucket = params['Bucket']
if VALID_BUCKET.search(bucket) is None:
error_msg = (
'Invalid bucket name "%s": Bucket name must match '
'the regex "%s"' % (bucket, VALID_BUCKET.pattern))
raise ParamValidationError(report=error_msg)
def _quote_source_header_from_dict(source_dict):
try:
bucket = source_dict['Bucket']
key = percent_encode(source_dict['Key'], safe=SAFE_CHARS + '/')
version_id = source_dict.get('VersionId')
except KeyError as e:
raise ParamValidationError(
report='Missing required parameter: %s' % str(e))
final = '%s/%s' % (bucket, key)
if version_id is not None:
final += '?versionId=%s' % version_id
return final
def validate_ascii_metadata(params, **kwargs):
"""Verify S3 Metadata only contains ascii characters.
From: http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
"Amazon S3 stores user-defined metadata in lowercase. Each name, value pair
must conform to US-ASCII when using REST and UTF-8 when using SOAP or
browser-based uploads via POST."
"""
metadata = params.get('Metadata')
if not metadata or not isinstance(metadata, dict):
# We have to at least type check the metadata as a dict type
# because this handler is called before param validation.
# We'll go ahead and return because the param validator will
# give a descriptive error message for us.
# We might need a post-param validation event.
return
for key, value in metadata.items():
try:
key.encode('ascii')
value.encode('ascii')
except UnicodeEncodeError as e:
error_msg = (
'Non ascii characters found in S3 metadata '
'for key "%s", value: "%s". \nS3 metadata can only '
'contain ASCII characters. ' % (key, value)
)
raise ParamValidationError(
report=error_msg)
def serialize_to_request(self, parameters, operation_model):
input_shape = operation_model.input_shape
if input_shape is not None:
report = self._param_validator.validate(parameters,
operation_model.input_shape)
if report.has_errors():
raise ParamValidationError(report=report.generate_report())
return self._serializer.serialize_to_request(parameters,
operation_model)
def copy(self, path1, path2, **kwargs):
""" Copy file between locations on S3 """
buc1, key1 = split_path(path1)
buc2, key2 = split_path(path2)
try:
self._call_s3(
self.s3.copy_object,
kwargs,
Bucket=buc2, Key=key2, CopySource='/'.join([buc1, key1])
)
except (ClientError, ParamValidationError):
raise IOError('Copy failed', (path1, path2))
self.invalidate_cache(path2)
def get_rest_api(client, module, swagger_spec):
rest_api = None
info_title = None
rest_api_id = module.params['rest_api_id']
try:
info_title = swagger_spec['info']['title']
except KeyError:
module.fail_json(msg="Missing required value in swagger spec: info.title")
if rest_api_id == '*':
try:
rest_apis = client.get_rest_apis(limit=500)['items']
choices = [api for api in rest_apis if api['name'] == info_title]
except ClientError as e:
choices = None
module.fail_json(msg="Error retrieving REST APIs: {0}".format(e))
if len(choices) > 1:
module.fail_json(msg="More than one API found: {0}".format(choices))
elif len(choices) > 0:
try:
rest_api_id = choices[0]['id']
rest_api = client.get_rest_api(restApiId=rest_api_id)
except (ClientError, ParamValidationError, MissingParametersError) as e:
if not e.response['Error']['Code'] == 'NotFoundException':
module.fail_json(msg='Error retrieving REST API: {0}'.format(e))
return rest_api
def create_models(client, module, rest_api_id, schemas):
"""
Creates models based on schemas.
"""
models = None
try:
for model in schemas.keys():
schema = schemas[model]
schema.update({
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"title": "{0} schema".format(model)
})
models = client.create_model(
restApiId=rest_api_id,
name=model,
description='added by Ansible module',
contentType='application/json',
schema=json.dumps(schema)
)
except (ClientError, ParamValidationError, MissingParametersError) as e:
#TODO: should report warning or update existing model
if not e.response['Error']['Code'] == 'ConflictException':
module.fail_json(msg='Error creating API model {0}: {1}'.format(model, e))
return models
def create_resource(client, module, rest_api_id, parent_id, path_part):
resource = None
try:
resource = client.create_resource(
restApiId=rest_api_id,
pathPart=path_part,
parentId=parent_id
)
except (ClientError, ParamValidationError, MissingParametersError) as e:
module.fail_json(msg="Error creating API resource {0} pid: {1}: {2}".format(path_part, parent_id, e))
return resource
def put_method_response(client, module, rest_api_id, resource_id, http_method, status_code, response):
method_response = None
if not re.match(r'^[2-6]\d\d$', str(status_code)):
module.fail_json(msg="Error creating response {0} for method {1} rid: {2}: invalid response code.".format(status_code, http_method, resource_id))
api_params = dict(
restApiId=rest_api_id,
resourceId=resource_id,
httpMethod=http_method,
statusCode=str(status_code)
)
if 'headers' in response:
response_parameters = dict()
for header in response['headers'].keys():
destination = 'method.response.header.{0}'.format(header)
response_parameters[destination] = True
if response_parameters:
api_params['responseParameters'] = response_parameters
try:
method_response = client.put_method_response(**api_params)
except (ClientError, ParamValidationError, MissingParametersError) as e:
module.fail_json(msg="Error creating response {0} for method {1} rid: {2}: {3}".format(status_code, http_method, resource_id, e))
return method_response
def put_integration(client, module, rest_api_id, resource_id, http_method, integration):
method_integration = None
api_params = dict(
restApiId=rest_api_id,
resourceId=resource_id,
httpMethod=http_method,
type=integration['type'].upper(),
)
if 'httpMethod' in integration:
api_params['integrationHttpMethod'] = integration['httpMethod']
for optional_params in ('uri', 'credentials', 'requestParameters', 'requestTemplates', 'cacheNameSpace'):
if optional_params in integration:
api_params[optional_params] = integration[optional_params]
if 'cacheKeyParameters' in integration:
cache_key_parameters = []
for parameter in integration['cacheKeyParameters']:
cache_key_parameters.append('method.request.querystring.{0}'.format(parameter.split('.')[-1]))
if cache_key_parameters:
api_params['cacheKeyParameters'] = cache_key_parameters
try:
method_integration = client.put_integration(**api_params)
except (ClientError, ParamValidationError, MissingParametersError) as e:
module.fail_json(msg="Error creating integration for method {0} rid: {1}: {2}".format(http_method, resource_id, e))
return method_integration
handlers.py 文件源码
项目:tf_aws_ecs_instance_draining_on_scale_in
作者: terraform-community-modules
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def validate_bucket_name(params, **kwargs):
if 'Bucket' not in params:
return
bucket = params['Bucket']
if VALID_BUCKET.search(bucket) is None:
error_msg = (
'Invalid bucket name "%s": Bucket name must match '
'the regex "%s"' % (bucket, VALID_BUCKET.pattern))
raise ParamValidationError(report=error_msg)
handlers.py 文件源码
项目:tf_aws_ecs_instance_draining_on_scale_in
作者: terraform-community-modules
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def _quote_source_header_from_dict(source_dict):
try:
bucket = source_dict['Bucket']
key = percent_encode(source_dict['Key'], safe=SAFE_CHARS + '/')
version_id = source_dict.get('VersionId')
except KeyError as e:
raise ParamValidationError(
report='Missing required parameter: %s' % str(e))
final = '%s/%s' % (bucket, key)
if version_id is not None:
final += '?versionId=%s' % version_id
return final