def CheckIsAdmin(self):
    """Decide whether the current request may proceed as an administrator.

    Three checks are tried in order, stopping at the first that succeeds:

    1. ``users.is_current_user_admin()``.
    2. ``config.CUSTOM_ENVIRONMENT_AUTHENTICATION``, when set: it must be a
       pair ``(variable_name, allowed_values)`` and the value of that
       environment variable must be one of ``allowed_values``. A setting of
       any other length is logged as a misconfiguration and ignored.
    3. ``oauth.is_current_user_admin`` with ``self.OAUTH_SCOPES``; an
       ``oauth.OAuthRequestError`` counts as "not authorized".

    An authorized request must additionally carry the
    ``X-appcfg-api-version`` header.

    Returns:
        True when the request is authorized and carries the header. Otherwise
        a plain-text 401 (not an admin) or 403 (header missing) response is
        written to ``self.response`` and False is returned.
    """

    def reject(status, message):
        # Same order as the inline code: status, body, then content type.
        self.response.set_status(status)
        self.response.out.write(message)
        self.response.headers['Content-Type'] = 'text/plain'
        return False

    authorized = bool(users.is_current_user_admin())

    if not authorized:
        custom_auth = config.CUSTOM_ENVIRONMENT_AUTHENTICATION
        if custom_auth:
            if len(custom_auth) != 2:
                logging.warning('remoteapi_CUSTOM_ENVIRONMENT_AUTHENTICATION is '
                                'configured incorrectly.')
            else:
                env_name, allowed_values = custom_auth
                authorized = os.getenv(env_name) in allowed_values

    if not authorized:
        try:
            authorized = oauth.is_current_user_admin(
                _scope=self.OAUTH_SCOPES)
        except oauth.OAuthRequestError:
            # Failing the OAuth check just means "not authorized via OAuth".
            pass

    if not authorized:
        return reject(
            401, 'You must be logged in as an administrator to access this.')

    if 'X-appcfg-api-version' not in self.request.headers:
        return reject(403, 'This request did not contain a necessary header')

    return True
# Example source code for Python's is_current_user_admin()
def __call__(self, environ, start_response):
    """WSGI entry point that only lets admins through outside the dev server.

    When ``SERVER_SOFTWARE`` starts with ``'Dev'``, or the current user is an
    admin, the call is delegated to the wrapped ``self._application``.
    Otherwise a user who is not signed in is redirected (302) to the login
    URL built from ``PATH_INFO``, and a signed-in non-admin gets a
    ``403 Forbidden`` response.
    """
    on_dev_server = environ.get('SERVER_SOFTWARE', '').startswith('Dev')
    if on_dev_server or users.is_current_user_admin():
        return self._application(environ, start_response)

    if users.get_current_user() is None:
        login_url = users.create_login_url(os.getenv('PATH_INFO', ''))
        start_response('302 Found', [('Location', login_url)])
        return []

    start_response('403 Forbidden', [])
    return ['Forbidden\n']
def get(self):
    """Render the interactive console page for admins.

    Anyone else gets a 404, and the attempt is logged together with the
    remote address of the request.
    """
    if not users.is_current_user_admin():
        logging.warning(
            'Non admin user from IP %s attempted to use interactive console',
            self.request.remote_addr)
        self.error(404)
        return

    self.generate('interactive.html')
def post(self):
    """Execute code submitted to the interactive console (admins only).

    For an admin, the ``code`` request parameter is compiled and executed
    with ``sys.stdout`` temporarily redirected to an in-memory buffer; the
    captured output (or the traceback, if the code raised) is rendered with
    the ``interactive-output.html`` template. When
    ``self.interactive_console_enabled()`` is false, an explanatory message
    is rendered instead of running anything.

    Non-admin callers get a 404 and the attempt is logged with the remote
    address of the request.
    """
    if not users.is_current_user_admin():
        logging.warning(
            'Non admin user from IP %s attempted to use interactive console',
            self.request.remote_addr)
        self.error(404)
        return

    if self.interactive_console_enabled():
        save_stdout = sys.stdout
        results_io = cStringIO.StringIO()
        try:
            sys.stdout = results_io

            code = self.request.get('code')
            # Browsers submit CRLF line endings; compile() expects '\n'.
            code = code.replace('\r\n', '\n')

            try:
                compiled_code = compile(code, '<string>', 'exec')
                # NOTE(security): this runs arbitrary request-supplied code in
                # this module's globals. It is only reached after the admin
                # check above and the console-enabled check; keep both.
                exec(compiled_code, globals())
            except Exception:
                # Show the failure to the user instead of a server error.
                traceback.print_exc(file=results_io)
        finally:
            # Always restore the real stdout, even if something unexpected
            # (e.g. a non-Exception BaseException) escapes.
            sys.stdout = save_stdout
        results = results_io.getvalue()
    else:
        results = """The interactive console has been disabled for security
because the dev_appserver is listening on a non-default address.
If you would like to re-enable the console, invoke dev_appserver
with the --enable_console argument.
See https://developers.google.com/appengine/docs/python/tools/devserver#The_Interactive_Console
for more information."""

    self.generate('interactive-output.html', {'output': results})
