def oauth_request_parameters(consumer_token, url, access_token, parameters={},
method="GET", oauth_version="1.0a",
override_version=""):
base_args = dict(
oauth_consumer_key=consumer_token["key"],
oauth_token=access_token["key"],
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
oauth_version=override_version or oauth_version,
)
args = base_args.copy()
args.update(parameters)
if oauth_version == "1.0a":
signature = tornado.auth._oauth10a_signature(consumer_token, method, url, args, access_token)
else:
signature = tornado.auth._oauth_signature(consumer_token, method, url, args, access_token)
base_args["oauth_signature"] = signature
return base_args
python类auth()的实例源码
def user_login(self, user):
"""
This is an override of BaseAuthHandler since anonymous auth is special.
Generates a unique session ID for this user and saves it in a browser
cookie. This is to ensure that anonymous users can't access each
other's sessions.
"""
logging.debug("NullAuthHandler.user_login(%s)" % user['upn'])
# Make a directory to store this user's settings/files/logs/etc
user_dir = os.path.join(self.settings['user_dir'], user['upn'])
if not os.path.exists(user_dir):
logging.info(_("Creating user directory: %s" % user_dir))
mkdir_p(user_dir)
os.chmod(user_dir, 0o700)
session_info = {
'session': generate_session_id()
}
session_info.update(user)
#print session_info
self.set_secure_cookie(
"gateone_user", tornado.escape.json_encode(session_info))
def get(self):
"""
Deletes the 'gateone_user' cookie and handles some other situations for
backwards compatibility.
"""
# Get rid of the cookie no matter what (API auth doesn't use cookies)
user = self.current_user
self.clear_cookie('gateone_user')
check = self.get_argument("check", None)
if check:
# This lets any origin check if the user has been authenticated
# (necessary to prevent "not allowed ..." XHR errors)
self.set_header('Access-Control-Allow-Origin', '*')
logout = self.get_argument("logout", None)
if logout:
self.user_logout(user['upn'])
return
logging.debug('APIAuthHandler: user is NOT authenticated')
self.write('unauthenticated')
self.finish()
def __init__(self):
handlers = [
(r"/", MainHandler),
(r"/auth/login", AuthLoginHandler),
(r"/auth/logout", AuthLogoutHandler),
]
settings = dict(
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
login_url="/auth/login",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
facebook_api_key=options.facebook_api_key,
facebook_secret=options.facebook_secret,
ui_modules={"Post": PostModule},
debug=True,
autoescape=None,
)
tornado.web.Application.__init__(self, handlers, **settings)
def main():
parse_command_line()
app = tornado.web.Application(
[
(r"/", MainHandler),
(r"/auth/login", AuthLoginHandler),
(r"/auth/logout", AuthLogoutHandler),
(r"/a/message/new", MessageNewHandler),
(r"/a/message/updates", MessageUpdatesHandler),
],
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
login_url="/auth/login",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
)
app.listen(options.port)
tornado.ioloop.IOLoop.instance().start()
def _on_auth(self, user):
if not user:
raise tornado.web.HTTPError(500, "Google auth failed")
author = self.db.get("SELECT * FROM authors WHERE email = %s",
user["email"])
if not author:
# Auto-create first author
any_author = self.db.get("SELECT * FROM authors LIMIT 1")
if not any_author:
author_id = self.db.execute(
"INSERT INTO authors (email,name) VALUES (%s,%s)",
user["email"], user["name"])
else:
self.redirect("/")
return
else:
author_id = author["id"]
self.set_secure_cookie("blogdemo_user", str(author_id))
self.redirect(self.get_argument("next", "/"))
def __init__(self):
handlers = [
(r"/", MainHandler),
(r"/auth/login", AuthLoginHandler),
(r"/auth/logout", AuthLogoutHandler),
]
settings = dict(
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
login_url="/auth/login",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
facebook_api_key=options.facebook_api_key,
facebook_secret=options.facebook_secret,
ui_modules={"Post": PostModule},
debug=True,
autoescape=None,
)
tornado.web.Application.__init__(self, handlers, **settings)
def get(self):
if self.get_argument("oauth_token", None):
user = yield self.get_authenticated_user()
if not user:
return tornado.web.HTTPError(500, "Twitter auth failed")
reversi_user = User.get_or_create(twitter_id=user['id'])[0]
reversi_user.name = user['name']
reversi_user.save()
self.save_current_user(reversi_user)
self.redirect(self.get_argument("next", "/"))
else:
yield self.authorize_redirect(callback_uri=self.request.full_url())
self.authenticate_redirect()
def __init__(self):
handlers = [
(r"/", MainHandler),
(r"/auth/login", AuthLoginHandler),
(r"/auth/logout", AuthLogoutHandler),
]
settings = dict(
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
login_url="/auth/login",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
facebook_api_key=options.facebook_api_key,
facebook_secret=options.facebook_secret,
ui_modules={"Post": PostModule},
debug=True,
autoescape=None,
)
tornado.web.Application.__init__(self, handlers, **settings)
def __init__(self):
handlers = [
(r"/", MainHandler),
(r"/auth/login", AuthLoginHandler),
(r"/auth/logout", AuthLogoutHandler),
]
settings = dict(
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
login_url="/auth/login",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
facebook_api_key=options.facebook_api_key,
facebook_secret=options.facebook_secret,
ui_modules={"Post": PostModule},
debug=True,
autoescape=None,
)
tornado.web.Application.__init__(self, handlers, **settings)
def __init__(self):
handlers = [
(r"/", MainHandler),
(r"/auth/login", AuthLoginHandler),
(r"/auth/logout", AuthLogoutHandler),
]
settings = dict(
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
login_url="/auth/login",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
facebook_api_key=options.facebook_api_key,
facebook_secret=options.facebook_secret,
ui_modules={"Post": PostModule},
debug=True,
autoescape=None,
)
tornado.web.Application.__init__(self, handlers, **settings)
def __init__(self):
handlers = [
(r"/", MainHandler),
(r"/auth/login", AuthLoginHandler),
(r"/auth/logout", AuthLogoutHandler),
]
settings = dict(
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
login_url="/auth/login",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
facebook_api_key=options.facebook_api_key,
facebook_secret=options.facebook_secret,
ui_modules={"Post": PostModule},
debug=True,
autoescape=None,
)
tornado.web.Application.__init__(self, handlers, **settings)
def get(self):
if self.get_argument('code', False):
user = yield self.get_authenticated_user(
redirect_uri='http://your.site.com/auth/google',
code=self.get_argument('code'))
# Save the user with e.g. set_secure_cookie
else:
yield self.authorize_redirect(
redirect_uri='http://your.site.com/auth/google',
client_id=self.settings['google_oauth']['key'],
scope=['profile', 'email'],
response_type='code',
extra_params={'approval_prompt': 'auto'})
def _on_auth(self, user):
if not user:
raise tornado.web.HTTPError(500, _("Kerberos auth failed"))
logging.debug(_("KerberosAuthHandler user: %s" % user))
user = {'upn': user}
# This takes care of the user's settings dir and their session info
self.user_login(user)
# TODO: Add some LDAP or local DB lookups here to add more detail to user objects
next_url = self.get_argument("next", None)
if next_url:
self.redirect(next_url)
else:
self.redirect(self.settings['url_prefix'])
def __init__(self):
handlers = [
(r"/", MainHandler),
(r"/auth/login", AuthHandler),
(r"/auth/logout", LogoutHandler),
]
settings = dict(
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
login_url="/auth/login",
)
tornado.web.Application.__init__(self, handlers, **settings)
def get(self):
name = tornado.escape.xhtml_escape(self.current_user["name"])
self.write("Hello, " + name)
self.write("<br><br><a href=\"/auth/logout\">Log out</a>")
def _on_stream(self, stream):
if stream is None:
# Session may have expired
self.redirect("/auth/login")
return
self.render("stream.html", stream=stream)
def get(self):
my_url = (self.request.protocol + "://" + self.request.host +
"/auth/login?next=" +
tornado.escape.url_escape(self.get_argument("next", "/")))
if self.get_argument("code", False):
self.get_authenticated_user(
redirect_uri=my_url,
client_id=self.settings["facebook_api_key"],
client_secret=self.settings["facebook_secret"],
code=self.get_argument("code"),
callback=self._on_auth)
return
self.authorize_redirect(redirect_uri=my_url,
client_id=self.settings["facebook_api_key"],
extra_params={"scope": "read_stream"})
def __init__(self):
handlers = [
(r"/", HomeHandler),
(r"/archive", ArchiveHandler),
(r"/feed", FeedHandler),
(r"/entry/([^/]+)", EntryHandler),
(r"/compose", ComposeHandler),
(r"/auth/login", AuthLoginHandler),
(r"/auth/logout", AuthLogoutHandler),
]
settings = dict(
blog_title=u"Tornado Blog",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
ui_modules={"Entry": EntryModule},
xsrf_cookies=True,
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
login_url="/auth/login",
debug=True,
)
tornado.web.Application.__init__(self, handlers, **settings)
# Have one global connection to the blog DB across all handlers
self.db = torndb.Connection(
host=options.mysql_host, database=options.mysql_database,
user=options.mysql_user, password=options.mysql_password)
def _on_stream(self, stream):
if stream is None:
# Session may have expired
self.redirect("/auth/login")
return
self.render("stream.html", stream=stream)
def _on_auth(self, user):
if not user:
raise tornado.web.HTTPError(500, "Facebook auth failed")
self.set_secure_cookie("fbdemo_user", tornado.escape.json_encode(user))
self.redirect(self.get_argument("next", "/"))
def _on_auth(self, user):
if not user:
raise tornado.web.HTTPError(500, "Twitter auth failed")
#is there an existing external account?
current_user = self.get_current_user()
authenticated_user = User.get("id=%s", current_user['id'])
existing = Externalservice.by_user(authenticated_user, Externalservice.TWITTER)
if existing:
existing.service_id = user['access_token']['user_id']
existing.service_secret = user['access_token']['secret']
existing.service_key = user['access_token']['key']
existing.screen_name = user['access_token']['screen_name']
existing.save()
else:
external_service = Externalservice(
user_id=authenticated_user.id,
service_id=user['access_token']['user_id'],
screen_name=user['access_token']['screen_name'],
type=Externalservice.TWITTER,
service_key=user['access_token']['key'],
service_secret=user['access_token']['secret'])
external_service.save()
# if not, insert credentials for this user
# if there is, update that account
return self.render("tools/twitter-connected.html")
def _on_stream(self, stream):
if stream is None:
# Session may have expired
self.redirect("/auth/login")
return
self.render("stream.html", stream=stream)
def get(self):
my_url = (self.request.protocol + "://" + self.request.host +
"/auth/login?next=" +
tornado.escape.url_escape(self.get_argument("next", "/")))
if self.get_argument("code", False):
self.get_authenticated_user(
redirect_uri=my_url,
client_id=self.settings["facebook_api_key"],
client_secret=self.settings["facebook_secret"],
code=self.get_argument("code"),
callback=self._on_auth)
return
self.authorize_redirect(redirect_uri=my_url,
client_id=self.settings["facebook_api_key"],
extra_params={"scope": "user_posts"})
def _on_auth(self, user):
if not user:
raise tornado.web.HTTPError(500, "Facebook auth failed")
self.set_secure_cookie("fbdemo_user", tornado.escape.json_encode(user))
self.redirect(self.get_argument("next", "/"))
def _on_stream(self, stream):
if stream is None:
# Session may have expired
self.redirect("/auth/login")
return
self.render("stream.html", stream=stream)
def get(self):
my_url = (self.request.protocol + "://" + self.request.host +
"/auth/login?next=" +
tornado.escape.url_escape(self.get_argument("next", "/")))
if self.get_argument("code", False):
self.get_authenticated_user(
redirect_uri=my_url,
client_id=self.settings["facebook_api_key"],
client_secret=self.settings["facebook_secret"],
code=self.get_argument("code"),
callback=self._on_auth)
return
self.authorize_redirect(redirect_uri=my_url,
client_id=self.settings["facebook_api_key"],
extra_params={"scope": "user_posts"})
def _on_auth(self, user):
if not user:
raise tornado.web.HTTPError(500, "Facebook auth failed")
self.set_secure_cookie("fbdemo_user", tornado.escape.json_encode(user))
self.redirect(self.get_argument("next", "/"))
def _on_stream(self, stream):
if stream is None:
# Session may have expired
self.redirect("/auth/login")
return
self.render("stream.html", stream=stream)
def get(self):
my_url = (self.request.protocol + "://" + self.request.host +
"/auth/login?next=" +
tornado.escape.url_escape(self.get_argument("next", "/")))
if self.get_argument("code", False):
self.get_authenticated_user(
redirect_uri=my_url,
client_id=self.settings["facebook_api_key"],
client_secret=self.settings["facebook_secret"],
code=self.get_argument("code"),
callback=self._on_auth)
return
self.authorize_redirect(redirect_uri=my_url,
client_id=self.settings["facebook_api_key"],
extra_params={"scope": "user_posts"})