python类LogCapture()的实例源码

write_tensorboard_test.py 文件源码 项目:cxflow-tensorflow 作者: Cognexa 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_missing_variable(self):
        """Check how ``WriteTensorBoard`` reacts when an image variable is absent from the epoch data.

        The three ``on_missing_variable`` settings exercised are ``'ignore'`` (no records captured),
        ``'warn'`` (one root-logger warning captured) and ``'error'`` (``KeyError`` raised).
        """
        def build_hook(policy):
            # every hook is built with its own ``self.get_model()`` call, as each phase needs a new hook
            return WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), image_variables=['plot'],
                                    on_missing_variable=policy)

        epoch_data_without_plot = {'valid': {}}

        # the ``cv2`` entry of ``sys.modules`` is swapped for ``cv2_mock`` while the hooks run
        with mock.patch.dict('sys.modules', cv2=cv2_mock):
            # policy: ignore -> nothing may be logged
            silent_hook = build_hook('ignore')
            with LogCapture(level=logging.INFO) as ignore_logs:
                silent_hook.after_epoch(42, epoch_data_without_plot)
            ignore_logs.check()

            # policy: warn -> exactly one warning record
            warning_hook = build_hook('warn')
            with LogCapture(level=logging.INFO) as warn_logs:
                warning_hook.after_epoch(42, epoch_data_without_plot)
            warn_logs.check(('root', 'WARNING', '`plot` not found in epoch data.'))

            # policy: error -> the missing key is propagated
            failing_hook = build_hook('error')
            with self.assertRaises(KeyError):
                failing_hook.after_epoch(42, epoch_data_without_plot)
api_test.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_two_social(self):
        """
        get_social_username should return None, and log an error, once a second
        social auth record has been created for the user
        """
        UserSocialAuthFactory.create(user=self.user, uid='other name')

        expected_record = (
            'profiles.api',
            'ERROR',
            'Unexpected error retrieving social auth username: get() returned more than '
            'one UserSocialAuth -- it returned 2!'
        )
        with LogCapture() as captured:
            username = get_social_username(self.user)
            assert username is None
            captured.check(expected_record)
test_workers.py 文件源码 项目:UrbanSearch 作者: urbansearchTUD 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test__store_indices_db(self, mock_db, mock_event,
                               mock_pd, mock_classify,
                               mock_coOc, mock_pre_process,
                               mock_config):
        """
        Check ``Workers._store_indices_db`` with a mocked indices buffer that
        reports a length of 40001 and a mocked ``store_indices`` that returns
        False: something must be logged, and the buffer must be appended to,
        handed to ``mock_db.store_indices`` and cleared.
        """
        mock_index = Mock(return_value=True)
        mock_indice = MagicMock()
        mock_indice.return_value = True
        mock_indice.__len__.return_value = 40001
        mock_config.return_value = 0
        mock_db.store_indices.return_value = False

        w = Workers()
        from testfixtures import LogCapture
        with LogCapture() as log_capture:
            w._store_indices_db(mock_index, mock_indice)
            # The previous check, ``__sizeof__() > 0``, measured the memory
            # footprint of the LogCapture object and could never fail; assert
            # on the captured records so the test really requires a log entry.
            assert log_capture.records
            assert mock_indice.append.called
            assert mock_db.store_indices.called
            assert mock_indice.clear.called
test_workers.py 文件源码 项目:UrbanSearch 作者: urbansearchTUD 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test__store_info_db(self, mock_event,
                            mock_pd, mock_classify,
                            mock_coOc, mock_pre_process,
                            mock_config):
        """
        Check ``Workers._store_info_db`` with a mocked item whose list part
        reports a length of 1: something must be logged, and the list must be
        appended to and cleared.
        """
        mock_config.return_value = 0

        mock_digests = Mock()
        mock_itm = Mock()
        mock_list = MagicMock()
        mock_list.__len__.return_value = 1
        mock_item = [mock_itm, mock_list]
        util_func = Mock()
        # Mock objects have no ``__name__`` by default, so one is set explicitly
        util_func.__name__ = 'mocked_util_func'

        w = Workers()
        from testfixtures import LogCapture
        with LogCapture() as log_capture:
            w._store_info_db(mock_digests, mock_item, util_func)
            # The previous check, ``__sizeof__() > 0``, measured the memory
            # footprint of the LogCapture object and could never fail; assert
            # on the captured records so the test really requires a log entry.
            assert log_capture.records
            assert mock_list.append.called
            assert mock_list.clear.called
