def test_invalid_dataset():
class WrongNumberOfArgsDataSource(FileDataSource):
def collect_files(self):
return ["dummy.txt"]
def collect_features(self, path, this_is_not_needed):
pass
class WrongNumberOfCollectedFilesDataSource(FileDataSource):
def collect_files(self):
return ["dummy.txt"] * 1, ["dummy.txt"] * 2
def collect_features(self, path):
pass
def __test_wrong_num_args():
X = FileSourceDataset(WrongNumberOfArgsDataSource())
X[0]
def __test_wrong_num_collected_files():
X = FileSourceDataset(WrongNumberOfCollectedFilesDataSource())
X[0]
yield raises(TypeError)(__test_wrong_num_args)
yield raises(RuntimeError)(__test_wrong_num_collected_files)
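# Note: the yield-style generator tests and the raises() decorator used
# throughout this file are nose idioms; nose is unmaintained, and the same
# checks translate directly to pytest. A minimal sketch of the first case
# above, assuming the same nnmnkwii FileDataSource / FileSourceDataset API:
import pytest
from nnmnkwii.datasets import FileDataSource, FileSourceDataset

def test_wrong_num_args_pytest():
    class WrongNumberOfArgsDataSource(FileDataSource):
        def collect_files(self):
            return ["dummy.txt"]

        def collect_features(self, path, this_is_not_needed):
            pass

    X = FileSourceDataset(WrongNumberOfArgsDataSource())
    # collect_features declares an extra parameter, so indexing should fail
    with pytest.raises(TypeError):
        X[0]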
def test_voice_statistics_dummy():
data_source = voice_statistics.WavFileDataSource("dummy", speakers=["fujitou"])
@raises(ValueError)
def __test_invalid_speaker():
data_source = voice_statistics.WavFileDataSource("dummy", speakers=["test"])
@raises(ValueError)
def __test_invalid_emotion():
data_source = voice_statistics.WavFileDataSource(
"dummy", speakers=["fujitou"], emotions="nnmnkwii")
@raises(RuntimeError)
def __test_nodir(data_source):
data_source.collect_files()
__test_invalid_speaker()
__test_invalid_emotion()
__test_nodir(data_source)
def test_data_watcher_once(self):
update = threading.Event()
data = [True]
# Make it a non-existent path
self.path += 'f'
dwatcher = self.client.DataWatch(self.path)
@dwatcher
def changed(d, stat):
data.pop()
data.append(d)
update.set()
update.wait(10)
eq_(data, [None])
update.clear()
@raises(KazooException)
def test_it():
@dwatcher
def func(d, stat):
data.pop()
test_it()
def test_child_watcher_once(self):
update = threading.Event()
all_children = ['fred']
cwatch = self.client.ChildrenWatch(self.path)
@cwatch
def changed(children):
while all_children:
all_children.pop()
all_children.extend(children)
update.set()
update.wait(10)
eq_(all_children, [])
update.clear()
@raises(KazooException)
def test_it():
@cwatch
def changed_again(children):
update.set()
test_it()
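# Both watcher tests above exercise kazoo's rule that a DataWatch /
# ChildrenWatch instance can have only one function associated with it;
# registering a second one raises KazooException. The complementary,
# documented behavior is that a watch function can cancel itself by
# returning False. A minimal sketch (the function and its arguments are
# illustrative, not part of the tests above):
def watch_first_event_only(client, path):
    @client.DataWatch(path)
    def watch(data, stat):
        # Handle the first event, then stop further callbacks.
        return False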
def test_async_exception(self):
@raises(IOError)
def check_exc(r):
r.get()
def broken():
raise IOError("Failed")
with start_stop_one() as handler:
r = handler.async_result()
w = handler.spawn(utils.wrap(r)(broken))
w.join()
self.assertFalse(r.successful())
check_exc(r)
def test_scipy_lbfgsb():
sess = tf.Session()
x = tf.Variable(np.float64(2), name='x')
sess.run(tf.initialize_variables([x]))
optimizer = ScipyLBFGSBOptimizer(verbose=True, session=sess)
# With gradient
results = optimizer.minimize([x], x**2, [2 * x])
assert results.success
# Without gradient
results = optimizer.minimize([x], x**2)
assert results.success
# Test callback
def callback(xs):
pass
optimizer = ScipyLBFGSBOptimizer(verbose=True, session=sess, callback=callback)
assert optimizer.minimize([x], x**2).success
@raises(ValueError)
def test_illegal_parameter_as_variable1():
optimizer.minimize([42], x**2)
test_illegal_parameter_as_variable1()
@raises(ValueError)
def test_illegal_parameter_as_variable2():
optimizer.minimize(42, x**2)
test_illegal_parameter_as_variable2()
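# ScipyLBFGSBOptimizer presumably delegates to SciPy's L-BFGS-B driver
# (tf.Session and tf.initialize_variables are TF1-era APIs). A standalone
# sketch of the equivalent plain-SciPy call, without TensorFlow:
import numpy as np
from scipy.optimize import minimize

def sketch_lbfgsb():
    # Minimize f(x) = x^2 with its analytic gradient 2x, starting at x = 2.
    result = minimize(lambda x: float(x[0] ** 2), x0=[2.0],
                      jac=lambda x: np.array([2.0 * x[0]]),
                      method="L-BFGS-B")
    assert result.success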
def test_migrad():
sess = tf.Session()
x = tf.Variable(np.float64(2), name='x')
sess.run(tf.initialize_variables([x]))
optimizer = MigradOptimizer(session=sess)
# With gradient
results = optimizer.minimize([x], x**2, [2 * x])
assert results.success
# Without gradient
results = optimizer.minimize([x], x**2)
assert results.success
@raises(ValueError)
def test_illegal_parameter_as_variable1():
optimizer.minimize([42], x**2)
test_illegal_parameter_as_variable1()
@raises(ValueError)
def test_illegal_parameter_as_variable2():
optimizer.minimize(42, x**2)
test_illegal_parameter_as_variable2()
def test_init():
with tp.Model():
X1 = tp.Uniform(lower=-1, upper=1)
X2 = tp.Uniform(lower=-1)
X3 = tp.Uniform(upper=1)
X4 = tp.Uniform()
X7 = tp.Uniform(lower=X1, upper=X2)
# @raises(ValueError)
# def test_uniform_fail_lower():
# with tp.Model():
# X1 = tp.Uniform()
# X2 = tp.Uniform(lower=X1)
# @raises(ValueError)
# def test_uniform_fail_upper():
# with tp.Model() as model:
# X1 = tp.Uniform()
# X2 = tp.Uniform(upper=X1)
def test_inconsistent_inner_fct(self):
# Test that scan can detect inconsistencies in the inner graph and
# raise an appropriate exception. The pickled file used in this test
# relies on the cuda backend.
# This test has not been extensively tested under Python 3, so it is
# skipped when the Python version is >= 3.
version = sys.version_info
if version >= (3,):
raise SkipTest("This test relies on a pickled file produced with "
"Python 2. The current python version "
"(%i.%i.%i.%i) is >= 3 so the test will be "
"skipped." % (version.major, version.minor,
version.micro, version.serial))
# When unpickled, the scan op should perform validation on its inner
# graph, detect the inconsistencies and raise a TypeError
folder = os.path.dirname(os.path.abspath(__file__))
path = os.path.join(folder, "inconsistent_scan.pkl")
assert_raises(TypeError, pickle.load, open(path, "rb"))
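# assert_raises (from numpy.testing or nose.tools; both wrap unittest's
# assertRaises) also works as a context manager, which lets the pickle file
# be closed deterministically. An equivalent sketch:
def sketch_assert_raises_ctx(path):
    with assert_raises(TypeError):
        with open(path, "rb") as f:
            pickle.load(f)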
def test_formpack_version_cannot_have_name(self):
vdata = copy(SINGLE_NOTE_SURVEY)
vdata['name'] = "somename"
FormPack(id_string="idstring",
versions=[
vdata,
])
# TODO: remove this test or fix it
# @raises(PyXFormError)
# def test_xform(self):
# fp = FormPack(title='test_fixture_title',
# root_node_name='daata',
# versions=[
# SINGLE_NOTE_SURVEY,
# ])
# fp.versions[0].to_xml()
def test_csv_with_tag_headers(self):
title, schemas, submissions = build_fixture('dietary_needs')
fp = FormPack(schemas, title)
options = {'versions': 'dietv1', 'tag_cols_for_header': ['hxl']}
rows = list(fp.export(**options).to_csv(submissions))
assert rows[1] == (u'"#loc +name";"#indicator +diet";'
u'"#indicator +diet";"#indicator +diet";'
u'"#indicator +diet";"#indicator +diet"')
# disabled for now
# @raises(RuntimeError)
# def test_csv_on_repeatable_groups(self):
# title, schemas, submissions = build_fixture('grouped_repeatable')
# fp = FormPack(schemas, title)
# options = {'versions': 'rgv1'}
# list(fp.export(**options).to_csv(submissions))
def test_dwm_init_fields_badlookup():
""" test Dwm class raises error with bad lookup type """
fields = {
'field1': {
'lookup': ['genericLookup', 'genericRegex', 'fieldSpecificRegex',
'fieldSpecificLookup', 'normLookup', 'badlookup'],
'derive': [
{
'type': 'deriveIncludes',
'fieldSet': ['field2'],
'options': []
}
]
}
}
Dwm(name='test', mongo=DB, fields=fields)
def test_dwm_init_fields_badderive():
""" test Dwm class raises error with bad derive type """
fields = {
'field1': {
'lookup': ['genericLookup', 'genericRegex', 'fieldSpecificRegex',
'fieldSpecificLookup', 'normLookup', 'normIncludes'],
'derive': [
{
'type': 'badderive',
'fieldSet': ['field2'],
'options': []
}
]
}
}
Dwm(name='test', mongo=DB, fields=fields)
def test_dwm_init_fields_badopt():
""" test Dwm class raises error with bad derive option type """
fields = {
'field1': {
'lookup': ['genericLookup', 'genericRegex', 'fieldSpecificRegex',
'fieldSpecificLookup', 'normLookup', 'normIncludes'],
'derive': [
{
'type': 'deriveIncludes',
'fieldSet': ['field2'],
'options': ['badoption']
}
]
}
}
Dwm(name='test', mongo=DB, fields=fields)
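# The docstrings of the three Dwm tests above promise that construction
# raises on a bad lookup / derive / option type, yet no expectation is
# attached, so the bare constructor calls would fail the tests as written.
# A hedged pytest-style sketch of the intended check; the exact exception
# type (ValueError) is an assumption, not confirmed here:
import pytest

def test_dwm_init_fields_badlookup_pytest():
    fields = {'field1': {'lookup': ['badlookup'], 'derive': []}}
    with pytest.raises(ValueError):
        Dwm(name='test', mongo=DB, fields=fields)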
# From test_yql_object.py (project: yahoo-fantasy-football-metrics, author: uberfastman)
def test_yql_object_one(self):
"""Test that invalid query raises AttributeError"""
yqlobj.query = 1
def test_one(self):
"""Test that accessing one result raises exception"""
yqlobj.one()
def test_hts_append():
lab_path = join(DATA_DIR, "BASIC5000_0001.lab")
test_labels = hts.load(lab_path)
print("\n{}".format(test_labels))
# should get same string representation
labels = hts.HTSLabelFile()
assert str(labels) == ""
for label in test_labels:
labels.append(label)
assert str(test_labels) == str(labels)
@raises(ValueError)
def test_invalid_start_time():
l = hts.HTSLabelFile()
l.append((100000, 0, "NG"))
def test_succeeding_times():
l = hts.HTSLabelFile()
l.append((0, 1000000, "OK"))
l.append((1000000, 2000000, "OK"))
@raises(ValueError)
def test_non_succeeding_times():
l = hts.HTSLabelFile()
l.append((0, 1000000, "OK"))
l.append((1500000, 2000000, "NG"))
test_invalid_start_time()
test_succeeding_times()
test_non_succeeding_times()
# shouldn't raise RuntimeError
def test_empty_dataset():
class EmptyDataSource(FileDataSource):
def collect_files(self):
return []
def collect_features(self, path):
pass
X = FileSourceDataset(EmptyDataSource())
def __test_outof_range(X):
print(X[0])
# Should raise IndexError
yield raises(IndexError)(__test_outof_range), X
def test_asarray():
X, Y = _get_small_datasets(padded=False, duration=True)
lengths = [len(x) for x in X]
X, Y = _get_small_datasets(
padded=True, duration=True, padded_length=np.max(lengths))
X_array = np.asarray(X)
assert X_array.ndim == 3
assert np.allclose(X_array, X.asarray())
# Explicitly set padded_length to the actual max time length
X, Y = _get_small_datasets(padded=False, duration=True)
assert np.allclose(X_array, X.asarray(padded_length=np.max(lengths)))
# Make sure that auto-guessing padded_length gives the same result as
# explicitly passing the max time length
assert np.allclose(X_array, X.asarray(padded_length=None))
# Force triggering re-allocations
assert np.allclose(X_array, X.asarray(
padded_length=None, padded_length_guess=1))
def __test_very_small_padded_length():
X, Y = _get_small_datasets(padded=False, duration=True)
X.asarray(padded_length=1)
# Should raise `num frames exceeded`
yield raises(RuntimeError)(__test_very_small_padded_length)
def test_sequence_wise_torch_data_loader():
import torch
from torch.utils import data as data_utils
X, Y = _get_small_datasets(padded=False)
class TorchDataset(data_utils.Dataset):
def __init__(self, X, Y):
self.X = X
self.Y = Y
def __getitem__(self, idx):
return torch.from_numpy(self.X[idx]), torch.from_numpy(self.Y[idx])
def __len__(self):
return len(self.X)
def __test(X, Y, batch_size):
dataset = TorchDataset(X, Y)
loader = data_utils.DataLoader(
dataset, batch_size=batch_size, num_workers=1, shuffle=True)
for idx, (x, y) in enumerate(loader):
assert len(x.shape) == len(y.shape)
assert len(x.shape) == 3
print(idx, x.shape, y.shape)
# Test with batch_size = 1
yield __test, X, Y, 1
# Since we have variable-length frames, a batch size larger than 1 causes a
# runtime error.
yield raises(RuntimeError)(__test), X, Y, 2
# For a padded dataset, which can be represented as (N, T^max, D), the batch
# size can be any number.
X, Y = _get_small_datasets(padded=True)
yield __test, X, Y, 1
yield __test, X, Y, 2
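# The RuntimeError with batch_size=2 comes from the default collate function,
# which cannot stack variable-length sequences. A common workaround (not part
# of the test above) is a padding collate_fn built on
# torch.nn.utils.rnn.pad_sequence:
from torch.nn.utils.rnn import pad_sequence

def pad_collate(batch):
    # batch is a list of (x, y) pairs whose time dimensions differ;
    # zero-pad both to the longest sequence in the batch.
    xs, ys = zip(*batch)
    return (pad_sequence(xs, batch_first=True),
            pad_sequence(ys, batch_first=True))

# Usage: data_utils.DataLoader(dataset, batch_size=2, collate_fn=pad_collate)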
def test_ljspeech_dummy():
data_sources = [ljspeech.TranscriptionDataSource,
ljspeech.NormalizedTranscriptionDataSource,
ljspeech.WavFileDataSource]
for data_source in data_sources:
@raises(RuntimeError)
def f(source):
source("dummy")
f(data_source)
def test_vcc2016_dummy():
data_source = vcc2016.WavFileDataSource("dummy", speakers=["SF1"])
@raises(ValueError)
def __test_invalid_speaker():
data_source = vcc2016.WavFileDataSource("dummy", speakers=["test"])
@raises(RuntimeError)
def __test_nodir(data_source):
data_source.collect_files()
__test_invalid_speaker()
__test_nodir(data_source)
def test_jsut_dummy():
data_sources = [jsut.TranscriptionDataSource,
jsut.WavFileDataSource]
for data_source in data_sources:
@raises(RuntimeError)
def f(source):
source("dummy")
f(data_source)
def test_vctk_dummy():
assert len(vctk.available_speakers) == 108
data_sources = [vctk.TranscriptionDataSource,
vctk.WavFileDataSource]
for data_source in data_sources:
@raises(RuntimeError)
def f(source):
source("dummy")
f(data_source)
def test_invalid_duration_features():
phone_labels = hts.load(example_label_file(phone_level=True))
@raises(ValueError)
def __test(labels, unit_size, feature_size):
fe.duration_features(labels, unit_size=unit_size, feature_size=feature_size)
yield __test, phone_labels, None, "frame"
def test_modspec_smoothing():
static_dim = 2
T = 64
np.random.seed(1234)
y = np.random.rand(T, static_dim)
modfs = 200
for log_domain in [True, False]:
for norm in [None, "ortho"]:
for n in [1024, 2048]:
# Nyquist freq
y_hat = P.modspec_smoothing(y, modfs, n=n, norm=norm,
cutoff=modfs // 2,
log_domain=log_domain)
assert np.allclose(y, y_hat)
# Smooth
P.modspec_smoothing(y, modfs, n=n, norm=norm,
cutoff=modfs // 4,
log_domain=log_domain)
# Cutoff frequency larger than modfs//2
@raises(ValueError)
def __test_invalid_param(y, modfs):
P.modspec_smoothing(y, modfs, n=2048, cutoff=modfs // 2 + 1)
# FFT size should be larger than the time length
@raises(RuntimeError)
def __test_invalid_time_length(y, modfs):
P.modspec_smoothing(y, modfs, n=32, cutoff=modfs // 2)
__test_invalid_time_length(y, modfs)
__test_invalid_param(y, modfs)