def sign_blob(self, blob):
    """Sign ``blob`` by delegating to ``app_identity.sign_blob``.

    Concrete implementation of the abstract method
    :meth:`oauth2client.client.AssertionCredentials.sign_blob`.

    Args:
        blob: bytes, the message to sign.

    Returns:
        tuple, the ID of the private key used for signing paired with
        the signed contents.
    """
    key_id_and_signature = app_identity.sign_blob(blob)
    return key_id_and_signature
python类sign_blob()的实例源码
def sign_blob(self, blob):
    """Sign ``blob`` by delegating to ``app_identity.sign_blob``.

    Concrete implementation of the abstract method
    :meth:`oauth2client.client.AssertionCredentials.sign_blob`.

    Args:
        blob: bytes, the message to sign.

    Returns:
        tuple, the ID of the private key used for signing paired with
        the signed contents.
    """
    key_id_and_signature = app_identity.sign_blob(blob)
    return key_id_and_signature
def sign_blob(self, blob):
    """Sign ``blob`` by delegating to ``app_identity.sign_blob``.

    Concrete implementation of the abstract method
    :meth:`oauth2client.client.AssertionCredentials.sign_blob`.

    Args:
        blob: bytes, the message to sign.

    Returns:
        tuple, the ID of the private key used for signing paired with
        the signed contents.
    """
    key_id_and_signature = app_identity.sign_blob(blob)
    return key_id_and_signature
def sign_blob(self, blob):
    """Sign ``blob`` by delegating to ``app_identity.sign_blob``.

    Concrete implementation of the abstract method
    :meth:`oauth2client.client.AssertionCredentials.sign_blob`.

    Args:
        blob: bytes, the message to sign.

    Returns:
        tuple, the ID of the private key used for signing paired with
        the signed contents.
    """
    key_id_and_signature = app_identity.sign_blob(blob)
    return key_id_and_signature
def sign_blob(self, blob):
    """Sign ``blob`` by delegating to ``app_identity.sign_blob``.

    Concrete implementation of the abstract method
    :meth:`oauth2client.client.AssertionCredentials.sign_blob`.

    Args:
        blob: bytes, the message to sign.

    Returns:
        tuple, the ID of the private key used for signing paired with
        the signed contents.
    """
    key_id_and_signature = app_identity.sign_blob(blob)
    return key_id_and_signature
def generate_jwt():
    """Build a JSON Web Token and sign it with ``app_identity.sign_blob``.

    The claims name ``SERVICE_ACCOUNT_EMAIL`` as issuer, carry
    ``TARGET_AUD`` as ``target_audience`` and expire one hour after the
    issue time.

    Returns:
        The token in ``<header>.<payload>.<signature>`` form, where each
        segment has been passed through ``base64.urlsafe_b64encode``.
    """
    issued_at = int(time.time())

    header_text = json.dumps({
        "typ": "JWT",
        "alg": "RS256",
    })

    claims = {
        "iat": issued_at,
        # The token is valid for one hour.
        "exp": issued_at + 3600,
        # Issuer: the service account email.
        "iss": SERVICE_ACCOUNT_EMAIL,
        # URL of the service this token is intended for.
        "target_audience": TARGET_AUD,
        # Audience must be the Google token endpoint URL.
        "aud": "https://www.googleapis.com/oauth2/v4/token",
    }
    claims_text = json.dumps(claims)

    encoded_header = base64.urlsafe_b64encode(header_text)
    encoded_claims = base64.urlsafe_b64encode(claims_text)
    signing_input = '{}.{}'.format(encoded_header, encoded_claims)

    # sign_blob yields a (key name, signature) pair; only the signature
    # becomes part of the token.
    _key_name, raw_signature = app_identity.sign_blob(signing_input)
    encoded_signature = base64.urlsafe_b64encode(raw_signature)
    return '{}.{}'.format(signing_input, encoded_signature)
def generate_jwt():
    """Build a JSON Web Token and sign it with ``app_identity.sign_blob``.

    ``DEFAULT_SERVICE_ACCOUNT`` is used as issuer, subject and email
    claim; the token expires one hour after the issue time.

    Returns:
        The token in ``<header>.<payload>.<signature>`` form, where each
        segment has been passed through ``base64.urlsafe_b64encode``.
    """
    issued_at = int(time.time())

    header_text = json.dumps({
        "typ": "JWT",
        "alg": "RS256",
    })

    claims = {
        'iat': issued_at,
        # The token is valid for one hour.
        "exp": issued_at + 3600,
        # Issuer: the Google App Engine default service account email.
        'iss': DEFAULT_SERVICE_ACCOUNT,
        'sub': DEFAULT_SERVICE_ACCOUNT,
        # Must match 'audience' in the security configuration of the
        # swagger spec. It can be any string.
        'aud': 'echo.endpoints.sample.google.com',
        "email": DEFAULT_SERVICE_ACCOUNT,
    }
    claims_text = json.dumps(claims)

    encoded_header = base64.urlsafe_b64encode(header_text)
    encoded_claims = base64.urlsafe_b64encode(claims_text)
    signing_input = '{}.{}'.format(encoded_header, encoded_claims)

    # sign_blob yields a (key name, signature) pair; only the signature
    # becomes part of the token.
    _key_name, raw_signature = app_identity.sign_blob(signing_input)
    encoded_signature = base64.urlsafe_b64encode(raw_signature)
    return '{}.{}'.format(signing_input, encoded_signature)
