def parse_filters(self, filters, doc_class):
    """Translate a list of filter strings into DynamoDB query parameters.

    Each filter string is split on ``':'`` and fields 3 and 4 (zero-based)
    are taken as the property name and the value to match.  The first
    filter becomes the ``KeyConditionExpression`` and determines the index
    name; every later filter becomes an equality ``Attr`` condition, and
    those are combined with ``And`` when there is more than one.

    Args:
        filters: Iterable of colon-delimited filter strings.
        doc_class: Zero-argument callable whose result exposes
            ``_base_properties``, a mapping from property name to an
            object carrying a ``kwargs`` dict.

    Returns:
        dict: Query keyword arguments.  ``IndexName`` is included unless
        the resolved index name is ``'_id'``.
    """
    params = {}
    chosen_index = None
    attr_conditions = []

    for position, raw_filter in enumerate(filters):
        name, value = raw_filter.split(':')[3:5]
        if position != 0:
            attr_conditions.append(Attr(name).eq(value))
            continue
        # The leading filter drives both the key condition and the index.
        declared = doc_class()._base_properties[name]
        configured = declared.kwargs.get(self.index_field_name, None)
        chosen_index = configured or self.default_index_name.format(name)
        params['KeyConditionExpression'] = Key(name).eq(value)

    condition_count = len(attr_conditions)
    if condition_count == 1:
        params['FilterExpression'] = attr_conditions[0]
    elif condition_count > 1:
        params['FilterExpression'] = And(*attr_conditions)

    if chosen_index != '_id':
        params['IndexName'] = chosen_index
    return params
# Example source code for the Python class Key()
def get_waiting_tasks(self, concurrency_key):
    """
    Collect every waiting task that shares the given concurrency key.

    Queries the "WaitForExecutionTasks" index page by page, feeding each
    ``LastEvaluatedKey`` back as ``ExclusiveStartKey`` until a response
    arrives without one.

    :param concurrency_key: concurrency key of the tasks
    :return: list of the items returned for the waiting tasks
    """
    query_args = {
        "IndexName": "WaitForExecutionTasks",
        "Select": "ALL_ATTRIBUTES",
        "KeyConditionExpression": Key(TASK_TR_CONCURRENCY_ID).eq(concurrency_key),
        "FilterExpression": Attr(TASK_TR_STATUS).eq(STATUS_WAITING)
    }
    tasks = []
    more_pages = True
    while more_pages:
        page = self._action_table.query_with_retries(**query_args)
        tasks.extend(page.get("Items", []))
        resume_key = page.get("LastEvaluatedKey")
        more_pages = resume_key is not None
        if more_pages:
            query_args["ExclusiveStartKey"] = resume_key
    return tasks
def remove_peer_from_info_hash_dynamo(
    info_hash,
    peer_id,
):
    """Remove one peer entry from a torrent's ``peers`` map.

    Calls ``ensure_torrent_exists`` first, then issues a ``REMOVE`` update.
    The peer id is supplied through the ``#s`` expression attribute name
    placeholder rather than being embedded in the expression text.

    Returns:
        bool: True when the update answered HTTP 200 and the response
        contains ``Attributes``; False otherwise.
    """
    ensure_torrent_exists(info_hash)
    update_args = dict(
        Key={'info_hash': info_hash},
        UpdateExpression="REMOVE peers.#s",
        ExpressionAttributeNames={'#s': peer_id},
        ReturnValues="UPDATED_NEW",
    )
    outcome = table.update_item(**update_args)
    status_ok = outcome['ResponseMetadata']['HTTPStatusCode'] == 200
    return bool(status_ok and 'Attributes' in outcome)
def increment_completed_dynamo(info_hash):
    """
    Add one to a torrent's ``completed`` attribute in a single update call.

    Calls ``ensure_torrent_exists`` first.  Returns True when the update
    answered HTTP 200 and the response contains ``Attributes``; False
    otherwise.
    """
    ensure_torrent_exists(info_hash)
    update_args = dict(
        Key={'info_hash': info_hash},
        UpdateExpression="SET completed = completed + :incr",
        ExpressionAttributeValues={':incr': 1},
        ReturnValues="UPDATED_NEW",
    )
    outcome = table.update_item(**update_args)
    status_ok = outcome['ResponseMetadata']['HTTPStatusCode'] == 200
    return bool(status_ok and 'Attributes' in outcome)
