python类is_current_user_admin()的实例源码

fileutils.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Checks that user is authorized to access other_application"""
    if request.env.web2py_runtime_gae:
        from google.appengine.api import users
        if users.is_current_user_admin():
            return True
        elif gae_login:
            login_html = '<a href="%s">Sign in with your google account</a>.' \
                % users.create_login_url(request.env.path_info)
            raise HTTP(200, '<html><body>%s</body></html>' % login_html)
        else:
            return False
    else:
        t0 = time.time()
        dt = t0 - expiration
        s = get_session(request, other_application)
        r = (s.authorized and s.last_time and s.last_time > dt)
        if r:
            s.last_time = t0
            set_session(request, s, other_application)
        return r
gae.py 文件源码 项目:electron-crash-reporter 作者: lipis 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def retrieve_user_from_gae(gae_user):
  auth_id = 'federated_%s' % gae_user.user_id()
  user_db = model.User.get_by('auth_ids', auth_id)
  if user_db:
    if not user_db.admin and users.is_current_user_admin():
      user_db.admin = True
      user_db.put()
    return user_db

  return auth.create_user_db(
    auth_id=auth_id,
    name=util.create_name_from_email(gae_user.email()),
    username=gae_user.email(),
    email=gae_user.email(),
    verified=True,
    admin=users.is_current_user_admin(),
  )
lib_config.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main():
  """CGI-style request handler to dump the configuration.

  Put this in your app.yaml to enable (you can pick any URL):

  - url: /lib_config
    script: $PYTHON_LIB/google/appengine/api/lib_config.py

  Note: unless you are using the SDK, you must be admin.
  """
  if not os.getenv('SERVER_SOFTWARE', '').startswith('Dev'):
    from google.appengine.api import users
    if not users.is_current_user_admin():
      if users.get_current_user() is None:
        print 'Status: 302'
        print 'Location:', users.create_login_url(os.getenv('PATH_INFO', ''))
      else:
        print 'Status: 403'
        print
        print 'Forbidden'
      return

  print 'Content-type: text/plain'
  print
  _default_registry._dump()
lib_config.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def main():
  """CGI-style request handler to dump the configuration.

  Put this in your app.yaml to enable (you can pick any URL):

  - url: /lib_config
    script: $PYTHON_LIB/google/appengine/api/lib_config.py

  Note: unless you are using the SDK, you must be admin.
  """
  if not os.getenv('SERVER_SOFTWARE', '').startswith('Dev'):
    from google.appengine.api import users
    if not users.is_current_user_admin():
      if users.get_current_user() is None:
        print 'Status: 302'
        print 'Location:', users.create_login_url(os.getenv('PATH_INFO', ''))
      else:
        print 'Status: 403'
        print
        print 'Forbidden'
      return

  print 'Content-type: text/plain'
  print
  _default_registry._dump()
gae.py 文件源码 项目:meet-notes 作者: lipis 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def retrieve_user_from_gae(gae_user):
  auth_id = 'federated_%s' % gae_user.user_id()
  user_db = model.User.get_by('auth_ids', auth_id)
  if user_db:
    if not user_db.admin and users.is_current_user_admin():
      user_db.admin = True
      user_db.put()
    return user_db

  return auth.create_user_db(
    auth_id=auth_id,
    name=util.create_name_from_email(gae_user.email()),
    username=gae_user.email(),
    email=gae_user.email(),
    verified=True,
    admin=users.is_current_user_admin(),
  )
fileutils.py 文件源码 项目:true_review_web2py 作者: lucadealfaro 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Checks that user is authorized to access other_application"""
    if request.env.web2py_runtime_gae:
        from google.appengine.api import users
        if users.is_current_user_admin():
            return True
        elif gae_login:
            login_html = '<a href="%s">Sign in with your google account</a>.' \
                % users.create_login_url(request.env.path_info)
            raise HTTP(200, '<html><body>%s</body></html>' % login_html)
        else:
            return False
    else:
        t0 = time.time()
        dt = t0 - expiration
        s = get_session(request, other_application)
        r = (s.authorized and s.last_time and s.last_time > dt)
        if r:
            s.last_time = t0
            set_session(request, s, other_application)
        return r
