python类data()的实例源码

api.py 文件源码 项目:HBCTF 作者: osteth 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def Player_Checkin():
    '''Accepts json formatted request containing the teams TeamID, IP, and Port and pushes the info over to CheckinUpdate to be stored in the DB
    json data should be in format {"TeamID": "Mudkips","IP": "192.168.0.1","Port": "5001"}
    Example: 
    curl -H "Content-Type: application/json" -X POST -d '{"TeamID": "Mudkips","IP": "192.168.0.1","Port": "5001"}' http://localhost:5001/checkin/
    '''
    try:
        checkin_resp = request.data
        IP = checkin_resp.get('IP')
        Port  = checkin_resp.get('Port')
        TeamID  = checkin_resp.get('TeamID')
        if IP and Port and TeamID is not None:
            Token, key = CheckinUpdate(IP, Port, TeamID)
            return {'Score Token': Token,
                    'Key': key}
        else:
            return {'Invalid': 'request'}
    except:
        return {'Invalid': 'request'}
api.py 文件源码 项目:HBCTF 作者: osteth 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def STS():
    '''Accepts players score submitions and passes its info over to ServiceScore funtion to validate the token and then update the teams score when its valid.
    json data should be in format {"TeamID": "Highlander","Token": "therecanbeonlyone"}
    Example: 
    curl -H "Content-Type: application/json" -X POST -d '{"TeamID": "Highlander","Token": "therecanbeonlyone"}' http://localhost:5001/ScoreTokentSubmit/
    '''
    try:
        sts_resp = request.data
        Token = sts_resp.get('Token')
        TeamID  = sts_resp.get('TeamID')
        if Token and TeamID is not None:
            print('Team ' + TeamID + 'has scored service points!')
            ServiceScore(TeamID, Token)
            return {'request data': request.data}
        else:
            return {'Invalid': 'request'}
    except:
        return {'Invalid': 'request'}
api_base.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def post(self):
        """Post an item to the DB with the request.data JSON object.

        :arg self: The class of the object to be inserted
        :returns: The JSON item stored in the DB

        """
        try:
            self.valid_args()
            data = self._file_upload(request)
            if data is None:
                data = json.loads(request.data)
            self._forbidden_attributes(data)
            inst = self._create_instance_from_request(data)
            repo = repos[self.__class__.__name__]['repo']
            save_func = repos[self.__class__.__name__]['save']
            getattr(repo, save_func)(inst)
            self._log_changes(None, inst)
            return json.dumps(inst.dictize())
        except Exception as e:
            return error.format_exception(
                e,
                target=self.__class__.__name__.lower(),
                action='POST')
api_base.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _update_instance(self, oid):
        repo = repos[self.__class__.__name__]['repo']
        query_func = repos[self.__class__.__name__]['get']
        existing = getattr(repo, query_func)(oid)
        if existing is None:
            raise NotFound
        ensure_authorized_to('update', existing)
        data = json.loads(request.data)
        self._forbidden_attributes(data)
        # Remove hateoas links
        data = self.hateoas.remove_links(data)
        # may be missing the id as we allow partial updates
        data['id'] = oid
        self.__class__(**data)
        old = self.__class__(**existing.dictize())
        for key in data:
            setattr(existing, key, data[key])
        self._update_attribute(existing, old)
        update_func = repos[self.__class__.__name__]['update']
        self._validate_instance(existing)
        getattr(repo, update_func)(existing)
        self._log_changes(old, existing)
        return existing
start_pokealarm.py 文件源码 项目:PokeAlarm 作者: PokeAlarm 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def accept_webhook():
    try:
        log.debug("POST request received from {}.".format(request.remote_addr))
        data = json.loads(request.data)
        if type(data) == dict: # older webhook style
            data_queue.put(data)
        else:   # For RM's frame
            for frame in data:
                data_queue.put(frame)
    except Exception as e:
        log.error("Encountered error while receiving webhook ({}: {})".format(type(e).__name__, e))
        abort(400)
    return "OK"  # request ok


