python类ClientError()的实例源码

test_amazon_driver.py 文件源码 项目:CAL 作者: HPCC-Cloud-Computing 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_list_container_objects_failed(self):
        self.mock_object(
            self.fake_driver.client, 'list_objects',
            mock.Mock(side_effect=ClientError(
                fake_error_code_resp,
                'ListObjects'
            ))
        )

        self.assertRaises(ClientError,
                          self.fake_driver.list_container_objects,
                          'fake-container',
                          prefix=None,
                          delimiter=None)
        self.fake_driver.client.list_objects.assert_called_once_with(
            Bucket='fake-container',
            Prefix=None,
            Delimiter=None,
        )
aws.py 文件源码 项目:donatemates 作者: donatemates 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_item(self, data):
        """Method to get an item

        Args:
            data (dict): A dictionary of attributes to put

        Returns:
            (dict)
        """
        try:
            response = self.table.get_item(Key=data,
                                           ConsistentRead=True)
        except ClientError as err:
            raise IOError("Error getting item: {}".format(err.message))

        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise IOError("Error getting item: {}".format(response['ResponseMetadata']))

        if "Item" in response:
            return response["Item"]
        else:
            return None
aws.py 文件源码 项目:donatemates 作者: donatemates 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def delete_item(self, data):
        """Method to get an item

        Args:
            data (dict): A dictionary of attributes to access an item (hash and sort keys)

        Returns:
            None
        """
        try:
            response = self.table.delete_item(Key=data)

        except ClientError as err:
            raise IOError("Error deleting item: {}".format(err.message))

        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise IOError("Error deleting item: {}".format(response['ResponseMetadata']))
scar.py 文件源码 项目:scar 作者: grycap 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def create_trigger_from_bucket(self, bucket_name, function_arn):
        notification = { "LambdaFunctionConfigurations": [
                            { "LambdaFunctionArn": function_arn,
                              "Events": [ "s3:ObjectCreated:*" ],
                              "Filter":
                                { "Key":
                                    { "FilterRules": [
                                        { "Name": "prefix",
                                          "Value": "input/"
                                        }]
                                    }
                                }
                            }]
                        }
        try:
            self.get_s3().put_bucket_notification_configuration( Bucket=bucket_name,
                                                                 NotificationConfiguration=notification )
        except ClientError as ce:
            print ("Error configuring S3 bucket: %s" % ce)
scar.py 文件源码 项目:scar 作者: grycap 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_functions_arn_list(self):
        arn_list = []
        try:
            # Creation of a function filter by tags
            client = self.get_resource_groups_tagging_api()
            tag_filters = [ { 'Key': 'owner', 'Values': [ self.get_user_name_or_id() ] },
                            { 'Key': 'createdby', 'Values': ['scar'] } ]
            response = client.get_resources(TagFilters=tag_filters,
                                                 TagsPerPage=100)

            for function in response['ResourceTagMappingList']:
                arn_list.append(function['ResourceARN'])

            while ('PaginationToken' in response) and (response['PaginationToken']):
                response = client.get_resources(PaginationToken=response['PaginationToken'],
                                                TagFilters=tag_filters,
                                                TagsPerPage=100)
                for function in response['ResourceTagMappingList']:
                    arn_list.append(function['ResourceARN'])

        except ClientError as ce:
            print ("Error getting function arn by tag: %s" % ce)

        return arn_list
TestScar.py 文件源码 项目:scar 作者: grycap 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_init_retention_policy_error(self, mock_aws_client):
        mock_aws_client.return_value.get_access_key.return_value = 'test_key'
        mock_aws_client.return_value.get_lambda.return_value.create_function.return_value = {'FunctionArn':'arn123',
                                                                                            'Timeout':'300',
                                                                                            'MemorySize':'512',
                                                                                            'FunctionName':'f1-name',
                                                                                            'Extra1':'e1',
                                                                                            'Extra2':'e2'}
        mock_aws_client.return_value.get_log.return_value.put_retention_policy.side_effect = ClientError({'Error' : {'Code' : '42', 'Message' : 'test_message'}}, 'test2')

        args = Args()
        args.verbose = False
        Scar().init(args)        
        output = TestScar.capturedOutput.getvalue()
        self.assertTrue("Error setting log retention policy:" in output)
        self.assertTrue("An error occurred (42) when calling the test2 operation: test_message" in output)