gae.py 文件源码 项目:vote4code 作者: welovecoding 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def retrieve_user_from_gae(gae_user):
  auth_id = 'federated_%s' % gae_user.user_id()
  user_db = model.User.get_by('auth_ids', auth_id)
  if user_db:
    if not user_db.admin and users.is_current_user_admin():
      user_db.admin = True
      user_db.put()
    return user_db

  return auth.create_user_db(
    auth_id=auth_id,
    name=util.create_name_from_email(gae_user.email()),
    username=gae_user.email(),
    email=gae_user.email(),
    verified=True,
    admin=users.is_current_user_admin(),
  )
fileutils.py 文件源码 项目:Problematica-public 作者: TechMaz 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Checks that user is authorized to access other_application"""
    if request.env.web2py_runtime_gae:
        from google.appengine.api import users
        if users.is_current_user_admin():
            return True
        elif gae_login:
            login_html = '<a href="%s">Sign in with your google account</a>.' \
                % users.create_login_url(request.env.path_info)
            raise HTTP(200, '<html><body>%s</body></html>' % login_html)
        else:
            return False
    else:
        t0 = time.time()
        dt = t0 - expiration
        s = get_session(request, other_application)
        r = (s.authorized and s.last_time and s.last_time > dt)
        if r:
            s.last_time = t0
            set_session(request, s, other_application)
        return r
google_models.py 文件源码 项目:beg-django-e-commerce 作者: Apress 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_djangouser_for_user(cls, user):
        django_user = cls.all().filter('user =', user).get()
        if django_user:
            if getattr(settings, 'AUTH_ADMIN_USER_AS_SUPERUSER', True):
                is_admin = users.is_current_user_admin()
                if django_user.is_staff != is_admin or \
                        django_user.is_superuser != is_admin:
                    django_user.is_superuser = django_user.is_staff = is_admin
                    django_user.put()
        else:
            django_user = cls.create_djangouser_for_user(user)
            django_user.is_active = True
            if getattr(settings, 'AUTH_ADMIN_USER_AS_SUPERUSER', True) and \
                    users.is_current_user_admin():
                django_user.is_staff = True
                django_user.is_superuser = True
            django_user.put()
        return django_user
fileutils.py 文件源码 项目:rekall-agent-server 作者: rekall-innovations 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Checks that user is authorized to access other_application"""
    if request.env.web2py_runtime_gae:
        from google.appengine.api import users
        if users.is_current_user_admin():
            return True
        elif gae_login:
            login_html = '<a href="%s">Sign in with your google account</a>.' \
                % users.create_login_url(request.env.path_info)
            raise HTTP(200, '<html><body>%s</body></html>' % login_html)
        else:
            return False
    else:
        t0 = time.time()
        dt = t0 - expiration
        s = get_session(request, other_application)
        r = (s.authorized and s.last_time and s.last_time > dt)
        if r:
            s.last_time = t0
            set_session(request, s, other_application)
        return r
lib_config.py 文件源码 项目:xxNet 作者: drzorm 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def main():
  """CGI-style request handler to dump the configuration.

  Put this in your app.yaml to enable (you can pick any URL):

  - url: /lib_config
    script: $PYTHON_LIB/google/appengine/api/lib_config.py

  Note: unless you are using the SDK, you must be admin.
  """
  if not os.getenv('SERVER_SOFTWARE', '').startswith('Dev'):
    from google.appengine.api import users
    if not users.is_current_user_admin():
      if users.get_current_user() is None:
        print 'Status: 302'
        print 'Location:', users.create_login_url(os.getenv('PATH_INFO', ''))
      else:
        print 'Status: 403'
        print
        print 'Forbidden'
      return

  print 'Content-type: text/plain'
  print
  _default_registry._dump()
