python类patch()的实例源码

sell_states_test.py 文件源码 项目:bitcoin-trading-system 作者: vinicius-ronconi 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def setUp(self):
        """Create a ``TrailingOrders`` system wired to mocked collaborators.

        The ``logging`` name in the system module and the class-level
        ``TrailingOrders.set_next_operation`` are patched; each patch is
        registered with ``addCleanup`` before it is started.
        """
        fake_client = mock.MagicMock()
        fake_bootstrap = factory.TrailingOrdersFactory().make_fake_bootstrap(self.INITIAL_SETUP)

        # Swap the system module's ``logging`` name for a mock.
        log_patcher = mock.patch('trading_system.systems.trailing_orders.system.logging')
        self.addCleanup(log_patcher.stop)
        log_patcher.start()

        self.system = TrailingOrders(fake_client, fake_bootstrap)
        self.system.get_pending_orders = mock.MagicMock(return_value=[])

        # Tests inspect this mock to see which operation was scheduled next.
        self.set_next_operation = mock.MagicMock()
        operation_patcher = mock.patch(
            'trading_system.systems.trailing_orders.system.TrailingOrders.set_next_operation',
            self.set_next_operation,
        )
        self.addCleanup(operation_patcher.stop)
        operation_patcher.start()
test_cli.py 文件源码 项目:skipper 作者: Stratoscale 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_push(self, skipper_runner_run_mock, requests_get_mock):
        """Check the runner commands issued by ``push`` for a new image tag.

        The mocked registry response does not list tag ``1234567``; the test
        expects ``docker tag``, ``docker push`` and ``docker rmi`` to be run
        in that order.
        """
        # One return code per expected runner call (tag, push, rmi). With only
        # two entries the third call would raise StopIteration from the mock.
        skipper_runner_run_mock.side_effect = [0, 0, 0]
        push_params = ['my_image']
        with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
            requests_response_mock = requests_response_class_mock.return_value
            requests_response_mock.json.return_value = {
                'name': 'my_image',
                'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
            }
            requests_get_mock.return_value = requests_response_mock

        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=push_params
        )
        expected_commands = [
            mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
            mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
            mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
        ]
        skipper_runner_run_mock.assert_has_calls(expected_commands)
test_cli.py 文件源码 项目:skipper 作者: Stratoscale 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_push_already_in_registry(self, skipper_runner_run_mock, requests_get_mock):
        """Check that no ``docker push`` is expected when the tag is already listed.

        The mocked registry response includes tag ``1234567``; only
        ``docker tag`` and ``docker rmi`` are expected from the runner.
        """
        skipper_runner_run_mock.side_effect = [0, 0]
        registry_payload = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb', "1234567"]
        }
        with mock.patch('requests.Response', autospec=True) as response_cls:
            fake_response = response_cls.return_value
            fake_response.json.return_value = registry_payload
            requests_get_mock.return_value = fake_response

        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=['my_image']
        )

        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
            mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
        ])
test_cli.py 文件源码 项目:skipper 作者: Stratoscale 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_push_already_in_registry_with_force(self, skipper_runner_run_mock, requests_get_mock):
        """Check that ``--force`` pushes even though the registry already lists the tag.

        The test expects ``docker tag``, ``docker push`` and ``docker rmi`` to
        be run in that order.
        """
        # One return code per expected runner call (tag, push, rmi). With only
        # two entries the third call would raise StopIteration from the mock.
        skipper_runner_run_mock.side_effect = [0, 0, 0]
        push_params = ['my_image', "--force"]
        with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
            requests_response_mock = requests_response_class_mock.return_value
            # The registry reports tag 1234567 as present so that the scenario
            # matches the test name and only --force can explain the push.
            requests_response_mock.json.return_value = {
                'name': 'my_image',
                'tags': ['latest', 'aaaaaaa', 'bbbbbbb', "1234567"]
            }
            requests_get_mock.return_value = requests_response_mock

        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=push_params
        )
        expected_commands = [
            mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
            mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
            mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
        ]
        skipper_runner_run_mock.assert_has_calls(expected_commands)
