def from_string(key_pem, is_x509_cert):
    """Build a verifier from PEM-encoded key material.

    Args:
        key_pem: string, public key in PEM format.
        is_x509_cert: bool, True when key_pem holds an X509 certificate;
            otherwise key_pem is passed to RSA.importKey unchanged.

    Returns:
        A PyCryptoVerifier wrapping the imported public key.
    """
    if not is_x509_cert:
        return PyCryptoVerifier(RSA.importKey(key_pem))

    # Strip spaces before splitting so each header/footer line collapses
    # into a single token; the first and last tokens are then discarded
    # and only the base64 payload in between is decoded.
    payload_tokens = key_pem.replace(' ', '').split()[1:-1]
    der_bytes = _urlsafe_b64decode(''.join(payload_tokens))

    certificate = DerSequence()
    certificate.decode(der_bytes)

    # Element 0 of the certificate is itself decoded as a DER sequence.
    tbs_certificate = DerSequence()
    tbs_certificate.decode(certificate[0])

    # Element 6 of the inner sequence is what gets imported as the key.
    return PyCryptoVerifier(RSA.importKey(tbs_certificate[6]))
# Example source code for Python's importKey()
def from_string(key, password='notasecret'):
    """Build a signer from a PEM-encoded private key.

    Args:
        key: string, private key in PEM format.
        password: string, password for the private key file. It is not
            used by this implementation.

    Returns:
        A PyCryptoSigner wrapping the imported private key.

    Raises:
        NotImplementedError: if _parse_pem_key finds no PEM key in `key`.
    """
    pem_body = _parse_pem_key(key)
    if not pem_body:
        # Nothing PEM-shaped was found, so point the caller at a conversion.
        raise NotImplementedError(
            'PKCS12 format is not supported by the PyCrypto library. '
            'Try converting to a "PEM" '
            '(openssl pkcs12 -in xxxxx.p12 -nodes -nocerts > privatekey.pem) '
            'or using PyOpenSSL if native code is an option.')
    return PyCryptoSigner(RSA.importKey(pem_body))
def from_string(key_pem, is_x509_cert):
    """Build a verifier from PEM-encoded key material.

    Args:
        key_pem: string, public key in PEM format.
        is_x509_cert: bool, True when key_pem holds an X509 certificate;
            otherwise key_pem is passed to RSA.importKey unchanged.

    Returns:
        A PyCryptoVerifier wrapping the imported public key.
    """
    if is_x509_cert:
        pem_bytes = _helpers._to_bytes(key_pem)
        # Strip spaces before splitting so each header/footer line becomes
        # one token; the first and last tokens are dropped and the
        # remaining base64 payload is decoded.
        tokens = pem_bytes.replace(b' ', b'').split()
        der_bytes = _helpers._urlsafe_b64decode(b''.join(tokens[1:-1]))

        outer_seq = DerSequence()
        outer_seq.decode(der_bytes)

        # Element 0 of the certificate is itself decoded as a DER sequence,
        # and element 6 of that inner sequence is imported as the key.
        inner_seq = DerSequence()
        inner_seq.decode(outer_seq[0])
        key_material = inner_seq[6]
    else:
        key_material = key_pem
    return PyCryptoVerifier(RSA.importKey(key_material))
def from_string(key, password='notasecret'):
    """Build a signer from a PEM-encoded private key.

    Args:
        key: string, private key in PEM format.
        password: string, password for the private key file. It is not
            used by this implementation.

    Returns:
        A PyCryptoSigner wrapping the imported private key.

    Raises:
        NotImplementedError: if no PEM key is found in `key`.
    """
    key_bytes = _helpers._to_bytes(key)
    pem_body = _helpers._parse_pem_key(key_bytes)
    if not pem_body:
        raise NotImplementedError(
            'No key in PEM format was detected. This implementation '
            'can only use the PyCrypto library for keys in PEM '
            'format.')
    return PyCryptoSigner(RSA.importKey(pem_body))