fileutils.py 文件源码 项目:web3py 作者: web2py 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Checks that user is authorized to access other_application"""
    if request.env.web2py_runtime_gae:
        from google.appengine.api import users
        if users.is_current_user_admin():
            return True
        elif gae_login:
            login_html = '<a href="%s">Sign in with your google account</a>.' \
                % users.create_login_url(request.env.path_info)
            raise HTTP(200, '<html><body>%s</body></html>' % login_html)
        else:
            return False
    else:
        t0 = time.time()
        dt = t0 - expiration
        s = get_session(request, other_application)
        r = (s.authorized and s.last_time and s.last_time > dt)
        if r:
            s.last_time = t0
            set_session(request, s, other_application)
        return r
context.py 文件源码 项目:gae-sports-data 作者: jnguyen-ca 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def sys_processor():
    def _get_app_var(key):
        if key in constants.APPVAR_DISPLAY_LIST:
            return models.ApplicationVariable.get_app_var(key)
        return None

    def _is_logged_in():
        if users.get_current_user():
            return True
        return False

    def _is_admin():
        return users.is_current_user_admin()

    def _login_link(endpoint):
        return users.create_login_url(endpoint)

    return dict(get_app_var=_get_app_var,
                is_logged_in=_is_logged_in,
                is_admin=_is_admin,
                login_link=_login_link)
fileutils.py 文件源码 项目:slugiot-client 作者: slugiot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Checks that user is authorized to access other_application"""
    if request.env.web2py_runtime_gae:
        from google.appengine.api import users
        if users.is_current_user_admin():
            return True
        elif gae_login:
            login_html = '<a href="%s">Sign in with your google account</a>.' \
                % users.create_login_url(request.env.path_info)
            raise HTTP(200, '<html><body>%s</body></html>' % login_html)
        else:
            return False
    else:
        t0 = time.time()
        dt = t0 - expiration
        s = get_session(request, other_application)
        r = (s.authorized and s.last_time and s.last_time > dt)
        if r:
            s.last_time = t0
            set_session(request, s, other_application)
        return r
lib_config.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def main():
  """CGI-style request handler to dump the configuration.

  Put this in your app.yaml to enable (you can pick any URL):

  - url: /lib_config
    script: $PYTHON_LIB/google/appengine/api/lib_config.py

  Note: unless you are using the SDK, you must be admin.
  """
  if not os.getenv('SERVER_SOFTWARE', '').startswith('Dev'):
    from google.appengine.api import users
    if not users.is_current_user_admin():
      if users.get_current_user() is None:
        print 'Status: 302'
        print 'Location:', users.create_login_url(os.getenv('PATH_INFO', ''))
      else:
        print 'Status: 403'
        print
        print 'Forbidden'
      return

  print 'Content-type: text/plain'
  print
  _default_registry._dump()
data.py 文件源码 项目:skojjt 作者: martin-green 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def getorcreate(user):
        userprefs = memcache.get(user.user_id())
        if userprefs is not None:
            return userprefs
        else:
            userprefs = UserPrefs.get_by_id(user.user_id()) # new records have user_id as id
            if userprefs != None:
                userprefs.updateMemcache()
                return userprefs
            usersresult = UserPrefs.query(UserPrefs.userid == user.user_id()).fetch() # Fetching old records from userid
            if len(usersresult) == 0:
                userprefs = UserPrefs.create(user, users.is_current_user_admin(), users.is_current_user_admin())
                userprefs.put()
            else:
                olduser = usersresult[0]
                # old record, update to a new with user_id as id and email
                if olduser != None:
                    userprefs = UserPrefs.create(user, olduser.hasAccess(), olduser.isAdmin())
                    userprefs.activeSemester = olduser.activeSemester
                    userprefs.groupaccess = olduser.groupaccess
                    userprefs.groupadmin = olduser.groupadmin
                    userprefs.put()
                    olduser.key.delete()
            userprefs.updateMemcache()
            return userprefs
