def multinomial_entropy(probs, count):
"""Compute entropy of multinomial distribution with given probs and count.
Args:
probs: A 1-dimensional array of normalized probabilities.
count: The number of draws in a multinomial distribution.
Returns:
    A number in [0, count * log(len(probs))] representing entropy (in nats).
"""
assert count > 0
multi_probs = probs
for _ in range(count - 1):
if len(probs) > 2:
raise NotImplementedError(
'Only categorical and binomial are supported')
multi_probs = np.convolve(multi_probs, probs)
return entropy(multi_probs)
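
# Hedged usage sketch (not part of the original source): exercising
# multinomial_entropy above on a binomial case. Assumes numpy as np and
# scipy.stats.entropy are imported, as the snippet itself requires.
import numpy as np
from scipy.stats import entropy

binomial_probs = np.array([0.3, 0.7])                # one Bernoulli(0.7) draw
print(multinomial_entropy(binomial_probs, count=5))  # entropy of Binomial(5, 0.7), in nats
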
def observed_perplexity(self, counts):
"""Compute perplexity = exp(entropy) of observed variables.
Perplexity is an information theoretic measure of the number of
    clusters or observed classes. Perplexity is a real number in the range
    [1, dim[v]], where dim[v] is the number of categories in an observed
    categorical variable or 2 for an ordinal variable.
Args:
counts: A [V]-shaped array of multinomial counts.
Returns:
A [V]-shaped numpy array of perplexity.
"""
V, E, M, R = self._VEMR
    if counts is None:
        counts = np.ones(V, dtype=np.int8)
assert counts.shape == (V, )
assert counts.dtype == np.int8
assert np.all(counts > 0)
observed_entropy = np.empty(V, dtype=np.float32)
for v in range(V):
beg, end = self._ragged_index[v:v + 2]
probs = np.dot(self._feat_cond[beg:end, :], self._vert_probs[v, :])
observed_entropy[v] = multinomial_entropy(probs, counts[v])
return np.exp(observed_entropy)
def observed_perplexity(self, counts):
"""Compute perplexity = exp(entropy) of observed variables.
Perplexity is an information theoretic measure of the number of
clusters or observed classes. Perplexity is a real number in the range
[1, dim[v]], where dim[v] is the number of categories in an observed
categorical variable or 2 for an ordinal variable.
Args:
counts: A [V]-shaped array of multinomial counts.
Returns:
A [V]-shaped numpy array of perplexity.
"""
result = self._ensemble[0].observed_perplexity(counts)
for server in self._ensemble[1:]:
result += server.observed_perplexity(counts)
result /= len(self._ensemble)
return result
def get_local_words(preds, vocab, NEs=[], k=50):
"""
given the word probabilities over many coordinates,
first normalize the probability of each word in different
locations to get a probability distribution, then compute
the entropy of the word's distribution over all coordinates
    and return the k lowest-entropy words that are not
    named entities.
"""
    # normalize the probabilities of each word across locations (L1 norm),
    # then score each word by the entropy of that distribution
normalized_preds = normalize(preds, norm='l1', axis=0)
entropies = stats.entropy(normalized_preds)
sorted_indices = np.argsort(entropies)
sorted_local_words = np.array(vocab)[sorted_indices].tolist()
filtered_local_words = []
NEset = set(NEs)
for word in sorted_local_words:
if word in NEset: continue
filtered_local_words.append(word)
return filtered_local_words[0:k]
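
# Hedged, toy-data sketch (not from the original project): rows of `toy_preds`
# are locations, columns are vocabulary words; geographically "local" words
# have low-entropy distributions across locations. Assumes the imports the
# function above relies on (numpy as np, scipy.stats as stats,
# sklearn.preprocessing.normalize).
import numpy as np
from scipy import stats
from sklearn.preprocessing import normalize

toy_vocab = ['the', 'beach', 'snow']
toy_preds = np.array([[0.5, 0.9, 0.0],   # coastal location
                      [0.5, 0.1, 0.1],   # inland location
                      [0.5, 0.0, 0.9]])  # mountain location
# 'beach' and 'snow' have low entropy; 'snow' is filtered as a named entity
print(get_local_words(toy_preds, toy_vocab, NEs=['snow'], k=2))  # ['beach', 'the']
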
def __query_by_committee(self, clf, X_unlabeled):
num_classes = len(clf[0].classes_)
C = len(clf)
preds = []
if self.strategy == 'vote_entropy':
for model in clf:
            y_out = list(map(int, model.predict(X_unlabeled)))  # list() so fancy indexing works on Python 3
preds.append(np.eye(num_classes)[y_out])
votes = np.apply_along_axis(np.sum, 0, np.stack(preds)) / C
return np.apply_along_axis(entropy, 1, votes)
elif self.strategy == 'average_kl_divergence':
for model in clf:
preds.append(model.predict_proba(X_unlabeled))
consensus = np.mean(np.stack(preds), axis=0)
divergence = []
for y_out in preds:
divergence.append(entropy(consensus.T, y_out.T))
return np.apply_along_axis(np.mean, 0, np.stack(divergence))
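
# Hedged, standalone illustration of the 'vote_entropy' branch above
# (illustration only, not calling the private method): three committee
# members vote on four unlabeled points drawn from three classes.
import numpy as np
from scipy.stats import entropy

committee_votes = np.array([[0, 1, 2, 1],
                            [0, 1, 2, 2],
                            [0, 2, 2, 1]])  # shape (C, n_unlabeled)
num_classes = 3
one_hot = [np.eye(num_classes)[v] for v in committee_votes]
votes = np.apply_along_axis(np.sum, 0, np.stack(one_hot)) / len(one_hot)
print(np.apply_along_axis(entropy, 1, votes))
# unanimous points score 0; points the committee disagrees on score higher
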
def select(self, features, freq_table):
""" Select features via some criteria
:type features: dict
:param features: features vocab
:type freq_table: 2-D numpy.array
:param freq_table: frequency table with rows as features,
columns as frequency values
"""
if self.method == 'frequency':
feat_vals = self.frequency(features, freq_table)
elif self.method == 'entropy':
feat_vals = self.entropy(features, freq_table)
elif self.method == 'freq-entropy':
feat_vals = self.freq_entropy(features, freq_table)
else:
raise KeyError("Unrecognized method")
new_features = self.rank(feat_vals)
return new_features
def parse_feature(feature):
""" Parse feature string into
(feature name, [1st order aggregates], [2nd order aggregates]).
'Grammar':
- feature name and aggregates are separated by dots, e.g. 'mfcc.entropy'
- feature name is first and contains no dots
- first order and second order aggregates are separated by one of 2 keywords:
'corpus' or 'song'
Ex.:
    >>> parse_feature('loudness.mean.song.pdf.log')
('loudness', ['mean'], ['song', 'pdf', 'log'])
"""
s = np.array(feature.split('.'))
split_points = (s == 'corpus') | (s == 'song')
split_points = np.nonzero(split_points)[0] if any(split_points) else [len(s)]
return s[0], s[1:split_points[0]].tolist(), s[split_points[-1]:].tolist()
def jsd_opinions(co):
"""Calculate Jensen-Shannon divergence between (contrastive) opinions.
Implements Jensen-Shannon divergence between (contrastive) opinions as
described in [Fang et al., 2012] section 3.2.
Parameter:
co : numpy ndarray
A numpy ndarray containing (contrastive) opinions (see
contrastive_opinions(query, topics, opinions, nks))
Returns:
float
The Jensen-Shannon divergence between the contrastive opinions.
"""
logger.debug('calculate Jensen-Shannon divergence between (contrastive) '
'opinions')
nPerspectives = co.shape[1]
    result = np.zeros(nPerspectives, dtype=np.float64)
p_avg = np.mean(co, axis=1)
for persp in range(nPerspectives):
result[persp] = entropy(co[:, persp], qk=p_avg, base=2)
return np.mean(result)
def aga_expression_entropies(adata):
"""Compute the median expression entropy for each node-group.
Parameters
----------
adata : AnnData
Annotated data matrix.
Returns
-------
entropies : list
Entropies of median expressions for each node.
"""
from scipy.stats import entropy
groups_order, groups_masks = utils.select_groups(adata,
key=adata.uns['aga_groups_key'])
entropies = []
for mask in groups_masks:
X_mask = adata.X[mask]
x_median = np.median(X_mask, axis=0)
x_probs = (x_median - np.min(x_median)) / (np.max(x_median) - np.min(x_median))
entropies.append(entropy(x_probs))
return entropies
def build_interaction_graph(mallet_model, threshold):
g = networkx.Graph()
    topic_matrix = mallet_model.theta
for i in xrange(topic_matrix.shape[1]):
print i
for j in xrange(i+1, topic_matrix.shape[1]):
divergence_ij = stats.entropy(topic_matrix[:,i], topic_matrix[:,j])
divergence_ji = stats.entropy(topic_matrix[:,j], topic_matrix[:,i])
# quick and dirty "symmetrization" plus inversion
inverse_divergence_sym = float(1/(divergence_ij + divergence_ji))
if inverse_divergence_sym >= threshold:
g.add_node(j, label=', '.join(mallet_model.list_topic(j, 3)))
g.add_edge(i, j, weight=inverse_divergence_sym)
else:
g.add_node(i)
for i in xrange(topic_matrix.shape[1]):
if len(g.edge[i]) == 0:
g.remove_node(i)
for i in xrange(topic_matrix.shape[1]):
if i in g.node and len(g.node[i]) == 0 and len(g.edge[i]) != 0:
print i
g.add_node(i, label=', '.join(mallet_model.list_topic(i, 3)))
return g
def correlation(probs):
"""Compute correlation rho(X,Y) = sqrt(1 - exp(-2 I(X;Y))).
Args:
probs: An [M, M]-shaped numpy array representing a joint distribution.
Returns:
A number in [0,1) representing the information-theoretic correlation.
"""
assert len(probs.shape) == 2
assert probs.shape[0] == probs.shape[1]
mutual_information = (entropy(probs.sum(0)) + entropy(probs.sum(1)) -
entropy(probs.flatten()))
return np.sqrt(1.0 - np.exp(-2.0 * mutual_information))
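
# Hedged usage sketch (not part of the original source): the correlation is
# 0 for an independent joint distribution and grows toward 1 as the joint
# mass concentrates on the diagonal.
import numpy as np
from scipy.stats import entropy

independent = np.outer([0.5, 0.5], [0.5, 0.5])  # X and Y independent
dependent = np.array([[0.45, 0.05],
                      [0.05, 0.45]])            # X and Y usually equal
print(correlation(independent))  # 0.0
print(correlation(dependent))    # roughly 0.7
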
def observed_perplexity(self, counts):
"""Compute perplexity = exp(entropy) of observed variables."""
def latent_perplexity(self):
"""Compute perplexity = exp(entropy) of latent variables."""
def latent_perplexity(self):
"""Compute perplexity = exp(entropy) of latent variables.
Perplexity is an information theoretic measure of the number of
clusters or latent classes. Perplexity is a real number in the range
[1, M], where M is model_num_clusters.
Returns:
A [V]-shaped numpy array of perplexity.
"""
result = self._ensemble[0].latent_perplexity()
for server in self._ensemble[1:]:
result += server.latent_perplexity()
result /= len(self._ensemble)
return result
def observed_perplexity(self):
"""Compute perplexity = exp(entropy) of observed variables.
Perplexity is an information theoretic measure of the number of
clusters or observed classes. Perplexity is a real number in the range
[1, dim[v]], where dim[v] is the number of categories in an observed
categorical variable or 2 for an ordinal variable.
Returns:
A [V]-shaped numpy array of perplexity.
"""
return self._server.observed_perplexity(self._counts)
def latent_perplexity(self):
"""Compute perplexity = exp(entropy) of latent variables.
Perplexity is an information theoretic measure of the number of
clusters or latent classes. Perplexity is a real number in the range
[1, M], where M is model_num_clusters.
Returns:
A [V]-shaped numpy array of perplexity.
"""
return self._server.latent_perplexity()
def kl_divergence(p_samples, q_samples):
# estimate densities
# p_samples = np.nan_to_num(p_samples)
# q_samples = np.nan_to_num(q_samples)
if isinstance(p_samples, tuple):
idx, p_samples = p_samples
if idx not in _cached_p_pdf:
_cached_p_pdf[idx] = sc.gaussian_kde(p_samples)
p_pdf = _cached_p_pdf[idx]
else:
p_pdf = sc.gaussian_kde(p_samples)
q_pdf = sc.gaussian_kde(q_samples)
# joint support
left = min(min(p_samples), min(q_samples))
right = max(max(p_samples), max(q_samples))
p_samples_num = p_samples.shape[0]
q_samples_num = q_samples.shape[0]
# quantise
lin = np.linspace(left, right, min(max(p_samples_num, q_samples_num), MAX_GRID_POINTS))
p = p_pdf.pdf(lin)
q = q_pdf.pdf(lin)
# KL
kl = min(sc.entropy(p, q), MAX_KL)
return kl
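
# Hedged usage sketch: the function above expects some module-level names
# that are not shown in the snippet (`sc` for scipy.stats, MAX_GRID_POINTS,
# MAX_KL, _cached_p_pdf). The values below are illustrative assumptions,
# not the original project's settings.
import numpy as np
import scipy.stats as sc

MAX_GRID_POINTS = 1000  # assumed grid resolution for the quantisation
MAX_KL = 1e6            # assumed cap on the reported divergence
_cached_p_pdf = {}      # cache keyed by an optional index, as used above

samples_p = np.random.normal(0.0, 1.0, size=500)
samples_q = np.random.normal(0.5, 1.2, size=500)
print(kl_divergence(samples_p, samples_q))  # small positive KL estimate
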
def check_KL_divergence(topics, results, thresh):
for res in results:
        minimized_KL = float('inf')
for topic in topics:
KL = KL_divergence(topic, res)
if KL < minimized_KL:
minimized_KL = KL
print(minimized_KL)
assert minimized_KL < thresh
def JSD(P, Q):
M = 0.5 * (P + Q)
return 0.5 * (entropy(P, M) + entropy(Q, M))
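
# Hedged usage sketch (illustration only): with scipy's natural-log entropy,
# JSD is symmetric and bounded above by log(2), roughly 0.693.
import numpy as np
from scipy.stats import entropy

P = np.array([0.1, 0.4, 0.5])
Q = np.array([0.6, 0.2, 0.2])
print(JSD(P, Q), JSD(Q, P))  # equal values in [0, log(2)]
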
def calculate_entropy(self,labelfracs):
return stats.entropy(labelfracs)
def calculateIG(self,groups,labels):
# current entropy
labelfracs = self.obtain_labelfracs(labels)
current_entropy = self.calculate_entropy(labelfracs)
# entropy of each grouping
group_entropy = []
for group in groups:
labelfracs = self.obtain_labelfracs(group)
group_entropy.append((len(group)/len(labels)) * self.calculate_entropy(labelfracs))
infogain = current_entropy - sum(group_entropy)
return infogain
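
# Hedged, standalone illustration of the information-gain computation above.
# `label_fractions` is a hypothetical stand-in for obtain_labelfracs, which
# is not shown in the snippet.
from scipy import stats

def label_fractions(items):
    return [items.count(label) / len(items) for label in set(items)]

labels = ['a'] * 6 + ['b'] * 4
groups = [['a'] * 5 + ['b'], ['a'] + ['b'] * 3]
current_entropy = stats.entropy(label_fractions(labels))
group_entropy = sum(len(g) / len(labels) * stats.entropy(label_fractions(g))
                    for g in groups)
print(current_entropy - group_entropy)  # positive information gain for this split
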
def compare_entropy(name_img1,name_img2,method="rmq"):
    '''Compare two images, either by the Kullback-Leibler divergence or by a
    quartic-norm difference of their pixel values.
    Parameters
    ----------
    name_img1 : string
        filename of image 1 (png format)
    name_img2 : string
        filename of image 2 (png format)
    method : string
        "KL-div" for the Kullback-Leibler divergence reported on a log10
        scale, or "rmq" (default) for the fourth root of the summed fourth
        powers of the pixel differences
    Returns
    -------
    S : float
        the comparison score; for "KL-div" this is
        log10(sum(pk * log(pk / qk), axis=0))
    fimg1, fimg2 : ndarray
        the flattened pixel arrays of the two images
    Note
    ----
    See http://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.entropy.html
    '''
img1 = mpimg.imread(name_img1)
img2 = mpimg.imread(name_img2)
fimg1 = img1.flatten()
fimg2 = img2.flatten()
if method == "KL-div":
eps = 0.0001
S = stats.entropy(fimg2+eps,fimg1+eps)
S = numpy.log10(S)
elif method == "rmq":
fdiff=fimg1-fimg2
fdiff_sqr = fdiff**4
S = (fdiff_sqr.sum())**(old_div(1.,4))
return S,fimg1, fimg2
def KLDivergenceSim(a,b,topics):
from scipy.stats import entropy
import math
a = fill_list_from_dict(a,topics)
b = fill_list_from_dict(b,topics)
entropyOf_A_to_B = entropy(a,b)
entropyOf_B_to_A = entropy(b,a)
minusSummedEntropy = -(entropyOf_A_to_B+entropyOf_B_to_A)
return math.exp(minusSummedEntropy)
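
# Hedged, standalone illustration of the symmetric-KL similarity above:
# exp(-(KL(a||b) + KL(b||a))) is 1 for identical distributions and decays
# toward 0 as they diverge. fill_list_from_dict (not shown) only aligns
# dict-valued topic weights into equal-length lists.
import math
from scipy.stats import entropy

a = [0.2, 0.3, 0.5]
b = [0.25, 0.25, 0.5]
print(math.exp(-(entropy(a, b) + entropy(b, a))))  # close to 1 for similar a, b
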
def print_performance(self,Z,itr,Z_series=(1.,1.)):
Yhat = self.get_Yhat(Z)
fit = 1 - np.linalg.norm(self.Y - Yhat)**2/np.linalg.norm(self.Y)**2
r2 = (1 - distance.correlation(self.Y.flatten(),Yhat.flatten()))**2
print 'itr: %d, fit: %f, r2: %f, Z entropy: %f, Z min: %f, Z max: %f, Z change: %f' % (itr,fit,r2,
np.average([np.exp(entropy(abs(z))) for z in Z.T]),Z.min(),Z.max(),
np.linalg.norm(Z_series[-2] - Z_series[-1])/np.linalg.norm(Z_series[-2]))
self.fit = (fit,r2)
def __uncertainty_sampling(self, clf, X_unlabeled):
probs = clf.predict_proba(X_unlabeled)
if self.strategy == 'least_confident':
return 1 - np.amax(probs, axis=1)
elif self.strategy == 'max_margin':
margin = np.partition(-probs, 1, axis=1)
return -np.abs(margin[:,0] - margin[:, 1])
elif self.strategy == 'entropy':
return np.apply_along_axis(entropy, 1, probs)
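
# Hedged, standalone illustration of the three uncertainty scores above on a
# toy probability matrix (rows = samples, columns = class probabilities);
# higher scores mark samples that are more informative to label.
import numpy as np
from scipy.stats import entropy

probs = np.array([[0.90, 0.05, 0.05],
                  [0.40, 0.35, 0.25]])
print(1 - np.amax(probs, axis=1))              # least_confident
margin = np.partition(-probs, 1, axis=1)
print(-np.abs(margin[:, 0] - margin[:, 1]))    # max_margin
print(np.apply_along_axis(entropy, 1, probs))  # entropy
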
def entropy(self, features, freq_table):
"""
"""
feat_vals = {}
for (feat, idx) in features.items():
freq = freq_table[idx, :]
feat_vals[feat] = 1 / (entropy(freq) + 1e-3)
return feat_vals
def freq_entropy(self, features, freq_table):
"""
"""
feat_vals = {}
feat_freqs = self.frequency(features, freq_table)
feat_ents = self.entropy(features, freq_table)
for feat in features.keys():
freq = feat_freqs[feat]
ent = feat_ents[feat]
feat_vals[feat] = numpy.log(freq + 1e-3) * (ent + 1e-3)
return feat_vals
def test():
vocab = {'hello': 0, 'data': 1, 'computer': 2}
freq_table = [[23, 23, 23, 23], [23, 1, 4, 5], [1, 34, 1, 1]]
freq_table = numpy.array(freq_table)
fs = FeatureSelector(topn=2, method='freq-entropy')
newvocab = fs.select(vocab, freq_table)
print(newvocab)