def __init__(self, rng, input, filter_shape, image_shape, poolsize=(2, 2),
             stride=(1, 1)):
    """Set up a convolution + max-pooling layer with shared-variable parameters.

    Weights are drawn uniformly from ``[-W_bound, W_bound]`` where
    ``W_bound = sqrt(6 / (fan_in + fan_out))``; biases start at zero.
    The layer output is ``tanh(max_pool(conv(input)) + b)``.

    :param rng: random source exposing ``uniform(low, high, size)``
        (presumably a ``numpy.random.RandomState`` -- TODO confirm).
    :param input: symbolic input handed to ``conv.conv2d``.
    :param filter_shape: shape of the filter tensor; entry 0 sizes the
        bias vector and entry 1 must equal ``image_shape[1]``.
    :param image_shape: input shape forwarded to ``conv.conv2d``.
    :param poolsize: pooling factor passed to ``max_pool_2d`` as ``ds``.
    :param stride: convolution step passed to ``conv.conv2d`` as
        ``subsample``.
    """
    # Filters and images must agree on their second dimension.
    assert image_shape[1] == filter_shape[1]
    self.input = input

    # Fan-in / fan-out drive the width of the uniform init interval.
    n_out_maps = filter_shape[0]
    fan_in = np.prod(filter_shape[1:])
    fan_out = n_out_maps * np.prod(filter_shape[2:]) / np.prod(poolsize)
    bound = np.sqrt(6. / (fan_in + fan_out))

    initial_weights = rng.uniform(low=-bound, high=bound, size=filter_shape)
    self.W = theano.shared(
        np.asarray(initial_weights, dtype=theano.config.floatX),
        borrow=True
    )

    # One bias per entry along filter_shape[0], all starting at zero.
    zero_bias = np.zeros((n_out_maps,), dtype=theano.config.floatX)
    self.b = theano.shared(value=zero_bias, borrow=True)

    convolved = conv.conv2d(
        input=input,
        filters=self.W,
        filter_shape=filter_shape,
        image_shape=image_shape,
        subsample=stride
    )
    pooled = downsample.max_pool_2d(
        input=convolved,
        ds=poolsize,
        ignore_border=True
    )

    # dimshuffle makes the 1D bias broadcast over axes 0, 2 and 3.
    broadcast_bias = self.b.dimshuffle('x', 0, 'x', 'x')
    self.output = T.tanh(pooled + broadcast_bias)
# Example source code for the Python function max_pool_2d()
def __init__(self, rng, input, filter_shape, image_shape, poolsize=(2, 2)):
    """Create a convolution + max-pooling layer with a rectifier output.

    Weights are sampled uniformly from ``[-W_bound, W_bound]`` with
    ``W_bound = sqrt(6 / (fan_in + fan_out))``; biases start at zero.
    The layer output is ``maximum(0, max_pool(conv(input)) + b)``.

    :param rng: random source exposing ``uniform(low, high, size)``
        (presumably a ``numpy.random.RandomState`` -- TODO confirm).
    :param input: symbolic input handed to ``conv.conv2d``.
    :param filter_shape: shape of the filter tensor; entry 0 is the number
        of biases and entry 1 must equal ``image_shape[1]``.
    :param image_shape: input shape forwarded to ``conv.conv2d``.
    :param poolsize: pooling factor passed to ``max_pool_2d`` as ``ds``.
    """
    assert image_shape[1] == filter_shape[1]
    self.input = input

    n_out_maps = filter_shape[0]
    # Inputs per hidden unit: product of every filter dimension but the first.
    fan_in = numpy.prod(filter_shape[1:])
    # Gradient sources per lower-layer unit, reduced by the pooling area.
    fan_out = n_out_maps * numpy.prod(filter_shape[2:]) / numpy.prod(poolsize)

    # Random uniform initialisation of the filters.
    bound = numpy.sqrt(6. / (fan_in + fan_out))
    sampled = rng.uniform(low=-bound, high=bound, size=filter_shape)
    self.W = theano.shared(
        numpy.asarray(sampled, dtype=theano.config.floatX),
        borrow=True)

    # 1D bias vector: one entry per output feature map.
    zero_bias = numpy.zeros((n_out_maps,), dtype=theano.config.floatX)
    self.b = theano.shared(value=zero_bias, borrow=True)

    # Convolve, then max-pool each feature map.
    convolved = conv.conv2d(
        input=input,
        filters=self.W,
        filter_shape=filter_shape,
        image_shape=image_shape)
    pooled = downsample.max_pool_2d(
        input=convolved,
        ds=poolsize,
        ignore_border=True)

    # Rectifier applied after adding the broadcast bias.
    broadcast_bias = self.b.dimshuffle('x', 0, 'x', 'x')
    self.output = T.maximum(0.0, pooled + broadcast_bias)

    # Trainable parameters of this layer.
    self.params = [self.W, self.b]