test_cli.py 文件源码 项目:skipper 作者: Stratoscale 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_push_fail(self, skipper_runner_run_mock, requests_get_mock):
        """Check that a non-zero result from the second runner call yields exit code 1.

        The runner mock returns 0 for the first call and 1 for the second;
        ``docker tag`` followed by ``docker push`` are the expected calls.
        """
        skipper_runner_run_mock.side_effect = [0, 1]
        registry_payload = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        with mock.patch('requests.Response', autospec=True) as response_cls:
            fake_response = response_cls.return_value
            fake_response.json.return_value = registry_payload
            requests_get_mock.return_value = fake_response

        outcome = self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=['my_image']
        )

        self.assertEqual(outcome.exit_code, 1)
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
            mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
        ])
test_cli.py 文件源码 项目:skipper 作者: Stratoscale 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_push_rmi_fail(self, skipper_runner_run_mock, requests_get_mock):
        """Check that a non-zero result from the third runner call still exits with 0.

        The runner mock returns 0, 0 and then 1; ``docker tag``,
        ``docker push`` and ``docker rmi`` are the expected calls.
        """
        skipper_runner_run_mock.side_effect = [0, 0, 1]
        registry_payload = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        with mock.patch('requests.Response', autospec=True) as response_cls:
            fake_response = response_cls.return_value
            fake_response.json.return_value = registry_payload
            requests_get_mock.return_value = fake_response

        outcome = self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=['my_image']
        )

        self.assertEqual(outcome.exit_code, 0)
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
            mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
            mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
        ])
test_cli.py 文件源码 项目:skipper 作者: Stratoscale 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_push_with_defaults_from_config_file(self, skipper_runner_run_mock, requests_get_mock):
        """Check the ``push`` runner commands when options come from ``config.load_defaults()``.

        No global parameters are passed to the CLI; the test expects
        ``docker tag``, ``docker push`` and ``docker rmi`` to be run in that
        order.
        """
        # One return code per expected runner call (tag, push, rmi). With only
        # two entries the third call would raise StopIteration from the mock.
        skipper_runner_run_mock.side_effect = [0, 0, 0]
        push_params = ['my_image']
        with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
            requests_response_mock = requests_response_class_mock.return_value
            requests_response_mock.json.return_value = {
                'name': 'my_image',
                'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
            }
            requests_get_mock.return_value = requests_response_mock

        self._invoke_cli(
            defaults=config.load_defaults(),
            subcmd='push',
            subcmd_params=push_params
        )
        expected_commands = [
            mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
            mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
            mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
        ]
        skipper_runner_run_mock.assert_has_calls(expected_commands)
test_connections.py 文件源码 项目:umapi-client.py 作者: adobe-apiplatform 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_get_retry_logging(log_stream):
    """Check the exact log output when ``Session.get`` keeps answering 429.

    ``make_call`` is expected to raise ``UnavailableError``; the captured log
    must match the retry messages verbatim.
    """
    stream, logger = log_stream
    with mock.patch("umapi_client.connection.requests.Session.get") as session_get:
        session_get.return_value = MockResponse(429, headers={"Retry-After": "3"})
        # Copy the shared parameters so the logger override stays local.
        conn = Connection(**dict(mock_connection_params, logger=logger))
        with pytest.raises(UnavailableError):
            conn.make_call("")
        stream.flush()
        log = stream.getvalue()  # kept as a local so `pytest -l` shows the exact log
        expected_log = """UMAPI timeout...service unavailable (code 429 on try 1)
Next retry in 3 seconds...
UMAPI timeout...service unavailable (code 429 on try 2)
Next retry in 3 seconds...
UMAPI timeout...service unavailable (code 429 on try 3)
UMAPI timeout...giving up after 3 attempts (6 seconds).
"""
        assert log == expected_log


