def mi(x, y, k=3, base=2):
""" Mutual information of x and y
x, y should be a list of vectors, e.g. x = [[1.3], [3.7], [5.1], [2.4]]
    if x is one-dimensional and there are four samples
"""
assert len(x) == len(y), "Lists should have same length"
assert k <= len(x) - 1, "Set k smaller than num. samples - 1"
intens = 1e-10 # small noise to break degeneracy, see doc.
x = [list(p + intens * nr.rand(len(x[0]))) for p in x]
y = [list(p + intens * nr.rand(len(y[0]))) for p in y]
points = zip2(x, y)
# Find nearest neighbors in joint space, p=inf means max-norm
tree = ss.cKDTree(points)
dvec = [tree.query(point, k + 1, p=float('inf'))[0][k] for point in points]
a, b, c, d = avgdigamma(x, dvec), avgdigamma(y, dvec), digamma(k), digamma(len(x))
return (-a - b + c + d) / log(base)
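# Hedged usage sketch (not part of the original sources): how the mi() above
# might be called. It assumes the module-level names mi() relies on exist
# (numpy.random as nr, scipy.spatial as ss, scipy.special.digamma, math.log,
# numpy as np, plus the zip2() helper and the avgdigamma() defined further
# down this page).
def _example_mi_usage(n_samples=2000, rho=0.9, k=3):
    """Compare the KSG estimate from mi() with the analytic mutual information
    of a correlated bivariate Gaussian, -0.5*log(1 - rho**2), in nats."""
    import numpy as np
    cov = [[1.0, rho], [rho, 1.0]]
    xy = np.random.multivariate_normal([0.0, 0.0], cov, size=n_samples)
    x = [[v] for v in xy[:, 0]]
    y = [[v] for v in xy[:, 1]]
    est = mi(x, y, k=k, base=np.e)            # estimate in nats
    exact = -0.5 * np.log(1.0 - rho ** 2)     # analytic value
    return est, exact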
def cmi(x, y, z, k=3, base=2):
""" Mutual information of x and y, conditioned on z
x, y, z should be a list of vectors, e.g. x = [[1.3], [3.7], [5.1], [2.4]]
    if x is one-dimensional and there are four samples
"""
assert len(x) == len(y), "Lists should have same length"
assert k <= len(x) - 1, "Set k smaller than num. samples - 1"
intens = 1e-10 # small noise to break degeneracy, see doc.
x = [list(p + intens * nr.rand(len(x[0]))) for p in x]
y = [list(p + intens * nr.rand(len(y[0]))) for p in y]
z = [list(p + intens * nr.rand(len(z[0]))) for p in z]
points = zip2(x, y, z)
# Find nearest neighbors in joint space, p=inf means max-norm
tree = ss.cKDTree(points)
dvec = [tree.query(point, k + 1, p=float('inf'))[0][k] for point in points]
a, b, c, d = avgdigamma(zip2(x, z), dvec), avgdigamma(zip2(y, z), dvec), avgdigamma(z, dvec), digamma(k)
return (-a - b + c + d) / log(base)
def inv_psi(y, tol=1e-10, n_iter=100, psi=psi):
"""
Inverse digamma function
"""
from numpy import exp
from scipy.special import digamma
## initial value
if y < -5 / 3. - EULER_MASCHERONI:
x = -1 / (EULER_MASCHERONI + y)
else:
x = 0.5 + exp(y)
## Newton root finding
for dummy in range(n_iter):
z = digamma(x) - y
if abs(z) < tol:
break
x -= z / d_approx_psi(x)
return x
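# Hedged sketch (not part of the original source): a round-trip check for
# inv_psi() above. The original module is expected to supply EULER_MASCHERONI,
# d_approx_psi() and psi (presumably scipy.special.psi, needed for the default
# argument); the stand-ins below are illustrative only and use the exact
# trigamma for the Newton derivative.
EULER_MASCHERONI = 0.5772156649015329  # Euler-Mascheroni constant

def d_approx_psi(x):
    # Exact derivative of the digamma function (trigamma); the original code
    # may use a cheaper approximation instead.
    from scipy.special import polygamma
    return polygamma(1, x)

def _check_inv_psi(y=-1.5):
    """digamma(inv_psi(y)) should reproduce y to within the tolerance."""
    from scipy.special import digamma
    return digamma(inv_psi(y)), y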
def mi(x, y, k=3, base=2):
"""Mutual information of x and y.
x,y should be a list of vectors, e.g. x = [[1.3], [3.7], [5.1], [2.4]]
    if x is one-dimensional and there are four samples.
"""
assert len(x)==len(y), 'Lists should have same length.'
assert k <= len(x) - 1, 'Set k smaller than num samples - 1.'
intens = 1e-10 # Small noise to break degeneracy, see doc.
x = [list(p + intens*nr.rand(len(x[0]))) for p in x]
y = [list(p + intens*nr.rand(len(y[0]))) for p in y]
points = zip2(x,y)
# Find nearest neighbors in joint space, p=inf means max-norm.
tree = ss.cKDTree(points)
dvec = [tree.query(point, k+1, p=float('inf'))[0][k] for point in points]
a = avgdigamma(x,dvec)
b = avgdigamma(y,dvec)
c = digamma(k)
d = digamma(len(x))
return (-a-b+c+d) / log(base)
def cmi(x, y, z, k=3, base=2):
"""Mutual information of x and y, conditioned on z
x,y,z should be a list of vectors, e.g. x = [[1.3], [3.7], [5.1], [2.4]]
    if x is one-dimensional and there are four samples
"""
assert len(x)==len(y), 'Lists should have same length.'
assert k <= len(x) - 1, 'Set k smaller than num samples - 1.'
intens = 1e-10 # Small noise to break degeneracy, see doc.
x = [list(p + intens*nr.rand(len(x[0]))) for p in x]
y = [list(p + intens*nr.rand(len(y[0]))) for p in y]
z = [list(p + intens*nr.rand(len(z[0]))) for p in z]
points = zip2(x,y,z)
# Find nearest neighbors in joint space, p=inf means max-norm.
tree = ss.cKDTree(points)
dvec = [tree.query(point, k+1, p=float('inf'))[0][k] for point in points]
a = avgdigamma(zip2(x,z), dvec)
b = avgdigamma(zip2(y,z), dvec)
c = avgdigamma(z,dvec)
d = digamma(k)
return (-a-b+c+d) / log(base)
def _Eq5(_lambda,K,_x_u,m_pre_c,shot_comments_vector,user_comment,_comment_2_user_matrix,_n_t_c):
m_pre_s=Eq._Eq2(_lambda,K)
    print(_n_t_c)
    # V*C
    # calculate _term_2 for every comment in each shot
total=[]
for i,users in enumerate(_comment_2_user_matrix):
#sum all comment in one shot
_rows=np.zeros(K)
for j,user in enumerate(users):
#x_u
            x_u = _x_u[list(user_comment.keys()).index(user)]
shared_term=x_u*_lambda[i]+m_pre_c[i][j]
_rows+=x_u*dlgt(shared_term)* \
(digamma(np.sum(lgt(shared_term)))\
-digamma(np.sum(lgt(shared_term))+np.sum(shot_comments_vector[i][j]))\
+digamma(lgt(shared_term)+_n_t_c[i][j])\
-digamma(lgt(shared_term)))
total.append(_rows)
_term = -1 * _lambda - m_pre_s+np.array(total)
return _term
def _Eq6(_lambda,K,_x_u,m_pre_c,user_comment,_comment_2_user_matrix,shot_comments_vector,_n_t_c):
#traverse all users
total=[]
for index,key in enumerate(user_comment):
temp=np.zeros(K)
for comment in user_comment[key]:
i,j=comment[0],comment[1]
shared_term=_x_u[index]*_lambda[i]+m_pre_c[i][j]
temp+=_lambda[i]*dlgt(shared_term)\
            *(digamma(np.sum(lgt(shared_term)))\
              -digamma(np.sum(lgt(shared_term))+np.sum(shot_comments_vector[i][j]))\
              +digamma(lgt(shared_term)+_n_t_c[i][j])\
              -digamma(lgt(shared_term)))
total.append(-1*_x_u[index] +temp)
return np.array(total)
def _calc_gamma_psi(log_d, alpha, log_beta, gamma, log_psi0):
log_psi = log_psi0
count = 0
#print ((np.exp(log_psi1 - log_psi) ** 2).sum())
while count == 0 \
or ((np.exp(log_psi1) - np.exp(log_psi)) ** 2).sum() > 0.001 \
or ((gamma1 - gamma) ** 2).sum() > 0.001:
#print ('gamma psi:', count, ((np.exp(log_psi1) - np.exp(log_psi)) ** 2).sum())
log_psi1 = log_psi
gamma1 = gamma
psi_offset = (digamma(gamma))[:, np.newaxis, np.newaxis, :]
log_psi = log_beta[np.newaxis, :, :, :] + psi_offset
log_psi = log_normalize(log_psi, axis=3)
gamma = np.exp(logsumexp(logsumexp(log_d[:, :, :, np.newaxis] + log_psi, axis=1), axis=1)) + alpha[np.newaxis, :]
count += 1
#log_psi = np.average([log_psi0, log_psi], axis=0, weights=[0.9, 0.1]) # weak learning
return (gamma, log_psi)
def _calc_alpha_beta(log_d, alpha0, log_beta0, gamma, log_psi):
log_beta = logsumexp(log_psi + log_d[:, :, :, np.newaxis], axis=0)
log_beta = log_normalize(log_beta, axis=1)
log_smooth = np.log(10)
alpha = alpha0
N = gamma.shape[0]
zero = 1e-30
gamma_digamma_sum = digamma(gamma.sum(axis=1))[:, np.newaxis]
g_offset = (digamma(gamma) - gamma_digamma_sum).sum(axis=0) / N
# using log
def next_alpha(alpha):
das = digamma(alpha.sum())
g = alpha * N * (das - digamma(alpha) + g_offset)
h = alpha * N * (das + g_offset)
z = N * das
x = (alpha * g / h).sum()
w = (alpha ** 2 / h).sum()
return np.exp(np.log(alpha) - (g - x * alpha / (1/z + w)) / h)
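    # NOTE: next_alpha above defines a log-space Newton-style update for the
    # Dirichlet hyperparameter, but it is never called here, so the alpha
    # returned below is simply the input alpha0.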
return (alpha, log_beta)
def p_overlap(self, s_overlap, motif_st, l_overlap, is_rc=False):
""" p_overlap caclute the probability of a given sequence segment
overlapping with a motif, given the alignment.
Args:
s_overlap: string representing overlapping portion of the motif
motif_st: the begin idx of the motif
l_overlap: the length of the alignment.
is_rc: if the reverse complementary motif should be used.
Returns:
a float representation of the likelihood of observing
overlapping sequence given this alignment.
"""
motif_end = motif_st + l_overlap
pwm = self.lmbda_rc if is_rc else self.lmbda
pwm_overlap = np.array(pwm)[motif_st+1:motif_end+1]
assert len(pwm_overlap) == len(s_overlap)
prod = 1.0
for base, rho in zip(s_overlap, pwm_overlap):
prod *= np.exp(special.digamma(rho[tools.IDX[base]])-
special.digamma(sum(rho)))
return prod
def motif_aligns_and_ps(kmer, m, m_idx, phi, kmmm):
"""motif_aligns_and_ps finds all possible aligments of the motif and the
kmer and returns likelihoods of each alignment."""
alignments, align_ps = m.score_all_alignments(kmer, phi)
if kmmm.variational:
assert m.variational
component_term = np.exp(special.digamma(float(kmmm.theta[m_idx])/len(align_ps)))
align_ps = [p*component_term for p in align_ps]
else:
assert not m.variational
align_ps = [p*kmmm.gamma[m_idx] for p in align_ps]
if kmer != tools.rev_comp(kmer):
        # We do not need to do this for reverse-palindromic
        # sequences because their counts have not been collapsed
        # with their reverse complements.
align_ps = [p*2.0 for p in align_ps]
return align_ps, alignments
def calcELBOSingleDoc_Fast(wc_d, DocTopicCount_d, Prior_d, sumR_d, alphaEbeta):
    ''' Evaluate ELBO contributions for a single doc, dropping terms constant wrt the local step.
    Note: the key to some progress was remembering that Prior_d is not just exp(ElogPi),
    but is usually altered by a multiplicative constant for numerical safety;
    we can find this constant offset (in log space) and adjust sumR_d accordingly.
'''
theta_d = DocTopicCount_d + alphaEbeta[:-1]
thetaRem = alphaEbeta[-1]
digammaSum = digamma(theta_d.sum() + thetaRem)
ElogPi_d = digamma(theta_d) - digammaSum
ElogPiRem = digamma(thetaRem) - digammaSum
cDir = -1 * c_Dir(theta_d[np.newaxis,:], thetaRem)
slackOn = np.inner(DocTopicCount_d + alphaEbeta[:-1] - theta_d,
ElogPi_d.flatten())
slackOff = (alphaEbeta[-1] - thetaRem) * ElogPiRem
rest = np.inner(wc_d, np.log(sumR_d)) - np.inner(DocTopicCount_d, np.log(Prior_d+1e-100))
return cDir + slackOn + slackOff + rest
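# Hedged illustration (not part of the original source): the ElogPi_d terms
# above use the standard Dirichlet identity E_q[log pi_k] = digamma(theta_k)
# - digamma(sum_j theta_j). The helper below verifies that identity by Monte
# Carlo; the name and defaults are illustrative only.
def _check_dirichlet_elog_pi(theta=(2.0, 5.0, 1.0, 3.0), n_samples=200000):
    import numpy as np
    from scipy.special import digamma
    theta = np.asarray(theta)
    analytic = digamma(theta) - digamma(theta.sum())
    samples = np.random.dirichlet(theta, size=n_samples)
    monte_carlo = np.log(samples).mean(axis=0)
    return analytic, monte_carlo  # the two vectors should agree to ~1e-2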
def L_hdp(beta, omega, Tvec, alpha):
''' Compute top-tier of hdp bound.
'''
K = omega.size
assert K == beta.size
assert K == Tvec.size
rho = OROB.beta2rho(beta, K)
eta1 = rho * omega
eta0 = (1-rho) * omega
digamma_omega = digamma(omega)
digamma_eta1 = digamma(eta1)
digamma_eta0 = digamma(eta0)
ElogU = digamma_eta1 - digamma_omega
Elog1mU = digamma_eta0 - digamma_omega
Lscore = \
np.sum(gammaln(eta1) + gammaln(eta0) - gammaln(omega)) \
+ np.inner(nDoc + 1 - eta1, ElogU) \
+ np.inner(nDoc * OROB.kvec(K) + gamma - eta0, Elog1mU) \
+ alpha * np.inner(beta, Tvec)
return Lscore
def _E_logdetL(self, k=None):
    dvec = np.arange(1, self.D + 1, dtype=float)
    if k == 'all':
dvec = dvec[:, np.newaxis]
retVec = self.D * LOGTWO * np.ones(self.K)
        for kk in range(self.K):
retVec[kk] -= self.GetCached('logdetB', kk)
nuT = self.Post.nu[np.newaxis, :]
retVec += np.sum(digamma(0.5 * (nuT + 1 - dvec)), axis=0)
return retVec
elif k is None:
nu = self.Prior.nu
else:
nu = self.Post.nu[k]
return self.D * LOGTWO \
- self.GetCached('logdetB', k) \
+ np.sum(digamma(0.5 * (nu + 1 - dvec)))
def _E_logphiT(self, k=None):
''' Calculate transpose of topic-word matrix
Important to make a copy of the matrix so it is C-contiguous,
which leads to much much faster matrix operations.
Returns
-------
ElogphiT : 2D array, vocab_size x K
'''
if k is None or k == 'prior':
lam = self.Prior.lam
ElogphiT = digamma(lam) - digamma(np.sum(lam))
elif k == 'all':
ElogphiT = self.Post.lam.T.copy()
digamma(ElogphiT, out=ElogphiT)
digammaColSumVec = digamma(np.sum(self.Post.lam, axis=1))
ElogphiT -= digammaColSumVec[np.newaxis,:]
else:
ElogphiT = digamma(self.Post.lam[k]) - \
digamma(self.Post.lam[k].sum())
assert ElogphiT.flags.c_contiguous
return ElogphiT
def _E_logphiT(self, k=None):
''' Calculate transpose of expected phi matrix
Important to make a copy of the matrix so it is C-contiguous,
which leads to much much faster matrix operations.
Returns
-------
ElogphiT : 2D array, vocab_size x K
'''
if k == 'all':
dlam1T = self.Post.lam1.T.copy()
dlambothT = self.Post.lam0.T.copy()
dlambothT += dlam1T
digamma(dlam1T, out=dlam1T)
digamma(dlambothT, out=dlambothT)
return dlam1T - dlambothT
ElogphiT = self._E_logphi(k).T.copy()
return ElogphiT
def _E_log1mphiT(self, k=None):
''' Calculate transpose of expected 1-minus-phi matrix
Important to make a copy of the matrix so it is C-contiguous,
which leads to much much faster matrix operations.
Returns
-------
ElogphiT : 2D array, vocab_size x K
'''
if k == 'all':
# Copy so lam1T/lam0T are C-contig and can be shared mem.
lam1T = self.Post.lam1.T.copy()
lam0T = self.Post.lam0.T.copy()
return digamma(lam0T) - digamma(lam1T + lam0T)
ElogphiT = self._E_log1mphi(k).T.copy()
return ElogphiT
def _E_logdetL(self, k=None):
    dvec = np.arange(1, self.D + 1, dtype=float)
if k == 'all':
dvec = dvec[:, np.newaxis]
retVec = self.D * LOGTWO * np.ones(self.K)
        for kk in range(self.K):
retVec[kk] -= self.GetCached('logdetB', kk)
nuT = self.Post.nu[np.newaxis, :]
retVec += np.sum(digamma(0.5 * (nuT + 1 - dvec)), axis=0)
return retVec
elif k is None:
nu = self.Prior.nu
else:
nu = self.Post.nu[k]
return self.D * LOGTWO \
- self.GetCached('logdetB', k) \
+ np.sum(digamma(0.5 * (nu + 1 - dvec)))
def _E_logL(self, k=None):
'''
Returns
-------
E_logL : 1D array, size D
'''
if k == 'all':
# retVec : K x D
retVec = LOGTWO - np.log(self.Post.beta.copy()) # no strided!
retVec += digamma(0.5 * self.Post.nu)[:, np.newaxis]
return retVec
elif k is None:
nu = self.Prior.nu
beta = self.Prior.beta
else:
nu = self.Post.nu[k]
beta = self.Post.beta[k]
return LOGTWO - np.log(beta) + digamma(0.5 * nu)
def get_vlb(self):
# E[ln p(mu) / q(mu)] part
h_0, J_0, h_mf, J_mf = self.h_0, self.J_0, self.h_mf, self.J_mf
Emu, Emu2 = self._E_mu
p_mu_avgengy = -1./2*J_0*Emu2 + h_0*Emu \
- 1./2*(h_0**2/J_0) + 1./2*np.log(J_0) - 1./2*np.log(2*np.pi)
q_mu_entropy = 1./2*np.log(2*np.pi*np.e/J_mf)
# E[ln p(sigmasq) / q(sigmasq)] part
alpha_0, beta_0, alpha_mf, beta_mf = \
self.alpha_0, self.beta_0, self.alpha_mf, self.beta_mf
(Esigmasqinv, Elnsigmasq) = self._E_sigmasq
p_sigmasq_avgengy = (-alpha_0-1)*Elnsigmasq + (-beta_0)*Esigmasqinv \
- (special.gammaln(alpha_0) - alpha_0*np.log(beta_0))
q_sigmasq_entropy = alpha_mf + np.log(beta_mf) + special.gammaln(alpha_mf) \
- (1+alpha_mf)*special.digamma(alpha_mf)
return p_mu_avgengy + q_mu_entropy + p_sigmasq_avgengy + q_sigmasq_entropy
def get_vlb(self):
# E[ln p(mu) / q(mu)] part
h_0, J_0, h_mf, J_mf = self.h_0, self.J_0, self.h_mf, self.J_mf
Emu, Emu2 = self._E_mu
p_mu_avgengy = -1./2*J_0*Emu2 + h_0*Emu \
- 1./2*(h_0**2/J_0) + 1./2*np.log(J_0) - 1./2*np.log(2*np.pi)
q_mu_entropy = 1./2*np.log(2*np.pi*np.e/J_mf)
# E[ln p(sigmasq) / q(sigmasq)] part
alpha_0, beta_0, alpha_mf, beta_mf = \
self.alpha_0, self.beta_0, self.alpha_mf, self.beta_mf
(Esigmasqinv, Elnsigmasq) = self._E_sigmasq
p_sigmasq_avgengy = (-alpha_0-1)*Elnsigmasq + (-beta_0)*Esigmasqinv \
- (special.gammaln(alpha_0) - alpha_0*np.log(beta_0))
q_sigmasq_entropy = alpha_mf + np.log(beta_mf) + special.gammaln(alpha_mf) \
- (1+alpha_mf)*special.digamma(alpha_mf)
return np.sum(p_mu_avgengy + q_mu_entropy + p_sigmasq_avgengy + q_sigmasq_entropy)
def entropy(x, k=3, base=2):
""" The classic K-L k-nearest neighbor continuous entropy estimator
x should be a list of vectors, e.g. x = [[1.3], [3.7], [5.1], [2.4]]
    if x is one-dimensional and there are four samples
"""
assert k <= len(x) - 1, "Set k smaller than num. samples - 1"
d = len(x[0])
N = len(x)
intens = 1e-10 # small noise to break degeneracy, see doc.
x = [list(p + intens * nr.rand(len(x[0]))) for p in x]
tree = ss.cKDTree(x)
nn = [tree.query(point, k + 1, p=float('inf'))[0][k] for point in x]
const = digamma(N) - digamma(k) + d * log(2)
    return (const + d * np.mean([log(dist) for dist in nn])) / log(base)
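# Hedged usage sketch (not part of the original source): the Kozachenko-
# Leonenko estimator above can be sanity-checked against a distribution with
# known differential entropy, e.g. a 1-D standard Gaussian whose entropy is
# 0.5*log(2*pi*e) nats. It assumes the same module-level aliases as entropy()
# itself (nr, ss, digamma, log, np).
def _example_entropy_usage(n_samples=2000, k=3):
    import numpy as np
    samples = [[v] for v in np.random.randn(n_samples)]
    est = entropy(samples, k=k, base=np.e)    # estimate in nats
    exact = 0.5 * np.log(2 * np.pi * np.e)    # analytic Gaussian entropy
    return est, exact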
def avgdigamma(points, dvec):
    # This part finds the number of neighbors within some radius in the
    # marginal space and returns the expectation value of <psi(nx)>.
N = len(points)
tree = ss.cKDTree(points)
avg = 0.
for i in range(N):
dist = dvec[i]
        # Subtlety: we don't include the boundary point, but we implicitly
        # add 1 to the Kraskov definition because the center point is included.
num_points = len(tree.query_ball_point(points[i], dist - 1e-15, p=float('inf')))
avg += digamma(num_points) / N
return avg
def summand(k, b):
return digamma(k)*poisson.pmf(k, b)
def __fixed_point_iteration(self, imps, clks, alpha, beta):
numerator_alpha = 0.0
numerator_beta = 0.0
denominator = 0.0
for i in range(len(imps)):
numerator_alpha += (special.digamma(clks[i]+alpha) - special.digamma(alpha))
numerator_beta += (special.digamma(imps[i]-clks[i]+beta) - special.digamma(beta))
denominator += (special.digamma(imps[i]+alpha+beta) - special.digamma(alpha+beta))
return alpha*(numerator_alpha/denominator), beta*(numerator_beta/denominator)
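# Hedged sketch (not part of the original source): the method above performs a
# single fixed-point update of Beta prior parameters (alpha, beta) from
# impression/click counts. A typical driver repeats that update until the
# parameters stop moving; the standalone helper below illustrates such a loop
# under assumed names, defaults and tolerances, and assumes non-degenerate
# counts (some clicks and some non-clicks).
def _fit_beta_prior(imps, clks, alpha=1.0, beta=1.0, n_iter=1000, tol=1e-10):
    from scipy import special
    for _ in range(n_iter):
        num_a = num_b = den = 0.0
        for i in range(len(imps)):
            num_a += special.digamma(clks[i] + alpha) - special.digamma(alpha)
            num_b += special.digamma(imps[i] - clks[i] + beta) - special.digamma(beta)
            den += special.digamma(imps[i] + alpha + beta) - special.digamma(alpha + beta)
        new_alpha = alpha * (num_a / den)
        new_beta = beta * (num_b / den)
        if abs(new_alpha - alpha) < tol and abs(new_beta - beta) < tol:
            return new_alpha, new_beta
        alpha, beta = new_alpha, new_beta
    return alpha, beta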
def _calc_entropy(self, thetas_ss, n_acc, k):
"""Calculate the entropy as described in Nunes & Balding, 2010.
E = log( pi^(q/2) / gamma(q/2+1) ) - digamma(k) + log(n)
+ q/n * sum_{i=1}^n( log(R_{i, k}) ), where
R_{i, k} is the Euclidean distance from the parameter theta_i to
its kth nearest neighbour;
q is the dimensionality of the parameter; and
n is the number of the accepted parameters n_acc in the rejection sampling.
Parameters
----------
thetas_ss : array_like
Parameters accepted upon the rejection sampling using
the summary-statistics combination ss.
n_acc : int
Number of the accepted parameters.
k : int
Nearest neighbour to be searched.
Returns
-------
float
Entropy.
"""
q = thetas_ss.shape[1]
# Calculate the distance to the kth nearest neighbour across all accepted parameters.
searcher_knn = cKDTree(thetas_ss)
sum_log_dist_knn = 0
for theta_ss in thetas_ss:
dist_knn = searcher_knn.query(theta_ss, k=k)[0][-1]
sum_log_dist_knn += np.log(dist_knn)
# Calculate the entropy.
E = np.log(np.pi**(q / 2) / gamma((q / 2) + 1)) - digamma(k) \
+ np.log(n_acc) + (q / n_acc) * sum_log_dist_knn
return E
def _KSG_mi(data,split,k=5):
'''
Estimate the mutual information I(X;Y) from samples {x_i,y_i}_{i=1}^N
Using KSG mutual information estimator
    Input:  data: 2D numpy array of size N*(d_x + d_y)
            split: should be d_x, splitting the data into two parts, X and Y
            k: k-nearest-neighbor parameter
    Output: a scalar estimate of I(X;Y)
'''
assert split >=1, "x must have at least one dimension"
assert split <= len(data[0]) - 1, "y must have at least one dimension"
N = len(data)
x = data[:,:split]
y = data[:,split:]
dx = len(x[0])
dy = len(y[0])
tree_xy = ss.cKDTree(data)
tree_x = ss.cKDTree(x)
tree_y = ss.cKDTree(y)
knn_dis = [tree_xy.query(point,k+1,p=2)[0][k] for point in data]
ans = digamma(k) + log(N) + vd(dx,2) + vd(dy,2) - vd(dx+dy,2)
for i in range(N):
        ans += -log(len(tree_y.query_ball_point(y[i], knn_dis[i], p=2)) - 1) / N \
               - log(len(tree_x.query_ball_point(x[i], knn_dis[i], p=2)) - 1) / N
return ans
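# Hedged usage sketch (not part of the original source): _KSG_mi() above takes
# a single N x (d_x + d_y) numpy array plus the split point d_x. The check
# below uses correlated Gaussians with a known analytic mutual information and
# assumes the module-level names _KSG_mi relies on (ss, digamma, log and the
# vd() log-volume helper) are available.
def _example_ksg_mi_usage(n_samples=2000, rho=0.6, k=5):
    import numpy as np
    cov = [[1.0, rho], [rho, 1.0]]
    data = np.random.multivariate_normal([0.0, 0.0], cov, size=n_samples)
    est = _KSG_mi(data, split=1, k=k)         # estimate in nats
    exact = -0.5 * np.log(1.0 - rho ** 2)     # analytic value
    return est, exact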
def f_hyperprior_log_prima(x, weight_prod, K, a=1, b=1):
"""
    Derivative of the log hyperprior distribution
"""
res = (a-1) *1. / x - b + np.log(weight_prod) + K * np.log(K) - K * special.digamma(x)
for i in range(K):
res += special.digamma(x + i*1./K)
return res