def save_auth_tokens(token_dict, user=None):
    """Pickles the tokens and stores them in memcache and the datastore.

    Args:
        token_dict: The token mapping to persist; it is pickled as given.
        user: The user the tokens belong to. When None, the result of
            users.get_current_user() is used instead.

    Returns:
        The value returned by put() on the user's TokenCollection entity, or
        None if no user was supplied and there is no current user.
    """
    if user is None:
        user = users.get_current_user()
        if user is None:
            return None

    pickled = pickle.dumps(token_dict)
    memcache.set('gdata_pickled_tokens:%s' % user, pickled)

    # Reuse the user's existing entity when there is one, otherwise create it.
    entity = TokenCollection.all().filter('user =', user).get()
    if not entity:
        entity = TokenCollection(user=user, pickled_tokens=pickled)
    else:
        entity.pickled_tokens = pickled
    return entity.put()
# Example source code using Python's get_current_user()
def load_auth_tokens(user=None):
    """Returns the token dictionary stored for a user.

    Memcache is consulted first; on a miss the datastore is queried and the
    pickled value found there is copied back into memcache.

    Args:
        user: The user whose tokens are wanted. When None, the result of
            users.get_current_user() is used instead.

    Returns:
        The unpickled tokens, or an empty dict when there is no user or no
        tokens are stored for that user.
    """
    if user is None:
        user = users.get_current_user()
        if user is None:
            return {}

    cache_key = 'gdata_pickled_tokens:%s' % user
    cached = memcache.get(cache_key)
    if cached:
        return pickle.loads(cached)

    entity = TokenCollection.all().filter('user =', user).get()
    if not entity:
        return {}
    memcache.set(cache_key, entity.pickled_tokens)
    return pickle.loads(entity.pickled_tokens)
def GetUserByAuth():
    """Renders the profile page of the current user.

    Loads the 'Profile' entity keyed by the current user's id and, for every
    entry in its user_ledger, fills in `value` (the absolute difference of the
    two contract holdings valued at the price returned by
    GetPriceByPredictionId) and `prediction_statement`.

    Returns:
        The result of rendering 'profile.html' with the profile.
    """
    user_key = ndb.Key('Profile', users.get_current_user().user_id())
    profile = user_key.get()
    for ledger in profile.user_ledger:
        try:
            price = GetPriceByPredictionId(ledger.prediction_id)
            ledger.value = math.fabs(
                (price * ledger.contract_one) - (price * ledger.contract_two))
            ledger.prediction_statement = ndb.Key(
                urlsafe=ledger.prediction_id).get().statement
        except Exception:
            # Best effort: one broken ledger entry must not take down the
            # whole page, so it is shown with placeholder values instead.
            # Only Exception is caught so that SystemExit, KeyboardInterrupt
            # and other BaseException-only signals still propagate.
            ledger.value = 404
            ledger.prediction_statement = 'ERROR'
    return render_template('profile.html', profile=profile)
