def get_loss_func(self, C=1.0, k=1):
    """Get loss function of VAE.

    The loss value is equal to ELBO (Evidence Lower Bound)
    multiplied by -1.

    Args:
        C (float): Usually this is 1.0. Can be changed to control the
            second term of the ELBO, which works as regularization.
        k (int): Number of Monte Carlo samples used to estimate the
            reconstruction term.
    """
    def lf(x):
        mu, ln_var = self.encode(x)
        batchsize = len(mu.data)
        # reconstruction loss (Monte Carlo estimate over k samples)
        rec_loss = 0
        for l in six.moves.range(k):
            z = F.gaussian(mu, ln_var)
            rec_loss += F.bernoulli_nll(x, self.decode(z, sigmoid=False)) \
                / (k * batchsize)
        self.rec_loss = rec_loss
        self.loss = self.rec_loss + \
            C * gaussian_kl_divergence(mu, ln_var) / batchsize
        return self.loss
    return lf
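# Usage sketch (not part of the original snippet): how a loss function returned by
# get_loss_func() is typically plugged into a Chainer training loop. `VAE`, its
# constructor arguments, and `train_iter` are illustrative assumptions; the optimizer
# calls come from the standard Chainer API.
import chainer
from chainer import optimizers

model = VAE(n_in=784, n_latent=20, n_h=500)   # hypothetical VAE chain defining encode/decode
optimizer = optimizers.Adam()
optimizer.setup(model)
lossfun = model.get_loss_func(C=1.0, k=1)

for x in train_iter:                          # x: float32 array of shape (batchsize, n_in)
    loss = lossfun(chainer.Variable(x))
    model.cleargrads()                        # zerograds() on very old Chainer versions
    loss.backward()
    optimizer.update()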
def __call__(self, xs):
    """
    xs: (batchsize, hidden_dim)
    """
    if self.h is not None:
        h = self.h
        c = self.c
    else:
        xp = chainer.cuda.get_array_module(xs.data)
        batchsize = xs.shape[0]
        h = Variable(xp.zeros((batchsize, self.outsize), 'f'), volatile='AUTO')
        c = Variable(xp.zeros((batchsize, self.outsize), 'f'), volatile='AUTO')
    in_gate = F.sigmoid(self.linear_in(F.concat([xs, h, c])))
    new_in = F.tanh(self.linear_c(F.concat([xs, h])))
    self.c = in_gate * new_in + (1. - in_gate) * c
    out_gate = F.sigmoid(self.linear_out(F.concat([xs, h, self.c])))
    self.h = F.tanh(self.c) * out_gate
    return self.h
def __call__(self, x):
    if not hasattr(self, 'encoding') or self.encoding is None:
        self.batch_size = x.shape[0]
        self.init()
    dims = len(x.shape) - 1
    f, z, o = F.split_axis(self.pre(x), 3, axis=dims)
    f = F.sigmoid(f)
    z = (1 - f) * F.tanh(z)
    o = F.sigmoid(o)
    if dims == 2:
        self.c = strnn(f, z, self.c[:self.batch_size])
    else:
        self.c = f * self.c + z
    if self.attention:
        context = attention_sum(self.encoding, self.c)
        self.h = o * self.o(F.concat((self.c, context), axis=dims))
    else:
        self.h = self.c * o
    self.x = x
    return self.h
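# Reference sketch (assumption, not from the original source): the update above is the
# QRNN-style fo-pooling recurrence c_t = f_t * c_{t-1} + (1 - f_t) * tanh(z_t), h_t = o_t * c_t.
# A plain NumPy version of the same recurrence, independent of the strnn() helper used above:
import numpy as np

def fo_pooling_reference(f, z, o, c0):
    """f, o: sigmoid gate values in (0, 1); z: candidate pre-activations.
    All shaped (seq_len, batch, channels); c0: (batch, channels)."""
    c, hs = c0, []
    for t in range(f.shape[0]):
        c = f[t] * c + (1 - f[t]) * np.tanh(z[t])   # coupled forget/input gate
        hs.append(o[t] * c)                          # output gate
    return np.stack(hs), c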
def test(model, xs, ts, uss=None):
    model.reset_state()
    tags = model([Variable(np.array([x], dtype=np.int32)) for x in xs])
    zss = []
    y_mat = np.zeros((2, 2))
    zs_mat = tuple(
        np.zeros((clf.n_output, clf.n_output))
        for clf in model.tagger.classifiers
    )
    for t, (y, zs) in zip(ts, tags):
        y_mat[t, int(cf.sigmoid(y).data[0, 0] > 0.5)] += 1.0
        if t:
            zss.append(zs)
    if uss:
        assert len(uss) == len(zss)
        for us, zs in zip(uss, zss):
            for m, u, z in zip(zs_mat, us, zs):
                m[u, cf.softmax(z).data.argmax(1)[0]] += 1
    return y_mat, zs_mat
def generate(model, xs):
    model.reset_state()
    tags = model([Variable(np.array([x], dtype=np.int32)) for x in xs])
    buf = bytearray()
    for x, (y, zs) in zip(xs, tags):
        buf.append(x)
        if cf.sigmoid(y).data[0, 0] > 0.5:
            yield (
                buf.decode('utf-8', 'replace'),
                tuple(
                    cf.softmax(z).data.argmax(1)[0]
                    for z in zs
                )
            )
            buf = bytearray()
def to_function(self):
    nonlinearity = self.nonlinearity.lower()
    if nonlinearity == "clipped_relu":
        return clipped_relu()
    if nonlinearity == "crelu":
        return crelu()
    if nonlinearity == "elu":
        return elu()
    if nonlinearity == "hard_sigmoid":
        return hard_sigmoid()
    if nonlinearity == "leaky_relu":
        return leaky_relu()
    if nonlinearity == "relu":
        return relu()
    if nonlinearity == "sigmoid":
        return sigmoid()
    if nonlinearity == "softmax":
        return softmax()
    if nonlinearity == "softplus":
        return softplus()
    if nonlinearity == "tanh":
        return tanh()
    if nonlinearity == "bst":
        return bst()
    raise NotImplementedError()
def decode(self, x, test=False, sigmoid=True):
    activate = F.relu
    # Hidden layers
    h = x
    for i in range(self.n_layers_dec - 1):
        h = getattr(self, "dec_layer_%i" % i)(h)
        h = getattr(self, "dec_batchnorm_%i" % i)(h, test=test)
        h = activate(h)
        if self.dropout:
            h = F.dropout(h, train=not test)
    # Output layer
    output = getattr(self, "dec_layer_out")(h)
    if sigmoid:
        output = F.sigmoid(output)
    return output
def __call__(self, X):
    # generate random values
    R = np.random.randn(X.data.shape[0], self.rand_sz)
    R = Variable(R.astype("float32"))
    # attach the random vector to the inputs
    h = F.concat([R, X])
    # h = R
    h = self.ipt(h)
    # h = F.dropout(h)
    y = self.out(h)
    # prior knowledge: environment observation is a one-hot vector
    obs = F.softmax(y[:, :-2])
    # prior knowledge: reward is in [0, 1]
    rew = F.sigmoid(y[:, [-2]])
    fin = F.sigmoid(y[:, [-1]])
    y = F.concat([obs, rew, fin])
    return y
def __init__(self, n_hidden, activate='sigmoid', size=64, ch=512, wscale=0.02):
    assert size % 16 == 0
    initial_size = size // 16
    self.n_hidden = n_hidden
    if activate == 'sigmoid':
        self.activate = F.sigmoid
    elif activate == 'tanh':
        self.activate = F.tanh
    else:
        raise ValueError('invalid activation function')
    self.ch = ch
    self.initial_size = initial_size
    w = chainer.initializers.Normal(wscale)
    super(Generator, self).__init__(
        l0=L.Linear(self.n_hidden, initial_size * initial_size * ch, initialW=w),
        dc1=L.Deconvolution2D(ch // 1, ch // 2, 4, 2, 1, initialW=w),
        dc2=L.Deconvolution2D(ch // 2, ch // 4, 4, 2, 1, initialW=w),
        dc3=L.Deconvolution2D(ch // 4, ch // 8, 4, 2, 1, initialW=w),
        dc4=L.Deconvolution2D(ch // 8, 3, 4, 2, 1, initialW=w),
        bn0=L.BatchNormalization(initial_size * initial_size * ch),
        bn1=L.BatchNormalization(ch // 2),
        bn2=L.BatchNormalization(ch // 4),
        bn3=L.BatchNormalization(ch // 8),
    )
def __init__(self, n_hidden, activate='sigmoid', size=64, ch=512, wscale=0.02):
    assert size % 8 == 0
    initial_size = size // 8
    self.n_hidden = n_hidden
    self.ch = ch
    self.initial_size = initial_size
    if activate == 'sigmoid':
        self.activate = F.sigmoid
    elif activate == 'tanh':
        self.activate = F.tanh
    else:
        raise ValueError('invalid activation function')
    w = chainer.initializers.Normal(wscale)
    super(Generator2, self).__init__(
        l0=L.Linear(self.n_hidden, initial_size * initial_size * ch, initialW=w),
        dc1=L.Deconvolution2D(ch // 1, ch // 2, 4, 2, 1, initialW=w),
        dc2=L.Deconvolution2D(ch // 2, ch // 4, 4, 2, 1, initialW=w),
        dc3=L.Deconvolution2D(ch // 4, ch // 8, 4, 2, 1, initialW=w),
        dc4=L.Deconvolution2D(ch // 8, 3, 3, 1, 1, initialW=w),
        bn0=L.BatchNormalization(initial_size * initial_size * ch),
        bn1=L.BatchNormalization(ch // 2),
        bn2=L.BatchNormalization(ch // 4),
        bn3=L.BatchNormalization(ch // 8),
    )
def __init__(self, n_hidden, activate='sigmoid', size=64, ch=512, wscale=0.02):
    assert size % 8 == 0
    initial_size = size // 8
    self.n_hidden = n_hidden
    if activate == 'sigmoid':
        self.activate = F.sigmoid
    elif activate == 'tanh':
        self.activate = F.tanh
    else:
        raise ValueError('invalid activation function')
    self.ch = ch
    self.initial_size = initial_size
    w = chainer.initializers.Normal(wscale)
    super(Generator, self).__init__(
        l0=L.Linear(self.n_hidden, initial_size * initial_size * ch, initialW=w),
        dc1=L.Deconvolution2D(ch // 1, ch // 2, 4, 2, 1, initialW=w),
        dc2=L.Deconvolution2D(ch // 2, ch // 4, 4, 2, 1, initialW=w),
        dc3=L.Deconvolution2D(ch // 4, ch // 8, 4, 2, 1, initialW=w),
        dc4=L.Deconvolution2D(ch // 8, 3, 3, 1, 1, initialW=w),
    )
def __init__(self, n_hidden, activate='sigmoid', size=64, ch=512, wscale=0.02):
    assert size % 8 == 0
    initial_size = size // 8
    self.n_hidden = n_hidden
    self.ch = ch
    self.initial_size = initial_size
    if activate == 'sigmoid':
        self.activate = F.sigmoid
    elif activate == 'tanh':
        self.activate = F.tanh
    else:
        raise ValueError('invalid activation function')
    w = chainer.initializers.Normal(wscale)
    super(Generator, self).__init__(
        l0=L.Linear(self.n_hidden, initial_size * initial_size * ch, initialW=w),
        dc1=L.Deconvolution2D(ch // 1, ch // 2, 4, 2, 1, initialW=w),
        dc2=L.Deconvolution2D(ch // 2, ch // 4, 4, 2, 1, initialW=w),
        dc3=L.Deconvolution2D(ch // 4, ch // 8, 4, 2, 1, initialW=w),
        dc4=L.Deconvolution2D(ch // 8, 3, 3, 1, 1, initialW=w),
        bn0=L.BatchNormalization(initial_size * initial_size * ch),
        bn1=L.BatchNormalization(ch // 2),
        bn2=L.BatchNormalization(ch // 4),
        bn3=L.BatchNormalization(ch // 8),
    )
def __call__(self, X):
    # remove right paddings
    # e.g. kernel_size = 3, pad = 2
    # input sequence with paddings:
    #   [0, 0, x1, x2, x3, 0, 0]
    #    |<  t1  >|
    #       |<  t2  >|
    #           |<  t3  >|
    pad = self._kernel_size - 1
    WX = self.W(X)[:, :, :-pad]
    A, B = functions.split_axis(WX, 2, axis=1)
    self.H = A * functions.sigmoid(B)
    return self.H
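# Illustration (assumption, not from the original source): the last two lines above are a
# gated linear unit, H = A * sigmoid(B), where B acts as a soft gate on A. Toy NumPy version:
import numpy as np

def glu_gate(WX):
    A, B = np.split(WX, 2, axis=1)           # split along the channel axis
    return A * (1.0 / (1.0 + np.exp(-B)))    # gate values in (0, 1) scale each feature

H = glu_gate(np.random.randn(2, 8, 5))       # (batch, 2 * out_channels, time) -> (2, 4, 5)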
def to_function(self):
    nonlinearity = self.nonlinearity.lower()
    if nonlinearity == "clipped_relu":
        return clipped_relu()
    if nonlinearity == "crelu":
        return crelu()
    if nonlinearity == "elu":
        return elu()
    if nonlinearity == "hard_sigmoid":
        return hard_sigmoid()
    if nonlinearity == "leaky_relu":
        return leaky_relu()
    if nonlinearity == "relu":
        return relu()
    if nonlinearity == "sigmoid":
        return sigmoid()
    if nonlinearity == "softmax":
        return softmax()
    if nonlinearity == "softplus":
        return softplus()
    if nonlinearity == "tanh":
        return tanh()
    raise NotImplementedError()
def __call__(self, v, h, label):
    v_t = self.vertical_conv_t(v)
    v_s = self.vertical_conv_s(v)
    to_vertical_t = self.v_to_h_conv_t(v_t)
    to_vertical_s = self.v_to_h_conv_s(v_s)
    # v_gate = self.vertical_gate_conv(v)
    # The label bias is added to both the vertical and horizontal convolutions;
    # only the shape of v_t is used here, since both share the same shape.
    label = F.broadcast_to(
        F.expand_dims(F.expand_dims(self.label(label), -1), -1), v_t.shape)
    v_t, v_s = v_t + label, v_s + label
    v = F.tanh(v_t) * F.sigmoid(v_s)
    h_t = self.horizontal_conv_t(h)
    h_s = self.horizontal_conv_s(h)
    h_t, h_s = h_t + to_vertical_t + label, h_s + to_vertical_s + label
    h = self.horizontal_output(F.tanh(h_t) * F.sigmoid(h_s))
    return v, h
def propup(self, vis):
    """
    Propagate the visible units' activations upward to the hidden units (Eq. (7)).

    :param vis: Variable Matrix(batch_size, in_channels, image_height, image_width)
                - given v_sample
    :return: Variable Matrix(batch_size, out_channels, image_height_out, image_width_out)
             - probability for each hidden unit to be h_i = 1
    """
    # conv.W: Matrix(out_channels, in_channels, filter_height=ksize, filter_width=ksize)
    # conv.b: Vec(out_channels,)
    if self.real == 0:
        pre_sigmoid_activation = self.conv(vis)
    else:
        pre_sigmoid_activation = self.conv(vis / self.std_ch)
    # dense equivalent:
    # F.matmul(vis, self.conv.W, transb=True) + F.broadcast_to(self.conv.b, (vis.data.shape[0], self.n_hidden))
    return F.sigmoid(pre_sigmoid_activation)
def propdown(self, hid):
    """
    Propagate the hidden units' activations downward to the visible units.

    :param hid: Variable Matrix(batch_size, out_channels, image_height_out, image_width_out)
                - given h_sample
    :return: Variable Matrix(batch_size, in_channels, image_height, image_width)
             - probability for each visible unit to be v_j = 1
    """
    batch_size = hid.data.shape[0]
    if self.real == 0:
        W_flipped = F.swapaxes(CF.flip(self.conv.W, axes=(2, 3)), axis1=0, axis2=1)
        pre_sigmoid_activation = F.convolution_2d(hid, W_flipped, self.conv.a, pad=self.ksize - 1)
        # dense equivalent:
        # F.matmul(hid, self.l.W) + F.broadcast_to(self.l.a, (batch_size, self.n_visible))
        v_mean = F.sigmoid(pre_sigmoid_activation)
        # print('W info ', self.conv.W.data.shape, 'W_flipped info ', W_flipped.data.shape)
        # print('W info ', self.conv.W.data[3, 0, 2, 3], 'W_flipped info ', W_flipped.data[0, 3, 8, 7])
        # print('W info ', self.conv.W.data[3, 0, 8, 7], 'W_flipped info ', W_flipped.data[0, 3, 2, 3])
        # print('W info ', self.conv.W.data[19, 0, 4, 0], 'W_flipped info ', W_flipped.data[0, 19, 6, 10])
        # print('pre_sigmoid_activation', F.sum(pre_sigmoid_activation).data)
        # print('v_mean', v_mean.data.shape)
        # print('v_mean sum', F.sum(v_mean).data)
        # print('hid', hid.data.shape)
    else:
        # TODO: check
        W_flipped = F.swapaxes(CF.flip(self.conv.W, axes=(2, 3)), axis1=0, axis2=1)
        v_mean = F.convolution_2d(hid, W_flipped, self.conv.a, pad=self.ksize - 1)
    return v_mean
def reconstruct(self, v):
    """
    :param v: Variable Matrix(batch_size, in_channels, image_height, image_width)
    :return: reconstructed_v, Variable Matrix(batch_size, in_channels, image_height, image_width)
    """
    batch_size = v.data.shape[0]
    xp = cuda.get_array_module(v.data)
    if self.real == 0:
        h = F.sigmoid(self.conv(v))
    else:
        std_ch = xp.reshape(self.std, (1, self.in_channels, 1, 1))
        h = F.sigmoid(self.conv(v / std_ch))
    # = F.sigmoid(F.matmul(v, self.l.W, transb=True) + F.broadcast_to(self.l.b, (batch_size, self.n_hidden)))
    W_flipped = F.swapaxes(CF.flip(self.conv.W, axes=(2, 3)), axis1=0, axis2=1)
    reconstructed_v = F.sigmoid(F.convolution_2d(h, W_flipped, self.conv.a, pad=self.ksize - 1))
    # = F.sigmoid(F.matmul(h, self.l.W) + F.broadcast_to(self.l.a, (batch_size, self.n_visible)))
    return reconstructed_v
def __call__(self, x):
    z = self.W_z(x)
    h_bar = self.W(x)
    if self.h is not None:
        r = F.sigmoid(self.W_r(x) + self.U_r(self.h))
        z += self.U_z(self.h)
        h_bar += self.U(r * self.h)
    z = F.sigmoid(z)
    h_bar = F.tanh(h_bar)
    if self.h is not None:
        h_new = F.linear_interpolate(z, h_bar, self.h)
    else:
        h_new = z * h_bar
    self.h = h_new  # save the state
    return h_new
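# Note (assumption, not from the original source): the method above is a hand-rolled GRU cell,
#   r = sigmoid(W_r x + U_r h), z = sigmoid(W_z x + U_z h), h_bar = tanh(W x + U (r * h)),
# and F.linear_interpolate(z, h_bar, h) computes z * h_bar + (1 - z) * h. In NumPy:
import numpy as np

def gru_state_update(z, h_bar, h_prev):
    """Equivalent of F.linear_interpolate(z, h_bar, h_prev)."""
    return z * h_bar + (1.0 - z) * h_prev

print(gru_state_update(np.array([0.25, 0.9]),
                       np.array([1.0, -1.0]),
                       np.array([0.0, 0.5])))   # [ 0.25 -0.85]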
def differentiable_backward(self, g):
    if self.normalize_input:
        raise NotImplementedError
    if self.activation is F.leaky_relu:
        g = backward_leaky_relu(self.x, g)
    elif self.activation is F.relu:
        g = backward_relu(self.x, g)
    elif self.activation is F.tanh:
        g = backward_tanh(self.x, g)
    elif self.activation is F.sigmoid:
        g = backward_sigmoid(self.x, g)
    elif self.activation is not None:
        raise NotImplementedError
    if self.norm == 'ln':
        g = backward_layernormalization(self.nx, g, self.n)
    elif self.norm is not None:
        raise NotImplementedError
    if self.nn == 'down_conv' or self.nn == 'conv':
        g = backward_convolution(None, g, self.c)
    elif self.nn == 'linear':
        g = backward_linear(None, g, self.c)
    elif self.nn == 'up_deconv':
        g = backward_deconvolution(None, g, self.c)
    else:
        raise NotImplementedError
    return g
def predict(self, input_x):
    if isinstance(input_x, chainer.Variable):
        device = cuda.get_device(input_x.data)
    else:
        device = cuda.get_device(input_x)
    xp = self.predictor.xp
    with device:
        output = self.predictor(input_x)
        batch_size, input_channel, input_h, input_w = input_x.shape
        batch_size, _, grid_h, grid_w = output.shape
        x, y, w, h, conf, prob = F.split_axis(
            F.reshape(output, (batch_size, self.predictor.n_boxes,
                               self.predictor.n_classes + 5, grid_h, grid_w)),
            (1, 2, 3, 4, 5), axis=2)
        x = F.sigmoid(x)        # box center x offset within each grid cell
        y = F.sigmoid(y)        # box center y offset within each grid cell
        conf = F.sigmoid(conf)  # objectness confidence
        prob = F.transpose(prob, (0, 2, 1, 3, 4))
        prob = F.softmax(prob)  # class probabilities
        prob = F.transpose(prob, (0, 2, 1, 3, 4))

        # convert coordinates to those on the image
        x_shift = xp.asarray(np.broadcast_to(np.arange(grid_w, dtype=np.float32), x.shape))
        y_shift = xp.asarray(np.broadcast_to(np.arange(grid_h, dtype=np.float32).reshape(grid_h, 1), y.shape))
        w_anchor = xp.asarray(np.broadcast_to(np.reshape(
            np.array(self.anchors, dtype=np.float32)[:, 0],
            (self.predictor.n_boxes, 1, 1, 1)), w.shape))
        h_anchor = xp.asarray(np.broadcast_to(np.reshape(
            np.array(self.anchors, dtype=np.float32)[:, 1],
            (self.predictor.n_boxes, 1, 1, 1)), h.shape))
        box_x = (x + x_shift) / grid_w
        box_y = (y + y_shift) / grid_h
        box_w = F.exp(w) * w_anchor / grid_w
        box_h = F.exp(h) * h_anchor / grid_h
        return box_x, box_y, box_w, box_h, conf, prob
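# Post-processing sketch (assumption, not from the original source): turning the normalized
# outputs of predict() into pixel-space corner coordinates. `detector`, `input_x`, and the
# 0.3 threshold are illustrative.
import numpy as np

box_x, box_y, box_w, box_h, conf, prob = detector.predict(input_x)
score = conf.data * prob.data.max(axis=2, keepdims=True)   # objectness * best class probability
img_h, img_w = input_x.shape[2:]
left = (box_x.data - box_w.data / 2) * img_w
top = (box_y.data - box_h.data / 2) * img_h
right = (box_x.data + box_w.data / 2) * img_w
bottom = (box_y.data + box_h.data / 2) * img_h
keep = score > 0.3                                         # illustrative confidence cut
print(int(keep.sum()), 'candidate boxes above threshold')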
def __call__(self, z):
    h = F.reshape(F.relu(self.bn0(self.l0(z))),
                  (len(z), self.ch, self.bottom_width, self.bottom_width))
    h = F.relu(self.bn1(self.dc1(h)))
    h = F.relu(self.bn2(self.dc2(h)))
    h = F.relu(self.bn3(self.dc3(h)))
    x = F.sigmoid(self.dc4(h))
    return x
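# Sampling sketch (assumption, not from the original source): drawing images from a DCGAN-style
# generator like the one above. The constructor arguments and batch size are illustrative.
import numpy as np
import chainer

gen = Generator(n_hidden=100)                               # hypothetical instantiation
z = np.random.uniform(-1, 1, (16, 100)).astype(np.float32)
with chainer.using_config('train', False), chainer.no_backprop_mode():
    x = gen(z)                                              # Variable, NCHW, values in (0, 1)
images = (x.data * 255).astype(np.uint8)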
def __call__(self, x, sigmoid=True):
    """AutoEncoder"""
    return self.decode(self.encode(x)[0], sigmoid)

def decode(self, z, sigmoid=True):
    h1 = F.tanh(self.ld1(z))
    h2 = self.ld2(h1)
    if sigmoid:
        return F.sigmoid(h2)
    else:
        return h2
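# Usage sketch (assumption, not from the original source): with the interface above, calling
# the model directly decodes from the mean of the encoded distribution; sigmoid=False returns
# the pre-sigmoid outputs, which is what F.bernoulli_nll expects. `model` and the batch shape
# are illustrative.
import numpy as np

x = np.random.rand(8, 784).astype(np.float32)    # illustrative mini-batch
x_rec = model(x)                                  # reconstruction, values in (0, 1)
x_logits = model(x, sigmoid=False)                # pre-sigmoid outputs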