def gte(self, value):  # >=
    """Create a condition where the attribute is greater than or equal to ``value``.

    Applies to both Attr and Key. The condition is produced by
    ``self._expression_func``, which receives the operator name ``'gte'``
    followed by ``value``; its result is returned unchanged.
    """
    operator_name = 'gte'
    return self._expression_func(operator_name, value)
# Example source code for the Python class Attr()
def between(self, low_value, high_value):
    """Create a condition where the attribute lies within an inclusive range.

    The attribute must be greater than or equal to ``low_value`` and less
    than or equal to ``high_value``. Applies to both Attr and Key. The
    condition is produced by ``self._expression_func``, called with the
    operator name ``'between'`` followed by the two bounds.
    """
    bounds = (low_value, high_value)
    return self._expression_func('between', *bounds)
def begins_with(self, value):
    """Create a condition where the attribute begins with ``value``.

    Applies to both Attr and Key. The condition is produced by
    ``self._expression_func``, called with the operator name
    ``'begins_with'`` followed by ``value``.
    """
    operator_name = 'begins_with'
    return self._expression_func(operator_name, value)
def exists(self):
    """Create a condition where the attribute exists.

    Attr only: the condition is rejected when this field is flagged as a
    hash key or a range key.

    Returns:
        A 3-tuple ``(self.name, Attr(self.name).exists(), False)``.
        NOTE(review): the meaning of the trailing ``False`` is not visible
        from this method -- confirm against the caller.

    Raises:
        ValidationException: if ``self.hash_key`` or ``self.range_key`` is
            truthy.
    """
    is_key_field = self.hash_key or self.range_key
    if is_key_field:
        # ValidationException
        raise ValidationException('Query key condition not supported')
    condition = Attr(self.name).exists()
    return self.name, condition, False
def not_exists(self):
    """Create a condition where the attribute does not exist.

    Attr only: the condition is rejected when this field is flagged as a
    hash key or a range key.

    Returns:
        A 3-tuple ``(self.name, Attr(self.name).not_exists(), False)``.
        NOTE(review): the meaning of the trailing ``False`` is not visible
        from this method -- confirm against the caller.

    Raises:
        ValidationException: if ``self.hash_key`` or ``self.range_key`` is
            truthy.
    """
    is_key_field = self.hash_key or self.range_key
    if is_key_field:
        # ValidationException
        raise ValidationException('Query key condition not supported')
    condition = Attr(self.name).not_exists()
    return self.name, condition, False
def deletesubnets(stack_id, ddb_t):
    """Delete any subnets in the DB for a stack.

    Scans the table for items whose ``StackId`` attribute equals
    ``stack_id`` and deletes each one by its ``Cidr`` key. Every page of the
    scan is processed, not just the first.

    The operation is best effort: any error is reported on stdout and
    swallowed, so this function never raises.

    Args:
        stack_id: Value matched against the ``StackId`` attribute.
        ddb_t: Table object exposing ``scan`` and ``delete_item``
            (presumably a boto3 DynamoDB Table -- confirm against callers).
    """
    try:
        scan_kwargs = {'FilterExpression': Attr('StackId').eq(stack_id)}
        while True:
            response = ddb_t.scan(**scan_kwargs)
            for item in response['Items']:
                # Parenthesised form works on both Python 2 and Python 3.
                print(item)
                ddb_t.delete_item(Key={'Cidr': item['Cidr']})
            # A single scan call returns at most one page; the presence of
            # LastEvaluatedKey means more items remain to be examined.
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            scan_kwargs['ExclusiveStartKey'] = last_key
    except Exception as exc:
        # Cleanup stays best effort, but the failure is no longer invisible.
        print('deletesubnets: could not delete subnets for stack {}: {}'.format(
            stack_id, exc))
def scanFilter(attributeName, attribute, table):
    """Scan ``table`` for items whose ``attributeName`` equals ``attribute``.

    Returns whatever the single ``table.scan`` call returns; no further
    pages are requested.
    """
    equals_condition = Attr(attributeName).eq(attribute)
    return table.scan(FilterExpression=equals_condition)
def get_pending_tickets():
    """Return tickets whose limit type is still unknown, newest display id first.

    Scans the table returned by ``get_tickets_table()`` for items with
    ``limit_type == 'unknown'`` and ``body != 'N/A'``, then sorts the
    matching items by ``display_id`` in descending order.
    """
    tickets_table = get_tickets_table()
    pending_filter = Attr('limit_type').eq('unknown') & Attr('body').ne('N/A')
    scan_result = tickets_table.scan(FilterExpression=pending_filter)
    return sorted(
        scan_result['Items'],
        key=lambda ticket: ticket['display_id'],
        reverse=True,
    )
