python类headers()的实例源码

__init__.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def auth_jwt_project(short_name):
    """Issue a JWT for the project named ``short_name``.

    The caller must send the project's secret key in the ``Authorization``
    header. The token carries the ``short_name`` and ``project_id`` claims
    and is signed (HS256) with that same secret key.

    Aborts with 403 when the header is missing or empty, and with 404 when
    the project is not found or the supplied key does not match.
    """
    supplied_key = request.headers.get('Authorization')
    if not supplied_key:
        return abort(403)

    project = project_repo.get_by_shortname(short_name)
    if not project or project.secret_key != supplied_key:
        return abort(404)

    claims = {'short_name': short_name,
              'project_id': project.id}
    return jwt.encode(claims, project.secret_key, algorithm='HS256')
zmirror.py 文件源码 项目:zmirror 作者: aploium 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def zmirror_enter(input_path='/'):
    """Entry point: run main_function() and decorate its response.

    Every header in ``parse.extra_resp_headers`` is set on the response and
    every cookie string in ``parse.extra_cookies`` is appended as a
    ``Set-Cookie`` header. Any exception raised along the way is turned
    into the traceback error page.
    """
    try:
        response = main_function(input_path=input_path)

        # Extra response headers replace any existing value of the same name.
        for header_name, header_value in parse.extra_resp_headers.items():
            response.headers.set(header_name, header_value)

        # Extra cookies are appended, so several Set-Cookie headers may coexist.
        for _cookie_name, cookie_string in parse.extra_cookies.items():
            response.headers.add("Set-Cookie", cookie_string)

        return response
    except:  # coverage: exclude
        return generate_error_page(is_traceback=True)


# noinspection PyUnusedLocal
views.py 文件源码 项目:oadoi 作者: Impactstory 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def after_request_stuff(resp):
    """Post-process every response: CORS headers, DB session cleanup, logging.

    Returns the same response object that was passed in.
    """
    # Support CORS.
    cors_headers = (
        ('Access-Control-Allow-Origin', "*"),
        ('Access-Control-Allow-Methods', "POST, GET, OPTIONS, PUT, DELETE, PATCH"),
        ('Access-Control-Allow-Headers', "origin, content-type, accept, x-requested-with"),
    )
    for header_name, header_value in cors_headers:
        resp.headers[header_name] = header_value

    # Drop the scoped DB session used by this request.
    db.session.remove()

    # Without this flush, output under "heroku local" buffers forever.
    sys.stdout.flush()

    # Record the request for analytics.
    log_request(resp)

    return resp
utils.py 文件源码 项目:grest 作者: mostafa 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def make_request(url, json_data=None, method="post", headers=None):
    """Send an HTTP request via ``requests`` and return the decoded JSON body.

    ``method`` is the name of the ``requests`` function to call (e.g.
    ``"post"``). When ``headers`` is omitted, JSON ``Content-Type`` and
    ``Accept`` headers are used. ``json_data`` is only sent when truthy.

    Returns the parsed JSON when the response has content; otherwise, or
    when any exception occurs (it is logged), returns ``None``.
    """
    if headers is None:
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    try:
        send = getattr(requests, method)
        call_kwargs = {"headers": headers}
        if json_data:
            call_kwargs["json"] = json_data
        response = send(url, **call_kwargs)

        if response.content:
            return json.loads(response.content)
    except Exception as e:
        app.ext_logger.exception(e)
        return None
    return None
