def test_gmmmap_swap():
    static_dim = 2
    T = 10
    windows = _get_windows_set()[-1]
    np.random.seed(1234)
    src_mc = np.random.rand(T, static_dim * len(windows))
    tgt_mc = np.random.rand(T, static_dim * len(windows))

    # pseudo parallel data
    XY = np.concatenate((src_mc, tgt_mc), axis=-1)
    gmm = GaussianMixture(n_components=4)
    gmm.fit(XY)

    paramgen = MLPG(gmm, windows=windows, swap=False)
    swap_paramgen = MLPG(gmm, windows=windows, swap=True)

    mc_converted1 = paramgen.transform(src_mc)
    mc_converted2 = swap_paramgen.transform(tgt_mc)

    src_mc = src_mc[:, :static_dim]
    tgt_mc = tgt_mc[:, :static_dim]

    assert norm(tgt_mc - mc_converted1) < norm(src_mc - mc_converted1)
    assert norm(tgt_mc - mc_converted2) > norm(src_mc - mc_converted2)
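This test (and test_diffvc further down) assumes a preamble the excerpt omits. Below is a minimal sketch of plausible imports and a `_get_windows_set` helper, following the usual static/delta window convention; the exact helper in the original project may differ.

import numpy as np
from numpy.linalg import norm
from sklearn.mixture import GaussianMixture
# MLPG refers to the GMM-based parameter-generation class defined
# further down this page.

def _get_windows_set():
    # Window sets of increasing order: static only, +delta, +delta-delta.
    return [
        [(0, 0, np.array([1.0]))],
        [(0, 0, np.array([1.0])),
         (1, 1, np.array([-0.5, 0.0, 0.5]))],
        [(0, 0, np.array([1.0])),
         (1, 1, np.array([-0.5, 0.0, 0.5])),
         (1, 1, np.array([1.0, -2.0, 1.0]))],
    ]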
def mnplotsdistancethreshold(dists, method='bimodal', returnmodel=False):
    distsloc = np.array(dists).reshape(-1, 1)
    if method == 'bimodal':
        GM = skmix.GaussianMixture(n_components=2)
        if len(dists) == 1:
            # a 2-component mixture cannot be fitted to a single point
            labels = [1]
        else:
            GM.fit(distsloc)
            labels = GM.predict(distsloc)
            labels = np.nonzero(labels == labels[0])[0]
        if returnmodel:
            return (labels, GM)
        else:
            return labels
    elif method == 'largestgap' or method == 'largest_gap':
        if len(dists) == 1:
            labels = [1]
        else:
            gaps = np.subtract(dists[1:], dists[0:-1])
            # weight gaps (higher weight for the first clusters)
            wgaps = np.multiply(gaps, np.arange(len(gaps), 0, -1))
            labels = np.arange(0, np.argmax(wgaps) + 1)
        return labels
    else:
        raise ValueError("Invalid method submitted to mnplotsdistancethreshold. "
                         "Use either 'bimodal' or 'largestgap'.")
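A hedged usage sketch of the 'largestgap' branch, assuming `dists` is sorted in ascending order (as the gap computation implies) and that `skmix` is `sklearn.mixture`:

import numpy as np
import sklearn.mixture as skmix

# ascending distances: a tight first group, then a clear gap
dists = np.array([0.10, 0.12, 0.15, 0.90, 1.10, 1.30])
print(mnplotsdistancethreshold(dists, method='largestgap'))
# -> [0 1 2], the indices before the largest weighted gap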
def _train_hierarchy(self, hierarchy):
    """Trains a hierarchy (quin-, tri- or single-phones).

    This helper method fits a GMM for all phones of one hierarchy.

    :param hierarchy: all phones of one hierarchy
    :returns: fitted GMMs for the given hierarchy
    """
    output = dict.fromkeys(hierarchy.keys())
    for k in output.keys():
        gm = GaussianMixture()
        gm.fit(hierarchy[k])
        output[k] = gm
        print('Fitted {:s}'.format(str(k)))
    return output
# acousticModelTraining.py -- project: jingjuSingingPhraseMatching, author: ronggong
def bicGMMModelSelection(X):
    '''
    BIC model selection
    :param X: features - observation * dimension
    :return: best number of components and the corresponding model
    '''
    lowest_bic = np.infty
    bic = []
    n_components_range = [10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70]
    best_n_components = n_components_range[0]
    best_gmm = None
    for n_components in n_components_range:
        # Fit a Gaussian mixture with EM
        print('Fitting GMM with n_components =', str(n_components))
        gmm = mixture.GaussianMixture(n_components=n_components,
                                      covariance_type='diag')
        gmm.fit(X)
        bic.append(gmm.bic(X))
        if bic[-1] < lowest_bic:
            lowest_bic = bic[-1]
            best_n_components = n_components
            best_gmm = gmm
    return best_n_components, best_gmm
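A hedged usage sketch on synthetic features (`mixture` is `sklearn.mixture`); with thirteen candidate component counts the loop is slow on real data:

import numpy as np
from sklearn import mixture

X = np.random.rand(2000, 13)  # e.g. 2000 frames of 13-dim MFCCs
best_n, best_gmm = bicGMMModelSelection(X)
print(best_n, best_gmm.bic(X))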
def find_centers_gmm(data, gridpoints, n_components=2):
    r"""Provided 1D data and a grid of points, return the indices of the
    points closest to the centers of an assumed n-modal distribution
    behind "data".

    data : 1D data ndarray, of shape (N, 1)
    gridpoints: 1D gridpoints

    returns: idxs, igmm
    idxs: 1D ndarray
        INDICES of gridpoints that are closest to the centers of the gaussians
    igmm: gaussian mixture model (sklearn type)

    tested = True
    """
    igmm = _GMM(n_components=n_components)
    igmm.fit(data)
    return _np.abs(gridpoints - _np.sort(igmm.means_, 0)).argmin(1), igmm
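A hedged usage sketch, assuming `_np` and `_GMM` are the module's aliases for numpy and sklearn's GaussianMixture:

import numpy as _np
from sklearn.mixture import GaussianMixture as _GMM

data = _np.concatenate([_np.random.normal(-2, 0.3, 500),
                        _np.random.normal(3, 0.5, 500)]).reshape(-1, 1)
grid = _np.linspace(-5, 5, 200)
idxs, igmm = find_centers_gmm(data, grid)
print(grid[idxs])  # approximately [-2, 3]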
def fit_gmm(data, prefix):
    # Fit GMM to histogram (density=True replaces the deprecated normed=True)
    X, _ = np.histogram(data, density=True, bins=10)
    X = np.expand_dims(X, 1)
    gmm = GaussianMixture(n_components=2).fit(X)

    # Get means / variances in order of ascending mean
    mu = np.array([gmm.means_[0][0], gmm.means_[1][0]])
    var = np.array([gmm.covariances_[0][0], gmm.covariances_[1][0]])
    order = np.argsort(mu)

    # Means/variances of each component
    mu_0, mu_1 = mu[order]
    var_0, var_1 = var[order]
    return {'{}_mu0'.format(prefix): mu_0,
            '{}_mu1'.format(prefix): mu_1,
            '{}_var0'.format(prefix): var_0[0],
            '{}_var1'.format(prefix): var_1[0]}
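A hedged usage sketch; note the mixture is fitted to the ten histogram bar heights, not to the raw samples:

import numpy as np
from sklearn.mixture import GaussianMixture

stats = fit_gmm(np.random.beta(2, 5, size=1000), prefix='intensity')
print(stats['intensity_mu0'], stats['intensity_var0'])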
def test_GMM_n_components(*data):
    '''
    test the performance with different n_components
    :param data: data, target
    :return: None
    '''
    X, labels_true = data
    nums = range(1, 50)
    ARIs = []
    for num in nums:
        clst = mixture.GaussianMixture(n_components=num)
        clst.fit(X)
        predicted_labels = clst.predict(X)
        ARIs.append(adjusted_rand_score(labels_true, predicted_labels))

    ## graph
    fig = plt.figure()
    ax = fig.add_subplot(1, 1, 1)
    ax.plot(nums, ARIs, marker="+")
    ax.set_xlabel("n_components")
    ax.set_ylabel("ARI")
    fig.suptitle("GMM")
    plt.show()
def mog_test(self, embedding, labels, n_genres=None):
    """
    Evaluates how well the given embedding performs on the mixture of
    Gaussians task.
    @param embedding: a list of vectors [v1, v2, ..., vn] of song embeddings
        in R^k.
    @param labels: a list of genres where labels[i] is the genre of embedding[i].
    @param n_genres: the number of genres (for convenience); computed from the
        labels if None is given.
    """
    if n_genres is None:
        n_genres = len(set(labels))
    clf = GaussianMixture(n_components=n_genres)
    clf.fit(embedding)
    p_labels = clf.predict(embedding)
    return self.get_scores(labels, p_labels)
def _discretize_by_gmm(col, num_bins, random_state):
    nan_idx = col[col.isnull()].index
    gmm = GaussianMixture(n_components=num_bins, covariance_type='full',
                          random_state=random_state)
    gmm = gmm.fit(X=np.expand_dims(col.dropna(), 1))
    if col.isnull().sum() == 0:
        group = gmm.predict(X=np.expand_dims(col, 1))
    else:
        group = gmm.predict(X=np.expand_dims(col.dropna(), 1)).astype(float)
        # re-insert NaNs at their original positions (assumes a default
        # ascending integer index)
        for idx in nan_idx:
            group = np.insert(group, idx, np.nan)
    return pd.Series(group)
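A hedged usage sketch on a small series with a missing value:

import numpy as np
import pandas as pd
from sklearn.mixture import GaussianMixture

col = pd.Series([0.10, 0.20, np.nan, 3.1, 2.9, 0.15])
print(_discretize_by_gmm(col, num_bins=2, random_state=0))
# -> component labels per row, with NaN preserved at position 2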
def test_diffvc():
    # MLPG is performed dimension by dimension, so static_dim 1 is enough;
    # 2 is used just in case.
    static_dim = 2
    T = 10
    for windows in _get_windows_set():
        np.random.seed(1234)
        src_mc = np.random.rand(T, static_dim * len(windows))
        tgt_mc = np.random.rand(T, static_dim * len(windows))

        # pseudo parallel data
        XY = np.concatenate((src_mc, tgt_mc), axis=-1)
        gmm = GaussianMixture(n_components=4)
        gmm.fit(XY)

        paramgen = MLPG(gmm, windows=windows, diff=False)
        diff_paramgen = MLPG(gmm, windows=windows, diff=True)

        mc_converted1 = paramgen.transform(src_mc)
        mc_converted2 = diff_paramgen.transform(src_mc)
        assert mc_converted1.shape == (T, static_dim)
        assert mc_converted2.shape == (T, static_dim)

        src_mc = src_mc[:, :static_dim]
        tgt_mc = tgt_mc[:, :static_dim]
        assert norm(tgt_mc - mc_converted1) < norm(src_mc - mc_converted1)
def __init__(self, gmm, swap=False, diff=False):
    assert gmm.covariance_type == "full"
    # D: static + delta dim
    D = gmm.means_.shape[1] // 2
    self.num_mixtures = gmm.means_.shape[0]
    self.weights = gmm.weights_

    # Split source and target parameters from joint GMM
    self.src_means = gmm.means_[:, :D]
    self.tgt_means = gmm.means_[:, D:]
    self.covarXX = gmm.covariances_[:, :D, :D]
    self.covarXY = gmm.covariances_[:, :D, D:]
    self.covarYX = gmm.covariances_[:, D:, :D]
    self.covarYY = gmm.covariances_[:, D:, D:]

    if diff:
        self.tgt_means = self.tgt_means - self.src_means
        self.covarYY = self.covarXX + self.covarYY - self.covarXY - self.covarYX
        self.covarXY = self.covarXY - self.covarXX
        self.covarYX = self.covarXY.transpose(0, 2, 1)

    # swap src and target parameters
    if swap:
        self.tgt_means, self.src_means = self.src_means, self.tgt_means
        self.covarYY, self.covarXX = self.covarXX, self.covarYY
        self.covarYX, self.covarXY = self.covarXY, self.covarYX

    # p(x), which is used to compute posterior prob. for a given source
    # spectral feature in mapping stage.
    self.px = GaussianMixture(
        n_components=self.num_mixtures, covariance_type="full")
    self.px.means_ = self.src_means
    self.px.covariances_ = self.covarXX
    self.px.weights_ = self.weights
    self.px.precisions_cholesky_ = _compute_precision_cholesky(
        self.px.covariances_, "full")
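For reference, the split blocks above feed the standard joint-density conversion: each mixture's conditional mean is E[y | x, m] = mu_y^(m) + Sigma_yx^(m) (Sigma_xx^(m))^(-1) (x - mu_x^(m)). A minimal sketch of that computation against the attribute names above (an illustration, not the project's actual transform code):

import numpy as np

def conditional_means(mlpg, x):
    # E[y | x, m] for one source frame x, for every mixture component m.
    # src_means/tgt_means: (M, D); covarXX/covarYX: (M, D, D).
    diffs = x - mlpg.src_means                                # (M, D)
    solved = np.linalg.solve(mlpg.covarXX, diffs[..., None])  # (M, D, 1)
    return mlpg.tgt_means + (mlpg.covarYX @ solved)[..., 0]   # (M, D)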
def em(data):
    gmm = GaussianMixture(
        n_components=6,
        covariance_type="tied"
    ).fit(data)
    predicted_data = gmm.predict(data)
    print(collections.Counter(predicted_data))
    print(metrics.silhouette_score(data, predicted_data))
    reduced_data = reduce_with_pca(data, 2)
    plot_2d_data(reduced_data, predicted_data)
def __init__(self, mean, std, num, n):
    if mean is not None:
        self.gauMix = np.random.normal(mean[0], std[0], num[0])
        self.gauMix = np.append(self.gauMix, np.random.normal(mean[1], std[1], num[1]))
        self.gauMix = np.append(self.gauMix, np.random.normal(mean[2], std[2], num[2]))
        if n == 4:
            self.gauMix = np.append(self.gauMix, np.random.normal(mean[3], std[3], num[3]))
        self.gmm = GaussianMixture(n_components=n, covariance_type="full", tol=0.001)
        self.gmm = self.gmm.fit(X=np.expand_dims(self.gauMix, 1))
def __init__(self, mean, std, num):
    if mean is not None:
        self.gauMix = np.random.normal(mean[0], std[0], num[0])
        self.gauMix = np.append(self.gauMix, np.random.normal(mean[1], std[1], num[1]))
        self.gauMix = np.append(self.gauMix, np.random.normal(mean[2], std[2], num[2]))
        self.gmm = GaussianMixture(n_components=3, covariance_type="full", tol=0.001)
        self.gmm = self.gmm.fit(X=np.expand_dims(self.gauMix, 1))
def _gmm_clustering(self, X, n_components):
    gmm = GaussianMixture(n_components=n_components)
    gmm.fit(X)
    return gmm.means_
def _gaussian_fit(array):
    if array.shape[0] < 2:
        logging.error("Cannot fit a Gaussian with fewer than two distances.")
        return 0

    # mirror the sorted distances around zero to stabilize the fit
    array_2 = -(np.array(sorted(array)).reshape(-1, 1))
    array = np.array(list(array_2) + list(array)).reshape(-1, 1)

    try:
        # new sklearn GMM
        gmm = mixture.GaussianMixture(n_components=3,
                                      covariance_type='diag')
        gmm.fit(array)
        # gmmmean = np.max(gmm.means_)
        # gmmsigma = gmm.covariances_[np.argmax(gmm.means_)]
    except AttributeError:
        # use old sklearn method
        gmm = mixture.GMM(n_components=3)
        gmm.fit(array)
        # gmmmean = np.max(gmm.means_)
        # gmmsigma = gmm.covars_[np.argmax(gmm.means_)]

    # Extract optimal threshold
    plt.hist(array, bins=50, density=True)  # debug, print
    lin = np.linspace(0, 1, 10000)[:, np.newaxis]
    # plt.plot(lin, np.exp(gmm.score_samples(lin)[0]), 'r')
    pred = gmm.predict(lin)
    try:
        idx = np.min(np.where(pred == np.argmax(gmm.means_))[0])
    except ValueError:
        # print("Error", np.unique(pred))
        idx = 0
    plt.axvline(x=lin[idx], linestyle='--', color='r')
    # plt.gcf().savefig("threshold_naive{}.png".format(k))
    plt.close()
    threshold = lin[idx][0]  # threshold
    # np.save("threshold_naive", threshold)
    return threshold
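A hedged usage sketch; the function expects distances scaled into [0, 1], since the threshold grid `lin` spans exactly that range:

import logging
import numpy as np
from sklearn import mixture
import matplotlib.pyplot as plt

dists = np.random.beta(2, 8, size=200)  # distances already scaled to [0, 1]
print(_gaussian_fit(dists))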
def vad_energy(log_energy, distrib_nb=3, nb_train_it=25):
    """ Fit a Gaussian mixture model on the log-energy; voice activity
    is the component with the highest energy.

    Return
    ------
    vad: array of 0, 1
    threshold: scalar
    """
    from sklearn.exceptions import ConvergenceWarning
    from sklearn.mixture import GaussianMixture

    # center and normalize the energy
    log_energy = (log_energy - np.mean(log_energy)) / np.std(log_energy)
    if log_energy.ndim == 1:
        log_energy = log_energy[:, np.newaxis]

    # create mixture model: diag, spherical
    world = GaussianMixture(
        n_components=distrib_nb, covariance_type='diag',
        init_params='kmeans', max_iter=nb_train_it,
        weights_init=np.ones(distrib_nb) / distrib_nb,
        means_init=(-2 + 4.0 * np.arange(distrib_nb) / (distrib_nb - 1))[:, np.newaxis],
        precisions_init=np.ones((distrib_nb, 1)),
    )
    try:
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=ConvergenceWarning)
            world.fit(log_energy)
    except (ValueError, IndexError):  # IndexError because of float32 cumsum
        if distrib_nb - 1 >= 2:
            return vad_energy(log_energy, distrib_nb=distrib_nb - 1,
                              nb_train_it=nb_train_it)
        return np.zeros(shape=(log_energy.shape[0],)), 0

    # Compute threshold
    threshold = world.means_.max() - \
        __current_vad_mode * np.sqrt(1.0 / world.precisions_[world.means_.argmax(), 0])

    # Apply frame selection with the current threshold
    label = log_energy.ravel() > threshold
    return label, threshold
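A hedged usage sketch on synthetic log-energy. `__current_vad_mode` is a module-level constant in the original project; the value here is illustrative only:

import numpy as np
import warnings

__current_vad_mode = 1.2  # illustrative; the original project defines this

# quiet background frames followed by louder speech frames
log_e = np.concatenate([np.random.normal(-8, 0.5, 300),
                        np.random.normal(-2, 0.8, 200)])
vad, thr = vad_energy(log_e, distrib_nb=3)
print(vad.mean(), thr)  # fraction of voiced frames, and the threshold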
def cluster_gaussian(X_train, model_args=None, gridsearch=True):
    from sklearn.mixture import GaussianMixture
    print('GaussianMixture')
    if gridsearch is True:
        param_grid = {
            'n_components': np.arange(1, 20, 4),
            'covariance_type': ['full', 'tied', 'diag', 'spherical'],
        }
        prune(param_grid, model_args)
    else:
        param_grid = None
    return ModelWrapper(GaussianMixture, X=X_train, model_args=model_args,
                        param_grid=param_grid, unsupervised=True)
def fit(self, X, y=None):
    """Fit the model according to the given training data.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Samples.

    Returns
    -------
    self : detector
        Return self.
    """
    X = check_array(X)
    self._gmm = GaussianMixture(
        covariance_type=self.covariance_type,
        max_iter=self.max_iter,
        means_init=self.means_init,
        n_components=self.n_components,
        precisions_init=self.precisions_init,
        random_state=self.random_state,
        tol=self.tol,
        warm_start=self.warm_start,
        weights_init=self.weights_init
    ).fit(X)
    self.y_score_ = self.anomaly_score(X)
    self.threshold_ = np.percentile(
        self.y_score_, 100.0 * (1.0 - self.fpr)
    )
    return self
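fit relies on self.anomaly_score, which this excerpt omits. A plausible sketch of that method (an assumption, not the project's actual code) scores samples by negative log-likelihood under the fitted mixture, so low-density points get high scores:

def anomaly_score(self, X):
    # Higher score = lower density under the fitted mixture.
    return -self._gmm.score_samples(X)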
# community_embeddings.py -- project: nodeembedding-to-communityembedding, author: andompesta
def __init__(self, model, lr, reg_covar=0):
    self.lr = lr
    self.g_mixture = mixture.GaussianMixture(n_components=model.k,
                                             reg_covar=reg_covar,
                                             covariance_type='full',
                                             n_init=10)
def __init__(self, input, scipy_gmm, phase_train=None):
    self.scipy_gmm = scipy_gmm
    # This actually checks that we are using the newest version of sklearn,
    # as well as the right object type.
    assert type(scipy_gmm) == mixture.GaussianMixture
    super(GMM, self).__init__(input, phase_train=phase_train)
def test_GMM(*data):
    '''
    test the method of GMM
    :param data: data, target
    :return: None
    '''
    X, labels_true = data
    clst = mixture.GaussianMixture()
    clst.fit(X)
    predicted_labels = clst.predict(X)
    print("ARI:{0}".format(adjusted_rand_score(labels_true, predicted_labels)))
def test_GMM_cov_type(*data):
    '''
    test the performance with different covariance_type
    :param data: data, target
    :return: None
    '''
    X, labels_true = data
    nums = range(1, 50)
    cov_types = ['spherical', 'tied', 'diag', 'full']
    markers = "+o*s"
    fig = plt.figure()
    ax = fig.add_subplot(1, 1, 1)
    for i, cov_type in enumerate(cov_types):
        ARIs = []
        for num in nums:
            clst = mixture.GaussianMixture(n_components=num, covariance_type=cov_type)
            clst.fit(X)
            predicted_labels = clst.predict(X)
            ARIs.append(adjusted_rand_score(labels_true, predicted_labels))
        ax.plot(nums, ARIs, marker=markers[i],
                label="covariance_type:{0}".format(cov_type))
    ax.set_xlabel("n_components")
    ax.legend(loc="best")
    ax.set_ylabel("ARI")
    fig.suptitle("GMM")
    plt.show()
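The three test_GMM* functions above unpack `(X, labels_true)`. A hedged driver sketch in the style of the sklearn clustering demos these tests follow; the blob centers are illustrative, not the original project's values:

import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_blobs
from sklearn import mixture
from sklearn.metrics import adjusted_rand_score

centers = [[1, 1], [2, 2], [1, 2], [10, 20]]
X, labels_true = make_blobs(n_samples=1000, centers=centers, cluster_std=0.5)
test_GMM(X, labels_true)
test_GMM_n_components(X, labels_true)
test_GMM_cov_type(X, labels_true)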
def transform(self, XY):
    X, Y = XY
    assert X.ndim == 3 and Y.ndim == 3

    longer_features = X if X.shape[1] > Y.shape[1] else Y

    Xc = X.copy()  # this will be updated iteratively
    X_aligned = np.zeros_like(longer_features)
    Y_aligned = np.zeros_like(longer_features)
    refined_paths = np.empty(len(X), dtype=object)

    for iteration in range(self.n_iter):
        for idx, (x, y) in enumerate(zip(Xc, Y)):
            x, y = trim_zeros_frames(x), trim_zeros_frames(y)
            dist, path = fastdtw(x, y, radius=self.radius, dist=self.dist)
            dist /= (len(x) + len(y))
            pathx = list(map(lambda l: l[0], path))
            pathy = list(map(lambda l: l[1], path))

            refined_paths[idx] = pathx
            x, y = x[pathx], y[pathy]

            max_len = max(len(x), len(y))
            if max_len > X_aligned.shape[1] or max_len > Y_aligned.shape[1]:
                pad_size = max(
                    max_len - X_aligned.shape[1],
                    max_len - Y_aligned.shape[1])
                X_aligned = np.pad(
                    X_aligned, [(0, 0), (0, pad_size), (0, 0)],
                    mode="constant", constant_values=0)
                Y_aligned = np.pad(
                    Y_aligned, [(0, 0), (0, pad_size), (0, 0)],
                    mode="constant", constant_values=0)
            X_aligned[idx][:len(x)] = x
            Y_aligned[idx][:len(y)] = y
            if self.verbose > 0:
                print("{}, distance: {}".format(idx, dist))

        # Fit a joint GMM on the aligned pairs and map Xc towards Y with MLPG
        gmm = GaussianMixture(
            n_components=self.n_components_gmm,
            covariance_type="full", max_iter=self.max_iter_gmm)
        XY = np.concatenate((X_aligned, Y_aligned),
                            axis=-1).reshape(-1, X.shape[-1] * 2)
        gmm.fit(XY)
        windows = [(0, 0, np.array([1.0]))]  # no delta
        paramgen = MLPG(gmm, windows=windows)
        for idx in range(len(Xc)):
            x = trim_zeros_frames(Xc[idx])
            Xc[idx][:len(x)] = paramgen.transform(x)

    # Finally we can get aligned X
    for idx in range(len(X_aligned)):
        x = X[idx][refined_paths[idx]]
        X_aligned[idx][:len(x)] = x

    return X_aligned, Y_aligned
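The method expects zero-padded 3D arrays of shape (n_utterances, n_frames, n_dims). A hedged usage sketch, assuming the enclosing class is nnmnkwii's IterativeDTWAligner, which this excerpt resembles:

import numpy as np
from nnmnkwii.preprocessing.alignment import IterativeDTWAligner

X = np.random.rand(2, 100, 24)  # (utterances, frames, dims), zero-padded
Y = np.random.rand(2, 120, 24)
X_aligned, Y_aligned = IterativeDTWAligner(n_iter=2).transform((X, Y))
print(X_aligned.shape, Y_aligned.shape)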
def fit(self, X, y=None):
    """Learn a fisher vector encoding.

    Fit a gaussian mixture model to the data using n_gaussians with
    diagonal covariance matrices.

    Parameters
    ----------
    X : array_like or list
        The local features to train on. They must be either nd arrays or
        a list of nd arrays.
    """
    X, _ = self._reshape_local_features(X)

    if self.n_pca_components != 1:
        # train PCA
        self.pca_model = PCA(n_components=int(X.shape[-1] * self.n_pca_components))
        self.pca_model.fit(X)

        # apply PCA and reduce dimensionality
        X = self.pca_model.transform(X)

    # consider changing the initialization parameters
    gmm = GaussianMixture(
        n_components=self.n_gaussians,
        max_iter=self.max_iter,
        covariance_type='diag',
        verbose=self.verbose
    )
    gmm.fit(X)

    # save the results of the gmm
    self.weights = gmm.weights_
    self.means = gmm.means_
    self.covariances = gmm.covariances_

    # precompute some values for encoding
    D = X[0].size
    self.inverted_covariances = (1. / self.covariances)
    self.inverted_sqrt_covariances = np.sqrt(1. / self.covariances)
    self.normalization_factor = np.hstack([
        np.repeat(1.0 / np.sqrt(self.weights), D),
        np.repeat(1.0 / np.sqrt(2 * self.weights), D)
    ])
    return self
# acousticModelTraining.py -- project: jingjuSingingPhraseMatching, author: ronggong
def processAcousticModelTrainPho(class_name, syllableTierName, phonemeTierName,
                                 featureFilename, gmmModel_path):
    '''
    Monophonic acoustic model training
    :param class_name: sourceSeparation, qmLonUpfLaosheng
    :param syllableTierName: 'pinyin', 'dian'
    :param phonemeTierName: 'details'
    :param featureFilename: 'dic_pho_feature_train.pkl'
    :param gmmModel_path: in parameters.py
    :return:
    '''
    # model training
    recordings_train = getRecordingNamesSimi('TRAIN', class_name)
    dic_pho_feature_train = dumpFeaturePho(class_name, recordings_train,
                                           syllableTierName, phonemeTierName)

    output = open(featureFilename, 'wb')
    pickle.dump(dic_pho_feature_train, output)
    output.close()

    # model loading
    pkl_file = open(featureFilename, 'rb')
    dic_pho_feature_train = pickle.load(pkl_file)
    pkl_file.close()

    n_comp = 40
    g = mixture.GaussianMixture(n_components=n_comp, covariance_type='diag')

    print(len(dic_pho_feature_train.keys()))

    for ii, key in enumerate(dic_pho_feature_train):
        # print key, dic_pho_feature_train[key].shape
        print('fitting gmm ', key, ' ', str(ii), ' of ',
              str(len(dic_pho_feature_train.keys())))

        ##-- try just fit the first dim of MFCC
        # x = np.expand_dims(dic_pho_feature_train[key][:,0],axis=1)
        x = dic_pho_feature_train[key]
        print(x.shape)

        if x.shape[0] > n_comp * 5:
            g.fit(x)

            output = open(os.path.join(gmmModel_path, key + '.pkl'), 'wb')
            pickle.dump(g, output)
            output.close()
        else:
            # not fitted, because there are too few samples
            print(key + ' is not fitted.')