def test_list_container_objects_failed(self):
    """A ClientError from list_objects propagates out of list_container_objects."""
    failure = ClientError(fake_error_code_resp, 'ListObjects')
    self.mock_object(self.fake_driver.client, 'list_objects',
                     mock.Mock(side_effect=failure))
    self.assertRaises(
        ClientError,
        self.fake_driver.list_container_objects,
        'fake-container',
        prefix=None,
        delimiter=None,
    )
    # The driver must forward the container/prefix/delimiter verbatim.
    self.fake_driver.client.list_objects.assert_called_once_with(
        Bucket='fake-container',
        Prefix=None,
        Delimiter=None,
    )
# --- ClientError() usage examples (header translated from Chinese scraper text) ---
def get_item(self, data):
    """Fetch a single item from the DynamoDB table.

    Args:
        data (dict): Primary key attributes identifying the item.

    Returns:
        dict: The stored item, or None if no item matches the key.

    Raises:
        IOError: If the request fails or returns a non-200 status.
    """
    try:
        # Strongly consistent read so an immediately preceding write is visible.
        response = self.table.get_item(Key=data,
                                       ConsistentRead=True)
    except ClientError as err:
        # Bug fix: ClientError has no .message attribute on Python 3;
        # formatting the exception itself includes code and message.
        raise IOError("Error getting item: {}".format(err))
    if response['ResponseMetadata']['HTTPStatusCode'] != 200:
        raise IOError("Error getting item: {}".format(response['ResponseMetadata']))
    # A missing "Item" key means no record matched the supplied key.
    return response.get("Item")
def delete_item(self, data):
    """Delete a single item from the DynamoDB table.

    Args:
        data (dict): A dictionary of attributes to access an item (hash and sort keys)

    Returns:
        None

    Raises:
        IOError: If the request fails or returns a non-200 status.
    """
    try:
        response = self.table.delete_item(Key=data)
    except ClientError as err:
        # Bug fix: ClientError has no .message attribute on Python 3;
        # formatting the exception itself includes code and message.
        raise IOError("Error deleting item: {}".format(err))
    if response['ResponseMetadata']['HTTPStatusCode'] != 200:
        raise IOError("Error deleting item: {}".format(response['ResponseMetadata']))
def create_trigger_from_bucket(self, bucket_name, function_arn):
    """Configure *bucket_name* so ObjectCreated events under 'input/' invoke the Lambda."""
    prefix_rule = {"Name": "prefix", "Value": "input/"}
    lambda_config = {
        "LambdaFunctionArn": function_arn,
        "Events": ["s3:ObjectCreated:*"],
        "Filter": {"Key": {"FilterRules": [prefix_rule]}},
    }
    notification = {"LambdaFunctionConfigurations": [lambda_config]}
    try:
        self.get_s3().put_bucket_notification_configuration(
            Bucket=bucket_name,
            NotificationConfiguration=notification)
    except ClientError as ce:
        print("Error configuring S3 bucket: %s" % ce)
def get_functions_arn_list(self):
    """Return the ARNs of all Lambda functions created by SCAR for this user.

    Functions are found via the Resource Groups Tagging API, filtered on
    the 'owner' (current user) and 'createdby=scar' tags, following the
    PaginationToken until the listing is exhausted.

    Returns:
        list: Function ARN strings (empty on a ClientError).
    """
    arn_list = []
    try:
        client = self.get_resource_groups_tagging_api()
        tag_filters = [{'Key': 'owner', 'Values': [self.get_user_name_or_id()]},
                       {'Key': 'createdby', 'Values': ['scar']}]
        # Single pagination loop replaces the previous duplicated
        # first-page / following-pages code.
        kwargs = {'TagFilters': tag_filters, 'TagsPerPage': 100}
        while True:
            response = client.get_resources(**kwargs)
            arn_list.extend(mapping['ResourceARN']
                            for mapping in response['ResourceTagMappingList'])
            token = response.get('PaginationToken')
            if not token:
                break
            kwargs['PaginationToken'] = token
    except ClientError as ce:
        print("Error getting function arn by tag: %s" % ce)
    return arn_list