zmirror.py 文件源码 项目:zmirror 作者: aploium 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def request_remote_site():
    """
    Send the request to the remote site (high-level).

    The response is stored in ``parse.remote_response``. When the remote
    answers with a 4xx/5xx status, domain guessing is attempted and, if it
    yields a result, that result replaces the stored response.
    """
    # Forward the client's method, headers and encoded body to the remote URL.
    parse.remote_response = send_request(
        parse.remote_url,
        method=request.method,
        headers=parse.client_header,
        data=parse.request_data_encoded,
    )
    remote = parse.remote_response

    if remote.url != parse.remote_url:
        warnprint("requests's remote url", remote.url,
                  'does no equals our rewrited url', parse.remote_url)

    if not 400 <= remote.status_code <= 599:
        return

    # The remote reported an error: try to guess the correct domain.
    dbgprint("Domain guessing for", request.url)
    guessed = guess_correct_domain()
    if guessed is not None:
        parse.remote_response = guessed
controllers.py 文件源码 项目:IntegraTI-API 作者: discentes-imd 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def put(self):
        """Change the password of the current user.

        Looks up the enabled user whose id equals ``g.current_user``
        (404 via ``abort_if_none`` when absent), verifies ``old_password``
        from the JSON body against the stored hash (403 on mismatch),
        stores the new ``password``, commits, and blacklists the token sent
        in the ``Authorization`` header.
        """
        enabled_users = User.query.filter(User.disabled == 0)
        user = enabled_users.filter(User.id_user == g.current_user).first()
        abort_if_none(user, 404, 'User not found')

        payload = request.json
        if not check_password_hash(user.password, payload['old_password']):
            return msg('Old password incorrect'), 403

        # NOTE(review): the new password is assigned as received; presumably
        # the User model hashes it on assignment -- confirm against the model.
        user.password = payload['password']
        db.session.commit()

        # The token used for this request must not stay valid after the change.
        cache.blacklisted_tokens.append(request.headers['Authorization'])

        return msg('success!')
ghost2loggersensor.py 文件源码 项目:stackstorm-ghost2logger 作者: StackStorm-Exchange 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _process_request(self, request):
        """Validate an incoming JSON request and dispatch it as a trigger.

        The request must have ``Content-Type: application/json`` and a JSON
        object body containing all of ``pattern``, ``host``, ``hostpattern``
        and ``message``. On success the payload is dispatched on
        ``self._pmatch`` and ``('ok', 200)`` is returned; otherwise
        ``('fail', 415)``.
        """
        self._logger.info('[Ghost2logger]: Received Request')

        # .get() so a request without a Content-Type header is rejected
        # with 415 instead of raising.
        if request.headers.get('Content-Type') != 'application/json':
            return ('fail', 415)

        payload = request.json
        required_keys = ('pattern', 'host', 'hostpattern', 'message')
        # Each key is tested individually: the former
        # "'a' and 'b' and ... in payload" only tested the last key.
        if not isinstance(payload, dict) or \
                not all(key in payload for key in required_keys):
            return ('fail', 415)

        self._logger.info(payload)
        self._logger.debug('[ghost2logger_sensor]: processing request'
                           ' {}'.format(payload))

        self._sensor_service.dispatch(trigger=self._pmatch,
                                      payload=payload)
        return ('ok', 200)
pyldn.py 文件源码 项目:pyldn 作者: albertmeronyo 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_inbox():
    """Serve the inbox graph in the representation requested via ``Accept``.

    A missing or empty ``Accept`` header, ``*/*`` or anything containing
    ``text/html`` yields JSON-LD. An ``Accept`` value listed in
    ``ACCEPTED_TYPES`` is used verbatim as serialization format and
    ``Content-Type``. Anything else is answered with 415.
    """
    # .get() instead of indexing: indexing raises when the client sends no
    # Accept header, which made the "not Accept" fallback unreachable.
    accept = request.headers.get('Accept')
    pyldnlog.debug("Requested inbox data of {} in {}".format(request.url, accept))

    if not accept or accept == '*/*' or 'text/html' in accept:
        resp = make_response(inbox_graph.serialize(format='application/ld+json'))
        resp.headers['Content-Type'] = 'application/ld+json'
    elif accept in ACCEPTED_TYPES:
        resp = make_response(inbox_graph.serialize(format=accept))
        resp.headers['Content-Type'] = accept
    else:
        return 'Requested format unavailable', 415

    resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn'
    resp.headers['Allow'] = "GET, HEAD, OPTIONS, POST"
    resp.headers['Link'] = '<http://www.w3.org/ns/ldp#Resource>; rel="type", <http://www.w3.org/ns/ldp#RDFSource>; rel="type", <http://www.w3.org/ns/ldp#Container>; rel="type", <http://www.w3.org/ns/ldp#BasicContainer>; rel="type"'
    resp.headers['Accept-Post'] = 'application/ld+json, text/turtle'

    return resp
