python类get_json()的实例源码

__init__.py 文件源码 项目:fluffy 作者: m4ce 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def session_chain_add(session_name, table_name, chain_name):
    try:
        req = request.get_json()
        if req is None:
            req = {}
        fw.sessions[session_name].chains.add(
            name=chain_name, table=table_name, **req)
        return make_response(jsonify(message='Chain created'), status.HTTP_201_CREATED)
    except ChainExists as e:
        http_code = status.HTTP_409_CONFLICT
    except ChainNotValid as e:
        http_code = status.HTTP_400_BAD_REQUEST
    except Exception as e:
        http_code = status.HTTP_500_INTERNAL_SERVER_ERROR

    return make_response(jsonify(message='Failed to create chain', error=e.message), http_code)
users.py 文件源码 项目:Andy-Chat 作者: andriy-sa 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def create():
    form = UserCreateForm.from_json(request.get_json())

    if not form.validate():
        return jsonify(form.errors), 400

    user = User()
    user.email = form.data.get('email')
    user.first_name = form.data.get('first_name')
    user.last_name = form.data.get('last_name')
    user.avatar = form.data.get('avatar', None)
    user.password = User.make_password(form.data.get('password'))
    user.save()

    access_token = jwt.jwt_encode_callback(user)

    return jsonify({'user': user.serialize(), 'access_token': access_token.decode('utf-8')}), 200
auth.py 文件源码 项目:Andy-Chat 作者: andriy-sa 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def login():
    data = request.get_json()
    username = data.get(current_app.config.get('JWT_AUTH_USERNAME_KEY'), None)
    password = data.get(current_app.config.get('JWT_AUTH_PASSWORD_KEY'), None)
    criterion = [username, password, len(data) == 2]

    if not all(criterion):
        return jsonify({'message': 'Invalid credentials'}), 401

    user = jwt.authentication_callback(username, password)
    if user:
        if not user.is_active:
            return jsonify({'message': 'InActive User'}), 401

        access_token = jwt.jwt_encode_callback(user)

        return jsonify({'user': user.serialize(), 'access_token': access_token.decode('utf-8')}), 200
    else:
        return jsonify({'message': 'Invalid credentials'}), 401
app.py 文件源码 项目:microflack_messages 作者: miguelgrinberg 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def edit_message(id):
    """
    Modify an existing message.
    This endpoint is requires a valid user token.
    Note: users are only allowed to modify their own messages.
    """
    msg = Message.query.get_or_404(id)
    if msg.user_id != g.jwt_claims.get('user_id'):
        abort(403)
    msg.from_dict(request.get_json() or {})
    db.session.add(msg)
    db.session.commit()

    # render the markdown and expand the links in a background task
    if app.config['TESTING']:
        # for unit tests, render synchronously
        render_message(msg.id)
    else:
        # asynchronous rendering
        render_thread = threading.Thread(target=render_message,
                                         args=(msg.id,))
        render_thread.start()
    return '', 204
__init__.py 文件源码 项目:fluffy 作者: m4ce 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def session_interface_add(session_name, interface_name):
    try:
        req = request.get_json()
        if req is None:
            req = {}
        fw.sessions[session_name].interfaces.add(
            name=interface_name, **req)
        return make_response(jsonify(message='Interface created'), status.HTTP_201_CREATED)
    except InterfaceExists as e:
        http_code = status.HTTP_409_CONFLICT
    except InterfaceNotValid as e:
        http_code = status.HTTP_400_BAD_REQUEST
    except Exception as e:
        http_code = status.HTTP_500_INTERNAL_SERVER_ERROR

    return make_response(jsonify(message='Failed to create interface', error=e.message), http_code)
