def patch(self, slug):
    """Apply a partial update to the resource identified by ``slug``.

    The record is looked up through ``self.resource.model.query.get``.
    When it exists, the update itself is delegated to
    ``self.resource.patch_resource``, which supplies the response payload
    and status code.

    Returns:
        A Flask response: the payload and status from ``patch_resource``
        on success, the exception's message and status after a rollback
        when a SQL integrity/operational error is raised, or a 404 error
        body when no record matches ``slug``.
    """
    instance = self.resource.model.query.get(slug)
    if instance:
        try:
            payload, status_code = self.resource.patch_resource(instance)
        except (SQLIntegrityError, SQlOperationalError) as exc:
            # Discard the failed transaction before reporting the error.
            db.session.rollback()
            exc.message['error'] = True
            return make_response(jsonify(exc.message), exc.status)
        else:
            return make_response(jsonify(payload), status_code)
    return make_response(jsonify({'error': True, 'message': 'Resource not found'}), 404)
# Example source code for the Python class Resource()
def delete(self, slug):
    """Delete the resource identified by ``slug``.

    Returns:
        A Flask response with status 204 and an empty JSON body after the
        record is deleted and the session committed, 403 when
        ``self.resource.has_delete_permission`` refuses the record, or 404
        when no record matches ``slug``.
    """
    target = self.resource.model.query.get(slug)
    if not target:
        return make_response(jsonify({'error': True, 'message': 'Resource not found'}), 404)

    if not self.resource.has_delete_permission(target):
        forbidden = {'error': True, 'message': 'Forbidden Permission Denied To Delete Resource'}
        return make_response(jsonify(forbidden), 403)

    db.session.delete(target)
    db.session.commit()
    return make_response(jsonify({}), 204)
def delete(self, ident):
    """Refuse an HTTP DELETE request for this resource.

    Args:
        ident: Identifier taken from the request URL; it is not used.

    Returns:
        A ``(message, status_code)`` tuple telling the client that DELETE
        is not a valid command.
    """
    # Name the verb this handler actually serves, so the client is told
    # which request was refused.
    # NOTE(review): 202 is kept so existing clients see the same status;
    # confirm whether a 4xx code (e.g. 405) is wanted here instead.
    return 'DELETE is not a valid command', 202
# Resource Zone Collection API
def delete(self, ident):
    """Refuse an HTTP DELETE request for this resource.

    Args:
        ident: Identifier taken from the request URL; it is not used.

    Returns:
        A ``(message, status_code)`` tuple telling the client that DELETE
        is not a valid command.
    """
    # Name the verb this handler actually serves, so the client is told
    # which request was refused.
    # NOTE(review): 202 is kept so existing clients see the same status;
    # confirm whether a 4xx code (e.g. 405) is wanted here instead.
    return 'DELETE is not a valid command', 202
# Resource Block Collection API
def get(self):
    """List resources, optionally restricted to one environment.

    The optional ``env_name`` query argument selects an environment by
    name. Without it, every resource is returned.

    Returns:
        A JSON response with status 200 and a ``data`` list of serialized
        resources, or status 404 with an ``error`` message when
        ``env_name`` is given but no such environment is found.
    """
    fmlogging.debug("Received GET request for all resources.")
    env_name = request.args.get('env_name')

    if not env_name:
        resources = res_db.Resource().get_all()
    else:
        env_obj = env_db.Environment().get_by_name(env_name)
        if not env_obj:
            message = ("Environment with name {env_name} does not exist").format(env_name=env_name)
            fmlogging.debug(message)
            not_found = jsonify(**{'error': message})
            not_found.status_code = 404
            return not_found
        resources = res_db.Resource().get_resources_for_env(env_obj.id)

    # Both successful paths serialize the selected resources the same way.
    payload = {'data': [res_db.Resource.to_json(item) for item in resources]}
    response = jsonify(**payload)
    response.status_code = 200
    return response
def get(self, resource_id):
    """Return a single resource by its id.

    Args:
        resource_id: Identifier passed to ``res_db.Resource().get``.

    Returns:
        A JSON response with status 200 and the serialized resource under
        ``data``, or status 404 with a body built from an empty payload
        when the lookup returns nothing.
    """
    fmlogging.debug("Received GET request for resource %s" % resource_id)
    found = res_db.Resource().get(resource_id)

    if not found:
        empty_payload = {}
        response = jsonify(**empty_payload)
        response.status_code = 404
        return response

    payload = {'data': res_db.Resource.to_json(found)}
    response = jsonify(**payload)
    response.status_code = 200
    return response
