def verify(self, message, signature):
    """Verifies a message against a signature.

    Args:
        message: string or bytes, The message to verify. Text is encoded
            as UTF-8 before it is hashed.
        signature: string, The signature on the message.

    Returns:
        True if message was signed by the private key associated with the
        public key that this object was constructed with. False otherwise,
        including when the verification itself raises an error.
    """
    # The SHA-256 constructor expects bytes; without this a text message
    # raises inside the try block and is reported as "not verified".
    if isinstance(message, type(u"")):
        message = message.encode("utf-8")
    try:
        return PKCS1_v1_5.new(self._pubkey).verify(
            SHA256.new(message), signature)
    except Exception:
        # Any verification error counts as a failed check. Unlike a bare
        # ``except:``, KeyboardInterrupt and SystemExit still propagate.
        return False
python类new()的实例源码
def encrypt(self, data, pubkey):
    """Encrypt ``data`` for the holder of ``pubkey``.

    The payload is passed through ``AONTencrypt`` and then encrypted with
    ``default_aes`` under a key obtained from ``random_aes_32_key``. That
    message key is itself encrypted with ``PKCS1_OAEP`` under ``pubkey``.

    Args:
        data: payload handed to ``AONTencrypt``.
        pubkey: an ``RSA.RsaKey`` instance, or key material accepted by
            ``RSA.importKey``.

    Returns:
        The encrypted message key followed by the encrypted payload.
    """
    # Presumably an all-or-nothing transform, going by the helper's name.
    packet = AONTencrypt(data)
    session_key = random_aes_32_key()

    if isinstance(pubkey, RSA.RsaKey):
        rsa_key = pubkey
    else:
        rsa_key = RSA.importKey(pubkey)

    # Front part: the message key wrapped for the recipient.
    wrapped_key = PKCS1_OAEP.new(rsa_key).encrypt(session_key)
    # Back part: the transformed payload under the message key.
    ciphertext = default_aes(session_key).encrypt(packet)
    return wrapped_key + ciphertext
def testSign1(self):
    """Sign each test vector and compare with the expected signature.

    Every row of ``self._testData`` is read as: ``row[0]`` the key (either
    an importable key string or a mapping of hex components 'n', 'e', 'd'),
    ``row[1]`` the data to sign, ``row[2]`` the expected signature in hex,
    and ``row[3]`` an object whose ``new()`` returns a hash object.
    """
    for row in self._testData:
        # Build the key
        if isStr(row[0]):
            key = RSA.importKey(row[0])
        else:
            # int() parses arbitrarily large hex values on Python 2 and 3;
            # the Python-2-only long() is not needed.
            comps = [int(rws(row[0][x]), 16) for x in ('n', 'e', 'd')]
            key = RSA.construct(comps)
        h = row[3].new()
        # Data to sign can either be in hex form or not
        try:
            h.update(t2b(row[1]))
        except Exception:
            h.update(b(row[1]))
        # The real test
        signer = PKCS.new(key)
        self.assertTrue(signer.can_sign())
        s = signer.sign(h)
        self.assertEqual(s, t2b(row[2]))
def testVerify1(self):
    """Verify the expected signature of each test vector with a public key.

    Every row of ``self._testData`` is read as: ``row[0]`` the key (either
    an importable key string or a mapping of hex components 'n', 'e'),
    ``row[1]`` the signed data, ``row[2]`` the signature in hex, and
    ``row[3]`` an object whose ``new()`` returns a hash object.
    """
    for row in self._testData:
        # Build the key
        if isStr(row[0]):
            key = RSA.importKey(row[0]).publickey()
        else:
            # int() parses arbitrarily large hex values on Python 2 and 3;
            # the Python-2-only long() is not needed.
            comps = [int(rws(row[0][x]), 16) for x in ('n', 'e')]
            key = RSA.construct(comps)
        h = row[3].new()
        # Data to sign can either be in hex form or not
        try:
            h.update(t2b(row[1]))
        except Exception:
            h.update(b(row[1]))
        # The real test
        verifier = PKCS.new(key)
        # Only public components were loaded, so signing must be impossible.
        self.assertFalse(verifier.can_sign())
        result = verifier.verify(h, t2b(row[2]))
        self.assertTrue(result)