run.py 文件源码 项目:apiTest 作者: wuranxu 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def run_selected_case():
    # return jsonify(dict(name='selenium'))

    data = request.get_json()
    start = handle.now_str()
    # ???mongo??case??
    db = MongoClient()
    case_list = db.get_case_by_name(data.get('case_name'))
    obj = apiFunc()
    rt = obj.run_tests(case_list, app.config['THREAD_NUM'])
    report = obj.writeReport(rt)
    html = render_template('testResult.html',failNum=rt['failed_num'], ignored=rt['ignored'],
                           successNum=rt['success_num'], total=rt['total'], start=start,
                           end=handle.now_str(), cost="{:.2}?".format(handle.delta(start, handle.now_str())),
                           fileName=report)
    return jsonify(dict(html=html))
test_subscription_transport.py 文件源码 项目:graphql-python-subscriptions 作者: hballard 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def create_app(sub_mgr, schema, options):
    app = Flask(__name__)
    sockets = Sockets(app)

    app.app_protocol = lambda environ_path_info: 'graphql-subscriptions'

    app.add_url_rule(
        '/graphql',
        view_func=GraphQLView.as_view('graphql', schema=schema, graphiql=True))

    @app.route('/publish', methods=['POST'])
    def sub_mgr_publish():
        sub_mgr.publish(*request.get_json())
        return jsonify(request.get_json())

    @sockets.route('/socket')
    def socket_channel(websocket):
        subscription_server = SubscriptionServer(sub_mgr, websocket, **options)
        subscription_server.handle()
        return []

    return app
app.py 文件源码 项目:microflack_messages 作者: miguelgrinberg 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def new_message():
    """
    Post a new message.
    This endpoint is requires a valid user token.
    """
    msg = Message(user_id=g.jwt_claims['user_id'])
    msg.from_dict(request.get_json(), partial_update=False)
    msg.html = '...'
    db.session.add(msg)
    db.session.commit()
    r = jsonify(msg.to_dict())
    r.status_code = 201
    r.headers['Location'] = url_for('get_message', id=msg.id)

    # render the markdown and expand the links in a background task
    if app.config['TESTING']:
        # for unit tests, render synchronously
        render_message(msg.id)
    else:
        # asynchronous rendering
        render_thread = threading.Thread(target=render_message,
                                         args=(msg.id,))
        render_thread.start()
    return r
views.py 文件源码 项目:MoegirlUpdater 作者: kafuuchino 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def userlist():
    u=User()
    form=AddUserForm()
    flag=current_user.is_administrator(g.user)
    if flag is True:
        userlist=u.GetUserList()
        jsondata=request.get_json()
        if request.method == 'POST' and jsondata:
            if jsondata['action'] == u'edit':
                username=jsondata['username']
                location=url_for('.admin_edit_profile',username=username)
                return jsonify({"status":302,"location":location})
            else:
                username=jsondata['username']
                u.RemUser(username)
                return redirect('userlist')
        elif request.method == 'POST' and form.validate():
            pwd=u.GetPassword(g.user)
            if u.verify_password(form.oripassword.data):
                u.AddUser(form.username.data,form.password.data,form.role.data,form.email.data)
                return redirect('userlist')
        else:
            return render_template('userlist.html',userlist=userlist,form=form)
    else:
        abort(403)
apiServer.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def sendMessage() -> str:
    """ Send a message (internal or external) to the HackerOne report identified by the given ID"""
    data = request.get_json(force=True)
    message = data['message']
    internal = data['internal']
    id = data['id']

    if config.DEBUG:
        print("/v1/sendMessage: id=%s, internal=%s" % (id, internal))
    if config.DEBUGVERBOSE:
        print("message=%s" % message)

    h1Data = {'data': {'type': 'activity-comment',
                       'attributes': {'message': message,
                                      'internal': internal}}}
    headers = {'Content-Type': 'application/json'}
    resp = requests.post('https://api.hackerone.com/v1/reports/%s/activities' % id,
                         headers=headers,
                         data=json.dumps(h1Data).encode('utf-8'),
                         auth=(config.apiName, secrets.apiToken))
    return json.dumps(resp.json())
