python类DEFAULT的实例源码

test_process_services.py 文件源码 项目:Eskapade 作者: KaveIO 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_init(self, mock_get_env_var, mock_get_dir_path):
        """Check the settings that ConfigObject.__init__ writes into the config."""

        # Stub the project utility functions: only the key used here gets a
        # fixed answer, every other request falls back to the mock default.
        def fake_get_env_var(*args, **kwargs):
            if args and args[0] == 'display':
                return ':0.0'
            return mock.DEFAULT

        def fake_get_dir_path(*args, **kwargs):
            if args and args[0] == 'es_root':
                return 'es_path'
            return mock.DEFAULT

        mock_get_env_var.side_effect = fake_get_env_var
        mock_get_dir_path.side_effect = fake_get_dir_path

        # Back the mocked config object with a plain dictionary so that the
        # values stored by __init__ can be inspected afterwards.
        mock_config_object = mock.MagicMock(name='ConfigObject_instance')
        settings = {}
        mock_config_object.__getitem__ = lambda obj, key: settings[key]
        mock_config_object.__setitem__ = lambda obj, key, value: settings.__setitem__(key, value)
        mock_config_object.get = settings.get

        # Register the test variables and run the real initializer on the mock.
        CONFIG_VARS.update([('sec1', ['var1']), ('sec2', ['var2', 'var3'])])
        CONFIG_DEFAULTS.update(var1='foo', var3=42)
        ConfigObject.__init__(mock_config_object)

        # Compare the resulting settings with the expected ones.
        exp_settings = {
            'var1': 'foo',
            'var2': None,
            'var3': 42,
            'batchMode': False,
            'esRoot': 'es_path',
            'resultsDir': 'es_path/results',
            'dataDir': 'es_path/data',
            'macrosDir': 'es_path/tutorials',
            'templatesDir': 'es_path/templates',
            'configDir': 'es_path/config',
        }
        self.assertDictEqual(settings, exp_settings, 'unexpected resulting settings dictionary')
test_heroku.py 文件源码 项目:Dallinger 作者: Dallinger 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_sends_notification_if_resubmitted(self, run_check):
        """An overdue assignment reported as 'Submitted' triggers the resubmitted message."""
        # NOTE(review): the previous comment asked for whimsical=True "to avoid
        # error in the False code branch", yet the value below is False --
        # confirm which one is intended.
        config = {
            'duration': 1.0,
            'host': 'fakehost.herokuapp.com',
            'whimsical': False
        }
        submitted_assignment = mock.Mock(AssignmentStatus='Submitted')
        mturk = mock.Mock()
        mturk.get_assignment.return_value = [submitted_assignment]
        participants = [self.a.participant()]
        session = None
        # Pretend it is six hours later so the assignment counts as overdue.
        reference_time = datetime.datetime.now() + datetime.timedelta(hours=6)
        messager = mock.Mock(spec=NullHITMessager)
        clock_patch = mock.patch.multiple('dallinger.heroku.clock',
                                          requests=mock.DEFAULT,
                                          NullHITMessager=mock.DEFAULT)
        with clock_patch as mocks:
            # Whatever messager the clock module instantiates is our stub.
            mocks['NullHITMessager'].return_value = messager
            run_check(config, mturk, participants, session, reference_time)
            messager.send_resubmitted_msg.assert_called()
test_heroku.py 文件源码 项目:Dallinger 作者: Dallinger 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_no_assignement_on_mturk_sends_hit_cancelled_message(self, run_check):
        """With no assignment returned by MTurk, the HIT-cancelled message is sent."""
        # NOTE(review): the previous comment asked for whimsical=True "to avoid
        # error in the False code branch", yet the value below is False --
        # confirm which one is intended.
        config = {
            'duration': 1.0,
            'host': 'fakehost.herokuapp.com',
            'whimsical': False
        }
        mturk = mock.Mock()
        mturk.get_assignment.return_value = []
        participants = [self.a.participant()]
        session = None
        # Pretend it is six hours later so the assignment counts as overdue.
        reference_time = datetime.datetime.now() + datetime.timedelta(hours=6)
        messager = mock.Mock(spec=NullHITMessager)
        clock_patch = mock.patch.multiple('dallinger.heroku.clock',
                                          requests=mock.DEFAULT,
                                          NullHITMessager=mock.DEFAULT)
        with clock_patch as mocks:
            # Whatever messager the clock module instantiates is our stub.
            mocks['NullHITMessager'].return_value = messager
            run_check(config, mturk, participants, session, reference_time)
            messager.send_hit_cancelled_msg.assert_called()