def handler(event, context):
    """Accept a ``/cloudtrail/{accountId}`` request and hand it to the worker Lambda.

    The account id from the request path is looked up in the account-info
    table. When a record is found, a fresh request id is stored on it and
    the ``talr-cloudtrail-<stage>`` function is invoked with
    ``InvocationType='Event'``.

    Args:
        event: Request mapping. The keys read are
            ``event['context']['resource-path']``,
            ``event['params']['path']['accountId']`` and
            ``event['stage-variables']['stage']``.
        context: Unused.

    Returns:
        ``{"code": "2020", "message": "Request Accepted", "requestId": ...}``.

    Raises:
        Exception: with a ``{"code": "4000", ...}`` payload when the request
            is malformed (missing keys, wrong resource path, or an account
            id that is not 12 digits), and with a ``{"code": "4040", ...}``
            payload when no account record matches.
    """
    log.debug("Received event {}".format(json.dumps(event)))
    accountInfo = dynamodb.Table(os.environ['TAILOR_TABLENAME_ACCOUNTINFO'])

    # Diagnostic output only. As a side effect, a request that lacks the
    # expected keys (or carries a non-string account id) is rejected here.
    try:
        print('context:resource-path', event['context']['resource-path'] == '/cloudtrail/{accountId}')
        print('path:accountId', re.match("^[0-9]{12}$", event['params']['path']['accountId']))
    except Exception as e:
        print(e)
        print("regex not matching any values passed in request")
        raise Exception({"code": "4000", "message": "ERROR: Bad request"})

    # Payload processing logic
    if not (event['context']['resource-path'] == '/cloudtrail/{accountId}' and
            re.match("^[0-9]{12}$", event['params']['path']['accountId'])):
        raise Exception({"code": "4000", "message": "ERROR: Bad request"})

    requestId = str(uuid.uuid4())
    accountId = event['params']['path']['accountId']
    stage = event['stage-variables']['stage']

    # Check if account is known to Tailor.
    # The filter is applied page by page, so a page can come back empty
    # while LastEvaluatedKey says more of the table is still unread. Keep
    # paging until a match shows up or the table is exhausted; otherwise a
    # known account could be reported as "Not found".
    scan_kwargs = {
        'ProjectionExpression': 'accountId, accountEmailAddress',
        'FilterExpression': Attr('accountId').eq(accountId),
    }
    matches = []
    while True:
        page = accountInfo.scan(**scan_kwargs)
        matches.extend(page.get('Items', []))
        if matches or 'LastEvaluatedKey' not in page:
            break
        scan_kwargs['ExclusiveStartKey'] = page['LastEvaluatedKey']

    if not matches:
        print("Account not found")
        raise Exception({"code": "4040", "message": "ERROR: Not found"})

    # Only the first matching record is used.
    accountEmailAddress = matches[0]['accountEmailAddress']

    # Update accountInfo with new requestId
    accountInfo.update_item(
        Key={
            'accountEmailAddress': accountEmailAddress
        },
        UpdateExpression='SET #requestId = :val1',
        ExpressionAttributeNames={'#requestId': "requestId"},
        ExpressionAttributeValues={':val1': requestId}
    )

    # Build Lambda invoke payload
    message = {"requestId": requestId,
               "accountId": accountId,
               "accountEmailAddress": accountEmailAddress}
    payload = {"message": message}

    # Call Lambda
    awslambda.invoke(
        FunctionName='talr-cloudtrail-' + stage,
        InvocationType='Event',
        Payload=json.dumps(payload),
    )

    return {"code": "2020", "message": "Request Accepted", "requestId": requestId}
