def _sign_string(message, private_key_file=None, private_key_string=None):
"""
Signs a string for use with Amazon CloudFront.
Requires the rsa library be installed.
"""
try:
import rsa
except ImportError:
raise NotImplementedError("Boto depends on the python rsa "
"library to generate signed URLs for "
"CloudFront")
# Make sure only one of private_key_file and private_key_string is set
if private_key_file and private_key_string:
raise ValueError("Only specify the private_key_file or the private_key_string not both")
if not private_key_file and not private_key_string:
raise ValueError("You must specify one of private_key_file or private_key_string")
# If private_key_file is a file name, open it and read it
if private_key_string is None:
if isinstance(private_key_file, six.string_types):
with open(private_key_file, 'r') as file_handle:
private_key_string = file_handle.read()
# Otherwise, treat it like a file
else:
private_key_string = private_key_file.read()
# Sign it!
private_key = rsa.PrivateKey.load_pkcs1(private_key_string)
signature = rsa.sign(str(message), private_key, 'SHA-1')
return signature
python类sign()的实例源码
def perform_operation(self, indata, priv_key, cli_args):
'''Decrypts files.'''
hash_method = cli_args[1]
if hash_method not in HASH_METHODS:
raise SystemExit('Invalid hash method, choose one of %s' %
', '.join(HASH_METHODS))
return rsa.sign(indata, priv_key, hash_method)
def handler(self, operation_name=None, request=None, **kwargs):
# This is typically hooked up to the "request-created" event
# from a client's event emitter. When a new request is created
# this method is invoked to sign the request.
# Don't call this method directly.
return self.sign(operation_name, request)
def sign(self, operation_name, request):
"""Sign a request before it goes out over the wire.
:type operation_name: string
:param operation_name: The name of the current operation, e.g.
``ListBuckets``.
:type request: KSRequest
:param request: The request object to be sent over the wire.
"""
signature_version = self._signature_version
# Allow overriding signature version. A response of a blank
# string means no signing is performed. A response of ``None``
# means that the default signing method is used.
handler, response = self._event_emitter.emit_until_response(
'choose-signer.{0}.{1}'.format(self._service_name, operation_name),
signing_name=self._signing_name, region_name=self._region_name,
signature_version=signature_version)
if response is not None:
signature_version = response
# Allow mutating request before signing
self._event_emitter.emit(
'before-sign.{0}.{1}'.format(self._service_name, operation_name),
request=request, signing_name=self._signing_name,
region_name=self._region_name,
signature_version=signature_version, request_signer=self)
if signature_version != kscore.UNSIGNED:
signer = self.get_auth_instance(self._signing_name,
self._region_name,
signature_version)
signer.add_auth(request=request)
def handler(self, operation_name=None, request=None, **kwargs):
# This is typically hooked up to the "request-created" event
# from a client's event emitter. When a new request is created
# this method is invoked to sign the request.
# Don't call this method directly.
return self.sign(operation_name, request)
def _choose_signer(self, operation_name, signing_type):
"""
Allow setting the signature version via the choose-signer event.
A value of `botocore.UNSIGNED` means no signing will be performed.
:param operation_name: The operation to sign.
:param signing_type: The type of signing that the signer is to be used
for.
:return: The signature version to sign with.
"""
signing_type_suffix_map = {
'presign-post': '-presign-post',
'presign-url': '-query'
}
suffix = signing_type_suffix_map.get(signing_type, '')
signature_version = self._signature_version
if signature_version is not botocore.UNSIGNED and not \
signature_version.endswith(suffix):
signature_version += suffix
handler, response = self._event_emitter.emit_until_response(
'choose-signer.{0}.{1}'.format(self._service_name, operation_name),
signing_name=self._signing_name, region_name=self._region_name,
signature_version=signature_version)
if response is not None:
signature_version = response
# The suffix needs to be checked again in case we get an improper
# signature version from choose-signer.
if signature_version is not botocore.UNSIGNED and not \
signature_version.endswith(suffix):
signature_version += suffix
return signature_version
def generate_presigned_url(self, request_dict, operation_name,
expires_in=3600, region_name=None):
"""Generates a presigned url
:type request_dict: dict
:param request_dict: The prepared request dictionary returned by
``botocore.awsrequest.prepare_request_dict()``
:type operation_name: str
:param operation_name: The operation being signed.
:type expires_in: int
:param expires_in: The number of seconds the presigned url is valid
for. By default it expires in an hour (3600 seconds)
:type region_name: string
:param region_name: The region name to sign the presigned url.
:returns: The presigned url
"""
request = create_request_object(request_dict)
self.sign(operation_name, request, region_name,
'presign-url', expires_in)
request.prepare()
return request.url
def signMessage(self, data):
signature = ""
try:
signature = rsa.sign(data, self.DRMPrivateKey, 'SHA-1')
except Exception as e:
debug("crypt", "Failed to sign the message to the remote host. -> %s" % str(e))
return signature
def verifyTrustedSignature(self, data, signature):
result = False
try:
rsa.verify(data, signature, self.DRMTrustedKey)
result = True
except Exception as e:
debug("crypt", "Verifying the signature of the remote host's message. -> %s" % str(e))
return result
# Encrypt to recipient's public key, and sign with our private key
def encryptAndSign(self, plaintext, recipientPublic):
try:
content = rsa.encrypt(plaintext, recipientPublic)
signature = rsa.sign(content, self.DRMPrivateKey, "SHA-1")
return pickle.dumps([signature, content])
except Exception as e:
debug("crypt", "Failed encrypting the message. -> %s" % str(e))
return
# Verify signature with sender's public key, then proceed to use
# our own private key to decrypt the message
# pass the expected timestamp, to thawrt rplay attacks. gets cryptd
def perform_operation(self, indata, priv_key, cli_args):
"""Signs files."""
hash_method = cli_args[1]
if hash_method not in HASH_METHODS:
raise SystemExit('Invalid hash method, choose one of %s' %
', '.join(HASH_METHODS))
return rsa.sign(indata, priv_key, hash_method)
def perform_operation(self, indata, priv_key, cli_args):
"""Signs files."""
hash_method = cli_args[1]
if hash_method not in HASH_METHODS:
raise SystemExit('Invalid hash method, choose one of %s' %
', '.join(HASH_METHODS))
return rsa.sign(indata, priv_key, hash_method)
def perform_operation(self, indata, priv_key, cli_args):
"""Signs files."""
hash_method = cli_args[1]
if hash_method not in HASH_METHODS:
raise SystemExit('Invalid hash method, choose one of %s' %
', '.join(HASH_METHODS))
return rsa.sign(indata, priv_key, hash_method)
def perform_operation(self, indata, priv_key, cli_args):
"""Signs files."""
hash_method = cli_args[1]
if hash_method not in HASH_METHODS:
raise SystemExit('Invalid hash method, choose one of %s' %
', '.join(HASH_METHODS))
return rsa.sign(indata, priv_key, hash_method)
def perform_operation(self, indata, priv_key, cli_args):
'''Decrypts files.'''
hash_method = cli_args[1]
if hash_method not in HASH_METHODS:
raise SystemExit('Invalid hash method, choose one of %s' %
', '.join(HASH_METHODS))
return rsa.sign(indata, priv_key, hash_method)
def perform_operation(self, indata, priv_key, cli_args):
"""Signs files."""
hash_method = cli_args[1]
if hash_method not in HASH_METHODS:
raise SystemExit('Invalid hash method, choose one of %s' %
', '.join(HASH_METHODS))
return rsa.sign(indata, priv_key, hash_method)
def _create_signing_params(self, url, keypair_id,
expire_time=None, valid_after_time=None,
ip_address=None, policy_url=None,
private_key_file=None, private_key_string=None):
"""
Creates the required URL parameters for a signed URL.
"""
params = {}
# Check if we can use a canned policy
if expire_time and not valid_after_time and not ip_address and not policy_url:
# we manually construct this policy string to ensure formatting
# matches signature
policy = self._canned_policy(url, expire_time)
params["Expires"] = str(expire_time)
else:
# If no policy_url is specified, default to the full url.
if policy_url is None:
policy_url = url
# Can't use canned policy
policy = self._custom_policy(policy_url, expires=expire_time,
valid_after=valid_after_time,
ip_address=ip_address)
encoded_policy = self._url_base64_encode(policy)
params["Policy"] = encoded_policy
#sign the policy
signature = self._sign_string(policy, private_key_file, private_key_string)
#now base64 encode the signature (URL safe as well)
encoded_signature = self._url_base64_encode(signature)
params["Signature"] = encoded_signature
params["Key-Pair-Id"] = keypair_id
return params
def _sign_string(message, private_key_file=None, private_key_string=None):
"""
Signs a string for use with Amazon CloudFront.
Requires the rsa library be installed.
"""
try:
import rsa
except ImportError:
raise NotImplementedError("Boto depends on the python rsa "
"library to generate signed URLs for "
"CloudFront")
# Make sure only one of private_key_file and private_key_string is set
if private_key_file and private_key_string:
raise ValueError("Only specify the private_key_file or the private_key_string not both")
if not private_key_file and not private_key_string:
raise ValueError("You must specify one of private_key_file or private_key_string")
# If private_key_file is a file name, open it and read it
if private_key_string is None:
if isinstance(private_key_file, six.string_types):
with open(private_key_file, 'r') as file_handle:
private_key_string = file_handle.read()
# Otherwise, treat it like a file
else:
private_key_string = private_key_file.read()
# Sign it!
private_key = rsa.PrivateKey.load_pkcs1(private_key_string)
signature = rsa.sign(str(message), private_key, 'SHA-1')
return signature
def perform_operation(self, indata, priv_key, cli_args):
"""Signs files."""
hash_method = cli_args[1]
if hash_method not in HASH_METHODS:
raise SystemExit('Invalid hash method, choose one of %s' %
', '.join(HASH_METHODS))
return rsa.sign(indata, priv_key, hash_method)
def handler(self, operation_name=None, request=None, **kwargs):
# This is typically hooked up to the "request-created" event
# from a client's event emitter. When a new request is created
# this method is invoked to sign the request.
# Don't call this method directly.
return self.sign(operation_name, request)