python类Attr()的实例源码

expression.py 文件源码 项目:dynamodb-py 作者: gusibi 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def gte(self, value):  # >=
        # Creates a condition where the attribute is greater than or equal to
        # the value.
        # Attr & Key
        return self._expression_func('gte', value)
expression.py 文件源码 项目:dynamodb-py 作者: gusibi 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def between(self, low_value, high_value):
        # Creates a condition where the attribute is greater than or equal to
        # the low value and less than or equal to the high value.
        # Attr & Key
        return self._expression_func('between', low_value, high_value)
expression.py 文件源码 项目:dynamodb-py 作者: gusibi 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def begins_with(self, value):
        # Creates a condition where the attribute begins with the value
        # Attr & Key
        return self._expression_func('begins_with', value)
expression.py 文件源码 项目:dynamodb-py 作者: gusibi 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def exists(self):
        # Creates a condition where the attribute exists.
        # Attr
        if self.hash_key or self.range_key:
            # ValidationException
            raise ValidationException('Query key condition not supported')
        return self.name, Attr(self.name).exists(), False
expression.py 文件源码 项目:dynamodb-py 作者: gusibi 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def not_exists(self):
        # Creates a condition where the attribute does not exists.
        # Attr
        if self.hash_key or self.range_key:
            # ValidationException
            raise ValidationException('Query key condition not supported')
        return self.name, Attr(self.name).not_exists(), False
autosubnet.py 文件源码 项目:aCloudGuru-AdvancedCloudFormation 作者: acantril 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def deletesubnets(stack_id, ddb_t):
    """ Delete any subnets in the DB for a stack"""
    try:
        response = ddb_t.scan(
            FilterExpression=Attr('StackId').eq(stack_id)
        )
        for item in response['Items']:
            print item
            ddb_t.delete_item(Key={'Cidr' : item['Cidr']})
    except Exception:
        pass
db_utils.py 文件源码 项目:mentii 作者: mentii 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def scanFilter(attributeName, attribute, table):
  return table.scan(FilterExpression=Attr(attributeName).eq(attribute))
support.py 文件源码 项目:awslimits 作者: Yipit 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_pending_tickets():
    table = get_tickets_table()
    cases = table.scan(
        FilterExpression=Attr('limit_type').eq('unknown') & Attr('body').ne('N/A')
    )['Items']
    cases = sorted(cases, key=lambda case: case['display_id'], reverse=True)
    return cases
handler.py 文件源码 项目:aws-tailor 作者: alanwill 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def handler(event, context):
    log.debug("Received event {}".format(json.dumps(event)))

    accountInfo = dynamodb.Table(os.environ['TAILOR_TABLENAME_ACCOUNTINFO'])

    try:
        print('context:resource-path', event['context']['resource-path'] == '/cloudtrail/{accountId}')
        print('path:accountId', re.match("^[0-9]{12}$", event['params']['path']['accountId']))
    except Exception as e:
        print(e)
        print("regex not matching any values passed in request")
        raise Exception({"code": "4000", "message": "ERROR: Bad request"})

    # Payload processing logic
    if event['context']['resource-path'] == '/cloudtrail/{accountId}' and \
            re.match("^[0-9]{12}$", event['params']['path']['accountId']):

        requestId = str(uuid.uuid4())
        accountId = event['params']['path']['accountId']
        stage = event['stage-variables']['stage']

        # Check if account is known to Tailor
        getAccountId = accountInfo.scan(
            ProjectionExpression='accountId, accountEmailAddress',
            FilterExpression=Attr('accountId').eq(accountId)
        )

        if getAccountId['Count'] == 0:
            print("Account not found")
            raise Exception({"code": "4040", "message": "ERROR: Not found"})

        elif int(getAccountId['Count']) > 0:

            # Update accountInfo with new requestId
            accountInfo.update_item(
                Key={
                    'accountEmailAddress': getAccountId['Items'][0]['accountEmailAddress']
                },
                UpdateExpression='SET #requestId = :val1',
                ExpressionAttributeNames={'#requestId': "requestId"},
                ExpressionAttributeValues={':val1': requestId}
            )

            # Build Lambda invoke payload
            message = {"requestId": requestId,
                       "accountId": accountId,
                       "accountEmailAddress": getAccountId['Items'][0]['accountEmailAddress']}
            payload = {"message": message}

            # Call Lambda
            awslambda.invoke(
                FunctionName='talr-cloudtrail-' + stage,
                InvocationType='Event',
                Payload=json.dumps(payload),
            )

            return {"code": "2020", "message": "Request Accepted", "requestId": requestId}

    else:
        raise Exception({"code": "4000", "message": "ERROR: Bad request"})
