def __init__(self):
"""
Construct a CryptographyEngine.
"""
self.logger = logging.getLogger('kmip.server.engine.cryptography')
self._symmetric_key_algorithms = {
enums.CryptographicAlgorithm.TRIPLE_DES: algorithms.TripleDES,
enums.CryptographicAlgorithm.AES: algorithms.AES,
enums.CryptographicAlgorithm.BLOWFISH: algorithms.Blowfish,
enums.CryptographicAlgorithm.CAMELLIA: algorithms.Camellia,
enums.CryptographicAlgorithm.CAST5: algorithms.CAST5,
enums.CryptographicAlgorithm.IDEA: algorithms.IDEA,
enums.CryptographicAlgorithm.RC4: algorithms.ARC4
}
self._asymetric_key_algorithms = {
enums.CryptographicAlgorithm.RSA: self._create_rsa_key_pair
}
self._hash_algorithms = {
enums.CryptographicAlgorithm.HMAC_SHA1: hashes.SHA1,
enums.CryptographicAlgorithm.HMAC_SHA224: hashes.SHA224,
enums.CryptographicAlgorithm.HMAC_SHA256: hashes.SHA256,
enums.CryptographicAlgorithm.HMAC_SHA384: hashes.SHA384,
enums.CryptographicAlgorithm.HMAC_SHA512: hashes.SHA512,
enums.CryptographicAlgorithm.HMAC_MD5: hashes.MD5
}
python类SHA512的实例源码
def _oaep_hash_supported(self, algorithm):
if self._lib.Cryptography_HAS_RSA_OAEP_MD:
return isinstance(
algorithm, (
hashes.SHA1,
hashes.SHA224,
hashes.SHA256,
hashes.SHA384,
hashes.SHA512,
)
)
else:
return isinstance(algorithm, hashes.SHA1)
def get_default_algorithms():
"""
Returns the algorithms that are implemented by the library.
"""
default_algorithms = {
'none': NoneAlgorithm(),
'HS256': HMACAlgorithm(HMACAlgorithm.SHA256),
'HS384': HMACAlgorithm(HMACAlgorithm.SHA384),
'HS512': HMACAlgorithm(HMACAlgorithm.SHA512)
}
if has_crypto:
default_algorithms.update({
'RS256': RSAAlgorithm(RSAAlgorithm.SHA256),
'RS384': RSAAlgorithm(RSAAlgorithm.SHA384),
'RS512': RSAAlgorithm(RSAAlgorithm.SHA512),
'ES256': ECAlgorithm(ECAlgorithm.SHA256),
'ES384': ECAlgorithm(ECAlgorithm.SHA384),
'ES521': ECAlgorithm(ECAlgorithm.SHA512),
'ES512': ECAlgorithm(ECAlgorithm.SHA512), # Backward compat for #219 fix
'PS256': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256),
'PS384': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA384),
'PS512': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA512)
})
return default_algorithms
def _oaep_hash_supported(self, algorithm):
if self._lib.Cryptography_HAS_RSA_OAEP_MD:
return isinstance(
algorithm, (
hashes.SHA1,
hashes.SHA224,
hashes.SHA256,
hashes.SHA384,
hashes.SHA512,
)
)
else:
return isinstance(algorithm, hashes.SHA1)
def _oaep_hash_supported(self, algorithm):
if self._lib.Cryptography_HAS_RSA_OAEP_MD:
return isinstance(
algorithm, (
hashes.SHA1,
hashes.SHA224,
hashes.SHA256,
hashes.SHA384,
hashes.SHA512,
)
)
else:
return isinstance(algorithm, hashes.SHA1)
def test_encryption_context_hash(encryption_context, result):
hasher = hashes.Hash(
hashes.SHA512(),
backend=default_backend()
)
assert _encryption_context_hash(hasher, encryption_context) == b64decode(result)
def test_encrypted_data_keys_hash(encrypted_data_keys, result):
hasher = hashes.Hash(
hashes.SHA512(),
backend=default_backend()
)
assert _encrypted_data_keys_hash(hasher, encrypted_data_keys) == b''.join([b64decode(each) for each in result])
def _new_cache_key_hasher():
"""Builds a new instance of the hasher used for building cache keys.
:rtype: cryptography.hazmat.primitives.hashes.Hash
"""
return hashes.Hash(
hashes.SHA512(),
backend=default_backend()
)
def keygen(pub_key, priv_key, user_id, priv_key_password=None):
"""Generate new private key and certificate RSA_SHA512_4096"""
# Generate our key
key = rsa.generate_private_key(public_exponent=65537, key_size=4096,
backend=default_backend())
if priv_key_password:
ea = serialization.BestAvailableEncryption(priv_key_password)
else:
ea = serialization.NoEncryption()
# Write our key to disk for safe keeping
with open(priv_key, "wb") as f:
f.write(key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=ea,
))
# Various details about who we are. For a self-signed certificate the
# subject and issuer are always the same.
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "XX"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "XX"),
x509.NameAttribute(NameOID.LOCALITY_NAME, "XX"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "I2P Anonymous Network"),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "I2P"),
x509.NameAttribute(NameOID.COMMON_NAME, user_id),
])
cert = x509.CertificateBuilder() \
.subject_name(subject) \
.issuer_name(issuer) \
.public_key(key.public_key()) \
.not_valid_before(datetime.datetime.utcnow()) \
.not_valid_after(
datetime.datetime.utcnow() + datetime.timedelta(days=365*10)
) \
.serial_number(random.randrange(1000000000, 2000000000)) \
.add_extension(
x509.SubjectKeyIdentifier.from_public_key(key.public_key()),
critical=False,
).sign(key, hashes.SHA512(), default_backend())
with open(pub_key, "wb") as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))
def jws_encapsulate(key,
header,
payload,
digest=hashes.SHA256,
padder=asymmetric.padding.PKCS1v15):
if digest == hashes.SHA256:
suffix = '256'
elif digest == hashes.SHA384:
suffix = '384'
elif digest == hashes.SHA512:
suffix = '512'
else:
raise ValueError('RFC 7518 non-compliant digest: ' + digest)
if isinstance(key, bytes):
algorithm = 'HS' + suffix
signer = hmac.HMAC(key, digest(), backend)
elif isinstance(key, ec.EllipticCurvePrivateKey):
algorithm = 'ES' + suffix
signer = key.signer(ec.ECDSA(digest()))
elif isinstance(key, rsa.RSAPrivateKey):
if padder == asymmetric.padding.PSS:
algorithm = 'PS' + suffix
signer = key.signer(padder(padding.MGF1(digest()),
padder.MAX_LENGTH).
digest())
elif padder == asymmetric.padding.PKCS1v15:
algorithm = 'RS' + suffix
signer = key.signer(padder(), digest())
else:
raise ValueError('RFC 7518 non-compliant padding: ' + \
str(type(padder)))
else:
raise ValueError('RFC 7518 non-compliant key: ' + str(type(key)))
pubkey = key_to_pubkey(key)
header['alg'] = algorithm
header['jwk'] = pubkey_to_jwk(pubkey)
protected = jws_safe_obj(header)
payload = jws_safe_obj(payload)
signer.update(protected + b'.' + payload)
signature = acme_safe_b64_encode(signer.finalize())
return json.dumps({
'protected': protected.decode('ascii'),
'payload': payload.decode('ascii'),
'signature': signature.decode('ascii'),
}).encode('ascii')