python类is_current_user_admin()的实例源码

handler.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def CheckIsAdmin(self):
    user_is_authorized = False
    if users.is_current_user_admin():
      user_is_authorized = True
    if not user_is_authorized and config.CUSTOM_ENVIRONMENT_AUTHENTICATION:
      if len(config.CUSTOM_ENVIRONMENT_AUTHENTICATION) == 2:
        var, values = config.CUSTOM_ENVIRONMENT_AUTHENTICATION
        if os.getenv(var) in values:
          user_is_authorized = True
      else:
        logging.warning('remoteapi_CUSTOM_ENVIRONMENT_AUTHENTICATION is '
                        'configured incorrectly.')

    if not user_is_authorized:
      try:
        user_is_authorized = (
            oauth.is_current_user_admin(_scope=self.OAUTH_SCOPES))
      except oauth.OAuthRequestError:

        pass
    if not user_is_authorized:
      self.response.set_status(401)
      self.response.out.write(
          'You must be logged in as an administrator to access this.')
      self.response.headers['Content-Type'] = 'text/plain'
      return False
    if 'X-appcfg-api-version' not in self.request.headers:
      self.response.set_status(403)
      self.response.out.write('This request did not contain a necessary header')
      self.response.headers['Content-Type'] = 'text/plain'
      return False
    return True
ui.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __call__(self, environ, start_response):
    if not environ.get('SERVER_SOFTWARE', '').startswith('Dev'):
      if not users.is_current_user_admin():
        if users.get_current_user() is None:
          start_response('302 Found',
                         [('Location',
                           users.create_login_url(os.getenv('PATH_INFO', '')))])
          return []
        else:
          start_response('403 Forbidden', [])
          return ['Forbidden\n']
    return self._application(environ, start_response)
__init__.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get(self):

    if users.is_current_user_admin():
      self.generate('interactive.html')
    else:
      logging.warning(
          'Non admin user from IP %s attempted to use interactive console',
          self.request.remote_addr)
      self.error(404)
__init__.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def post(self):
    if users.is_current_user_admin():
      if self.interactive_console_enabled():

        save_stdout = sys.stdout
        results_io = cStringIO.StringIO()
        try:
          sys.stdout = results_io


          code = self.request.get('code')
          code = code.replace('\r\n', '\n')

          try:
            compiled_code = compile(code, '<string>', 'exec')
            exec(compiled_code, globals())
          except Exception, e:
            traceback.print_exc(file=results_io)
        finally:
          sys.stdout = save_stdout

        results = results_io.getvalue()
      else:
        results = """The interactive console has been disabled for security
because the dev_appserver is listening on a non-default address.
If you would like to re-enable the console, invoke dev_appserver
with the --enable_console argument.

See https://developers.google.com/appengine/docs/python/tools/devserver#The_Interactive_Console
for more information."""
      self.generate('interactive-output.html', {'output': results})
    else:
      logging.warning(
          'Non admin user from IP %s attempted to use interactive console',
          self.request.remote_addr)
      self.error(404)
users.py 文件源码 项目:blockhooks 作者: EthereumWebhooks 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def admin_required(handler_method):
    """A decorator to require that a user be an admin for this application
    to access a handler.

    To use it, decorate your get() method like this::

        @admin_required
        def get(self):
            user = users.get_current_user(self)
            self.response.out.write('Hello, ' + user.nickname())

    We will redirect to a login page if the user is not logged in. We always
    redirect to the request URI, and Google Accounts only redirects back as
    a GET request, so this should not be used for POSTs.
    """
    def check_admin(self, *args, **kwargs):
        if self.request.method != 'GET':
            self.abort(400, detail='The admin_required decorator '
                'can only be used for GET requests.')

        user = users.get_current_user()
        if not user:
            return self.redirect(users.create_login_url(self.request.url))
        elif not users.is_current_user_admin():
            self.abort(403)
        else:
            handler_method(self, *args, **kwargs)

    return check_admin
blog.py 文件源码 项目:aweasome_learning 作者: Knight-ZXW 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_current_user(self):
        user = users.get_current_user()
        if user: user.administrator = users.is_current_user_admin()
        return user
blog.py 文件源码 项目:browser_vuln_check 作者: lcatro 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_current_user(self):
        user = users.get_current_user()
        if user: user.administrator = users.is_current_user_admin()
        return user
user_models.py 文件源码 项目:share-class 作者: junyiacademy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def is_server_admin(self):
        return users.is_current_user_admin()
blog.py 文件源码 项目:LinuxBashShellScriptForOps 作者: DingGuodong 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_current_user(self):
        user = users.get_current_user()
        if user: user.administrator = users.is_current_user_admin()
        return user
blog.py 文件源码 项目:ProgrameFacil 作者: Gpzim98 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_current_user(self):
        user = users.get_current_user()
        if user: user.administrator = users.is_current_user_admin()
        return user
auth_utils.py 文件源码 项目:montage 作者: storyful 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def auth_user(fn):
    """
        Decorator to force user to be logged in with GAE
    """
    @functools.wraps(fn)
    def _wrapped(request, *args, **kwargs):
        temp_request = request
        bearer = request.META['HTTP_AUTHORIZATION']
        url = "https://www.googleapis.com/userinfo/v2/me"
        result = urlfetch.fetch(url=url,
            method=urlfetch.GET,
            headers={"Authorization" : bearer})
        contents = json.loads(result.content)
        gae_user = users.get_current_user()
        is_admin = users.is_current_user_admin()

        User = get_user_model()
        django_user = None
        try:
            logging.debug("Getting django user")
            django_user = User.objects.get(
                email=contents['email'])
        except User.DoesNotExist:
            logging.info("User does not exist in Montage. Checking pending users")
            try:
                pending_user = PendingUser.objects.get(
                    email=contents['email'])
            except PendingUser.DoesNotExist:
                logging.info("No pending user record for this email")
                user, created = get_user_model().objects.get_or_create(
                    email=email,
                    defaults={
                        'username': email.split('@')[0],
                        'is_active': True
                    }
                )
                return user
            else:
                logging.info("Pending user record found. Activating user.")
                django_user = activate_pending_user(
                    pending_user, gae_user, is_admin)
        except AttributeError:
            return HttpResponseForbidden()

        else:
            logging.info("User found. Updating gaia_id and superuser status")
            request = temp_request
            # update_user(django_user, is_admin)

        if django_user:
            request.user = django_user
        else:
            return HttpResponseForbidden()

        return fn(request, *args, **kwargs)
    return _wrapped


问题


面经


文章

微信
公众号

扫码关注公众号