def test_process_source_and_namespace_args_name_error(
    self,
    command,
    parser,
    source_one_active
):
    """A source name supplied without a namespace must be rejected.

    Builds ``Args`` with only ``source_name`` set and expects
    ``process_source_and_namespace_args`` to raise ``FakeParserError`` whose
    first argument mentions the missing ``--namespace`` option.
    """
    args = Args(
        source_id=None,
        source_name=source_one_active,
        namespace=None
    )
    with pytest.raises(FakeParserError) as e:
        command.process_source_and_namespace_args(args, parser)

    # The captured exception is inspected only after the raises block exits.
    assert e.value.args
    assert "--namespace must be provided" in e.value.args[0]
python类raises()的实例源码
def test_ensure_messages_published_fails_when_overpublished(
    self, topic, messages, producer, topic_offsets
):
    """Ensuring fewer messages than were already published must fail.

    Publishes every message in ``messages``, then asks the producer to ensure
    only the first two. ``PublicationUnensurableError`` is expected, and the
    patched module logger is checked via ``_assert_logged_info_correct``.
    """
    for message in messages:
        producer.publish(message)
    producer.flush()

    with pytest.raises(
        PublicationUnensurableError
    ), mock.patch.object(
        data_pipeline.producer,
        'logger'
    ) as mock_logger:
        producer.ensure_messages_published(messages[:2], topic_offsets)

    # Checked after the with-block: anything placed after the raising call
    # inside it would never execute. The mock keeps its recorded calls after
    # the patch is undone.
    self._assert_logged_info_correct(
        mock_logger,
        len(messages),
        topic,
        topic_offsets,
        message_count=len(messages[:2])
    )
def test_publish_fails_after_retry(self, message, producer):
    """Publishing must raise ``MaxRetryError`` when the send request fails.

    ``send_produce_request`` on the producer's kafka client is patched with
    ``side_effect=[FailedPayloadsError]``. After the failure the test checks
    that no new message was captured on the topic, that the request was
    attempted ``self.max_retry_count`` times, and that the topic offset map
    reports zero published messages.
    """
    # TODO(DATAPIPE-606|clin) investigate better way than mocking response
    with mock.patch.object(
        producer._kafka_producer.kafka_client,
        'send_produce_request',
        side_effect=[FailedPayloadsError]
    ) as mock_send_request, capture_new_messages(
        message.topic
    ) as get_messages:
        orig_topic_to_offset_map = self.get_orig_topic_to_offset_map(producer)

        # Only publish/flush are expected to raise. Keeping the follow-up
        # assertions outside pytest.raises guarantees they actually run,
        # while the mock and the message capture are still active.
        with pytest.raises(MaxRetryError):
            producer.publish(message)
            producer.flush()

        messages = get_messages()
        assert len(messages) == 0
        assert mock_send_request.call_count == self.max_retry_count
        self.assert_new_topic_to_offset_map(
            producer,
            message.topic,
            orig_topic_to_offset_map,
            published_message_count=0
        )
def test_move(tmpdir, storages):
    """Move a resource along http -> ftp -> sftp -> file and check each hop.

    The http source is expected to raise ``ReadOnlyResourceException`` on
    move, after which both the http source and the ftp target exist. Each
    later move is asserted to leave the target present and the source absent.
    """
    local_path = tmpdir.join('alpha')
    f_http = Resource(storages['http']('/LICENSE'))
    f_file = Resource(storages['file'](local_path.strpath))
    f_sftp = Resource(storages['sftp']('/gamma'))
    f_ftp = Resource(storages['ftp']('/beta'))

    assert f_http.exists()

    # Start from a clean slate on every writable storage.
    delete_files(f_file, f_sftp, f_ftp)
    assert not f_file.exists()
    assert not f_sftp.exists()
    assert not f_ftp.exists()

    with pytest.raises(transfert.exceptions.ReadOnlyResourceException):
        transfert.actions.move(f_http, f_ftp, size=100)
    assert f_ftp.exists()
    assert f_http.exists()

    transfert.actions.move(f_ftp, f_sftp, size=100)
    assert not f_ftp.exists()
    assert f_sftp.exists()

    transfert.actions.move(f_sftp, f_file, size=100)
    assert not f_sftp.exists()
    assert f_file.exists()
