python类Forbidden()的实例源码

test_security.py 文件源码 项目:repocribro 作者: MarekSuchanek 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_permission_admin(app, empty_db_session):
    """Check that the admin-role permission guard rejects an unprivileged call.

    Inside a test request context, a function decorated with
    ``permissions.admin_role.require(403)`` is expected to raise
    ``Forbidden``. Afterwards an account holding an ``admin`` role is
    persisted through ``empty_db_session`` and logged in.
    """
    with app.test_request_context('/'):
        @permissions.admin_role.require(403)
        def test():
            return 200

        # login() has not been called yet in this test, so the guarded
        # function must be refused.
        with pytest.raises(Forbidden):
            assert test() == 200

        # Build an account that carries the 'admin' role and store both.
        role_admin = Role('admin', '')
        account = UserAccount()
        account.id = 666
        account.roles.append(role_admin)
        empty_db_session.add(role_admin)
        empty_db_session.add(account)
        empty_db_session.commit()
        login(account)
        # NOTE(review): nothing is asserted after login(); a positive check
        # (test() == 200) may be missing from this excerpt -- confirm.
GeneralController.py 文件源码 项目:cerberus-core 作者: ovh 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def toolbar(**kwargs):
    """ Get reports/tickets stats

        :param kwargs: must contain ``user``, the user the stats are built for
        :return: the value returned by ``_get_toolbar_count``
        :raises Forbidden: if no ``AbusePermission`` row exists for the user
    """
    user = kwargs['user']
    where = [Q()]

    # Only the presence of a permission matters, so ask the database for
    # existence instead of counting every matching row.
    if not AbusePermission.objects.filter(user=user.id).exists():
        raise Forbidden('You are not allowed to see any category')

    # OR together the user-specific restrictions ...
    user_specific_where = _get_user_specific_where(user)
    user_specific_where = reduce(operator.or_, user_specific_where)
    where.append(user_specific_where)

    # ... then aggregate all filters with AND
    where = reduce(operator.and_, where)

    response = _get_toolbar_count(where, user)
    return response
test_contest_handler.py 文件源码 项目:territoriali-backend 作者: algorithm-ninja 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_sumbit_not_matching(self, g_f_s_mock, g_i_mock):
        """Submitting an output and a source that belong to different inputs is Forbidden.

        Two users generate an input for the same task; an output is stored
        for the first input and a source for the second, then both are
        submitted together.
        """
        Utils.start_contest()
        self._insert_data(token="token", task="poldo")
        self._insert_data(token="token2", task="poldo")
        backup = Logger.LOG_LEVEL
        # presumably high enough to silence logging during the test -- confirm
        Logger.LOG_LEVEL = 9001
        try:
            self.handler.generate_input(token='token', task='poldo', _ip='1.1.1.1')
            g_i_mock.return_value = ("inputid2", "/path")
            self.handler.generate_input(token='token2', task='poldo', _ip='1.1.1.1')

            Database.c.execute("INSERT INTO outputs (id, input, path, size, result) "
                               "VALUES ('outputid', 'inputid', '/output', 42, '{}')")
            Database.c.execute("INSERT INTO sources (id, input, path, size) "
                               "VALUES ('sourceid', 'inputid2', '/source', 42)")
            with self.assertRaises(Forbidden) as ex:
                self.handler.submit(output_id='outputid', source_id='sourceid', _ip='1.1.1.1')

            self.assertIn("The provided pair of source-output is invalid", ex.exception.response.data.decode())
        finally:
            # Restore the log level even when an assertion above fails, so a
            # failure here does not leak the changed level into other tests.
            Logger.LOG_LEVEL = backup
validators.py 文件源码 项目:territoriali-backend 作者: algorithm-ninja 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def validate_id(param, name, getter, required=True):
        """
        Decorator factory that resolves the request parameter `param` through `getter`.

        When `param` is among the keyword arguments, `getter` is called with its value; the result is
        handed to the wrapped handler as the keyword argument `name` and the raw `param` is removed.
        If the getter returns None a Forbidden error is raised. When `param` is absent the handler
        receives `name` set to None.
        """
        def closure(handler):
            def handle(*args, **kwargs):
                resolved = None
                if param in kwargs:
                    resolved = getter(kwargs.pop(param))
                    if resolved is None:
                        BaseHandler.raise_exc(Forbidden, "FORBIDDEN", "No such " + name)
                kwargs[name] = resolved
                return handler(*args, **kwargs)

            HandlerParams.initialize_handler_params(handle, handler)
            HandlerParams.add_handler_param(handle, param, str, required=required)
            # when the model name differs from the param, `name` must not be
            # advertised as a handler parameter
            if name != param:
                HandlerParams.remove_handler_param(handle, name)
            return handle

        return closure