test_api_callable.py 文件源码 项目:gax-python 作者: googleapis 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_retry(self, mock_exc_to_code, mock_time):
        """The API callable retries retryable failures until the call succeeds."""
        mock_exc_to_code.side_effect = lambda e: e.code
        attempts_needed = 3
        retry = RetryOptions(
            [_FAKE_STATUS_CODE_1],
            BackoffSettings(0, 0, 0, 0, 0, 0, 1))

        # Every attempt before the last raises a retryable error; the final
        # mock.DEFAULT entry makes the mock fall back to its return_value.
        transient_error = CustomException('', _FAKE_STATUS_CODE_1)
        outcomes = [transient_error] * (attempts_needed - 1)
        outcomes.append(mock.DEFAULT)
        mock_call = mock.Mock(side_effect=outcomes, return_value=1729)
        mock_time.return_value = 0

        settings = _CallSettings(timeout=0, retry=retry)
        my_callable = api_callable.create_api_call(mock_call, settings)
        self.assertEqual(my_callable(None), 1729)
        self.assertEqual(mock_call.call_count, attempts_needed)
test_retry.py 文件源码 项目:gax-python 作者: googleapis 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_retryable_without_timeout(self, mock_time, mock_exc_to_code):
        """retryable() keeps calling until the wrapped call finally succeeds."""
        mock_time.return_value = 0
        mock_exc_to_code.side_effect = lambda e: e.code

        attempts_needed = 3
        # Every attempt before the last raises a retryable error; the final
        # mock.DEFAULT entry makes the mock fall back to its return_value.
        transient_error = CustomException('', _FAKE_STATUS_CODE_1)
        outcomes = [transient_error] * (attempts_needed - 1)
        outcomes.append(mock.DEFAULT)
        mock_call = mock.Mock(side_effect=outcomes, return_value=1729)

        retry_options = RetryOptions(
            [_FAKE_STATUS_CODE_1],
            BackoffSettings(0, 0, 0, None, None, None, None))

        my_callable = retry.retryable(mock_call, retry_options)

        self.assertEqual(my_callable(None), 1729)
        self.assertEqual(attempts_needed, mock_call.call_count)
test_retry.py 文件源码 项目:gax-python 作者: googleapis 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_retryable_with_timeout(self, mock_time, mock_exc_to_code):
        """Expect RetryError and zero invocations when the clock reads 1 and all backoff settings are 0."""
        mock_time.return_value = 1
        mock_exc_to_code.side_effect = lambda e: e.code

        # One retryable failure followed by a success; the test asserts that
        # the wrapped call is never reached.
        outcomes = [CustomException('', _FAKE_STATUS_CODE_1), mock.DEFAULT]
        mock_call = mock.Mock(side_effect=outcomes, return_value=1729)

        zero_backoff = BackoffSettings(0, 0, 0, 0, 0, 0, 0)
        retry_options = RetryOptions([_FAKE_STATUS_CODE_1], zero_backoff)

        my_callable = retry.retryable(mock_call, retry_options)

        self.assertRaises(errors.RetryError, my_callable)
        self.assertEqual(0, mock_call.call_count)
test_retry.py 文件源码 项目:gax-python 作者: googleapis 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_retryable_when_no_codes(self, mock_time, mock_exc_to_code):
        """With an empty list of retryable codes the first failure is final.

        The wrapped call must be invoked exactly once and the resulting
        RetryError must carry the original CustomException as its cause.
        """
        mock_time.return_value = 0
        mock_exc_to_code.side_effect = lambda e: e.code

        mock_call = mock.Mock()
        # The second entry would let a retry succeed; the test asserts that
        # it is never consumed.
        mock_call.side_effect = [CustomException('', _FAKE_STATUS_CODE_1),
                                 mock.DEFAULT]
        mock_call.return_value = 1729

        retry_options = RetryOptions(
            [],
            BackoffSettings(0, 0, 0, 0, 0, 0, 1))

        my_callable = retry.retryable(mock_call, retry_options)

        # assertRaises fails the test by itself when nothing is raised, so no
        # manual try / self.fail / except scaffolding is needed.
        with self.assertRaises(errors.RetryError) as raised:
            my_callable(None)
        self.assertIsInstance(raised.exception.cause, CustomException)

        self.assertEqual(1, mock_call.call_count)
