def _validate_signed_claim(claim, claim_address, certificate):
if not claim.has_signature:
raise Exception("Claim is not signed")
if not base_decode(claim_address, ADDRESS_LENGTH, 58):
raise Exception("Not given a valid claim address")
try:
if claim.validate_signature(claim_address, certificate.protobuf):
return True
except BadSignatureError:
# print_msg("Signature for %s is invalid" % claim_id)
return False
except Exception as err:
log.error("Signature for %s is invalid, reason: %s - %s", claim_address,
str(type(err)), err)
return False
return False
# Example source code using the Python class BadSignatureError()
def verify_tx_input(tx, i, script, sig, pub):
    """Verify the ECDSA signature on input ``i`` of transaction ``tx``.

    ``script``, ``sig`` and ``pub`` are hex strings; they are decoded to
    raw bytes before use. ``sig`` is decoded as a DER signature.

    Returns:
        True if ``verify_digest`` accepts the signature over the
        transaction's signing preimage hash. False if the script does not
        resolve to ``ebt.TYPE_ADDRESS``, if no verifying key can be built
        from ``pub``, or if the signature or digest is rejected.
    """
    # Imported locally so the except clause below can always resolve the
    # name; the previous Python 2 "except A, B" form never required it.
    from ecdsa.keys import BadDigestError

    pub, sig, script = (binascii.unhexlify(x) for x in [pub, sig, script])
    t = etr.Transaction(tx)
    t.deserialize()
    # To build the preimage that gets hashed for verification, the input
    # must have its "address" field set.
    typ, addr = etr.get_address_from_output_script(script)
    if typ != ebt.TYPE_ADDRESS:
        # Only scripts that resolve to an address are supported for now.
        log.debug("Invalid script")
        return False
    t.inputs()[i]["address"] = addr
    t.inputs()[i]["type"] = 'p2pkh'
    # serialize_preimage() yields hex text; hash its raw bytes.
    txforsig = etr.Hash(binascii.unhexlify(t.serialize_preimage(i)))
    ecdsa_pub = get_ecdsa_verifying_key(pub)
    if not ecdsa_pub:
        return False
    try:
        ecdsa_pub.verify_digest(sig, txforsig, sigdecode=sigdecode_der)
    except (BadSignatureError, BadDigestError):
        # Both exception types mean "not a valid signature for this input".
        return False
    return True
def chain_is_authentic(self):
    """Return whether this block and every block before it verify.

    The check walks backwards through ``previous_block``:

    * If the previous block is a ``Coin``, both the coin's signature and
      this block's signature are checked with the key returned by
      ``ExternalUser.load_goofy_public_key()``.
    * If the previous block is a ``TransactionBlock``, the hash and
      spender/recipient keys must line up, this block's signature is
      checked with ``spender_public_key``, and the check recurses into
      the previous block.

    Returns:
        True when the chain verifies; False on any mismatch, unknown
        previous-block type, or ``ecdsa.BadSignatureError``.
    """
    try:
        goofy_public_key = ExternalUser.load_goofy_public_key()
        signature_to_check = encode_data((self.recipient_public_key, self.previous_hash))
        previous = self.previous_block

        if isinstance(previous, Coin):
            if not goofy_public_key.verify(previous.signature,
                                           str(previous.coin_id).encode()):
                return False
            transfer_message = json.dumps(
                (str(self.recipient_public_key), str(previous.coin_id))
            ).encode()
            return bool(goofy_public_key.verify(self.signature, transfer_message))

        if isinstance(previous, TransactionBlock):
            if (self.previous_hash == previous.hash and
                    self.spender_public_key == previous.recipient_public_key):
                # The return value is ignored here; a bad signature is
                # expected to surface as ecdsa.BadSignatureError.
                self.spender_public_key.verify(self.signature, signature_to_check)
                return previous.chain_is_authentic()
            print("Transactions " + str(self.hash) + " and " + str(previous.hash) + " do not match.")
            # Explicit False instead of falling through and returning None.
            return False

        print("Types do not match up in the chain.")
        return False
    except ecdsa.BadSignatureError:
        return False
def test_sign_verification_record(self):
    """ testing crypto sign_verification_record """
    test_output = crypto.sign_verification_record(
        SIGNATORY, PRIOR_BLOCK_HASH, LOWER_HASH, PUBLIC_KEY, PRIVATE_KEY,
        BLOCK_ID, PHASE, ORIGIN_ID, VERIFICATION_TS,
        public_transmission=None, verification_info=None)
    signature = test_output['verification_record']['signature']

    # The signature produced by sign_verification_record must validate.
    # (assertTrue's second positional argument is the failure message, so
    # no literal True is passed here.)
    self.assertTrue(crypto.validate_signature(signature))

    # Tampering with the hash must make validation raise.
    signature['hash'] = 'invalid_hash'
    self.assertRaises(BadSignatureError, crypto.validate_signature, signature)
