def _test_private_jwk(key):
    """
    Try to load the first entry of a serialized JWK set as an RSA private key.

    ``key`` is decoded and parsed as JSON; the entry at ``['keys'][0]`` must
    provide the ``e``, ``n``, ``p``, ``q``, ``d``, ``dp``, ``dq`` and ``qi``
    members, each of which is passed through ``long_from_bytes``.

    Returns the object produced by ``RSAPrivateNumbers.private_key`` on the
    default backend.
    """
    jwk = json.loads(key.decode())['keys'][0]
    # The public part (e, n) is converted first, then the private members.
    public_part = rsa.RSAPublicNumbers(
        e=long_from_bytes(jwk['e']),
        n=long_from_bytes(jwk['n']),
    )
    private_part = rsa.RSAPrivateNumbers(
        p=long_from_bytes(jwk['p']),
        q=long_from_bytes(jwk['q']),
        d=long_from_bytes(jwk['d']),
        dmp1=long_from_bytes(jwk['dp']),
        dmq1=long_from_bytes(jwk['dq']),
        iqmp=long_from_bytes(jwk['qi']),
        public_numbers=public_part,
    )
    backend = default_backend()
    return private_part.private_key(backend)
python类RSAPublicNumbers()的实例源码
def __init__(self, msg=None, data=None, filename=None, password=None,
             key=None, file_obj=None):
    """
    Initialise the key from the first usable source.

    Sources are tried in this order: ``file_obj`` (private key read via
    ``_from_private_key``), ``filename`` (private key read via
    ``_from_private_key_file``), an already-built ``key`` object, and
    finally a message (``msg``, or ``data`` wrapped in ``Message``) that
    must start with the text ``ssh-rsa`` followed by two mpints: the public
    exponent and the modulus.

    Raises ``SSHException`` when no message is available or its leading
    text is not ``ssh-rsa``.
    """
    self.key = None
    if file_obj is not None:
        self._from_private_key(file_obj, password)
    elif filename is not None:
        self._from_private_key_file(filename, password)
    else:
        # ``data`` is only consulted when no message was handed in directly.
        if msg is None and data is not None:
            msg = Message(data)
        if key is not None:
            self.key = key
            return
        if msg is None:
            raise SSHException('Key object may not be empty')
        if msg.get_text() != 'ssh-rsa':
            raise SSHException('Invalid key')
        # Wire order matters: the exponent is read before the modulus.
        exponent = msg.get_mpint()
        modulus = msg.get_mpint()
        numbers = rsa.RSAPublicNumbers(e=exponent, n=modulus)
        self.key = numbers.public_key(default_backend())
def calculate_cost(self):
    """Return the cost of fulfilling this RSA condition.

    The cost is the square of the public key's size in bits, right-shifted
    by ``RsaSha256.COST_RIGHT_SHIFT``. The key is rebuilt from
    ``PUBLIC_EXPONENT`` and ``self.modulus`` (interpreted as a big-endian
    integer).

    Returns:
        int: Expected maximum cost to fulfill this condition.

    Raises:
        MissingDataError: If ``self.modulus`` is ``None``.
    """
    if self.modulus is None:
        raise MissingDataError('Requires a public modulus')

    numbers = RSAPublicNumbers(
        PUBLIC_EXPONENT,
        int.from_bytes(self.modulus, byteorder='big'),
    )
    key_bits = numbers.public_key(default_backend()).key_size
    squared_bits = int(math.pow(key_bits, 2))
    # TODO: Python's >> is not the same as JavaScript's >>> (zero-fill
    # shift); this may need to be corrected. For instance see:
    # http://grokbase.com/t/python/python-list/0454t3tgaw/zero-fill-shift
    return squared_bits >> RsaSha256.COST_RIGHT_SHIFT
def __init__(self, msg=None, data=None, filename=None, password=None,
             key=None, file_obj=None):
    """
    Set ``self.key`` from a file object, a file name, a key or a message.

    Precedence: ``file_obj`` first, then ``filename`` (both delegate to the
    private-key loaders together with ``password``), then an explicit
    ``key``; otherwise the public numbers are parsed from ``msg`` (or from
    ``data`` wrapped in ``Message`` when ``msg`` is ``None``).

    Raises ``SSHException`` if there is no message to parse or if the
    message does not begin with the text ``ssh-rsa``.
    """
    self.key = None

    if file_obj is not None:
        self._from_private_key(file_obj, password)
        return

    if filename is not None:
        self._from_private_key_file(filename, password)
        return

    if msg is None and data is not None:
        msg = Message(data)

    if key is not None:
        self.key = key
        return

    if msg is None:
        raise SSHException('Key object may not be empty')
    if msg.get_text() != 'ssh-rsa':
        raise SSHException('Invalid key')

    # The two mpints follow the key-type text: exponent, then modulus.
    public_exponent = msg.get_mpint()
    public_modulus = msg.get_mpint()
    public_numbers = rsa.RSAPublicNumbers(e=public_exponent, n=public_modulus)
    self.key = public_numbers.public_key(default_backend())