pyldn.py 文件源码 项目:pyldn 作者: albertmeronyo 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_notification(id):
    """Serve a single notification graph, negotiated on the ``Accept`` header.

    Returns 404 when no graph is stored under ``pyldnconf._inbox_url + id``
    and 415 when the requested format is not in ``ACCEPTED_TYPES``. A
    missing ``Accept`` header, ``*/*`` or a value containing ``text/html``
    yields JSON-LD.
    """
    pyldnlog.debug("Requested notification data of {}".format(request.url))
    pyldnlog.debug("Headers: {}".format(request.headers))

    # Notifications are keyed by their full URL in the graph store.
    graph_key = pyldnconf._inbox_url + id
    pyldnlog.debug("Dict key is {}".format(graph_key))
    if graph_key not in graphs:
        return 'Requested notification does not exist', 404

    accept = request.headers.get('Accept')
    if accept is None or accept == '*/*' or 'text/html' in accept:
        chosen_format = 'application/ld+json'
    elif accept in ACCEPTED_TYPES:
        chosen_format = accept
    else:
        return 'Requested format unavailable', 415

    resp = make_response(graphs[graph_key].serialize(format=chosen_format))
    resp.headers['Content-Type'] = chosen_format
    resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn'
    resp.headers['Allow'] = "GET"

    return resp
adCentsE16-server.py 文件源码 项目:AdCentsE16 作者: pooleja 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def pay_user(to_user_name, to_address, amount):
    """
    Pay ``amount`` to a user with an off-chain BitTransfer.

    A stand-in response object carrying the price, address and username
    headers is built and handed to ``requests.make_402_payment``; the
    resulting payment is then redeemed through a ``BitTransfer`` created
    for ``to_user_name``. Returns whatever ``redeem_payment`` returns.
    """
    payment_headers = {BitTransferRequests.HTTP_BITCOIN_PRICE: amount,
                       BitTransferRequests.HTTP_BITCOIN_ADDRESS: to_address,
                       BitTransferRequests.HTTP_BITCOIN_USERNAME: to_user_name}

    # Fake 402 response: only the attributes the payment call reads are set.
    response = MockRequest()
    response.headers = payment_headers
    response.url = 'http://10.244.119.122:11116'

    logger.debug("Making 402 payment request with headers: {}".format(response))
    payment = requests.make_402_payment(response, amount)
    logger.debug("Have the payment: {}".format(payment))

    transfer = BitTransfer(wallet, username=to_user_name)
    logger.debug("Have the transfer: {}".format(transfer))
    return transfer.redeem_payment(amount, payment)
paging.py 文件源码 项目:microcosm-flask 作者: globality-corp 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def to_paginated_list(self, result, _ns, _operation, **kwargs):
        """
        Wrap a controller result in a ``PaginatedList``.

        ``result`` must be in the shape accepted by this page class's
        `parse_result` function. Returns a ``(paginated_list, headers)``
        pair; the headers dict is always empty here.

        """
        items, context = self.parse_result(result)
        return PaginatedList(
            items=items,
            _page=self,
            _ns=_ns,
            _operation=_operation,
            _context=context,
        ), {}
