def test_permission_admin(app, empty_db_session):
    """Exercise the ``admin_role`` permission guard inside a request context.

    The guarded callable must raise ``Forbidden`` before anybody is logged
    in; afterwards an account carrying the ``admin`` role is stored in the
    session, committed and logged in.
    """
    with app.test_request_context('/'):

        @permissions.admin_role.require(403)
        def guarded_view():
            return 200

        # Nobody has been logged in yet, so the guard is expected to refuse.
        with pytest.raises(Forbidden):
            assert guarded_view() == 200

        admin_role = Role('admin', '')
        admin_account = UserAccount()
        admin_account.id = 666
        admin_account.roles.append(admin_role)

        for entity in (admin_role, admin_account):
            empty_db_session.add(entity)
        empty_db_session.commit()

        login(admin_account)
# Example source code for the Python class Forbidden()
def toolbar(**kwargs):
    """ Get reports/tickets stats

        :param kwargs: must contain ``user``, the user the stats are built for
        :return: the value returned by ``_get_toolbar_count`` for the
            combined filter
        :raises Forbidden: if no ``AbusePermission`` row exists for the user
    """
    user = kwargs['user']

    # Only existence matters here, so let the database stop at the first
    # matching row instead of counting all of them.
    if not AbusePermission.objects.filter(user=user.id).exists():
        raise Forbidden('You are not allowed to see any category')

    where = [Q()]

    # OR together every user-specific restriction.
    user_specific_where = _get_user_specific_where(user)
    user_specific_where = reduce(operator.or_, user_specific_where)
    where.append(user_specific_where)

    # Aggregate all filters
    where = reduce(operator.and_, where)

    return _get_toolbar_count(where, user)
# Source file: test_contest_handler.py
# Project: territoriali-backend
# Author: algorithm-ninja
# Project source
# File source
# Views: 18
# Favorites: 0
# Likes: 0
# Comments: 0
def test_sumbit_not_matching(self, g_f_s_mock, g_i_mock):
    """Submitting an output and a source tied to different inputs is forbidden.

    The output row references 'inputid' while the source row references
    'inputid2', so ``submit`` must raise ``Forbidden`` with the
    "invalid pair" message.
    """
    Utils.start_contest()
    self._insert_data(token="token", task="poldo")
    self._insert_data(token="token2", task="poldo")

    backup = Logger.LOG_LEVEL
    # NOTE(review): presumably high enough to silence logging during the test.
    Logger.LOG_LEVEL = 9001
    try:
        self.handler.generate_input(token='token', task='poldo', _ip='1.1.1.1')
        g_i_mock.return_value = ("inputid2", "/path")
        self.handler.generate_input(token='token2', task='poldo', _ip='1.1.1.1')

        Database.c.execute("INSERT INTO outputs (id, input, path, size, result) "
                           "VALUES ('outputid', 'inputid', '/output', 42, '{}')")
        Database.c.execute("INSERT INTO sources (id, input, path, size) "
                           "VALUES ('sourceid', 'inputid2', '/source', 42)")

        with self.assertRaises(Forbidden) as ex:
            self.handler.submit(output_id='outputid', source_id='sourceid', _ip='1.1.1.1')

        self.assertIn("The provided pair of source-output is invalid", ex.exception.response.data.decode())
    finally:
        # Restore the global log level even when the test fails, so the
        # override cannot leak into the tests that run afterwards.
        Logger.LOG_LEVEL = backup
def validate_id(param, name, getter, required=True):
    """
    Decorator factory that replaces a raw request parameter with the object it identifies.

    When the decorated handler is called with `param` among its keyword arguments, `getter` is called with that
    value. A None result is reported through `BaseHandler.raise_exc` as a Forbidden "No such <name>" error.
    The raw `param` is then removed and the handler receives the looked-up object as the `name` keyword
    argument (None when `param` was not supplied).
    """
    def closure(handler):
        def handle(*args, **kwargs):
            resolved = None
            if param in kwargs:
                resolved = getter(kwargs[param])
                if resolved is None:
                    BaseHandler.raise_exc(Forbidden, "FORBIDDEN", "No such " + name)
                del kwargs[param]
            kwargs[name] = resolved
            return handler(*args, **kwargs)

        HandlerParams.initialize_handler_params(handle, handler)
        HandlerParams.add_handler_param(handle, param, str, required=required)

        # the case when the name of the model corresponds with the param
        if name != param:
            HandlerParams.remove_handler_param(handle, name)

        return handle

    return closure