TestScar.py 文件源码 项目:scar 作者: grycap 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_init_event_source_error(self, mock_aws_client):
        mock_aws_client.return_value.get_access_key.return_value = 'test_key'
        mock_aws_client.return_value.get_lambda.return_value.create_function.return_value = {'FunctionArn':'arn123',
                                                                                            'Timeout':'300',
                                                                                            'MemorySize':'512',
                                                                                            'FunctionName':'f1-name',
                                                                                            'Extra1':'e1',
                                                                                            'Extra2':'e2'}
        mock_aws_client.return_value.check_and_create_s3_bucket.side_effect = ClientError({'Error' : {'Code' : '42', 'Message' : 'test_message'}}, 'test2')

        args = Args()
        args.verbose = False
        args.event_source = True       
        Scar().init(args)        
        output = TestScar.capturedOutput.getvalue()
        self.assertTrue("Error creating the event source:" in output)
        self.assertTrue("An error occurred (42) when calling the test2 operation: test_message" in output)
TestScar.py 文件源码 项目:scar 作者: grycap 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_init_log_group_error(self, mock_aws_client):
        mock_aws_client.return_value.get_access_key.return_value = 'test_key'
        mock_aws_client.return_value.get_lambda.return_value.create_function.return_value = {'FunctionArn':'arn123',
                                                                                            'Timeout':'300',
                                                                                            'MemorySize':'512',
                                                                                            'FunctionName':'f1-name',
                                                                                            'Extra1':'e1',
                                                                                            'Extra2':'e2'}
        mock_aws_client.return_value.get_log.return_value.create_log_group.side_effect = ClientError({'Error' : {'Code' : '42', 'Message' : 'test_message'}}, 'test2')

        args = Args()
        args.verbose = False
        Scar().init(args)        
        output = TestScar.capturedOutput.getvalue()
        self.assertTrue("Error creating log groups:" in output)
        self.assertTrue("An error occurred (42) when calling the test2 operation: test_message" in output)
TestScar.py 文件源码 项目:scar 作者: grycap 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_init_existing_log_group(self, mock_aws_client):
        mock_aws_client.return_value.get_access_key.return_value = 'test_key'
        mock_aws_client.return_value.get_lambda.return_value.create_function.return_value = {'FunctionArn':'arn123',
                                                                                            'Timeout':'300',
                                                                                            'MemorySize':'512',
                                                                                            'FunctionName':'f1-name',
                                                                                            'Extra1':'e1',
                                                                                            'Extra2':'e2'}
        mock_aws_client.return_value.get_log.return_value.create_log_group.side_effect = ClientError({'Error' : {'Code' : 'ResourceAlreadyExistsException', 'Message' : 'test_message'}}, 'test2')

        args = Args()
        args.verbose = False
        Scar().init(args)        
        output = TestScar.capturedOutput.getvalue()
        self.assertTrue("Function 'test-name' successfully created.\n" in output)
        self.assertTrue("Warning: Using existent log group '/aws/lambda/test-name'\n\n" in output)
test_symboldownloader.py 文件源码 项目:tecken 作者: mozilla-services 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_has_private_bubble_other_clienterrors(botomock):

    def mock_api_call(self, operation_name, api_params):
        parsed_response = {'Error': {'Code': '403', 'Message': 'Not found'}}
        raise ClientError(parsed_response, operation_name)

    urls = (
        'https://s3.example.com/private/prefix/',
    )
    downloader = SymbolDownloader(urls)
    # Expect this to raise a ClientError because the bucket ('private')
    # doesn't exist. So boto3 would normally trigger a ClientError
    # with a code 'Forbidden'.
    with botomock(mock_api_call):
        with pytest.raises(ClientError):
            downloader.has_symbol(
                'xul.pdb',
                '44E4EC8C2F41492B9369D6B9A059577C2',
                'xul.sym'
            )
