def _step_forward_with_context(self, x_t, x_m, h_tm1, c_z, c_r, c_h):
"""
x_t: input at time t
x_m: mask of x_t
h_tm1: previous hidden state
c_z, c_r, c_h: context terms added to the update gate, reset gate
and candidate state of the RNN, respectively
"""
z_t = T.nnet.sigmoid(T.dot(x_t, self.W_xz) +
T.dot(h_tm1, self.W_hz) + c_z + self.b_z)
r_t = T.nnet.sigmoid(T.dot(x_t, self.W_xr) +
T.dot(h_tm1, self.W_hr) + c_r + self.b_r)
can_h_t = T.tanh(T.dot(x_t, self.W_xh) +
r_t * T.dot(h_tm1, self.W_hh) + c_h +
self.b_h)
h_t = (1 - z_t) * h_tm1 + z_t * can_h_t
h_t = x_m[:, None] * h_t + (1. - x_m[:, None]) * h_tm1
return h_t
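# A minimal numpy sketch (our addition, not part of the original class) of the
# update computed by _step_forward_with_context above: a GRU step whose gates
# and candidate state each receive a precomputed context term, and a mask that
# keeps the previous state for padded positions. All names below are stand-ins.
import numpy as np

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

batch, n_in, n_hid = 2, 3, 4
rng = np.random.RandomState(0)
W_xz, W_xr, W_xh = (rng.randn(n_in, n_hid) for _ in range(3))
W_hz, W_hr, W_hh = (rng.randn(n_hid, n_hid) for _ in range(3))
b_z = b_r = b_h = np.zeros(n_hid)

x_t = rng.randn(batch, n_in)
h_tm1 = rng.randn(batch, n_hid)
c_z = c_r = c_h = rng.randn(batch, n_hid)   # context terms
x_m = np.array([1.0, 0.0])                  # second example is padding

z_t = sigmoid(x_t.dot(W_xz) + h_tm1.dot(W_hz) + c_z + b_z)   # update gate
r_t = sigmoid(x_t.dot(W_xr) + h_tm1.dot(W_hr) + c_r + b_r)   # reset gate
can_h_t = np.tanh(x_t.dot(W_xh) + r_t * h_tm1.dot(W_hh) + c_h + b_h)
h_t = (1 - z_t) * h_tm1 + z_t * can_h_t
h_t = x_m[:, None] * h_t + (1 - x_m[:, None]) * h_tm1
assert np.allclose(h_t[1], h_tm1[1])        # masked row keeps its old state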
def _step_forward(self, x_t, x_m, h_tm1):
"""
x_t: input at time t
x_m: mask of x_t
h_tm1: previous hidden state
"""
z_t = T.nnet.sigmoid(T.dot(x_t, self.W_xz) +
T.dot(h_tm1, self.W_hz) + self.b_z)
r_t = T.nnet.sigmoid(T.dot(x_t, self.W_xr) +
T.dot(h_tm1, self.W_hr) + self.b_r)
can_h_t = T.tanh(T.dot(x_t, self.W_xh) +
r_t * T.dot(h_tm1, self.W_hh) +
self.b_h)
h_t = (1 - z_t) * h_tm1 + z_t * can_h_t
h_t = x_m[:, None] * h_t + (1. - x_m[:, None]) * h_tm1
return h_t
def linear(inputs, size, bias, concat=False, dtype=None, scope=None):
if not isinstance(size, (list, tuple)):
raise ValueError("size argument must be (input_size, output_size)")
input_size, output_size = size
if not isinstance(input_size, (list, tuple)):
input_size = [input_size]
if not isinstance(inputs, (list, tuple)):
inputs = [inputs]
if len(inputs) != len(input_size):
raise RuntimeError("unmatched elements found: inputs and input_size")
results = []
with variable_scope(scope):
if concat:
input_size = sum(input_size)
inputs = theano.tensor.concatenate(inputs, -1)
shape = [input_size, output_size]
matrix = get_variable("matrix", shape, dtype=dtype)
results.append(theano.dot(inputs, matrix))
else:
for i in range(len(input_size)):
shape = [input_size[i], output_size]
name = "matrix_%d" % i
matrix = get_variable(name, shape, dtype=dtype)
results.append(theano.dot(inputs[i], matrix))
if bias:
shape = [output_size]
bias = get_variable("bias", shape, dtype=dtype)
results.append(bias)
if len(results) == 1:
return results[0]
return reduce(theano.tensor.add, results)
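# A small numpy check (our addition, not part of the original module) of the
# identity that makes the `concat` branch of `linear` equivalent to the
# per-input branch: concatenating the inputs and stacking the matrices gives
# the same result as summing the individual projections.
import numpy as np

rng = np.random.RandomState(0)
x1, x2 = rng.randn(5, 3), rng.randn(5, 2)
W1, W2 = rng.randn(3, 4), rng.randn(2, 4)

per_input = x1.dot(W1) + x2.dot(W2)                       # concat=False path
concatenated = np.concatenate([x1, x2], -1).dot(np.concatenate([W1, W2], 0))
assert np.allclose(per_input, concatenated)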
def test_csr_correct_output_faster_than_scipy(self):
# contrast with test_grad, we put csr in float32, csc in float64
sparse_dtype = 'float32'
dense_dtype = 'float32'
a = SparseType('csr', dtype=sparse_dtype)()
b = tensor.matrix(dtype=dense_dtype)
d = theano.dot(a, b)
f = theano.function([a, b], d)
for M, N, K, nnz in [(4, 3, 2, 3),
(40, 30, 20, 3),
(40, 30, 20, 30),
(400, 3000, 200, 6000),
]:
spmat = sp.csr_matrix(random_lil((M, N), sparse_dtype, nnz))
mat = numpy.asarray(numpy.random.randn(N, K), dense_dtype)
t0 = time.time()
theano_result = f(spmat, mat)
t1 = time.time()
scipy_result = spmat * mat
t2 = time.time()
theano_time = t1 - t0
scipy_time = t2 - t1
# print 'theano took', theano_time,
# print 'scipy took', scipy_time
overhead_tol = 0.002 # seconds
overhead_rtol = 1.1 # times as long
utt.assert_allclose(scipy_result, theano_result)
if (theano.config.mode not in ["DebugMode", "DEBUG_MODE"] and
theano.config.cxx):
self.assertFalse(
theano_time > overhead_rtol * scipy_time + overhead_tol,
(theano_time,
overhead_rtol * scipy_time + overhead_tol,
scipy_time, overhead_rtol, overhead_tol))
def test_cuda(self):
import theano.sandbox.cuda as cuda
if not cuda.cuda_available:
raise SkipTest("Optional package cuda not available")
a = sparse.csr_matrix('a', dtype='float32')
b = cuda.float32_shared_constructor(
numpy.random.rand(3, 4).astype('float32'))
d = sparse.dot(a, b)
f = theano.function([a], d)
a_val = scipy.sparse.csr_matrix(random_lil((5, 3), 'float32', 5))
d_theano = f(a_val)
d_numpy = a_val * b.get_value()
utt.assert_allclose(d_numpy, d_theano)
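# A scipy/numpy check (our addition) of the equality the CUDA test asserts:
# multiplying a CSR matrix by a dense matrix agrees with the dense dot product.
import numpy
import scipy.sparse

dense = numpy.random.rand(5, 3).astype('float32')
b_val = numpy.random.rand(3, 4).astype('float32')
sparse_result = scipy.sparse.csr_matrix(dense) * b_val
assert numpy.allclose(sparse_result, numpy.dot(dense, b_val), atol=1e-5)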
def matrix_dot(*args):
""" Shorthand for product between several dots.
Given :math:`N` matrices :math:`A_0, A_1, .., A_N`, ``matrix_dot`` will
generate the matrix product between all in the given order, namely
:math:`A_0 \cdot A_1 \cdot A_2 \cdot .. \cdot A_N`.
"""
rval = args[0]
for a in args[1:]:
rval = theano.tensor.dot(rval, a)
return rval
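# Illustrative use of matrix_dot (our sketch; the variable names are not from
# the original module): chaining three matrices equals nesting two dot calls.
import numpy, theano, theano.tensor as T

A, B, C = T.matrix('A'), T.matrix('B'), T.matrix('C')
f = theano.function([A, B, C], matrix_dot(A, B, C))
a, b, c = (numpy.random.rand(3, 3).astype(theano.config.floatX)
           for _ in range(3))
assert numpy.allclose(f(a, b, c), a.dot(b).dot(c), atol=1e-5)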
def perform(self, node, inputs, outputs):
"""
Implements the "reverse-mode" gradient for the eigensystem of
a square matrix.
"""
x, w, v, W, V = inputs
N = x.shape[0]
outer = numpy.outer
def G(n):
return sum(v[:, m] * V.T[n].dot(v[:, m]) / (w[n] - w[m])
for m in xrange(N) if m != n)
g = sum(outer(v[:, n], v[:, n] * W[n] + G(n))
for n in xrange(N))
# Numpy's eigh(a, 'L') (eigh(a, 'U')) is a function of tril(a)
# (triu(a)) only. This means that partial derivative of
# eigh(a, 'L') (eigh(a, 'U')) with respect to a[i,j] is zero
# for i < j (i > j). At the same time, non-zero components of
# the gradient must account for the fact that variation of the
# opposite triangle contributes to variation of two elements
# of Hermitian (symmetric) matrix. The following line
# implements the necessary logic.
out = self.tri0(g) + self.tri1(g).T
# The call to self.tri0 in perform upcasts from float32 to
# float64, or from int* to int64, in numpy 1.6.1 but not in
# 1.6.2. We do not want a version-dependent dtype in Theano,
# so we cast to the dtype of the output.
outputs[0][0] = numpy.asarray(out, dtype=node.outputs[0].dtype)
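# A standalone numpy check (our addition, not part of the Op) of the
# first-order identity behind the eigenvalue part of this gradient: for a
# symmetric A and symmetric perturbation dA, eigenvalue n moves by
# v[:, n]^T dA v[:, n] to first order.
import numpy

rng = numpy.random.RandomState(0)
A = rng.randn(4, 4); A = (A + A.T) / 2
dA = rng.randn(4, 4); dA = (dA + dA.T) / 2
w0, v0 = numpy.linalg.eigh(A)
eps = 1e-6
w1, _ = numpy.linalg.eigh(A + eps * dA)
analytic = numpy.einsum('in,ij,jn->n', v0, dA, v0)   # v_n^T dA v_n
numeric = (w1 - w0) / eps
assert numpy.allclose(analytic, numeric, atol=1e-4)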
def matrix_power(M, n):
"""
Raise a square matrix to the (integer) power n.
Parameters
----------
M : Tensor variable
n : Python int
"""
result = 1
for i in xrange(n):
result = theano.dot(result, M)
return result
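# Quick numpy cross-check (our addition) of what the loop above computes; note
# that `result` starts as the Python scalar 1, so matrix_power(M, 0) returns
# the scalar 1 rather than an identity matrix.
import numpy

m = numpy.random.rand(4, 4)
result = 1
for _ in range(3):
    result = numpy.dot(result, m)
assert numpy.allclose(result, numpy.linalg.matrix_power(m, 3))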
def test_const_type_in_mul_canonizer():
input = dmatrix()
w = dmatrix()
visb = dvector()
hidb = dvector()
betas = dvector()
a = dvector()
def sigm(x):
return 1. / (1 + tensor.exp(-x))
hid = sigm((tensor.dot(w, input) + hidb) * betas)
vis_gauss1 = (tensor.dot(w.T, hid) + visb) * betas / (2 * a * a)
vis_gauss2 = (tensor.dot(w.T, hid) + visb) * betas / (2. * a * a)
f1 = function([input, w, visb, hidb, betas, a], vis_gauss1)
f2 = function([input, w, visb, hidb, betas, a], vis_gauss2)
ival = numpy.random.rand(5, 5)
wval = numpy.random.rand(5, 5)
visbval = numpy.random.rand(5)
hidbval = numpy.random.rand(5)
betaval = numpy.random.rand(5)
aval = numpy.random.rand(5)
utt.assert_allclose(
f2(ival, wval, visbval, hidbval, betaval, aval),
f1(ival, wval, visbval, hidbval, betaval, aval))
def test_dot_allocs_0(self):
v1 = tensor.vector('v1')
v2 = tensor.vector('v2')
m1 = tensor.matrix('m1')
m2 = tensor.matrix('m2')
vv2 = numpy.asarray([0, 1], dtype=theano.config.floatX)
vm2 = numpy.asarray([[1, 2], [4, 5]],
dtype=theano.config.floatX)
vv3 = numpy.asarray([0, 1, 2], dtype=theano.config.floatX)
vm3 = numpy.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]],
dtype=theano.config.floatX)
for _e1 in [(v1, vv2, vv3), (m1, vm2, vm3)]:
for _e2 in [(v2, vv2, vv3), (m2, vm2, vm3)]:
for p in [0, 1]:
if p == 0:
e1 = tensor.zeros_like(_e1[0])
e2 = _e2[0]
else:
e1 = _e1[0]
e2 = tensor.zeros_like(_e2[0])
o = tensor.dot(e1, e2)
f = theano.function([_e1[0], _e2[0]], o, mode=self.mode)
f(_e1[1], _e2[1])
f(_e1[2], _e2[2])
assert numpy.all([not isinstance(n.op, tensor.Dot) for n in
f.maker.fgraph.toposort()])
# test that we don't remove shape errors
self.assertRaises((ValueError, AssertionError), f,
_e1[1], _e2[2])
self.assertRaises((ValueError, AssertionError), f,
_e1[2], _e2[1])
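# numpy illustration (our addition) of why the rewrite exercised above is
# safe: a dot whose operand is all zeros is itself all zeros, so the Dot node
# can be replaced by an allocation of zeros of the output shape.
import numpy

m = numpy.random.rand(2, 2)
assert numpy.all(numpy.dot(numpy.zeros_like(m), m) == 0)
assert numpy.all(numpy.dot(m, numpy.zeros_like(m)) == 0)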
def test_local_subtensor_of_dot():
m1 = theano.tensor.matrix()
m2 = theano.tensor.matrix()
d1 = numpy.arange(6).reshape((3, 2)).astype(config.floatX)
d2 = numpy.arange(8).reshape((2, 4)).astype(config.floatX) + 10
mode = compile.get_default_mode().including("local_subtensor_of_dot")
def test_equality(a, b):
return a.shape == b.shape and numpy.allclose(a, b)
# [cst]
f = theano.function([m1, m2], theano.dot(m1, m2)[1], mode=mode)
topo = f.maker.fgraph.toposort()
assert test_equality(f(d1, d2), numpy.dot(d1, d2)[1])
# DimShuffle happens in FAST_COMPILE
assert isinstance(topo[-1].op, (T.blas_c.CGemv, T.blas.Gemv, T.DimShuffle))
# slice
f = theano.function([m1, m2], theano.dot(m1, m2)[1:2], mode=mode)
topo = f.maker.fgraph.toposort()
assert test_equality(f(d1, d2), numpy.dot(d1, d2)[1:2])
assert isinstance(topo[-1].op, T.blas.Dot22)
m1 = theano.tensor.tensor3()
m2 = theano.tensor.tensor3()
idx = theano.tensor.iscalar()
d1 = numpy.arange(30).reshape(2, 5, 3).astype(config.floatX)
d2 = numpy.arange(72).reshape(4, 3, 6).astype(config.floatX) + 100
f = theano.function([m1, m2, idx], theano.dot(m1, m2)[idx, 1:4, :, idx:], mode=mode)
assert test_equality(f(d1, d2, 1), numpy.dot(d1, d2)[1, 1:4, :, 1:])
# Check that the stack trace is copied over properly; we need to use
# the same mode as before.
assert check_stack_trace(f, ops_to_check='last')
f = theano.function([m1, m2, idx], theano.dot(m1, m2)[1:4, :, idx:, idx], mode=mode)
assert test_equality(f(d1, d2, 1), numpy.dot(d1, d2)[1:4, :, 1:, 1])
# Now test that the stack trace is copied over properly;
# we need to use the same mode as before.
assert check_stack_trace(f, ops_to_check='last')
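# numpy illustration (our addition) of the algebraic identity this rewrite
# exploits: slicing the result of a dot along leading axes is the same as
# slicing the left operand before the dot.
import numpy

d1 = numpy.arange(6).reshape(3, 2).astype('float64')
d2 = numpy.arange(8).reshape(2, 4).astype('float64')
assert numpy.allclose(numpy.dot(d1, d2)[1], numpy.dot(d1[1], d2))
assert numpy.allclose(numpy.dot(d1, d2)[1:2], numpy.dot(d1[1:2], d2))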
def test_matrix_matrix(self):
a, b = matrices('ab')
g = self.simple_optimize(FunctionGraph([a, b], [tensor.dot(a, b).T]))
sg = '[dot(InplaceDimShuffle{1,0}(b), InplaceDimShuffle{1,0}(a))]'
assert str(g) == sg, (str(g), sg)
# Check stacktrace was copied over correctly after opt was applied
self.assertTrue(check_stack_trace(g, ops_to_check='all'))
def test_row_matrix(self):
a = vector('a')
b = matrix('b')
g = optimize(FunctionGraph(
[a, b],
[tensor.dot(a.dimshuffle('x', 0), b).T]),
level='stabilize')
sg = '[dot(InplaceDimShuffle{1,0}(b), InplaceDimShuffle{0,x}(a))]'
assert str(g) == sg, (str(g), sg)
# Check stacktrace was copied over correctly after opt was applied
self.assertTrue(check_stack_trace(g, ops_to_check='all'))
def test_matrix_col(self):
a = vector('a')
b = matrix('b')
g = optimize(FunctionGraph(
[a, b],
[tensor.dot(b, a.dimshuffle(0, 'x')).T]),
level='stabilize')
sg = '[dot(InplaceDimShuffle{x,0}(a), InplaceDimShuffle{1,0}(b))]'
assert str(g) == sg, (str(g), sg)
# Check stacktrace was copied over correctly after opt was applied
self.assertTrue(check_stack_trace(g, ops_to_check='all'))
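# numpy illustration (our addition) of the identity behind the three tests
# above: the transpose of a product is the product of the transposes in
# reverse order, which is what the optimizer rewrites dot(a, b).T into.
import numpy

a = numpy.random.rand(3, 4)
b = numpy.random.rand(4, 5)
assert numpy.allclose(numpy.dot(a, b).T, numpy.dot(b.T, a.T))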
def _gemm(z, a, x, y, b):
assert a.shape == ()
assert b.shape == ()
return b * z + a * numpy.dot(x, y)
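# A tiny usage sketch (our addition) of the reference helper above: it mirrors
# BLAS GEMM, computing b * z + a * dot(x, y), which is the single fused node
# the gemm tests below expect the optimizer to produce.
import numpy

rng = numpy.random.RandomState(0)
z = rng.rand(3, 3)
x, y = rng.rand(3, 2), rng.rand(2, 3)
a, b = numpy.array(0.5), numpy.array(2.0)   # 0-d arrays, as _gemm asserts
expected = 2.0 * z + 0.5 * numpy.dot(x, y)
assert numpy.allclose(_gemm(z, a, x, y, b), expected)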
def test_factorised_scalar(self):
a = T.matrix()
b = T.matrix()
c = T.matrix()
s = theano.shared(numpy.zeros((5, 5)).astype(config.floatX))
lr1 = T.constant(0.01).astype(config.floatX)
lr2 = T.constant(2).astype(config.floatX)
l2_reg = T.constant(0.0001).astype(config.floatX)
# test constant merge with gemm
f = theano.function([a, b], updates=[(s, lr1 * T.dot(a, b) +
l2_reg * lr2 * s)],
mode=mode_not_fast_compile).maker.fgraph.toposort()
#[Gemm{inplace}(<TensorType(float64, matrix)>, 0.01,
# <TensorType(float64, matrix)>, <TensorType(float64, matrix)>,
# 2e-06)]
assert len(f) == 1
assert f[0].op == gemm_inplace
# test factored scalar with merge
f = theano.function([a, b], updates=[(s, lr1 * (T.dot(a, b) -
l2_reg * s))],
mode=mode_not_fast_compile).maker.fgraph.toposort()
#[Gemm{inplace}(<TensorType(float64, matrix)>, 0.01,
# <TensorType(float64, matrix)>, <TensorType(float64, matrix)>,
# -2e-06)]
assert len(f) == 1
assert f[0].op == gemm_inplace
# test factored scalar with merge and neg
f = theano.function([a, b],
updates=[(s, s - lr1 * (s * .0002 + T.dot(a, b)))],
mode=mode_not_fast_compile).maker.fgraph.toposort()
#[Gemm{inplace}(<TensorType(float64, matrix)>, -0.01,
# <TensorType(float64, matrix)>, <TensorType(float64, matrix)>,
# 0.999998)]
assert len(f) == 1
assert f[0].op == gemm_inplace
def test_destroy_map4(self):
"""test that dot args can be aliased"""
Z = shared(self.rand(2, 2), name='Z')
A = shared(self.rand(2, 2), name='A')
one = T.constant(1.0).astype(Z.dtype)
f = inplace_func([], gemm_inplace(Z, one, A, A, one))
f()
f = inplace_func([], gemm_inplace(Z, one, A, A.T, one))
f()
def test_gemm_opt0():
"""Many subgraphs whose dots can be eliminated"""
X, Y, Z, a, b = XYZab()
just_gemm([X, Y, Z, a, b], [T.dot(X, Y) * a + Z * b])
just_gemm([X, Y, Z, a, b], [a * T.dot(X, Y) + b * Z])
just_gemm([X, Y, Z, a, b], [b * Z + a * T.dot(X, Y)])
just_gemm([X, Y, Z, a, b], [T.dot(X, Y) * a - Z * b])
just_gemm([X, Y, Z, a, b], [a * T.dot(X, Y) - b * Z])
just_gemm([X, Y, Z, a, b], [b * Z - a * T.dot(X, Y)])
# with transposes (transposes should be pushed through dot in canonicalize)
just_gemm([X, Y, Z, a, b], [b * Z.T - a * T.dot(Y.T, X.T)])
just_gemm([X, Y, Z, a, b], [b * Z.T + a * b * T.dot(X, Y).T])
just_gemm([X, Y, Z, a, b], [b * Z + a * T.dot(X, Y).T],
ishapes=[(5, 3), (3, 4), (4, 5), (), ()])
# with N multiplications instead of just one
just_gemm([X, Y, Z, a, b], [(b * b) * Z * a + (a * a) * T.dot(X, Y) * b])
just_gemm([X, Y, Z, a, b], [Z + T.dot(X, Y)])
just_gemm([X, Y, Z, a, b], [Z * b + T.dot(X, Y)])
just_gemm([X, Y, Z, a, b], [Z + a * b * a * T.dot(X, Y)])
just_gemm([X, Y, Z, a, b], [(b * b) * Z * a - (a * a) * T.dot(X, Y) * b])
just_gemm([X, Y, Z, a, b], [Z - T.dot(X, Y)])
just_gemm([X, Y, Z, a, b], [Z * b - T.dot(X, Y)])
just_gemm([X, Y, Z, a, b], [Z - a * b * a * T.dot(X, Y)])
def test_upcasting_scalar_nogemm():
# Test that the optimization does not crash when the scale has an incorrect
# dtype, and forces upcasting of the result
v = T.fmatrix('v')
w = T.fmatrix('w')
t = T.fmatrix('t')
alpha = T.dscalar('a')
rval = T.dot(w, v) * alpha + t
f = theano.function([w, v, t, alpha], rval)
t = f.maker.fgraph.toposort()
assert numpy.sum([isinstance(n.op, Gemm) for n in t]) == 0
#theano.printing.debugprint(f, print_type=True)
v = T.fmatrix('v')
w = T.fmatrix('w')
t = T.fmatrix('t')
alpha = T.cscalar('a')
on_opt_error = config.on_opt_error
try:
config.on_opt_error = 'raise'
rval = T.dot(w, v) * alpha + t
f = theano.function([w, v, t, alpha], rval)
finally:
config.on_opt_error = on_opt_error
t = f.maker.fgraph.toposort()
assert numpy.sum([isinstance(n.op, Gemm) for n in t]) == 0
#theano.printing.debugprint(f, print_type=True)
def test_gemm_opt_wishlist():
X, Y, Z, a, b = T.matrix(), T.matrix(), T.matrix(), T.scalar(), T.scalar()
# with >2 additions of the same T.dot(X, Y) term
just_gemm([X, Y, Z, a, b],
[(b * b) * Z * a + (a * a) * T.dot(X, Y) + b * T.dot(X, Y)])
just_gemm([X, Y, Z, a, b], [Z + T.dot(X, Y) + T.dot(X, Y)])