def test_function_otype_inference_raises(self):
    """Check that output-type inference rejects invalid function results.

    Every case hands a function that ignores its input and returns a fixed
    value to ``tdb._infer_tf_output_type_from_input_type`` and expects a
    ``TypeError`` carrying the given message.
    """
    def infer(result):
        # The inferred function always yields ``result``; the input type
        # is a ``TensorType`` built from an empty shape list.
        return tdb._infer_tf_output_type_from_input_type(
            lambda _: result, tdt.TensorType([]))

    # A plain Python int must be rejected with this exact message.
    self.assertRaisesWithLiteralMatch(
        TypeError, '42 is not a TF tensor', infer, 42)

    # (expected message pattern, positional arguments for tf.placeholder).
    # The placeholder itself is created inside the loop so that graph
    # construction stays interleaved with the assertions.
    placeholder_cases = (
        ('unspecified rank', ('float32',)),
        ('expected a batch tensor, saw a scalar', ('float32', [])),
        (r'leading \(batch\) dimension should be None', ('float32', [0, 2])),
        ('instance shape is not fully defined',
         ('float32', [None, 42, None, 5])),
    )
    for pattern, placeholder_args in placeholder_cases:
        six.assertRaisesRegex(
            self, TypeError, pattern, infer,
            tf.placeholder(*placeholder_args))
# Example source code for Python's assertRaisesRegex()
def _check_copyto_where_multigpu_raises(self, dtype, ngpus):
    """Check that ``cupy.copyto`` raises when its arrays span devices.

    For every non-decreasing assignment of four device ids (source,
    destination, mask, and the device that is current during the copy),
    except the one where all four ids are equal, the arrays are created on
    their devices and ``cupy.copyto(a, b, where=c)`` must raise a
    ``ValueError`` whose message starts with
    "Array device must be same as the current device".

    Args:
        dtype: Element type of the two data arrays.
        ngpus: Number of devices to draw device ids from.
    """
    shape = (2, 3, 4)
    # combinations_with_replacement yields exactly the non-decreasing
    # 4-tuples of device ids, in the same lexicographic order that the
    # full Cartesian product filtered by dev1 <= dev2 <= dev3 <= dev4
    # would produce.
    device_tuples = itertools.combinations_with_replacement(range(ngpus), 4)
    for dev1, dev2, dev3, dev4 in device_tuples:
        if dev1 == dev2 == dev3 == dev4:
            # Every array lives on the device that is current for the
            # copy; this case is not part of the check.
            continue

        with cuda.Device(dev1):
            a = testing.shaped_arange(shape, cupy, dtype)
        with cuda.Device(dev2):
            b = testing.shaped_reverse_arange(shape, cupy, dtype)
        with cuda.Device(dev3):
            c = testing.shaped_arange(shape, cupy, '?')
        with cuda.Device(dev4):
            with six.assertRaisesRegex(
                    self, ValueError,
                    '^Array device must be same as the current device'):
                cupy.copyto(a, b, where=c)
def check_type_mismatch(self, x_data):
    """Check that a wrongly typed gradient makes ``backward`` raise.

    A throwaway ``chainer.Function`` whose ``backward`` returns the list
    ``[1]`` is applied to ``x_data``; back-propagating through its output
    must raise a ``TypeError`` whose message matches the function label.
    """
    array_module = cuda.get_array_module(x_data)

    class DummyFunction(chainer.Function):
        # The expected error message is matched against this label.
        label = 'dummy_function'

        def forward(self, inputs):
            output = array_module.array(1, np.float32)
            return (output,)

        def backward(self, inputs, grads):
            return [1]

    input_var = chainer.Variable(x_data)
    output_var = DummyFunction()(input_var)
    with six.assertRaisesRegex(self, TypeError, 'dummy_function'):
        output_var.backward()
def check_shape_mismatch(self, x_data):
    """Check that a wrongly shaped gradient makes ``backward`` raise.

    A throwaway ``chainer.Function`` whose ``backward`` returns a
    two-element float32 array is applied to ``x_data``; back-propagating
    through its output must raise a ``ValueError`` whose message matches
    the function label.
    """
    array_module = cuda.get_array_module(x_data)

    class DummyFunction(chainer.Function):
        # The expected error message is matched against this label.
        label = 'dummy_function'

        def forward(self, inputs):
            output = array_module.array(1, np.float32)
            return (output,)

        def backward(self, inputs, grads):
            gradient = array_module.array([1, 2], np.float32)
            return (gradient,)

    input_var = chainer.Variable(x_data)
    output_var = DummyFunction()(input_var)
    with six.assertRaisesRegex(self, ValueError, 'dummy_function'):
        output_var.backward()
