def test_against_numpy_nanstd(self):
    source = [np.random.random((16, 12, 5)) for _ in range(10)]
    for arr in source:
        arr[randint(0, 15), randint(0, 11), randint(0, 4)] = np.nan
    stack = np.stack(source, axis=-1)
    for axis in (0, 1, 2, None):
        for ddof in range(4):
            with self.subTest('axis = {}, ddof = {}'.format(axis, ddof)):
                from_numpy = np.nanstd(stack, axis=axis, ddof=ddof)
                from_ivar = last(istd(source, axis=axis, ddof=ddof, ignore_nan=True))
                self.assertSequenceEqual(from_numpy.shape, from_ivar.shape)
                self.assertTrue(np.allclose(from_ivar, from_numpy))

Python nanstd() example source code
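Before the project snippets, a minimal self-contained sketch of what np.nanstd does compared with np.std (the array and values below are made up for illustration):

import numpy as np

x = np.array([1.0, 2.0, np.nan, 4.0])
print(np.std(x))             # nan: a single NaN poisons the plain standard deviation
print(np.nanstd(x))          # ~1.2472: NaNs are ignored (population std, ddof=0)
print(np.nanstd(x, ddof=1))  # ~1.5275: corrected sample standard deviation
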
def plot_heatmaps(data, mis, column_label, cont, topk=30, prefix=''):
    cmap = sns.cubehelix_palette(as_cmap=True, light=.9)
    m, nv = mis.shape
    for j in range(m):
        inds = np.argsort(-mis[j, :])[:topk]
        if len(inds) >= 2:
            plt.clf()
            order = np.argsort(cont[:, j])
            subdata = data[:, inds][order].T
            subdata -= np.nanmean(subdata, axis=1, keepdims=True)
            subdata /= np.nanstd(subdata, axis=1, keepdims=True)
            columns = [column_label[i] for i in inds]
            sns.heatmap(subdata, vmin=-3, vmax=3, cmap=cmap, yticklabels=columns,
                        xticklabels=False, mask=np.isnan(subdata))
            filename = '{}/heatmaps/group_num={}.png'.format(prefix, j)
            if not os.path.exists(os.path.dirname(filename)):
                os.makedirs(os.path.dirname(filename))
            plt.title("Latent factor {}".format(j))
            plt.yticks(rotation=0)
            plt.savefig(filename, bbox_inches='tight')
            plt.close('all')
            # plot_rels(data[:, inds], map(lambda q: column_label[q], inds), colors=cont[:, j],
            #           outfile=prefix + '/relationships/group_num=' + str(j), latent=labels[:, j], alpha=0.1)

def test_ddof_too_big(self):
    nanfuncs = [np.nanvar, np.nanstd]
    stdfuncs = [np.var, np.std]
    dsize = [len(d) for d in _rdat]
    for nf, rf in zip(nanfuncs, stdfuncs):
        for ddof in range(5):
            with warnings.catch_warnings(record=True) as w:
                warnings.simplefilter('always')
                tgt = [ddof >= d for d in dsize]
                res = nf(_ndat, axis=1, ddof=ddof)
                assert_equal(np.isnan(res), tgt)
                if any(tgt):
                    assert_(len(w) == 1)
                    assert_(issubclass(w[0].category, RuntimeWarning))
                else:
                    assert_(len(w) == 0)

def _calculate(self, X, y, categorical, metafeatures, helpers):
    skews = helpers.get_value("Skewnesses")
    std = np.nanstd(skews) if len(skews) > 0 else 0
    return std if np.isfinite(std) else 0

# @metafeatures.define("cancor1")
# def cancor1(X, y):
# pass
# @metafeatures.define("cancor2")
# def cancor2(X, y):
# pass
################################################################################
# Information-theoretic metafeatures
def add_MACD(data, Ns=[12, 26, 9]):
    '''
    :param data: DataFrame containing stock price info in the second column
    :param Ns: list of the short-term and long-term EMA spans, plus the look-back window for the MACD signal EMA
    :return:
    '''
    symbol = data.columns.values[1]  # assuming stock price is in the second column in data
    MACD = cal_EMA(data.loc[:, [symbol]], N=Ns[0]) - cal_EMA(data.loc[:, [symbol]], N=Ns[1])
    data['MACD'] = MACD
    signal = cal_EMA(data.MACD[Ns[1]:], N=Ns[2])
    # # normalize them
    # MACD = (MACD - np.nanmean(MACD))/(2*np.nanstd(MACD))
    # signal = (signal - np.nanmean(signal))/(2*np.nanstd(signal))
    data['MACD'] = MACD
    data['Signal'] = 'NaN'
    data.loc[Ns[1]:, 'Signal'] = signal
    return data

def add_MACD(data, Ns=None):
    '''
    :param data: DataFrame containing stock price info in the second column
    :param Ns: list of the short-term and long-term EMA spans, plus the look-back window for the MACD signal EMA
    :return:
    '''
    if Ns is None:
        Ns = [12, 26, 9]
    symbol = data.columns.values[1]  # assuming stock price is in the second column in data
    MACD = cal_EMA(data.loc[:, symbol], N=Ns[0]) - cal_EMA(data.loc[:, symbol], N=Ns[1])
    data['MACD'] = MACD
    signal = cal_EMA(data.MACD[Ns[1]:], N=Ns[2])
    # # normalize them
    # MACD = (MACD - np.nanmean(MACD))/(2*np.nanstd(MACD))
    # signal = (signal - np.nanmean(signal))/(2*np.nanstd(signal))
    # data['MACD'] = MACD
    data['Signal'] = 'NaN'
    data.loc[Ns[1]:, 'Signal'] = signal
    return data

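For context, a brief pandas-only sketch of the same MACD construction, assuming cal_EMA behaves like an exponential moving average (pandas' ewm is used here instead, and the price series is synthetic):

import numpy as np
import pandas as pd

prices = pd.Series(50 + np.random.randn(200).cumsum())    # synthetic price series
ema_short = prices.ewm(span=12, adjust=False).mean()
ema_long = prices.ewm(span=26, adjust=False).mean()
macd = ema_short - ema_long                                # MACD line
signal = macd.ewm(span=9, adjust=False).mean()             # signal line
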
def addNoise(img, snr=25, rShot=0.5):
    '''
    adds Gaussian (thermal) and shot noise to [img]
    [img] is assumed to be noise free
    [rShot] - shot noise ratio relative to all noise
    '''
    s0, s1 = img.shape[:2]
    m = img.mean()
    if np.isnan(m):
        m = np.nanmean(img)
    assert m != 0, 'image mean cannot be zero'
    img = img / m
    noise = np.random.normal(size=s0 * s1).reshape(s0, s1)
    if rShot > 0:
        noise *= (rShot * img**0.5 + 1)
    noise /= np.nanstd(noise)
    noise[np.isnan(noise)] = 0
    return m * (img + noise / snr)

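A quick usage sketch of addNoise on a synthetic, noise-free image (the array is made up; only numpy is needed besides the function above):

import numpy as np

clean = np.full((64, 64), 100.0)
clean[24:40, 24:40] = 200.0                 # bright square on a flat background

noisy = addNoise(clean, snr=25, rShot=0.5)
print(np.nanstd(noisy - clean))             # roughly clean.mean() / snr
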
def histo_plot(figure, X, color, xlabel="", ylabel="", cumul=False, bar=True, n_points=400,
               smooth_factor=0.1, spline_order=3, linewidth=3, alpha=1.0, label=""):
    if '%' in xlabel:
        magnitude = 100
        X_values = np.array(np.minimum(np.around(X), n_points + 1), int)
    else:
        # magnitude = np.power(10,np.around(4*np.log10(X.mean()))/4+0.5)
        magnitude = np.power(10, np.around(4 * np.log10(np.nanmean(X) + np.nanstd(X) + 1e-7)) / 4 + 1)
        magnitude = np.around(magnitude, int(-np.log10(magnitude)) + 1)
        # print magnitude
        # magnitude = X.mean()+5.0*X.std()
        X_values = np.array(np.minimum(np.around(n_points * X[~np.isnan(X)] / magnitude), n_points + 1), int)
    X_histo = np.zeros(n_points + 1, float)
    for x in range(n_points + 1):
        X_histo[x] = nd.sum(np.ones_like(X_values, float), X_values, index=x)
        if '%' in ylabel:
            X_histo[x] /= X_values.size / 100.0
        if cumul:
            X_histo[x] += X_histo[x - 1]
    if bar:
        bar_plot(figure, np.linspace(0, magnitude, n_points + 1), X_histo, np.array([1, 1, 1]), color,
                 xlabel, ylabel, label=label)
    else:
        smooth_plot(figure, np.linspace(0, magnitude, n_points + 1), X_histo, color, color, xlabel, ylabel,
                    n_points=n_points, smooth_factor=smooth_factor, spline_order=spline_order,
                    linewidth=linewidth, alpha=alpha, label=label)

def process(self, obj_data):
    '''
    Removes table data with large snow errors

    @param obj_data: Input DataWrapper, will be modified in place
    '''
    bad_stations = []
    sigma_multiplier = self.ap_paramList[0]()

    for label, data in obj_data.getIterator():
        if len(data[data[self.snow_column] == 4]) > 0 and len(data[data[self.snow_column] == 2]) > 0:
            snow = data[data[self.snow_column] == 4].loc[:, self.column_name]
            no_snow = data[data[self.snow_column] == 2].loc[:, self.column_name]

            non_snow_std = np.nanstd(no_snow)
            snow_std = np.nanstd(snow)

            if snow_std > sigma_multiplier * non_snow_std:
                bad_stations.append(label)

    if len(bad_stations) > 0:
        obj_data.removeFrames(bad_stations)

def X(self):
    # Scale the data across features
    X = stats.zscore(np.asarray(self.betas))

    # Remove zero-variance features
    X = X[:, self.good_voxels]
    assert not np.isnan(X).any()

    # Regress out behavioral confounds
    rt = np.asarray(self.rt)
    m, s = np.nanmean(rt), np.nanstd(rt)
    rt = np.nan_to_num((rt - m) / s)
    X = OLS(X, rt).fit().resid

    return X

def split_and_zscore(self, data, test_run):
    # Enforce type and size of the data
    data = np.asarray(data)
    if data.ndim == 1:
        data = np.expand_dims(data, 1)

    # Identify training and test samples
    train = np.asarray(self.runs != test_run)
    test = np.asarray(self.runs == test_run)
    train_data = data[train]
    test_data = data[test]

    # Compute the mean and standard deviation of the training set
    m, s = np.nanmean(train_data), np.nanstd(train_data)

    # Scale the training and test set
    train_data = (train_data - m) / s
    test_data = (test_data - m) / s

    return train_data, test_data

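The same train/test z-scoring idea as a standalone sketch outside the class (the run labels, data, and held-out run are made up for the example):

import numpy as np

data = np.random.randn(20, 3)
data[3, 1] = np.nan                          # a missing value that the NaN-aware stats skip
runs = np.repeat([1, 2, 3, 4], 5)            # which run each sample belongs to
test_run = 4

train, test = data[runs != test_run], data[runs == test_run]
m, s = np.nanmean(train), np.nanstd(train)   # statistics from the training split only
train_z, test_z = (train - m) / s, (test - m) / s
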
def test_dtype_from_dtype(self):
    mat = np.eye(3)
    codes = 'efdgFDG'
    for nf, rf in zip(self.nanfuncs, self.stdfuncs):
        for c in codes:
            with suppress_warnings() as sup:
                if nf in {np.nanstd, np.nanvar} and c in 'FDG':
                    # Giving the warning is a small bug, see gh-8000
                    sup.filter(np.ComplexWarning)
                tgt = rf(mat, dtype=np.dtype(c), axis=1).dtype.type
                res = nf(mat, dtype=np.dtype(c), axis=1).dtype.type
                assert_(res is tgt)
                # scalar case
                tgt = rf(mat, dtype=np.dtype(c), axis=None).dtype.type
                res = nf(mat, dtype=np.dtype(c), axis=None).dtype.type
                assert_(res is tgt)

def test_dtype_from_char(self):
    mat = np.eye(3)
    codes = 'efdgFDG'
    for nf, rf in zip(self.nanfuncs, self.stdfuncs):
        for c in codes:
            with suppress_warnings() as sup:
                if nf in {np.nanstd, np.nanvar} and c in 'FDG':
                    # Giving the warning is a small bug, see gh-8000
                    sup.filter(np.ComplexWarning)
                tgt = rf(mat, dtype=c, axis=1).dtype.type
                res = nf(mat, dtype=c, axis=1).dtype.type
                assert_(res is tgt)
                # scalar case
                tgt = rf(mat, dtype=c, axis=None).dtype.type
                res = nf(mat, dtype=c, axis=None).dtype.type
                assert_(res is tgt)

def test_ddof_too_big(self):
    nanfuncs = [np.nanvar, np.nanstd]
    stdfuncs = [np.var, np.std]
    dsize = [len(d) for d in _rdat]
    for nf, rf in zip(nanfuncs, stdfuncs):
        for ddof in range(5):
            with suppress_warnings() as sup:
                sup.record(RuntimeWarning)
                sup.filter(np.ComplexWarning)
                tgt = [ddof >= d for d in dsize]
                res = nf(_ndat, axis=1, ddof=ddof)
                assert_equal(np.isnan(res), tgt)
                if any(tgt):
                    assert_(len(sup.log) == 1)
                else:
                    assert_(len(sup.log) == 0)

def zscore_cooccur(prob_observed, prob_shuffle):
    """Compare co-occurrence observed probabilities with shuffle

    Parameters
    ----------
    prob_observed : np.array
    prob_shuffle : np.array

    Returns
    -------
    prob_zscore : np.array

    """
    num_neurons = prob_observed.shape[0]

    prob_zscore = np.zeros((num_neurons, num_neurons))
    for i in range(num_neurons):
        for j in range(num_neurons):
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", category=RuntimeWarning)
                prob_zscore[i][j] = ((prob_observed[i][j] - np.nanmean(np.squeeze(prob_shuffle[:, i, j])))
                                     / np.nanstd(np.squeeze(prob_shuffle[:, i, j])))

    return prob_zscore

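A quick usage sketch with made-up probability arrays, showing the expected shapes:

import numpy as np

num_shuffles, num_neurons = 200, 4
observed = np.random.rand(num_neurons, num_neurons)                 # observed co-occurrence probabilities
shuffled = np.random.rand(num_shuffles, num_neurons, num_neurons)   # one matrix per shuffle

z = zscore_cooccur(observed, shuffled)
print(z.shape)   # (4, 4); each entry is (observed - shuffle mean) / shuffle std
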
def init_distrib_idx(self, distrib, idx=None):
    assert isinstance(distrib, DistribGauss)
    x = distrib.get_mu()
    if idx is None:
        # initialize prior and thus average over all cases
        mu = np.nanmean(x, axis=0, keepdims=True)
    else:
        # select cases idx
        mu = x[idx, :]
        idx_nan = np.isnan(mu)
        if np.any(idx_nan):
            # we need to randomly select new values for all NaNs
            idx_good = np.ones_like(idx, dtype=bool)
            idx_good[idx, :] = False
            idx_good[np.isnan(x)] = False
            x_good = x[idx_good, :]
            num_nan = np.count_nonzero(idx_nan)
            mu[idx_nan] = np.random.choice(x_good, num_nan, replace=False)
        mu = np.copy(mu)  # make sure to not overwrite data
    std = np.empty_like(mu)
    std.fill(np.asscalar(np.nanstd(x)))
    self.init_data(mu, std)

def normalize_mean_std(X, x_mean=None, x_std=None, axis=0, ignore_nan=False):
    if ignore_nan:
        mean = np.nanmean
        std = np.nanstd
    else:
        mean = np.mean
        std = np.std
    if x_mean is None:
        x_mean = mean(X, axis=axis, keepdims=True)
    if x_std is None:
        x_std = std(X, axis=axis, keepdims=True)
        x_std[~np.isfinite(1 / x_std)] = 1
    if np.any(~np.isfinite(x_mean)):
        warnings.warn("x_mean contains NaN or Inf values!")
    if np.any(~np.isfinite(x_std)):
        warnings.warn("x_std contains NaN or Inf values!")
    X = X - x_mean
    X = X / x_std
    return X, x_mean, x_std

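A short usage sketch with a made-up array containing a NaN, showing the returned column-wise statistics:

import numpy as np

X = np.array([[1.0, 10.0],
              [2.0, np.nan],
              [3.0, 30.0]])

X_norm, m, s = normalize_mean_std(X, ignore_nan=True)
print(m)   # column-wise NaN-aware means: [[2., 20.]]
print(s)   # column-wise NaN-aware stds:  [[~0.816, 10.]]
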
def average_with_nan(detector_list, error_estimate=ErrorEstimate.stderr):
    """
    Calculate average detector object, excluding malformed data (NaN) from averaging.
    :param detector_list:
    :param error_estimate:
    :return:
    """
    # TODO add compatibility check
    result = Detector()
    result.counter = len(detector_list)
    result.data_raw = np.nanmean([det.data_raw for det in detector_list], axis=0)

    if result.counter > 1 and error_estimate != ErrorEstimate.none:
        # s = stddev = sqrt(1/(n-1)sum(x-<x>)**2)
        # s : corrected sample standard deviation
        result.error_raw = np.nanstd([det.data_raw for det in detector_list], axis=0, ddof=1)

        # if user requested standard error then we calculate it as:
        # S = stderr = stddev / sqrt(n), or in other words,
        # S = s/sqrt(N) where S is the corrected standard deviation of the mean.
        if error_estimate == ErrorEstimate.stderr:
            result.error_raw /= np.sqrt(result.counter)  # np.sqrt() always returns np.float64
    else:
        result.error_raw = np.zeros_like(result.data_raw)
    return result

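The same standard-error formula as a minimal numeric sketch with plain arrays (the data are made up; the Detector and ErrorEstimate types above are not needed here):

import numpy as np

# three repeated "measurements" of a 4-bin detector, one with a malformed (NaN) bin
runs = np.array([[1.0, 2.0, 3.0, 4.0],
                 [1.1, 1.9, np.nan, 4.2],
                 [0.9, 2.1, 3.1, 3.8]])

mean = np.nanmean(runs, axis=0)
stddev = np.nanstd(runs, axis=0, ddof=1)    # corrected sample standard deviation s
stderr = stddev / np.sqrt(len(runs))        # S = s / sqrt(N), standard error of the mean
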
def getStdError(self):
    """
    get standard deviation of error over all joints, averaged over sequence
    :return: standard deviation of error
    """
    return numpy.nanmean(numpy.nanstd(numpy.sqrt(numpy.square(self.gt - self.joints).sum(axis=2)), axis=1))

def getJointStdError(self, jointID):
    """
    get standard deviation of one joint, averaged over sequence
    :param jointID: joint ID
    :return: standard deviation of joint error
    """
    return numpy.nanstd(numpy.sqrt(numpy.square(self.gt[:, jointID, :] - self.joints[:, jointID, :]).sum(axis=1)))

def test_nanstd(self):
    tgt = np.std(self.mat)
    for mat in self.integer_arrays():
        assert_equal(np.nanstd(mat), tgt)

    tgt = np.std(self.mat, ddof=1)
    for mat in self.integer_arrays():
        assert_equal(np.nanstd(mat, ddof=1), tgt)

def test_ddof(self):
    nanfuncs = [np.nanvar, np.nanstd]
    stdfuncs = [np.var, np.std]
    for nf, rf in zip(nanfuncs, stdfuncs):
        for ddof in [0, 1]:
            tgt = [rf(d, ddof=ddof) for d in _rdat]
            res = nf(_ndat, axis=1, ddof=ddof)
            assert_almost_equal(res, tgt)

def mean_std(data):
    # TODO: assert is a np.array
    mean = np.nanmean(data, axis=0)
    std = np.nanstd(data, axis=0)
    return [mean, std]

def compute(self, today, asset_ids, out, values):
    # *inputs are M x N numpy arrays, where M is the window_length and N is the number of securities.
    # out is an empty array of length N; it will be the output of our custom factor each day.
    #   The job of compute is to write output values into out.
    # asset_ids will be an integer array of length N containing security ids corresponding to
    #   the columns in our *inputs arrays.
    # today will be a pandas Timestamp representing the day for which compute is being called.

    # Calculate the column-wise standard deviation, ignoring NaNs
    out[:] = numpy.nanstd(values, axis=0)

# instantiate custom factor in make_pipeline()
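A hedged sketch of how such a factor is typically wrapped and instantiated, assuming the Quantopian/zipline Pipeline API; the class name and output column below are made up for illustration:

import numpy
from zipline.pipeline import CustomFactor, Pipeline
from zipline.pipeline.data import USEquityPricing

class CloseStdDev(CustomFactor):
    # hypothetical factor: 20-day NaN-aware std of close prices per security
    inputs = [USEquityPricing.close]
    window_length = 20

    def compute(self, today, asset_ids, out, values):
        out[:] = numpy.nanstd(values, axis=0)

def make_pipeline():
    return Pipeline(columns={'close_stddev_20d': CloseStdDev()})
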
def reject_outliers(data, m=3):
    data[abs(data - np.nanmean(data, axis=0)) > m * np.nanstd(data, axis=0)] = np.nan
    return data

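A quick usage sketch with made-up data; note that the input is modified in place, so a copy is passed:

import numpy as np

x = np.r_[np.random.normal(1.0, 0.1, size=50), 50.0]   # 50 well-behaved points plus one huge outlier
cleaned = reject_outliers(x.copy(), m=3)
print(np.isnan(cleaned).sum())                          # 1: the outlier was replaced with NaN
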
def _get_stderr_lb_varyinglens(x):
    mus, stds, ns = [], [], []
    for temp_list in zip_longest(*x, fillvalue=np.nan):
        mus.append(np.nanmean(temp_list))
        n = len(temp_list) - np.sum(np.isnan(temp_list))
        stds.append(np.nanstd(temp_list, ddof=1 if n > 1 else 0))
        ns.append(n)
    return np.array(mus) - np.array(stds) / np.sqrt(ns)

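A short usage sketch with runs of different lengths, e.g. learning curves stopped at different epochs (the numbers are made up; zip_longest comes from itertools):

import numpy as np
from itertools import zip_longest

runs = [[1.0, 2.0, 3.0, 4.0],
        [1.2, 2.1, 2.9],
        [0.8, 1.9]]

print(_get_stderr_lb_varyinglens(runs))   # per-step mean minus its standard error
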