def put(self, group_id):
    """Update an existing port pair group from the JSON request body.

    Reads the ``port_pair_group`` object from the request payload and copies
    whichever of ``name``, ``description`` and ``port_pairs`` are present onto
    the group returned by ``find_port_pair_group_by_name_or_id``.

    :param group_id: Name or id handed to ``find_port_pair_group_by_name_or_id``.
    :return: * 200 with the group serialized under ``port_pair_group``.
             * 500 with the exception text if anything raised.
    """
    logging.debug("API CALL: %s PUT", self.__class__.__name__)
    try:
        request_dict = json.loads(request.data).get("port_pair_group")
        port_pair_group = self.api.compute.find_port_pair_group_by_name_or_id(group_id)
        # Only attributes actually present in the payload are overwritten.
        for attr in ("name", "description", "port_pairs"):
            if attr in request_dict:
                setattr(port_pair_group, attr, request_dict[attr])
        resp = {
            "port_pair_group": port_pair_group.create_dict(self.api.compute)
        }
        return Response(json.dumps(resp), status=200, mimetype='application/json')
    except Exception as ex:
        logging.exception("Neutron SFC: %s Exception.", self.__class__.__name__)
        # str(ex) instead of ex.message: exceptions have no ``message``
        # attribute on Python 3, so the handler itself would have raised.
        return Response(str(ex), status=500, mimetype='application/json')
# Python: example source code using data()
def put(self, flow_classifier_id):
    """Update an existing flow classifier from the JSON request body.

    Reads the ``flow_classifier`` object from the request payload and copies
    ``name`` and/or ``description`` (when present) onto the classifier
    returned by ``find_flow_classifier_by_name_or_id``.

    :param flow_classifier_id: Name or id handed to
        ``find_flow_classifier_by_name_or_id``.
    :return: * 200 with the classifier serialized under ``flow_classifier``.
             * 500 with the exception text if anything raised.
    """
    logging.debug("API CALL: %s PUT", self.__class__.__name__)
    try:
        request_dict = json.loads(request.data).get("flow_classifier")
        flow_classifier = self.api.compute.find_flow_classifier_by_name_or_id(flow_classifier_id)
        # Only attributes actually present in the payload are overwritten.
        for attr in ("name", "description"):
            if attr in request_dict:
                setattr(flow_classifier, attr, request_dict[attr])
        resp = {
            "flow_classifier": flow_classifier.create_dict(self.api.compute)
        }
        return Response(json.dumps(resp), status=200, mimetype='application/json')
    except Exception as ex:
        logging.exception("Neutron SFC: %s Exception.", self.__class__.__name__)
        # str(ex) instead of ex.message: exceptions have no ``message``
        # attribute on Python 3, so the handler itself would have raised.
        return Response(str(ex), status=500, mimetype='application/json')
def post(self):
    """Create and install a port chain described by the JSON request body.

    The ``port_chain`` object of the payload must carry ``name`` and
    ``port_pair_groups``; ``description``, ``flow_classifiers`` and
    ``chain_parameters`` are applied only when present. The chain is
    installed before the response is built.

    :return: * 201 with the chain serialized under ``port_chain``.
             * 500 with the exception text if anything raised (including a
               missing required key).
    """
    logging.debug("API CALL: %s POST", self.__class__.__name__)
    try:
        request_dict = json.loads(request.data).get("port_chain")
        port_chain = self.api.compute.create_port_chain(request_dict["name"])
        port_chain.port_pair_groups = request_dict["port_pair_groups"]
        # Optional attributes: copied only if the client supplied them.
        for attr in ("description", "flow_classifiers", "chain_parameters"):
            if attr in request_dict:
                setattr(port_chain, attr, request_dict[attr])
        port_chain.install(self.api.compute)
        resp = {
            "port_chain": port_chain.create_dict(self.api.compute)
        }
        return Response(json.dumps(resp), status=201, mimetype='application/json')
    except Exception as ex:
        logging.exception("Neutron SFC: %s Exception.", self.__class__.__name__)
        # str(ex) instead of ex.message: exceptions have no ``message``
        # attribute on Python 3, so the handler itself would have raised.
        return Response(str(ex), status=500, mimetype='application/json')