def test_version_operation_mismatch(self):
    """
    Verify that asking a KMIP 1.0 engine to process DiscoverVersions
    raises OperationNotSupported with a message naming the protocol
    version.
    """
    kmip_engine = engine.KmipEngine()
    kmip_engine._logger = mock.MagicMock()
    kmip_engine._protocol_version = contents.ProtocolVersion.create(1, 0)

    expected_message = "DiscoverVersions is not supported by KMIP {0}".format(
        kmip_engine._protocol_version
    )

    with six.assertRaisesRegex(
            self, exceptions.OperationNotSupported, expected_message):
        kmip_engine._process_discover_versions(None)
def test_process_batch_missing_batch_id(self):
    """
    Verify that processing a two-item batch whose items carry no batch
    IDs raises InvalidMessage.
    """
    kmip_engine = engine.KmipEngine()
    kmip_engine._logger = mock.MagicMock()

    # Two default-constructed items, i.e. no ID is supplied for either.
    batch_items = [messages.RequestBatchItem(), messages.RequestBatchItem()]

    with six.assertRaisesRegex(
            self, exceptions.InvalidMessage, "Batch item ID is undefined."):
        kmip_engine._process_batch(batch_items, None, None)
def test_unsupported_operation(self):
    """
    Verify that dispatching the POLL operation raises
    OperationNotSupported with a message naming the operation.
    """
    kmip_engine = engine.KmipEngine()
    kmip_engine._logger = mock.MagicMock()

    operation = enums.Operation.POLL
    # The message uses the title-cased enum member name.
    expected_message = "{0} operation is not supported by the server.".format(
        operation.name.title()
    )

    with six.assertRaisesRegex(
            self, exceptions.OperationNotSupported, expected_message):
        kmip_engine._process_operation(operation, None)
def test_get_object_type_missing_object(self):
    """
    Verify that looking up the object type for ID '1' raises
    ItemNotFound and logs a single warning.
    """
    kmip_engine = engine.KmipEngine()
    kmip_engine._data_store = self.engine
    kmip_engine._data_store_session_factory = self.session_factory
    kmip_engine._data_session = kmip_engine._data_store_session_factory()
    kmip_engine._logger = mock.MagicMock()

    with six.assertRaisesRegex(
            self, exceptions.ItemNotFound, "Could not locate object: 1"):
        kmip_engine._get_object_type('1')

    kmip_engine._data_session.commit()

    # Exactly one warning must have been logged for the failed lookup.
    kmip_engine._logger.warning.assert_called_once_with(
        "Could not identify object type for object: 1"
    )
def test_register_unsupported_object_type(self):
    """
    Verify that a Register request for a SPLIT_KEY object type raises
    InvalidField.
    """
    kmip_engine = engine.KmipEngine()
    kmip_engine._data_store = self.engine
    kmip_engine._data_store_session_factory = self.session_factory
    kmip_engine._data_session = kmip_engine._data_store_session_factory()
    kmip_engine._logger = mock.MagicMock()

    request_payload = register.RegisterRequestPayload(
        object_type=attributes.ObjectType(enums.ObjectType.SPLIT_KEY)
    )

    with six.assertRaisesRegex(
            self,
            exceptions.InvalidField,
            "The SplitKey object type is not supported."):
        kmip_engine._process_register(request_payload)
def test_request_omitting_secret(self):
    """
    Verify that a Register request carrying only an object type
    (SYMMETRIC_KEY) and no secret raises InvalidField.
    """
    kmip_engine = engine.KmipEngine()
    kmip_engine._data_store = self.engine
    kmip_engine._data_store_session_factory = self.session_factory
    kmip_engine._data_session = kmip_engine._data_store_session_factory()
    kmip_engine._logger = mock.MagicMock()

    # Only the object type is set on the payload.
    request_payload = register.RegisterRequestPayload(
        object_type=attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
    )

    with six.assertRaisesRegex(
            self,
            exceptions.InvalidField,
            "Cannot register a secret in absentia."):
        kmip_engine._process_register(request_payload)
