def _calculate(self, X, y, categorical, metafeatures, helpers):
    values = [val for val in helpers.get_value("NumSymbols") if val > 0]
    if len(values) == 0:
        return 0
    # FIXME Error handle
    std = np.nanstd(values)
    return std if np.isfinite(std) else 0
Python examples of numpy nanstd() usage, collected from various projects. The snippets assume import numpy as np (or import numpy) plus each project's own helpers.
def _calculate(self, X, y, categorical, metafeatures, helpers):
    kurts = helpers.get_value("Kurtosisses")
    std = np.nanstd(kurts) if len(kurts) > 0 else 0
    return std if np.isfinite(std) else 0
def normalize(array, window=11, normalize_variance=True):
    """
    Arguments:
        array (np.array): 2D array (time, member)
        window (int): Window length used to compute the climatology
        normalize_variance (bool): Adjust variance

    Returns:
        np.array: Array of normalized values (same size as input array)
    """
    N = array.shape[1]

    """
    Remove the climatology so we can look at anomalies. Use separate obs and fcst
    climatologies, otherwise the fcst variance is higher because obs gets the
    advantage of using its own climatology.
    """
    clim = climatology(array, window, use_future_years=True)
    values = copy.deepcopy(array)
    for i in range(0, N):
        values[:, i] = (values[:, i] - clim)

    if normalize_variance and array.shape[1] > 2:
        """
        This removes any seasonally varying variance, which can cause the 1-year
        variance to be larger than the 1/2-year variance: because of the windowing
        approach, the 1/2-year variance samples the summer months more often than
        the winter months. This normalization also does not guarantee that the std
        of the whole timeseries is 1, so in the plot, don't expect the first point
        to be 1. The timeseries is scaled up again to match the average anomaly
        variance in the timeseries.
        """
        std = np.nanstd(array, axis=1)
        if np.min(std) == 0:
            warning("Standard deviation of 0 at one or more days. Not normalizing variance")
        else:
            meanstd = np.nanmean(std)
            for i in range(0, N):
                values[:, i] = values[:, i] / std * meanstd

    return values
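The anomaly-and-rescale idea can be seen in isolation with a minimal standalone sketch; a constant stands in for the project's climatology() helper, and the array shapes are hypothetical.

import numpy as np

arr = np.random.randn(365, 5) + 10.0              # (time, member) ensemble
clim = np.full(arr.shape[0], 10.0)                # stand-in for climatology(arr, window)
anoms = arr - clim[:, None]                       # remove climatology from every member
std = np.nanstd(arr, axis=1)                      # per-day spread across members
anoms = anoms / std[:, None] * np.nanmean(std)    # rescale to the average spread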
def getStdError(self):
    """
    get standard deviation of error over all joints, averaged over sequence
    :return: standard deviation of error
    """
    return numpy.nanmean(numpy.nanstd(numpy.sqrt(numpy.square(self.gt - self.joints).sum(axis=2)), axis=1))
def getJointStdError(self, jointID):
    """
    get standard deviation of the error of one joint over the sequence
    :param jointID: joint ID
    :return: standard deviation of joint error
    """
    return numpy.nanstd(numpy.sqrt(numpy.square(self.gt[:, jointID, :] - self.joints[:, jointID, :]).sum(axis=1)))
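For orientation, a small self-contained sketch of what these two statistics compute, using hypothetical (frames, joints, 3) arrays for ground truth and predictions:

import numpy
gt = numpy.random.rand(100, 14, 3)
joints = gt + 0.01 * numpy.random.randn(100, 14, 3)
err = numpy.sqrt(numpy.square(gt - joints).sum(axis=2))   # per-frame, per-joint Euclidean error
overall = numpy.nanmean(numpy.nanstd(err, axis=1))        # std across joints, averaged over frames
joint0 = numpy.nanstd(err[:, 0])                          # std of joint 0's error over the sequence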
def test_basic_stats(x):
    s = SummaryStats()
    s.update(x)
    assert s.count() == np.count_nonzero(~np.isnan(x))
    np.testing.assert_allclose(s.sum(), np.nansum(x), rtol=RTOL, atol=ATOL)
    np.testing.assert_equal(s.min(), np.nanmin(x) if len(x) else np.nan)
    np.testing.assert_equal(s.max(), np.nanmax(x) if len(x) else np.nan)
    np.testing.assert_allclose(s.mean(), np.nanmean(x) if len(x) else np.nan,
                               rtol=RTOL, atol=ATOL)
    np.testing.assert_allclose(s.var(), np.nanvar(x) if len(x) else np.nan,
                               rtol=RTOL, atol=ATOL)
    np.testing.assert_allclose(s.std(), np.nanstd(x) if len(x) else np.nan,
                               rtol=RTOL, atol=ATOL)
def plot_heatmaps(data, labels, alpha, mis, column_label, cont, topk=20, prefix='', focus=''):
    cmap = sns.cubehelix_palette(as_cmap=True, light=.9)
    m, nv = mis.shape
    for j in range(m):
        inds = np.where(np.logical_and(alpha[j] > 0, mis[j] > 0.))[0]
        inds = inds[np.argsort(- alpha[j, inds] * mis[j, inds])][:topk]
        if focus in column_label:
            ifocus = column_label.index(focus)
            if ifocus not in inds:
                inds = np.insert(inds, 0, ifocus)
        if len(inds) >= 2:
            plt.clf()
            order = np.argsort(cont[:, j])
            subdata = data[:, inds][order].T
            subdata -= np.nanmean(subdata, axis=1, keepdims=True)
            subdata /= np.nanstd(subdata, axis=1, keepdims=True)
            columns = [column_label[i] for i in inds]
            sns.heatmap(subdata, vmin=-3, vmax=3, cmap=cmap, yticklabels=columns,
                        xticklabels=False, mask=np.isnan(subdata))
            filename = '{}/heatmaps/group_num={}.png'.format(prefix, j)
            if not os.path.exists(os.path.dirname(filename)):
                os.makedirs(os.path.dirname(filename))
            plt.title("Latent factor {}".format(j))
            plt.savefig(filename, bbox_inches='tight')
            plt.close('all')
            # plot_rels(data[:, inds], list(map(lambda q: column_label[q], inds)), colors=cont[:, j],
            #           outfile=prefix + '/relationships/group_num=' + str(j), latent=labels[:, j], alpha=0.1)
def first_order(feature, aggregates, verbose=False):
    if not isinstance(aggregates, list):
        aggregates = [aggregates]
    for aggregate in aggregates:
        if verbose:
            print(' first order computation: ' + aggregate)
        if aggregate == 'log':
            feature = np.log(feature)
        elif aggregate == 'sqrt':
            feature = np.sqrt(feature)
        elif aggregate == 'minlog':
            feature = np.log(1 - feature)
        elif aggregate == 'minsqrt':
            feature = np.sqrt(1 - feature)
        elif aggregate == 'mean':
            # feature = np.mean(feature, axis=0)
            feature = np.nanmean(feature, axis=0)
        elif aggregate == 'var':
            feature = np.var(feature, axis=0)
        elif aggregate == 'std':
            # feature = np.std(feature, axis=0)
            feature = np.nanstd(feature, axis=0)
        elif aggregate == 'stdmean':
            feature = np.hstack([np.mean(feature, axis=0), np.std(feature, axis=0)])
        elif aggregate == 'cov':
            # np.cov has no axis argument; rowvar=False treats columns as variables
            feature = np.cov(feature, rowvar=False).flatten()
        elif aggregate == 'totvar':
            feature = np.array([np.mean(np.var(feature, axis=0))])
        elif aggregate == 'totstd':
            feature = np.array([np.mean(np.std(feature, axis=0))])
        elif aggregate == 'entropy':
            feature = feature.flatten()
            feature = np.array([stats.entropy(feature)])
        elif aggregate == 'normentropy':
            feature = feature.flatten()
            feature = np.array([stats.entropy(feature) / np.log(feature.size)])
        elif aggregate == 'information':
            feature = - np.log(feature)
    return feature
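A minimal usage sketch with a hypothetical (frames x dimensions) feature matrix; the values are kept positive so the 'log' aggregate is defined.

import numpy as np
from scipy import stats

frames = np.abs(np.random.randn(200, 12)) + 1e-6
per_dim = first_order(frames, ['log', 'mean'])   # shape (12,): mean of log-features per dimension
combined = first_order(frames, 'stdmean')        # shape (24,): per-dimension means followed by stds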
def test_nanstd(self):
    tgt = np.std(self.mat)
    for mat in self.integer_arrays():
        assert_equal(np.nanstd(mat), tgt)

    tgt = np.std(self.mat, ddof=1)
    for mat in self.integer_arrays():
        assert_equal(np.nanstd(mat, ddof=1), tgt)

def test_ddof(self):
    nanfuncs = [np.nanvar, np.nanstd]
    stdfuncs = [np.var, np.std]
    for nf, rf in zip(nanfuncs, stdfuncs):
        for ddof in [0, 1]:
            tgt = [rf(d, ddof=ddof) for d in _rdat]
            res = nf(_ndat, axis=1, ddof=ddof)
            assert_almost_equal(res, tgt)
def __get_pd_mean(data, c=1.):
    """Get the mean and the standard deviation of data

    Args:
        data (numpy.ndarray): the data
        c (float): divisor applied to the standard deviation

    Returns:
        float, float: the mean and the standard deviation
    """
    p = np.nanmean(data)
    d = np.nanstd(data) / c
    return p, d
def calculate_residual_distributions(self):
    """
    This function ...
    :return:
    """

    # Inform the user
    log.info("Calculating distributions of residual pixel values ...")

    # Loop over the different colours
    for colour_name in self.observed_colours:

        # Debugging
        log.debug("Calculating the distribution for the pixels of the " + colour_name + " residual map ...")

        # Get a 1D array of the valid pixel values
        pixel_values = None

        # Create the distribution
        distribution = Distribution.from_values(pixel_values)

        # Debugging
        #log.debug("Median " + colour_name + " residual: " + str(np.nanmedian(np.abs(residual))))
        #log.debug("Standard deviation of " + colour_name + " residual: " + str(np.nanstd(residual)))

        # Add the distribution to the dictionary
        self.residual_distributions[colour_name] = distribution

# -----------------------------------------------------------------
def scaleParamsFromReference(img, reference):
    # saving startup time:
    from scipy.optimize import curve_fit

    def ff(arr):
        arr = imread(arr, 'gray')
        if arr.size > 300000:
            arr = arr[::10, ::10]
        m = np.nanmean(arr)
        s = np.nanstd(arr)
        r = m - 3 * s, m + 3 * s
        b = (r[1] - r[0]) / 5
        return arr, r, b

    img, imgr, imgb = ff(img)
    reference, refr, refb = ff(reference)

    nbins = np.clip(15, max(imgb, refb), 50)

    refh = np.histogram(reference, bins=nbins, range=refr)[0].astype(np.float32)
    imgh = np.histogram(img, bins=nbins, range=imgr)[0].astype(np.float32)

    import pylab as plt
    plt.figure(1)
    plt.plot(refh)
    plt.figure(2)
    plt.plot(imgh)
    plt.show()

    def fn(x, offs, div):
        return (x - offs) / div

    params, fitCovariances = curve_fit(fn, refh, imgh, p0=(0, 1))
    perr = np.sqrt(np.diag(fitCovariances))
    print('error scaling to reference image: %s' % perr[0])
    # if perr[0] < 0.1:
    return params[0], params[1]
def _update(self, limits=None):
    curves = self.display.widget.curves
    x = self.display.stack.values
    y = []
    s = self.pStd.value()
    yStd = []
    for curve in curves:
        if limits:
            b1 = np.argmax(curve.xData >= limits[0])
            b2 = np.argmax(curve.xData >= limits[1])
            if b2 == 0:
                b2 = -1
        else:
            b1, b2 = None, None
        y.append(np.nanmean(curve.yData[b1:b2]))
        if s:
            yStd.append(np.nanstd(curve.yData[b1:b2]))

    if self.out is None or self.out.isClosed():
        self.out = self.display.workspace.addDisplay(
            origin=self.display,
            axes=self.display.axes.copy(('stack', 1)),
            title='ROI',
            names=['ROI'],
            data=[(x, y)])
    else:
        self.out.widget.curves[0].setData(x, y)

    if s:
        if self.outStd is None or self.outStd.isClosed():
            self.outStd = self.display.workspace.addDisplay(
                origin=self.display,
                axes=self.display.axes.copy(('stack', 1)),
                title='ROI - std',
                names=['ROI - std'],
                data=[(x, yStd)])
        else:
            self.outStd.widget.curves[0].setData(x, yStd)
def get_mat_movingstd(tsmat, periods):
    mstd = np.empty(shape=tsmat.shape)
    mstd.fill(np.nan)
    for i in range(tsmat.shape[0]):
        j = i - periods + 1
        if j < 0:
            j = 0
        mstd[i, :] = np.nanstd(tsmat[j:i + 1, :], 0)
    return mstd
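A quick sanity check on synthetic data (hypothetical shapes): each row holds the column-wise nanstd over a trailing window that expands at the start of the series.

ts = np.random.randn(10, 4)
mstd = get_mat_movingstd(ts, 3)
assert np.allclose(mstd[5], np.nanstd(ts[3:6], axis=0))   # rows 3..5 feed row 5
assert np.allclose(mstd[0], np.nanstd(ts[0:1], axis=0))   # first row uses itself only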
# cumulative_distributions.py (project: DR1_analysis, author: GBTAmmoniaSurvey)
def calc_cumulative_dist(momlim=0.3, region_list=['L1688', 'NGC1333', 'B18', 'OrionA'],
                         file_extension='DR1_rebase3'):
    projDir = '/media/DATAPART/projects/GAS/testing/'
    min_bin = 20.5  # log 10 N(H2) - set by B18
    max_bin = 26.   # log 10 N(H2) - set by Orion A
    bin_size = 0.05
    nbins = int((max_bin - min_bin) / bin_size)
    data_bin_vals = [min_bin + x * bin_size for x in range(nbins + 1)]
    # Loop over regions
    for region_i in range(len(region_list)):
        region = region_list[region_i]
        nh3ImFits = projDir + '{0}/{0}_NH3_11_{1}_mom0_QA_trim.fits'.format(region, file_extension)
        herColFits = 'nh2_regridded/{0}_NH2_regrid.fits'.format(region)
        nh3_fits = fits.open(nh3ImFits)
        nh2_regrid_fits = fits.open(herColFits)
        h2_data = np.log10(nh2_regrid_fits[0].data)
        nh3_data = nh3_fits[0].data
        h2_mean_array = np.zeros(nbins)
        h2_std_array = np.zeros(nbins)
        nh3_frac_array = np.zeros(nbins)
        for bin_i in range(nbins - 1):
            bin_h2_indices = np.where(np.logical_and(h2_data >= data_bin_vals[bin_i],
                                                     h2_data < data_bin_vals[bin_i + 1]))
            bin_h2_data = h2_data[bin_h2_indices]
            bin_nh3_data = nh3_data[bin_h2_indices]
            if np.count_nonzero(bin_nh3_data) != 0:
                frac_above_mom = np.count_nonzero(bin_nh3_data > momlim) / (1.0 * np.count_nonzero(bin_nh3_data))
            else:
                frac_above_mom = 0
            bin_mean_h2 = np.nanmean(bin_h2_data)
            bin_std_h2 = np.nanstd(bin_h2_data)
            h2_mean_array[bin_i] = bin_mean_h2
            nh3_frac_array[bin_i] = frac_above_mom
            h2_std_array[bin_i] = bin_std_h2
        # Write out bins
        np.savetxt('cumulative/{0}_cumulative.txt'.format(region),
                   np.transpose([h2_mean_array, nh3_frac_array, h2_std_array]))
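The per-bin bookkeeping can be illustrated with a small synthetic sketch (made-up column densities and moment-0 values, not real survey data): for each log N(H2) bin, record the mean and std of the column density and the fraction of pixels whose NH3 value exceeds momlim.

h2 = np.random.uniform(21.0, 23.0, 5000)      # fake log10 N(H2), flattened
nh3 = np.random.exponential(0.3, 5000)        # fake NH3 mom0 values, flattened
edges = np.arange(21.0, 23.05, 0.05)
for lo, hi in zip(edges[:-1], edges[1:]):
    sel = (h2 >= lo) & (h2 < hi)
    n = np.count_nonzero(nh3[sel])
    frac = np.count_nonzero(nh3[sel] > 0.3) / (1.0 * n) if n else 0
    mean_h2, std_h2 = np.nanmean(h2[sel]), np.nanstd(h2[sel])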
def corr(data):
    ns = data.shape[0]
    nt = data.shape[1]
    mean = np.nanmean(data, axis=0)
    std = np.nanstd(data - mean, axis=0)
    c = np.zeros((nt, nt))
    for t1 in range(nt):
        # A full correlation (mean-subtracted, std-normalized) would be:
        # for t2 in range(nt):
        #     c[t1, t2] = np.nanmean((data[:, t1] - mean[t1]) * (data[:, t2] - mean[t2]), axis=0) / (std[t1] * std[t2])
        # The active line instead averages the raw products (second moment, no centering or scaling):
        c[t1, :] = np.nanmean((data[:, :].T * data[:, t1]).T, axis=0)
    return c
def downstddv(x):
    if x.size < 4:
        return np.nan
    median = np.nanpercentile(x, 50)
    return np.nanstd(x[x < median])
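Hypothetical usage: downside dispersion of a return series, i.e. the standard deviation of the below-median observations; inputs with fewer than four points return NaN.

returns = np.random.randn(250) * 0.01
print(downstddv(returns))
print(downstddv(np.array([0.01, 0.02])))   # nan: too few points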
def get_stats(arr):
    return np.array([
        np.nanmean(arr),
        np.nanvar(arr),
        np.nanmedian(arr),
        np.nanstd(arr),
        arr.shape[0]
    ])
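Example with a made-up array: the NaN entry is ignored by the nan-aware reducers, while arr.shape[0] still reports the full length.

arr = np.array([1.0, 2.0, np.nan, 4.0])
mean, var, median, std, n = get_stats(arr)
print(mean, median, n)   # 2.333..., 2.0, 4.0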