def get_item(self, data):
    """Fetch a single item by key using a consistent read.

    Args:
        data (dict): Key attributes identifying the item; passed as ``Key``.

    Returns:
        (dict) The stored item, or None when the response has no ``Item``.

    Raises:
        IOError: If the call raises ``ClientError`` or the response status
            code is not 200.
    """
    try:
        response = self.table.get_item(Key=data, ConsistentRead=True)
    except ClientError as err:
        # Format the exception itself: exceptions have no ``message``
        # attribute on Python 3, so ``err.message`` would raise
        # AttributeError and hide the real failure.
        raise IOError("Error getting item: {}".format(err))

    metadata = response['ResponseMetadata']
    if metadata['HTTPStatusCode'] != 200:
        raise IOError("Error getting item: {}".format(metadata))

    return response.get("Item")
def delete_item(self, data):
    """Delete a single item by key.

    Args:
        data (dict): A dictionary of attributes to access an item (hash and sort keys)

    Returns:
        None

    Raises:
        IOError: If the call raises ``ClientError`` or the response status
            code is not 200.
    """
    try:
        response = self.table.delete_item(Key=data)
    except ClientError as err:
        # Format the exception itself: exceptions have no ``message``
        # attribute on Python 3, so ``err.message`` would raise
        # AttributeError and hide the real failure.
        raise IOError("Error deleting item: {}".format(err))

    metadata = response['ResponseMetadata']
    if metadata['HTTPStatusCode'] != 200:
        raise IOError("Error deleting item: {}".format(metadata))
def update_attribute(self, key_dict, attribute_name, attribute_value):
    """Set a single attribute on a record.

    The attribute name is interpolated directly into the update expression
    and the value is converted to a string with ``'{}'.format`` before it
    is sent.

    Args:
        key_dict (dict): A dictionary containing the keys/values to query on. Supports simple and compound keys
        attribute_name (str): Name of the attribute to set
        attribute_value (str): New value; stored in its string form

    Returns:
        None

    Raises:
        Exception: If the response status code is not 200.
    """
    response = self.table.update_item(
        Key=key_dict,
        UpdateExpression="SET {} = :updated".format(attribute_name),
        ExpressionAttributeValues={':updated': '{}'.format(attribute_value)})

    metadata = response['ResponseMetadata']
    if metadata['HTTPStatusCode'] != 200:
        # This is an update, so report it as one (it used to say "getting").
        raise Exception("Error updating item: {}".format(metadata))

    # TODO: Check if any sort of validation on an update should be done
    # (e.g. inspecting "Attributes" in the response to detect a key that
    # does not exist). DynamoDB seems lax here.
def increment_attribute(self, key_dict, attribute_name, increment_value):
    """Increment a single attribute in a record.

    The attribute name is interpolated directly into the update expression
    (``SET name = name + :increment``).

    Args:
        key_dict (dict): A dictionary containing the keys/values to query on. Supports simple and compound keys
        attribute_name (str): The attribute to increment
        increment_value (int): The amount to increment the attribute by

    Returns:
        None

    Raises:
        Exception: If the response status code is not 200.
    """
    expression = "SET {} = {} + :increment".format(attribute_name,
                                                   attribute_name)
    response = self.table.update_item(
        Key=key_dict,
        UpdateExpression=expression,
        ExpressionAttributeValues={':increment': increment_value},
        ReturnValues="UPDATED_NEW")

    metadata = response['ResponseMetadata']
    if metadata['HTTPStatusCode'] != 200:
        # This is an update, so report it as one (it used to say "getting").
        raise Exception("Error updating item: {}".format(metadata))