test_retry.py 文件源码 项目:gax-python 作者: googleapis 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_retryable_aborts_on_unexpected_exception(
            self, mock_time, mock_exc_to_code):
        """A failure whose code is not in the retryable list aborts at once.

        The call fails with _FAKE_STATUS_CODE_2 while only _FAKE_STATUS_CODE_1
        is retryable, so exactly one invocation is expected and the RetryError
        must carry the original CustomException as its cause.
        """
        mock_time.return_value = 0
        mock_exc_to_code.side_effect = lambda e: e.code

        mock_call = mock.Mock()
        # The second entry would let a retry succeed; the test asserts that
        # it is never consumed.
        mock_call.side_effect = [CustomException('', _FAKE_STATUS_CODE_2),
                                 mock.DEFAULT]
        mock_call.return_value = 1729

        retry_options = RetryOptions(
            [_FAKE_STATUS_CODE_1],
            BackoffSettings(0, 0, 0, 0, 0, 0, 1))

        my_callable = retry.retryable(mock_call, retry_options)

        # assertRaises fails the test by itself when nothing is raised, so no
        # manual try / self.fail / except scaffolding is needed.
        with self.assertRaises(errors.RetryError) as raised:
            my_callable(None)
        self.assertIsInstance(raised.exception.cause, CustomException)

        self.assertEqual(1, mock_call.call_count)
test_ceph_hooks.py 文件源码 项目:charm-ceph-mon 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_nrpe_dependency_installed(self, mock_config):
        """update_nrpe_config must apt-install python-dbus and lockfile-progs."""
        config = copy.deepcopy(CHARM_CONFIG)
        mock_config.side_effect = lambda key: config[key]
        # Replace every collaborator of the hook with an auto-created mock.
        stubbed = dict.fromkeys(
            ('apt_install', 'rsync', 'log', 'write_file', 'nrpe'), DEFAULT)
        with patch.multiple(ceph_hooks, **stubbed) as mocks:
            ceph_hooks.update_nrpe_config()
        # The mocks keep their call records after the patch is undone.
        mocks["apt_install"].assert_called_once_with(
            ["python-dbus", "lockfile-progs"])
test_wangbase.py 文件源码 项目:DiscourseSenser 作者: WladimirSidorenko 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_train_1(self):
        """Run train() with the model, feature extraction and grid search stubbed out."""
        class_idx = 2
        ret_train = np.zeros((6, 3, N_CLASSES))
        ret_dev = np.zeros((6, 3, N_CLASSES))
        # Single-row decision matrix with a 1 in column `class_idx`.
        func_ret = np.zeros((1, N_CLASSES))
        func_ret[0, class_idx] = 1.
        instance_patch = patch.multiple(
            self.wb,
            _gs=True,
            _generate_ts=lambda *x: (self.TRAIN_X, Y),
            _extract_features=MagicMock(return_value=FEATS),
            _model=MOCK_DEFAULT)
        with instance_patch, patch("dsenser.wang.wangbase.GridSearchCV"):
            # Configure the auto-created model mock before training.
            model = self.wb._model
            model.decision_function = MagicMock(return_value=func_ret)
            model.classes_ = CLASSES_
            self.wb.train(([(0, REL1)], [PARSE1]),
                          ([(0, REL1)], [PARSE1]),
                          1, 1, ret_train, ret_dev)
test_ceph_hooks.py 文件源码 项目:charm-ceph 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_upgrade_charm_with_nrpe_relation_installs_dependencies(self):
        """upgrade_charm apt-installs python-dbus and lockfile-progs when is_relation_made is True."""
        # Replace every collaborator of the hook with an auto-created mock.
        stubbed = dict.fromkeys(
            ('apt_install', 'rsync', 'log', 'write_file', 'nrpe',
             'emit_cephconf', 'mon_relation_joined', 'is_relation_made'),
            DEFAULT)
        hooks_patch = patch.multiple(ceph_hooks, **stubbed)
        harden_patch = patch("charmhelpers.contrib.hardening.harden.config")
        with hooks_patch as mocks, harden_patch:
            mocks["is_relation_made"].return_value = True
            ceph_hooks.upgrade_charm()
        # The mocks keep their call records after the patch is undone.
        mocks["apt_install"].assert_called_with(
            ["python-dbus", "lockfile-progs"])