test_symboldownloader.py 文件源码 项目:tecken 作者: mozilla-services 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_get_stream_private_other_clienterrors(botomock):

    def mock_api_call(self, operation_name, api_params):
        assert operation_name == 'GetObject'
        parsed_response = {
            'Error': {'Code': '403', 'Message': 'Forbidden'},
        }
        raise ClientError(parsed_response, operation_name)

    urls = (
        'https://s3.example.com/private/prefix/',
    )
    downloader = SymbolDownloader(urls)
    with botomock(mock_api_call):
        stream = downloader.get_symbol_stream(
            'xul.pdb',
            '44E4EC8C2F41492B9369D6B9A059577C2',
            'xul.sym'
        )
        with pytest.raises(ClientError):
            next(stream)
utils.py 文件源码 项目:tecken 作者: mozilla-services 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def key_existing(client, bucket, key):
    """return a tuple of (
        key's size if it exists or 0,
        S3 key metadata
    )
    If the file doesn't exist, return None for the metadata.
    """
    # Return 0 if the key can't be found so the memoize cache can cope
    try:
        response = client.head_object(
            Bucket=bucket,
            Key=key,
        )
        return response['ContentLength'], response.get('Metadata')
    except ClientError as exception:
        if exception.response['Error']['Code'] == '404':
            return 0, None
        raise
    except (ReadTimeout, socket.timeout) as exception:
        logger.info(
            f'ReadTimeout trying to list_objects_v2 for {bucket}:'
            f'{key} ({exception})'
        )
        return 0, None
activities.py 文件源码 项目:heaviside 作者: jhuapl-boss 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def is_timeout(ex, op_name=None):
        """Check the exception to determine if it is a Boto3 ClientError
        thrown because the task timed out.

        Args:
            ex (Exception) : Exception to check
            op_name (string|None) : (Optional) name of the operation that was attempted

        Returns:
            bool : If the exception was caused by the task timing out
        """
        try:
            rst = ex.response['Error']['Code'] == 'TaskTimedOut'
            if op_name:
                rst &= ex.operation_name == op_name
            return rst
        except:
            return False
activities.py 文件源码 项目:heaviside 作者: jhuapl-boss 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def success(self, output):
        """Marks the task successfully complete and returns the processed data

        Note: This method will silently fail if the task has timed out

        Args:
            output (string|dict): Json response to return to the state machine
        """
        if self.token is None:
            raise Exception("Not currently working on a task")

        output = json.dumps(output)

        try:
            resp = self.client.send_task_success(taskToken = self.token,
                                                 output = output)
        except ClientError as e:
            # eat the timeout
            if not self.is_timeout(e):
                self.log.exception("Error sending task success")
                raise
        finally:
            self.token = None # finished with task
activities.py 文件源码 项目:heaviside 作者: jhuapl-boss 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def failure(self, error, cause):
        """Marks the task as a failure with a given reason

        Note: This method will silently fail if the task has timed out

        Args:
            error (string): Failure error
            cause (string): Failure error cause
        """
        if self.token is None:
            raise Exception("Not currently working on a task")

        try:
            resp = self.client.send_task_failure(taskToken = self.token,
                                                 error = error,
                                                 cause = cause)
        except ClientError as e:
            # eat the timeout
            if not self.is_timeout(e):
                self.log.exception("Eror sending task failure")
                raise
        finally:
            self.token = None # finished with task
worker.py 文件源码 项目:stefuna 作者: irothschild 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def heartbeat(self, token):
        """Called from the heartbeat thread every X seconds"""
        if token is not None and token != self._heartbeat_fail_token:
            try:
                self.logger.debug('Sending heartbeat for task')
                self.heartbeat_sf_client.send_task_heartbeat(taskToken=token)
                self._heartbeat_fail_token = None

            except ClientError as e:
                ecode = e.response['Error'].get('Code', 'Unknown')
                if ecode in ['TaskDoesNotExist', 'InvalidToken', 'TaskTimedOut']:
                    # We set the heartbeat_fail_token so we don't retry a heartbeat for this token.
                    self._heartbeat_fail_token = token
                    # We only use debug level logging since the task either deleted or ended.
                    self.logger.debug('Error sending heartbeat for task: %s', ecode)
                else:
                    self.logger.exception('Error sending heartbeat for task')
            except Exception:
                self.logger.exception('Error sending heartbeat for task')