apiServer.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def changeStatus() -> str:
    """ Change the status of the report at the given ID to the given status """
    data = request.get_json(force=True)
    status = data['status']
    message = data['message']
    id = data['id']

    if config.DEBUG:
        print("/v1/changeStatus: id=%s, status=%s" % (id, status))
    if config.DEBUGVERBOSE:
        print("message=%s" % message)

    h1Data = {'data': {'type': 'state-change',
                       'attributes': {'message': message,
                                      'state': status}}}
    headers = {'Content-Type': 'application/json'}
    resp = requests.post('https://api.hackerone.com/v1/reports/%s/state_changes' % id,
                         headers=headers,
                         data=json.dumps(h1Data).encode('utf-8'),
                         auth=(config.apiName, secrets.apiToken))
    return json.dumps(resp.json())
apiServer.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def getReportIDs() -> str:
    """ Get a list of report IDs created after the given time of openOnly is true, then only the IDs of open reports """
    data = request.get_json(force=True)
    startTime = parseTime(data['time'])
    openOnly = data['openOnly']

    if config.DEBUGVERBOSE:
        print("/v1/getReportIDs: time=%s, openOnly=%s" % (startTime.isoformat(), str(openOnly)))

    if openOnly:
        allIDs = []
        for state in ['new', 'triaged', 'needs-more-info']:
            url = "https://api.hackerone.com/v1/reports?filter[program][]=%s&page[size]=100&filter[state][]=%s" % \
                  (config.programName, state)
            ids = [report['id'] for report in getEndpointPaginated(url)
                   if parseTime(report['attributes']['created_at']) > startTime]
            allIDs.extend(ids)
        return json.dumps(allIDs)
    else:
        url = "https://api.hackerone.com/v1/reports?filter[program][]=%s&page[size]=100" % config.programName
        return json.dumps([report['id'] for report in getEndpointPaginated(url)
                           if parseTime(report['attributes']['created_at']) > startTime])
metadata.py 文件源码 项目:morpy 作者: iurykrieger96 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def post(self, meta_type):
        try:
            if meta_type == 'item':
                service = self.item_meta_service
                new_metadata = ItemMetadata(request.get_json(), version=1, active=True)
            elif meta_type == 'user':
                service = self.user_meta_service
                new_metadata = UserMetadata(request.get_json(), version=1, active=True)
            else:
                raise StatusCodeException('Invalid type', 400)

            if not service.get_active():
                service.insert(new_metadata.to_database())
                return make_response(new_metadata.to_json())
            else:
                raise StatusCodeException('Conflict', 409)
        except StatusCodeException as ex:
            return ex.to_response()
        except Exception as ex:
            return StatusCodeException(ex.message, 500).to_response()
schemachecker.py 文件源码 项目:drift 作者: dgnorth 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def simple_schema_request(request_schema_properties, required=None, json_object=None):
    def wrapper(fn):
        @wraps(fn)
        def decorated(*args, **kwargs):
            ob = json_object or request.get_json() or {}
            schema = {
                "type": "object",
                "additionalProperties": False,
                "properties": request_schema_properties,
            }

            # Excuse the chattyness here. Can it be made shorter?
            if required is None:
                required_fields = request_schema_properties.keys()
            else:
                required_fields = required
            if required_fields:
                schema["required"] = required_fields

            check_schema(ob, schema)
            return fn(*args, **kwargs)

        return decorated
    return wrapper
schemachecker.py 文件源码 项目:drift 作者: dgnorth 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def schema_request(media_type_name):
    def wrapper(fn):
        @wraps(fn)
        def decorated(*args, **kwargs):
            if isinstance(media_type_name, basestring):
                schema = get_schema_for_media_type(media_type_name)
            else:
                schema = media_type_name
            json_object = request.get_json()
            try:
                validate(json_object, schema)
            except ValidationError as e:
                report = generate_validation_error_report(e, json_object)
                abort(400, description=report)
            return fn(*args, **kwargs)

        return decorated
    return wrapper