def post(self):
    """
    Creates a network with the name, specified within the request under ['network']['name'].

    The name is first looked up with ``find_network_by_name_or_id``; the
    network is only created when that lookup returns ``None``.

    :return: * 400, if the network already exists.
             * 500, if any exception occurred while creation.
             * 201, if everything worked out.
    :rtype: :class:`flask.response`
    """
    LOG.debug("API CALL: %s POST", self.__class__.__name__)
    try:
        network_dict = json.loads(request.data)
        name = network_dict['network']['name']
        net = self.api.compute.find_network_by_name_or_id(name)
        if net is not None:
            return Response('Network already exists.\n', status=400, mimetype='application/json')

        net = self.api.compute.create_network(name)
        body = json.dumps({"network": net.create_network_dict()})
        return Response(body, status=201, mimetype='application/json')
    except Exception as ex:
        LOG.exception("Neutron: Create network excepiton.")
        # str(ex) instead of ex.message: exceptions have no ``message``
        # attribute on Python 3, so the handler itself would have raised.
        return Response(str(ex), status=500, mimetype='application/json')
def post(self, id):
    """Register a new flavor and echo it back with its id and a link.

    The ``flavor`` object of the JSON request body supplies ``name``,
    ``vcpus``, ``ram`` and ``disk``; RAM is registered in "MB" and disk in
    "GB". The response reuses the incoming dict, extended with the new
    flavor's ``id`` and a ``links`` entry.

    :param id: Path segment placed between ``v2.1`` and ``flavors`` in the
        generated link.
    :return: 200 response with the flavor under the ``flavor`` key.
    """
    LOG.debug("API CALL: %s POST" % self.__class__.__name__)
    flavor_spec = json.loads(request.data).get("flavor")
    LOG.warning("Create Flavor: %s" % str(flavor_spec))

    # add to internal dict
    flavor = self.api.compute.add_flavor(
        flavor_spec.get("name"),
        flavor_spec.get("vcpus"),
        flavor_spec.get("ram"), "MB",
        flavor_spec.get("disk"), "GB",
    )

    # create response based on incoming data
    flavor_spec["id"] = flavor.id
    href = "http://%s:%d/v2.1/%s/flavors/%s" % (
        get_host(request), self.api.port, id, flavor.id)
    flavor_spec["links"] = [{'href': href}]

    body = json.dumps({"flavor": flavor_spec})
    return Response(body, status=200, mimetype="application/json")
def post(self, id):
    """Create a flavor from the request payload and return its description.

    Reads the ``flavor`` dict from the JSON body, registers it through
    ``self.api.compute.add_flavor`` (RAM unit "MB", disk unit "GB"), then
    returns the same dict augmented with the assigned ``id`` and a
    self-referencing ``links`` list.

    :param id: Path segment inserted into the generated flavor URL.
    :return: 200 response whose JSON body is ``{"flavor": ...}``.
    """
    LOG.debug("API CALL: %s POST" % self.__class__.__name__)
    payload = json.loads(request.data).get("flavor")
    LOG.warning("Create Flavor: %s" % str(payload))

    # add to internal dict
    created = self.api.compute.add_flavor(
        payload.get("name"),
        payload.get("vcpus"),
        payload.get("ram"), "MB",
        payload.get("disk"), "GB",
    )

    # create response based on incoming data
    payload["id"] = created.id
    link_parts = (get_host(request), self.api.port, id, created.id)
    payload["links"] = [{'href': "http://%s:%d/v2.1/%s/flavors/%s" % link_parts}]

    return Response(json.dumps({"flavor": payload}),
                    status=200, mimetype="application/json")
