python类ANY的实例源码

test_iam_create.py 文件源码 项目:foremast 作者: gogoair 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_iam_role_policy(resource_action, get_template, get_properties, get_details, construct_policy, session,
                         attach_profile_to_role):
    """Check that an ``ec2`` deployment builds its role from the EC2 template.

    With the application properties reporting ``type == 'ec2'``, the test
    expects ``get_template`` to be asked for ``EC2_TEMPLATE_NAME`` and
    ``resource_action`` to receive a ``create_role`` call whose
    ``AssumeRolePolicyDocument`` is whatever ``get_template`` returned.
    """
    iam_names = {'group': 1, 'policy': 2, 'profile': 3, 'role': 4, 'user': 5}
    get_properties.return_value = {'type': 'ec2'}
    get_details.return_value.iam.return_value = iam_names

    assert create_iam_resources()

    get_template.assert_called_with(EC2_TEMPLATE_NAME)

    # Only the action name and the policy document are pinned; everything
    # else is matched loosely with mock.ANY.
    expected_create_role = mock.call(
        mock.ANY,
        action='create_role',
        log_format=mock.ANY,
        RoleName=mock.ANY,
        AssumeRolePolicyDocument=get_template.return_value,
    )
    resource_action.assert_has_calls([expected_create_role])
test_asyncio_kernel.py 文件源码 项目:backend.ai-client-py 作者: lablup 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
async def test_create_kernel_url():
    """Check that ``Kernel.get_or_create`` issues ``POST /kernel/create``.

    ``Request`` is patched inside ``ai.backend.client.kernel`` so that it
    returns a mock request whose ``asend`` yields a mocked response with
    status 201. The test then verifies how the request class was called,
    that the request was sent exactly once, and that the response's
    ``json`` was invoked exactly once.
    """
    # Declared ``async`` because the body uses ``await``; inside a plain
    # ``def`` that statement is a SyntaxError and the module cannot be
    # imported at all.
    # NOTE(review): running a coroutine test needs an asyncio-aware runner
    # (plugin or marker) -- confirm against the project's test configuration.
    mock_resp = asynctest.MagicMock(spec=aiohttp.ClientResponse)
    mock_resp.status = 201
    mock_resp.json = asynctest.MagicMock()

    mock_req_obj = asynctest.MagicMock(spec=Request)
    mock_req_obj.asend.return_value = mock_resp

    with asynctest.patch('ai.backend.client.kernel.Request',
                         return_value=mock_req_obj) as mock_req_cls:
        await Kernel.get_or_create('python')

        # The third positional argument (request payload) is not pinned.
        mock_req_cls.assert_called_once_with(
            'POST', '/kernel/create', mock.ANY)
        mock_req_obj.asend.assert_called_once_with()
        mock_req_obj.asend.return_value.json.assert_called_once_with()
TestScar.py 文件源码 项目:scar 作者: grycap 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_run_event_source(self, mock_aws_client):
        """Run Scar with an S3 bucket as event source and inspect the AWS client mock.

        The mocked client reports four files in the bucket. The test checks
        that the client class was called exactly once and then walks through
        the recorded ``mock_calls`` entries 1 to 6.
        """
        mock_aws_client.return_value.get_s3_file_list.return_value = ['test_file_1','test_file_2','test_file_3','test_file_4']
        args = Args()
        args.verbose = False
        args.event_source = 'test_bucket'
        Scar().run(args)        
        self.assertEqual(mock_aws_client.call_count, 1)
        # NOTE(review): entries of ``mock_calls`` are ``call`` tuples, not Mock
        # objects, so ``.assert_called_with(...)`` on them appears to just build
        # another ``call`` object instead of asserting anything. As written the
        # lines below seem to verify only that at least seven calls were
        # recorded (an IndexError otherwise) -- confirm, and consider comparing
        # each entry against an explicit ``call.<method>(...)`` instead.
        # check_function_name_not_exists
        mock_aws_client.mock_calls[1].assert_called_with('test-name', False)
        # get_s3_file_list
        mock_aws_client.mock_calls[2].assert_called_with('test_bucket')
        # launch_request_response_event
        mock_aws_client.mock_calls[3].assert_called_with('test_file_1', {'Records': [{'eventSource': 'aws:s3', 's3': {'bucket': {'name': 'test_bucket'}, 'object': {'key': ''}}}]}, ANY, ANY)
        # launch_async_event
        mock_aws_client.mock_calls[4].assert_called_with('test_file_2', {'Records': [{'eventSource': 'aws:s3', 's3': {'bucket': {'name': 'test_bucket'}, 'object': {'key': ''}}}]}, ANY, ANY)
        # launch_async_event
        mock_aws_client.mock_calls[5].assert_called_with('test_file_3', {'Records': [{'eventSource': 'aws:s3', 's3': {'bucket': {'name': 'test_bucket'}, 'object': {'key': ''}}}]}, ANY, ANY)
        # launch_async_event
        mock_aws_client.mock_calls[6].assert_called_with('test_file_4', {'Records': [{'eventSource': 'aws:s3', 's3': {'bucket': {'name': 'test_bucket'}, 'object': {'key': ''}}}]}, ANY, ANY)
