python类request()的实例源码

reqctx.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_manual_context_binding(self):
        app = flask.Flask(__name__)
        @app.route('/')
        def index():
            return 'Hello %s!' % flask.request.args['name']

        ctx = app.test_request_context('/?name=World')
        ctx.push()
        self.assert_equal(index(), 'Hello World!')
        ctx.pop()
        try:
            index()
        except RuntimeError:
            pass
        else:
            self.assert_true(0, 'expected runtime error')
reqctx.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_greenlet_context_copying(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            def g():
                self.assert_false(flask.request)
                self.assert_false(flask.current_app)
                with reqctx:
                    self.assert_true(flask.request)
                    self.assert_equal(flask.current_app, app)
                    self.assert_equal(flask.request.path, '/')
                    self.assert_equal(flask.request.args['foo'], 'bar')
                self.assert_false(flask.request)
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)
reqctx.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_greenlet_context_copying_api(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            @flask.copy_current_request_context
            def g():
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)

    # Disable test if we don't have greenlets available
fontfaces_controller.py 文件源码 项目:fontman-server 作者: fontman 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def find_fontface_by_font_id():
    response_data = []

    try:
        query_string = request.args.get("font_id")
        fontfaces = FontFaceService().find_by_font_id(query_string)

        for fontface in fontfaces:
            response_data.append(
                {
                    "fontface_id": fontface.fontface_id,
                    "fontface": fontface.fontface,
                    "font_id": fontface.font_id,
                    "resource_path": fontface.resource_path
                }
            )

        return jsonify(response_data)

    except:
        return jsonify({"error": "Invalid request"})
pushbullet.py 文件源码 项目:webhook-shims 作者: vmw-loginsight 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def pushbullet(ALERTID=None, TOKEN=None):
    """
    Send a `link` notification to all devices on pushbullet with a link back to the alert's query.
    If `TOKEN` is not passed, requires `PUSHBULLETTOKEN` defined, see https://www.pushbullet.com/#settings/account
    """
    if not PUSHBULLETURL:
        return ("PUSHBULLET parameter must be set, please edit the shim!", 500, None)
    if (TOKEN is not None):
        PUSHBULLETTOKEN = TOKEN
    if not PUSHBULLETTOKEN:
        return ("PUSHBULLETTOKEN parameter must be set, please edit the shim!", 500, None)

    a = parse(request)

    payload = {
        "body": a['info'],
        "title": a['AlertName'],
        "type": "link",
        "url": a['url'],
    }

    headers = {'Content-type': 'application/json', 'Access-Token': PUSHBULLETTOKEN}

    return callapi(PUSHBULLETURL, 'post', json.dumps(payload), headers)
fontfaces_controller.py 文件源码 项目:fontman-desktop-core 作者: fontman 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def find_fontface_by_font_id():
    response_data = []

    try:
        query_string = request.args.get("font_id")
        fontfaces = FontFaceService().find_by_font_id(query_string)

        for fontface in fontfaces:
            response_data.append(
                {
                    "fontface_id": fontface.fontface_id,
                    "fontface": fontface.fontface,
                    "font_id": fontface.font_id,
                    "resource_path": fontface.resource_path
                }
            )

        return jsonify(response_data)

    except:
        return jsonify({"error": "Invalid request"})
manager.py 文件源码 项目:flask-kerberos-login 作者: ContinuumIO 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def extract_token(self):
        '''
        Extracts a token from the current HTTP request if it is available.

        Invokes the `save_user` callback if authentication is successful.
        '''
        header = request.headers.get(b'authorization')
        if header and header.startswith(b'Negotiate '):
            token = header[10:]
            user, token = _gssapi_authenticate(token, self._service_name)
            if token is not None:
                stack.top.kerberos_token = token

            if user is not None:
                self._save_user(user)
            else:
                # Invalid Kerberos ticket, we could not complete authentication
                abort(403)
reqctx.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_manual_context_binding(self):
        app = flask.Flask(__name__)
        @app.route('/')
        def index():
            return 'Hello %s!' % flask.request.args['name']

        ctx = app.test_request_context('/?name=World')
        ctx.push()
        self.assert_equal(index(), 'Hello World!')
        ctx.pop()
        try:
            index()
        except RuntimeError:
            pass
        else:
            self.assert_true(0, 'expected runtime error')
reqctx.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_greenlet_context_copying(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            def g():
                self.assert_false(flask.request)
                self.assert_false(flask.current_app)
                with reqctx:
                    self.assert_true(flask.request)
                    self.assert_equal(flask.current_app, app)
                    self.assert_equal(flask.request.path, '/')
                    self.assert_equal(flask.request.args['foo'], 'bar')
                self.assert_false(flask.request)
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)
reqctx.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_greenlet_context_copying_api(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            @flask.copy_current_request_context
            def g():
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)

    # Disable test if we don't have greenlets available
