def calcLoss(self, t, categ_vec_h, categ_vec_c, mu, ln_var, wei_arr=None):
    k = self.sample_size
    loss = None
    # Teacher-forcing targets: shift each sequence by one token and append
    # the id 2 (presumably the end-of-sequence marker).
    t_pred = [t_e[1:] + [2] for t_e in t]
    t_pred = [xp.asarray(tp_e, dtype=xp.int32) for tp_e in t_pred]
    t = self.denoiseInput(t)
    t_vec = self.makeEmbedBatch(t)
    # Monte Carlo estimate of the reconstruction term with k samples of z.
    for l in range(k):
        z = F.gaussian(mu, ln_var)
        if loss is None:
            loss = self.decode(z, categ_vec_h, categ_vec_c, t_vec, t_pred, wei_arr) / (k * self.batch_size)
        else:
            loss += self.decode(z, categ_vec_h, categ_vec_c, t_vec, t_pred, wei_arr) / (k * self.batch_size)
    # The KL term is switched on after kl_zero_epoch and annealed in linearly.
    C = 0.005 * (self.epoch_now - self.kl_zero_epoch) / self.epoch  # 0.02
    if self.epoch_now > self.kl_zero_epoch:
        loss += C * F.gaussian_kl_divergence(mu, ln_var) / self.batch_size
    return loss
Example source code for the Python function gaussian()
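All of the snippets below rely on the same primitive: chainer.functions.gaussian(mean, ln_var) returns the reparameterized sample mean + exp(ln_var / 2) * eps with eps ~ N(0, I), so the draw stays differentiable with respect to mean and ln_var. A minimal NumPy sketch of that behaviour, for illustration only (not the library implementation):

import numpy as np

def gaussian_by_hand(mean, ln_var):
    # eps ~ N(0, I); note the second argument is the log *variance*, not the std.
    eps = np.random.standard_normal(mean.shape).astype(mean.dtype)
    return mean + np.exp(0.5 * ln_var) * eps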
def get_loss_func(self, C=1.0, k=1):
    """Get the loss function of the VAE.

    The loss value is equal to the ELBO (Evidence Lower Bound)
    multiplied by -1.

    Args:
        C (float): Usually 1.0. Can be changed to control the weight of
            the second (KL) term of the ELBO, which acts as a regularizer.
        k (int): Number of Monte Carlo samples used to estimate the
            reconstruction term.
    """
    def lf(x):
        mu, ln_var = self.encode(x)
        batchsize = len(mu.data)
        # reconstruction loss
        rec_loss = 0
        for l in six.moves.range(k):
            z = F.gaussian(mu, ln_var)
            rec_loss += F.bernoulli_nll(x, self.decode(z, sigmoid=False)) \
                / (k * batchsize)
        self.rec_loss = rec_loss
        self.loss = self.rec_loss + \
            C * gaussian_kl_divergence(mu, ln_var) / batchsize
        return self.loss
    return lf
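A hedged usage sketch for a loss factory like the one above. The VAE constructor arguments, the training iterator, and the xp conversion are assumptions added for illustration, not part of the original example:

import chainer
from chainer import optimizers

model = VAE(n_in=784, n_latent=20, n_h=500)   # hypothetical constructor
optimizer = optimizers.Adam()
optimizer.setup(model)
loss_func = model.get_loss_func(C=1.0, k=1)

for x_batch in train_iter:                    # hypothetical iterator of float32 batches
    x = chainer.Variable(model.xp.asarray(x_batch))
    model.cleargrads()
    loss = loss_func(x)
    loss.backward()
    optimizer.update()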
def encode_z(self, x, a):
    # a = F.gaussian(self.qmu_a, self.qln_var_a)  # This should be outside the encoding function. Pass the function a.
    net_input = F.concat((x, a), axis=1)
    h = self.qlinz0(net_input)
    h = self.qlinz_batch_norm_0(h)
    h = F.crelu(h)
    for i in range(self.num_layers - 1):
        layer_name = 'qlinz' + str(i + 1)
        h = self[layer_name](h)
        layer_name = 'qlinz_batch_norm_' + str(i + 1)
        h = self[layer_name](h)
        h = F.crelu(h)
    self.qmu_z = self.qlinz_mu(h)
    self.qln_var_z = self.qlinz_ln_var(h)
    return self.qmu_z, self.qln_var_z
def get_loss_func(self, C=1.0, k=1):
    def lf(x):
        mu, ln_var = self.encode(x)
        batchsize = len(mu.data)
        rec_loss = 0
        for l in six.moves.range(k):
            z = F.gaussian(mu, ln_var)
            rec_loss += F.bernoulli_nll(x, self.decode(z, sigmoid=False))
        rec_loss /= (k * batchsize)
        self.rec_loss = rec_loss
        self.loss = self.rec_loss + C * gaussian_kl_divergence(mu, ln_var)
        self.loss /= batchsize
        return self.loss
    return lf
def term_bias(self, bs, train=True):
    """ Compute the overall bias and broadcast it to the shape of the batch size.
    """
    shape = (bs, 1,)
    # Bias is drawn from a Gaussian with the given mu and log variance.
    bs_mu = F.broadcast_to(self.bias_mu.b, shape)
    bs_lv = F.broadcast_to(self.bias_lv.b, shape)
    # Add a very negative log variance so we're sampling
    # from a very narrow distribution about the mean.
    # Useful for the validation dataset when we want to only guess
    # the mean. The floor must be added before sampling to take effect.
    if not train:
        bs_lv += self.lv_floor
    bias = F.flatten(F.gaussian(bs_mu, bs_lv))
    # Compute the prior on the bias, i.e. the KL divergence
    # KL(N(mu_bias, var_bias) || N(0, 1)).
    kld = F.gaussian_kl_divergence(self.bias_mu.b, self.bias_lv.b)
    return bias, kld
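A worked check of the lv_floor trick used above. The value -100 is an assumption (the real constant is defined elsewhere in that project), but any sufficiently negative log variance has the same effect:

import math
lv_floor = -100.0                 # assumed value, for illustration
print(math.exp(0.5 * lv_floor))   # ~1.9e-22: the noise std is effectively zero,
                                  # so F.gaussian(mu, lv + lv_floor) returns mu.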
def term_feat(self, iloc, jloc, ival, jval, bs, nf, train=True):
    # Change all of the shapes to form interaction vectors
    shape = (bs, nf * 2, self.n_dim)
    feat_mu_vec = F.broadcast_to(self.feat_mu_vec.b, shape)
    feat_lv_vec = F.broadcast_to(self.feat_lv_vec.b, shape)
    if not train:
        feat_lv_vec += self.lv_floor
    # Construct the interaction mean and variance:
    # iloc is (bs, nf), feat(iloc) is (bs, nf, ndim) and
    # dot(feat, feat) is (bs, nf)
    ivec = F.gaussian(feat_mu_vec + self.feat_delta_mu(iloc),
                      feat_lv_vec + self.feat_delta_lv(iloc))
    jvec = F.gaussian(feat_mu_vec + self.feat_delta_mu(jloc),
                      feat_lv_vec + self.feat_delta_lv(jloc))
    # feat is (bs, )
    feat = dot(F.sum(ivec * jvec, axis=2), ival * jval)
    # Compute the KLD for the group mean vector and variance vector
    kld1 = F.gaussian_kl_divergence(self.feat_mu_vec.b, self.feat_lv_vec.b)
    # Compute the KLD for vector deviations from the group mean and var
    kld2 = F.gaussian_kl_divergence(self.feat_delta_mu.W,
                                    self.feat_delta_lv.W)
    return feat, kld1 + kld2
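The dot() call above is a helper from that project rather than a Chainer function. Assuming it is a row-wise dot product over the feature axis, a plausible equivalent sketch is:

import chainer.functions as F

def batch_dot(a, b):
    # Row-wise dot product: (bs, nf) * (bs, nf) -> (bs,)
    return F.sum(a * b, axis=1)

# feat = batch_dot(F.sum(ivec * jvec, axis=2), ival * jval)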
def train(self, x, L=1, test=False):
    batchsize = x.data.shape[0]
    z_mean, z_ln_var = self.encoder(x, test=test, apply_f=False)
    loss = 0
    for l in xrange(L):
        # Sample z
        z = F.gaussian(z_mean, z_ln_var)
        # Compute the lower bound
        log_px_z = self.log_px_z(x, z, test=test)
        log_pz = self.log_pz(z, z_mean, z_ln_var)
        log_qz_x = self.log_qz_x(z, z_mean, z_ln_var)
        lower_bound = log_px_z + log_pz - log_qz_x
        loss += -lower_bound
    loss = F.sum(loss) / L / batchsize
    self.zero_grads()
    loss.backward()
    self.update()
    if self.gpu:
        loss.to_cpu()
    return loss.data
def train(self, x, L=1, test=False):
    batchsize = x.data.shape[0]
    z_mean, z_ln_var = self.encoder(x, test=test, apply_f=False)
    loss = 0
    for l in xrange(L):
        # Sample z
        z = F.gaussian(z_mean, z_ln_var)
        # Decode
        x_expectation = self.decoder(z, test=test, apply_f=False)
        # E_q(z|x)[log(p(x|z))]
        loss += self.bernoulli_nll_keepbatch(x, x_expectation)
    if L > 1:
        loss /= L
    # KL divergence
    loss += self.gaussian_kl_divergence_keepbatch(z_mean, z_ln_var)
    loss = F.sum(loss) / batchsize
    self.zero_grads()
    loss.backward()
    self.update()
    if self.gpu:
        loss.to_cpu()
    return loss.data
def sample(self):
    return F.gaussian(self.mean, self.ln_var)
def calcLoss(self, t, mu, ln_var):
    k = self.sample_size
    kl_zero_epoch = self.kl_zero_epoch
    loss = None
    t_pred = [t_e[1:] + [2] for t_e in t]
    t_pred = [xp.asarray(tp_e, dtype=xp.int32) for tp_e in t_pred]
    t = self.denoiseInput(t)
    print("t:{}".format([self.vocab.itos(t_e) for t_e in t[0]]))
    t_vec = self.makeEmbedBatch(t)
    for l in range(k):
        z = F.gaussian(mu, ln_var)
        if loss is None:
            loss = self.decode(z, t_vec, t_pred) / (k * self.batch_size)
        else:
            loss += self.decode(z, t_vec, t_pred) / (k * self.batch_size)
    C = 0.06 * (self.epoch_now - kl_zero_epoch) / self.epoch
    if self.epoch_now > kl_zero_epoch:
        loss += C * F.gaussian_kl_divergence(mu, ln_var) / self.batch_size
    return loss
def encode_z(self, x, a):
    # a = F.gaussian(self.qmu_a, self.qln_var_a)  # This should be outside the encoding function. Pass the function a.
    net_input = F.concat((x, a), axis=1)
    h = F.crelu(self.qlinz0(net_input))
    for i in range(self.num_layers - 1):
        layer_name = 'qlinz' + str(i + 1)
        h = F.crelu(self[layer_name](h))
    self.qmu_z = self.qlinz_mu(h)
    self.qln_var_z = self.qlinz_ln_var(h)
    return self.qmu_z, self.qln_var_z
def __call__(self, x):
    # Compute q(z|x)
    encoding_time = time.time()
    self.encode(x)
    encoding_time = float(time.time() - encoding_time)
    decoding_time_average = 0.
    self.kl = gaussian_kl_divergence_standard(self.qmu, self.qln_var)
    self.logp = 0
    for j in xrange(self.num_zsamples):
        # z ~ q(z|x)
        z = F.gaussian(self.qmu, self.qln_var)
        # Compute p(x|z)
        decoding_time = time.time()
        self.decode(z)
        decoding_time = time.time() - decoding_time
        decoding_time_average += decoding_time
        # Compute the objective
        self.logp += gaussian_logp(x, self.pmu, self.pln_var)
    current_temperature = min(self.temperature['value'], 1.0)
    self.temperature['value'] += self.temperature['increment']
    # pdb.set_trace()
    decoding_time_average /= self.num_zsamples
    self.logp /= self.num_zsamples
    self.obj_batch = self.logp - (current_temperature * self.kl)
    self.timing_info = np.array([encoding_time, decoding_time_average])
    batch_size = self.obj_batch.shape[0]
    self.obj = -F.sum(self.obj_batch) / batch_size
    return self.obj
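The KL weight above is annealed ("warm-up"): self.temperature['value'] grows by a fixed increment on every call and is clipped at 1.0, so early updates are dominated by the reconstruction term. A plausible initialization, assumed for illustration (the original sets these fields elsewhere):

# In the model's __init__ (assumed values): start with no KL pressure and
# reach the full weight after roughly 10,000 updates.
self.temperature = {'value': 0.0, 'increment': 1.0 / 10000.0}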
def __call__(self, x):
    if not chainer.config.train:
        return x
    xp = cuda.get_array_module(x.data)
    ln_var = math.log(self.std ** 2)  # F.gaussian expects the log variance
    noise = functions.gaussian(chainer.Variable(xp.zeros_like(x.data)), chainer.Variable(xp.full_like(x.data, ln_var)))
    return x + noise
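A hedged usage sketch for an additive-noise link like the one above; the class name GaussianNoise and the std value are assumptions for illustration. The layer perturbs activations during training and is a no-op at test time:

import numpy as np
import chainer

noise_layer = GaussianNoise(std=0.3)            # hypothetical constructor
x = chainer.Variable(np.zeros((8, 100), dtype=np.float32))

with chainer.using_config('train', True):
    y_train = noise_layer(x)                    # x plus N(0, 0.3**2) noise
with chainer.using_config('train', False):
    y_test = noise_layer(x)                     # returned unchanged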
def get_loss_func(self, C=1.0, k=1, train=True):
    """Get the loss function of the VAE.

    The loss value is equal to the ELBO (Evidence Lower Bound)
    multiplied by -1.

    Args:
        C (float): Usually 1.0. Can be changed to control the weight of
            the second (KL) term of the ELBO, which acts as a regularizer.
        k (int): Number of Monte Carlo samples used to estimate the
            reconstruction term.
        train (bool): If true, the loss function is used for training.
    """
    def lf(x):
        mu, ln_var = self.encode(x)
        batchsize = len(mu.data)
        # reconstruction loss
        rec_loss = 0
        for l in six.moves.range(k):
            z = F.gaussian(mu, ln_var)
            rec_loss += F.bernoulli_nll(x, self.decode(z, sigmoid=False)) \
                / (k * batchsize)
        self.rec_loss = rec_loss
        self.loss = self.rec_loss + \
            C * gaussian_kl_divergence(mu, ln_var) / batchsize
        return self.loss
    return lf
def __call__(self, x):
    xp = cuda.get_array_module(x.data)
    ln_var = math.log(self.std ** 2)
    noise = F.gaussian(Variable(xp.zeros_like(x.data)), Variable(xp.full_like(x.data, ln_var)))
    return x + noise
def __call__(self, x):
    if not chainer.config.train:
        return x
    data = x.data if isinstance(x, chainer.Variable) else x
    xp = cuda.get_array_module(data)
    ln_var = math.log(self.std ** 2)
    noise = functions.gaussian(xp.full_like(data, self.mean), xp.full_like(data, ln_var))
    return x + noise
def __call__(self, x, sigmoid=True):
    """AutoEncoder"""
    mu, ln_var = self.encode(x)
    batchsize = len(mu.data)
    # reconstruction loss
    rec_loss = 0
    for l in six.moves.range(self.k):
        z = F.gaussian(mu, ln_var)
        rec_loss += F.bernoulli_nll(x, self.decode(z, sigmoid=False)) \
            / (self.k * batchsize)
    loss = rec_loss + \
        self.C * gaussian_kl_divergence(mu, ln_var) / batchsize
    chainer.report({'loss': loss}, self)
    return loss
def lf(self, x):
    mu, ln_var = self.encode(x)
    batchsize = len(mu.data)
    # reconstruction loss
    rec_loss = 0
    for l in six.moves.range(self.k):
        z = F.gaussian(mu, ln_var)
        rec_loss += F.bernoulli_nll(x, self.decode(z, sigmoid=False)) \
            / (self.k * batchsize)
    self.rec_loss = rec_loss
    self.loss = self.rec_loss + \
        self.C * gaussian_kl_divergence(mu, ln_var) / batchsize
    return self.loss
def term_slop(self, loc, val, bs, nf, train=True):
    """ Compute the slope for each active feature.
    """
    shape = (bs, nf)
    # Reshape all of our constants
    pr_mu = F.broadcast_to(self.slop_mu.b, shape)
    pr_lv = F.broadcast_to(self.slop_lv.b, shape)
    # This is either zero or a very negative number,
    # indicating whether to sample N(mean, logvar) or just draw
    # the mean precisely.
    if not train:
        pr_lv += self.lv_floor
    # The feature slopes are grouped together so that they
    # all share a common mean. The individual feature deltas
    # (slop_delta_mu / slop_delta_lv) are then shrunk towards zero,
    # which effectively makes features fall back on the group mean.
    sl_mu = F.reshape(self.slop_delta_mu(loc), shape) + pr_mu
    sl_lv = F.reshape(self.slop_delta_lv(loc), shape) + pr_lv
    coef = F.gaussian(sl_mu, sl_lv)
    slop = F.sum(coef * val, axis=1)
    # Calculate the divergence between the group mean and N(0, 1)
    kld1 = F.gaussian_kl_divergence(self.slop_mu.b, self.slop_lv.b)
    # Calculate the divergence of individual delta means and delta vars
    args = (self.slop_delta_mu.W, self.slop_delta_lv.W)
    kld2 = F.gaussian_kl_divergence(*args)
    return slop, kld1 + kld2
def __call__(self, x, test=False):
    # Additive Gaussian noise at the input and at every hidden layer.
    # log(0.09) is passed to F.gaussian as the log variance, i.e. the
    # injected noise has variance 0.09 (standard deviation 0.3).
    mu_array1 = chainer.Variable(xp.array(xp.zeros([batchsize, 784]), dtype=np.float32))
    log_std_array1 = chainer.Variable(xp.log(0.09) * xp.array(xp.ones([batchsize, 784]), dtype=np.float32))
    mu_array2 = chainer.Variable(xp.array(xp.zeros([batchsize, 1000]), dtype=np.float32))
    log_std_array2 = chainer.Variable(xp.log(0.09) * xp.array(xp.ones([batchsize, 1000]), dtype=np.float32))
    mu_array3 = chainer.Variable(xp.array(xp.zeros([batchsize, 500]), dtype=np.float32))
    log_std_array3 = chainer.Variable(xp.log(0.09) * xp.array(xp.ones([batchsize, 500]), dtype=np.float32))
    mu_array4 = chainer.Variable(xp.array(xp.zeros([batchsize, 250]), dtype=np.float32))
    log_std_array4 = chainer.Variable(xp.log(0.09) * xp.array(xp.ones([batchsize, 250]), dtype=np.float32))
    mu_array5 = chainer.Variable(xp.array(xp.zeros([batchsize, 250]), dtype=np.float32))
    log_std_array5 = chainer.Variable(xp.log(0.09) * xp.array(xp.ones([batchsize, 250]), dtype=np.float32))
    mu_array6 = chainer.Variable(xp.array(xp.zeros([batchsize, 250]), dtype=np.float32))
    log_std_array6 = chainer.Variable(xp.log(0.09) * xp.array(xp.ones([batchsize, 250]), dtype=np.float32))
    x = x + F.gaussian(mu_array1, log_std_array1)
    h1 = F.leaky_relu(self.bn0(self.l0(x) + F.gaussian(mu_array2, log_std_array2), test), slope=0.1)
    h2 = F.leaky_relu(self.bn1(self.l1(h1) + F.gaussian(mu_array3, log_std_array3), test), slope=0.1)
    h3 = F.leaky_relu(self.bn2(self.l2(h2) + F.gaussian(mu_array4, log_std_array4), test), slope=0.1)
    h4 = F.leaky_relu(self.bn3(self.l3(h3) + F.gaussian(mu_array5, log_std_array5), test), slope=0.1)
    h5 = F.leaky_relu(self.bn4(self.l4(h4) + F.gaussian(mu_array6, log_std_array6), test), slope=0.1)
    h6 = F.softmax(self.l5(h5))
    return h6
def __call__(self, x, test=False):
    if test:
        return x
    xp = cuda.get_array_module(x.data)
    ln_var = math.log(self.std ** 2)
    noise = F.gaussian(Variable(xp.zeros_like(x.data)), Variable(xp.full_like(x.data, ln_var)))
    return x + noise
def encode_x_a(self, x, test=False):
    x = self.to_variable(x)
    mean, ln_var = self.q_a_x(x, test=test)
    return F.gaussian(mean, ln_var)
def encode_axy_z(self, a, x, y, test=False):
    a = self.to_variable(a)
    x = self.to_variable(x)
    y = self.to_variable(y)
    mean, ln_var = self.q_z_axy(a, x, y, test=test)
    return F.gaussian(mean, ln_var)
def encode_x_z(self, x, test=False, argmax_y=True):
    x = self.to_variable(x)
    mean, ln_var = self.q_a_x(x, test=test)
    a = F.gaussian(mean, ln_var)
    y = self.sample_x_y(x, argmax=argmax_y, test=test)
    mean, ln_var = self.q_z_axy(a, x, y, test=test)
    return F.gaussian(mean, ln_var)