def noKeywordFoundExceptionQueries():
    """Check that sentences without any keyword make ``get_query`` raise.

    Every sentence is run through ``Ln2sql`` built with the 'ecole.sql'
    database and the 'french.csv' language file; each call must raise.
    """
    school_database = DATABASE_PATH + 'ecole.sql'
    french_language = LANG_PATH + 'french.csv'
    # No keyword found in any of these sentences!
    sentences = ('Affiche moi.', 'Affiche moi les étudiants')
    for sentence in sentences:
        with pytest.raises(Exception):
            Ln2sql(school_database, french_language).get_query(sentence)
python类raises()的实例源码
def test_hooks_are_run_even_if_there_was_an_exception(wrapper):
    """The before/after hooks both fire when the wrapped method raises.

    ``wrapper`` is called to wrap ``Bar.will_fail`` with a
    ``ControllableContextManager``; the exception raised by the method must
    still reach the caller unchanged.
    """
    manager = ControllableContextManager()

    class Bar(object):
        def will_fail(self):
            # The "before" hook must already have run once at this point.
            assert manager.before_call_count == 1
            raise Exception('heh')

    bar = Bar()
    wrapper(
        cls_to_wrap=Bar,
        method_to_wrap_name='will_fail',
        context_manager=manager,
        is_cls_method=False,
    )

    with pytest.raises(Exception) as excinfo:
        bar.will_fail()

    raised = excinfo.value
    assert str(raised) == 'heh'
    assert manager.before_call_count == 1
    assert manager.after_call_count == 1
def test_when_inner_enter_fails(self):
    """When the inner ``__enter__`` fails, the outer manager is still exited.

    The calls recorded for the run through ``run_nested_context_managers``
    must be reproduced exactly by ``multi_context_manager``.
    """
    failure = ControllableContextManager.TestException
    ControllableContextManager.reset_events()
    outer = ControllableContextManager()
    inner = ControllableContextManager(fail_in_method='__enter__')

    # Reference run through the nesting helper.
    with pytest.raises(failure):
        self.run_nested_context_managers(outer, inner)
    expected_calls = self.get_recorded_calls()
    assert expected_calls == [
        (outer, '__enter__'),
        (inner, '__enter__'),
        (outer, '__exit__'),
    ]

    # Same scenario through multi_context_manager, recorded from scratch.
    ControllableContextManager.reset_events()
    with pytest.raises(failure):
        with multi_context_manager([outer, inner]):
            pass
    replayed_calls = self.get_recorded_calls()
    assert expected_calls == replayed_calls
def test_when_outer_enter_fails(self):
    """When the outer ``__enter__`` fails, nothing else is recorded.

    The calls recorded for the run through ``run_nested_context_managers``
    must be reproduced exactly by ``multi_context_manager``.
    """
    failure = ControllableContextManager.TestException
    ControllableContextManager.reset_events()
    outer = ControllableContextManager(fail_in_method='__enter__')
    inner = ControllableContextManager()

    # Reference run through the nesting helper.
    with pytest.raises(failure):
        self.run_nested_context_managers(outer, inner)
    expected_calls = self.get_recorded_calls()
    assert expected_calls == [(outer, '__enter__')]

    # Same scenario through multi_context_manager, recorded from scratch.
    ControllableContextManager.reset_events()
    with pytest.raises(failure):
        with multi_context_manager([outer, inner]):
            pass
    replayed_calls = self.get_recorded_calls()
    assert expected_calls == replayed_calls
def test_when_code_inside_context_managers_fails(self):
    """A failure in the managed body still exits both managers, inner first.

    The calls recorded for the run through ``run_nested_context_managers``
    must be reproduced exactly by ``multi_context_manager``.
    """
    ControllableContextManager.reset_events()
    outer = ControllableContextManager()
    inner = ControllableContextManager()

    def explode():
        raise NotImplementedError('hi')

    # Reference run through the nesting helper, with the failing body.
    with pytest.raises(NotImplementedError):
        self.run_nested_context_managers(outer, inner, fn=explode)
    expected_calls = self.get_recorded_calls()
    assert expected_calls == [
        (outer, '__enter__'),
        (inner, '__enter__'),
        (inner, '__exit__'),
        (outer, '__exit__'),
    ]

    # Same scenario through multi_context_manager, recorded from scratch.
    ControllableContextManager.reset_events()
    with pytest.raises(NotImplementedError):
        with multi_context_manager([outer, inner]):
            explode()
    replayed_calls = self.get_recorded_calls()
    assert expected_calls == replayed_calls
