def keyHash(self):
    """
    Hash the public key wrapped by this object.

    The only intended use of this method is comparing two certificates to
    find out whether they carry the same public key.

    Before Twisted 15.0, C{keyHash} derived its result through a certificate
    request, and that result was not stable when the underlying OpenSSL
    library changed.

    @return: A 32-character hexadecimal string uniquely identifying this
        public key, I{for this version of Twisted}.
    @rtype: native L{str}
    """
    # The digest is taken over the ASN.1 serialization of the key.
    serialized_key = crypto.dump_publickey(crypto.FILETYPE_ASN1, self.original)
    return md5(serialized_key).hexdigest()
python类dump_publickey()的实例源码
def get_fingerprint(path, passphrase=None):
    """Generate fingerprints of the public key belonging to a private key.

    The private key is obtained from ``load_privatekey(path, passphrase)``,
    its public half is serialized as ASN.1, and that serialization is hashed
    with every algorithm ``hashlib`` advertises.

    :param path: location of the private key, handed to ``load_privatekey``.
    :param passphrase: optional passphrase, handed to ``load_privatekey``.
    :return: dict mapping each hash algorithm name to its hex digest rendered
        as colon-separated byte pairs (``"ab:cd:ef:..."``). The dict is empty
        when ``crypto.dump_publickey`` is not available.
    """
    fingerprint = {}
    privatekey = load_privatekey(path, passphrase)

    try:
        publickey = crypto.dump_publickey(crypto.FILETYPE_ASN1, privatekey)
    except AttributeError:
        # If PyOpenSSL < 16.0 crypto.dump_publickey() will fail.
        # By doing this we prevent the code from raising an error
        # yet we return no value in the fingerprint hash.
        return fingerprint

    # hashlib.algorithms only exists on Python 2.7. On Python 3 the lookup
    # raised AttributeError, which used to be swallowed together with the
    # PyOpenSSL case above and silently produced an empty result.
    algorithms = getattr(hashlib, 'algorithms', None)
    if algorithms is None:
        algorithms = getattr(hashlib, 'algorithms_guaranteed', ())

    for algo in algorithms:
        try:
            hasher = getattr(hashlib, algo)(publickey)
        except ValueError:
            # Some builds refuse individual algorithms (presumably e.g. a
            # FIPS-restricted OpenSSL); skip those instead of failing.
            continue
        try:
            pubkey_digest = hasher.hexdigest()
        except TypeError:
            # Variable-length digests (SHAKE) need an explicit output length.
            pubkey_digest = hasher.hexdigest(32)
        fingerprint[algo] = ':'.join(
            pubkey_digest[i:i + 2] for i in range(0, len(pubkey_digest), 2)
        )

    return fingerprint
man_cert_setup.py 文件源码
项目:aws-greengrass-mini-fulfillment
作者: awslabs
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def create_group_cert(cli):
    """Create a self-signed RSA server certificate and write it out as PEM.

    A fresh 2048-bit RSA key pair is generated, a certificate valid for five
    years is built around it with the host IP as subjectAltName, and three
    files are written next to each other:

    * ``<out_dir>/<group_name>-server.crt``          -- the certificate
    * ``<out_dir>/<group_name>-server-private.key``  -- the private key
    * ``<out_dir>/<group_name>-server-public.key``   -- the public key

    :param cli: object providing the ``ip_address``, ``out_dir`` and
        ``group_name`` attributes read below.
    """
    k = crypto.PKey()
    k.generate_key(crypto.TYPE_RSA, 2048)  # generate RSA key-pair

    cert = crypto.X509()
    cert.get_subject().countryName = "US"
    cert.get_subject().stateOrProvinceName = "CA"
    cert.get_subject().organizationName = "mini-fulfillment"
    cert.get_subject().organizationalUnitName = "demo"
    cert.get_subject().commonName = "mini-fulfillment"
    cert.set_serial_number(1000)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(5 * 365 * 24 * 60 * 60)  # 5 year expiry date
    cert.set_issuer(cert.get_subject())  # self-sign this certificate
    cert.set_pubkey(k)

    san_list = ["IP:{0}".format(cli.ip_address)]
    # The extension value is passed as bytes, like the type names and the
    # basicConstraints value, so the call does not depend on `str` being a
    # byte string.
    san_value = ", ".join(san_list).encode("ascii")
    extension_list = [
        crypto.X509Extension(type_name=b"basicConstraints",
                             critical=False, value=b"CA:false"),
        crypto.X509Extension(type_name=b"subjectAltName",
                             critical=True, value=san_value),
    ]
    cert.add_extensions(extension_list)
    cert.sign(k, 'sha256')

    prefix = str(cli.out_dir) + '/' + cli.group_name

    # The dump_* helpers return PEM as bytes: write in binary mode, and use
    # context managers so every handle is flushed and closed deterministically.
    with open("{0}-server.crt".format(prefix), 'wb') as cert_file:
        cert_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    with open("{0}-server-private.key".format(prefix), 'wb') as private_file:
        private_file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey=k))
    with open("{0}-server-public.key".format(prefix), 'wb') as public_file:
        public_file.write(crypto.dump_publickey(crypto.FILETYPE_PEM, pkey=k))
