def test_powerview_add_resource_normal_user(self):
    '''
    The add-resource auth check rejects a normal user with NotAuthorized.
    '''
    user = factories.User()
    package = factories.Dataset()
    resource = factories.Resource(package_id=package['id'])
    view = powerview_factories.PowerView(user=user)
    auth_context = {'user': user['name'], 'model': model}

    with nosetools.assert_raises(NotAuthorized):
        helpers.call_auth('ckanext_powerview_add_resource',
                          context=auth_context,
                          powerview_id=view['id'],
                          resource_id=resource['id'])
# Example source code for Python's assert_raises()
def test_powerview_add_resource_anon_user_allow_create(self):
    '''
    The add-resource auth check still rejects an anonymous user with
    NotAuthorized.

    NOTE(review): the test name suggests a create-permission setting is
    switched on here, but no such override is visible in this body;
    confirm it is applied elsewhere (e.g. by a decorator).
    '''
    package = factories.Dataset()
    resource = factories.Resource(package_id=package['id'])
    view = powerview_factories.PowerView()
    # The anonymous user is represented by an empty user name.
    anon_context = {'user': '', 'model': model}

    with nosetools.assert_raises(NotAuthorized):
        helpers.call_auth('ckanext_powerview_add_resource',
                          context=anon_context,
                          powerview_id=view['id'],
                          resource_id=resource['id'])
def test_powerview_remove_resource_normal_user(self):
    '''
    The remove-resource auth check rejects a normal user with
    NotAuthorized.
    '''
    user = factories.User()
    package = factories.Dataset()
    resource = factories.Resource(package_id=package['id'])
    # The powerview is created for that user and already lists the resource.
    view = powerview_factories.PowerView(user=user,
                                         resources=[resource['id']])
    auth_context = {'user': user['name'], 'model': model}

    with nosetools.assert_raises(NotAuthorized):
        helpers.call_auth('ckanext_powerview_remove_resource',
                          context=auth_context,
                          powerview_id=view['id'],
                          resource_id=resource['id'])
def test_powerview_remove_resource_anon_user(self):
    '''
    The remove-resource auth check rejects an anonymous user with
    NotAuthorized.
    '''
    package = factories.Dataset()
    resource = factories.Resource(package_id=package['id'])
    view = powerview_factories.PowerView(resources=[resource['id']])
    # The anonymous user is represented by an empty user name.
    anon_context = {'user': '', 'model': model}

    with nosetools.assert_raises(NotAuthorized):
        helpers.call_auth('ckanext_powerview_remove_resource',
                          context=anon_context,
                          powerview_id=view['id'],
                          resource_id=resource['id'])
def test_powerview_remove_resource_anon_user_allow_create(self):
    '''
    The remove-resource auth check still rejects an anonymous user with
    NotAuthorized.

    NOTE(review): the test name suggests a create-permission setting is
    switched on here, but no such override is visible in this body;
    confirm it is applied elsewhere (e.g. by a decorator).
    '''
    package = factories.Dataset()
    resource = factories.Resource(package_id=package['id'])
    view = powerview_factories.PowerView(resources=[resource['id']])
    # The anonymous user is represented by an empty user name.
    anon_context = {'user': '', 'model': model}

    with nosetools.assert_raises(NotAuthorized):
        helpers.call_auth('ckanext_powerview_remove_resource',
                          context=anon_context,
                          powerview_id=view['id'],
                          resource_id=resource['id'])
def test_powerview_resource_list_normal_user_private_resources(self):
    '''
    The resource-list auth check raises NotAuthorized for a normal user
    when the powerview holds resources of a private dataset.
    '''
    user = factories.User()
    organization = factories.Organization()
    # Both resources belong to a private dataset owned by the organization.
    private_dataset = factories.Dataset(owner_org=organization['id'],
                                        private="true")
    first = factories.Resource(package_id=private_dataset['id'])
    second = factories.Resource(package_id=private_dataset['id'])
    view = powerview_factories.PowerView(private='no',
                                         resources=[first['id'],
                                                    second['id']])
    auth_context = {'user': user['name'], 'model': model}

    with nosetools.assert_raises(NotAuthorized):
        helpers.call_auth('ckanext_powerview_resource_list',
                          context=auth_context,
                          id=view['id'])