def test_three_managers_middle_one_fails(self):
    """Middle manager fails in ``__exit__``; the outer one is still exited.

    This is like raising the LimitViolationError. The calls recorded for
    three managers entered in one ``with`` statement must be reproduced
    exactly by ``multi_context_manager``.
    """
    failure = ControllableContextManager.TestException
    ControllableContextManager.reset_events()
    outer = ControllableContextManager()
    middle = ControllableContextManager(fail_in_method='__exit__')
    inner = ControllableContextManager()
    ControllableContextManager.reset_events()

    # Reference run: a multi-item ``with`` is equivalent to nesting them.
    with pytest.raises(failure):
        with outer, middle, inner:
            pass
    expected_calls = self.get_recorded_calls()
    assert expected_calls == [
        (outer, '__enter__'),
        (middle, '__enter__'),
        (inner, '__enter__'),
        (inner, '__exit__'),
        (middle, '__exit__'),
        (outer, '__exit__'),
    ]

    # Same scenario through multi_context_manager, recorded from scratch.
    ControllableContextManager.reset_events()
    with pytest.raises(failure):
        with multi_context_manager([outer, middle, inner]):
            pass
    replayed_calls = self.get_recorded_calls()
    assert expected_calls == replayed_calls
test_querycount_limits.py 文件源码
项目:django-performance-testing
作者: PaesslerAG
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def test_can_specify_typed_limits(db):
    """``QueryBatchLimit`` accepts separate ``read`` and ``write`` limits.

    Three scenarios are checked: a mixed read/write limit that is violated,
    a read limit of 0 broken by one read, and a write limit of 1 broken by
    two writes. The last two also verify the details on the raised error.
    """
    # Three reads followed by one write under write=0, read=3.
    with pytest.raises(LimitViolationError) as excinfo:
        with QueryBatchLimit(write=0, read=3):
            for _ in range(3):
                list(Group.objects.all())
            Group.objects.update(name='foo')

    # A single read under read=0.
    with pytest.raises(LimitViolationError) as excinfo:
        with QueryBatchLimit(read=0):
            list(Group.objects.all())
    read_violation = excinfo.value
    assert read_violation.context == {}
    assert read_violation.actual == '1'
    assert read_violation.limit == 0
    assert read_violation.name == 'read'

    # Two writes under write=1.
    with pytest.raises(LimitViolationError) as excinfo:
        with QueryBatchLimit(write=1):
            Group.objects.update(name='baz')
            Group.objects.update(name='name')
    write_violation = excinfo.value
    assert write_violation.context == {}
    assert write_violation.actual == '2'
    assert write_violation.limit == 1
    assert write_violation.name == 'write'
test_integrates_with_template_rendering.py 文件源码
项目:django-performance-testing
作者: PaesslerAG
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def test_has_support_for_elapsed_time_in_template_render(settings):
    """A total time limit of 0 on 'Template.render' is reported as violated.

    Time is frozen and advanced by five seconds while the template iterates
    over its context, and the raised error must name the rendered template.
    """
    template_name = 'all-group-names.markdown'
    settings.PERFORMANCE_LIMITS = {'Template.render': {'time': {'total': 0}}}
    template = loader.get_template('all-group-names.markdown')

    with freeze_time('2016-09-29 15:52:01') as frozen_time:

        class SlowIterable(object):
            def __iter__(self):
                yield 'foo'
                # Advance the frozen clock between the two items.
                frozen_time.tick(timedelta(seconds=5))
                yield 'bar'

        with pytest.raises(LimitViolationError) as excinfo:
            template.render(context={'groups': SlowIterable()})

    violation = excinfo.value
    assert violation.context == {'template': [template_name]}
test_integrates_with_django_test_client.py 文件源码
项目:django-performance-testing
作者: PaesslerAG
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_can_specify_limits_through_settings_for_django_test_client(
        db, settings, client, method, limit, value, cfg_key, items_name,
        view_ctx):
    """Limits set for 'django.test.client.Client' apply to client requests.

    The limit is stored under ``cfg_key`` as a ``total``; a request made
    with the given HTTP ``method`` must raise ``LimitViolationError`` whose
    context names the request and whose ``items_name`` is ``items_name``.
    """
    client_limits = {cfg_key: {'total': limit}}
    settings.PERFORMANCE_LIMITS = {'django.test.client.Client': client_limits}

    with view_ctx(value=value) as vctx:
        with pytest.raises(LimitViolationError) as excinfo:
            vctx.request(getattr(client, method.lower()))

        violation = excinfo.value
        request_label = '{method} {url}'.format(url=vctx.url, method=method)
        assert violation.context == {'Client.request': [request_label]}
        # On failure, show the error's own base message.
        assert violation.items_name == items_name, violation.base_error_msg
