def testMarkWithShrinkingBatchSize_raisesValueError(self):
  """mark() must reject a re-read batch smaller than the marked one.

  Two single-event logs are added and fully consumed, then the reader is
  reset and only the first event is read back before mark() is called
  again; the reader reports the shrinking batch as non-monotonic.
  """
  id_ = db.RUN_ROWID.create(1, 1)
  event1 = tf.Event(step=123)
  event2 = tf.Event(step=456)
  # Two one-event log files, distinguished by the timestamp in the name.
  path1 = self._save_records('events.out.tfevents.1.localhost',
                             [event1.SerializeToString()])
  path2 = self._save_records('events.out.tfevents.2.localhost',
                             [event2.SerializeToString()])
  with self.connect_db() as db_conn:
    with self.EventLog(path1) as log1, self.EventLog(path2) as log2:
      with loader.RunReader(id_, 'doodle') as run:
        run.add_event_log(db_conn, log1)
        run.add_event_log(db_conn, log2)
        run.mark()
        # Consume the full marked batch: both events, then exhaustion.
        self.assertEqual(event1, run.get_next_event())
        self.assertEqual(event2, run.get_next_event())
        self.assertIsNone(run.get_next_event())
        run.reset()
        # Re-read only one of the two marked events, then mark again:
        # the batch shrank, which must raise (message mentions
        # 'monotonic').
        self.assertEqual(event1, run.get_next_event())
        with six.assertRaisesRegex(self, ValueError, r'monotonic'):
          run.mark()
# Example usages of six.assertRaisesRegex() (collected snippets).
def _test_distributions(self, run_name, tag_name, should_work=True):
  """Requests distributions for (run, tag); checks success or failure.

  When `should_work` is true, verifies the JSON payload: one entry per
  step, with the expected compression breakpoints. Otherwise asserts
  that the plugin raises for a run without histogram data.
  """
  self.set_up_with_runs([self._RUN_WITH_SCALARS,
                         self._RUN_WITH_LEGACY_DISTRIBUTION,
                         self._RUN_WITH_DISTRIBUTION])
  if not should_work:
    # A run lacking histogram data must be rejected outright.
    with six.assertRaisesRegex(self, ValueError, 'No histogram tag'):
      self.plugin.distributions_impl(self._DISTRIBUTION_TAG, run_name)
    return
  data, mime_type = self.plugin.distributions_impl(tag_name, run_name)
  self.assertEqual('application/json', mime_type)
  self.assertEqual(len(data), self._STEPS)
  for expected_step, datum in enumerate(data):
    _unused_wall_time, step, bps_and_icdfs = datum
    self.assertEqual(expected_step, step)
    bps, _unused_icdfs = zip(*bps_and_icdfs)
    self.assertEqual(bps, compressor.NORMAL_HISTOGRAM_BPS)
def _test(self, name, should_be_okay):
  """Constructs a TensorBoard app whose middle plugin is named `name`.

  Expects construction to succeed when `should_be_okay` is true and to
  raise ValueError (mentioning 'invalid name') otherwise.
  """
  temp_dir = tempfile.mkdtemp(prefix=self.get_temp_dir())
  self.addCleanup(shutil.rmtree, temp_dir)
  multiplexer = event_multiplexer.EventMultiplexer(
      size_guidance=application.DEFAULT_SIZE_GUIDANCE,
      purge_orphaned_data=True)
  # Surround the plugin under test with one valid active and one valid
  # inactive plugin so only `name` can be the cause of a failure.
  plugins = [
      FakePlugin(
          None, plugin_name='foo', is_active_value=True, routes_mapping={}),
      FakePlugin(
          None, plugin_name=name, is_active_value=True, routes_mapping={}),
      FakePlugin(
          None, plugin_name='bar', is_active_value=False, routes_mapping={}),
  ]

  def construct_app():
    application.TensorBoardWSGIApp(
        temp_dir, plugins, multiplexer, reload_interval=0, path_prefix='')

  if should_be_okay:
    construct_app()
  else:
    with six.assertRaisesRegex(self, ValueError, r'invalid name'):
      construct_app()
