python类LogCapture()的实例源码

test_management.py 文件源码 项目:edx-enterprise 作者: edx 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_transmit_course_metadata_task_no_catalog(self):
    """
    Run ``transmit_course_metadata`` for an enterprise customer without a catalog.

    The catalog of the customer attached to ``self.sapsf`` is cleared before
    the command runs; the test then expects that no log record at INFO level
    or above was captured.
    """
    enterprise_uuid = str(self.enterprise_customer.uuid)
    self.mock_ent_courses_api_with_pagination(
        enterprise_uuid=enterprise_uuid,
        course_run_ids=['course-v1:edX+DemoX+Demo_Course'],
    )

    # Remove the catalog from the customer behind the integrated channel.
    customer = self.sapsf.enterprise_customer
    customer.catalog = None
    customer.save()

    with LogCapture(level=logging.INFO) as log_capture:
        call_command('transmit_course_metadata', '--catalog_user', self.user.username)

        # With no catalog-bearing EnterpriseCustomer the command stops early,
        # so nothing should have been logged.
        assert not log_capture.records
test_management.py 文件源码 项目:edx-enterprise 作者: edx 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_transmit_learner_data(
    self,
    command_kwargs,
    certificate,
    self_paced,
    end_date,
    passed,
    expected_completion,
):
    """
    Verify the log output of a ``transmit_learner_data`` run.

    The command arguments come from ``transmit_learner_data_context``; each
    expected message must be contained in the captured record at the same
    position.
    """
    command_context = transmit_learner_data_context(command_kwargs, certificate, self_paced, end_date, passed)
    with command_context as (args, kwargs), LogCapture(level=logging.INFO) as log_capture:
        expected_output = get_expected_output(**expected_completion)
        call_command('transmit_learner_data', *args, **kwargs)
        for position, expected_message in enumerate(expected_output):
            # Index explicitly so a missing record fails loudly.
            record = log_capture.records[position]
            assert expected_message in record.getMessage()
test_watcher.py 文件源码 项目:vpc-router 作者: romana 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setUp(self):
    """
    Install a filtered DEBUG-level log capture and stub out ``handle_spec``.

    The original ``vpc.handle_spec`` is kept in ``self.old_handle_spec``.
    """
    capture = LogCapture()
    capture.setLevel(logging.DEBUG)
    capture.addFilter(test_common.MyLogCaptureFilter())
    self.lc = capture

    self.additional_setup()

    self.addCleanup(self.cleanup)

    self.old_handle_spec = vpc.handle_spec

    # The watcher calls handle_spec, which is defined in the vpc module but
    # was imported directly into the watcher module. The watcher therefore
    # holds its own reference, and that is the one that really has to be
    # replaced. To be safe the stub is installed in both modules.
    def _noop_handle_spec(*args, **kwargs):
        return None

    watcher.handle_spec = _noop_handle_spec
    vpc.handle_spec = _noop_handle_spec
stop_on_nan_test.py 文件源码 项目:cxflow 作者: Cognexa 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_unsupported(self):
    """Check how ``StopOnNaN`` reacts to a variable of an unsupported type."""

    def unsupported_epoch_data():
        # A function object stands in for a value that cannot be NaN-checked.
        return StopOnNaNTest._get_data(lambda: 0)

    # 'error' mode raises on the unsupported value.
    with self.assertRaises(ValueError):
        StopOnNaN(on_unknown_type='error').after_epoch(epoch_data=unsupported_epoch_data())

    # An unrecognised mode is rejected at construction time.
    with self.assertRaises(AssertionError):
        StopOnNaN(on_unknown_type='bad value')

    # 'warn' mode logs exactly one warning.
    with LogCapture() as log_capture:
        StopOnNaN(on_unknown_type='warn').after_epoch(epoch_data=unsupported_epoch_data())

    expected_warning = "Variable `var` of type `<class 'function'>` can not be checked for NaNs."
    log_capture.check(('root', 'WARNING', expected_warning))

    # The default mode must not raise.
    StopOnNaN().after_epoch(epoch_data=unsupported_epoch_data())
log_profile_test.py 文件源码 项目:cxflow 作者: Cognexa 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_missing_train(self):
    """
    Check the profile log when the expected profile entries are absent.

    For both an empty profile and a profile holding only an unrelated key,
    all four timing lines are expected to be logged as ``0.000000``.
    """
    zero_timings = (
        ('root', 'INFO', '\tT read data:\t0.000000'),
        ('root', 'INFO', '\tT train:\t0.000000'),
        ('root', 'INFO', '\tT eval:\t0.000000'),
        ('root', 'INFO', '\tT hooks:\t0.000000'),
    )

    for profile in ({}, {'some_contents': 1}):
        with LogCapture() as log_capture:
            self._hook.after_epoch_profile(0, profile, [])

        log_capture.check(*zero_timings)