def test_cli_rs3topng():
    """conversion to PNG on the commandline

    Runs the CLI with ``-f png`` into a temporary file, checks that it
    exits successfully and that the produced image equals one of the two
    expected reference PNGs.
    """
    temp_png = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
    temp_png.close()
    try:
        # calling `rstviewer -f png input.rs3 output.png` will end the program
        # with sys.exit(0), so we'll have to catch this here.
        with pytest.raises(SystemExit) as serr:
            cli(['-f', 'png', RS3_FILEPATH, temp_png.name])
        # Check the exit status on the captured SystemExit; statements placed
        # after the raising call inside the ``with`` block would never run.
        # A bare ``sys.exit()`` reports success as ``None``.
        assert serr.value.code in (0, None)

        # PNG is binary data: read it as bytes so no text decoding happens.
        with open(temp_png.name, 'rb') as png_file:
            png_bytes = png_file.read()
    finally:
        # Remove the temporary file even when an assertion above fails.
        os.unlink(temp_png.name)

    def matches(expected_path):
        """Return True if the generated PNG equals the file at expected_path."""
        with open(expected_path, 'rb') as expected_png_file:
            return png_bytes == expected_png_file.read()

    # generated images might not be 100% identical, probably
    # because of the font used
    assert matches(EXPECTED_PNG1) or matches(EXPECTED_PNG2)
def test_abstract_methods_required():
    """Instantiating a ``Service`` subclass that defines nothing raises."""

    class Broken(Service):
        """Missing abstract methods."""

    with pytest.raises(TypeError):
        Broken()
def test__parse__(self):
    """``ng.run_dc`` rejects malformed sweep specifications.

    After loading 'dc_ac_check.net', each invalid call below must raise
    the listed exception type.
    """
    ng.load_netlist(netlists_path + 'dc_ac_check.net')

    # (expected exception, positional args, keyword args), checked in order.
    invalid_calls = [
        (KeyError, (), dict(src='v1', start=0, stop=1, fstep=.1)),
        (ValueError, (), dict(src='v1', start=0, stop=1)),
        (ValueError, (), dict(src='v1', start=0, stop=1, step=.1,
                              start2=0, stop2=1, step2=.1)),
        (ValueError, (), dict(src='v1', start=0, stop=1, step=.1,
                              src2='v2', start2=0, stop2=1)),
        (ValueError, ('v1 0 1 0 ',), {}),
        (ValueError, ('v1 0 1 1m v2 0 1 0',), {}),
    ]
    for expected_error, args, kwargs in invalid_calls:
        with pytest.raises(expected_error):
            ng.run_dc(*args, **kwargs)
def test_sync_manager(manager):
    """With ``manager.sync`` set, pushed tasks give results and errors directly.

    Covers a successful task, processing a task name that was never
    registered, and a task whose body raises.
    """
    manager.sync = True

    @manager.task
    def foo(a, b):
        foo.called = True
        return a + b

    outcome = foo.push(1, 2).ready()
    assert outcome.value == 3
    assert foo.called

    # 'boo' was never registered as a task.
    unknown_task = make_task('boo')
    with pytest.raises(KeyError):
        manager.process(unknown_task)

    @manager.task
    def bad():
        raise ZeroDivisionError()

    with pytest.raises(ZeroDivisionError):
        bad.push()
def test_insufficient_numpy_write_data(self, x_series_device, seed):
    """Writing fewer samples than channels fails with error -200524.

    At least two analog output channels are picked at random, then a 1-D
    float64 array with fewer values than channels is written.
    """
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    # Randomly select physical channels to test.
    channel_count = random.randint(
        2, len(x_series_device.ao_physical_chans))
    chosen_channels = random.sample(
        x_series_device.ao_physical_chans, channel_count)

    with nidaqmx.Task() as task:
        channel_names = flatten_channel_string(
            [channel.name for channel in chosen_channels])
        task.ao_channels.add_ao_voltage_chan(
            channel_names, max_val=10, min_val=-10)

        # Always strictly fewer samples than channels.
        sample_count = random.randint(1, channel_count - 1)
        samples = numpy.float64(
            [random.uniform(-10, 10) for _ in range(sample_count)])

        with pytest.raises(DaqError) as e:
            task.write(samples, auto_start=True)
        assert e.value.error_code == -200524