def test_powerview_show_with_private_resource_not_authorized(self):
    '''powerview_show raises NotAuthorized for a user when the powerview
    holds resources of a private dataset.'''
    organization = Organization()
    requester = User()
    private_dataset = Dataset(owner_org=organization['id'], private="true")
    first = Resource(package_id=private_dataset['id'])
    second = Resource(package_id=private_dataset['id'])
    # The powerview itself is not private; only its resources are.
    view = factories.PowerView(private=False,
                               resources=[first['id'],
                                          second['id']])

    show_powerview = toolkit.get_action('powerview_show')
    request_context = {'user': requester['name']}
    with nosetools.assert_raises(toolkit.NotAuthorized):
        show_powerview(context=request_context,
                       data_dict={'id': view['id']})
def test_powerview_remove_resource_unassociated_resource(self):
    '''powerview_remove_resource raises ValidationError when asked to
    remove a resource id that the powerview does not contain.'''
    admin = Sysadmin()
    attached = Resource()
    unattached = Resource()
    # Only the first resource is associated with the powerview.
    view = factories.PowerView(resources=[attached['id']])

    request = {
        'id': view['id'],
        'resource_id': unattached['id']
    }
    nosetools.assert_raises(
        ValidationError,
        toolkit.get_action('powerview_remove_resource'),
        context={'user': admin['name']},
        data_dict=request)
def test_engine_execute_errors(self):
    # a failing query must raise ProgrammingError and still be traced
    with assert_raises(ProgrammingError), self.connection() as conn:
        conn.execute('SELECT * FROM a_wrong_table').fetchall()

    collected = self.tracer.writer.pop_traces()

    # exactly one trace holding exactly one span
    eq_(len(collected), 1)
    eq_(len(collected[0]), 1)
    query_span = collected[0][0]

    # regular span fields
    eq_(query_span.name, '{}.query'.format(self.VENDOR))
    eq_(query_span.service, self.SERVICE)
    eq_(query_span.resource, 'SELECT * FROM a_wrong_table')
    eq_(query_span.get_tag('sql.db'), self.SQL_DB)
    ok_(query_span.get_tag('sql.rows') is None)
    self.check_meta(query_span)
    eq_(query_span.span_type, 'sql')
    ok_(query_span.duration > 0)

    # error details recorded on the span
    missing_table = "Table 'test.a_wrong_table' doesn't exist"
    eq_(query_span.error, 1)
    eq_(query_span.get_tag('error.type'), 'mysql.connector.errors.ProgrammingError')
    ok_(missing_table in query_span.get_tag('error.msg'))
    ok_(missing_table in query_span.get_tag('error.stack'))
def test_engine_execute_errors(self):
    # a failing query must raise ProgrammingError and still be traced
    with assert_raises(ProgrammingError), self.connection() as conn:
        conn.execute('SELECT * FROM a_wrong_table').fetchall()

    collected = self.tracer.writer.pop_traces()

    # exactly one trace holding exactly one span
    eq_(len(collected), 1)
    eq_(len(collected[0]), 1)
    query_span = collected[0][0]

    # regular span fields
    eq_(query_span.name, '{}.query'.format(self.VENDOR))
    eq_(query_span.service, self.SERVICE)
    eq_(query_span.resource, 'SELECT * FROM a_wrong_table')
    eq_(query_span.get_tag('sql.db'), self.SQL_DB)
    ok_(query_span.get_tag('sql.rows') is None)
    self.check_meta(query_span)
    eq_(query_span.span_type, 'sql')
    ok_(query_span.duration > 0)

    # error details recorded on the span
    eq_(query_span.error, 1)
    error_msg = query_span.get_tag('error.msg')
    ok_('relation "a_wrong_table" does not exist' in error_msg)
    error_type = query_span.get_tag('error.type')
    ok_('ProgrammingError' in error_type)
    error_stack = query_span.get_tag('error.stack')
    ok_('ProgrammingError: relation "a_wrong_table" does not exist' in error_stack)
def test_engine_execute_errors(self):
    # a failing query must raise OperationalError and still be traced
    with assert_raises(OperationalError), self.connection() as conn:
        conn.execute('SELECT * FROM a_wrong_table').fetchall()

    collected = self.tracer.writer.pop_traces()

    # exactly one trace holding exactly one span
    eq_(len(collected), 1)
    eq_(len(collected[0]), 1)
    query_span = collected[0][0]

    # regular span fields
    eq_(query_span.name, '{}.query'.format(self.VENDOR))
    eq_(query_span.service, self.SERVICE)
    eq_(query_span.resource, 'SELECT * FROM a_wrong_table')
    eq_(query_span.get_tag('sql.db'), self.SQL_DB)
    ok_(query_span.get_tag('sql.rows') is None)
    eq_(query_span.span_type, 'sql')
    ok_(query_span.duration > 0)

    # error details recorded on the span
    eq_(query_span.error, 1)
    eq_(query_span.get_tag('error.msg'), 'no such table: a_wrong_table')
    error_type = query_span.get_tag('error.type')
    ok_('OperationalError' in error_type)
    error_stack = query_span.get_tag('error.stack')
    ok_('OperationalError: no such table: a_wrong_table' in error_stack)