test_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_callback_with_exception(self):
    """A callback that raises must be reported to the loop's exception handler.

    A ``Handle`` wrapping a callback that raises ``ValueError`` is run
    against a mocked loop; the loop's ``call_exception_handler`` is expected
    to receive a context dict describing the failure.
    """
    def callback():
        raise ValueError()

    self.loop = mock.Mock()
    self.loop.call_exception_handler = mock.Mock()

    handle = asyncio.Handle(callback, (), self.loop)
    handle._run()

    # The exception instance itself is not pinned; the message only has to
    # match the given pattern.
    expected_context = {
        'message': test_utils.MockPattern('Exception in callback.*'),
        'exception': mock.ANY,
        'handle': handle,
        'source_traceback': handle._source_traceback,
    }
    self.loop.call_exception_handler.assert_called_with(expected_context)
worker_tests.py 文件源码 项目:easy-job 作者: inb-co 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_rabbitmq_worker_callback_method_with_invalid_serialization_method(self, getLogger, import_string):
    """An unknown serialization method must be logged at ERROR level.

    The worker is built with ``serialization_method="invalid"`` and its
    ``callback`` is invoked once; the logger returned by the patched
    ``getLogger`` is expected to receive exactly one ``log`` call at
    ``logging.ERROR``.
    """
    assert isinstance(import_string, mock.MagicMock)

    # Arrange
    fake_logger = mock.MagicMock()
    getLogger.return_value = fake_logger
    worker = RabbitMQWorker(
        queue_name="queue_name",
        rabbitmq_configs={},
        serialization_method="invalid",
        result_backend={
            "result_backend_class": "easy_job.result_backends.dummy.DummyBackend",
        },
        logger="log",
    )

    # Act
    worker.callback(mock.MagicMock(), mock.MagicMock(), None, "")

    # Assert -- the log message text is not pinned.
    fake_logger.log.assert_called_once_with(logging.ERROR, mock.ANY)
test_views.py 文件源码 项目:directory-ui-buyer 作者: uktrade 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_case_study_create_api_success(
    mock_create_case_study, supplier_case_study_end_to_end, sso_user,
    all_case_study_data, api_response_200
):
    """A successful create redirects to company-detail and calls the API once.

    The three image fields are matched with ``ANY``; all other submitted
    fields and the SSO session id must be passed through unchanged.
    """
    mock_create_case_study.return_value = api_response_200

    response = supplier_case_study_end_to_end()

    assert response.status_code == http.client.FOUND
    assert response.get('Location') == reverse('company-detail')

    # django converts uploaded files to UploadedFile, which makes
    # `assert_called_once_with` tricky.
    expected_payload = {
        **all_case_study_data,
        'image_one': ANY,
        'image_two': ANY,
        'image_three': ANY,
    }
    expected_call = call(
        data=expected_payload,
        sso_session_id=sso_user.session_id,
    )

    assert mock_create_case_study.call_count == 1
    assert mock_create_case_study.call_args == expected_call