validators.py 文件源码 项目:territoriali-backend 作者: algorithm-ninja 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_user_from_sso(jwt_token, token):
        """
        Return the user described by an SSO JWT, creating it on first login.

        The token is decoded with `Config.jwt_secret` (HS256). Its `username` must equal `token`,
        otherwise a Forbidden error is raised. A user that is not yet in the database is added
        together with one user-task row per task, inside a single transaction.

        Any invalid token (malformed, bad signature, expired, ...) results in a Forbidden error
        pointing at `Config.sso_url`.
        """
        try:
            data = jwt.decode(jwt_token, Config.jwt_secret, algorithms=['HS256'])
            username = data["username"]
            name = data.get("firstName", username)
            surname = data.get("lastName", "")
            if username != token:
                BaseHandler.raise_exc(Forbidden, "FORBIDDEN", "Use the same username from the SSO")
            if Database.get_user(username) is None:
                Database.begin()
                try:
                    Database.add_user(username, name, surname, sso_user=True, autocommit=False)
                    for task in Database.get_tasks():
                        Database.add_user_task(username, task["name"], autocommit=False)
                    Database.commit()
                except BaseException:
                    # Do not leave a half-created user behind an open transaction.
                    Database.rollback()
                    raise
                Logger.info("NEW_USER", "User %s created from SSO" % username)
            return Database.get_user(username)
        except jwt.exceptions.InvalidTokenError:
            # InvalidTokenError is the base class of DecodeError and also covers
            # expired / otherwise rejected tokens.
            BaseHandler.raise_exc(Forbidden, "FORBIDDEN", "Please login at %s" % Config.sso_url)
contest_handler.py 文件源码 项目:territoriali-backend 作者: algorithm-ninja 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def generate_input(self, task, user):
        """
        POST /generate_input

        Hands the user a new input for `task`. Raises Forbidden when the user-task row already has a
        `current_attempt`. The new input and the user's attempt number are stored in one
        transaction, which is rolled back if either write fails.
        """
        token = user["token"]
        task_name = task["name"]

        user_task = Database.get_user_task(token, task_name)
        if user_task["current_attempt"] is not None:
            self.raise_exc(Forbidden, "FORBIDDEN", "You already have a ready input!")

        attempt = Database.get_next_attempt(token, task_name)
        input_id, input_path = ContestManager.get_input(task_name, attempt)
        input_size = StorageManager.get_file_size(input_path)

        Database.begin()
        try:
            Database.add_input(input_id, token, task_name, attempt, input_path, input_size, autocommit=False)
            Database.set_user_attempt(token, task_name, attempt, autocommit=False)
            Database.commit()
        except BaseException:
            # undo the partial write, then let the error propagate
            Database.rollback()
            raise

        Logger.info("CONTEST", "Generated input %s for user %s on task %s" % (input_id, token, task_name))
        stored_input = Database.get_input(id=input_id)
        return BaseHandler.format_dates(stored_input)
main.py 文件源码 项目:odoo-brasil 作者: Trust-Code 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _checkout_form_save(self, mode, checkout, all_values):
        """Create or update the partner described by the checkout form.

        With ``mode[0] == 'new'`` a partner is created from ``checkout``.
        With ``mode[0] == 'edit'`` the partner id is read from
        ``all_values['partner_id']`` (0 when absent) and, when non-zero, the
        partner is written only if it is the order's partner or is found in
        the ``child_of`` search on the order's commercial partner.

        Returns the created partner / the partner id. When the edit check
        fails a ``Forbidden`` instance is *returned*, not raised.
        NOTE(review): callers presumably have to test the result for
        ``Forbidden`` -- confirm. ``partner_id`` is unbound for any other
        ``mode[0]`` value.
        """
        Partner = request.env['res.partner']
        action = mode[0]
        if action == 'new':
            partner_id = Partner.sudo().create(checkout)
        elif action == 'edit':
            partner_id = int(all_values.get('partner_id', 0))
            if partner_id:
                # double check
                order = request.website.sale_get_order()
                commercial_ids = order.partner_id.commercial_partner_id.ids
                shippings = Partner.sudo().search(
                    [("id", "child_of", commercial_ids)])
                allowed = (partner_id in shippings.mapped('id')
                           or partner_id == order.partner_id.id)
                if not allowed:
                    return Forbidden()

                Partner.browse(partner_id).sudo().write(checkout)
        return partner_id
