def test_harvest_job_create_as_sysadmin(self):
    """Create a harvest job with ``run=True`` as the site user.

    The user name comes from the ``get_site_user`` action. The returned job
    must reference the new source, report status ``'Running'``, have no
    ``gather_started`` value, and carry a ``'stats'`` key.
    """
    harvest_source = factories.HarvestSource(**SOURCE_DICT.copy())

    # The site-user lookup is done with authorization checks disabled.
    lookup_context = {'model': model, 'ignore_auth': True}
    site_user_name = toolkit.get_action('get_site_user')(
        lookup_context, {})['name']

    create_job = toolkit.get_action('harvest_job_create')
    job = create_job(
        {'user': site_user_name},
        {'source_id': harvest_source['id'], 'run': True},
    )

    assert_equal(job['source_id'], harvest_source['id'])
    assert_equal(job['status'], 'Running')
    assert_equal(job['gather_started'], None)
    assert_in('stats', job.keys())
# Example source code using Python's assert_in()
def test_harvest_job_create_as_admin(self):
    """Create a harvest job with ``run=True`` as an organization admin.

    A user with capacity ``'admin'`` is attached to a new organization, and
    a harvest source is created with that organization as ``publisher_id``.
    The job created under that user's name must reference the source, report
    status ``'Running'``, have no ``gather_started`` value, and carry a
    ``'stats'`` key.
    """
    # as if an admin user presses 'refresh'
    user = ckan_factories.User()
    user['capacity'] = 'admin'
    org = ckan_factories.Organization(users=[user])

    # Copy SOURCE_DICT and add the publisher in one step. The previous
    # ``SOURCE_DICT.items() + [...]`` form only works on Python 2, where
    # ``items()`` returns a list; on Python 3 it raises TypeError.
    source_dict = dict(SOURCE_DICT, publisher_id=org['id'])
    source = factories.HarvestSource(**source_dict)

    data_dict = {
        'source_id': source['id'],
        'run': True,
    }
    job = toolkit.get_action('harvest_job_create')(
        {'user': user['name']}, data_dict)

    assert_equal(job['source_id'], source['id'])
    assert_equal(job['status'], 'Running')
    assert_equal(job['gather_started'], None)
    assert_in('stats', job.keys())
def test_store_restore():
    """Store a variable and an alias with %store, then restore them.

    After the stored items are removed and ``_dh`` is emptied, ``store -r``
    must bring back ``foo``, the ``bar`` alias, and an entry for the
    temporary directory in ``_dh``.
    """
    ip.user_ns['foo'] = 78
    ip.magic('alias bar echo "hello"')
    scratch_dir = tempfile.mkdtemp()
    ip.magic('cd ' + scratch_dir)
    for stored_name in ('foo', 'bar'):
        ip.magic('store ' + stored_name)

    # Check storing
    nt.assert_equal(ip.db['autorestore/foo'], 78)
    nt.assert_in('bar', ip.db['stored_aliases'])

    # Remove those items
    ip.user_ns.pop('foo', None)
    ip.alias_manager.undefine_alias('bar')
    ip.magic('cd -')
    ip.user_ns['_dh'][:] = []

    # Check restoring
    ip.magic('store -r')
    nt.assert_equal(ip.user_ns['foo'], 78)
    assert ip.alias_manager.is_alias('bar')
    nt.assert_in(os.path.realpath(scratch_dir), ip.user_ns['_dh'])

    os.rmdir(scratch_dir)
def test_set_matplotlib_formats():
    """Check ``Figure`` is registered only on formatters for requested formats.

    For each set of format names, ``display.set_matplotlib_formats`` is
    called. Every display formatter whose MIME type maps from a requested
    format must then contain ``Figure``; every other one must not.
    """
    from matplotlib.figure import Figure

    formatters = get_ipython().display_formatter.formatters
    format_sets = (
        ('png',),
        ('pdf', 'svg'),
        ('jpeg', 'retina', 'png'),
        (),
    )
    for requested in format_sets:
        expected_mimes = {_fmt_mime_map[name] for name in requested}
        display.set_matplotlib_formats(*requested)
        for mime, formatter in formatters.items():
            # Pick the membership assertion that matches the expectation.
            if mime in expected_mimes:
                check = nt.assert_in
            else:
                check = nt.assert_not_in
            check(Figure, formatter)