test_views.py 文件源码 项目:directory-ui-buyer 作者: uktrade 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_case_study_update_api_success(
    mock_update_case_study, supplier_case_study_end_to_end, sso_user,
    all_case_study_data, api_response_200
):
    """A successful update redirects to company-detail and calls the API once.

    The flow is run for case study ``'1'``. The three image fields are
    matched with ``ANY``; the remaining fields, the case study id and the
    SSO session id must be passed through unchanged.
    """
    mock_update_case_study.return_value = api_response_200

    response = supplier_case_study_end_to_end(case_study_id='1')

    assert response.status_code == http.client.FOUND
    assert response.get('Location') == reverse('company-detail')

    # django converts uploaded files to UploadedFile, which makes
    # `assert_called_once_with` tricky.
    expected_payload = {
        **all_case_study_data,
        'image_one': ANY,
        'image_two': ANY,
        'image_three': ANY,
    }
    mock_update_case_study.assert_called_once_with(
        data=expected_payload,
        case_study_id='1',
        sso_session_id=sso_user.session_id,
    )
test_utils.py 文件源码 项目:python-shaarli-client 作者: shaarli 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_generate_endpoint_parser_noparam(addargument):
    """Generate a parser from endpoint metadata - no params"""
    parser = ArgumentParser()
    subparsers = parser.add_subparsers()

    generate_endpoint_parser(
        subparsers,
        'put-stuff',
        {
            'path': 'stuff',
            'method': 'PUT',
            'help': "Changes stuff",
            'params': {},
        },
    )

    # With no endpoint params, the only add_argument calls expected are the
    # two -h/--help registrations: the first for the main parser, the second
    # for the 'put-stuff' subparser.
    help_call = mock.call('-h', '--help', action='help',
                          default=mock.ANY, help=mock.ANY)
    addargument.assert_has_calls([help_call, help_call])
test_ecs_operator.py 文件源码 项目:incubator-airflow-old 作者: apache 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_execute_without_failures(self, check_mock, wait_mock):
    """Execute the ECS operator when ``run_task`` reports no failures.

    Verifies the client is requested for ``'ecs'`` in ``eu-west-1``, that
    ``run_task`` receives the expected arguments, that the patched wait and
    check hooks are each called once without arguments, and that the
    operator stores the expected task ARN.
    """
    hook = self.aws_hook_mock.return_value
    client_mock = hook.get_client_type.return_value
    client_mock.run_task.return_value = RESPONSE_WITHOUT_FAILURES

    self.ecs.execute(None)

    hook.get_client_type.assert_called_once_with('ecs', region_name='eu-west-1')
    expected_run_task_kwargs = dict(
        cluster='c',
        overrides={},
        startedBy=mock.ANY,  # Can be 'airflow' or 'Airflow'
        taskDefinition='t',
    )
    client_mock.run_task.assert_called_once_with(**expected_run_task_kwargs)

    wait_mock.assert_called_once_with()
    check_mock.assert_called_once_with()
    expected_arn = 'arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55'
    self.assertEqual(self.ecs.arn, expected_arn)
test_dataflow_operator.py 文件源码 项目:incubator-airflow-old 作者: apache 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_exec(self, gcs_hook, dataflow_mock):
    """Test DataFlowHook is created and the right args are passed to
    start_python_dataflow.

    Also checks that the GCS download hook is called with ``PY_FILE`` and
    that the operator's ``py_file`` ends up under ``/tmp/dataflow``.
    """
    launch_hook = dataflow_mock.return_value.start_python_dataflow
    download_hook = gcs_hook.return_value.google_cloud_to_local

    self.dataflow.execute(None)

    self.assertTrue(dataflow_mock.called)
    download_hook.assert_called_once_with(PY_FILE)
    # The third positional argument is not pinned.
    launch_hook.assert_called_once_with(
        TASK_ID,
        {
            'project': 'test',
            'staging_location': 'gs://test/staging',
            'output': 'gs://test/output',
        },
        mock.ANY,
        PY_OPTIONS,
    )
    self.assertTrue(self.dataflow.py_file.startswith('/tmp/dataflow'))
