def _estimateAll(self):
    self._estimateQParams()
    # Per-level variance estimate: sample-based norm unless the Bayesian
    # estimator is requested.
    self.iters[-1].Vl_estimate = self.fn.Norm(self.all_itr.calcDeltaVl()) \
        if not self.params.bayesian else self._estimateBayesianVl()
    self.iters[-1].Wl_estimate = self.fn.WorkModel(lvls=self.last_itr.get_lvls())
    self.iters[-1].bias = self._estimateBias()
    # Statistical error: Ca * sqrt(sum_l Vl / Ml); infinite until every
    # level has at least one sample.
    Ca = norm.ppf(self.params.confidence)
    self.iters[-1].stat_error = np.inf if np.any(self.last_itr.M == 0) \
        else Ca * np.sqrt(np.sum(self.Vl_estimate / self.last_itr.M))
def _calcTheoryM(self, TOL, theta, Vl, Wl, ceil=True, minM=1):
    # Optimal sample allocation: M_l is proportional to sqrt(Vl/Wl), scaled so
    # that the statistical error Ca*sqrt(sum(Vl/Ml)) meets theta*TOL at the
    # requested confidence level.
    Ca = norm.ppf(self.params.confidence)
    M = (theta * TOL / Ca)**-2 * \
        np.sum(np.sqrt(Wl * Vl)) * np.sqrt(Vl / Wl)
    M = np.maximum(M, minM)
    M[np.isnan(M)] = minM
    if ceil:
        M = np.ceil(M).astype(int)  # np.int is removed in recent NumPy
    return M
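# A standalone sketch of the same allocation rule used by _calcTheoryM above,
# for experimenting outside the class. The function name and example values
# are illustrative assumptions, not part of the original code.
import numpy as np
from scipy.stats import norm

def theory_samples(TOL, theta, Vl, Wl, confidence=0.95, minM=1):
    # samples per level proportional to sqrt(Vl/Wl), scaled to hit theta*TOL
    Ca = norm.ppf(confidence)
    M = (theta * TOL / Ca) ** -2 * np.sum(np.sqrt(Wl * Vl)) * np.sqrt(Vl / Wl)
    return np.maximum(np.ceil(M), minM).astype(int)

# e.g. theory_samples(0.01, 0.5, np.array([1.0, 0.25, 0.06]),
#                     np.array([1.0, 4.0, 16.0]))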
def _confidence_interval_by_alpha(cls, p_hat, n, alpha, method='wald'):
    """Compute confidence interval for estimate of Bernoulli parameter p.
    Args:
        p_hat: maximum likelihood estimate of p
        n: number of samples observed
        alpha: the probability that the true p falls outside the CI
        method: 'wald' or 'wilson', the interval construction to use
    Returns:
        left, right
    """
    prob = 1 - 0.5 * alpha
    z = norm.ppf(prob)
    compute_ci = cls._confidence_interval_by_z_wald if method == 'wald' else cls._confidence_interval_by_z_wilson
    return compute_ci(p_hat, n, z)
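# The _confidence_interval_by_z_wald / _wilson helpers are not shown here.
# A minimal sketch of the Wald form, assuming it takes (p_hat, n, z) and
# returns (left, right) clipped to [0, 1]:
import math

def confidence_interval_by_z_wald(p_hat, n, z):
    # Wald interval: p_hat +/- z * sqrt(p_hat * (1 - p_hat) / n)
    half_width = z * math.sqrt(p_hat * (1 - p_hat) / n)
    return max(0.0, p_hat - half_width), min(1.0, p_hat + half_width)

# e.g. confidence_interval_by_z_wald(0.3, 100, norm.ppf(0.975)) -> approx (0.21, 0.39)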
def rz_ci(r, n, conf_level=0.95):
    # Confidence interval for a correlation r via the Fisher z-transform.
    zr_se = pow(1 / (n - 3), .5)
    moe = norm.ppf(1 - (1 - conf_level) / float(2)) * zr_se
    zu = atanh(r) + moe
    zl = atanh(r) - moe
    # back-transform both bounds to the correlation scale
    return tanh(zl), tanh(zu)
def get_AQ(score):
    score = float(score)
    percentage = get_percentage(score)
    # convert the percentile to a z-score, then to a standard score
    # with mean 100 and standard deviation 24
    z_score = norm.ppf(percentage)
    return int(100 + (z_score * 24))
def _breakpoints(n_bins, scale=1.):
"""Compute breakpoints for a given number of SAX symbols and a given Gaussian scale.
Example
-------
>>> _breakpoints(n_bins=2)
array([ 0.])
"""
return norm.ppf([float(a) / n_bins for a in range(1, n_bins)], scale=scale)
def _bin_medians(n_bins, scale=1.):
"""Compute median value corresponding to SAX symbols for a given Gaussian scale.
Example
-------
>>> _bin_medians(n_bins=2)
array([-0.67448975, 0.67448975])
"""
return norm.ppf([float(a) / (2 * n_bins) for a in range(1, 2 * n_bins, 2)], scale=scale)
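# Usage sketch for the two SAX helpers above: the breakpoints split the
# standard normal range into equiprobable bins and the bin medians act as
# reconstruction values. With n_bins=4:
#   _breakpoints(4) -> array([-0.67448975,  0.        ,  0.67448975])
#   _bin_medians(4) -> array([-1.15034938, -0.31863936,  0.31863936,  1.15034938])
# A (z-normalized) value x is mapped to symbol np.searchsorted(_breakpoints(4), x).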
def sample_manifold2d(vae, N):
    # Walk an N x N grid of the 2D latent space through the inverse Gaussian CDF.
    image = np.zeros((N * 28, N * 28))
    for z1 in range(N):
        for z2 in range(N):
            # use bin mid-points (z + 0.5) / N so the quantiles stay strictly
            # in (0, 1); the original z1*(1/N) form breaks under Python 2
            # integer division
            z = [np.array([norm.ppf((z1 + 0.5) / N),
                           norm.ppf((z2 + 0.5) / N)])]
            sample = vae.sample(z).reshape((28, 28))
            image[z1*28:(z1 + 1)*28, z2*28:(z2 + 1)*28] = sample
    image *= 255.
    plt.imshow(image.astype(np.uint8), cmap="gray")
    plt.show()
def serial_test(generator, n_bits, n1=None, sig_level=None, misc=None):
if n1 is None:
n0, n1 = _calculate_n0_n1(generator, n_bits)
else:
n0 = n_bits - n1
n_xx = [0] * 4 # number of occurrences of 00, 01, 10, 11
bi0 = generator.random_bit() # bit b[i]
for i in range(n_bits - 1):
bi1 = generator.random_bit() # bit b[i+1]
n_xx[bi0 << 1 | bi1] += 1
bi0 = bi1
# Calculate the statistic
x2 = 4 / (n_bits - 1) * (n_xx[0b00] ** 2 + n_xx[0b01] ** 2 + n_xx[0b10] ** 2 + n_xx[0b11] ** 2)
x2 += -2 / n_bits * (n0 ** 2 + n1 ** 2) + 1
if type(misc) is dict:
misc.update(n0=n0, n1=n1, n00=n_xx[0b00], n01=n_xx[0b01], n10=n_xx[0b10], n11=n_xx[0b11])
if sig_level is None:
return x2
else:
limit = chi2.ppf(1 - sig_level, 2)
if type(misc) is dict:
misc.update(x=x2, limit=limit)
return x2 <= limit
def poker_test(generator, n_bits, m=None, sig_level=None, misc=None):
if m is not None:
if n_bits // m < 5 * (2 ** m):
raise ValueError("Value m must satisfy requirement [n/m]>=5*2^m")
else:
# find the highest suitable m value
m = int(log2(n_bits / 5))
while n_bits // m < 5 * (2 ** m):
m -= 1
k = n_bits // m
# Divide the sequence into k non-overlapping parts each of length m
# and let ni be the number of occurrences of the ith type of sequence of length m, 1 <= i <= 2m.
ni = [0] * (2 ** m)
for i in range(0, k * m, m):
t = concat_chunks([generator.random_bit() for _ in range(m)], bits=1)
ni[t] += 1
x3 = (2 ** m) / k * sum(map(lambda x: x ** 2, ni)) - k
if type(misc) is dict:
misc.update(m=m, k=k, ni=ni)
if sig_level is None:
return x3
else:
limit = chi2.ppf(1 - sig_level, (2 ** m) - 1)
if type(misc) is dict:
misc.update(x=x3, limit=limit)
return x3 <= limit
def autocorrelation_test(generator, n_bits, d, sig_level=None, misc=None):
if not (1 <= d <= n_bits // 2):
raise ValueError("Parameter d must be between 1 and [n/2]=%d" % (n_bits // 2))
# random bits from i to i+d
generated_bits = deque([generator.random_bit() for _ in range(d)], maxlen=d)
a = 0
for i in range(n_bits - d):
# a += sequence[i] ^ sequence[i + d]
s_i_d = generator.random_bit()
s_i_0 = generated_bits.popleft()
generated_bits.append(s_i_d)
a += s_i_0 ^ s_i_d
# Calculate the statistic
x5 = 2 * (a - (n_bits - d) / 2) / sqrt(n_bits - d)
if type(misc) is dict:
misc.update(a=a)
if sig_level is None:
return x5
else:
limit = -norm.ppf(sig_level / 2)
if type(misc) is dict:
misc.update(x=x5, limit=limit)
return -limit <= x5 <= limit
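# serial_test, poker_test, autocorrelation_test and runs_test all consume a
# `generator` object exposing a random_bit() method. A minimal stand-in for
# trying the tests (class name and wrapper are illustrative assumptions):
import random

class RandomBitGenerator:
    def __init__(self, seed=None):
        self._rng = random.Random(seed)

    def random_bit(self):
        # return a single 0/1 bit
        return self._rng.getrandbits(1)

# e.g. serial_test(RandomBitGenerator(seed=42), n_bits=10000, sig_level=0.05)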
def generate(estimator):
from scipy.stats import norm
n = 15 # Figure row size
figure = np.zeros((28 * n, 28 * n))
# Random normal distributions to feed network with
x_axis = norm.ppf(np.linspace(0.05, 0.95, n))
y_axis = norm.ppf(np.linspace(0.05, 0.95, n))
samples = []
for i, x in enumerate(x_axis):
for j, y in enumerate(y_axis):
samples.append(np.array([x, y], dtype=np.float32))
samples = np.array(samples)
x_reconstructed = estimator.generate(
plx.processing.numpy_input_fn({'samples': samples}, batch_size=n * n, shuffle=False))
results = [x['results'] for x in x_reconstructed]
for i, x in enumerate(x_axis):
for j, y in enumerate(y_axis):
digit = results[i * n + j].reshape(28, 28)
figure[i * 28: (i + 1) * 28, j * 28: (j + 1) * 28] = digit
try:
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 10))
plt.imshow(figure, cmap='Greys_r')
plt.show()
except ImportError:
pass
def Z(x):
return norm.ppf(x)
def saxBreakpoints(cardinality):
return norm.ppf(quantiles(cardinality))
def _make_latent_exploration_op(self):
"""
Code adapted from https://github.com/fastforwardlabs/vae-tf/blob/master/plot.py
"""
# ops for exploration of latent space
nx = 30
ny = nx
z_dim = self.target_dist.dim
    if self.latent_dist.dim == 2:
range_ = (0, 1)
min_, max_ = range_
zs = np.rollaxis(np.mgrid[max_:min_:ny*1j, min_:max_:nx*1j], 0, 3)
if isinstance(self.target_dist, Gaussian):
from scipy.stats import norm
            DELTA = 1E-8  # delta to avoid +/- inf at 0, 1 boundaries
            zs = np.array([norm.ppf(np.clip(z, DELTA, 1 - DELTA),
                                    scale=self.target_dist.stddev)
                           for z in zs])
else:
raise NotImplementedError
zs = tf.constant(zs.reshape((nx*ny, z_dim)),
dtype=tf.float32)
self.zs = tf.placeholder_with_default(zs,
shape=[None, z_dim],
name="zs")
hs_decoded = self.decoder_template.construct(z_in=self.zs,
phase=pt.Phase.test).tensor
xs_dist_info = self.output_dist.activate_dist(hs_decoded)
xs = self.output_dist.sample(xs_dist_info)
imgs = tf.reshape(xs, [nx, ny] + list(self.dataset.image_shape))
stacked_img = []
    for row in range(nx):
row_img = []
for col in xrange(ny):
row_img.append(imgs[row, col, :, :, :])
stacked_img.append(tf.concat(axis=1, values=row_img))
self.latent_exploration_op = tf.concat(axis=0, values=stacked_img)
def toy_data_quantile(n_samples=50, probs=0.5, pattern=SinePattern(),
noise=(1., .2, 0., 1.5), random_state=None):
"""Sine wave toy dataset.
The target y is computed as a sine curve at of modulated by a sine envelope
with some period (default 1/3Hz) and mean (default 1). Moreover, this
pattern is distorted with a random Gaussian noise with mean 0 and a
linearly decreasing standard deviation (default from 1.2 at X = 0 to 0.2 at
X = 1 . 5).
Parameters
----------
n_samples : int
Number of samples to generate.
probs : list or float, shape = [n_quantiles], default=0.5
Probabilities (quantiles levels).
pattern : callable, default = SinePattern()
Callable which generates n_sample 1D data (inputs and targets).
noise : :rtype: (float, float, float, float)
Noise parameters (variance, shift, support_min, support_max).
Returns
-------
X : array, shape = [n_samples, 1]
Input data.
y : array, shape = [n_sample, 1]
Targets.
quantiles : array, shape = [n_samples x n_quantiles]
True conditional quantiles.
"""
probs = array(probs, ndmin=1)
noise_var, noise_shift, noise_min, noise_max = noise
random_state = check_random_state(random_state)
pattern.random_state = random_state
inputs, targets = pattern(n_samples)
# Noise decreasing std (from noise+0.2 to 0.2)
noise_std = noise_shift + (noise_var * (noise_max - inputs) /
(noise_max - noise_min))
# Gaussian noise with decreasing std
add_noise = noise_std * random_state.randn(n_samples, 1)
observations = targets + add_noise
quantiles = [targets + array([norm.ppf(p, loc=0., scale=abs(noise_c))
for noise_c in noise_std]) for p in probs]
return inputs, observations.ravel(), quantiles
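# The true conditional quantiles above follow from the Gaussian noise model:
# for noise with standard deviation sigma(x), the p-quantile of y given x is
# target(x) + sigma(x) * norm.ppf(p). A standalone illustration (the names
# sigma, target, q90 are assumptions, not part of toy_data_quantile):
import numpy as np
from scipy.stats import norm

sigma = np.array([1.2, 0.7, 0.2])      # linearly decreasing noise std
target = np.zeros_like(sigma)          # flat target for simplicity
q90 = target + sigma * norm.ppf(0.9)   # 90% conditional quantile
# q90 is approximately [1.538, 0.897, 0.256]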
def runs_test(generator, n_bits, sig_level=None, fips_style=False, misc=None):
    # The expected number of gaps (or blocks) of length i in a random sequence of length n is
    # e[i] = (n - i + 3) / 2^(i + 2). Let k be equal to the largest integer i for which e[i] >= 5.
def f_e(j):
return (n_bits - j + 3) / 2 ** (j + 2)
if fips_style:
k = 6
else:
k = 1
while f_e(k + 1) >= 5:
k += 1
# Let B[i], G[i] be the number of blocks and gaps, respectively, of length i in s for each i,
# 1 <= i <= k.
run_bit = None
run_length = 0
max_run_length = 0
b = [0] * k # zero-indexed
g = [0] * k
for i in range(n_bits + 1):
bit = generator.random_bit() if i < n_bits else None
# ongoing run
if run_bit == bit:
run_length += 1
# run ended (or it's the beginning or end)
if run_bit != bit:
if run_length > max_run_length:
max_run_length = run_length
if fips_style and run_length > 6:
run_length = 6
if 1 <= run_length <= k:
if run_bit == 0:
g[run_length - 1] += 1 # zero-indexed!
elif run_bit == 1:
b[run_length - 1] += 1
# reset counter
run_bit = bit
run_length = 1
# Calculate the statistic:
e = [f_e(i) for i in range(1, k + 1)]
x4 = sum([(x - e[i]) ** 2 / e[i] for i, x in list(enumerate(b)) + list(enumerate(g))])
if type(misc) is dict:
misc.update(k=k, b=b, g=g, e=e, max_run=max_run_length)
if sig_level is None:
return x4
else:
limit = chi2.ppf(1 - sig_level, 2 * (k - 1))
if type(misc) is dict:
misc.update(x=x4, limit=limit)
return x4 <= limit
def mk_ts(df, const, group1, orderby='year', alpha=0.05):
    """Grouped (seasonal) Mann-Kendall trend test.
    df = dataframe
    const = variable tested for trend
    group1 = variable to group by
    orderby = variable to order by within each group (typically a date)
    alpha = significance level for the two-sided test
    """
def zcalc(Sp, Varp):
if Sp > 0:
return (Sp - 1)/Varp**0.5
elif Sp < 0:
return (Sp + 1)/Varp**0.5
else:
return 0
    df = df.copy()
    df[const] = pd.to_numeric(df.loc[:, const])  # .ix is removed in modern pandas
    # remove rows with null values in the tested variable
    df = df.dropna(subset=[const])
    # reset index
    df.reset_index(inplace=True, drop=True)
# sort by groups, then time
df.sort_values(by=[group1,orderby],axis=0, inplace=True)
# group by group and apply mk_test
dg = df.groupby(group1).apply(lambda x: mk_test(x.loc[:,const].dropna().values, alpha))
Var_S = dg.loc[:,'varS'].sum()
S = dg.loc[:,'s'].sum()
N = dg.loc[:,'n'].sum()
Z = zcalc(S,Var_S)
P = 2*(1-norm.cdf(abs(Z)))
group_n = len(dg)
h = abs(Z) > norm.ppf(1-alpha/2)
tau = S/dg.loc[:,'ta'].sum()
if (Z<0) and h:
trend = 'decreasing'
elif (Z>0) and h:
trend = 'increasing'
else:
trend = 'no trend'
return pd.Series({'S':S, 'Z':round(Z,2), 'p':P, 'trend':trend, 'group_n':group_n, 'sample_n':N, 'Var_S':Var_S, 'tau':round(tau,2)})
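# The trend decision in mk_ts uses the normal approximation of the
# Mann-Kendall statistic: reject "no trend" when |Z| exceeds the two-sided
# critical value norm.ppf(1 - alpha/2). A self-contained sketch (the S and
# Var_S values below are made up for illustration):
from scipy.stats import norm

def mk_decision(S, Var_S, alpha=0.05):
    # continuity-corrected Z statistic, two-sided p-value, and decision
    Z = 0.0 if S == 0 else (S - (1 if S > 0 else -1)) / Var_S ** 0.5
    p = 2 * (1 - norm.cdf(abs(Z)))
    significant = abs(Z) > norm.ppf(1 - alpha / 2)
    return Z, p, significant

# e.g. mk_decision(S=120, Var_S=1000) -> (approx 3.76, approx 0.00017, True)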