def _spec_op_init(scalar_op, nfunc, nin, nout):
    def construct(symbol):
        symbolname = symbol.__name__
        msg = "no_inplace"
        n = "Elemwise{%s,%s}" % (symbolname, msg)
        rval = Elemwise(scalar_op, name=n,
                        nfunc_spec=(nfunc and (nfunc, nin, nout)))
        if getattr(symbol, '__doc__', False):
            rval.__doc__ = symbol.__doc__ + '\n' + rval.__doc__
        # for the meaning of this see the ./epydoc script
        # it makes epydoc display rval as if it were a function, not an object
        rval.__epydoc_asRoutine = symbol
        rval.__module__ = 'tensor'
        pprint.assign(rval, printing.FunctionPrinter(symbolname))
        return rval
    return construct
def get_output(self, train=False):
    print(len(self.layers))
    u = self.layers[0].get_output(train)
    t = self.layers[1].get_output(train)
    # tp = t[0]
    # tn = t[1]
    # un = T.dot(u, u)
    # return [T.dot(u, tp) / (un * T.dot(tp, tp)), T.dot(u, tn) / (un * T.dot(tn, tn))]
    # theano.printing.pprint('vals')
    # x = T.dvector()
    # printed_u = hello_world_op(x)
    # f = theano.function([x], printed_u)
    # f(['here'])
    # T.reshape(u, [2, 1])
    # T.reshape(t, [1, 2, 2])
    # d = T.dot(t.dimshuffle(1, 0, 2), u)
    # u1 = self.activation(u)
    # t.reshape([2, 2, 2])
    return (([u, u] * t.dimshuffle(1, 0, 2)).dimshuffle(1, 0, 2))  # .reshape([2, 2])
    # return d.dimshuffle(1, 0, 2)  # just dot product
def get_output(self, train=False):
    print(len(self.layers))
    u = self.layers[0].get_output(train)
    t = self.layers[1].get_output(train)
    # tp = t[0]
    # tn = t[1]
    # un = T.dot(u, u)
    # return [T.dot(u, tp) / (un * T.dot(tp, tp)), T.dot(u, tn) / (un * T.dot(tn, tn))]
    # theano.printing.pprint('vals')
    # x = T.dvector()
    # printed_u = hello_world_op(x)
    # f = theano.function([x], printed_u)
    # f(['here'])
    # T.reshape(u, [2, 1])
    # T.reshape(t, [1, 2, 2])
    # d = T.dot(t.dimshuffle(1, 0, 2), u)
    # u1 = self.activation(u)
    # t.reshape([2, 2, 2])
    return T.max((([u, u] * t.dimshuffle(1, 0, 2)).dimshuffle(1, 0, 2)), 2)  # .reshape([2, 2])
    # return d.dimshuffle(1, 0, 2)  # just dot product
def get_output(self, train=False):
    print(len(self.layers))
    u = self.layers[0].get_output(train)
    t = self.layers[1].get_output(train)
    # tp = t[0]
    # tn = t[1]
    # un = T.dot(u, u)
    # return [T.dot(u, tp) / (un * T.dot(tp, tp)), T.dot(u, tn) / (un * T.dot(tn, tn))]
    # theano.printing.pprint('vals')
    # x = T.dvector()
    # printed_u = hello_world_op(x)
    # f = theano.function([x], printed_u)
    # f(['here'])
    # T.reshape(u, [2, 1])
    # T.reshape(t, [1, 2, 2])
    # d = T.dot(t.dimshuffle(1, 0, 2), u)
    # u1 = self.activation(u)
    # t.reshape([2, 2, 2])
    return T.sum((([u, u, u, u, u] * t.dimshuffle(1, 0, 2)).dimshuffle(1, 0, 2)), 2)  # .reshape([2, 2])
    # return d.dimshuffle(1, 0, 2)  # just dot product
def get_output(self, train=False):
    print(len(self.layers))
    u = self.layers[0].get_output(train)
    t = self.layers[1].get_output(train)
    # tp = t[0]
    # tn = t[1]
    # un = T.dot(u, u)
    # return [T.dot(u, tp) / (un * T.dot(tp, tp)), T.dot(u, tn) / (un * T.dot(tn, tn))]
    # theano.printing.pprint('vals')
    # x = T.dvector()
    # printed_u = hello_world_op(x)
    # f = theano.function([x], printed_u)
    # f(['here'])
    # T.reshape(u, [2, 1])
    # T.reshape(t, [1, 2, 2])
    # d = T.dot(t.dimshuffle(1, 0, 2), u)
    # u1 = self.activation(u)
    # t.reshape([2, 2, 2])
    return T.sum((([u, u] * t.dimshuffle(1, 0, 2)).dimshuffle(1, 0, 2)), 2)  # .reshape([2, 2])
    # return d.dimshuffle(1, 0, 2)  # just dot product
def forward_conv_batch(self, x):
    """
    :param x: (batch, length, dim)
    :return: (batch, length - kernel + 2*padding_size + 1, hidden_dim)
    """
    # T.nnet.conv2d input: (batch size, input channels, input rows, input columns)
    # dl4nlp:              (batch size, 1, length, in_dim)
    x = x.dimshuffle([0, 'x', 1, 2])
    # T.nnet.conv2d filters: (output channels, input channels, filter rows, filter columns)
    # dl4nlp:                (hidden_dim, 1, kernel_size, in_dim)
    filter_w = self.W.dimshuffle([1, 'x', 0, 2])
    # T.nnet.conv2d output: (batch size, output channels, output rows, output columns)
    # dl4nlp:               (batch size, hidden_dim, length+kernel-1, 1)
    conv_result = T.nnet.conv2d(x, filter_w,
                                border_mode='valid')
    # from theano.printing import Print
    # conv_result = Print()(conv_result)
    # (batch size, hidden_dim, length - kernel + 2*padding_size + 1, 1)
    # -> (batch, length - kernel + 2*padding_size + 1, hidden_dim)
    conv_result = T.transpose(conv_result[:, :, :, 0], (0, 2, 1))
    return conv_result
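# The shape bookkeeping above is easy to get wrong, so here is a minimal,
# self-contained sketch (not part of the original layer) that checks the
# 'valid' conv2d output shape on dummy data; the sizes below are made up.
import numpy as np
import theano
import theano.tensor as T

x_sym = T.tensor4('x')   # (batch, 1, length, in_dim)
w_sym = T.tensor4('w')   # (hidden_dim, 1, kernel, in_dim)
y_sym = T.nnet.conv2d(x_sym, w_sym, border_mode='valid')
f_conv = theano.function([x_sym, w_sym], y_sym)

x_val = np.zeros((3, 1, 10, 8), dtype=theano.config.floatX)   # length=10, in_dim=8
w_val = np.zeros((5, 1, 4, 8), dtype=theano.config.floatX)    # hidden_dim=5, kernel=4
print(f_conv(x_val, w_val).shape)   # (3, 5, 7, 1): rows = length - kernel + 1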
def test_pydotprint_long_name():
    """This is a REALLY PARTIAL TEST.

    It prints a graph containing variable and apply nodes whose long
    names differ but whose shortened names do not. Those nodes should
    not be merged in the dot graph.
    """
    # Skip test if pydot is not available.
    if not theano.printing.pydot_imported:
        raise SkipTest('pydot not available')

    x = tensor.dvector()
    mode = theano.compile.mode.get_default_mode().excluding("fusion")
    f = theano.function([x], [x * 2, x + x], mode=mode)
    f([1, 2, 3, 4])

    theano.printing.pydotprint(f, max_label_size=5,
                               print_output_file=False)
    theano.printing.pydotprint([x * 2, x + x],
                               max_label_size=5,
                               print_output_file=False)
def test_printing_scan():
    # Skip test if pydot is not available.
    if not theano.printing.pydot_imported:
        raise SkipTest('pydot not available')

    def f_pow2(x_tm1):
        return 2 * x_tm1

    state = theano.tensor.scalar('state')
    n_steps = theano.tensor.iscalar('nsteps')
    output, updates = theano.scan(f_pow2,
                                  [],
                                  state,
                                  [],
                                  n_steps=n_steps,
                                  truncate_gradient=-1,
                                  go_backwards=False)
    f = theano.function([state, n_steps],
                        output,
                        updates=updates,
                        allow_input_downcast=True)
    theano.printing.pydotprint(output, scan_graphs=True)
    theano.printing.pydotprint(f, scan_graphs=True)
def test_inplace0():
    # should fail to insert gemm_inplace because gemm_inplace would
    # create cycles
    X, Y, Z, a, b = T.matrix('X'), T.matrix('Y'), T.matrix('Z'), T.scalar(
        'a'), T.scalar('b')
    R, S, c = T.matrix('R'), T.matrix('S'), T.scalar('c')

    f = inplace_func([Z, b, R, S],
                     [Z * (Z + b * T.dot(R, S).T)], mode='FAST_RUN')
    if gemm_inplace in [n.op for n in f.maker.fgraph.apply_nodes]:
        print(pp(f.maker.fgraph.outputs[0]))
        raise Failure('gemm_inplace in graph')
    assert gemm_no_inplace in [n.op for n in f.maker.fgraph.apply_nodes]

    # gemm_inplace should be inserted here, to work in-place on Z*c
    f = inplace_func([X, Y, Z, a, b, R, S, c],
                     [Z * (c * Z + a * T.dot(X, Y) + b * T.dot(R, S).T)],
                     mode='FAST_RUN')
    if gemm_inplace not in [n.op for n in f.maker.fgraph.apply_nodes]:
        theano.printing.debugprint(f)
        raise Failure('no gemm_inplace in graph')
def dbg_hook(hook, x):
    if not isinstance(x, TT.TensorVariable):
        x.out = theano.printing.Print(global_fn=hook)(x.out)
        return x
    else:
        return theano.printing.Print(global_fn=hook)(x)
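# A hedged usage sketch for dbg_hook above: theano.printing.Print calls global_fn
# with (op, value) at run time, so `hook` below just dumps the evaluated
# intermediate. The variable names are illustrative, not from the original code.
import theano
import theano.tensor as TT

def hook(op, value):
    print('dbg:', value)

x = TT.dvector('x')
y = dbg_hook(hook, x * 2)   # wraps x * 2 in a Print op
f = theano.function([x], y)
f([1.0, 2.0, 3.0])          # prints the intermediate value, returns it unchanged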
def test_pydotprint_cond_highlight():
    """
    This is a REALLY PARTIAL TEST.

    I did them to help debug stuff.
    """
    # Skip test if pydot is not available.
    if not theano.printing.pydot_imported:
        raise SkipTest('pydot not available')

    x = tensor.dvector()
    f = theano.function([x], x * 2)
    f([1, 2, 3, 4])

    s = StringIO()
    new_handler = logging.StreamHandler(s)
    new_handler.setLevel(logging.DEBUG)
    orig_handler = theano.logging_default_handler

    theano.theano_logger.removeHandler(orig_handler)
    theano.theano_logger.addHandler(new_handler)
    try:
        theano.printing.pydotprint(f, cond_highlight=True,
                                   print_output_file=False)
    finally:
        theano.theano_logger.addHandler(orig_handler)
        theano.theano_logger.removeHandler(new_handler)

    assert (s.getvalue() == 'pydotprint: cond_highlight is set but there'
            ' is no IfElse node in the graph\n')
def test_pydotprint_return_image():
    # Skip test if pydot is not available.
    if not theano.printing.pydot_imported:
        raise SkipTest('pydot not available')

    x = tensor.dvector()
    ret = theano.printing.pydotprint(x * 2, return_image=True)
    assert isinstance(ret, (str, bytes))
def test_pydotprint_variables():
    """
    This is a REALLY PARTIAL TEST.

    I did them to help debug stuff.
    It makes sure the code runs.
    """
    # Skip test if pydot is not available.
    if not theano.printing.pydot_imported:
        raise SkipTest('pydot not available')

    x = tensor.dvector()

    s = StringIO()
    new_handler = logging.StreamHandler(s)
    new_handler.setLevel(logging.DEBUG)
    orig_handler = theano.logging_default_handler

    theano.theano_logger.removeHandler(orig_handler)
    theano.theano_logger.addHandler(new_handler)
    try:
        theano.printing.pydotprint(x * 2)
        if not theano.printing.pd.__name__ == "pydot_ng":
            theano.printing.pydotprint_variables(x * 2)
    finally:
        theano.theano_logger.addHandler(orig_handler)
        theano.theano_logger.removeHandler(new_handler)
def test_pydotprint_profile():
    """Just check that pydotprint does not crash with profile."""
    # Skip test if pydot is not available.
    if not theano.printing.pydot_imported:
        raise SkipTest('pydot not available')

    A = tensor.matrix()
    prof = theano.compile.ProfileStats(atexit_print=False)
    f = theano.function([A], A + 1, profile=prof)
    theano.printing.pydotprint(f, print_output_file=False)
    f([[1]])
    theano.printing.pydotprint(f, print_output_file=False)
def test_upcasting_scalar_nogemm():
    # Test that the optimization does not crash when the scale has an incorrect
    # dtype, and forces upcasting of the result
    v = T.fmatrix('v')
    w = T.fmatrix('w')
    t = T.fmatrix('t')
    alpha = T.dscalar('a')

    rval = T.dot(w, v) * alpha + t

    f = theano.function([w, v, t, alpha], rval)
    t = f.maker.fgraph.toposort()
    assert numpy.sum([isinstance(n.op, Gemm) for n in t]) == 0
    # theano.printing.debugprint(f, print_type=True)

    v = T.fmatrix('v')
    w = T.fmatrix('w')
    t = T.fmatrix('t')
    alpha = T.cscalar('a')

    on_opt_error = config.on_opt_error
    try:
        config.on_opt_error = 'raise'
        rval = T.dot(w, v) * alpha + t
        f = theano.function([w, v, t, alpha], rval)
    finally:
        config.on_opt_error = on_opt_error

    t = f.maker.fgraph.toposort()
    assert numpy.sum([isinstance(n.op, Gemm) for n in t]) == 0
    # theano.printing.debugprint(f, print_type=True)
def test_inplace1():
    X, Y, Z, a, b = XYZab()
    # with > 2 terms in the overall addition
    f = inplace_func([X, Y, Z],
                     [Z + Z + T.dot(X, Y)], mode='FAST_RUN')
    # theano.printing.debugprint(f)
    # it doesn't work inplace because we didn't mark Z as a mutable input
    assert [n.op for n in f.maker.fgraph.apply_nodes] == [gemm_no_inplace]
def _conversion(real_value, name):
    __oplist_tag(real_value, 'casting')
    real_value.__module__ = 'tensor.basic'
    pprint.assign(real_value, printing.FunctionPrinter(name))
    return real_value


# These _convert_to_<type> functions have leading underscores to indicate that
# they should not be called directly. They do not perform sanity checks about
# what types you are casting to what. That logic is implemented by the
# `cast()` function below.
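# A minimal usage sketch of the intended entry point, assuming the `cast()`
# mentioned above is the public theano.tensor.cast; the underscore converters
# are the unchecked building blocks it dispatches to.
import theano.tensor as T

x = T.dvector('x')
y = T.cast(x, 'float32')   # checked casting front end
print(y.dtype)             # 'float32'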
def apply(self, fgraph):
    import theano.printing
    print("PrintCurrentFunctionGraph:", self.header)
    theano.printing.debugprint(fgraph.outputs)
def just_gemm(i, o, ishapes=[(4, 3), (3, 5), (4, 5), (), ()],
              max_graphlen=0, expected_nb_gemm=1):
    try:
        f = inplace_func(
            [In(ii, mutable=True, allow_downcast=True) for ii in i],
            o,
            mode='FAST_RUN',
            on_unused_input='ignore')
        nb_gemm = 0
        for node in f.maker.fgraph.apply_nodes:
            if isinstance(node.op, T.Dot):
                raise Failure('dot not changed to gemm_inplace in graph')
            if node.op == _dot22:
                raise Failure('_dot22 not changed to gemm_inplace in graph')
            if node.op == gemm_inplace:
                nb_gemm += 1
        assert nb_gemm == expected_nb_gemm, (nb_gemm, expected_nb_gemm)

        g = inplace_func(i, o, mode=compile.Mode(linker='py', optimizer=None),
                         allow_input_downcast=True, on_unused_input='ignore')
        for node in g.maker.fgraph.apply_nodes:
            if node.op == gemm_inplace:
                raise Exception('gemm_inplace in original graph')

        graphlen = len(f.maker.fgraph.toposort())
        if max_graphlen and (graphlen <= max_graphlen):
            # theano.printing.debugprint(f)
            assert False, 'graphlen=%i>%i' % (graphlen, max_graphlen)

        rng = numpy.random.RandomState(unittest_tools.fetch_seed(234))
        r0 = f(*[numpy.asarray(rng.randn(*sh), config.floatX)
                 for sh in ishapes])
        rng = numpy.random.RandomState(unittest_tools.fetch_seed(234))
        r1 = g(*[numpy.asarray(rng.randn(*sh), config.floatX)
                 for sh in ishapes])
        max_abs_err = numpy.max(numpy.abs(r0[0] - r1[0]))
        eps = 1.0e-8
        if config.floatX == 'float32':
            eps = 1.0e-6
        if max_abs_err > eps:
            raise Failure('GEMM is computing the wrong output. max_rel_err =',
                          max_abs_err)
    except Failure:
        for node in f.maker.fgraph.toposort():
            print('GRAPH', node)
        raise
def cmp_dot22(self, b_shp, c_shp):
    av = numpy.zeros((0, 0), dtype=self.dtype)
    bv = self.rand(*b_shp)
    cv = self.rand(*c_shp)

    a = self.shared(av, 'a')
    b = self.shared(bv, 'b')
    c = self.shared(cv, 'c')
    b_t = self.shared(bv.T, 'b.T')
    c_t = self.shared(cv.T, 'c.T')

    b_dev = b.get_value(borrow=False, return_internal_type=True)
    c_dev = c.get_value(borrow=False, return_internal_type=True)
    bt_dev = b_t.get_value(borrow=False, return_internal_type=True)
    ct_dev = c_t.get_value(borrow=False, return_internal_type=True)

    f_nn = theano.function([], [], updates=[(a, tensor.dot(b, c))],
                           mode=self.mode)
    # print 'class name:', self.__class__.__name__
    # theano.printing.debugprint(f_nn)
    f_nt = theano.function([], [], updates=[(a, tensor.dot(b, c_t.T))],
                           mode=self.mode)
    f_tn = theano.function([], [], updates=[(a, tensor.dot(b_t.T, c))],
                           mode=self.mode)
    f_tt = theano.function([], [], updates=[(a, tensor.dot(b_t.T, c_t.T))],
                           mode=self.mode)

    # Try with all stride patterns, and all transposed pattern
    for step_signs in itertools_product((-1, 1), repeat=4):
        for step in (1, 2):
            b_step1, b_step2, c_step1, c_step2 = (s * step
                                                  for s in step_signs)
            b.set_value(b_dev.copy()[::b_step1, ::b_step2], borrow=True)
            c.set_value(c_dev.copy()[::c_step1, ::c_step2], borrow=True)
            b_t.set_value(bt_dev.copy()[::b_step2, ::b_step1], borrow=True)
            c_t.set_value(ct_dev.copy()[::c_step2, ::c_step1], borrow=True)

            # Numpy result
            a_n = numpy.dot(bv[::b_step1, ::b_step2],
                            cv[::c_step1, ::c_step2])

            f_nn()
            assert numpy.allclose(a.get_value(), a_n)

            f_nt()
            assert numpy.allclose(a.get_value(), a_n)

            f_tn()
            assert numpy.allclose(a.get_value(), a_n)

            f_tt()
            assert numpy.allclose(a.get_value(), a_n)
def _scal_elemwise_with_nfunc(nfunc, nin, nout):
    """
    Replace a symbol definition with an elementwise version of the
    corresponding scalar Op. If it is not None, the nfunc argument
    should be a string such that getattr(numpy, nfunc) implements
    a vectorized version of the elemwise operation. nin is the number
    of inputs expected by that function, and nout is the number of
    **destination** inputs it takes. That is, the function should
    take nin+nout inputs. nout == 0 means that the numpy function
    does not take a numpy array argument to put its result in.
    """
    def construct(symbol):
        symbolname = symbol.__name__
        inplace = symbolname.endswith('_inplace')
        if inplace:
            msg = "inplace"
        else:
            msg = "no_inplace"
        n = "Elemwise{%s,%s}" % (symbolname, msg)

        if inplace:
            scalar_op = getattr(scal, symbolname[:-len('_inplace')])
            inplace_scalar_op = scalar_op.__class__(scal.transfer_type(0))
            rval = elemwise.Elemwise(inplace_scalar_op, {0: 0}, name=n,
                                     nfunc_spec=(nfunc and (nfunc, nin, nout)))
        else:
            scalar_op = getattr(scal, symbolname)
            rval = elemwise.Elemwise(scalar_op, name=n,
                                     nfunc_spec=(nfunc and (nfunc, nin, nout)))

        if getattr(symbol, '__doc__', False):
            rval.__doc__ = symbol.__doc__ + '\n' + rval.__doc__

        # for the meaning of this see the ./epydoc script
        # it makes epydoc display rval as if it were a function, not an object
        rval.__epydoc_asRoutine = symbol
        rval.__module__ = 'tensor'

        pprint.assign(rval, printing.FunctionPrinter(symbolname))
        return rval
    return construct
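# A hedged sketch of how this factory is typically applied as a decorator: the
# decorated function is only a symbol whose name selects the scalar op; the op
# name 'add' and the nin/nout values below are illustrative assumptions.
@_scal_elemwise_with_nfunc('add', 2, 1)
def add(a, b):
    """elementwise addition"""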