def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Tell whether the current user may access ``other_application``.

    Outside App Engine the decision is based on the session returned by
    ``get_session``: it must be flagged ``authorized`` and its ``last_time``
    must be more recent than ``expiration`` seconds ago. A valid session has
    its ``last_time`` refreshed and is written back with ``set_session``.

    On App Engine (``request.env.web2py_runtime_gae`` is truthy) application
    admins are accepted. Anyone else either gets ``False`` or, when
    ``gae_login`` is true, an ``HTTP(200)`` page holding a sign-in link.
    """
    if not request.env.web2py_runtime_gae:
        now = time.time()
        oldest_allowed = now - expiration
        session = get_session(request, other_application)
        still_valid = (session.authorized and session.last_time and
                       session.last_time > oldest_allowed)
        if still_valid:
            # Sliding expiration: every successful check renews the session.
            session.last_time = now
            set_session(request, session, other_application)
        return still_valid

    from google.appengine.api import users
    if users.is_current_user_admin():
        return True
    if not gae_login:
        return False
    login_url = users.create_login_url(request.env.path_info)
    login_html = ('<a href="%s">Sign in with your google account</a>.'
                  % login_url)
    raise HTTP(200, '<html><body>%s</body></html>' % login_html)
# Example source code for Python's is_current_user_admin()
def retrieve_user_from_gae(gae_user):
    """Return the user record that belongs to a Google account.

    The record is looked up through ``model.User.get_by`` using the auth id
    ``'federated_<user_id>'``. A record that is found but not yet flagged
    ``admin`` is flagged and saved when the current App Engine user is an
    application admin. When nothing is found, the result of
    ``auth.create_user_db`` (called with the account's e-mail) is returned.
    """
    auth_id = 'federated_%s' % gae_user.user_id()
    existing = model.User.get_by('auth_ids', auth_id)
    if not existing:
        email = gae_user.email()
        return auth.create_user_db(
            auth_id=auth_id,
            name=util.create_name_from_email(email),
            username=email,
            email=email,
            verified=True,
            admin=users.is_current_user_admin(),
        )

    if not existing.admin and users.is_current_user_admin():
        existing.admin = True
        existing.put()
    return existing
def main():
    """CGI-style request handler that dumps the configuration.

    Put this in your app.yaml to enable it (you can pick any URL):

    - url: /lib_config
      script: $PYTHON_LIB/google/appengine/api/lib_config.py

    Note: the admin check is skipped only when SERVER_SOFTWARE starts with
    'Dev'; otherwise the caller must be an application admin.
    """
    # Each print below takes a single parenthesised string, which prints
    # exactly the same text under Python 2's print statement.
    running_on_dev = os.getenv('SERVER_SOFTWARE', '').startswith('Dev')
    if not running_on_dev:
        from google.appengine.api import users
        if not users.is_current_user_admin():
            if users.get_current_user() is None:
                # Nobody is signed in: send the visitor to the login page.
                login_url = users.create_login_url(os.getenv('PATH_INFO', ''))
                print('Status: 302')
                print('Location: %s' % login_url)
            else:
                # Signed in, but not an admin.
                print('Status: 403')
                print('')
                print('Forbidden')
            return

    print('Content-type: text/plain')
    print('')
    _default_registry._dump()
def main():
    """CGI-style request handler that dumps the configuration.

    Put this in your app.yaml to enable it (you can pick any URL):

    - url: /lib_config
      script: $PYTHON_LIB/google/appengine/api/lib_config.py

    Note: the admin check is skipped only when SERVER_SOFTWARE starts with
    'Dev'; otherwise the caller must be an application admin.
    """
    # Each print below takes a single parenthesised string, which prints
    # exactly the same text under Python 2's print statement.
    running_on_dev = os.getenv('SERVER_SOFTWARE', '').startswith('Dev')
    if not running_on_dev:
        from google.appengine.api import users
        if not users.is_current_user_admin():
            if users.get_current_user() is None:
                # Nobody is signed in: send the visitor to the login page.
                login_url = users.create_login_url(os.getenv('PATH_INFO', ''))
                print('Status: 302')
                print('Location: %s' % login_url)
            else:
                # Signed in, but not an admin.
                print('Status: 403')
                print('')
                print('Forbidden')
            return

    print('Content-type: text/plain')
    print('')
    _default_registry._dump()
def retrieve_user_from_gae(gae_user):
    """Return the user record that belongs to a Google account.

    The record is looked up through ``model.User.get_by`` using the auth id
    ``'federated_<user_id>'``. A record that is found but not yet flagged
    ``admin`` is flagged and saved when the current App Engine user is an
    application admin. When nothing is found, the result of
    ``auth.create_user_db`` (called with the account's e-mail) is returned.
    """
    auth_id = 'federated_%s' % gae_user.user_id()
    existing = model.User.get_by('auth_ids', auth_id)
    if not existing:
        email = gae_user.email()
        return auth.create_user_db(
            auth_id=auth_id,
            name=util.create_name_from_email(email),
            username=email,
            email=email,
            verified=True,
            admin=users.is_current_user_admin(),
        )

    if not existing.admin and users.is_current_user_admin():
        existing.admin = True
        existing.put()
    return existing
def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Tell whether the current user may access ``other_application``.

    Outside App Engine the decision is based on the session returned by
    ``get_session``: it must be flagged ``authorized`` and its ``last_time``
    must be more recent than ``expiration`` seconds ago. A valid session has
    its ``last_time`` refreshed and is written back with ``set_session``.

    On App Engine (``request.env.web2py_runtime_gae`` is truthy) application
    admins are accepted. Anyone else either gets ``False`` or, when
    ``gae_login`` is true, an ``HTTP(200)`` page holding a sign-in link.
    """
    if not request.env.web2py_runtime_gae:
        now = time.time()
        oldest_allowed = now - expiration
        session = get_session(request, other_application)
        still_valid = (session.authorized and session.last_time and
                       session.last_time > oldest_allowed)
        if still_valid:
            # Sliding expiration: every successful check renews the session.
            session.last_time = now
            set_session(request, session, other_application)
        return still_valid

    from google.appengine.api import users
    if users.is_current_user_admin():
        return True
    if not gae_login:
        return False
    login_url = users.create_login_url(request.env.path_info)
    login_html = ('<a href="%s">Sign in with your google account</a>.'
                  % login_url)
    raise HTTP(200, '<html><body>%s</body></html>' % login_html)
def retrieve_user_from_gae(gae_user):
    """Return the user record that belongs to a Google account.

    The record is looked up through ``model.User.get_by`` using the auth id
    ``'federated_<user_id>'``. A record that is found but not yet flagged
    ``admin`` is flagged and saved when the current App Engine user is an
    application admin. When nothing is found, the result of
    ``auth.create_user_db`` (called with the account's e-mail) is returned.
    """
    auth_id = 'federated_%s' % gae_user.user_id()
    existing = model.User.get_by('auth_ids', auth_id)
    if not existing:
        email = gae_user.email()
        return auth.create_user_db(
            auth_id=auth_id,
            name=util.create_name_from_email(email),
            username=email,
            email=email,
            verified=True,
            admin=users.is_current_user_admin(),
        )

    if not existing.admin and users.is_current_user_admin():
        existing.admin = True
        existing.put()
    return existing
def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Tell whether the current user may access ``other_application``.

    Outside App Engine the decision is based on the session returned by
    ``get_session``: it must be flagged ``authorized`` and its ``last_time``
    must be more recent than ``expiration`` seconds ago. A valid session has
    its ``last_time`` refreshed and is written back with ``set_session``.

    On App Engine (``request.env.web2py_runtime_gae`` is truthy) application
    admins are accepted. Anyone else either gets ``False`` or, when
    ``gae_login`` is true, an ``HTTP(200)`` page holding a sign-in link.
    """
    if not request.env.web2py_runtime_gae:
        now = time.time()
        oldest_allowed = now - expiration
        session = get_session(request, other_application)
        still_valid = (session.authorized and session.last_time and
                       session.last_time > oldest_allowed)
        if still_valid:
            # Sliding expiration: every successful check renews the session.
            session.last_time = now
            set_session(request, session, other_application)
        return still_valid

    from google.appengine.api import users
    if users.is_current_user_admin():
        return True
    if not gae_login:
        return False
    login_url = users.create_login_url(request.env.path_info)
    login_html = ('<a href="%s">Sign in with your google account</a>.'
                  % login_url)
    raise HTTP(200, '<html><body>%s</body></html>' % login_html)
def get_djangouser_for_user(cls, user):
    """Return the entity of ``cls`` linked to ``user``, creating it if absent.

    Unless the ``AUTH_ADMIN_USER_AS_SUPERUSER`` setting is falsy (it defaults
    to ``True``), the ``is_staff`` / ``is_superuser`` flags follow
    ``users.is_current_user_admin()``:

    * an existing entity is re-saved only when one of its flags differs from
      the current admin status;
    * a new entity is marked active, gets both flags set when the current
      user is an admin, and is always saved.
    """
    django_user = cls.all().filter('user =', user).get()

    if not django_user:
        django_user = cls.create_djangouser_for_user(user)
        django_user.is_active = True
        grant_superuser = (
            getattr(settings, 'AUTH_ADMIN_USER_AS_SUPERUSER', True)
            and users.is_current_user_admin())
        if grant_superuser:
            django_user.is_staff = True
            django_user.is_superuser = True
        django_user.put()
        return django_user

    if getattr(settings, 'AUTH_ADMIN_USER_AS_SUPERUSER', True):
        admin_now = users.is_current_user_admin()
        flags_outdated = (django_user.is_staff != admin_now or
                          django_user.is_superuser != admin_now)
        if flags_outdated:
            django_user.is_superuser = admin_now
            django_user.is_staff = admin_now
            django_user.put()
    return django_user
def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Tell whether the current user may access ``other_application``.

    Outside App Engine the decision is based on the session returned by
    ``get_session``: it must be flagged ``authorized`` and its ``last_time``
    must be more recent than ``expiration`` seconds ago. A valid session has
    its ``last_time`` refreshed and is written back with ``set_session``.

    On App Engine (``request.env.web2py_runtime_gae`` is truthy) application
    admins are accepted. Anyone else either gets ``False`` or, when
    ``gae_login`` is true, an ``HTTP(200)`` page holding a sign-in link.
    """
    if not request.env.web2py_runtime_gae:
        now = time.time()
        oldest_allowed = now - expiration
        session = get_session(request, other_application)
        still_valid = (session.authorized and session.last_time and
                       session.last_time > oldest_allowed)
        if still_valid:
            # Sliding expiration: every successful check renews the session.
            session.last_time = now
            set_session(request, session, other_application)
        return still_valid

    from google.appengine.api import users
    if users.is_current_user_admin():
        return True
    if not gae_login:
        return False
    login_url = users.create_login_url(request.env.path_info)
    login_html = ('<a href="%s">Sign in with your google account</a>.'
                  % login_url)
    raise HTTP(200, '<html><body>%s</body></html>' % login_html)
def main():
    """CGI-style request handler that dumps the configuration.

    Put this in your app.yaml to enable it (you can pick any URL):

    - url: /lib_config
      script: $PYTHON_LIB/google/appengine/api/lib_config.py

    Note: the admin check is skipped only when SERVER_SOFTWARE starts with
    'Dev'; otherwise the caller must be an application admin.
    """
    # Each print below takes a single parenthesised string, which prints
    # exactly the same text under Python 2's print statement.
    running_on_dev = os.getenv('SERVER_SOFTWARE', '').startswith('Dev')
    if not running_on_dev:
        from google.appengine.api import users
        if not users.is_current_user_admin():
            if users.get_current_user() is None:
                # Nobody is signed in: send the visitor to the login page.
                login_url = users.create_login_url(os.getenv('PATH_INFO', ''))
                print('Status: 302')
                print('Location: %s' % login_url)
            else:
                # Signed in, but not an admin.
                print('Status: 403')
                print('')
                print('Forbidden')
            return

    print('Content-type: text/plain')
    print('')
    _default_registry._dump()
def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Tell whether the current user may access ``other_application``.

    Outside App Engine the decision is based on the session returned by
    ``get_session``: it must be flagged ``authorized`` and its ``last_time``
    must be more recent than ``expiration`` seconds ago. A valid session has
    its ``last_time`` refreshed and is written back with ``set_session``.

    On App Engine (``request.env.web2py_runtime_gae`` is truthy) application
    admins are accepted. Anyone else either gets ``False`` or, when
    ``gae_login`` is true, an ``HTTP(200)`` page holding a sign-in link.
    """
    if not request.env.web2py_runtime_gae:
        now = time.time()
        oldest_allowed = now - expiration
        session = get_session(request, other_application)
        still_valid = (session.authorized and session.last_time and
                       session.last_time > oldest_allowed)
        if still_valid:
            # Sliding expiration: every successful check renews the session.
            session.last_time = now
            set_session(request, session, other_application)
        return still_valid

    from google.appengine.api import users
    if users.is_current_user_admin():
        return True
    if not gae_login:
        return False
    login_url = users.create_login_url(request.env.path_info)
    login_html = ('<a href="%s">Sign in with your google account</a>.'
                  % login_url)
    raise HTTP(200, '<html><body>%s</body></html>' % login_html)
def sys_processor():
    """Build a dict of helper callables (shaped like a template context).

    Returned keys:

    * ``get_app_var(key)`` -- value from ``models.ApplicationVariable`` when
      ``key`` is listed in ``constants.APPVAR_DISPLAY_LIST``, else ``None``;
    * ``is_logged_in()`` -- ``True`` when ``users.get_current_user()`` is
      truthy, else ``False``;
    * ``is_admin()`` -- result of ``users.is_current_user_admin()``;
    * ``login_link(endpoint)`` -- result of ``users.create_login_url``.
    """
    def _get_app_var(key):
        # Only variables that are explicitly listed may be displayed.
        if key not in constants.APPVAR_DISPLAY_LIST:
            return None
        return models.ApplicationVariable.get_app_var(key)

    def _is_logged_in():
        return bool(users.get_current_user())

    def _is_admin():
        return users.is_current_user_admin()

    def _login_link(endpoint):
        return users.create_login_url(endpoint)

    return {
        'get_app_var': _get_app_var,
        'is_logged_in': _is_logged_in,
        'is_admin': _is_admin,
        'login_link': _login_link,
    }
def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Tell whether the current user may access ``other_application``.

    Outside App Engine the decision is based on the session returned by
    ``get_session``: it must be flagged ``authorized`` and its ``last_time``
    must be more recent than ``expiration`` seconds ago. A valid session has
    its ``last_time`` refreshed and is written back with ``set_session``.

    On App Engine (``request.env.web2py_runtime_gae`` is truthy) application
    admins are accepted. Anyone else either gets ``False`` or, when
    ``gae_login`` is true, an ``HTTP(200)`` page holding a sign-in link.
    """
    if not request.env.web2py_runtime_gae:
        now = time.time()
        oldest_allowed = now - expiration
        session = get_session(request, other_application)
        still_valid = (session.authorized and session.last_time and
                       session.last_time > oldest_allowed)
        if still_valid:
            # Sliding expiration: every successful check renews the session.
            session.last_time = now
            set_session(request, session, other_application)
        return still_valid

    from google.appengine.api import users
    if users.is_current_user_admin():
        return True
    if not gae_login:
        return False
    login_url = users.create_login_url(request.env.path_info)
    login_html = ('<a href="%s">Sign in with your google account</a>.'
                  % login_url)
    raise HTTP(200, '<html><body>%s</body></html>' % login_html)
def main():
    """CGI-style request handler that dumps the configuration.

    Put this in your app.yaml to enable it (you can pick any URL):

    - url: /lib_config
      script: $PYTHON_LIB/google/appengine/api/lib_config.py

    Note: the admin check is skipped only when SERVER_SOFTWARE starts with
    'Dev'; otherwise the caller must be an application admin.
    """
    # Each print below takes a single parenthesised string, which prints
    # exactly the same text under Python 2's print statement.
    running_on_dev = os.getenv('SERVER_SOFTWARE', '').startswith('Dev')
    if not running_on_dev:
        from google.appengine.api import users
        if not users.is_current_user_admin():
            if users.get_current_user() is None:
                # Nobody is signed in: send the visitor to the login page.
                login_url = users.create_login_url(os.getenv('PATH_INFO', ''))
                print('Status: 302')
                print('Location: %s' % login_url)
            else:
                # Signed in, but not an admin.
                print('Status: 403')
                print('')
                print('Forbidden')
            return

    print('Content-type: text/plain')
    print('')
    _default_registry._dump()
def getorcreate(user):
    """Return the ``UserPrefs`` for ``user``, creating or migrating it if needed.

    Lookup order:

    1. memcache, keyed by ``user.user_id()``;
    2. ``UserPrefs.get_by_id`` (new records use the user id as entity id);
    3. a query on the ``userid`` property (old records). The first old
       record found is copied into a new-style record and then deleted.

    When nothing exists, a record is created whose access and admin flags
    both equal ``users.is_current_user_admin()``. Every record obtained from
    steps 2 and 3 is pushed to memcache via ``updateMemcache()``.
    """
    user_id = user.user_id()

    cached = memcache.get(user_id)
    if cached is not None:
        return cached

    userprefs = UserPrefs.get_by_id(user_id)  # new records have user_id as id
    if userprefs is None:
        # Fetching old records from userid
        old_records = UserPrefs.query(UserPrefs.userid == user_id).fetch()
        if not old_records:
            # Brand-new user: look the admin status up once and use it for
            # both the access flag and the admin flag.
            is_admin = users.is_current_user_admin()
            userprefs = UserPrefs.create(user, is_admin, is_admin)
            userprefs.put()
        else:
            olduser = old_records[0]
            # old record, update to a new with user_id as id and email
            if olduser is not None:
                userprefs = UserPrefs.create(
                    user, olduser.hasAccess(), olduser.isAdmin())
                userprefs.activeSemester = olduser.activeSemester
                userprefs.groupaccess = olduser.groupaccess
                userprefs.groupadmin = olduser.groupadmin
                userprefs.put()
                olduser.key.delete()

    userprefs.updateMemcache()
    return userprefs
def main():
    """CGI-style request handler that dumps the configuration.

    Put this in your app.yaml to enable it (you can pick any URL):

    - url: /lib_config
      script: $PYTHON_LIB/google/appengine/api/lib_config.py

    Note: the admin check is skipped only when SERVER_SOFTWARE starts with
    'Dev'; otherwise the caller must be an application admin.
    """
    # Each print below takes a single parenthesised string, which prints
    # exactly the same text under Python 2's print statement.
    running_on_dev = os.getenv('SERVER_SOFTWARE', '').startswith('Dev')
    if not running_on_dev:
        from google.appengine.api import users
        if not users.is_current_user_admin():
            if users.get_current_user() is None:
                # Nobody is signed in: send the visitor to the login page.
                login_url = users.create_login_url(os.getenv('PATH_INFO', ''))
                print('Status: 302')
                print('Location: %s' % login_url)
            else:
                # Signed in, but not an admin.
                print('Status: 403')
                print('')
                print('Forbidden')
            return

    print('Content-type: text/plain')
    print('')
    _default_registry._dump()
def retrieve_user_from_google(google_user):
    """Return the user record that belongs to a Google account.

    The record is looked up through ``model.User.get_by`` using the auth id
    ``'federated_<user_id>'``. A record that is found but not yet flagged
    ``admin`` is flagged and saved when the current App Engine user is an
    application admin. When nothing is found, the result of
    ``auth.create_or_get_user_db`` (called with the account's e-mail) is
    returned.
    """
    auth_id = 'federated_%s' % google_user.user_id()
    existing = model.User.get_by('auth_ids', auth_id)
    if not existing:
        email = google_user.email()
        return auth.create_or_get_user_db(
            auth_id=auth_id,
            name=util.create_name_from_email(email),
            username=email,
            email=email,
            verified=True,
            admin=users.is_current_user_admin(),
        )

    if not existing.admin and users.is_current_user_admin():
        existing.admin = True
        existing.put()
    return existing
def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Tell whether the current user may access ``other_application``.

    Outside App Engine the decision is based on the session returned by
    ``get_session``: it must be flagged ``authorized`` and its ``last_time``
    must be more recent than ``expiration`` seconds ago. A valid session has
    its ``last_time`` refreshed and is written back with ``set_session``.

    On App Engine (``request.env.web2py_runtime_gae`` is truthy) application
    admins are accepted. Anyone else either gets ``False`` or, when
    ``gae_login`` is true, an ``HTTP(200)`` page holding a sign-in link.
    """
    if not request.env.web2py_runtime_gae:
        now = time.time()
        oldest_allowed = now - expiration
        session = get_session(request, other_application)
        still_valid = (session.authorized and session.last_time and
                       session.last_time > oldest_allowed)
        if still_valid:
            # Sliding expiration: every successful check renews the session.
            session.last_time = now
            set_session(request, session, other_application)
        return still_valid

    from google.appengine.api import users
    if users.is_current_user_admin():
        return True
    if not gae_login:
        return False
    login_url = users.create_login_url(request.env.path_info)
    login_html = ('<a href="%s">Sign in with your google account</a>.'
                  % login_url)
    raise HTTP(200, '<html><body>%s</body></html>' % login_html)
def admin_required(func):
    """Decorator that only lets application admins reach ``func``.

    * Nobody signed in: returns a redirect to the login URL built from
      ``request.url``.
    * Signed in but not an admin: calls ``abort(401)``.
    * Otherwise: returns whatever ``func`` returns.
    """
    @wraps(func)
    def decorated_view(*args, **kwargs):
        if not users.get_current_user():
            return redirect(users.create_login_url(request.url))
        if not users.is_current_user_admin():
            abort(401)  # Unauthorized
        return func(*args, **kwargs)

    return decorated_view
def is_admin():
    """Report whether a signed-in application admin is making the request.

    Returns the (falsy) result of ``users.get_current_user()`` when nobody is
    signed in, otherwise the result of ``users.is_current_user_admin()``.
    """
    current_user = users.get_current_user()
    if not current_user:
        return current_user
    return users.is_current_user_admin()
def admin_required(handler_method):
    """A decorator to require that a user be an admin for this application
    to access a handler.

    To use it, decorate your get() method like this::

        @admin_required
        def get(self):
            user = users.get_current_user()
            self.response.out.write('Hello, ' + user.nickname())

    We will redirect to a login page if the user is not logged in. We always
    redirect to the request URI, and Google Accounts only redirects back as
    a GET request, so this should not be used for POSTs.

    The wrapper aborts with 400 for non-GET requests and with 403 for
    signed-in users who are not admins. For admins it returns whatever the
    wrapped handler returns.
    """
    def check_admin(self, *args, **kwargs):
        if self.request.method != 'GET':
            self.abort(400,
                       detail='The admin_required decorator '
                              'can only be used for GET requests.')

        user = users.get_current_user()
        if not user:
            return self.redirect(users.create_login_url(self.request.url))
        if not users.is_current_user_admin():
            # Returning here guarantees the handler is never reached for a
            # non-admin, whatever abort() itself does.
            return self.abort(403)
        # Propagate the handler's result instead of silently dropping it.
        return handler_method(self, *args, **kwargs)

    return check_admin
def is_user_app_admin():
    """Return ``users.is_current_user_admin()`` and log the value at debug level.

    The status is looked up once, so the logged value is always the value
    that is returned.
    """
    is_admin = users.is_current_user_admin()
    logging.debug("is_user_app_admin: %s", is_admin)
    return is_admin
def get_current_user(self):
    """Return the current App Engine user, tagged with its admin status.

    When a user is signed in, the returned object gets an ``administrator``
    attribute holding ``users.is_current_user_admin()``. Otherwise the falsy
    result of ``users.get_current_user()`` is returned unchanged.
    """
    current = users.get_current_user()
    if not current:
        return current
    current.administrator = users.is_current_user_admin()
    return current
def get_current_user(self):
    """Return the current App Engine user, tagged with its admin status.

    When a user is signed in, the returned object gets an ``administrator``
    attribute holding ``users.is_current_user_admin()``. Otherwise the falsy
    result of ``users.get_current_user()`` is returned unchanged.
    """
    current = users.get_current_user()
    if not current:
        return current
    current.administrator = users.is_current_user_admin()
    return current
def login(self, skey="", *args, **kwargs):
    """Log in the current Google account, registering it on first use.

    Without a signed-in Google user, ``errors.Redirect`` to the login URL
    for ``<modulePath>/login`` is raised.

    Otherwise the user skeleton is looked up by ``uid`` and, failing that, by
    lower-cased e-mail. A completely unknown user is only added when
    ``self.registrationEnabled`` is true or the account is an application
    admin; otherwise ``errors.Forbidden`` is raised. The skeleton is written
    back when it was (re)bound to this account or when its ``lastlogin`` is
    older than 30 minutes. At that point application admins receive the
    ``"root"`` access flag and ``gaeadmin`` mirrors the admin status.

    ``skey``, ``*args`` and ``**kwargs`` are accepted but not used here.

    Raises ``AssertionError`` when storing the skeleton fails.
    """
    currentUser = users.get_current_user()
    if not currentUser:
        raise errors.Redirect(users.create_login_url(self.modulePath + "/login"))

    # Ensure that we have the full skeleton
    addSkel = skeletonByKind(self.userModule.addSkel().kindName)
    uid = currentUser.user_id()
    userSkel = addSkel().all().filter("uid =", uid).getSkel()
    if not userSkel:
        # We'll try again - checking if there's already a user with that email
        userSkel = addSkel().all().filter("name.idx =", currentUser.email().lower()).getSkel()
        if not userSkel:  # Still no luck - it's a completely new user
            if not self.registrationEnabled and not users.is_current_user_admin():
                # Registration is disabled, it's a new user and that user is not admin
                logging.warning("Denying registration of %s", currentUser.email())
                raise errors.Forbidden("Registration for new users is disabled")
            userSkel = addSkel()  # We'll add a new user
        userSkel["uid"] = uid
        userSkel["name"] = currentUser.email()
        isAdd = True
    else:
        isAdd = False

    now = datetime.datetime.now()
    if isAdd or (now - userSkel["lastlogin"]) > datetime.timedelta(minutes=30):
        # Conserve DB-Writes: Update the user max once in 30 Minutes
        userSkel["lastlogin"] = now
        if users.is_current_user_admin():
            if not userSkel["access"]:
                userSkel["access"] = []
            if "root" not in userSkel["access"]:
                userSkel["access"].append("root")
            userSkel["gaeadmin"] = True
        else:
            userSkel["gaeadmin"] = False
        # The write must not live inside an ``assert``: asserts are removed
        # under ``python -O``, which would silently skip the DB write.
        if not userSkel.toDB():
            raise AssertionError("Storing the user skeleton failed")
    return self.userModule.continueAuthenticationFlow(self, userSkel["key"])
def canCall(self):
    """
    Checks whether the current user can execute this task.

    @returns bool -- the result of users.is_current_user_admin()
    """
    isAdmin = users.is_current_user_admin()
    return isAdmin
def register(flask_request):
    """Register the signed-in App Engine user from the request's JSON body.

    The JSON must contain ``nick``; ``team_id``, ``team_name`` and
    ``team_code`` are optional and default to ``None``. The user is created
    through ``controllers.register_user`` with the account's e-mail and an
    empty string as third argument. The new user's ``promote()`` is called
    when the account is an application admin.

    Raises ``errors.LoginError`` when nobody is signed in.
    """
    gae_user = users.get_current_user()
    if not gae_user:
        raise errors.LoginError(
            'Cannot register if not logged into AppEngine.')

    payload = flask_request.get_json()
    new_user = controllers.register_user(
        gae_user.email(),
        payload['nick'],
        '',
        payload.get('team_id'),
        payload.get('team_name'),
        payload.get('team_code'),
    )
    if users.is_current_user_admin():
        new_user.promote()
    return new_user
def get(self):
    """Write a one-line description of the caller's login/admin status."""
    if not users.get_current_user():
        message = 'You are not logged in.'
    elif users.is_current_user_admin():
        message = 'You are an administrator.'
    else:
        message = 'You are not an administrator.'
    self.response.write(message)
def export():
    """Queue the background tasks that export all shared sessions.

    Only a signed-in application admin may trigger the export; everyone else
    gets ``('Admin access only', 403)`` before any datastore work is done.

    For admins, ``NUM_TASKS`` tasks are queued on ``/tasks/process_export``,
    task ``i`` being delayed by ``60 * i`` seconds and receiving its bucket
    index plus the common bucket size.

    Returns a ``(message, status_code)`` tuple.
    """
    # Authorise first so unauthorised callers never trigger the count query.
    if not (users.get_current_user() and users.is_current_user_admin()):
        return 'Admin access only', 403

    # NOTE(review): "== True" presumably builds a datastore filter expression
    # rather than a plain bool -- do not "simplify" it; confirm against the
    # Session model.
    total_shared = Session.query(Session.shared == True).count()

    # Guard the divisor so NUM_TASKS == 1 cannot raise ZeroDivisionError.
    bucket_size = max(1, total_shared // max(1, NUM_TASKS - 1))
    for i in range(NUM_TASKS):
        # start a task with delay of 60*i seconds
        taskqueue.add(url='/tasks/process_export', method='GET',
                      params={'bucket': i, 'bucket_size': bucket_size},
                      countdown=60 * i)
    return 'Trigerred for %d queries' % total_shared, 200