def test_missing_variable(self):
    """Test if ``WriteTensorBoard`` handles missing image variables as expected."""
    bad_epoch_data = {'valid': {}}
    with mock.patch.dict('sys.modules', **{'cv2': cv2_mock}):
        # on_missing_variable='ignore': nothing at all is logged
        ignore_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(),
                                       image_variables=['plot'],
                                       on_missing_variable='ignore')
        with LogCapture(level=logging.INFO) as captured_ignore:
            ignore_hook.after_epoch(42, bad_epoch_data)
        captured_ignore.check()

        # on_missing_variable='warn': a single root-logger warning is emitted
        warn_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(),
                                     image_variables=['plot'],
                                     on_missing_variable='warn')
        with LogCapture(level=logging.INFO) as captured_warn:
            warn_hook.after_epoch(42, bad_epoch_data)
        captured_warn.check(('root', 'WARNING', '`plot` not found in epoch data.'))

        # on_missing_variable='error': the missing variable raises a KeyError
        raise_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(),
                                      image_variables=['plot'],
                                      on_missing_variable='error')
        with self.assertRaises(KeyError):
            raise_hook.after_epoch(42, bad_epoch_data)
# Example usages of LogCapture() (collected test sources)
def test_two_social(self):
    """
    get_social_username should return None if there are two social edX accounts for a user
    """
    # A second social-auth row for the same user makes get() ambiguous.
    UserSocialAuthFactory.create(user=self.user, uid='other name')
    with LogCapture() as captured:
        assert get_social_username(self.user) is None
    # The ambiguity is reported through the profiles.api logger.
    captured.check(
        ('profiles.api',
         'ERROR',
         'Unexpected error retrieving social auth username: get() returned more than '
         'one UserSocialAuth -- it returned 2!')
    )
def test__store_indices_db(self, mock_db, mock_event,
                           mock_pd, mock_classify,
                           mock_coOc, mock_pre_process,
                           mock_config):
    """Verify ``_store_indices_db`` flushes an over-threshold indices buffer.

    The mocked buffer reports 40001 entries, so the worker should append
    the new index, attempt the (failing) DB store, clear the buffer, and
    log what happened along the way.
    """
    mock_index = Mock(return_value=True)
    mock_indice = MagicMock()
    mock_indice.return_value = True
    mock_indice.__len__.return_value = 40001  # over the flush threshold
    mock_config.return_value = 0
    mock_db.store_indices.return_value = False  # simulate a failed store
    w = Workers()
    from testfixtures import LogCapture
    with LogCapture() as captured:
        w._store_indices_db(mock_index, mock_indice)
    # BUG FIX: the original asserted `l.__sizeof__() > 0`, which is true for
    # ANY object and therefore tested nothing.  Assert that log records were
    # actually captured instead.
    assert len(captured.records) > 0
    assert mock_indice.append.called
    assert mock_db.store_indices.called
    assert mock_indice.clear.called
def test__store_info_db(self, mock_event,
                        mock_pd, mock_classify,
                        mock_coOc, mock_pre_process,
                        mock_config):
    """Verify ``_store_info_db`` stores an item and clears its buffer.

    ``mock_item`` pairs a payload with a buffer list of length 1; the worker
    is expected to append to that list, store via ``util_func``, clear the
    list, and log its progress.
    """
    mock_config.return_value = 0
    mock_digests = Mock()
    mock_itm = Mock()
    mock_list = MagicMock()
    mock_list.__len__.return_value = 1
    mock_item = [mock_itm, mock_list]
    util_func = Mock()
    util_func.__name__ = 'mocked_util_func'  # worker logs the callable's name
    w = Workers()
    from testfixtures import LogCapture
    with LogCapture() as captured:
        w._store_info_db(mock_digests, mock_item, util_func)
    # BUG FIX: the original asserted `l.__sizeof__() > 0`, which is true for
    # ANY object and therefore tested nothing.  Assert that log records were
    # actually captured instead.
    assert len(captured.records) > 0
    assert mock_list.append.called
    assert mock_list.clear.called