test_stack_ref.py 文件源码 项目:aiotk 作者: AndreLouisCaron 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_exit_stack_exception_propagate():
    """An exception raised inside ``ExitStack`` propagates to the caller.

    Two ``AutoClose`` contexts are entered and then ``ValueError`` is
    raised. The same exception object must escape the stack, and each
    handle's ``close`` must be called exactly once with
    ``(ValueError, error, <anything>)``.
    """
    first_handle, second_handle = mock.MagicMock(), mock.MagicMock()
    first_value, second_value = mock.MagicMock(), mock.MagicMock()
    error = ValueError('FUUU')

    with pytest.raises(ValueError) as exc:
        with ExitStack() as stack:
            entered = stack.enter_context(AutoClose(first_handle, v=first_value))
            assert entered is first_value
            entered = stack.enter_context(AutoClose(second_handle, v=second_value))
            assert entered is second_value
            raise error
    assert exc.value is error

    # Checked in reverse order of entry; the third argument is not pinned.
    for handle in (second_handle, first_handle):
        handle.close.assert_called_once_with(ValueError, error, mock.ANY)
test_stack_sync.py 文件源码 项目:aiotk 作者: AndreLouisCaron 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def test_exit_stack_exception_propagate():
    """An exception raised inside ``AsyncExitStack`` propagates to the caller.

    Two ``AutoClose`` contexts are entered and then ``ValueError`` is
    raised. The same exception object must escape the stack, and each
    handle's ``close`` must be called exactly once with
    ``(ValueError, error, <anything>)``.
    """
    # Declared ``async`` because the body uses ``async with`` and ``await``;
    # inside a plain ``def`` those are a SyntaxError.
    # NOTE(review): running a coroutine test needs an asyncio-aware runner
    # (plugin or marker) -- confirm against the project's test configuration.
    h1 = mock.MagicMock()
    h2 = mock.MagicMock()
    v1 = mock.MagicMock()
    v2 = mock.MagicMock()
    error = ValueError('FUUU')

    with pytest.raises(ValueError) as exc:
        async with AsyncExitStack() as stack:
            v = await stack.enter_context(AutoClose(h1, v=v1))
            assert v is v1
            v = await stack.enter_context(AutoClose(h2, v=v2))
            assert v is v2
            raise error
    assert exc.value is error

    # The third argument passed to close() is not pinned.
    h2.close.assert_called_once_with(ValueError, error, mock.ANY)
    h1.close.assert_called_once_with(ValueError, error, mock.ANY)
test_stack_async.py 文件源码 项目:aiotk 作者: AndreLouisCaron 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
async def test_exit_stack_exception_propagate():
    """An exception raised inside ``AsyncExitStack`` propagates to the caller.

    Two ``AutoClose`` contexts are entered and then ``ValueError`` is
    raised. The same exception object must escape the stack, and each
    handle's ``close`` must be called exactly once with
    ``(ValueError, error, <anything>)``.
    """
    # Declared ``async`` because the body uses ``async with`` and ``await``;
    # inside a plain ``def`` those are a SyntaxError.
    # NOTE(review): running a coroutine test needs an asyncio-aware runner
    # (plugin or marker) -- confirm against the project's test configuration.
    h1 = mock.MagicMock()
    h2 = mock.MagicMock()
    v1 = mock.MagicMock()
    v2 = mock.MagicMock()
    error = ValueError('FUUU')

    with pytest.raises(ValueError) as exc:
        async with AsyncExitStack() as stack:
            v = await stack.enter_context(AutoClose(h1, v=v1))
            assert v is v1
            v = await stack.enter_context(AutoClose(h2, v=v2))
            assert v is v2
            raise error
    assert exc.value is error

    # The third argument passed to close() is not pinned.
    h2.close.assert_called_once_with(ValueError, error, mock.ANY)
    h1.close.assert_called_once_with(ValueError, error, mock.ANY)