def test_tolerates_unicode_strings(self):
    """
    A PEM handed to load_publickey as text (not bytes) is accepted and
    dumps back to the original PEM.
    """
    pem_as_text = cleartextPublicKeyPEM.decode('ascii')
    round_tripped = dump_publickey(
        FILETYPE_PEM, load_publickey(FILETYPE_PEM, pem_as_text)
    )
    assert round_tripped == cleartextPublicKeyPEM
def test_dump_publickey_pem(self):
    """
    Dumping a loaded public key as PEM reproduces the PEM it came from.
    """
    public_key = load_publickey(FILETYPE_PEM, cleartextPublicKeyPEM)
    assert dump_publickey(FILETYPE_PEM, public_key) == cleartextPublicKeyPEM
def test_dump_publickey_asn1(self):
    """
    A key dumped as ASN.1 can be loaded again and yields the original PEM.
    """
    original_key = load_publickey(FILETYPE_PEM, cleartextPublicKeyPEM)
    asn1_bytes = dump_publickey(FILETYPE_ASN1, original_key)

    # Round-trip through ASN.1 and compare in PEM form.
    reloaded_key = load_publickey(FILETYPE_ASN1, asn1_bytes)
    assert dump_publickey(FILETYPE_PEM, reloaded_key) == cleartextPublicKeyPEM
def test_dump_publickey_invalid_type(self):
    """
    Asking dump_publickey for FILETYPE_TEXT raises ValueError.
    """
    public_key = load_publickey(FILETYPE_PEM, cleartextPublicKeyPEM)

    with pytest.raises(ValueError):
        dump_publickey(FILETYPE_TEXT, public_key)
def _get_keys(self):
    """
    Load the plugin's RSA key pair, creating the private key on first use.

    The private key lives in the file ``p3d_key`` inside the plugin data
    folder. If that file does not exist, a 2048-bit RSA key is generated and
    written there (owner read/write only, except on Windows).

    On success ``self._key`` holds the loaded private key and
    ``self._public_key`` holds the public key in PEM form. On failure an
    error is logged, ``self._key`` is set to ``None`` and
    ``self._public_key`` is left untouched.
    """
    data_folder = self.get_plugin_data_folder()
    key_filename = os.path.join(data_folder, 'p3d_key')
    self._logger.debug('key_filename: {}'.format(key_filename))

    if not os.path.isfile(key_filename):
        self._logger.debug('Generating key pair')
        key = crypto.PKey()
        key.generate_key(crypto.TYPE_RSA, 2048)
        # dump_privatekey returns the PEM as bytes, so write in binary mode.
        with open(key_filename, 'wb') as f:
            f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key))
        if sys.platform != 'win32':
            os.chmod(key_filename, stat.S_IRUSR | stat.S_IWUSR)

    try:
        with open(key_filename) as f:
            key = f.read()
        self._key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
    except Exception:
        # Deliberately broad (unreadable file, malformed key, ...), but no
        # longer a bare ``except`` that would also trap KeyboardInterrupt
        # and SystemExit.
        self._key = None
        self._logger.error("Unable to generate or access key.")
        return

    # dump_publickey is a function of the ``crypto`` module, so its
    # availability has to be probed there rather than on the key object.
    if hasattr(crypto, 'dump_publickey'):
        self._public_key = crypto.dump_publickey(crypto.FILETYPE_PEM, self._key)
        return

    # Fallback when crypto.dump_publickey is missing: derive the public key
    # with ssh-keygen and keep it in a ".pub" file next to the private key.
    pubkey_filename = key_filename + ".pub"
    if not os.path.isfile(pubkey_filename) or os.path.getsize(pubkey_filename) == 0:
        if sys.platform != 'win32':
            # Re-apply owner-only permissions to the (possibly pre-existing)
            # private key before handing it to ssh-keygen.
            os.chmod(key_filename, stat.S_IRUSR | stat.S_IWUSR)
        command_line = "ssh-keygen -e -m PEM -f {key_filename} > {pubkey_filename}".format(key_filename=key_filename, pubkey_filename=pubkey_filename)
        returncode, stderr_text = self._system(command_line)
        if returncode != 0:
            self._logger.error("Unable to generate public key (may need to manually upgrade pyOpenSSL, see README) {}: {}".format(returncode, stderr_text))
            self._key = None
            # Do not leave an empty or partial .pub file behind; the next
            # call would otherwise have to rely on the size check alone.
            try:
                os.remove(pubkey_filename)
            except OSError:
                pass
            return

    with open(pubkey_filename) as f:
        self._public_key = f.read()
