def test_signature(self):
# Check signature of token by corrupting the signature
with self.assertRaises(Unauthorized) as context:
t = template.copy()
t['signature'] = t['signature'][:84] + '5' + t['signature'][85:] # Just modify one random letter.
validate_gamecenter_token(t)
self.assertIn("Can't verify signature:", context.exception.description)
self.assertIn("'padding check failed'", context.exception.description)
# Check signature of token by modifying the payload
with self.assertRaises(Unauthorized) as context:
t = template.copy()
t['player_id'] = 'G:5637867917'
validate_gamecenter_token(t)
self.assertIn("Can't verify signature:", context.exception.description)
self.assertIn("'bad signature'", context.exception.description)
# For requests library mock
python类Unauthorized()的实例源码
def auth():
"""
Check user/password and returns token if valid
"""
if settings.API.get('forwarded_host'):
try:
if not request.environ['HTTP_X_FORWARDED_HOST'] == settings.API['forwarded_host']:
raise BadRequest('Invalid HTTP_X_FORWARDED_HOST')
except KeyError:
raise BadRequest('Missing HTTP_X_FORWARDED_HOST')
body = request.get_json()
authenticated, ret = GeneralController.auth(body)
if authenticated:
return ret
else:
raise Unauthorized(ret)
def callback():
resp = oauth_provider.authorized_response()
if resp is None or isinstance(resp, OAuthException):
log.warning("Failed OAuth: %r", resp)
return Unauthorized("Authentication has failed.")
session['oauth'] = resp
if 'googleapis.com' in oauth_provider.base_url:
me = oauth_provider.get('userinfo')
session['user'] = me.data.get('email')
elif 'investigativedashboard.org' in oauth_provider.base_url:
me = oauth_provider.get('api/2/accounts/profile/')
session['user'] = me.data.get('email')
else:
return Unauthorized('Unknown OAuth provider: %r' %
oauth_provider.base_url)
log.info("Logged in: %s", session['user'])
return redirect(session.pop('next_url', '/'))
def _validate_http_auth(self):
"""Verify http auth request with values in "Authorization" header
"""
if not self.key or not self.secret:
return True
try:
auth_type, encoded_auth_str = \
request.headers['Authorization'].split(' ', 1)
if auth_type == 'Basic':
decoded_auth_str = base64.decodestring(encoded_auth_str)
auth_id, auth_token = decoded_auth_str.split(':', 1)
if auth_id == self.key and auth_token == self.secret:
return True
except (KeyError, ValueError, TypeError):
pass
raise Unauthorized("HTTP Auth Failed")
def get_validated_user_info():
"""Returns a valid (user email, user info), or raises Unauthorized or Forbidden."""
user_email = get_oauth_id()
# Allow clients to simulate an unauthentiated request (for testing)
# becaues we haven't found another way to create an unauthenticated request
# when using dev_appserver. When client tests are checking to ensure that an
# unauthenticated requests gets rejected, they helpfully add this header.
# The `application_id` check ensures this feature only works in dev_appserver.
if request.headers.get('unauthenticated') and app_identity.get_application_id() == 'None':
user_email = None
if user_email is None:
raise Unauthorized('No OAuth user found.')
user_info = lookup_user_info(user_email)
if user_info:
enforce_ip_whitelisted(request.remote_addr, get_whitelisted_ips(user_info))
enforce_appid_whitelisted(request.headers.get('X-Appengine-Inbound-Appid'),
get_whitelisted_appids(user_info))
logging.info('User %r ALLOWED', user_email)
return (user_email, user_info)
logging.info('User %r NOT ALLOWED' % user_email)
raise Forbidden()
def auth_required(role_whitelist):
"""A decorator that keeps the function from being called without auth.
role_whitelist can be a string or list of strings specifying one or
more roles that are allowed to call the function. """
assert role_whitelist, "Can't call `auth_required` with empty role_whitelist."
if type(role_whitelist) != list:
role_whitelist = [role_whitelist]
def auth_required_wrapper(func):
def wrapped(*args, **kwargs):
appid = app_identity.get_application_id()
# Only enforce HTTPS and auth for external requests; requests made for data generation
# are allowed through (when enabled).
if not _is_self_request():
if request.scheme.lower() != 'https' and appid not in ('None', 'testbed-test', 'testapp'):
raise Unauthorized('HTTPS is required for %r' % appid)
check_auth(role_whitelist)
return func(*args, **kwargs)
return wrapped
return auth_required_wrapper
def get_validated_user_info():
"""Returns a valid (user email, user info), or raises Unauthorized or Forbidden."""
user_email = get_oauth_id()
# Allow clients to simulate an unauthentiated request (for testing)
# becaues we haven't found another way to create an unauthenticated request
# when using dev_appserver. When client tests are checking to ensure that an
# unauthenticated requests gets rejected, they helpfully add this header.
# The `application_id` check ensures this feature only works in dev_appserver.
if request.headers.get('unauthenticated') and app_identity.get_application_id() == 'None':
user_email = None
if user_email is None:
raise Unauthorized('No OAuth user found.')
user_info = lookup_user_info(user_email)
if user_info:
enforce_ip_whitelisted(request.remote_addr, get_whitelisted_ips(user_info))
enforce_appid_whitelisted(request.headers.get('X-Appengine-Inbound-Appid'),
get_whitelisted_appids(user_info))
logging.info('User %r ALLOWED', user_email)
return (user_email, user_info)
logging.info('User %r NOT ALLOWED' % user_email)
raise Forbidden()
def _create_json_response(self, query_result, oid):
if len(query_result) == 1 and query_result[0] is None:
raise abort(404)
items = []
for result in query_result:
# This is for n_favs orderby case
if not isinstance(result, DomainObject):
result = result[0]
try:
if (result.__class__ != self.__class__):
(item, headline, rank) = result
else:
item = result
headline = None
rank = None
datum = self._create_dict_from_model(item)
if headline:
datum['headline'] = headline
if rank:
datum['rank'] = rank
ensure_authorized_to('read', item)
items.append(datum)
except (Forbidden, Unauthorized):
# Remove last added item, as it is 401 or 403
if len(items) > 0:
items.pop()
except Exception: # pragma: no cover
raise
if oid is not None:
ensure_authorized_to('read', query_result[0])
items = items[0]
return json.dumps(items)
def test_anonymous_user_create_given_helpingmaterial(self):
"""Test anonymous users cannot create helping materials for a project"""
project = ProjectFactory.create()
helping = HelpingMaterialFactory.build(project_id=project.id)
assert_raises(Unauthorized, ensure_authorized_to, 'create', helping)
def test_anonymous_user_create_helpingmaterials_for_given_project(self):
"""Test anonymous users cannot create helpingmaterials for a given project"""
project = ProjectFactory.create()
assert_raises(Unauthorized, ensure_authorized_to, 'create', HelpingMaterial, project_id=project.id)
def test_anonymous_user_create_helpingmaterials(self):
"""Test anonymous users cannot create any helpingmaterials"""
assert_raises(Unauthorized, ensure_authorized_to, 'create', HelpingMaterial)
def test_anonymous_user_read_given_helpingmaterial_draft_project(self):
"""Test anonymous users cannot read a given helpingmaterial of
a draft project"""
project = ProjectFactory.create(published=False)
helpingmaterial = HelpingMaterialFactory.create(project_id=project.id)
assert_raises(Unauthorized, ensure_authorized_to,
'read', helpingmaterial)
def test_anonymous_user_update_helpingmaterial(self):
"""Test anonymous users cannot update helpingmaterials"""
project = ProjectFactory.create(published=True)
helpingmaterial = HelpingMaterialFactory.create(project_id=project.id)
assert_raises(Unauthorized, ensure_authorized_to, 'update',
helpingmaterial)
def test_anonymous_user_delete_helpingmaterial(self):
"""Test anonymous users cannot delete helpingmaterials"""
project = ProjectFactory.create(published=True)
helpingmaterial = HelpingMaterialFactory.create(project_id=project.id)
assert_raises(Unauthorized, ensure_authorized_to, 'delete',
helpingmaterial)
def test_anonymous_user_cannot_read_auditlog(self):
"""Test anonymous users cannot read an auditlog"""
log = AuditlogFactory.create()
assert_raises(Unauthorized, ensure_authorized_to, 'read', log)
def test_anonymous_user_cannot_read_project_auditlogs(self):
"""Test anonymous users cannot read auditlogs of a specific project"""
project = ProjectFactory.create()
assert_raises(Unauthorized, ensure_authorized_to, 'read', Auditlog, project_id=project.id)
def test_anonymous_user_cannot_crud_auditlog(self):
"""Test anonymous users cannot crud auditlogs"""
log = Auditlog()
assert_raises(Unauthorized, ensure_authorized_to, 'create', log)
assert_raises(Unauthorized, ensure_authorized_to, 'update', log)
assert_raises(Unauthorized, ensure_authorized_to, 'delete', log)
def test_anonymous_user_update_anoymous_taskrun(self):
"""Test anonymous users cannot update an anonymously posted taskrun"""
anonymous_taskrun = AnonymousTaskRunFactory.create()
assert_raises(Unauthorized,
ensure_authorized_to, 'update', anonymous_taskrun)
def test_anonymous_user_update_user_taskrun(self):
"""Test anonymous user cannot update taskruns posted by authenticated users"""
user_taskrun = TaskRunFactory.create()
assert_raises(Unauthorized,
ensure_authorized_to, 'update', user_taskrun)
def test_anonymous_user_delete_anonymous_taskrun(self):
"""Test anonymous users cannot delete an anonymously posted taskrun"""
anonymous_taskrun = AnonymousTaskRunFactory.create()
assert_raises(Unauthorized,
ensure_authorized_to, 'delete', anonymous_taskrun)
def test_anonymous_user_delete_user_taskrun(self):
"""Test anonymous user cannot delete taskruns posted by authenticated users"""
user_taskrun = TaskRunFactory.create()
assert_raises(Unauthorized,
ensure_authorized_to, 'delete', user_taskrun)
def test_anonymous_user_cannot_save_results(self):
"""Test anonymous users cannot save results of a specific project"""
result = Result()
assert_raises(Unauthorized, ensure_authorized_to, 'create', result)
def test_anonymous_user_cannot_update_results(self):
"""Test anonymous users cannot update results of a specific project"""
result = self.create_result()
assert_raises(Unauthorized, ensure_authorized_to, 'update', result)
def test_anonymous_user_cannot_read_webhook(self):
"""Test anonymous users cannot read a webhook"""
project = ProjectFactory.create()
webhook = WebhookFactory.create(project_id=project.id)
assert_raises(Unauthorized, ensure_authorized_to, 'read', webhook)
def test_anonymous_user_cannot_read_project_webhooks(self):
"""Test anonymous users cannot read webhooks of a specific project"""
project = ProjectFactory.create()
assert_raises(Unauthorized, ensure_authorized_to, 'read', Webhook, project_id=project.id)
def test_anonymous_user_cannot_crud_webhook(self):
"""Test anonymous users cannot crud webhooks"""
webhook = Webhook()
assert_raises(Unauthorized, ensure_authorized_to, 'create', webhook)
assert_raises(Unauthorized, ensure_authorized_to, 'update', webhook)
assert_raises(Unauthorized, ensure_authorized_to, 'delete', webhook)
def test_anonymous_user_cannot_crud(self):
"""Test anonymous users cannot crud categories"""
category = CategoryFactory.build()
assert_raises(Unauthorized, ensure_authorized_to, 'create', category)
assert_not_raises(Exception, ensure_authorized_to, 'read', category)
assert_not_raises(Exception, ensure_authorized_to, 'read', Category)
assert_raises(Unauthorized, ensure_authorized_to, 'update', category)
assert_raises(Unauthorized, ensure_authorized_to, 'delete', category)
def test_anonymous_user_cannot_update_given_user(self):
"""Test anonymous users cannot update a given user"""
user = UserFactory.create()
assert_raises(Unauthorized, ensure_authorized_to, 'update', user)
def test_anonymous_user_cannot_delete_given_user(self):
"""Test anonymous users cannot delete a given user"""
user = UserFactory.create()
assert_raises(Unauthorized, ensure_authorized_to, 'delete', user)
def test_anonymous_user_cannot_crud(self):
"""Test anonymous users cannot crud tasks"""
user = UserFactory.create()
project = ProjectFactory.create(owner=user)
task = TaskFactory.create(project=project)
assert_raises(Unauthorized, ensure_authorized_to, 'create', task)
assert_not_raises(Forbidden, ensure_authorized_to, 'read', task)
assert_not_raises(Forbidden, ensure_authorized_to, 'read', Task)
assert_raises(Unauthorized, ensure_authorized_to, 'update', task)
assert_raises(Unauthorized, ensure_authorized_to, 'delete', task)