def test_exchange_code_and_file_for_token(self):
    """Exchange an auth code using a client-secrets file path.

    The ``HttpMock`` is built with a JSON payload holding an access token
    and an ``expires_in`` value; the returned credentials are expected to
    expose that token, a populated ``token_expiry`` and the scopes
    ``{'foo'}``.
    """
    # Same bytes as the token response used throughout these tests.
    payload = b'{ "access_token":"asdfghjkl", "expires_in":3600}'
    http = http_mock.HttpMock(data=payload)
    credentials = client.credentials_from_clientsecrets_and_code(
        datafile('client_secrets.json'), self.scope,
        self.code, http=http)
    self.assertEqual(credentials.access_token, 'asdfghjkl')
    # Identity check against None: clearer intent and failure message than
    # assertNotEqual(None, ...), and independent of the value's __eq__.
    self.assertIsNotNone(credentials.token_expiry)
    self.assertEqual({'foo'}, credentials.scopes)
# Example source code using Python's credentials_from_clientsecrets_and_code()
def test_exchange_code_and_cached_file_for_token(self):
    """Exchange an auth code when the secrets are referenced via a cache.

    ``load_and_cache`` is called with the ``client_secrets.json`` fixture,
    the key ``'some_secrets'`` and a ``CacheMock``; the exchange is then
    given only that key plus the cache object.
    """
    expected_token = 'asdfghjkl'
    token_http = http_mock.HttpMock(data=b'{ "access_token":"asdfghjkl"}')
    secrets_cache = http_mock.CacheMock()
    load_and_cache('client_secrets.json', 'some_secrets', secrets_cache)

    exchanged = client.credentials_from_clientsecrets_and_code(
        'some_secrets',
        self.scope,
        self.code,
        http=token_http,
        cache=secrets_cache)

    self.assertEqual(exchanged.access_token, expected_token)
    self.assertEqual({'foo'}, exchanged.scopes)
def test_exchange_code_and_file_for_token_fail(self):
    """A BAD_REQUEST token response must raise ``FlowExchangeError``.

    The ``HttpMock`` is configured with a ``BAD_REQUEST`` status header
    and an ``invalid_request`` error body.
    """
    error_headers = {'status': http_client.BAD_REQUEST}
    error_body = b'{"error":"invalid_request"}'
    failing_http = http_mock.HttpMock(headers=error_headers, data=error_body)

    with self.assertRaises(client.FlowExchangeError):
        client.credentials_from_clientsecrets_and_code(
            datafile('client_secrets.json'),
            self.scope,
            self.code,
            http=failing_http)
def test_exchange_code_and_file_for_token(self):
    """Exchange an auth code using a client-secrets file path.

    The ``HttpMock`` is built with a JSON payload holding an access token
    and an ``expires_in`` value; the returned credentials are expected to
    expose that token, a populated ``token_expiry`` and the scopes
    ``{'foo'}``.
    """
    # Same bytes as the token response used throughout these tests.
    payload = b'{ "access_token":"asdfghjkl", "expires_in":3600}'
    http = http_mock.HttpMock(data=payload)
    credentials = client.credentials_from_clientsecrets_and_code(
        datafile('client_secrets.json'), self.scope,
        self.code, http=http)
    self.assertEqual(credentials.access_token, 'asdfghjkl')
    # Identity check against None: clearer intent and failure message than
    # assertNotEqual(None, ...), and independent of the value's __eq__.
    self.assertIsNotNone(credentials.token_expiry)
    self.assertEqual({'foo'}, credentials.scopes)
def test_exchange_code_and_cached_file_for_token(self):
    """Exchange an auth code when the secrets are referenced via a cache.

    ``load_and_cache`` is called with the ``client_secrets.json`` fixture,
    the key ``'some_secrets'`` and a ``CacheMock``; the exchange is then
    given only that key plus the cache object.
    """
    expected_token = 'asdfghjkl'
    token_http = http_mock.HttpMock(data=b'{ "access_token":"asdfghjkl"}')
    secrets_cache = http_mock.CacheMock()
    load_and_cache('client_secrets.json', 'some_secrets', secrets_cache)

    exchanged = client.credentials_from_clientsecrets_and_code(
        'some_secrets',
        self.scope,
        self.code,
        http=token_http,
        cache=secrets_cache)

    self.assertEqual(exchanged.access_token, expected_token)
    self.assertEqual({'foo'}, exchanged.scopes)