def set_message_read(user_id, msg_id):
    """Mark a message as read, but only if it is not already marked.

    Issues a conditional update that sets ``is_read`` to 1 with the
    condition ``is_read <> 1``.

    Returns:
        bool: True when the update was applied.  False when a
        ``ClientError`` is raised, whether that is the conditional check
        failing (message already read) or any other client error; the two
        cases are logged differently.
    """
    try:
        get_history_table(new_session=True).update_item(
            Key={'userId': user_id,
                 'messageId': msg_id},
            UpdateExpression="set is_read = :a",
            ExpressionAttributeValues={':a': 1},
            ConditionExpression="is_read <> :a")
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            LOGGER.info("Message already read user_id={0}, msg_id={1}".format(user_id, msg_id))
        else:
            LOGGER.exception("Error updating read setting for user_id={0}, msg_id={1}".format(user_id, msg_id))
        return False

    LOGGER.info("Read-receipted user_id={0} message_id={1}".format(user_id, msg_id))
    return True
def _analyze_table(self):
# First check the Key Schema
if len(self.table.key_schema) != 1:
LOG.info('cruddy does not support RANGE keys')
else:
self._indexes[self.table.key_schema[0]['AttributeName']] = None
# Now process any GSI's
if self.table.global_secondary_indexes:
for gsi in self.table.global_secondary_indexes:
# find HASH of GSI, that's all we support for now
# if the GSI has a RANGE, we ignore it for now
if len(gsi['KeySchema']) == 1:
gsi_hash = gsi['KeySchema'][0]['AttributeName']
self._indexes[gsi_hash] = gsi['IndexName']
# Because the Boto3 DynamoDB client turns all numeric types into Decimals
# (which is actually the right thing to do) we need to convert those
# Decimal values back into integers or floats before serializing to JSON.
def increment_counter(self, id, counter_name, increment=1,
                      id_name='id', **kwargs):
    """
    Atomically increments a counter attribute in the item identified by
    ``id``.  The attribute to bump is named by ``counter_name``; the
    optional ``increment`` (default ``1``) is sent as a ``Decimal``.  The
    key attribute name defaults to ``'id'`` and can be overridden with
    ``id_name``.

    When the call succeeds and the raw response contains ``Attributes``,
    the new counter value is stored in ``response.data``.  The response
    object is prepared and returned in every case.
    """
    response = self._new_response()
    supported = self._check_supported_op('increment_counter', response)
    if supported:
        update_args = {
            'Key': {id_name: id},
            'UpdateExpression': 'set #ctr = #ctr + :val',
            'ExpressionAttributeNames': {"#ctr": counter_name},
            'ExpressionAttributeValues': {
                ':val': decimal.Decimal(increment)},
            'ReturnValues': 'UPDATED_NEW'
        }
        self._call_ddb_method(self.table.update_item, update_args, response)
        succeeded = response.status == 'success'
        if succeeded and 'Attributes' in response.raw_response:
            self._replace_decimals(response.raw_response)
            response.data = response.raw_response['Attributes'][counter_name]
    response.prepare()
    return response
def list_ids(context, installed_region, aws_account_id=None):
    """Return the ids of the configurations stored for an AWS account.

    Items are de-duplicated by their ``configuration`` attribute; when
    several items share the same configuration, the id of the last one
    seen is kept.

    Args:
        context: Passed to ``utils.get_owner_id`` when no account id is given.
        installed_region: Region name used for the DynamoDB resource.
        aws_account_id: Account to query; derived from ``context`` if None.

    Returns:
        list: The ``id`` attribute of each distinct configuration.
    """
    if aws_account_id is None:
        aws_account_id = utils.get_owner_id(context)[0]
    dynamodb = boto3.resource('dynamodb', region_name=installed_region)
    table = dynamodb.Table('ebs_snapshot_configuration')

    found_configurations = {}
    query_args = {
        'KeyConditionExpression': Key('aws_account_id').eq(aws_account_id),
    }
    # A single query call returns only one page of results; keep following
    # LastEvaluatedKey so later pages are not silently dropped.
    while True:
        results = table.query(**query_args)
        for item in results.get('Items', []):
            str_item = item.get('configuration', None)
            found_configurations[str_item] = item['id']
        last_key = results.get('LastEvaluatedKey')
        if last_key is None:
            break
        query_args['ExclusiveStartKey'] = last_key

    # Return a real list (not a dict view) on every Python version.
    return list(found_configurations.values())