def testSign1(self):
    """Sign each test vector and compare with the expected signature.

    Every row of ``self._testData`` is read as: ``row[0]`` the key (either
    an importable key string or a mapping of hex components 'n', 'e', 'd'),
    ``row[1]`` the data to sign, ``row[2]`` the expected signature in hex,
    and ``row[3]`` an object whose ``new()`` returns a hash object.
    """
    for row in self._testData:
        # Build the key
        if isStr(row[0]):
            key = RSA.importKey(row[0])
        else:
            comps = [int(rws(row[0][x]), 16) for x in ('n', 'e', 'd')]
            key = RSA.construct(comps)
        h = row[3].new()
        # Data to sign can either be in hex form or not
        try:
            h.update(t2b(row[1]))
        except Exception:
            # Not hex: fall back to hashing the raw data. A bare except
            # here would also swallow KeyboardInterrupt/SystemExit.
            h.update(b(row[1]))
        # The real test
        signer = PKCS.new(key)
        self.assertTrue(signer.can_sign())
        s = signer.sign(h)
        self.assertEqual(s, t2b(row[2]))
def testVerify1(self):
    """Verify the expected signature of each test vector with a public key.

    Every row of ``self._testData`` is read as: ``row[0]`` the key (either
    an importable key string or a mapping of hex components 'n', 'e'),
    ``row[1]`` the signed data, ``row[2]`` the signature in hex, and
    ``row[3]`` an object whose ``new()`` returns a hash object.
    """
    for row in self._testData:
        # Build the key
        if isStr(row[0]):
            key = RSA.importKey(row[0]).publickey()
        else:
            comps = [int(rws(row[0][x]), 16) for x in ('n', 'e')]
            key = RSA.construct(comps)
        h = row[3].new()
        # Data to sign can either be in hex form or not
        try:
            h.update(t2b(row[1]))
        except Exception:
            # Not hex: fall back to hashing the raw data. A bare except
            # here would also swallow KeyboardInterrupt/SystemExit.
            h.update(b(row[1]))
        # The real test
        verifier = PKCS.new(key)
        # Only public components were loaded, so signing must be impossible.
        self.assertFalse(verifier.can_sign())
        result = verifier.verify(h, t2b(row[2]))
        self.assertTrue(result)
async def submit(self, nonce):
    """Send a nonce submission over the socket and return any new challenge.

    Declared ``async`` because the body awaits the socket; ``await`` inside
    a plain ``def`` is a syntax error.

    Args:
        nonce: value sent as the ``nonce`` argument of the submission.

    Returns:
        A ``Challenge`` filled from the response when the response contains
        ``challenge_name``; otherwise None.
    """
    command = {'command': 'submission', 'args': {'nonce': nonce, 'wallet_id': self.wallet_id}}
    await self.socket.send(json.dumps(command))
    response = json.loads(await self.socket.recv())
    print("Submission result: {0}".format(response))
    if 'error' in response:
        # Errors are only reported; the challenge check below still runs.
        print("Error during submission : {0}".format(response["error"]))
    if 'challenge_name' in response:
        # we got a new challenge
        challenge = Challenge()
        challenge.fill_from_challenge(response)
        return challenge
    return None
async def create_transaction(self, recipient, amount):
    """Sign and send a ``create_transaction`` command over the socket.

    Declared ``async`` because the body awaits the socket; ``await`` inside
    a plain ``def`` is a syntax error.

    Args:
        recipient: identifier of the receiving wallet.
        amount: number formatted with five decimal places, both in the
            signed text and in the command.

    Returns:
        The ``id`` field of the response when present; otherwise None.
    """
    hasher = SHA256.new()
    # Hash objects consume bytes, so the signed text is encoded first.
    # NOTE(review): UTF-8 is assumed; confirm it matches the server side.
    signed_text = "{0},{1},{2:.5f}".format(self.wallet_id, recipient, amount)
    hasher.update(signed_text.encode("utf-8"))
    signature = self.sign_message(hasher)
    # The key must be the literal 'signature'; the unquoted name used the
    # signature's own value as the dictionary key.
    # NOTE(review): json.dumps needs sign_message() to return a
    # JSON-serialisable value (e.g. hex text); confirm.
    command = {'command': 'create_transaction', 'args': {'source': self.wallet_id, 'recipient': recipient, 'amount': "{0:.5f}".format(amount), 'signature': signature}}
    await self.socket.send(json.dumps(command))
    response = json.loads(await self.socket.recv())
    if 'error' in response:
        # Errors are only reported; the id check below still runs.
        print("Error during create_transaction : {0}".format(response['error']))
    if 'id' in response:
        return response['id']
    return None