app_factory.py 文件源码 项目:rpc-explorer 作者: jtimon 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def create_restmin_app(app_name, config_path, base_url, request_processor):
    """Build a Flask app exposing one generic, CORS-enabled resource route.

    ``base_url + '<resource>'`` accepts GET/POST/PUT, delegated to
    ``api_generic`` together with ``request_processor``, and OPTIONS,
    which advertises the allowed methods. Configuration is loaded from
    ``config_path`` via ``app.config.from_object``.
    """
    from flask import Flask
    app = Flask(app_name)
    app.config.from_object(config_path)

    @app.route(base_url + '<string:resource>', methods=['GET', 'POST', 'PUT'])
    @crossdomain(origin='*')
    def _api_generic(resource):
        return api_generic(request, request_processor, current_app, resource)

    @app.route(base_url + '<string:resource>', methods=['OPTIONS'])
    @crossdomain(origin='*', headers='Content-Type, X-User, X-Token')
    def _options(resource):
        # Flask passes the URL variable as the keyword argument ``resource``;
        # the parameter must carry that name or the call raises TypeError.
        return jsonify({'Allow': 'GET,POST,PUT'}), 200

    return app
base.py 文件源码 项目:flask_atlassian_connect 作者: halkeye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _provide_client_handler(self, section, name, kwargs_updator=None):
        """Return a decorator that registers an authenticated handler.

        The registered handler authenticates the current request, loads the
        matching client (aborting with 401 when none is found), exposes it
        as ``g.ac_client`` and as the ``client`` keyword argument, optionally
        merges extra kwargs from ``kwargs_updator``, and calls the decorated
        function. A ``None`` result is turned into an empty 204 response.

        The decorator returns the original function unchanged; only the
        wrapped handler is registered under ``section``/``name``.
        """
        def _wrapper(func):
            @wraps(func)
            def _handler(**kwargs):
                client_key = self.auth.authenticate(
                    request.method,
                    request.url,
                    request.headers)
                client = self.client_class.load(client_key)
                if not client:
                    abort(401)

                g.ac_client = client
                kwargs['client'] = client
                if kwargs_updator:
                    extra_kwargs = kwargs_updator(**kwargs)
                    kwargs.update(extra_kwargs)

                outcome = func(**kwargs)
                return outcome if outcome is not None else ('', 204)

            self._add_handler(section, name, _handler)
            return func
        return _wrapper