def _get_user_from_sso(jwt_token, token):
    """
    Decode the SSO JWT and return the matching user, creating it on first login.

    The username stored in the JWT must equal `token`, otherwise a Forbidden error is raised through
    `BaseHandler.raise_exc`. An unknown username is inserted, together with one user-task row per task, in a
    single transaction. A JWT that cannot be decoded results in a Forbidden error pointing at the SSO url.
    """
    try:
        data = jwt.decode(jwt_token, Config.jwt_secret, algorithms=['HS256'])
        username = data["username"]
        name = data.get("firstName", username)
        surname = data.get("lastName", "")
        if username != token:
            BaseHandler.raise_exc(Forbidden, "FORBIDDEN", "Use the same username from the SSO")
        if Database.get_user(username) is None:
            Database.begin()
            try:
                Database.add_user(username, name, surname, sso_user=True, autocommit=False)
                for task in Database.get_tasks():
                    Database.add_user_task(username, task["name"], autocommit=False)
                Database.commit()
            except BaseException:
                # Do not leave a half-created user behind in an open transaction.
                Database.rollback()
                raise
            Logger.info("NEW_USER", "User %s created from SSO" % username)
        return Database.get_user(username)
    except jwt.exceptions.DecodeError:
        BaseHandler.raise_exc(Forbidden, "FORBIDDEN", "Please login at %s" % Config.sso_url)
def generate_input(self, task, user):
    """
    POST /generate_input

    Hand out the input of the next attempt of `user` on `task`. A Forbidden error is raised through
    `raise_exc` when the user's current attempt on the task is already set. The new input and the updated
    attempt are stored in one transaction, and the stored input is returned after `format_dates`.
    """
    token = user["token"]
    if Database.get_user_task(token, task["name"])["current_attempt"] is not None:
        self.raise_exc(Forbidden, "FORBIDDEN", "You already have a ready input!")

    attempt = Database.get_next_attempt(token, task["name"])
    # `input_id` rather than `id`, to avoid shadowing the builtin.
    input_id, path = ContestManager.get_input(task["name"], attempt)
    size = StorageManager.get_file_size(path)

    Database.begin()
    try:
        Database.add_input(input_id, token, task["name"], attempt, path, size, autocommit=False)
        Database.set_user_attempt(token, task["name"], attempt, autocommit=False)
        Database.commit()
    except BaseException:
        # Roll back on any failure (including interrupts), then propagate it.
        Database.rollback()
        raise

    Logger.info("CONTEST", "Generated input %s for user %s on task %s" % (input_id, token, task["name"]))
    return BaseHandler.format_dates(Database.get_input(id=input_id))
def _checkout_form_save(self, mode, checkout, all_values):
    """Create or update the partner described by the checkout form.

    With ``mode[0] == 'new'`` a partner is created from ``checkout``.
    With ``mode[0] == 'edit'`` the partner id is read from ``all_values``
    and, when non-zero, written with ``checkout`` after checking it against
    the current order.

    Returns the created partner or the edited partner id.
    NOTE(review): on a failed check a ``Forbidden`` instance is returned,
    not raised -- confirm that callers expect this.
    """
    partner_model = request.env['res.partner']
    action = mode[0]

    if action == 'new':
        partner_id = partner_model.sudo().create(checkout)
    elif action == 'edit':
        partner_id = int(all_values.get('partner_id', 0))
        if partner_id:
            # double check
            order = request.website.sale_get_order()
            domain = [("id", "child_of",
                       order.partner_id.commercial_partner_id.ids)]
            shippings = partner_model.sudo().search(domain)
            if not (partner_id in shippings.mapped('id')
                    or partner_id == order.partner_id.id):
                return Forbidden()
            partner_model.browse(partner_id).sudo().write(checkout)
    return partner_id
def test_some_roles_required(self):
    """require_login(require_roles={'admin'}) refuses 'succubus' and accepts 'admin'."""
    from pillar.api.utils.authorization import require_login

    state = {'called': False}

    @require_login(require_roles={'admin'})
    def call_me():
        state['called'] = True

    user_id = ObjectId(24 * 'a')

    # A user without the required role must be rejected before the body runs.
    with self.app.test_request_context():
        self.login_api_as(user_id, ['succubus'])
        with self.assertRaises(Forbidden):
            call_me()
    self.assertFalse(state['called'])

    # With the required role the wrapped function is executed.
    with self.app.test_request_context():
        self.login_api_as(user_id, ['admin'])
        call_me()
    self.assertTrue(state['called'])