def test_run_tb():
    """Test traceback offset in %run"""
    # Script whose top-level call to foo() reaches bar(), which raises.
    script_lines = [
        "def foo():",
        " return bar()",
        "def bar():",
        " raise RuntimeError('hello!')",
        "foo()",
    ]
    with TemporaryDirectory() as workdir:
        script_path = pjoin(workdir, 'foo.py')
        with open(script_path, 'w') as script:
            script.write('\n'.join(script_lines))

        with capture_output() as captured:
            _ip.magic('run {}'.format(script_path))
        output = captured.stdout

        nt.assert_not_in("execfile", output)
        nt.assert_in("RuntimeError", output)
        # Exactly three arrow markers are expected in the captured output.
        nt.assert_equal(output.count("---->"), 3)
def test_script_tb():
    """Test traceback offset in `ipython script.py`"""
    # Script whose top-level call to foo() reaches bar(), which raises.
    script_lines = [
        "def foo():",
        " return bar()",
        "def bar():",
        " raise RuntimeError('hello!')",
        "foo()",
    ]
    with TemporaryDirectory() as workdir:
        script_path = pjoin(workdir, 'foo.py')
        with open(script_path, 'w') as script:
            script.write('\n'.join(script_lines))

        stdout_text, _stderr_text = tt.ipexec(script_path)

        nt.assert_not_in("execfile", stdout_text)
        nt.assert_in("RuntimeError", stdout_text)
        # Exactly three arrow markers are expected in the output.
        nt.assert_equal(stdout_text.count("---->"), 3)
def test_line_cell_magics():
    """Check completion of a magic registered as both line and cell magic."""
    from IPython.core.magic import register_line_cell_magic

    @register_line_cell_magic
    def _bar_cellm(line, cell):
        pass

    completer = get_ipython().Completer

    # The policy here is trickier, see comments in completion code. The
    # returned values depend on whether the user passes %% or not explicitly,
    # and this will show a difference if the same name is both a line and cell
    # magic.
    for typed in ('_bar_ce', '%_bar_ce'):
        # With no prefix or a single %, both forms are offered.
        _, matches = completer.complete(None, typed)
        nt.assert_in('%_bar_cellm', matches)
        nt.assert_in('%%_bar_cellm', matches)

    # With an explicit %%, only the cell form is offered.
    _, matches = completer.complete(None, '%%_bar_ce')
    nt.assert_not_in('%_bar_cellm', matches)
    nt.assert_in('%%_bar_cellm', matches)
def test_magic_color():
    """Check completion of the ``colors`` magic name and its ``NoColor`` argument."""
    completer = get_ipython().Completer

    # Completing the bare prefix offers the magic itself...
    _, matches = completer.complete(None, 'colo')
    nt.assert_in('%colors', matches)

    # ...and does not offer the scheme name.
    _, matches = completer.complete(None, 'colo')
    nt.assert_not_in('NoColor', matches)

    # After the magic name and a space, the scheme name is offered.
    for typed in ('colors ', '%colors '):
        _, matches = completer.complete(None, typed)
        nt.assert_in('NoColor', matches)

    # A partial scheme name yields exactly one match.
    for typed in ('colors NoCo', '%colors NoCo'):
        _, matches = completer.complete(None, typed)
        nt.assert_list_equal(['NoColor'], matches)