lib_config.py 文件源码 项目:Docker-XX-Net 作者: kuanghy 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main():
  """CGI-style request handler to dump the configuration.

  Put this in your app.yaml to enable (you can pick any URL):

  - url: /lib_config
    script: $PYTHON_LIB/google/appengine/api/lib_config.py

  Note: unless you are using the SDK, you must be admin.
  """
  if not os.getenv('SERVER_SOFTWARE', '').startswith('Dev'):
    from google.appengine.api import users
    if not users.is_current_user_admin():
      if users.get_current_user() is None:
        print 'Status: 302'
        print 'Location:', users.create_login_url(os.getenv('PATH_INFO', ''))
      else:
        print 'Status: 403'
        print
        print 'Forbidden'
      return

  print 'Content-type: text/plain'
  print
  _default_registry._dump()
google.py 文件源码 项目:gae_test 作者: huijari 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def retrieve_user_from_google(google_user):
    auth_id = 'federated_%s' % google_user.user_id()
    user_db = model.User.get_by('auth_ids', auth_id)
    if user_db:
        if not user_db.admin and users.is_current_user_admin():
            user_db.admin = True
            user_db.put()
        return user_db

    return auth.create_or_get_user_db(
        auth_id=auth_id,
        name=util.create_name_from_email(google_user.email()),
        username=google_user.email(),
        email=google_user.email(),
        verified=True,
        admin=users.is_current_user_admin(),
    )