app.py 文件源码 项目:who_the_hill 作者: newsdev 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def recongize():
    '''
    Receives POST request from Twilio. Sends data from request to Amazon Rekognize and receives the API's analysis.
    Returns the resulting analysis as JSON.
    '''
    # Reads image data from incoming request
    r = request
    data = r.files['file'].read()

    # Sends image data to Amazon Rekognize for analysis. Returns JSON response of results.
    try:
        results = client.recognize_celebrities(Image={'Bytes': data})
    except botocore.exceptions.ClientError as e:
        results = {"Rekognition Client Error": str(e)}
    logging.info(results)
    return jsonify(results)
session.py 文件源码 项目:mixmatch 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def chunked_reader():
    try:
        # If we're running under uWSGI, use the uwsgi.chunked_read method
        # to read chunked input.
        import uwsgi  # noqa

        while True:
            chunk = uwsgi.chunked_read()
            if len(chunk) > 0:
                yield chunk
            else:
                return
    except ImportError:
        # Otherwise try to read the wsgi input. This works in embedded Apache.
        stream = flask.request.environ["wsgi.input"]
        try:
            while True:
                yield stream.next()
        except Exception:
            return
server.py 文件源码 项目:coinbin.org 作者: kennethreitz 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_history(coin):
    c = Coin(coin.lower())

    q = "SELECT * from api_coin WHERE name=:coin ORDER BY date desc"

    if request.args.get('key') in API_KEYS:
        print(crayons.red('Pro request!'))
        rows = pro_db.query(q, coin=c.name)
    else:
        rows = db.query(q, coin=c.name)

    return jsonify(history=[
        {
            'value': r.value,
            'value.currency': 'USD',
            'timestamp': maya.MayaDT.from_datetime(r.date).subtract(hours=4).iso8601(),
            'when': maya.MayaDT.from_datetime(r.date).subtract(hours=4).slang_time()
        } for r in rows]
    )
lti.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def second_phase_lti_launch(
) -> helpers.JSONResponse[t.Mapping[str, t.Union[str, models.Assignment, bool]]
                          ]:
    launch_params = jwt.decode(
        flask.request.headers.get('Jwt', None),
        app.config['LTI_SECRET_KEY'],
        algorithm='HS512'
    )['params']
    lti = CanvasLTI(launch_params)

    user, new_token = lti.ensure_lti_user()
    course = lti.get_course()
    assig = lti.get_assignment(user)
    lti.set_user_role(user)
    new_role_created = lti.set_user_course_role(user, course)
    db.session.commit()

    result: t.Mapping[str, t.Union[str, models.Assignment, bool]]
    result = {'assignment': assig, 'new_role_created': new_role_created}
    if new_token is not None:
        result['access_token'] = new_token

    return helpers.jsonify(result)
dns-server.py 文件源码 项目:DnsE16 作者: pooleja 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def validate_sig(body, sig_str, pkh):
    """
    Validate the signature on the body of the request - throws exception if not valid.
    """
    # Check permission to update
    if (pkh is None):
        raise PermissionError("pkh not found.")

    # Validate the pkh format
    base58.b58decode_check(pkh)
    if (len(pkh) < 20) or (len(pkh) > 40):
        raise PermissionError("Invalid pkh")

    try:
        if not sig_str:
            raise PermissionError("X-Bitcoin-Sig header not found.")
        if not wallet.verify_bitcoin_message(body, sig_str, pkh):
            raise PermissionError("X-Bitcoin-Sig header not valid.")
    except Exception as err:
        logger.error("Failure: {0}".format(err))
        raise PermissionError("X-Bitcoin-Sig header validation failed.")

    return True
cache_management.py 文件源码 项目:conditional 作者: ComputerScienceHouse 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def clear_cache():
    user_name = request.headers.get('x-webauth-user')
    account = ldap_get_member(user_name)

    if not ldap_is_eval_director(account) and not ldap_is_rtp(account):
        return redirect("/dashboard")

    log = logger.new(request=request)
    log.info('Purge All Caches')

    _ldap_is_member_of_directorship.cache_clear()
    ldap_get_member.cache_clear()
    ldap_get_active_members.cache_clear()
    ldap_get_intro_members.cache_clear()
    ldap_get_onfloor_members.cache_clear()
    ldap_get_current_students.cache_clear()

    get_voting_members.cache_clear()
    get_members_info.cache_clear()
    get_onfloor_members.cache_clear()
    return "cache cleared", 200