def __init__(self, grid_name, credentials, **kwargs):
    """Load grid state, derive the SSH key pair and prepare output storage.

    Args:
        grid_name: name used to look up the GridEntity row and to build
            the 'result/<grid_name>/infrastructure' directory path.
        credentials: URL-quoted credentials string; stored unquoted.
        **kwargs: accepted and ignored.
    """
    self.grid_name = grid_name
    self.credentials = urllib.unquote(credentials)

    grid_query = GridEntity.select().where(GridEntity.name == grid_name)
    self.current_grid = grid_query.get()

    # The config model is chosen by the grid's provider.
    config_model = configs[self.current_grid.provider]
    self.current_config = config_model.select().where(
        config_model.parentgrid == self.current_grid).get()

    # The stored key data is URL-quoted; the public half is exported in
    # OpenSSH format from the imported private key.
    key_text = urllib.unquote(self.current_config.sshkeydata)
    self.private_key_text = key_text
    self.private_key = RSA.importKey(key_text.strip())
    self.public_key_text = self.private_key.publickey().exportKey('OpenSSH')

    infrastructure_dir = 'result/{}/infrastructure'.format(grid_name)
    if not os.path.exists(infrastructure_dir):
        os.makedirs(infrastructure_dir)

    # Keep only the provider's groups whose parent grid is this grid.
    self.current_groups = [
        group for group in groups[self.current_grid.provider].select()
        if group.parentgrid.name == grid_name]

    # Every section starts out as its own empty AutoDict.
    for section in ('config', 'networking', 'security', 'terminal',
                    'masters'):
        setattr(self, section, AutoDict())
def __init__(self, grid_name, api_user, api_pass, api_url, **kwargs):
    """Load grid state, derive the SSH key pair and prepare output storage.

    Args:
        grid_name: name used to look up the GridEntity row and to build
            the 'result/<grid_name>/infrastructure' directory path.
        api_user: URL-quoted API user; stored unquoted.
        api_pass: URL-quoted API password; stored unquoted.
        api_url: URL-quoted API endpoint; stored unquoted.
        **kwargs: accepted and ignored.
    """
    self.grid_name = grid_name
    self.api_user = urllib.unquote(api_user)
    self.api_pass = urllib.unquote(api_pass)
    self.api_url = urllib.unquote(api_url)

    grid_query = GridEntity.select().where(GridEntity.name == grid_name)
    self.current_grid = grid_query.get()

    # The config model is chosen by the grid's provider.
    config_model = configs[self.current_grid.provider]
    self.current_config = config_model.select().where(
        config_model.parentgrid == self.current_grid).get()

    # The stored key data is URL-quoted; the public half is exported in
    # OpenSSH format from the imported private key.
    key_text = urllib.unquote(self.current_config.sshkeydata)
    self.private_key_text = key_text
    self.private_key = RSA.importKey(key_text.strip())
    self.public_key_text = self.private_key.publickey().exportKey('OpenSSH')

    infrastructure_dir = 'result/{}/infrastructure'.format(grid_name)
    if not os.path.exists(infrastructure_dir):
        os.makedirs(infrastructure_dir)

    # Keep only the provider's groups whose parent grid is this grid.
    self.current_groups = [
        group for group in groups[self.current_grid.provider].select()
        if group.parentgrid.name == grid_name]

    # Every section starts out as its own empty AutoDict.
    for section in ('config', 'networking', 'security', 'terminal',
                    'masters'):
        setattr(self, section, AutoDict())
def check_rsa_key(sample):
    """Probe whether `sample` can be imported as an RSA key.

    Args:
        sample: candidate key material; it is stripped and handed to
            RSA.importKey.

    Returns:
        A 3-tuple (is_rsa_key, has_private_component, n_bit_length):
        is_rsa_key - True if RSA.importKey accepted the sample and the
            modulus length could be computed.
        has_private_component - True if the imported key reports a
            private part, False otherwise or when the probe failed.
        n_bit_length - bit length of the modulus, or False when the
            probe failed.
    """
    try:
        rsakey = RSA.importKey(sample.strip())
        has_private_component = bool(rsakey.has_private())
        n_bit_length = gmpy.mpz(rsakey.n).bit_length()
    except Exception:
        # Best-effort probe: the reason for the failure is irrelevant, but
        # the result must be all-False rather than a half-filled tuple.
        # KeyboardInterrupt and SystemExit are deliberately not swallowed.
        return (False, False, False)
    return (True, has_private_component, n_bit_length)
