def get_rest_token(self):
"""
Returns an auth token for making calls to eventhub REST API.
"""
uri = urllib.parse.quote_plus("https://{}.servicebus.windows.net/{}" \
.format(self.sb_name, self.eh_name))
sas = self.sas_key.encode('utf-8')
expiry = str(int(time.time() + 10000))
string_to_sign = ('{}\n{}'.format(uri,expiry)).encode('utf-8')
signed_hmac_sha256 = hmac.HMAC(sas, string_to_sign, hashlib.sha256)
signature = urllib.parse.quote(base64.b64encode(signed_hmac_sha256.digest()))
return 'SharedAccessSignature sr={}&sig={}&se={}&skn={}' \
.format(uri, signature, expiry, self.policy)
评论列表
文章目录