def verify(self, message, signature):
    """Verifies a message against a signature.

    Args:
        message: string or bytes, The message to verify. Text is encoded
            as UTF-8 before it is hashed.
        signature: string, The signature on the message.

    Returns:
        True if message was signed by the private key associated with the
        public key that this object was constructed with. False otherwise,
        including when the verification itself raises an error.
    """
    # The SHA-256 constructor expects bytes; without this a text message
    # raises inside the try block and is reported as "not verified".
    if isinstance(message, type(u"")):
        message = message.encode("utf-8")
    try:
        return PKCS1_v1_5.new(self._pubkey).verify(
            SHA256.new(message), signature)
    except Exception:
        # Any verification error counts as a failed check. Unlike a bare
        # ``except:``, KeyboardInterrupt and SystemExit still propagate.
        return False
def verify_signature(data, signature, x509_certificate):
    """Check ``signature`` over ``data`` with the key in an X.509 certificate.

    PyCrypto 2.6 has no direct X.509 support, so the public key is pulled
    out of the certificate's DER structure by hand. Based on
    https://github.com/google/oauth2client/blob/master
    /oauth2client/_pycrypto_crypt.py

    Args:
        data: bytes that were signed; hashed with SHA-256.
        signature: signature to check against the digest.
        x509_certificate: PEM-encoded certificate as bytes.

    Returns:
        The result of the PKCS#1 v1.5 verifier's ``verify`` call.
    """
    # With spaces removed, each BEGIN/END armor line is a single token, so
    # dropping the first and last tokens leaves only the base64 payload.
    tokens = x509_certificate.replace(b' ', b'').split()
    der_bytes = base64.urlsafe_b64decode(b''.join(tokens[1:-1]))

    certificate = DerSequence()
    certificate.decode(der_bytes)
    tbs_certificate = DerSequence()
    tbs_certificate.decode(certificate[0])

    # Element 6 of the inner sequence is imported as the RSA key
    # (presumably the subjectPublicKeyInfo field; verify for v1 certs).
    rsa_key = RSA.importKey(tbs_certificate[6])
    verifier = PKCS1_v1_5.new(rsa_key)
    return verifier.verify(SHA256.new(data), signature)
def verify(self, message, signature):
    """Verifies a message against a signature.

    Args:
        message: string or bytes, The message to verify. Text is encoded
            as UTF-8 before it is hashed.
        signature: string, The signature on the message.

    Returns:
        True if message was signed by the private key associated with the
        public key that this object was constructed with. False otherwise,
        including when the verification itself raises an error.
    """
    # The SHA-256 constructor expects bytes; without this a text message
    # raises inside the try block and is reported as "not verified".
    if isinstance(message, type(u"")):
        message = message.encode("utf-8")
    try:
        return PKCS1_v1_5.new(self._pubkey).verify(
            SHA256.new(message), signature)
    except Exception:
        # Any verification error counts as a failed check. Unlike a bare
        # ``except:``, KeyboardInterrupt and SystemExit still propagate.
        return False
def verify(self, message, signature):
    """Verifies a message against a signature.

    Args:
        message: string or bytes, The message to verify. Text is encoded
            as UTF-8 before it is hashed.
        signature: string, The signature on the message.

    Returns:
        True if message was signed by the private key associated with the
        public key that this object was constructed with. False otherwise,
        including when the verification itself raises an error.
    """
    # The SHA-256 constructor expects bytes; without this a text message
    # raises inside the try block and is reported as "not verified".
    if isinstance(message, type(u"")):
        message = message.encode("utf-8")
    try:
        return PKCS1_v1_5.new(self._pubkey).verify(
            SHA256.new(message), signature)
    except Exception:
        # Any verification error counts as a failed check. Unlike a bare
        # ``except:``, KeyboardInterrupt and SystemExit still propagate.
        return False