def testEncrypt1(self):
    """Encrypt every test vector with a scripted RNG and compare results.

    Each entry of self._testData is read positionally: [0] is imported as
    the key, [1] is the plaintext, [2] is the expected ciphertext and [3]
    is the pool of "random" bytes (both [2] and [3] go through t2b).
    """
    # RNG that takes its random numbers from a pool given
    # at initialization
    class randGen(object):
        def __init__(self, data):
            self.data = data
            self.idx = 0

        def __call__(self, N):
            # Hand out the next N bytes after the cursor. The slice end has
            # to be relative to the cursor, otherwise every call after the
            # first returns truncated or empty data.
            r = self.data[self.idx:self.idx + N]
            self.idx += N
            return r

    for test in self._testData:
        # Build the key
        key = RSA.importKey(test[0])
        # The real test
        key._randfunc = randGen(t2b(test[3]))
        cipher = PKCS.new(key)
        ct = cipher.encrypt(b(test[1]))
        self.assertEqual(ct, t2b(test[2]))
def testSign1(self):
    """Sign every test vector and compare against the expected signature.

    Each row of self._testData is read positionally: [0] is the key
    (importable text, or a mapping with hex 'n', 'e', 'd' components),
    [1] is the data to sign, [2] is the expected signature and [3] is
    the hash module used to build the digest.
    """
    for row in self._testData:
        # Build the key
        if isStr(row[0]):
            key = RSA.importKey(row[0])
        else:
            comps = [long(rws(row[0][x]), 16) for x in ('n', 'e', 'd')]
            key = RSA.construct(comps)
        h = row[3].new()
        # Data to sign can either be in hex form or not
        try:
            data = t2b(row[1])
        except Exception:
            # Not decodable by t2b: use the text as-is.
            data = b(row[1])
        h.update(data)
        # The real test
        signer = PKCS.new(key)
        self.assertTrue(signer.can_sign())
        s = signer.sign(h)
        self.assertEqual(s, t2b(row[2]))
def testVerify1(self):
    """Verify the expected signature of every test vector.

    Each row of self._testData is read positionally: [0] is the key
    (importable text, or a mapping with hex 'n' and 'e' components),
    [1] is the signed data, [2] is the signature and [3] is the hash
    module used to build the digest.
    """
    for row in self._testData:
        # Build the key
        if isStr(row[0]):
            key = RSA.importKey(row[0]).publickey()
        else:
            comps = [long(rws(row[0][x]), 16) for x in ('n', 'e')]
            key = RSA.construct(comps)
        h = row[3].new()
        # Data to sign can either be in hex form or not
        try:
            data = t2b(row[1])
        except Exception:
            # Not decodable by t2b: use the text as-is.
            data = b(row[1])
        h.update(data)
        # The real test
        verifier = PKCS.new(key)
        # Only the public half is loaded, so signing must be unavailable.
        self.assertFalse(verifier.can_sign())
        result = verifier.verify(h, t2b(row[2]))
        self.assertTrue(result)
def testEncrypt1(self):
    """Encrypt every test vector with a scripted RNG and compare results.

    Each entry of self._testData is read positionally: [0] is imported as
    the key, [1] is the plaintext, [2] is the expected ciphertext and [3]
    is the pool of "random" bytes (both [2] and [3] go through t2b).
    """
    # RNG that takes its random numbers from a pool given
    # at initialization
    class randGen(object):
        def __init__(self, data):
            self.data = data
            self.idx = 0

        def __call__(self, N):
            # Hand out the next N bytes after the cursor. The slice end has
            # to be relative to the cursor, otherwise every call after the
            # first returns truncated or empty data.
            r = self.data[self.idx:self.idx + N]
            self.idx += N
            return r

    for test in self._testData:
        # Build the key
        key = RSA.importKey(test[0])
        # The real test
        key._randfunc = randGen(t2b(test[3]))
        cipher = PKCS.new(key)
        ct = cipher.encrypt(b(test[1]))
        self.assertEqual(ct, t2b(test[2]))