def handler(event, context):
    """Accept a ``/cloudability`` request and hand it to the worker Lambda.

    The account id from the request body is looked up in the account-info
    table. When a record is found, a fresh request id is stored on it and
    the ``talr-cloudability-<stage>`` function is invoked with
    ``InvocationType='Event'``. The invoke payload wraps the message in a
    ``{"Records": [{"Sns": {"Message": ...}}]}`` envelope.

    Args:
        event: Request mapping. The keys read are
            ``event['context']['resource-path']``,
            ``event['body-json']['accountId']`` and
            ``event['stage-variables']['stage']``.
        context: Unused.

    Returns:
        ``{"code": "2020", "message": "Request Accepted", "requestId": ...}``.

    Raises:
        Exception: with a ``{"code": "4000", ...}`` payload when the request
            is malformed (missing keys, wrong resource path, or an account
            id that is not 12 digits), and with a ``{"code": "4040", ...}``
            payload when no account record matches.
    """
    log.debug("Received event {}".format(json.dumps(event)))
    accountInfo = dynamodb.Table(os.environ['TAILOR_TABLENAME_ACCOUNTINFO'])

    # Diagnostic output only. As a side effect, a request that lacks the
    # expected keys (or carries a non-string account id) is rejected here.
    # The path literal previously read '/cloudabilty', so this line logged
    # False even for valid requests.
    try:
        print('context:resource-path', event['context']['resource-path'] == '/cloudability')
        print('body-json:accountId', re.match("^[0-9]{12}$", event['body-json']['accountId']))
    except Exception as e:
        print(e)
        print("regex not matching any values passed in request")
        raise Exception({"code": "4000", "message": "ERROR: Bad request"})

    # Cloudability request processing
    if not (event['context']['resource-path'] == '/cloudability' and
            re.match("^[0-9]{12}$", event['body-json']['accountId'])):
        raise Exception({"code": "4000", "message": "ERROR: Bad request"})

    requestId = str(uuid.uuid4())
    accountId = event['body-json']['accountId']
    stage = event['stage-variables']['stage']

    # Check if account already exists.
    # The filter is applied page by page, so a page can come back empty
    # while LastEvaluatedKey says more of the table is still unread. Keep
    # paging until a match shows up or the table is exhausted; otherwise a
    # known account could be reported as "Not found".
    scan_kwargs = {
        'ProjectionExpression': 'accountId, accountEmailAddress',
        'FilterExpression': Attr('accountId').eq(accountId),
    }
    matches = []
    while True:
        page = accountInfo.scan(**scan_kwargs)
        matches.extend(page.get('Items', []))
        if matches or 'LastEvaluatedKey' not in page:
            break
        scan_kwargs['ExclusiveStartKey'] = page['LastEvaluatedKey']

    if not matches:
        print("Account not found")
        raise Exception({"code": "4040", "message": "ERROR: Not found"})

    # Only the first matching record is used.
    accountEmailAddress = matches[0]['accountEmailAddress']

    # Update accountInfo with new requestId
    accountInfo.update_item(
        Key={
            'accountEmailAddress': accountEmailAddress
        },
        UpdateExpression='SET #requestId = :val1',
        ExpressionAttributeNames={'#requestId': "requestId"},
        ExpressionAttributeValues={':val1': requestId}
    )

    # Build Lambda invoke payload
    message = {"requestId": requestId,
               "accountId": accountId,
               "accountEmailAddress": accountEmailAddress}
    payload = {"Records": [{"Sns": {"Message": message}}]}

    # Call Lambda
    awslambda.invoke(
        FunctionName='talr-cloudability-' + stage,
        InvocationType='Event',
        Payload=json.dumps(payload),
    )

    return {"code": "2020", "message": "Request Accepted", "requestId": requestId}
def Q(**mapping):
    """Build an AND'd query condition from keyword arguments using boto3's Attr.

    Each keyword has the form ``name[__nested...][__op]``. Segments are
    separated by double underscores; a segment that is not an attribute of
    the current ``Attr`` object is treated as a nested field and appended to
    the attribute path with a dot, and the first segment that *is* an
    attribute of ``Attr`` (``eq``, ``ne``, ``between``, ...) is taken as the
    operation. When no operation is given, ``eq`` is used.

    The keyword's value is passed to the operation. If that call raises
    ``TypeError``, ``True`` means "call the operation with no arguments"
    (e.g. ``exists``) and any other iterable is unpacked as the argument
    list (e.g. ``between``).

    The result can be used as input to both scan operations and update
    conditions.

    Returns:
        The conditions combined with ``&``, or ``None`` when no keyword
        arguments were given.

    Raises:
        AssertionError: if segments remain after the operation name.
        TypeError: if the operation rejects the value and the value is
            neither ``True`` nor an iterable.
    """
    # ``collections.Iterable`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc``. Fall back for interpreters that predate it.
    try:
        from collections.abc import Iterable
    except ImportError:  # Python 2
        from collections import Iterable

    expression = None
    while mapping:
        attr, value = mapping.popitem()
        parts = attr.split('__')
        attr = Attr(parts.pop(0))
        op = 'eq'
        while parts:
            if not hasattr(attr, parts[0]):
                # this is a nested field, extend the attr
                attr = Attr('.'.join([attr.name, parts.pop(0)]))
            else:
                op = parts.pop(0)
                break
        assert len(parts) == 0, "Left over parts after parsing query attr"
        op = getattr(attr, op)
        try:
            attr_expression = op(value)
        except TypeError:
            # A TypeError calling our attr op likely means we're invoking
            # exists, not_exists or another op that doesn't take an arg or
            # takes multiple args. If our value is True then we re-call the
            # op without any arguments, if our value is iterable we use it
            # as the arguments for the op, otherwise we bubble it up.
            if value is True:
                attr_expression = op()
            elif isinstance(value, Iterable):
                attr_expression = op(*value)
            else:
                raise
        # Seed with the first condition explicitly. Relying on ``None & x``
        # raising TypeError would also silently discard earlier conditions
        # if ``&`` ever raised TypeError for another reason.
        if expression is None:
            expression = attr_expression
        else:
            expression = expression & attr_expression
    return expression
def is_in(self, value):
    """Create a condition where the attribute is in ``value``.

    Attr only: the condition is rejected when this field is flagged as a
    hash key or a range key.

    Returns:
        A 3-tuple ``(self.name, Attr(self.name).is_in(value), False)``.
        NOTE(review): the meaning of the trailing ``False`` is not visible
        from this method -- confirm against the caller.

    Raises:
        ValidationException: if ``self.hash_key`` or ``self.range_key`` is
            truthy.
    """
    is_key_field = self.hash_key or self.range_key
    if is_key_field:
        # ValidationException
        raise ValidationException('Query key condition not supported')
    condition = Attr(self.name).is_in(value)
    return self.name, condition, False