def LeNetConvPoolLayer(inps, feature_map, batch, length, window, dim, prefix, params, names):
    """Build a conv -> tanh -> max-pool stage and register its parameters.

    The filter has shape ``(feature_map, 1, window, dim)`` and the pooling
    window spans all ``length - window + 1`` positions along axis 2, so each
    feature map is reduced to a single value along that axis.

    :param inps: symbolic input passed to ``conv.conv2d``; declared to the
        convolution as shape ``(batch, 1, length, dim)``.
    :param feature_map: number of filters.
    :param batch: batch size used in the declared image shape.
    :param length: size of axis 2 of the input.
    :param window: filter extent along axis 2.
    :param dim: filter extent along axis 3 (equal to the input's axis 3).
    :param prefix: string prepended to the generated parameter names.
    :param params: list extended in place with the new weight and bias.
    :param names: list extended in place with the matching names.
    :return: the pooled output flattened to two dimensions.

    NOTE(review): the original carried a commented-out alternative init
    (``0.2 * uniform(-1, 1)`` for "none"/"relu" non-linearities); only the
    ``sqrt(6 / (fan_in + fan_out))`` bound is active.
    """
    conv_positions = length - window + 1

    fan_in = window * dim
    fan_out = feature_map * window * dim / conv_positions
    filter_shape = (feature_map, 1, window, dim)
    image_shape = (batch, 1, length, dim)
    pool_size = (conv_positions, 1)

    bound = numpy.sqrt(6. / (fan_in + fan_out))
    initial_W = numpy.random.uniform(low=-bound, high=bound, size=filter_shape)
    conv_W = theano.shared(initial_W.astype(theano.config.floatX))
    conv_b = theano.shared(numpy.zeros(filter_shape[0], dtype=theano.config.floatX))

    # Register the new parameters and their names on the caller's lists.
    suffix = str(window)
    params += [conv_W, conv_b]
    names += [prefix + '_conv_W_' + suffix, prefix + '_conv_b_' + suffix]

    convolved = conv.conv2d(
        input=inps,
        filters=conv_W,
        filter_shape=filter_shape,
        image_shape=image_shape)
    activated = T.tanh(convolved + conv_b.dimshuffle('x', 0, 'x', 'x'))
    pooled = downsample.max_pool_2d(input=activated, ds=pool_size, ignore_border=True)
    return pooled.flatten(2)
def output_func(self, input):
    """Max-pool ``input`` with window ``self.pool_size``, ignoring borders.

    Per the original note, ``input`` is expected to be a tensor of shape
    (batch_size, nwords, ndim) -- TODO confirm against the caller.
    """
    pooled = downsample.max_pool_2d(
        input=input,
        ds=self.pool_size,
        ignore_border=True,
    )
    return pooled
def set_inpt(self, inpt, inpt_dropout, mini_batch_size):
    """Wire ``inpt`` through convolution, max-pooling and the activation.

    Stores the reshaped input on ``self.inpt`` and the result on both
    ``self.output`` and ``self.output_dropout``.

    :param inpt: symbolic input, reshaped to ``self.image_shape``.
    :param inpt_dropout: not used by this layer.
    :param mini_batch_size: not used by this layer.
    """
    self.inpt = inpt.reshape(self.image_shape)

    feature_maps = conv.conv2d(
        input=self.inpt,
        filters=self.w,
        filter_shape=self.filter_shape,
        image_shape=self.image_shape,
    )
    pooled = downsample.max_pool_2d(
        input=feature_maps,
        ds=self.poolsize,
        ignore_border=True,
    )

    # The bias is broadcast over axes 0, 2 and 3 before the activation.
    broadcast_bias = self.b.dimshuffle('x', 0, 'x', 'x')
    self.output = self.activation_fn(pooled + broadcast_bias)

    # Convolutional layers apply no dropout, so both outputs coincide.
    self.output_dropout = self.output