test_repository_listener.py 文件源码 项目:github-snooze-button 作者: tdsmith 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_bad_message_is_logged(self, config, trivial_message):
        """
        Poll the listener twice: once with a non-JSON SQS message, once with a
        callback that raises, and check each failure shows up in the captured logs.
        """
        responses.add(responses.POST, "https://api.github.com/repos/tdsmith/test_repo/hooks")
        repo_listener = snooze.RepositoryListener(
            events=snooze.LISTEN_EVENTS,
            **config["tdsmith/test_repo"])

        sqs = boto3.resource("sqs", region_name="us-west-2")
        all_queues = list(sqs.queues.all())
        sqs_queue = all_queues[0]

        # first round: the message body is not JSON
        sqs_queue.send_message(MessageBody="this isn't a json message at all")
        with LogCapture() as bad_message_logs:
            repo_listener.poll()
            assert 'ERROR' in str(bad_message_logs)

        # second round: a well-formed message, but the registered callback raises
        def my_callback(event, message):
            raise ValueError("I object!")

        sqs_queue.send_message(MessageBody=trivial_message)
        repo_listener.register_callback(my_callback)
        with LogCapture() as callback_logs:
            repo_listener.poll()
            assert 'I object!' in str(callback_logs)
test_s3.py 文件源码 项目:stor 作者: counsyl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_upload_progress_logging(self, mock_getsize, mock_files):
        """Upload 20 mocked files and check the records emitted on ``stor.s3.progress``."""
        file_names = ['file%s' % i for i in range(20)]
        mock_files.return_value = dict.fromkeys(file_names, 20)
        mock_getsize.return_value = 20

        expected_records = [
            ('stor.s3.progress', 'INFO', 'starting upload of 20 objects'),  # nopep8
            ('stor.s3.progress', 'INFO', '10/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.s3.progress', 'INFO', '20/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.s3.progress', 'INFO', 'upload complete - 20/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
        ]

        bucket = S3Path('s3://bucket')
        with LogCapture('stor.s3.progress') as captured:
            bucket.upload(['upload'])
            captured.check(*expected_records)
test_s3.py 文件源码 项目:stor 作者: counsyl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_download_progress_logging(self, mock_list, mock_getsize, mock_make_dest_dir):
        """Download a mocked listing of 20 paths and check the records emitted on ``stor.s3.progress``."""
        # 19 file entries followed by one ``dir`` entry
        listing = [S3Path('s3://bucket/file%s' % i) for i in range(19)]
        listing.append(S3Path('s3://bucket/dir'))
        mock_list.return_value = listing
        mock_getsize.return_value = 100

        expected_records = [
            ('stor.s3.progress', 'INFO', 'starting download of 20 objects'),  # nopep8
            ('stor.s3.progress', 'INFO', '10/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.s3.progress', 'INFO', '20/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.s3.progress', 'INFO', 'download complete - 20/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
        ]

        bucket = S3Path('s3://bucket')
        with LogCapture('stor.s3.progress') as captured:
            bucket.download('output_dir')
            captured.check(*expected_records)
test_swift.py 文件源码 项目:stor 作者: counsyl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_progress_logging(self):
        """Download with 20 mocked ``download_object`` results and check the ``stor.swift.progress`` records."""
        download_results = [
            {'action': 'download_object', 'read_length': 100}
            for _ in range(20)
        ]
        # one trailing result with a different action
        download_results.append({'action': 'random_action'})
        self.mock_swift.download.return_value = download_results

        expected_records = [
            ('stor.swift.progress', 'INFO', 'starting download'),
            ('stor.swift.progress', 'INFO', '10\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.swift.progress', 'INFO', '20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.swift.progress', 'INFO', 'download complete - 20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
        ]

        container = SwiftPath('swift://tenant/container')
        with LogCapture('stor.swift.progress') as captured:
            container.download('output_dir')
            captured.check(*expected_records)
test_trace_call.py 文件源码 项目:logfury 作者: ppolewicz 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_complex_signature_py2(self):
        """
        Check the DEBUG record produced by ``trace_call`` for a function that mixes
        positional parameters, defaulted parameters, ``*args`` and ``**kwargs``.
        """
        with LogCapture() as captured:

            @trace_call(self.logger)
            def foo(a, b, c, d, e, g='G', h='H', i='ii', j='jj', *varargs_, **varkwargs_):
                pass

            foo('a', 'b', *['c', 'd'], e='E', Z='Z', **{'g': 'g', 'h': 'h'})

            # the adjacent literals are joined first, then formatted as a whole
            expected_message = (
                "calling %sfoo(a='a', b='b', c='c', d='d', e='E', "
                "g='g', h='h', varkwargs_={'Z': 'Z'}, "
                "i='ii', j='jj', varargs_=<class '%s._empty'>)"
            ) % (self._get_prefix(), INSPECT_MODULE_NAME)

            captured.check(
                ('test.v0_1.test_base', 'DEBUG', expected_message),
            )
test_trace_call.py 文件源码 项目:logfury 作者: ppolewicz 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_complex_signature_py3(self):
        """
        Check the DEBUG record produced by ``trace_call`` for a function with
        keyword-only parameters and an annotated ``**kwargs``; skipped on Python 2.
        """
        if six.PY2:
            raise SkipTest()
        with LogCapture() as l:
            # without this exec Python 2 and pyflakes complain about syntax errors etc
            # NOTE(review): exec() takes (source, globals, locals); here locals() sits in the
            # globals position and globals() in the locals position - confirm this is intentional
            exec (
                """@trace_call(self.logger)
def foo(a, b, c, d, e, *varargs_, f=None, g='G', h='H', i='ii', j='jj', **varkwargs_: None):
    pass
foo('a', 'b', *['c', 'd'], e='E', f='F', Z='Z', **{'g':'g', 'h':'h'})
""", locals(), globals()
            )
            # exactly one record is expected, for the single call to foo made inside the exec'd source
            l.check(
                (
                    'test.v0_1.test_base',
                    'DEBUG',
                    "calling foo(a='a', b='b', c='c', d='d', e='E', f='F', "
                    "g='g', h='h', varkwargs_={'Z': 'Z'}, varargs_=<class '%s._empty'>, "
                    "i='ii', j='jj')" % (INSPECT_MODULE_NAME,)  # prefix does not work because of the eval, inspect module is for pypy3
                ),
            )
test_meta.py 文件源码 项目:logfury 作者: ppolewicz 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_disable_trace(self):
        """
        Check that nothing is logged when calling a method marked with
        ``@disable_trace``, nor when calling a subclass override of that method.
        """
        @six.add_metaclass(TraceAllPublicCallsMeta)
        class Ala(object):
            @disable_trace
            def bar(self, a, b, c=None):
                return True

            def __repr__(self):
                return '<%s object>' % self.__class__.__name__

        class Bela(Ala):
            # overrides ``bar`` without repeating the decorator
            def bar(self, a, b, c=None):
                return False

        with LogCapture() as captured:
            base_instance = Ala()
            base_instance.bar(1, 2, 3)
            base_instance.bar(1, b=2)
            derived_instance = Bela()
            derived_instance.bar(1, 2, 3)
            derived_instance.bar(1, b=2)
            captured.check()
error_suite.py 文件源码 项目:solidfire-cli 作者: solidfire 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def check_api_error():
    """
    Invoke ``Account GetByID`` for account 1000000 at ``--debug`` levels 0, 1 and 2
    and check the log records captured at each level, printing a line after each check.
    """
    runner = CliRunner()
    # (debug level, expected log records, message printed once the check passed)
    scenarios = (
        ('0', (), "Critical setting working."),
        ('1',
         (('element.cli.cli', "ERROR", "xUnknownAccount"),),
         "Error setting working."),
        ('2',
         (('element.cli.cli', "INFO", "account_id = 1000000;"),
          ('element.cli.cli', "ERROR", "xUnknownAccount")),
         "Info setting is working."),
    )
    for debug_level, expected_records, done_message in scenarios:
        with LogCapture() as captured:
            runner.invoke(cli.cli, ['--debug', debug_level, '-c', '0', 'Account', 'GetByID', '--account_id', '1000000'])
            captured.check(*expected_records)
        print(done_message)
test_models.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_natural_key_exception():
        """
        Tests that get_by_natural_key logs two errors (missing Context, then
        missing ContextFilter) for a ContextFilter that doesn't exist.
        """
        natural_key = ['dummy_context', 'mongodb', 'test_database', 'test_posts',
                       'host', 'from']
        missing_context = ('Context dummy_context:mongodb.test_database.'
                           'test_posts does not exist')
        missing_filter = ('ContextFilter dummy_context:mongodb.test_database.'
                          'test_posts (host -> from) does not exist')
        expected_records = [
            ('contexts.models', 'ERROR', missing_context),
            ('contexts.models', 'ERROR', missing_filter),
        ]
        with LogCapture() as captured:
            ContextFilter.objects.get_by_natural_key(*natural_key)
            captured.check(*expected_records)
test_receiver.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_process_msg_exception(self):
        """
        Tests that process_msg logs an error when json.loads raises an exception.
        """
        logging.disable(logging.NOTSET)
        expected_message = (
            'An error occurred while processing the message \'{"@uuid": "12345", '
            '"collection": "elasticsearch.test_index.test_logs", "message": '
            '"foobar"}\':\n'
            '  foo'
        )
        with patch('receiver.receiver.logging.getLogger', return_value=LOGGER), \
                patch('receiver.receiver.json.loads', side_effect=Exception('foo')), \
                LogCapture() as captured:
            process_msg(**self.kwargs)
            captured.check(
                ('receiver', 'ERROR', expected_message),
            )
test_models.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_natural_key_exception(self):
        """
        Tests that get_by_natural_key logs two errors (missing Collection, then
        missing Distillery) when the Distillery does not exist.
        """
        missing_collection = ('Collection elasticsearch.test_index.fake_doctype does '
                              'not exist')
        missing_distillery = ('Distillery for Collection elasticsearch.test_index.'
                              'fake_doctype does not exist')
        expected_records = [
            ('warehouses.models', 'ERROR', missing_collection),
            ('distilleries.models', 'ERROR', missing_distillery),
        ]
        with LogCapture() as captured:
            Distillery.objects.get_by_natural_key(
                'elasticsearch', 'test_index', 'fake_doctype')
            captured.check(*expected_records)
test_models.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_add_raw_data_info_for_none(self):
        """
        Tests that _add_raw_data_info logs an error and returns the document
        unchanged when the doc_obj's collection is None.
        """
        with LogCapture() as captured:
            doc_without_collection = self.doc_obj
            doc_without_collection.collection = None
            result = self.distillery._add_raw_data_info(
                self.doc, doc_without_collection)
            captured.check(
                ('cyphon.documents',
                 'ERROR',
                 'Info for raw data document None:1 could not be added'),
            )
            self.assertEqual(result, self.doc)
test_functional.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_integrity_error(self):
        """
        Tests the message returned and the error logged by the configuration
        test tool when saving the form raises an IntegrityError.
        """
        self.page.config_test_value = 'test text'

        with patch('django.forms.ModelForm.save', side_effect=IntegrityError('foo')), \
                LogCapture('cyphon.admin') as captured:
            self.assertEqual(self.page.run_test(),
                             "Could not create an object for testing: foo")

            logged_error = ('An error occurred while creating a test instance: '
                            '<WSGIRequest: POST '
                            "'/admin/mailcondensers/mailcondenser/1/change/test/'>")
            captured.check(
                ('cyphon.admin', 'ERROR', logged_error),
            )
test_functional.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_validation_error(self):
        """
        Tests the message returned and the error logged by the configuration
        test tool when MailCondenserAdmin._get_result raises a ValidationError.
        """
        self.page.config_test_value = 'test text'

        patched_target = \
            'sifter.mailsifter.mailcondensers.admin.MailCondenserAdmin._get_result'
        with patch(patched_target, side_effect=ValidationError('foo')), \
                LogCapture('cyphon.admin') as captured:
            self.assertEqual(self.page.run_test(),
                             "A validation error occurred: ['foo']")

            logged_error = ('An error occurred while initializing a config test: '
                            '<WSGIRequest: POST '
                            "'/admin/mailcondensers/mailcondenser/1/change/test/'>")
            captured.check(
                ('cyphon.admin', 'ERROR', logged_error),
            )
test_accessors.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_decode_error(self):
        """
        Tests the fallback text returned and the error logged by get_email_value
        when bleach.clean raises a UnicodeDecodeError.
        """
        decode_failure = UnicodeDecodeError('funnycodec', b'\x00\x00', 1, 2,
                                            'Something went wrong!')
        with patch('sifter.mailsifter.accessors.bleach.clean', side_effect=decode_failure), \
                LogCapture() as captured:
            value = accessors.get_email_value('Subject', {'Subject': 'test'})
            self.assertEqual(
                value,
                'The Subject of this email could not be displayed '
                'due to an error.')

            captured.check(
                ('sifter.mailsifter.accessors',
                 'ERROR',
                 'An error was encountered while parsing the '
                 'Subject field of an email.'),
            )
test_attachments.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_no_file_path(self):
        """
        Tests that save_attachment returns None and logs a warning that the
        attachment is not an allowed file type.
        """
        allowed_types = {
            'ALLOWED_EMAIL_ATTACHMENTS': ('application/java',)
        }
        with patch.dict('sifter.mailsifter.accessors.settings.MAILSIFTER',
                        allowed_types):
            self.msg.attach(self.java)
            attachment = get_first_attachment(self.msg)
            with patch('sifter.mailsifter.attachments.settings', self.mock_settings), \
                    LogCapture() as captured:
                self.assertEqual(attachments.save_attachment(attachment), None)
                warning = 'The attachment %s is not an allowed file type' \
                          % self.java_file
                captured.check(
                    ('sifter.mailsifter.attachments', 'WARNING', warning),
                )
test_models.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_no_match_missing_munger(self):
        """
        Tests the process_email receiver for an email that doesn't match
        an existing MailChute when a default MailChute is enabled but
        the default MailMunger can't be found.
        """
        unmatched_doc = self.doc_obj
        unmatched_doc.data['Subject'] = 'nothing to see here'
        munger_settings = {
            'DEFAULT_MUNGER': 'missing_munger',
            'DEFAULT_MUNGER_ENABLED': True
        }
        with patch.dict('sifter.mailsifter.mailchutes.models.conf.MAILSIFTER', munger_settings), \
                LogCapture() as captured:
            MailChute.objects.process(unmatched_doc)
            captured.check(
                ('sifter.chutes.models',
                 'ERROR',
                 'Default MailMunger "missing_munger" is not configured.'),
            )
test_functional.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_integrity_error(self):
        """
        Tests the message returned and the error logged by the configuration
        test tool when saving the form raises an IntegrityError.
        """
        self.page.config_test_value = json.dumps({'text': 'test'})

        with patch('django.forms.ModelForm.save', side_effect=IntegrityError('foo')), \
                LogCapture('cyphon.admin') as captured:
            self.assertEqual(self.page.run_test(),
                             "Could not create an object for testing: foo")

            logged_error = ('An error occurred while creating a test instance: '
                            '<WSGIRequest: POST '
                            "'/admin/datacondensers/datacondenser/1/change/test/'>")
            captured.check(
                ('cyphon.admin', 'ERROR', logged_error),
            )
test_functional.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_validation_error(self):
        """
        Tests the message returned and the error logged by the configuration
        test tool when DataCondenserAdmin._get_result raises a ValidationError.
        """
        self.page.config_test_value = json.dumps({'text': 'test'})

        patched_target = \
            'sifter.datasifter.datacondensers.admin.DataCondenserAdmin._get_result'
        with patch(patched_target, side_effect=ValidationError('foo')), \
                LogCapture('cyphon.admin') as captured:
            self.assertEqual(self.page.run_test(),
                             "A validation error occurred: ['foo']")

            logged_error = ('An error occurred while initializing a config test: '
                            '<WSGIRequest: POST '
                            "'/admin/datacondensers/datacondenser/1/change/test/'>")
            captured.check(
                ('cyphon.admin', 'ERROR', logged_error),
            )
test_functional.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_integrity_error(self):
        """
        Tests the message returned and the error logged by the configuration
        test tool when saving the form raises an IntegrityError.
        """
        with patch('django.forms.ModelForm.save', side_effect=IntegrityError('foo')), \
                LogCapture('cyphon.admin') as captured:
            self.assertEqual(self.page.run_test(),
                             "Could not create an object for testing: foo")

            logged_error = ('An error occurred while creating a test instance: '
                            '<WSGIRequest: POST '
                            "'/admin/logcondensers/logcondenser/1/change/test/'>")
            captured.check(
                ('cyphon.admin', 'ERROR', logged_error),
            )
test_models.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_get_default_no_chute(self):
        """
        Tests that _default_munger is None, _default_munger_enabled is falsy
        and an error is logged when the configured default LogMunger does
        not exist.
        """
        munger_settings = {
            'DEFAULT_MUNGER': 'dummy_munger',
            'DEFAULT_MUNGER_ENABLED': True
        }
        expected_record = (
            'sifter.chutes.models',
            'ERROR',
            'Default LogMunger "dummy_munger" is not configured.',
        )
        with patch.dict('sifter.logsifter.logchutes.models.conf.LOGSIFTER', munger_settings), \
                LogCapture() as captured:
            self.assertEqual(LogChute.objects._default_munger, None)
            self.assertFalse(LogChute.objects._default_munger_enabled)
            captured.check(expected_record)
test_signals.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_email_error(self):
        """
        Tests that an error message is logged when an
        SMTPAuthenticationError is encountered while sending the email.
        """
        failing_email = Mock()
        failing_email.send = Mock(
            side_effect=SMTPAuthenticationError(535, 'foobar'))
        expected_record = (
            'alerts.signals',
            'ERROR',
            'An error occurred when sending an email '
            'notification: (535, \'foobar\')',
        )
        with patch('alerts.signals.emails_enabled', return_value=True), \
                patch('alerts.signals.compose_comment_email', return_value=failing_email), \
                LogCapture() as captured:
            # clearing the pk before save() stores the fetched comment as a new row
            comment = Comment.objects.get(pk=1)
            comment.pk = None
            comment.save()
            captured.check(expected_record)
test_engine.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_cannot_connect(self, mock_index):
        """
        Tests that the catch_connection_error decorator logs a
        'Cannot connect to Elasticsearch' error when the wrapped insert
        is called.
        """
        @catch_connection_error
        def test_decorator():
            """Insert a document through the decorated function."""
            self.engine.insert({'foo': 'bar'})

        with LogCapture() as captured:
            test_decorator()
            captured.check(
                ('engines.elasticsearch.engine',
                 'ERROR',
                 'Cannot connect to Elasticsearch'),
            )
test_pump.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_normal_streaming_query(self):
        """
        Tests the start method for a Pump with a streaming Pipe and a query that
        doesn't exceed the Pipe's specs: nothing should be logged.
        """
        pump = self.stream_pump
        with LogCapture() as captured:
            pump._factor_query = Mock(return_value=[self.subquery1])
            pump._process_streaming_query = Mock()
            pump.start(self.subquery1)

            # _factor_query() must receive exactly what was passed to start()
            pump._factor_query.assert_called_once_with(self.subquery1)

            # _process_streaming_query() must receive the first (and only)
            # element of the list returned by _factor_query()
            pump._process_streaming_query.assert_called_once_with(self.subquery1)

            captured.check()
test_pump.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_large_streaming_query(self):
        """
        Tests the start method for a Pump with a streaming Pipe and a query that
        exceeds the Pipe's specs: a warning should be logged.
        """
        pump = self.stream_pump
        with LogCapture() as captured:
            pump._factor_query = Mock(return_value=self.query_list)
            pump._process_streaming_query = Mock()
            pump.start(self.query)

            # _factor_query() must receive exactly what was passed to start()
            pump._factor_query.assert_called_once_with(self.query)

            # _process_streaming_query() must receive only the first element
            # of the query list returned by _factor_query()
            pump._process_streaming_query.assert_called_once_with(
                self.query_list[0])

            # a single warning about the oversized query is expected
            warning = ('Query was too large for Pipe "Twitter PublicStreamsAPI." '
                       'A smaller version of the query was submitted.')
            captured.check(
                ('aggregator.pumproom.pump', 'WARNING', warning),
            )
test_management.py 文件源码 项目:edx-enterprise 作者: edx 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_transmit_course_metadata_task_no_channel(self):
        """
        Test that the transmit_course_metadata command logs nothing at INFO level
        or above when no integrated channel configuration exists.
        """
        catalog_user = factories.UserFactory(username='john_doe')
        factories.EnterpriseCustomerFactory(
            catalog=1,
            name='Veridian Dynamics',
        )

        # Delete every integrated channel configuration of both kinds
        for channel_configuration in (SAPSuccessFactorsEnterpriseCustomerConfiguration,
                                      DegreedEnterpriseCustomerConfiguration):
            channel_configuration.objects.all().delete()

        with LogCapture(level=logging.INFO) as captured:
            call_command('transmit_course_metadata', '--catalog_user', catalog_user.username)

            # Because there are no IntegratedChannels, the process will end early.
            assert not captured.records


问题


面经


文章

微信
公众号

扫码关注公众号