def sign_data(self, data: bytes) -> bytes:
    """
    Sign ``data`` with the CA private key using SHA-256.

    An EC private key produces an ECDSA signature; an RSA private key
    produces a PKCS#1 v1.5 signature.

    :param data: raw bytes to sign
    :return: the signature bytes, or ``None`` when the private key is neither
        an EC nor an RSA key (the key type is logged at debug level)
    """
    if isinstance(self.__ca_pri, ec.EllipticCurvePrivateKeyWithSerialization):
        # Use the one-shot sign() call, matching the RSA branch below, instead
        # of the streaming signer()/update()/finalize() API that the
        # cryptography package deprecated and later removed.
        return self.__ca_pri.sign(data, ec.ECDSA(hashes.SHA256()))

    if isinstance(self.__ca_pri, rsa.RSAPrivateKeyWithSerialization):
        return self.__ca_pri.sign(
            data,
            padding.PKCS1v15(),
            hashes.SHA256()
        )

    logging.debug("Unknown PrivateKey Type : %s", type(self.__ca_pri))
    return None
# Example source code using the Python SHA256 class
def get_channel(self):
    """Fetch the channel assigned to this device and store it in ``self.channel_uri``.

    Steps:
      1. GET ``/get_device_token`` for ``self.uuid`` and read the ``token`` field.
      2. Sign the token with the private key at
         ``CloudLinkSettings.PRIVATE_KEY_LOCATION`` (PKCS#1 v1.5, SHA-256).
      3. GET ``/get_device_group`` with the base64-encoded signature.

    On an OK response ``self.channel_uri`` is set from the ``channel`` field.
    Status codes other than 400 and 403 on a non-OK response are ignored and
    ``self.channel_uri`` is left untouched.

    :raises Exception: on HTTP 400 (UUID/token not registered) or HTTP 403
        (signature did not verify)
    """
    # First get a token we need to sign in order to prove we are who we say we are.
    r = requests.get(str(self.base_link_uri) + "/get_device_token", params={"UUID": self.uuid, })
    token = r.json()["token"]

    # Get the private key. The PEM loader needs bytes, so read the file in
    # binary mode (text mode yields str on Python 3).
    with open(CloudLinkSettings.PRIVATE_KEY_LOCATION, 'rb') as key_file:
        private_key = serialization.load_pem_private_key(
            key_file.read(),
            password=CloudLinkSettings.PRIVATE_KEY_PASSPHRASE,
            backend=default_backend())

    # bytes(text) raises TypeError on Python 3, so encode text explicitly;
    # anything that is not text goes through bytes() exactly as before.
    if isinstance(token, type(u"")):
        token_bytes = token.encode("utf-8")
    else:
        token_bytes = bytes(token)

    # Sign the token with our private key using the one-shot sign() call
    # (the streaming signer() API is deprecated in the cryptography package).
    signature = private_key.sign(token_bytes, padding.PKCS1v15(), hashes.SHA256())

    # Get the randomly assigned channel for my UUID.
    r = requests.get(str(self.base_link_uri) + "/get_device_group",
                     params={"UUID": self.uuid, "signature": b64encode(signature), "format": "PKCS1_v1_5"})
    if r.ok:
        self.channel_uri = r.json()["channel"]
    elif r.status_code == 400:
        raise Exception("UUID or Token not registered with Cloud.")
    elif r.status_code == 403:
        raise Exception("Signature didn't verify correctly. Bad private key or signature.")
def sign(self, msg):
    """Return the base64 text of a PKCS#1 v1.5 / SHA-256 signature over ``msg``.

    Taken from https://github.com/madeddie/python-bunq - Thanks!

    :param msg: data to be signed, usually action, headers and body
    :type msg: str
    :return: base64-encoded signature, decoded to text
    """
    payload = msg.encode()
    raw_signature = self.privkey_pem.sign(
        payload,
        padding.PKCS1v15(),
        hashes.SHA256(),
    )
    encoded = base64.b64encode(raw_signature)
    return encoded.decode()
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def __init__(self, key, length, algorithm, backend):
    """Validate the HOTP parameters and store them on the instance.

    :param key: secret key; ``len(key)`` must be at least 16
    :param length: integer HOTP length, from 6 to 8 inclusive
    :param algorithm: an instance of SHA1, SHA256 or SHA512
    :param backend: object implementing ``HMACBackend``
    :raises UnsupportedAlgorithm: if ``backend`` is not an ``HMACBackend``
    :raises ValueError: if the key is too short or ``length`` is out of range
    :raises TypeError: if ``length`` is not an integer or ``algorithm`` is
        not one of the supported hash types
    """
    if not isinstance(backend, HMACBackend):
        reason = _Reasons.BACKEND_MISSING_INTERFACE
        raise UnsupportedAlgorithm(
            "Backend object does not implement HMACBackend.", reason
        )

    if len(key) < 16:
        raise ValueError("Key length has to be at least 128 bits.")

    if not isinstance(length, six.integer_types):
        raise TypeError("Length parameter must be an integer type.")

    if not 6 <= length <= 8:
        raise ValueError("Length of HOTP has to be between 6 to 8.")

    supported_hashes = (SHA1, SHA256, SHA512)
    if not isinstance(algorithm, supported_hashes):
        raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.")

    self._key = key
    self._length = length
    self._algorithm = algorithm
    self._backend = backend