def test_init_retention_policy_error(self, mock_aws_client):
    """A ClientError while setting the log retention policy is reported."""
    aws = mock_aws_client.return_value
    aws.get_access_key.return_value = 'test_key'
    aws.get_lambda.return_value.create_function.return_value = {
        'FunctionArn': 'arn123',
        'Timeout': '300',
        'MemorySize': '512',
        'FunctionName': 'f1-name',
        'Extra1': 'e1',
        'Extra2': 'e2',
    }
    error = ClientError({'Error': {'Code': '42', 'Message': 'test_message'}}, 'test2')
    aws.get_log.return_value.put_retention_policy.side_effect = error
    args = Args()
    args.verbose = False
    Scar().init(args)
    output = TestScar.capturedOutput.getvalue()
    self.assertTrue("Error setting log retention policy:" in output)
    self.assertTrue(
        "An error occurred (42) when calling the test2 operation: test_message"
        in output)
def test_init_event_source_error(self, mock_aws_client):
    """A ClientError while creating the S3 event source is reported."""
    aws = mock_aws_client.return_value
    aws.get_access_key.return_value = 'test_key'
    aws.get_lambda.return_value.create_function.return_value = {
        'FunctionArn': 'arn123',
        'Timeout': '300',
        'MemorySize': '512',
        'FunctionName': 'f1-name',
        'Extra1': 'e1',
        'Extra2': 'e2',
    }
    error = ClientError({'Error': {'Code': '42', 'Message': 'test_message'}}, 'test2')
    aws.check_and_create_s3_bucket.side_effect = error
    args = Args()
    args.verbose = False
    args.event_source = True
    Scar().init(args)
    output = TestScar.capturedOutput.getvalue()
    self.assertTrue("Error creating the event source:" in output)
    self.assertTrue(
        "An error occurred (42) when calling the test2 operation: test_message"
        in output)
def test_init_log_group_error(self, mock_aws_client):
    """A ClientError while creating the log group is reported."""
    aws = mock_aws_client.return_value
    aws.get_access_key.return_value = 'test_key'
    aws.get_lambda.return_value.create_function.return_value = {
        'FunctionArn': 'arn123',
        'Timeout': '300',
        'MemorySize': '512',
        'FunctionName': 'f1-name',
        'Extra1': 'e1',
        'Extra2': 'e2',
    }
    error = ClientError({'Error': {'Code': '42', 'Message': 'test_message'}}, 'test2')
    aws.get_log.return_value.create_log_group.side_effect = error
    args = Args()
    args.verbose = False
    Scar().init(args)
    output = TestScar.capturedOutput.getvalue()
    self.assertTrue("Error creating log groups:" in output)
    self.assertTrue(
        "An error occurred (42) when calling the test2 operation: test_message"
        in output)
def test_init_existing_log_group(self, mock_aws_client):
    """ResourceAlreadyExistsException on log group creation only warns."""
    aws = mock_aws_client.return_value
    aws.get_access_key.return_value = 'test_key'
    aws.get_lambda.return_value.create_function.return_value = {
        'FunctionArn': 'arn123',
        'Timeout': '300',
        'MemorySize': '512',
        'FunctionName': 'f1-name',
        'Extra1': 'e1',
        'Extra2': 'e2',
    }
    error = ClientError(
        {'Error': {'Code': 'ResourceAlreadyExistsException',
                   'Message': 'test_message'}},
        'test2')
    aws.get_log.return_value.create_log_group.side_effect = error
    args = Args()
    args.verbose = False
    Scar().init(args)
    output = TestScar.capturedOutput.getvalue()
    self.assertTrue("Function 'test-name' successfully created.\n" in output)
    self.assertTrue("Warning: Using existent log group '/aws/lambda/test-name'\n\n" in output)
def test_has_private_bubble_other_clienterrors(botomock):
    """Non-404 ClientErrors from S3 must bubble up out of has_symbol."""

    def mock_api_call(self, operation_name, api_params):
        error_payload = {'Error': {'Code': '403', 'Message': 'Not found'}}
        raise ClientError(error_payload, operation_name)

    downloader = SymbolDownloader(
        ('https://s3.example.com/private/prefix/',)
    )
    # Expect this to raise a ClientError because the bucket ('private')
    # doesn't exist. So boto3 would normally trigger a ClientError
    # with a code 'Forbidden'.
    with botomock(mock_api_call):
        with pytest.raises(ClientError):
            downloader.has_symbol(
                'xul.pdb',
                '44E4EC8C2F41492B9369D6B9A059577C2',
                'xul.sym',
            )