def noTableFoundExceptionQueries():
    """Check that sentences naming no known table make ``Ln2sql`` raise.

    Each case carries the input sentence plus the database and language file
    paths; building ``Ln2sql`` and calling ``get_query`` must raise.
    """
    no_table_cases = [
        {
            'input': 'Quel est le nom des reservation ?',  # No table name found in sentence!
            'database': DATABASE_PATH + 'ecole.sql',
            'language': LANG_PATH + 'french.csv'
        }
    ]

    for case in no_table_cases:
        with pytest.raises(Exception):
            Ln2sql(case['database'], case['language']).get_query(case['input'])
def noLinkForJoinFoundExceptionQueries():
    """Check that sentences needing an impossible join make ``Ln2sql`` raise.

    Each case carries the input sentence plus the database and language file
    paths; building ``Ln2sql`` and calling ``get_query`` must raise.
    """
    no_link_cases = [
        {
            'input': "Quel est le professeur qui enseigne la matière SVT ?",
            # There is at least column `matiere` that is unreachable from table `PROFESSEUR`!
            'database': DATABASE_PATH + 'ecole.sql',
            'language': LANG_PATH + 'french.csv'
        },
        {
            'input': "compte les salle des élève",
            # There is at least column `salle` that is unreachable from table `ELEVE`!
            'database': DATABASE_PATH + 'ecole.sql',
            'language': LANG_PATH + 'french.csv'
        }
    ]

    for case in no_link_cases:
        with pytest.raises(Exception):
            Ln2sql(case['database'], case['language']).get_query(case['input'])
test_worst_report.py 文件源码
项目:django-performance-testing
作者: PaesslerAG
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_report_can_deal_with_single_anonymous_result_not_with_more():
    """One unnamed result is accepted; several unnamed results are rejected.

    After the rejected call the report must still hold only the value from
    the first, successful call.
    """
    report = WorstReport()
    report.handle_results_collected(
        signal=None, sender=WithId('foo'),
        results=[9], context={})
    assert list(report.data.keys()) == ['foo']
    assert get_value_and_context(report, 'foo')[0] == 9

    with pytest.raises(TypeError) as excinfo:
        report.handle_results_collected(
            signal=None, sender=WithId('foo'),
            results=[1, 2], context={})
    assert 'Duplicate result name(s): \'\'' == str(excinfo.value)

    # The failed call must not have altered the stored data.
    assert list(report.data.keys()) == ['foo']
    assert get_value_and_context(report, 'foo')[0] == 9
def test_can_limit_elapsed_seconds(seconds):
    """A ``TimeLimit(total=0)`` must be violated once frozen time advances.

    The clock is frozen and ticked forward by ``seconds`` inside the limit;
    the resulting ``LimitViolationError`` message is checked.
    """
    with freeze_time('2016-09-22 15:57:01') as frozen_time:
        with pytest.raises(LimitViolationError) as excinfo:
            with TimeLimit(total=0):
                frozen_time.tick(timedelta(seconds=seconds))

    assert excinfo.value.base_error_msg == \
        'Too many ({}) total elapsed seconds (limit: 0)'.format(seconds)