def list_configurations(context, installed_region, aws_account_id=None):
    """Retrieve configurations from DynamoDB and return a list of parsed objects.

    Each item's ``configuration`` attribute is parsed with ``json.loads``.
    Items with an identical configuration string are returned only once.

    Args:
        context: Passed to ``utils.get_owner_id`` when no account id is given.
        installed_region: Region name used for the DynamoDB resource.
        aws_account_id: Account to query; derived from ``context`` if None.

    Returns:
        list: One parsed configuration per distinct configuration string.

    Raises:
        EbsSnapperError: If a configuration cannot be parsed.
    """
    if aws_account_id is None:
        aws_account_id = utils.get_owner_id(context)[0]
    dynamodb = boto3.resource('dynamodb', region_name=installed_region)
    table = dynamodb.Table('ebs_snapshot_configuration')

    found_configurations = {}
    query_args = {
        'KeyConditionExpression': Key('aws_account_id').eq(aws_account_id),
    }
    # A single query call returns only one page of results; keep following
    # LastEvaluatedKey so later pages are not silently dropped.
    while True:
        results = table.query(**query_args)
        for item in results.get('Items', []):
            str_item = item.get('configuration', None)
            try:
                found_configurations[str_item] = json.loads(str_item)
            except Exception as e:
                raise EbsSnapperError('error loading configuration', e)
        last_key = results.get('LastEvaluatedKey')
        if last_key is None:
            break
        query_args['ExclusiveStartKey'] = last_key

    # Return a real list (not a dict view) on every Python version.
    return list(found_configurations.values())
def get_configuration(context, installed_region, object_id, aws_account_id=None):
    """Look up one configuration by account id and object id.

    Returns the JSON-decoded ``configuration`` attribute of the first item
    the query yields, or None when the query yields no items.  A decoding
    failure is re-raised as ``EbsSnapperError``.
    """
    if aws_account_id is None:
        aws_account_id = utils.get_owner_id(context)[0]

    table = boto3.resource(
        'dynamodb', region_name=installed_region
    ).Table('ebs_snapshot_configuration')

    key_condition = (
        Key('aws_account_id').eq(aws_account_id) & Key('id').eq(object_id)
    )
    matches = table.query(KeyConditionExpression=key_condition).get('Items', [])

    # Only the first match is ever used: the loop body always returns or raises.
    for record in matches:
        raw_config = record.get('configuration', None)
        try:
            return json.loads(raw_config)
        except Exception as e:
            raise EbsSnapperError('error loading configuration', e)
    return None
def write_meta(self, lookup_key, key, value):
    """
    Store one metadata entry in the table.

    Args:
        lookup_key: Key for the object requested
        key: Metadata key
        value: Metadata value, stored under the ``metavalue`` attribute

    Returns:
        The response from the ``put_item`` call
    """
    entry = {
        'lookup_key': lookup_key,
        'key': key,
        'metavalue': value,
    }
    return self.table.put_item(Item=entry)
def get_meta(self, lookup_key, key):
    """
    Fetch the metadata entry stored for an object under a given key.

    Args:
        lookup_key: Key for the object requested
        key: Metadata key

    Returns:
        The ``Item`` from the response, or None when the response has none
    """
    item_key = {'lookup_key': lookup_key, 'key': key}
    response = self.table.get_item(Key=item_key)
    return response['Item'] if 'Item' in response else None
def update_meta(self, lookup_key, key, new_value):
    """
    Overwrite the ``metavalue`` attribute of an existing metadata entry.

    Args:
        lookup_key: Key for the object requested
        key: Metadata key
        new_value: New metadata value

    Returns:
        The response from the ``update_item`` call (``UPDATED_NEW`` values
        are requested)
    """
    update_args = dict(
        Key={'lookup_key': lookup_key, 'key': key},
        UpdateExpression='SET metavalue = :val1',
        ExpressionAttributeValues={':val1': new_value},
        ReturnValues='UPDATED_NEW',
    )
    return self.table.update_item(**update_args)