# NOTE(review): a test with this same name is defined earlier in this file;
# if both live in the same class, this later definition shadows the earlier
# one. Confirm which is intended.
def test_version_operation_mismatch(self):
    """
    Check that a KMIP 1.0 engine rejects DiscoverVersions with
    OperationNotSupported, naming the protocol version in the message.
    """
    server_engine = engine.KmipEngine()
    server_engine._logger = mock.MagicMock()
    server_engine._protocol_version = contents.ProtocolVersion.create(1, 0)

    message_template = "DiscoverVersions is not supported by KMIP {0}"
    expected_message = message_template.format(
        server_engine._protocol_version
    )

    with six.assertRaisesRegex(
            self, exceptions.OperationNotSupported, expected_message):
        server_engine._process_discover_versions(None)
# NOTE(review): a test with this same name is defined earlier in this file;
# if both live in the same class, this later definition shadows the earlier
# one. Confirm which is intended.
def test_process_batch_missing_batch_id(self):
    """
    Check that a batch of two items without batch IDs is rejected with
    InvalidMessage.
    """
    server_engine = engine.KmipEngine()
    server_engine._logger = mock.MagicMock()

    first_item = messages.RequestBatchItem()
    second_item = messages.RequestBatchItem()
    batch_items = [first_item, second_item]

    with six.assertRaisesRegex(
            self, exceptions.InvalidMessage, "Batch item ID is undefined."):
        server_engine._process_batch(batch_items, None, None)
# NOTE(review): a test with this same name is defined earlier in this file;
# if both live in the same class, this later definition shadows the earlier
# one. Confirm which is intended.
def test_unsupported_operation(self):
    """
    Check that the POLL operation is rejected with OperationNotSupported
    and that the message names the operation.
    """
    server_engine = engine.KmipEngine()
    server_engine._logger = mock.MagicMock()

    unsupported = enums.Operation.POLL
    message_template = "{0} operation is not supported by the server."
    # The operation appears in the message as its title-cased member name.
    expected_message = message_template.format(unsupported.name.title())

    with six.assertRaisesRegex(
            self, exceptions.OperationNotSupported, expected_message):
        server_engine._process_operation(unsupported, None)
# NOTE(review): a test with this same name is defined earlier in this file;
# if both live in the same class, this later definition shadows the earlier
# one. Confirm which is intended.
def test_get_object_type_missing_object(self):
    """
    Check that requesting the object type for ID '1' raises ItemNotFound
    and that one warning is logged about it.
    """
    server_engine = engine.KmipEngine()
    server_engine._data_store = self.engine
    server_engine._data_store_session_factory = self.session_factory
    server_engine._data_session = server_engine._data_store_session_factory()
    server_engine._logger = mock.MagicMock()

    missing_id = '1'
    with six.assertRaisesRegex(
            self, exceptions.ItemNotFound, "Could not locate object: 1"):
        server_engine._get_object_type(missing_id)

    server_engine._data_session.commit()

    warning_log = server_engine._logger.warning
    warning_log.assert_called_once_with(
        "Could not identify object type for object: 1"
    )
# NOTE(review): a test with this same name is defined earlier in this file;
# if both live in the same class, this later definition shadows the earlier
# one. Confirm which is intended.
def test_register_unsupported_object_type(self):
    """
    Check that registering a SPLIT_KEY object type is rejected with
    InvalidField.
    """
    server_engine = engine.KmipEngine()
    server_engine._data_store = self.engine
    server_engine._data_store_session_factory = self.session_factory
    server_engine._data_session = server_engine._data_store_session_factory()
    server_engine._logger = mock.MagicMock()

    split_key_type = attributes.ObjectType(enums.ObjectType.SPLIT_KEY)
    request_payload = register.RegisterRequestPayload(
        object_type=split_key_type
    )

    expected_message = "The SplitKey object type is not supported."
    with six.assertRaisesRegex(
            self, exceptions.InvalidField, expected_message):
        server_engine._process_register(request_payload)
