def test_testMixedArithmetic(self):
na = np.array([1])
ma = array([1])
self.assertTrue(isinstance(na + ma, MaskedArray))
self.assertTrue(isinstance(ma + na, MaskedArray))
python类ma()的实例源码
def test_testUfuncRegression(self):
    """Check that each listed function agrees with its ``np.ma`` counterpart.

    Every name is looked up on ``umath`` (falling back to ``fromnumeric``)
    and on ``np.ma``. Both callables receive the first ``uf.nin`` entries of
    ``self.d``; their zero-filled data and their masks must compare equal.
    """
    # Names for which "invalid value" floating-point errors are silenced.
    silence_invalid = (
        'sqrt', 'arctanh', 'arcsin', 'arccos', 'arccosh',
        'log', 'log10', 'divide', 'true_divide', 'floor_divide',
        'remainder', 'fmod',
    )
    # Names for which "divide by zero" errors are silenced as well.
    silence_divide = ('arctanh', 'log', 'log10')
    func_names = (
        'sqrt', 'log', 'log10', 'exp', 'conjugate',
        'sin', 'cos', 'tan',
        'arcsin', 'arccos', 'arctan',
        'sinh', 'cosh', 'tanh',
        'arcsinh', 'arccosh', 'arctanh',
        'absolute', 'fabs', 'negative',
        'floor', 'ceil',
        'logical_not',
        'add', 'subtract', 'multiply',
        'divide', 'true_divide', 'floor_divide',
        'remainder', 'fmod', 'hypot', 'arctan2',
        'equal', 'not_equal', 'less_equal', 'greater_equal',
        'less', 'greater',
        'logical_and', 'logical_or', 'logical_xor',
    )
    for name in func_names:
        try:
            plain_func = getattr(umath, name)
        except AttributeError:
            plain_func = getattr(fromnumeric, name)
        masked_func = getattr(np.ma, name)
        operands = self.d[:plain_func.nin]

        # Collect the overrides first so the context manager both applies
        # and restores them.
        err_overrides = {}
        if name in silence_invalid:
            err_overrides['invalid'] = 'ignore'
        if name in silence_divide:
            err_overrides['divide'] = 'ignore'
        with np.errstate(**err_overrides):
            plain_result = plain_func(*operands)
            masked_result = masked_func(*operands)

        self.assertTrue(
            eq(plain_result.filled(0), masked_result.filled(0), name))
        self.assertTrue(eqmask(plain_result.mask, masked_result.mask))