def test_exchange_code_and_file_for_token_fail(self):
    """A BAD_REQUEST token response must raise ``FlowExchangeError``.

    The ``HttpMock`` is configured with a ``BAD_REQUEST`` status header
    and an ``invalid_request`` error body.
    """
    error_headers = {'status': http_client.BAD_REQUEST}
    error_body = b'{"error":"invalid_request"}'
    failing_http = http_mock.HttpMock(headers=error_headers, data=error_body)

    with self.assertRaises(client.FlowExchangeError):
        client.credentials_from_clientsecrets_and_code(
            datafile('client_secrets.json'),
            self.scope,
            self.code,
            http=failing_http)
def google_login(request):
    """Log a user in from the auth code posted by Google's sign-in callback.

    1. Exchange the posted ``authCode`` for credentials (access token,
       refresh token and ID token).
    2. Get or create the ``User`` for the ID token's email and store the
       tokens and Google subject id on the linked ``GoogleUser``.
    3. Mark the user as logged in.

    Returns:
        ``JsonResponse({"success": True})``.

    Raises:
        KeyError: when ``authCode`` is absent from ``request.POST``
            (presumably surfaced as Django's ``MultiValueDictKeyError``;
            confirm).
    """
    # Exchange auth code for access token, refresh token, and ID token
    auth_code = request.POST['authCode']
    credentials = client.credentials_from_clientsecrets_and_code(
        settings.CLIENT_SECRET_FILE,
        ['https://www.googleapis.com/auth/drive', 'profile', 'email'],
        auth_code)
    email = credentials.id_token['email']
    guser_id = credentials.id_token['sub']
    username = email.split('@')[0]

    # Create an account only when no user has this email. Unlike the former
    # bare ``except:``, unrelated failures (database errors, multiple
    # matches) propagate instead of silently creating a new account.
    user, _user_created = User.objects.get_or_create(
        email=email, defaults={'username': username})

    googleuser, _googleuser_created = GoogleUser.objects.get_or_create(
        user=user)
    googleuser.access_token = credentials.access_token
    googleuser.refresh_token = credentials.refresh_token
    googleuser.guser_id = guser_id
    googleuser.save()

    login(request, user)
    return JsonResponse({"success": True})
def oauth2callback(request):
    """Complete the OAuth2 redirect flow and log the user in.

    Exchanges the ``code`` query parameter for credentials, gets or creates
    the ``User`` for the ID token's email, stores the tokens and Google
    subject id on the linked ``GoogleUser``, logs the user in, and
    redirects to ``/?<state>`` (``/?`` when no ``state`` was supplied).

    Returns:
        An ``HttpResponseRedirect`` to the site root with ``state`` as the
        query string.

    Raises:
        KeyError: when ``code`` is absent from ``request.GET``
            (presumably surfaced as Django's ``MultiValueDictKeyError``;
            confirm).
    """
    auth_code = request.GET['code']
    # The auth code and ID token are deliberately not printed or logged:
    # they are credentials and must not leak to stdout.
    credentials = client.credentials_from_clientsecrets_and_code(
        settings.CLIENT_SECRET_FILE,
        ['https://www.googleapis.com/auth/drive', 'profile', 'email'],
        auth_code,
        redirect_uri=settings.HOST_ADDR + '/oauth2callback')
    email = credentials.id_token['email']
    guser_id = credentials.id_token['sub']
    username = email.split('@')[0]

    # Create an account only when no user has this email. Unlike the former
    # bare ``except:``, unrelated failures (database errors, multiple
    # matches) propagate instead of silently creating a new account.
    user, _user_created = User.objects.get_or_create(
        email=email, defaults={'username': username})

    googleuser, _googleuser_created = GoogleUser.objects.get_or_create(
        user=user)
    googleuser.access_token = credentials.access_token
    googleuser.refresh_token = credentials.refresh_token
    googleuser.guser_id = guser_id
    googleuser.save()

    login(request, user)

    # ``state`` is passed through verbatim as the query string of the
    # landing page; a missing or empty state yields a bare "/?".
    param = request.GET.get('state') or ''
    return HttpResponseRedirect('/?' + param)