def SellStake():
# Builds a SELL Trade for the current user on the prediction named by the
# posted 'prediction_id' form field and submits it via CreateTradeAction.
#
# NOTE(review): this copy of the function has lost its indentation, so the
# nesting cannot be recovered from the text alone. Confirm against the real
# source (a) whether the Trade is built inside the `for` loop or once after
# it, and (b) whether the redirect is conditional on the trade succeeding.
# The code lines below are left exactly as found.
user_id = users.get_current_user().user_id()
# The profile entity is keyed by the user's id under kind 'Profile'.
user_key = ndb.Key('Profile', user_id)
current_user = user_key.get()
prediction_key = ndb.Key(urlsafe=request.form['prediction_id'])
prediction = prediction_key.get()
portfolio = GetUserPortfolioByAuth(request.form['prediction_id'])
for ledger in portfolio:
# A positive contract_one holding selects CONTRACT_ONE; anything else falls
# back to CONTRACT_TWO and its quantity.
if ledger.contract_one > 0:
contract = 'CONTRACT_ONE'
quantity = ledger.contract_one
else:
contract = 'CONTRACT_TWO'
quantity = ledger.contract_two
trade = Trade(
prediction_id=prediction_key,
user_id=user_key,
direction='SELL',
contract=contract,
quantity=float(quantity))
# presumably CreateTradeAction returns the string 'error' on failure -- verify
# against its definition.
err = CreateTradeAction(prediction, current_user, trade)
if err != 'error':
flash('You sold your stake!')
return redirect('/users/me')
def main():
    """CGI-style request handler to dump the configuration.

    Put this in your app.yaml to enable (you can pick any URL):

    - url: /lib_config
      script: $PYTHON_LIB/google/appengine/api/lib_config.py

    Note: unless you are using the SDK, you must be admin.
    """
    running_on_sdk = os.getenv('SERVER_SOFTWARE', '').startswith('Dev')
    if not running_on_sdk:
        from google.appengine.api import users
        if not users.is_current_user_admin():
            if users.get_current_user() is not None:
                # Signed in, but not as an admin.
                print('Status: 403')
                print('')
                print('Forbidden')
            else:
                # Nobody is signed in: send the client to the login page.
                print('Status: 302')
                print('Location: %s' %
                      users.create_login_url(os.getenv('PATH_INFO', '')))
            return

    print('Content-type: text/plain')
    print('')
    _default_registry._dump()
def main():
    """CGI-style request handler to dump the configuration.

    Put this in your app.yaml to enable (you can pick any URL):

    - url: /lib_config
      script: $PYTHON_LIB/google/appengine/api/lib_config.py

    Note: unless you are using the SDK, you must be admin.
    """
    running_on_sdk = os.getenv('SERVER_SOFTWARE', '').startswith('Dev')
    if not running_on_sdk:
        from google.appengine.api import users
        if not users.is_current_user_admin():
            if users.get_current_user() is not None:
                # Signed in, but not as an admin.
                print('Status: 403')
                print('')
                print('Forbidden')
            else:
                # Nobody is signed in: send the client to the login page.
                print('Status: 302')
                print('Location: %s' %
                      users.create_login_url(os.getenv('PATH_INFO', '')))
            return

    print('Content-type: text/plain')
    print('')
    _default_registry._dump()
def _CheckUserAllowedToSeeMovementMaybeSetStatus(self, tourney, player_pair):
    """Decides whether the requester may see player_pair's movement.

    Access is granted when the current user's id equals tourney.owner_id, or
    when the pair id obtained from the request via GetPairIdFromRequest equals
    player_pair.id. Otherwise a 403 is set on the response through
    SetErrorStatus.

    Returns:
        True if access is allowed, False if a 403 status was set.
    """
    error = "Forbidden User"

    user = users.get_current_user()
    is_owner = user and tourney.owner_id == user.user_id()
    if is_owner:
        return True

    pair_id = GetPairIdFromRequest(self.request)
    if pair_id:
        if pair_id != player_pair.id:
            SetErrorStatus(self.response, 403, error,
                           "User does not own tournament and is authenticated with " +
                           "the wrong code for pair {}".format(player_pair.pair_no))
            return False
        return True

    SetErrorStatus(self.response, 403, error,
                   "User does not own tournament and is not authenticated " +
                   "with a pair code to see this movement")
    return False