book_room.py 文件源码 项目:roomfinder 作者: GuillaumeMorini 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def dispo():
    j = request.get_json()
    if "key" not in j:
        return "Sorry, no building where specified !"
    sys.stderr.write("key: "+str(j["key"])+"\n")
    key=str(j["key"])
    if "start" not in j:
        return dispo_building(key)
    start=str(j["start"])
    end=None
    if "end" in j:
        end=str(j["end"])
    else:
        end=(datetime.datetime.strptime(start, "%Y-%m-%dT%H:%M:%S" )+datetime.timedelta(hours=2)).isoformat()
    sys.stderr.write("start: "+str(start)+" end: "+str(end)+"\n")
    return dispo_building(key,datetime.datetime.strptime(start, "%Y-%m-%dT%H:%M:%S" ),datetime.datetime.strptime(end, "%Y-%m-%dT%H:%M:%S" ))
book_room.py 文件源码 项目:roomfinder 作者: GuillaumeMorini 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def where():
    j = request.get_json()
    if "room" not in j:
        return "Sorry, no room where specified !"
    room=str(j["room"])
    sys.stderr.write("room: "+room+"\n")
    rooms=findRooms(prefix=room,anywhere=True)
    sys.stderr.write("rooms: "+str(rooms)+"\n")
    # if len(rooms)==1:
    #     reply="Here is the full name of the room: \n * "+str(rooms.items()[0][1])
    # else:
    #     reply="Do you mean:\n"
    #     for r in rooms:
    #         reply+="* "+str(rooms[r])+"\n"
    reply=""
    if len(rooms)==0:
        return "Sorry, no room with this name found !"
    for r in rooms:
        reply+=str(rooms[r])+";"
    return reply[0:len(reply)-1]
audit.py 文件源码 项目:microcosm-flask 作者: globality-corp 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def capture_request(self):
        if not current_app.debug:
            # only capture request body on debug
            return

        if not self.options.include_request_body:
            # only capture request body if requested
            return

        if (
                request.content_length and
                self.options.include_request_body is not True and
                request.content_length >= self.options.include_request_body
        ):
            # don't capture request body if it's too large
            return

        if not request.get_json(force=True, silent=True):
            # only capture request body if json
            return

        self.request_body = request.get_json(force=True)
app.py 文件源码 项目:leaf-server 作者: jawolf17 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def add_freind(uid):
    #Get request data
    data = request.get_json(force=True)
    #Path to file
    db_file = "db/server.db"
    con = sqlite3.connect(db_file)
    with con:
        cur = con.cursor()
        #Find User in DB
        cur.execute("SELECT * FROM namePass WHERE %s=?"%"id",(uid,))
        id_exists = cur.fetchone()
        #Check existance
        if id_exists:
            cur = con.cursor()
            cur.execute("UPDATE namePass SET friendsList=? WHERE id=?",(data["friendsList"],uid,))
            con.commit()
            return jsonify({"code": 200, "message": "Friends List Updated"})
        else:
            return jsonify({"code": 403, "message": "User does not exists"})
app.py 文件源码 项目:leaf-server 作者: jawolf17 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def create_event():
    #Get Request
    data = request.get_json(force = True)
    #connect to db
    connection = sqlite3.connect('db/server.db')

    #generate unique id
    u_id = str(uuid.uuid4())



    #format data for insertions
    user = ((data["date"],data["time"],data["location"],data["name"],data["description"],data["listofPart"],data["image"],data["owner"],data["arrivalNot"],u_id,data["LAT"],data["LONG"],data["public"],),)

    with connection:
        cur = connection.cursor()
        cur.executemany("INSERT INTO events VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)", user)
        connection.commit()

    #Create Response
    response = {"message": "A new event has successfully been added to the database. ",
                "code": 200}

    print "[POST] NEW EVENT CREATION: New event has been added to the database"
    return jsonify(response)
