def renet_layer_lr_allscan(X, rnn1, rnn2, w, h, wp, hp):
# list_of_images = []
C = X.shape[0]
X = X.dimshuffle((1, 0, 2)).reshape((h/hp, hp*C*w)) # split the rows for the first scan
def rnn_pass(x):
x = x.reshape((hp, C, w)).dimshuffle((2, 1, 0)).reshape((w/wp, C*wp*hp))
h1 = rnn1.output(x)
h2 = rnn2.output(x, go_backwards=True)
img = T.concatenate([h1.T, h2.T])
# list_of_images.append(img)
return img
results, _ = theano.scan(
fn=rnn_pass,
sequences=X,
outputs_info=None,
n_steps=h/hp,
)
return results.dimshuffle((1, 0, 2))
# return T.stacklists(list_of_images).dimshuffle((1, 0, 2))
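# Hedged NumPy sanity check (toy sizes, values purely illustrative) of the
# reshape/dimshuffle pattern used in renet_layer_lr_allscan above: a (C, h, w)
# image is cut into h/hp horizontal strips, and each strip into w/wp patches
# of length C*wp*hp. transpose() mirrors dimshuffle().
import numpy as np

C, h, w, hp, wp = 3, 4, 6, 2, 2
X = np.arange(C * h * w).reshape(C, h, w)
# mirrors X.dimshuffle((1, 0, 2)).reshape((h/hp, hp*C*w)): one row per strip
strips = X.transpose(1, 0, 2).reshape(h // hp, hp * C * w)
# mirrors the reshape inside rnn_pass: one row per wp*hp patch of a strip
patches = strips[0].reshape(hp, C, w).transpose(2, 1, 0).reshape(w // wp, C * wp * hp)
print(strips.shape)   # (2, 36)
print(patches.shape)  # (3, 12)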
def forward_all(self, x, masks = None, h0=None, return_c=False, direction = None):
if h0 is None:
if x.ndim > 1:
h0 = T.zeros((x.shape[1], self.n_out*(self.order+1)), dtype=theano.config.floatX)
else:
h0 = T.zeros((self.n_out*(self.order+1),), dtype=theano.config.floatX)
if masks is None:
masks = T.ones((x.shape[0], x.shape[1]), dtype=theano.config.floatX)
h, _ = theano.scan(
fn = self.forward,
sequences = [x, masks],
outputs_info = [ h0 ]
)
if return_c:
return h
elif x.ndim > 1:
return h[:,:,self.n_out*self.order:]
else:
return h[:,self.n_out*self.order:]
def for_loop(step_function, inputs, initial_hidden_states, go_backwards):
"""
inputs: time axis must be first
"""
results = theano.scan(
step_function,
sequences=inputs,
outputs_info=initial_hidden_states,
go_backwards=go_backwards)[0]  # discard the updates
# when results has length 1, it is not returned as a list; wrap it
if not isinstance(results, list):
results = [results]
#put the batch axis back in front
results = [dimshuffle(tensor, [1,0]+[x for x in xrange(2, tensor.ndim)])
for tensor in results]
return results
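# Hedged usage sketch for for_loop: the step function, sizes, and values below are
# made up, and it assumes the module-level dimshuffle() helper used inside for_loop
# is in scope. It computes a running sum along the time axis of a (batch, time) input.
import numpy as np
import theano
import theano.tensor as T

X = T.matrix('X')                       # (batch, time)

def running_sum_step(x_t, acc_tm1):     # both arguments are (batch,) vectors
    return x_t + acc_tm1

h0 = T.zeros_like(X[:, 0])              # one accumulator per batch row
outs = for_loop(running_sum_step, X.dimshuffle(1, 0), [h0], go_backwards=False)
f = theano.function([X], outs[0])       # outs[0] has the batch axis in front again
print(f(np.ones((2, 3), dtype=theano.config.floatX)))  # each row: [1, 2, 3]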
def get_output(self, train=False):
X = self.get_input(train) # shape: (nb_samples, time (padded with zeros), input_dim)
# new shape: (time, nb_samples, input_dim) -> because theano.scan iterates over main dimension
padded_mask = self.get_padded_shuffled_mask(train, X, pad=1)
X = X.dimshuffle((1, 0, 2))
x = T.dot(X, self.W) + self.b
# scan = theano symbolic loop.
# See: http://deeplearning.net/software/theano/library/scan.html
# Iterate over the first dimension of the x array (=time).
outputs, updates = theano.scan(
self._step, # this will be called with arguments (sequences[i], outputs[i-1], non_sequences)
sequences=[x, dict(input=padded_mask, taps=[-1])], # tensors to iterate over, inputs to _step
# initialization of the output. Input to _step with default tap=-1.
outputs_info=T.unbroadcast(alloc_zeros_matrix(X.shape[1], self.output_dim), 1),
non_sequences=self.U, # static inputs to _step
truncate_gradient=self.truncate_gradient
)
if self.return_sequences:
return outputs.dimshuffle((1, 0, 2))
return outputs[-1]
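# Minimal self-contained sketch (toy dimensions, a plain tanh step standing in for
# self._step) of the scan call pattern used above: sequences supply x per timestep,
# outputs_info carries the hidden state, and the recurrent weight is a non-sequence.
import numpy as np
import theano
import theano.tensor as T

dim = 4
x_seq = T.tensor3('x_seq')              # (time, batch, dim)
U = theano.shared(np.eye(dim, dtype=theano.config.floatX))

def toy_step(x_t, h_tm1, U):
    return T.tanh(x_t + T.dot(h_tm1, U))

h, _ = theano.scan(
    toy_step,
    sequences=x_seq,
    outputs_info=T.unbroadcast(T.zeros((x_seq.shape[1], dim)), 1),
    non_sequences=U,
)
f = theano.function([x_seq], h[-1])     # last hidden state, shape (batch, dim)
print(f(np.zeros((5, 2, dim), dtype=theano.config.floatX)).shape)  # (2, 4)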
def get_output(self, train=False):
X = self.get_input(train)
padded_mask = self.get_padded_shuffled_mask(train, X, pad=1)
X = X.dimshuffle((1, 0, 2))
x_z = T.dot(X, self.W_z) + self.b_z
x_r = T.dot(X, self.W_r) + self.b_r
x_h = T.dot(X, self.W_h) + self.b_h
outputs, updates = theano.scan(
self._step,
sequences=[x_z, x_r, x_h, padded_mask],
outputs_info=T.unbroadcast(alloc_zeros_matrix(X.shape[1], self.output_dim), 1),
non_sequences=[self.U_z, self.U_r, self.U_h],
truncate_gradient=self.truncate_gradient
)
if self.return_sequences:
return outputs.dimshuffle((1, 0, 2))
return outputs[-1]
def get_output(self, train=False):
X = self.get_input(train)
padded_mask = self.get_padded_shuffled_mask(train, X, pad=1)
X = X.dimshuffle((1, 0, 2))
xi = T.dot(X, self.W_i) + self.b_i
xf = T.dot(X, self.W_f) + self.b_f
xc = T.dot(X, self.W_c) + self.b_c
xo = T.dot(X, self.W_o) + self.b_o
[outputs, memories], updates = theano.scan(
self._step,
sequences=[xi, xf, xo, xc, padded_mask],
outputs_info=[
T.unbroadcast(alloc_zeros_matrix(X.shape[1], self.output_dim), 1),
T.unbroadcast(alloc_zeros_matrix(X.shape[1], self.output_dim), 1)
],
non_sequences=[self.U_i, self.U_f, self.U_o, self.U_c],
truncate_gradient=self.truncate_gradient
)
if self.return_sequences:
return outputs.dimshuffle((1, 0, 2))
return outputs[-1]
def get_output(self, train=False):
X = self.get_input(train)
padded_mask = self.get_padded_shuffled_mask(train, X, pad=1)
X = X.dimshuffle((1, 0, 2))
x_z = T.dot(X, self.W_z) + self.b_z
x_r = T.dot(X, self.Pmat) + self.b_r
x_h = T.dot(X, self.W_h) + self.b_h
outputs, updates = theano.scan(
self._step,
sequences=[x_z, x_r, x_h, padded_mask],
outputs_info=T.unbroadcast(alloc_zeros_matrix(X.shape[1], self.output_dim), 1),
non_sequences=[self.U_z, self.U_r, self.U_h],
truncate_gradient=self.truncate_gradient
)
if self.return_sequences:
return outputs.dimshuffle((1, 0, 2))
return outputs[-1]
def ctc_path_probs(predict, Y, alpha=1e-4):
smoothed_predict = (1 - alpha) * predict[:, Y] + alpha * np.float32(1.) / Y.shape[0]
L = T.log(smoothed_predict)
zeros = T.zeros_like(L[0])
log_first = zeros
f_skip_idxs = ctc_create_skip_idxs(Y)
b_skip_idxs = ctc_create_skip_idxs(Y[::-1]) # there should be a shortcut to calculating this
def step(log_f_curr, log_b_curr, f_active, log_f_prev, b_active, log_b_prev):
f_active_next, log_f_next = ctc_update_log_p(f_skip_idxs, zeros, f_active, log_f_curr, log_f_prev)
b_active_next, log_b_next = ctc_update_log_p(b_skip_idxs, zeros, b_active, log_b_curr, log_b_prev)
return f_active_next, log_f_next, b_active_next, log_b_next
[f_active, log_f_probs, b_active, log_b_probs], _ = theano.scan(
step, sequences=[L, L[::-1, ::-1]], outputs_info=[np.int32(1), log_first, np.int32(1), log_first])
idxs = T.arange(L.shape[1]).dimshuffle('x', 0)
mask = (idxs < f_active.dimshuffle(0, 'x')) & (idxs < b_active.dimshuffle(0, 'x'))[::-1, ::-1]
log_probs = log_f_probs + log_b_probs[::-1, ::-1] - L
return log_probs, mask
def __Recurrent(name, hidden_dims, step_fn, inputs, non_sequences=[], h0s=None):
if not isinstance(inputs, list):
inputs = [inputs]
if not isinstance(hidden_dims, list):
hidden_dims = [hidden_dims]
if h0s is None:
h0s = [None]*len(hidden_dims)
for i in xrange(len(hidden_dims)):
if h0s[i] is None:
h0_unbatched = lib.param(
name + '.h0_' + str(i),
numpy.zeros((hidden_dims[i],), dtype=theano.config.floatX)
)
num_batches = inputs[0].shape[1]
h0s[i] = T.alloc(h0_unbatched, num_batches, hidden_dims[i])
h0s[i] = T.patternbroadcast(h0s[i], [False] * h0s[i].ndim)
outputs, _ = theano.scan(
step_fn,
sequences=inputs,
outputs_info=h0s,
non_sequences=non_sequences
)
return outputs
def __init__(self, dropout_prob, fix_mask=False, fast_predict=False, prefix="dropout"):
self.dropout_prob = dropout_prob
self.fix_mask = fix_mask
self.prefix = prefix
self.fast_predict = fast_predict
print (self.prefix, self.dropout_prob, self.fix_mask)
assert (dropout_prob > 0)
""" This one works for the scan function.
(instead of theano.tensor.shared.randomstreams.RandomStreams)
See discussion: https://groups.google.com/forum/#!topic/theano-users/DbvTgTqkT8o
"""
self.rng = MRG_RandomStreams(seed=RANDOM_SEED, use_cuda=True)
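# Hedged standalone sketch of the point made in the docstring above (names and sizes
# are illustrative, not this project's code): MRG_RandomStreams ops can live inside a
# scan step, so a fresh dropout mask is drawn at every timestep.
import theano
import theano.tensor as T
from theano.sandbox.rng_mrg import MRG_RandomStreams

srng = MRG_RandomStreams(seed=1234)
x_seq = T.matrix('x_seq')               # (time, dim)

def drop_step(x_t):
    # a new mask is sampled at every iteration of scan
    mask = srng.binomial(size=x_t.shape, p=0.5, dtype=theano.config.floatX)
    return x_t * mask

dropped, scan_updates = theano.scan(drop_step, sequences=x_seq)
# the random-state updates collected by scan must be handed to the compiled function
f = theano.function([x_seq], dropped, updates=scan_updates)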
def get_reconstruction_cost(self, updates, pre_sigmoid_nv):
"""Approximation to the reconstruction error
Note that this function requires the pre-sigmoid activation as
input. To understand why this is so you need to understand a
bit about how Theano works. Whenever you compile a Theano
function, the computational graph that you pass as input gets
optimized for speed and stability. This is done by replacing
several parts of the subgraph with others. One such
optimization expresses terms of the form log(sigmoid(x)) in
terms of softplus. We need this optimization for the
cross-entropy, since the sigmoid of numbers larger than 30
(or even less than that) turns into 1, and numbers smaller than
-30 turn into 0, which in turn forces Theano to compute
log(0), giving either -inf or NaN as the
cost. If the value is expressed in terms of softplus we do not
get this undesirable behaviour. This optimization usually
works fine, but here we have a special case: the sigmoid is
applied inside the scan op, while the log is
outside. Therefore Theano only sees log(scan(..)) instead
of log(sigmoid(..)) and does not apply the desired
optimization. We also cannot simply replace the sigmoid inside
scan with something else, because that only needs to be done
on the last step. Therefore the easiest and most efficient way
is to also return the pre-sigmoid activation as an output of
scan, and apply both the log and sigmoid outside scan so
that Theano can catch and optimize the expression.
"""
cross_entropy = T.mean(
T.sum(
self.input * T.log(T.nnet.sigmoid(pre_sigmoid_nv)) +
(1 - self.input) * T.log(1 - T.nnet.sigmoid(pre_sigmoid_nv)),
axis=1
)
)
return cross_entropy
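# Hedged toy illustration of the docstring above (a square toy weight, not the actual
# RBM sampler): return the pre-sigmoid value from scan and apply sigmoid and log
# outside it, so the log(sigmoid(x)) -> softplus rewrite can take effect.
import numpy as np
import theano
import theano.tensor as T

W = theano.shared(np.eye(4, dtype=theano.config.floatX))  # toy square weight
v0 = T.matrix('v0')

def toy_gibbs_step(v_tm1, W):
    pre_sigmoid = T.dot(v_tm1, W)
    return [pre_sigmoid, T.nnet.sigmoid(pre_sigmoid)]

[pre_sigmoid_chain, v_chain], updates = theano.scan(
    toy_gibbs_step, outputs_info=[None, v0], non_sequences=W, n_steps=3)
# sigmoid and log are applied outside scan, on the returned pre-activation
cost = -T.mean(T.log(T.nnet.sigmoid(pre_sigmoid_chain[-1])))
f = theano.function([v0], cost, updates=updates)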
def output(self, train=True):
outputs_info = [self.h0, self.s0]
([outputs, states], updates) = theano.scan(
fn=self.one_step, #function
sequences=self.X,
# n_steps=600,
outputs_info = outputs_info,
go_backwards=self.go_backwards
)
return outputs
def output(self, train=True):
outputs_info = [self.s0]
(outputs, updates) = theano.scan(
fn=self.one_step,
sequences=self.X,
outputs_info=outputs_info,
go_backwards=self.go_backwards
)
return outputs
def get_output_for(self, input, **kwargs):
def norm_fn(f, mask, label, previous, W_sim):
# f: inst * class, mask: inst, previous: inst * class, W_sim: class * class
next = previous.dimshuffle(0, 1, 'x') + f.dimshuffle(0, 'x', 1) + W_sim.dimshuffle('x', 0, 1)
if COST:
next = next + COST_CONST * (1.0 - T.extra_ops.to_one_hot(label, self.num_classes).dimshuffle(0, 'x', 1))
# next: inst * prev * cur
next = theano_logsumexp(next, axis = 1)
# next: inst * class
mask = mask.dimshuffle(0, 'x')
next = previous * (1.0 - mask) + next * mask
return next
f = T.dot(input, self.W)
# f: inst * time * class
initial = f[:, 0, :]
if CRF_INIT:
initial = initial + self.W_init[0].dimshuffle('x', 0)
if COST:
initial = initial + COST_CONST * (1.0 - T.extra_ops.to_one_hot(self.label_input[:, 0], self.num_classes))
outputs, _ = theano.scan(fn = norm_fn, \
sequences = [f.dimshuffle(1, 0, 2)[1: ], self.mask_input.dimshuffle(1, 0)[1: ], self.label_input.dimshuffle(1, 0)[1:]], \
outputs_info = initial, non_sequences = [self.W_sim], strict = True)
norm = T.sum(theano_logsumexp(outputs[-1], axis = 1))
f_pot = (f.reshape((-1, f.shape[-1]))[T.arange(f.shape[0] * f.shape[1]), self.label_input.flatten()] * self.mask_input.flatten()).sum()
if CRF_INIT:
f_pot += self.W_init[0][self.label_input[:, 0]].sum()
labels = self.label_input
# labels: inst * time
shift_labels = T.roll(labels, -1, axis = 1)
mask = self.mask_input
# mask : inst * time
shift_mask = T.roll(mask, -1, axis = 1)
g_pot = (self.W_sim[labels.flatten(), shift_labels.flatten()] * mask.flatten() * shift_mask.flatten()).sum()
return - (f_pot + g_pot - norm) / f.shape[0]
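# The theano_logsumexp helper used above is not shown in this snippet; a common
# numerically stable definition (an assumption about its behaviour, not necessarily
# this project's exact code) is the max-shifted form below. With axis=1 it reduces
# the inst * prev * cur tensor to inst * cur.
import theano.tensor as T

def theano_logsumexp(x, axis=None):
    # subtract the max before exponentiating to avoid overflow in exp()
    x_max = T.max(x, axis=axis, keepdims=True)
    return T.log(T.sum(T.exp(x - x_max), axis=axis)) + T.max(x, axis=axis)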
def get_output_for(self, input, **kwargs):
def max_fn(f, mask, prev_score, prev_back, W_sim):
next_score = prev_score.dimshuffle(0, 1, 'x') + f.dimshuffle(0, 'x', 1) + W_sim.dimshuffle('x', 0, 1)
next_back = T.argmax(next_score, axis = 1)
next_score = T.max(next_score, axis = 1)
mask = mask.dimshuffle(0, 'x')
next_score = next_score * mask + prev_score * (1.0 - mask)
next_back = next_back * mask + prev_back * (1.0 - mask)
next_back = T.cast(next_back, 'int32')
return [next_score, next_back]
def produce_fn(back, mask, prev_py):
# back: inst * class, prev_py: inst, mask: inst
next_py = back[T.arange(prev_py.shape[0]), prev_py]
next_py = mask * next_py + (1.0 - mask) * prev_py
next_py = T.cast(next_py, 'int32')
return next_py
f = T.dot(input, self.W)
init_score, init_back = f[:, 0, :], T.zeros_like(f[:, 0, :], dtype = 'int32')
if CRF_INIT:
init_score = init_score + self.W_init[0].dimshuffle('x', 0)
([scores, backs], _) = theano.scan(fn = max_fn, \
sequences = [f.dimshuffle(1, 0, 2)[1: ], self.mask_input.dimshuffle(1, 0)[1: ]], \
outputs_info = [init_score, init_back], non_sequences = [self.W_sim], strict = True)
init_py = T.argmax(scores[-1], axis = 1)
init_py = T.cast(init_py, 'int32')
# init_py: inst, backs: time * inst * class
pys, _ = theano.scan(fn = produce_fn, \
sequences = [backs, self.mask_input.dimshuffle(1, 0)[1:]], outputs_info = [init_py], go_backwards = True)
# pys: (rev_time - 1) * inst
pys = pys.dimshuffle(1, 0)[:, :: -1]
# pys : inst * (time - 1)
return T.concatenate([pys, init_py.dimshuffle(0, 'x')], axis = 1)
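# Hedged NumPy illustration (toy numbers, mask ignored) of the backtracking that
# produce_fn performs above: start from the best final tag and follow backpointers
# through time in reverse.
import numpy as np

backs = np.array([[[0, 0, 1],          # backs[t][i][c]: best previous tag for tag c at step t+1
                   [2, 1, 1]],
                  [[1, 2, 0],
                   [0, 0, 2]]])        # (time-1)=2 steps, inst=2, class=3
final_scores = np.array([[0.1, 0.7, 0.2],
                         [0.3, 0.1, 0.6]])
py = final_scores.argmax(axis=1)       # best last tag per instance
path = [py]
for back in backs[::-1]:               # walk backwards through time
    py = back[np.arange(py.shape[0]), py]
    path.append(py)
print(np.stack(path[::-1], axis=1))    # inst * time tag sequence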
def __init__(self, rng, x, minibatch_size, n_hidden, x_vocabulary, y_vocabulary, stage1_model_file_name, p=None):
y_vocabulary_size = len(y_vocabulary)
self.stage1_model_file_name = stage1_model_file_name
self.stage1, _ = load(stage1_model_file_name, minibatch_size, x)
self.n_hidden = n_hidden
self.x_vocabulary = x_vocabulary
self.y_vocabulary = y_vocabulary
# output model
self.GRU = GRULayer(rng=rng, n_in=self.stage1.n_hidden + 1, n_out=n_hidden, minibatch_size=minibatch_size)
self.Wy = weights_const(n_hidden, y_vocabulary_size, 'Wy', 0)
self.by = weights_const(1, y_vocabulary_size, 'by', 0)
self.params = [self.Wy, self.by]
self.params += self.GRU.params
def recurrence(x_t, p_t, h_tm1, Wy, by):
h_t = self.GRU.step(x_t=T.concatenate((x_t, p_t.dimshuffle((0, 'x'))), axis=1), h_tm1=h_tm1)
z = T.dot(h_t, Wy) + by
y_t = T.nnet.softmax(z)
return [h_t, y_t]
[_, self.y], _ = theano.scan(fn=recurrence,
sequences=[self.stage1.last_hidden_states, p],
non_sequences=[self.Wy, self.by],
outputs_info=[self.GRU.h0, None])
print "Number of parameters is %d" % sum(np.prod(p.shape.eval()) for p in self.params)
print "Number of parameters with stage1 params is %d" % sum(np.prod(p.shape.eval()) for p in self.params + self.stage1.params)
self.L1 = sum(abs(p).sum() for p in self.params)
self.L2_sqr = sum((p**2).sum() for p in self.params)
def get_output_for(self, inputs, **kwargs):
unary, ref = inputs
N, _, H, W = ref.shape
yx = tt.cast(tt.stack(tt.mgrid[0:H, 0:W]), "float32")
grid = tt.alloc(yx[np.newaxis, :, :, :], N, 2, H, W)
stacked = tt.concatenate([grid, ref], axis=1)
def _bilateral(V, R):
o = tt.ones((1, V.shape[1], V.shape[2]), "float32")
norm = tt.sqrt(gaussian_filter(R, o, self.kstd_bf,
self.ref_dim)) + 1e-8
return gaussian_filter(R, V/norm, self.kstd_bf, self.ref_dim,
self.val_dim) / norm
def _step(prev_q, U, ref, normalize=True):
qbf = _bilateral(prev_q, ref,)
qsf = tt.nnet.conv2d(prev_q[np.newaxis, :, :, :],
self.W_spatial, border_mode="half")[0]
q_hat = -self.compat_bf * qbf + -self.compat_spatial * qsf
q_hat = U - q_hat
return softmax(q_hat, axis=0) if normalize else q_hat
def _inference(unary_i, ref_i):
U = tt.log(tt.clip(unary_i, 1e-5, 1))
prev_q = softmax(U, axis=0)
# This is faster than using scan.
for i in range(self.num_iter):
normalize = self.normalize_final_iter or i < self.num_iter-1
prev_q = _step(prev_q, U, ref_i, normalize)
return prev_q
return theano.scan(fn=_inference, sequences=[unary, stacked],
outputs_info=None)[0]
def grad(self, inputs, ograds):
ref, values, ref_dim, val_dim = inputs[:4]
hash_struct = inputs[4:]
ograd = ograds[0]
ref_dim = get_scalar_constant_value(ref_dim)
val_dim = get_scalar_constant_value(val_dim)
def _conv(x):
return GaussianFilter()(ref, x, ref_dim, val_dim, *hash_struct)
# Since the kernels are separable and symmetric, the gradient w.r.t.
# input is just the same filtering applied to the output grads.
grad_i = _conv(ograd)
def _gradr(r_i, vals, og, *args):
return (og * (_conv(vals*r_i) - r_i*_conv(vals)) +
vals * (_conv(og*r_i) - r_i*_conv(og)))
grad_r, _ = theano.scan(fn=_gradr, sequences=[ref],
non_sequences=[values, ograd] + hash_struct,
outputs_info=None)
grad_r = grad_r.sum(axis=1, acc_dtype="float32")
grads = [DisconnectedType()() for i in range(len(inputs))]
grads[0] = grad_r
grads[1] = grad_i
return grads