# NOTE(review): a test with this same name is defined earlier in this file;
# if both live in the same class, this later definition shadows the earlier
# one. Confirm which is intended.
def test_request_omitting_secret(self):
    """
    Check that a Register request with a SYMMETRIC_KEY object type but no
    secret is rejected with InvalidField.
    """
    server_engine = engine.KmipEngine()
    server_engine._data_store = self.engine
    server_engine._data_store_session_factory = self.session_factory
    server_engine._data_session = server_engine._data_store_session_factory()
    server_engine._logger = mock.MagicMock()

    symmetric_key_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
    # The payload is built with the object type only.
    request_payload = register.RegisterRequestPayload(
        object_type=symmetric_key_type
    )

    expected_message = "Cannot register a secret in absentia."
    with six.assertRaisesRegex(
            self, exceptions.InvalidField, expected_message):
        server_engine._process_register(request_payload)
def test_config_bad_urls(self, mock_input, mock_load_config, mock_save_config):
    """Check that ``command.config`` rejects malformed registry URLs.

    For each bad URL returned by the mocked input, ``command.config()`` must
    raise ``CommandException`` ("Invalid URL"), must not save any config,
    and must leave the registry URL as it was before.
    """
    bad_urls = (
        'foo.com',
        'ftp://foo.com',
        'blah://bar.com',
        'http://foo.bar.com/baz',
    )

    # test general URL setting -- result should match initial state
    mock_load_config.return_value = {}
    url_before = command.get_registry_url()

    for bad_url in bad_urls:
        mock_input.return_value = bad_url

        with assertRaisesRegex(self, command.CommandException, 'Invalid URL'):
            command.config()

        # The prompt must have been shown; reset so the next URL is
        # checked independently.
        assert mock_input.called
        mock_input.reset_mock()

        mock_save_config.assert_not_called()
        assert command.get_registry_url() == url_before
def test_build_checks_yaml_syntax_error(self):      # pylint: disable=C0103
    """Check that a syntactically broken ``checks.yml`` aborts the build.

    ``checks_bad_syntax.yml`` is temporarily copied to ``checks.yml`` next to
    this test file, the working directory is switched there, and
    ``command.build`` must raise ``CommandException`` with a message
    matching ``Bad yaml syntax.*checks\\.yml``. The temporary file and the
    previous working directory are restored afterwards.
    """
    path = os.path.abspath(os.path.dirname(__file__))
    buildfilepath = os.path.join(path, 'build_checks_bad_syntax.yml')
    checksorigpath = os.path.join(path, 'checks_bad_syntax.yml')
    checksfilepath = os.path.join(path, 'checks.yml')

    # os.curdir is only the constant '.', so chdir-ing back to it would be
    # a no-op; remember the real working directory instead.
    origdir = os.getcwd()
    os.chdir(path)
    try:
        # Checked before the copy so that a pre-existing checks.yml is
        # never deleted by the cleanup below.
        assert not os.path.exists(checksfilepath)
        shutil.copy(checksorigpath, checksfilepath)
        try:
            with assertRaisesRegex(self, command.CommandException,
                                   r'Bad yaml syntax.*checks\.yml'):
                command.build('user/test', buildfilepath)
        finally:
            # Only remove the file once this test has created it.
            os.remove(checksfilepath)
    finally:
        os.chdir(origdir)
def test_git_clone_fail(self):
    """Check that a failing ``git clone`` aborts the build.

    ``subprocess.check_call`` is replaced by a stub that validates the git
    command line and then raises; ``command.build`` must surface that as a
    ``CommandException``, and importing the package afterwards must fail.
    """
    git_url = 'https://github.com/quiltdata/testdata.git'
    expected_prefix = ['git', 'clone', '-q', '--depth=1', git_url]

    def mock_git_clone(cmd):
        # test git command
        assert len(cmd) == 6
        assert cmd[:5] == expected_prefix

        # fake git clone fail
        raise Exception()

    with patch('subprocess.check_call', mock_git_clone), \
            self.assertRaises(command.CommandException):
        command.build('user/pkg__test_git_clone_fail', git_url)

    # TODO: running -n (pytest-xdist) there's leaky state and can throw
    # either ImportError: cannot import name or ModuleNotFoundError
    import_failure = r'cannot import|not found|No module named|Could not find'
    with assertRaisesRegex(self, Exception, import_failure):
        from quilt.data.user import pkg__test_git_clone_fail