def output_func(self, input):
    """Return the max-pooled version of ``input``.

    The pooling window is ``self.pool_size`` and partial border windows are
    ignored. The original note states that ``input`` is a
    (batch_size, nwords, ndim) tensor -- TODO confirm.
    """
    pooling_window = self.pool_size
    return downsample.max_pool_2d(input=input, ds=pooling_window,
                                  ignore_border=True)
def predict(self, new_data, batch_size):
    """Build the layer's output expression for ``new_data``.

    The convolution is declared with an image shape whose first entry is
    ``batch_size`` and whose last two entries come from ``self.image_shape``.
    Depending on ``self.non_linear``:

    * ``"tanh"``: ``max_pool(tanh(conv + b))``
    * ``"relu"``: ``max_pool(ReLU(conv + b))``
    * anything else: ``max_pool(conv) + b`` (linear)

    :param new_data: symbolic input passed to ``conv.conv2d``.
    :param batch_size: batch dimension used in the declared image shape.
    :return: the symbolic output of the layer.
    """
    img_shape = (batch_size, 1, self.image_shape[2], self.image_shape[3])
    conv_out = conv.conv2d(
        input=new_data,
        filters=self.W,
        filter_shape=self.filter_shape,
        image_shape=img_shape)

    # The bias is broadcast over axes 0, 2 and 3.
    bias = self.b.dimshuffle('x', 0, 'x', 'x')

    # The branches must be mutually exclusive. With two independent ``if``
    # statements the trailing ``else`` also ran for "tanh" and replaced the
    # tanh output with the linear one.
    if self.non_linear == "tanh":
        activated = T.tanh(conv_out + bias)
        output = downsample.max_pool_2d(input=activated, ds=self.poolsize, ignore_border=True)
    elif self.non_linear == "relu":
        activated = ReLU(conv_out + bias)
        output = downsample.max_pool_2d(input=activated, ds=self.poolsize, ignore_border=True)
    else:
        pooled_out = downsample.max_pool_2d(input=conv_out, ds=self.poolsize, ignore_border=True)
        output = pooled_out + bias
    return output
def model(X, w, w2, w3, w4, w_o, p_drop_conv, p_drop_hidden):
    """Assemble a three-stage conv net followed by a dense layer and softmax.

    Each convolutional stage is conv -> rectify -> 2x2 max-pool -> dropout;
    the first convolution uses ``border_mode='full'``. The third stage is
    flattened to two dimensions before its dropout.

    :param X: symbolic network input.
    :param w, w2, w3: filters of the three convolutional stages.
    :param w4: weight matrix of the hidden dense layer.
    :param w_o: weight matrix of the softmax output layer.
    :param p_drop_conv: dropout rate applied after stages 1-3.
    :param p_drop_hidden: dropout rate applied after the dense layer.
    :return: ``(l1, l2, l3, l4, pyx)`` -- the four post-dropout activations
        and the softmax output.
    """
    pool_window = (2, 2)

    # Stage 1
    stage1_act = rectify(conv2d(X, w, border_mode='full'))
    l1 = dropout(max_pool_2d(stage1_act, pool_window), p_drop_conv)

    # Stage 2
    stage2_act = rectify(conv2d(l1, w2))
    l2 = dropout(max_pool_2d(stage2_act, pool_window), p_drop_conv)

    # Stage 3, flattened to 2D so it can feed the dense layer.
    stage3_act = rectify(conv2d(l2, w3))
    stage3_pooled = max_pool_2d(stage3_act, pool_window)
    l3 = dropout(T.flatten(stage3_pooled, outdim=2), p_drop_conv)

    # Dense hidden layer
    l4 = dropout(rectify(T.dot(l3, w4)), p_drop_hidden)

    pyx = softmax(T.dot(l4, w_o))
    return l1, l2, l3, l4, pyx
def op(self, state):
    """Max-pool the output of the incoming layer ``self.l_in``.

    :param state: forwarded unchanged to ``self.l_in.op``.
    :return: ``max_pool_2d`` of the upstream output with window ``self.shape``.
    """
    upstream = self.l_in.op(state=state)
    pooled = max_pool_2d(upstream, self.shape)
    return pooled
