def get_s3_client():
    """Build a boto3 S3 client from the configured storage credentials.

    When the ``S3_ENDPOINT_URL`` environment variable is set, the client is
    pointed at that endpoint and a best-effort attempt is made to create the
    ``STORAGE_BUCKET_NAME`` bucket and set its ACL to ``public-read``.
    Failures of that attempt are logged and otherwise ignored.

    :returns: the boto3 S3 client
    """
    endpoint_url = os.environ.get("S3_ENDPOINT_URL")
    # Connection settings shared by the client and the resource below.
    connection_kwargs = dict(
        aws_access_key_id=config['STORAGE_ACCESS_KEY_ID'],
        config=Config(signature_version='s3v4'),
        aws_secret_access_key=config['STORAGE_SECRET_ACCESS_KEY'],
        endpoint_url=endpoint_url,
    )
    s3_client = boto3.client('s3', **connection_kwargs)
    if endpoint_url:
        try:
            s3 = boto3.resource('s3', **connection_kwargs)
            bucket_name = config['STORAGE_BUCKET_NAME']
            s3.create_bucket(Bucket=bucket_name)
            s3.Bucket(bucket_name).Acl().put(ACL='public-read')
        except Exception:
            # Best effort only: bucket setup must never prevent the caller
            # from getting a client. ``Exception`` (rather than a bare
            # ``except``) lets SystemExit/KeyboardInterrupt propagate.
            logging.exception('Failed to create the bucket')
    return s3_client
python类client()的实例源码
test_f_aws_encryption_sdk_client.py 文件源码
项目:aws-encryption-sdk-python
作者: awslabs
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def fake_kms_client(keysize=32):
    """Return a MagicMock posing as a botocore client with canned KMS replies.

    The ``generate_data_key``, ``encrypt`` and ``decrypt`` methods return
    fixed dictionaries built from the ``VALUES`` fixture.

    :param keysize: key into ``VALUES['data_keys']`` selecting the fixture
    :returns: the configured mock client
    """
    mock_kms_client = MagicMock(__class__=botocore.client.BaseClient)
    data_key = VALUES['data_keys'][keysize]
    key_arn = VALUES['arn']
    canned_responses = {
        'generate_data_key': {
            'Plaintext': data_key['plaintext'],
            'CiphertextBlob': data_key['encrypted'],
            'KeyId': key_arn,
        },
        'encrypt': {
            'CiphertextBlob': data_key['encrypted'],
            'KeyId': key_arn,
        },
        'decrypt': {
            'Plaintext': data_key['plaintext'],
            'KeyId': key_arn,
        },
    }
    for method_name, response in canned_responses.items():
        getattr(mock_kms_client, method_name).return_value = response
    return mock_kms_client
def _client(self, key_id):
    """Return the regional client matching ``key_id``.

    The region is taken from the fourth ``:``-separated field of ``key_id``.
    The first region found this way also becomes ``self.default_region`` if
    none is set yet. When ``key_id`` has no such field, the default region
    is used instead.

    :param str key_id: KMS CMK ID
    :raises UnknownRegionError: if no region can be read from ``key_id`` and
        no default region is set
    """
    fields = key_id.split(':', 4)
    if len(fields) > 3:
        region_name = fields[3]
        if self.default_region is None:
            self.default_region = region_name
    else:
        if self.default_region is None:
            raise UnknownRegionError(
                'No default region found and no region determinable from key id: {}'.format(key_id)
            )
        region_name = self.default_region
    # Make sure a client exists for the region before handing it out.
    self.add_regional_client(region_name)
    return self._regional_clients[region_name]
def info(auth_token):
    """Authorize a client for file uploading and list its URL prefixes.

    :param auth_token: authentication token to test
    :returns: a JSON string ``{"prefixes": [...]}`` on success; a 401
        ``Response`` when the token resolves to no user id; a 400
        ``Response`` when anything raises.
    """
    # URLs always use '/' separators, so join with posixpath rather than
    # os.path (which would use '\\' on Windows).
    import posixpath

    try:
        # Verify client, deny access if not verified
        userid = services.get_user_id(auth_token)
        if userid is None:
            return Response(status=401)
        host = config['STORAGE_BUCKET_NAME']
        urls = []
        for scheme, port in (('http', '80'), ('https', '443')):
            # Each scheme is offered with and without its explicit port.
            base_urls = (
                '%s://%s:%s/' % (scheme, host, port),
                '%s://%s/' % (scheme, host),
            )
            urls.extend(posixpath.join(base, userid) for base in base_urls)
        return json.dumps({'prefixes': urls})
    except Exception:
        logging.exception('Bad request (info)')
        return Response(status=400)