log_vars_test.py 文件源码 项目:cxflow 作者: Cognexa 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_log_variables_selected(self):
    """
    Check that only the selected variables (``accuracy`` and ``loss2``) are logged.
    """
    selected = ['accuracy', 'loss2']
    with LogCapture() as log_capture:
        LogVariables(selected).after_epoch(_EPOCH_ID, _get_epoch_data())

    expected_messages = [
        '\ttrain accuracy: 1.000000',
        '\ttrain loss2:',
        '\t\tmean: 1.000000',
        '\t\tmedian: 11.000000',
        '\ttest accuracy: 2.000000',
        '\ttest loss2:',
        '\t\tmean: 2.000000',
        '\t\tmedian: 22.000000',
    ]
    # Every line is expected from the root logger at INFO level.
    log_capture.check(*[('root', 'INFO', message) for message in expected_messages])
aptly_test.py 文件源码 项目:pyaptly 作者: adfinis-sygroup 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def do_publish_create_republish(config):
    """Check that a republish is created once the publish command is run again.

    The first ``do_publish_create`` pass must log a CRITICAL record saying
    the publish "has been deferred". After ``publish create`` is run through
    ``main``, ``fakerepo01-stable main`` must be among the publishes.
    """
    with testfixtures.LogCapture() as captured:
        do_publish_create(config)
        assert any(
            record.levelname == "CRITICAL" and "has been deferred" in record.msg
            for record in captured.records
        )

    main(['-c', config, 'publish', 'create'])

    state = SystemStateReader()
    state.read()
    assert 'fakerepo01-stable main' in state.publishes
test_rtmbot_core.py 文件源码 项目:marvin 作者: bassdread 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_init():
    """Check the attributes and the start-up log line of a freshly initialised bot."""
    with LogCapture() as captured:
        bot = init_rtmbot()

    assert bot.token == 'test-12345'
    assert bot.directory == '/tmp/'
    # Equality (not identity) is deliberately kept from the original check.
    assert bot.debug == True  # noqa: E712

    expected_record = ('root', 'INFO', 'Initialized in: /tmp/')
    captured.check(expected_record)
write_tensorboard_test.py 文件源码 项目:cxflow-tensorflow 作者: Cognexa 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_unknown_type(self):
    """Check how ``WriteTensorBoard`` treats a variable of an unsupported type."""

    def make_hook(**options):
        # Every hook in this test shares the same output dir and model source.
        return WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), **options)

    bad_epoch_data = {'valid': {'accuracy': 'bad_type'}}
    expected_warning = ('Variable `accuracy` in stream `valid` has to be of type `int` '
                        'or `float` (or a `dict` with a key named `mean` or `nanmean` '
                        'whose corresponding value is of type `int` or `float`), '
                        'found `<class \'str\'>` instead.')

    # Default behaviour: nothing is logged.
    ignore_hook = make_hook()
    with LogCapture(level=logging.INFO) as silent_capture:
        ignore_hook.after_epoch(42, bad_epoch_data)
    silent_capture.check()

    # 'warn': a single warning is logged.
    warn_hook = make_hook(on_unknown_type='warn')
    with LogCapture(level=logging.INFO) as warn_capture:
        warn_hook.after_epoch(42, bad_epoch_data)
    warn_capture.check(('root', 'WARNING', expected_warning))

    # 'error': a ValueError is raised.
    raise_hook = make_hook(on_unknown_type='error')
    with self.assertRaises(ValueError):
        raise_hook.after_epoch(42, bad_epoch_data)

    with mock.patch.dict('sys.modules', cv2=cv2_mock):
        # Variables listed in image_variables must not trigger the error.
        skip_hook = make_hook(on_unknown_type='error', image_variables=['accuracy'])
        skip_hook.after_epoch(42, {'valid': {'accuracy': np.zeros((10, 10, 3))}})
        skip_hook._summary_writer.close()
test_plugin.py 文件源码 项目:vpcrouter-romana-plugin 作者: romana 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setUp(self):
    """Install a filtered DEBUG-level log capture and register ``self.cleanup``."""
    capture = LogCapture()
    capture.setLevel(logging.DEBUG)
    capture.addFilter(test_common.MyLogCaptureFilter())
    self.lc = capture
    self.addCleanup(self.cleanup)
