python类authorization()的实例源码

views.py 文件源码 项目:ssh-ca-server 作者: commercehub-oss 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def requires_auth(function):
    """ Bind against LDAP to validate users credentials """

    @wraps(function)
    def decorated(*args, **kwargs):

        logger = logging.getLogger("ca_server")
        auth = request.authorization
        ldap = LdapClient()

        if not auth or not ldap.check_auth(auth.username, auth.password):
            output = DataBag()
            output.set_error("Access denied")
            if hasattr(auth, "username"):
                logger.info("requires_auth: access denied {}".format(auth.username))
            else:
                logger.info("requires_auth: access denied")
            return output.get_json()

        return function(auth.username)
    return decorated
__init__.py 文件源码 项目:flask-api-skeleton 作者: ianunruh 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def check_auth():
    session = None
    user = None

    token = request.headers.get('X-Auth-Token')
    if token:
        session = Session.query.filter_by(token=token).first()
        if not session:
            return make_error_response('Invalid session token', 401)

        user = session.user
    else:
        auth = request.authorization
        if auth:
            user = User.find_by_email_or_username(auth.username)
            if not (user and user.password == auth.password):
                return make_error_response('Invalid username/password combination', 401)

    g.current_session = session
    g.current_user = user
cli.py 文件源码 项目:rq-scheduler-dashboard 作者: lamflam 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def add_basic_auth(blueprint, username, password, realm='RQ Scheduler Dashboard'):
    """Add HTTP Basic Auth to a blueprint.

    Note this is only for casual use!

    """
    @blueprint.before_request
    def basic_http_auth(*args, **kwargs):
        auth = request.authorization
        if (
                auth is None
                or auth.password != password
                or auth.username != username):
            return Response(
                'Please login',
                401,
                {'WWW-Authenticate': 'Basic realm="{}"'.format(realm)})
controllers.py 文件源码 项目:python-aws-ecr-deployer 作者: filc 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _template_rendering(template):
    def decorator(fn):
        @wraps(fn)
        def inner_fn(*args, **kwargs):
            data = fn(*args, **kwargs)

            auth = request.authorization
            basic_auth = '' if not auth else base64.b64encode(bytes(':'.join([auth.username, auth.password]), 'utf-8')).decode('utf-8')

            data.update({
                'basic_auth': basic_auth,
                'base_url': g.cn.g_('app_config').get('base_url'),
                'ecs_clusters': g.cn.f_('aws.get_ecs_clusters', region=g.cn.g_('app_config').get('ecs_region')),
                'selected_ecs_cluster': g.cn.g_('session').get('selected_ecs_cluster')
            })

            return render_template(template, **data)
        return inner_fn
    return decorator
flask_httpauth.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def login_required(self, f):
        @wraps(f)
        def decorated(*args, **kwargs):
            auth = request.authorization
            # We need to ignore authentication headers for OPTIONS to avoid
            # unwanted interactions with CORS.
            # Chrome and Firefox issue a preflight OPTIONS request to check
            # Access-Control-* headers, and will fail if it returns 401.
            if request.method != 'OPTIONS':
                if auth:
                    password = self.get_password_callback(auth.username)
                else:
                    password = None
                if not self.authenticate(auth, password):
                    return self.auth_error_callback()
            return f(*args, **kwargs)
        return decorated
decorators.py 文件源码 项目:heroku-python-boilerplate 作者: chavli 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def require_token(func):
    """ verifies the uuid/token combo of the given account. account type can be:
        customer, fox, merchant """
    @wraps(func)
    def decorator(*args, **kwargs):
        if request.authorization:
            uuid = request.authorization.username
            token = request.authorization.password
            try:
                manager = SessionManager()
                valid = manager.verify(uuid, token)
                if not valid:
                    return UnauthorizedResponseJson().make_response()
            except Exception as e:
                traceback.print_exc()
                return ExceptionResponseJson("unable to validate credentials", e).make_response()
        else:
            return UnauthorizedResponseJson().make_response()
        return func(*args, **kwargs)
    return decorator
decorators.py 文件源码 项目:heroku-python-boilerplate 作者: chavli 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def require_password(func):
    """ verifies the given username/password combo """
    @wraps(func)
    def decorator(*args, **kwargs):
        if request.authorization:
            username = request.authorization.username
            password = request.authorization.password
            try:
                manager = AccountManager()
                valid = manager.verify_account(username, password)
                if not valid:
                    return UnauthorizedResponseJson().make_response()
            except Exception as e:
                traceback.print_exc()
                return ExceptionResponseJson("unable to validate credentials", e).make_response()
        else:
            return UnauthorizedResponseJson().make_response()
        return func(*args, **kwargs)
    return decorator