def get_meta_list(self, lookup_key):
    """
    Query all metadata entries stored for an object, using the lookup key.

    Args:
        lookup_key: Key for the object requested

    Returns:
        The ``Items`` from the query response, or None when the response
        has no ``Items`` entry
    """
    by_lookup_key = Key('lookup_key').eq(lookup_key)
    response = self.table.query(KeyConditionExpression=by_lookup_key)
    return response['Items'] if 'Items' in response else None
def lambda_handler(event, context):
    """Build an SSML "word of the day" reply for the requested language.

    The language is read from the ``Language`` slot of the request intent.
    Its lower-cased value is used as the partition key, together with
    ``id > 0`` and an ``isActive == True`` filter.  The first returned item
    supplies the audio URLs and translations placed into the SSML.

    Returns:
        dict: A version ``'1.0'`` response whose ``outputSpeech`` is SSML.
    """
    slots = event['request']['intent']['slots']
    key_condition = (
        Key('language').eq(slots["Language"]["value"].lower())
        & Key('id').gt(0)
    )
    wotd = table.query(
        KeyConditionExpression=key_condition,
        FilterExpression=Attr('isActive').eq(True)
    )
    print(wotd['Items'])
    item = wotd["Items"]
    print(item)

    spoken_language = event["request"]["intent"]["slots"]["Language"]["value"]
    entry = item[0]
    parsed = ''.join([
        '<speak>The ',
        spoken_language,
        ' word of the day is <audio src="',
        entry["word_sound"],
        '"/> which means ',
        entry["word_translation"],
        '. Here is the word used in a sentence. <audio src="',
        entry["sentence_sound"],
        '"/> which means ',
        entry["sentence_translation"],
        '</speak>',
    ])

    output_speech = {'type': 'SSML', 'ssml': parsed}
    return {
        'version': '1.0',
        'response': {'outputSpeech': output_speech},
    }
def query_with_limit_and_filter_by_boto3():
    """Query the 'Movies' table for 1992 titles between 'A' and 'L'.

    Projects year, title, genres and the first actor, filters on
    ``rating < 7.0`` with ``Limit=10``, and prints ``year : title`` for
    each returned item.  On ``ClientError`` the error message is printed
    instead.
    """
    movies = dynamodb.Table('Movies')
    print("Movies from 1992 - titles A-L, with genres and lead actor")
    try:
        response = movies.query(
            ProjectionExpression="#yr, title, info.genres, info.actors[0]",
            # Expression Attribute Names for Projection Expression only.
            ExpressionAttributeNames={"#yr": "year"},
            KeyConditionExpression=(
                Key('year').eq(1992) & Key('title').between('A', 'L')
            ),
            FilterExpression=Attr('rating').lt(decimal.Decimal('7.0')),
            Limit=10,
        )
    except ClientError as e:
        print(e.response['Error']['Message'])
        return

    for movie in response['Items']:
        print(movie['year'], ":", movie['title'])
def updatePasswordForEmailAndResetId(email, password, resetPasswordId, dbInstance):
    """Set a new password for a user who holds a matching reset id.

    The update stores the hashed password and removes ``resetPasswordId``
    from the user record.

    Args:
        email: Email identifying the user.
        password: New plain-text password; hashed with ``hashPassword``.
        resetPasswordId: Reset id presented by the caller.
        dbInstance: Passed through to the user lookup and table access.

    Returns:
        The result of ``dbUtils.updateItem``, or None when the user is not
        found, has no stored reset id, the ids differ, or the users table
        is unavailable.
    """
    user = getUserByEmail(email, dbInstance)
    if user is None:
        return None

    storedResetPasswordId = user.get('resetPasswordId', None)
    # A user with no pending reset must never match: without this guard a
    # caller passing resetPasswordId=None would compare equal to the
    # missing stored id and could change the password.
    if storedResetPasswordId is None or storedResetPasswordId != resetPasswordId:
        return None

    table = dbUtils.getTable('users', dbInstance)
    if table is None:
        return None

    jsonData = {
        'Key': {'email': email},
        'UpdateExpression': 'SET password = :a REMOVE resetPasswordId',
        'ExpressionAttributeValues': {':a': hashPassword(password)},
        'ReturnValues': 'UPDATED_NEW'
    }
    return dbUtils.updateItem(jsonData, table)