test_auth.py 文件源码 项目:pillar 作者: armadillica 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_some_roles_required(self):
        """require_login(require_roles=...) blocks users lacking the role and admits those with it."""
        from pillar.api.utils.authorization import require_login

        # one-element list so the nested function can flip the flag
        invoked = [False]

        @require_login(require_roles={'admin'})
        def call_me():
            invoked[0] = True

        # A user without the 'admin' role must be refused.
        with self.app.test_request_context():
            self.login_api_as(ObjectId(24 * 'a'), ['succubus'])
            with self.assertRaises(Forbidden):
                call_me()
        self.assertFalse(invoked[0])

        # A user with the 'admin' role gets through.
        with self.app.test_request_context():
            self.login_api_as(ObjectId(24 * 'a'), ['admin'])
            call_me()
        self.assertTrue(invoked[0])
test_auth.py 文件源码 项目:pillar 作者: armadillica 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_cap_required(self):
        """require_login(require_cap=...) refuses the 'succubus' role and admits the 'demo' role."""
        from pillar.api.utils.authorization import require_login

        # one-element list so the nested function can flip the flag
        invoked = [False]

        @require_login(require_cap='subscriber')
        def call_me():
            invoked[0] = True

        # Logged in with the 'succubus' role: the call must be refused.
        with self.app.test_request_context():
            self.login_api_as(ObjectId(24 * 'a'), ['succubus'])
            with self.assertRaises(Forbidden):
                call_me()
        self.assertFalse(invoked[0])

        # Logged in with the 'demo' role: the call must go through.
        with self.app.test_request_context():
            self.login_api_as(ObjectId(24 * 'a'), ['demo'])
            call_me()
        self.assertTrue(invoked[0])
comment.py 文件源码 项目:pillar 作者: armadillica 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def assert_is_valid_patch(node_id, patch):
    """Raises an exception when the patch isn't valid.

    :param node_id: ID of the comment node being patched (used for logging)
    :param patch: the PATCH document; must contain the key 'op'
    :raises wz_exceptions.BadRequest: when 'op' is missing or is not one of
        VALID_COMMENT_OPERATIONS
    :raises wz_exceptions.Forbidden: when 'op' is a voting operation and the
        current user has none of the roles in ROLES_FOR_COMMENT_VOTING
    """

    try:
        op = patch['op']
    except KeyError:
        raise wz_exceptions.BadRequest("PATCH should have a key 'op' indicating the operation.")

    if op not in VALID_COMMENT_OPERATIONS:
        # Format the message here: the exception constructor does not apply
        # %-formatting, and a second positional argument is not a format arg.
        raise wz_exceptions.BadRequest('Operation should be one of %s'
                                       % ', '.join(VALID_COMMENT_OPERATIONS))

    if op not in COMMENT_VOTING_OPS:
        # We can't check here, we need the node owner for that.
        return

    # See whether the user is allowed to patch
    if authorization.user_matches_roles(current_app.config['ROLES_FOR_COMMENT_VOTING']):
        log.debug('User is allowed to upvote/downvote comment')
        return

    # Access denied.
    log.info('User %s wants to PATCH comment node %s, but is not allowed.',
             authentication.current_user_id(), node_id)
    raise wz_exceptions.Forbidden()