def load_or_create_crl(crl_file, ca_crt, pkey):
    """Return the CRL stored at ``crl_file``, creating the file if it is missing.

    When ``crl_file`` exists it is read and parsed as a PEM CRL. Otherwise a
    new CRL with no revoked entries is built with ``ca_crt.subject`` as
    issuer, a last-update of now (UTC), a next-update roughly ten years
    later (365 * 10 days), signed with ``pkey`` using SHA-256, and written
    to ``crl_file`` in PEM form.

    :param crl_file: path of the PEM-encoded CRL file
    :param ca_crt: CA certificate whose subject becomes the CRL issuer
    :param pkey: private key used to sign a newly created CRL
    :return: the loaded or newly created CRL object
    """
    if os.path.isfile(crl_file):
        with open(crl_file, 'rb') as existing:
            pem_data = existing.read()
        return x509.load_pem_x509_crl(data=pem_data, backend=default_backend())

    builder = x509.CertificateRevocationListBuilder()
    builder = builder.issuer_name(ca_crt.subject)
    builder = builder.last_update(datetime.datetime.utcnow())
    builder = builder.next_update(
        datetime.datetime.utcnow() + datetime.timedelta(days=365 * 10)
    )
    crl = builder.sign(
        private_key=pkey,
        algorithm=hashes.SHA256(),
        backend=default_backend(),
    )

    pem_bytes = crl.public_bytes(encoding=serialization.Encoding.PEM)
    with open(crl_file, 'wb') as out:
        out.write(pem_bytes)
    return crl
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def __init__(self, key, length, algorithm, backend):
    """Validate the HOTP parameters and store them on the instance.

    :param key: secret key; ``len(key)`` must be at least 16
    :param length: integer HOTP length, from 6 to 8 inclusive
    :param algorithm: an instance of SHA1, SHA256 or SHA512
    :param backend: object implementing ``HMACBackend``
    :raises UnsupportedAlgorithm: if ``backend`` is not an ``HMACBackend``
    :raises ValueError: if the key is too short or ``length`` is out of range
    :raises TypeError: if ``length`` is not an integer or ``algorithm`` is
        not one of the supported hash types
    """
    if not isinstance(backend, HMACBackend):
        reason = _Reasons.BACKEND_MISSING_INTERFACE
        raise UnsupportedAlgorithm(
            "Backend object does not implement HMACBackend.", reason
        )

    if len(key) < 16:
        raise ValueError("Key length has to be at least 128 bits.")

    if not isinstance(length, six.integer_types):
        raise TypeError("Length parameter must be an integer type.")

    if not 6 <= length <= 8:
        raise ValueError("Length of HOTP has to be between 6 to 8.")

    supported_hashes = (SHA1, SHA256, SHA512)
    if not isinstance(algorithm, supported_hashes):
        raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.")

    self._key = key
    self._length = length
    self._algorithm = algorithm
    self._backend = backend
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def __init__(self, key, length, algorithm, backend):
    """Validate the HOTP parameters and store them on the instance.

    :param key: secret key; ``len(key)`` must be at least 16
    :param length: integer HOTP length, from 6 to 8 inclusive
    :param algorithm: an instance of SHA1, SHA256 or SHA512
    :param backend: object implementing ``HMACBackend``
    :raises UnsupportedAlgorithm: if ``backend`` is not an ``HMACBackend``
    :raises ValueError: if the key is too short or ``length`` is out of range
    :raises TypeError: if ``length`` is not an integer or ``algorithm`` is
        not one of the supported hash types
    """
    if not isinstance(backend, HMACBackend):
        reason = _Reasons.BACKEND_MISSING_INTERFACE
        raise UnsupportedAlgorithm(
            "Backend object does not implement HMACBackend.", reason
        )

    if len(key) < 16:
        raise ValueError("Key length has to be at least 128 bits.")

    if not isinstance(length, six.integer_types):
        raise TypeError("Length parameter must be an integer type.")

    if not 6 <= length <= 8:
        raise ValueError("Length of HOTP has to be between 6 to 8.")

    supported_hashes = (SHA1, SHA256, SHA512)
    if not isinstance(algorithm, supported_hashes):
        raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.")

    self._key = key
    self._length = length
    self._algorithm = algorithm
    self._backend = backend
