def skip_if(predicate, reason=None):
"""Skip a test if predicate is true."""
reason = reason or predicate.__name__
from nose import SkipTest
def decorate(fn):
fn_name = fn.__name__
def maybe(*args, **kw):
if predicate():
msg = "'%s' skipped: %s" % (
fn_name, reason)
raise SkipTest(msg)
else:
return fn(*args, **kw)
return function_named(maybe, fn_name)
return decorate
python类SkipTest()的实例源码
def setup_class(cls):
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
resource = model.Package.get('annakarenina').resources[0]
cls.data = {
'resource_id': resource.id,
'aliases': u'b\xfck2',
'fields': [{'id': 'book', 'type': 'text'},
{'id': 'author', 'type': 'text'},
{'id': 'rating with %', 'type': 'text'}],
'records': [{'book': 'annakarenina', 'author': 'tolstoy',
'rating with %': '90%'},
{'book': 'warandpeace', 'author': 'tolstoy',
'rating with %': '42%'}]
}
engine = db._get_engine(
{'connection_url': config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
set_url_type(
model.Package.get('annakarenina').resources, cls.sysadmin_user)
def setup_class(cls):
wsgiapp = middleware.make_app(config['global_conf'], **config)
cls.app = paste.fixture.TestApp(wsgiapp)
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
p.load('datapusher')
p.load('test_datapusher_plugin')
resource = factories.Resource(url_type='datastore')
cls.dataset = factories.Dataset(resources=[resource])
cls.sysadmin_user = factories.User(name='testsysadmin', sysadmin=True)
cls.normal_user = factories.User(name='annafan')
engine = db._get_engine(
{'connection_url': config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
def test_interrupted_systemcall(self):
'''
Make sure interrupted system calls don't break the world, since we
can't control what all signals our connection thread will get
'''
if 'linux' not in platform:
raise SkipTest('Unable to reproduce error case on'
' non-linux platforms')
path = 'interrupt_test'
value = b"1"
self.client.create(path, value)
# set the euid to the current process' euid.
# glibc sends SIGRT to all children, which will interrupt the
# system call
os.seteuid(os.geteuid())
# basic sanity test that it worked alright
assert self.client.get(path)[0] == value
def setup_class(cls):
if not pylons.config.get('ckan.datastore.read_url'):
raise nose.SkipTest('Datastore runs on legacy mode, skipping...')
engine = db._get_engine(
{'connection_url': pylons.config['ckan.datastore.write_url']}
)
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
datastore_test_helpers.clear_db(cls.Session)
create_tables = [
u'CREATE TABLE test_a (id_a text)',
u'CREATE TABLE test_b (id_b text)',
u'CREATE TABLE "TEST_C" (id_c text)',
u'CREATE TABLE test_d ("?/?" integer)',
]
for create_table_sql in create_tables:
cls.Session.execute(create_table_sql)
def skip(msg=None):
"""Decorator factory - mark a test function for skipping from test suite.
Parameters
----------
msg : string
Optional message to be added.
Returns
-------
decorator : function
Decorator, which, when applied to a function, causes SkipTest
to be raised, with the optional message added.
"""
return skipif(True,msg)
def test_obj_del(self):
"""Test that object's __del__ methods are called on exit."""
if sys.platform == 'win32':
try:
import win32api
except ImportError:
raise SkipTest("Test requires pywin32")
src = ("class A(object):\n"
" def __del__(self):\n"
" print 'object A deleted'\n"
"a = A()\n")
self.mktmp(py3compat.doctest_refactor_print(src))
if dec.module_not_available('sqlite3'):
err = 'WARNING: IPython History requires SQLite, your history will not be saved\n'
else:
err = None
tt.ipexec_validate(self.fname, 'object A deleted', err)
def test_not_writable_ipdir():
tmpdir = tempfile.mkdtemp()
os.name = "posix"
env.pop('IPYTHON_DIR', None)
env.pop('IPYTHONDIR', None)
env.pop('XDG_CONFIG_HOME', None)
env['HOME'] = tmpdir
ipdir = os.path.join(tmpdir, '.ipython')
os.mkdir(ipdir, 0o555)
try:
open(os.path.join(ipdir, "_foo_"), 'w').close()
except IOError:
pass
else:
# I can still write to an unwritable dir,
# assume I'm root and skip the test
raise SkipTest("I can't create directories that I can't write to")
with AssertPrints('is not a writable location', channel='stderr'):
ipdir = paths.get_ipython_dir()
env.pop('IPYTHON_DIR', None)
def monkeypatch_xunit():
try:
knownfailureif(True)(lambda: None)()
except Exception as e:
KnownFailureTest = type(e)
def addError(self, test, err, capt=None):
if issubclass(err[0], KnownFailureTest):
err = (SkipTest,) + err[1:]
return self.orig_addError(test, err, capt)
Xunit.orig_addError = Xunit.addError
Xunit.addError = addError
#-----------------------------------------------------------------------------
# Check which dependencies are installed and greater than minimum version.
#-----------------------------------------------------------------------------
def __call__(self, fn):
@decorator
def decorate(fn, *args, **kw):
if self.predicate():
if self.reason:
msg = "'%s' : %s" % (
fn.__name__,
self.reason
)
else:
msg = "'%s': %s" % (
fn.__name__, self.predicate
)
raise SkipTest(msg)
else:
if self._fails_on:
with self._fails_on.fail_if(name=fn.__name__):
return fn(*args, **kw)
else:
return fn(*args, **kw)
return decorate(fn)
def test_status_exceptions_can_be_pickled_across_processes(self):
try:
import jobrunner
except ImportError:
raise SkipTest("jobrunner not installed, skipping")
runner = jobrunner.JobRunner(jobrunner.JobRunner.RUN_MODE_MULTIPROCESS,
runnables=[raise_an_exception],
auto_assert=False)
result = runner.run()[0]
self.assertTrue(result.exception_occured())
self.assertEqual(str(result.err_type), str(nifpga.FifoTimeoutError))
self.assertIn("session: 0xbeef", result.err_class)
if python_version == 2:
self.assertIn("a bogus string argument: 'I am a string'", result.err_class)
else:
self.assertIn("a bogus string argument: b'I am a string'", result.err_class)
def skip_if(predicate, reason=None):
"""Skip a test if predicate is true."""
reason = reason or predicate.__name__
from nose import SkipTest
def decorate(fn):
fn_name = fn.__name__
def maybe(*args, **kw):
if predicate():
msg = "'%s' skipped: %s" % (
fn_name, reason)
raise SkipTest(msg)
else:
return fn(*args, **kw)
return function_named(maybe, fn_name)
return decorate
def test_complex_signature_py3(self):
if six.PY2:
raise SkipTest()
with LogCapture() as l:
# without this exec Python 2 and pyflakes complain about syntax errors etc
exec (
"""@trace_call(self.logger)
def foo(a, b, c, d, e, *varargs_, f=None, g='G', h='H', i='ii', j='jj', **varkwargs_: None):
pass
foo('a', 'b', *['c', 'd'], e='E', f='F', Z='Z', **{'g':'g', 'h':'h'})
""", locals(), globals()
)
l.check(
(
'test.v0_1.test_base',
'DEBUG',
"calling foo(a='a', b='b', c='c', d='d', e='E', f='F', "
"g='g', h='h', varkwargs_={'Z': 'Z'}, varargs_=<class '%s._empty'>, "
"i='ii', j='jj')" % (INSPECT_MODULE_NAME,) # prefix does not work because of the eval, inspect module is for pypy3
),
)
test_ols.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def testWLS(self):
# WLS centered SS changed (fixed) in 0.5.0
sm_version = sm.version.version
if sm_version < LooseVersion('0.5.0'):
raise nose.SkipTest("WLS centered SS not fixed in statsmodels"
" version {0}".format(sm_version))
X = DataFrame(np.random.randn(30, 4), columns=['A', 'B', 'C', 'D'])
Y = Series(np.random.randn(30))
weights = X.std(1)
self._check_wls(X, Y, weights)
weights.ix[[5, 15]] = np.nan
Y[[2, 21]] = np.nan
self._check_wls(X, Y, weights)
test_compat.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def test_compat():
# test we have compat with our version of nu
from pandas.computation import _NUMEXPR_INSTALLED
try:
import numexpr as ne
ver = ne.__version__
if ver == LooseVersion('2.4.4'):
assert not _NUMEXPR_INSTALLED
elif ver < LooseVersion('2.1'):
with tm.assert_produces_warning(UserWarning,
check_stacklevel=False):
assert not _NUMEXPR_INSTALLED
else:
assert _NUMEXPR_INSTALLED
except ImportError:
raise nose.SkipTest("not testing numexpr version compat")
test_compat.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 32
收藏 0
点赞 0
评论 0
def check_invalid_numexpr_version(engine, parser):
def testit():
a, b = 1, 2
res = pd.eval('a + b', engine=engine, parser=parser)
tm.assert_equal(res, 3)
if engine == 'numexpr':
try:
import numexpr as ne
except ImportError:
raise nose.SkipTest("no numexpr")
else:
if ne.__version__ < LooseVersion('2.1'):
with tm.assertRaisesRegexp(ImportError, "'numexpr' version is "
".+, must be >= 2.1"):
testit()
elif ne.__version__ == LooseVersion('2.4.4'):
raise nose.SkipTest("numexpr version==2.4.4")
else:
testit()
else:
testit()
test_eval.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def get_expected_pow_result(self, lhs, rhs):
try:
expected = _eval_single_bin(lhs, '**', rhs, self.engine)
except ValueError as e:
msg = 'negative number cannot be raised to a fractional power'
try:
emsg = e.message
except AttributeError:
emsg = e
emsg = u(emsg)
if emsg == msg:
if self.engine == 'python':
raise nose.SkipTest(emsg)
else:
expected = np.nan
else:
raise
return expected
test_multi.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_legacy_pickle(self):
if PY3:
raise nose.SkipTest("testing for legacy pickles not "
"support on py3")
path = tm.get_data_path('multiindex_v1.pickle')
obj = pd.read_pickle(path)
obj2 = MultiIndex.from_tuples(obj.values)
self.assertTrue(obj.equals(obj2))
res = obj.get_indexer(obj)
exp = np.arange(len(obj))
assert_almost_equal(res, exp)
res = obj.get_indexer(obj2[::-1])
exp = obj.get_indexer(obj[::-1])
exp2 = obj2.get_indexer(obj2[::-1])
assert_almost_equal(res, exp)
assert_almost_equal(exp, exp2)
test_analytics.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def test_quantile_interpolation_np_lt_1p9(self):
# GH #10174
if not _np_version_under1p9:
raise nose.SkipTest("Numpy version is greater than 1.9")
from numpy import percentile
# interpolation = linear (default case)
q = self.ts.quantile(0.1, interpolation='linear')
self.assertEqual(q, percentile(self.ts.valid(), 10))
q1 = self.ts.quantile(0.1)
self.assertEqual(q1, percentile(self.ts.valid(), 10))
# interpolation other than linear
expErrMsg = "Interpolation methods other than "
with tm.assertRaisesRegexp(ValueError, expErrMsg):
self.ts.quantile(0.9, interpolation='nearest')
# object dtype
with tm.assertRaisesRegexp(ValueError, expErrMsg):
q = Series(self.ts, dtype=object).quantile(0.7,
interpolation='higher')
test_panel.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_to_excel(self):
try:
import xlwt # noqa
import xlrd # noqa
import openpyxl # noqa
from pandas.io.excel import ExcelFile
except ImportError:
raise nose.SkipTest("need xlwt xlrd openpyxl")
for ext in ['xls', 'xlsx']:
path = '__tmp__.' + ext
with ensure_clean(path) as path:
self.panel.to_excel(path)
try:
reader = ExcelFile(path)
except ImportError:
raise nose.SkipTest("need xlwt xlrd openpyxl")
for item, df in self.panel.iteritems():
recdf = reader.parse(str(item), index_col=0)
assert_frame_equal(df, recdf)
test_panel.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_to_excel_xlsxwriter(self):
try:
import xlrd # noqa
import xlsxwriter # noqa
from pandas.io.excel import ExcelFile
except ImportError:
raise nose.SkipTest("Requires xlrd and xlsxwriter. Skipping test.")
path = '__tmp__.xlsx'
with ensure_clean(path) as path:
self.panel.to_excel(path, engine='xlsxwriter')
try:
reader = ExcelFile(path)
except ImportError as e:
raise nose.SkipTest("cannot write excel file: %s" % e)
for item, df in self.panel.iteritems():
recdf = reader.parse(str(item), index_col=0)
assert_frame_equal(df, recdf)
test_panel4d.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_setitem_ndarray(self):
raise nose.SkipTest("skipping for now")
# from pandas import DateRange, datetools
# timeidx = DateRange(start=datetime(2009,1,1),
# end=datetime(2009,12,31),
# offset=datetools.MonthEnd())
# lons_coarse = np.linspace(-177.5, 177.5, 72)
# lats_coarse = np.linspace(-87.5, 87.5, 36)
# P = Panel(items=timeidx, major_axis=lons_coarse,
# minor_axis=lats_coarse)
# data = np.random.randn(72*36).reshape((72,36))
# key = datetime(2009,2,28)
# P[key] = data#
# assert_almost_equal(P[key].values, data)
test_panel4d.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def test_shift(self):
raise nose.SkipTest("skipping for now")
# # major
# idx = self.panel.major_axis[0]
# idx_lag = self.panel.major_axis[1]
# shifted = self.panel.shift(1)
# assert_frame_equal(self.panel.major_xs(idx),
# shifted.major_xs(idx_lag))
# # minor
# idx = self.panel.minor_axis[0]
# idx_lag = self.panel.minor_axis[1]
# shifted = self.panel.shift(1, axis='minor')
# assert_frame_equal(self.panel.minor_xs(idx),
# shifted.minor_xs(idx_lag))
# self.assertRaises(Exception, self.panel.shift, 1, axis='items')
test_panel4d.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def test_multiindex_get(self):
raise nose.SkipTest("skipping for now")
# ind = MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1), ('b',2)],
# names=['first', 'second'])
# wp = Panel(np.random.random((4,5,5)),
# items=ind,
# major_axis=np.arange(5),
# minor_axis=np.arange(5))
# f1 = wp['a']
# f2 = wp.ix['a']
# assert_panel_equal(f1, f2)
# self.assertTrue((f1.items == [1, 2]).all())
# self.assertTrue((f2.items == [1, 2]).all())
# ind = MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)],
# names=['first', 'second'])
test_unpack.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_unpacker_hook_refcnt(self):
if not hasattr(sys, 'getrefcount'):
raise nose.SkipTest('no sys.getrefcount()')
result = []
def hook(x):
result.append(x)
return x
basecnt = sys.getrefcount(hook)
up = Unpacker(object_hook=hook, list_hook=hook)
assert sys.getrefcount(hook) >= basecnt + 2
up.feed(packb([{}]))
up.feed(packb([{}]))
assert up.unpack() == [{}]
assert up.unpack() == [{}]
assert result == [{}, [{}], {}, [{}]]
del up
assert sys.getrefcount(hook) == basecnt
test_analytics.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def test_built_in_round(self):
if not compat.PY3:
raise nose.SkipTest("build in round cannot be overriden "
"prior to Python 3")
# GH11763
# Here's the test frame we'll be working with
df = DataFrame(
{'col1': [1.123, 2.123, 3.123], 'col2': [1.234, 2.234, 3.234]})
# Default round to integer (i.e. decimals=0)
expected_rounded = DataFrame(
{'col1': [1., 2., 3.], 'col2': [1., 2., 3.]})
assert_frame_equal(round(df), expected_rounded)
# Clip
test_internals.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def test_split_block_at(self):
# with dup column support this method was taken out
# GH3679
raise nose.SkipTest("skipping for now")
bs = list(self.fblock.split_block_at('a'))
self.assertEqual(len(bs), 1)
self.assertTrue(np.array_equal(bs[0].items, ['c', 'e']))
bs = list(self.fblock.split_block_at('c'))
self.assertEqual(len(bs), 2)
self.assertTrue(np.array_equal(bs[0].items, ['a']))
self.assertTrue(np.array_equal(bs[1].items, ['e']))
bs = list(self.fblock.split_block_at('e'))
self.assertEqual(len(bs), 1)
self.assertTrue(np.array_equal(bs[0].items, ['a', 'c']))
# bblock = get_bool_ex(['f'])
# bs = list(bblock.split_block_at('f'))
# self.assertEqual(len(bs), 0)
test_multilevel.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_partial_ix_missing(self):
raise nose.SkipTest("skipping for now")
result = self.ymd.ix[2000, 0]
expected = self.ymd.ix[2000]['A']
assert_series_equal(result, expected)
# need to put in some work here
# self.ymd.ix[2000, 0] = 0
# self.assertTrue((self.ymd.ix[2000]['A'] == 0).all())
# Pretty sure the second (and maybe even the first) is already wrong.
self.assertRaises(Exception, self.ymd.ix.__getitem__, (2000, 6))
self.assertRaises(Exception, self.ymd.ix.__getitem__, (2000, 6), 0)
# ---------------------------------------------------------------------
test_cparser.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def test_header_not_enough_lines_as_recarray(self):
if compat.is_platform_windows():
raise nose.SkipTest(
"segfaults on win-64, only when all tests are run")
data = ('skip this\n'
'skip this\n'
'a,b,c\n'
'1,2,3\n'
'4,5,6')
reader = TextReader(StringIO(data), delimiter=',', header=2,
as_recarray=True)
header = reader.header
expected = [['a', 'b', 'c']]
self.assertEqual(header, expected)
recs = reader.read()
expected = {'a': [1, 4], 'b': [2, 5], 'c': [3, 6]}
assert_array_dicts_equal(expected, recs)
# not enough rows
self.assertRaises(parser.CParserError, TextReader, StringIO(data),
delimiter=',', header=5, as_recarray=True)
test_cparser.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def test_numpy_string_dtype_as_recarray(self):
data = """\
a,1
aa,2
aaa,3
aaaa,4
aaaaa,5"""
if compat.is_platform_windows():
raise nose.SkipTest(
"segfaults on win-64, only when all tests are run")
def _make_reader(**kwds):
return TextReader(StringIO(data), delimiter=',', header=None,
**kwds)
reader = _make_reader(dtype='S4', as_recarray=True)
result = reader.read()
self.assertEqual(result['0'].dtype, 'S4')
ex_values = np.array(['a', 'aa', 'aaa', 'aaaa', 'aaaa'], dtype='S4')
self.assertTrue((result['0'] == ex_values).all())
self.assertEqual(result['1'].dtype, 'S4')