def test_dot_graph_no_filename(tmpdir, model):
ipydisp = pytest.importorskip('IPython.display')
# Map from format extension to expected return type.
result_types = {
'png': ipydisp.Image,
'jpeg': ipydisp.Image,
'dot': type(None),
'pdf': type(None),
'svg': ipydisp.SVG,
}
for format in result_types:
before = tmpdir.listdir()
result = dot_graph(model, filename=None, format=format)
# We shouldn't write any files if filename is None.
after = tmpdir.listdir()
assert before == after
assert isinstance(result, result_types[format])
python类importorskip()的实例源码
def test_filenames_and_formats(model):
ipydisp = pytest.importorskip('IPython.display')
# Test with a variety of user provided args
filenames = ['modelpdf', 'model.pdf', 'model.pdf', 'modelpdf',
'model.pdf.svg']
formats = ['svg', None, 'svg', None, None]
targets = ['modelpdf.svg', 'model.pdf', 'model.pdf.svg', 'modelpdf.png',
'model.pdf.svg']
result_types = {
'png': ipydisp.Image,
'jpeg': ipydisp.Image,
'dot': type(None),
'pdf': type(None),
'svg': ipydisp.SVG,
}
for filename, format, target in zip(filenames, formats, targets):
expected_result_type = result_types[target.split('.')[-1]]
result = dot_graph(model, filename=filename, format=format)
assert os.path.isfile(target)
assert isinstance(result, expected_result_type)
_ensure_not_exists(target)
def test_visualize(self, model):
pytest.importorskip('graphviz')
ipydisp = pytest.importorskip('IPython.display')
result = model.visualize()
assert isinstance(result, ipydisp.Image)
result = model.visualize(show_inputs=True)
assert isinstance(result, ipydisp.Image)
result = model.visualize(show_variables=True)
assert isinstance(result, ipydisp.Image)
result = model.visualize(
show_only_variable=('quantity', 'quantity'))
assert isinstance(result, ipydisp.Image)
def spawn(self, cmd, expect_timeout=10.0):
"""Run a command using pexpect.
The pexpect child is returned.
"""
pexpect = pytest.importorskip("pexpect", "3.0")
if hasattr(sys, 'pypy_version_info') and '64' in platform.machine():
pytest.skip("pypy-64 bit not supported")
if sys.platform == "darwin":
pytest.xfail("pexpect does not work reliably on darwin?!")
if sys.platform.startswith("freebsd"):
pytest.xfail("pexpect does not work reliably on freebsd")
logfile = self.tmpdir.join("spawn.out").open("wb")
child = pexpect.spawn(cmd, logfile=logfile)
self.request.addfinalizer(logfile.close)
child.timeout = expect_timeout
return child
def test_hdfs_home():
hdfs3 = pytest.importorskip('hdfs3')
hdfs = hdfs3.HDFileSystem()
d = '/tmp/test'
try:
hdfs.mkdir(d)
k = Knit(nn='localhost', rm='localhost', nn_port=8020, rm_port=8088,
replication_factor=1, hdfs_home=d)
env_zip = k.create_env(env_name='dev', packages=['python=2.7'], remove=True)
k.start('env', files=[env_zip], memory=128)
assert d + '/.knitDeps' in hdfs.ls(d, False)
assert d + "/.knitDeps/knit-1.0-SNAPSHOT.jar" in hdfs.ls(d + '/.knitDeps', False)
assert d + "/.knitDeps/dev.zip" in hdfs.ls(d + '/.knitDeps', False)
if not k.wait_for_completion(30):
k.kill()
finally:
hdfs.rm(d, True)
k.kill()
def test_load_empty_table_arrow(self, con, empty_table):
pd = pytest.importorskip("pandas")
pa = pytest.importorskip("pyarrow")
skip_if_no_arrow_loader(con)
data = [(1, 1.1, 'a'),
(2, 2.2, '2'),
(3, 3.3, '3')]
df = pd.DataFrame(data, columns=list('abc')).astype({
'a': 'int32',
'b': 'float32'
})
table = pa.Table.from_pandas(df, preserve_index=False)
con.load_table(empty_table, table, method='arrow')
result = sorted(con.execute("select * from {}".format(empty_table)))
self.check_empty_insert(result, data)
def test_load_columnar_pandas_all(self, con, all_types_table):
pd = pytest.importorskip("pandas")
import numpy as np
data = pd.DataFrame({
"boolean_": [True, False],
"smallint_": np.array([0, 1], dtype=np.int8),
"int_": np.array([0, 1], dtype=np.int32),
"bigint_": np.array([0, 1], dtype=np.int64),
"float_": np.array([0, 1], dtype=np.float32),
"double_": np.array([0, 1], dtype=np.float64),
"varchar_": ["a", "b"],
"text_": ['a', 'b'],
"time_": [datetime.time(0, 11, 59), datetime.time(13)],
"timestamp_": [pd.Timestamp("2016"), pd.Timestamp("2017")],
"date_": [datetime.date(2016, 1, 1), datetime.date(2017, 1, 1)],
}, columns=['boolean_', 'smallint_', 'int_', 'bigint_', 'float_',
'double_', 'varchar_', 'text_', 'time_', 'timestamp_',
'date_'])
con.load_table_columnar(all_types_table, data, preserve_index=False)
def test_load_table_columnar_arrow_all(self, con, all_types_table):
pa = pytest.importorskip("pyarrow")
skip_if_no_arrow_loader(con)
names = ['boolean_', 'smallint_', 'int_', 'bigint_',
'float_', 'double_', 'varchar_', 'text_',
'time_', 'timestamp_', 'date_']
columns = [pa.array([True, False, None], type=pa.bool_()),
pa.array([1, 0, None]).cast(pa.int16()),
pa.array([1, 0, None]).cast(pa.int32()),
pa.array([1, 0, None]),
pa.array([1.0, 1.1, None]).cast(pa.float32()),
pa.array([1.0, 1.1, None]),
# no fixed-width string
pa.array(['a', 'b', None]),
pa.array(['a', 'b', None]),
(pa.array([1, 2, None]).cast(pa.int32())
.cast(pa.time32('s'))),
pa.array([datetime.datetime(2016, 1, 1, 12, 12, 12),
datetime.datetime(2017, 1, 1), None]),
pa.array([datetime.date(2016, 1, 1),
datetime.date(2017, 1, 1), None])]
table = pa.Table.from_arrays(columns, names=names)
con.load_table_arrow(all_types_table, table)
def test_load_table_creates(self, con, not_a_table):
pd = pytest.importorskip("pandas")
import numpy as np
data = pd.DataFrame({
"boolean_": [True, False],
"smallint_cast": np.array([0, 1], dtype=np.int8),
"smallint_": np.array([0, 1], dtype=np.int16),
"int_": np.array([0, 1], dtype=np.int32),
"bigint_": np.array([0, 1], dtype=np.int64),
"float_": np.array([0, 1], dtype=np.float32),
"double_": np.array([0, 1], dtype=np.float64),
"varchar_": ["a", "b"],
"text_": ['a', 'b'],
"time_": [datetime.time(0, 11, 59), datetime.time(13)],
"timestamp_": [pd.Timestamp("2016"), pd.Timestamp("2017")],
"date_": [datetime.date(2016, 1, 1), datetime.date(2017, 1, 1)],
}, columns=['boolean_', 'smallint_', 'int_', 'bigint_', 'float_',
'double_', 'varchar_', 'text_', 'time_', 'timestamp_',
'date_'])
con.load_table(not_a_table, data, create=True)
def test_visualize():
pytest.importorskip('graphviz')
X, y = make_classification(n_samples=100, n_classes=2, flip_y=.2,
random_state=0)
clf = SVC(random_state=0)
grid = {'C': [.1, .5, .9]}
gs = dcv.GridSearchCV(clf, grid).fit(X, y)
assert hasattr(gs, 'dask_graph_')
with tmpdir() as d:
gs.visualize(filename=os.path.join(d, 'mydask'))
assert os.path.exists(os.path.join(d, 'mydask.png'))
# Doesn't work if not fitted
gs = dcv.GridSearchCV(clf, grid)
with pytest.raises(NotFittedError):
gs.visualize()
def spawn(self, cmd, expect_timeout=10.0):
"""Run a command using pexpect.
The pexpect child is returned.
"""
pexpect = pytest.importorskip("pexpect", "3.0")
if hasattr(sys, 'pypy_version_info') and '64' in platform.machine():
pytest.skip("pypy-64 bit not supported")
if sys.platform == "darwin":
pytest.xfail("pexpect does not work reliably on darwin?!")
if sys.platform.startswith("freebsd"):
pytest.xfail("pexpect does not work reliably on freebsd")
logfile = self.tmpdir.join("spawn.out").open("wb")
child = pexpect.spawn(cmd, logfile=logfile)
self.request.addfinalizer(logfile.close)
child.timeout = expect_timeout
return child
def test_batched_successful_call_explicit_loop(framework_aio):
'''
batched calls really happen in batches
'''
# Trollius doesn't come with this, so won't work on py2
pytest.importorskip('asyncio.test_utils')
from asyncio.test_utils import TestLoop
def time_gen():
yield
yield
new_loop = TestLoop(time_gen)
calls = []
def foo(*args, **kw):
calls.append((args, kw))
txa = txaio.with_config(loop=new_loop)
batched = txa.make_batched_timer(5)
batched.call_later(1, foo, "first call")
new_loop.advance_time(2.0)
new_loop._run_once()
assert len(calls) == 1
def test_create_future_success_explicit_loop(framework):
"""
process events on alternate loop= for create_future later
"""
pytest.importorskip('asyncio')
if txaio.using_twisted:
pytest.skip()
import asyncio
alt_loop = asyncio.new_event_loop()
txa = txaio.with_config(loop=alt_loop)
f = txa.create_future_success('some result')
results = []
f.add_done_callback(lambda r: results.append(r.result()))
# run_once() runs the txaio.config.loop so we shouldn't get any
# results until we spin alt_loop
assert results == []
run_once()
assert results == []
with replace_loop(alt_loop):
run_once()
assert results == ['some result']
def test_explicit_reactor_coroutine(framework):
"""
If we set an event-loop, Futures + Tasks should use it.
"""
pytest.importorskip('asyncio')
if txaio.using_twisted:
pytest.skip()
from asyncio import coroutine
@coroutine
def some_coroutine():
yield 'nothing'
with patch.object(txaio.config, 'loop') as fake_loop:
txaio.as_future(some_coroutine)
assert len(fake_loop.method_calls) == 2
c = fake_loop.method_calls[1]
assert c[0] == 'call_soon'
def test_log_converter(handler, framework):
pytest.importorskip("twisted.logger")
# this checks that we can convert a plain Twisted Logger calling
# failure() into a traceback on our observers.
from twisted.logger import Logger
from txaio.tx import _LogObserver
out = six.StringIO()
observer = _LogObserver(out)
logger = Logger(observer=observer)
try:
raise RuntimeError("failed on purpose")
except:
logger.failure(None)
output = out.getvalue()
assert "failed on purpose" in output
assert "Traceback" in output
def test_txlog_write_binary(handler, framework):
"""
Writing to a binary stream is supported.
"""
pytest.importorskip("twisted.logger")
from txaio.tx import _LogObserver
out_file = BytesIO()
observer = _LogObserver(out_file)
observer({
"log_format": "hi: {testentry}",
"testentry": "hello",
"log_level": observer.to_tx["info"],
"log_time": 1442890018.002233
})
output = out_file.getvalue()
assert b"hi: hello" in output
def test_txlog_write_text(handler, framework_tx):
"""
Writing to a text stream is supported.
"""
pytest.importorskip("twisted.logger")
from txaio.tx import _LogObserver
out_file = StringIO()
observer = _LogObserver(out_file)
observer({
"log_format": "hi: {testentry}",
"testentry": "hello",
"log_level": observer.to_tx["info"],
"log_time": 1442890018.002233
})
output = out_file.getvalue()
assert u"hi: hello" in output
def test_aiolog_write_text(handler, framework_aio):
"""
Writing to a text stream is supported.
"""
pytest.importorskip("txaio.aio")
from txaio.aio import _TxaioFileHandler
out_file = StringIO()
observer = _TxaioFileHandler(out_file)
observer.emit(Log(args={
"log_message": "hi: {testentry}",
"testentry": "hello",
"log_time": 1442890018.002233
}))
output = out_file.getvalue()
assert u"hi: hello" in output
def test_logger_cfg_yaml(self, tmpdir):
pytest.importorskip("yaml")
logger_cfg = tmpdir.join("logging.yml")
logger_cfg.write(textwrap.dedent("""
version: 1
disable_existing_loggers: False
root:
handlers: []
level: WARNING
"""))
config = make_config(["devpi-server", "--logger-cfg=%s" % logger_cfg])
configure_logging(config.args)
assert logging.getLogger().getEffectiveLevel() == logging.WARNING
assert not logging.getLogger().handlers
def test_compat_Zpad(backend, X,Y,Z, P, K):
pymr = pytest.importorskip('pymr')
b = backend()
i_shape = (X, Y, Z, K)
o_shape = (X+2*P, Y+2*P, Z+2*P, K)
x = indigo.util.rand64c( *i_shape )
D0 = pymr.linop.Zpad( o_shape, i_shape, dtype=x.dtype )
D1 = b.Zpad(o_shape[:3], i_shape[:3], dtype=x.dtype)
x_indigo = np.asfortranarray(x.reshape((-1,K), order='F'))
x_pmr = pymr.util.vec(x)
y_exp = D0 * x_pmr
y_act = D1 * x_indigo
y_act = y_act.flatten(order='F')
npt.assert_equal(y_act, y_exp)
def test_compat_Interp(backend, X, Y, Z, RO, PS, K, n, width):
pymr = pytest.importorskip('pymr')
b = backend()
N = (X, Y, Z, K)
M = (1, RO, PS, K)
T = (3, RO, PS)
import scipy.signal as signal
beta = 0.1234
kb = signal.kaiser(2 * n + 1, beta)[n:]
x = indigo.util.rand64c(*N)
traj = indigo.util.rand64c(*T).real - 0.5
G0 = pymr.linop.Interp(M, N, traj, width, kb, dtype=x.dtype)
y_exp = G0 * pymr.util.vec(x)
G1 = b.Interp(N[:3], traj, width, kb, dtype=x.dtype)
y_act = G1 * x.reshape((-1,K), order='F')
y_act = y_act.flatten(order='F')
npt.assert_allclose( y_act, y_exp, rtol=1e-2)
def test_compat_conjgrad(backend, N):
pymr = pytest.importorskip('pymr')
b = backend()
A = indigo.util.randM( N, N, 0.5 )
A = A.H @ A # make positive definite
y = indigo.util.rand64c( N )
x0 = np.zeros( N, dtype=np.complex64 )
A_pmr = pymr.linop.Matrix( A.toarray(), dtype=A.dtype )
x_exp = pymr.alg.cg(A_pmr, A.H * y, x0, maxiter=40)
A_indigo = b.SpMatrix(A)
b.cg(A_indigo, y, x0, maxiter=40)
x_act = x0.copy()
npt.assert_allclose(x_act, x_exp, rtol=1e-6)
def test_unittest_mock_and_fixture(self, testdir):
pytest.importorskip("unittest.mock")
testdir.makepyfile("""
import os.path
import unittest.mock
import pytest
@pytest.fixture
def inject_me():
pass
@unittest.mock.patch.object(os.path, "abspath",
new=unittest.mock.MagicMock)
def test_hello(inject_me):
import os
os.path.abspath("hello")
""")
reprec = testdir.inline_run()
reprec.assertoutcome(passed=1)
def test_random_report_log_xdist(testdir):
"""xdist calls pytest_runtest_logreport as they are executed by the slaves,
with nodes from several nodes overlapping, so junitxml must cope with that
to produce correct reports. #1064
"""
pytest.importorskip('xdist')
testdir.makepyfile("""
import pytest, time
@pytest.mark.parametrize('i', list(range(30)))
def test_x(i):
assert i != 22
""")
_, dom = runandparse(testdir, '-n2')
suite_node = dom.getElementsByTagName("testsuite")[0]
failed = []
for case_node in suite_node.getElementsByTagName("testcase"):
if case_node.getElementsByTagName('failure'):
failed.append(case_node.getAttributeNode('name').value)
assert failed == ['test_x[22]']
def test_preparse_ordering_with_setuptools(testdir, monkeypatch):
pkg_resources = pytest.importorskip("pkg_resources")
def my_iter(name):
assert name == "pytest11"
class EntryPoint:
name = "mytestplugin"
class dist:
pass
def load(self):
class PseudoPlugin:
x = 42
return PseudoPlugin()
return iter([EntryPoint()])
monkeypatch.setattr(pkg_resources, 'iter_entry_points', my_iter)
testdir.makeconftest("""
pytest_plugins = "mytestplugin",
""")
monkeypatch.setenv("PYTEST_PLUGINS", "mytestplugin")
config = testdir.parseconfig()
plugin = config.pluginmanager.getplugin("mytestplugin")
assert plugin.x == 42
def spawn(self, cmd, expect_timeout=10.0):
"""Run a command using pexpect.
The pexpect child is returned.
"""
pexpect = pytest.importorskip("pexpect", "3.0")
if hasattr(sys, 'pypy_version_info') and '64' in platform.machine():
pytest.skip("pypy-64 bit not supported")
if sys.platform == "darwin":
pytest.xfail("pexpect does not work reliably on darwin?!")
if sys.platform.startswith("freebsd"):
pytest.xfail("pexpect does not work reliably on freebsd")
logfile = self.tmpdir.join("spawn.out").open("wb")
child = pexpect.spawn(cmd, logfile=logfile)
self.request.addfinalizer(logfile.close)
child.timeout = expect_timeout
return child
def test_numpy_dtypes(self):
np = pytest.importorskip("numpy")
dtypes = np.typecodes["AllInteger"]
for dtype in dtypes:
dtype = np.dtype(dtype).type
shortcuts.check_integer(dtype(1))
# The integer check is strict. The
# type must match, even if numerically
# the number is an integer.
msg = "Expected an integer but got"
dtypes = (np.typecodes["AllFloat"] +
np.typecodes["Complex"])
for dtype in dtypes:
dtype = np.dtype(dtype).type
assert_raises(TypeError, msg, shortcuts.check_integer, dtype(1))
def test_make_instance():
"""Ensure that the schema's make instance works properly."""
peewee = pytest.importorskip('peewee')
class User(peewee.Model):
name = peewee.CharField(max_length=255)
class UserSchema(Schema):
name = fields.String()
class Meta:
model = User
data = {'name': 'Bob Blah'}
user = UserSchema.make_instance(data)
assert isinstance(user, User)
assert user.name == data['name']
def test_dot_graph(model, tmpdir):
ipydisp = pytest.importorskip('IPython.display')
# Use a name that the shell would interpret specially to ensure that we're
# not vulnerable to shell injection when interacting with `dot`.
filename = str(tmpdir.join('$(touch should_not_get_created.txt)'))
# Map from format extension to expected return type.
result_types = {
'png': ipydisp.Image,
'jpeg': ipydisp.Image,
'dot': type(None),
'pdf': type(None),
'svg': ipydisp.SVG,
}
for format in result_types:
target = '.'.join([filename, format])
_ensure_not_exists(target)
try:
result = dot_graph(model, filename=filename, format=format)
assert not os.path.exists('should_not_get_created.txt')
assert os.path.isfile(target)
assert isinstance(result, result_types[format])
finally:
_ensure_not_exists(target)
# format supported by graphviz but not by IPython
with pytest.raises(ValueError) as excinfo:
dot_graph(model, filename=filename, format='ps')
assert "Unknown format" in str(excinfo.value)
def __init__(db, dbtype):
pytest.importorskip(dbtype)
import tempfile
db.installation_dir = py.path.local(tempfile.mkdtemp(prefix='abe-test-'))
print("Created temporary directory %s" % db.installation_dir)
try:
db.server = db.install_server()
except Exception as e:
#print("EXCEPTION %s" % e)
db._delete_tmpdir()
pytest.skip(e)
raise
DB.__init__(db, dbtype, db.get_connect_args())