def __init__(self, msg=None, data=None, filename=None, password=None,
             key=None, file_obj=None):
    """
    Initialise the key from the first usable source.

    Sources are tried in this order: ``file_obj`` (private key read via
    ``_from_private_key``), ``filename`` (private key read via
    ``_from_private_key_file``), an already-built ``key`` object, and
    finally a message (``msg``, or ``data`` wrapped in ``Message``) that
    must start with the text ``ssh-rsa`` followed by two mpints: the public
    exponent and the modulus.

    Raises ``SSHException`` when no message is available or its leading
    text is not ``ssh-rsa``.
    """
    self.key = None
    if file_obj is not None:
        self._from_private_key(file_obj, password)
    elif filename is not None:
        self._from_private_key_file(filename, password)
    else:
        # ``data`` is only consulted when no message was handed in directly.
        if msg is None and data is not None:
            msg = Message(data)
        if key is not None:
            self.key = key
            return
        if msg is None:
            raise SSHException('Key object may not be empty')
        if msg.get_text() != 'ssh-rsa':
            raise SSHException('Invalid key')
        # Wire order matters: the exponent is read before the modulus.
        exponent = msg.get_mpint()
        modulus = msg.get_mpint()
        numbers = rsa.RSAPublicNumbers(e=exponent, n=modulus)
        self.key = numbers.public_key(default_backend())
def __init__(self, msg=None, data=None, filename=None, password=None,
             key=None, file_obj=None):
    """
    Set ``self.key`` from a file object, a file name, a key or a message.

    Precedence: ``file_obj`` first, then ``filename`` (both delegate to the
    private-key loaders together with ``password``), then an explicit
    ``key``; otherwise the public numbers are parsed from ``msg`` (or from
    ``data`` wrapped in ``Message`` when ``msg`` is ``None``).

    Raises ``SSHException`` if there is no message to parse or if the
    message does not begin with the text ``ssh-rsa``.
    """
    self.key = None

    if file_obj is not None:
        self._from_private_key(file_obj, password)
        return

    if filename is not None:
        self._from_private_key_file(filename, password)
        return

    if msg is None and data is not None:
        msg = Message(data)

    if key is not None:
        self.key = key
        return

    if msg is None:
        raise SSHException('Key object may not be empty')
    if msg.get_text() != 'ssh-rsa':
        raise SSHException('Invalid key')

    # The two mpints follow the key-type text: exponent, then modulus.
    public_exponent = msg.get_mpint()
    public_modulus = msg.get_mpint()
    public_numbers = rsa.RSAPublicNumbers(e=public_exponent, n=public_modulus)
    self.key = public_numbers.public_key(default_backend())
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _read_next_mpint(decoded_data)
    modulus, remainder = _read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _read_next_mpint(decoded_data)
    modulus, remainder = _read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)