# Thread used to distribute the data into various processes (for RocketMap format)
frontend.py 文件源码 项目:gke-gobang-app-example 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def put_stone_api(id):
    request_body = json.loads(request.data)
    try:
        x, y = int(request_body['x']), int(request_body['y'])
        player = int(request_body['player'])
        auto = int(request_body['auto'])
    except:
        response = jsonify({'result': 'error'})
        response.status_code = 500
        return response

    result = _put_stone(id, x, y, player, auto)
    if result == -1:
        response = jsonify({'result': 'error'})
        response.status_code = 500
    elif result == 1:
        response = jsonify({'result': 'win'})
        response.status_code = 200
    elif result == 99:
        response = jsonify({'result': 'end'})
        response.status_code = 200
    else:
        response = jsonify({'result': 'next'})
        response.status_code = 200
    return response
pyldn.py 文件源码 项目:pyldn 作者: albertmeronyo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_inbox():
    pyldnlog.debug("Requested inbox data of {} in {}".format(request.url, request.headers['Accept']))
    if not request.headers['Accept'] or request.headers['Accept'] == '*/*' or 'text/html' in request.headers['Accept']:
        resp = make_response(inbox_graph.serialize(format='application/ld+json'))
        resp.headers['Content-Type'] = 'application/ld+json'
    elif request.headers['Accept'] in ACCEPTED_TYPES:
        resp = make_response(inbox_graph.serialize(format=request.headers['Accept']))
        resp.headers['Content-Type'] = request.headers['Accept']
    else:
        return 'Requested format unavailable', 415

    resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn'
    resp.headers['Allow'] = "GET, HEAD, OPTIONS, POST"
    resp.headers['Link'] = '<http://www.w3.org/ns/ldp#Resource>; rel="type", <http://www.w3.org/ns/ldp#RDFSource>; rel="type", <http://www.w3.org/ns/ldp#Container>; rel="type", <http://www.w3.org/ns/ldp#BasicContainer>; rel="type"'
    resp.headers['Accept-Post'] = 'application/ld+json, text/turtle'

    return resp
pyldn.py 文件源码 项目:pyldn 作者: albertmeronyo 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_notification(id):
    pyldnlog.debug("Requested notification data of {}".format(request.url))
    pyldnlog.debug("Headers: {}".format(request.headers))

    # Check if the named graph exists
    pyldnlog.debug("Dict key is {}".format(pyldnconf._inbox_url + id))
    if pyldnconf._inbox_url + id not in graphs:
        return 'Requested notification does not exist', 404

    if 'Accept' not in request.headers or request.headers['Accept'] == '*/*' or 'text/html' in request.headers['Accept']:
        resp = make_response(graphs[pyldnconf._inbox_url + id].serialize(format='application/ld+json'))
        resp.headers['Content-Type'] = 'application/ld+json'
    elif request.headers['Accept'] in ACCEPTED_TYPES:
        resp = make_response(graphs[pyldnconf._inbox_url + id].serialize(format=request.headers['Accept']))
        resp.headers['Content-Type'] = request.headers['Accept']
    else:
        return 'Requested format unavailable', 415

    resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn'
    resp.headers['Allow'] = "GET"

    return resp
tracking.py 文件源码 项目:drift 作者: dgnorth 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def register_extension(app):
    @app.before_request
    def add_correlation_id(*args, **kw):
        correlation_id = request.headers.get(CORRELATION_ID)
        log.debug("%s %s", request.method, request.url)
        if not correlation_id:
            correlation_id = str(uuid.uuid4())
            if request.method != "GET":
                """
                TODO: remove sensitive information such as username/password
                """
                log.debug({
                    "message": "Tracking request",
                    "correlation_id": correlation_id,
                    "method": request.method,
                    "uri": request.url,
                    "data": request.data,
                })
        request.correlation_id = correlation_id

    @app.after_request
    def save_correlation_id(response):
        if CORRELATION_ID not in response.headers:
            response.headers[CORRELATION_ID] = getattr(request, "correlation_id", None)
        return response
quit.py 文件源码 项目:QuitStore 作者: AKSW 项目源码 文件源码 阅读 77 收藏 0 点赞 0 评论 0
def checkrequest(request):
    """Analyze RDF data contained in a POST request.

    Args:
        request: A Flask HTTP Request.
    Returns:
        data: A list with RDFLib.quads object and the rdflib.ConjunciveGraph object
    Raises:
        Exception: I contained data is not valid nquads.

    """
    data = []
    reqdata = request.data
    graph = ConjunctiveGraph()

    try:
        graph.parse(data=reqdata, format='nquads')
    except Exception as e:
        raise e

    quads = graph.quads((None, None, None, None))
    data = splitinformation(quads, graph)

    return data