# log_stream fixture defined in conftest.py
test_connections.py 文件源码 项目:umapi-client.py 作者: adobe-apiplatform 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_post_retry_logging(log_stream):
    """Check the exact log output when ``Session.post`` keeps answering 429.

    ``make_call`` is invoked with a body argument and is expected to raise
    ``UnavailableError``; the captured log must match the retry messages
    verbatim.
    """
    stream, logger = log_stream
    with mock.patch("umapi_client.connection.requests.Session.post") as session_post:
        session_post.return_value = MockResponse(429, headers={"Retry-After": "3"})
        # Copy the shared parameters so the logger override stays local.
        conn = Connection(**dict(mock_connection_params, logger=logger))
        with pytest.raises(UnavailableError):
            conn.make_call("", [3, 5])
        stream.flush()
        log = stream.getvalue()  # kept as a local so `pytest -l` shows the exact log
        expected_log = """UMAPI timeout...service unavailable (code 429 on try 1)
Next retry in 3 seconds...
UMAPI timeout...service unavailable (code 429 on try 2)
Next retry in 3 seconds...
UMAPI timeout...service unavailable (code 429 on try 3)
UMAPI timeout...giving up after 3 attempts (6 seconds).
"""
        assert log == expected_log
test_os_vif_plug_noop.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_plug(self, mock_plug):
        """Check that ``os_vif.plug`` forwards the vif and instance info to the plugin.

        The stevedore extension manager is patched so that it reports a single
        name and returns an extension wrapping ``NoOpPlugin`` for any lookup.
        """
        noop_extension = extension.Extension(name="noop",
                                             entry_point="os-vif",
                                             plugin=NoOpPlugin,
                                             obj=None)
        names_patch = mock.patch('stevedore.extension.ExtensionManager.names',
                                 return_value=['foobar'])
        getitem_patch = mock.patch(
            'stevedore.extension.ExtensionManager.__getitem__',
            return_value=noop_extension)

        with names_patch:
            with getitem_patch:
                os_vif.initialize()
                info = mock.sentinel.info
                vif = mock.MagicMock()
                vif.plugin_name = 'noop'
                os_vif.plug(vif, info)
                mock_plug.assert_called_once_with(vif, info)
test_os_vif_plug_noop.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_unplug(self, mock_unplug):
        """Check that ``os_vif.unplug`` forwards the vif and instance info to the plugin.

        The stevedore extension manager is patched so that it reports a single
        name and returns an extension wrapping ``NoOpPlugin`` for any lookup.
        """
        # Named "noop" to agree with vif.plugin_name below and with test_plug.
        plg = extension.Extension(name="noop",
                                  entry_point="os-vif",
                                  plugin=NoOpPlugin,
                                  obj=None)
        with mock.patch('stevedore.extension.ExtensionManager.names',
                        return_value=['foobar']),\
                mock.patch('stevedore.extension.ExtensionManager.__getitem__',
                           return_value=plg):
            os_vif.initialize()
            info = mock.sentinel.info
            vif = mock.MagicMock()
            vif.plugin_name = 'noop'
            os_vif.unplug(vif, info)
            mock_unplug.assert_called_once_with(vif, info)
test_pipeline.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_wrap_consumer(self, m_retry_type, m_logging_type):
        """Check that ``_wrap_consumer`` wraps the consumer in retry, then logging.

        The retry type must be called with the consumer, the logging type with
        the retry handler, and the logging handler must be the returned value.
        """
        m_retry_type.return_value = mock.sentinel.retry_handler
        m_logging_type.return_value = mock.sentinel.logging_handler

        # Patch the base initialiser so that building the pipeline does not
        # run EventPipeline.__init__.
        with mock.patch.object(h_dis.EventPipeline, '__init__'):
            pipeline = h_pipeline.ControllerPipeline(mock.sentinel.thread_group)
            wrapped = pipeline._wrap_consumer(mock.sentinel.consumer)

        self.assertEqual(mock.sentinel.logging_handler, wrapped)
        m_logging_type.assert_called_with(mock.sentinel.retry_handler)
        m_retry_type.assert_called_with(mock.sentinel.consumer,
                                        exceptions=mock.ANY)