auth.py 文件源码 项目:OpenManga-sync 作者: nv95 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def auth_required(f):
    """Decorator enforcing a valid ``X-AuthToken`` header.

    The wrapped view receives the resolved ``Token`` row as the ``token``
    keyword argument. Responses produced by the decorator itself:

    * 403 when the header is missing, the token is unknown, or it has
      expired (expired tokens are deleted and the deletion committed);
    * 500 with the exception text when anything raises; the DB session is
      rolled back and the exception logged.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        try:
            # .get() so a missing header reaches the 403 branch below;
            # indexing raised and was reported as a 500 by the handler.
            token_value = request.headers.get('X-AuthToken')
            if token_value is None:
                return {'state': 'fail', 'message': 'Authorization required'}, 403
            token = Token.query.get(token_value)
            if token is None:
                return {'state': 'fail', 'message': 'Invalid token'}, 403
            if token.expires_at is not None and token.expires_at < datetime.datetime.now():
                token.delete()
                db.session.flush()
                db.session.commit()
                return {'state': 'fail', 'message': 'Token was expired'}, 403
            return f(token=token, *args, **kwargs)
        except Exception as e:
            db.session.rollback()
            log.exception(e)
            return {'state': 'fail', 'message': str(e)}, 500

    return decorated
bmx-sample-broker.py 文件源码 项目:Bluemix-ServiceBroker 作者: IBM-Cloud 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def catalog():
    """Return the catalog of services handled by this broker.

    GET /v2/catalog

    HEADER:
        X-Broker-Api-Version: <version>

    Returns a JSON document with details about the services offered
    through this broker. Aborts with 412 when the version header is
    missing, not a number, or lower than ``X_BROKER_API_VERSION``.
    """
    api_version = request.headers.get('X-Broker-Api-Version')

    # Check broker API version. A malformed value is treated like an
    # incompatible one rather than letting float() raise.
    try:
        incompatible = not api_version or float(api_version) < X_BROKER_API_VERSION
    except ValueError:
        incompatible = True
    if incompatible:
        abort(412, "Precondition failed. Missing or incompatible %s. Expecting version %0.1f or later" % (X_BROKER_API_VERSION_NAME, X_BROKER_API_VERSION))

    services = {"services": [pseudo_service]}
    return jsonify(services)


#
# Provision
#
auth.py 文件源码 项目:ambassador 作者: datawire 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def catch_all(path):
    """Answer any path with 200 or 403 depending on its shape.

    * paths starting with ``/ambassador/``: 200
    * paths ending in ``/good/`` or ``/demo/``: 200 plus an ``X-Hurkle`` header
    * paths ending in ``/nohdr/``: 200 without the extra header
    * anything else: 403

    Every response also carries an ``X-Test`` header.
    """
    if not path.startswith('/'):
        path = '/' + path

    resp = Response('You want path: %s' % path)

    # The /ambassador/ prefix wins over the suffix rules.
    is_ambassador = path.startswith('/ambassador/')
    wants_hurkle = not is_ambassador and path.endswith(("/good/", "/demo/"))
    allowed = is_ambassador or wants_hurkle or path.endswith("/nohdr/")

    resp.status_code = 200 if allowed else 403
    if wants_hurkle:
        resp.headers['X-Hurkle'] = 'Oh baby, oh baby.'

    resp.headers['X-Test'] = 'Should not be seen.'
    return resp
pushes_api.py 文件源码 项目:hashtagtodo-open 作者: slackpad 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def calendar_push():
    """Handle a calendar push notification.

    The calendar is looked up by the ``X-Goog-Channel-ID`` header. An
    active, known calendar is synced, followed by a user sync when the
    calendar sync reported updates; sync errors are printed and swallowed.
    An unknown channel is only reported. An empty response is always
    returned.
    """
    watch_id = request.headers['X-Goog-Channel-ID']
    db_calendar = Calendar.get_by_watch_id(watch_id)
    if db_calendar:
        if db_calendar.active:
            user = db_calendar.key.parent().get()
            client = make_client(user)
            sync_time = datetime.utcnow()
            try:
                updates = sync_calendar(sync_time, user, client, db_calendar)
                if updates:
                    sync_user(sync_time, user, client)
            except Exception as e:
                # Best effort: a failed sync must not fail the push endpoint.
                # Parenthesised print is valid on both Python 2 and 3.
                print(e)
    else:
        resource_id = request.headers['X-Goog-Resource-ID']
        print('Unknown push notification for resource id %s' % resource_id)

    return make_response()
tenant_manager.py 文件源码 项目:os-reststack-manager 作者: gemagomez 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def authenticate():
    """Authenticate the current request from its ``X-Auth-Token`` JWT.

    Requests whose endpoint name contains ``tenant_provisioned`` bypass
    authentication and run as user ``"phone_home"``. Otherwise the token
    is decoded with the tenant secret (HS256) and its ``user`` claim is
    stored in ``g.user``. A missing header, a missing claim, or any
    invalid token aborts with 401.
    """
    # logger.debug("endpoint request: %s" % request.endpoint)
    if re.search('tenant_provisioned', str(request.endpoint)):
        g.user = "phone_home"
        logger.info("Authentication bypassed: tenant_provisioned")
        return

    try:
        decoded = jwt.decode(request.headers['X-Auth-Token'], credentials['tenant_secret'], algorithms=['HS256'])
        g.user = decoded['user']
    except KeyError:
        logger.error("Error: key error.")
        abort(401)
    except jwt.DecodeError:
        logger.error("Error: decode error")
        abort(401)
    except jwt.InvalidTokenError:
        # Covers validation failures that are not DecodeError (for example
        # an expired signature), which otherwise escaped as a server error.
        logger.error("Error: invalid token")
        abort(401)
views.py 文件源码 项目:esignature-recipes-python 作者: docusign 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def auth_redirect():
    """Finish the authentication redirect and send the user onwards.

    ``ds_authentication.auth_redirect()`` returns False or an error
    message; a message is flashed so it can be shown on the next page.
    If the session holds a pending ``auth_redirect`` target, it is cleared
    and the user is sent there; otherwise the user goes to the base URL.
    """
    err = ds_authentication.auth_redirect()
    # err is False or an error message. Flash it so the home page can show
    # it; a simpler alternative would be an intermediate page with a
    # "Continue" link to the home page.
    if err:
        flash(err)
    # flash("Debug info: " + str(request.headers))

    # Authentication / re-authentication is done: figure out what to do next.
    pending_target = session["auth_redirect"] if "auth_redirect" in session else None
    if pending_target:
        session["auth_redirect"] = False
        return redirect(pending_target)

    return redirect(ds_recipe_lib.get_base_url(1))
apiServer.py 文件源码 项目:sdos-core 作者: sdos 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def handle_auth():
    """
    Proxy the auth request to swift and rewrite the storage URL.

    On a 200 from swift the storage URL in the response headers is
    replaced with our own, e.g.
    'X-Storage-Url': 'http://192.168.209.204:8080/v1/AUTH_test'
    becomes
    'X-Storage-Url': 'http://localhost:4000/v1/AUTH_test'

    This is the first request any client makes; the auth token issued by
    swift is passed on and used in further requests.
    :return: a Response carrying swift's status, headers and body
    """
    status, headers, body = httpBackend.doAuthGetToken(reqHead=request.headers, method="GET")
    log.debug("swift response: {} {} {}".format(status, headers, body))

    if status == 200:
        # Presumably rewrites the headers in place (the return value is
        # unused) -- confirm in replaceStorageUrl.
        replaceStorageUrl(swiftResponse=headers)
        log.debug("proxy response: {} {} {}".format(status, headers, body))

    return Response(status=status, headers=headers, response=body)
auth.py 文件源码 项目:ijust_server 作者: koala-team 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def authenticate(self, f):
        """Decorator protecting a route with an ``Access-Token`` header.

        The token is looked up in ``self.redis``; the stored value is
        exposed to the view as ``g.user_id``. A missing header or an
        unknown token aborts with 401.
        """
        @wraps(f)
        def decorated(*args, **kwargs):
            if 'Access-Token' not in request.headers:
                return abort(401, "Set token to access protected routes")

            user_id = self.redis.get(request.headers['Access-Token'])
            if not user_id:
                return abort(401, "Token is invalid or has expired")

            g.user_id = user_id
            return f(*args, **kwargs)

        return decorated
__init__.py 文件源码 项目:webhook-shims 作者: vmw-loginsight 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test(ALERTID=None):
    """Log the auth header, the raw request and the parsed ``moreinfo`` field.

    Always responds with "OK"; the payload is not forwarded anywhere.
    """
    try:
        auth_header = request.headers['Authorization']
    except KeyError:
        pass
    else:
        logging.info(auth_header)

    body = request.get_data()
    if body:
        logging.info(body)
        parsed = parse(request)
        try:
            more_info = parsed['moreinfo']
        except KeyError:
            pass
        else:
            logging.info(more_info)

    return "OK"


# Import individual shims
keys.py 文件源码 项目:nanobox-adapter-libcloud 作者: nanobox-io 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def key_create(adapter_id):
    """Create a key through the adapter identified by ``adapter_id``.

    Fails with 501 when the adapter is unknown and with 401 when the
    request headers do not pass the adapter's credential check. Otherwise
    the adapter's result is relayed: its ``error`` on failure, its ``data``
    on success, each with the adapter-supplied ``status``.
    """
    adapter = get_adapter(adapter_id)

    if not adapter:
        response = output.failure("That adapter doesn't (yet) exist. Please check the adapter name and try again.", 501)
    elif not adapter.do_verify(request.headers):
        response = output.failure("Credential verification failed. Please check your credentials and try again.", 401)
    else:
        outcome = adapter.do_key_create(request.headers, request.json)
        if 'error' in outcome:
            response = output.failure(outcome['error'], outcome['status'])
        else:
            response = output.success(outcome['data'], outcome['status'])

    return response
keys.py 文件源码 项目:nanobox-adapter-libcloud 作者: nanobox-io 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def key_query(adapter_id, key_id):
    """Look up data about key ``key_id`` through the given adapter.

    Fails with 501 when the adapter is unknown and with 401 when the
    request headers do not pass the adapter's credential check. Otherwise
    the adapter's result is relayed: its ``error`` on failure, its ``data``
    on success, each with the adapter-supplied ``status``.
    """
    adapter = get_adapter(adapter_id)

    if not adapter:
        response = output.failure("That adapter doesn't (yet) exist. Please check the adapter name and try again.", 501)
    elif not adapter.do_verify(request.headers):
        response = output.failure("Credential verification failed. Please check your credentials and try again.", 401)
    else:
        outcome = adapter.do_key_query(request.headers, key_id)
        if 'error' in outcome:
            response = output.failure(outcome['error'], outcome['status'])
        else:
            response = output.success(outcome['data'], outcome['status'])

    return response
keys.py 文件源码 项目:nanobox-adapter-libcloud 作者: nanobox-io 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def key_delete(adapter_id, key_id):
    """Delete key ``key_id`` through the given adapter.

    Fails with 501 when the adapter is unknown and with 401 when the
    request headers do not pass the adapter's credential check. When the
    adapter returns a dict containing ``error``, that error is relayed
    with its ``status``; otherwise an empty body is returned.
    """
    adapter = get_adapter(adapter_id)

    if not adapter:
        return output.failure("That adapter doesn't (yet) exist. Please check the adapter name and try again.", 501)

    if not adapter.do_verify(request.headers):
        return output.failure("Credential verification failed. Please check your credentials and try again.", 401)

    outcome = adapter.do_key_delete(request.headers, key_id)

    # Only a dict result can carry an error report.
    failed = isinstance(outcome, dict) and 'error' in outcome
    return output.failure(outcome['error'], outcome['status']) if failed else ""
servers.py 文件源码 项目:nanobox-adapter-libcloud 作者: nanobox-io 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def server_create(adapter_id):
    """Create a server through the adapter identified by ``adapter_id``.

    Fails with 501 when the adapter is unknown and with 401 when the
    request headers do not pass the adapter's credential check. Otherwise
    the adapter's result is relayed: its ``error`` on failure, its ``data``
    on success, each with the adapter-supplied ``status``.
    """
    adapter = get_adapter(adapter_id)

    if not adapter:
        response = output.failure("That adapter doesn't (yet) exist. Please check the adapter name and try again.", 501)
    elif not adapter.do_verify(request.headers):
        response = output.failure("Credential verification failed. Please check your credentials and try again.", 401)
    else:
        outcome = adapter.do_server_create(request.headers, request.json)
        if 'error' in outcome:
            response = output.failure(outcome['error'], outcome['status'])
        else:
            response = output.success(outcome['data'], outcome['status'])

    return response
servers.py 文件源码 项目:nanobox-adapter-libcloud 作者: nanobox-io 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def server_cancel(adapter_id, server_id):
    """Cancel server ``server_id`` through the given adapter.

    Fails with 501 when the adapter is unknown and with 401 when the
    request headers do not pass the adapter's credential check. When the
    adapter returns a dict containing ``error``, that error is relayed
    with its ``status``; otherwise an empty body is returned.
    """
    adapter = get_adapter(adapter_id)

    if not adapter:
        return output.failure("That adapter doesn't (yet) exist. Please check the adapter name and try again.", 501)

    if not adapter.do_verify(request.headers):
        return output.failure("Credential verification failed. Please check your credentials and try again.", 401)

    outcome = adapter.do_server_cancel(request.headers, server_id)

    # Only a dict result can carry an error report.
    failed = isinstance(outcome, dict) and 'error' in outcome
    return output.failure(outcome['error'], outcome['status']) if failed else ""
servers.py 文件源码 项目:nanobox-adapter-libcloud 作者: nanobox-io 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def server_install_key(adapter_id, server_id):
    """Install an SSH key on server ``server_id`` through the given adapter.

    Fails with 501 when the adapter is unknown or reports that it cannot
    install keys, and with 401 when the request headers do not pass the
    adapter's credential check. When the adapter returns a dict containing
    ``error``, that error is relayed with its ``status``; otherwise an
    empty body is returned.
    """
    adapter = get_adapter(adapter_id)

    if not adapter:
        return output.failure("That adapter doesn't (yet) exist. Please check the adapter name and try again.", 501)

    # The capability check comes before the credential check.
    if not adapter.can_install_key():
        return output.failure("This adapter doesn't support installing keys on servers.", 501)

    if not adapter.do_verify(request.headers):
        return output.failure("Credential verification failed. Please check your credentials and try again.", 401)

    outcome = adapter.do_install_key(request.headers, server_id, request.json)

    failed = isinstance(outcome, dict) and 'error' in outcome
    return output.failure(outcome['error'], outcome['status']) if failed else ""
servers.py 文件源码 项目:nanobox-adapter-libcloud 作者: nanobox-io 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def server_reboot(adapter_id, server_id):
    """Reboot server ``server_id`` through the given adapter.

    Fails with 501 when the adapter is unknown or reports that it cannot
    reboot servers, and with 401 when the request headers do not pass the
    adapter's credential check. When the adapter returns a dict containing
    ``error``, that error is relayed with its ``status``; otherwise an
    empty body is returned.
    """
    adapter = get_adapter(adapter_id)

    if not adapter:
        return output.failure("That adapter doesn't (yet) exist. Please check the adapter name and try again.", 501)

    # The capability check comes before the credential check.
    if not adapter.can_reboot():
        return output.failure("This adapter doesn't support rebooting servers.", 501)

    if not adapter.do_verify(request.headers):
        return output.failure("Credential verification failed. Please check your credentials and try again.", 401)

    outcome = adapter.do_server_reboot(request.headers, server_id)

    failed = isinstance(outcome, dict) and 'error' in outcome
    return output.failure(outcome['error'], outcome['status']) if failed else ""
servers.py 文件源码 项目:nanobox-adapter-libcloud 作者: nanobox-io 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def server_rename(adapter_id, server_id):
    """Rename server ``server_id`` through the given adapter.

    Fails with 501 when the adapter is unknown or reports that it cannot
    rename servers, and with 401 when the request headers do not pass the
    adapter's credential check. When the adapter returns a dict containing
    ``error``, that error is relayed with its ``status``; otherwise an
    empty body is returned.
    """
    adapter = get_adapter(adapter_id)

    if not adapter:
        return output.failure("That adapter doesn't (yet) exist. Please check the adapter name and try again.", 501)

    # The capability check comes before the credential check.
    if not adapter.can_rename():
        return output.failure("This adapter doesn't support renaming servers.", 501)

    if not adapter.do_verify(request.headers):
        return output.failure("Credential verification failed. Please check your credentials and try again.", 401)

    outcome = adapter.do_server_rename(request.headers, server_id, request.json)

    failed = isinstance(outcome, dict) and 'error' in outcome
    return output.failure(outcome['error'], outcome['status']) if failed else ""


问题


面经


文章

微信
公众号

扫码关注公众号