def joinClass(jsonData, dynamoDBInstance, email=None, userRole=None):
    """Join the class named by ``jsonData['code']`` on behalf of a user.

    A missing or empty code adds a 'Key Missing Error'.  A teacher or admin
    whose taught list already contains the code gets a 'Role Error'.
    Otherwise ``addDataToClassAndUser`` is called with the code.

    Returns:
        The ``ControllerResponse`` carrying any errors added along the way.
    """
    response = ControllerResponse()
    # g will not be available during testing,
    # so email and role must then be passed to the function
    if g: # pragma: no cover
        email = g.authenticatedUser['email']
        userRole = g.authenticatedUser['userRole']

    if not jsonData.get('code'):
        response.addError('Key Missing Error', 'class code missing from data')
        return response

    teaches_or_admins = userRole in ('teacher', 'admin')
    if teaches_or_admins and class_ctrl.isCodeInTaughtList(jsonData, dynamoDBInstance, email):
        response.addError('Role Error', 'Teachers cannot join their taught class as a student')
    else:
        addDataToClassAndUser(jsonData['code'], email, response, dynamoDBInstance)
    return response
def addClassCodeToStudent(email, classCode, dynamoDBInstance):
    """Add a class code to a user's ``classCodes`` set.

    Returns:
        The updated ``classCodes`` from the update response when it
        contains ``classCode``; None when the users table is unavailable
        or the response does not confirm the addition.
    """
    userTable = dbUtils.getTable('users', dynamoDBInstance)
    if not userTable:
        return None

    update_request = {
        'Key': {'email': email},
        'UpdateExpression': 'ADD classCodes :i',
        'ExpressionAttributeValues': {':i': {classCode}},
        'ReturnValues': 'UPDATED_NEW'
    }
    result = dbUtils.updateItem(update_request, userTable)
    if not result or 'Attributes' not in result:
        return None

    attributes = result['Attributes']
    if 'classCodes' not in attributes or classCode not in attributes['classCodes']:
        return None
    return attributes['classCodes']
def addStudentToClass(classCode, email, dynamoDBInstance):
    """Add a student's email to a class's ``students`` set.

    Returns:
        The full updated attributes from the update response when they
        contain ``email`` in ``students`` and also a ``title``; None when
        the classes table is unavailable or the response does not confirm
        the addition.
    """
    classTable = dbUtils.getTable('classes', dynamoDBInstance)
    if not classTable:
        return None

    update_request = {
        'Key': {'code': classCode},
        'UpdateExpression': 'ADD students :i',
        'ExpressionAttributeValues': {':i': {email}},
        'ReturnValues': 'ALL_NEW'
    }
    result = dbUtils.updateItem(update_request, classTable)
    if not result or 'Attributes' not in result:
        return None

    attributes = result['Attributes']
    if 'students' not in attributes or email not in attributes['students']:
        return None
    if 'title' not in attributes:
        return None
    return attributes
def getActiveClassList(dynamoDBInstance, email=None):
    """Collect the class records for every class code a user holds.

    Returns:
        A ``ControllerResponse`` whose payload has ``classes`` (the items
        found for the user's class codes), or an error when the users or
        classes table cannot be obtained.
    """
    response = ControllerResponse()
    usersTable = dbUtils.getTable('users', dynamoDBInstance)
    classTable = dbUtils.getTable('classes', dynamoDBInstance)
    if usersTable is None or classTable is None:
        response.addError('Get Active Class List Failed',
                          'Unable to access users and/or classes')
        return response

    if email is None: # pragma: no cover
        email = g.authenticatedUser['email']

    # Look up each code lazily; codes with no matching item are skipped.
    lookups = (
        dbUtils.getItem({'Key': {'code': code}}, classTable)
        for code in getClassCodesFromUser(dynamoDBInstance, email)
    )
    classes = [
        found['Item'] for found in lookups
        if found is not None and 'Item' in found
    ]
    response.addToPayload('classes', classes)
    return response