test_core_infrastructure.py 文件源码
项目:django-performance-testing
作者: PaesslerAG
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_signals_all_listeners_reports_first_failure(self, collector_cls):
    """Every listener is called even when one fails, and that failure surfaces.

    Three listeners are connected to ``results_collected``; the middle one
    raises. Using the collector as a context manager must raise that
    exception, and the listeners before and after it must both have been
    called.
    """
    collector = collector_cls()

    class MyException(Exception):
        pass

    def get_mock_listener():
        return Mock(return_value=None)

    first_attached_handler = get_mock_listener()

    def second_handler_that_will_fail(*a, **kw):
        raise MyException('foo')

    last_attached_handler = get_mock_listener()

    listeners = [
        first_attached_handler, second_handler_that_will_fail,
        last_attached_handler]
    for listener in listeners:
        results_collected.connect(listener)
    try:
        with pytest.raises(MyException) as excinfo:
            with collector:
                pass
        assert str(excinfo.value).endswith('foo')
        method_name_in_stacktrace = 'second_handler_that_will_fail'
        assert method_name_in_stacktrace in str(excinfo.value)
        assert first_attached_handler.called
        assert last_attached_handler.called
    finally:
        # Always detach so a failure here cannot leak listeners into other tests.
        for listener in listeners:
            results_collected.disconnect(listener)
test_core_infrastructure.py 文件源码
项目:django-performance-testing
作者: PaesslerAG
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def test_signal_handler_error_doesnt_hide_orig_error(self, collector_cls):
    """An error raised inside the collector wins over a failing signal handler.

    A handler that raises is connected to ``results_collected``; the code run
    under the collector raises as well. The exception that propagates must be
    the one from the code, and the handler must still have been called.
    """
    collector = collector_cls()
    failing_signal_handler = Mock(side_effect=Exception('handler error'))
    results_collected.connect(failing_signal_handler)
    try:
        with pytest.raises(Exception) as excinfo:
            with collector:
                raise Exception('actual code error')
        assert str(excinfo.value) == 'actual code error'
        assert failing_signal_handler.called
    finally:
        # Always detach so the failing handler cannot affect other tests.
        results_collected.disconnect(failing_signal_handler)
test_core_infrastructure.py 文件源码
项目:django-performance-testing
作者: PaesslerAG
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_when_above_the_limit_there_is_an_error(
        self, limit_cls_and_name, limit, value):
    """A result above the configured limit raises ``LimitViolationError``.

    The raised error must reference the limit object and the offending
    result, report the actual value as a string, and carry no context.
    """
    limit_cls, name = limit_cls_and_name
    assert limit < value, 'test pre-req'
    limit_obj = limit_cls(**{name: limit})
    result = NameValueResult(name, value)

    with pytest.raises(LimitViolationError) as excinfo:
        limit_obj.handle_results(
            results=[result], context=None)

    assert excinfo.value.limit_obj == limit_obj
    assert excinfo.value.result == result
    assert excinfo.value.actual == str(value)
    assert not excinfo.value.context
test_core_infrastructure.py 文件源码
项目:django-performance-testing
作者: PaesslerAG
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def test_without_collector_id_cannot_be_settings_based(self, limit_cls):
    """Constructing a settings-based limit without a collector id is a TypeError."""
    with pytest.raises(TypeError) as excinfo:
        limit_cls(settings_based=True)

    assert 'Can only be settings based when collector_id is provided.' == \
        str(excinfo.value)
test_template_limit_blocks.py 文件源码
项目:django-performance-testing
作者: PaesslerAG
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_limits_can_be_used_as_template_tags(sample_template):
    """Rendering a template with a too-slow value violates the limit.

    A ``SlowRendering`` object configured with ``render_in_seconds=5`` is put
    in the template context under frozen time; rendering must raise
    ``LimitViolationError`` reporting an actual value of ``'5.0'`` against a
    limit of 3.
    """
    with freeze_time('2016-11-02 10:33:00') as frozen_time:
        too_slow = SlowRendering(frozen_time=frozen_time, render_in_seconds=5)
        too_slow_context = template.context.Context(
            {'slow_rendering': too_slow})
        with pytest.raises(LimitViolationError) as excinfo:
            rendered_content = sample_template.render(too_slow_context)
            # Only reached when render() did not raise; the output then helps
            # diagnose why the limit was not hit.
            print(rendered_content)  # to get debug info

    assert excinfo.value.actual == '5.0'
    assert excinfo.value.limit == 3