def _user_cert_validation(cert_str):
    """Ask the user to confirm the cluster certificate's SHA256 fingerprint.

    The PEM certificate is parsed, its SHA256 fingerprint is rendered as
    upper-case, colon-separated hex pairs, and that text is passed to
    ``confirm`` together with ``False``.

    :param cert_str: cluster certificate bundle
    :type cert_str: str
    :returns: whether or not user validated cert
    :rtype: bool
    """
    pem_bytes = cert_str.encode('utf-8')
    certificate = x509.load_pem_x509_certificate(pem_bytes, default_backend())
    digest = certificate.fingerprint(hashes.SHA256())

    hex_pairs = ["{:02x}".format(octet) for octet in digest]
    pretty = ":".join(hex_pairs).upper()

    prompt = "SHA256 fingerprint of cluster certificate bundle:\n{}".format(
        pretty)
    return confirm(prompt, False)
def __init__(self):
    """Create a fresh RSA key and a CA certificate signed with it.

    A new RSA private key (public exponent 65537, ``_KEY_SIZE`` bits) is
    generated. The certificate is built by ``_cert_builder_common`` with the
    same randomly suffixed "Testing CA #..." name passed twice (presumably
    as subject and issuer - confirm against that helper), marked as a CA
    with a path length of 9, and signed with the new key using SHA-256.
    The PEM encoding is stored in ``self.cert_pem`` wrapped in a ``Blob``.
    """
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=_KEY_SIZE,
        backend=default_backend(),
    )
    self._private_key = key

    ca_name = _name(u"Testing CA #" + random_text())

    builder = _cert_builder_common(ca_name, ca_name, key.public_key())
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=9), critical=True,
    )
    self._certificate = builder.sign(
        private_key=key,
        algorithm=hashes.SHA256(),
        backend=default_backend(),
    )

    pem = self._certificate.public_bytes(Encoding.PEM)
    self.cert_pem = Blob(pem)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def __init__(self, key, length, algorithm, backend):
    """Validate the HOTP parameters and store them on the instance.

    :param key: secret key; ``len(key)`` must be at least 16
    :param length: integer HOTP length, from 6 to 8 inclusive
    :param algorithm: an instance of SHA1, SHA256 or SHA512
    :param backend: object implementing ``HMACBackend``
    :raises UnsupportedAlgorithm: if ``backend`` is not an ``HMACBackend``
    :raises ValueError: if the key is too short or ``length`` is out of range
    :raises TypeError: if ``length`` is not an integer or ``algorithm`` is
        not one of the supported hash types
    """
    if not isinstance(backend, HMACBackend):
        reason = _Reasons.BACKEND_MISSING_INTERFACE
        raise UnsupportedAlgorithm(
            "Backend object does not implement HMACBackend.", reason
        )

    if len(key) < 16:
        raise ValueError("Key length has to be at least 128 bits.")

    if not isinstance(length, six.integer_types):
        raise TypeError("Length parameter must be an integer type.")

    if not 6 <= length <= 8:
        raise ValueError("Length of HOTP has to be between 6 to 8.")

    supported_hashes = (SHA1, SHA256, SHA512)
    if not isinstance(algorithm, supported_hashes):
        raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.")

    self._key = key
    self._length = length
    self._algorithm = algorithm
    self._backend = backend