test_log.py 文件源码 项目:bottery 作者: rougeth 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_ColoredFormatter():
    """Check the output ``ColoredFormatter`` produces for each log level."""
    logging.config.dictConfig(DEFAULT_LOGGING)

    # One message per level, paired with the formatted text it should yield.
    cases = (
        (logging.DEBUG, 'DEBUG', 'DEBUG'),
        (logging.INFO, 'INFO', 'INFO'),
        (logging.WARNING, 'WARN', '\x1b[33mWARN\x1b[0m'),
        (logging.ERROR, 'ERROR', '\x1b[31mERROR\x1b[0m'),
        (logging.CRITICAL, 'CRITICAL', '\x1b[30m\x1b[41mCRITICAL\x1b[0m'),
    )

    with LogCapture(names='bottery') as logs:
        logger = logging.getLogger('bottery')
        for level, text, _ in cases:
            logger.log(level, text)
        records = list(logs.records)

    # Format every captured record with ColoredFormatter.
    formatter = ColoredFormatter()
    formatted_records = list(map(formatter.format, records))

    assert formatted_records == [expected for _, _, expected in cases]
test_logging.py 文件源码 项目:python-zhmcclient 作者: zhmcclient 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def capture():
    """
    Yield a ``LogCapture`` that records messages from DEBUG level upwards.

    The fixture is written as a generator because using the
    ``testfixtures.log_capture()`` decorator instead makes pytest fail
    with "fixture 'capture' not found".
    """
    with LogCapture(level=logging.DEBUG) as captured:
        yield captured


#
# Test cases
#
test_main.py 文件源码 项目:UrbanSearch 作者: urbansearchTUD 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_classify_documents_from_indices(mock_manager, mock_workers,
                                         mock_indices_selector):
    """
    Check that ``main.classify_documents_from_indices`` drives its collaborators.

    With the Flask request and every worker class mocked, the call must log
    something, create the selector, workers and manager, start producers and
    consumers, mark the producers as done, and join the started workers.
    """
    with main.app.app_context():
        with patch('urbansearch.main.request') as mock_flask_request:
            mock_flask_request.args.get.return_value = MagicMock(side_effect=[1, 1, Mock()])
            ind_sel = mock_indices_selector.return_value = Mock()
            cworker = mock_workers.return_value = Mock()
            man = mock_manager.return_value = Mock()

            producer = Mock()
            consumer = Mock()

            ind_sel.run_workers.return_value = [producer, Mock()]
            cworker.run_classifying_workers.return_value = [consumer, Mock()]

            # Bugs other fixtures if imported globally.
            from testfixtures import LogCapture
            with LogCapture() as log_capture:
                main.classify_documents_from_indices()
                # Check the captured records themselves: ``__sizeof__()`` is
                # the object's size in bytes and is therefore always > 0.
                assert log_capture.records

            assert mock_indices_selector.called
            assert mock_workers.called
            assert mock_manager.called
            assert man.Queue.called
            assert ind_sel.run_workers.called
            assert cworker.run_classifying_workers.called
            assert cworker.set_producers_done.called
            assert producer.join.called
            assert consumer.join.called
test_main.py 文件源码 项目:UrbanSearch 作者: urbansearchTUD 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_classify_indices_to_db_not_connected(mock_db_connected, mock_workers):
    """
    Check that ``main.classify_indices_to_db`` bails out without a database.

    When the connection check reports ``False`` the call must log something
    and must not create any workers.
    """
    with main.app.app_context():
        with patch('urbansearch.main.request'):
            mock_db_connected.return_value = False

            # Imported locally, as in the sibling tests of this module.
            from testfixtures import LogCapture
            with LogCapture() as log_capture:
                main.classify_indices_to_db()
                # Check the captured records themselves: ``__sizeof__()`` is
                # the object's size in bytes and is therefore always > 0.
                assert log_capture.records
                assert not mock_workers.called
test_main.py 文件源码 项目:UrbanSearch 作者: urbansearchTUD 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_mock_classify_text_files_to_db(mock_manager, mock_workers,
                                        mock_indices_selector,
                                        mock_db_utils):
    """
    Check that ``main.classify_textfiles_to_db`` drives its collaborators.

    With a connected (mocked) database the call must log something, create
    the workers and manager, start file readers and classifiers, set and
    clear the "file producers done" flag, and join the started workers.
    """
    mock_db_utils.connected.return_value = True
    w = mock_workers.return_value = Mock()
    man = mock_manager.return_value = Mock()

    producer = Mock()
    consumer = Mock()

    w.run_read_files_worker.return_value = [producer, Mock()]
    w.run_classifying_workers.return_value = [consumer, Mock()]

    # Bugs other fixtures if imported globally.
    from testfixtures import LogCapture
    with LogCapture() as log_capture:
        main.classify_textfiles_to_db(Mock(), True, 1)
        # Check the captured records themselves: ``__sizeof__()`` is the
        # object's size in bytes and is therefore always > 0.
        assert log_capture.records

        assert mock_workers.called
        assert mock_manager.called
        assert man.Queue.called
        assert w.run_read_files_worker.called
        assert w.run_classifying_workers.called
        assert w.set_file_producers_done.called
        assert w.clear_file_producers_done.called
        assert producer.join.called
        assert consumer.join.called