def test_wrapper_incr_safety(self):
    # incr on a missing key must fail with the cache's own error,
    # and that failure must still be reported as an error span
    default_cache = caches['default']

    with assert_raises(ValueError) as raised:
        default_cache.incr('missing_key')

    # the message shows the error is not one introduced by the tracer
    eq_(raised.exception.args[0], "Key 'missing_key' not found")

    # inspect the spans that were written
    recorded = self.tracer.writer.pop()
    eq_(len(recorded), 2)

    first_span = recorded[0]
    eq_(first_span.resource, 'incr')
    eq_(first_span.name, 'django.cache')
    eq_(first_span.span_type, 'cache')
    eq_(first_span.error, 1)
def test_wrapper_decr_safety(self):
    # decr on a missing key must fail with the cache's own error,
    # and that failure must still be reported as an error span
    default_cache = caches['default']

    with assert_raises(ValueError) as raised:
        default_cache.decr('missing_key')

    # the message shows the error is not one introduced by the tracer
    eq_(raised.exception.args[0], "Key 'missing_key' not found")

    # inspect the spans that were written
    recorded = self.tracer.writer.pop()
    eq_(len(recorded), 3)

    first_span = recorded[0]
    eq_(first_span.resource, 'decr')
    eq_(first_span.name, 'django.cache')
    eq_(first_span.span_type, 'cache')
    eq_(first_span.error, 1)
def test_template_renderer_exception(self):
    # a template that fails while generating must still produce an
    # error span, even when rendered outside of a web handler
    broken = template.Template('{% module ModuleThatDoesNotExist() %}')
    with assert_raises(NameError):
        broken.generate()

    collected = self.tracer.writer.pop_traces()
    eq_(1, len(collected))
    eq_(1, len(collected[0]))

    render_span = collected[0][0]
    eq_('tornado-web', render_span.service)
    eq_('tornado.template', render_span.name)
    eq_('template', render_span.span_type)
    eq_('render_string', render_span.resource)
    eq_('render_string', render_span.get_tag('tornado.template_name'))

    # error details recorded on the span
    eq_(1, render_span.error)
    ok_('is not defined' in render_span.get_tag('error.msg'))
    ok_('NameError' in render_span.get_tag('error.stack'))
def test_resource_delete_editor(self):
    '''resource_delete is refused for an organization member.

    Organization admins can normally delete resources; the plugin under
    test is expected to block that. Only the resource_delete check is
    exercised because it is what decides whether the delete button is
    shown.

    NOTE(review): the test name says "editor" but the member is created
    with capacity 'admin' - confirm which role is intended.
    '''
    user = factories.User()
    members = [{'name': user['id'], 'capacity': 'admin'}]
    owner_org = factories.Organization(users=members)
    dataset = factories.Dataset(owner_org=owner_org['id'])
    resource = factories.Resource(package_id=dataset['id'])

    expected_message = 'User %s not authorized to delete resource %s' % (
        user['name'], resource['id'])

    with assert_raises(logic.NotAuthorized) as raised:
        logic.check_access('resource_delete',
                           {'user': user['name']},
                           {'id': resource['id']})

    assert_equal(raised.exception.message, expected_message)
def test_remote_groups_only_local(self):
    # With remote_groups set to 'only_local', a harvested dataset joins a
    # group that already exists locally and no other group is created.
    Group(id='group1-id', name='group1')

    results_by_guid = run_harvest(
        url='http://localhost:%s' % mock_ckan.PORT,
        harvester=CKANHarvester(),
        config=json.dumps({'remote_groups': 'only_local'}))
    assert 'dataset1-id' in results_by_guid

    # The harvested dataset is attached to the pre-existing local group.
    remote_dataset = mock_ckan.DATASETS[0]
    harvested = call_action('package_show', {}, id=remote_dataset['id'])
    assert_equal(harvested['groups'][0]['id'],
                 remote_dataset['groups'][0]['id'])

    # The other remote group must not exist locally.
    with assert_raises(toolkit.ObjectNotFound):
        call_action('group_show', {}, id='remote-group')