__init__.py 文件源码 项目:conditional 作者: ComputerScienceHouse 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def database_processor(logger, log_method, event_dict): # pylint: disable=unused-argument, redefined-outer-name
    if 'request' in event_dict:
        if event_dict['method'] != 'GET':
            log = UserLog(
                ipaddr=event_dict['ip'],
                user=event_dict['user'],
                method=event_dict['method'],
                blueprint=event_dict['blueprint'],
                path=event_dict['path'],
                description=event_dict['event']
                )
            db.session.add(log)
            db.session.flush()
            db.session.commit()
        del event_dict['request']
    return event_dict
fileserver.py 文件源码 项目:pyupdater-wx-demo 作者: wettenhj 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def RunFileServer(fileServerDir, fileServerPort):
    """
    Run a Flask file server on the given port.

    Explicitly specify instance_path, because Flask's
    auto_find_instance_path can fail when run in a frozen app.
    """
    app = Flask(__name__, instance_path=fileServerDir)

    @app.route('/fileserver-is-ready', methods=['GET'])
    def FileserverIsReady():  # pylint: disable=unused-variable
        """
        Used to test if file server has started.
        """
        return 'Fileserver is ready!'

    @app.route('/<path:filename>', methods=['GET'])
    def ServeFile(filename):  # pylint: disable=unused-variable
        """
        Serves up a file from PYUPDATER_FILESERVER_DIR.
        """
        return send_from_directory(fileServerDir, filename.strip('/'))

    def ShutDownServer():
        """
        Shut down the file server.
        """
        func = request.environ.get('werkzeug.server.shutdown')
        if func is None:
            raise RuntimeError('Not running with the Werkzeug Server')
        func()

    @app.route('/shutdown', methods=['POST'])
    def ShutDown():  # pylint: disable=unused-variable
        """
        Respond to a POSTed request to shut down the file server.
        """
        ShutDownServer()
        return 'Server shutting down...'

    app.run(host=LOCALHOST, port=fileServerPort)
forgot_password.py 文件源码 项目:PimuxBot 作者: Finn10111 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def request_password():
    jid = request.form.get('jid')
    re = s.query(RecoveryEmail).filter(RecoveryEmail.jid==jid, RecoveryEmail.confirmed==True).one_or_none()
    if re:
        password_code = ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for _ in range(64))
        re.password_code = password_code
        s.merge(re)
        s.commit()
        password_code_link = 'https://www.pimux.de/password/?code=%s' % password_code
        sendMail(re.email, 'password reset request', 'click here: %s' % password_code_link)
        content = render_template('success.html', message='link was sent')
    else:
        content = render_template('error.html', message='user not found')
    return content
app.py 文件源码 项目:active_stream 作者: flinder 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def connected():
    logging.debug('Received connect request')
    emit('log', {'data': 'Connected'})
lti.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def create_from_request(cls: t.Type['LTI'], req: flask.Request) -> 'LTI':
        params = req.form.copy()

        lti_provider = models.LTIProvider.query.filter_by(
            key=params['oauth_consumer_key']
        ).first()
        if lti_provider is None:
            lti_provider = models.LTIProvider(key=params['oauth_consumer_key'])
            db.session.add(lti_provider)
            db.session.commit()

        params['lti_provider_id'] = lti_provider.id

        # This is semi sensitive information so it should not end up in the JWT
        # token.
        launch_params = {}
        for key, value in params.items():
            if not key.startswith('oauth'):
                launch_params[key] = value

        self = cls(launch_params, lti_provider)

        auth.ensure_valid_oauth(self.key, self.secret, req)

        return self
test_service_detail.py 文件源码 项目:TopChef 作者: TopChef 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def setUp(self) -> None:
        """
        Set up some mock model classes and create an endpoint
        """
        self.session = mock.MagicMock(spec=Session)  # type: Session
        self.request = mock.MagicMock(spec=Request)  # type: Request
        self.service_list = mock.MagicMock(
            spec=ServiceList
        )  # type: ServiceList

        self._app = Flask(__name__)
        self._app.add_url_rule(
            '/', view_func=ServiceDetail.as_view(
                ServiceDetail.__name__, self.session
            )
        )

        self._context = self._app.test_request_context()

        self._context.push()