app.py 文件源码 项目:gremlin 作者: amalgam8 项目源码 文件源码 阅读 85 收藏 0 点赞 0 评论 0
def post_recipe():
    payload = request.get_json()

    topology = payload.get("topology")
    scenarios = payload.get("scenarios")
    headers = payload.get("headers")
    #pattern = payload.get("header_pattern")

    if not topology:
        abort(400, "Topology required")

    if not scenarios:
        abort(400, "Failure scenarios required")

    if headers and type(headers)!=dict:
        abort(400, "Headers must be a dictionary")

    # if not pattern:
    #     abort(400, "Header_pattern required")

    appgraph = ApplicationGraph(topology)
    fg = A8FailureGenerator(appgraph, a8_controller_url='{0}/v1/rules'.format(a8_controller_url), a8_controller_token=a8_controller_token, headers=headers, debug=debug)
    fg.setup_failures(scenarios)
    return make_response(jsonify(recipe_id=fg.get_id()), 201, {'location': url_for('get_recipe_results', recipe_id=fg.get_id())})
views.py 文件源码 项目:QuickTry-Server 作者: QuickTry 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run():
    """ Expects a json dictionary that specifies the language, code snippet, any
    expected input for the snippet. """
    content = request.get_json()
    if content is None:
        return jsonify({'status': -1, 'output': 'no input'})

    print(content)

    err, output = sandbox.execute(
            content.get('lang').lower(),
            content.get('code'),
            content.get('params'),
            os.path.join(os.getcwd(), 'tmp'))

    print("error code {}\n{}".format(err, output))
    return jsonify({'status': err, 'output': output})
notification.py 文件源码 项目:gwot-physical 作者: JanVan01 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def subscription(self):
        input = request.get_json()
        if input is None:
            return self.get_view().bad_request('Expected json')
        if 'notifier' in input and 'sensor' in input and 'settings' in input:
            notifier = Notifiers().get(input['notifier'])
            if notifier is None or not notifier.is_public():
                return self.get_view().bad_request('Not a valid notifier')
            sensor = Sensors().get(input['sensor'])
            if sensor is None:
                return self.get_view().bad_request('Not a valid sensor')

            subscription = Subscribers().create()
            try:
                subscription.set_notifier(int(input['notifier']))
                subscription.set_sensor(int(input['sensor']))
                # ToDo: Validate subscription settings
                subscription.set_settings(input['settings'])
                if not subscription.create():
                    return self.get_view().bad_request('The subscription you are trying to create does not exist try to create it instead')
            except ValueError:
                return self.get_view().bad_request('input not in the right format')
        else:
            return self.get_view().bad_request('not all necessary field set')
        return self.get_view().success()
config.py 文件源码 项目:gwot-physical 作者: JanVan01 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def complete_config(self):
        if (request.method == 'GET'):
            data = {}
            data['name'] = self.config.get_name()
            data['interval'] = self.config.get_interval()
            data['location'] = self._get_location(self.config.get_location())
            return self.get_view(template_file='config.html').data(data)
        elif (request.method == 'PUT'):
            input = request.get_json()
            if(input is None):
                return self.get_view().bad_request('expected json')
            try:
                if ('name' in input):
                    self.config.set_name(str(input['name']))
                if ('interval' in input):
                    self.config.set_interval(int(input['interval']))
                if ('location' in input):
                    self.config.set_location(int(input['location']))
            except ValueError:
                return self.get_view().bad_request('Input not in the right format')
            return self.get_view().success()
views.py 文件源码 项目:isthislegit 作者: duo-labs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def rules():
    if request.method == "GET":
        rules = Rule.domain_query(g.domain).fetch()
        return render_template('rules.html', rules=rules, title='Rules')

    try:
        rule_json = request.get_json()
        Rule.validate(rule_json)
    except Exception as e:
        return json_error(400, str(e), {})

    base_query = Rule.domain_query(g.domain)
    name_rule = Rule.get_by_name(base_query, request.get_json().get('name'))
    if name_rule:
        return json_error(400, 'Rule name already in use', {})

    try:
        rule = Rule()
        rule.from_dict(request.get_json())
        rule.owner_domain = g.domain
        rule.created_by = g.user.email()
        rule.put()
        return jsonify(rule.to_dict())
    except Exception as e:
        return json_error(400, str(e), {})