test_region.py 文件源码 项目:meta-maas 作者: maas 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_Region_sync_custom_image_already_synced(tmpdir):
    """Test Region.sync_custom performs a create and reports when the image
    is already in sync."""
    region = make_Region()
    image_path = tmpdir.join("image.tar.gz")
    image_path.write(b"data")

    image_spec = {
        "path": str(image_path),
        "architecture": "amd64/generic",
        "title": "My Title",
    }
    region.sync_custom("image", image_spec)

    # The third positional argument and the progress callback are not pinned.
    expected_create = call(
        "custom/image", "amd64/generic", ANY,
        title="My Title", filetype=BootResourceFileType.TGZ,
        progress_callback=ANY)
    assert expected_create == region.origin.BootResources.create.call_args

    expected_message = call(
        "custom/image already in sync", level=MessageLevel.SUCCESS)
    assert expected_message == region.print_msg.call_args
fileio_tests.py 文件源码 项目:GulpIO 作者: TwentyBN 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_ingest(self, mock_process_pool, mock_chunk_writer):
    """Ingest with an adapter of length 2 and check the work handed to the pool.

    The chunk writer must be built from ``self.adapter`` and the executor's
    ``map`` must be called once with the writer's ``write_chunk`` and one
    slice per item.
    """
    # Mock the ProcessPoolExecutor and its map function: the object yielded
    # by the pool's context manager is our executor stand-in.
    executor = mock.Mock()
    executor.map.return_value = []
    mock_process_pool.return_value.__enter__.return_value = executor

    self.gulp_ingestor.adapter.__len__.return_value = 2
    self.gulp_ingestor()

    mock_chunk_writer.assert_called_once_with(self.adapter)

    # The second argument to map() is not pinned.
    expected_slices = [slice(0, 1), slice(1, 2)]
    executor.map.assert_called_once_with(
        mock_chunk_writer.return_value.write_chunk,
        mock.ANY,
        expected_slices,
    )
extension_test.py 文件源码 项目:flask-openapi 作者: remcohaszing 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_swagger_minimal(app):
    """
    Test a swagger config for a minimal setup.

    Only the info version is configured; the ``paths`` entry is matched
    with ``ANY``.

    """
    app.config['OPENAPI_INFO_VERSION'] = '1.2.3'
    openapi = OpenAPI(app)

    expected_info = {
        'title': 'test_swagger_minimal',
        'version': '1.2.3'
    }
    expected_swagger = {
        'swagger': '2.0',
        'info': expected_info,
        'paths': ANY,
        'schemes': ['http'],
    }
    assert openapi.swagger == expected_swagger
extension_test.py 文件源码 项目:flask-openapi 作者: remcohaszing 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_swagger_full(app):
    """
    Test a swagger config for a fully configured setup.

    The server name, host visibility and info version are configured; the
    ``paths`` entry is matched with ``ANY``.

    """
    app.config.update(
        SERVER_NAME='api.example.com',
        OPENAPI_SHOW_HOST=True,
        OPENAPI_INFO_VERSION='1.2.3'
    )
    openapi = OpenAPI(app)

    expected_info = {
        'title': 'test_swagger_full',
        'version': '1.2.3'
    }
    expected_swagger = {
        'swagger': '2.0',
        'info': expected_info,
        'paths': ANY,
        'host': 'api.example.com',
        'schemes': ['http']
    }
    assert openapi.swagger == expected_swagger
test_pipeline.py 文件源码 项目:slurm-pipeline 作者: acorg 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def testCwdWithRelativeScriptPath(self, isdirMock, existsMock, accessMock,
                                  subprocessMock):
    """
    If a step has a cwd set and its script is a relative path, the script
    that is executed must be as specified (not converted to an absolute
    path).
    """
    subprocessMock.return_value = ''

    step = {
        'cwd': '/tmp',
        'name': 'name1',
        'script': 'script1',
    }
    pipeline = SlurmPipeline({'steps': [step]})
    pipeline.schedule()

    # The environment is not pinned here.
    expected_call = call(['script1'], cwd='/tmp', universal_newlines=True,
                         stdin=DEVNULL, env=ANY)
    subprocessMock.assert_has_calls([expected_call])