decorators.py 文件源码 项目:lxc-rest 作者: lxc-webpanel 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def user_has(ability, get_user=import_user):
    """
    Decorator factory guarding a function behind an ability.

    Takes an ability (a string name of either a role or an ability). The
    decorated function is called only if the user returned by `get_user`
    is an admin or one of its groups lists the ability; otherwise
    Forbidden is raised.
    """
    def wrapper(func):
        @wraps(func)
        def inner(*args, **kwargs):
            desired_ability = Ability.query.filter_by(
                name=ability).first()
            user_abilities = []
            current_identity = get_user()
            for group in current_identity._groups:
                user_abilities += group.abilities
            # .first() yields None for an unknown ability name; treat that as
            # "not granted" instead of failing on `None.id`.
            has_ability = (desired_ability is not None
                           and desired_ability.id in user_abilities)
            if has_ability or current_identity.admin:
                return func(*args, **kwargs)
            raise Forbidden("You do not have access")
        return inner
    return wrapper
app_util.py 文件源码 项目:raw-data-repository 作者: all-of-us 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_validated_user_info():
  """Returns a valid (user email, user info), or raises Unauthorized or Forbidden.

  Unauthorized is raised when no OAuth user is found; Forbidden when
  `lookup_user_info` returns nothing for the email. For a known user the IP
  and app-id whitelist checks are run before returning.
  """
  user_email = get_oauth_id()

  # Allow clients to simulate an unauthenticated request (for testing)
  # because we haven't found another way to create an unauthenticated request
  # when using dev_appserver. When client tests are checking to ensure that an
  # unauthenticated request gets rejected, they helpfully add this header.
  # The `application_id` check ensures this feature only works in dev_appserver.
  if request.headers.get('unauthenticated') and app_identity.get_application_id() == 'None':
    user_email = None
  if user_email is None:
    raise Unauthorized('No OAuth user found.')

  user_info = lookup_user_info(user_email)
  if user_info:
    enforce_ip_whitelisted(request.remote_addr, get_whitelisted_ips(user_info))
    enforce_appid_whitelisted(request.headers.get('X-Appengine-Inbound-Appid'),
                              get_whitelisted_appids(user_info))
    logging.info('User %r ALLOWED', user_email)
    return (user_email, user_info)

  # Pass the argument to the logger (as in the ALLOWED branch) rather than
  # formatting eagerly with `%`.
  logging.info('User %r NOT ALLOWED', user_email)
  raise Forbidden()
app_util.py 文件源码 项目:raw-data-repository 作者: all-of-us 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_validated_user_info():
  """Returns a valid (user email, user info), or raises Unauthorized or Forbidden.

  Unauthorized is raised when no OAuth user is found; Forbidden when
  `lookup_user_info` returns nothing for the email. For a known user the IP
  and app-id whitelist checks are run before returning.
  """
  user_email = get_oauth_id()

  # Allow clients to simulate an unauthenticated request (for testing)
  # because we haven't found another way to create an unauthenticated request
  # when using dev_appserver. When client tests are checking to ensure that an
  # unauthenticated request gets rejected, they helpfully add this header.
  # The `application_id` check ensures this feature only works in dev_appserver.
  if request.headers.get('unauthenticated') and app_identity.get_application_id() == 'None':
    user_email = None
  if user_email is None:
    raise Unauthorized('No OAuth user found.')

  user_info = lookup_user_info(user_email)
  if user_info:
    enforce_ip_whitelisted(request.remote_addr, get_whitelisted_ips(user_info))
    enforce_appid_whitelisted(request.headers.get('X-Appengine-Inbound-Appid'),
                              get_whitelisted_appids(user_info))
    logging.info('User %r ALLOWED', user_email)
    return (user_email, user_info)

  # Pass the argument to the logger (as in the ALLOWED branch) rather than
  # formatting eagerly with `%`.
  logging.info('User %r NOT ALLOWED', user_email)
  raise Forbidden()
participant_dao_test.py 文件源码 项目:raw-data-repository 作者: all-of-us 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_update_withdrawn_status_fails(self):
    """Updating a participant inserted with NO_USE back to NOT_WITHDRAWN raises Forbidden."""
    p = Participant(withdrawalStatus=WithdrawalStatus.NO_USE)
    time = datetime.datetime(2016, 1, 1)
    # Fixed ids and a fixed clock make the inserted row predictable.
    with random_ids([1, 2]):
      with FakeClock(time):
        self.dao.insert(p)

    expected_participant = self._participant_with_defaults(
        participantId=1, version=1, biobankId=2, lastModified=time, signUpTime=time,
        withdrawalStatus=WithdrawalStatus.NO_USE)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(expected_participant.asdict(), p.asdict())

    p2 = self.dao.get(1)
    self.assertEqual(p.asdict(), p2.asdict())

    p.version = 1
    p.withdrawalStatus = WithdrawalStatus.NOT_WITHDRAWN
    with self.assertRaises(Forbidden):
      self.dao.update(p)