def add_mocks_to_method(self, resource_method):
    """Decorate a resource method so it can answer with a JSON mock.

    When the request carries a truthy ``mock`` query argument, the
    wrapped method is skipped and the mock loaded through
    ``utils.load_json_mock`` is returned instead. The mock name is derived
    by convention from the resource class name, the resource type and the
    HTTP method name; see `osp_api.mocks`.

    Warning:
        Mocks bypass any authentication (and possibly
        other decorators, too!).
    """
    resource_class_name, resource_type = self.resource_method_info(
        resource_method,
    )

    # The negative index cuts the type word off the end of the class name.
    if resource_type == ResourceType.collection:
        index_before_suffix = -len('Collection')
        mock_suffix = '_collection'
    elif resource_type == ResourceType.singleton:
        index_before_suffix = -len('Singleton')
        mock_suffix = '_singleton'

    http_method = resource_method.__name__

    # Turn the remaining CamelCase stem into a dashed lower-case name,
    # e.g. "TopWorksCollection" yields "top-works" (+ "_collection").
    stem = resource_class_name[:index_before_suffix]
    pieces = []
    for position, char in enumerate(stem):
        if position == 0:
            pieces.append(char.lower())
        elif char.isupper():
            pieces.append('-' + char.lower())
        else:
            pieces.append(char)
    resource_name = ''.join(pieces)

    mock_name = resource_name + mock_suffix + '_' + http_method

    @wraps(resource_method)
    def wrapped_resource_method(*args, **kwargs):
        """Serve the mock when requested, else call the real method."""
        if not flask.request.args.get('mock'):
            return resource_method(*args, **kwargs)
        return utils.load_json_mock(mock_name)

    return wrapped_resource_method
# FIXME: eventually this will be redone to tell ElasticSearch to use
# an index consisting of records a year of age or older. The age is
# measured by class year (when it was taught in a syllabus) and NOT
# simply when it was added to our database.
def DeleteComposedSystem(ident):
# Delete the composed system stored under `ident` in `members` and undo the
# links that tie it to resource blocks and to per-system device tables.
#
# `members`, `resource_blocks`, `processors`, `memory`, `simplestorage`,
# `ethernetinterfaces` and `INTERNAL_ERROR` are defined outside this function.
#
# Returns 200 after a successful delete, or INTERNAL_ERROR when the system
# exists but its 'SystemType' is not 'Composed'.
#
# NOTE(review): indentation is missing in this view, so the exact nesting of
# the loops below cannot be confirmed here; the notes describe each statement
# and hedge anything that depends on nesting.
rb = g.rest_base
# Devices gathered from the system's resource blocks, grouped by device type.
resource_ids={'Processors':[],'Memory':[],'SimpleStorage':[],'EthernetInterfaces':[]}
# Verify if the System exists and if is of type - "SystemType": "Composed"
if ident in members:
if members[ident]['SystemType'] == 'Composed':
# Remove Links to Composed System and change CompositionState (to 'Unused') in associated Resource Blocks
for block in members[ident]['Links']['ResourceBlocks']:
# Reduce the link's '@odata.id' to the key used in `resource_blocks`.
block = block['@odata.id'].replace(rb + 'CompositionService/ResourceBlocks/','')
# NOTE(review): the next line is a bare expression; its value is discarded.
resource_blocks[block]['Links']['ComputerSystems']
for index, item in enumerate(resource_blocks[block]['Links']['ComputerSystems']):
if resource_blocks[block]['Links']['ComputerSystems'][index]['@odata.id'].replace(rb + 'Systems/','') == ident:
# NOTE(review): this deletes from the list being enumerated; if more than
# one entry can match, the entry after a deleted one would be skipped.
del resource_blocks[block]['Links']['ComputerSystems'][index]
resource_blocks[block]['CompositionStatus']['CompositionState'] = 'Unused'
# Collect the block's devices so their per-system entries can be removed below.
for device_type in resource_ids.keys():
for device in resource_blocks[block][device_type]:
resource_ids[device_type].append(device)
# Remove links to Processors, Memory, SimpleStorage, etc
# Each branch strips the resource-block prefix from the device's '@odata.id',
# deletes that key from the table entry for `ident`, and drops the entry for
# `ident` once it is empty.
for device_type in resource_ids.keys():
for device in resource_ids[device_type]:
if device_type == 'Processors':
device_back = device['@odata.id'].replace(rb + 'CompositionService/ResourceBlocks/','')
del processors[ident][device_back]
if processors[ident]=={}: del processors[ident]
elif device_type == 'Memory':
device_back = device['@odata.id'].replace(rb + 'CompositionService/ResourceBlocks/','')
del memory[ident][device_back]
if memory[ident]=={}: del memory[ident]
elif device_type == 'SimpleStorage':
device_back = device['@odata.id'].replace(rb + 'CompositionService/ResourceBlocks/','')
del simplestorage[ident][device_back]
if simplestorage[ident]=={}: del simplestorage[ident]
elif device_type == 'EthernetInterfaces':
device_back = device['@odata.id'].replace(rb + 'CompositionService/ResourceBlocks/','')
del ethernetinterfaces[ident][device_back]
if ethernetinterfaces[ident]=={}: del ethernetinterfaces[ident]
# Remove Composed System from System list
del members[ident]
resp = 200
else:
# It is not a Composed System and therefore cannot be deleted as such
return INTERNAL_ERROR
# NOTE(review): `resp` is only assigned on the successful-delete path above.
# If `ident` is not in `members`, the return below presumably raises
# UnboundLocalError — confirm and decide what a missing system should return.
return resp