def presign(auth_token, url, ownerid=None):
    """Return ``url`` as is, or an S3 presigned version of it when needed.

    A HEAD request is sent to ``url``. Only a 403 answer triggers signing;
    any other status returns the URL unchanged.

    :param auth_token: authentication token from auth
    :param url: url to check for a signed URL
    :param ownerid: ownerid for dataset
    :returns: a JSON string ``{"url": ...}``, or a ``Response`` with status
        401 (no ``ownerid``), 403 (verification failed or the file is not
        the owner's) or 400 (anything raised).
    """
    s3 = get_s3_client()
    try:
        head_response = requests.head(url)
        if head_response.status_code != 403:
            return json.dumps({'url': url})

        # Verify client, deny access if not verified
        if ownerid is None:
            return Response(status=401)
        if not services.verify(auth_token, ownerid):
            return Response(status=403)

        url_parts = urllib.parse.urlparse(url)
        bucket, key = url_parts.netloc, url_parts.path.lstrip('/')
        # Handle s3 path-style URLs: the bucket is the first path segment.
        if bucket.endswith('amazonaws.com'):
            bucket, key = key.split('/', 1)

        # Make sure file belongs to user (only in case of pkgstore)
        is_foreign_bucket = config['STORAGE_BUCKET_NAME'] != bucket
        if is_foreign_bucket and ownerid not in url:
            return Response(status=403)

        object_ref = {'Bucket': bucket, 'Key': key}
        signed_url = s3.generate_presigned_url(
            ClientMethod='get_object',
            Params=object_ref,
            ExpiresIn=3600*24,  # one day
        )
        return json.dumps({'url': signed_url})
    except Exception:
        logging.exception('Bad request')
        return Response(status=400)
def get_default_client_config(self):
    """Return the default config used when creating clients.

    :rtype: botocore.client.Config
    :returns: The value stored in ``self._client_config``. ``None`` means
        no default config object is attached to the session.
    """
    default_config = self._client_config
    return default_config
def set_default_client_config(self, client_config):
    """Store the default config used when creating clients.

    :type client_config: botocore.client.Config
    :param client_config: The config object to keep in
        ``self._client_config``. Pass ``None`` to leave the session with no
        default config object.
    """
    setattr(self, '_client_config', client_config)
def get_default_client_config(self):
    """Return the default config used when creating clients.

    :rtype: botocore.client.Config
    :returns: The value stored in ``self._client_config``. ``None`` means
        no default config object is attached to the session.
    """
    default_config = self._client_config
    return default_config
def set_default_client_config(self, client_config):
    """Store the default config used when creating clients.

    :type client_config: botocore.client.Config
    :param client_config: The config object to keep in
        ``self._client_config``. Pass ``None`` to leave the session with no
        default config object.
    """
    setattr(self, '_client_config', client_config)
def patch():
    """Wrap ``BaseClient._make_api_call`` and attach a Pin, at most once.

    The ``_datadog_patch`` flag on ``botocore.client`` records that patching
    already happened, so repeated calls do nothing.
    """
    already_patched = getattr(botocore.client, '_datadog_patch', False)
    if already_patched:
        return
    botocore.client._datadog_patch = True
    wrapt.wrap_function_wrapper('botocore.client', 'BaseClient._make_api_call', patched_api_call)
    pin = Pin(service="aws", app="aws", app_type="web")
    pin.onto(botocore.client.BaseClient)
def unpatch():
    """Undo the ``_make_api_call`` wrapping if the patch flag is set.

    Does nothing when ``botocore.client._datadog_patch`` is missing or falsy.
    """
    if not getattr(botocore.client, '_datadog_patch', False):
        return
    botocore.client._datadog_patch = False
    unwrap(botocore.client.BaseClient, '_make_api_call')
def get_default_client_config(self):
    """Return the default config used when creating clients.

    :rtype: botocore.client.Config
    :returns: The value stored in ``self._client_config``. ``None`` means
        no default config object is attached to the session.
    """
    default_config = self._client_config
    return default_config
def set_default_client_config(self, client_config):
    """Store the default config used when creating clients.

    :type client_config: botocore.client.Config
    :param client_config: The config object to keep in
        ``self._client_config``. Pass ``None`` to leave the session with no
        default config object.
    """
    setattr(self, '_client_config', client_config)
test_rolling_upgrades.py 文件源码
项目:asg-rolling-upgrade
作者: crunch-accounting
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def mock_paginator():
    """Return a ``Mock`` whose spec is ``botocore.client.Paginator``."""
    paginator_double = mock.Mock(spec=botocore.client.Paginator)
    return paginator_double
test_rolling_upgrades.py 文件源码
项目:asg-rolling-upgrade
作者: crunch-accounting
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def mock_ec2():
    """Return a ``Mock`` whose spec is the ``botocore.client`` module."""
    ec2_double = mock.Mock(spec=botocore.client)
    return ec2_double
