def lookupWorkloadSpecification(self, partitionTargetValue):
try:
dynamodbItem=self.dynDBC.get_item(
TableName=self.workloadSpecificationTableName,
Key={
self.workloadSpecificationPartitionKey : { "S" : partitionTargetValue }
},
ConsistentRead=False,
ReturnConsumedCapacity="TOTAL",
)
except ClientError as e:
self.logger.error('lookupWorkloadSpecification()' + e.response['Error']['Message'])
else:
# Get the dynamoDB Item from the result
resultItem=dynamodbItem['Item']
for attributeName in resultItem:
# Validate the attributes entered into DynamoDB are valid. If not, spit out individual warning messages
if( attributeName in self.workloadSpecificationValidAttributeList ):
attributeValue=resultItem[attributeName].values()[0]
self.logger.info('Workload Attribute [%s maps to %s]' % (attributeName, attributeValue))
self.workloadSpecificationDict[attributeName]=attributeValue
else:
self.logger.warning('Invalid dynamoDB attribute specified->'+str(attributeName)+'<- will be ignored')
python类Key()的实例源码
def purge_expired_peers():
"""
Removes peers who haven't announced in the last internval.
Should be set as a recurring event source in your Zappa config.
"""
if DATASTORE == "DynamoDB":
# This is a costly operation, but I think it has to be done.
# Optimizations (pagination? queries? batching?) welcomed.
all_torrents = table.scan()
for torrent in all_torrents['Items']:
for peer_id in torrent['peers']:
peer_last_announce = int(torrent['peers'][peer_id][0]['last_announce'])
window = datetime.now() - timedelta(seconds=ANNOUNCE_INTERVAL)
window_unix = int(time.mktime(window.timetuple()))
if peer_last_announce < window_unix:
remove_peer_from_info_hash(torrent['info_hash'], peer_id)
else:
# There must be a better way to do this.
# Also, it should probably be done as a recurring function and cache,
# not dynamically every time.
for key in s3_client.list_objects(Bucket=BUCKET_NAME)['Contents']:
if 'peers.json' in key['Key']:
remote_object = s3.Object(BUCKET_NAME, key['Key']).get()
content = remote_object['Body'].read().decode('utf-8')
torrent = json.loads(content)
for peer_id in torrent['peers']:
peer_last_announce = int(torrent['peers'][peer_id]['last_announce'])
window = datetime.now() - timedelta(seconds=ANNOUNCE_INTERVAL)
window_unix = int(time.mktime(window.timetuple()))
if peer_last_announce < window_unix:
remove_peer_from_info_hash(torrent['info_hash'], peer_id)
return
##
# Database
##
def get_peers_for_info_hash_dynamodb(
info_hash,
limit=50
):
"""
Get current peers
"""
response = table.query(
KeyConditionExpression=Key('info_hash').eq(info_hash)
)
if response['Count'] == 0:
return []
else:
return response['Items'][0]['peers']
def get_all_items():
"""
Get all items
"""
if DATASTORE == "DynamoDB":
response = table.scan()
if response['Count'] == 0:
return []
else:
return response['Items']
else:
# We want info_hash, peers, and completed.
items = []
# There must be a better way to do this.
# Also, it should probably be done as a recurring function and cache,
# not dynamically every time.
for key in s3_client.list_objects(Bucket=BUCKET_NAME)['Contents']:
if 'peers.json' in key['Key']:
remote_object = s3.Object(BUCKET_NAME, key['Key']).get()
content = remote_object['Body'].read().decode('utf-8')
torrent_info = json.loads(content)
item = {
'info_hash': torrent_info['info_hash'],
'peers': torrent_info['peers'],
'completed': torrent_info['completed']
}
items.append(item)
return items
###
# Utility
###
# Helper class to convert a DynamoDB item to JSON.
def freeprefix(nipap_deamon_ip, account_cb_alias, account_iam_alias, vpc_network, vpc_prefix):
# Lookup nipap daemon password cipher
nipapCfn = dynamodb.Table(os.environ['TAILOR_TABLENAME_NIPAPCFN'])
getNipapCfn = nipapCfn.get_item(
Key={
'nipapAlias': account_cb_alias
}
)
# Decrypt nipap daemon password
nipapDaemonPasswordCipherBlob = getNipapCfn['Item']['nipapDaemonPasswordCipherBlob']
nipapDeamonPassword = bytes(kms.decrypt(CiphertextBlob=b64decode(nipapDaemonPasswordCipherBlob))['Plaintext'])
# Look up free CIDR block
pynipap.xmlrpc_uri = "http://tailor:" + nipapDeamonPassword.rstrip() + "@" + nipap_deamon_ip + ":1337"
a = pynipap.AuthOptions({
'authoritative_source': 'tailor_nipap_client'
})
# Allocate first available
new_prefix = Prefix()
new_prefix.description = account_iam_alias
new_prefix.type = "assignment"
# Save will communicate with the backend and ask for the next available desired prefix size
new_prefix.save({'from-prefix': [vpc_network], 'prefix_length': vpc_prefix})
# Read the assigned prefix from the new_prefix object
print("VPC Cidr is: ", new_prefix.prefix)
return new_prefix.prefix
def add_dhcp_optionset(la_credentials, la_vpc_id, dns_server_1, dns_server_2, region):
# Initiate linked account ec2 client
laEc2 = boto3.client(
'ec2',
region_name=region,
aws_access_key_id=la_credentials[0],
aws_secret_access_key=la_credentials[1],
aws_session_token=la_credentials[2],
)
if region == 'us-east-1':
domainName = 'ec2.internal'
else:
domainName = region + '.compute.internal'
dhcpOptionsset = laEc2.create_dhcp_options(
DhcpConfigurations=[
{
'Key': 'domain-name-servers',
'Values': [dns_server_1, dns_server_2, 'AmazonProvidedDNS']
},
{
'Key': 'domain-name',
'Values': [domainName]
},
],
)
laEc2.associate_dhcp_options(
DhcpOptionsId=dhcpOptionsset['DhcpOptions']['DhcpOptionsId'],
VpcId=la_vpc_id,
)
return
def _search(self, search_terms, begins_with=None):
"""
Returns a list of Archive id's in the table on Dynamo
"""
kwargs = dict(
ProjectionExpression='#id',
ExpressionAttributeNames={"#id": "_id"})
if len(search_terms) > 0:
kwargs['FilterExpression'] = reduce(
lambda x, y: x & y,
[Attr('tags').contains(arg) for arg in search_terms])
if begins_with:
if 'FilterExpression' in kwargs:
kwargs['FilterExpression'] = kwargs[
'FilterExpression'] & Key('_id').begins_with(begins_with)
else:
kwargs['FilterExpression'] = Key(
'_id').begins_with(begins_with)
while True:
res = self._table.scan(**kwargs)
for r in res['Items']:
yield r['_id']
if 'LastEvaluatedKey' in res:
kwargs['ExclusiveStartKey'] = res['LastEvaluatedKey']
else:
break
def _update(self, archive_name, version_metadata):
'''
Updates the version specific metadata attribute in DynamoDB
In DynamoDB this is simply a list append on this attribute value
Parameters
----------
archive_name: str
unique '_id' primary key
version_metadata: dict
dictionary of version metadata values
Returns
-------
dict
list of dictionaries of version_history
'''
command = "SET version_history = list_append(version_history, :v)"
self._table.update_item(
Key={'_id': archive_name},
UpdateExpression=command,
ExpressionAttributeValues={':v': [version_metadata]},
ReturnValues='ALL_NEW')
def _update_spec_config(self, document_name, spec):
'''
Dynamo implementation of project specific metadata spec
'''
# add the updated archive_metadata object to Dynamo
self._spec_table.update_item(
Key={'_id': '{}'.format(document_name)},
UpdateExpression="SET config = :v",
ExpressionAttributeValues={':v': spec},
ReturnValues='ALL_NEW')
def _update_metadata(self, archive_name, archive_metadata):
"""
Appends the updated_metada dict to the Metadata Attribute list
Parameters
----------
archive_name: str
ID of archive to update
updated_metadata: dict
dictionary of metadata keys and values to update. If the value
for a particular key is `None`, the key is removed.
"""
archive_metadata_current = self._get_archive_metadata(archive_name)
archive_metadata_current.update(archive_metadata)
for k, v in archive_metadata_current.items():
if v is None:
del archive_metadata_current[k]
# add the updated archive_metadata object to Dynamo
self._table.update_item(
Key={'_id': archive_name},
UpdateExpression="SET archive_metadata = :v",
ExpressionAttributeValues={':v': archive_metadata_current},
ReturnValues='ALL_NEW')
def _get_archive_listing(self, archive_name):
'''
Return full document for ``{_id:'archive_name'}``
.. note::
DynamoDB specific results - do not expose to user
'''
return self._table.get_item(Key={'_id': archive_name})['Item']
def _set_tags(self, archive_name, updated_tag_list):
self._table.update_item(
Key={'_id': archive_name},
UpdateExpression="SET tags = :t",
ExpressionAttributeValues={':t': updated_tag_list},
ReturnValues='ALL_NEW')
def clear_all():
for i in find_all():
table.delete_item(Key={'Name': i['Name']})
def query_hash(self, hash_name, hash_value, index=None, forward=True, limit=None, projection=None):
"""Method to query an index
Args:
data (dict): A dictionary of attributes to put
Returns:
(dict)
"""
params = {"ScanIndexForward": forward,
"KeyConditionExpression": Key(hash_name).eq(hash_value)}
if index:
params["IndexName"] = index
else:
# If primary index, consistent read
params["ConsistentRead"] = True
if limit:
params["Limit"] = limit
if projection:
params["ProjectionExpression"] = projection
response = self.table.query(**params)
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
raise Exception("Error getting item: {}".format(response['ResponseMetadata']))
return response["Items"]
def query_24hr(self, hash_key, hash_value, sort_key, date_str):
"""Method to query for a date range for the day provided in the date_str
Date_str must be in ISO-8601
This assumes you are "centering" the 24hr block from midnight-midnight EST
Args:
hash_key (str): Hash key name
sort_key (str): Sort key name
date_str (str): The date string containing the day to query in UTC time
Returns:
list
"""
# Convert ISO time to be EST
date_in = arrow.get(date_str)
date_in_est = date_in.to('EST')
# Compute start date str
start_date = date_in_est.replace(hour=0, minute=0)
# Compute end date str
date_range = start_date.span('day')
response = self.table.update_item(Key={hash_key: hash_value},
KeyConditionExpression="{} >= :morning AND {} <= :midnight".format(sort_key,
sort_key),
ExpressionAttributeValues={':morning': date_range[0].isoformat(),
':midnight': date_range[1].isoformat()},
ReturnValues="UPDATED_NEW")
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
raise Exception("Error getting item: {}".format(response['ResponseMetadata']))
if "Items" in response:
return response["Items"]
else:
return []
def query_most_recent(self, hash_key, hash_value, sort_key, date_str, limit=1):
"""Method to query for the record most recently in the past based on the date_str
Date_str must be in ISO-8601
Args:
hash_key (str): Hash key name
sort_key (str): Sort key name
date_str (str): The date string containing the day to query in UTC time
Returns:
dict
"""
response = self.table.query(KeyConditionExpression=Key(hash_key).eq(hash_value) & Key(sort_key).lte(date_str),
Limit=limit,
ScanIndexForward=False,
Select="ALL_ATTRIBUTES")
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
raise Exception("Error getting item: {}".format(response['ResponseMetadata']))
if "Items" in response:
if response["Items"]:
return response["Items"]
else:
return []
else:
return []
def query_biggest(self, hash_key, hash_value, num_items, index=None, forward=False):
"""Method to query for the largest N records
Args:
hash_key (str): Hash key name
hash_value (str): Hash key value
num_items (int): The number of items to return
index (str): Name of index if not primary
forward (bool): flag indicating sort direction
Returns:
dict
"""
params = {"ScanIndexForward": forward,
"KeyConditionExpression": Key(hash_key).eq(hash_value),
"Select": "ALL_ATTRIBUTES",
"Limit": num_items}
if index:
params["IndexName"] = index
params["Select"] = "ALL_PROJECTED_ATTRIBUTES"
else:
# If primary index, consistent read
params["ConsistentRead"] = True
response = self.table.query(**params)
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
raise Exception("Error getting item: {}".format(response['ResponseMetadata']))
if "Items" in response:
if response["Items"]:
return response["Items"]
else:
return []
else:
return []
def get_data(event):
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(tablename)
response = table.query(
KeyConditionExpression=Key(hashkey).eq(event[hashkey]) \
& Key(rangekey).eq(event[rangekey])
)
for item in response['Items']:
objkeypair = ast.literal_eval(item['mappings'])
if 'lookup' in event:
return objkeypair[event['lookup']]
else:
return objkeypair
def delete(self, doc_obj):
self._indexer.delete_item(Key={'_id': doc_obj._data['_id']})
def get(self, doc_obj, doc_id):
response = self._indexer.get_item(Key={'_id': doc_obj.get_doc_id(doc_id)})
doc = response.get('Item', None)
if not doc:
raise DocNotFoundError
return doc_obj(**doc)