def test_dict_key_completion_unicode_py3():
    """Test handling of unicode in dict key completion"""
    shell = get_ipython()
    complete = shell.Completer.complete

    shell.user_ns['d'] = {u'a\u05d0': None}

    # The same key typed two ways: as a backslash escape and as the character.
    escaped_query = "d['a\\u05d0"
    literal_query = "d['a\u05d0"

    # query using escape
    if sys.platform != 'win32':
        # Known failure on Windows
        _, matches = complete(line_buffer=escaped_query)
        nt.assert_in("u05d0", matches)  # tokenized after \\

    # query using character
    _, matches = complete(line_buffer=literal_query)
    nt.assert_in(u"a\u05d0", matches)

    with greedy_completion():
        # query using escape
        _, matches = complete(line_buffer=escaped_query)
        nt.assert_in("d['a\\u05d0']", matches)  # tokenized after \\

        # query using character
        _, matches = complete(line_buffer=literal_query)
        nt.assert_in(u"d['a\u05d0']", matches)
def test_struct_array_key_completion():
    """Test dict key completion applies to numpy struct arrays"""
    import numpy

    shell = get_ipython()
    complete = shell.Completer.complete

    shell.user_ns['d'] = numpy.array(
        [], dtype=[('hello', 'f'), ('world', 'f')])
    _, matches = complete(line_buffer="d['")
    for field in ("hello", "world"):
        nt.assert_in(field, matches)

    # complete on the numpy struct itself
    nested_dtype = numpy.dtype([
        ('my_head', [('my_dt', '>u4'), ('my_df', '>u4')]),
        ('my_data', '>f4', 5),
    ])
    records = numpy.zeros(2, dtype=nested_dtype)
    shell.user_ns['d'] = records[1]
    _, matches = complete(line_buffer="d['")
    for field in ("my_head", "my_data"):
        nt.assert_in(field, matches)

    # complete on a nested level
    with greedy_completion():
        shell.user_ns['d'] = numpy.zeros(2, dtype=nested_dtype)
        _, matches = complete(line_buffer="d[1]['my_head']['")
        # Only require each inner field name to appear inside some match.
        nt.assert_true(any("my_dt" in match for match in matches))
        nt.assert_true(any("my_df" in match for match in matches))
def test_inline_twice(self):
    "Using '%matplotlib inline' twice should not reset formatters"
    shell = self.Shell()
    gui, _backend = shell.enable_matplotlib('inline')
    nt.assert_equal(gui, 'inline')

    # Select a specific figure format between the two enable calls.
    selected = {'png'}
    expected_mimes = {_fmt_mime_map[name] for name in selected}
    pt.select_figure_formats(shell, selected)

    gui, _backend = shell.enable_matplotlib('inline')
    nt.assert_equal(gui, 'inline')

    # The selection made above must still be in effect.
    for mime, formatter in shell.display_formatter.formatters.items():
        if mime in expected_mimes:
            check = nt.assert_in
        else:
            check = nt.assert_not_in
        check(Figure, formatter)
def test_property_sources():
    """Check the ``source`` field reported by ``inspector.info`` for properties.

    Covers a property with a Python getter and a lambda setter, and two
    properties that wrap ``id`` and ``zlib.compress``.
    """
    import zlib

    # NOTE: the assertions below look for the literal text of the getter and
    # setter definitions, so these lines must keep their exact wording.
    class A(object):
        @property
        def foo(self):
            return 'bar'

        foo = foo.setter(lambda self, v: setattr(self, 'bar', v))

        id = property(id)
        compress = property(zlib.compress)

    expectations = (
        (A.foo, ('def foo(self):', 'lambda self, v:')),
        (A.id, ('fget = <function id>',)),
        (A.compress, ('fget = <function zlib.compress>',)),
    )
    for prop, snippets in expectations:
        details = inspector.info(prop, detail_level=1)
        for snippet in snippets:
            nt.assert_in(snippet, details['source'])