test_k8s_client.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_annotate_diff_resource_vers_no_conflict(self, m_patch, m_count):
        """Check that ``annotate`` retries after a conflict response.

        The first patch call answers with a conflict status; ``get`` then
        returns an object carrying the same annotations under a newer resource
        version, and a second patch call with that version is expected.
        """
        m_count.return_value = list(range(1, 5))
        path = '/test'
        annotations = {'a1': 'v1', 'a2': 'v2'}
        old_version = "123"
        new_version = "456"

        def _obj(version):
            # Kubernetes-style object holding the annotations at *version*.
            return {'metadata': {
                'annotations': annotations,
                'resourceVersion': version}}

        conflicting_obj = _obj(old_version)
        good_obj = _obj(new_version)
        conflicting_data = jsonutils.dumps(conflicting_obj, sort_keys=True)
        good_data = jsonutils.dumps(good_obj, sort_keys=True)

        conflict_resp = mock.MagicMock()
        conflict_resp.ok = False
        conflict_resp.status_code = requests.codes.conflict
        good_resp = mock.MagicMock()
        good_resp.ok = True
        good_resp.json.return_value = conflicting_obj
        m_patch.side_effect = [conflict_resp, good_resp]

        with mock.patch.object(self.client, 'get') as m_get:
            m_get.return_value = good_obj
            result = self.client.annotate(
                path, annotations, resource_version=old_version)
            self.assertEqual(annotations, result)

        m_patch.assert_has_calls([
            mock.call(self.base_url + path,
                      data=payload,
                      headers=mock.ANY,
                      cert=(None, None), verify=False)
            for payload in (conflicting_data, good_data)])
test_k8s_client.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_annotate_diff_resource_vers_no_annotation(self, m_patch, m_count):
        """Check that ``annotate`` retries when the newer object has no annotations.

        The first patch call answers with a conflict status; ``get`` then
        returns an object with a newer resource version and no annotations,
        and a second patch call carrying the annotations under that newer
        version is expected.
        """
        m_count.return_value = list(range(1, 5))
        path = '/test'
        annotations = {'a1': 'v1', 'a2': 'v2'}
        annotating_resource_version = '123'
        annotating_obj = {'metadata': {
            'annotations': annotations,
            'resourceVersion': annotating_resource_version}}
        annotating_data = jsonutils.dumps(annotating_obj, sort_keys=True)

        new_resource_version = '456'
        new_obj = {'metadata': {
            'resourceVersion': new_resource_version}}

        # Build a fresh 'metadata' dict: a shallow copy of annotating_obj would
        # share the nested dict, so changing the version would also rewrite
        # annotating_obj.
        resolution_obj = {'metadata': dict(
            annotating_obj['metadata'],
            resourceVersion=new_resource_version)}
        resolution_data = jsonutils.dumps(resolution_obj, sort_keys=True)

        m_resp_conflict = mock.MagicMock()
        m_resp_conflict.ok = False
        m_resp_conflict.status_code = requests.codes.conflict
        m_resp_good = mock.MagicMock()
        m_resp_good.ok = True
        m_resp_good.json.return_value = resolution_obj
        m_patch.side_effect = (m_resp_conflict, m_resp_good)

        with mock.patch.object(self.client, 'get') as m_get:
            m_get.return_value = new_obj
            self.assertEqual(annotations, self.client.annotate(
                path, annotations,
                resource_version=annotating_resource_version))

        m_patch.assert_has_calls([
            mock.call(self.base_url + path,
                      data=annotating_data,
                      headers=mock.ANY,
                      cert=(None, None), verify=False),
            mock.call(self.base_url + path,
                      data=resolution_data,
                      headers=mock.ANY,
                      cert=(None, None), verify=False)])