def test_extension():
    """Load and unload daft_extension once its directory is on sys.path."""
    # Dump the import environment to help diagnose failures of this test.
    print('sys.path:')
    for entry in sys.path:
        print(' ', entry)
    print('CWD', os.getcwd())

    # The extension must not be importable before its directory is added.
    with nt.assert_raises(ImportError):
        _ip.magic("load_ext daft_extension")

    here = os.path.dirname(__file__)
    daft_path = os.path.join(here, "daft_extension")
    sys.path.insert(0, daft_path)
    try:
        _ip.user_ns.pop('arq', None)
        # Clear import caches
        invalidate_caches()
        _ip.magic("load_ext daft_extension")
        nt.assert_equal(_ip.user_ns['arq'], 185)
        _ip.magic("unload_ext daft_extension")
        assert 'arq' not in _ip.user_ns
    finally:
        # Always undo the sys.path change made above.
        sys.path.remove(daft_path)
def test_pop_string():
    """Exercise PlainTextFormatter.pop with a 'module.name' type string."""
    formatter = PlainTextFormatter()
    type_str = '%s.%s' % (C.__module__, 'C')

    # Nothing is registered yet, so pop without a default raises.
    nt.assert_raises(KeyError, formatter.pop, type_str)

    # Register by string, pop it, then check it is gone.
    formatter.for_type(type_str, foo_printer)
    formatter.pop(type_str)
    nt.assert_raises(KeyError, formatter.lookup_by_type, C)
    nt.assert_raises(KeyError, formatter.pop, type_str)

    # Register by class, pop by string, then check it is gone.
    formatter.for_type(C, foo_printer)
    nt.assert_is(formatter.pop(type_str, None), foo_printer)
    nt.assert_raises(KeyError, formatter.lookup_by_type, C)
    nt.assert_raises(KeyError, formatter.pop, type_str)

    # With a default supplied, a missing entry returns that default.
    nt.assert_is(formatter.pop(type_str, None), None)
def eval_formatter_check(f):
    """Run shared checks against a formatter that evaluates expressions.

    `f` is the formatter under test; only its `format` method is used.
    """
    ns = dict(
        n=12,
        pi=math.pi,
        stuff='hello there',
        os=os,
        u=u"café",
        b="café",
    )

    # Expressions inside the braces are evaluated against the namespace.
    nt.assert_equal(
        f.format("{n} {n//4} {stuff.split()[0]}", **ns), "12 3 hello")

    divisions = ' '.join(['{n//%i}' % i for i in range(1, 8)])
    nt.assert_equal(f.format(divisions, **ns), "12 6 4 3 2 2 1")

    nt.assert_equal(
        f.format('{[n//i for i in range(1,8)]}', **ns),
        "[12, 6, 4, 3, 2, 2, 1]")

    # The !s and !r conversions behave like str() and repr().
    nt.assert_equal(f.format("{stuff!s}", **ns), ns['stuff'])
    nt.assert_equal(f.format("{stuff!r}", **ns), repr(ns['stuff']))

    # Check with unicode:
    nt.assert_equal(f.format("{u}", **ns), ns['u'])
    # This decodes in a platform dependent manner, but it shouldn't error out
    f.format("{b}", **ns)

    # A name missing from the namespace surfaces as NameError.
    with nt.assert_raises(NameError):
        f.format('{dne}', **ns)
def test_get_py_filename():
    """Check path.get_py_filename with and without matching files present."""
    lookup = path.get_py_filename
    os.chdir(TMP_TEST_DIR)

    # With foo.py present, both the full and the bare name give foo.py.
    with make_tempfile('foo.py'):
        nt.assert_equal(lookup('foo.py'), 'foo.py')
        nt.assert_equal(lookup('foo'), 'foo.py')

    # With only an extension-less file present, foo.py is not found.
    with make_tempfile('foo'):
        nt.assert_equal(lookup('foo'), 'foo')
        nt.assert_raises(IOError, lookup, 'foo.py')

    # Outside both temp-file blocks every lookup fails.
    nt.assert_raises(IOError, lookup, 'foo')
    nt.assert_raises(IOError, lookup, 'foo.py')

    true_fn = 'foo with spaces.py'
    with make_tempfile(true_fn):
        nt.assert_equal(lookup('foo with spaces'), true_fn)
        nt.assert_equal(lookup('foo with spaces.py'), true_fn)
        # Quote characters are treated as part of the name, so no match.
        nt.assert_raises(IOError, lookup, '"foo with spaces.py"')
        nt.assert_raises(IOError, lookup, "'foo with spaces.py'")