task_run.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _ensure_task_was_requested(self, task, guard):
        """Raise Forbidden unless `guard` reports the task as stamped for the current user id or IP."""
        stamped = guard.check_task_stamped(task, get_user_id_or_ip())
        if stamped:
            return
        raise Forbidden('You must request a task first!')
api_base.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def _create_json_response(self, query_result, oid):
        """Serialize the readable items of `query_result` to a JSON string.

        Aborts with 404 when the result is a single None. Each result the
        current user may read is converted with `_create_dict_from_model`
        (plus optional 'headline' / 'rank' entries); results that raise
        Forbidden or Unauthorized are left out. When `oid` is given, read
        access to the first result is enforced and only the first item is
        returned instead of a list.
        """
        if len(query_result) == 1 and query_result[0] is None:
            raise abort(404)
        items = []
        for result in query_result:
            # This is for n_favs orderby case
            if not isinstance(result, DomainObject):
                result = result[0]
            try:
                if (result.__class__ != self.__class__):
                    (item, headline, rank) = result
                else:
                    item = result
                    headline = None
                    rank = None
                datum = self._create_dict_from_model(item)
                if headline:
                    datum['headline'] = headline
                if rank:
                    datum['rank'] = rank
                ensure_authorized_to('read', item)
            except (Forbidden, Unauthorized):
                # The rejected item (401/403) was never appended, so there is
                # nothing to remove: just skip it. Popping here would drop a
                # previously accepted item.
                continue
            items.append(datum)
        if oid is not None:
            ensure_authorized_to('read', query_result[0])
            items = items[0]
        return json.dumps(items)
project.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _forbidden_attributes(self, data):
        """Reject payloads that contain a reserved key.

        Raises Forbidden when the first reserved key found is 'published',
        BadRequest for any other reserved key.
        """
        for key in data.keys():
            if key not in self.reserved_keys:
                continue
            if key == 'published':
                raise Forbidden('You cannot publish a project via the API')
            raise BadRequest("Reserved keys in payload")
test_helpingmaterial.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_non_owner_authenticated_user_create_given_helpingmaterial(self):
        """Test an authenticated user who is not the project owner cannot
        create a given helpingmaterial"""

        project = ProjectFactory.create()
        # built, not created: only authorization for creation is checked
        helpingmaterial = HelpingMaterialFactory.build(project_id=project.id)

        assert self.mock_authenticated.id != project.owner_id
        denied_call = ('create', helpingmaterial)
        assert_raises(Forbidden, ensure_authorized_to, *denied_call)
test_helpingmaterial.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_non_owner_authenticated_user_read_given_helpingmaterial_draft_project(self):
        """Test an authenticated user who is not the project owner cannot
        read a given helpingmaterial of a draft project"""

        draft_project = ProjectFactory.create(published=False)
        helpingmaterial = HelpingMaterialFactory.create(
            project_id=draft_project.id)

        assert self.mock_authenticated.id != draft_project.owner.id
        denied_call = ('read', helpingmaterial)
        assert_raises(Forbidden, ensure_authorized_to, *denied_call)
test_helpingmaterial.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_non_owner_authenticated_user_read_helpingmaterials_for_given_draft_project(self):
        """Test an authenticated user who is not the project owner cannot
        read the helpingmaterials of a given draft project"""

        draft_project = ProjectFactory.create(published=False)

        assert self.mock_authenticated.id != draft_project.owner.id
        # The class itself (not an instance) is checked, scoped by project.
        scope = {'project_id': draft_project.id}
        assert_raises(Forbidden, ensure_authorized_to, 'read',
                      HelpingMaterial, **scope)