def post(self, id=None):
    """Create collections from the list given in the request body.

    Posting to a specific ``id`` is rejected. When the service reports
    ``providesCollectionPids``, every plain-dict entry is turned into a
    ``CollectionObject`` with a freshly minted id; other entries are kept
    as they are.

    :param id: Must be empty; a non-empty value raises ``NotFoundError``.
    :return: ``(jsonify(objs), 201)`` on success.
    :raises NotFoundError: if ``id`` is given (404).
    :raises UnauthorizedError: if access is enforced and the current user
        lacks the ``x`` permission.
    :raises ConflictError: if ``ask_collection`` reports any id as existing.
    :raises ParseError: if the body is not a list, or on any other
        unexpected error (400).
    """
    try:
        if id:
            raise NotFoundError()  # 404
        if app.service.enforcesAccess and not app.acl.get_permission(app.acl.get_user()).x:
            raise UnauthorizedError()
        objs = json.loads(request.data)
        if not isinstance(objs, list):
            raise ParseError()
        pids = app.db.get_service().providesCollectionPids
        if pids:
            # dict.update() returns None, so the former
            # CollectionObject(**obj.update({...})) always raised a TypeError
            # that surfaced as ParseError. Merge the minted id into a new
            # dict instead.
            objs = [
                CollectionObject(**dict(obj, id=app.mint.get_id(CollectionObject)))
                if isinstance(obj, dict) else obj
                for obj in objs
            ]
        if app.db.ask_collection([obj.id for obj in objs]):
            raise ConflictError()
        app.db.set_collection(objs)
        return jsonify(objs), 201
    except (NotFoundError, DBError, UnauthorizedError, ConflictError):
        raise
    except Exception:
        # Anything else is reported as a malformed request. ``Exception``
        # rather than a bare except so KeyboardInterrupt/SystemExit propagate.
        raise ParseError()  # 400
def put(self, id=None):
    """Replace the collection identified by ``id`` with the request body.

    :param id: Identifier of the collection to update; required.
    :return: ``(jsonify(<result of set_collection>), 200)`` on success.
    :raises NotFoundError: if ``id`` is empty.
    :raises UnauthorizedError: if access is enforced and the current user
        lacks the ``w`` permission on ``id``.
    :raises ParseError: if the body's ``id`` differs from the URL ``id``, or
        on any other unexpected error (400).
    """
    try:
        if not id:
            raise NotFoundError()
        if app.service.enforcesAccess and not app.acl.get_permission(app.acl.get_user(), id).w:
            raise UnauthorizedError()
        # NOTE(review): ``c_obj.id`` assumes json.loads yields an object with
        # an ``id`` attribute (e.g. via a custom decoder) - TODO confirm.
        c_obj = json.loads(request.data)
        if c_obj.id != id:
            raise ParseError()
        # Result is unused; the call is kept because the lookup itself may
        # raise (presumably NotFoundError/DBError for a missing collection -
        # confirm against app.db).
        app.db.get_collection(id)
        # todo: implement propertiesAreMutable check
        return jsonify(app.db.set_collection(c_obj)), 200
    except (NotFoundError, DBError, UnauthorizedError):
        raise
    except Exception:
        # Anything else is reported as a malformed request. ``Exception``
        # rather than a bare except so KeyboardInterrupt/SystemExit propagate.
        raise ParseError()  # 400
def train_svm():
    """Train an exemplar SVM from the JSON request body and store the model.

    Expected keys in the body:

    * ``detections`` - indexable; items 0 and 1 are forwarded to the trainer.
    * ``image`` / ``positive_crop`` - decoded with ``imread_from_base64``.
    * ``use_dense_sift`` (default ``False``), ``clustering`` (default
      ``'kmeans'``), ``augment_data`` (default ``True``) - forwarded as
      ``dense_sift``, ``clustering`` and ``augment_data``.

    :return: JSON string ``{"error": "Parameters error"}`` when any of the
        three required keys is missing or falsy, otherwise
        ``{"result": "Success"}``.
    """
    # Without this declaration the trained model was bound to a function
    # local and thrown away on return.
    # NOTE(review): presumably other handlers read a module-level SVM_MODEL -
    # confirm.
    global SVM_MODEL

    data = json.loads(request.data)
    detections = data.get('detections', None)
    image = data.get('image', None)
    positive_crop = data.get('positive_crop', None)
    use_dense_sift = data.get('use_dense_sift', False)
    clustering = data.get('clustering', 'kmeans')
    augment_data = data.get('augment_data', True)

    if not (detections and image and positive_crop):
        return json.dumps({'error': 'Parameters error'})

    image = imread_from_base64(image)
    positive_crop = imread_from_base64(positive_crop)
    SVM_MODEL = train_exemplar_svm_on_sift_features(
        image, positive_crop,
        detections[0], detections[1],
        dense_sift=use_dense_sift,
        clustering=clustering,
        augment_data=augment_data
    )
    return json.dumps({'result': 'Success'})