def set_input(self, input):
    """Build the layer's output expression for ``input``.

    Depending on ``self.non_linear``:

    * ``"tanh"``: ``max_pool(tanh(conv + b))``
    * ``"relu"``: ``max_pool(ReLU(conv + b))``
    * anything else: ``max_pool(conv) + b`` (linear)

    :param input: symbolic input passed to ``conv.conv2d``.
    :return: the symbolic output of the layer.
    """
    # Convolve the input feature maps with the filters.
    convolved = conv.conv2d(
        input=input,
        filters=self.W,
        filter_shape=self.filter_shape,
        image_shape=self.image_shape)

    if self.non_linear == "tanh":
        activated = T.tanh(convolved + self.b.dimshuffle('x', 0, 'x', 'x'))
        return downsample.max_pool_2d(input=activated, ds=self.poolsize, ignore_border=True)

    if self.non_linear == "relu":
        activated = ReLU(convolved + self.b.dimshuffle('x', 0, 'x', 'x'))
        return downsample.max_pool_2d(input=activated, ds=self.poolsize, ignore_border=True)

    # Linear case: pool first, then add the broadcast bias.
    pooled = downsample.max_pool_2d(input=convolved, ds=self.poolsize, ignore_border=True)
    return pooled + self.b.dimshuffle('x', 0, 'x', 'x')
def predict(self, lnew_data, rnew_data):
    """Apply the shared conv -> tanh -> max-pool pipeline to two inputs.

    Both inputs are convolved with the same filters ``self.W`` and share
    the bias ``self.b`` and pooling window ``self.poolsize``.

    :param lnew_data: left symbolic input.
    :param rnew_data: right symbolic input.
    :return: ``(loutput, routput)``, the pooled outputs for each side.
    """
    sides = (lnew_data, rnew_data)

    convolved = [conv.conv2d(input=side, filters=self.W) for side in sides]
    activated = [
        T.tanh(maps + self.b.dimshuffle('x', 0, 'x', 'x'))
        for maps in convolved
    ]
    loutput, routput = [
        downsample.max_pool_2d(input=act, ds=self.poolsize,
                               ignore_border=True, mode="max")
        for act in activated
    ]
    return loutput, routput
def get_output(self, train=False):
    """Max-pool a 3D input by temporarily viewing it as a 4D tensor.

    The input gets a trailing singleton axis and has axes 1 and 2 swapped
    before pooling; the swap is undone and the singleton axis removed
    afterwards.

    :param train: forwarded to ``self.get_input``.
    :return: the pooled tensor, reshaped back to three dimensions.
    """
    source = self.get_input(train)

    # (d0, d1, d2) -> (d0, d1, d2, 1) -> axes reordered to (0, 2, 1, 3)
    padded_shape = (source.shape[0], source.shape[1], source.shape[2], 1)
    as_4d = T.reshape(source, padded_shape).dimshuffle(0, 2, 1, 3)

    pooled = downsample.max_pool_2d(
        as_4d,
        ds=self.pool_size,
        st=self.st,
        ignore_border=self.ignore_border,
    )

    # Undo the axis swap, then drop the trailing singleton axis.
    restored = pooled.dimshuffle(0, 2, 1, 3)
    final_shape = (restored.shape[0], restored.shape[1], restored.shape[2])
    return T.reshape(restored, final_shape)
def get_output(self, train=False):
    """Return the max-pooled layer input.

    :param train: forwarded to ``self.get_input``.
    :return: ``max_pool_2d`` of the input using ``self.pool_size``,
        ``self.stride`` and ``self.ignore_border``.
    """
    layer_input = self.get_input(train)
    return downsample.max_pool_2d(
        layer_input,
        ds=self.pool_size,
        st=self.stride,
        ignore_border=self.ignore_border,
    )
def output_func(self, input):
    """Max-pool ``input`` using this layer's configured pooling settings.

    The window comes from ``self.maxpool_shape``, the stride from ``self.st``
    and the border handling from ``self.ig_bor``.
    """
    pooled = downsample.max_pool_2d(
        input,
        ds=self.maxpool_shape,
        ignore_border=self.ig_bor,
        st=self.st,
    )
    return pooled
def output_func(self, input):
    """Apply max-pooling with window ``self.pool_size`` to ``input``.

    Border windows that do not fit completely are ignored. According to the
    original note, ``input`` has shape (batch_size, nwords, ndim) -- TODO
    confirm against the caller.
    """
    result = downsample.max_pool_2d(
        input=input, ds=self.pool_size, ignore_border=True
    )
    return result
