def generate_peer_token(self, peer_sign, peer_cert, peer_id, peer_target,
group_id, peer_type, rand_key, token_interval):
peer_info = b''.join([peer_id.encode('utf-8'),
peer_target.encode('utf-8'),
group_id.encode('utf-8')]) + bytes([peer_type])
data = peer_info + rand_key
cert = x509.load_der_x509_certificate(peer_cert, default_backend())
if self.verify_data_with_cert(cert=cert, data=data, signature=peer_sign):
time = datetime.datetime.now() + datetime.timedelta(minutes=token_interval)
date = int(time.timestamp() * 1000).to_bytes(length=8, byteorder='big')
peer_pub = cert.public_key().public_bytes(encoding=serialization.Encoding.DER,
format=PublicFormat.SubjectPublicKeyInfo)
# token_bytes = peer_id || peer_target || group_id || peer_type || peer_pub
token_bytes = peer_info + date + peer_pub
logging.debug("TBS Token[%s]", token_bytes.hex())
# token = date || CA_Sign(token_bytes)
signed_token = self.sign_data(token_bytes)
token = b''.join([date, signed_token]).hex()
return token
else:
logging.debug('?? ?? ?? ?? ??? ?? ??')
return None
评论列表
文章目录