fileutils.py 文件源码 项目:StuffShare 作者: StuffShare 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Checks that user is authorized to access other_application"""
    if request.env.web2py_runtime_gae:
        from google.appengine.api import users
        if users.is_current_user_admin():
            return True
        elif gae_login:
            login_html = '<a href="%s">Sign in with your google account</a>.' \
                % users.create_login_url(request.env.path_info)
            raise HTTP(200, '<html><body>%s</body></html>' % login_html)
        else:
            return False
    else:
        t0 = time.time()
        dt = t0 - expiration
        s = get_session(request, other_application)
        r = (s.authorized and s.last_time and s.last_time > dt)
        if r:
            s.last_time = t0
            set_session(request,s,other_application)
        return r
decorators.py 文件源码 项目:love 作者: Yelp 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def admin_required(func):
    @wraps(func)
    def decorated_view(*args, **kwargs):
        if users.get_current_user():
            if not users.is_current_user_admin():
                abort(401)  # Unauthorized
            return func(*args, **kwargs)
        return redirect(users.create_login_url(request.url))
    return decorated_view
auth.py 文件源码 项目:love 作者: Yelp 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def is_admin():
    return users.get_current_user() and users.is_current_user_admin()
users.py 文件源码 项目:webapp2 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def admin_required(handler_method):
    """A decorator to require that a user be an admin for this application
    to access a handler.

    To use it, decorate your get() method like this::

        @admin_required
        def get(self):
            user = users.get_current_user(self)
            self.response.out.write('Hello, ' + user.nickname())

    We will redirect to a login page if the user is not logged in. We always
    redirect to the request URI, and Google Accounts only redirects back as
    a GET request, so this should not be used for POSTs.
    """
    def check_admin(self, *args, **kwargs):
        if self.request.method != 'GET':
            self.abort(400,
                       detail='The admin_required decorator '
                              'can only be used for GET requests.')

        user = users.get_current_user()
        if not user:
            return self.redirect(users.create_login_url(self.request.url))
        elif not users.is_current_user_admin():
            self.abort(403)
        else:
            handler_method(self, *args, **kwargs)

    return check_admin
users.py 文件源码 项目:rekall-agent-server 作者: rekall-innovations 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def is_user_app_admin():
    logging.debug("is_user_app_admin: %s", users.is_current_user_admin())
    return users.is_current_user_admin()
blog.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def get_current_user(self):
        user = users.get_current_user()
        if user: user.administrator = users.is_current_user_admin()
        return user
blog.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_current_user(self):
        user = users.get_current_user()
        if user: user.administrator = users.is_current_user_admin()
        return user
user.py 文件源码 项目:server 作者: viur-framework 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def login(self, skey="", *args, **kwargs):
        if users.get_current_user():
            addSkel = skeletonByKind(self.userModule.addSkel().kindName) # Ensure that we have the full skeleton
            currentUser = users.get_current_user()
            uid = currentUser.user_id()
            userSkel = addSkel().all().filter("uid =", uid).getSkel()
            if not userSkel:
                # We'll try again - checking if there's already an user with that email
                userSkel = addSkel().all().filter("name.idx =", currentUser.email().lower()).getSkel()
                if not userSkel: # Still no luck - it's a completely new user
                    if not self.registrationEnabled and not users.is_current_user_admin():
                        # Registration is disabled, it's a new user and that user is not admin
                        logging.warning("Denying registration of %s", currentUser.email())
                        raise errors.Forbidden("Registration for new users is disabled")
                    userSkel = addSkel() # We'll add a new user
                userSkel["uid"] = uid
                userSkel["name"] = currentUser.email()
                isAdd = True
            else:
                isAdd = False
            now = datetime.datetime.now()
            if isAdd or (now-userSkel["lastlogin"]) > datetime.timedelta(minutes=30):
                # Conserve DB-Writes: Update the user max once in 30 Minutes
                userSkel["lastlogin"] = now
                if users.is_current_user_admin():
                    if not userSkel["access"]:
                        userSkel["access"] = []
                    if not "root" in userSkel["access"]:
                        userSkel["access"].append("root")
                    userSkel["gaeadmin"] = True
                else:
                    userSkel["gaeadmin"] = False
                assert userSkel.toDB()
            return self.userModule.continueAuthenticationFlow(self, userSkel["key"])
        raise errors.Redirect( users.create_login_url( self.modulePath+"/login") )
tasks.py 文件源码 项目:server 作者: viur-framework 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def canCall( self ):
        """
            Checks wherever the current user can execute this task
            @returns bool
        """
        return( users.is_current_user_admin() )
appengine.py 文件源码 项目:ctfscoreboard 作者: google 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def register(flask_request):
    gae_user = users.get_current_user()
    if not gae_user:
        raise errors.LoginError(
                'Cannot register if not logged into AppEngine.')
    data = flask_request.get_json()
    user = controllers.register_user(
            gae_user.email(), data['nick'], '',
            data.get('team_id'), data.get('team_name'), data.get('team_code'))
    if users.is_current_user_admin():
        user.promote()
    return user
main.py 文件源码 项目:appbackendapi 作者: codesdk 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get(self):
        user = users.get_current_user()
        if user:
            if users.is_current_user_admin():
                self.response.write('You are an administrator.')
            else:
                self.response.write('You are not an administrator.')
        else:
            self.response.write('You are not logged in.')
main.py 文件源码 项目:cas-eval 作者: varepsilon 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def export():
    user = users.get_current_user()
    total_shared = Session.query(Session.shared == True).count()
    if user and users.is_current_user_admin():
        bucket_size = max(1, total_shared // (NUM_TASKS - 1))
        for i in range(NUM_TASKS):
            # start a task with delay of 60*i seconds
            taskqueue.add(url='/tasks/process_export', method='GET',
                    params={'bucket':  i, 'bucket_size': bucket_size}, countdown=60*i)
        return 'Trigerred for %d queries' % total_shared, 200
    else:
        return 'Admin access only', 403


问题


面经


文章

微信
公众号

扫码关注公众号