def encrypt(message, receiver_public_key):
sender_private_key = ec.generate_private_key(ec.SECP256K1(), backend)
shared_key = sender_private_key.exchange(ec.ECDH(), receiver_public_key)
sender_public_key = sender_private_key.public_key()
point = sender_public_key.public_numbers().encode_point()
iv = '000000000000'
xkdf = x963kdf.X963KDF(
algorithm = hashes.SHA256(),
length = 32,
sharedinfo = '',
backend = backend
)
key = xkdf.derive(shared_key)
encryptor = Cipher(
algorithms.AES(key),
modes.GCM(iv),
backend = backend
).encryptor()
ciphertext = encryptor.update(message) + encryptor.finalize()
return point + encryptor.tag + ciphertext
python类SHA256的实例源码
def verify_hmac(expected_result, secret_key, unique_prefix, data):
'''
Perform a HMAC using the secret key, unique hash prefix, and data, and
verify that the result of:
HMAC-SHA256(secret_key, unique_prefix | data)
matches the bytes in expected_result.
The key must be kept secret. The prefix ensures hash uniqueness.
Returns True if the signature matches, and False if it does not.
'''
# If the secret key is shorter than the digest size, security is reduced
assert secret_key
assert len(secret_key) >= CryptoHash.digest_size
h = hmac.HMAC(bytes(secret_key), CryptoHash(), backend=default_backend())
h.update(bytes(unique_prefix))
h.update(bytes(data))
try:
h.verify(bytes(expected_result))
return True
except InvalidSignature:
return False
def generate_cert(key_path, cert_out_path):
private_key = load_private_key_file(key_path)
public_key = private_key.public_key()
builder = x509.CertificateBuilder()
builder = builder.subject_name(x509.Name([
x509.NameAttribute(x509.OID_COMMON_NAME, u'PrivCount User'),
]))
builder = builder.issuer_name(x509.Name([
x509.NameAttribute(x509.OID_COMMON_NAME, u'PrivCount Authority'),
]))
builder = builder.not_valid_before(datetime.datetime.today() - datetime.timedelta(days=1))
builder = builder.not_valid_after(datetime.datetime(2020, 1, 1))
builder = builder.serial_number(int(uuid.uuid4()))
builder = builder.public_key(public_key)
builder = builder.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
certificate = builder.sign(private_key=private_key, algorithm=hashes.SHA256(), backend=default_backend())
with open(cert_out_path, 'wb') as outf:
print >>outf, certificate.public_bytes(encoding=serialization.Encoding.PEM)
def decrypt(message, receiver_private_key):
point = message[0:65]
tag = message[65:81]
ciphertext = message[81:]
sender_public_numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256K1(), point)
sender_public_key = sender_public_numbers.public_key(backend)
shared_key = receiver_private_key.exchange(ec.ECDH(), sender_public_key)
iv = '000000000000'
xkdf = x963kdf.X963KDF(
algorithm = hashes.SHA256(),
length = 32,
sharedinfo = '',
backend = backend
)
key = xkdf.derive(shared_key)
decryptor = Cipher(
algorithms.AES(key),
modes.GCM(iv,tag),
backend = backend
).decryptor()
message = decryptor.update(ciphertext) + decryptor.finalize()
return message
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def __init__(self, key, length, algorithm, backend):
if not isinstance(backend, HMACBackend):
raise UnsupportedAlgorithm(
"Backend object does not implement HMACBackend.",
_Reasons.BACKEND_MISSING_INTERFACE
)
if len(key) < 16:
raise ValueError("Key length has to be at least 128 bits.")
if not isinstance(length, six.integer_types):
raise TypeError("Length parameter must be an integer type.")
if length < 6 or length > 8:
raise ValueError("Length of HOTP has to be between 6 to 8.")
if not isinstance(algorithm, (SHA1, SHA256, SHA512)):
raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.")
self._key = key
self._length = length
self._algorithm = algorithm
self._backend = backend
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def post(self):
user = DB.session.query(
User).filter(User.name == self.get_argument(
"name")).one_or_none()
if user:
password = self.get_argument("password").encode('utf-8')
kpdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=self.settings['secret'].encode('utf-8'),
iterations=1000000,
backend=password_backend
)
try:
kpdf.verify(password, user.password)
self.set_secure_cookie("user", self.get_argument("name"))
self.redirect("/dashboard")
except InvalidKey:
print(self.get_argument("password"))
print(user.password)
self.redirect("/login?status=wrongpassword")
else:
self.redirect("/login?status=usernotfound")
def sign(self, msg):
"""Create signature for message
Taken from https://github.com/madeddie/python-bunq - Thanks!
:param msg: data to be signed, usually action, headers and body
:type msg: str
"""
return base64.b64encode(
self.privkey_pem.sign(
msg.encode(),
padding.PKCS1v15(),
hashes.SHA256()
)
).decode()
def kdf_rfc5869_derive(secret_input_bytes, output_len, m_expand=M_EXPAND_NTOR,
t_key=T_KEY_NTOR):
'''
Return output_len bytes generated from secret_input_bytes using RFC5869
with HKDF-SHA256.
There is no equivalent verification function, as only the nonce part of
the KDF result is verified directly.
See https://gitweb.torproject.org/torspec.git/tree/tor-spec.txt#n1026
'''
hkdf_sha256 = hkdf.HKDF(algorithm=hashes.SHA256(),
length=output_len,
info=bytes(m_expand),
salt=bytes(t_key),
backend=backends.default_backend())
output_bytes = hkdf_sha256.derive(bytes(secret_input_bytes))
assert len(output_bytes) == output_len
return bytearray(output_bytes)
def check_password(self, password):
kpdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=SECRET,
iterations=1000000,
backend=password_backend
)
try:
kpdf.verify(password.encode('utf-8'), self.get_password())
correct = True
except InvalidKey as e:
print(e)
correct = False
return correct
def create_master(password):
kpdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=SECRET,
iterations=1000000,
backend=password_backend
)
master_user = User()
master_user.name = 'master'
master_user.when = datetime.datetime.now()
master_user.set_permissions(Permissions.ROOT)
master_user.set_password(kpdf.derive(password.encode('utf-8')))
DB.session.add(master_user)
DB.session.commit()
def create_user(name, password):
kpdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=SECRET,
iterations=1000000,
backend=password_backend
)
user = User()
user.name = name
user.when = datetime.datetime.now()
user.set_permissions(Permissions.USER)
user.set_password(kpdf.derive(password.encode('utf-8')))
DB.session.add(user)
DB.session.commit()
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def __init__(self, key, length, algorithm, backend,
enforce_key_length=True):
if not isinstance(backend, HMACBackend):
raise UnsupportedAlgorithm(
"Backend object does not implement HMACBackend.",
_Reasons.BACKEND_MISSING_INTERFACE
)
if len(key) < 16 and enforce_key_length is True:
raise ValueError("Key length has to be at least 128 bits.")
if not isinstance(length, six.integer_types):
raise TypeError("Length parameter must be an integer type.")
if length < 6 or length > 8:
raise ValueError("Length of HOTP has to be between 6 to 8.")
if not isinstance(algorithm, (SHA1, SHA256, SHA512)):
raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.")
self._key = key
self._length = length
self._algorithm = algorithm
self._backend = backend
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def __init__(self, key, length, algorithm, backend):
if not isinstance(backend, HMACBackend):
raise UnsupportedAlgorithm(
"Backend object does not implement HMACBackend.",
_Reasons.BACKEND_MISSING_INTERFACE
)
if len(key) < 16:
raise ValueError("Key length has to be at least 128 bits.")
if not isinstance(length, six.integer_types):
raise TypeError("Length parameter must be an integer type.")
if length < 6 or length > 8:
raise ValueError("Length of HOTP has to be between 6 to 8.")
if not isinstance(algorithm, (SHA1, SHA256, SHA512)):
raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.")
self._key = key
self._length = length
self._algorithm = algorithm
self._backend = backend
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def __init__(self, key, length, algorithm, backend):
if not isinstance(backend, HMACBackend):
raise UnsupportedAlgorithm(
"Backend object does not implement HMACBackend.",
_Reasons.BACKEND_MISSING_INTERFACE
)
if len(key) < 16:
raise ValueError("Key length has to be at least 128 bits.")
if not isinstance(length, six.integer_types):
raise TypeError("Length parameter must be an integer type.")
if length < 6 or length > 8:
raise ValueError("Length of HOTP has to be between 6 to 8.")
if not isinstance(algorithm, (SHA1, SHA256, SHA512)):
raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.")
self._key = key
self._length = length
self._algorithm = algorithm
self._backend = backend
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def __init__(self, key, length, algorithm, backend):
if not isinstance(backend, HMACBackend):
raise UnsupportedAlgorithm(
"Backend object does not implement HMACBackend.",
_Reasons.BACKEND_MISSING_INTERFACE
)
if len(key) < 16:
raise ValueError("Key length has to be at least 128 bits.")
if not isinstance(length, six.integer_types):
raise TypeError("Length parameter must be an integer type.")
if length < 6 or length > 8:
raise ValueError("Length of HOTP has to be between 6 to 8.")
if not isinstance(algorithm, (SHA1, SHA256, SHA512)):
raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.")
self._key = key
self._length = length
self._algorithm = algorithm
self._backend = backend
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def __init__(self, key, length, algorithm, backend):
if not isinstance(backend, HMACBackend):
raise UnsupportedAlgorithm(
"Backend object does not implement HMACBackend.",
_Reasons.BACKEND_MISSING_INTERFACE
)
if len(key) < 16:
raise ValueError("Key length has to be at least 128 bits.")
if not isinstance(length, six.integer_types):
raise TypeError("Length parameter must be an integer type.")
if length < 6 or length > 8:
raise ValueError("Length of HOTP has to be between 6 to 8.")
if not isinstance(algorithm, (SHA1, SHA256, SHA512)):
raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.")
self._key = key
self._length = length
self._algorithm = algorithm
self._backend = backend
def sign_request(key, header, protected_header, payload):
"""
Creates a JSON Web Signature for the request header and payload using the
specified account key.
"""
protected = jose_b64(json.dumps(protected_header).encode('utf8'))
payload = jose_b64(json.dumps(payload).encode('utf8'))
signer = key.signer(padding.PKCS1v15(), hashes.SHA256())
signer.update(protected.encode('ascii'))
signer.update(b'.')
signer.update(payload.encode('ascii'))
return json.dumps({
'header': header,
'protected': protected,
'payload': payload,
'signature': jose_b64(signer.finalize()),
})
def create_csr(key, domains, must_staple=False):
"""
Creates a CSR in DER format for the specified key and domain names.
"""
assert domains
name = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, domains[0]),
])
san = x509.SubjectAlternativeName([x509.DNSName(domain) for domain in domains])
csr = x509.CertificateSigningRequestBuilder().subject_name(name) \
.add_extension(san, critical=False)
if must_staple:
ocsp_must_staple = x509.TLSFeature(features=[x509.TLSFeatureType.status_request])
csr = csr.add_extension(ocsp_must_staple, critical=False)
csr = csr.sign(key, hashes.SHA256(), default_backend())
return export_csr_for_acme(csr)
def _gen_key_initctr(cls, b_password, b_salt):
# 16 for AES 128, 32 for AES256
keylength = 32
# match the size used for counter.new to avoid extra work
ivlength = 16
if HAS_PBKDF2HMAC:
backend = default_backend()
kdf = PBKDF2HMAC(
algorithm=c_SHA256(),
length=2 * keylength + ivlength,
salt=b_salt,
iterations=10000,
backend=backend)
b_derivedkey = kdf.derive(b_password)
else:
b_derivedkey = cls._create_key(b_password, b_salt, keylength, ivlength)
b_key1 = b_derivedkey[:keylength]
b_key2 = b_derivedkey[keylength:(keylength * 2)]
b_iv = b_derivedkey[(keylength * 2):(keylength * 2) + ivlength]
return b_key1, b_key2, hexlify(b_iv)
def kdf(key_material):
"""NIST SP 800-56a Concatenation Key Derivation Function (see section 5.8.1).
Pretty much copied from geth's implementation:
https://github.com/ethereum/go-ethereum/blob/673007d7aed1d2678ea3277eceb7b55dc29cf092/crypto/ecies/ecies.go#L167
"""
key = b""
hash_blocksize = hashes.SHA256().block_size
reps = ((KEY_LEN + 7) * 8) / (hash_blocksize * 8)
counter = 0
while counter <= reps:
counter += 1
ctx = sha256()
ctx.update(struct.pack('>I', counter))
ctx.update(key_material)
key += ctx.digest()
return key[:KEY_LEN]
def sign_test(self):
"""???/??? ?? ? ?? ???
:return: ??? ??(True/False)
"""
if self.is_secure is False:
logging.debug("CA is not secure_mode")
return False
data = b"test"
signature = self.__ca_pri.sign(
data,
ec.ECDSA(hashes.SHA256())
)
try:
pub_key = self.__ca_cert.public_key()
return pub_key.verify(
signature,
data,
ec.ECDSA(hashes.SHA256())
)
except InvalidSignature:
logging.debug("cert test fail!!!")
return False