def test_cap_required(self):
    """require_login(require_cap='subscriber') refuses role 'succubus' and accepts role 'demo'."""
    from pillar.api.utils.authorization import require_login

    state = {'called': False}

    @require_login(require_cap='subscriber')
    def call_me():
        state['called'] = True

    user_id = ObjectId(24 * 'a')

    # This role must be rejected before the body runs.
    with self.app.test_request_context():
        self.login_api_as(user_id, ['succubus'])
        with self.assertRaises(Forbidden):
            call_me()
    self.assertFalse(state['called'])

    # NOTE(review): presumably the 'demo' role grants the 'subscriber' capability.
    with self.app.test_request_context():
        self.login_api_as(user_id, ['demo'])
        call_me()
    self.assertTrue(state['called'])
def assert_is_valid_patch(node_id, patch):
    """Raises an exception when the patch isn't valid.

    :param node_id: ID of the comment node being patched; only used for logging.
    :param patch: the PATCH payload; must contain the key 'op'.
    :raises wz_exceptions.BadRequest: when 'op' is missing or is not one of
        VALID_COMMENT_OPERATIONS.
    :raises wz_exceptions.Forbidden: when 'op' is a voting operation and the
        user does not match the roles in ROLES_FOR_COMMENT_VOTING.
    """
    try:
        op = patch['op']
    except KeyError:
        raise wz_exceptions.BadRequest("PATCH should have a key 'op' indicating the operation.")

    if op not in VALID_COMMENT_OPERATIONS:
        # Format the message explicitly: a second positional argument to the
        # exception is taken as the response, not as a formatting argument.
        raise wz_exceptions.BadRequest('Operation should be one of %s'
                                       % ', '.join(VALID_COMMENT_OPERATIONS))

    if op not in COMMENT_VOTING_OPS:
        # We can't check here, we need the node owner for that.
        return

    # See whether the user is allowed to patch
    if authorization.user_matches_roles(current_app.config['ROLES_FOR_COMMENT_VOTING']):
        log.debug('User is allowed to upvote/downvote comment')
        return

    # Access denied.
    log.info('User %s wants to PATCH comment node %s, but is not allowed.',
             authentication.current_user_id(), node_id)
    raise wz_exceptions.Forbidden()
def user_has(ability, get_user=import_user):
    """
    Takes an ability (a string name of either a role or an ability) and returns the function if the user has that ability

    The decorated function is called when the ability is found among the
    abilities of the user's groups, or when the user is flagged as admin;
    otherwise Forbidden is raised. An ability name with no stored record is
    treated as "not held" instead of failing with an AttributeError.
    """
    def wrapper(func):
        @wraps(func)
        def inner(*args, **kwargs):
            desired_ability = Ability.query.filter_by(
                name=ability).first()
            current_identity = get_user()

            user_abilities = []
            for group in current_identity._groups:
                user_abilities += group.abilities

            # NOTE(review): this compares an id against the entries of
            # group.abilities -- confirm those entries are ids.
            has_ability = (desired_ability is not None
                           and desired_ability.id in user_abilities)
            if has_ability or current_identity.admin:
                return func(*args, **kwargs)
            raise Forbidden("You do not have access")
        return inner
    return wrapper
def get_validated_user_info():
    """Returns a valid (user email, user info), or raises Unauthorized or Forbidden.

    Unauthorized is raised when no OAuth user is found; Forbidden is raised
    when no user info is found for the OAuth email. For a known user the
    request IP and the inbound app id are passed to the whitelist checks
    before the pair is returned.
    """
    user_email = get_oauth_id()

    # Allow clients to simulate an unauthenticated request (for testing)
    # because we haven't found another way to create an unauthenticated request
    # when using dev_appserver. When client tests are checking to ensure that an
    # unauthenticated request gets rejected, they helpfully add this header.
    # The `application_id` check ensures this feature only works in dev_appserver.
    if request.headers.get('unauthenticated') and app_identity.get_application_id() == 'None':
        user_email = None
    if user_email is None:
        raise Unauthorized('No OAuth user found.')

    user_info = lookup_user_info(user_email)
    if user_info:
        enforce_ip_whitelisted(request.remote_addr, get_whitelisted_ips(user_info))
        enforce_appid_whitelisted(request.headers.get('X-Appengine-Inbound-Appid'),
                                  get_whitelisted_appids(user_info))
        logging.info('User %r ALLOWED', user_email)
        return (user_email, user_info)

    # Pass the argument to the logger instead of pre-formatting the message,
    # matching the ALLOWED log call above.
    logging.info('User %r NOT ALLOWED', user_email)
    raise Forbidden()