def _test(self, route, should_be_okay):
  """Constructs a TensorBoard app whose plugin serves `route`.

  Expects construction to succeed when `should_be_okay` is true and to
  raise ValueError (mentioning 'invalid route') otherwise.
  """
  temp_dir = tempfile.mkdtemp(prefix=self.get_temp_dir())
  self.addCleanup(shutil.rmtree, temp_dir)
  multiplexer = event_multiplexer.EventMultiplexer(
      size_guidance=application.DEFAULT_SIZE_GUIDANCE,
      purge_orphaned_data=True)
  plugins = [
      FakePlugin(
          None,
          plugin_name='foo',
          is_active_value=True,
          routes_mapping={route: lambda environ, start_response: None}),
  ]

  def construct_app():
    application.TensorBoardWSGIApp(
        temp_dir, plugins, multiplexer, reload_interval=0, path_prefix='')

  if should_be_okay:
    construct_app()
  else:
    with six.assertRaisesRegex(self, ValueError, r'invalid route'):
      construct_app()
def testPluginTagToContent_PluginsCannotJumpOnTheBandwagon(self):
  # If there are multiple `SummaryMetadata` for a given tag, and the
  # set of plugins in the `plugin_data` of second is different from
  # that of the first, then the second set should be ignored.
  # NOTE(review): an identical definition of this test appears again
  # later in this file; if both live in the same class the later one
  # shadows this one — confirm and delete one of them.
  logdir = self.get_temp_dir()
  summary_metadata_1 = tf.SummaryMetadata(
      display_name='current tagee',
      summary_description='no',
      plugin_data=tf.SummaryMetadata.PluginData(plugin_name='outlet',
                                                content=b'120v'))
  self._writeMetadata(logdir, summary_metadata_1, nonce='1')
  acc = ea.EventAccumulator(logdir)
  acc.Reload()
  # Second metadata for the same tag, but claimed by a different plugin.
  summary_metadata_2 = tf.SummaryMetadata(
      display_name='tagee of the future',
      summary_description='definitely not',
      plugin_data=tf.SummaryMetadata.PluginData(plugin_name='plug',
                                                content=b'110v'))
  self._writeMetadata(logdir, summary_metadata_2, nonce='2')
  acc.Reload()
  # The first plugin ('outlet') keeps the tag's original content...
  self.assertEqual(acc.PluginTagToContent('outlet'),
                   {'you_are_it': b'120v'})
  # ...and the late-coming plugin ('plug') is not registered at all.
  with six.assertRaisesRegex(self, KeyError, 'plug'):
    acc.PluginTagToContent('plug')
def testPluginTagToContent_PluginsCannotJumpOnTheBandwagon(self):
  # If there are multiple `SummaryMetadata` for a given tag, and the
  # set of plugins in the `plugin_data` of second is different from
  # that of the first, then the second set should be ignored.
  # NOTE(review): this is a byte-for-byte duplicate of an earlier
  # definition in this file; if both live in the same class this one
  # silently shadows the earlier one — confirm and delete one of them.
  logdir = self.get_temp_dir()
  summary_metadata_1 = tf.SummaryMetadata(
      display_name='current tagee',
      summary_description='no',
      plugin_data=tf.SummaryMetadata.PluginData(plugin_name='outlet',
                                                content=b'120v'))
  self._writeMetadata(logdir, summary_metadata_1, nonce='1')
  acc = ea.EventAccumulator(logdir)
  acc.Reload()
  # Second metadata for the same tag, but claimed by a different plugin.
  summary_metadata_2 = tf.SummaryMetadata(
      display_name='tagee of the future',
      summary_description='definitely not',
      plugin_data=tf.SummaryMetadata.PluginData(plugin_name='plug',
                                                content=b'110v'))
  self._writeMetadata(logdir, summary_metadata_2, nonce='2')
  acc.Reload()
  # The first plugin ('outlet') keeps the tag's original content...
  self.assertEqual(acc.PluginTagToContent('outlet'),
                   {'you_are_it': b'120v'})
  # ...and the late-coming plugin ('plug') is not registered at all.
  with six.assertRaisesRegex(self, KeyError, 'plug'):
    acc.PluginTagToContent('plug')