def get(self, id):
    """Writes the tournament's results to the response as an .xlsx attachment.

    Nothing is written when the tournament lookup or the ownership check
    fails; those helpers receive the response object themselves.

    Args:
        id: Tournament identifier passed to GetTourneyWithIdAndMaybeReturnStatus.
    """
    tourney = GetTourneyWithIdAndMaybeReturnStatus(self.response, id)
    if not tourney:
        return
    user = users.get_current_user()
    if not CheckUserOwnsTournamentAndMaybeReturnStatus(self.response, user,
                                                       tourney):
        return

    boards = ReadJSONInput(tourney.GetScoredHandList())
    max_rounds = GetMaxRounds(boards)
    # The same summaries object is passed for both summary arguments.
    summaries = Calculate(boards, max_rounds)
    boards.sort(key=lambda bs: bs._board_no)
    workbook = WriteResultsToXlsx(
        max_rounds, summaries, summaries, boards,
        name_list=self._GetPlayerListForTourney(tourney))

    response = self.response
    response.out.write(OutputWorkbookAsBytesIO(workbook).getvalue())
    response.headers['Content-Type'] = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    response.headers['Content-disposition'] = str(
        'attachment; filename=' + tourney.name + 'TournamentResults.xlsx')
    response.headers['Content-Transfer-Encoding'] = 'Binary'
    response.set_status(200)
def get(self, id):
    """Renders the tournament's boards into the response as a PDF attachment.

    Nothing is rendered when the tournament lookup or the ownership check
    fails; those helpers receive the response object themselves.

    Args:
        id: Tournament identifier passed to GetTourneyWithIdAndMaybeReturnStatus.
    """
    tourney = GetTourneyWithIdAndMaybeReturnStatus(self.response, id)
    if not tourney:
        return
    user = users.get_current_user()
    if not CheckUserOwnsTournamentAndMaybeReturnStatus(self.response, user,
                                                       tourney):
        return

    boardgenerator.RenderToIo(tourney.GetBoards(), self.response.out)

    # The tournament name is URL-quoted before it goes into the file name.
    quoted_name = str(urllib.quote(tourney.name))
    headers = self.response.headers
    headers['Content-Type'] = 'application/pdf'
    headers['Content-Disposition'] = (
        'attachment; filename=%sBoards.pdf' % quoted_name)
    self.response.set_status(200)
def main():
    """CGI-style request handler to dump the configuration.

    Put this in your app.yaml to enable (you can pick any URL):

    - url: /lib_config
      script: $PYTHON_LIB/google/appengine/api/lib_config.py

    Note: unless you are using the SDK, you must be admin.
    """
    running_on_sdk = os.getenv('SERVER_SOFTWARE', '').startswith('Dev')
    if not running_on_sdk:
        from google.appengine.api import users
        if not users.is_current_user_admin():
            if users.get_current_user() is not None:
                # Signed in, but not as an admin.
                print('Status: 403')
                print('')
                print('Forbidden')
            else:
                # Nobody is signed in: send the client to the login page.
                print('Status: 302')
                print('Location: %s' %
                      users.create_login_url(os.getenv('PATH_INFO', '')))
            return

    print('Content-type: text/plain')
    print('')
    _default_registry._dump()
def sys_processor():
    """Returns a dict of helper callables.

    Keys are 'get_app_var', 'is_logged_in', 'is_admin' and 'login_link'.
    """
    def _get_app_var(key):
        # Only keys listed in constants.APPVAR_DISPLAY_LIST are looked up.
        if key not in constants.APPVAR_DISPLAY_LIST:
            return None
        return models.ApplicationVariable.get_app_var(key)

    def _is_logged_in():
        return bool(users.get_current_user())

    def _is_admin():
        return users.is_current_user_admin()

    def _login_link(endpoint):
        return users.create_login_url(endpoint)

    return {
        'get_app_var': _get_app_var,
        'is_logged_in': _is_logged_in,
        'is_admin': _is_admin,
        'login_link': _login_link,
    }
def save_auth_tokens(token_dict, user=None):
    """Pickles the tokens and stores them in memcache and the datastore.

    Args:
        token_dict: The token mapping to persist; it is pickled as given.
        user: The user the tokens belong to. When None, the result of
            users.get_current_user() is used instead.

    Returns:
        The value returned by put() on the user's TokenCollection entity, or
        None if no user was supplied and there is no current user.
    """
    if user is None:
        user = users.get_current_user()
        if user is None:
            return None

    pickled = pickle.dumps(token_dict)
    memcache.set('gdata_pickled_tokens:%s' % user, pickled)

    # Reuse the user's existing entity when there is one, otherwise create it.
    entity = TokenCollection.all().filter('user =', user).get()
    if not entity:
        entity = TokenCollection(user=user, pickled_tokens=pickled)
    else:
        entity.pickled_tokens = pickled
    return entity.put()