def compare_functions_1v(func, nloop=500,
                         xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
    """Time a one-argument ``numpy.ma`` function on small and large arrays.

    A statement of the form ``numpy.ma.<name>(<array>)`` is built from
    ``func.__name__`` and handed to ``timer`` once for ``nmxs`` and once
    for ``nmxl``, with ``nloop`` forwarded unchanged.

    NOTE(review): the array parameters are not referenced in the body; the
    timed statements name the arrays textually, so they are presumably
    resolved inside ``timer`` -- confirm.
    """
    funcname = func.__name__
    module = "numpy.ma"
    label = "%11s" % module
    print("-"*50)
    for size, data in (("small", "nmxs"), ("large", "nmxl")):
        print("%s on %s arrays" % (funcname, size))
        timer("%s.%s(%s)" % (module, funcname, data), v=label, nloop=nloop)
def compare_methods(methodname, args, vars='x', nloop=500, test=True,
                    xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
    """Time a masked-array method on the small and the large array.

    The timed statement is ``nm<vars>s.<methodname>(<args>)`` for the small
    array and ``nm<vars>l.<methodname>(<args>)`` for the large one; each is
    passed to ``timer`` with ``v='numpy.ma'`` and the given ``nloop``.

    NOTE(review): ``test`` and the array parameters are not referenced in
    the body; the arrays are named textually in the statement and are
    presumably resolved inside ``timer`` -- confirm.
    """
    ver = 'numpy.ma'
    print("-"*50)
    for header, size_suffix in (("%s on small arrays", "s"),
                                ("%s on large arrays", "l")):
        print(header % methodname)
        data = "nm%s" % vars + size_suffix
        statement = "%s.%s(%s)" % (data, methodname, args)
        timer(statement, v=ver, nloop=nloop)
def test_testMixedArithmetic(self):
na = np.array([1])
ma = array([1])
self.assertTrue(isinstance(na + ma, MaskedArray))
self.assertTrue(isinstance(ma + na, MaskedArray))
def test_testUfuncRegression(self):
    """Check that each listed function agrees with its ``np.ma`` counterpart.

    Every name is looked up on ``umath`` (falling back to ``fromnumeric``)
    and on ``np.ma``. Both callables receive the first ``uf.nin`` entries of
    ``self.d``; their zero-filled data and their masks must compare equal.
    """
    # Names for which "invalid value" floating-point errors are silenced.
    silence_invalid = (
        'sqrt', 'arctanh', 'arcsin', 'arccos', 'arccosh',
        'log', 'log10', 'divide', 'true_divide', 'floor_divide',
        'remainder', 'fmod',
    )
    # Names for which "divide by zero" errors are silenced as well.
    silence_divide = ('arctanh', 'log', 'log10')
    func_names = (
        'sqrt', 'log', 'log10', 'exp', 'conjugate',
        'sin', 'cos', 'tan',
        'arcsin', 'arccos', 'arctan',
        'sinh', 'cosh', 'tanh',
        'arcsinh', 'arccosh', 'arctanh',
        'absolute', 'fabs', 'negative',
        'floor', 'ceil',
        'logical_not',
        'add', 'subtract', 'multiply',
        'divide', 'true_divide', 'floor_divide',
        'remainder', 'fmod', 'hypot', 'arctan2',
        'equal', 'not_equal', 'less_equal', 'greater_equal',
        'less', 'greater',
        'logical_and', 'logical_or', 'logical_xor',
    )
    for name in func_names:
        try:
            plain_func = getattr(umath, name)
        except AttributeError:
            plain_func = getattr(fromnumeric, name)
        masked_func = getattr(np.ma, name)
        operands = self.d[:plain_func.nin]

        # Collect the overrides first so the context manager both applies
        # and restores them.
        err_overrides = {}
        if name in silence_invalid:
            err_overrides['invalid'] = 'ignore'
        if name in silence_divide:
            err_overrides['divide'] = 'ignore'
        with np.errstate(**err_overrides):
            plain_result = plain_func(*operands)
            masked_result = masked_func(*operands)

        self.assertTrue(
            eq(plain_result.filled(0), masked_result.filled(0), name))
        self.assertTrue(eqmask(plain_result.mask, masked_result.mask))
def compare_methods(methodname, args, vars='x', nloop=500, test=True,
                    xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
    """Time a masked-array method on the small and the large array.

    The timed statement is ``nm<vars>s.<methodname>(<args>)`` for the small
    array and ``nm<vars>l.<methodname>(<args>)`` for the large one; each is
    passed to ``timer`` with ``v='numpy.ma'`` and the given ``nloop``.

    NOTE(review): ``test`` and the array parameters are not referenced in
    the body; the arrays are named textually in the statement and are
    presumably resolved inside ``timer`` -- confirm.
    """
    ver = 'numpy.ma'
    print("-"*50)
    for header, size_suffix in (("%s on small arrays", "s"),
                                ("%s on large arrays", "l")):
        print(header % methodname)
        data = "nm%s" % vars + size_suffix
        statement = "%s.%s(%s)" % (data, methodname, args)
        timer(statement, v=ver, nloop=nloop)
def compare_functions_2v(func, nloop=500, test=True,
                         xs=xs, nmxs=nmxs,
                         ys=ys, nmys=nmys,
                         xl=xl, nmxl=nmxl,
                         yl=yl, nmyl=nmyl):
    """Time a two-argument ``numpy.ma`` function on small and large arrays.

    A statement of the form ``numpy.ma.<name>(<a>,<b>)`` is built from
    ``func.__name__`` and handed to ``timer`` once for ``nmxs,nmys`` and
    once for ``nmxl,nmyl``, with ``nloop`` forwarded unchanged.

    NOTE(review): ``test`` and the array parameters are not referenced in
    the body; the arrays are named textually in the statement and are
    presumably resolved inside ``timer`` -- confirm.
    """
    funcname = func.__name__
    module = "numpy.ma"
    label = "%11s" % module
    print("-"*50)
    for size, data in (("small", "nmxs,nmys"), ("large", "nmxl,nmyl")):
        print("%s on %s arrays" % (funcname, size))
        timer("%s.%s(%s)" % (module, funcname, data), v=label, nloop=nloop)
def test_testMixedArithmetic(self):
na = np.array([1])
ma = array([1])
self.assertTrue(isinstance(na + ma, MaskedArray))
self.assertTrue(isinstance(ma + na, MaskedArray))
def test_testUfuncRegression(self):
    """Check that each listed function agrees with its ``np.ma`` counterpart.

    Every name is looked up on ``umath`` (falling back to ``fromnumeric``)
    and on ``np.ma``. Both callables receive the first ``uf.nin`` entries of
    ``self.d``; their zero-filled data and their masks must compare equal.
    """
    # Names for which "invalid value" floating-point errors are silenced.
    silence_invalid = (
        'sqrt', 'arctanh', 'arcsin', 'arccos', 'arccosh',
        'log', 'log10', 'divide', 'true_divide', 'floor_divide',
        'remainder', 'fmod',
    )
    # Names for which "divide by zero" errors are silenced as well.
    silence_divide = ('arctanh', 'log', 'log10')
    func_names = (
        'sqrt', 'log', 'log10', 'exp', 'conjugate',
        'sin', 'cos', 'tan',
        'arcsin', 'arccos', 'arctan',
        'sinh', 'cosh', 'tanh',
        'arcsinh', 'arccosh', 'arctanh',
        'absolute', 'fabs', 'negative',
        'floor', 'ceil',
        'logical_not',
        'add', 'subtract', 'multiply',
        'divide', 'true_divide', 'floor_divide',
        'remainder', 'fmod', 'hypot', 'arctan2',
        'equal', 'not_equal', 'less_equal', 'greater_equal',
        'less', 'greater',
        'logical_and', 'logical_or', 'logical_xor',
    )
    for name in func_names:
        try:
            plain_func = getattr(umath, name)
        except AttributeError:
            plain_func = getattr(fromnumeric, name)
        masked_func = getattr(np.ma, name)
        operands = self.d[:plain_func.nin]

        # Collect the overrides first so the context manager both applies
        # and restores them.
        err_overrides = {}
        if name in silence_invalid:
            err_overrides['invalid'] = 'ignore'
        if name in silence_divide:
            err_overrides['divide'] = 'ignore'
        with np.errstate(**err_overrides):
            plain_result = plain_func(*operands)
            masked_result = masked_func(*operands)

        self.assertTrue(
            eq(plain_result.filled(0), masked_result.filled(0), name))
        self.assertTrue(eqmask(plain_result.mask, masked_result.mask))
def compare_functions_1v(func, nloop=500,
                         xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
    """Time a one-argument ``numpy.ma`` function on small and large arrays.

    A statement of the form ``numpy.ma.<name>(<array>)`` is built from
    ``func.__name__`` and handed to ``timer`` once for ``nmxs`` and once
    for ``nmxl``, with ``nloop`` forwarded unchanged.

    NOTE(review): the array parameters are not referenced in the body; the
    timed statements name the arrays textually, so they are presumably
    resolved inside ``timer`` -- confirm.
    """
    funcname = func.__name__
    module = "numpy.ma"
    label = "%11s" % module
    print("-"*50)
    for size, data in (("small", "nmxs"), ("large", "nmxl")):
        print("%s on %s arrays" % (funcname, size))
        timer("%s.%s(%s)" % (module, funcname, data), v=label, nloop=nloop)
def compare_methods(methodname, args, vars='x', nloop=500, test=True,
                    xs=xs, nmxs=nmxs, xl=xl, nmxl=nmxl):
    """Time a masked-array method on the small and the large array.

    The timed statement is ``nm<vars>s.<methodname>(<args>)`` for the small
    array and ``nm<vars>l.<methodname>(<args>)`` for the large one; each is
    passed to ``timer`` with ``v='numpy.ma'`` and the given ``nloop``.

    NOTE(review): ``test`` and the array parameters are not referenced in
    the body; the arrays are named textually in the statement and are
    presumably resolved inside ``timer`` -- confirm.
    """
    ver = 'numpy.ma'
    print("-"*50)
    for header, size_suffix in (("%s on small arrays", "s"),
                                ("%s on large arrays", "l")):
        print(header % methodname)
        data = "nm%s" % vars + size_suffix
        statement = "%s.%s(%s)" % (data, methodname, args)
        timer(statement, v=ver, nloop=nloop)
def test_testCopySize(self):
# Tests of some subtle points of copying and sizing.
# NOTE(review): indentation is missing in this copy of the code, so the
# extent of the `with` block below cannot be determined from here --
# confirm against the upstream source before re-indenting.
# Register a filter for the MaskedArrayFutureWarning whose message starts
# with the text below (presumably raised by the `y2[2] = 9` assignment).
with suppress_warnings() as sup:
sup.filter(
np.ma.core.MaskedArrayFutureWarning,
"setting an item on a masked array which has a "
"shared mask will not copy")
# make_mask: passing an existing mask must return the same object,
# while copy=1 must return a different object.
n = [0, 0, 1, 0, 0]
m = make_mask(n)
m2 = make_mask(m)
self.assertTrue(m is m2)
m3 = make_mask(m, copy=1)
self.assertTrue(m is not m3)
# Default construction: the data must be a different object from x1 with
# equal contents, while the mask object itself is reused.
x1 = np.arange(5)
y1 = array(x1, mask=m)
self.assertTrue(y1._data is not x1)
self.assertTrue(allequal(x1, y1._data))
self.assertTrue(y1.mask is m)
# copy=0 from an existing masked array keeps the same mask object.
y1a = array(y1, copy=0)
self.assertTrue(y1a.mask is y1.mask)
# copy=0 from raw data: the mask starts out as `m` with element 2 masked;
# after assigning to that element it must be unmasked, the mask must no
# longer be the object `m`, and every mask entry must equal 0.
y2 = array(x1, mask=m, copy=0)
self.assertTrue(y2.mask is m)
self.assertTrue(y2[2] is masked)
y2[2] = 9
self.assertTrue(y2[2] is not masked)
self.assertTrue(y2.mask is not m)
self.assertTrue(allequal(y2.mask, 0))
# filled() must return the very same dtype object as the float input.
y3 = array(x1 * 1.0, mask=m)
self.assertTrue(filled(y3).dtype is (x1 * 1.0).dtype)
# NOTE(review): `arange` here is presumably numpy.ma.arange, since a
# masked value is assigned into its result -- confirm against the imports.
x4 = arange(4)
x4[2] = masked
# Resizing to twice the length must equal two concatenated copies,
# including the repeated mask pattern.
y4 = resize(x4, (8,))
self.assertTrue(eq(concatenate([x4, x4]), y4))
self.assertTrue(eq(getmask(y4), [0, 0, 1, 0, 0, 0, 1, 0]))
# repeat with a per-element count tuple and with a scalar count must agree.
y5 = repeat(x4, (2, 2, 2, 2), axis=0)
self.assertTrue(eq(y5, [0, 0, 1, 1, 2, 2, 3, 3]))
y6 = repeat(x4, 2, axis=0)
self.assertTrue(eq(y5, y6))
def composite(reducers, *rasters, normalize='sum', nodata=-9999.0, dtype=np.float32):
    '''
    Creates a multi-image (multi-date) composite from input rasters.

    NOTE: Uses masked arrays in NumPy and therefore is MUCH slower than the
    `composite2()` function, which is equivalent in output.

    Every raster must have the same shape, with the band (endmember) axis
    first and the two spatial axes last. For each band, the value of each
    pixel is chosen by reducing that band across all rasters with the
    reducer given for the band; NoData pixels are ignored by the reduction.
    If None is specified as a reducer, the corresponding band is dropped
    from the output. Combining None reducer(s) with a normalized sum
    effectively subtracts an endmember under the unity constraint.

    Arguments:
        reducers    One of ('min', 'max', 'mean', 'median', None) for each
                    band, in band order
        rasters     One or more raster arrays to composite
        normalize   Name of the MaskedArray reduction (default 'sum') whose
                    result across the bands each band is divided by; None
                    or False disables normalization, True means 'sum'
        nodata      The NoData value (defaults to -9999)
        dtype       Type used to cast `nodata` before it is installed as
                    the fill value; very important if the desired output is
                    float but the NoData value is an integer

    Returns an ndarray of shape (kept bands, rows, columns) in which every
    masked pixel (NoData in all rasters, or a masked/zero normalization
    constant) is filled with `nodata`.

    Raises AssertionError if the rasters differ in shape or the number of
    reducers does not match the number of bands, and ValueError if a
    reducer name is not recognized or every band is dropped.
    '''
    valid_reducers = ('min', 'max', 'median', 'mean')
    shp = rasters[0].shape
    assert all(r.shape == shp for r in rasters), 'Rasters must have the same shape'
    assert len(reducers) == shp[0], 'Must provide a reducer for each band (including None to drop the band)'

    # Reject unknown names up front; they used to be skipped silently and
    # only surfaced later as an opaque reshape error.
    unknown = [r for r in reducers if r is not None and r not in valid_reducers]
    if unknown:
        raise ValueError('Unknown reducer(s) %r; expected one of %r or None'
                         % (unknown, valid_reducers))

    # The docstring historically advertised a boolean flag; honor it.
    if isinstance(normalize, (bool, np.bool_)):
        normalize = 'sum' if normalize else None

    # Swap the sequence of rasters for a sequence of bands, then collapse the X-Y axes
    stack = np.array(rasters).swapaxes(0, 1).reshape(shp[0], len(rasters), shp[-1]*shp[-2])

    # Mask out NoData values
    stack_masked = np.ma.masked_where(stack == nodata, stack)

    # Reduce each kept band across the rasters (axis 0 of the per-band slice)
    band_arrays = [
        getattr(np.ma, reducer)(stack_masked[i, ...], axis=0)
        for i, reducer in enumerate(reducers)
        if reducer is not None
    ]
    if not band_arrays:
        raise ValueError('At least one band must have a reducer other than None')

    # Stack each reduced band (and reshape to multi-band image)
    final_stack = np.ma.vstack(band_arrays).reshape((len(band_arrays), shp[-2], shp[-1]))

    # Calculate a normalized sum (e.g., fractions must sum to one)
    if normalize is not None:
        # The (rows, columns) constant broadcasts against the leading band
        # axis. The previous repeat/reshape/swapaxes construction only
        # lined up when the number of bands equalled the number of rows.
        constant = getattr(final_stack, normalize)(axis=0)
        # Masked division masks pixels whose constant is masked or zero
        final_stack = final_stack / constant

    # NOTE: Essential to cast type, e.g., to float in case first pixel (i.e. top-left) is all NoData of an integer type
    final_stack.set_fill_value(dtype(nodata))
    return final_stack.filled()