runtime_test.py 文件源码 项目:treadmill 作者: Morgan-Stanley 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def test__allocate_sockets(self):
        """Allocate three sockets when the first bind attempt reports EADDRINUSE."""
        # access protected module _allocate_sockets
        # pylint: disable=w0212

        # First bind fails with "address in use"; the next three fall back to
        # the mock's default behaviour.
        address_in_use = socket.error(errno.EADDRINUSE, 'In use')
        socket.socket.bind.side_effect = [address_in_use] + [mock.DEFAULT] * 3

        sockets = treadmill.runtime._allocate_sockets(
            'prod', '0.0.0.0', socket.SOCK_STREAM, 3
        )

        self.assertEqual(3, len(sockets))
mock_.py 文件源码 项目:asynq 作者: quora 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def patch(
        target, new=mock.DEFAULT, spec=None, create=False,
        mocksignature=False, spec_set=None, autospec=False,
        new_callable=None, **kwargs
):
    """Patch an async function.

    Intended as a drop-in replacement for mock.patch that deals with async
    functions automatically. The .asynq attribute is created for you and should
    not be used when accessing data on the mock.

    The target is resolved with _get_target and all remaining arguments are
    forwarded unchanged to _make_patch_async, whose result is returned.

    """
    target_getter, attribute_name = _get_target(target)
    patcher = _make_patch_async(
        target_getter, attribute_name, new, spec, create, mocksignature,
        spec_set, autospec, new_callable, kwargs
    )
    return patcher
test_luks.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_attach_volume_fail(self, mock_execute):
        """attach_volume raises when luksOpen fails, after also running isLuks."""
        self.encryptor._get_key = mock.MagicMock(
            return_value=test_cryptsetup.fake__get_key(None))

        luks_open_failure = processutils.ProcessExecutionError(exit_code=1)
        # First command (luksOpen) fails, second one (isLuks) uses the default.
        mock_execute.side_effect = [luks_open_failure, mock.DEFAULT]

        self.assertRaises(processutils.ProcessExecutionError,
                          self.encryptor.attach_volume, None)

        expected_calls = [
            mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
                      self.dev_name, process_input='0' * 32,
                      run_as_root=True, check_exit_code=True),
            mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
                      run_as_root=True, check_exit_code=True),
        ]
        mock_execute.assert_has_calls(expected_calls, any_order=False)
        self.assertEqual(2, mock_execute.call_count)
test_deploy.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_deploy_no_callback(self, power_mock, get_ip_mock):
        """deploy() with use_ramdisk_callback=False reboots, deploys and returns DEPLOYDONE."""
        self.config(group='ansible', use_ramdisk_callback=False)
        driver_patch = mock.patch.multiple(self.driver,
                                           _ansible_deploy=mock.DEFAULT,
                                           reboot_to_instance=mock.DEFAULT)
        with driver_patch as driver_mocks:
            with task_manager.acquire(
                    self.context, self.node['uuid'], shared=False) as task:
                result = self.driver.deploy(task)
                self.assertEqual(result, states.DEPLOYDONE)
                power_mock.assert_called_once_with(task, states.REBOOT)
                get_ip_mock.assert_called_once_with(task)
                deploy_mock = driver_mocks['_ansible_deploy']
                deploy_mock.assert_called_once_with(task, '127.0.0.1')
                reboot_mock = driver_mocks['reboot_to_instance']
                reboot_mock.assert_called_once_with(task)
test_deploy.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_continue_deploy(self, getip_mock):
        """continue_deploy runs the ansible deploy and reboot, leaving the node DEPLOYING."""
        # Put the node into the state it has while waiting for the callback.
        self.node.provision_state = states.DEPLOYWAIT
        self.node.target_provision_state = states.ACTIVE
        self.node.save()
        with task_manager.acquire(self.context, self.node.uuid) as task:
            driver = self.driver
            with mock.patch.multiple(driver, autospec=True,
                                     _ansible_deploy=mock.DEFAULT,
                                     reboot_to_instance=mock.DEFAULT):
                driver.continue_deploy(task)
                getip_mock.assert_called_once_with(task)
                driver._ansible_deploy.assert_called_once_with(
                    task, '1.2.3.4')
                driver.reboot_to_instance.assert_called_once_with(task)
            node = task.node
            self.assertEqual(states.ACTIVE, node.target_provision_state)
            self.assertEqual(states.DEPLOYING, node.provision_state)