def get_default_client_config(self):
    """Return the default config used when creating clients.

    :rtype: botocore.client.Config
    :returns: The value stored in ``self._client_config``. ``None`` means
        no default config object is attached to the session.
    """
    default_config = self._client_config
    return default_config
def set_default_client_config(self, client_config):
    """Store the default config used when creating clients.

    :type client_config: botocore.client.Config
    :param client_config: The config object to keep in
        ``self._client_config``. Pass ``None`` to leave the session with no
        default config object.
    """
    setattr(self, '_client_config', client_config)
def get_default_client_config(self):
    """Return the default config used when creating clients.

    :rtype: botocore.client.Config
    :returns: The value stored in ``self._client_config``. ``None`` means
        no default config object is attached to the session.
    """
    default_config = self._client_config
    return default_config
def set_default_client_config(self, client_config):
    """Store the default config used when creating clients.

    :type client_config: botocore.client.Config
    :param client_config: The config object to keep in
        ``self._client_config``. Pass ``None`` to leave the session with no
        default config object.
    """
    setattr(self, '_client_config', client_config)
def get_s3_client():
    """Return a boto3 S3 client for ``us-east-1`` using path-style addressing."""
    path_style_config = Config(s3={'addressing_style': 'path'})
    return boto3.client('s3', 'us-east-1', config=path_style_config)
def __init__(self, max_tries=30, interval=5):
    """Create the S3 and Athena clients and store the retry settings.

    :param max_tries: stored as ``self.max_tries``
    :param interval: stored as ``self.interval``
    """
    self.s3_client = get_s3_client()
    self.athena_client = boto3.client('athena', region_name='us-east-1')
    self.max_tries, self.interval = max_tries, interval
test_providers_kms_master_key_provider.py 文件源码
项目:aws-encryption-sdk-python
作者: awslabs
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def setUp(self):
    """Patch the botocore and boto3 ``Session`` classes seen by the KMS provider.

    The patched boto3 ``Session`` returns a mock session instance whose
    ``client()`` returns a mock that reports ``BaseClient`` as its class.
    """
    # Patch botocore's Session as referenced from the kms module.
    botocore_patcher = patch(
        'aws_encryption_sdk.key_providers.kms.botocore.session.Session'
    )
    self.mock_botocore_session_patcher = botocore_patcher
    self.mock_botocore_session = botocore_patcher.start()

    # Patch boto3's Session as referenced from the kms module.
    boto3_patcher = patch(
        'aws_encryption_sdk.key_providers.kms.boto3.session.Session'
    )
    self.mock_boto3_session_patcher = boto3_patcher
    self.mock_boto3_session = boto3_patcher.start()

    # Session() -> session instance -> .client() -> client instance
    session_instance = MagicMock()
    self.mock_boto3_session_instance = session_instance
    self.mock_boto3_session.return_value = session_instance

    client_instance = MagicMock()
    client_instance.__class__ = botocore.client.BaseClient
    self.mock_boto3_client_instance = client_instance
    session_instance.client.return_value = client_instance
test_providers_kms_master_key_provider.py 文件源码
项目:aws-encryption-sdk-python
作者: awslabs
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def test_add_regional_client_new(self):
    """Adding an unknown region creates a session and caches its KMS client."""
    region = 'ex_region_name'
    provider = KMSMasterKeyProvider()
    provider._regional_clients = {}

    provider.add_regional_client(region)

    self.mock_boto3_session.assert_called_once_with(
        region_name='ex_region_name',
        botocore_session=ANY,
    )
    self.mock_boto3_session_instance.client.assert_called_once_with('kms')
    cached_client = provider._regional_clients[region]
    assert cached_client is self.mock_boto3_client_instance
test_providers_kms_master_key_provider.py 文件源码
项目:aws-encryption-sdk-python
作者: awslabs
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def test_client_valid_region_name(self, mock_add_client):
    """A key ARN containing a region resolves to that region's client."""
    key_arn = 'arn:aws:kms:us-east-1:222222222222:key/aaaaaaaa-1111-2222-3333-bbbbbbbbbbbb'
    provider = KMSMasterKeyProvider()
    provider._regional_clients['us-east-1'] = self.mock_boto3_client_instance

    resolved = provider._client(key_arn)

    mock_add_client.assert_called_once_with('us-east-1')
    assert resolved is self.mock_boto3_client_instance
test_providers_kms_master_key_provider.py 文件源码
项目:aws-encryption-sdk-python
作者: awslabs
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def test_client_no_region_name_with_default(self, mock_add_client):
    """A key id without a region falls back to the provider's default region."""
    provider = KMSMasterKeyProvider()
    provider.default_region = sentinel.default_region
    provider._regional_clients[sentinel.default_region] = sentinel.default_client

    resolved = provider._client('')

    assert resolved is sentinel.default_client
    mock_add_client.assert_called_once_with(sentinel.default_region)
