def test_iam_role_policy(resource_action, get_template, get_properties, get_details, construct_policy, session,
attach_profile_to_role):
"""IAM Role Policy should match deployment type."""
get_properties.return_value = {'type': 'ec2'}
get_details.return_value.iam.return_value = {'group': 1, 'policy': 2, 'profile': 3, 'role': 4, 'user': 5}
assert create_iam_resources()
get_template.assert_called_with(EC2_TEMPLATE_NAME)
calls = [
mock.call(
mock.ANY,
action='create_role',
log_format=mock.ANY,
RoleName=mock.ANY,
AssumeRolePolicyDocument=get_template.return_value)
]
resource_action.assert_has_calls(calls)
python类ANY的实例源码
def test_create_kernel_url():
mock_resp = asynctest.MagicMock(spec=aiohttp.ClientResponse)
mock_resp.status = 201
mock_resp.json = asynctest.MagicMock()
mock_req_obj = asynctest.MagicMock(spec=Request)
mock_req_obj.asend.return_value = mock_resp
with asynctest.patch('ai.backend.client.kernel.Request',
return_value=mock_req_obj) as mock_req_cls:
await Kernel.get_or_create('python')
mock_req_cls.assert_called_once_with(
'POST', '/kernel/create', mock.ANY)
mock_req_obj.asend.assert_called_once_with()
mock_req_obj.asend.return_value.json.assert_called_once_with()
def test_run_event_source(self, mock_aws_client):
mock_aws_client.return_value.get_s3_file_list.return_value = ['test_file_1','test_file_2','test_file_3','test_file_4']
args = Args()
args.verbose = False
args.event_source = 'test_bucket'
Scar().run(args)
self.assertEqual(mock_aws_client.call_count, 1)
# check_function_name_not_exists
mock_aws_client.mock_calls[1].assert_called_with('test-name', False)
# get_s3_file_list
mock_aws_client.mock_calls[2].assert_called_with('test_bucket')
# launch_request_response_event
mock_aws_client.mock_calls[3].assert_called_with('test_file_1', {'Records': [{'eventSource': 'aws:s3', 's3': {'bucket': {'name': 'test_bucket'}, 'object': {'key': ''}}}]}, ANY, ANY)
# launch_async_event
mock_aws_client.mock_calls[4].assert_called_with('test_file_2', {'Records': [{'eventSource': 'aws:s3', 's3': {'bucket': {'name': 'test_bucket'}, 'object': {'key': ''}}}]}, ANY, ANY)
# launch_async_event
mock_aws_client.mock_calls[5].assert_called_with('test_file_3', {'Records': [{'eventSource': 'aws:s3', 's3': {'bucket': {'name': 'test_bucket'}, 'object': {'key': ''}}}]}, ANY, ANY)
# launch_async_event
mock_aws_client.mock_calls[6].assert_called_with('test_file_4', {'Records': [{'eventSource': 'aws:s3', 's3': {'bucket': {'name': 'test_bucket'}, 'object': {'key': ''}}}]}, ANY, ANY)
def test_callback_with_exception(self):
def callback():
raise ValueError()
self.loop = mock.Mock()
self.loop.call_exception_handler = mock.Mock()
h = asyncio.Handle(callback, (), self.loop)
h._run()
self.loop.call_exception_handler.assert_called_with({
'message': test_utils.MockPattern('Exception in callback.*'),
'exception': mock.ANY,
'handle': h,
'source_traceback': h._source_traceback,
})
def test_rabbitmq_worker_callback_method_with_invalid_serialization_method(self, getLogger, import_string):
assert isinstance(import_string, mock.MagicMock)
# Arrange
queue_name = "queue_name"
rabbitmq_configs = {}
serialization_method = "invalid"
result_backend = {
"result_backend_class": "easy_job.result_backends.dummy.DummyBackend",
}
logger = getLogger.return_value = mock.MagicMock()
# Act
rbt = RabbitMQWorker(queue_name=queue_name,
rabbitmq_configs=rabbitmq_configs,
serialization_method=serialization_method,
result_backend=result_backend,
logger="log")
rbt.callback(mock.MagicMock(), mock.MagicMock(), None, "")
# Assert
logger.log.assert_called_once_with(logging.ERROR, mock.ANY)
def test_case_study_create_api_success(
mock_create_case_study, supplier_case_study_end_to_end, sso_user,
all_case_study_data, api_response_200
):
mock_create_case_study.return_value = api_response_200
response = supplier_case_study_end_to_end()
assert response.status_code == http.client.FOUND
assert response.get('Location') == reverse('company-detail')
data = {
**all_case_study_data,
'image_one': ANY, 'image_two': ANY, 'image_three': ANY,
}
# django converts uploaded files to UploadedFile, which makes
# `assert_called_once_with` tricky.
assert mock_create_case_study.call_count == 1
assert mock_create_case_study.call_args == call(
data=data,
sso_session_id=sso_user.session_id,
)
def test_case_study_update_api_success(
mock_update_case_study, supplier_case_study_end_to_end, sso_user,
all_case_study_data, api_response_200
):
mock_update_case_study.return_value = api_response_200
response = supplier_case_study_end_to_end(case_study_id='1')
assert response.status_code == http.client.FOUND
assert response.get('Location') == reverse('company-detail')
# django converts uploaded files to UploadedFile, which makes
# `assert_called_once_with` tricky.
data = {
**all_case_study_data,
'image_one': ANY, 'image_two': ANY, 'image_three': ANY,
}
mock_update_case_study.assert_called_once_with(
data=data,
case_study_id='1',
sso_session_id=sso_user.session_id,
)
def test_generate_endpoint_parser_noparam(addargument):
"""Generate a parser from endpoint metadata - no params"""
name = 'put-stuff'
metadata = {
'path': 'stuff',
'method': 'PUT',
'help': "Changes stuff",
'params': {},
}
parser = ArgumentParser()
subparsers = parser.add_subparsers()
generate_endpoint_parser(subparsers, name, metadata)
addargument.assert_has_calls([
# first helper for the main parser
mock.call('-h', '--help', action='help',
default=mock.ANY, help=mock.ANY),
# second helper for the 'put-stuff' subparser
mock.call('-h', '--help', action='help',
default=mock.ANY, help=mock.ANY)
])
def test_execute_without_failures(self, check_mock, wait_mock):
client_mock = self.aws_hook_mock.return_value.get_client_type.return_value
client_mock.run_task.return_value = RESPONSE_WITHOUT_FAILURES
self.ecs.execute(None)
self.aws_hook_mock.return_value.get_client_type.assert_called_once_with('ecs', region_name='eu-west-1')
client_mock.run_task.assert_called_once_with(
cluster='c',
overrides={},
startedBy=mock.ANY, # Can by 'airflow' or 'Airflow'
taskDefinition='t'
)
wait_mock.assert_called_once_with()
check_mock.assert_called_once_with()
self.assertEqual(self.ecs.arn, 'arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55')
def test_exec(self, gcs_hook, dataflow_mock):
"""Test DataFlowHook is created and the right args are passed to
start_python_workflow.
"""
start_python_hook = dataflow_mock.return_value.start_python_dataflow
gcs_download_hook = gcs_hook.return_value.google_cloud_to_local
self.dataflow.execute(None)
self.assertTrue(dataflow_mock.called)
expected_options = {
'project': 'test',
'staging_location': 'gs://test/staging',
'output': 'gs://test/output'
}
gcs_download_hook.assert_called_once_with(PY_FILE)
start_python_hook.assert_called_once_with(TASK_ID, expected_options,
mock.ANY, PY_OPTIONS)
self.assertTrue(self.dataflow.py_file.startswith('/tmp/dataflow'))
def test_exit_stack_exception_propagate():
h1 = mock.MagicMock()
h2 = mock.MagicMock()
v1 = mock.MagicMock()
v2 = mock.MagicMock()
error = ValueError('FUUU')
with pytest.raises(ValueError) as exc:
with ExitStack() as stack:
v = stack.enter_context(AutoClose(h1, v=v1))
assert v is v1
v = stack.enter_context(AutoClose(h2, v=v2))
assert v is v2
raise error
assert exc.value is error
h2.close.assert_called_once_with(ValueError, error, mock.ANY)
h1.close.assert_called_once_with(ValueError, error, mock.ANY)
def test_exit_stack_exception_propagate():
h1 = mock.MagicMock()
h2 = mock.MagicMock()
v1 = mock.MagicMock()
v2 = mock.MagicMock()
error = ValueError('FUUU')
with pytest.raises(ValueError) as exc:
async with AsyncExitStack() as stack:
v = await stack.enter_context(AutoClose(h1, v=v1))
assert v is v1
v = await stack.enter_context(AutoClose(h2, v=v2))
assert v is v2
raise error
assert exc.value is error
h2.close.assert_called_once_with(ValueError, error, mock.ANY)
h1.close.assert_called_once_with(ValueError, error, mock.ANY)
def test_exit_stack_exception_propagate():
h1 = mock.MagicMock()
h2 = mock.MagicMock()
v1 = mock.MagicMock()
v2 = mock.MagicMock()
error = ValueError('FUUU')
with pytest.raises(ValueError) as exc:
async with AsyncExitStack() as stack:
v = await stack.enter_context(AutoClose(h1, v=v1))
assert v is v1
v = await stack.enter_context(AutoClose(h2, v=v2))
assert v is v2
raise error
assert exc.value is error
h2.close.assert_called_once_with(ValueError, error, mock.ANY)
h1.close.assert_called_once_with(ValueError, error, mock.ANY)
def test_Region_sync_custom_image_already_synced(tmpdir):
"""Test Region.sync_custom performs create and handles when its already
synced."""
region = make_Region()
image_path = tmpdir.join("image.tar.gz")
image_path.write(b"data")
region.sync_custom("image", {
"path": str(image_path),
"architecture": "amd64/generic",
"title": "My Title",
})
assert call(
"custom/image", "amd64/generic", ANY,
title="My Title", filetype=BootResourceFileType.TGZ,
progress_callback=ANY) == region.origin.BootResources.create.call_args
assert (
call("custom/image already in sync", level=MessageLevel.SUCCESS) ==
region.print_msg.call_args)
def test_ingest(self,
mock_process_pool,
mock_chunk_writer,
):
# The next three lines mock the ProcessPoolExecutor and it's map
# function.
executor_mock = mock.Mock()
executor_mock.map.return_value = []
mock_process_pool.return_value.__enter__.return_value = executor_mock
self.gulp_ingestor.adapter.__len__.return_value = 2
self.gulp_ingestor()
mock_chunk_writer.assert_called_once_with(self.adapter)
executor_mock.map.assert_called_once_with(
mock_chunk_writer.return_value.write_chunk,
mock.ANY,
[slice(0, 1), slice(1, 2)],
)
def test_swagger_minimal(app):
"""
Test a swagger config for a minimal setup.
"""
app.config['OPENAPI_INFO_VERSION'] = '1.2.3'
openapi = OpenAPI(app)
assert openapi.swagger == {
'swagger': '2.0',
'info': {
'title': 'test_swagger_minimal',
'version': '1.2.3'
},
'paths': ANY,
'schemes': ['http'],
}
def test_swagger_full(app):
"""
Test a swagger config for a fully configured setup.
"""
app.config.update(
SERVER_NAME='api.example.com',
OPENAPI_SHOW_HOST=True,
OPENAPI_INFO_VERSION='1.2.3'
)
openapi = OpenAPI(app)
assert openapi.swagger == {
'swagger': '2.0',
'info': {
'title': 'test_swagger_full',
'version': '1.2.3'
},
'paths': ANY,
'host': 'api.example.com',
'schemes': ['http']
}
def testCwdWithRelativeScriptPath(self, isdirMock, existsMock, accessMock,
subprocessMock):
"""
If a step has a cwd set and its script is a relative path, the path of
the executed script that is executed must be as specified (not
converted to an absolute path).
"""
subprocessMock.return_value = ''
sp = SlurmPipeline(
{
'steps': [
{
'cwd': '/tmp',
'name': 'name1',
'script': 'script1',
},
],
})
sp.schedule()
subprocessMock.assert_has_calls([
call(['script1'], cwd='/tmp', universal_newlines=True,
stdin=DEVNULL, env=ANY),
])
def testForce(self, existsMock, accessMock, subprocessMock):
"""
If force=True is given to SlurmPipeline, SP_FORCE must be set to '1'
in the step execution environment.
"""
subprocessMock.return_value = ''
sp = SlurmPipeline(
{
'steps': [
{
'name': 'name1',
'script': 'script1',
},
],
})
sp.schedule(force=True)
subprocessMock.assert_has_calls([
call(['script1'], cwd='.', universal_newlines=True,
stdin=DEVNULL, env=ANY),
])
env = subprocessMock.mock_calls[0][2]['env']
self.assertEqual('1', env['SP_FORCE'])
def testDefaultNice(self, existsMock, accessMock, subprocessMock):
"""
If no nice value is given to schedule, SP_NICE_ARG must be set to
'--nice' in the step execution environment.
"""
subprocessMock.return_value = ''
sp = SlurmPipeline(
{
'steps': [
{
'name': 'name1',
'script': 'script1',
},
],
})
sp.schedule()
subprocessMock.assert_has_calls([
call(['script1'], cwd='.', universal_newlines=True,
stdin=DEVNULL, env=ANY),
])
env = subprocessMock.mock_calls[0][2]['env']
self.assertEqual('--nice', env['SP_NICE_ARG'])
def testSpecificNice(self, existsMock, accessMock, subprocessMock):
"""
If a specific nice value is given to schedule, SP_NICE_ARG must be set
to the expected value in the step execution environment.
"""
subprocessMock.return_value = ''
sp = SlurmPipeline(
{
'steps': [
{
'name': 'name1',
'script': 'script1',
},
],
})
sp.schedule(nice=40)
subprocessMock.assert_has_calls([
call(['script1'], cwd='.', universal_newlines=True,
stdin=DEVNULL, env=ANY),
])
env = subprocessMock.mock_calls[0][2]['env']
self.assertEqual('--nice 40', env['SP_NICE_ARG'])
def test_simple(self):
sdm = MagicMock()
sdm.classical = {"A": 0, "B": 1}
sdm.apply_ptm = MagicMock()
c = circuit.Circuit()
c.add_gate("hadamard", "A", time=0, conditional_bit="B")
c.apply_to(sdm)
sdm.apply_ptm.assert_called_once_with("A", ptm=ANY)
sdm.ensure_classical.assert_called_once_with("B")
sdm = MagicMock()
sdm.classical = {"A": 0, "B": 0}
sdm.hadamard = MagicMock()
c.apply_to(sdm)
sdm.apply_ptm.assert_not_called()
sdm.ensure_classical.assert_called_once_with("B")
def test_temporarysshkey_remove_failure(self):
"""
Verify TemporarySSHKey.remove reacts properly to failure.
"""
mock_logger = mock.MagicMock(logging.Logger('test'))
key = TemporarySSHKey(TEST_HOST_CREDS, mock_logger)
key.create()
with mock.patch('os.unlink') as _unlink:
_unlink.side_effect = Exception
self.assertTrue(os.path.isfile(key.path))
key.remove()
self.assertTrue(os.path.isfile(key.path))
# We should have a warning in the log
mock_logger.warn.assert_called_once_with(
mock.ANY, mock.ANY, mock.ANY)
# Clean up the file
key.remove()
def test_render(self):
"""Test that the flow cell delete POST works"""
# Check precondition
self.assertEqual(FlowCell.objects.all().count(), 1)
# Simulate the POST
with self.login(self.user):
response = self.client.post(
reverse('flowcell_delete', kwargs={'pk': self.flow_cell.pk}))
# Check resulting database state
self.assertEqual(FlowCell.objects.all().count(), 0)
# Check call to sending emails
self.email_mock.assert_called_once_with(self.user, ANY)
m1 = model_to_dict(self.arg_flowcell)
del m1['id']
m2 = model_to_dict(self.flow_cell)
del m2['id']
self.assertEqual(m1, m2)
# Check resulting response
with self.login(self.user):
self.assertRedirects(
response, reverse('flowcell_list'))
def test_create_reader_instances_with_filenames(self):
import satpy.scene
filenames = ["bla", "foo", "bar"]
sensors = None
reader_name = None
with mock.patch('satpy.scene.Scene._compute_metadata_from_readers'):
with mock.patch('satpy.scene.ReaderFinder') as findermock:
scene = satpy.scene.Scene(filenames=filenames)
findermock.assert_called_once_with(ppp_config_dir=mock.ANY,
base_dir=None,
area=None,
end_time=None,
start_time=None)
findermock.return_value.assert_called_once_with(
reader=reader_name,
sensor=set(),
filenames=filenames,
reader_kwargs=None,
metadata={}
)
def test_create_reader_instances_with_sensor(self):
import satpy.scene
sensors = ["bla", "foo", "bar"]
filenames = None
reader_name = None
with mock.patch('satpy.scene.Scene._compute_metadata_from_readers'):
with mock.patch('satpy.scene.ReaderFinder') as findermock:
scene = satpy.scene.Scene(sensor=sensors)
findermock.assert_called_once_with(ppp_config_dir=mock.ANY,
base_dir=None,
area=None,
end_time=None,
start_time=None)
findermock.return_value.assert_called_once_with(
reader=reader_name,
sensor=sensors,
filenames=filenames,
reader_kwargs=None,
metadata={}
)
def test_create_reader_instances_with_sensor_and_filenames(self):
import satpy.scene
sensors = ["bla", "foo", "bar"]
filenames = ["1", "2", "3"]
reader_name = None
with mock.patch('satpy.scene.Scene._compute_metadata_from_readers'):
with mock.patch('satpy.scene.ReaderFinder') as findermock:
scene = satpy.scene.Scene(sensor=sensors, filenames=filenames)
findermock.assert_called_once_with(ppp_config_dir=mock.ANY,
base_dir=None,
area=None,
end_time=None,
start_time=None)
findermock.return_value.assert_called_once_with(
reader=reader_name,
sensor=sensors,
filenames=filenames,
reader_kwargs=None,
metadata={}
)
def test_create_reader_instances_with_reader(self):
from satpy.scene import Scene
reader = "foo"
filenames = ["1", "2", "3"]
sensors = set()
with mock.patch('satpy.scene.Scene._compute_metadata_from_readers'):
with mock.patch('satpy.scene.ReaderFinder') as findermock:
scene = Scene(reader=reader, filenames=filenames)
findermock.assert_called_once_with(ppp_config_dir=mock.ANY,
base_dir=None,
area=None,
end_time=None,
start_time=None)
findermock.return_value.assert_called_once_with(
reader=reader,
sensor=sensors,
filenames=filenames,
reader_kwargs=None,
metadata={}
)
def test_create_cluster_with_valid_network(self):
"""
Verify create_cluster uses valid networks as expected.
"""
bus = mock.MagicMock()
cluster = Cluster.new(name='test', network='test')
# The cluster doesn't exist yet
bus.storage.get_cluster.side_effect = Exception
# Network response
bus.storage.get_network.return_value = Network.new(name='test')
# Creation of the cluster
bus.storage.save.return_value = cluster
# Call the handler...
clusters.create_cluster.handler(copy.deepcopy(NETWORK_CLUSTER_REQUEST), bus)
bus.storage.save.assert_called_with(mock.ANY)
def test_create_cluster_with_invalid_network(self):
"""
Verify create_cluster reacts to invalid networks as expected.
"""
bus = mock.MagicMock()
cluster = Cluster.new(name='test', network='test')
# The cluster doesn't exist yet
bus.storage.get_cluster.side_effect = Exception
# The network doesn't exist
bus.storage.get_network.side_effect = Exception
# The cluster creation
bus.storage.save.return_value = cluster
# Call the handler...
clusters.create_cluster.handler(copy.deepcopy(NETWORK_CLUSTER_REQUEST), bus)
# Update clusters network to be 'default' as we expect 'test' to be
# rejected by the handler
cluster.network = 'default'
bus.storage.save.assert_called_with(mock.ANY)