test_activities.py 文件源码 项目:heaviside 作者: jhuapl-boss 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_run(self, mCreateSession, mSample):
        """run() fetches one task, passes it to handle_task and stops once polling is cleared."""
        mSample.return_value = 'XXX'
        session = MockSession()
        mCreateSession.return_value = (session, '123456')
        client = session.client('stepfunctions')
        client.list_activities.return_value = {
            'activities':[{
                'name': 'name',
                'activityArn': 'XXX'
            }]
        }
        client.get_activity_task.return_value = {
            'taskToken': 'YYY',
            'input': '{}'
        }

        handler = mock.MagicMock()

        activity = ActivityMixin(handle_task=handler)

        # Clear the polling flag on the first handled task so that run()
        # returns; mock.DEFAULT keeps the handler's normal return value.
        def stop_loop(*args, **kwargs):
            activity.polling = False
            return mock.DEFAULT
        handler.side_effect = stop_loop

        activity.run('name')

        expected_client_calls = [
            mock.call.list_activities(),
            mock.call.get_activity_task(activityArn='XXX',
                                        workerName='name-XXX')
        ]
        self.assertEqual(client.mock_calls, expected_client_calls)

        expected_handler_calls = [
            mock.call('YYY', {}),
            mock.call().start()
        ]
        self.assertEqual(handler.mock_calls, expected_handler_calls)
base.py 文件源码 项目:deckhand 作者: att-comdev 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def patchobject(self, target, attribute, new=mock.DEFAULT, autospec=True):
        """Patch ``attribute`` on ``target`` via `mock.patch.object`.

        The patch is started right away and its ``stop`` is registered with
        ``self.addCleanup``. Returns the value produced by starting the patch.
        """

        patcher = mock.patch.object(target, attribute, new, autospec=autospec)
        started = patcher.start()
        # Register the undo step only after the patch has actually started.
        self.addCleanup(patcher.stop)
        return started
test_heroku.py 文件源码 项目:Dallinger 作者: Dallinger 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_check_db_for_missing_notifications_assembles_resources(self, run_check):
        """check_db_for_missing_notifications must end up calling run_check."""
        # Can't import until after config is loaded:
        from dallinger.heroku.clock import check_db_for_missing_notifications
        clock_patch = mock.patch.multiple('dallinger.heroku.clock',
                                          run_check=mock.DEFAULT,
                                          MTurkConnection=mock.DEFAULT)
        with clock_patch as mocks:
            mocks['MTurkConnection'].return_value = 'fake connection'
            check_db_for_missing_notifications()

            mocks['run_check'].assert_called()
test_recruiters.py 文件源码 项目:Dallinger 作者: Dallinger 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def recruiter(self):
        """Yield a BotRecruiter created while _get_queue and get_base_url are patched."""
        from dallinger.recruiters import BotRecruiter
        recruiters_patch = mock.patch.multiple('dallinger.recruiters',
                                               _get_queue=mock.DEFAULT,
                                               get_base_url=mock.DEFAULT)
        with recruiters_patch as mocks:
            mocks['get_base_url'].return_value = 'fake_base_url'
            bot_recruiter = BotRecruiter()
            bot_recruiter._get_bot_factory = mock.Mock()
            # Yield inside the context so the patches stay active for the
            # consumer of this generator.
            yield bot_recruiter
