def prior_contribution_coefficients(self, state):
""" Calculate prior contribution from regression coefficients.
Log scale.
"""
# This will need to be revised if we allow different
# variances for different classes of coefficients, eg
# microbiome related and host-covariate related
dimensions = len(state.beta)
normalization = -0.5*dimensions*(
np.log(2.0*np.pi*self.prior_coefficient_variance)
)
exponent = (-0.5*np.dot(state.beta, state.beta) /
(self.prior_coefficient_variance))
return normalization + exponent
python类beta()的实例源码
def update_empty_probability(self):
""" Update Theta_0 from its prior based on length of the rule list.
"""
if len(self.current_state.rl) > 0:
# effectively, the 'emptiness' trial has failed
conditional_a = self.model.hyperparameter_a_empty
conditional_b = self.model.hyperparameter_b_empty + 1
else:
conditional_a = self.model.hyperparameter_a_empty + 1
conditional_b = self.model.hyperparameter_b_empty
new_empty_probability = scipy.stats.beta.rvs(
conditional_a,
conditional_b
)
self.current_state.empty_probability = new_empty_probability
def ecc_prior_fn(self, ecc):
## UPGRADE TO INCLUDE FURTHER STRINGS TO INDICATE ECC PRIOR OPTIONS FROM KEPLER AND OTHER SURVEY RESULTS #$$$$$$$$$$$$$$$$$$$$
ret = 1.0
if (self.e_prior==False) or (self.e_prior=="uniform"):
ret = self.uniform_fn(ecc,4)
else:
if ecc!=0:
if (self.e_prior == True) or (self.e_prior=='beta'):
ret = self.ecc_beta.pdf(ecc)
elif self.e_prior == '2e':
if (self.mins_ary[6]*self.days_per_year)>1000.0:
ret = 2.0*ecc
elif self.e_prior=='STO8':
ret =(1.0/self.ecc_ST08_k)*(1.0/(1.0+ecc)**self.ecc_ST08_a)-(ecc/2.0**self.ecc_ST08_a)
elif self.e_prior=='J08':
ret = self.ecc_R.pdf(ecc,scale=self.ecc_R_sig)
elif self.e_prior=='RayExp':
A = self.ecc_Rexp_a*self.ecc_exp.pdf(ecc, scale=1.0/self.ecc_Rexp_lamda)
B = (1.0-self.ecc_Rexp_a)*self.ecc_R.pdf(ecc,scale=self.ecc_Rexp_sig)/self.ecc_Rexp_sig
ret = A+B
#if ret==0: ret=-np.inf
return ret
def ecc_prior_fn(self, ecc):
## UPGRADE TO INCLUDE FURTHER STRINGS TO INDICATE ECC PRIOR OPTIONS FROM KEPLER AND OTHER SURVEY RESULTS #$$$$$$$$$$$$$$$$$$$$
ret = 1.0
if (self.e_prior==False) or (self.e_prior=="uniform"):
ret = self.uniform_fn(ecc,4)
else:
if ecc!=0:
if (self.e_prior == True) or (self.e_prior=='beta'):
ret = self.ecc_beta.pdf(ecc)
elif self.e_prior == '2e':
if (self.mins_ary[6]*self.days_per_year)>1000.0:
ret = 2.0*ecc
elif self.e_prior=='STO8':
ret =(1.0/self.ecc_ST08_k)*(1.0/(1.0+ecc)**self.ecc_ST08_a)-(ecc/2.0**self.ecc_ST08_a)
elif self.e_prior=='J08':
ret = self.ecc_R.pdf(ecc,scale=self.ecc_R_sig)
elif self.e_prior=='RayExp':
A = self.ecc_Rexp_a*self.ecc_exp.pdf(ecc, scale=1.0/self.ecc_Rexp_lamda)
B = (1.0-self.ecc_Rexp_a)*self.ecc_R.pdf(ecc,scale=self.ecc_Rexp_sig)/self.ecc_Rexp_sig
ret = A+B
#if ret==0: ret=-np.inf
return ret
def gen_x_draws(k):
"""
Returns a flat array containing k independent draws from the
distribution of X, the underlying random variable. This distribution is
itself a convex combination of three beta distributions.
"""
bdraws = beta_dist.rvs((3, k))
# == Transform rows, so each represents a different distribution == #
bdraws[0, :] -= 0.5
bdraws[1, :] += 0.6
bdraws[2, :] -= 1.1
# == Set X[i] = bdraws[j, i], where j is a random draw from {0, 1, 2} == #
js = np.random.random_integers(0, 2, size=k)
X = bdraws[js, np.arange(k)]
# == Rescale, so that the random variable is zero mean == #
m, sigma = X.mean(), X.std()
return (X - m) / sigma
def main():
fig = plt.figure()
x = np.linspace(0, 1, 100)
sizes = [1, 2, 20, 50]
fig_row, fig_col = 2, 4
# Mean of i.i.d uniform
for i, n in enumerate(sizes):
ax = fig.add_subplot(fig_row, fig_col, i + 1)
data, gaussian = central_limit(uniform_dist.rvs, n, 1000)
ax.hist(data, bins=20, normed=True, alpha=0.7)
plt.plot(x, gaussian.pdf(x), 'r')
plt.title('n={0}'.format(n))
# Mean of i.i.d beta(1, 2)
for i, n in enumerate(sizes):
ax = fig.add_subplot(fig_row, fig_col, i + fig_col + 1)
data, gaussian = central_limit(beta_dist(1, 2).rvs, n, 1000)
ax.hist(data, bins=20, normed=True, alpha=0.7)
plt.plot(x, gaussian.pdf(x), 'r')
plt.title('n={0}'.format(n))
plt.show()
def test_beta_binomial_two_identical_models(db_path, sampler):
binomial_n = 5
def model_fun(args):
return {"result": st.binom(binomial_n, args.theta).rvs()}
models = [model_fun for _ in range(2)]
models = list(map(SimpleModel, models))
population_size = ConstantPopulationSize(800)
parameter_given_model_prior_distribution = [Distribution(theta=st.beta(
1, 1))
for _ in range(2)]
abc = ABCSMC(models, parameter_given_model_prior_distribution,
MinMaxDistanceFunction(measures_to_use=["result"]),
population_size,
eps=MedianEpsilon(.1),
sampler=sampler)
abc.new(db_path, {"result": 2})
minimum_epsilon = .2
history = abc.run(minimum_epsilon, max_nr_populations=3)
mp = history.get_model_probabilities(history.max_t)
assert abs(mp.p[0] - .5) + abs(mp.p[1] - .5) < .08
def test_all_in_one_model(db_path, sampler):
models = [AllInOneModel() for _ in range(2)]
population_size = ConstantPopulationSize(800)
parameter_given_model_prior_distribution = [Distribution(theta=RV("beta",
1, 1))
for _ in range(2)]
abc = ABCSMC(models, parameter_given_model_prior_distribution,
MinMaxDistanceFunction(measures_to_use=["result"]),
population_size,
eps=MedianEpsilon(.1),
sampler=sampler)
abc.new(db_path, {"result": 2})
minimum_epsilon = .2
history = abc.run(minimum_epsilon, max_nr_populations=3)
mp = history.get_model_probabilities(history.max_t)
assert abs(mp.p[0] - .5) + abs(mp.p[1] - .5) < .08
def test_beta_binomial_two_identical_models_adaptive(db_path, sampler):
binomial_n = 5
def model_fun(args):
return {"result": st.binom(binomial_n, args.theta).rvs()}
models = [model_fun for _ in range(2)]
models = list(map(SimpleModel, models))
population_size = AdaptivePopulationSize(800)
parameter_given_model_prior_distribution = [
Distribution(theta=st.beta(1, 1)) for _ in range(2)]
abc = ABCSMC(models, parameter_given_model_prior_distribution,
MinMaxDistanceFunction(measures_to_use=["result"]),
population_size,
eps=MedianEpsilon(.1),
sampler=sampler)
abc.new(db_path, {"result": 2})
minimum_epsilon = .2
history = abc.run(minimum_epsilon, max_nr_populations=3)
mp = history.get_model_probabilities(history.max_t)
assert abs(mp.p[0] - .5) + abs(mp.p[1] - .5) < .08
dirichlet_test.py 文件源码
项目:DeepLearning_VirtualReality_BigData_Project
作者: rashmitripathi
项目源码
文件源码
阅读 59
收藏 0
点赞 0
评论 0
def testDirichletSample(self):
with self.test_session():
alpha = [1., 2]
dirichlet = dirichlet_lib.Dirichlet(alpha)
n = constant_op.constant(100000)
samples = dirichlet.sample(n)
sample_values = samples.eval()
self.assertEqual(sample_values.shape, (100000, 2))
self.assertTrue(np.all(sample_values > 0.0))
self.assertLess(
stats.kstest(
# Beta is a univariate distribution.
sample_values[:, 0],
stats.beta(
a=1., b=2.).cdf)[0],
0.01)
beta_test.py 文件源码
项目:DeepLearning_VirtualReality_BigData_Project
作者: rashmitripathi
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def testBetaSample(self):
with self.test_session():
a = 1.
b = 2.
beta = beta_lib.Beta(a, b)
n = constant_op.constant(100000)
samples = beta.sample(n)
sample_values = samples.eval()
self.assertEqual(sample_values.shape, (100000,))
self.assertFalse(np.any(sample_values < 0.0))
self.assertLess(
stats.kstest(
# Beta is a univariate distribution.
sample_values,
stats.beta(
a=1., b=2.).cdf)[0],
0.01)
# The standard error of the sample mean is 1 / (sqrt(18 * n))
self.assertAllClose(
sample_values.mean(axis=0), stats.beta.mean(a, b), atol=1e-2)
self.assertAllClose(
np.cov(sample_values, rowvar=0), stats.beta.var(a, b), atol=1e-1)
# Test that sampling with the same seed twice gives the same results.
def likelihood_z_given_rl(self, omega, rl):
""" Log-likelihood of working response kappa/omega given rl.
Integrates out beta.
I think this terminology is somewhat misleading and will revise it.
"""
X = self.data.covariate_matrix(rl)
return self.likelihood_z_given_X(omega,X)
def likelihood_y_given_X_beta(self, X, beta):
psi = np.dot(X,beta)
probabilities = 1.0/(1.0+np.exp(-1.0*psi))
return np.sum(
np.log(np.where(self.data.y,probabilities,1.0-probabilities))
)
def prior_contribution_empty_probability(self, state):
return scipy.stats.beta.logpdf(
state.empty_probability,
self.hyperparameter_a_empty,
self.hyperparameter_b_empty
)
def flat_prior(self, state):
""" Evaluate log-probability of each primitive in the population.
Return value is properly normalized.
"""
raw_phylogeny_weights = self.rule_population.flat_variable_weights
phylogeny_weights = scipy.stats.norm.logpdf(
np.log(raw_phylogeny_weights),
loc = state.phylogeny_mean,
scale = state.phylogeny_std
)
total_duration = float(self.data.experiment_end - self.data.experiment_start)
durations = (self.rule_population.flat_durations /
((1.0+self.window_duration_epsilon)*total_duration)
)
window_a = (
state.window_concentration *
state.window_typical_fraction
)
window_b = (
state.window_concentration *
(1.0-state.window_typical_fraction)
)
window_weights = scipy.stats.beta.logpdf(
durations,
window_a,
window_b
)
weights = phylogeny_weights + window_weights
normalization = logsumexp(weights)
return weights - normalization
def __init__(self, rl, omega, beta,
phylogeny_mean, phylogeny_std,
window_typical_fraction=0.5, window_concentration = 2.0,
empty_probability=0.5):
self.rl = rl
self.omega = omega
self.beta = beta
self.phylogeny_mean = phylogeny_mean
self.phylogeny_std = phylogeny_std
self.window_typical_fraction = window_typical_fraction
self.window_concentration = window_concentration
self.empty_probability = empty_probability
def copy(self):
return LogisticRuleState(
self.rl.copy(),
self.omega.copy(), self.beta.copy(),
self.phylogeny_mean, self.phylogeny_std,
self.window_typical_fraction, self.window_concentration,
self.empty_probability
)
def step(self):
beta_values = []
for i in xrange(self.local_updates_per_structure_update):
self.update_omega()
self.update_beta()
beta_values.append(self.current_state.beta)
self.additional_beta_values.append(beta_values)
move_type_indicator = np.random.rand()
if move_type_indicator < 0.45:
self.update_primitives()
elif move_type_indicator < 0.9:
self.add_remove()
else:
self.move_primitive()
# If the structure moves are accepted, beta becomes out of date,
# perhaps grievously, so we defensively update it before
# recording the new state
self.update_beta()
self.update_empty_probability()
self.update_window_parameters()
# As a convenience, have update_phylogeny_parameters return
# the resulting state's prior value (which it calculates
# anyway) and record that rather than recalculating it.
prior = self.update_phylogeny_parameters()
self.states.append(self.current_state.copy())
self.likelihoods.append(
self.model.likelihood_y_given_X_beta(self.current_X,
self.current_state.beta)
)
self.priors.append(prior)
def update_beta(self):
""" Draw a new beta from its conditional distribution. """
state = self.current_state
# Note p may not equal len(self.current_state.beta)
_, p = self.current_X.shape
v = self.model.prior_coefficient_variance
kappa = self.model.data.y - 0.5
# In later revisions we can avoid this inversion
# CAUTION: the dot product here will be incorrect if current_X
# is stored as an array of Booleans rather than ints.
# the np.diag is wasteful here but okay for test purposes
posterior_variance = np.linalg.inv(
np.eye(p)/v +
np.dot(self.current_X.T,
(np.dot(np.diag(state.omega),self.current_X))
)
)
posterior_mean = np.dot(posterior_variance,
np.dot(self.current_X.T, kappa)
)
beta = np.random.multivariate_normal(posterior_mean,
posterior_variance)
state.beta = beta
########################################
########################################
########################################
##### CORE RL SAMPLING CODE
def probabilities(rule_list, beta, test_data):
"""
Predict probabilities of outcome according to the rule list.
"""
X = test_data.covariate_matrix(rule_list)
psi = np.dot(X,beta)
probabilities = 1.0/(1.0+np.exp(-1.0*psi))
return probabilities
def prediction(rule_list, beta, test_data):
"""
Predict y=1 for subjects with probability >= 0.5.
"""
X = test_data.covariate_matrix(rule_list)
psi = np.dot(X,beta)
probabilities = 1.0/(1.0+np.exp(-1.0*psi))
prediction = probabilities >= 0.5
return prediction
def step(self):
move_type_indicator = np.random.rand()
if move_type_indicator < -1.0:# 0.44
self.update_primitives()
elif move_type_indicator < 2.0:#0.66
self.add_remove()
else:
raise
# self.move_primitive()
# If the structure moves are accepted, beta becomes out of date,
# perhaps grievously, so we defensively update it before
# recording the new state
self.states.append(self.current_state.copy())
def step(self):
move_type_indicator = np.random.rand()
if move_type_indicator < 0.5:# 0.44
self.update_primitives()
elif move_type_indicator < 2.0:#0.66
self.add_remove()
else:
raise
# self.move_primitive()
# If the structure moves are accepted, beta becomes out of date,
# perhaps grievously, so we defensively update it before
# recording the new state
self.states.append(self.current_state.copy())
def cmf_prior(self, m2, m1):
"""from equation 8 of Metchev & Hillenbrand 2009"""
beta = -0.39
ret = (m2**(beta)) * (m1**(-1.0*beta-1.0))
#if ret==0: ret=-np.inf
return ret
def __init__(self, A=1.4, ?=0.6, ?=0.96, grid_size=50,
G=None, ?=np.sqrt, F=stats.beta(2, 2)):
self.A, self.?, self.? = A, ?, ?
# === set defaults for G, ? and F === #
self.G = G if G is not None else lambda x, ?: A * (x * ?)**?
self.? = ?
self.F = F
# === Set up grid over the state space for DP === #
# Max of grid is the max of a large quantile value for F and the
# fixed point y = G(y, 1).
grid_max = max(A**(1 / (1 - ?)), self.F.ppf(1 - ?))
self.x_grid = np.linspace(?, grid_max, grid_size)
def __init__(self, ?=0.95, c=0.6, F_a=1, F_b=1, G_a=3, G_b=1.2,
w_max=2, w_grid_size=40, ?_grid_size=40):
self.?, self.c, self.w_max = ?, c, w_max
self.F = ?_distribution(F_a, F_b, scale=w_max)
self.G = ?_distribution(G_a, G_b, scale=w_max)
self.f, self.g = self.F.pdf, self.G.pdf # Density functions
self.?_min, self.?_max = 1e-3, 1 - 1e-3 # Avoids instability
self.w_grid = np.linspace(0, w_max, w_grid_size)
self.?_grid = np.linspace(self.?_min, self.?_max, ?_grid_size)
x, y = np.meshgrid(self.w_grid, self.?_grid)
self.grid_points = np.column_stack((x.ravel(1), y.ravel(1)))
def test_beta_binomial_different_priors(db_path, sampler):
binomial_n = 5
def model(args):
return {"result": st.binom(binomial_n, args['theta']).rvs()}
models = [model for _ in range(2)]
models = list(map(SimpleModel, models))
population_size = ConstantPopulationSize(800)
a1, b1 = 1, 1
a2, b2 = 10, 1
parameter_given_model_prior_distribution = [Distribution(theta=RV("beta",
a1, b1)),
Distribution(theta=RV("beta",
a2, b2))]
abc = ABCSMC(models, parameter_given_model_prior_distribution,
MinMaxDistanceFunction(measures_to_use=["result"]),
population_size,
eps=MedianEpsilon(.1),
sampler=sampler)
n1 = 2
abc.new(db_path, {"result": n1})
minimum_epsilon = .2
history = abc.run(minimum_epsilon, max_nr_populations=3)
mp = history.get_model_probabilities(history.max_t)
def B(a, b):
return gamma(a) * gamma(b) / gamma(a + b)
def expected_p(a, b, n1):
return binom(binomial_n, n1) * B(a + n1, b + binomial_n - n1) / B(a, b)
p1_expected_unnormalized = expected_p(a1, b1, n1)
p2_expected_unnormalized = expected_p(a2, b2, n1)
p1_expected = p1_expected_unnormalized / (p1_expected_unnormalized +
p2_expected_unnormalized)
p2_expected = p2_expected_unnormalized / (p1_expected_unnormalized +
p2_expected_unnormalized)
assert abs(mp.p[0] - p1_expected) + abs(mp.p[1] - p2_expected) < .08
beta_test.py 文件源码
项目:DeepLearning_VirtualReality_BigData_Project
作者: rashmitripathi
项目源码
文件源码
阅读 35
收藏 0
点赞 0
评论 0
def testBetaMean(self):
with session.Session():
a = [1., 2, 3]
b = [2., 4, 1.2]
expected_mean = stats.beta.mean(a, b)
dist = beta_lib.Beta(a, b)
self.assertEqual(dist.mean().get_shape(), (3,))
self.assertAllClose(expected_mean, dist.mean().eval())
beta_test.py 文件源码
项目:DeepLearning_VirtualReality_BigData_Project
作者: rashmitripathi
项目源码
文件源码
阅读 35
收藏 0
点赞 0
评论 0
def testBetaVariance(self):
with session.Session():
a = [1., 2, 3]
b = [2., 4, 1.2]
expected_variance = stats.beta.var(a, b)
dist = beta_lib.Beta(a, b)
self.assertEqual(dist.variance().get_shape(), (3,))
self.assertAllClose(expected_variance, dist.variance().eval())
beta_test.py 文件源码
项目:DeepLearning_VirtualReality_BigData_Project
作者: rashmitripathi
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def testBetaEntropy(self):
with session.Session():
a = [1., 2, 3]
b = [2., 4, 1.2]
expected_entropy = stats.beta.entropy(a, b)
dist = beta_lib.Beta(a, b)
self.assertEqual(dist.entropy().get_shape(), (3,))
self.assertAllClose(expected_entropy, dist.entropy().eval())