def load_auth_tokens(user=None):
    """Returns the token dictionary stored for a user.

    Memcache is consulted first; on a miss the datastore is queried and the
    pickled value found there is copied back into memcache.

    Args:
        user: The user whose tokens are wanted. When None, the result of
            users.get_current_user() is used instead.

    Returns:
        The unpickled tokens, or an empty dict when there is no user or no
        tokens are stored for that user.
    """
    if user is None:
        user = users.get_current_user()
        if user is None:
            return {}

    cache_key = 'gdata_pickled_tokens:%s' % user
    cached = memcache.get(cache_key)
    if cached:
        return pickle.loads(cached)

    entity = TokenCollection.all().filter('user =', user).get()
    if not entity:
        return {}
    memcache.set(cache_key, entity.pickled_tokens)
    return pickle.loads(entity.pickled_tokens)
def dispatch(self, request, **kwargs):
    """
    Override dispatch to validate export format and set user.

    While generally not good practice, we set CSRF as exempt
    because no POST requests handled by this view modify any
    server side data.

    The requested format is read from the POSTed 'format' field (default
    'csv') and lower-cased. Requests for anything other than 'csv' or 'kml'
    get an HttpResponseBadRequest and are not recorded as exports; valid
    requests publish a USEREXPORTEDVIDEOS app event before being dispatched
    to the parent class.
    """
    self.user = users.get_current_user()
    self.format = request.POST.get('format', 'csv').lower()

    # Validate first so that a rejected request is never logged as an export.
    if self.format not in ('csv', 'kml'):
        return HttpResponseBadRequest("Format not valid")

    eventbus.publish_appevent(
        kind=EventKind.USEREXPORTEDVIDEOS,
        object_id=request.user.pk,
        project_id=kwargs['project_id'],
        user=request.user,
        meta=self.format
    )
    return super(BaseExportView, self).dispatch(request, **kwargs)
def post(self):
    """Stores a new greeting and redirects back to the guestbook page.

    The author is set to the current user's nickname when someone is signed
    in. After the write, the cached '<guestbook_name>:greetings' memcache
    entry is deleted.
    """
    # We set the same parent key on the 'Greeting' to ensure each greeting
    # is in the same entity group. Queries across the single entity group
    # are strongly consistent. However, the write rate to a single entity
    # group is limited to ~1/second.
    guestbook_name = self.request.get('guestbook_name')
    greeting = Greeting(parent=guestbook_key(guestbook_name))

    current_user = users.get_current_user()
    if current_user:
        greeting.author = current_user.nickname()
    greeting.content = self.request.get('content')
    greeting.put()

    memcache.delete('{}:greetings'.format(guestbook_name))

    query_string = urllib.urlencode({'guestbook_name': guestbook_name})
    self.redirect('/?' + query_string)
def post(self):
    """Records the first uploaded blob as a UserPhoto and redirects to it.

    The photo is stored with the current user's id and the blob's key, and
    the client is redirected to '/view_photo/<blob key>'. Any failure along
    the way is logged and answered with a 500 response.
    """
    import logging

    try:
        upload = self.get_uploads()[0]
        user_photo = UserPhoto(
            user=users.get_current_user().user_id(),
            blob_key=upload.key())
        user_photo.put()
        self.redirect('/view_photo/%s' % upload.key())
    except Exception:
        # Keep the best-effort 500, but leave a trace of what went wrong and
        # let BaseException-only signals (e.g. SystemExit) propagate.
        logging.exception('Photo upload failed')
        self.error(500)