test_recruiters.py 文件源码 项目:Dallinger 作者: Dallinger 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def recruiter(self, active_config):
        """Return an MTurkRecruiter in sandbox mode backed by an autospecced MTurkService."""
        from dallinger.mturk import MTurkService
        from dallinger.recruiters import MTurkRecruiter
        recruiters_patch = mock.patch.multiple('dallinger.recruiters',
                                               os=mock.DEFAULT,
                                               get_base_url=mock.DEFAULT)
        with recruiters_patch as mocks:
            mocks['get_base_url'].return_value = 'http://fake-domain'
            mocks['os'].getenv.return_value = 'fake-host-domain'
            service_cls = mock.create_autospec(MTurkService)
            active_config.extend({'mode': u'sandbox'})
            mturk_recruiter = MTurkRecruiter()
            service = service_cls('fake key', 'fake secret')
            mturk_recruiter.mturkservice = service
            service.check_credentials.return_value = True
            service.create_hit.return_value = {'type_id': 'fake type id'}
            # NOTE(review): returning from inside the `with` undoes the
            # patches before the caller uses the recruiter -- confirm that
            # they are only needed while the recruiter is constructed.
            return mturk_recruiter
test_command_line.py 文件源码 项目:Dallinger 作者: Dallinger 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def faster(tempdir):
    """Yield the mocks patched over time and setup_experiment in dallinger.command_line.

    setup_experiment is stubbed to return ``('fake-uid', tempdir)``.
    """
    command_line_patch = mock.patch.multiple('dallinger.command_line',
                                             time=mock.DEFAULT,
                                             setup_experiment=mock.DEFAULT)
    with command_line_patch as mocks:
        mocks['setup_experiment'].return_value = ('fake-uid', tempdir)

        yield mocks
testsentinel.py 文件源码 项目:RPoint 作者: george17-meet 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def testDEFAULT(self):
        """Check that ``DEFAULT`` is the very same object as ``sentinel.DEFAULT``."""
        self.assertIs(DEFAULT, sentinel.DEFAULT)
test_ceph_hooks.py 文件源码 项目:charm-ceph-mon 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_upgrade_charm_with_nrpe_relation_installs_dependencies(
            self,
            mock_config):
        """upgrade_charm apt-installs python-dbus and lockfile-progs when is_relation_made is True."""
        config = copy.deepcopy(CHARM_CONFIG)
        mock_config.side_effect = lambda key: config[key]
        # Replace every collaborator of the hook with an auto-created mock.
        stubbed = dict.fromkeys(
            ('apt_install', 'rsync', 'log', 'write_file', 'nrpe',
             'emit_cephconf', 'mon_relation_joined', 'is_relation_made'),
            DEFAULT)
        hooks_patch = patch.multiple(ceph_hooks, **stubbed)
        harden_patch = patch("charmhelpers.contrib.hardening.harden.config")
        with hooks_patch as mocks, harden_patch:
            mocks["is_relation_made"].return_value = True
            ceph_hooks.upgrade_charm()
        # The mocks keep their call records after the patch is undone.
        mocks["apt_install"].assert_called_with(
            ["python-dbus", "lockfile-progs"])
test_mainloop.py 文件源码 项目:supvisors 作者: julien6387 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_send_request(self):
        """ Test the execution of a deferred Supervisor request. """
        from supvisors.mainloop import SupvisorsMainLoop
        from supvisors.utils import DeferredRequestHeaders
        main_loop = SupvisorsMainLoop(self.supvisors)
        # one entry per request kind: patched method, request header, arguments
        scenarios = (
            ('check_address', DeferredRequestHeaders.CHECK_ADDRESS,
             ('10.0.0.2', )),
            ('start_process', DeferredRequestHeaders.START_PROCESS,
             ('10.0.0.2', 'dummy_process', 'extra args')),
            ('stop_process', DeferredRequestHeaders.STOP_PROCESS,
             ('10.0.0.2', 'dummy_process')),
            ('restart', DeferredRequestHeaders.RESTART,
             ('10.0.0.2', )),
            ('shutdown', DeferredRequestHeaders.SHUTDOWN,
             ('10.0.0.2', )),
        )
        # patch every main loop method named in the scenarios
        stubbed = dict.fromkeys((name for name, _, _ in scenarios), DEFAULT)
        with patch.multiple(main_loop, **stubbed) as mocked_loop:
            for method_name, header, args in scenarios:
                self.check_call(main_loop, mocked_loop, method_name,
                                header, args)