def add_regional_client(self, region_name):
    """Create and cache a KMS client for ``region_name`` unless one exists.

    :param str region_name: AWS Region ID (ex: us-east-1)
    """
    if region_name in self._regional_clients:
        return
    regional_session = boto3.session.Session(
        region_name=region_name,
        botocore_session=self.config.botocore_session,
    )
    self._regional_clients[region_name] = regional_session.client('kms')
def _new_master_key(self, key_id):
    """Build a KMSMasterKey for ``key_id`` with the matching regional client.

    :param bytes key_id: KMS CMK ID
    :returns: KMS Master Key based on key_id
    :rtype: aws_encryption_sdk.key_providers.kms.KMSMasterKey
    """
    # The KMS client lookup requires str, not bytes. The config itself
    # receives ``key_id`` exactly as passed in.
    regional_client = self._client(to_str(key_id))
    key_config = KMSMasterKeyConfig(key_id=key_id, client=regional_client)
    return KMSMasterKey(config=key_config)
def __init__(self, **kwargs):  # pylint: disable=unused-argument
    """Prepare the key id and user agent for KMS calls.

    Stores a ``str`` version of ``self.key_id`` as ``self._key_id`` and
    extends the client's ``user_agent_extra`` with ``USER_AGENT_SUFFIX``.
    """
    self._key_id = to_str(self.key_id)  # KMS client requires str, not bytes
    botocore_config = self.config.client.meta.config
    botocore_config.user_agent_extra = extend_user_agent_suffix(
        user_agent=botocore_config.user_agent_extra,
        suffix=USER_AGENT_SUFFIX,
    )
def _generate_data_key(self, algorithm, encryption_context=None):
    """Ask KMS for a new data key and wrap the reply in a DataKey.

    :param algorithm: Algorithm on which to base data key
    :type algorithm: aws_encryption_sdk.identifiers.Algorithm
    :param dict encryption_context: Encryption context to pass to KMS
        (sent whenever it is not ``None``)
    :returns: Generated data key
    :rtype: aws_encryption_sdk.structures.DataKey
    :raises GenerateKeyError: if the KMS call raises ``ClientError`` or the
        response lacks an expected field
    """
    kms_params = {
        'KeyId': self._key_id,
        'NumberOfBytes': algorithm.kdf_input_len,
    }
    if encryption_context is not None:
        kms_params['EncryptionContext'] = encryption_context
    grant_tokens = self.config.grant_tokens
    if grant_tokens:
        kms_params['GrantTokens'] = grant_tokens

    # Catch any boto3 errors and normalize them to GenerateKeyError.
    try:
        response = self.config.client.generate_data_key(**kms_params)
        plaintext, ciphertext, key_id = (
            response['Plaintext'],
            response['CiphertextBlob'],
            response['KeyId'],
        )
    except (ClientError, KeyError):
        error_message = 'Master Key {key_id} unable to generate data key'.format(key_id=self._key_id)
        _LOGGER.exception(error_message)
        raise GenerateKeyError(error_message)

    # The key info recorded is the KeyId reported by KMS in the response.
    provider_info = MasterKeyInfo(provider_id=self.provider_id, key_info=key_id)
    return DataKey(
        key_provider=provider_info,
        data_key=plaintext,
        encrypted_data_key=ciphertext,
    )
def _decrypt_data_key(self, encrypted_data_key, algorithm, encryption_context=None):
    """Ask KMS to decrypt a data key and wrap the plaintext in a DataKey.

    :param encrypted_data_key: Encrypted data key
    :type encrypted_data_key: aws_encryption_sdk.structures.EncryptedDataKey
    :param algorithm: not used for KMS
    :type algorithm: aws_encryption_sdk.identifiers.Algorithm
    :param dict encryption_context: Encryption context to use in decryption
        (sent only when truthy)
    :returns: Decrypted data key
    :rtype: aws_encryption_sdk.structures.DataKey
    :raises DecryptKeyError: if Master Key is unable to decrypt data key
    """
    ciphertext = encrypted_data_key.encrypted_data_key
    kms_params = {'CiphertextBlob': ciphertext}
    if encryption_context:
        kms_params['EncryptionContext'] = encryption_context
    grant_tokens = self.config.grant_tokens
    if grant_tokens:
        kms_params['GrantTokens'] = grant_tokens

    # Catch any boto3 errors and normalize to expected DecryptKeyError
    try:
        plaintext = self.config.client.decrypt(**kms_params)['Plaintext']
    except (ClientError, KeyError):
        error_message = 'Master Key {key_id} unable to decrypt data key'.format(key_id=self._key_id)
        _LOGGER.exception(error_message)
        raise DecryptKeyError(error_message)

    return DataKey(
        key_provider=self.key_provider,
        data_key=plaintext,
        encrypted_data_key=encrypted_data_key.encrypted_data_key,
    )