instance.py 文件源码 项目:eagle 作者: saga92 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def stop_instance():
    res = {}
    if request.method == 'POST':
        req_data = json.loads(request.data)
        db_session = db.Session()
        instance_query_result = db_session.query(Instance).filter(\
            Instance.container_serial == req_data['container_serial']).first()
        if instance_query_result is not None:
            policy = {}
            policy['operate'] = app.config['STOP']
            policy['container_serial'] = req_data['container_serial']
            policy['container_name'] = instance_query_result.container_name
            policy['user_name'] = req_data['user_name']
            message = json.dumps(policy)
            ui_mq = UiQueue()
            worker_res = ui_mq.send(message)
            worker_res_dict = json.loads(worker_res)
            res['code'] = worker_res_dict['code']
            res['message'] = worker_res_dict['message']
            res['container_serial'] = worker_res_dict['container_serial']
            eagle_logger.info(res['message'])
        else:
            res['code'] = '0x9'
            res['message'] = 'container not exist'
    return jsonify(**res)
instance.py 文件源码 项目:eagle 作者: saga92 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def restart_instance():
    res = {}
    if request.method == 'POST':
        req_data = json.loads(request.data)
        db_session = db.Session()
        instance_query_result = db_session.query(Instance).filter(\
            Instance.container_serial == req_data['container_serial']).first()
        if instance_query_result is not None:
            policy = {}
            policy['operate'] = app.config['RESTART']
            policy['container_serial'] = req_data['container_serial']
            policy['container_name'] = instance_query_result.container_name
            policy['user_name'] = req_data['user_name']
            message = json.dumps(policy)
            ui_mq = UiQueue()
            worker_res = ui_mq.send(message)
            worker_res_dict = json.loads(worker_res)
            res = worker_res_dict
            eagle_logger.info(res['message'])
        else:
            res['code'] = '0x9'
            res['message'] = 'container not exist'
    return jsonify(**res)
instance.py 文件源码 项目:eagle 作者: saga92 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def remove_instance():
    res = {}
    if request.method == 'POST':
        req_data = json.loads(request.data)
        db_session = db.Session()
        instance_query_result = db_session.query(Instance).filter(\
            Instance.container_serial == req_data['container_serial']).first()
        if instance_query_result is not None:
            policy = {}
            policy['operate'] = app.config['REMOVE']
            policy['container_serial'] = req_data['container_serial']
            policy['user_name'] = req_data['user_name']
            message = json.dumps(policy)
            ui_mq = UiQueue()
            worker_res = ui_mq.send(message)
            worker_res_dict = json.loads(worker_res)
            res['code'] = worker_res_dict['code']
            res['message'] = worker_res_dict['message']
            res['container_serial'] = worker_res_dict['container_serial']
            eagle_logger.info(res['message'])
        else:
            res['code'] = '0x9'
            res['message'] = 'container not exist'
    return jsonify(**res)
endpoints.py 文件源码 项目:lite-python 作者: Paybook 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def login():
    try:
        # Log call and get params:
        logger = logging.getLogger('app')
        logger.debug('\n/login')
        params = json.loads(request.data)
        logger.debug(params)
        username = params['username']
        password = params['password']
        logger.debug('Executing ... ')
        # Data base and paybook logic:
        db_user = _DB.User(username,password)
        logger.debug('DB authentication ... ')
        if db_user.login():
            id_user = db_user.get_id_user()
            logger.debug('Id user: ' + str(id_user))
            pb_user = paybook_sdk.User(id_user=id_user)
            session = paybook_sdk.Session(user=pb_user)
            login_response = _Utilities.Success(session.get_json()).get_response()
        else:
            login_response = _Utilities.Error('Invalid username or password',400).get_response()
    except paybook_sdk.Error as error:
        login_response = error.get_json()
    return login_response
