def test_transmit_course_metadata_task_no_catalog(self):
    """
    Verify that the metadata transmission command exits early, emitting no
    log records, when the enterprise customer has no course catalog.
    """
    customer_uuid = str(self.enterprise_customer.uuid)
    self.mock_ent_courses_api_with_pagination(
        enterprise_uuid=customer_uuid,
        course_run_ids=['course-v1:edX+DemoX+Demo_Course']
    )
    # Detach the catalog from the integrated channel's customer.
    channel_customer = self.sapsf.enterprise_customer
    channel_customer.catalog = None
    channel_customer.save()
    with LogCapture(level=logging.INFO) as log_capture:
        call_command('transmit_course_metadata', '--catalog_user', self.user.username)
        # With no EnterpriseCustomer owning a catalog, the command bails out
        # before logging anything.
        assert not log_capture.records
# Example usages of the testfixtures LogCapture() class (section header from the scraped source)
def test_transmit_learner_data(
        self,
        command_kwargs,
        certificate,
        self_paced,
        end_date,
        passed,
        expected_completion,
):
    """
    Check the log output of a successful ``transmit_learner_data`` run for
    every supported way of invoking the management command.
    """
    context = transmit_learner_data_context(
        command_kwargs, certificate, self_paced, end_date, passed)
    with context as (args, kwargs):
        with LogCapture(level=logging.INFO) as log_capture:
            expected_output = get_expected_output(**expected_completion)
            call_command('transmit_learner_data', *args, **kwargs)
            # Each expected line must appear, in order, in the captured log.
            for position, expected in enumerate(expected_output):
                assert expected in log_capture.records[position].getMessage()
def setUp(self):
    # Capture all log output at DEBUG level, filtered by the shared filter.
    self.lc = LogCapture()
    self.lc.setLevel(logging.DEBUG)
    self.lc.addFilter(test_common.MyLogCaptureFilter())
    self.additional_setup()
    self.addCleanup(self.cleanup)
    # Remember the real implementation so tests can restore it later.
    self.old_handle_spec = vpc.handle_spec

    # Monkey patch the handle_spec function, which is called by the watcher.
    # handle_spec is defined in the VPC module, but the watcher imported it
    # directly, so the watcher holds its own copy in its namespace. The patch
    # therefore has to land in the watcher module; for safety, patch both the
    # vpc and the watcher modules.
    def _noop_handle_spec(*args, **kwargs):
        pass
    vpc.handle_spec = _noop_handle_spec
    watcher.handle_spec = _noop_handle_spec
def test_unsupported(self):
    """Verify how ``StopOnNaN`` treats values it cannot check for NaNs."""
    def unknown_data():
        # A function value cannot be inspected for NaNs.
        return StopOnNaNTest._get_data(lambda: 0)

    # 'error' policy raises on an un-checkable value.
    with self.assertRaises(ValueError):
        StopOnNaN(on_unknown_type='error').after_epoch(epoch_data=unknown_data())
    # An unrecognised policy name is rejected up-front.
    with self.assertRaises(AssertionError):
        StopOnNaN(on_unknown_type='bad value')
    # 'warn' policy logs a warning instead of raising.
    with LogCapture() as log_capture:
        StopOnNaN(on_unknown_type='warn').after_epoch(epoch_data=unknown_data())
        log_capture.check(
            ('root', 'WARNING', 'Variable `var` of type `<class \'function\'>` can not be checked for NaNs.'),
        )
    # The default policy silently ignores unknown types.
    StopOnNaN().after_epoch(epoch_data=unknown_data())
def test_missing_train(self):
    """Profile entries that are missing are reported as zero times."""
    # All four timers are expected to log 0.0 when their entry is absent.
    zero_profile_lines = (
        ('root', 'INFO', '\tT read data:\t0.000000'),
        ('root', 'INFO', '\tT train:\t0.000000'),
        ('root', 'INFO', '\tT eval:\t0.000000'),
        ('root', 'INFO', '\tT hooks:\t0.000000')
    )
    # Completely empty profile.
    with LogCapture() as log_capture:
        self._hook.after_epoch_profile(0, {}, [])
        log_capture.check(*zero_profile_lines)
    # A profile containing only unrelated entries behaves the same way.
    with LogCapture() as log_capture:
        self._hook.after_epoch_profile(0, {'some_contents': 1}, [])
        log_capture.check(*zero_profile_lines)