test_listener.py 文件源码 项目:supvisors 作者: julien6387 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_on_remote_event(self):
        """ Test the reception of a Supervisor remote comm event.

        Each event type must be routed to exactly one of the patched listener
        methods (unstack_event, unstack_info, authorization) and an unknown
        type must reach none of them. """
        from supvisors.listener import SupervisorListener
        listener = SupervisorListener(self.supvisors)
        # replace the three handlers with mocks so that only the dispatching
        # done by on_remote_event is exercised here
        with patch.multiple(listener, unstack_event=DEFAULT,
                unstack_info=DEFAULT, authorization=DEFAULT):
            # unknown type: no handler may be called
            event = Mock(type='unknown', data='')
            listener.on_remote_event(event)
            self.assertFalse(listener.unstack_event.called)
            self.assertFalse(listener.unstack_info.called)
            self.assertFalse(listener.authorization.called)
            # 'event' type: data goes to unstack_event only
            event = Mock(type='event', data={'state': 'RUNNING'})
            listener.on_remote_event(event)
            self.assertEqual([call({'state': 'RUNNING'})],
                listener.unstack_event.call_args_list)
            self.assertFalse(listener.unstack_info.called)
            self.assertFalse(listener.authorization.called)
            # forget the recorded call so the next checks start from scratch
            listener.unstack_event.reset_mock()
            # 'info' type: data goes to unstack_info only
            event = Mock(type='info', data={'name': 'dummy_process'})
            listener.on_remote_event(event)
            self.assertFalse(listener.unstack_event.called)
            self.assertEqual([call({'name': 'dummy_process'})],
                listener.unstack_info.call_args_list)
            self.assertFalse(listener.authorization.called)
            # forget the recorded call so the next checks start from scratch
            listener.unstack_info.reset_mock()
            # 'auth' type: data goes to authorization only
            event = Mock(type='auth', data=('10.0.0.1', True))
            listener.on_remote_event(event)
            self.assertFalse(listener.unstack_event.called)
            self.assertFalse(listener.unstack_info.called)
            self.assertEqual([call(('10.0.0.1', True))],
                listener.authorization.call_args_list)
test_redirector.py 文件源码 项目:vault-redirector-twisted 作者: manheim 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_run(self):
        """Check the calls run() makes when this test sets no ``tls_factory``.

        The active node must be stored, two warnings logged, the site handed
        to ``listentcp`` and ``listentls`` left unused.
        """
        self.cls.reactor = Mock(spec_set=reactor)
        module_stubs = dict.fromkeys(
            ('logger', 'Site', 'LoopingCall', 'VaultRedirectorSite'), DEFAULT)
        method_stubs = dict.fromkeys(
            ('get_active_node', 'run_reactor', 'listentcp',
             'add_update_loop', 'listentls'), DEFAULT)
        with patch.multiple(pbm, **module_stubs) as mod_mocks:
            with patch.multiple(pb, **method_stubs) as cls_mocks:
                cls_mocks['get_active_node'].return_value = 'consul:1234'
                self.cls.run()
        site = mod_mocks['Site']
        redirector_site = mod_mocks['VaultRedirectorSite']
        assert self.cls.active_node_ip_port == 'consul:1234'
        assert mod_mocks['logger'].mock_calls == [
            call.warning('Initial Vault active node: %s', 'consul:1234'),
            call.warning('Starting Twisted reactor (event loop)')
        ]
        assert redirector_site.mock_calls == [call(self.cls)]
        assert site.mock_calls == [call(redirector_site.return_value)]
        assert self.cls.reactor.mock_calls == []
        assert cls_mocks['run_reactor'].mock_calls == [call()]
        assert mod_mocks['LoopingCall'].mock_calls == []
        assert cls_mocks['listentcp'].mock_calls == [call(site.return_value)]
        assert cls_mocks['add_update_loop'].mock_calls == [call()]
        assert cls_mocks['listentls'].mock_calls == []