def post(self):
    """Facebook's API webhook.

    Walks every messaging event of every entry in the JSON payload. Message
    events go to ``webhook.handle_message`` and a truthy result is posted to
    ``/me/messages``; postback events go to ``webhook.handle_postback`` and
    each returned item is posted to ``/me/messages``.

    :return: The literal string ``"OK"``.
    """
    # request.data is bytes on Python 3, so concatenating it to a str raised
    # TypeError; get_data(as_text=True) returns the decoded body instead.
    print("Webhook request data: " + request.get_data(as_text=True))
    data = request.get_json()
    # A delivery may batch several entries; previously only entry[0] was
    # processed and the rest were silently dropped.
    for entry in data['entry']:
        for event in entry['messaging']:
            sender_id = event['sender']['id']
            if 'message' in event:
                response_body = webhook.handle_message(sender_id, event['message'])
                if response_body:
                    self.api.post("/me/messages", response_body)
            elif 'postback' in event:
                response_callbacks = webhook.handle_postback(sender_id, event['postback'])
                for response_callback in response_callbacks:
                    self.api.post("/me/messages", response_callback)
    return "OK"
def hello():
    """Webhook endpoint handling both the GET handshake and POSTed events.

    On GET the ``hub.challenge`` query argument is returned. Otherwise, for
    payloads whose ``object`` is ``"page"``, each messaging event is
    inspected: a postback forwards its payload to ``send_weburl`` and a
    message triggers ``send_postback`` for the sender.

    :return: The challenge value for GET requests, else ``("ok", 200)``.
    """
    # print() call instead of the Python 2 print statement, which is a
    # syntax error on Python 3; output is the same on both.
    print(request.data)
    if request.method == 'GET':
        return request.args.get('hub.challenge')

    data = request.get_json()
    if data["object"] == "page":
        for entry in data["entry"]:
            for messaging_event in entry["messaging"]:
                if messaging_event.get("postback"):
                    sender_id = messaging_event["sender"]["id"]
                    payload = messaging_event["postback"]["payload"]
                    send_weburl(payload, sender_id)
                if messaging_event.get("message"):  # readers send us a message
                    sender_id = messaging_event["sender"]["id"]
                    send_postback(sender_id)
    return "ok", 200
def feature_request_add():
    """Create and save a feature request from the JSON request body.

    The body must contain ``title``, ``description``, ``client_name``,
    ``client_priority``, ``target_date``, ``product_area`` and
    ``agent_name``; a missing key raises ``KeyError``.

    :return: ``jsonify`` response with ``status``, ``message`` and the saved
        request's ``to_dict()`` under ``feature_request``.
    """
    data_dict = json.loads(request.data)

    feature_request = FeatureRequest(test=test_mode)
    # Same fields, same order as before; the unused ``feature_requests_dict``
    # local was dropped.
    for field in ('title', 'description', 'client_name', 'client_priority',
                  'target_date', 'product_area', 'agent_name'):
        setattr(feature_request, field, data_dict[field])
    feature_request.save()

    return jsonify({
        'status': 'success',
        'message': 'Feature request added',
        'feature_request': feature_request.to_dict()
    })
# Update existing feature request
def home():
    """Return generated content when the request has a body, else 'error'.

    The decoded body's length is printed first. A non-empty body yields the
    result of ``shell_generator()``; an empty body yields the string
    ``'error'``.
    """
    body = request.data.decode()
    body_length = len(body)
    print(body_length)
    print('HELOO NEW VISITOR:')

    if body_length == 0:
        print('Return error')
        return 'error'

    #Place for Your PowerShell
    generated = shell_generator()
    print('return: \n', generated)
    print('Return powershell')
    return generated