test_services_list.py 文件源码 项目:TopChef 作者: TopChef 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_invalid_post(self, request_body: dict) -> None:
        assume("name" not in request_body.keys())
        assume("description" not in request_body.keys())
        assume("job_registration_schema" not in request_body.keys())
        assume("job_result_schema" not in request_body.keys())

        request = mock.MagicMock(spec=Request)
        request.get_json = mock.MagicMock(return_value=request_body)
        endpoint = ServicesList(
            self.session, request
        )

        with self.assertRaises(endpoint.Abort):
            endpoint.post()

        self.assertTrue(endpoint.errors)
jobs_list.py 文件源码 项目:TopChef 作者: TopChef 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(
            self,
            session: Session,
            flask_request: Request=request,
            job_list_model: Optional[JobListInterface]=None
    ) -> None:
        """

        :param session: The session to use
        :param flask_request: The Flask request that this endpoint needs to
            process
        """
        super(self.__class__, self).__init__(session, flask_request)
        if job_list_model is None:
            self.job_list = JobListModel(self.database_session)
        else:
            self.job_list = job_list_model
api_metadata.py 文件源码 项目:TopChef 作者: TopChef 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(
            self,
            session: Session,
            flask_request: Request=request,
            metadata: APIMetadataModelInterface=MetadataModel()
    ) -> None:
        """

        :param session: The SQLAlchemy ORM Session to use for communicating
            with the database. This is required by ``AbstractEndpoint`` in
            order to perform its transaction management
        :param flask_request: The request to process. By default, this is
            flask's ``request`` variable
        :param metadata: The metadata to serialize. By default, this is an
            instance of the ``MetadataModel`` that pulls all of its values
            from the ``config`` script.
        """
        super(APIMetadata, self).__init__(session, request=flask_request)
        self._api_metadata = metadata
services_list.py 文件源码 项目:TopChef 作者: TopChef 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(
            self, session: Session,
            flask_request: Request=request,
            service_list: Optional[ServiceListInterface]=None
    ) -> None:
        """

        Create a service list model that is capable of getting services from
        the DB

        :param session: The database session to use
        """
        super(self.__class__, self).__init__(session, flask_request)

        if service_list is not None:
            self.service_list = service_list
        else:
            self.service_list = ServiceListModel(session)
types.py 文件源码 项目:SuperOcto 作者: mcecchi 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def will_handle_ui(self, request):
        """
        Called by OctoPrint to determine if the mixin implementation will be
        able to handle the ``request`` provided as a parameter.

        Return ``True`` here to signal that your implementation will handle
        the request and that the result of its :meth:`~octoprint.plugin.UiPlugin.on_ui_render` method
        is what should be served to the user.

        The execution order of calls to this method can be influenced via the sorting context
        ``UiPlugin.will_handle_ui``.

        Arguments:
            request (flask.Request): A Flask `Request <http://flask.pocoo.org/docs/0.10/api/#flask.Request>`_
                object.

        Returns:
            bool: ``True`` if the the implementation will serve the request,
                ``False`` otherwise.
        """
        return False
helpers.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_modified_url_encoding(self):
        class ModifiedRequest(flask.Request):
            url_charset = 'euc-kr'
        app = flask.Flask(__name__)
        app.testing = True
        app.request_class = ModifiedRequest
        app.url_map.charset = 'euc-kr'

        @app.route('/')
        def index():
            return flask.request.args['foo']

        rv = app.test_client().get(u'/?foo=????'.encode('euc-kr'))
        self.assert_equal(rv.status_code, 200)
        self.assert_equal(rv.data, u'????'.encode('utf-8'))
helpers.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_modified_url_encoding(self):
        class ModifiedRequest(flask.Request):
            url_charset = 'euc-kr'
        app = flask.Flask(__name__)
        app.testing = True
        app.request_class = ModifiedRequest
        app.url_map.charset = 'euc-kr'

        @app.route('/')
        def index():
            return flask.request.args['foo']

        rv = app.test_client().get(u'/?foo=????'.encode('euc-kr'))
        self.assert_equal(rv.status_code, 200)
        self.assert_equal(rv.data, u'????'.encode('utf-8'))
helpers.py 文件源码 项目:Texty 作者: sarthfrey 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_modified_url_encoding(self):
        class ModifiedRequest(flask.Request):
            url_charset = 'euc-kr'
        app = flask.Flask(__name__)
        app.testing = True
        app.request_class = ModifiedRequest
        app.url_map.charset = 'euc-kr'

        @app.route('/')
        def index():
            return flask.request.args['foo']

        rv = app.test_client().get(u'/?foo=????'.encode('euc-kr'))
        self.assert_equal(rv.status_code, 200)
        self.assert_equal(rv.data, u'????'.encode('utf-8'))


问题


面经


文章

微信
公众号

扫码关注公众号