test_redirector.py 文件源码 项目:vault-redirector-twisted 作者: manheim 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_run_tls(self):
        """Check the calls run() makes when ``tls_factory`` is set.

        The active node must be stored, two warnings logged, the site handed
        to ``listentls`` and ``listentcp`` left unused.
        """
        self.cls.reactor = Mock(spec_set=reactor)
        self.cls.tls_factory = Mock()
        module_stubs = dict.fromkeys(
            ('logger', 'Site', 'LoopingCall', 'VaultRedirectorSite'), DEFAULT)
        method_stubs = dict.fromkeys(
            ('get_active_node', 'run_reactor', 'listentcp',
             'add_update_loop', 'listentls'), DEFAULT)
        with patch.multiple(pbm, **module_stubs) as mod_mocks:
            with patch.multiple(pb, **method_stubs) as cls_mocks:
                cls_mocks['get_active_node'].return_value = 'consul:1234'
                self.cls.run()
        site = mod_mocks['Site']
        redirector_site = mod_mocks['VaultRedirectorSite']
        assert self.cls.active_node_ip_port == 'consul:1234'
        assert mod_mocks['logger'].mock_calls == [
            call.warning('Initial Vault active node: %s', 'consul:1234'),
            call.warning('Starting Twisted reactor (event loop)')
        ]
        assert redirector_site.mock_calls == [call(self.cls)]
        assert site.mock_calls == [call(redirector_site.return_value)]
        assert self.cls.reactor.mock_calls == []
        assert cls_mocks['run_reactor'].mock_calls == [call()]
        assert mod_mocks['LoopingCall'].mock_calls == []
        assert cls_mocks['listentls'].mock_calls == [call(site.return_value)]
        assert cls_mocks['add_update_loop'].mock_calls == [call()]
        assert cls_mocks['listentcp'].mock_calls == []
test_redirector.py 文件源码 项目:vault-redirector-twisted 作者: manheim 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_run_error(self):
        """run() exits with code 3 and logs a critical error when no active node is found."""
        self.cls.reactor = Mock(spec_set=reactor)
        module_stubs = dict.fromkeys(
            ('logger', 'Site', 'LoopingCall', 'VaultRedirectorSite'), DEFAULT)
        method_stubs = dict.fromkeys(
            ('get_active_node', 'run_reactor', 'listentcp',
             'add_update_loop'), DEFAULT)
        with patch.multiple(pbm, **module_stubs) as mod_mocks:
            with patch.multiple(pb, **method_stubs) as cls_mocks:
                # No active node available.
                cls_mocks['get_active_node'].return_value = None
                with pytest.raises(SystemExit) as excinfo:
                    self.cls.run()
        assert excinfo.value.code == 3
        assert mod_mocks['logger'].mock_calls == [
            call.critical("ERROR: Could not get active vault node from "
                          "Consul. Exiting.")
        ]
        # Nothing else may have been touched before exiting.
        assert mod_mocks['VaultRedirectorSite'].mock_calls == []
        assert mod_mocks['Site'].mock_calls == []
        assert self.cls.reactor.mock_calls == []
        assert cls_mocks['run_reactor'].mock_calls == []
        assert mod_mocks['LoopingCall'].mock_calls == []
test_cli.py 文件源码 项目:http-prompt 作者: eliangcs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run_and_exit(cli_args=None, prompt_commands=None):
    """Run http-prompt executable, execute some prompt commands, and exit.

    Args:
        cli_args: Command-line arguments handed to the CLI; defaults to none.
        prompt_commands: Commands that the mocked prompt() returns one by one.
            An 'exit' command is always appended; the sequence passed in is
            left unmodified.

    Returns:
        A ``(result, context)`` tuple: the value returned by
        ``CliRunner().invoke`` and the second positional argument of the last
        recorded call to ``execute``.
    """
    if cli_args is None:
        cli_args = []

    # Make sure last command is 'exit'. A new list is built instead of using
    # `+=`, which would append to the caller's own list in place.
    prompt_commands = list(prompt_commands or []) + ['exit']

    # Fool cli() so that it believes we're running from CLI instead of pytest.
    # We will restore it at the end of the function.
    orig_argv = sys.argv
    sys.argv = ['http-prompt'] + cli_args

    try:
        with patch.multiple('http_prompt.cli',
                            prompt=DEFAULT, execute=DEFAULT) as mocks:
            # Delegate to the real execute while still recording its calls.
            mocks['execute'].side_effect = execute

            # prompt() is mocked to return the command in 'prompt_commands' in
            # sequence, i.e., prompt() returns prompt_commands[i-1] when it is
            # called for the ith time
            mocks['prompt'].side_effect = prompt_commands

            result = CliRunner().invoke(cli, cli_args)
            context = mocks['execute'].call_args[0][1]

        return result, context
    finally:
        sys.argv = orig_argv


问题


面经


文章

微信
公众号

扫码关注公众号