def _on_auth(self, user):
    """Finish the Facebook login once the OAuth exchange has completed.

    Stores the JSON-encoded ``user`` in the ``fbdemo_user`` secure cookie and
    redirects to the ``next`` request argument (``/`` when absent or when it
    does not point at a path on this site).

    Args:
        user: The value handed back by the authentication call; any falsy
            value is treated as a failed login.

    Raises:
        tornado.web.HTTPError: With status 500 when ``user`` is falsy.
    """
    if not user:
        raise tornado.web.HTTPError(500, "Facebook auth failed")
    self.set_secure_cookie("fbdemo_user", tornado.escape.json_encode(user))
    # ``next`` comes straight from the request, so only follow it when it is
    # a path on this site. Otherwise a crafted login link could bounce the
    # freshly authenticated user to an arbitrary external host. "//host" and
    # "/\host" are rejected because browsers treat them as protocol-relative.
    target = self.get_argument("next", "/")
    if not target.startswith("/") or target.startswith(("//", "/\\")):
        target = "/"
    self.redirect(target)
# Example source code for Python auth() classes
def get(self):
    """Handle both legs of the Facebook OAuth login round-trip.

    Without a ``code`` request argument the client is sent to the
    authorization page, requesting the scope configured under
    ``FB_GRANTS``. With a ``code`` argument the code is exchanged for a user
    object, which is stored JSON-encoded in the ``user`` secure cookie before
    redirecting to ``/auth/admin/event/``.

    NOTE(review): this function uses ``yield``, so it presumably relies on a
    coroutine decorator that is not visible in this excerpt -- confirm.

    Raises:
        tornado.web.HTTPError: With status 500 when the code exchange
            produces no user.
    """
    code = self.get_argument("code", False)
    if code:
        user = yield self.get_authenticated_user(
            redirect_uri=settings.get('redirect_uri'),
            client_id=settings.get('fb_app_id'),
            client_secret=settings.get('fb_app_secret'),
            code=code)
        if not user:
            # Without this guard a failed login would be serialized as the
            # string "null" and stored as if it were a valid session.
            import tornado.web  # local import: only needed on this path
            raise tornado.web.HTTPError(500, "Facebook auth failed")
        self.set_secure_cookie('user', json.dumps(user))
        self.redirect('/auth/admin/event/')
    else:
        yield self.authorize_redirect(
            redirect_uri=settings.get('redirect_uri'),
            client_id=settings.get('fb_app_id'),
            extra_params={"scope": settings.get('FB_GRANTS')})
def _on_stream(self, stream):
if stream is None:
# Session may have expired
self.redirect("/auth/login")
return
self.render("stream.html", stream=stream)
def get(self):
    """Start or complete the Facebook login, depending on the ``code`` argument.

    The callback URL is this site's ``/auth/login`` page carrying the
    URL-escaped ``next`` argument (default ``/``). When the request has a
    ``code`` argument it is exchanged for a user and ``self._on_auth`` is
    passed as the callback; otherwise the client is redirected to the
    authorization page asking for the ``user_posts`` scope.
    """
    escaped_next = tornado.escape.url_escape(self.get_argument("next", "/"))
    site_root = self.request.protocol + "://" + self.request.host
    callback_url = site_root + "/auth/login?next=" + escaped_next

    code = self.get_argument("code", False)
    if not code:
        # First leg: no code yet, so ask Facebook for authorization.
        self.authorize_redirect(
            redirect_uri=callback_url,
            client_id=self.settings["facebook_api_key"],
            extra_params={"scope": "user_posts"})
    else:
        # Second leg: trade the code for the user; _on_auth gets the result.
        self.get_authenticated_user(
            redirect_uri=callback_url,
            client_id=self.settings["facebook_api_key"],
            client_secret=self.settings["facebook_secret"],
            code=code,
            callback=self._on_auth)
def _on_auth(self, user):
    """Complete the Facebook login with the result of the OAuth exchange.

    On success the JSON-encoded ``user`` is written to the ``fbdemo_user``
    secure cookie and the client is redirected to the ``next`` request
    argument, falling back to ``/`` when it is absent or is not a path on
    this site.

    Args:
        user: Result of the authentication call; falsy means the login
            failed.

    Raises:
        tornado.web.HTTPError: With status 500 when ``user`` is falsy.
    """
    if not user:
        raise tornado.web.HTTPError(500, "Facebook auth failed")
    self.set_secure_cookie("fbdemo_user", tornado.escape.json_encode(user))
    # ``next`` is attacker-controllable request input. Following it blindly
    # is an open redirect, so accept only same-site paths. "//host" and
    # "/\host" are excluded because browsers resolve them to another host.
    target = self.get_argument("next", "/")
    if not target.startswith("/") or target.startswith(("//", "/\\")):
        target = "/"
    self.redirect(target)