def show_Certificate(cert, short=False):
    """
    Print the fingerprints, issuer and subject of an X509 certificate.

    One line is printed per digest (MD5, SHA1, SHA256, SHA512) in the form
    ``<hash name>: <hex fingerprint>``, followed by the issuer and subject
    as rendered by ``get_Name``.

    :param cert: X509 Certificate to print
    :param short: Print in shortform for DN (Default: False)
    :type cert: :class:`cryptography.x509.Certificate`
    :type short: Boolean
    """
    digest_classes = (hashes.MD5, hashes.SHA1, hashes.SHA256, hashes.SHA512)
    for digest_cls in digest_classes:
        raw_fingerprint = cert.fingerprint(digest_cls())
        hex_text = binascii.hexlify(raw_fingerprint).decode("ascii")
        print("{}: {}".format(digest_cls.name, hex_text))

    issuer_text = get_Name(cert.issuer, short=short)
    print("Issuer: {}".format(issuer_text))

    subject_text = get_Name(cert.subject, short=short)
    print("Subject: {}".format(subject_text))
################################## AXML FORMAT ########################################
# Translated from
# http://code.google.com/p/android4me/source/browse/src/android/content/res/AXmlResourceParser.java
def encrypt_key_with_public_key(secret_key, public_encryption_key):
    """
    Encrypt ``secret_key`` with the given public key.

    OAEP padding is used, with SHA-256 for both the MGF1 mask function and
    the OAEP hash, and no label.

    :param bytes secret_key: the key to encrypt
    :param public_encryption_key: the public encryption key
    :type public_encryption_key: :class:`~rsa.RSAPublicKey`
    :return: the encrypted key
    :rtype: bytes
    """
    mask_function = padding.MGF1(algorithm=hashes.SHA256())
    oaep = padding.OAEP(
        mgf=mask_function,
        algorithm=hashes.SHA256(),
        label=None,
    )
    return public_encryption_key.encrypt(secret_key, oaep)
def decrypt_with_private_key(secret_key, private_encryption_key):
    """
    Decrypt ``secret_key`` with the given private key.

    OAEP padding is used, with SHA-256 for both the MGF1 mask function and
    the OAEP hash, and no label.

    :param bytes secret_key: the secret key to decrypt
    :param private_encryption_key: the private encryption key
    :type private_encryption_key: :class:`~rsa.RSAPrivateKey`
    :return: the decrypted key
    :rtype: bytes
    """
    mask_function = padding.MGF1(algorithm=hashes.SHA256())
    oaep = padding.OAEP(
        mgf=mask_function,
        algorithm=hashes.SHA256(),
        label=None,
    )
    return private_encryption_key.decrypt(secret_key, oaep)
def __init__(self, curve_class, nist_name):
    """Record the curve, its key-format identifier and the matching hash class.

    The hash class is chosen from ``curve_class.key_size``: SHA256 up to
    256 bits, SHA384 up to 384 bits, SHA512 above that.

    :param curve_class: curve class exposing a ``key_size`` attribute
    :param nist_name: NIST name of the curve, appended to ``ecdsa-sha2-``
    """
    self.nist_name = nist_name
    self.key_length = curve_class.key_size

    # Defined in RFC 5656 6.2
    self.key_format_identifier = "ecdsa-sha2-" + self.nist_name

    # Defined in RFC 5656 6.2.1
    if self.key_length > 384:
        digest_class = hashes.SHA512
    elif self.key_length > 256:
        digest_class = hashes.SHA384
    else:
        digest_class = hashes.SHA256
    self.hash_object = digest_class

    self.curve_class = curve_class
def rsa(public_key, signature, message):
    """Verifies an RSA signature.

    The message is first passed through ``util.sha256`` and the result is
    verified against the signature using PSS padding (MGF1 with SHA-256,
    maximum salt length) and SHA-256.

    Args:
        public_key (str): Public key with BEGIN and END sections.
        signature (str): Hex value of the signature with its leading 0x stripped.
        message (str): Message that was signed, unhashed.

    Raises:
        Exception: With the message 'Invalid signature' when verification fails.
    """
    # bytes(text) raises TypeError on Python 3 ("string argument without an
    # encoding"), so encode text explicitly. type(u"") is the text type on
    # both Python 2 and 3; anything else goes through bytes() as before.
    if isinstance(public_key, type(u"")):
        pem = public_key.encode("ascii")
    else:
        pem = bytes(public_key)

    try:
        public_rsa = load_pem_public_key(pem, backend=default_backend())
        # NOTE(review): verify() applies SHA-256 to its input as well, so the
        # signer presumably signed util.sha256(message) too - confirm.
        hashed = util.sha256(message)
        public_rsa.verify(
            binascii.unhexlify(signature),
            hashed,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
    except InvalidSignature:
        raise Exception('Invalid signature')