def create_infer_func(layers):
Xa, Xb = T.tensor4('Xa'), T.tensor4('Xb')
Xa_batch, Xb_batch = T.tensor4('Xa_batch'), T.tensor4('Xb_batch')
Tp = get_output(
layers['trans'],
inputs={
layers['inputa']: Xa, layers['inputb']: Xb,
}, deterministic=True,
)
infer_func = theano.function(
inputs=[theano.In(Xa_batch), theano.In(Xb_batch)],
outputs=Tp,
givens={
Xa: Xa_batch, Xb: Xb_batch, # Ia, Ib
}
)
return infer_func
python类In()的实例源码
def test_sparse_shared_memory():
# Note : There are no inplace ops on sparse matrix yet. If one is
# someday implemented, we could test it here.
a = random_lil((3, 4), 'float32', 3).tocsr()
m1 = random_lil((4, 4), 'float32', 3).tocsr()
m2 = random_lil((4, 4), 'float32', 3).tocsr()
x = SparseType('csr', dtype='float32')()
y = SparseType('csr', dtype='float32')()
sdot = theano.sparse.structured_dot
z = sdot(x * 3, m1) + sdot(y * 2, m2)
f = theano.function([theano.In(x, mutable=True),
theano.In(y, mutable=True)], z, mode='FAST_RUN')
def f_(x, y, m1=m1, m2=m2):
return ((x * 3) * m1) + ((y * 2) * m2)
assert SparseType.may_share_memory(a, a) # This is trivial
result = f(a, a)
result_ = f_(a, a)
assert (result_.todense() == result.todense()).all()
def test_state_access(self):
a = T.scalar() # the a is for 'anonymous' (un-named).
x, s = T.scalars('xs')
f = function([x, In(a, value=1.0, name='a'), In(s, value=0.0, update=s + a * x)], s + a * x)
self.assertTrue(f[a] == 1.0)
self.assertTrue(f[s] == 0.0)
self.assertTrue(f(3.0) == 3.0)
self.assertTrue(f(3.0, a=2.0) == 9.0) # 3.0 + 2*3.0
self.assertTrue(f[a] == 1.0) # state hasn't changed permanently, we just overrode it last line
self.assertTrue(f[s] == 9.0)
f[a] = 5.0
self.assertTrue(f[a] == 5.0)
self.assertTrue(f(3.0) == 24.0) # 9 + 3*5
self.assertTrue(f[s] == 24.0)
def test_copy(self):
a = T.scalar() # the a is for 'anonymous' (un-named).
x, s = T.scalars('xs')
f = function([x, In(a, value=1.0, name='a'),
In(s, value=0.0, update=s + a * x, mutable=True)],
s + a * x)
g = copy.copy(f)
# if they both return, assume that they return equivalent things.
self.assertFalse(g.container[x].storage is f.container[x].storage)
self.assertFalse(g.container[a].storage is f.container[a].storage)
self.assertFalse(g.container[s].storage is f.container[s].storage)
self.assertFalse(g.value[a] is not f.value[a]) # should not have been copied
self.assertFalse(g.value[s] is f.value[s]) # should have been copied because it is mutable.
self.assertFalse((g.value[s] != f.value[s]).any()) # its contents should be identical
self.assertTrue(f(2, 1) == g(2)) # they should be in sync, default value should be copied.
self.assertTrue(f(2, 1) == g(2)) # they should be in sync, default value should be copied.
f(1, 2) # put them out of sync
self.assertFalse(f(1, 2) == g(1, 2)) # they should not be equal anymore.
def test_shared_state0(self):
a = T.scalar() # the a is for 'anonymous' (un-named).
x, s = T.scalars('xs')
f = function([x, In(a, value=1.0, name='a'),
In(s, value=0.0, update=s + a * x, mutable=True)],
s + a * x)
g = function([x, In(a, value=1.0, name='a'),
In(s, value=f.container[s], update=s - a * x, mutable=True)],
s + a * x)
f(1, 2)
self.assertTrue(f[s] == 2)
self.assertTrue(g[s] == 2)
g(1, 2)
self.assertTrue(f[s] == 0)
self.assertTrue(g[s] == 0)
def test_shared_state2(self):
a = T.scalar() # the a is for 'anonymous' (un-named).
x, s = T.scalars('xs')
f = function([x, In(a, value=1.0, name='a'), In(s, value=0.0, update=s + a * x,
mutable=False)], s + a * x)
g = function([x, In(a, value=1.0, name='a'), In(s, value=f.container[s])], s + a * x)
f(1, 2)
self.assertTrue(f[s] == 2)
self.assertTrue(g[s] == 2)
f(1, 2)
self.assertTrue(f[s] == 4)
self.assertTrue(g[s] == 4)
g(1, 2) # has no effect on state
self.assertTrue(f[s] == 4)
self.assertTrue(g[s] == 4)
def test_shared_state_not_implicit(self):
# This test is taken from the documentation in
# doc/topics/function.txt. If it does not pass anymore and yet the
# behavior is still intended the doc and the test should both be
# updated accordingly.
x, s = T.scalars('xs')
inc = function([x, In(s, update=(s + x), value=10.0)], [])
dec = function([x, In(s, update=(s - x), value=inc.container[s],
implicit=False)], [])
self.assertTrue(dec[s] is inc[s])
inc[s] = 2
self.assertTrue(dec[s] == 2)
dec(1)
self.assertTrue(inc[s] == 1)
dec(1, 0)
self.assertTrue(inc[s] == -1)
self.assertTrue(dec[s] == -1)
def test_borrow_input(self):
"""
Tests that the contract for io.In is respected. When borrow=False, it should be
impossible for outputs to be aliased to the input variables provided by the user,
either through a view-map or a destroy map. New tests should be added in the future
when borrow=True is implemented.
"""
a = T.dmatrix()
aval = numpy.random.rand(3, 3)
# when borrow=False, test that a destroy map cannot alias output to input
f = theano.function([In(a, borrow=False)], Out(a + 1, borrow=True))
assert numpy.all(f(aval) == aval + 1)
assert not numpy.may_share_memory(aval, f(aval))
# when borrow=False, test that a viewmap cannot alias output to input
f = theano.function([In(a, borrow=False)], Out(a[0, :], borrow=True))
assert numpy.all(f(aval) == aval[0, :])
assert not numpy.may_share_memory(aval, f(aval))
def __init__(self):
a = T.scalar() # the a is for 'anonymous' (un-named).
x, s = T.scalars('xs')
v = T.vector('v')
self.s = s
self.x = x
self.v = v
self.e = a * x + s
self.f1 = function([x, In(a, value=1.0, name='a'),
In(s, value=0.0, update=s + a * x, mutable=True)],
s + a * x)
self.f2 = function([x, In(a, value=1.0, name='a'),
In(s, value=self.f1.container[s], update=s + a * x,
mutable=True)],
s + a * x)
def test_doc(self):
"""Ensure the code given in pfunc.txt works as expected"""
# Example #1.
a = lscalar()
b = shared(1)
f1 = pfunc([a], (a + b))
f2 = pfunc([In(a, value=44)], a + b, updates={b: b + 1})
self.assertTrue(b.get_value() == 1)
self.assertTrue(f1(3) == 4)
self.assertTrue(f2(3) == 4)
self.assertTrue(b.get_value() == 2)
self.assertTrue(f1(3) == 5)
b.set_value(0)
self.assertTrue(f1(3) == 3)
# Example #2.
a = tensor.lscalar()
b = shared(7)
f1 = pfunc([a], a + b)
f2 = pfunc([a], a * b)
self.assertTrue(f1(5) == 12)
b.set_value(8)
self.assertTrue(f1(5) == 13)
self.assertTrue(f2(4) == 32)
def test_param_strict(self):
a = tensor.dvector()
b = shared(7)
out = a + b
f = pfunc([In(a, strict=False)], [out])
# works, rand generates float64 by default
f(numpy.random.rand(8))
# works, casting is allowed
f(numpy.array([1, 2, 3, 4], dtype='int32'))
f = pfunc([In(a, strict=True)], [out])
try:
# fails, f expects float64
f(numpy.array([1, 2, 3, 4], dtype='int32'))
except TypeError:
pass
def test_param_allow_downcast_floatX(self):
a = tensor.fscalar('a')
b = tensor.fscalar('b')
c = tensor.fscalar('c')
f = pfunc([In(a, allow_downcast=True),
In(b, allow_downcast=False),
In(c, allow_downcast=None)],
(a + b + c))
# If the values can be accurately represented, everything is OK
assert numpy.all(f(0, 0, 0) == 0)
# If allow_downcast is True, idem
assert numpy.allclose(f(0.1, 0, 0), 0.1)
# If allow_downcast is False, nope
self.assertRaises(TypeError, f, 0, 0.1, 0)
# If allow_downcast is None, it should work iff floatX=float32
if config.floatX == 'float32':
assert numpy.allclose(f(0, 0, 0.1), 0.1)
else:
self.assertRaises(TypeError, f, 0, 0, 0.1)
def test_param_allow_downcast_vector_floatX(self):
a = tensor.fvector('a')
b = tensor.fvector('b')
c = tensor.fvector('c')
f = pfunc([In(a, allow_downcast=True),
In(b, allow_downcast=False),
In(c, allow_downcast=None)],
(a + b + c))
# If the values can be accurately represented, everything is OK
z = [0]
assert numpy.all(f(z, z, z) == 0)
# If allow_downcast is True, idem
assert numpy.allclose(f([0.1], z, z), 0.1)
# If allow_downcast is False, nope
self.assertRaises(TypeError, f, z, [0.1], z)
# If allow_downcast is None, like False
self.assertRaises(TypeError, f, z, z, [0.1])
def test_append_inplace(self):
mySymbolicMatricesList = TypedListType(T.TensorType(
theano.config.floatX, (False, False)))()
mySymbolicMatrix = T.matrix()
z = Append()(mySymbolicMatricesList, mySymbolicMatrix)
m = theano.compile.mode.get_default_mode().including("typed_list_inplace_opt")
f = theano.function([In(mySymbolicMatricesList, borrow=True,
mutable=True),
In(mySymbolicMatrix, borrow=True,
mutable=True)], z, accept_inplace=True, mode=m)
self.assertTrue(f.maker.fgraph.toposort()[0].op.inplace)
x = rand_ranged_matrix(-1000, 1000, [100, 101])
y = rand_ranged_matrix(-1000, 1000, [100, 101])
self.assertTrue(numpy.array_equal(f([x], y), [x, y]))
def test_extend_inplace(self):
mySymbolicMatricesList1 = TypedListType(T.TensorType(
theano.config.floatX, (False, False)))()
mySymbolicMatricesList2 = TypedListType(T.TensorType(
theano.config.floatX, (False, False)))()
z = Extend()(mySymbolicMatricesList1, mySymbolicMatricesList2)
m = theano.compile.mode.get_default_mode().including("typed_list_inplace_opt")
f = theano.function([In(mySymbolicMatricesList1, borrow=True,
mutable=True), mySymbolicMatricesList2],
z, mode=m)
self.assertTrue(f.maker.fgraph.toposort()[0].op.inplace)
x = rand_ranged_matrix(-1000, 1000, [100, 101])
y = rand_ranged_matrix(-1000, 1000, [100, 101])
self.assertTrue(numpy.array_equal(f([x], [y]), [x, y]))
def test_insert_inplace(self):
mySymbolicMatricesList = TypedListType(T.TensorType(
theano.config.floatX, (False, False)))()
mySymbolicIndex = T.scalar(dtype='int64')
mySymbolicMatrix = T.matrix()
z = Insert()(mySymbolicMatricesList, mySymbolicIndex, mySymbolicMatrix)
m = theano.compile.mode.get_default_mode().including("typed_list_inplace_opt")
f = theano.function([In(mySymbolicMatricesList, borrow=True,
mutable=True), mySymbolicIndex, mySymbolicMatrix],
z, accept_inplace=True, mode=m)
self.assertTrue(f.maker.fgraph.toposort()[0].op.inplace)
x = rand_ranged_matrix(-1000, 1000, [100, 101])
y = rand_ranged_matrix(-1000, 1000, [100, 101])
self.assertTrue(numpy.array_equal(f([x], numpy.asarray(1,
dtype='int64'), y), [x, y]))
def test_remove_inplace(self):
mySymbolicMatricesList = TypedListType(T.TensorType(
theano.config.floatX, (False, False)))()
mySymbolicMatrix = T.matrix()
z = Remove()(mySymbolicMatricesList, mySymbolicMatrix)
m = theano.compile.mode.get_default_mode().including("typed_list_inplace_opt")
f = theano.function([In(mySymbolicMatricesList, borrow=True,
mutable=True), In(mySymbolicMatrix, borrow=True,
mutable=True)], z, accept_inplace=True, mode=m)
self.assertTrue(f.maker.fgraph.toposort()[0].op.inplace)
x = rand_ranged_matrix(-1000, 1000, [100, 101])
y = rand_ranged_matrix(-1000, 1000, [100, 101])
self.assertTrue(numpy.array_equal(f([x, y], y), [x]))
def _buildOptimizationFunction(self, X, n_steps, plr):
mu_0,logcov_0 = self._inference(X)
optdict = {}
_, logcov_f, elbo_final = self._optimizeVariationalParams(X, mu_0, logcov_0, n_steps, plr,
savedict = optdict)
diff_elbo, _ = self._estimateELBOEntropy(optdict['elbo_its'][0],optdict['elbo_its'][-1], logcov_0, logcov_f)
self.optimize_mu_logcov = theano.function([X, theano.In(n_steps, value=self.params['n_steps'], name='n_steps'),
theano.In(plr, value=self.params['param_lr'], name='plr')],
[optdict['elbo_its'], optdict['gradnorm_mu_its'],
optdict['gradnorm_logcov_its'],optdict['elbo_its'].shape[0], diff_elbo],
name = 'Optimize ELBO wrt mu/cov')
diff_elbo, _ = self._estimateELBOEntropy(optdict['elbo_its'][0], optdict['elbo_its'][-1], logcov_0, logcov_f)
self.final_elbo = theano.function([X, theano.In(n_steps, value=self.params['n_steps'], name='n_steps'),
theano.In(plr, value=self.params['param_lr'], name='plr')],
[optdict['elbo_its'][0],optdict['elbo_its'][-1], optdict['elbo_its'].shape[0],
optdict['gradnorm_mu_its'][-1],optdict['gradnorm_logcov_its'][-1],
diff_elbo], name = 'Optimize ELBO wrt mu/cov')
self.init_final_params = theano.function([X, theano.In(n_steps, value=self.params['n_steps'], name='n_steps'),
theano.In(plr, value=self.params['param_lr'], name='plr')],
[optdict['mu_its'][0],optdict['logcov_its'][0], optdict['mu_its'][-1],
optdict['logcov_its'][-1]], name = 'init/final params')
def pretraining_functions(self, train_set_x, batch_size):
# index to a [mini]batch
index = T.lscalar('index') # index to a minibatch
corruption_level = T.scalar('corruption') # % of corruption to use
learning_rate = T.scalar('lr') # learning rate to use
# begining of a batch, given `index`
batch_begin = index * batch_size
# ending of a batch given `index`
batch_end = batch_begin + batch_size
pretrain_fns = []
for dA in self.dA_layers:
# get the cost and the updates list
cost, updates = dA.get_cost_updates(corruption_level,
learning_rate)
# compile the theano function
fn = theano.function(
inputs=[
index,
theano.In(corruption_level, value=0.2),
theano.In(learning_rate, value=0.1)
],
outputs=cost,
updates=updates,
givens={
self.x: train_set_x[batch_begin: batch_end]
}
)
# append `fn` to the list of functions
pretrain_fns.append(fn)
return pretrain_fns
def create_train_func(layers):
Xa, Xb = T.tensor4('Xa'), T.tensor4('Xb')
Xa_batch, Xb_batch = T.tensor4('Xa_batch'), T.tensor4('Xb_batch')
Tp = get_output(
layers['trans'],
inputs={
layers['inputa']: Xa, layers['inputb']: Xb,
}, deterministic=False,
)
# transforms: ground-truth, predicted
Tg = T.fmatrix('Tg')
Tg_batch = T.fmatrix('Tg_batch')
theta_gt = Tg.reshape((-1, 2, 3))
theta_pr = Tp.reshape((-1, 2, 3))
# grids: ground-truth, predicted
Gg = T.dot(theta_gt, _meshgrid(20, 20))
Gp = T.dot(theta_pr, _meshgrid(20, 20))
train_loss = T.mean(T.sqr(Gg - Gp))
params = get_all_params(layers['trans'], trainable=True)
updates = nesterov_momentum(train_loss, params, 1e-3, 0.9)
corr_func = theano.function(
inputs=[theano.In(Xa_batch), theano.In(Xb_batch), theano.In(Tg_batch)],
outputs=[Tp, train_loss],
updates=updates,
givens={
Xa: Xa_batch, Xb: Xb_batch, # Ia, Ib
Tg: Tg_batch, # transform Ia --> Ib
}
)
return corr_func
def create_valid_func(layers):
Xa, Xb = T.tensor4('Xa'), T.tensor4('Xb')
Xa_batch, Xb_batch = T.tensor4('Xa_batch'), T.tensor4('Xb_batch')
Tp = get_output(
layers['trans'],
inputs={
layers['inputa']: Xa, layers['inputb']: Xb,
}, deterministic=True,
)
# transforms: ground-truth, predicted
Tg = T.fmatrix('Tg')
Tg_batch = T.fmatrix('Tg_batch')
theta_gt = Tg.reshape((-1, 2, 3))
theta_pr = Tp.reshape((-1, 2, 3))
# grids: ground-truth, predicted
Gg = T.dot(theta_gt, _meshgrid(20, 20))
Gp = T.dot(theta_pr, _meshgrid(20, 20))
valid_loss = T.mean(T.sqr(Gg - Gp))
corr_func = theano.function(
inputs=[theano.In(Xa_batch), theano.In(Xb_batch), theano.In(Tg_batch)],
outputs=[Tp, valid_loss],
givens={
Xa: Xa_batch, Xb: Xb_batch, # Ia, Ib
Tg: Tg_batch, # transform Ia --> Ib
}
)
return corr_func
def test_naming_rule4(self):
a = T.scalar() # the a is for 'anonymous' (un-named).
x, s = T.scalars('xs')
f = function([x, In(a, value=1.0, name='a'), s], a / s + x)
self.assertTrue(f(9, 2, 4) == 9.5) # can specify all args in order
self.assertTrue(f(9, 2, s=4) == 9.5) # can give s as kwarg
self.assertTrue(f(9, s=4) == 9.25) # can give s as kwarg, get default a
self.assertTrue(f(9, a=2, s=4) == 9.5) # can give s as kwarg, a as kwarg
self.assertTrue(f(x=9, a=2, s=4) == 9.5) # can give all kwargs
self.assertTrue(f(x=9, s=4) == 9.25) # can give all kwargs
checkfor(self, lambda: f(), TypeError) # takes exactly 3 non-keyword arguments (0 given)
checkfor(self, lambda: f(5.0, x=9), TypeError) # got multiple values for keyword argument 'x'
def test_weird_names(self):
a, x, s = T.scalars('xxx')
checkfor(self, lambda: function([In(a, name=[])], []), TypeError)
def t():
f = function([In(a, name=set(['adsf', ()]), value=1.0),
In(x, name=(), value=2.0),
In(s, name=T.scalar(), value=3.0)], a + x + s)
return f
checkfor(self, t, TypeError)
def test_constant_output(self):
# Test that if the output is a constant, we respect the theano memory interface
f = theano.function([], theano.tensor.constant([4]))
# print f.maker.fgraph.toposort()
out = f()
assert (out == 4).all()
out[0] = 3
out2 = f()
# If the following 2 asserts fail it mean Theano broke it's memory contract.
assert out2 is not out
assert (out2 == 4).all()
# Test that if the output is a constant and borrow, we respect the theano memory interface
f = theano.function([], Out(theano.tensor.constant([4]), borrow=True))
# print f.maker.fgraph.toposort()
out = f()
assert (out == 4).all()
out[0] = 3
out2 = f()
if isinstance(theano.compile.mode.get_default_mode(),
theano.compile.DebugMode):
# In DebugMode, we don't implement optimization based on borrow on the output.
assert (out2 == 4).all()
else:
assert out2 is out
assert (out2 == 3).all()
def test_deepcopy(self):
a = T.scalar() # the a is for 'anonymous' (un-named).
x, s = T.scalars('xs')
f = function([x, In(a, value=1.0, name='a'),
In(s, value=0.0, update=s + a * x, mutable=True)], s + a * x)
try:
g = copy.deepcopy(f)
except NotImplementedError as e:
if e[0].startswith('DebugMode is not picklable'):
return
else:
raise
# if they both return, assume that they return equivalent things.
# print [(k,id(k)) for k in f.finder.keys()]
# print [(k,id(k)) for k in g.finder.keys()]
self.assertFalse(g.container[0].storage is f.container[0].storage)
self.assertFalse(g.container[1].storage is f.container[1].storage)
self.assertFalse(g.container[2].storage is f.container[2].storage)
self.assertFalse(x in g.container)
self.assertFalse(x in g.value)
self.assertTrue(len(f.defaults) == len(g.defaults))
# print 'f.defaults = %s' % (f.defaults, )
# print 'g.defaults = %s' % (g.defaults, )
self.assertTrue(all([f_req == g_req and f_feed == g_feed and
f_val == g_val
for ((f_req, f_feed, f_val), (g_req, g_feed, g_val)) in zip(
f.defaults, g.defaults)]))
self.assertFalse(g.value[1] is f.value[1]) # should not have been copied
self.assertFalse(g.value[2] is f.value[2]) # should have been copied because it is mutable.
self.assertFalse((g.value[2] != f.value[2]).any()) # its contents should be identical
self.assertTrue(f(2, 1) == g(2)) # they should be in sync, default value should be copied.
self.assertTrue(f(2, 1) == g(2)) # they should be in sync, default value should be copied.
f(1, 2) # put them out of sync
self.assertFalse(f(1, 2) == g(1, 2)) # they should not be equal anymore.
g(1, 2) # put them back in sync
self.assertTrue(f(3) == g(3)) # They should be in sync again.
def test_pickle(self):
a = T.scalar() # the a is for 'anonymous' (un-named).
x, s = T.scalars('xs')
f = function([x, In(a, value=1.0, name='a'),
In(s, value=0.0, update=s + a * x, mutable=True)], s + a * x)
try:
# Note that here we also test protocol 0 on purpose, since it
# should work (even though one should not use it).
g = pickle.loads(pickle.dumps(f, protocol=0))
g = pickle.loads(pickle.dumps(f, protocol=-1))
except NotImplementedError as e:
if e[0].startswith('DebugMode is not picklable'):
return
else:
raise
# if they both return, assume that they return equivalent things.
# print [(k,id(k)) for k in f.finder.keys()]
# print [(k,id(k)) for k in g.finder.keys()]
self.assertFalse(g.container[0].storage is f.container[0].storage)
self.assertFalse(g.container[1].storage is f.container[1].storage)
self.assertFalse(g.container[2].storage is f.container[2].storage)
self.assertFalse(x in g.container)
self.assertFalse(x in g.value)
self.assertFalse(g.value[1] is f.value[1]) # should not have been copied
self.assertFalse(g.value[2] is f.value[2]) # should have been copied because it is mutable.
self.assertFalse((g.value[2] != f.value[2]).any()) # its contents should be identical
self.assertTrue(f(2, 1) == g(2)) # they should be in sync, default value should be copied.
self.assertTrue(f(2, 1) == g(2)) # they should be in sync, default value should be copied.
f(1, 2) # put them out of sync
self.assertFalse(f(1, 2) == g(1, 2)) # they should not be equal anymore.
def test_empty_givens_updates():
"""
Regression test for bug fixed in 8625e03.
"""
# Empty givens / updates dictionaries were not properly detected before,
# triggering useless crashes at compile time.
x = T.scalar()
y = x * 2
function([theano.In(x)], y, givens={})
function([theano.In(x)], y, updates={})
def test_param_allow_downcast_int(self):
a = tensor.wvector('a') # int16
b = tensor.bvector('b') # int8
c = tensor.bscalar('c') # int8
f = pfunc([In(a, allow_downcast=True),
In(b, allow_downcast=False),
In(c, allow_downcast=None)],
(a + b + c))
# Both values are in range. Since they're not ndarrays (but lists),
# they will be converted, and their value checked.
assert numpy.all(f([3], [6], 1) == 10)
# Values are in range, but a dtype too large has explicitly been given
# For performance reasons, no check of the data is explicitly performed
# (It might be OK to change this in the future.)
self.assertRaises(TypeError, f,
[3], numpy.array([6], dtype='int16'), 1)
# Value too big for a, silently ignored
assert numpy.all(f([2 ** 20], numpy.ones(1, dtype='int8'), 1) == 2)
# Value too big for b, raises TypeError
self.assertRaises(TypeError, f, [3], [312], 1)
# Value too big for c, raises TypeError
self.assertRaises(TypeError, f, [3], [6], 806)
def test_input_aliasing_affecting_inplace_operations(self):
# Note: to trigger this bug with theano rev 4586:2bc6fc7f218b,
# you need to make in inputs mutable (so that inplace
# operations are used) and to break the elemwise composition
# with some non-elemwise op (here dot)
x = theano.tensor.dvector()
y = theano.tensor.dvector()
m1 = theano.tensor.dmatrix()
m2 = theano.tensor.dmatrix()
f = theano.function([theano.In(x, mutable=True),
theano.In(y, mutable=True),
theano.In(m1, mutable=True),
theano.In(m2, mutable=True)],
theano.dot((x * 2), m1) + theano.dot((y * 3), m2))
# Test 1. If the same variable is given twice
# Compute bogus values
v = numpy.asarray([1, 2, 3, 4, 5], dtype='float64')
m = numpy.asarray([[1, 0, 0, 0, 0],
[0, 1, 0, 0, 0],
[0, 0, 1, 0, 0],
[0, 0, 0, 1, 0],
[0, 0, 0, 0, 1]], dtype='float64')
bogus_vals = f(v, v, m, m)
# Since we used inplace operation v and m may be corrupted
# so we need to recreate them
v = numpy.asarray([1, 2, 3, 4, 5], dtype='float64')
m = numpy.asarray([[1, 0, 0, 0, 0],
[0, 1, 0, 0, 0],
[0, 0, 1, 0, 0],
[0, 0, 0, 1, 0],
[0, 0, 0, 0, 1]], dtype='float64')
m_copy = m.copy()
v_copy = v.copy()
vals = f(v, v_copy, m, m_copy)
assert numpy.allclose(vals, bogus_vals)
def test_reverse_inplace(self):
mySymbolicMatricesList = TypedListType(T.TensorType(
theano.config.floatX, (False, False)))()
z = Reverse()(mySymbolicMatricesList)
m = theano.compile.mode.get_default_mode().including("typed_list_inplace_opt")
f = theano.function([In(mySymbolicMatricesList, borrow=True,
mutable=True)], z, accept_inplace=True, mode=m)
self.assertTrue(f.maker.fgraph.toposort()[0].op.inplace)
x = rand_ranged_matrix(-1000, 1000, [100, 101])
y = rand_ranged_matrix(-1000, 1000, [100, 101])
self.assertTrue(numpy.array_equal(f([x, y]), [y, x]))