def write_keyvalstore_files(self, cert_tuple, key_val_store_path, json_file_name):
    """
    Write the key-value-store files for the user named by ``cert_tuple``.

    Three files are created under ``key_val_store_path``:

    * ``<ski>-priv``      -- the user's private key as PEM
    * ``<ski>-pub``       -- the certificate's public key as PEM
    * ``json_file_name``  -- the compact JSON of ``self.getAdminDict(...)``

    where ``<ski>`` is ``calculate_ski_per_sdk_node`` of the certificate's
    public key.
    """
    owner = self.directory.getUser(cert_tuple.user)
    certificate = self.directory.findCertForNodeAdminTuple(cert_tuple)
    ski = calculate_ski_per_sdk_node(certificate.get_pubkey())

    # Key material first, named after the SKI.
    private_key_path = "{0}/{1}-priv".format(key_val_store_path, ski)
    with open(private_key_path, "w") as private_key_file:
        private_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, owner.pKey)
        private_key_file.write(private_pem)

    public_key_path = "{0}/{1}-pub".format(key_val_store_path, ski)
    with open(public_key_path, "w") as public_key_file:
        public_pem = crypto.dump_publickey(crypto.FILETYPE_PEM, certificate.get_pubkey())
        public_key_file.write(public_pem)

    # Then the admin description, serialized without whitespace.
    certificate_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, certificate)
    adminDict = self.getAdminDict(
        ski=ski,
        certificate_as_pem=certificate_pem,
        msp_id=cert_tuple.organization,
    )
    json_path = "{0}/{1}".format(key_val_store_path, json_file_name)
    with open(json_path, "w") as json_file:
        json_file.write(json.dumps(adminDict, separators=(',', ':')))
def calculate_ski_per_sdk_node(public_key):
    """
    Compute the hex-encoded hash of ``public_key`` in uncompressed-point form.

    The hashed data is the single byte 0x04 followed by the raw key bytes
    that ``ecdsa.VerifyingKey.to_string()`` returns for the key.
    """
    # uncompressed point (https://www.security-audit.com/files/x9-62-09-20-98.pdf, section 4.3.6)
    uncompressed_marker = bytearray([4])

    # The public key itself, re-parsed from its PEM form by ecdsa.
    public_pem = crypto.dump_publickey(crypto.FILETYPE_PEM, public_key)
    raw_point = ecdsa.VerifyingKey.from_pem(public_pem).to_string()

    # Marker byte + key bytes form the data to hash.
    data = bytearray().join((uncompressed_marker, raw_point))
    return computeCryptoHash(data).encode('hex')
def verifySignature(self, signature, signersCert, data):
    """
    Verify the signature of an entity based upon its public cert.

    The public key is taken from ``signersCert``, serialized as ASN.1 and
    handed to ``ecdsa`` for verification using ``self.hashfunc`` and
    ``self.sigdecode``.

    :param signature: the signature to check.
    :param signersCert: certificate whose public key signed ``data``.
    :param data: the signed payload.
    :raises AssertionError: if ``verify`` reports the signature as invalid.
        Any exception raised by the ``ecdsa`` calls themselves propagates
        unchanged.
    """
    der_public_key = crypto.dump_publickey(crypto.FILETYPE_ASN1, signersCert.get_pubkey())
    vk = ecdsa.VerifyingKey.from_der(der_public_key)

    # The check must not live inside an ``assert`` statement: under
    # ``python -O`` asserts are removed, which would skip verification
    # entirely. AssertionError is still raised so existing callers keep
    # working.
    if not vk.verify(signature, data, hashfunc=self.hashfunc, sigdecode=self.sigdecode):
        raise AssertionError("Invalid signature!!")
def dump_subkey():
    """
    Generate a 2048-bit RSA key pair and store it in ``sub_keyfile``.

    The private key PEM is written first, followed by the public key PEM.
    The generated key object is then published through the module-level
    name ``sub_publickey``.
    """
    global sub_publickey

    keypair = crypto.PKey()
    keypair.generate_key(crypto.TYPE_RSA, 2048)

    with open(sub_keyfile, 'wb') as key_file:
        private_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, keypair)
        key_file.write(private_pem)
        public_pem = crypto.dump_publickey(crypto.FILETYPE_PEM, keypair)
        key_file.write(public_pem)

    sub_publickey = keypair
def strip_key(pem):
    """ Return only the b64 part of the ASCII armored public key PEM.

    ``pem`` is loaded as a private key; the PEM of its public key is dumped,
    its newlines are removed, and the text between the armor lines is
    returned.
    """
    private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, pem)
    armored = crypto.dump_publickey(crypto.FILETYPE_PEM, private_key)

    # Splitting the single-line PEM on the dash runs gives:
    # [b"", header text, base64 body, footer text, b""]
    single_line = armored.replace(b"\n", b"")
    segments = single_line.split(b"-----")
    return segments[2]