def testSign1(self):
    """Sign every test vector and compare against the expected signature.

    Each row of self._testData is read positionally: [0] is the key
    (importable text, or a mapping with hex 'n', 'e', 'd' components),
    [1] is the data to sign, [2] is the expected signature and [3] is
    the hash module used to build the digest.
    """
    for row in self._testData:
        # Build the key
        if isStr(row[0]):
            key = RSA.importKey(row[0])
        else:
            comps = [int(rws(row[0][x]), 16) for x in ('n', 'e', 'd')]
            key = RSA.construct(comps)
        h = row[3].new()
        # Data to sign can either be in hex form or not
        try:
            data = t2b(row[1])
        except Exception:
            # Not decodable by t2b: use the text as-is.
            data = b(row[1])
        h.update(data)
        # The real test
        signer = PKCS.new(key)
        # assertTrue replaces the deprecated failUnless alias, which newer
        # unittest versions no longer provide.
        self.assertTrue(signer.can_sign())
        s = signer.sign(h)
        self.assertEqual(s, t2b(row[2]))
def testVerify1(self):
    """Verify the expected signature of every test vector.

    Each row of self._testData is read positionally: [0] is the key
    (importable text, or a mapping with hex 'n' and 'e' components),
    [1] is the signed data, [2] is the signature and [3] is the hash
    module used to build the digest.
    """
    for row in self._testData:
        # Build the key
        if isStr(row[0]):
            key = RSA.importKey(row[0]).publickey()
        else:
            comps = [int(rws(row[0][x]), 16) for x in ('n', 'e')]
            key = RSA.construct(comps)
        h = row[3].new()
        # Data to sign can either be in hex form or not
        try:
            data = t2b(row[1])
        except Exception:
            # Not decodable by t2b: use the text as-is.
            data = b(row[1])
        h.update(data)
        # The real test
        verifier = PKCS.new(key)
        # Only the public half is loaded, so signing must be unavailable.
        # assertFalse/assertTrue replace the deprecated failIf/failUnless
        # aliases, which newer unittest versions no longer provide.
        self.assertFalse(verifier.can_sign())
        result = verifier.verify(h, t2b(row[2]))
        self.assertTrue(result)
def testEncrypt1(self):
    """Encrypt every test vector with a scripted RNG and compare results.

    Each entry of self._testData is read positionally: [0] is imported as
    the key, [1] is the plaintext, [2] is the expected ciphertext and [3]
    is the pool of "random" bytes (both [2] and [3] go through t2b).
    """
    # RNG that takes its random numbers from a pool given
    # at initialization
    class randGen(object):
        def __init__(self, data):
            self.data = data
            self.idx = 0

        def __call__(self, N):
            # Hand out the next N bytes after the cursor. The slice end has
            # to be relative to the cursor, otherwise every call after the
            # first returns truncated or empty data.
            r = self.data[self.idx:self.idx + N]
            self.idx += N
            return r

    for test in self._testData:
        # Build the key
        key = RSA.importKey(test[0])
        # The real test
        key._randfunc = randGen(t2b(test[3]))
        cipher = PKCS.new(key)
        ct = cipher.encrypt(b(test[1]))
        self.assertEqual(ct, t2b(test[2]))
def testSign1(self):
    """Sign every test vector and compare against the expected signature.

    Each row of self._testData is read positionally: [0] is the key
    (importable text, or a mapping with hex 'n', 'e', 'd' components),
    [1] is the data to sign, [2] is the expected signature and [3] is
    the hash module used to build the digest.
    """
    for row in self._testData:
        # Build the key
        if isStr(row[0]):
            key = RSA.importKey(row[0])
        else:
            comps = [long(rws(row[0][x]), 16) for x in ('n', 'e', 'd')]
            key = RSA.construct(comps)
        h = row[3].new()
        # Data to sign can either be in hex form or not
        try:
            data = t2b(row[1])
        except Exception:
            # Not decodable by t2b: use the text as-is.
            data = b(row[1])
        h.update(data)
        # The real test
        signer = PKCS.new(key)
        self.assertTrue(signer.can_sign())
        s = signer.sign(h)
        self.assertEqual(s, t2b(row[2]))