tag_ebs.py 文件源码 项目:aws-account-automation 作者: jchrisfarris 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def process_DetachVolume(event,volume_id, username):
  #
  # Capture who did the detach and when
  #
  logger.info("user: " + username + " detached Volume " + volume_id )
  try: 
    response = client.create_tags(
      Resources=[ volume_id ],
      Tags=[ 
        { 'Key': TAG_PREFIX + 'detached_by', 'Value': username },
        { 'Key': TAG_PREFIX + 'detached_date', 'Value':  event['detail']['eventTime'] }
      ]
    )
  except ClientError as e:
    logger.error("unable to tag volume " + volume_id + " with username " + username + ": " + e.message )
# end process_DetachVolume()
ExpireUsers.py 文件源码 项目:aws-account-automation 作者: jchrisfarris 项目源码 文件源码 阅读 50 收藏 0 点赞 0 评论 0
def email_user(email, message, account_name):
    global ACTION_SUMMARY # This is what we send to the admins
    if SEND_EMAIL != "true": return # Abort if we're not supposed to send email
    if message == "": return # Don't send an empty message
    client = boto3.client('ses')
    body = EXPLANATION_HEADER + "\n" + message + "\n\n" + EXPLANATION_FOOTER
    try: 
        response = client.send_email(
            Source=FROM_ADDRESS,
            Destination={ 'ToAddresses': [ email ] },
            Message={
                'Subject': { 'Data': email_subject.format(account_name) },
                'Body': { 'Text': { 'Data': body } }
            }
        )
        ACTION_SUMMARY = ACTION_SUMMARY + "\nEmail Sent to {}".format(email)
        return
    except ClientError as e:
        print("Failed to send message to {}: {}".format(email, e.message))
        ACTION_SUMMARY = ACTION_SUMMARY + "\nERROR: Message to {} was rejected: {}".format(email, e.message)
s3.py 文件源码 项目:Cloud-Custodian 作者: jtroberts83 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def modify_bucket_tags(session_factory, buckets, add_tags=(), remove_tags=()):
    for bucket in buckets:
        client = bucket_client(local_session(session_factory), bucket)
        # all the tag marshalling back and forth is a bit gross :-(
        new_tags = {t['Key']: t['Value'] for t in add_tags}
        for t in bucket.get('Tags', ()):
            if (t['Key'] not in new_tags and
                    not t['Key'].startswith('aws') and
                    t['Key'] not in remove_tags):
                new_tags[t['Key']] = t['Value']
        tag_set = [{'Key': k, 'Value': v} for k, v in new_tags.items()]
        try:
            client.put_bucket_tagging(
                Bucket=bucket['Name'], Tagging={'TagSet': tag_set})
        except ClientError as e:
            log.exception(
                'Exception tagging bucket %s: %s', bucket['Name'], e)
            continue
s3.py 文件源码 项目:Cloud-Custodian 作者: jtroberts83 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def modify_bucket_tags(session_factory, buckets, add_tags=(), remove_tags=()):
    for bucket in buckets:
        client = bucket_client(local_session(session_factory), bucket)
        # all the tag marshalling back and forth is a bit gross :-(
        new_tags = {t['Key']: t['Value'] for t in add_tags}
        for t in bucket.get('Tags', ()):
            if (t['Key'] not in new_tags and
                    not t['Key'].startswith('aws') and
                    t['Key'] not in remove_tags):
                new_tags[t['Key']] = t['Value']
        tag_set = [{'Key': k, 'Value': v} for k, v in new_tags.items()]
        try:
            client.put_bucket_tagging(
                Bucket=bucket['Name'], Tagging={'TagSet': tag_set})
        except ClientError as e:
            log.exception(
                'Exception tagging bucket %s: %s', bucket['Name'], e)
            continue


问题


面经


文章

微信
公众号

扫码关注公众号