# [END upload_handler]
# [START download_handler]
def post(self):
    """Stores a greeting with a 32x32 avatar and redirects to the guestbook.

    The author is set to the current user's nickname when someone is signed
    in; the avatar comes from the 'img' request field, resized through
    images.resize.
    """
    guestbook_name = self.request.get('guestbook_name')
    greeting = Greeting(parent=guestbook_key(guestbook_name))

    current_user = users.get_current_user()
    if current_user:
        greeting.author = current_user.nickname()

    greeting.content = self.request.get('content')

    # [START sign_handler_1]
    avatar = self.request.get('img')
    # [END sign_handler_1]
    # [START transform]
    avatar = images.resize(avatar, 32, 32)
    # [END transform]
    # [START sign_handler_2]
    greeting.avatar = avatar
    greeting.put()
    # [END sign_handler_2]

    query_string = urllib.urlencode({'guestbook_name': guestbook_name})
    self.redirect('/?' + query_string)
# [END sign_handler]
def login_required(handler_method):
    """A decorator to require that a user be logged in to access a handler.

    To use it, decorate your get() method like this:

        @login_required
        def get(self):
            user = users.get_current_user()
            self.response.out.write('Hello, ' + user.nickname())

    We will redirect to a login page if the user is not logged in. We always
    redirect to the request URI, and Google Accounts only redirects back as a
    GET request, so this should not be used for POSTs.

    Args:
        handler_method: The handler's GET method to protect.

    Returns:
        A wrapper that keeps handler_method's name and docstring, forwards
        positional and keyword arguments, and returns whatever
        handler_method returns (None when the user is redirected to log in).

    Raises:
        webapp.Error: From the wrapper, when the request method is not GET.
    """
    import functools

    @functools.wraps(handler_method)
    def check_login(self, *args, **kwargs):
        if self.request.method != 'GET':
            raise webapp.Error('The check_login decorator can only be used for GET '
                               'requests')
        user = users.get_current_user()
        if not user:
            self.redirect(users.create_login_url(self.request.uri))
            return None
        # Hand the handler's result back instead of silently dropping it.
        return handler_method(self, *args, **kwargs)
    return check_login
def main():
    """CGI-style request handler to dump the configuration.

    Put this in your app.yaml to enable (you can pick any URL):

    - url: /lib_config
      script: $PYTHON_LIB/google/appengine/api/lib_config.py

    Note: unless you are using the SDK, you must be admin.
    """
    running_on_sdk = os.getenv('SERVER_SOFTWARE', '').startswith('Dev')
    if not running_on_sdk:
        from google.appengine.api import users
        if not users.is_current_user_admin():
            if users.get_current_user() is not None:
                # Signed in, but not as an admin.
                print('Status: 403')
                print('')
                print('Forbidden')
            else:
                # Nobody is signed in: send the client to the login page.
                print('Status: 302')
                print('Location: %s' %
                      users.create_login_url(os.getenv('PATH_INFO', '')))
            return

    print('Content-type: text/plain')
    print('')
    _default_registry._dump()
def save_auth_tokens(token_dict, user=None):
    """Pickles the tokens and stores them in memcache and the datastore.

    Args:
        token_dict: The token mapping to persist; it is pickled as given.
        user: The user the tokens belong to. When None, the result of
            users.get_current_user() is used instead.

    Returns:
        The value returned by put() on the user's TokenCollection entity, or
        None if no user was supplied and there is no current user.
    """
    if user is None:
        user = users.get_current_user()
        if user is None:
            return None

    pickled = pickle.dumps(token_dict)
    memcache.set('gdata_pickled_tokens:%s' % user, pickled)

    # Reuse the user's existing entity when there is one, otherwise create it.
    entity = TokenCollection.all().filter('user =', user).get()
    if not entity:
        entity = TokenCollection(user=user, pickled_tokens=pickled)
    else:
        entity.pickled_tokens = pickled
    return entity.put()