def fields_from_json(cls, jobj):
    """Construct an instance from a JWK-style mapping of RSA parameters.

    ``n`` and ``e`` are required. Without ``d`` a public key is built. With
    ``d`` a private key is built: if any of ``p``, ``q``, ``dp``, ``dq``,
    ``qi`` or ``oth`` is present, all five of ``p``, ``q``, ``dp``, ``dq``
    and ``qi`` must be present; if none is present they are recovered from
    ``n``, ``e`` and ``d``.

    Raises ``errors.Error`` when only some of the private parameters are
    supplied.
    """
    # pylint: disable=invalid-name
    n = cls._decode_param(jobj['n'])
    e = cls._decode_param(jobj['e'])
    public_numbers = rsa.RSAPublicNumbers(e=e, n=n)

    if 'd' not in jobj:  # public key
        return cls(key=public_numbers.public_key(default_backend()))

    # private key
    d = cls._decode_param(jobj['d'])
    crt_names = ('p', 'q', 'dp', 'dq', 'qi')
    if any(name in jobj for name in crt_names + ('oth',)):
        # "If the producer includes any of the other private
        # key parameters, then all of the others MUST be
        # present, with the exception of "oth", which MUST
        # only be present when more than two prime factors
        # were used."
        all_params = tuple(jobj.get(name) for name in crt_names)
        if any(param is None for param in all_params):
            raise errors.Error(
                'Some private parameters are missing: {0}'.format(
                    all_params))
        p, q, dp, dq, qi = (cls._decode_param(raw) for raw in all_params)
        # TODO: check for oth
    else:
        # cryptography>=0.8
        p, q = rsa.rsa_recover_prime_factors(n, e, d)
        dp = rsa.rsa_crt_dmp1(d, p)
        dq = rsa.rsa_crt_dmq1(d, q)
        qi = rsa.rsa_crt_iqmp(p, q)

    private_numbers = rsa.RSAPrivateNumbers(
        p, q, d, dp, dq, qi, public_numbers)
    return cls(key=private_numbers.private_key(default_backend()))
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_ssh_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _ssh_read_next_mpint(decoded_data)
    modulus, remainder = _ssh_read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _read_next_mpint(decoded_data)
    modulus, remainder = _read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _read_next_mpint(decoded_data)
    modulus, remainder = _read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _read_next_mpint(decoded_data)
    modulus, remainder = _read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _read_next_mpint(decoded_data)
    modulus, remainder = _read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _read_next_mpint(decoded_data)
    modulus, remainder = _read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _read_next_mpint(decoded_data)
    modulus, remainder = _read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)
def rsa_public_key_from_map(jwk):
    """Build an ``RSAPublicKey`` wrapper from a mapping with ``n`` and ``e``.

    Both members are UTF-8 encoded and handed to the ``util`` parsers
    (``parse_rsa_modules_params`` for ``n``,
    ``parse_rsa_public_exponent_param`` for ``e``); the resulting numbers
    are turned into a key on the default backend.
    """
    # Both members are looked up and encoded before either one is parsed.
    encoded_modulus = jwk['n'].encode('utf-8')
    encoded_exponent = jwk['e'].encode('utf-8')

    modulus = util.parse_rsa_modules_params(encoded_modulus)
    exponent = util.parse_rsa_public_exponent_param(encoded_exponent)

    numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return RSAPublicKey(numbers.public_key(default_backend()))
def fields_from_json(cls, jobj):
    """Create an instance from a mapping holding RSA JWK parameters.

    The mapping must contain ``n`` and ``e``. When ``d`` is absent the
    result wraps a public key. When ``d`` is present the result wraps a
    private key; the CRT parameters (``p``, ``q``, ``dp``, ``dq``, ``qi``)
    are either all taken from the mapping or, if none of them (nor ``oth``)
    is given, recomputed from ``n``, ``e`` and ``d``.

    Raises ``errors.Error`` if the private parameters are only partially
    present.
    """
    # pylint: disable=invalid-name
    n = cls._decode_param(jobj['n'])
    e = cls._decode_param(jobj['e'])
    public_numbers = rsa.RSAPublicNumbers(e=e, n=n)
    backend_for = default_backend

    if 'd' not in jobj:  # public key
        key = public_numbers.public_key(backend_for())
        return cls(key=key)

    # private key
    d = cls._decode_param(jobj['d'])
    optional_names = ('p', 'q', 'dp', 'dq', 'qi')
    has_optional = any(
        name in jobj for name in ('p', 'q', 'dp', 'dq', 'qi', 'oth'))
    if not has_optional:
        # cryptography>=0.8
        p, q = rsa.rsa_recover_prime_factors(n, e, d)
        dp = rsa.rsa_crt_dmp1(d, p)
        dq = rsa.rsa_crt_dmq1(d, q)
        qi = rsa.rsa_crt_iqmp(p, q)
    else:
        # "If the producer includes any of the other private
        # key parameters, then all of the others MUST be
        # present, with the exception of "oth", which MUST
        # only be present when more than two prime factors
        # were used."
        all_params = tuple(jobj.get(name) for name in optional_names)
        if any(value is None for value in all_params):
            raise errors.Error(
                'Some private parameters are missing: {0}'.format(
                    all_params))
        p, q, dp, dq, qi = (
            cls._decode_param(value) for value in all_params)
        # TODO: check for oth

    numbers = rsa.RSAPrivateNumbers(p, q, d, dp, dq, qi, public_numbers)
    key = numbers.private_key(backend_for())
    return cls(key=key)