def test_var_expand_local(self):
    """Test local variable expansion in !system and %magic calls"""
    # !system
    system_cell = ('def test():\n'
                   ' lvar = "ttt"\n'
                   ' ret = !echo {lvar}\n'
                   ' return ret[0]\n')
    ip.run_cell(system_cell)
    echoed = ip.user_ns['test']()
    nt.assert_in('ttt', echoed)

    # %magic
    macro_cell = ('def makemacro():\n'
                  ' macroname = "macro_var_expand_locals"\n'
                  ' %macro {macroname} codestr\n')
    ip.run_cell(macro_cell)
    ip.user_ns['codestr'] = "str(12)"
    ip.run_cell('makemacro()')
    # The macro name was given only through the local variable.
    nt.assert_in('macro_var_expand_locals', ip.user_ns)
def test_file_amend():
    """%%file -a amends files"""
    shell = get_ipython()
    with TemporaryDirectory() as workdir:
        target = os.path.join(workdir, 'file2')
        # Write two lines, then run the magic again with -a and two more.
        shell.run_cell_magic("file", target, u'line1\nline2')
        shell.run_cell_magic("file", "-a " + target, u'line3\nline4')

        with open(target) as written:
            text = written.read()
        # Lines from both calls must be present in the file.
        for expected in ('line1\n', 'line3\n'):
            nt.assert_in(expected, text)
def test_save():
    """Test %save."""
    shell = get_ipython()
    shell.history_manager.reset()  # Clear any existing history.
    cmds = [u"a=1", u"def b():\n return a**2", u"print(a, b())"]
    for lineno, source in enumerate(cmds, start=1):
        shell.history_manager.store_inputs(lineno, source)

    with TemporaryDirectory() as workdir:
        target = os.path.join(workdir, "testsave.py")
        # A plain save followed by a save with -a: the first command is
        # expected once after the first save and twice after the second.
        for flags, expected_count in (("", 1), ("-a ", 2)):
            shell.run_line_magic("save", "%s%s 1-10" % (flags, target))
            with open(target) as saved:
                content = saved.read()
            nt.assert_equal(content.count(cmds[0]), expected_count)
            nt.assert_in('coding: utf-8', content)
def test_lookup_by_type_string():
    """Check a printer registered by type string moves to ``type_printers``.

    The printer starts in ``deferred_printers``. Looking it up by string
    leaves it there; looking it up by the class ``C`` moves it.
    """
    formatter = PlainTextFormatter()
    type_str = '{}.{}'.format(C.__module__, 'C')
    formatter.for_type(type_str, foo_printer)

    def assert_still_deferred():
        nt.assert_in(_mod_name_key(C), formatter.deferred_printers)
        nt.assert_not_in(C, formatter.type_printers)

    # verify insertion
    assert_still_deferred()

    nt.assert_is(formatter.lookup_by_type(type_str), foo_printer)
    # lookup by string doesn't cause import
    assert_still_deferred()

    nt.assert_is(formatter.lookup_by_type(C), foo_printer)
    # should move from deferred to imported dict
    nt.assert_not_in(_mod_name_key(C), formatter.deferred_printers)
    nt.assert_in(C, formatter.type_printers)
def test_get_ipython_dir_3():
    """test_get_ipython_dir_3, move XDG if defined, and .ipython doesn't exist."""
    home = TemporaryDirectory()
    try:
        # Both IPython dir variables are overridden with None and
        # XDG_CONFIG_HOME points at the XDG test directory.
        env_overrides = {
            'IPYTHON_DIR': None,
            'IPYTHONDIR': None,
            'XDG_CONFIG_HOME': XDG_TEST_DIR,
        }
        with patch_get_home_dir(home.name), \
                patch('os.name', 'posix'), \
                modified_env(env_overrides), \
                warnings.catch_warnings(record=True) as caught:
            resolved = paths.get_ipython_dir()

        nt.assert_equal(resolved, os.path.join(home.name, ".ipython"))
        # The warning checks are skipped on darwin.
        if sys.platform != 'darwin':
            nt.assert_equal(len(caught), 1)
            nt.assert_in('Moving', str(caught[0]))
    finally:
        home.cleanup()