def test_log_variables_selected(self):
    """
    When an explicit variable list is given, only those variables from the
    `epoch_data` streams are logged.
    """
    with LogCapture() as logs:
        hook = LogVariables(['accuracy', 'loss2'])
        hook.after_epoch(_EPOCH_ID, _get_epoch_data())
        # Scalar variables are logged inline; dict-valued ones get one
        # indented line per statistic.
        logs.check(
            ('root', 'INFO', '\ttrain accuracy: 1.000000'),
            ('root', 'INFO', '\ttrain loss2:'),
            ('root', 'INFO', '\t\tmean: 1.000000'),
            ('root', 'INFO', '\t\tmedian: 11.000000'),
            ('root', 'INFO', '\ttest accuracy: 2.000000'),
            ('root', 'INFO', '\ttest loss2:'),
            ('root', 'INFO', '\t\tmean: 2.000000'),
            ('root', 'INFO', '\t\tmedian: 22.000000')
        )
def do_publish_create_republish(config):
    """Test if creating republishes works.

    Runs the create step, expects a CRITICAL "has been deferred" message,
    then re-runs the publish create command and verifies that the publish
    shows up in the system state.
    """
    with testfixtures.LogCapture() as log_capture:
        do_publish_create(config)
        # The republish cannot be completed on the first pass, so it must be
        # deferred and reported at CRITICAL level. Using any() replaces the
        # original manual found-flag loop with the idiomatic equivalent.
        assert any(
            rec.levelname == "CRITICAL" and "has been deferred" in rec.msg
            for rec in log_capture.records
        )
    args = [
        '-c',
        config,
        'publish',
        'create',
    ]
    main(args)
    state = SystemStateReader()
    state.read()
    assert 'fakerepo01-stable main' in state.publishes
def test_init():
    """Verify init_rtmbot reads its config and logs the working directory."""
    with LogCapture() as log_capture:
        rtmbot = init_rtmbot()
        assert rtmbot.token == 'test-12345'
        assert rtmbot.directory == '/tmp/'
        # PEP 8: never compare booleans with `==`; `is True` also ensures the
        # flag is an actual bool rather than a merely truthy value.
        assert rtmbot.debug is True
        log_capture.check(
            ('root', 'INFO', 'Initialized in: /tmp/')
        )
def test_unknown_type(self):
    """Test if ``WriteTensorBoard`` handles unknown variable types as expected."""
    bad_epoch_data = {'valid': {'accuracy': 'bad_type'}}

    # Default policy: the unknown type is silently ignored.
    ignore_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model())
    with LogCapture(level=logging.INFO) as ignore_logs:
        ignore_hook.after_epoch(42, bad_epoch_data)
    ignore_logs.check()

    # 'warn' policy: a warning describing the offending variable is logged.
    warn_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), on_unknown_type='warn')
    with LogCapture(level=logging.INFO) as warn_logs:
        warn_hook.after_epoch(42, bad_epoch_data)
    warn_logs.check(('root', 'WARNING', 'Variable `accuracy` in stream `valid` has to be of type `int` '
                                        'or `float` (or a `dict` with a key named `mean` or `nanmean` '
                                        'whose corresponding value is of type `int` or `float`), '
                                        'found `<class \'str\'>` instead.'))

    # 'error' policy: the unknown type raises.
    raise_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), on_unknown_type='error')
    with self.assertRaises(ValueError):
        raise_hook.after_epoch(42, bad_epoch_data)

    # Variables registered as images bypass the type check entirely.
    with mock.patch.dict('sys.modules', **{'cv2': cv2_mock}):
        skip_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), on_unknown_type='error',
                                     image_variables=['accuracy'])
        skip_hook.after_epoch(42, {'valid': {'accuracy': np.zeros((10, 10, 3))}})
        skip_hook._summary_writer.close()