test_pipeline.py 文件源码 项目:slurm-pipeline 作者: acorg 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testForce(self, existsMock, accessMock, subprocessMock):
    """
    If force=True is passed when scheduling, SP_FORCE must be set to '1'
    in the step execution environment.
    """
    subprocessMock.return_value = ''

    step = {
        'name': 'name1',
        'script': 'script1',
    }
    pipeline = SlurmPipeline({'steps': [step]})
    pipeline.schedule(force=True)

    expected_call = call(['script1'], cwd='.', universal_newlines=True,
                         stdin=DEVNULL, env=ANY)
    subprocessMock.assert_has_calls([expected_call])

    # Index 2 of a recorded call is its keyword-argument dict.
    first_call = subprocessMock.mock_calls[0]
    env = first_call[2]['env']
    self.assertEqual('1', env['SP_FORCE'])
test_pipeline.py 文件源码 项目:slurm-pipeline 作者: acorg 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def testDefaultNice(self, existsMock, accessMock, subprocessMock):
    """
    If no nice value is given to schedule, SP_NICE_ARG must be set to
    '--nice' in the step execution environment.
    """
    subprocessMock.return_value = ''

    step = {
        'name': 'name1',
        'script': 'script1',
    }
    pipeline = SlurmPipeline({'steps': [step]})
    pipeline.schedule()

    expected_call = call(['script1'], cwd='.', universal_newlines=True,
                         stdin=DEVNULL, env=ANY)
    subprocessMock.assert_has_calls([expected_call])

    # Index 2 of a recorded call is its keyword-argument dict.
    first_call = subprocessMock.mock_calls[0]
    env = first_call[2]['env']
    self.assertEqual('--nice', env['SP_NICE_ARG'])
test_pipeline.py 文件源码 项目:slurm-pipeline 作者: acorg 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testSpecificNice(self, existsMock, accessMock, subprocessMock):
    """
    If a specific nice value is given to schedule, SP_NICE_ARG must be set
    to the expected value in the step execution environment.
    """
    subprocessMock.return_value = ''

    step = {
        'name': 'name1',
        'script': 'script1',
    }
    pipeline = SlurmPipeline({'steps': [step]})
    pipeline.schedule(nice=40)

    expected_call = call(['script1'], cwd='.', universal_newlines=True,
                         stdin=DEVNULL, env=ANY)
    subprocessMock.assert_has_calls([expected_call])

    # Index 2 of a recorded call is its keyword-argument dict.
    first_call = subprocessMock.mock_calls[0]
    env = first_call[2]['env']
    self.assertEqual('--nice 40', env['SP_NICE_ARG'])
test_circuit.py 文件源码 项目:quantumsim 作者: brianzi 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_simple(self):
    """A gate conditioned on a classical bit is applied only when the bit is 1.

    The same circuit (a hadamard on "A" conditioned on "B") is applied to
    two mocked states: one with B == 1, where ``apply_ptm`` must be called
    once for "A", and one with B == 0, where it must not be called. In both
    cases ``ensure_classical("B")`` is expected exactly once.
    """
    conditional_circuit = circuit.Circuit()
    conditional_circuit.add_gate("hadamard", "A", time=0, conditional_bit="B")

    # Conditional bit set: the gate is applied.
    state_bit_set = MagicMock()
    state_bit_set.classical = {"A": 0, "B": 1}
    state_bit_set.apply_ptm = MagicMock()

    conditional_circuit.apply_to(state_bit_set)

    state_bit_set.apply_ptm.assert_called_once_with("A", ptm=ANY)
    state_bit_set.ensure_classical.assert_called_once_with("B")

    # Conditional bit clear: the gate is skipped.
    state_bit_clear = MagicMock()
    state_bit_clear.classical = {"A": 0, "B": 0}
    state_bit_clear.hadamard = MagicMock()

    conditional_circuit.apply_to(state_bit_clear)

    state_bit_clear.apply_ptm.assert_not_called()
    state_bit_clear.ensure_classical.assert_called_once_with("B")