test_core_constructing_a_registry.py 文件源码
项目:django-performance-testing
作者: PaesslerAG
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def test_cannot_construct_from_list_of_duplicates(dotted_paths):
    """Building the registry from ``dotted_paths`` raises ``DuplicateNamesError``."""
    with pytest.raises(DuplicateNamesError):
        UniqueNamedClassRegistry(dotted_paths)
def test_fails_when_class_has_no_such_method_as_to_wrap(wrapper):
    """Wrapping a method name that ``object`` lacks raises ``AttributeError``.

    Checked once with ``is_cls_method=False`` and once with
    ``is_cls_method=True``.
    """
    with pytest.raises(AttributeError):
        wrapper(
            cls_to_wrap=object, method_to_wrap_name='no_such_method',
            context_manager=None, is_cls_method=False)

    with pytest.raises(AttributeError):
        wrapper(
            cls_to_wrap=object, method_to_wrap_name='no_such_method',
            context_manager=None, is_cls_method=True)
test_querycount_limits.py 文件源码
项目:django-performance-testing
作者: PaesslerAG
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_integration_test_with_db(db):
    """Three ORM operations under ``QueryBatchLimit(total=2)`` violate the limit.

    The raised ``LimitViolationError`` must carry the context entered via
    ``ctx.enter``, reference the limit object, and report limit 2 under the
    name ``'total'``.
    """
    with pytest.raises(LimitViolationError) as excinfo:
        with override_current_context() as ctx:
            with QueryBatchLimit(total=2) as limit:
                ctx.enter(key='some', value='context')
                list(Group.objects.all())
                Group.objects.update(name='bar')
                Group.objects.create(name='group')

    assert excinfo.value.context == {'some': ['context']}
    assert excinfo.value.limit_obj == limit
    assert excinfo.value.limit == 2
    assert excinfo.value.name == 'total'
test_integrates_with_template_rendering.py 文件源码
项目:django-performance-testing
作者: PaesslerAG
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def test_has_support_for_number_of_queries_in_templates(db, settings):
    """A zero-query limit on ``Template.render`` is violated by a queryset.

    ``PERFORMANCE_LIMITS`` is set to allow zero total queries for
    ``Template.render``; rendering with a ``Group`` queryset in the context
    must raise ``LimitViolationError`` whose context names the template.
    """
    settings.PERFORMANCE_LIMITS = {
        'Template.render': {
            'queries': {
                'total': 0
            }
        }
    }
    template = loader.get_template('all-group-names.markdown')

    with pytest.raises(LimitViolationError) as excinfo:
        template.render(context={'groups': Group.objects.all()})

    assert excinfo.value.context == {'template': ['all-group-names.markdown']}
test_core_context.py 文件源码
项目:django-performance-testing
作者: PaesslerAG
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def test_exiting_a_not_entered_context_is_an_error(self):
    """Exiting with an unknown key or a mismatched value raises ``ValueError``.

    First exits a key that was never entered, then enters a key and exits it
    with a different value; both error messages are checked exactly.
    """
    ctx = Context()

    with pytest.raises(ValueError) as excinfo:
        ctx.exit(key='no such key', value='some value')
    assert str(excinfo.value) == \
        'cannot exit not entered context - key {!r} mismatch'.format(
            'no such key')

    ctx.enter(key='key', value='enter value')
    with pytest.raises(ValueError) as excinfo:
        ctx.exit(key='key', value='exit value')
    assert str(excinfo.value) == \
        'cannot exit not entered context - value mismatch ' \
        '(exit: {!r}, enter: {!r})'.format(
            'exit value', 'enter value')
def test_required_config(config):
with pytest.raises(TypeError) as excinfo:
Fake.from_config(**config)
message = excinfo.value.args[0]
assert 'missing required config keys' in message
assert 'from Fake' in message