def get(self):
    """Sign a fixed message, verify it, and report the outcome as text.

    The message is signed with ``app_identity.sign_blob`` and checked
    with ``verify_signed_by_app``; the message, the base64-encoded
    signature and the verification result are written to
    ``self.response`` as ``text/plain``.
    """
    text = 'Hello, world!'

    # Only the signature half of the (key name, signature) pair is used.
    _key_name, raw_signature = app_identity.sign_blob(text)
    is_verified = verify_signed_by_app(text, raw_signature)

    self.response.content_type = 'text/plain'
    self.response.write('Message: {}\n'.format(text))
    encoded_signature = base64.b64encode(raw_signature)
    self.response.write('Signature: {}\n'.format(encoded_signature))
    self.response.write('Verified: {}\n'.format(is_verified))
def create_custom_token(uid, valid_minutes=60):
    """Create a signed custom JWT for the given id.

    The token is meant to be handed to clients. Its ``uid`` claim is the
    unique id that Firebase's security rules use to prevent unauthorized
    access; here it is the channel id, a combination of user_id and
    game_key.

    Args:
        uid: value stored in the token's ``uid`` claim.
        valid_minutes: token lifetime in minutes, counted from now.

    Returns:
        The token as ``<header>.<payload>.<signature>``, each segment
        encoded with ``base64.b64encode``.
    """
    # The app_identity service supplies the project's service account
    # email, so it does not have to be configured by hand.
    service_account = app_identity.get_service_account_name()
    issued_at = int(time.time())

    # Required claims, per
    # https://firebase.google.com/docs/auth/server/create-custom-tokens
    claims = {
        'iss': service_account,
        'sub': service_account,
        'aud': _IDENTITY_ENDPOINT,
        'uid': uid,  # the important parameter: it will be the channel id
        'iat': issued_at,
        'exp': issued_at + (valid_minutes * 60),
    }
    payload = base64.b64encode(json.dumps(claims))

    # Standard header identifying this as an RS256-signed JWT.
    jwt_header = {'typ': 'JWT', 'alg': 'RS256'}
    header = base64.b64encode(json.dumps(jwt_header))

    to_sign = '{}.{}'.format(header, payload)

    # sign_blob returns a pair; index 1 is the signature itself.
    raw_signature = app_identity.sign_blob(to_sign)[1]
    return '{}.{}'.format(to_sign, base64.b64encode(raw_signature))
def generate_jwt():
    """Build a JSON Web Token and sign it with ``app_identity.sign_blob``.

    The claims carry ``TARGET_AUD`` as ``scope`` and expire one hour
    after the issue time.

    Returns:
        The token in ``<header>.<payload>.<signature>`` form, where each
        segment has been passed through ``base64.urlsafe_b64encode``.
    """
    issued_at = int(time.time())

    header_text = json.dumps({
        "typ": "JWT",
        "alg": "RS256",
    })

    claims = {
        "iat": issued_at,
        # The token is valid for one hour.
        "exp": issued_at + 3600,
        # Issuer: the Google App Engine default service account email.
        # NOTE(review): the constant is spelled DEFAUTL_...; confirm it
        # matches the module-level definition before renaming anything.
        "iss": DEFAUTL_SERVICE_ACCOUNT,
        # Must match 'audience' for google_id_token in the security
        # configuration of the swagger spec.
        "scope": TARGET_AUD,
        # Audience must be the Google token endpoint URL.
        "aud": "https://www.googleapis.com/oauth2/v4/token",
    }
    claims_text = json.dumps(claims)

    encoded_header = base64.urlsafe_b64encode(header_text)
    encoded_claims = base64.urlsafe_b64encode(claims_text)
    signing_input = '{}.{}'.format(encoded_header, encoded_claims)

    # sign_blob yields a (key name, signature) pair; only the signature
    # becomes part of the token.
    _key_name, raw_signature = app_identity.sign_blob(signing_input)
    encoded_signature = base64.urlsafe_b64encode(raw_signature)
    return '{}.{}'.format(signing_input, encoded_signature)