test_util_ssh.py 文件源码 项目:commissaire 作者: projectatomic 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_temporarysshkey_remove_failure(self):
    """
    Verify TemporarySSHKey.remove reacts properly to failure.

    While ``os.unlink`` is patched to raise, ``remove`` must leave the key
    file in place and log exactly one warning.
    """
    mock_logger = mock.MagicMock(logging.Logger('test'))
    key = TemporarySSHKey(TEST_HOST_CREDS, mock_logger)
    key.create()

    with mock.patch('os.unlink', side_effect=Exception):
        self.assertTrue(os.path.isfile(key.path))
        key.remove()
        self.assertTrue(os.path.isfile(key.path))
        # We should have a warning in the log
        mock_logger.warn.assert_called_once_with(
            mock.ANY, mock.ANY, mock.ANY)

    # Clean up the file now that os.unlink is real again
    key.remove()
test_views.py 文件源码 项目:flowcelltool 作者: bihealth 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_render(self):
    """Test that the flow cell delete POST works"""

    def dict_without_id(instance):
        # Model dict with the 'id' entry dropped, for comparing two objects.
        as_dict = model_to_dict(instance)
        del as_dict['id']
        return as_dict

    # Check precondition
    self.assertEqual(FlowCell.objects.all().count(), 1)

    # Simulate the POST
    delete_url = reverse('flowcell_delete', kwargs={'pk': self.flow_cell.pk})
    with self.login(self.user):
        response = self.client.post(delete_url)

    # Check resulting database state
    self.assertEqual(FlowCell.objects.all().count(), 0)

    # Check call to sending emails
    self.email_mock.assert_called_once_with(self.user, ANY)
    emailed = dict_without_id(self.arg_flowcell)
    deleted = dict_without_id(self.flow_cell)
    self.assertEqual(emailed, deleted)

    # Check resulting response
    with self.login(self.user):
        self.assertRedirects(response, reverse('flowcell_list'))
test_scene.py 文件源码 项目:satpy 作者: pytroll 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_create_reader_instances_with_filenames(self):
    """Create a Scene from filenames only and check the ReaderFinder calls.

    The finder class must be constructed once with default arguments
    (``ppp_config_dir`` is not pinned) and the finder instance called once
    with the filenames, an empty sensor set and no reader.
    """
    import satpy.scene
    filenames = ["bla", "foo", "bar"]

    metadata_patch = mock.patch('satpy.scene.Scene._compute_metadata_from_readers')
    finder_patch = mock.patch('satpy.scene.ReaderFinder')
    with metadata_patch, finder_patch as findermock:
        scene = satpy.scene.Scene(filenames=filenames)

        findermock.assert_called_once_with(
            ppp_config_dir=mock.ANY,
            base_dir=None,
            area=None,
            end_time=None,
            start_time=None,
        )
        findermock.return_value.assert_called_once_with(
            reader=None,
            sensor=set(),
            filenames=filenames,
            reader_kwargs=None,
            metadata={},
        )
test_scene.py 文件源码 项目:satpy 作者: pytroll 项目源码 文件源码 阅读 72 收藏 0 点赞 0 评论 0
def test_create_reader_instances_with_sensor(self):
    """Create a Scene from sensors only and check the ReaderFinder calls.

    The finder class must be constructed once with default arguments
    (``ppp_config_dir`` is not pinned) and the finder instance called once
    with the sensors, no filenames and no reader.
    """
    import satpy.scene
    sensors = ["bla", "foo", "bar"]

    metadata_patch = mock.patch('satpy.scene.Scene._compute_metadata_from_readers')
    finder_patch = mock.patch('satpy.scene.ReaderFinder')
    with metadata_patch, finder_patch as findermock:
        scene = satpy.scene.Scene(sensor=sensors)

        findermock.assert_called_once_with(
            ppp_config_dir=mock.ANY,
            base_dir=None,
            area=None,
            end_time=None,
            start_time=None,
        )
        findermock.return_value.assert_called_once_with(
            reader=None,
            sensor=sensors,
            filenames=None,
            reader_kwargs=None,
            metadata={},
        )