def testEmptyDirectQuery(self):
  """queryDatabase without a connector must fail with 'Failed to connect'."""
  from girder.plugins.database_assetstore import query
  # No connector, empty info and params: the query layer cannot connect.
  with six.assertRaisesRegex(self, Exception, 'Failed to connect'):
    query.queryDatabase(None, {}, {})
def testInvalidParameters(self):
  """Assetstore helpers must reject files lacking proper database info."""
  # Test conditions that should return None
  from girder.plugins.database_assetstore import assetstore
  from girder.plugins.database_assetstore.assetstore import DB_INFO_KEY
  self.assertIsNone(assetstore.getDbInfoForFile({}))
  self.assertIsNone(assetstore.getDbInfoForFile(
      {DB_INFO_KEY: {}, 'assetstoreId': 'unknown'}, {'type': 'unknown'}))
  # Only recognized keys (e.g. 'sort') survive into the query params;
  # unknown keys such as 'a' are dropped.
  self.assertEqual(assetstore.getQueryParamsForFile({}), {})
  self.assertEqual(assetstore.getQueryParamsForFile(
      {DB_INFO_KEY: {'a': 'b'}}), {})
  self.assertEqual(assetstore.getQueryParamsForFile(
      {DB_INFO_KEY: {'sort': 'b'}}), {'sort': 'b'})
  # Test with non-database assetstore
  resp = self.request(path='/assetstore', method='GET', user=self.admin)
  self.assertStatusOk(resp)
  self.assertEqual(1, len(resp.json))
  assetstore1 = resp.json[0]
  self.assertIsNone(assetstore.validateFile(
      {DB_INFO_KEY: {}, 'assetstoreId': str(assetstore1['_id'])}))
  # Test database validation
  resp = self.request(path='/assetstore', method='POST', user=self.admin,
                      params=self.dbParams2)
  self.assertStatusOk(resp)
  assetstore1 = resp.json
  # A file with a table but no database must be rejected by validation.
  with six.assertRaisesRegex(self, Exception,
                             'must have a non-blank database'):
    self.assertIsNone(assetstore.validateFile({
        DB_INFO_KEY: {'table': 'sample'},
        'assetstoreId': str(assetstore1['_id'])}))
def test_fc_raises(self):
  """FC must reject non-float32 input, non-1D input, and a shape change."""
  with six.assertRaisesRegex(self, TypeError, 'FC input dtype must be float32'):
    tdl.FC(1)(tf.constant([0], dtype='int64'))
  with six.assertRaisesRegex(self, TypeError, 'FC input shape must be 1D'):
    tdl.FC(1)(tf.constant(0, dtype='float32'))
  # Once an FC has seen a 1-element input, a 2-element input must fail.
  fc = tdl.FC(1)
  fc(tf.constant([[0]], 'float32'))
  with six.assertRaisesRegex(self, TypeError, 'Type mismatch between input type'):
    fc(tf.constant([[0, 0]], 'float32'))
def test_embedding_raises(self):
  """Embedding must reject bad weights, non-scalar and non-int inputs."""
  # Initializer shape [3, 3] is incompatible with a (2, 2) embedding.
  self.assertRaises(ValueError, tdl.Embedding, 2, 2, np.zeros([3, 3]))
  with six.assertRaisesRegex(self, TypeError, 'Embeddings take scalar inputs.'):
    tdl.Embedding(2, 2)(tf.constant([[0, 0]], 'int32'))
  with six.assertRaisesRegex(self, TypeError, 'Embeddings take integer inputs.'):
    tdl.Embedding(2, 2)(tf.constant([0], 'float32'))