def login(self, username, token, next="/"):
    """Check ``username``/``token`` and redirect to ``next`` or the login page.

    A colon-separated token (or a web-view request) from a user without an
    admin/grader role is treated as a locked-access token of the form
    ``site:session:code``; in that case ``next`` is replaced by the locked
    site or session path. Any other login goes through ``check_access``.

    Args:
        username: Name of the user logging in. A name equal to
            ``sdproxy.ADMIN_ROLE`` or ``sdproxy.GRADER_ROLE`` selects that
            role.
        token: Access token supplied by the user.
        next: Path to redirect to after a successful login.
    """
    # Testing aids for a local-only proxy: a token matching the configured
    # auth key, or no-auth debug mode without a Google Sheet URL, cause the
    # real token to be generated here.
    mint_token = bool(
        token == Options['auth_key']
        or (Options['no_auth'] and Options['debug']
            and not Options['gsheet_url']))

    role = ''
    for special_role in (sdproxy.ADMIN_ROLE, sdproxy.GRADER_ROLE):
        if username == special_role:
            role = special_role
            break

    if mint_token:
        token = gen_proxy_auth_token(username, role=role)

    data = {}
    parts = token.split(':')
    locked = not role and (self.is_web_view() or len(parts) > 1)

    if not locked:
        auth = self.check_access(username, token, role=role)
    else:
        if len(parts) != 3:
            self.redirect('/_auth/login/' + '?error=' + tornado.escape.url_escape('Invalid locked access token. Expecting site:session:code'))
            return
        siteName, sessionName, _ = parts
        if sessionName:
            # Locked session access
            next = getSessionPath(sessionName)
            if siteName:
                # Add site prefix separately because this is root site
                next = '/' + siteName + next
        else:
            # Locked site access
            next = '/' + siteName
        data['locked_access'] = next
        auth = self.check_locked(username, token, siteName, sessionName)

    if not auth:
        error_msg = "?error=" + tornado.escape.url_escape("Incorrect username or token")
        self.redirect("/_auth/login/" + error_msg)
        return

    if Global.twitter_params:
        data['site_twitter'] = Global.twitter_params['site_twitter']
    self.set_id(username, data=data, role=role)
    self.redirect(next)
def get(self):
    """Serve the Google OAuth login, status-check and logout requests.

    What happens depends on the request arguments, checked in this order:

    * ``check``: write ``authenticated`` or ``unauthenticated`` (with an
      ``Access-Control-Allow-Origin: *`` header) according to
      ``get_current_user()``.
    * ``logout``: clear the ``gateone_user`` cookie and call
      ``user_logout`` with the current user's ``upn`` and Google's logout
      URL.
    * ``code``: exchange the code for an access token, fetch the Google
      userinfo document and pass it to ``_on_auth`` (which presumably sets
      the user cookie -- it is not visible here).
    * otherwise: redirect the browser to the authorization page.

    NOTE(review): this function uses ``yield``, so it presumably relies on a
    coroutine decorator that is not visible in this excerpt -- confirm.

    Raises:
        tornado.web.HTTPError: With status 500 when the code exchange or the
            userinfo request yields nothing.
    """
    self.base_url = "{protocol}://{host}:{port}{url_prefix}".format(
        protocol=self.request.protocol,
        host=self.request.host,
        port=self.settings['port'],
        url_prefix=self.settings['url_prefix'])
    uri_port = ':{0}/'.format(self.settings['port'])
    if uri_port in self.base_url:
        # Get rid of the port (will be added automatically)
        self.base_url = self.base_url.replace(uri_port, '/', 1)
    redirect_uri = "{base_url}auth".format(base_url=self.base_url)

    if self.get_argument("check", None):
        self.set_header('Access-Control-Allow-Origin', '*')
        if self.get_current_user():
            logging.debug('GoogleAuthHandler: user is authenticated')
            self.write('authenticated')
        else:
            logging.debug('GoogleAuthHandler: user is NOT authenticated')
            self.write('unauthenticated')
        self.finish()
        return

    logout_url = "https://accounts.google.com/Logout"
    if self.get_argument("logout", None):
        # A logout request can arrive without a valid session (expired or
        # missing cookie). Indexing None used to raise TypeError here, so
        # fall back to no upn instead.
        # NOTE(review): confirm user_logout() tolerates a None user.
        current_user = self.get_current_user()
        upn = current_user['upn'] if current_user else None
        self.clear_cookie('gateone_user')
        self.user_logout(upn, logout_url)
        return

    code = self.get_argument('code', False)
    if code:
        user = yield self.get_authenticated_user(
            redirect_uri=redirect_uri,
            code=code)
        if not user:
            self.clear_all_cookies()
            raise tornado.web.HTTPError(500, 'Google auth failed')
        access_token = str(user['access_token'])
        http_client = self.get_auth_http_client()
        response = yield http_client.fetch(
            'https://www.googleapis.com/oauth2/v1/userinfo?access_token='
            + access_token)
        if not response:
            self.clear_all_cookies()
            raise tornado.web.HTTPError(500, 'Google auth failed')
        # From here on ``user`` is the userinfo document, not the token dict.
        user = json.loads(response.body.decode('utf-8'))
        self._on_auth(user)
    else:
        yield self.authorize_redirect(
            redirect_uri=redirect_uri,
            client_id=self.settings['google_oauth']['key'],
            scope=['email'],
            response_type='code',
            extra_params={'approval_prompt': 'auto'})