def test_float_property(self, x_series_device):
    """Exercise get, set and delete on the float property ``ai_max``."""
    with nidaqmx.Task() as task:
        physical_name = x_series_device.ai_physical_chans[0].name
        ai_channel = task.ai_channels.add_ai_voltage_chan(
            physical_name, max_val=5)

        # The property initially reflects the max_val given at creation.
        assert ai_channel.ai_max == 5

        # Test property setter and getter.
        max_value = 10
        ai_channel.ai_max = max_value
        assert ai_channel.ai_max == max_value

        # Test property deleter. Reading this property will throw an
        # error after being reset.
        del ai_channel.ai_max
        with pytest.raises(DaqError) as e:
            _ = ai_channel.ai_max
        assert e.value.error_code == -200695
def test_simple_scaling():
    """Check ``Quantity`` scaling by number, unit name, tuple and callable.

    Covers the ``scale`` argument of both ``render()`` and the constructor,
    including an unknown unit name and a pair of dB conversion functions.
    """
    Quantity.set_prefs(spacer=None, show_label=None, label_fmt=None, label_fmt_full=None)
    q = Quantity('1kg')
    assert q.render() == '1 kg'
    assert q.render(scale=0.001, show_units=False) == '1'

    # The ``message`` keyword of pytest.raises was removed in pytest 5.0 and
    # never matched the exception text, so only the type is checked here.
    # The case was originally labelled:
    # "Unable to convert between 'fuzz' and 'g'."
    with pytest.raises(KeyError):
        q.render(scale='fuzz')

    q = Quantity('1', units='g', scale=1000)
    assert q.render() == '1 kg'
    assert q.render(scale=(0.0022046, 'lbs')) == '2.2046 lbs'

    q = Quantity('1', scale=(1000, 'g'))
    assert q.render() == '1 kg'
    assert q.render(scale=lambda v, u: (0.0022046*v, 'lbs')) == '2.2046 lbs'

    def dB(v, u):
        # Convert a value to decibels and prefix the units with 'dB'.
        return 20*math.log(v, 10), 'dB'+u

    def adB(v, u):
        # Inverse of dB: undo the logarithm and strip a leading 'dB'.
        return pow(10, v/20), u[2:] if u.startswith('dB') else u

    q = Quantity('-40 dBV', scale=adB)
    assert q.render() == '10 mV'
    assert q.render(scale=dB) == '-40 dBV'
def test_screenshot_matches__raises_when_images_are_different_sizes(selenium, tmpdir):
    """A second screenshot at a different window size raises a mismatch.

    After the mismatch a captured image must exist under PDIFF_PATH, but no
    diff image.
    """
    name = 'testing-size'
    setup_selenium_session(selenium, tmpdir)
    screenshot_matches(selenium, 'testing-size')

    selenium.set_window_size(1200, 900)
    selenium.get('./tests/fixtures/page1-changed.html')
    # str.index raises if the changed page's text is not present.
    selenium.find_element_by_tag_name('body').text.index("It has a second paragraph.")

    with pytest.raises(exceptions.ScreenshotMismatch):
        assert screenshot_matches(selenium, 'testing-size')

    pdiff_dir = settings['PDIFF_PATH']
    captured_path = os.path.join(pdiff_dir, name + '.captured.png')
    pdiff_path = os.path.join(pdiff_dir, name + '.diff.png')
    assert os.path.exists(captured_path)
    assert not os.path.exists(pdiff_path)
def test_value_write_failure_invalid_type():
    """
    Try writing a value not of type bytes. Should raise an exception.
    """
    # Dummy class used as one of the invalid value types:
    class MyClass:
        pass

    ins = InnerStash({})
    # An int, a text string and an arbitrary object must all be rejected.
    for invalid_value in (3, "A string!", MyClass()):
        with pytest.raises(SSValueError):
            ins.write_value(['hello','world'], invalid_value)
def test_values_tree_structure():
    """
    Write various values to the tree. Make sure results are as expected
    """
    # (key path, value) pairs, written and then read back in this order.
    entries = (
        (('a', 'b', 'c'), b'abc'),
        (('a', 'b'), b'ab'),
        (('a', 'b', 'd'), b'abd'),
        (('a', 'b', 'd', 'e'), b'abde'),
    )
    ins = InnerStash({})
    for path, value in entries:
        ins.write_value(list(path), value)
    for path, value in entries:
        assert ins.read_value(list(path)) == value

    # The key ['a'] was not assigned a value:
    with pytest.raises(SSKeyError):
        ins.read_value(['a'])