endpoints.py 文件源码 项目:lite-python 作者: Paybook 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def credentials():
    try:
        # Log call and get params:
        logger = logging.getLogger('app')
        logger.debug('\n/credentials')
        params = json.loads(request.data)
        logger.debug(params)
        token = params['token']
        id_site = params['id_site'] if 'id_site' in params else None
        credentials = params['credentials']
        logger.debug('Executing ... ')
        # Paybook logic:
        session = paybook_sdk.Session(token=token)
        logger.debug('Creating credentials ... ')
        logger.debug(credentials)
        credentials = paybook_sdk.Credentials(session=session,id_site=id_site,credentials=credentials)
        logger.debug('Id credential: ' + credentials.id_credential)
        logger.debug('Twofa: ' + credentials.twofa)
        logger.debug("curl -X POST -d '{\"twofa_key\":\"token\",\"token\":\"4f3e7fe982f18898f970e4805b4774a3\",\"twofa\":\"" + credentials.twofa + "\",\"twofa_value\":\"test\",\"id_credential\":\"" + credentials.id_credential + "\"}' http://localhost:5000/twofa --header \"Content-Type:application/json\"")
        logger.debug('Sending response ... ')
        credentials_response = _Utilities.Success(credentials.get_json()).get_response()
    except paybook_sdk.Error as error:
        credentials_response = _Utilities.Error(error.message,error.code).get_response()
    return credentials_response
endpoints.py 文件源码 项目:lite-python 作者: Paybook 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def delete_credentials():
    try:
        logger = logging.getLogger('app')
        logger.debug('\n/credentials')
        params = json.loads(request.data)
        logger.debug(params)
        token = params['token']
        id_credential = params['id_credential']
        logger.debug('Executing ... ')
        session = paybook_sdk.Session(token=token)
        credentials_deleted = paybook_sdk.Credentials.delete(session=session,id_credential=id_credential)
        logger.debug('Sending response ... ')
        delete_response = _Utilities.Success(credentials_deleted).get_response()
    except paybook_sdk.Error as error:
        delete_response = _Utilities.Error(error.message,error.code).get_response()
    return delete_response
payments.py 文件源码 项目:payments 作者: devops-s17-payments 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def update_payments(id):
    try:
        if not request.is_json:
            raise DataValidationError('Invalid payment: Content Type is not json')
        data = request.get_json(silent=True)
        message = payment_service.update_payment(id,payment_replacement=data)
        rc = status.HTTP_200_OK
    except PaymentNotFoundError as e:
        message = e.message
        rc = status.HTTP_404_NOT_FOUND
    except DataValidationError as e:
        message = e.message
        rc = status.HTTP_400_BAD_REQUEST
    return make_response(jsonify(message), rc)

######################################################################
# UPDATE AN EXISTING PAYMENT PARTIALLY
######################################################################
payments.py 文件源码 项目:payments 作者: devops-s17-payments 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def update_partial_payments(id):
    try:
        if not request.is_json:
            raise DataValidationError('Invalid payment: Content Type is not json')
        data = request.get_json(silent=True)
        message = payment_service.update_payment(id,payment_attributes=data)
        rc = status.HTTP_200_OK
    except PaymentNotFoundError as e:
        message = e.message
        rc = status.HTTP_404_NOT_FOUND
    except DataValidationError as e:
        message = e.message
        rc = status.HTTP_400_BAD_REQUEST
    return make_response(jsonify(message), rc)


######################################################################
# DELETE A PAYMENT
######################################################################
renderers.py 文件源码 项目:microservices 作者: viatoriche 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def render(self):
        if self.check_ignore_method():
            return self.data  # pragma: no cover
        response = {}
        self.update_by_name(response)
        self.update_by_info(response)
        self.update_by_request(response)
        self.update_by_status(response)
        self.update_by_status_code(response)
        self.update_by_headers(response)
        self.update_by_resource(response)
        self.update_by_methods(response)
        self.update_by_resources(response)
        self.update_by_update(response)
        self.update_by_data(response)
        return response