def test_ipython_embed():
    """test that `IPython.embed()` works"""
    with NamedFileInTemporaryDirectory('file_with_embed.py') as script:
        script.write(_sample_embed)
        script.flush()
        script.close()  # otherwise msft won't be able to read the file

        # run `python file_with_embed.py`
        child_env = dict(os.environ, IPY_TEST_SIMPLE_PROMPT='1')
        child = subprocess.Popen(
            [sys.executable, script.name],
            env=child_env,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        raw_out, _raw_err = child.communicate(_exit)
        stdout_text = raw_out.decode('UTF-8')

        nt.assert_equal(child.returncode, 0)
        nt.assert_in('3 . 14', stdout_text)
        if os.name != 'nt':
            # TODO: Fix up our different stdout references, see issue gh-14
            nt.assert_in('IPython', stdout_text)
        nt.assert_in('bye!', stdout_text)
def test_ipython_start_kernel_userns():
    """Start a kernel with a custom ``user_ns`` and inspect it from a client.

    Checks that the ``tre`` entry is visible to inspection, and that the
    inspected ``user_module`` text mentions ``DummyMod``.
    """
    cmd = ('from IPython import start_kernel\n'
           'ns = {"tre": 123}\n'
           'start_kernel(user_ns=ns)')

    with setup_kernel(cmd) as client:

        def next_reply_content():
            # Block for the next shell reply and return its content dict.
            reply = client.get_shell_msg(block=True, timeout=TIMEOUT)
            return reply['content']

        def inspected_text(name):
            # Inspect ``name``, require it to be found, return its plain text.
            client.inspect(name)
            content = next_reply_content()
            assert content['found']
            return content['data']['text/plain']

        nt.assert_in(u'123', inspected_text('tre'))

        # user_module should be an instance of DummyMod
        client.execute("usermod = get_ipython().user_module")
        nt.assert_equal(next_reply_content()['status'], u'ok')
        nt.assert_in(u'DummyMod', inspected_text('usermod'))
def test_store_restore():
    """Store a variable and an alias with %store, then restore them.

    After the stored items are removed and ``_dh`` is emptied, ``store -r``
    must bring back ``foo``, the ``bar`` alias, and an entry for the
    temporary directory in ``_dh``.
    """
    ip.user_ns['foo'] = 78
    ip.magic('alias bar echo "hello"')
    scratch_dir = tempfile.mkdtemp()
    ip.magic('cd ' + scratch_dir)
    for stored_name in ('foo', 'bar'):
        ip.magic('store ' + stored_name)

    # Check storing
    nt.assert_equal(ip.db['autorestore/foo'], 78)
    nt.assert_in('bar', ip.db['stored_aliases'])

    # Remove those items
    ip.user_ns.pop('foo', None)
    ip.alias_manager.undefine_alias('bar')
    ip.magic('cd -')
    ip.user_ns['_dh'][:] = []

    # Check restoring
    ip.magic('store -r')
    nt.assert_equal(ip.user_ns['foo'], 78)
    assert ip.alias_manager.is_alias('bar')
    nt.assert_in(os.path.realpath(scratch_dir), ip.user_ns['_dh'])

    os.rmdir(scratch_dir)
def test_set_matplotlib_formats():
    """Check ``Figure`` is registered only on formatters for requested formats.

    For each set of format names, ``display.set_matplotlib_formats`` is
    called. Every display formatter whose MIME type maps from a requested
    format must then contain ``Figure``; every other one must not.
    """
    from matplotlib.figure import Figure

    formatters = get_ipython().display_formatter.formatters
    format_sets = (
        ('png',),
        ('pdf', 'svg'),
        ('jpeg', 'retina', 'png'),
        (),
    )
    for requested in format_sets:
        expected_mimes = {_fmt_mime_map[name] for name in requested}
        display.set_matplotlib_formats(*requested)
        for mime, formatter in formatters.items():
            # Pick the membership assertion that matches the expectation.
            if mime in expected_mimes:
                check = nt.assert_in
            else:
                check = nt.assert_not_in
            check(Figure, formatter)
def test_run_tb():
    """Test traceback offset in %run"""
    # Script whose top-level call to foo() reaches bar(), which raises.
    script_lines = [
        "def foo():",
        " return bar()",
        "def bar():",
        " raise RuntimeError('hello!')",
        "foo()",
    ]
    with TemporaryDirectory() as workdir:
        script_path = pjoin(workdir, 'foo.py')
        with open(script_path, 'w') as script:
            script.write('\n'.join(script_lines))

        with capture_output() as captured:
            _ip.magic('run {}'.format(script_path))
        output = captured.stdout

        nt.assert_not_in("execfile", output)
        nt.assert_in("RuntimeError", output)
        # Exactly three arrow markers are expected in the captured output.
        nt.assert_equal(output.count("---->"), 3)
def test_script_tb():
    """Test traceback offset in `ipython script.py`"""
    # Script whose top-level call to foo() reaches bar(), which raises.
    script_lines = [
        "def foo():",
        " return bar()",
        "def bar():",
        " raise RuntimeError('hello!')",
        "foo()",
    ]
    with TemporaryDirectory() as workdir:
        script_path = pjoin(workdir, 'foo.py')
        with open(script_path, 'w') as script:
            script.write('\n'.join(script_lines))

        stdout_text, _stderr_text = tt.ipexec(script_path)

        nt.assert_not_in("execfile", stdout_text)
        nt.assert_in("RuntimeError", stdout_text)
        # Exactly three arrow markers are expected in the output.
        nt.assert_equal(stdout_text.count("---->"), 3)
def test_greedy_completions():
    """Check attribute completion on ``a[0].`` without and with greedy mode.

    This is a generator test: each yielded tuple is a check function
    followed by the arguments to call it with.
    """
    shell = get_ipython()
    shell.ex('a=list(range(5))')

    # Without greedy completion, nothing is completed after the subscript.
    _text, found = shell.complete('.', line='a[0].')
    nt.assert_false('.real' in found,
                    "Shouldn't have completed on a[0]: %s" % found)

    with greedy_completion():
        def _(line, cursor_pos, expect, message):
            _text, found = shell.complete(
                '.', line=line, cursor_pos=cursor_pos)
            nt.assert_in(expect, found, message % found)

        yield _, 'a[0].', 5, 'a[0].real', "Should have completed on a[0].: %s"
        yield _, 'a[0].r', 6, 'a[0].real', "Should have completed on a[0].r: %s"

        # This case is only yielded on interpreters newer than (3, 4).
        if sys.version_info > (3, 4):
            yield (_, 'a[0].from_', 10, 'a[0].from_bytes',
                   "Should have completed on a[0].from_: %s")
def test_line_cell_magics():
    """Check completion of a magic registered as both line and cell magic."""
    from IPython.core.magic import register_line_cell_magic

    @register_line_cell_magic
    def _bar_cellm(line, cell):
        pass

    completer = get_ipython().Completer

    # The policy here is trickier, see comments in completion code. The
    # returned values depend on whether the user passes %% or not explicitly,
    # and this will show a difference if the same name is both a line and cell
    # magic.
    for typed in ('_bar_ce', '%_bar_ce'):
        # With no prefix or a single %, both forms are offered.
        _, matches = completer.complete(None, typed)
        nt.assert_in('%_bar_cellm', matches)
        nt.assert_in('%%_bar_cellm', matches)

    # With an explicit %%, only the cell form is offered.
    _, matches = completer.complete(None, '%%_bar_ce')
    nt.assert_not_in('%_bar_cellm', matches)
    nt.assert_in('%%_bar_cellm', matches)
def test_dict_key_completion_unicode_py3():
    """Test handling of unicode in dict key completion"""
    shell = get_ipython()
    complete = shell.Completer.complete

    shell.user_ns['d'] = {u'a\u05d0': None}

    # The same key typed two ways: as a backslash escape and as the character.
    escaped_query = "d['a\\u05d0"
    literal_query = "d['a\u05d0"

    # query using escape
    if sys.platform != 'win32':
        # Known failure on Windows
        _, matches = complete(line_buffer=escaped_query)
        nt.assert_in("u05d0", matches)  # tokenized after \\

    # query using character
    _, matches = complete(line_buffer=literal_query)
    nt.assert_in(u"a\u05d0", matches)

    with greedy_completion():
        # query using escape
        _, matches = complete(line_buffer=escaped_query)
        nt.assert_in("d['a\\u05d0']", matches)  # tokenized after \\

        # query using character
        _, matches = complete(line_buffer=literal_query)
        nt.assert_in(u"d['a\u05d0']", matches)
def test_property_sources():
    """Check the ``source`` field reported by ``inspector.info`` for properties.

    Covers a property with a Python getter and a lambda setter, and two
    properties that wrap ``id`` and ``zlib.compress``.
    """
    import zlib

    # NOTE: the assertions below look for the literal text of the getter and
    # setter definitions, so these lines must keep their exact wording.
    class A(object):
        @property
        def foo(self):
            return 'bar'

        foo = foo.setter(lambda self, v: setattr(self, 'bar', v))

        id = property(id)
        compress = property(zlib.compress)

    expectations = (
        (A.foo, ('def foo(self):', 'lambda self, v:')),
        (A.id, ('fget = <function id>',)),
        (A.compress, ('fget = <function zlib.compress>',)),
    )
    for prop, snippets in expectations:
        details = inspector.info(prop, detail_level=1)
        for snippet in snippets:
            nt.assert_in(snippet, details['source'])
def test_var_expand_local(self):
    """Test local variable expansion in !system and %magic calls"""
    # !system
    system_cell = ('def test():\n'
                   ' lvar = "ttt"\n'
                   ' ret = !echo {lvar}\n'
                   ' return ret[0]\n')
    ip.run_cell(system_cell)
    echoed = ip.user_ns['test']()
    nt.assert_in('ttt', echoed)

    # %magic
    macro_cell = ('def makemacro():\n'
                  ' macroname = "macro_var_expand_locals"\n'
                  ' %macro {macroname} codestr\n')
    ip.run_cell(macro_cell)
    ip.user_ns['codestr'] = "str(12)"
    ip.run_cell('makemacro()')
    # The macro name was given only through the local variable.
    nt.assert_in('macro_var_expand_locals', ip.user_ns)
def test_alias_magic():
    """Test %alias_magic."""
    shell = get_ipython()
    manager = shell.magics_manager

    # Basic operation: both cell and line magics are created, if possible.
    shell.run_line_magic('alias_magic', 'timeit_alias timeit')
    for kind in ('line', 'cell'):
        nt.assert_in('timeit_alias', manager.magics[kind])

    # --cell is specified, line magic not created.
    shell.run_line_magic('alias_magic', '--cell timeit_cell_alias timeit')
    nt.assert_not_in('timeit_cell_alias', manager.magics['line'])
    nt.assert_in('timeit_cell_alias', manager.magics['cell'])

    # Test that line alias is created successfully.
    shell.run_line_magic('alias_magic', '--line env_alias env')
    # The alias must return the same value as the magic it points at.
    from_original = shell.run_line_magic('env', '')
    from_alias = shell.run_line_magic('env_alias', '')
    nt.assert_equal(from_original, from_alias)
def test_save():
    """Test %save."""
    shell = get_ipython()
    shell.history_manager.reset()  # Clear any existing history.
    cmds = [u"a=1", u"def b():\n return a**2", u"print(a, b())"]
    for lineno, source in enumerate(cmds, start=1):
        shell.history_manager.store_inputs(lineno, source)

    with TemporaryDirectory() as workdir:
        target = os.path.join(workdir, "testsave.py")
        # A plain save followed by a save with -a: the first command is
        # expected once after the first save and twice after the second.
        for flags, expected_count in (("", 1), ("-a ", 2)):
            shell.run_line_magic("save", "%s%s 1-10" % (flags, target))
            with open(target) as saved:
                content = saved.read()
            nt.assert_equal(content.count(cmds[0]), expected_count)
            nt.assert_in('coding: utf-8', content)