test_scene.py 文件源码 项目:satpy 作者: pytroll 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_create_reader_instances_with_sensor_and_filenames(self):
    """Create a Scene from sensors and filenames and check the ReaderFinder calls.

    The finder class must be constructed once with default arguments
    (``ppp_config_dir`` is not pinned) and the finder instance called once
    with both the sensors and the filenames, and no reader.
    """
    import satpy.scene
    sensors = ["bla", "foo", "bar"]
    filenames = ["1", "2", "3"]

    metadata_patch = mock.patch('satpy.scene.Scene._compute_metadata_from_readers')
    finder_patch = mock.patch('satpy.scene.ReaderFinder')
    with metadata_patch, finder_patch as findermock:
        scene = satpy.scene.Scene(sensor=sensors, filenames=filenames)

        findermock.assert_called_once_with(
            ppp_config_dir=mock.ANY,
            base_dir=None,
            area=None,
            end_time=None,
            start_time=None,
        )
        findermock.return_value.assert_called_once_with(
            reader=None,
            sensor=sensors,
            filenames=filenames,
            reader_kwargs=None,
            metadata={},
        )
test_scene.py 文件源码 项目:satpy 作者: pytroll 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_create_reader_instances_with_reader(self):
    """Create a Scene from a reader name and filenames and check the ReaderFinder calls.

    The finder class must be constructed once with default arguments
    (``ppp_config_dir`` is not pinned) and the finder instance called once
    with the reader, the filenames and an empty sensor set.
    """
    from satpy.scene import Scene
    reader = "foo"
    filenames = ["1", "2", "3"]

    metadata_patch = mock.patch('satpy.scene.Scene._compute_metadata_from_readers')
    finder_patch = mock.patch('satpy.scene.ReaderFinder')
    with metadata_patch, finder_patch as findermock:
        scene = Scene(reader=reader, filenames=filenames)

        findermock.assert_called_once_with(
            ppp_config_dir=mock.ANY,
            base_dir=None,
            area=None,
            end_time=None,
            start_time=None,
        )
        findermock.return_value.assert_called_once_with(
            reader=reader,
            sensor=set(),
            filenames=filenames,
            reader_kwargs=None,
            metadata={},
        )
test_handlers_clusters.py 文件源码 项目:commissaire-http 作者: projectatomic 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_create_cluster_with_valid_network(self):
    """
    Verify create_cluster uses valid networks as expected.

    The mocked storage reports that the cluster is missing and that the
    network exists; the handler is then expected to call ``save``.
    """
    bus = mock.MagicMock()
    storage = bus.storage
    cluster = Cluster.new(name='test', network='test')

    # The cluster doesn't exist yet
    storage.get_cluster.side_effect = Exception
    # Network response
    storage.get_network.return_value = Network.new(name='test')
    # Creation of the cluster
    storage.save.return_value = cluster

    # Call the handler...
    request = copy.deepcopy(NETWORK_CLUSTER_REQUEST)
    clusters.create_cluster.handler(request, bus)

    # The saved object itself is not pinned.
    storage.save.assert_called_with(mock.ANY)
test_handlers_clusters.py 文件源码 项目:commissaire-http 作者: projectatomic 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_create_cluster_with_invalid_network(self):
    """
    Verify create_cluster reacts to invalid networks as expected.

    The mocked storage reports that both the cluster and the network are
    missing; the handler is still expected to call ``save``.
    """
    bus = mock.MagicMock()
    storage = bus.storage
    cluster = Cluster.new(name='test', network='test')

    # The cluster doesn't exist yet
    storage.get_cluster.side_effect = Exception
    # The network doesn't exist
    storage.get_network.side_effect = Exception
    # The cluster creation
    storage.save.return_value = cluster

    # Call the handler...
    request = copy.deepcopy(NETWORK_CLUSTER_REQUEST)
    clusters.create_cluster.handler(request, bus)

    # Update clusters network to be 'default' as we expect 'test' to be
    # rejected by the handler
    cluster.network = 'default'
    # The saved object itself is not pinned.
    storage.save.assert_called_with(mock.ANY)


问题


面经


文章

微信
公众号

扫码关注公众号