def getClassCodesFromUser(dynamoDBInstance, email=None):
    """Return the ``classCodes`` stored on a user's record.

    An empty set is returned (and an error is logged) when the users table
    is unavailable or the record has no ``classCodes``.
    """
    if email is None: # pragma: no cover
        email = g.authenticatedUser['email']

    usersTable = dbUtils.getTable('users', dynamoDBInstance)
    if usersTable is None:
        MentiiLogging.getLogger().error('Unable to get users table in getClassCodesFromUser')
        return set()

    # An active class list is the list of class codes that
    # a user has in the user table.
    lookup = {"Key": {"email": email}, "ProjectionExpression": "classCodes"}
    res = dbUtils.getItem(lookup, usersTable)

    has_codes = res is not None and 'Item' in res and 'classCodes' in res['Item']
    if has_codes:
        return res['Item']['classCodes']

    MentiiLogging.getLogger().error('Unable to get user data in getClassCodesFromUser')
    return set()
def getTaughtClassCodesFromUser(dynamoDBInstance, email=None):
    """Return the ``teaching`` codes stored on a user's record.

    Returns None when the users table is unavailable (an error is logged)
    or when the lookup yields no item.  A record without ``teaching``
    yields an empty list.
    """
    if email is None: # pragma: no cover
        email = g.authenticatedUser['email']

    usersTable = dbUtils.getTable('users', dynamoDBInstance)
    if usersTable is None:
        MentiiLogging.getLogger().error('Unable to get users table in getTaughtClassCodesFromUser')
        return None

    # Only the 'teaching' attribute of the user record is requested.
    lookup = {'Key': {'email': email}, 'ProjectionExpression': 'teaching'}
    res = dbUtils.getItem(lookup, usersTable)
    if res is None or 'Item' not in res:
        return None
    return res['Item'].get('teaching', [])
def buildUpdateJsonData(keyName, keyValue, attributeName, attributeValue):
    """Build the request dict for updating or removing one attribute.

    A zero-length ``attributeValue`` produces a ``REMOVE`` expression; any
    other value produces a ``SET`` expression bound to ``:v``.  Both forms
    request ``UPDATED_NEW`` return values.
    """
    item_key = {keyName: keyValue}
    has_value = len(attributeValue) != 0

    if has_value:
        return {
            'Key': item_key,
            'UpdateExpression': 'SET ' + attributeName + ' = :v',
            'ExpressionAttributeValues': {':v': attributeValue},
            'ReturnValues': 'UPDATED_NEW'
        }
    return {
        'Key': item_key,
        'UpdateExpression': 'REMOVE ' + attributeName,
        'ReturnValues': 'UPDATED_NEW'
    }
def getItem(jsonData, table):
    """Fetch an item from ``table`` using a request given as dict or JSON text.

    Args:
        jsonData: Either a mapping or a JSON string that decodes to one.
            ``Key`` is required; ``ProjectionExpression`` is optional.
        table: Object exposing ``get_item``.

    Returns:
        The response from ``table.get_item``, or None (after logging an
        error) when the request has no ``Key``.
    """
    # isinstance (rather than an exact type comparison) also accepts str
    # subclasses as JSON text.
    if isinstance(jsonData, str):
        data = json.loads(jsonData)
    else:
        data = jsonData

    key = data.get("Key")
    if key is None:
        message = "Unable to get item. Missing Key"
        logger.error(message)
        return None

    projection_expression = data.get("ProjectionExpression")
    if projection_expression is not None:
        return table.get_item(Key=key, ProjectionExpression=projection_expression)
    return table.get_item(Key=key)
def update_ticket(form):
    """Write a ticket's limit type and limit value from a submitted form.

    The ticket is addressed by ``form.display_id.data``; both attributes
    are written with the ``PUT`` action.  Afterwards
    ``update_limit_value`` is called with the limit type.
    """
    tickets = get_tickets_table()
    limit_type = form.limit_type.data

    ticket_key = {"display_id": form.display_id.data}
    put_updates = {
        'limit_type': {'Value': limit_type, 'Action': 'PUT'},
        'limit_value': {'Value': form.limit_value.data, 'Action': 'PUT'},
    }
    tickets.update_item(Key=ticket_key, AttributeUpdates=put_updates)

    update_limit_value(limit_type)