def test_bad_message_is_logged(self, config, trivial_message):
    """Malformed queue messages and failing callbacks are logged, not raised."""
    responses.add(responses.POST, "https://api.github.com/repos/tdsmith/test_repo/hooks")
    repo_listener = snooze.RepositoryListener(
        events=snooze.LISTEN_EVENTS,
        **config["tdsmith/test_repo"])
    sqs = boto3.resource("sqs", region_name="us-west-2")
    sqs_queue = list(sqs.queues.all())[0]

    # A message that is not JSON must be logged as an error during poll().
    sqs_queue.send_message(MessageBody="this isn't a json message at all")
    with LogCapture() as captured:
        repo_listener.poll()
    assert 'ERROR' in str(captured)

    # A callback that raises must have its exception message logged.
    def my_callback(event, message):
        raise ValueError("I object!")

    sqs_queue.send_message(MessageBody=trivial_message)
    repo_listener.register_callback(my_callback)
    with LogCapture() as captured:
        repo_listener.poll()
    assert 'I object!' in str(captured)
def test_upload_progress_logging(self, mock_getsize, mock_files):
    """Uploading 20 objects logs a start line, two progress ticks, and a summary."""
    # 20 files of 20 bytes each; progress ticks every 10 objects.
    mock_files.return_value = {
        'file%s' % i: 20
        for i in range(20)
    }
    mock_getsize.return_value = 20
    s3_p = S3Path('s3://bucket')
    with LogCapture('stor.s3.progress') as progress_log:
        s3_p.upload(['upload'])
        progress_log.check(
            ('stor.s3.progress', 'INFO', 'starting upload of 20 objects'),  # nopep8
            ('stor.s3.progress', 'INFO', '10/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.s3.progress', 'INFO', '20/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.s3.progress', 'INFO', 'upload complete - 20/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
        )
def test_download_progress_logging(self, mock_list, mock_getsize, mock_make_dest_dir):
    """Downloading 20 objects logs a start line, two progress ticks, and a summary."""
    # 19 files plus one directory marker = 20 listed objects.
    mock_list.return_value = [
        S3Path('s3://bucket/file%s' % i)
        for i in range(19)
    ] + [S3Path('s3://bucket/dir')]
    mock_getsize.return_value = 100
    s3_p = S3Path('s3://bucket')
    with LogCapture('stor.s3.progress') as progress_log:
        s3_p.download('output_dir')
        progress_log.check(
            ('stor.s3.progress', 'INFO', 'starting download of 20 objects'),  # nopep8
            ('stor.s3.progress', 'INFO', '10/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.s3.progress', 'INFO', '20/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.s3.progress', 'INFO', 'download complete - 20/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
        )
def test_progress_logging(self):
    """Swift download progress is logged; non-download actions are not counted."""
    # 20 download_object results of 100 bytes each ...
    self.mock_swift.download.return_value = [
        {
            'action': 'download_object',
            'read_length': 100
        }
        for i in range(20)
    ]
    # ... plus one unrelated action that must be ignored by the counter.
    self.mock_swift.download.return_value.append({'action': 'random_action'})
    swift_p = SwiftPath('swift://tenant/container')
    with LogCapture('stor.swift.progress') as progress_log:
        swift_p.download('output_dir')
        progress_log.check(
            ('stor.swift.progress', 'INFO', 'starting download'),
            ('stor.swift.progress', 'INFO', '10\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.swift.progress', 'INFO', '20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.swift.progress', 'INFO', 'download complete - 20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
        )
def test_complex_signature_py2(self):
    """trace_call logs a fully-resolved argument list for a complex signature."""
    with LogCapture() as captured:
        @trace_call(self.logger)
        def foo(a, b, c, d, e, g='G', h='H', i='ii', j='jj', *varargs_, **varkwargs_):
            pass

        # Mix of positional, star-expanded, keyword, and double-star args.
        foo('a', 'b', *['c', 'd'], e='E', Z='Z', **{'g': 'g', 'h': 'h'})
    captured.check(
        (
            'test.v0_1.test_base', 'DEBUG', "calling %sfoo(a='a', b='b', c='c', d='d', e='E', "
            "g='g', h='h', varkwargs_={'Z': 'Z'}, "
            "i='ii', j='jj', varargs_=<class '%s._empty'>)" % (self._get_prefix(), INSPECT_MODULE_NAME)
        ),
    )
def test_complex_signature_py3(self):
    """trace_call logs resolved arguments for a Python-3-only signature.

    The traced function uses keyword-only parameters after ``*varargs_``,
    which is a syntax error on Python 2, so it is defined via ``exec``.
    """
    if six.PY2:
        raise SkipTest()
    with LogCapture() as l:
        # without this exec Python 2 and pyflakes complain about syntax errors etc
        # BUG FIX: the exec'd source must be valid Python — the function body
        # and the call need consistent indentation inside the string.
        exec (
            "@trace_call(self.logger)\n"
            "def foo(a, b, c, d, e, *varargs_, f=None, g='G', h='H', i='ii', j='jj', **varkwargs_: None):\n"
            "    pass\n"
            "foo('a', 'b', *['c', 'd'], e='E', f='F', Z='Z', **{'g':'g', 'h':'h'})\n",
            locals(), globals()
        )
    l.check(
        (
            'test.v0_1.test_base',
            'DEBUG',
            "calling foo(a='a', b='b', c='c', d='d', e='E', f='F', "
            "g='g', h='h', varkwargs_={'Z': 'Z'}, varargs_=<class '%s._empty'>, "
            "i='ii', j='jj')" % (INSPECT_MODULE_NAME,)  # prefix does not work because of the eval, inspect module is for pypy3
        ),
    )
def test_disable_trace(self):
    """Methods marked with @disable_trace produce no log records, even in subclasses."""
    @six.add_metaclass(TraceAllPublicCallsMeta)
    class Ala(object):
        @disable_trace
        def bar(self, a, b, c=None):
            return True

        def __repr__(self):
            return '<%s object>' % (self.__class__.__name__,)

    class Bela(Ala):
        def bar(self, a, b, c=None):
            return False

    with LogCapture() as captured:
        # Call the disabled method on both the base class and the override.
        base = Ala()
        base.bar(1, 2, 3)
        base.bar(1, b=2)
        derived = Bela()
        derived.bar(1, 2, 3)
        derived.bar(1, b=2)
    # No trace output at all is expected.
    captured.check()
def check_api_error():
    """Exercise the CLI debug levels against an unknown account id.

    Runs the same failing ``Account GetByID`` command at debug levels
    0 (critical: silent), 1 (error only), and 2 (info + error) and checks
    the captured log records at each level.
    """
    runner = CliRunner()
    # Level 0: nothing should be logged.
    # NOTE: the original bound each invoke() to an unused `result` local;
    # the return value is never inspected, so the assignments were dropped.
    with LogCapture() as captured:
        runner.invoke(cli.cli, ['--debug', '0', '-c', '0', 'Account', 'GetByID', '--account_id', '1000000'])
    captured.check()
    print("Critical setting working.")
    # Level 1: only the API error is logged.
    with LogCapture() as captured:
        runner.invoke(cli.cli, ['--debug', '1', '-c', '0', 'Account', 'GetByID', '--account_id', '1000000'])
    captured.check(('element.cli.cli', "ERROR", "xUnknownAccount"))
    print("Error setting working.")
    # Level 2: request parameters are logged at INFO before the error.
    with LogCapture() as captured:
        runner.invoke(cli.cli, ['--debug', '2', '-c', '0', 'Account', 'GetByID', '--account_id', '1000000'])
    captured.check(
        ('element.cli.cli', "INFO", "account_id = 1000000;"),
        ('element.cli.cli', "ERROR", "xUnknownAccount")
    )
    print("Info setting is working.")
def test_natural_key_exception():
    """
    Tests the get_by_natural_key method for a ContextFilter that
    doesn't exist.
    """
    with LogCapture() as log_capture:
        # Natural key for a context/filter combination that was never created.
        key = ['dummy_context', 'mongodb', 'test_database', 'test_posts',
               'host', 'from']
        ContextFilter.objects.get_by_natural_key(*key)
        # Both the missing Context and the missing ContextFilter are reported.
        expected_1 = ('Context dummy_context:mongodb.test_database.'
                      'test_posts does not exist')
        expected_2 = ('ContextFilter dummy_context:mongodb.test_database.'
                      'test_posts (host -> from) does not exist')
        log_capture.check(
            ('contexts.models', 'ERROR', expected_1),
            ('contexts.models', 'ERROR', expected_2)
        )
def test_process_msg_exception(self):
    """
    Tests the process_msg function when an exception is raised.
    """
    logging.disable(logging.NOTSET)  # re-enable logging disabled by other tests
    with patch('receiver.receiver.logging.getLogger', return_value=LOGGER):
        # Force json.loads to blow up so the error path runs.
        with patch('receiver.receiver.json.loads', side_effect=Exception('foo')):
            with LogCapture() as log_capture:
                process_msg(**self.kwargs)
                log_capture.check(
                    ('receiver',
                     'ERROR',
                     'An error occurred while processing the message \'{"@uuid": "12345", '
                     '"collection": "elasticsearch.test_index.test_logs", "message": '
                     '"foobar"}\':\n'
                     ' foo'),
                )
def test_natural_key_exception(self):
    """
    Tests the get_by_natural_key method when the Distillery does not exist.
    """
    with LogCapture() as log_capture:
        # A Collection/doctype that was never configured.
        natural_key = ['elasticsearch', 'test_index', 'fake_doctype']
        Distillery.objects.get_by_natural_key(*natural_key)
        # Both the missing Collection and the dependent Distillery log errors.
        log_capture.check(
            ('warehouses.models',
             'ERROR',
             'Collection elasticsearch.test_index.fake_doctype does '
             'not exist'),
            ('distilleries.models',
             'ERROR',
             'Distillery for Collection elasticsearch.test_index.'
             'fake_doctype does not exist')
        )
def test_add_raw_data_info_for_none(self):
    """
    Tests the _add_raw_data_info method when no Collection name is
    given.
    """
    with LogCapture() as log_capture:
        doc_obj = self.doc_obj
        doc_obj.collection = None  # simulate a document without a Collection
        actual = self.distillery._add_raw_data_info(self.doc, doc_obj)
        # The document is returned unchanged and the failure is logged.
        expected = self.doc
        log_capture.check(
            ('cyphon.documents',
             'ERROR',
             'Info for raw data document None:1 could not be added'),
        )
        self.assertEqual(actual, expected)
def test_integrity_error(self):
    """
    Tests the configuration test tool when an IntegrityError is raised.
    """
    self.page.config_test_value = 'test text'
    # Make the admin form's save() fail with a database integrity error.
    with patch('django.forms.ModelForm.save',
               side_effect=IntegrityError('foo')):
        with LogCapture('cyphon.admin') as log_capture:
            actual = self.page.run_test()
            expected = "Could not create an object for testing: foo"
            self.assertEqual(actual, expected)
            msg = ('An error occurred while creating a test instance: '
                   '<WSGIRequest: POST '
                   "'/admin/mailcondensers/mailcondenser/1/change/test/'>")
            log_capture.check(
                ('cyphon.admin', 'ERROR', msg),
            )
def test_validation_error(self):
    """
    Tests the configuration test tool when a ValidationError is raised.
    """
    self.page.config_test_value = 'test text'
    # Make the admin's result helper fail validation.
    with patch(
            'sifter.mailsifter.mailcondensers.admin.MailCondenserAdmin._get_result',
            side_effect=ValidationError('foo')):
        with LogCapture('cyphon.admin') as log_capture:
            actual = self.page.run_test()
            expected = "A validation error occurred: ['foo']"
            self.assertEqual(actual, expected)
            msg = ('An error occurred while initializing a config test: '
                   '<WSGIRequest: POST '
                   "'/admin/mailcondensers/mailcondenser/1/change/test/'>")
            log_capture.check(
                ('cyphon.admin', 'ERROR', msg),
            )
def test_decode_error(self):
    """
    Tests the get_email_value function when a UnicodeDecodeError is
    raised.
    """
    # Craft a decode error for bleach.clean to raise.
    error = UnicodeDecodeError('funnycodec', b'\x00\x00', 1, 2,
                               'Something went wrong!')
    with patch('sifter.mailsifter.accessors.bleach.clean',
               side_effect=error):
        with LogCapture() as log_capture:
            actual = accessors.get_email_value('Subject', {'Subject': 'test'})
            # A placeholder message is returned and the failure is logged.
            expected = 'The Subject of this email could not be displayed ' + \
                       'due to an error.'
            self.assertEqual(actual, expected)
            msg = ('An error was encountered while parsing the '
                   'Subject field of an email.')
            log_capture.check(
                ('sifter.mailsifter.accessors', 'ERROR', msg),
            )
def test_no_file_path(self):
    """
    Tests the save_attachment function.
    """
    # Only 'application/java' is allowed while attaching, so the java
    # attachment gets into the message ...
    mock_settings_1 = {
        'ALLOWED_EMAIL_ATTACHMENTS': ('application/java',)
    }
    with patch.dict('sifter.mailsifter.accessors.settings.MAILSIFTER',
                    mock_settings_1):
        self.msg.attach(self.java)
        attachment = get_first_attachment(self.msg)
        # ... but the save-time settings reject it, so nothing is saved.
        with patch('sifter.mailsifter.attachments.settings',
                   self.mock_settings):
            with LogCapture() as log_capture:
                actual = attachments.save_attachment(attachment)
                expected = None
                self.assertEqual(actual, expected)
                msg = 'The attachment %s is not an allowed file type' \
                      % self.java_file
                log_capture.check(
                    ('sifter.mailsifter.attachments', 'WARNING', msg),
                )
def test_no_match_missing_munger(self):
    """
    Tests the process_email receiver for an email that doesn't match
    an existing MailChute when a default MailChute is enabled but
    the default MailMunger can't be found.
    """
    doc_obj = self.doc_obj
    doc_obj.data['Subject'] = 'nothing to see here'  # matches no chute
    # Enable the default munger but point it at a name that doesn't exist.
    mock_config = {
        'DEFAULT_MUNGER': 'missing_munger',
        'DEFAULT_MUNGER_ENABLED': True
    }
    with patch.dict('sifter.mailsifter.mailchutes.models.conf.MAILSIFTER',
                    mock_config):
        with LogCapture() as log_capture:
            msg = 'Default MailMunger "missing_munger" is not configured.'
            MailChute.objects.process(doc_obj)
            log_capture.check(
                ('sifter.chutes.models', 'ERROR', msg),
            )
def test_integrity_error(self):
    """
    Tests the configuration test tool when an IntegrityError is raised.
    """
    self.page.config_test_value = json.dumps({'text': 'test'})
    # Make the admin form's save() fail with a database integrity error.
    with patch('django.forms.ModelForm.save',
               side_effect=IntegrityError('foo')):
        with LogCapture('cyphon.admin') as log_capture:
            actual = self.page.run_test()
            expected = "Could not create an object for testing: foo"
            self.assertEqual(actual, expected)
            msg = ('An error occurred while creating a test instance: '
                   '<WSGIRequest: POST '
                   "'/admin/datacondensers/datacondenser/1/change/test/'>")
            log_capture.check(
                ('cyphon.admin', 'ERROR', msg),
            )
def test_validation_error(self):
    """
    Tests the configuration test tool when a ValidationError is raised.
    """
    self.page.config_test_value = json.dumps({'text': 'test'})
    # Make the admin's result helper fail validation.
    with patch(
            'sifter.datasifter.datacondensers.admin.DataCondenserAdmin._get_result',
            side_effect=ValidationError('foo')):
        with LogCapture('cyphon.admin') as log_capture:
            actual = self.page.run_test()
            expected = "A validation error occurred: ['foo']"
            self.assertEqual(actual, expected)
            msg = ('An error occurred while initializing a config test: '
                   '<WSGIRequest: POST '
                   "'/admin/datacondensers/datacondenser/1/change/test/'>")
            log_capture.check(
                ('cyphon.admin', 'ERROR', msg),
            )
def test_integrity_error(self):
    """
    Tests the configuration test tool when an IntegrityError is raised.
    """
    # Make the admin form's save() fail with a database integrity error.
    with patch('django.forms.ModelForm.save',
               side_effect=IntegrityError('foo')):
        with LogCapture('cyphon.admin') as log_capture:
            actual = self.page.run_test()
            expected = "Could not create an object for testing: foo"
            self.assertEqual(actual, expected)
            msg = ('An error occurred while creating a test instance: '
                   '<WSGIRequest: POST '
                   "'/admin/logcondensers/logcondenser/1/change/test/'>")
            log_capture.check(
                ('cyphon.admin', 'ERROR', msg),
            )
def test_get_default_no_chute(self):
    """
    Tests the _default_munger function when the default LogMunger
    does not exist.
    """
    # Enable the default munger but point it at a name that doesn't exist.
    mock_config = {
        'DEFAULT_MUNGER': 'dummy_munger',
        'DEFAULT_MUNGER_ENABLED': True
    }
    with patch.dict('sifter.logsifter.logchutes.models.conf.LOGSIFTER',
                    mock_config):
        with LogCapture() as log_capture:
            actual = LogChute.objects._default_munger
            expected = None
            self.assertEqual(actual, expected)
            # The enabled flag is also forced off when lookup fails.
            self.assertFalse(LogChute.objects._default_munger_enabled)
            log_capture.check(
                ('sifter.chutes.models',
                 'ERROR',
                 'Default LogMunger "dummy_munger" is not configured.'),
            )
def test_email_error(self):
    """
    Tests that an error message is logged when an
    SMTPAuthenticationError is encountered.
    """
    # An email object whose send() raises an SMTP authentication failure.
    mock_email = Mock()
    mock_email.send = Mock(
        side_effect=SMTPAuthenticationError(535, 'foobar'))
    with patch('alerts.signals.emails_enabled', return_value=True):
        with patch('alerts.signals.compose_comment_email',
                   return_value=mock_email):
            with LogCapture() as log_capture:
                # Saving a copied comment triggers the notification signal.
                comment = Comment.objects.get(pk=1)
                comment.pk = None
                comment.save()
                log_capture.check(
                    ('alerts.signals',
                     'ERROR',
                     'An error occurred when sending an email '
                     'notification: (535, \'foobar\')'),
                )
def test_cannot_connect(self, mock_index):
    """
    Tests that the catch_connection_error decorator logs an error when
    Elasticsearch cannot be reached.
    """
    @catch_connection_error
    def test_decorator():
        """Test the catch_connection_error decorator."""
        self.engine.insert({'foo': 'bar'})

    with LogCapture() as log_capture:
        test_decorator()
        # The decorator swallows the connection failure and logs it.
        expected = 'Cannot connect to Elasticsearch'
        log_capture.check(
            ('engines.elasticsearch.engine', 'ERROR', expected),
        )
def test_normal_streaming_query(self):
    """
    Tests the start method for a Pump with a streaming Pipe and a query that
    doesn't exceed the Pipe's specs.
    """
    with LogCapture() as log_capture:
        self.stream_pump._factor_query = Mock(return_value=[self.subquery1])
        self.stream_pump._process_streaming_query = Mock()
        self.stream_pump.start(self.subquery1)
        # _factor_query() receives the query passed to start() ...
        self.stream_pump._factor_query.assert_called_once_with(self.subquery1)
        # ... and the single factored subquery is processed as a stream.
        self.stream_pump._process_streaming_query.assert_called_once_with(
            self.subquery1)
        # No warnings expected for a query within spec.
        log_capture.check()
def test_large_streaming_query(self):
    """
    Tests the start method for a Pump with a streaming Pipe and a query that
    exceeds the Pipe's specs.
    """
    with LogCapture() as log_capture:
        self.stream_pump._factor_query = Mock(return_value=self.query_list)
        self.stream_pump._process_streaming_query = Mock()
        self.stream_pump.start(self.query)
        # _factor_query() receives the query passed to start() ...
        self.stream_pump._factor_query.assert_called_once_with(self.query)
        # ... and only the FIRST factored subquery is streamed.
        self.stream_pump._process_streaming_query.assert_called_once_with(
            self.query_list[0])
        # The truncation is reported as a warning.
        msg = 'Query was too large for Pipe "Twitter PublicStreamsAPI." ' \
            + 'A smaller version of the query was submitted.'
        log_capture.check(
            ('aggregator.pumproom.pump', 'WARNING', msg),
        )
def test_transmit_course_metadata_task_no_channel(self):
    """
    Test the data transmission task without any integrated channel.
    """
    user = factories.UserFactory(username='john_doe')
    factories.EnterpriseCustomerFactory(
        catalog=1,
        name='Veridian Dynamics',
    )
    # Remove all integrated channels so the command has nothing to do.
    SAPSuccessFactorsEnterpriseCustomerConfiguration.objects.all().delete()
    DegreedEnterpriseCustomerConfiguration.objects.all().delete()
    with LogCapture(level=logging.INFO) as log_capture:
        call_command('transmit_course_metadata', '--catalog_user', user.username)
        # Because there are no IntegratedChannels, the process will end early.
        assert not log_capture.records