def googlesignin():
    """Sign a user in from a Google auth code sent in the ``data`` form field.

    Returns an empty string when no auth code was supplied. Otherwise the
    code is exchanged for credentials, the Google userinfo document is
    fetched, and, if the session has no ``_user`` yet, a user is created
    from that document and handed to ``WebUtils.storeUserSession``.

    Returns:
        ``jsonify(data={'user': session['_user']})``, or ``''`` when the
        ``data`` parameter is empty.
    """
    code = Utils.getParam(request.form, 'data', '')
    if not code:
        return ''

    credentials = client.credentials_from_clientsecrets_and_code(
        '/etc/ostrich_conf/google_client_secret.json',
        ['profile', 'email'],
        code)
    authorized_http = credentials.authorize(httplib2.Http())
    oauth2_service = discovery.build('oauth2', 'v2', http=authorized_http)
    profile = oauth2_service.userinfo().get().execute()

    # TODO check consistencies b/w session and user cache
    # (idea: look the user up in the app cache keyed by id_token['sub']).
    if not session.get('_user'):
        created_user = User.createUser({
            'username': profile['email'],
            'name': profile['name'],
            'email': profile['email'],
            'google_id': credentials.id_token['sub'],
            'picture': profile['picture'],
            'source': 'web'
        })
        # Presumably populates session['_user'], which is read back below;
        # verify against WebUtils.
        WebUtils.storeUserSession(
            created_user,
            client=Utils.getParam(request.form, 'client', default=None))

    return jsonify(data={'user': session['_user']})
def action(self, action, d):
    """Handle the ``login`` and ``oauth2callback`` auth actions.

    ``login`` builds an auth flow for the ``email profile`` scope and
    responds with JSON containing the authorization URL.

    ``oauth2callback`` reads ``error``, ``code`` and ``scope`` from the
    request. With a code, it exchanges it for credentials, resolves the
    user (current user, else lookup by ID-token email, else a new
    account), saves the credentials on that user and stores the user in
    the session. With only an error, the error is logged. The handler
    then redirects to ``/app/settings``.

    Args:
        action: Name of the action to perform.
        d: Not used by this handler.
    """
    base = "http://localhost:8080" if tools.on_dev_server() else BASE
    if action == 'login':
        scope = "email profile"
        flow = User.get_auth_flow(scope=scope)
        flow.params['access_type'] = 'offline'
        flow.params['include_granted_scopes'] = 'true'
        # The requested scope is carried through the flow as ``state``.
        auth_uri = flow.step1_get_authorize_url(state=scope)
        self.json_out({'auth_uri': auth_uri}, success=True, debug=True)
    elif action == 'oauth2callback':
        error = self.request.get('error')
        code = self.request.get('code')
        scope = self.request.get('scope')
        if code:
            secrets_path = os.path.join(
                os.path.dirname(__file__), 'client_secrets.json')
            credentials = client.credentials_from_clientsecrets_and_code(
                secrets_path,
                scope.split(' '),
                code,
                redirect_uri=base + "/api/auth/oauth2callback")
            user = self.user
            if not user:
                # Not signed in: find the account by email, creating one
                # when none exists.
                email = credentials.id_token['email']
                user = User.GetByEmail(email) or User.Create(email=email)
            if user:
                user.save_credentials(credentials)
                user.put()
                self.session['user'] = user
        elif error:
            logging.error(error)
        # NOTE(review): the source's indentation was lost; this redirect is
        # assumed to apply to the whole oauth2callback branch -- confirm.
        self.redirect("/app/settings")