def get_validated_user_info():
    """Returns a valid (user email, user info), or raises Unauthorized or Forbidden.

    Unauthorized is raised when no OAuth user is found; Forbidden is raised
    when no user info is found for the OAuth email. For a known user the
    request IP and the inbound app id are passed to the whitelist checks
    before the pair is returned.
    """
    user_email = get_oauth_id()

    # Allow clients to simulate an unauthenticated request (for testing)
    # because we haven't found another way to create an unauthenticated request
    # when using dev_appserver. When client tests are checking to ensure that an
    # unauthenticated request gets rejected, they helpfully add this header.
    # The `application_id` check ensures this feature only works in dev_appserver.
    if request.headers.get('unauthenticated') and app_identity.get_application_id() == 'None':
        user_email = None
    if user_email is None:
        raise Unauthorized('No OAuth user found.')

    user_info = lookup_user_info(user_email)
    if user_info:
        enforce_ip_whitelisted(request.remote_addr, get_whitelisted_ips(user_info))
        enforce_appid_whitelisted(request.headers.get('X-Appengine-Inbound-Appid'),
                                  get_whitelisted_appids(user_info))
        logging.info('User %r ALLOWED', user_email)
        return (user_email, user_info)

    # Pass the argument to the logger instead of pre-formatting the message,
    # matching the ALLOWED log call above.
    logging.info('User %r NOT ALLOWED', user_email)
    raise Forbidden()
def test_update_withdrawn_status_fails(self):
    """Changing a NO_USE participant back to NOT_WITHDRAWN raises Forbidden."""
    p = Participant(withdrawalStatus=WithdrawalStatus.NO_USE)
    time = datetime.datetime(2016, 1, 1)
    with random_ids([1, 2]):
        with FakeClock(time):
            self.dao.insert(p)

    expected_participant = self._participant_with_defaults(
        participantId=1, version=1, biobankId=2, lastModified=time, signUpTime=time,
        withdrawalStatus=WithdrawalStatus.NO_USE)
    # assertEqual is the supported name; the assertEquals alias is deprecated.
    self.assertEqual(expected_participant.asdict(), p.asdict())

    p2 = self.dao.get(1)
    self.assertEqual(p.asdict(), p2.asdict())

    p.version = 1
    p.withdrawalStatus = WithdrawalStatus.NOT_WITHDRAWN
    with self.assertRaises(Forbidden):
        self.dao.update(p)
def _ensure_task_was_requested(self, task, guard):
    """Raise Forbidden unless `guard` reports `task` as stamped for the current user id or IP."""
    stamped = guard.check_task_stamped(task, get_user_id_or_ip())
    if stamped:
        return
    raise Forbidden('You must request a task first!')
def _create_json_response(self, query_result, oid):
    """Serialize the readable items of `query_result` to a JSON string.

    :param query_result: sequence of results; an entry is either a
        DomainObject or a sequence whose first element is unwrapped first.
    :param oid: when not None, a single object was requested and the JSON
        holds that one item instead of a list.
    :returns: the JSON document produced by ``json.dumps``.

    Aborts with 404 when the only result is None. Items the current user is
    not authorized to read are left out of the list.
    """
    if len(query_result) == 1 and query_result[0] is None:
        raise abort(404)
    items = []
    for result in query_result:
        # This is for n_favs orderby case
        if not isinstance(result, DomainObject):
            result = result[0]
        try:
            if result.__class__ != self.__class__:
                (item, headline, rank) = result
            else:
                item = result
                headline = None
                rank = None
            datum = self._create_dict_from_model(item)
            if headline:
                datum['headline'] = headline
            if rank:
                datum['rank'] = rank
            ensure_authorized_to('read', item)
            items.append(datum)
        except (Forbidden, Unauthorized):
            # The rejected item (401 or 403) was never appended, so there is
            # nothing to remove; popping here would drop the previous,
            # authorized item instead.
            continue
    if oid is not None:
        ensure_authorized_to('read', query_result[0])
        items = items[0]
    return json.dumps(items)
def _forbidden_attributes(self, data):
for key in data.keys():
if key in self.reserved_keys:
if key == 'published':
raise Forbidden('You cannot publish a project via the API')
raise BadRequest("Reserved keys in payload")
def test_non_owner_authenticated_user_create_given_helpingmaterial(self):
    """Test an authenticated user who does not own the project cannot
    create a given helpingmaterial"""
    some_project = ProjectFactory.create()
    material = HelpingMaterialFactory.build(project_id=some_project.id)

    # Precondition: the mocked user is not the project owner.
    assert self.mock_authenticated.id != some_project.owner_id
    assert_raises(Forbidden, ensure_authorized_to, 'create', material)