def admin_required(handler_method):
    """A decorator to require that a user be an admin for this application
    to access a handler.

    To use it, decorate your get() method like this::

        @admin_required
        def get(self):
            user = users.get_current_user()
            self.response.out.write('Hello, ' + user.nickname())

    We will redirect to a login page if the user is not logged in. We always
    redirect to the request URI, and Google Accounts only redirects back as
    a GET request, so this should not be used for POSTs.

    Behavior of the returned wrapper:

    * non-GET request: ``self.abort(400, ...)`` is called (presumably this
      raises; confirm against the handler base class);
    * no signed-in user: returns ``self.redirect(<login url>)``;
    * signed-in non-admin: ``self.abort(403)`` is called;
    * admin: returns whatever ``handler_method`` returns.
    """
    # Local import so this block does not depend on the module's import list.
    import functools

    # Preserve the handler's name and docstring on the wrapper.
    @functools.wraps(handler_method)
    def check_admin(self, *args, **kwargs):
        if self.request.method != 'GET':
            self.abort(400, detail='The admin_required decorator '
                                   'can only be used for GET requests.')

        user = users.get_current_user()
        if not user:
            return self.redirect(users.create_login_url(self.request.url))
        elif not users.is_current_user_admin():
            self.abort(403)
        else:
            # Propagate the handler's result; dropping it would lose any
            # response object the handler chose to return.
            return handler_method(self, *args, **kwargs)

    return check_admin
def get_current_user(self):
    """Return the current user, tagged with an ``administrator`` attribute.

    The result of ``users.get_current_user()`` is returned as-is when it is
    falsy; otherwise its ``administrator`` attribute is set to
    ``users.is_current_user_admin()`` before it is returned.
    """
    account = users.get_current_user()
    if not account:
        return account
    account.administrator = users.is_current_user_admin()
    return account
def get_current_user(self):
    """Return the current user, tagged with an ``administrator`` attribute.

    The result of ``users.get_current_user()`` is returned as-is when it is
    falsy; otherwise its ``administrator`` attribute is set to
    ``users.is_current_user_admin()`` before it is returned.
    """
    account = users.get_current_user()
    if not account:
        return account
    account.administrator = users.is_current_user_admin()
    return account
def is_server_admin(self):
    """Return the result of ``users.is_current_user_admin()``."""
    admin_flag = users.is_current_user_admin()
    return admin_flag
def get_current_user(self):
    """Return the current user, tagged with an ``administrator`` attribute.

    The result of ``users.get_current_user()`` is returned as-is when it is
    falsy; otherwise its ``administrator`` attribute is set to
    ``users.is_current_user_admin()`` before it is returned.
    """
    account = users.get_current_user()
    if not account:
        return account
    account.administrator = users.is_current_user_admin()
    return account
def get_current_user(self):
    """Return the current user, tagged with an ``administrator`` attribute.

    The result of ``users.get_current_user()`` is returned as-is when it is
    falsy; otherwise its ``administrator`` attribute is set to
    ``users.is_current_user_admin()`` before it is returned.
    """
    account = users.get_current_user()
    if not account:
        return account
    account.administrator = users.is_current_user_admin()
    return account
def auth_user(fn):
    """
    Decorator to force user to be logged in with GAE.

    The wrapped view receives the request with ``request.user`` set to the
    Django user whose email matches the Google account identified by the
    request's ``Authorization`` header. The email is obtained by forwarding
    that header to the Google userinfo endpoint.

    Resolution order for the Django user:

    1. an existing user with that email;
    2. a ``PendingUser`` with that email, activated through
       ``activate_pending_user``;
    3. otherwise a new active user is created for the email.

    Returns ``HttpResponseForbidden`` when the header is missing, when the
    userinfo reply is not JSON or carries no email, when the user lookup
    raises ``AttributeError``, or when no user could be resolved.
    """
    @functools.wraps(fn)
    def _wrapped(request, *args, **kwargs):
        # A missing header is an authentication failure, not a server error.
        bearer = request.META.get('HTTP_AUTHORIZATION')
        if not bearer:
            return HttpResponseForbidden()

        url = "https://www.googleapis.com/userinfo/v2/me"
        result = urlfetch.fetch(url=url,
                                method=urlfetch.GET,
                                headers={"Authorization": bearer})
        try:
            contents = json.loads(result.content)
        except (TypeError, ValueError):
            return HttpResponseForbidden()

        # Rejected tokens produce a reply without an 'email' field.
        email = contents.get('email') if isinstance(contents, dict) else None
        if not email:
            return HttpResponseForbidden()

        gae_user = users.get_current_user()
        is_admin = users.is_current_user_admin()
        User = get_user_model()
        django_user = None
        try:
            logging.debug("Getting django user")
            django_user = User.objects.get(email=email)
        except User.DoesNotExist:
            logging.info("User does not exist in Montage. Checking pending users")
            try:
                pending_user = PendingUser.objects.get(email=email)
            except PendingUser.DoesNotExist:
                logging.info("No pending user record for this email")
                # NOTE(review): this auto-provisions an active account for any
                # authenticated Google email; confirm that is the intended
                # policy. The user is kept (not returned) so the view still
                # runs and the caller receives a real HTTP response.
                django_user, _ = User.objects.get_or_create(
                    email=email,
                    defaults={
                        'username': email.split('@')[0],
                        'is_active': True,
                    }
                )
            else:
                logging.info("Pending user record found. Activating user.")
                django_user = activate_pending_user(
                    pending_user, gae_user, is_admin)
        except AttributeError:
            return HttpResponseForbidden()
        else:
            # NOTE: the update this message refers to (update_user) is
            # currently disabled; only the log line remains.
            logging.info("User found. Updating gaia_id and superuser status")

        if django_user:
            request.user = django_user
        else:
            return HttpResponseForbidden()
        return fn(request, *args, **kwargs)
    return _wrapped