def test_get_stream_private_other_clienterrors(botomock):
    """A non-404 ClientError surfaces when the symbol stream is first consumed."""

    def mock_api_call(self, operation_name, api_params):
        assert operation_name == 'GetObject'
        raise ClientError(
            {'Error': {'Code': '403', 'Message': 'Forbidden'}},
            operation_name,
        )

    downloader = SymbolDownloader(
        ('https://s3.example.com/private/prefix/',)
    )
    with botomock(mock_api_call):
        stream = downloader.get_symbol_stream(
            'xul.pdb',
            '44E4EC8C2F41492B9369D6B9A059577C2',
            'xul.sym',
        )
        # The error is raised lazily, on the first iteration of the stream.
        with pytest.raises(ClientError):
            next(stream)
def key_existing(client, bucket, key):
    """Look up *key* in *bucket* with a HEAD request.

    Returns a tuple of (
        key's size if it exists or 0,
        S3 key metadata
    )
    If the file doesn't exist, return None for the metadata.
    """
    # Return 0 if the key can't be found so the memoize cache can cope
    try:
        response = client.head_object(
            Bucket=bucket,
            Key=key,
        )
        return response['ContentLength'], response.get('Metadata')
    except ClientError as exception:
        # A missing key surfaces as a 404 ClientError; anything else
        # (permissions, throttling) is a genuine error and propagates.
        if exception.response['Error']['Code'] == '404':
            return 0, None
        raise
    except (ReadTimeout, socket.timeout) as exception:
        # Bug fix: the previous message said "list_objects_v2" but this
        # function issues head_object.
        logger.info(
            f'ReadTimeout trying to head_object for {bucket}:'
            f'{key} ({exception})'
        )
        return 0, None
def is_timeout(ex, op_name=None):
    """Check the exception to determine if it is a Boto3 ClientError
    thrown because the task timed out.

    Args:
        ex (Exception) : Exception to check
        op_name (string|None) : (Optional) name of the operation that was attempted

    Returns:
        bool : If the exception was caused by the task timing out
    """
    try:
        rst = ex.response['Error']['Code'] == 'TaskTimedOut'
        if op_name:
            rst &= ex.operation_name == op_name
        return rst
    except (AttributeError, KeyError, TypeError):
        # Not a ClientError-shaped exception (missing .response or the
        # expected keys). Bug fix: the previous bare `except:` also
        # swallowed SystemExit/KeyboardInterrupt.
        return False
def success(self, output):
    """Report successful completion of the current task to the state machine.

    Note: This method will silently fail if the task has timed out

    Args:
        output (string|dict): Json response to return to the state machine
    """
    if self.token is None:
        raise Exception("Not currently working on a task")
    payload = json.dumps(output)
    try:
        self.client.send_task_success(taskToken=self.token,
                                      output=payload)
    except ClientError as e:
        if self.is_timeout(e):
            pass  # eat the timeout
        else:
            self.log.exception("Error sending task success")
            raise
    finally:
        self.token = None  # finished with task
def failure(self, error, cause):
    """Marks the task as a failure with a given reason

    Note: This method will silently fail if the task has timed out

    Args:
        error (string): Failure error
        cause (string): Failure error cause
    """
    if self.token is None:
        raise Exception("Not currently working on a task")
    try:
        self.client.send_task_failure(taskToken=self.token,
                                      error=error,
                                      cause=cause)
    except ClientError as e:
        # eat the timeout
        if not self.is_timeout(e):
            # Bug fix: log message previously read "Eror sending task failure".
            self.log.exception("Error sending task failure")
            raise
    finally:
        self.token = None  # finished with task
def heartbeat(self, token):
    """Send a task heartbeat; called from the heartbeat thread every X seconds."""
    # Skip when there is no active token, or this token already failed a
    # heartbeat (no point retrying a dead task).
    if token is None or token == self._heartbeat_fail_token:
        return
    try:
        self.logger.debug('Sending heartbeat for task')
        self.heartbeat_sf_client.send_task_heartbeat(taskToken=token)
        self._heartbeat_fail_token = None
    except ClientError as e:
        code = e.response['Error'].get('Code', 'Unknown')
        if code in ('TaskDoesNotExist', 'InvalidToken', 'TaskTimedOut'):
            # We set the heartbeat_fail_token so we don't retry a heartbeat for this token.
            self._heartbeat_fail_token = token
            # We only use debug level logging since the task either deleted or ended.
            self.logger.debug('Error sending heartbeat for task: %s', code)
        else:
            self.logger.exception('Error sending heartbeat for task')
    except Exception:
        self.logger.exception('Error sending heartbeat for task')