def test_non_owner_authenticated_user_read_given_helpingmaterial_draft_project(self):
    """Test an authenticated user who does not own a draft project cannot
    read a given helpingmaterial of it"""
    draft = ProjectFactory.create(published=False)
    material = HelpingMaterialFactory.create(project_id=draft.id)

    # Precondition: the mocked user is not the project owner.
    assert self.mock_authenticated.id != draft.owner.id
    assert_raises(Forbidden, ensure_authorized_to, 'read', material)
def test_non_owner_authenticated_user_read_helpingmaterials_for_given_draft_project(self):
    """Test an authenticated user who does not own a draft project cannot
    read the helpingmaterials of that project"""
    draft = ProjectFactory.create(published=False)

    # Precondition: the mocked user is not the project owner.
    assert self.mock_authenticated.id != draft.owner.id
    assert_raises(Forbidden, ensure_authorized_to, 'read',
                  HelpingMaterial, project_id=draft.id)
def test_non_owner_authenticated_user_delete_helpingmaterial(self):
    """Test an authenticated user who is neither the owner nor an admin
    cannot delete a helpingmaterial"""
    project_owner = UserFactory.create(id=5)
    published = ProjectFactory.create(owner=project_owner, published=True)
    material = HelpingMaterialFactory.create(project_id=published.id)

    # Preconditions: the mocked user is a plain, non-owning user.
    assert self.mock_authenticated.id != project_owner.id
    assert not self.mock_authenticated.admin
    assert_raises(Forbidden, ensure_authorized_to, 'delete', material)
def test_authenticated_user_cannot_crud_auditlog(self):
    """Test authenticated users cannot create, update or delete auditlogs"""
    auditlog = Auditlog()

    for action in ('create', 'update', 'delete'):
        assert_raises(Forbidden, ensure_authorized_to, action, auditlog)
def test_admin_user_cannot_crud_auditlog(self):
    """Test admin users cannot create, update or delete auditlogs"""
    auditlog = Auditlog()

    for action in ('create', 'update', 'delete'):
        assert_raises(Forbidden, ensure_authorized_to, action, auditlog)
def test_anonymous_user_can_create_taskrun_for_draft_project(self):
    """Test anonymous users are allowed to create a taskrun for a
    draft project"""
    draft = ProjectFactory.create(published=False)
    draft_task = TaskFactory.create(project=draft)
    new_taskrun = AnonymousTaskRunFactory.build(task_id=draft_task.id,
                                                project_id=draft.id)

    assert_not_raises(Forbidden, ensure_authorized_to, 'create', new_taskrun)
def test_authenticated_user_create_repeated_taskrun(self):
    """Test an authenticated user cannot create a second taskrun for a task
    they already have a taskrun for"""
    shared_task = TaskFactory.create()
    first_run = TaskRunFactory.create(task=shared_task)
    second_run = TaskRunFactory.build(task=shared_task, user=first_run.user)

    # Precondition: the mocked user is the author of the first taskrun.
    assert self.mock_authenticated.id == first_run.user.id
    assert_raises(Forbidden, ensure_authorized_to, 'create', second_run)
def test_authenticated_user_can_create_taskrun_for_draft_project(self):
    """Test authenticated users are allowed to create a taskrun for a
    draft project"""
    draft = ProjectFactory.create(published=False)
    draft_task = TaskFactory.create(project=draft)
    new_taskrun = TaskRunFactory.build(task_id=draft_task.id,
                                       project_id=draft.id)

    assert_not_raises(Forbidden, ensure_authorized_to, 'create', new_taskrun)
def test_authenticated_user_update_anonymous_taskrun(self):
    """Test an authenticated user is forbidden from updating a taskrun that
    was posted anonymously"""
    posted_anonymously = AnonymousTaskRunFactory.create()

    assert_raises(Forbidden, ensure_authorized_to, 'update', posted_anonymously)
def test_admin_update_anonymous_taskrun(self):
    """Test an admin is forbidden from updating a taskrun that was posted
    anonymously"""
    posted_anonymously = AnonymousTaskRunFactory.create()

    assert_raises(Forbidden, ensure_authorized_to, 'update', posted_anonymously)
def test_admin_update_user_taskrun(self):
    """Test an admin is forbidden from updating a taskrun posted by an
    authenticated user"""
    posted_by_user = TaskRunFactory.create()

    # Precondition: the admin is not the author of the taskrun.
    assert self.mock_admin.id != posted_by_user.user.id
    assert_raises(Forbidden, ensure_authorized_to, 'update', posted_by_user)
def test_authenticated_user_delete_anonymous_taskrun(self):
    """Test an authenticated user is forbidden from deleting a taskrun that
    was posted anonymously"""
    posted_anonymously = AnonymousTaskRunFactory.create()

    assert_raises(Forbidden, ensure_authorized_to, 'delete', posted_anonymously)