def _test_public_jwk(key):
    """
    Try to load the first entry of a serialized JWK set as an RSA public key.

    ``key`` is decoded and parsed as JSON; the ``e`` and ``n`` members of
    the entry at ``['keys'][0]`` are passed through ``long_from_bytes`` and
    turned into a public key on the default backend.
    """
    jwk = json.loads(key.decode())['keys'][0]
    numbers = rsa.RSAPublicNumbers(
        e=long_from_bytes(jwk['e']),
        n=long_from_bytes(jwk['n']),
    )
    backend = default_backend()
    return numbers.public_key(backend)
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _read_next_mpint(decoded_data)
    modulus, remainder = _read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _read_next_mpint(decoded_data)
    modulus, remainder = _read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _read_next_mpint(decoded_data)
    modulus, remainder = _read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _read_next_mpint(decoded_data)
    modulus, remainder = _read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)
def compute_rsa_private_key(cls, p, q, e, n, d):
    """
    Assemble an RSA private key from its primes, exponents and modulus.

    The three CRT values are derived from ``d``, ``p`` and ``q`` with the
    ``rsa`` helper functions, combined with the public numbers ``(e, n)``
    and loaded on the default backend.
    @developer: tnanoba
    :param p: int
    :param q: int
    :param e: int
    :param n: int
    :param d: int
    :return: object
    """
    crt_exponent_p = rsa.rsa_crt_dmp1(d, p)      # d mod (p - 1)
    crt_exponent_q = rsa.rsa_crt_dmq1(d, q)      # d mod (q - 1)
    crt_coefficient = rsa.rsa_crt_iqmp(p, q)     # q^-1 mod p
    public_part = rsa.RSAPublicNumbers(e, n)
    private_part = rsa.RSAPrivateNumbers(
        p, q, d, crt_exponent_p, crt_exponent_q, crt_coefficient,
        public_part,
    )
    return private_part.private_key(default_backend())
def compute_rsa_public_key(cls, e, n):
    """
    Assemble an RSA public key from the public exponent ``e`` and the
    modulus ``n``, loaded on the default backend.
    :return: object
    """
    return rsa.RSAPublicNumbers(e, n).public_key(default_backend())
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_ssh_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _ssh_read_next_mpint(decoded_data)
    modulus, remainder = _ssh_read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)
def validate(self, message):
    """Verify the signature of this RSA fulfillment over ``message``.

    The public key is rebuilt from ``PUBLIC_EXPONENT`` and ``self.modulus``
    (interpreted as a big-endian integer). ``self.signature`` is then
    checked using PSS padding (MGF1 with SHA-256, salt length
    ``SALT_LENGTH``) and SHA-256 as the message hash.

    Args:
        message (bytes): Message to verify.

    Returns:
        bool: ``True`` when the signature is valid.

    Raises:
        Exception: If ``message`` is not a ``bytes`` instance.
        ValidationError: If the signature does not match the message.
    """
    if not isinstance(message, bytes):
        # Format rather than concatenate: ``str + <non-str>`` raised a
        # TypeError that hid this message for every non-string argument.
        raise Exception(
            'Message must be provided as bytes, was: {}'.format(message))

    public_numbers = RSAPublicNumbers(
        PUBLIC_EXPONENT,
        int.from_bytes(self.modulus, byteorder='big'),
    )
    public_key = public_numbers.public_key(default_backend())
    # NOTE(review): ``verifier()`` is the older streaming API and is
    # believed to be deprecated/removed in recent ``cryptography`` releases
    # in favour of ``public_key.verify(...)`` -- confirm against the
    # version this project pins before upgrading.
    verifier = public_key.verifier(
        self.signature,
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=SALT_LENGTH,
        ),
        hashes.SHA256()
    )
    verifier.update(message)
    try:
        verifier.verify()
    except InvalidSignature as exc:
        raise ValidationError('Invalid RSA signature') from exc
    return True
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_ssh_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _ssh_read_next_mpint(decoded_data)
    modulus, remainder = _ssh_read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_ssh_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _ssh_read_next_mpint(decoded_data)
    modulus, remainder = _ssh_read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)
def _load_ssh_rsa_public_key(key_type, decoded_data, backend):
    """Build an RSA public key from the decoded body of an SSH key.

    Two mpints are read from ``decoded_data`` with ``_read_next_mpint``:
    the public exponent, then the modulus. ``key_type`` is accepted but not
    used here.

    Raises ``ValueError`` if any bytes remain after the two integers.
    """
    exponent, remainder = _read_next_mpint(decoded_data)
    modulus, remainder = _read_next_mpint(remainder)

    if remainder:
        raise ValueError('Key body contains extra bytes.')

    public_numbers = rsa.RSAPublicNumbers(exponent, modulus)
    return public_numbers.public_key(backend)