def setUp(self):
    # Install a DEBUG-level log capture with the shared test filter, and make
    # sure the test's resources are released afterwards.
    capture = LogCapture()
    capture.setLevel(logging.DEBUG)
    capture.addFilter(test_common.MyLogCaptureFilter())
    self.lc = capture
    self.addCleanup(self.cleanup)
def test_ColoredFormatter():
    """Test if logs are being colored."""
    logging.config.dictConfig(DEFAULT_LOGGING)
    with LogCapture(names='bottery') as logs:
        logger = logging.getLogger('bottery')
        logger.debug('DEBUG')
        logger.info('INFO')
        logger.warning('WARN')
        logger.error('ERROR')
        logger.critical('CRITICAL')

    # Format every captured record with ColoredFormatter. The original
    # `[record for record in logs.records]` copy was a no-op comprehension;
    # iterate the records directly.
    colored_formatter = ColoredFormatter()
    formatted_records = [colored_formatter.format(record)
                         for record in logs.records]

    expected_records = [
        'DEBUG',                            # levels below WARNING: no color
        'INFO',
        '\x1b[33mWARN\x1b[0m',              # yellow
        '\x1b[31mERROR\x1b[0m',             # red
        '\x1b[30m\x1b[41mCRITICAL\x1b[0m'   # black on red background
    ]
    assert formatted_records == expected_records
def capture():
    """
    Yield a DEBUG-level ``LogCapture`` as a pytest fixture.

    Defining the fixture this way works around the issue that using the
    ``testfixtures.log_capture()`` decorator instead makes pytest fail with
    "fixture 'capture' not found".
    """
    with LogCapture(level=logging.DEBUG) as log:
        yield log
#
# Test cases
#
def test_classify_documents_from_indices(mock_manager, mock_workers,
                                         mock_indices_selector):
    """Classifying documents from indices runs workers and joins them all."""
    with main.app.app_context():
        with patch('urbansearch.main.request') as mock_flask_request:
            mock_flask_request.args.get.return_value = MagicMock(side_effect=[1, 1, Mock()])
            ind_sel = mock_indices_selector.return_value = Mock()
            cworker = mock_workers.return_value = Mock()
            man = mock_manager.return_value = Mock()
            a = Mock()
            b = Mock()
            # Producer/consumer worker lists returned by the mocked helpers;
            # a and b let us verify that each list's workers get joined.
            ind_sel.run_workers.return_value = [a, Mock()]
            cworker.run_classifying_workers.return_value = [b, Mock()]
            # Bugs other fixtures if imported globally.
            from testfixtures import LogCapture
            with LogCapture() as log_capture:
                main.classify_documents_from_indices()
            # BUG FIX: the original `l.__sizeof__() > 0` was always true (it
            # measures the capture object's memory footprint, not its
            # contents); assert that something was actually logged instead.
            assert log_capture.records
            assert mock_indices_selector.called
            assert mock_workers.called
            assert mock_manager.called
            assert man.Queue.called
            assert ind_sel.run_workers.called
            assert cworker.run_classifying_workers.called
            assert cworker.set_producers_done.called
            assert a.join.called
            assert b.join.called
def test_classify_indices_to_db_not_connected(mock_db_connected, mock_workers):
    """No workers are started when the database is not connected."""
    with main.app.app_context():
        with patch('urbansearch.main.request'):
            mock_db_connected.return_value = False
            # Bugs other fixtures if imported globally.
            from testfixtures import LogCapture
            with LogCapture() as log_capture:
                main.classify_indices_to_db()
            # BUG FIX: the original `l.__sizeof__() > 0` was always true;
            # assert that the failure was actually logged instead.
            assert log_capture.records
            assert not mock_workers.called