def load_auth_tokens(user=None):
    """Returns the token dictionary stored for a user.

    Memcache is consulted first; on a miss the datastore is queried and the
    pickled value found there is copied back into memcache.

    Args:
        user: The user whose tokens are wanted. When None, the result of
            users.get_current_user() is used instead.

    Returns:
        The unpickled tokens, or an empty dict when there is no user or no
        tokens are stored for that user.
    """
    if user is None:
        user = users.get_current_user()
        if user is None:
            return {}

    cache_key = 'gdata_pickled_tokens:%s' % user
    cached = memcache.get(cache_key)
    if cached:
        return pickle.loads(cached)

    entity = TokenCollection.all().filter('user =', user).get()
    if not entity:
        return {}
    memcache.set(cache_key, entity.pickled_tokens)
    return pickle.loads(entity.pickled_tokens)
def main():
    """CGI-style request handler to dump the configuration.

    Put this in your app.yaml to enable (you can pick any URL):

    - url: /lib_config
      script: $PYTHON_LIB/google/appengine/api/lib_config.py

    Note: unless you are using the SDK, you must be admin.
    """
    running_on_sdk = os.getenv('SERVER_SOFTWARE', '').startswith('Dev')
    if not running_on_sdk:
        from google.appengine.api import users
        if not users.is_current_user_admin():
            if users.get_current_user() is not None:
                # Signed in, but not as an admin.
                print('Status: 403')
                print('')
                print('Forbidden')
            else:
                # Nobody is signed in: send the client to the login page.
                print('Status: 302')
                print('Location: %s' %
                      users.create_login_url(os.getenv('PATH_INFO', '')))
            return

    print('Content-type: text/plain')
    print('')
    _default_registry._dump()
def get(self):
    """Ensures a UserData record exists, then redirects to '/find-resource'.

    When UserData.get_current_user() finds nothing, a new UserData is created
    from the signed-in Google user's id, email and nickname.

    Raises:
        Exception: If another UserData record already uses the Google user's
            email address.
    """
    if not UserData.get_current_user():
        logging.info('into Login handler')
        google_user_property = users.get_current_user()
        existing = UserData.get_by_user_email(google_user_property.email())
        if existing:
            # The email is already attached to one of our users.
            message = ('Email %s already used by one of our user'
                       % existing.user_email)
            logging.error(message)
            raise Exception(message)

        # The email is unused: create the record for this user.
        new_user = UserData(
            google_user_id=google_user_property.user_id(),
            user_email=google_user_property.email(),
            user_nickname=google_user_property.nickname())
        new_user.put()
    # Otherwise the user is already in the datastore; nothing to do.

    self.redirect('/find-resource')
def oauth_aware(self, method):
    """Decorator that sets up for OAuth 2.0 dance, but doesn't do it.

    Does all the setup for the OAuth dance, but doesn't initiate it.
    This decorator is useful if you want to create a page that knows
    whether or not the user has granted access to this application.
    From within a method decorated with @oauth_aware the has_credentials()
    and authorize_url() methods can be called.

    Args:
        method: callable, to be decorated method of a webapp.RequestHandler
            instance.

    Returns:
        The wrapping function, which returns what `method` returns, or None
        when it only displayed the error message or redirected to login.
    """
    def setup_oauth(request_handler, *args, **kwargs):
        if self._in_error:
            self._display_error_message(request_handler)
            return

        # Don't use @login_decorator as this could be used in a
        # POST request.
        user = users.get_current_user()
        if not user:
            login_url = users.create_login_url(request_handler.request.uri)
            request_handler.redirect(login_url)
            return

        self._create_flow(request_handler)
        self.flow.params['state'] = _build_state_value(request_handler, user)

        storage = self._storage_class(
            self._credentials_class, None,
            self._credentials_property_name, user=user)
        self.credentials = storage.get()
        try:
            return method(request_handler, *args, **kwargs)
        finally:
            # Credentials are only held for the duration of the call.
            self.credentials = None

    return setup_oauth