auth.py 文件源码 项目:cabu 作者: thylong 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def requires_admin(f):  # pragma: no cover
    """Decorator to define endpoints that requires Basic Auth.

    Args:
        f (func): An route function.

    Returns:
        response (object): 401 if unauthorized.
        f (func): The route to call.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if not auth or not check_auth(auth.username, auth.password):
            return authenticate()
        return f(*args, **kwargs)
    return decorated
flask_httpauth.py 文件源码 项目:Callandtext 作者: iaora 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def login_required(self, f):
        @wraps(f)
        def decorated(*args, **kwargs):
            auth = request.authorization
            # We need to ignore authentication headers for OPTIONS to avoid
            # unwanted interactions with CORS.
            # Chrome and Firefox issue a preflight OPTIONS request to check
            # Access-Control-* headers, and will fail if it returns 401.
            if request.method != 'OPTIONS':
                if auth:
                    password = self.get_password_callback(auth.username)
                else:
                    password = None
                if not self.authenticate(auth, password):
                    return self.auth_error_callback()
            return f(*args, **kwargs)
        return decorated
decorators.py 文件源码 项目:discovery 作者: lyft 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def basic_authenticate(f):
    @wraps(f)
    def decorated(*args, **kwargs):

        if not current_app.config.get("USE_AUTH"):
            return f(*args, **kwargs)

        if not getattr(f, 'basic_authenticate', True):
            return f(*args, **kwargs)

        auth = request.authorization

        if auth and auth.username and auth.password != '':
            password = current_app.config.get("{}_PASSWORD".format(auth.username.upper()))

            if auth.password == password:
                role = AUTH_ROLES[auth.username]
                logging.debug("Authenticated '{}' with role '{}' via basic auth".format(auth.username, role))
                current_app.auth_role = role
                return f(*args, **kwargs)

        return abort(401)
    return decorated
flask_httpauth.py 文件源码 项目:webapp 作者: superchilli 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def login_required(self, f):
        @wraps(f)
        def decorated(*args, **kwargs):
            auth = request.authorization
            # We need to ignore authentication headers for OPTIONS to avoid
            # unwanted interactions with CORS.
            # Chrome and Firefox issue a preflight OPTIONS request to check
            # Access-Control-* headers, and will fail if it returns 401.
            if request.method != 'OPTIONS':
                if auth:
                    password = self.get_password_callback(auth.username)
                else:
                    password = None
                if not self.authenticate(auth, password):
                    return self.auth_error_callback()
            return f(*args, **kwargs)
        return decorated
pipelines.py 文件源码 项目:pypers 作者: frankosan 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def put(self):
            """
            Queue the specific pipeline
            """
            data   = request.get_json(force=True)
            config = data.get('config')
            user   = auth_get_username(request.authorization, data.get('user'))

            errors = None # Pipeline.validate_config(config, user)
            if not errors:
                config = Pipeline.load_cfg(config)
                # Get id from DB
                db_info = dbmodel.PipelineDb(config['name'], config, Pipeline.ordered_steps(config), user)
                config['run_id'] = db_info.run_id

                ut.pretty_print("Submitting pipeline %s (ID %d) for user %s" % (config['label'], config['run_id'], user))
                return pm.add_pipeline(config, user)
            else:
                return errors, 400
__init__.py 文件源码 项目:pillar 作者: armadillica 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_blender_id_oauth_token() -> str:
    """Returns the Blender ID auth token, or an empty string if there is none."""

    from flask import request

    token = session.get('blender_id_oauth_token')
    if token:
        if isinstance(token, (tuple, list)):
            # In a past version of Pillar we accidentally stored tuples in the session.
            # Such sessions should be actively fixed.
            # TODO(anyone, after 2017-12-01): refactor this if-block so that it just converts
            # the token value to a string and use that instead.
            token = token[0]
            session['blender_id_oauth_token'] = token
        return token

    if request.authorization and request.authorization.username:
        return request.authorization.username

    if current_user.is_authenticated and current_user.id:
        return current_user.id

    return ''
rbd-target-api.py 文件源码 项目:ceph-iscsi-cli 作者: ceph 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def requires_basic_auth(f):
    """
    wrapper function to check authentication credentials are valid
    """

    @wraps(f)
    def decorated(*args, **kwargs):

        # check credentials supplied in the http request are valid
        auth = request.authorization
        if not auth:
            return jsonify(message="Missing credentials"), 401

        if (auth.username != settings.config.api_user or
           auth.password != settings.config.api_password):
            return jsonify(message="username/password mismatch with the "
                                   "configuration file"), 401

        return f(*args, **kwargs)

    return decorated
app.py 文件源码 项目:hydrus 作者: HTTP-APIs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get(self, id_, type_):
        """GET object with id = id_ from the database."""
        if get_authentication():
            if request.authorization is None:
                return failed_authentication()
            else:
                auth = check_authorization(request, get_session())
                if auth is False:
                    return failed_authentication()

        class_type = get_doc().collections[type_]["collection"].class_.title

        if checkClassOp(class_type, "GET"):

            try:
                response = crud.get(id_, class_type, api_name=get_api_name(), session=get_session())
                return set_response_headers(jsonify(hydrafy(response)))

            except Exception as e:
                status_code, message = e.get_HTTP()
                return set_response_headers(jsonify(message), status_code=status_code)

        abort(405)
app.py 文件源码 项目:hydrus 作者: HTTP-APIs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def delete(self, id_, type_):
        """Delete object with id=id_ from database."""
        if get_authentication():
            if request.authorization is None:
                return failed_authentication()
            else:
                auth = check_authorization(request, get_session())
                if auth is False:
                    return failed_authentication()

        class_type = get_doc().collections[type_]["collection"].class_.title

        if checkClassOp(class_type, "DELETE"):
            try:
                crud.delete(id_, class_type, session=get_session())
                response = {"message": "Object with ID %s successfully deleted" % (id_)}
                return set_response_headers(jsonify(response))

            except Exception as e:
                status_code, message = e.get_HTTP()
                return set_response_headers(jsonify(message), status_code=status_code)

        abort(405)
app.py 文件源码 项目:hydrus 作者: HTTP-APIs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def delete(self, type_):
        """Delete a non Collection class item."""
        if get_authentication():
            if request.authorization is None:
                return failed_authentication()
            else:
                auth = check_authorization(request, get_session())
                if auth is False:
                    return failed_authentication()

        if checkEndpoint("DELETE", type_):
            # No Delete Operation for collections
            if type_ in get_doc().parsed_classes and type_+"Collection" not in get_doc().collections:
                try:
                    crud.delete_single(type_, session=get_session())
                    response = {"message": "Object successfully deleted"}
                    return set_response_headers(jsonify(response))
                except Exception as e:
                    status_code, message = e.get_HTTP()
                    return set_response_headers(jsonify(message), status_code=status_code)
        abort(405)
auth.py 文件源码 项目:toll_road 作者: idosekely 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def requires_auth(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if not auth or not check_auth(auth.username, auth.password):
            return authenticate()
        return f(*args, **kwargs)

    return decorated
__init__.py 文件源码 项目:flask-basic-roles 作者: ownaginatious 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def require(self, users=(), roles=(), test_auth=None, test_method=None):
        """
        Authenticates/authorizes a request based on the content of the
        request.authorization parameter.

        users       -- users permitted to call this method
        roles       -- roles permitted to call this method
        test_auth   -- credentials only for testing
        test_method -- method only for testing
        """

        users = self._process_targets(users)
        roles = self._process_targets(roles)

        def loaded_decorated(f):
            @wraps(f)
            def decorated(*args, **kwargs):
                auth = test_auth or request.authorization
                method = test_method.upper() if test_method else request.method
                authenticated = auth and (auth.username in self.users) and \
                    self.users[auth.username] == auth.password
                if not authenticated:
                    return self.no_authentication()
                allowed_users = users[method] if users else None
                allowed_roles = roles[method] if roles else None
                if allowed_users or allowed_roles:
                    auth_as_user = auth.username in allowed_users
                    auth_as_role = allowed_roles & self.roles[auth.username]
                    if not auth_as_user and not auth_as_role:
                        return self.no_authorization()
                return f(*args, **kwargs)
            return decorated
        return loaded_decorated
ui.py 文件源码 项目:gransk 作者: pcbje 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def requires_auth(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if not auth or not check_auth(auth.username, auth.password):
          if _globals.get('test'):
            return f(*args, **kwargs)
          return authenticate()
        return f(*args, **kwargs)
    return decorated
ghost2loggersensor.py 文件源码 项目:stackstorm-ghost2logger 作者: StackStorm-Exchange 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def requires_auth(f):
    """Wrapper function."""
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if not auth or not check_auth(auth.username, auth.password):
            return authenticate()
        return f(*args, **kwargs)
    return decorated
apiServer.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def authed(func: Callable[[], str]) -> Callable[[], Union[Response, str]]:
    """ Given a function returns one that requires basic auth """
    @wraps(func)
    def decorator():
        auth = request.authorization
        if auth and validAuth(auth.username, auth.password):
            return func()
        return authFailure()
    return decorator
decorators.py 文件源码 项目:flask-vue-example 作者: levi-lq 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def login_required(func):
    """
    desc:   ???????

    """
    @wraps(func)
    def _decorator_func(*args, **kwargs):
        if request.authorization is None:
            content_type = request.headers["Content-Type"]
            if "application/x-www-form-urlencoded" in content_type:
                data = request.form.to_dict()
            elif "application/json" in content_type:
                data = request.get_json()
            elif "multipart/form-data" in content_type:
                data = request.get_json()
                if data is None:
                    data = request.form.to_dict()
            else:
                raise error_handlers.BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"????"}))
            if not isinstance(data, dict):
                raise error_handlers.BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"???json????"}))
            token = data.get("token", None)
            if token is None:
                raise error_handlers.MissToken(
                    http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"???????token"})
                )
        else:
            token = request.authorization["username"]

        g.user = User.verify_auth_token(token)

        # ????
        identity = g.cache.get(token)

        if identity is not None:
            g.identity = pickle.loads(identity)
        return func(*args, **kwargs)

    return _decorator_func
app_factory.py 文件源码 项目:rpc-explorer 作者: jtimon 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def api_generic(request, request_processor, app, resource):
    req = {}
    req['method'] = request.method
    req['resource'] = resource
    req['json'] = request.json
    req['params'] = {}

    params = {}
    for k, v in request.args.iteritems():
        try:
            in_json = json.loads(v)
            if type(in_json) is dict:
                params[k] = in_json
            else:
                params[k] = str(in_json)
        except:
            params[k] = v
    req['params'] = params

    auth_form = {}
    if request.authorization:
        auth_form['user'] = request.authorization.username
        auth_form['password'] = request.authorization.password
    else:
        if 'X-User' in request.headers:
            auth_form['user'] = request.headers['X-User']
        if 'X-Token' in request.headers:
            auth_form['token'] = request.headers['X-Token']
    req['auth_form'] = auth_form

    response = request_processor(app, req)
    if 'errors' in response:
        return jsonify({'errors': response['errors']}), 400
    elif request.method == 'GET':
        return jsonify({resource: response['json']}), response['status']
    return jsonify(response['json']), response['status']
server.py 文件源码 项目:bowtie-demo 作者: jwkvam 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def requires_auth(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if not auth or not check_auth(auth.username, auth.password):
            return authenticate()
        return f(*args, **kwargs)
    return decorated


# import the user created module
views.py 文件源码 项目:flamyngo 作者: materialsvirtuallab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def requires_auth(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        api_key = request.headers.get("API_KEY") or request.args.get("API_KEY")
        if (API_KEY is not None) and api_key == API_KEY:
            return f(*args, **kwargs)
        if (AUTH_USER is not None) and (not auth or not check_auth(
                auth.username, auth.password)):
            return authenticate()
        return f(*args, **kwargs)
    return decorated
flask_httpauth.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def username(self):
        if not request.authorization:
            return ""
        return request.authorization.username
server.py 文件源码 项目:coscup-line-bot 作者: ncuoolab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def requires_auth(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if not auth or not check_auth(auth.username, auth.password):
            return authenticate()
        return f(*args, **kwargs)

    return decorated
auth.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def token():
    if request.authorization is not None:
        username = request.authorization.get('username', None)
        passwd = request.authorization.get('password', None)
        if username is not None and passwd is not None:
            user = User.query.filter(User.username_iequal(username)).first()
            if user is None or user.deleted:
                pass
            elif not user.active:
                raise APIError("User '{0}' is blocked".format(username), 403)
            elif user.verify_password(passwd):
                return jsonify({'status': 'OK', 'token': user.get_token()})
            raise APIError('Username or password invalid', 401)
    raise APIError('You are not authorized to access the resource', 401)
apikeymgmt.py 文件源码 项目:assimilator 作者: videlanicolas 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def requires_auth(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        logger.info("{0} {1} {2} {3}".format(request.remote_addr, request.method, request.url, str(request.args)))
        logger.debug("data: {0}".format(str(request.form)))
        auth = request.authorization
        logger.debug('Check_auth: ' + str(auth))
        if not auth or not check_auth(auth.username, auth.password):
            logger.warning("Unauthorized.")
            return {'error' : 'Unauthorized.'}, 401
        return f(*args, **kwargs)
    return decorated


问题


面经


文章

微信
公众号

扫码关注公众号