def assert_dict_equal(d1, d2):
# check keys
nt.assert_is_instance(d1, dict)
nt.assert_is_instance(d2, dict)
nt.assert_set_equal(set(d1.keys()), set(d2.keys()))
for k in d1:
v1 = d1[k]
v2 = d2[k]
asserter = nt.assert_equal
if isinstance(v1, pd.DataFrame):
asserter = tm.assert_frame_equal
if isinstance(v1, pd.Series):
asserter = tm.assert_series_equal
try:
asserter(v1, v2)
except AssertionError:
raise AssertionError('{k} is not equal'.format(k=k))
python类assert_equal()的实例源码
def test_serialization(self):
env = TradingEnvironment()
pt = perf.PositionTracker(env.asset_finder)
pp = perf.PerformancePeriod(100, env.asset_finder)
pp.position_tracker = pt
p_string = dumps_with_persistent_ids(pp)
test = loads_with_persistent_ids(p_string, env=env)
correct = pp.__dict__.copy()
del correct['_position_tracker']
nt.assert_count_equal(test.__dict__.keys(), correct.keys())
equal_keys = list(correct.keys())
equal_keys.remove('_account_store')
equal_keys.remove('_portfolio_store')
for k in equal_keys:
nt.assert_equal(test.__dict__[k], correct[k])
def test_comparison_keys(self):
c_list = [
['class', 'ABD'],
['enum', 'ABDa'],
['class', 'abcd'],
]
ref_sort_idx = [0, 2, 1]
c_obj_list = []
for c_type, c_name in c_list:
c_obj_list.append(hpp2plantuml.hpp2plantuml.Container(
c_type, c_name))
c_obj_list.sort(key=lambda obj: obj.comparison_keys())
for i in range(len(c_list)):
nt.assert_equal(c_obj_list[i].get_name(),
c_list[ref_sort_idx[i]][1])
def test_main_function(self):
# List files
file_list = [os.path.join(test_fold, f) for f in self._input_files]
# Output to string
with io.StringIO() as io_stream:
sys.stdout = io_stream
hpp2plantuml.CreatePlantUMLFile(file_list)
io_stream.seek(0)
# Read string output, exclude final line return
output_str = io_stream.read()[:-1]
sys.stdout = sys.__stdout__
nt.assert_equal(self._diag_saved_ref, output_str)
# Output to file
output_fname = 'output.puml'
hpp2plantuml.CreatePlantUMLFile(file_list, output_fname)
output_fcontent = ''
with open(output_fname, 'rt') as fid:
output_fcontent = fid.read()
nt.assert_equal(self._diag_saved_ref, output_fcontent)
os.unlink(output_fname)
def _predict(mod, val):
mod.incache = True
val.reset()
true = val.gesture.copy()
segment = val.segment.copy()
val.reset()
assert np.all(true == val.gesture.copy())
assert np.all(segment == val.segment.copy())
out = mod.predict(val).asnumpy()
assert_equal(out.ndim, 2)
assert_equal(out.shape[1], mod.num_gesture)
pred = out.argmax(axis=1)
if mod.lstm:
cut = lambda a: np.hstack(a[begin:end + mod.num_channel - mod.data_shape_1]
for begin, end in utils.continuous_segments(segment))
true = cut(true)
segment = cut(segment)
assert_equal(true.shape, pred.shape)
return pred, true, segment
def _predict_proba(mod, val):
mod.incache = True
val.reset()
true = val.gesture.copy()
segment = val.segment.copy()
val.reset()
assert np.all(true == val.gesture.copy())
assert np.all(segment == val.segment.copy())
out = mod.predict(val).asnumpy()
# assert_equal(out.ndim, 2)
# assert_equal(out.shape[1], mod.num_gesture)
# pred = out.argmax(axis=1)
if mod.lstm:
cut = lambda a: np.hstack(a[begin:end + mod.num_channel - mod.data_shape_1]
for begin, end in utils.continuous_segments(segment))
true = cut(true)
segment = cut(segment)
# assert_equal(true.shape, pred.shape)
return out, true, segment
def test_powerview_update(self):
'''Updating with valid data_dict.'''
sysadmin = Sysadmin()
powerview_dict_create = factories.PowerView()
powerview_dict_update = toolkit.get_action('powerview_update')(
context={'user': sysadmin['name']},
data_dict=powerview_dict_create.copy()
)
# last_modified has changed
nosetools.assert_true(powerview_dict_create['last_modified'] is None)
nosetools.assert_true(powerview_dict_update['last_modified']
is not None)
# but it should be the only thing that changed
powerview_dict_update['last_modified'] = None
nosetools.assert_equal(powerview_dict_create, powerview_dict_update)
def test_powerview_update_resources_unchanged(self):
'''Updating a powerview containing resources, leaves them unchanged.'''
sysadmin = Sysadmin()
r1 = Resource()
r2 = Resource()
r3 = Resource()
resource_id_list = [r1['id'], r2['id'], r3['id']]
# powerview with resources
powerview_dict = factories.PowerView(resources=resource_id_list)
# Update dict with new title
powerview_dict['title'] = "New Title"
powerview_dict_update = toolkit.get_action('powerview_update')(
context={'user': sysadmin['name']},
data_dict=powerview_dict
)
nosetools.assert_equal(set(resource_id_list),
set(powerview_dict_update['resources']))
def test_powerview_update_resources_changed(self):
'''Updating a powerview's resources, returns expected list in dict.'''
sysadmin = Sysadmin()
r1 = Resource()
r2 = Resource()
r3 = Resource()
r4 = Resource()
resource_id_list = [r1['id'], r2['id'], r3['id']]
updated_resource_id_list = [r1['id'], r3['id'], r4['id']]
# powerview with resources
powerview_dict = factories.PowerView(resources=resource_id_list)
# Update dict with new resource list
powerview_dict['resources'] = updated_resource_id_list
powerview_dict_update = toolkit.get_action('powerview_update')(
context={'user': sysadmin['name']},
data_dict=powerview_dict
)
nosetools.assert_equal(set(updated_resource_id_list),
set(powerview_dict_update['resources']))
def test_powerview_show_with_private_resource_authorized(self):
'''Calling powerview_show will not raise NotAuthorized if powerview
contains a private resource for user who is authed to view.'''
user = User()
org = Organization(users=[{'name': user['name'],
'capacity': 'member'}])
dataset = Dataset(owner_org=org['id'], private="true")
r1 = Resource(package_id=dataset['id'])
r2 = Resource(package_id=dataset['id'])
p1 = factories.PowerView(private=False,
resources=[r1['id'],
r2['id']])
powerview_dict_show = toolkit.get_action('powerview_show')(
context={'user': user['name']},
data_dict={'id': p1['id']}
)
nosetools.assert_equal(powerview_dict_show, p1)
def test_powerview_list_offset_and_limit(self):
'''
Calling powerview_list with an offset and limit returns expected
results.
'''
# make some powerviews
for i in xrange(0, 20):
factories.PowerView(title='powerview_{0}'.format(i + 1),
private=False)
powerview_list = toolkit.get_action('powerview_list')(
data_dict={
'limit': 10,
'offset': 10
}
)
nosetools.assert_equal(len(powerview_list), 10)
pv_ids = [pv['title'] for pv in powerview_list]
for i in xrange(10, 20):
nosetools.assert_true('powerview_{0}'.format(i + 1) in pv_ids)
def test_powerview_list_private_powerview(self):
'''
Calling powerview_list by a normal user only returns public and
authorized powerviews.
'''
user_one = User()
user_two = User()
p1 = factories.PowerView(user=user_one, private=False)
p2 = factories.PowerView(user=user_one, private=True)
context = {'user': user_two['name']}
powerview_list = toolkit.get_action('powerview_list')(
context=context,
data_dict={'id': user_one['name']})
nosetools.assert_equal(len(powerview_list), 1)
nosetools.assert_true(p1 in powerview_list)
nosetools.assert_true(p2 not in powerview_list)
def test_powerview_list_private_powerview_authorized(self):
'''
Calling powerview_list by a normal user returns public and private
powerviews if they are the creator.
'''
user_one = User()
p1 = factories.PowerView(user=user_one, private=False)
p2 = factories.PowerView(user=user_one, private=True)
context = {'user': user_one['name']}
powerview_list = toolkit.get_action('powerview_list')(
context=context,
data_dict={'id': user_one['name']})
nosetools.assert_equal(len(powerview_list), 2)
nosetools.assert_true(p1 in powerview_list)
nosetools.assert_true(p2 in powerview_list)
def test_powerview_list_user_powerviews(self):
'''
Calling powerview_list only returns powerviews for the passed user id.
'''
user_one = User()
user_two = User()
user_three = User()
p1 = factories.PowerView(user=user_one, private=False)
p2 = factories.PowerView(user=user_two, private=False)
p3 = factories.PowerView(user=user_two, private=False)
context = {'user': user_three['name']}
powerview_list = toolkit.get_action('powerview_list')(
context=context,
data_dict={'id': user_two['name']})
nosetools.assert_equal(len(powerview_list), 2)
nosetools.assert_true(p1 not in powerview_list)
nosetools.assert_true(p2 in powerview_list)
nosetools.assert_true(p3 in powerview_list)
def test_powerview_delete(self):
'''Calling powerview delete with valid data_dict.'''
sysadmin = Sysadmin()
powerview_dict = factories.PowerView()
# one powerview
nosetools.assert_equal(PowerView.count(), 1)
toolkit.get_action('powerview_delete')(
context={'user': sysadmin['name']},
data_dict={'id': powerview_dict['id']}
)
# No powerview
nosetools.assert_equal(PowerView.count(), 0)
def test_powerview_remove_resource_valid(self):
sysadmin = Sysadmin()
r1 = Resource()
powerview_dict = factories.PowerView(resources=[r1['id']])
nosetools.assert_equal(PowerviewResourceAssociation.count(), 1)
toolkit.get_action('powerview_remove_resource')(
context={'user': sysadmin['name']},
data_dict={
'id': powerview_dict['id'],
'resource_id': r1['id']
}
)
nosetools.assert_equal(PowerviewResourceAssociation.count(), 0)
def test_powerview_remove_resource_retains_objects(self):
'''Calling powerview_remove_resource doesn't delete powerview or
resource.'''
sysadmin = Sysadmin()
r1 = Resource()
powerview_dict = factories.PowerView(resources=[r1['id']])
nosetools.assert_equal(PowerView.count(), 1)
nosetools.assert_equal(
model.meta.Session.query(model.Resource).count(), 1)
toolkit.get_action('powerview_remove_resource')(
context={'user': sysadmin['name']},
data_dict={
'id': powerview_dict['id'],
'resource_id': r1['id']
}
)
nosetools.assert_equal(PowerView.count(), 1)
nosetools.assert_equal(
model.meta.Session.query(model.Resource).count(), 1)
def test_powerview_create(self):
'''Creating a powerview returns a dict with expected values'''
sysadmin = Sysadmin()
powerview_result = toolkit.get_action('powerview_create')(
context={'user': sysadmin['name']},
data_dict=self._make_create_data_dict()
)
nosetools.assert_true(isinstance(powerview_result, dict))
nosetools.assert_true(powerview_result['title'] == 'Title')
nosetools.assert_true(powerview_result['description'] ==
'My short description.')
nosetools.assert_true(powerview_result['view_type'] == 'my-view-type')
nosetools.assert_true(isinstance(powerview_result['config'], dict))
nosetools.assert_true(powerview_result['private'])
# created_by field auto-populated
nosetools.assert_equal(powerview_result['created_by'], sysadmin['id'])
def test_powerview_create_resource_id_list_creates_associations(self):
'''If resource id list is provided, associations are create with
powerview.'''
sysadmin = Sysadmin()
r1 = Resource()
r2 = Resource()
r3 = Resource()
create_dict = self._make_create_data_dict()
create_dict['resources'] = [r1['id'], r2['id'], r3['id']]
toolkit.get_action('powerview_create')(
context={'user': sysadmin['name']},
data_dict=create_dict
)
nosetools.assert_equal(PowerviewResourceAssociation.count(), 3)
def test_powerview_create_resource_id_list_contains_duplicates(self):
'''Resource id list is de-duplicated before associations are
created.'''
sysadmin = Sysadmin()
r1 = Resource()
r2 = Resource()
create_dict = self._make_create_data_dict()
# r2 id added twice
create_dict['resources'] = [r1['id'], r2['id'], r2['id']]
toolkit.get_action('powerview_create')(
context={'user': sysadmin['name']},
data_dict=create_dict
)
nosetools.assert_equal(PowerviewResourceAssociation.count(), 2)
def test_add_ckan_admin_tab_two_routes(self):
'''
Add two different route/label pairs to ckan.admin_tabs.
'''
config = {}
toolkit.add_ckan_admin_tab(config, 'my_route_name', 'my_label')
toolkit.add_ckan_admin_tab(config, 'my_other_route_name', 'my_other_label')
expected_dict = {
'ckan.admin_tabs': {
'my_other_route_name': 'my_other_label',
'my_route_name': 'my_label'
}
}
nosetools.assert_equal(expected_dict, config)
def test_add_ckan_admin_tab_config_has_existing_admin_tabs(self):
'''
Config already has a ckan.admin_tabs option.
'''
config = {
'ckan.admin_tabs': {'my_existing_route': 'my_existing_label'}
}
toolkit.add_ckan_admin_tab(config, 'my_route_name', 'my_label')
toolkit.add_ckan_admin_tab(config, 'my_other_route_name', 'my_other_label')
expected_dict = {
'ckan.admin_tabs': {
'my_existing_route': 'my_existing_label',
'my_other_route_name': 'my_other_label',
'my_route_name': 'my_label'
}
}
nosetools.assert_equal(expected_dict, config)
def test_user_page_lists_users(self):
'''/users/ lists registered users'''
app = self._get_test_app()
factories.User(fullname='User One')
factories.User(fullname='User Two')
factories.User(fullname='User Three')
user_url = url_for(controller='user', action='index')
user_response = app.get(user_url, status=200)
user_response_html = BeautifulSoup(user_response.body)
user_list = user_response_html.select('ul.user-list li')
assert_equal(len(user_list), 3)
user_names = [u.text.strip() for u in user_list]
assert_true('User One' in user_names)
assert_true('User Two' in user_names)
assert_true('User Three' in user_names)
def test_user_page_doesnot_list_deleted_users(self):
'''/users/ doesn't list deleted users'''
app = self._get_test_app()
factories.User(fullname='User One', state='deleted')
factories.User(fullname='User Two')
factories.User(fullname='User Three')
user_url = url_for(controller='user', action='index')
user_response = app.get(user_url, status=200)
user_response_html = BeautifulSoup(user_response.body)
user_list = user_response_html.select('ul.user-list li')
assert_equal(len(user_list), 2)
user_names = [u.text.strip() for u in user_list]
assert_true('User One' not in user_names)
assert_true('User Two' in user_names)
assert_true('User Three' in user_names)
def test_store_restore():
ip.user_ns['foo'] = 78
ip.magic('alias bar echo "hello"')
tmpd = tempfile.mkdtemp()
ip.magic('cd ' + tmpd)
ip.magic('store foo')
ip.magic('store bar')
# Check storing
nt.assert_equal(ip.db['autorestore/foo'], 78)
nt.assert_in('bar', ip.db['stored_aliases'])
# Remove those items
ip.user_ns.pop('foo', None)
ip.alias_manager.undefine_alias('bar')
ip.magic('cd -')
ip.user_ns['_dh'][:] = []
# Check restoring
ip.magic('store -r')
nt.assert_equal(ip.user_ns['foo'], 78)
assert ip.alias_manager.is_alias('bar')
nt.assert_in(os.path.realpath(tmpd), ip.user_ns['_dh'])
os.rmdir(tmpd)
def test_geojson():
gj = display.GeoJSON(data={
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": [-81.327, 296.038]
},
"properties": {
"name": "Inca City"
}
},
url_template="http://s3-eu-west-1.amazonaws.com/whereonmars.cartodb.net/{basemap_id}/{z}/{x}/{y}.png",
layer_options={
"basemap_id": "celestia_mars-shaded-16k_global",
"attribution": "Celestia/praesepe",
"minZoom": 0,
"maxZoom": 18,
})
nt.assert_equal(u'<IPython.core.display.GeoJSON object>', str(gj))
def test_aggressive_namespace_cleanup(self):
"""Test that namespace cleanup is not too aggressive GH-238
Returning from another run magic deletes the namespace"""
# see ticket https://github.com/ipython/ipython/issues/238
with tt.TempFileMixin() as empty:
empty.mktmp('')
# On Windows, the filename will have \users in it, so we need to use the
# repr so that the \u becomes \\u.
src = ("ip = get_ipython()\n"
"for i in range(5):\n"
" try:\n"
" ip.magic(%r)\n"
" except NameError as e:\n"
" print(i)\n"
" break\n" % ('run ' + empty.fname))
self.mktmp(src)
_ip.magic('run %s' % self.fname)
_ip.run_cell('ip == get_ipython()')
nt.assert_equal(_ip.user_ns['i'], 4)
def test_run_tb():
"""Test traceback offset in %run"""
with TemporaryDirectory() as td:
path = pjoin(td, 'foo.py')
with open(path, 'w') as f:
f.write('\n'.join([
"def foo():",
" return bar()",
"def bar():",
" raise RuntimeError('hello!')",
"foo()",
]))
with capture_output() as io:
_ip.magic('run {}'.format(path))
out = io.stdout
nt.assert_not_in("execfile", out)
nt.assert_in("RuntimeError", out)
nt.assert_equal(out.count("---->"), 3)
def test_script_tb():
"""Test traceback offset in `ipython script.py`"""
with TemporaryDirectory() as td:
path = pjoin(td, 'foo.py')
with open(path, 'w') as f:
f.write('\n'.join([
"def foo():",
" return bar()",
"def bar():",
" raise RuntimeError('hello!')",
"foo()",
]))
out, err = tt.ipexec(path)
nt.assert_not_in("execfile", out)
nt.assert_in("RuntimeError", out)
nt.assert_equal(out.count("---->"), 3)
def test_latex_completions():
from IPython.core.latex_symbols import latex_symbols
import random
ip = get_ipython()
# Test some random unicode symbols
keys = random.sample(latex_symbols.keys(), 10)
for k in keys:
text, matches = ip.complete(k)
nt.assert_equal(len(matches),1)
nt.assert_equal(text, k)
nt.assert_equal(matches[0], latex_symbols[k])
# Test a more complex line
text, matches = ip.complete(u'print(\\alpha')
nt.assert_equal(text, u'\\alpha')
nt.assert_equal(matches[0], latex_symbols['\\alpha'])
# Test multiple matching latex symbols
text, matches = ip.complete(u'\\al')
nt.assert_in('\\alpha', matches)
nt.assert_in('\\aleph', matches)