test_k8s_client.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_annotate_diff_resource_vers_conflict(self, m_patch, m_count):
        """Check that ``annotate`` raises when the newer object's annotations differ.

        Every patch call answers with a conflict status and ``get`` returns an
        object with a different value for ``a1``; ``K8sClientException`` is
        expected after exactly one patch call.
        """
        m_count.return_value = list(range(1, 5))
        path = '/test'
        annotations = {'a1': 'v1', 'a2': 'v2'}
        old_version = "123"
        new_version = "456"
        conflicting_data = jsonutils.dumps(
            {'metadata': {
                'annotations': annotations,
                'resourceVersion': old_version}},
            sort_keys=True)
        actual_obj = {'metadata': {
            'annotations': {'a1': 'v2'},
            'resourceVersion': new_version}}

        conflict_resp = mock.MagicMock()
        conflict_resp.ok = False
        conflict_resp.status_code = requests.codes.conflict
        m_patch.return_value = conflict_resp

        with mock.patch.object(self.client, 'get') as m_get:
            m_get.return_value = actual_obj
            self.assertRaises(exc.K8sClientException,
                              self.client.annotate,
                              path, annotations,
                              resource_version=old_version)

        m_patch.assert_called_once_with(self.base_url + path,
                                        data=conflicting_data,
                                        headers=mock.ANY,
                                        cert=(None, None), verify=False)
test_asynchronous.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_run(self, m_count):
        """Check that ``_run`` passes the single queued event to the handler.

        The counter mock yields one iteration and ``time.sleep`` is patched
        out for the duration of the run.
        """
        queued_event = mock.sentinel.event
        m_queue = mock.Mock(**{'empty.return_value': True,
                               'get.return_value': queued_event})
        m_handler = mock.Mock()
        m_count.return_value = [1]
        async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(),
                                      queue_depth=1)

        with mock.patch('time.sleep'):
            async_handler._run(mock.sentinel.group, m_queue)

        m_handler.assert_called_once_with(queued_event)
test_asynchronous.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_run_empty(self, m_count):
        """Check that ``_run`` handles each queued event once before the queue empties.

        ``get`` yields two events and then raises ``Empty``; the handler must
        have been called exactly once per event, in order.
        """
        events = [mock.sentinel.event1, mock.sentinel.event2]
        m_queue = mock.Mock(**{'empty.return_value': True,
                               'get.side_effect': events + [six_queue.Empty()]})
        m_handler = mock.Mock()
        m_count.return_value = list(range(5))
        async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock())

        with mock.patch('time.sleep'):
            async_handler._run(mock.sentinel.group, m_queue)

        m_handler.assert_has_calls(list(map(mock.call, events)))
        self.assertEqual(len(events), m_handler.call_count)
test_deploy.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test__prepare_variables_configdrive_file(self, mem_req_mock):
        """Check the variables built when the configdrive is given as inline content.

        The configdrive content is expected to be written to a
        ``<uuid>.cndrive`` file under the configured ``tempdir`` and reported
        as a ``file`` type configdrive.
        """
        # Reassign the whole dict rather than mutating the attribute in place.
        instance_info = self.node.instance_info
        instance_info['configdrive'] = 'fake-content'
        self.node.instance_info = instance_info
        self.node.save()
        self.config(tempdir='/path/to/tmpfiles')

        image_vars = {"url": "http://image",
                      "validate_certs": "yes",
                      "source": "fake-image",
                      "mem_req": 2000,
                      "disk_format": "qcow2",
                      "checksum": "md5:checksum"}
        configdrive_vars = {
            'type': 'file',
            'location': '/path/to/tmpfiles/%s.cndrive' % self.node.uuid}
        expected = {"image": image_vars, 'configdrive': configdrive_vars}

        # ``open`` is patched on the module under test so no file is written.
        with mock.patch.object(ansible_deploy, 'open', mock.mock_open(),
                               create=True) as open_mock:
            with task_manager.acquire(self.context, self.node.uuid) as task:
                actual = ansible_deploy._prepare_variables(task)
                self.assertEqual(expected, actual)
            open_mock.assert_has_calls((
                mock.call('/path/to/tmpfiles/%s.cndrive' % self.node.uuid,
                          'w'),
                mock.call().__enter__(),
                mock.call().write('fake-content'),
                mock.call().__exit__(None, None, None)))


问题


面经


文章

微信
公众号

扫码关注公众号