def login():
    """Authenticate with the credentials in the JSON request body.

    Reads ``emailAddress`` and ``password`` from the body, validates both
    through ``utils``, then signs in through ``accFunctions``. On success the
    account data is returned together with a freshly generated ``authToken``.

    :return: ``custResponse`` with 400 for an empty body, invalid input or
        failed sign-in; 200 with the account data on success; 500 if any
        exception is raised (the exception text is included only when
        ``app.config["DEBUG"]`` equals ``True``).
    """
    try:
        payload = json.loads(request.data)
        if not payload:
            return custResponse(400, "data required for update")

        email = payload.get("emailAddress")
        password = payload.get("password")

        if not utils.isValidEmail(email):
            return custResponse(400, "Invalid email address.")
        if not utils.isValidPass(password):
            min_length = configFile["Security"]["minPassLength"]
            return custResponse(400, "Invalid password. Must be at least " + min_length + " characters long.")

        account_id = accFunctions.signin(email, password)
        if not account_id:
            return custResponse(400, "Login failed. Incorrect email or password")

        account = accFunctions.getAccData(account_id)
        account["authToken"] = accFunctions.genToken(account_id)
        return custResponse(200, "Login Successful", account)
    except Exception as e:
        debug_enabled = app.config["DEBUG"] == True
        if not debug_enabled:
            return custResponse(500, "An unknown error occured.")
        print("*-*-*-*")
        print(e)
        return custResponse(500, {"Err": str(e)})
def custResponse(code=404, message="Error: Not Found", data=None):
    """Build a JSON response carrying ``status``, ``message`` and extra data.

    :param code: Value used both as the ``status`` field and as the HTTP
        status code of the response.
    :param message: Value stored under the ``message`` key.
    :param data: Optional mapping whose items are merged into the body
        (they may overwrite ``status``/``message``).
    :return: The ``jsonify`` response with ``status_code`` set to ``code``.
    """
    payload = {"status": code, "message": message}
    if data:
        payload.update(data.items())

    resp = jsonify(payload)
    resp.status_code = code
    return resp
def dbconnect():
    """Open a CouchDB server handle using settings from ``DBdata.json``.

    The file is read from the current working directory and supplies
    ``ssl``, ``username``, ``password``, ``host`` and ``port``. Plain HTTP is
    used only when ``ssl`` is exactly ``False``; any other value, including
    a missing key, selects HTTPS.

    :return: ``couchdb.Server`` for ``<scheme><user>:<password>@<host>:<port>/``.
    """
    # The context manager closes the file; the explicit close() that used to
    # sit inside the block was redundant.
    with open('DBdata.json') as data_file:
        data = json.load(data_file)

    # Named ``use_ssl`` to avoid shadowing the stdlib ``ssl`` module name.
    use_ssl = data.get('ssl')
    username = data.get('username')
    password = data.get('password')
    host = data.get('host')
    port = data.get('port')

    scheme = 'http://' if use_ssl is False else 'https://'
    # NOTE(review): credentials are embedded verbatim; a ':' or '@' inside
    # them would produce a malformed URL.
    url = '{}{}:{}@{}:{}/'.format(scheme, username, password, host, port)
    return couchdb.Server(url)
def CheckinUpdate(IP, Port, TeamID):
    """Generate a key and a 64-character random token for a check-in.

    The token is also encrypted with the key, but the ciphertext is not used
    or stored here, and none of the arguments are read.

    :return: ``(token, key)`` tuple.
    """
    cipher_key = subcipher.generate_key()
    token = RandomToken(64)
    # Result currently unused; the call is kept so behavior stays the same.
    ciphertext = subcipher.encrypt(cipher_key, token)
    return token, cipher_key
def ServiceScore(TeamID, Token):
    """Placeholder for scoring a team; currently ignores both arguments.

    :return: An empty tuple.
    """
    return tuple()
def gathering_body(data_type):
    """
    Build an object from the JSON body of the current request.

    :param data_type: object exposing ``from_json_dict``
    :return: whatever ``data_type.from_json_dict`` returns for the parsed
        request JSON
    """
    build = data_type.from_json_dict
    return build(request.get_json())
def webhook():
    """Check the request signature, log the delivery id and process it.

    The ``X-Hub-Signature`` header and the raw request body are handed to
    ``webhook_helper.check_signature`` before anything else happens.

    :return: ``jsonify`` of the value returned by ``webhook_helper.process``.
    """
    webhook_helper.check_signature(request.headers['X-Hub-Signature'], request.data)

    delivery_id = request.headers.get('X-GitHub-Delivery')
    logging.info('Delivery: {}'.format(delivery_id))

    return jsonify(webhook_helper.process(request))