handler.py 文件源码 项目:aws-tailor 作者: alanwill 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def handler(event, context):
    log.debug("Received event {}".format(json.dumps(event)))

    accountInfo = dynamodb.Table(os.environ['TAILOR_TABLENAME_ACCOUNTINFO'])

    try:
        print('context:resource-path', event['context']['resource-path'] == '/cloudabilty')
        print('body-json:accountId', re.match("^[0-9]{12}$", event['body-json']['accountId']))
    except Exception as e:
        print(e)
        print("regex not matching any values passed in request")
        raise Exception({"code": "4000", "message": "ERROR: Bad request"})

    # VPC DNS logic
    if event['context']['resource-path'] == '/cloudability' and \
            re.match("^[0-9]{12}$", event['body-json']['accountId']):

        requestId = str(uuid.uuid4())
        accountId = event['body-json']['accountId']
        stage = event['stage-variables']['stage']

        # Check if account already exists
        getAccountId = accountInfo.scan(
            ProjectionExpression='accountId, accountEmailAddress',
            FilterExpression=Attr('accountId').eq(accountId)
        )

        if getAccountId['Count'] == 0:
            print("Account not found")
            raise Exception({"code": "4040", "message": "ERROR: Not found"})

        elif int(getAccountId['Count']) > 0:

            # Update accountInfo with new requestId
            accountInfo.update_item(
                Key={
                    'accountEmailAddress': getAccountId['Items'][0]['accountEmailAddress']
                },
                UpdateExpression='SET #requestId = :val1',
                ExpressionAttributeNames={'#requestId': "requestId"},
                ExpressionAttributeValues={':val1': requestId}
            )

            # Build Lambda invoke payload
            message = {"requestId": requestId, "accountId": accountId, "accountEmailAddress": getAccountId['Items'][0]['accountEmailAddress'] }
            payload = {"Records": [{"Sns": {"Message": message}}]}

            # Call Lambda
            awslambda.invoke(
                FunctionName='talr-cloudability-' + stage,
                InvocationType='Event',
                Payload=json.dumps(payload),
            )

            return {"code": "2020", "message": "Request Accepted", "requestId": requestId}

    else:
        raise Exception({"code": "4000", "message": "ERROR: Bad request"})
table.py 文件源码 项目:dynamorm 作者: NerdWalletOSS 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def Q(**mapping):
    """A Q object represents an AND'd together query using boto3's Attr object, based on a set of keyword arguments that
    support the full access to the operations (eq, ne, between, etc) as well as nested attributes.

    It can be used input to both scan operations as well as update conditions.
    """
    expression = None

    while len(mapping):
        attr, value = mapping.popitem()

        parts = attr.split('__')
        attr = Attr(parts.pop(0))
        op = 'eq'

        while len(parts):
            if not hasattr(attr, parts[0]):
                # this is a nested field, extend the attr
                attr = Attr('.'.join([attr.name, parts.pop(0)]))
            else:
                op = parts.pop(0)
                break

        assert len(parts) == 0, "Left over parts after parsing query attr"

        op = getattr(attr, op)
        try:
            attr_expression = op(value)
        except TypeError:
            # A TypeError calling our attr op likely means we're invoking exists, not_exists or another op that
            # doesn't take an arg or takes multiple args. If our value is True then we try to re-call the op
            # function without any arguments, if our value is a list we use it as the arguments for the function,
            # otherwise we bubble it up.
            if value is True:
                attr_expression = op()
            elif isinstance(value, collections.Iterable):
                attr_expression = op(*value)
            else:
                raise

        try:
            expression = expression & attr_expression
        except TypeError:
            expression = attr_expression

    return expression
expression.py 文件源码 项目:dynamodb-py 作者: gusibi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def is_in(self, value):
        # Creates a condition where the attribute is in the value
        # Attr
        if self.hash_key or self.range_key:
            # ValidationException
            raise ValidationException('Query key condition not supported')
        return self.name, Attr(self.name).is_in(value), False


问题


面经


文章

微信
公众号

扫码关注公众号