def get(self):
    """Renders 'index.html' with either an IMAP test message or an auth URL.

    Without credentials the page gets `authorize_url`; with credentials it
    gets `msg`, the result of test_imap for the current user's email.
    """
    if not decorator.has_credentials():
        url = decorator.authorize_url()
        self.render_response('index.html', tasks=[], authorize_url=url)
        return

    # The Tasks API listing that used to fill `tasks` is disabled, so an
    # empty task list is always rendered.
    logging.debug(decorator.credentials)
    user = users.get_current_user()
    msg = test_imap(user.email())
    self.render_response('index.html', tasks=[], msg=msg)
def oauth_aware(self, method):
    """Decorator that sets up for OAuth 2.0 dance, but doesn't do it.

    Does all the setup for the OAuth dance, but doesn't initiate it.
    This decorator is useful if you want to create a page that knows
    whether or not the user has granted access to this application.
    From within a method decorated with @oauth_aware the has_credentials()
    and authorize_url() methods can be called.

    Args:
        method: callable, to be decorated method of a webapp.RequestHandler
            instance.

    Returns:
        The wrapping function, which returns what `method` returns, or None
        when it only displayed the error message or redirected to login.
    """
    def setup_oauth(request_handler, *args, **kwargs):
        if self._in_error:
            self._display_error_message(request_handler)
            return

        # Don't use @login_decorator as this could be used in a POST request.
        user = users.get_current_user()
        if not user:
            login_url = users.create_login_url(request_handler.request.uri)
            request_handler.redirect(login_url)
            return

        self._create_flow(request_handler)
        self.flow.params['state'] = _build_state_value(request_handler, user)

        storage = self._storage_class(
            self._credentials_class, None,
            self._credentials_property_name, user=user)
        self.credentials = storage.get()
        try:
            return method(request_handler, *args, **kwargs)
        finally:
            # Credentials are only held for the duration of the call.
            self.credentials = None

    return setup_oauth
def get_current_employee(cls):
    """Returns the non-terminated employee record of the current user.

    Raises:
        NoSuchEmployee: If the query for an entity whose `user` equals the
            current user and whose `terminated` is False finds nothing.
    """
    user = users.get_current_user()
    user_email = user.email()

    # `== False` is needed to build the query filter, hence the noqa.
    matching = cls.query(cls.user == user, cls.terminated == False)  # noqa
    employee = matching.get()
    if employee is not None:
        return employee
    raise NoSuchEmployee(
        "Couldn't find a Google Apps user with email {}".format(user_email))
def user_required(func):
    """Decorator that redirects to the login URL unless a user is signed in.

    Args:
        func: The view function to protect.

    Returns:
        A wrapper that calls `func` for signed-in users and otherwise returns
        a redirect to users.create_login_url(request.url).
    """
    @wraps(func)
    def decorated_view(*args, **kwargs):
        if users.get_current_user():
            return func(*args, **kwargs)
        return redirect(users.create_login_url(request.url))

    return decorated_view
def admin_required(func):
    """Decorator that restricts a view to signed-in admin users.

    Args:
        func: The view function to protect.

    Returns:
        A wrapper that redirects to the login URL when nobody is signed in,
        aborts with 401 when the signed-in user is not an admin, and calls
        `func` otherwise.
    """
    @wraps(func)
    def decorated_view(*args, **kwargs):
        if not users.get_current_user():
            return redirect(users.create_login_url(request.url))
        if not users.is_current_user_admin():
            abort(401)  # Unauthorized
        return func(*args, **kwargs)

    return decorated_view
def is_admin():
    """Tells whether the current user is signed in and an admin.

    Returns:
        The falsy result of users.get_current_user() when nobody is signed
        in, otherwise the result of users.is_current_user_admin().
    """
    current_user = users.get_current_user()
    if not current_user:
        return current_user
    return users.is_current_user_admin()