def test_get_children():
    """``get_children`` lists the direct child names below a key path."""
    ins = InnerStash({})
    entries = (
        (('a', 'b', 'c'), b'abc'),
        (('a', 'b'), b'ab'),
        (('a', 'b', 'd'), b'abd'),
        (('a', 'b', 'd', 'e'), b'abde'),
        (('a', 'b', 'f'), b'abf'),
    )
    for path, value in entries:
        ins.write_value(list(path), value)

    assert ins.get_children(['a']) == ['b']
    # Compared after sorting, so the returned order does not matter here.
    assert sorted(ins.get_children(['a','b'])) == ['c', 'd', 'f']
    assert ins.get_children(['a','b','d','e']) == []
    assert ins.get_children(['a','b','d']) == ['e']

    # Empty key:
    assert ins.get_children([]) == ['a']

    # Try nonexistent key:
    with pytest.raises(SSKeyError):
        ins.get_children(['r','q'])
def test_wrong_password(tmp_file_path):
    """
    Opening an existing CryptoStash with the wrong password must raise
    SSCryptoError; the correct password must still open and read it.
    """
    # Initialization:
    cs = CryptoStash(tmp_file_path,'my_password',default_num_iterations=1000)
    cs.write_store({1:2,3:4})

    # Try to load Crypto Stash with the wrong password:
    with pytest.raises(SSCryptoError):
        CryptoStash(tmp_file_path,'my_password2',
                    default_num_iterations=1000)

    # Try to load Crypto Stash with the correct password.
    # Should work correctly:
    cs = CryptoStash(tmp_file_path,'my_password',default_num_iterations=1000)
    cs.read_store()
def test_rowmethod():
    """``my_row_method`` works when called on the class and on an instance.

    Called on the class, the first argument comes back as the first
    result. Called on an instance, the instance itself comes back first.
    A class-level call with ``(0, 1, 2)`` must raise ``ValueError``
    (presumably because no session is passed -- verify against the
    decorator under test).
    """
    session = Session()
    with pytest.raises(ValueError):
        ModelStub.my_row_method(0, 1, 2)

    c, s, a = ModelStub.my_row_method(0, session, 1)
    # Compare the int by value: ``is`` against a literal depends on
    # interpreter caching and emits a SyntaxWarning on Python 3.8+.
    assert c == 0
    assert s is session
    assert a == 1

    i = ModelStub()
    c, s, a = i.my_row_method(session, 2)
    assert c is i
    assert s is session
    assert a == 2
@pytest.mark.asyncio
async def test_juju_error_in_results_list(event_loop):
    '''
    Replicate the code that caused
    https://github.com/juju/python-libjuju/issues/67, and verify that
    we get a JujuError instead of passing silently by the failure.
    (We don't raise a JujuAPIError, because the request isn't
    completely invalid -- it's just passing a tag that doesn't exist.)
    This also verifies that we will raise a JujuError any time there
    is an error in one of a list of results.

    The function must be a coroutine because it uses ``async with`` and
    ``await``; it is marked for pytest-asyncio so the test runner awaits it.
    '''
    from juju.errors import JujuError
    from juju.client import client

    async with base.CleanModel() as model:
        ann_facade = client.AnnotationsFacade.from_connection(model.connection())
        ann = client.EntityAnnotations(
            entity='badtag',
            annotations={'gui-x': '1', 'gui-y': '1'},
        )
        with pytest.raises(JujuError):
            await ann_facade.Set([ann])
def test_out_of_bounds(self):
    """Test that the CameraOutOfBounds exception is raised by the
    default camera movement behavior.

    1. Create a camera.
    2. Create a 32x32 rectangle whose topleft is (2000, 2000), intended
       to lie out-of-bounds of the Camera source surface.
    3. Assert the exception is raised while scrolling to that rectangle
       and updating the camera state.
    """
    camera = Camera((800, 600), (1080, 1050), (300, 300))
    target_rect = pygame.Rect((2000, 2000), [32, 32])

    with pytest.raises(CameraOutOfBounds):
        camera.scroll_to(target_rect)
        camera.update_state(28974329)
