def test(epoch, prior):
    """Evaluate how well `prior`'s cluster assignments agree with true labels.

    Builds a (cluster, label) joint count table over `test_loader`, prints it,
    then prints the mutual information I(cluster; label) — higher means the
    learned clusters align better with the ground-truth classes.

    Args:
        epoch: current epoch number (unused here; kept for caller symmetry
            with train()).
        prior: fitted mixture model exposing .predict() on flattened samples.
            Assumed to have 50 components, matching the table below — TODO
            confirm against how the prior was constructed.
    """
    # Joint count table: rows = predicted cluster (50), cols = class label (10).
    ans = np.zeros((50, 10))
    for data, lab in test_loader:
        # Flatten each batch once to (batch, features) before predicting.
        flat = data.numpy()
        flat = flat.reshape(flat.shape[0], -1)
        C = prior.predict(flat)
        # `range` instead of Py2-only `xrange` (works on both 2 and 3).
        for i in range(len(lab)):
            ans[C[i], lab[i]] += 1
    print(ans)
    # Mutual information I(C;Y) = sum_{c,y} p(c,y) * log(p(c,y) / (p(c)p(y))),
    # computed vectorized over nonzero joint cells (0*log0 treated as 0),
    # replacing the original O(cells) Python double loop.
    s = np.sum(ans)
    p = ans / s                          # joint distribution p(c, y)
    pc = p.sum(axis=1, keepdims=True)    # marginal p(c)
    py = p.sum(axis=0, keepdims=True)    # marginal p(y)
    nz = p > 0
    v = float(np.sum(p[nz] * np.log(p[nz] / (pc * py)[nz])))
    print("Mutual information: " + str(v))
#prior = BayesianGaussianMixture(n_components=100, covariance_type='diag')
# (scraped page header) Python BayesianGaussianMixture() usage examples
def train(epoch, prior):
    """Run one training epoch: refit the latent prior, then optimize the VAE.

    Phase 1 encodes the entire training set to collect latent codes z and
    refits `prior` on them.  Phase 2 performs the usual optimization pass,
    with the refreshed prior supplied to the loss.

    Args:
        epoch: current epoch number (used for logging only).
        prior: mixture model with .fit()/.weights_ (e.g. sklearn
            BayesianGaussianMixture) over latent codes.

    Returns:
        The refitted prior.
    """
    model.train()
    train_loss = 0
    # ---- Phase 1: collect latent codes over the full training set ----
    latents = []
    for data, _ in train_loader:
        # volatile=True (pre-0.4 PyTorch no-grad) — this pass is inference
        # only, so don't build the autograd graph for the whole dataset.
        data = Variable(data, volatile=True)
        if args.cuda:
            data = data.cuda()
        # Only z is used here; recon/mu/logvar are discarded.
        recon_batch, mu, logvar, z = model(data)
        latents.append(z.cpu().data.numpy())
    print('Update Prior')
    prior.fit(np.vstack(latents))
    print('prior: ' + str(prior.weights_))
    # ---- Phase 2: gradient updates against the refreshed prior ----
    for batch_idx, (data, _) in enumerate(train_loader):
        data = Variable(data)
        if args.cuda:
            data = data.cuda()
        optimizer.zero_grad()
        recon_batch, mu, logvar, z = model(data)
        loss = loss_function(recon_batch, data, mu, logvar, prior, z)
        loss.backward()
        # .data[0]: legacy (pre-0.4) PyTorch scalar extraction, consistent
        # with the rest of this file.
        train_loss += loss.data[0]
        optimizer.step()
    print('====> Epoch: {} Average loss: {:.4f}'.format(
        epoch, train_loss / len(train_loader.dataset)))
    return prior
def train(epoch, prior):
    """Fit a fresh 50-component diagonal BGMM directly on the raw inputs.

    The incoming `prior` argument is discarded and replaced by a newly
    constructed model; `epoch` is unused (the signature mirrors the other
    train() variant so callers need not change).

    Returns:
        The newly fitted BayesianGaussianMixture.
    """
    prior = BayesianGaussianMixture(n_components=50, covariance_type='diag',
                                    n_init=5, max_iter=1000)
    # Flatten every batch to (batch, features) and stack into one matrix.
    batches = []
    for data, _ in train_loader:
        flat = data.numpy()
        batches.append(flat.reshape(flat.shape[0], -1))
    prior.fit(np.vstack(batches))
    return prior