web.py 文件源码 项目:Monzo-Meter 作者: d-Rickyy-b 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def catch():
    j = json.loads(request.data)
    data = j['data']
    if mode == VARIABLE_MAX:
        if data['is_load']:
            # If you put money onto your account, save the amount of money as peak
            r.set("peak", int(data['amount']))
            r.set("balance", int(getRedisValue("balance")) + int(data['amount']))
        else:
            # If money is withdrawn OR in case of refunds or chargebacks the peak won't be set
            r.set("balance", int(getRedisValue("balance")) + int(data['amount']))

            if int(data['amount']) > int(getRedisValue("peak")):
                # Only if the current balance is greater than the saved peak, save it as peak
                r.set("peak", int(data['amount']))
    else:
        r.set("balance", int(getRedisValue("balance")) + int(data['amount']))
        r.set("peak", max_bal)

    notify_particle()
    return "{} | {}".format(r.get("balance"), r.get("peak"))
web.py 文件源码 项目:Monzo-Meter 作者: d-Rickyy-b 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def notify_particle():
    # The particle device gets notified about changes here
    peak_v = float(getRedisValue("peak"))
    balance_v = float(getRedisValue("balance"))

    # If the balance exceeds the peak/below 0, then the servo/needle might break.
    # Because of this, the angle's value gets checked before sending to particle.io
    if balance_v > peak_v:
        balance_v = peak_v
    elif balance_v < 0:
        balance_v = 0

    # Prevent division by zero
    if peak_v <= 0:
        peak_v = 1

    angle_v = angle(peak_v, balance_v)
    data = {"access_token": particle_token, "arg": angle_v}
    requests.post("https://api.particle.io/v1/devices/{}/gotoPos".format(device_id), data=data)
    return angle_v
api.py 文件源码 项目:bitwrap-io 作者: bitwrap 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def post(self):
        """ handle rpc calls """
        res = {}
        req = {}

        try:
            if request.data == '':
              payload = request.form.get('json')
            else:
              payload = request.data

            req = json.loads(payload)
            res['id'] = req.get('id')
            res['result'] = call(req['method'], req['params'])
            res['error'] = None
        except Exception as (ex):
            res = { 'id': req.get('id'), 'error': ex.message }

        return res, 200, None
decorators.py 文件源码 项目:heroku-python-boilerplate 作者: chavli 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def validate_jpeg_binary(func):
    """ checks the mimetype and the binary data to ensure it's a JPEG """
    @wraps(func)
    def wrapper(*args, **kwargs):
        if request.content_type != "image/jpeg":
            return ErrorResponseJson("invalid content type: {}".format(request.content_type)).make_response()
        if imghdr.test_jpeg(request.data, None) != "jpeg":
            return ErrorResponseJson("invalid jpeg data").make_response()
        return func(*args, **kwargs)
    return wrapper
request_handler.py 文件源码 项目:cc-server 作者: curious-containers 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def validation(schema):
    """function decorator"""
    def dec(func):
        def wrapper(self, *args, **kwargs):
            try:
                rawdata = request.data
                enc = chardet.detect(rawdata)
                data = rawdata.decode(enc['encoding'])
                json_input = json.loads(data)
                jsonschema.validate(json_input, schema)
                json_input = prepare_input(json_input)
            except:
                raise BadRequest('JSON input not valid: {}'.format(format_exc()))
            return func(self, json_input, *args, **kwargs)
        return wrapper
    return dec
view.py 文件源码 项目:pia 作者: soasme 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def builtin_jq():
    """
    Builtin program: `jq`.
    It will run a `jq` progress and return a json object.
    """
    program = request.args.get('program', ".")
    command = request.data
    try:
        data = jq(program, command)
        resp = make_response(data)
        resp.content_type = 'application/json'
        return resp
    except InvalidJQFilter as exception:
        return jsonify(message=str(exception)), 400
embedding_server.py 文件源码 项目:bitcointalk-sentiment 作者: DolphinBlockchainIntelligence 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def transform():
    if model is None:
        abort(500, 'Model is not initialized.')
    global maxSeqLength
    global numFeatures
    global model
    response = {}
    for item in request.data['texts']:
        try:
            id = item['id']
            vector_sequence = get_sequence_matrix(get_tokens(item['text']), maxSeqLength, numFeatures, model).tolist()
            response[id] = vector_sequence
        except KeyError as e:
            abort(400, 'Wrong JSON format, key %s' % e)
        except Exception as e:
            abort(500, 'Internal server error: %s' % str(e))
        return jsonify(response)