def test_metrics_raises(self):
  """Compiler.create must reject metrics whose types are incompatible."""
  scalar_metric = _pos_neg_block([])
  vector_metric = _pos_neg_block([2])
  record = {'foo': scalar_metric, 'bar:': vector_metric} >> tdb.Concat()
  with six.assertRaisesRegex(
      self, TypeError, 'Metric [a-z]+tive has incompatible types'):
    tdc.Compiler.create(record)
def test_malformed(self):
  """parse_spec must reject specs that are not well-formed key=value lists."""
  with six.assertRaisesRegex(
      self, ValueError, 'Spec "foo" doesn\'t contain any key value pair'):
    plan.parse_spec('foo')
  with six.assertRaisesRegex(self, ValueError, 'Duplicate key foo'):
    plan.parse_spec('foo=3.0,foo=bar')
  with six.assertRaisesRegex(self, ValueError, 'Empty value for key bar'):
    plan.parse_spec('foo=3.0,bar=')
def test_bad_optimizer(self):
  """An unrecognized optimizer name must be rejected."""
  with six.assertRaisesRegex(self, ValueError, 'Unrecognized optimizer: magic'):
    plan.build_optimizer_from_params('magic')
def test_missing_argument(self):
  """Adagrad without a learning_rate must be rejected."""
  with six.assertRaisesRegex(
      self, ValueError,
      'The adagrad optimizer requires learning_rate to be set.'):
    plan.build_optimizer_from_params('adagrad')
def test_bad_algorithm(self):
  """An unknown decay algorithm name must be rejected."""
  with six.assertRaisesRegex(self, ValueError, 'Unknown algorithm: foo'):
    plan.build_learning_rate_decay_from_params(
        {'algorithm': 'foo'}, None, 0.01)
def test_missing_learning_rate(self):
  """A decay spec without a base learning rate must be rejected."""
  with six.assertRaisesRegex(self, ValueError, 'Missing learning_rate field'):
    plan.build_learning_rate_decay_from_params(
        {'algorithm': 'exponential_decay'}, None, None)
def test_missing_algorithm(self):
  """A decay spec without an algorithm field must be rejected."""
  with six.assertRaisesRegex(self, ValueError, 'Missing algorithm field'):
    plan.build_learning_rate_decay_from_params(
        {'foo': 'bar'}, None, 0.01)
def test_init_raises(self):
  """Compiler.create must validate the root block's outputs."""
  # No outputs at all.
  with six.assertRaisesRegex(
      self, TypeError, 'root must have at least one output'):
    tdc.Compiler.create(tdb.Record([]))
  # Outputs that are not tensors.
  with six.assertRaisesRegex(
      self, TypeError, 'root outputs must all be tensors'):
    tdc.Compiler.create(tdb.GetItem('foo'))
  # Outputs that contain sequences.
  with six.assertRaisesRegex(
      self, TypeError, 'root output may not contain sequences'):
    tdc.Compiler.create(tdb.Map(tdb.Scalar()))
def test_composition_rasies_foreign_io(self):
  # NOTE(review): "rasies" typo is part of the public test name; renaming
  # would change test discovery, so it is left as-is.
  """connect() must reject the I/O endpoints of a different composition."""
  scalar = tdb.Scalar()
  owner = tdb.Composition([scalar])
  foreign = tdb.Composition()
  with six.assertRaisesRegex(
      self, ValueError, 'is the input or output of a different composition'):
    owner.connect(foreign.input, scalar)
def test_composition_raises_unused(self):
fn0 = tdb.Scalar()
fn1 = times_scalar_block(2.0)
c = tdb.Composition([fn1, fn0])
c.connect(c.input, fn0)
c.connect(fn0, fn1)
c.connect(fn0, c.output)
six.assertRaisesRegex(
self, TypeError, 'children have unused outputs: .*', c._validate, None)