def verify(self, message, signature):
    """Verifies a message against a signature.

    Args:
        message: string or bytes, The message to verify. Text is encoded
            as UTF-8 before it is hashed.
        signature: string, The signature on the message.

    Returns:
        True if message was signed by the private key associated with the
        public key that this object was constructed with. False otherwise,
        including when the verification itself raises an error.
    """
    # The SHA-256 constructor expects bytes; without this a text message
    # raises inside the try block and is reported as "not verified".
    if isinstance(message, type(u"")):
        message = message.encode("utf-8")
    try:
        return PKCS1_v1_5.new(self._pubkey).verify(
            SHA256.new(message), signature)
    except Exception:
        # Any verification error counts as a failed check. Unlike a bare
        # ``except:``, KeyboardInterrupt and SystemExit still propagate.
        return False
def verify(self, message, signature):
    """Verifies a message against a signature.

    Args:
        message: string or bytes, The message to verify. Text is encoded
            as UTF-8 before it is hashed.
        signature: string, The signature on the message.

    Returns:
        True if message was signed by the private key associated with the
        public key that this object was constructed with. False otherwise,
        including when the verification itself raises an error.
    """
    # The SHA-256 constructor expects bytes; without this a text message
    # raises inside the try block and is reported as "not verified".
    if isinstance(message, type(u"")):
        message = message.encode("utf-8")
    try:
        return PKCS1_v1_5.new(self._pubkey).verify(
            SHA256.new(message), signature)
    except Exception:
        # Any verification error counts as a failed check. Unlike a bare
        # ``except:``, KeyboardInterrupt and SystemExit still propagate.
        return False
def verify(self, message, signature):
    """Verifies a message against a signature.

    Args:
        message: string or bytes, The message to verify. Text is encoded
            as UTF-8 before it is hashed.
        signature: string, The signature on the message.

    Returns:
        True if message was signed by the private key associated with the
        public key that this object was constructed with. False otherwise,
        including when the verification itself raises an error.
    """
    # The SHA-256 constructor expects bytes; without this a text message
    # raises inside the try block and is reported as "not verified".
    if isinstance(message, type(u"")):
        message = message.encode("utf-8")
    try:
        return PKCS1_v1_5.new(self._pubkey).verify(
            SHA256.new(message), signature)
    except Exception:
        # Any verification error counts as a failed check. Unlike a bare
        # ``except:``, KeyboardInterrupt and SystemExit still propagate.
        return False
def testSign1(self):
    """Sign each test vector and compare with the expected signature.

    Every row of ``self._testData`` is read as: ``row[0]`` the key (either
    an importable key string or a mapping of hex components 'n', 'e', 'd'),
    ``row[1]`` the data to sign, ``row[2]`` the expected signature in hex,
    and ``row[3]`` an object whose ``new()`` returns a hash object.
    """
    for row in self._testData:
        # Build the key
        if isStr(row[0]):
            key = RSA.importKey(row[0])
        else:
            comps = [int(rws(row[0][x]), 16) for x in ('n', 'e', 'd')]
            key = RSA.construct(comps)
        h = row[3].new()
        # Data to sign can either be in hex form or not
        try:
            h.update(t2b(row[1]))
        except Exception:
            # Not hex: fall back to hashing the raw data. A bare except
            # here would also swallow KeyboardInterrupt/SystemExit.
            h.update(b(row[1]))
        # The real test
        signer = PKCS.new(key)
        self.assertTrue(signer.can_sign())
        s = signer.sign(h)
        self.assertEqual(s, t2b(row[2]))