def test_sign_subscription(self):
    """ testing crypto sign_subscription """
    subscription = SUBSCRIPTION.copy()
    crypto.sign_subscription(SIGNATORY, subscription, PRIVATE_KEY, PUBLIC_KEY)

    # Signing must add a 'signature' entry to the subscription.
    self.assertIn('signature', subscription)

    # The signature produced by sign_subscription must validate.
    # (assertTrue's second positional argument is the failure message, so
    # no literal True is passed here.)
    self.assertTrue(crypto.validate_signature(subscription['signature']))

    # Tampering with the hash must make validation raise.
    subscription['signature']['hash'] = 'invalid_hash'
    self.assertRaises(BadSignatureError, crypto.validate_signature, subscription['signature'])
def test_validate_subscription(self):
    """ testing crypto validate_subscription """
    signed = SUBSCRIPTION.copy()
    crypto.sign_subscription(SIGNATORY, signed, PRIVATE_KEY, PUBLIC_KEY)
    sig_block = signed['signature']
    match_criteria = signed['criteria']

    # A freshly signed subscription must pass validate_subscription.
    outcome = crypto.validate_subscription(
        sig_block, match_criteria, signed['create_ts'], PUBLIC_KEY)
    self.assertEqual(outcome, True)

    # After the hash is overwritten, validate_signature must raise.
    sig_block['hash'] = 'invalid_hash'
    with self.assertRaises(BadSignatureError):
        crypto.validate_signature(sig_block)
def verify(pubkey, message, signature):
    """Check a hex-encoded signature over a hex-encoded message.

    ``pubkey`` is a hex public key, not the hex-compressed form. Its first
    two hex characters are dropped before the key is loaded (presumably
    the uncompressed-key prefix byte -- confirm against callers).

    Returns the result of ``VerifyingKey.verify`` on success and False
    when ``ecdsa.BadSignatureError`` is raised.
    """
    key_bytes = binascii.unhexlify(pubkey[2:])
    verifying_key = ecdsa.VerifyingKey.from_string(
        key_bytes, curve=ecdsa.SECP256k1, hashfunc=hashlib.sha256)
    try:
        signature_bytes = binascii.unhexlify(signature)
        message_bytes = binascii.unhexlify(message)
        return verifying_key.verify(signature_bytes, message_bytes)
    except ecdsa.BadSignatureError:
        return False
def test_sign_transaction():
    """Exercise alice.sign_transaction with a plain string and with a block.

    A signature made from the string "123" is expected to fail
    verification against the encoded ("bob", "123") message, while a
    signature made from ``first_block`` must verify against the encoded
    ("bob", first_block.hash) message.
    """
    # Case 1: second argument is a bare string.
    string_message = encode_data(("bob", "123"))
    string_signature = alice.sign_transaction("bob", "123")
    with pytest.raises(ecdsa.BadSignatureError):
        alice.public_key.verify(string_signature, string_message)

    # Case 2: second argument is a block.
    block_message = encode_data(("bob", first_block.hash))
    block_signature = alice.sign_transaction("bob", first_block)
    assert alice.public_key.verify(block_signature, block_message)
def verify(self):
    """Check every entry in ``self.signatures``.

    Returns False as soon as one signature raises BadSignatureError and
    True once all entries have been checked (including when there are
    none).
    """
    for entry in self.signatures:
        entry_header = entry['header']
        entry_protected = entry['protected']
        encoded_signature = entry['signature']

        signed_bytes = self.get_sign_bytes(entry_protected)
        verifying_key = self.get_verify_key(entry_header)
        try:
            verifying_key.verify(_decode(encoded_signature), signed_bytes)
        except BadSignatureError:
            return False

    return True
def validate_vapid_key(self, signed_token):
    """Validate ``signed_token`` against this object's VAPID key.

    The stored key is padded, base64-decoded and loaded as a NIST256p
    verifying key, then used to verify the token as an ES256 JWS. When
    the token's 'aud' claim equals the 'aud' in ``self.vapid_key_token``,
    the status becomes 'valid', ``validated`` is stamped, the object is
    saved and recording is started. A JWSError during verification, or an
    ecdsa.BadSignatureError anywhere in the process, sets the status to
    'invalid' and saves. An 'aud' mismatch changes nothing.
    """
    def _mark_invalid():
        # Shared by both failure paths.
        self.vapid_key_status = 'invalid'
        self.save()

    try:
        padded_key = str(fix_padding(self.vapid_key))
        raw_key = urlsafe_b64decode(padded_key)
        public_key_bytes = extract_public_key(raw_key)
        verifier = ecdsa.VerifyingKey.from_string(
            public_key_bytes,
            curve=ecdsa.NIST256p
        )
        signed_token = str(fix_padding(signed_token))
        try:
            claims_json = jws.verify(
                signed_token, verifier, algorithms=['ES256']
            )
            token_claims = json.loads(claims_json)
            stored_claims = json.loads(self.vapid_key_token)
            audiences_match = token_claims['aud'] == stored_claims['aud']
            if audiences_match:
                self.vapid_key_status = 'valid'
                self.validated = timezone.now()
                self.save()
                self.start_recording()
        except JWSError:
            _mark_invalid()
    except ecdsa.BadSignatureError:
        _mark_invalid()