def test_mutation(self):
    """nanmedian must leave the array it is given unmodified."""
    reference = _ndat
    working = reference.copy()
    # Result is discarded on purpose; only the effect on the input matters.
    np.nanmedian(working)
    assert_equal(working, reference)
# Example source code for Python's nanmedian()
def test_keepdims(self):
mat = np.eye(3)
for axis in [None, 0, 1]:
tgt = np.median(mat, axis=axis, out=None, overwrite_input=False)
res = np.nanmedian(mat, axis=axis, out=None, overwrite_input=False)
assert_(res.ndim == tgt.ndim)
d = np.ones((3, 5, 7, 11))
# Randomly set some elements to NaN:
w = np.random.random((4, 200)) * np.array(d.shape)[:, None]
w = w.astype(np.intp)
d[tuple(w)] = np.nan
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always', RuntimeWarning)
res = np.nanmedian(d, axis=None, keepdims=True)
assert_equal(res.shape, (1, 1, 1, 1))
res = np.nanmedian(d, axis=(0, 1), keepdims=True)
assert_equal(res.shape, (1, 1, 7, 11))
res = np.nanmedian(d, axis=(0, 3), keepdims=True)
assert_equal(res.shape, (1, 5, 7, 1))
res = np.nanmedian(d, axis=(1,), keepdims=True)
assert_equal(res.shape, (3, 1, 7, 11))
res = np.nanmedian(d, axis=(0, 1, 2, 3), keepdims=True)
assert_equal(res.shape, (1, 1, 1, 1))
res = np.nanmedian(d, axis=(0, 1, 3), keepdims=True)
assert_equal(res.shape, (1, 1, 7, 1))
def test_small_large(self):
# test the small and large code paths, current cutoff 400 elements
for s in [5, 20, 51, 200, 1000]:
d = np.random.randn(4, s)
# Randomly set some elements to NaN:
w = np.random.randint(0, d.size, size=d.size // 5)
d.ravel()[w] = np.nan
d[:,0] = 1. # ensure at least one good value
# use normal median without nans to compare
tgt = []
for x in d:
nonan = np.compress(~np.isnan(x), x)
tgt.append(np.median(nonan, overwrite_input=True))
assert_array_equal(np.nanmedian(d, axis=-1), tgt)
def test_result_values(self):
    """Row-wise nanmedian of _ndat must match np.median of each entry of _rdat."""
    expected = list(map(np.median, _rdat))
    actual = np.nanmedian(_ndat, axis=1)
    assert_almost_equal(actual, expected)
def test_empty(self):
mat = np.zeros((0, 3))
for axis in [0, None]:
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always')
assert_(np.isnan(np.nanmedian(mat, axis=axis)).all())
assert_(len(w) == 1)
assert_(issubclass(w[0].category, RuntimeWarning))
for axis in [1]:
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always')
assert_equal(np.nanmedian(mat, axis=axis), np.zeros([]))
assert_(len(w) == 0)
def test_scalar(self):
assert_(np.nanmedian(0.) == 0.)
def test_float_special(self):
with warnings.catch_warnings(record=True):
warnings.simplefilter('ignore', RuntimeWarning)
a = np.array([[np.inf, np.nan], [np.nan, np.nan]])
assert_equal(np.nanmedian(a, axis=0), [np.inf, np.nan])
assert_equal(np.nanmedian(a, axis=1), [np.inf, np.nan])
assert_equal(np.nanmedian(a), np.inf)
# minimum fill value check
a = np.array([[np.nan, np.nan, np.inf], [np.nan, np.nan, np.inf]])
assert_equal(np.nanmedian(a, axis=1), np.inf)
# no mask path
a = np.array([[np.inf, np.inf], [np.inf, np.inf]])
assert_equal(np.nanmedian(a, axis=1), np.inf)
def colbkg(self):
    '''
    Return, for each of the `self.nx` columns, the NaN-ignoring median of
    the current-cadence image over the rows where `self.aperture` is 0,
    expressed as a percent deviation from the mean of those medians.

    '''
    # Flux in background pixels
    levels = np.zeros(self.nx)
    for col in range(self.nx):
        # Rows of this column whose aperture value is 0
        outside = np.where(self.aperture[:, col] == 0)
        frame = self.images[self.cadence]
        levels[col] = np.nanmedian(frame[outside, col])

    relative = levels / np.mean(levels)
    return 100 * (relative - 1.)
def lcbkg(self):
    '''
    Return one NaN-ignoring median per image in `self.images`, taken over
    the pixels where `self.aperture ^ 1` is non-zero, as a column vector of
    shape (number of images, 1).

    '''
    # Pixels selected by the flipped aperture
    # (presumably aperture holds 0/1 flags, so these are the zeros -- confirm)
    outside = np.where(self.aperture ^ 1)
    stacked = np.array([frame[outside] for frame in self.images],
                       dtype='float64')
    medians = np.nanmedian(stacked, axis=1)
    return medians.reshape(-1, 1)
def _nanmedian_small(a, axis=None, out=None, overwrite_input=False):
    """
    Median that ignores NaNs, computed through a masked array.

    sort + indexing median, faster for small medians along multiple
    dimensions due to the high overhead of apply_along_axis

    see nanmedian for parameter usage

    Emits one "All-NaN slice encountered" RuntimeWarning per masked entry
    of the result. When `out` is given, the result is written into it and
    `out` itself is returned.
    """
    masked_input = np.ma.masked_array(a, np.isnan(a))
    med = np.ma.median(masked_input, axis=axis,
                       overwrite_input=overwrite_input)

    # A masked entry in the result is reported once per entry.
    n_masked = np.count_nonzero(med.mask.ravel())
    for _ in range(n_masked):
        warnings.warn("All-NaN slice encountered", RuntimeWarning)

    result = med.filled(np.nan)
    if out is None:
        return result
    out[...] = result
    return out
def test_mutation(self):
    """nanmedian must leave the array it is given unmodified."""
    reference = _ndat
    working = reference.copy()
    # Result is discarded on purpose; only the effect on the input matters.
    np.nanmedian(working)
    assert_equal(working, reference)
def test_keepdims(self):
mat = np.eye(3)
for axis in [None, 0, 1]:
tgt = np.median(mat, axis=axis, out=None, overwrite_input=False)
res = np.nanmedian(mat, axis=axis, out=None, overwrite_input=False)
assert_(res.ndim == tgt.ndim)
d = np.ones((3, 5, 7, 11))
# Randomly set some elements to NaN:
w = np.random.random((4, 200)) * np.array(d.shape)[:, None]
w = w.astype(np.intp)
d[tuple(w)] = np.nan
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always', RuntimeWarning)
res = np.nanmedian(d, axis=None, keepdims=True)
assert_equal(res.shape, (1, 1, 1, 1))
res = np.nanmedian(d, axis=(0, 1), keepdims=True)
assert_equal(res.shape, (1, 1, 7, 11))
res = np.nanmedian(d, axis=(0, 3), keepdims=True)
assert_equal(res.shape, (1, 5, 7, 1))
res = np.nanmedian(d, axis=(1,), keepdims=True)
assert_equal(res.shape, (3, 1, 7, 11))
res = np.nanmedian(d, axis=(0, 1, 2, 3), keepdims=True)
assert_equal(res.shape, (1, 1, 1, 1))
res = np.nanmedian(d, axis=(0, 1, 3), keepdims=True)
assert_equal(res.shape, (1, 1, 7, 1))
def test_small_large(self):
# test the small and large code paths, current cutoff 400 elements
for s in [5, 20, 51, 200, 1000]:
d = np.random.randn(4, s)
# Randomly set some elements to NaN:
w = np.random.randint(0, d.size, size=d.size // 5)
d.ravel()[w] = np.nan
d[:,0] = 1. # ensure at least one good value
# use normal median without nans to compare
tgt = []
for x in d:
nonan = np.compress(~np.isnan(x), x)
tgt.append(np.median(nonan, overwrite_input=True))
assert_array_equal(np.nanmedian(d, axis=-1), tgt)
def test_result_values(self):
    """Row-wise nanmedian of _ndat must match np.median of each entry of _rdat."""
    expected = list(map(np.median, _rdat))
    actual = np.nanmedian(_ndat, axis=1)
    assert_almost_equal(actual, expected)
def test_empty(self):
mat = np.zeros((0, 3))
for axis in [0, None]:
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always')
assert_(np.isnan(np.nanmedian(mat, axis=axis)).all())
assert_(len(w) == 1)
assert_(issubclass(w[0].category, RuntimeWarning))
for axis in [1]:
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always')
assert_equal(np.nanmedian(mat, axis=axis), np.zeros([]))
assert_(len(w) == 0)
def test_scalar(self):
assert_(np.nanmedian(0.) == 0.)
def test_float_special(self):
with warnings.catch_warnings(record=True):
warnings.simplefilter('ignore', RuntimeWarning)
a = np.array([[np.inf, np.nan], [np.nan, np.nan]])
assert_equal(np.nanmedian(a, axis=0), [np.inf, np.nan])
assert_equal(np.nanmedian(a, axis=1), [np.inf, np.nan])
assert_equal(np.nanmedian(a), np.inf)
# minimum fill value check
a = np.array([[np.nan, np.nan, np.inf], [np.nan, np.nan, np.inf]])
assert_equal(np.nanmedian(a, axis=1), np.inf)
# no mask path
a = np.array([[np.inf, np.inf], [np.inf, np.inf]])
assert_equal(np.nanmedian(a, axis=1), np.inf)
def zero_center_normalize(df, samples, logInput=False, method='median'):
    '''
    Transform a peptide abundance table into log2 scale and centre every
    sample column at zero.

    Inputs:
        df      : dataframe of peptide abundances; the `samples` columns are
                  overwritten in place and the same object is returned
        samples : column names of the selected samples
        logInput: True when the input abundances are already in log scale,
                  in which case the log2 conversion is skipped
        method  : method for estimating the per-sample zero point, one of
                  'median', 'average' or 'GMM'
    Returns:
        df, with each column in `samples` shifted by its estimated zero
        point.
    Raises:
        AssertionError if `method` is not one of the supported names.
        ImportError if method is 'GMM' and scikit-learn is not installed.
    '''
    assert method in ('median', 'average', 'GMM'), \
        'Zero centering method has to be among median, average or GMM!'

    if not logInput:
        # convert abundances to log2 scale
        df[samples] = df[samples].apply(np.log2)

    if method == 'average':
        norm_scale = np.nanmean(df[samples], axis=0)
    elif method == 'median':
        norm_scale = np.nanmedian(df[samples], axis=0)
    elif method == 'GMM':
        # Two-component Gaussian mixture model: the zero point of a sample
        # is the mean of the component with the smallest variance.
        try:
            from sklearn.mixture import GaussianMixture as Mixture
            cov_attr = 'covariances_'
        except ImportError:
            # Older scikit-learn releases only ship the legacy GMM class,
            # which stores its variances under a different attribute name.
            from sklearn.mixture import GMM as Mixture
            cov_attr = 'covars_'
        gmm = Mixture(2)

        norm_scale = []
        for sp in samples:
            v = df[sp].values
            # drop NaN and +/-inf (log2 of 0 gives -inf)
            v = v[np.isfinite(v)]
            try:
                # the estimator expects a 2-D (n_values, 1) input
                gmm.fit(v.reshape(-1, 1))
                narrowest = np.argmin(getattr(gmm, cov_attr))
                # scalar, so norm_scale stays 1-D and broadcasts per column
                vmean = float(gmm.means_[narrowest, 0])
            except Exception:
                # Best effort: when the mixture cannot be fitted (e.g. too
                # few finite values) fall back to the plain mean.
                vmean = np.nanmean(v)
            norm_scale.append(vmean)
        norm_scale = np.array(norm_scale)

    df[samples] = df[samples] - norm_scale
    return df
def __get_pd_median(data, c=1.):
    """Get the median and the scaled median absolute deviation of data.

    NaN entries are ignored in both medians.

    Args:
        data (numpy.ndarray): the data
        c (float): divisor applied to the median absolute deviation

    Returns:
        float, float: the median and the mad (divided by c)
    """
    centre = np.nanmedian(data)
    abs_deviation = np.abs(data - centre)
    mad = np.nanmedian(abs_deviation) / c
    return centre, mad
def calculate_residual_distributions(self):
    """
    Create a Distribution for every name in self.observed_colours and store
    it in self.residual_distributions under that name.

    NOTE(review): the values handed to Distribution.from_values are always
    None here; the step that collects the valid residual pixels appears to
    be missing -- confirm before relying on the result.

    :return:
    """
    # Inform the user
    log.info("Calculating distributions of residual pixel values ...")

    # Loop over the different colours
    for colour_name in self.observed_colours:
        # Debugging
        message = "Calculating the distribution for the pixels of the " + colour_name + " residual map ..."
        log.debug(message)

        # 1D array of the valid pixel values (placeholder, see note above)
        values = None

        # Create the distribution and add it to the dictionary
        self.residual_distributions[colour_name] = Distribution.from_values(values)
# -----------------------------------------------------------------