def process_DetachVolume(event, volume_id, username):
    """Tag a detached EBS volume with who detached it and when.

    Args:
        event (dict): Event record; the detach time is read from
            event['detail']['eventTime'].
        volume_id (string): The detached EBS volume id.
        username (string): User that performed the DetachVolume call.
    """
    #
    # Capture who did the detach and when
    #
    logger.info("user: " + username + " detached Volume " + volume_id)
    try:
        client.create_tags(
            Resources=[volume_id],
            Tags=[
                {'Key': TAG_PREFIX + 'detached_by', 'Value': username},
                {'Key': TAG_PREFIX + 'detached_date', 'Value': event['detail']['eventTime']}
            ]
        )
    except ClientError as e:
        # Bug fix: ClientError has no .message attribute on Python 3;
        # str(e) carries the error code and message.
        logger.error("unable to tag volume " + volume_id + " with username " + username + ": " + str(e))
# end process_DetachVolume()
def email_user(email, message, account_name):
    """Send *message* to *email* via SES and record the outcome in ACTION_SUMMARY.

    No-ops when SEND_EMAIL is disabled or the message is empty.

    Args:
        email (string): Recipient address.
        message (string): Body text to wrap between the explanation header/footer.
        account_name (string): Substituted into the subject template.
    """
    global ACTION_SUMMARY  # This is what we send to the admins
    if SEND_EMAIL != "true":
        return  # Abort if we're not supposed to send email
    if message == "":
        return  # Don't send an empty message
    client = boto3.client('ses')
    body = EXPLANATION_HEADER + "\n" + message + "\n\n" + EXPLANATION_FOOTER
    try:
        response = client.send_email(
            Source=FROM_ADDRESS,
            Destination={'ToAddresses': [email]},
            Message={
                'Subject': {'Data': email_subject.format(account_name)},
                'Body': {'Text': {'Data': body}}
            }
        )
        ACTION_SUMMARY = ACTION_SUMMARY + "\nEmail Sent to {}".format(email)
        return
    except ClientError as e:
        # Bug fix: ClientError has no .message attribute on Python 3;
        # formatting the exception itself includes the rejection reason.
        print("Failed to send message to {}: {}".format(email, e))
        ACTION_SUMMARY = ACTION_SUMMARY + "\nERROR: Message to {} was rejected: {}".format(email, e)
def modify_bucket_tags(session_factory, buckets, add_tags=(), remove_tags=()):
    """Rewrite each bucket's tag set: apply add_tags, drop remove_tags and aws* tags."""
    for bucket in buckets:
        client = bucket_client(local_session(session_factory), bucket)
        # all the tag marshalling back and forth is a bit gross :-(
        merged = {tag['Key']: tag['Value'] for tag in add_tags}
        for tag in bucket.get('Tags', ()):
            key = tag['Key']
            keep = (key not in merged
                    and not key.startswith('aws')
                    and key not in remove_tags)
            if keep:
                merged[key] = tag['Value']
        tag_set = [{'Key': k, 'Value': v} for k, v in merged.items()]
        try:
            client.put_bucket_tagging(
                Bucket=bucket['Name'], Tagging={'TagSet': tag_set})
        except ClientError as e:
            log.exception(
                'Exception tagging bucket %s: %s', bucket['Name'], e)
            continue
def modify_bucket_tags(session_factory, buckets, add_tags=(), remove_tags=()):
    """Replace each bucket's tags with add_tags plus existing user tags not removed."""
    for bucket in buckets:
        session = local_session(session_factory)
        client = bucket_client(session, bucket)
        # all the tag marshalling back and forth is a bit gross :-(
        desired = {}
        for tag in add_tags:
            desired[tag['Key']] = tag['Value']
        for tag in bucket.get('Tags', ()):
            if tag['Key'] in desired:
                continue
            if tag['Key'].startswith('aws') or tag['Key'] in remove_tags:
                continue
            desired[tag['Key']] = tag['Value']
        tag_set = [{'Key': key, 'Value': value} for key, value in desired.items()]
        try:
            client.put_bucket_tagging(
                Bucket=bucket['Name'], Tagging={'TagSet': tag_set})
        except ClientError as e:
            log.exception(
                'Exception tagging bucket %s: %s', bucket['Name'], e)
            continue