def test_build_package(self):
    """Exercise ``build.build_package`` with good, missing and bad files.

    Covers a valid build/checks pair, a missing build file, a missing
    checks file, a build without checks, and two non-YAML inputs that
    must fail with a YAML parser error. All builds are dry runs.
    """
    def run_build(build_fn=None, checks_fn=None, expect_error=False):
        # File names are resolved relative to this test module; a falsy
        # name is passed on as None.
        test_dir = os.path.dirname(__file__)
        build_path = os.path.join(test_dir, build_fn) if build_fn else None
        checks_path = os.path.join(test_dir, checks_fn) if checks_fn else None

        if not expect_error:
            build.build_package(
                'foox', 'barx', build_path, checks_path, dry_run=True)
            return

        with assertRaisesRegex(self, IOError, 'doesnt_exist.yml'):
            build.build_package(
                'foox', 'barx', build_path, checks_path, dry_run=True)

    run_build("build_simple_checks.yml", "checks_simple.yml")
    run_build("doesnt_exist.yml", "checks_simple.yml", True)
    run_build("build_simple_checks.yml", "doesnt_exist.yml", True)
    run_build("build_simple_checks.yml", None)

    # bad yaml files
    yaml_error = yaml.parser.ParserError
    with assertRaisesRegex(self, yaml_error, 'expected'):
        run_build("build_simple_checks.yml", "test_checks.py")
    with assertRaisesRegex(self, yaml_error, 'expected'):
        run_build("test_checks.py", None)
def test_bad_contents_hash(self):
    """
    Check that installing a package whose registered contents hash is a
    fixed, hard-coded value fails with "Mismatched hash" and leaves no
    package file in the store.
    """
    tabledata = b'Bad package'
    obj_hash = hashlib.new(HASH_TYPE, tabledata).hexdigest()

    # Package tree: foo -> bar -> table backed by the single object hash.
    table_node = TableNode([obj_hash], PackageFormat.default.value)
    inner_group = GroupNode(dict(bar=table_node))
    contents = GroupNode(dict(foo=inner_group))

    contents_hash = 'e867010701edc0b1c8be177e02a93aa3cb1342bb1123046e1f6b40e428c6048e'

    self._mock_tag('foo/bar', 'latest', contents_hash)
    self._mock_package('foo/bar', contents_hash, '', contents, [obj_hash])

    with assertRaisesRegex(self, command.CommandException, "Mismatched hash"):
        command.install('foo/bar')

    package_file = os.path.join(self._store_dir, 'foo/bar.json')
    assert not os.path.exists(package_file)
def test_invalid_target_field(self):
    """
    Checks that cleaning a MailFitting whose target field is the 'subject'
    BottleField raises a ValidationError about the field being
    incompatible with the condenser's bottle.
    """
    subject_field = BottleField.objects.get_by_natural_key('subject')
    mail_fitting = MailFitting(
        condenser=self.condenser,
        target_field=subject_field,
        object_id=2,
        content_type=self.parser_type,
    )
    expected_error = (
        "The selected target field is not compatible with the "
        "condenser's bottle."
    )
    with six.assertRaisesRegex(self, ValidationError, expected_error):
        mail_fitting.clean()
def test_target_is_not_embebbed_doc(self):
    """
    Checks that cleaning a MailFitting that targets the 'location'
    BottleField with a condenser content type raises a ValidationError
    requiring a parser content type.
    """
    location_field = BottleField.objects.get_by_natural_key('location')
    mail_fitting = MailFitting(
        condenser=self.condenser,
        target_field=location_field,
        object_id=2,
        content_type=self.condenser_type,
    )
    expected_error = (
        "Unless the target field is an EmbeddedDocument, "
        "the content type must be a parser."
    )
    with six.assertRaisesRegex(self, ValidationError, expected_error):
        mail_fitting.clean()