test_main.py 文件源码 项目:UrbanSearch 作者: urbansearchTUD 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_mock_classify_text_files_to_db_not_connected(mock_db_connected,
                                                      mock_workers):
    """
    Check that ``main.classify_textfiles_to_db`` bails out without a database.

    When the connection check reports ``False`` the call must log something
    and must not create any workers.
    """
    with main.app.app_context():
        with patch('urbansearch.main.request'):
            mock_db_connected.return_value = False

            # Imported locally, as in the sibling tests of this module.
            from testfixtures import LogCapture
            with LogCapture() as log_capture:
                main.classify_textfiles_to_db(Mock(), Mock(), 1, True)
                # Check the captured records themselves: ``__sizeof__()`` is
                # the object's size in bytes and is therefore always > 0.
                assert log_capture.records
                assert not mock_workers.called
test_workers.py 文件源码 项目:UrbanSearch 作者: urbansearchTUD 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_run_classifying_workers(self, mock_event, mock_pd,
                                 mock_classify, mock_pre_process,
                                 mock_coOc, mock_config,
                                 mock_process):
    """
    Check that ``Workers.run_classifying_workers`` logs and pre-processes.

    All collaborators are supplied as mocks; the call is made with
    ``pre_downloaded=True``.
    """
    queue = Mock()
    w = Workers()

    mock_pre_process.return_value = Mock()

    # Bugs other fixtures if imported globally.
    from testfixtures import LogCapture
    with LogCapture() as log_capture:
        w.run_classifying_workers(1, queue, 1, pre_downloaded=True)
        # Check the captured records themselves: ``__sizeof__()`` is the
        # object's size in bytes and is therefore always > 0.
        assert log_capture.records
        assert mock_pre_process.called
test_main.py 文件源码 项目:hatchery 作者: ajk8 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test__log_failure_and_die():
    """
    Check what ``main._log_failure_and_die`` logs before exiting.

    Both calls must raise ``SystemExit``. With the last argument ``False``
    only the error message is expected in the log; with ``True`` the
    captured stdout and stderr of the call result are expected as well.
    """
    process_outputs = ('happy_stdout', 'sad_stderr')
    with testfixtures.LogCapture() as captured:
        call_result = executor.CallResult(1, *process_outputs)

        with pytest.raises(SystemExit):
            main._log_failure_and_die('error', call_result, False)
        assert _somewhere_in_messages(captured, 'error')
        for output in process_outputs:
            assert _nowhere_in_messages(captured, output)

        with pytest.raises(SystemExit):
            main._log_failure_and_die('error', call_result, True)
        assert _somewhere_in_messages(captured, 'error')
        for output in process_outputs:
            assert _somewhere_in_messages(captured, output)
tests.py 文件源码 项目:sdining 作者: Lurance 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_empty_pool_fallback(self):
    """
    Check that ``CaptchaStore.pick`` generates a captcha when the pool is empty.

    With ``CAPTCHA_GET_FROM_POOL`` enabled and no stored captchas, ``pick``
    must log an error and leave exactly one ``CaptchaStore`` object behind.
    """
    original_setting = settings.CAPTCHA_GET_FROM_POOL
    settings.CAPTCHA_GET_FROM_POOL = True
    try:
        CaptchaStore.objects.all().delete()  # Delete objects created during SetUp
        with LogCapture() as log_capture:
            CaptchaStore.pick()
        log_capture.check(('captcha.models', 'ERROR', "Couldn't get a captcha from pool, generating"),)
        self.assertEqual(CaptchaStore.objects.count(), 1)
    finally:
        # Restore the global setting even when an assertion above fails, so
        # the override cannot leak into other tests.
        settings.CAPTCHA_GET_FROM_POOL = original_setting
test_callbacks.py 文件源码 项目:github-snooze-button 作者: tdsmith 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_bad_callback_type_is_logged(self, config):
    """Check that an unknown callback type ("foobar") shows up as a WARNING in the log."""
    with LogCapture() as captured:
        snooze.github_callback("foobar", None, None, None, None)
        rendered_log = str(captured)
        assert "WARNING" in rendered_log


问题


面经


文章

微信
公众号

扫码关注公众号