def test_local_file_completions():
ip = get_ipython()
with TemporaryWorkingDirectory():
prefix = './foo'
suffixes = ['1', '2']
names = [prefix+s for s in suffixes]
for n in names:
open(n, 'w').close()
# Check simple completion
c = ip.complete(prefix)[1]
nt.assert_equal(c, names)
# Now check with a function call
cmd = 'a = f("%s' % prefix
c = ip.complete(prefix, cmd)[1]
comp = set(prefix+s for s in suffixes)
nt.assert_true(comp.issubset(set(c)))
python类assert_true()的实例源码
def test_rehashx():
# clear up everything
_ip.alias_manager.clear_aliases()
del _ip.db['syscmdlist']
_ip.magic('rehashx')
# Practically ALL ipython development systems will have more than 10 aliases
nt.assert_true(len(_ip.alias_manager.aliases) > 10)
for name, cmd in _ip.alias_manager.aliases:
# we must strip dots from alias names
nt.assert_not_in('.', name)
# rehashx must fill up syscmdlist
scoms = _ip.db['syscmdlist']
nt.assert_true(len(scoms) > 10)
def test_line_cell_info():
"""%%foo and %foo magics are distinguishable to inspect"""
ip = get_ipython()
ip.magics_manager.register(FooFoo)
oinfo = ip.object_inspect('foo')
nt.assert_true(oinfo['found'])
nt.assert_true(oinfo['ismagic'])
oinfo = ip.object_inspect('%%foo')
nt.assert_true(oinfo['found'])
nt.assert_true(oinfo['ismagic'])
nt.assert_equal(oinfo['docstring'], FooFoo.cell_foo.__doc__)
oinfo = ip.object_inspect('%foo')
nt.assert_true(oinfo['found'])
nt.assert_true(oinfo['ismagic'])
nt.assert_equal(oinfo['docstring'], FooFoo.line_foo.__doc__)
def test_command_chain_dispatcher_ff():
"""Test two failing hooks"""
fail1 = Fail(u'fail1')
fail2 = Fail(u'fail2')
dp = CommandChainDispatcher([(0, fail1),
(10, fail2)])
try:
dp()
except TryNext as e:
nt.assert_equal(str(e), u'fail2')
else:
assert False, "Expected exception was not raised."
nt.assert_true(fail1.called)
nt.assert_true(fail2.called)
def test_last_two_blanks():
nt.assert_false(isp.last_two_blanks(''))
nt.assert_false(isp.last_two_blanks('abc'))
nt.assert_false(isp.last_two_blanks('abc\n'))
nt.assert_false(isp.last_two_blanks('abc\n\na'))
nt.assert_false(isp.last_two_blanks('abc\n \n'))
nt.assert_false(isp.last_two_blanks('abc\n\n'))
nt.assert_true(isp.last_two_blanks('\n\n'))
nt.assert_true(isp.last_two_blanks('\n\n '))
nt.assert_true(isp.last_two_blanks('\n \n'))
nt.assert_true(isp.last_two_blanks('abc\n\n '))
nt.assert_true(isp.last_two_blanks('abc\n\n\n'))
nt.assert_true(isp.last_two_blanks('abc\n\n \n'))
nt.assert_true(isp.last_two_blanks('abc\n\n \n '))
nt.assert_true(isp.last_two_blanks('abc\n\n \n \n'))
nt.assert_true(isp.last_two_blanks('abc\nd\n\n\n'))
nt.assert_true(isp.last_two_blanks('abc\nd\ne\nf\n\n\n'))
def test_basic_class():
def type_pprint_wrapper(obj, p, cycle):
if obj is MyObj:
type_pprint_wrapper.called = True
return pretty._type_pprint(obj, p, cycle)
type_pprint_wrapper.called = False
stream = StringIO()
printer = pretty.RepresentationPrinter(stream)
printer.type_pprinters[type] = type_pprint_wrapper
printer.pretty(MyObj)
printer.flush()
output = stream.getvalue()
nt.assert_equal(output, '%s.MyObj' % __name__)
nt.assert_true(type_pprint_wrapper.called)
def test_deepreload():
"Test that dreload does deep reloads and skips excluded modules."
with TemporaryDirectory() as tmpdir:
with prepended_to_syspath(tmpdir):
with open(os.path.join(tmpdir, 'A.py'), 'w') as f:
f.write("class Object(object):\n pass\n")
with open(os.path.join(tmpdir, 'B.py'), 'w') as f:
f.write("import A\n")
import A
import B
# Test that A is not reloaded.
obj = A.Object()
dreload(B, exclude=['A'])
nt.assert_true(isinstance(obj, A.Object))
# Test that A is reloaded.
obj = A.Object()
dreload(B)
nt.assert_false(isinstance(obj, A.Object))
def test_correct_geometric_distortion_signal_type(self):
dp=self.signal
dp.correct_geometric_distortion(D=np.array([[1., 0., 0.],
[0., 1., 0.],
[0., 0., 1.]]))
nt.assert_true(isinstance(dp, ElectronDiffraction))
# def test_geometric_distortion_rotation_origin(self):
# dp = self.signal
# dp.correct_geometric_distortion()
# np.testing.assert_allclose(rp.data, np.array([[5., 4.25, 2.875,
# 1.7, 0.92857143, 0.],
# [5., 4.75, 3.625,
# 2.5, 1.71428571,0.6]]),
# atol=1e-3)
# def test_geometric_distortion(self):
# dp = self.signal
# rp = dp.get_radial_profile(centers=np.array([[4, 3], [4, 3]]))
# np.testing.assert_allclose(rp.data, np.array([[5., 4.25, 2.875,
# 1.7, 0.92857143, 0.],
# [5., 4.375, 3.5,
# 2.4, 2.07142857, 1.]]),
# atol=1e-3)
def _test_calculate_on_random_set(self, alpha, random_str):
"""
Test utils.entropy on random set generated from alphanumerics.
Note, this method presumes a uniform distribution from the
stringer.random_string method.
"""
STRING_LENGTH = len(random_str)
ALPHA_LEN = len(alpha)
ent_calc = self.ent.calculate(random_str)
p = 1/ALPHA_LEN
# this is the expected Shannon entropy for a uniform distribution
exp_ent = STRING_LENGTH * p * log2(p)
accepted_err = 10
# why the hell is this failing?
# assert_true(exp_ent-accepted_err <=
# ent_calc <=
# exp_ent+accepted_err)
def test_subprocess_error():
"""error in mp.Process doesn't crash"""
with new_kernel() as kc:
iopub = kc.iopub_channel
code = '\n'.join([
"import multiprocessing as mp",
"p = mp.Process(target=int, args=('hi',))",
"p.start()",
"p.join()",
])
msg_id, content = execute(kc=kc, code=code)
stdout, stderr = assemble_output(iopub)
nt.assert_equal(stdout, '')
nt.assert_true("ValueError" in stderr, stderr)
_check_master(kc, expected=True)
_check_master(kc, expected=True, stream="stderr")
# raw_input tests
def test_local_file_completions():
ip = get_ipython()
with TemporaryWorkingDirectory():
prefix = './foo'
suffixes = ['1', '2']
names = [prefix+s for s in suffixes]
for n in names:
open(n, 'w').close()
# Check simple completion
c = ip.complete(prefix)[1]
nt.assert_equal(c, names)
# Now check with a function call
cmd = 'a = f("%s' % prefix
c = ip.complete(prefix, cmd)[1]
comp = set(prefix+s for s in suffixes)
nt.assert_true(comp.issubset(set(c)))
def test_rehashx():
# clear up everything
_ip.alias_manager.clear_aliases()
del _ip.db['syscmdlist']
_ip.magic('rehashx')
# Practically ALL ipython development systems will have more than 10 aliases
nt.assert_true(len(_ip.alias_manager.aliases) > 10)
for name, cmd in _ip.alias_manager.aliases:
# we must strip dots from alias names
nt.assert_not_in('.', name)
# rehashx must fill up syscmdlist
scoms = _ip.db['syscmdlist']
nt.assert_true(len(scoms) > 10)
def test_line_cell_info():
"""%%foo and %foo magics are distinguishable to inspect"""
ip = get_ipython()
ip.magics_manager.register(FooFoo)
oinfo = ip.object_inspect('foo')
nt.assert_true(oinfo['found'])
nt.assert_true(oinfo['ismagic'])
oinfo = ip.object_inspect('%%foo')
nt.assert_true(oinfo['found'])
nt.assert_true(oinfo['ismagic'])
nt.assert_equal(oinfo['docstring'], FooFoo.cell_foo.__doc__)
oinfo = ip.object_inspect('%foo')
nt.assert_true(oinfo['found'])
nt.assert_true(oinfo['ismagic'])
nt.assert_equal(oinfo['docstring'], FooFoo.line_foo.__doc__)
def test_command_chain_dispatcher_ff():
"""Test two failing hooks"""
fail1 = Fail(u'fail1')
fail2 = Fail(u'fail2')
dp = CommandChainDispatcher([(0, fail1),
(10, fail2)])
try:
dp()
except TryNext as e:
nt.assert_equal(str(e), u'fail2')
else:
assert False, "Expected exception was not raised."
nt.assert_true(fail1.called)
nt.assert_true(fail2.called)
def test_last_two_blanks():
nt.assert_false(isp.last_two_blanks(''))
nt.assert_false(isp.last_two_blanks('abc'))
nt.assert_false(isp.last_two_blanks('abc\n'))
nt.assert_false(isp.last_two_blanks('abc\n\na'))
nt.assert_false(isp.last_two_blanks('abc\n \n'))
nt.assert_false(isp.last_two_blanks('abc\n\n'))
nt.assert_true(isp.last_two_blanks('\n\n'))
nt.assert_true(isp.last_two_blanks('\n\n '))
nt.assert_true(isp.last_two_blanks('\n \n'))
nt.assert_true(isp.last_two_blanks('abc\n\n '))
nt.assert_true(isp.last_two_blanks('abc\n\n\n'))
nt.assert_true(isp.last_two_blanks('abc\n\n \n'))
nt.assert_true(isp.last_two_blanks('abc\n\n \n '))
nt.assert_true(isp.last_two_blanks('abc\n\n \n \n'))
nt.assert_true(isp.last_two_blanks('abc\nd\n\n\n'))
nt.assert_true(isp.last_two_blanks('abc\nd\ne\nf\n\n\n'))
def test_basic_class():
def type_pprint_wrapper(obj, p, cycle):
if obj is MyObj:
type_pprint_wrapper.called = True
return pretty._type_pprint(obj, p, cycle)
type_pprint_wrapper.called = False
stream = StringIO()
printer = pretty.RepresentationPrinter(stream)
printer.type_pprinters[type] = type_pprint_wrapper
printer.pretty(MyObj)
printer.flush()
output = stream.getvalue()
nt.assert_equal(output, '%s.MyObj' % __name__)
nt.assert_true(type_pprint_wrapper.called)
# This is only run on Python 2 because in Python 3 the language prevents you
# from setting a non-unicode value for __qualname__ on a metaclass, and it
# doesn't respect the descriptor protocol if you subclass unicode and implement
# __get__.
def test_deepreload():
"Test that dreload does deep reloads and skips excluded modules."
with TemporaryDirectory() as tmpdir:
with prepended_to_syspath(tmpdir):
with open(os.path.join(tmpdir, 'A.py'), 'w') as f:
f.write("class Object(object):\n pass\n")
with open(os.path.join(tmpdir, 'B.py'), 'w') as f:
f.write("import A\n")
import A
import B
# Test that A is not reloaded.
obj = A.Object()
dreload(B, exclude=['A'])
nt.assert_true(isinstance(obj, A.Object))
# Test that A is reloaded.
obj = A.Object()
dreload(B)
nt.assert_false(isinstance(obj, A.Object))
def assign_labeled_network(api):
"""
Adds the labeled network to the cluster and asserts the hosts are attached
"""
engine = api.system_service()
labeled_net = engine.networks_service().list(
search='name={}'.format(LABELED_NET_NAME))[0]
# the logical network will be automatically assigned to all host network
# interfaces with that label asynchronously
cluster_service = test_utils.get_cluster_service(engine, CLUSTER_NAME)
nt.assert_true(
cluster_service.networks_service().add(labeled_net)
)
hosts_service = engine.hosts_service()
for host in test_utils.hosts_in_cluster_v4(engine, CLUSTER_NAME):
host_service = hosts_service.host_service(id=host.id)
testlib.assert_true_within_short(
functools.partial(_host_is_attached_to_network, engine,
host_service, LABELED_NET_NAME))
def add_filter(ovirt_api4):
engine = ovirt_api4.system_service()
nics_service = test_utils.get_nics_service(engine, VM0_NAME)
nic = nics_service.list()[0]
network = ovirt_api4.follow_link(nic.vnic_profile).network
network_filters_service = engine.network_filters_service()
network_filter = next(
network_filter for network_filter in network_filters_service.list()
if network_filter.name == NETWORK_FILTER_NAME
)
vnic_profiles_service = engine.vnic_profiles_service()
vnic_profile = vnic_profiles_service.add(
types.VnicProfile(
name='{}_profile'.format(network_filter.name),
network=network,
network_filter=network_filter
)
)
nic.vnic_profile = vnic_profile
nt.assert_true(
nics_service.nic_service(nic.id).update(nic)
)
def add_filter_parameter(prefix):
engine = prefix.virt_env.engine_vm()
ovirt_api4 = engine.get_api(api_ver=4)
vm_gw = '.'.join(engine.ip().split('.')[0:3] + ['1'])
network_filter_parameters_service = test_utils.get_network_fiter_parameters_service(
ovirt_api4.system_service(), VM0_NAME)
nt.assert_true(
network_filter_parameters_service.add(
types.NetworkFilterParameter(
name=NETWORK_FILTER_PARAMETER0_NAME,
value=NETWORK_FILTER_PARAMETER0_VALUE
)
)
)
nt.assert_true(
network_filter_parameters_service.add(
types.NetworkFilterParameter(
name=NETWORK_FILTER_PARAMETER1_NAME,
value=vm_gw
)
)
)