views.py 文件源码 项目:isthislegit 作者: duo-labs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def edit_rule(rule_id):
    orig_rule = Rule.get_by_id(rule_id)
    if not orig_rule or orig_rule.owner_domain != g.domain:
        return json_error(404, "Rule not found", {})

    try:
        rule_json = request.get_json()
        Rule.validate(rule_json)
    except Exception as e:
        return json_error(400, str(e), {})

    base_query = Rule.domain_query(g.domain)
    name_rule = Rule.get_by_name(base_query, rule_json.get('name'))
    if name_rule and name_rule.key.id() != rule_id:
        return json_error(400, 'Rule name already in use', {})

    try:
        orig_rule.from_dict(request.get_json())
        orig_rule.put()
        return jsonify(orig_rule.to_dict())
    except Exception as e:
        return json_error(400, str(e), {})
__init__.py 文件源码 项目:fluffy 作者: m4ce 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def session_addressbook_add(session_name, address_name):
    try:
        req = request.get_json()
        if req is None:
            req = {}
        fw.sessions[session_name].addressbook.add(
            name=address_name, **req)
        return make_response(jsonify(message='Address created'), status.HTTP_201_CREATED)
    except AddressExists as e:
        http_code = status.HTTP_409_CONFLICT
    except AddressNotValid as e:
        http_code = status.HTTP_400_BAD_REQUEST
    except Exception as e:
        http_code = status.HTTP_500_INTERNAL_SERVER_ERROR

    return make_response(jsonify(message='Failed to create address', error=e.message), http_code)
__init__.py 文件源码 项目:fluffy 作者: m4ce 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def session_addressbook_update(session_name, address_name):
    try:
        req = request.get_json()
        if req is None:
            req = {}
        fw.sessions[session_name].addressbook.update(
            name=address_name, **req)
        return make_response(jsonify(message='Address updated'), status.HTTP_200_OK)
    except AddressNotFound as e:
        http_code = status.HTTP_404_NOT_FOUND
    except AddressNotValid as e:
        http_code = status.HTTP_400_BAD_REQUEST
    except AddressNotUpdated as e:
        return make_response(jsonify(message='Address not updated'), status.HTTP_204_NO_CONTENT)
    except Exception as e:
        http_code = status.HTTP_500_INTERNAL_SERVER_ERROR

    return make_response(jsonify(message='Failed to update address', error=e.message), http_code)
__init__.py 文件源码 项目:fluffy 作者: m4ce 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def session_service_update(session_name, service_name):
    try:
        req = request.get_json()
        if req is None:
            req = {}
        fw.sessions[session_name].services.update(
            name=service_name, **req)
        return make_response(jsonify(message='Service updated'), status.HTTP_200_OK)
    except ServiceNotFound as e:
        http_code = status.HTTP_404_NOT_FOUND
    except ServiceNotValid as e:
        http_code = status.HTTP_400_BAD_REQUEST
    except ServiceNotUpdated as e:
        return make_response(jsonify(message='Service not updated'), status.HTTP_204_NO_CONTENT)
    except Exception as e:
        http_code = status.HTTP_500_INTERNAL_SERVER_ERROR

    return make_response(jsonify(message='Failed to update service', error=e.message), http_code)
__init__.py 文件源码 项目:fluffy 作者: m4ce 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def session_rule_update(session_name, rule_name):
    try:
        req = request.get_json()
        if req is None:
            req = {}
        fw.sessions[session_name].rules.update(name=rule_name, **req)
        return make_response(jsonify(message='Rule updated'), status.HTTP_200_OK)
    except RuleNotFound as e:
        http_code = status.HTTP_404_NOT_FOUND
    except RuleNotValid as e:
        http_code = status.HTTP_400_BAD_REQUEST
    except RuleNotUpdated as e:
        return make_response(jsonify(message='Rule not updated'), status.HTTP_204_NO_CONTENT)
    except Exception as e:
        http_code = status.HTTP_500_INTERNAL_SERVER_ERROR

    return make_response(jsonify(message='Failed to update rule', error=e.message), http_code)


问题


面经


文章

微信
公众号

扫码关注公众号