def testVerify1(self):
    """Verify the expected signature of each test vector with a public key.

    Every row of ``self._testData`` is read as: ``row[0]`` the key (either
    an importable key string or a mapping of hex components 'n', 'e'),
    ``row[1]`` the signed data, ``row[2]`` the signature in hex, and
    ``row[3]`` an object whose ``new()`` returns a hash object.
    """
    for row in self._testData:
        # Build the key
        if isStr(row[0]):
            key = RSA.importKey(row[0]).publickey()
        else:
            comps = [int(rws(row[0][x]), 16) for x in ('n', 'e')]
            key = RSA.construct(comps)
        h = row[3].new()
        # Data to sign can either be in hex form or not
        try:
            h.update(t2b(row[1]))
        except Exception:
            # Not hex: fall back to hashing the raw data. A bare except
            # here would also swallow KeyboardInterrupt/SystemExit.
            h.update(b(row[1]))
        # The real test
        verifier = PKCS.new(key)
        # Only public components were loaded, so signing must be impossible.
        self.assertFalse(verifier.can_sign())
        result = verifier.verify(h, t2b(row[2]))
        self.assertTrue(result)
def sign_dict(dict_, private_key):
    """Serialize a dict to JSON and sign it using RSA.

    Args:
        dict_: JSON-serialisable dict; keys are sorted before hashing so
            the signed text does not depend on insertion order.
        private_key: path of the file holding the RSA private key.

    Returns:
        The base64-encoded signature as text.

    Raises:
        ValueError: if the key file is missing or cannot be imported. The
            underlying error is attached as ``__cause__``.
    """
    try:
        with open(private_key) as fp:
            key = RSA.importKey(fp.read())
    except (FileNotFoundError, ValueError, IndexError, TypeError) as exc:
        # Chain explicitly so the traceback shows why the key was rejected.
        raise ValueError('Invalid private key file.') from exc
    signer = PKCS1_v1_5.new(key)
    # Hash the canonical (sorted-key) JSON text of the dict.
    message = SHA256.new(json.dumps(dict_, sort_keys=True).encode())
    signature = signer.sign(message)
    return base64.b64encode(signature).decode()
def verify_archive(self, dest):
    """Assert that ``dest`` is a well-formed, correctly signed archive.

    The archive must be a zip file with exactly three members, one of them
    named ``self.obj_sha256``. The ``metadata`` and ``signature`` members
    are extracted as well, and the base64 signature must verify against
    the SHA-256 digest of the metadata using ``self.private_key``.
    Extracted files are registered for removal at cleanup.
    """
    self.assertTrue(os.path.exists(dest))
    self.assertTrue(zipfile.is_zipfile(dest))

    with zipfile.ZipFile(dest) as archive:
        names = archive.namelist()
        object_path = archive.extract(self.obj_sha256)
        signature_path = archive.extract('signature')
        metadata_path = archive.extract('metadata')

    for extracted in (object_path, signature_path, metadata_path):
        self.addCleanup(os.remove, extracted)

    self.assertEqual(len(names), 3)
    self.assertIn(self.obj_sha256, names)
    self.assertFalse(os.path.islink(object_path))

    with open(metadata_path) as metadata_file:
        digest = SHA256.new(metadata_file.read().encode())
    with open(signature_path) as signature_file:
        raw_signature = base64.b64decode(signature_file.read())

    self.assertTrue(
        PKCS1_v1_5.new(self.private_key).verify(digest, raw_signature))
def test_can_sign_dict(self):
    """A signature from ``utils.sign_dict`` verifies with the signing key.

    A freshly generated RSA key is written to a temporary file, an empty
    dict is signed through that file, and the decoded signature is checked
    against the SHA-256 digest of the dict's JSON text.
    """
    fd, fn = tempfile.mkstemp()
    # mkstemp() returns an already-open descriptor; close it so it is not
    # leaked. The file is reopened by name below.
    os.close(fd)
    self.addCleanup(os.remove, fn)
    key = RSA.generate(1024)
    with open(fn, 'wb') as fp:
        fp.write(key.exportKey())
    dict_ = {}
    # Serialise with sorted keys, matching what the sign_dict shown in
    # this file hashes (presumably the same function as utils.sign_dict).
    message = SHA256.new(json.dumps(dict_, sort_keys=True).encode())
    result = utils.sign_dict(dict_, fn)
    signature = base64.b64decode(result)
    verifier = PKCS1_v1_5.new(key)
    is_valid = verifier.verify(message, signature)
    self.assertTrue(is_valid)