dummygatekeeper.py 文件源码 项目:son-emu 作者: sonata-nfv 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _calculate_placement(self, algorithm):
        """
        Do placement by adding the a field "dc" to
        each VNFD that points to one of our
        data center objects known to the gatekeeper.
        """
        assert(len(self.vnfds) > 0)
        assert(len(GK.dcs) > 0)
        # instantiate algorithm an place
        p = algorithm()
        p.place(self.nsd, self.vnfds, self.saps, GK.dcs)
        LOG.info("Using placement algorithm: %r" % p.__class__.__name__)
        # lets print the placement result
        for name, vnfd in self.vnfds.iteritems():
            LOG.info("Placed VNF %r on DC %r" % (name, str(vnfd.get("dc"))))
        for sap in self.saps:
            sap_dict = self.saps[sap]
            LOG.info("Placed SAP %r on DC %r" % (sap, str(sap_dict.get("dc"))))
chain_api.py 文件源码 项目:son-emu 作者: sonata-nfv 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def put(self, src_vnf, src_intfs, dst_vnf, dst_intfs):
        """
        A put request to "/v1/chain/<src_vnf>/<src_intfs>/<dst_vnf>/<dst_intfs>"
        will create a chain between two interfaces at the specified vnfs.

        Note:
           Does not allow a custom path. Uses ``.post``
           Internally just makes a POST request with no POST data!

        :param src_vnf: Name of the source VNF
        :type src_vnf: ``str``
        :param src_intfs: Name of the source VNF interface to chain on
        :type src_intfs: ``str``
        :param dst_vnf: Name of the destination VNF
        :type dst_vnf: ``str``
        :param dst_intfs: Name of the destination VNF interface to chain on
        :type dst_intfs: ``str``
        :return: flask.Response 200 if set up correctly else 500 also returns the cookie as dict {'cookie': value}
         501 if one of the VNF / intfs does not exist
        :rtype: :class:`flask.Response`
        """
        return self.post(src_vnf, src_intfs, dst_vnf, dst_intfs)
neutron_sfc_dummy_api.py 文件源码 项目:son-emu 作者: sonata-nfv 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def post(self):
        logging.debug("API CALL: %s POST" % str(self.__class__.__name__))

        try:
            request_dict = json.loads(request.data).get("port_pair")
            name = request_dict["name"]

            ingress_port = self.api.compute.find_port_by_name_or_id(request_dict["ingress"])
            egress_port = self.api.compute.find_port_by_name_or_id(request_dict["egress"])

            port_pair = self.api.compute.create_port_pair(name)
            port_pair.ingress = ingress_port
            port_pair.egress = egress_port
            if "description" in request_dict:
                port_pair.description = request_dict["description"]
            if "service_function_parameters" in request_dict:
                port_pair.service_function_parameters = request_dict["service_function_parameters"]

            resp = {
                "port_pair": port_pair.create_dict(self.api.compute)
            }
            return Response(json.dumps(resp), status=201, mimetype='application/json')
        except Exception as ex:
            logging.exception("Neutron SFC: %s Exception." % str(self.__class__.__name__))
            return Response(ex.message, status=500, mimetype='application/json')
neutron_sfc_dummy_api.py 文件源码 项目:son-emu 作者: sonata-nfv 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def post(self):
        logging.debug("API CALL: %s POST" % str(self.__class__.__name__))

        try:
            request_dict = json.loads(request.data).get("port_pair_group")

            port_pair_group = self.api.compute.create_port_pair_group(request_dict["name"])
            port_pair_group.port_pairs = request_dict["port_pairs"]
            if "description" in request_dict:
                port_pair_group.description = request_dict["description"]
            if "port_pair_group_parameters" in request_dict:
                port_pair_group.port_pair_group_parameters = request_dict["port_pair_group_parameters"]

            resp = {
                "port_pair_group": port_pair_group.create_dict(self.api.compute)
            }
            return Response(json.dumps(resp), status=201, mimetype='application/json')
        except Exception as ex:
            logging.exception("Neutron SFC: %s Exception." % str(self.__class__.__name__))
            return Response(ex.message, status=500, mimetype='application/json')


问题


面经


文章

微信
公众号

扫码关注公众号