def get_output(self, train=False):
    """Max-pool with a non-overlapping window of ``(self.pool_length, 1)``.

    Window and stride are identical, there is no padding, and incomplete
    border windows are ignored.

    NOTE(review): ``train`` itself is handed to ``max_pool_2d`` as the tensor
    to pool. Sibling layers pool ``self.get_input(train)`` instead, so confirm
    that callers really pass a tensor here. The original also carried a
    commented-out ``K.pool2d(x=train, ...)`` variant.

    :param train: value passed to ``max_pool_2d`` as its input.
    :return: the pooled output.
    """
    window = (self.pool_length, 1)
    return downsample.max_pool_2d(
        train,
        ds=window,
        st=window,
        ignore_border=True,
        padding=(0, 0),
        mode='max',
    )
def step(self, input):
    """Compute ``tanh(max_pool(conv(input)) + b)`` for one input.

    The convolution uses the layer's stored ``filter_shape``, ``image_shape``
    and ``border_mode``; pooling uses ``self.poolsize`` and ignores
    incomplete border windows.

    :param input: symbolic input passed to ``conv.conv2d``.
    :return: the symbolic layer output.
    """
    # Convolve the input feature maps with the filters.
    feature_maps = conv.conv2d(
        input=input,
        filters=self.W,
        filter_shape=self.filter_shape,
        image_shape=self.image_shape,
        border_mode=self.border_mode
    )

    # Max-pool each feature map.
    pooled = downsample.max_pool_2d(
        input=feature_maps,
        ds=self.poolsize,
        ignore_border=True,
    )

    # The bias is a 1D vector; dimshuffle makes it broadcast over axes
    # 0, 2 and 3, i.e. across the mini-batch and both spatial dimensions.
    broadcast_bias = self.b.dimshuffle('x', 0, 'x', 'x')
    return tt.tanh(pooled + broadcast_bias)
def get_output(self, train):
    """Max-pool a 3D input by temporarily viewing it as a 4D tensor.

    The input gets a trailing singleton axis which is then swapped with
    axis 2 before pooling; the swap is undone and the tensor reshaped back
    to three dimensions afterwards.

    :param train: forwarded to ``self.get_input``.
    :return: the pooled tensor with three dimensions.
    """
    source = self.get_input(train)

    # (d0, d1, d2) -> (d0, d1, d2, 1) -> axes reordered to (0, 1, 3, 2)
    padded_shape = (source.shape[0], source.shape[1], source.shape[2], 1)
    as_4d = theano.tensor.reshape(source, padded_shape).dimshuffle(0, 1, 3, 2)

    pooled = downsample.max_pool_2d(
        as_4d,
        ds=self.poolsize,
        st=self.st,
        ignore_border=self.ignore_border,
    )

    # Undo the axis swap and collapse back to three dimensions.
    restored = pooled.dimshuffle(0, 1, 3, 2)
    final_shape = (restored.shape[0], restored.shape[1], restored.shape[2])
    return theano.tensor.reshape(restored, final_shape)
def get_output(self, train):
    """Return the max-pooled layer input.

    :param train: forwarded to ``self.get_input``.
    :return: ``max_pool_2d`` of the input using ``self.poolsize``,
        ``self.stride`` and ``self.ignore_border``.
    """
    layer_input = self.get_input(train)
    pooled = downsample.max_pool_2d(
        layer_input,
        ds=self.poolsize,
        st=self.stride,
        ignore_border=self.ignore_border,
    )
    return pooled
def predict(self, lnew_data, rnew_data):
    """Build pooled conv features for a left and a right input.

    Each side goes through convolution with ``self.W``, addition of the
    broadcast bias ``self.b``, ``tanh`` and max-pooling with window
    ``self.poolsize``.

    :param lnew_data: left symbolic input.
    :param rnew_data: right symbolic input.
    :return: ``(loutput, routput)``.
    """
    # Convolution with the shared filters.
    left_maps = conv.conv2d(input=lnew_data, filters=self.W)
    right_maps = conv.conv2d(input=rnew_data, filters=self.W)

    # Bias (broadcast over axes 0, 2 and 3) followed by tanh.
    left_act = T.tanh(left_maps + self.b.dimshuffle('x', 0, 'x', 'x'))
    right_act = T.tanh(right_maps + self.b.dimshuffle('x', 0, 'x', 'x'))

    # Max-pooling, ignoring incomplete border windows.
    loutput = downsample.max_pool_2d(
        input=left_act, ds=self.poolsize, ignore_border=True, mode="max")
    routput = downsample.max_pool_2d(
        input=right_act, ds=self.poolsize, ignore_border=True, mode="max")

    return loutput, routput