def _assert_failure_with_guarantees(
    self,
    name,
    expected_cluster=None,
    expected_database=None,
    expected_environment=None,
    expected_suffixes=None
):
    """Assert that building a namespace from ``name`` fails.

    Calls ``DBSourcedNamespace.create_from_namespace_name_with_guarantees``
    with the given expectations and requires a ``ValueError`` whose
    message contains "impossible to rectify".
    """
    with pytest.raises(ValueError) as e:
        DBSourcedNamespace.create_from_namespace_name_with_guarantees(
            name,
            expected_environment=expected_environment,
            expected_cluster=expected_cluster,
            expected_database=expected_database,
            expected_suffixes=expected_suffixes
        )
    # Check the message on the raised exception, after the block has
    # captured it. The ExceptionInfo wrapper does not support ``in``.
    assert "impossible to rectify" in str(e.value)
def test_exceed_max_retry_count(
    self,
    number_sequence_func,
    exp_backoff_with_max_retry_count_policy
):
    """Retrying on an always-true condition ends in ``MaxRetryError``.

    The backoff delay is patched to return 0.1. The retried function must
    be called ``max_retry_count + 1`` times, and the backoff delay
    requested ``max_retry_count`` times.
    """
    policy = exp_backoff_with_max_retry_count_policy
    with pytest.raises(MaxRetryError) as e:
        with mock.patch.object(
            policy.backoff_policy,
            'next_backoff_delay',
            return_value=0.1
        ) as next_backoff_delay_spy:
            retry_on_condition(
                retry_policy=policy,
                retry_conditions=[Predicate(self.always_true)],
                func_to_retry=number_sequence_func
            )

    retry_limit = self.max_retry_count
    assert number_sequence_func.call_count == retry_limit + 1
    assert e.value.last_result == 4
    assert next_backoff_delay_spy.call_count == retry_limit
def test_refresh_queue_no_job(
    self,
    priority_refresh_queue,
    fake_source_name,
    refresh_result
):
    """Popping from an empty queue raises an error naming the source.

    Checked on a fresh queue and again after the only queued refresh has
    been popped.
    """
    def assert_pop_raises_empty_queue():
        with pytest.raises(EmptyQueueError) as e:
            priority_refresh_queue.pop(fake_source_name)
        assert fake_source_name in e.value.message

    assert priority_refresh_queue.peek() == {}
    assert_pop_raises_empty_queue()

    priority_refresh_queue.add_refreshes_to_queue([refresh_result])
    popped = priority_refresh_queue.pop(fake_source_name)
    assert popped == refresh_result
    assert_pop_raises_empty_queue()
def test_missing_name_error(
    self,
    schema_check_command,
    parser,
    source_name
):
    """Processing args with a source name but no namespace is an error."""
    expected_text = "--namespace must be provided when given a source name as source identifier"
    with pytest.raises(FakeParserError) as e:
        schema_check_command.process_args(
            self._create_fake_args(schema="{}", source_name=source_name),
            parser
        )

    error_args = e.value.args
    assert error_args
    assert expected_text in error_args[0]
def test_avro_schema_with_no_namespace(
    self,
    register_command,
    parser,
    source_name,
    schema_str
):
    """Running the register command with ``namespace=None`` is an error."""
    args = self._create_fake_args(
        source_name=source_name, namespace=None, avro_schema=schema_str)

    with pytest.raises(FakeParserError) as e:
        register_command.run(args, parser)

    error_args = e.value.args
    assert error_args
    assert "--namespace must be provided" in error_args[0]
def _assert_schema_equals_schema_dict(
    self,
    topic_schema,
    schema_obj,
    schema_dict,
    topic_to_check=None
):
    """Assert that ``schema_dict`` mirrors the given schema objects.

    ``topic_schema`` supplies the id, base id, primary keys and note;
    ``schema_obj`` supplies the schema JSON and status. If
    ``topic_to_check`` is truthy, the dict's 'topic' entry is compared
    against it as well; otherwise the dict must have no 'topic' key.
    """
    for field in ('schema_id', 'base_schema_id', 'primary_keys', 'note'):
        assert getattr(topic_schema, field) == schema_dict[field]
    for field in ('schema_json', 'status'):
        assert getattr(schema_obj, field) == schema_dict[field]

    if not topic_to_check:
        # Without a topic to compare, the dict must not carry one either.
        with pytest.raises(KeyError):
            schema_dict['topic']
        return

    self._assert_topic_equals_topic_dict(
        topic=topic_to_check,
        topic_dict=schema_dict['topic'],
        namespace_name=topic_to_check.source.namespace.name,
        source_name=topic_to_check.source.name,
        is_active=False,
        include_kafka_info=False
    )