def test_fitting_is_not_condenser(self):
    """
    Checks that cleaning a MailFitting that targets the 'content'
    BottleField with a parser content type raises a ValidationError
    requiring a condenser content type.
    """
    content_field = BottleField.objects.get_by_natural_key('content')
    mail_fitting = MailFitting(
        condenser=self.condenser,
        target_field=content_field,
        object_id=3,
        content_type=self.parser_type,
    )
    expected_error = (
        "If the target field is an EmbeddedDocument, "
        "the content type must be a condenser."
    )
    with six.assertRaisesRegex(self, ValidationError, expected_error):
        mail_fitting.clean()
def test_invalid_target_field(self):
    """
    Checks that cleaning a DataFitting whose target field is the 'subject'
    BottleField raises a ValidationError about the field being
    incompatible with the condenser's bottle.
    """
    subject_field = BottleField.objects.get_by_natural_key('subject')
    data_fitting = DataFitting(
        condenser=self.condenser,
        target_field=subject_field,
        object_id=2,
        content_type=self.parser_type,
    )
    expected_error = (
        "The selected target field is not compatible with the "
        "condenser's bottle."
    )
    with six.assertRaisesRegex(self, ValidationError, expected_error):
        data_fitting.clean()
def test_target_is_not_embebbed_doc(self):
    """
    Checks that cleaning a DataFitting that targets the 'location'
    BottleField with a condenser content type raises a ValidationError
    requiring a parser content type.
    """
    location_field = BottleField.objects.get_by_natural_key('location')
    data_fitting = DataFitting(
        condenser=self.condenser,
        target_field=location_field,
        object_id=2,
        content_type=self.condenser_type,
    )
    expected_error = (
        "Unless the target field is an EmbeddedDocument, "
        "the content type must be a parser."
    )
    with six.assertRaisesRegex(self, ValidationError, expected_error):
        data_fitting.clean()
def test_invalid_target_field(self):
    """
    Checks that cleaning a LogFitting whose target field is the 'subject'
    BottleField raises a ValidationError about the field being
    incompatible with the condenser's bottle.
    """
    subject_field = BottleField.objects.get_by_natural_key('subject')
    log_fitting = LogFitting(
        condenser=self.condenser,
        target_field=subject_field,
        object_id=2,
        content_type=self.parser_type,
    )
    expected_error = (
        "The selected target field is not compatible with the "
        "condenser's bottle."
    )
    with six.assertRaisesRegex(self, ValidationError, expected_error):
        log_fitting.clean()
def test_target_is_not_embebbed_doc(self):
    """
    Checks that cleaning a LogFitting that targets the 'location'
    BottleField with a condenser content type raises a ValidationError
    requiring a parser content type.
    """
    location_field = BottleField.objects.get_by_natural_key('location')
    log_fitting = LogFitting(
        condenser=self.condenser,
        target_field=location_field,
        object_id=2,
        content_type=self.condenser_type,
    )
    expected_error = (
        "Unless the target field is an EmbeddedDocument, "
        "the content type must be a parser."
    )
    with six.assertRaisesRegex(self, ValidationError, expected_error):
        log_fitting.clean()
def test_fitting_is_not_condenser(self):
    """
    Checks that cleaning a LogFitting that targets the 'content'
    BottleField with a parser content type raises a ValidationError
    requiring a condenser content type.
    """
    content_field = BottleField.objects.get_by_natural_key('content')
    log_fitting = LogFitting(
        condenser=self.condenser,
        target_field=content_field,
        object_id=3,
        content_type=self.parser_type,
    )
    expected_error = (
        "If the target field is an EmbeddedDocument, "
        "the content type must be a condenser."
    )
    with six.assertRaisesRegex(self, ValidationError, expected_error):
        log_fitting.clean()
def test_lambada_class(self):
    """Check attribute setup and dancer dispatch of the base Lambada class."""
    tune = lambada.Lambada()

    # Make sure we make the attributes we need
    for attribute in ('dancers', 'config'):
        self.assertIsNotNone(getattr(tune, attribute))

    # Create a dancer and call it
    tune.dancers['test'] = MagicMock()
    known_context = LambdaContext('test')
    tune('hi', known_context)

    registered_dancer = tune.dancers['test']
    registered_dancer.assert_called()
    registered_dancer.assert_called_with('hi', known_context)

    # Try a dancer that doesn't exist
    unknown_context = LambdaContext('nope')
    with assertRaisesRegex(self, Exception, 'No matching dancer'):
        tune('bye', unknown_context)