test_helpingmaterial.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_non_owner_authenticated_user_delete_helpingmaterial(self):
        """Test an authenticated user who is neither the owner nor an admin
        cannot delete a helpingmaterial"""

        owner = UserFactory.create(id=5)
        published_project = ProjectFactory.create(owner=owner, published=True)
        helpingmaterial = HelpingMaterialFactory.create(
            project_id=published_project.id)

        # preconditions: the acting user is a plain non-owner
        assert self.mock_authenticated.id != owner.id
        assert not self.mock_authenticated.admin

        denied_call = ('delete', helpingmaterial)
        assert_raises(Forbidden, ensure_authorized_to, *denied_call)
test_auditlog_auth.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_authenticated_user_cannot_crud_auditlog(self):
        """Test authenticated users cannot create, update or delete auditlogs"""

        log = Auditlog()

        # every write action must be refused, in this order
        for action in ('create', 'update', 'delete'):
            assert_raises(Forbidden, ensure_authorized_to, action, log)
test_auditlog_auth.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_admin_user_cannot_crud_auditlog(self):
        """Test admin users cannot create, update or delete auditlogs"""

        log = Auditlog()

        # every write action must be refused, in this order
        for action in ('create', 'update', 'delete'):
            assert_raises(Forbidden, ensure_authorized_to, action, log)
test_taskrun_auth.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_anonymous_user_can_create_taskrun_for_draft_project(self):
        """Test anonymous users can create a taskrun for a draft project"""
        draft_project = ProjectFactory.create(published=False)
        task = TaskFactory.create(project=draft_project)
        taskrun = AnonymousTaskRunFactory.build(
            task_id=task.id, project_id=draft_project.id)

        allowed_call = ('create', taskrun)
        assert_not_raises(Forbidden, ensure_authorized_to, *allowed_call)
test_taskrun_auth.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_authenticated_user_create_repeated_taskrun(self):
        """Test an authenticated user cannot create a second taskrun for the
        same task"""
        task = TaskFactory.create()
        first_run = TaskRunFactory.create(task=task)
        # second run for the same task and user, built but not saved
        repeated_run = TaskRunFactory.build(task=task, user=first_run.user)

        assert self.mock_authenticated.id == first_run.user.id
        denied_call = ('create', repeated_run)
        assert_raises(Forbidden, ensure_authorized_to, *denied_call)
test_taskrun_auth.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_authenticated_user_can_create_taskrun_for_draft_project(self):
        """Test authenticated users can create a taskrun for a draft project"""
        draft_project = ProjectFactory.create(published=False)
        task = TaskFactory.create(project=draft_project)
        taskrun = TaskRunFactory.build(
            task_id=task.id, project_id=draft_project.id)

        allowed_call = ('create', taskrun)
        assert_not_raises(Forbidden, ensure_authorized_to, *allowed_call)
test_taskrun_auth.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_authenticated_user_update_anonymous_taskrun(self):
        """Test an authenticated user cannot update a taskrun that was posted
        anonymously"""
        taskrun = AnonymousTaskRunFactory.create()

        denied_call = ('update', taskrun)
        assert_raises(Forbidden, ensure_authorized_to, *denied_call)
test_taskrun_auth.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_admin_update_anonymous_taskrun(self):
        """Test an admin cannot update a taskrun that was posted anonymously"""
        taskrun = AnonymousTaskRunFactory.create()

        denied_call = ('update', taskrun)
        assert_raises(Forbidden, ensure_authorized_to, *denied_call)
test_taskrun_auth.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_admin_update_user_taskrun(self):
        """Test an admin cannot update a taskrun posted by an authenticated
        user"""
        taskrun = TaskRunFactory.create()

        # precondition: the admin is not the author of the taskrun
        assert self.mock_admin.id != taskrun.user.id
        denied_call = ('update', taskrun)
        assert_raises(Forbidden, ensure_authorized_to, *denied_call)
test_taskrun_auth.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_authenticated_user_delete_anonymous_taskrun(self):
        """Test an authenticated user cannot delete a taskrun that was posted
        anonymously"""
        taskrun = AnonymousTaskRunFactory.create()

        denied_call = ('delete', taskrun)
        assert_raises(Forbidden, ensure_authorized_to, *denied_call)


问题


面经


文章

微信
公众号

扫码关注公众号