def _PopulateX509(self):
  """Find and cache the published X509 cert that matches our private key.

  Fetches the JSON map of certificates published for the service account,
  signs a fixed probe message with the local credentials, and keeps the
  first certificate whose public key verifies that signature. The result is
  stored on self._x509 and self._signing_key. Nothing is done when
  self._x509 is already set.

  Raises:
    apiproxy_errors.ApplicationError: the metadata endpoint did not answer
      with HTTP 200, or none of the returned certificates verified the
      probe signature.
  """
  # The check and the population both happen under the lock, so the network
  # fetch below runs while self._x509_init_lock is held.
  with self._x509_init_lock:
    if self._x509 is not None:
      return

    # NOTE(review): the email is percent-*decoded* and then placed in the URL
    # path without being re-quoted. Presumably service_account_email is
    # stored already encoded; confirm against the credentials object.
    account = urllib.unquote_plus(self._credentials.service_account_email)
    url = (
        'https://www.googleapis.com/service_accounts/v1/metadata/x509/%s'
        % account)

    # validate_certificate=True is what ties the downloaded certificates to
    # the HTTPS endpoint; the signature probe below only shows that a
    # certificate matches the local key, not where it came from.
    response = urlfetch.fetch(
        url=url, validate_certificate=True, method=urlfetch.GET)
    if response.status_code != 200:
      # The raw response body is copied into the exception text.
      failure_detail = (
          'Unable to load X509 cert: %s Response code: %i, Content: %s' % (
              url, response.status_code, response.content))
      raise apiproxy_errors.ApplicationError(
          app_identity_service_pb.AppIdentityServiceError.UNKNOWN_ERROR,
          failure_detail)

    # Fixed probe message: its only role is to produce a signature that can
    # be checked against each published public key.
    probe = 'dummy'
    _, probe_signature = self._credentials.sign_blob(probe)

    published_certs = json.loads(response.content)
    for key_name, pem_cert in published_certs.items():
      # Only rsa.pkcs1.VerificationError is handled in this loop; an entry
      # that fails PEM or ASN.1 decoding raises out of the method.
      der_cert = rsa.pem.load_pem(pem_cert, 'CERTIFICATE')
      parsed_cert, _ = decoder.decode(der_cert, asn1Spec=Certificate())

      key_info = parsed_cert['tbsCertificate']['subjectPublicKeyInfo']
      key_der = BitStringToByteString(key_info['subjectPublicKey'])
      candidate_key = rsa.PublicKey.load_pkcs1(key_der, 'DER')

      try:
        matches = rsa.pkcs1.verify(probe, probe_signature, candidate_key)
      except rsa.pkcs1.VerificationError:
        # Signature was not made by this key; try the next certificate.
        continue
      if matches:
        self._x509 = pem_cert
        self._signing_key = key_name
        return

    raise apiproxy_errors.ApplicationError(
        app_identity_service_pb.AppIdentityServiceError.UNKNOWN_ERROR,
        'Unable to find matching X509 cert for private key: %s' % url)
app_identity_defaultcredentialsbased_stub.py 文件源码