def test_mock_classify_text_files_to_db(mock_manager, mock_workers,
                                        mock_indices_selector,
                                        mock_db_utils):
    """Classifying text files to the DB runs file readers and classifiers."""
    mock_db_utils.connected.return_value = True
    w = mock_workers.return_value = Mock()
    man = mock_manager.return_value = Mock()
    a = Mock()
    b = Mock()
    # Producer/consumer worker lists returned by the mocked helpers; a and b
    # let us verify that each list's workers get joined.
    w.run_read_files_worker.return_value = [a, Mock()]
    w.run_classifying_workers.return_value = [b, Mock()]
    # Bugs other fixtures if imported globally.
    from testfixtures import LogCapture
    with LogCapture() as log_capture:
        main.classify_textfiles_to_db(Mock(), True, 1)
    # BUG FIX: the original `l.__sizeof__() > 0` was always true (object
    # memory footprint, not content); assert real log output instead.
    assert log_capture.records
    assert mock_workers.called
    assert mock_manager.called
    assert man.Queue.called
    assert w.run_read_files_worker.called
    assert w.run_classifying_workers.called
    assert w.set_file_producers_done.called
    assert w.clear_file_producers_done.called
    assert a.join.called
    assert b.join.called
def test_mock_classify_text_files_to_db_not_connected(mock_db_connected,
                                                      mock_workers):
    """No workers run when the database is unavailable."""
    with main.app.app_context():
        with patch('urbansearch.main.request'):
            mock_db_connected.return_value = False
            # Bugs other fixtures if imported globally.
            from testfixtures import LogCapture
            with LogCapture() as log_capture:
                main.classify_textfiles_to_db(Mock(), Mock(), 1, True)
            # BUG FIX: the original `l.__sizeof__() > 0` was always true;
            # assert that the failure was actually logged instead.
            assert log_capture.records
            assert not mock_workers.called
def test_run_classifying_workers(self, mock_event, mock_pd,
                                 mock_classify, mock_pre_process,
                                 mock_coOc, mock_config,
                                 mock_process):
    """run_classifying_workers pre-processes its input and logs progress."""
    queue = Mock()
    workers = Workers()
    mock_pre_process.return_value = Mock()
    # Bugs other fixtures if imported globally.
    from testfixtures import LogCapture
    with LogCapture() as log_capture:
        workers.run_classifying_workers(1, queue, 1, pre_downloaded=True)
    # BUG FIX: the original `l.__sizeof__() > 0` was always true (it measures
    # the capture object's memory footprint); assert that log records were
    # actually captured instead.
    assert log_capture.records
    assert mock_pre_process.called
def test__log_failure_and_die():
    """_log_failure_and_die exits, and forwards output only when asked to."""
    with testfixtures.LogCapture() as logs:
        call_result = executor.CallResult(1, 'happy_stdout', 'sad_stderr')

        # Without output forwarding, only the error message is logged.
        with pytest.raises(SystemExit):
            main._log_failure_and_die('error', call_result, False)
        assert _somewhere_in_messages(logs, 'error')
        assert _nowhere_in_messages(logs, 'happy_stdout')
        assert _nowhere_in_messages(logs, 'sad_stderr')

        # With output forwarding, stdout and stderr are logged too.
        with pytest.raises(SystemExit):
            main._log_failure_and_die('error', call_result, True)
        assert _somewhere_in_messages(logs, 'error')
        assert _somewhere_in_messages(logs, 'happy_stdout')
        assert _somewhere_in_messages(logs, 'sad_stderr')
def test_empty_pool_fallback(self):
    """An empty captcha pool logs an error and falls back to generating one."""
    saved_get_from_pool = settings.CAPTCHA_GET_FROM_POOL
    settings.CAPTCHA_GET_FROM_POOL = True
    try:
        CaptchaStore.objects.all().delete()  # Delete objects created during SetUp
        with LogCapture() as log_capture:
            CaptchaStore.pick()
        log_capture.check(('captcha.models', 'ERROR', "Couldn't get a captcha from pool, generating"),)
        self.assertEqual(CaptchaStore.objects.count(), 1)
    finally:
        # BUG FIX: restore the setting even when an assertion above fails, so
        # the override does not leak into other tests.
        settings.CAPTCHA_GET_FROM_POOL = saved_get_from_pool
def test_bad_callback_type_is_logged(self, config):
    """An unrecognised GitHub callback type produces a WARNING log entry."""
    with LogCapture() as logs:
        snooze.github_callback("foobar", None, None, None, None)
        assert "WARNING" in str(logs)