def remove_outliers(idx_ts):
    """Remove outliers from a given timestamp array using the 1.5*IQR rule.

    Parameters
    ----------
    idx_ts : numpy.ndarray
        Input timestamp array.

    Returns
    -------
    idx_ts_new : numpy.ndarray
        Timestamp array with outliers removed.
    """
    idx_ts_new = idx_ts.copy()
    # Tukey fences: drop values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]
    q1, _, q3 = np.percentile(idx_ts_new, [25, 50, 75])
    iqr = q3 - q1
    low_lim = q1 - 1.5 * iqr
    high_lim = q3 + 1.5 * iqr
    idx_ts_new = idx_ts_new[idx_ts_new <= high_lim]
    idx_ts_new = idx_ts_new[idx_ts_new >= low_lim]
    return idx_ts_new
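A quick usage sketch for the function above (an addition, not part of the original source): it assumes numpy is imported as np and uses a small synthetic array where one value clearly falls outside the 1.5*IQR fences.

import numpy as np

ts = np.array([10.0, 10.5, 11.0, 11.2, 11.5, 12.0, 95.0])  # 95.0 is an obvious outlier
print(remove_outliers(ts))  # the 95.0 entry is dropped, the rest is kept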
def _classify_gems(counts0, counts1):
""" Infer number of distinct transcriptomes present in each GEM (1 or 2) and
report cr_constants.GEM_CLASS_GENOME0 for a single cell w/ transcriptome 0,
report cr_constants.GEM_CLASS_GENOME1 for a single cell w/ transcriptome 1,
report cr_constants.GEM_CLASS_MULTIPLET for multiple transcriptomes """
# Assumes that most of the GEMs are single-cell; model counts independently
thresh0, thresh1 = [cr_constants.DEFAULT_MULTIPLET_THRESHOLD] * 2
if sum(counts0 > counts1) >= 1 and sum(counts1 > counts0) >= 1:
thresh0 = np.percentile(counts0[counts0 > counts1], cr_constants.MULTIPLET_PROB_THRESHOLD)
thresh1 = np.percentile(counts1[counts1 > counts0], cr_constants.MULTIPLET_PROB_THRESHOLD)
doublet = np.logical_and(counts0 >= thresh0, counts1 >= thresh1)
dtype = np.dtype('|S%d' % max(len(cls) for cls in cr_constants.GEM_CLASSES))
result = np.where(doublet, cr_constants.GEM_CLASS_MULTIPLET, cr_constants.GEM_CLASS_GENOME0).astype(dtype)
result[np.logical_and(np.logical_not(result == cr_constants.GEM_CLASS_MULTIPLET), counts1 > counts0)] = cr_constants.GEM_CLASS_GENOME1
return result
def get_normalized_dispersion(mat_mean, mat_var, nbins=20):
mat_disp = (mat_var - mat_mean) / np.square(mat_mean)
quantiles = np.percentile(mat_mean, np.arange(0, 100, 100 / nbins))
quantiles = np.append(quantiles, mat_mean.max())
# merge bins with no difference in value
quantiles = np.unique(quantiles)
if len(quantiles) <= 1:
# pathological case: the means are all identical. just return raw dispersion.
return mat_disp
# calc median dispersion per bin
(disp_meds, _, disp_bins) = scipy.stats.binned_statistic(mat_mean, mat_disp, statistic='median', bins=quantiles)
# calc median absolute deviation of dispersion per bin
disp_meds_arr = disp_meds[disp_bins-1] # 0th bin is empty since our quantiles start from 0
disp_abs_dev = abs(mat_disp - disp_meds_arr)
(disp_mads, _, disp_bins) = scipy.stats.binned_statistic(mat_mean, disp_abs_dev, statistic='median', bins=quantiles)
# calculate normalized dispersion
disp_mads_arr = disp_mads[disp_bins-1]
disp_norm = (mat_disp - disp_meds_arr) / disp_mads_arr
return disp_norm
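A toy run of the helper above (my own sketch, not from the original project): it assumes numpy and scipy are imported as in the snippet and feeds synthetic, overdispersed mean/variance arrays.

import numpy as np
import scipy.stats

rng = np.random.default_rng(0)
mat_mean = rng.gamma(2.0, 1.0, size=500)
# variance grows faster than the mean, mimicking overdispersed count data
mat_var = mat_mean + 0.3 * mat_mean ** 2 * rng.uniform(0.5, 1.5, size=500)
disp_norm = get_normalized_dispersion(mat_mean, mat_var)
print(np.nanmedian(disp_norm), disp_norm.shape)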
def prctile(data, p_vals=[0, 25, 50, 75, 100], sorted_=False):
    """``prctile(data, 50)`` returns the median; ``p_vals`` may also be
    a sequence of percentiles.

    For small samples or extreme percentiles this gives, IMHO, better
    values than matplotlib.mlab.prctile or np.percentile, but it is
    also slower.
    """
ps = [p_vals] if np.isscalar(p_vals) else p_vals
if not sorted_:
data = sorted(data)
n = len(data)
d = []
for p in ps:
fi = p * n / 100 - 0.5
if fi <= 0: # maybe extrapolate?
d.append(data[0])
elif fi >= n - 1:
d.append(data[-1])
else:
i = int(fi)
d.append((i + 1 - fi) * data[i] + (fi - i) * data[i + 1])
return d[0] if np.isscalar(p_vals) else d
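A brief usage sketch (an addition, assuming numpy is imported as np): it exercises both the scalar and the sequence form of p_vals and prints numpy's values for comparison.

import numpy as np

data = [3, 1, 4, 1, 5, 9, 2, 6]
print(prctile(data, 50))                   # scalar form: a single median value
print(prctile(data, [25, 50, 75]))         # sequence form: list of quartiles
print(np.percentile(data, [25, 50, 75]))   # numpy's estimates, for comparison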
def fit_predict(self, ts):
"""
Unsupervised training of TSBitMaps.
:param ts: 1-D numpy array or pandas.Series
:return labels: `+1` for normal observations and `-1` for abnormal observations
"""
    assert self._lag_window_size > self._feature_window_size, 'lag_window_size must be greater than feature_window_size'
self._ref_ts = ts
scores = self._slide_chunks(ts)
self._ref_bitmap_scores = scores
thres = np.percentile(scores[self._lag_window_size: -self._lead_window_size + 1], self._q)
labels = np.full(len(ts), 1)
for idx, score in enumerate(scores):
if score > thres:
labels[idx] = -1
return labels
def fit(self, X, y=None):
old_threshold = None
threshold = None
self.threshold_ = 0.0
self._fit(X,y)
count = 0
while count < 100 and (old_threshold is None or abs(threshold - old_threshold) > 0.01):
old_threshold = threshold
ss = self.decision_function(X,y)
threshold = percentile(ss, 100 * self.contamination)
self._fit(X[ss > threshold],y[ss > threshold] if y is not None else None)
count += 1
self.threshold_ = threshold
return self
def _update_quantile(self):
states = np.array(self._quantile_states, dtype=np.float32)
limited_action_values = self.action_values(states, self._limited_action)
base_action_values = np.max(
np.array(
[
self.action_values(states, action)
for action in six.moves.range(self.num_actions)
if action != self._limited_action
]
),
axis=0
)
target = np.percentile(
limited_action_values - base_action_values, self._quantile
)
print("REWARD PENALTY TARGET:", target)
self.quantile_value += self._quantile_update_rate * target
print("QUANTILE:", self.quantile_value)
def b_value(mags, mt, perc=[2.5, 97.5], n_reps=None):
"""Compute the b-value and optionally its confidence interval."""
# Extract magnitudes above completeness threshold
m = mags[mags >= mt]
# Compute b-value
b = (np.mean(m) - mt) * np.log(10)
# Draw bootstrap replicates
if n_reps is None:
return b
else:
m_bs_reps = dcst.draw_bs_reps(m, np.mean, size=n_reps)
# Compute b-value from replicates
b_bs_reps = (m_bs_reps - mt) * np.log(10)
# Compute confidence interval
conf_int = np.percentile(b_bs_reps, perc)
return b, conf_int
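The replicates above come from dcst.draw_bs_reps (the dc_stat_think package). As a self-contained alternative, the sketch below (an assumption on my part, not the original code) draws the bootstrap replicates with plain numpy and takes the same percentile-based confidence interval.

import numpy as np

rng = np.random.default_rng(0)
mags = rng.exponential(scale=0.4, size=500) + 3.0   # synthetic magnitudes, completeness mt = 3.0
mt = 3.0
m = mags[mags >= mt]
b = (np.mean(m) - mt) * np.log(10)

# percentile bootstrap: resample magnitudes with replacement, recompute the mean, map to b
bs_means = np.array([np.mean(rng.choice(m, size=len(m))) for _ in range(2000)])
b_bs_reps = (bs_means - mt) * np.log(10)
conf_int = np.percentile(b_bs_reps, [2.5, 97.5])
print(b, conf_int)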
def simple_slope_percentiles(res, df, target, varying, percs=[25, 50, 75]):
exog = {}
for param in res.fe_params.index:
if len(param.split(":")) != 1:
continue
if param == "Intercept":
exog[param] = 1.0
else:
exog[param] = np.median(df[param])
ret_vals = collections.OrderedDict()
for varying_perc in percs:
exog[varying] = np.percentile(df[varying], varying_perc)
ret_vals[exog[varying]] = collections.defaultdict(list)
for target_perc in [25, 75]:
exog[target] = np.percentile(df[target], target_perc)
exog_arr = np.array([exog[param] if len(param.split(":")) == 1 else exog[param.split(":")[0]] * exog[param.split(":")[1]]
for param in res.fe_params.index])
ret_vals[exog[varying]]["endog"].append(res.model.predict(res.fe_params, exog=exog_arr))
ret_vals[exog[varying]]["target"].append(exog[target])
return ret_vals
def simple_slope_categories(res, df, target, cat, cats):
exog = {}
for param in res.fe_params.index:
if len(param.split(":")) != 1:
continue
if param == "Intercept":
exog[param] = 1.0
elif param in cats:
exog[param] = 0
else:
exog[param] = np.mean(df[param])
    if cat is not None:
exog[cat] = 1
x_points = []
y_points = []
for target_perc in [10, 90]:
exog[target] = np.percentile(df[target], target_perc)
# exog[target] = target_perc
exog_arr = np.array([exog[param] if len(param.split(":")) == 1 else exog[param.split(":")[0]] * exog[param.split(":")[1]]
for param in res.fe_params.index])
y_points.append(res.model.predict(res.fe_params, exog=exog_arr))
x_points.append(exog[target])
return x_points, y_points
def test_value_at_risk(self):
value_at_risk = self.empyrical.value_at_risk
returns = [1.0, 2.0]
assert_almost_equal(value_at_risk(returns, cutoff=0.0), 1.0)
assert_almost_equal(value_at_risk(returns, cutoff=0.3), 1.3)
assert_almost_equal(value_at_risk(returns, cutoff=1.0), 2.0)
returns = [1, 81, 82, 83, 84, 85]
assert_almost_equal(value_at_risk(returns, cutoff=0.1), 41)
assert_almost_equal(value_at_risk(returns, cutoff=0.2), 81)
assert_almost_equal(value_at_risk(returns, cutoff=0.3), 81.5)
# Test a returns stream of 21 data points at different cutoffs.
returns = np.random.normal(0, 0.02, 21)
for cutoff in (0, 0.0499, 0.05, 0.20, 0.999, 1):
assert_almost_equal(
value_at_risk(returns, cutoff),
np.percentile(returns, cutoff * 100),
)
def value_at_risk(returns, cutoff=0.05):
"""
Value at risk (VaR) of a returns stream.
Parameters
----------
returns : pandas.Series or 1-D numpy.array
Non-cumulative daily returns.
cutoff : float, optional
Decimal representing the percentage cutoff for the bottom percentile of
returns. Defaults to 0.05.
Returns
-------
VaR : float
The VaR value.
"""
return np.percentile(returns, 100 * cutoff)
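A short usage sketch (added for illustration, assuming numpy is imported as np): with synthetic daily returns the VaR is simply the requested lower percentile of the return distribution.

import numpy as np

rng = np.random.default_rng(42)
returns = rng.normal(loc=0.0005, scale=0.02, size=252)  # one year of synthetic daily returns
print(value_at_risk(returns))               # default cutoff=0.05 -> 5th percentile of returns
print(value_at_risk(returns, cutoff=0.01))  # more conservative 1% VaR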
def get_packets_per_second(trace, features):
    """
    Counts the number of packets in each second of the trace and appends the mean,
    standard deviation, median, min and max of those counts as features.
    @return a 1D list that contains the packet counts per second
    """
packets_per_second = {}
for val in trace:
second = floor(val[0])
if second not in packets_per_second:
packets_per_second[second] = 0
packets_per_second[second] += 1
l = list(packets_per_second.values())
features.append(sum(l) / len(l))
features.append(np.std(l))
features.append(np.percentile(l, 50))
features.append(min(l))
features.append(max(l))
return l
def get_inter_arrival_time(trace, in_trace, out_trace, features):
    """
    For the complete trace, the trace containing only incoming packets and the trace containing only outgoing packets,
    we calculate the inter-arrival times.
    Next, we add the max, mean, standard deviation and third quartile as features.
    @param in_trace contains all the incoming packets and nothing else
    @param out_trace contains all the outgoing packets and nothing else
    """
complete_inter_arrival_time = inter_packet_time(trace)
in_inter_arrival_time = inter_packet_time(in_trace)
out_inter_arrival_time = inter_packet_time(out_trace)
inter_arrival_times = [complete_inter_arrival_time, in_inter_arrival_time, out_inter_arrival_time]
for time in inter_arrival_times:
if len(time) == 0:
features.extend([0, 0, 0, 0])
else:
features.append(max(time))
features.append(sum(time) / len(time))
features.append(np.std(time))
features.append(np.percentile(time, 75))
def get_transmission_time_stats(trace, in_trace, out_trace, features):
    """
    For the complete trace and the traces containing only incoming and only outgoing packets, we extract the following:
    - First, second and third quartile of the packet timestamps
    - Total transmission time (the 100th percentile, i.e. the timestamp of the last packet)
    """
in_times = [x[0] for x in in_trace]
out_times = [x[0] for x in out_trace]
total_times = [x[0] for x in trace]
times = [total_times, in_times, out_times]
for time in times:
if len(time) == 0:
features.extend([0, 0, 0, 0])
else:
features.append(np.percentile(time, 25))
features.append(np.percentile(time, 50))
features.append(np.percentile(time, 75))
features.append(np.percentile(time, 100))
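A tiny illustration of the feature extractor above (my sketch, not part of the original module): the (timestamp, direction) tuple layout and the +1/-1 direction convention are assumptions based on how the trace is indexed, and numpy is assumed to be imported as np.

trace = [(0.00, 1), (0.05, -1), (0.12, 1), (0.30, 1), (0.45, -1)]  # (timestamp, direction)
in_trace = [p for p in trace if p[1] == 1]
out_trace = [p for p in trace if p[1] == -1]
features = []
get_transmission_time_stats(trace, in_trace, out_trace, features)
print(features)  # 4 timestamp quartiles for the full, incoming and outgoing traces (12 values)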
def deltasCSVWriter(self, name='ant'):
"Changes"
header = array([h.name[1:] for h in self.test.headers[:-2]])
oldRows = [r for r, p in zip(self.test._rows, self.pred) if p > 0]
delta = array([self.delta(t) for t in oldRows])
y = median(delta, axis=0)
yhi, ylo = percentile(delta, q=[75, 25], axis=0)
dat1 = sorted(
[(h, a, b, c) for h, a, b, c in zip(header, y, ylo, yhi)], key=lambda F: F[1])
dat = asarray([(d[0], n, d[1], d[2], d[3])
for d, n in zip(dat1, range(1, 21))])
with open('/Users/rkrsn/git/GNU-Plots/rkrsn/errorbar/%s.csv' % (name), 'w') as csvfile:
writer = csv.writer(csvfile, delimiter=' ')
for el in dat[()]:
writer.writerow(el)
# new = [self.newRow(t) for t in oldRows]
def _nanpercentile(a, q, axis=None, out=None, overwrite_input=False,
interpolation='linear', keepdims=False):
"""
Private function that doesn't support extended axis or keepdims.
These methods are extended to this function using _ureduce
See nanpercentile for parameter usage
"""
if axis is None:
part = a.ravel()
result = _nanpercentile1d(part, q, overwrite_input, interpolation)
else:
result = np.apply_along_axis(_nanpercentile1d, axis, a, q,
overwrite_input, interpolation)
# apply_along_axis fills in collapsed axis with results.
# Move that axis to the beginning to match percentile's
# convention.
if q.ndim != 0:
result = np.rollaxis(result, axis)
if out is not None:
out[...] = result
return result
def test_out(self):
mat = np.random.rand(3, 3)
nan_mat = np.insert(mat, [0, 2], np.nan, axis=1)
resout = np.zeros(3)
tgt = np.percentile(mat, 42, axis=1)
res = np.nanpercentile(nan_mat, 42, axis=1, out=resout)
assert_almost_equal(res, resout)
assert_almost_equal(res, tgt)
# 0-d output:
resout = np.zeros(())
tgt = np.percentile(mat, 42, axis=None)
res = np.nanpercentile(nan_mat, 42, axis=None, out=resout)
assert_almost_equal(res, resout)
assert_almost_equal(res, tgt)
res = np.nanpercentile(nan_mat, 42, axis=(0, 1), out=resout)
assert_almost_equal(res, resout)
assert_almost_equal(res, tgt)
def test_multiple_percentiles(self):
perc = [50, 100]
mat = np.ones((4, 3))
nan_mat = np.nan * mat
# For checking consistency in higher dimensional case
large_mat = np.ones((3, 4, 5))
large_mat[:, 0:2:4, :] = 0
large_mat[:, :, 3:] *= 2
for axis in [None, 0, 1]:
for keepdim in [False, True]:
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always')
val = np.percentile(mat, perc, axis=axis, keepdims=keepdim)
nan_val = np.nanpercentile(nan_mat, perc, axis=axis,
keepdims=keepdim)
assert_equal(nan_val.shape, val.shape)
val = np.percentile(large_mat, perc, axis=axis,
keepdims=keepdim)
nan_val = np.nanpercentile(large_mat, perc, axis=axis,
keepdims=keepdim)
assert_equal(nan_val, val)
megamat = np.ones((3, 4, 5, 6))
assert_equal(np.nanpercentile(megamat, perc, axis=(1, 2)).shape, (2, 3, 6))
def test_keepdims(self):
d = np.ones((3, 5, 7, 11))
assert_equal(np.percentile(d, 7, axis=None, keepdims=True).shape,
(1, 1, 1, 1))
assert_equal(np.percentile(d, 7, axis=(0, 1), keepdims=True).shape,
(1, 1, 7, 11))
assert_equal(np.percentile(d, 7, axis=(0, 3), keepdims=True).shape,
(1, 5, 7, 1))
assert_equal(np.percentile(d, 7, axis=(1,), keepdims=True).shape,
(3, 1, 7, 11))
assert_equal(np.percentile(d, 7, (0, 1, 2, 3), keepdims=True).shape,
(1, 1, 1, 1))
assert_equal(np.percentile(d, 7, axis=(0, 1, 3), keepdims=True).shape,
(1, 1, 7, 1))
assert_equal(np.percentile(d, [1, 7], axis=(0, 1, 3),
keepdims=True).shape, (2, 1, 1, 7, 1))
assert_equal(np.percentile(d, [1, 7], axis=(0, 3),
keepdims=True).shape, (2, 1, 5, 7, 1))
def test_out_nan(self):
with warnings.catch_warnings(record=True):
warnings.filterwarnings('always', '', RuntimeWarning)
o = np.zeros((4,))
d = np.ones((3, 4))
d[2, 1] = np.nan
assert_equal(np.percentile(d, 0, 0, out=o), o)
assert_equal(
np.percentile(d, 0, 0, interpolation='nearest', out=o), o)
o = np.zeros((3,))
assert_equal(np.percentile(d, 1, 1, out=o), o)
assert_equal(
np.percentile(d, 1, 1, interpolation='nearest', out=o), o)
o = np.zeros(())
assert_equal(np.percentile(d, 1, out=o), o)
assert_equal(
np.percentile(d, 1, interpolation='nearest', out=o), o)
def quantile(x, q, weights=None):
"""
Like numpy.percentile, but:
* Values of q are quantiles [0., 1.] rather than percentiles [0., 100.]
* scalar q not supported (q must be iterable)
* optional weights on x
"""
if weights is None:
return np.percentile(x, [100. * qi for qi in q])
else:
idx = np.argsort(x)
xsorted = x[idx]
cdf = np.add.accumulate(weights[idx])
cdf /= cdf[-1]
return np.interp(q, cdf, xsorted).tolist()
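A quick comparison sketch (added, assuming numpy is imported as np): with uniform weights the weighted branch interpolates the empirical CDF and lands close to np.percentile, though the two estimators are not identical.

import numpy as np

x = np.random.default_rng(1).normal(size=1000)
w = np.ones_like(x)
print(quantile(x, [0.16, 0.5, 0.84]))             # unweighted: delegates to np.percentile
print(quantile(x, [0.16, 0.5, 0.84], weights=w))  # weighted: CDF interpolation, similar values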
def _degradation_CI(results):
'''
Description
-----------
Monte Carlo estimation of uncertainty in degradation rate from OLS results
Parameters
----------
results: OLSResults object from fitting a model of the form:
results = sm.OLS(endog = df.energy_ma, exog = df.loc[:,['const','years']]).fit()
Returns
-------
68.2% confidence interval for degradation rate
'''
sampled_normal = np.random.multivariate_normal(results.params, results.cov_params(), 10000)
dist = sampled_normal[:, 1] / sampled_normal[:, 0]
Rd_CI = np.percentile(dist, [50 - 34.1, 50 + 34.1]) * 100.0
return Rd_CI
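A toy end-to-end run (my sketch with synthetic data, not from the original project): it assumes statsmodels is available and fits the OLS model described in the docstring to a series with roughly -0.5 %/yr degradation.

import numpy as np
import statsmodels.api as sm

rng = np.random.default_rng(3)
years = np.arange(0, 5, 1 / 12.0)                        # five years of monthly points
energy = 1.0 - 0.005 * years + rng.normal(0, 0.01, size=years.size)
results = sm.OLS(energy, sm.add_constant(years)).fit()   # exog columns: const, years
print(_degradation_CI(results))                          # ~68% CI around -0.5 (%/yr)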
def info(self, burn=1000, plot=False):
    """
    Print the summary statistics and optionally plot the results.
    """
    rows = len(self.varnames)
    cols = 2
    chain = np.array(self.chain[burn:])
    nsize = chain.shape[0]
    # print(rows, cols)
    print('%4s %16s %12s %12s [%12s, %12s, %12s]' % ('no', 'name', 'mean', 'stddev', '16%', '50%', '84%'))
    for i, name in enumerate(self.varnames):
        # temp holds the 16th, 84th and 50th percentiles, in that order
        temp = np.percentile(chain[:, i], [16.0, 84.0, 50.0])
        print('%4i %16s %12g %12g [%12g, %12g, %12g]' % (i, name, np.mean(chain[:, i]),
                                                         (temp[1] - temp[0]) / 2.0, temp[0], temp[2], temp[1]))
        if plot:
            ax = plt.subplot(rows, cols, 2 * i + 1)
            # plt.text(0.05, 0.9, r'$\tau$=' + '%5.1f' % (acor.acor(chain[:, i])[0]), transform=ax.transAxes)
            plt.plot(chain[:, i])
            plt.ylabel(self.model.descr[name][3])
            plt.xlabel('Iteration')
            ax = plt.subplot(rows, cols, 2 * i + 2)
            plt.hist(chain[:, i], bins=100, histtype='step')
            plt.text(0.05, 0.9, sround(np.mean(chain[:, i]), temp[0], temp[1]), transform=ax.transAxes)
            plt.xlabel(self.model.descr[name][3])
            # plt.text(0.05, 0.9, '%6g %3g (%4g-%4g)' % (np.mean(chain[:, i]), (temp[1] - temp[0]) / 2.0, temp[0], temp[1]), transform=ax.transAxes)
def filterout_outliers(l_data, l_date):
    '''
    Return an array with the data downsampled to the first observation of each minute
    (the previous percentile-based outlier filter is kept commented out below).
    :param l_data: list. data to be analyzed
    :param l_date: list. timestamps in seconds, used to group the data by minute
    '''
# === old code to filter outliers =======
# Q3 = np.percentile(l_data, 98)
# Q1 = np.percentile(l_data, 2)
# step = (Q3 - Q1) * 1.5
# step = max(3000., step)
# na_val = np.array(l_data)
# na_val = na_val[(na_val >= Q1 - step) & (na_val <= Q3 + step)]
# return na_val
# =======================================
# group by minute
df_filter = pd.Series(np.array(l_date)/60).astype(int)
l_filter = list((df_filter != df_filter.shift()).values)
l_filter[0] = True
l_filter[-1] = True
return np.array(pd.Series(l_data)[l_filter].values)
def _cut_windows_vertically(self, door_top, roof_top, sky_sig, win_strip):
win_sig = np.percentile(win_strip, 85, axis=1)
win_sig[sky_sig > 0.5] = 0
if win_sig.max() > 0:
win_sig /= win_sig.max()
win_sig[:roof_top] = 0
win_sig[door_top:] = 0
runs, starts, values = run_length_encode(win_sig > 0.5)
win_heights = runs[values]
win_tops = starts[values]
if len(win_heights) > 0:
win_bottom = win_tops[-1] + win_heights[-1]
win_top = win_tops[0]
win_vertical_spacing = np.diff(win_tops).mean() if len(win_tops) > 1 else 0
else:
win_bottom = win_top = win_vertical_spacing = -1
self.top = int(win_top)
self.bottom = int(win_bottom)
self.vertical_spacing = int(win_vertical_spacing)
self.vertical_scores = make_list(win_sig)
self.heights = np.array(win_heights)
self.tops = np.array(win_tops)
def _first_interval(x, n_bins):
"""
Gets the first interval based on the percentiles,
either a closed interval containing the same value multiple times
or a closed-open interval with a different lower and upper bound.
"""
# calculate the percentiles
percentiles = np.linspace(0., 100., n_bins + 1)
bounds = np.percentile(x, q=percentiles, interpolation='higher')
lower = bounds[0]
upper = bounds[1]
if lower == upper:
return lower, upper, True, True
else:
return lower, upper, True, False
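A small sketch of the equal-frequency boundaries the helper builds on (added for illustration, assuming numpy as np): np.percentile over evenly spaced q values gives the candidate bin edges, and the function above reports only the first interval. Note the helper itself uses the older interpolation= keyword, which newer numpy releases rename to method=.

import numpy as np

x = np.array([1, 1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
n_bins = 4
print(np.percentile(x, q=np.linspace(0., 100., n_bins + 1)))  # evenly spaced percentile edges
print(_first_interval(x, n_bins))  # (lower, upper, closed_lower, closed_upper) of the first bin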
#------- private methods for categorical binnings-------#
def random_submatrix(Z,g,s,o,threshpct=99,returnIdx=False):
    xg = np.zeros(Z.shape[0], dtype=bool)
xg[:g] = True
np.random.shuffle(xg)
    xt = np.zeros(Z.shape[1], dtype=bool)
xt[:s] = True
np.random.shuffle(xt)
X0 = Z[xg][:,xt]
thresh = np.percentile(X0,threshpct)
X0[(X0 > thresh)] = thresh
    xo = np.zeros(X0.shape[0], dtype=bool)
xo[:o] = True
np.random.shuffle(xo)
Xobs = X0[xo]
if returnIdx:
return X0,xo,Xobs,xt,xg
else:
return X0,xo,Xobs
def filter_and_smooth_angular_velocity(angular_velocity,
low_pass_kernel_size, clip_percentile, plot=False):
"""Reduce the noise in a velocity signal."""
max_value = np.percentile(angular_velocity, clip_percentile)
print("Clipping angular velocity norms to {} rad/s ...".format(max_value))
angular_velocity_clipped = np.clip(angular_velocity, -max_value, max_value)
print("Done clipping angular velocity norms...")
low_pass_kernel = np.ones((low_pass_kernel_size, 1)) / low_pass_kernel_size
print("Smoothing with kernel size {} samples...".format(low_pass_kernel_size))
angular_velocity_smoothed = signal.correlate(angular_velocity_clipped,
low_pass_kernel, 'same')
print("Done smoothing angular velocity norms...")
if plot:
plot_angular_velocities("Angular Velocities", angular_velocity,
angular_velocity_smoothed, True)
return angular_velocity_smoothed.copy()
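A minimal usage sketch for the function above on synthetic data (an addition, assuming numpy and scipy.signal are imported as in the snippet): the percentile supplies a data-driven clip level before the moving-average smoothing.

import numpy as np
from scipy import signal

rng = np.random.default_rng(7)
angular_velocity = rng.normal(0, 0.1, size=(200, 1))
angular_velocity[50] = 5.0   # a spurious spike
smoothed = filter_and_smooth_angular_velocity(angular_velocity,
                                              low_pass_kernel_size=25, clip_percentile=99.5)
print(smoothed.shape)        # (200, 1): spike clipped at the 99.5th percentile, then averaged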