def sign_blob(self, blob):
    """Sign ``blob`` by delegating to ``app_identity.sign_blob``.

    Concrete implementation of the abstract method
    :meth:`oauth2client.client.AssertionCredentials.sign_blob`.

    Args:
        blob: bytes, the message to sign.

    Returns:
        tuple, the ID of the private key used for signing paired with
        the signed contents.
    """
    key_id_and_signature = app_identity.sign_blob(blob)
    return key_id_and_signature
def sign_blob(self, blob):
    """Sign ``blob`` by delegating to ``app_identity.sign_blob``.

    Concrete implementation of the abstract method
    :meth:`oauth2client.client.AssertionCredentials.sign_blob`.

    Args:
        blob: bytes, the message to sign.

    Returns:
        tuple, the ID of the private key used for signing paired with
        the signed contents.
    """
    key_id_and_signature = app_identity.sign_blob(blob)
    return key_id_and_signature
def sign_blob(self, blob):
    """Sign ``blob`` by delegating to ``app_identity.sign_blob``.

    Concrete implementation of the abstract method
    :meth:`oauth2client.client.AssertionCredentials.sign_blob`.

    Args:
        blob: bytes, the message to sign.

    Returns:
        tuple, the ID of the private key used for signing paired with
        the signed contents.
    """
    key_id_and_signature = app_identity.sign_blob(blob)
    return key_id_and_signature
def sign_blob(self, blob):
    """Sign ``blob`` by delegating to ``app_identity.sign_blob``.

    Concrete implementation of the abstract method
    :meth:`oauth2client.client.AssertionCredentials.sign_blob`.

    Args:
        blob: bytes, the message to sign.

    Returns:
        tuple, the ID of the private key used for signing paired with
        the signed contents.
    """
    key_id_and_signature = app_identity.sign_blob(blob)
    return key_id_and_signature
def sign_blob(self, blob):
    """Sign ``blob`` by delegating to ``app_identity.sign_blob``.

    Concrete implementation of the abstract method
    :meth:`oauth2client.client.AssertionCredentials.sign_blob`.

    Args:
        blob: bytes, the message to sign.

    Returns:
        tuple, the ID of the private key used for signing paired with
        the signed contents.
    """
    key_id_and_signature = app_identity.sign_blob(blob)
    return key_id_and_signature
app_engine.py 文件源码
项目:google-auth-library-python
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def sign(self, message):
    """Sign ``message`` and return only the signature.

    The message is first normalised with ``_helpers.to_bytes`` and then
    passed to ``app_identity.sign_blob``.

    Args:
        message: the message to sign.

    Returns:
        The second element of the pair returned by
        ``app_identity.sign_blob``, i.e. the signature.
    """
    payload = _helpers.to_bytes(message)
    # The first element of the pair is discarded.
    _unused_key, signed = app_identity.sign_blob(payload)
    return signed
def sign_gcs_url(gcs_filename, expires_after_seconds=6):
    """Build a signed URL to download a Cloud Storage object without login.

    Docs : https://cloud.google.com/storage/docs/access-control?hl=bg#Signed-URLs
    API : https://cloud.google.com/storage/docs/reference-methods?hl=bg#getobject

    Args:
        gcs_filename: resource path of the object; it is appended
            verbatim to the storage endpoint and is also the last line
            of the string that gets signed.
        expires_after_seconds: lifetime of the URL, in seconds from now.

    Returns:
        str, the endpoint plus ``gcs_filename`` followed by the
        ``GoogleAccessId``, ``Expires`` and ``Signature`` query
        parameters.
    """
    GCS_API_ACCESS_ENDPOINT = 'https://storage.googleapis.com'
    google_access_id = app_identity.get_service_account_name()
    method = 'GET'
    # A plain GET carries no body, so both header slots stay empty.
    content_md5, content_type = None, None

    # Expiration is an absolute count of seconds since the epoch.
    # time.time() is timezone-independent. The previous
    # time.mktime(datetime.utcnow().timetuple()) treated a UTC tuple as
    # local time, shifting the expiry by the host's UTC offset whenever
    # the process was not running in UTC.
    expiration = int(time.time() + expires_after_seconds)

    # Generate the string to sign.
    signature_string = '\n'.join([
        method,
        content_md5 or '',
        content_type or '',
        str(expiration),
        gcs_filename])
    # sign_blob returns a pair; index 1 is the raw signature.
    signature_bytes = app_identity.sign_blob(str(signature_string))[1]

    # Set the right query parameters. The App Engine service account is
    # used as the access id.
    query_params = {'GoogleAccessId': google_access_id,
                    'Expires': str(expiration),
                    'Signature': base64.b64encode(signature_bytes)}

    # Return the built URL.
    result = '{endpoint}{resource}?{querystring}'.format(
        endpoint=GCS_API_ACCESS_ENDPOINT,
        resource=gcs_filename,
        querystring=urllib.urlencode(query_params))
    return str(result)