def testVerify1(self):
    """Verify the expected signature of every test vector.

    Each row of self._testData is read positionally: [0] is the key
    (importable text, or a mapping with hex 'n' and 'e' components),
    [1] is the signed data, [2] is the signature and [3] is the hash
    module used to build the digest.
    """
    for row in self._testData:
        # Build the key
        if isStr(row[0]):
            key = RSA.importKey(row[0]).publickey()
        else:
            comps = [long(rws(row[0][x]), 16) for x in ('n', 'e')]
            key = RSA.construct(comps)
        h = row[3].new()
        # Data to sign can either be in hex form or not
        try:
            data = t2b(row[1])
        except Exception:
            # Not decodable by t2b: use the text as-is.
            data = b(row[1])
        h.update(data)
        # The real test
        verifier = PKCS.new(key)
        # Only the public half is loaded, so signing must be unavailable.
        self.assertFalse(verifier.can_sign())
        result = verifier.verify(h, t2b(row[2]))
        self.assertTrue(result)
def private_key(self):
    """Required by federation.

    Corresponds to private key: returns self.rsa_private_key imported
    through RSA.importKey, or None when that attribute is empty.
    """
    pem = self.rsa_private_key
    if not pem:
        return None
    return RSA.importKey(pem)
def key(self):
    """Required by federation.

    Corresponds to public key: returns self.rsa_public_key imported
    through RSA.importKey, or None when that attribute is empty.
    """
    pem = self.rsa_public_key
    if not pem:
        return None
    return RSA.importKey(pem)
def test_user_public_key(user_contract, web3):
    """Store a DER public key in the user contract and encrypt with it.

    The key read back from the contract is imported and used to encrypt a
    fixed message, which is then decrypted with the original private key.
    Results are printed rather than asserted.
    """
    private_key = encrypt.createKey()
    exported_der = private_key.publickey().exportKey('DER')
    print(sys.getsizeof(exported_der))

    # Write the key to the contract, then read it back for the first account.
    user_contract.transact().setPubKey(exported_der)
    stored_der = user_contract.call().userPubkey(web3.eth.accounts[0])
    print(stored_der)

    imported_key = RSA.importKey(stored_der)
    ciphertext = encrypt.encrypt(imported_key, 'Hello there!')
    print(encrypt.decrypt(ciphertext, private_key))
def test_ipfs_integration(user_contract, web3):
    """Send an encrypted message through IPFS and the user contract.

    The message is encrypted with the public key read back from the
    contract, stored via a local IPFS daemon, announced with sendMessage,
    and finally fetched and decrypted using the hash taken from the
    YouHaveMail event. Results are printed rather than asserted.
    """
    # Fresh key pair; the public half is exported as DER.
    private_key = encrypt.createKey()
    exported_der = private_key.publickey().exportKey('DER')

    # Publish the public key in the contract and read it back.
    user_contract.transact().setPubKey(exported_der)
    stored_der = user_contract.call().userPubkey(web3.eth.accounts[0])

    # Encrypt the message with the key retrieved from the contract.
    recipient_key = RSA.importKey(stored_der)
    ciphertext = encrypt.encrypt(recipient_key, 'Last Night...')

    # Store the ciphertext through the locally running IPFS daemon.
    ipfs = ipfsapi.connect('127.0.0.1', 5001)
    created_hash = ipfs.add_bytes(ciphertext)
    print("\n\nIPFS_HASH_CREATE:\t", created_hash)

    # Subscribe to YouHaveMail before sending, then wait on the filter.
    mail_filter = user_contract.on("YouHaveMail")
    user_contract.transact().sendMessage(web3.eth.accounts[1], created_hash)
    wait(mail_filter)
    events = mail_filter.get()

    # Take the IPFS hash from the first logged event.
    event_hash = events[0]['args']['_ipfsHash']
    print("IPFS_HASH_EVENT:\t", event_hash)

    # Fetch the stored ciphertext from IPFS.
    fetched = ipfs.cat(event_hash)
    print("IPFS_HASH_DATA:\t\t", fetched)

    # Decrypt with the private key created at the start.
    print("IFPS_DATA_DECR:\t\t", encrypt.decrypt(fetched, private_key))