def change_grid_location_u_to_t(self, scalararray, conserving='x_flux'):
    """Relocate a scalar array from the u-point to the t-point.

    The input is first validated as a u-point array, then the weights
    matching the requested conservation mode are fetched and handed to
    ``self._to_western_grid_location``.

    Parameters
    ----------
    scalararray : xarray.DataArray
        original array, located at the u-point
    conserving : str
        any of 'area', 'x_flux' or 'y_flux'.
        - 'area' : conserves the area
        - 'x_flux' : conserves the flux in x-direction (eastward)
        - 'y_flux' : conserves the flux in y-direction (northward)

    Returns
    -------
    xarray.DataArray
        relocated array, tagged with ``grid_location='t'``
    """
    check_input_array(scalararray, chunks=self.chunks,
                      grid_location='u', ndims=self.ndims)
    weights_in, weights_out = self._weights_for_change_grid_location(
        input='u', output='t', conserving=conserving)
    relocated = self._to_western_grid_location(
        scalararray, weights_in=weights_in, weights_out=weights_out)
    return _append_dataarray_extra_attrs(relocated, grid_location='t')
# Example source code for the Python class DataArray()
def change_grid_location_t_to_v(self, scalararray, conserving='area'):
    """Relocate a scalar array from the t-point to the v-point.

    The input is first validated as a t-point array, then the weights
    matching the requested conservation mode are fetched and handed to
    ``self._to_northern_grid_location``.

    Parameters
    ----------
    scalararray : xarray.DataArray
        original array, located at the t-point
    conserving : str
        any of 'area', 'x_flux' or 'y_flux'.
        - 'area' : conserves the area
        - 'x_flux' : conserves the flux in x-direction (eastward)
        - 'y_flux' : conserves the flux in y-direction (northward)

    Returns
    -------
    xarray.DataArray
        relocated array, tagged with ``grid_location='v'``
    """
    check_input_array(scalararray, chunks=self.chunks,
                      grid_location='t', ndims=self.ndims)
    weights_in, weights_out = self._weights_for_change_grid_location(
        input='t', output='v', conserving=conserving)
    relocated = self._to_northern_grid_location(
        scalararray, weights_in=weights_in, weights_out=weights_out)
    return _append_dataarray_extra_attrs(relocated, grid_location='v')
def change_grid_location_v_to_t(self, scalararray, conserving='y_flux'):
    """Relocate a scalar array from the v-point to the t-point.

    The input is first validated as a v-point array, then the weights
    matching the requested conservation mode are fetched and handed to
    ``self._to_southern_grid_location``.

    Parameters
    ----------
    scalararray : xarray.DataArray
        original array, located at the v-point
    conserving : str
        any of 'area', 'x_flux' or 'y_flux'.
        - 'area' : conserves the area
        - 'x_flux' : conserves the flux in x-direction (eastward)
        - 'y_flux' : conserves the flux in y-direction (northward)

    Returns
    -------
    xarray.DataArray
        relocated array, tagged with ``grid_location='t'``
    """
    check_input_array(scalararray, chunks=self.chunks,
                      grid_location='v', ndims=self.ndims)
    weights_in, weights_out = self._weights_for_change_grid_location(
        input='v', output='t', conserving=conserving)
    relocated = self._to_southern_grid_location(
        scalararray, weights_in=weights_in, weights_out=weights_out)
    return _append_dataarray_extra_attrs(relocated, grid_location='t')
def change_grid_location_f_to_u(self, scalararray, conserving='x_flux'):
    """Relocate a scalar array from the f-point to the u-point.

    The input is first validated as an f-point array, then the weights
    matching the requested conservation mode are fetched and handed to
    ``self._to_southern_grid_location``.

    Parameters
    ----------
    scalararray : xarray.DataArray
        original array, located at the f-point
    conserving : str
        any of 'area', 'x_flux' or 'y_flux'.
        - 'area' : conserves the area
        - 'x_flux' : conserves the flux in x-direction (eastward)
        - 'y_flux' : conserves the flux in y-direction (northward)

    Returns
    -------
    xarray.DataArray
        relocated array, tagged with ``grid_location='u'``
    """
    check_input_array(scalararray, chunks=self.chunks,
                      grid_location='f', ndims=self.ndims)
    weights_in, weights_out = self._weights_for_change_grid_location(
        input='f', output='u', conserving=conserving)
    relocated = self._to_southern_grid_location(
        scalararray, weights_in=weights_in, weights_out=weights_out)
    return _append_dataarray_extra_attrs(relocated, grid_location='u')
def change_grid_location_f_to_v(self, scalararray, conserving='y_flux'):
    """Relocate a scalar array from the f-point to the v-point.

    The input is first validated as an f-point array, then the weights
    matching the requested conservation mode are fetched and handed to
    ``self._to_western_grid_location``.

    Parameters
    ----------
    scalararray : xarray.DataArray
        original array, located at the f-point
    conserving : str
        any of 'area', 'x_flux' or 'y_flux'.
        - 'area' : conserves the area
        - 'x_flux' : conserves the flux in x-direction (eastward)
        - 'y_flux' : conserves the flux in y-direction (northward)

    Returns
    -------
    xarray.DataArray
        relocated array, tagged with ``grid_location='v'``
    """
    check_input_array(scalararray, chunks=self.chunks,
                      grid_location='f', ndims=self.ndims)
    weights_in, weights_out = self._weights_for_change_grid_location(
        input='f', output='v', conserving=conserving)
    # The v-point sits west of the f-point: u -> t is a westward move and
    # f -> u is a southward move, so f is north-east of t while v is north
    # of t.  The southward operator used previously is the f -> u move and
    # cannot also land on v.
    relocated = self._to_western_grid_location(
        scalararray, weights_in=weights_in, weights_out=weights_out)
    return _append_dataarray_extra_attrs(relocated, grid_location='v')
def change_grid_location_u_to_v(self, scalararray, conserving='area'):
    """Relocate a scalar array from the u-point to the v-point.

    The relocation is done in two hops, u -> t then t -> v, and the same
    ``conserving`` mode is forwarded to both hops.

    Parameters
    ----------
    scalararray : xarray.DataArray
        original array to be relocated
    conserving : str
        any of 'area', 'x_flux' or 'y_flux'.
        - 'area' : conserves the area
        - 'x_flux' : conserves the flux in x-direction (eastward)
        - 'y_flux' : conserves the flux in y-direction (northward)

    Returns
    -------
    Whatever ``self.change_grid_location_t_to_v`` returns for the
    intermediate t-point array.
    """
    # first hop: u-point -> t-point
    at_t_point = self.change_grid_location_u_to_t(scalararray,
                                                  conserving=conserving)
    # second hop: t-point -> v-point
    return self.change_grid_location_t_to_v(at_t_point,
                                            conserving=conserving)
#---------------------------- Vector Operators ---------------------------------
def norm_of_vectorfield(self, vectorfield):
    """Return the norm of a vector field, at t-point.

    The norm is the square root of the scalar product of the vector
    field with itself, as computed by ``self.scalar_product``.
    So far, only available for vector fields at u,v grid_location.

    Parameters
    ----------
    vectorfield : VectorField2d namedtuple
        so far only valid for vectorfield at u,v-points

    Returns
    -------
    scalararray : xarray.DataArray
        xarray with a specified grid_location, so far t-point only
    """
    squared_norm = self.scalar_product(vectorfield, vectorfield)
    return xu.sqrt(squared_norm)
def boundary_weights(self, mode='reflect', drop_dims=None):
    """Compute the boundary weights.

    The not-null mask of ``self.obj`` is convolved with
    ``self.coefficients``; the result is returned only where the mask is
    true.

    Parameters
    ----------
    mode : str
        Boundary mode forwarded to ``im.convolve``.
    drop_dims : str or iterable of str, optional
        Dimensions along which the mask is constant. For each of them the
        mask is reduced to its first index and the dimension is removed
        from the dims/coords of the result. ``None`` (the default) drops
        nothing.

    Returns
    -------
    xarray.DataArray
        Array named 'boundary weights', masked where ``self.obj`` is null.
    """
    # The default used to be iterated directly, which raised TypeError.
    if drop_dims is None:
        drop_dims = ()
    elif isinstance(drop_dims, str):
        # A bare name would otherwise be iterated character by character.
        drop_dims = (drop_dims,)

    mask = self.obj.notnull()
    # ``dims`` is a tuple of names: work on a list so that entries can be
    # removed by name (``del`` on the tuple could never succeed).
    new_dims = list(self.obj.dims)
    new_coords = copy.copy(self.coords)
    for dim in drop_dims:
        mask = mask.isel({dim: 0})
        new_dims.remove(dim)
        del new_coords[dim]
    # NOTE(review): when dimensions are dropped, ``self.coefficients``
    # presumably has to match the reduced rank of the mask -- confirm
    # against the code that builds the coefficients.
    weights = im.convolve(mask.astype(float), self.coefficients, mode=mode)
    res = xr.DataArray(weights, dims=new_dims, coords=new_coords,
                       name='boundary weights')
    return res.where(mask == 1)
def _create_xarray_2D(mask, lon_or_obj, lat, lon_name, lat_name):
    """Wrap a mask in an xarray DataArray carrying 2D lon/lat coordinates.

    When the extracted longitude is already a DataArray its two dimensions
    (and their coordinates) are reused; otherwise integer index dimensions
    named ``<lon_name>_idx`` and ``<lat_name>_idx`` are created.
    """
    lon2D, lat2D = _extract_lon_lat(lon_or_obj, lat, lon_name, lat_name)

    if not isinstance(lon2D, xr.DataArray):
        dim_names = (lon_name + '_idx', lat_name + '_idx')
        index_0 = np.arange(np.array(lon2D).shape[0])
        index_1 = np.arange(np.array(lon2D).shape[1])
    else:
        dim_names = lon2D.dims
        index_0 = lon2D[dim_names[0]]
        index_1 = lon2D[dim_names[1]]

    # 1D index coordinates first, then the 2D lat/lon fields
    coords = {}
    coords[dim_names[0]] = index_0
    coords[dim_names[1]] = index_1
    coords[lat_name] = (dim_names, lat2D)
    coords[lon_name] = (dim_names, lon2D)

    return xr.DataArray(mask, coords=coords, dims=dim_names)
def test__mask_xarray_in_out_2D():
    """Masking a DataArray with 2D lon/lat coords keeps all coordinates."""
    dims = ('lat_1D', 'lon_1D')
    coords = {'lat_1D': [1, 2],
              'lon_1D': [1, 2],
              'lat_2D': (dims, lat_2D),
              'lon_2D': (dims, lon_2D)}
    values = np.random.rand(2, 2)
    data = xr.DataArray(values, coords=coords, dims=dims)

    expected = expected_mask()
    result = r1.mask(data, lon_name='lon_2D', lat_name='lat_2D')

    assert isinstance(result, xr.DataArray)
    assert np.allclose(result, expected, equal_nan=True)

    # 2D coordinates are carried over unchanged ...
    assert np.allclose(result.lat_2D, lat_2D)
    assert np.allclose(result.lon_2D, lon_2D)
    # ... and so are the 1D index coordinates
    assert np.allclose(result.lat_1D, [1, 2])
    assert np.allclose(result.lon_1D, [1, 2])
def ts_probs(dset, bins=None, axis=0, dim=None, layer=None,
             log_counts=False, log_probs=False, names=None,
             keep_attrs=True, chunks=None):
    '''Fixed or unevenly spaced histogram binning for
    the time dimension of a 3-D cube DataArray in X

    Parameters:
        dset: MLDataset
        bins: Passed to np.histogram
        axis: Integer like 0, 1, 2 to indicate which is the time axis of cube
        dim: Forwarded to resize_each_1d_slice
        layer: Layer selection in dset; normalized by _validate_layer and
            then iterated, one histogram feature set per entry
        log_counts: Forwarded to resize_each_1d_slice
        log_probs: Return probabilities associated with log counts? True / False
        names: Forwarded to resize_each_1d_slice
        keep_attrs: Forwarded to resize_each_1d_slice
        chunks: Forwarded to resize_each_1d_slice

    Returns the result of concat_ml_features over the per-layer outputs.
    '''
    layer = _validate_layer(dset, layer)

    def histogram_features(arr):
        # every layer is binned with exactly the same settings
        return resize_each_1d_slice(arr, _hist_1d, bins=bins,
                                    axis=axis, dim=dim, layer=layer,
                                    log_counts=log_counts,
                                    log_probs=log_probs,
                                    names=names,
                                    keep_attrs=keep_attrs,
                                    chunks=chunks)

    per_layer = (histogram_features(dset[name]) for name in layer)
    return concat_ml_features(*per_layer)
def _apply_window(da, dims, window_type='hanning'):
"""Creating windows in dimensions dims."""
if window_type not in ['hanning']:
raise NotImplementedError("Only hanning window is supported for now.")
numpy_win_func = getattr(np, window_type)
if da.chunks:
def dask_win_func(n):
return dsar.from_delayed(
delayed(numpy_win_func, pure=True)(n),
(n,), float)
win_func = dask_win_func
else:
win_func = numpy_win_func
windows = [xr.DataArray(win_func(len(da[d])),
dims=da[d].dims, coords=da[d].coords) for d in dims]
return da * reduce(operator.mul, windows[::-1])
def test_dft_4d():
    """Test the discrete Fourier transform on 4D data"""
    N = 16
    dim_names = ['time', 'z', 'y', 'x']
    da = xr.DataArray(np.random.rand(N, N, N, N),
                      dims=dim_names,
                      coords={name: range(N) for name in dim_names})

    # plain transform over every dimension must agree with numpy
    ft = xrft.dft(da, shift=False)
    npt.assert_almost_equal(ft.values, np.fft.fftn(da.values))

    # linear detrending is rejected on the full 4D array, with or
    # without an explicit list of dimensions
    with pytest.raises(NotImplementedError):
        xrft.dft(da, detrend='linear')
    with pytest.raises(NotImplementedError):
        xrft.dft(da, dim=['time', 'y', 'x'], detrend='linear')

    # detrend the first z-slice over time, y and x, then compare
    first_slice = da[:, 0]
    da_prime = xrft.detrendn(first_slice.values, [0, 1, 2])
    detrended_ft = xrft.dft(first_slice.drop('z'),
                            dim=['time', 'y', 'x'],
                            shift=False, detrend='linear')
    npt.assert_almost_equal(detrended_ft.values, np.fft.fftn(da_prime))
def test_dft_3d_dask():
    """Test the discrete Fourier transform on 3D dask array data"""
    N = 16
    dim_names = ['time', 'x', 'y']
    da = xr.DataArray(np.random.rand(N, N, N), dims=dim_names,
                      coords={name: range(N) for name in dim_names})

    # transform over x and y while chunked along time
    daft = xrft.dft(da.chunk({'time': 1}), dim=['x', 'y'], shift=False)
    # assert hasattr(daft.data, 'dask')
    expected = np.fft.fftn(da.chunk({'time': 1}).values, axes=[1, 2])
    npt.assert_almost_equal(daft.values, expected)

    # chunking along the transformed dimension is rejected
    with pytest.raises(ValueError):
        xrft.dft(da.chunk({'time': 1, 'x': 1}), dim=['x'])

    # transform over time with linear detrending while chunked along x
    daft = xrft.dft(da.chunk({'x': 1}), dim=['time'],
                    shift=False, detrend='linear')
    # assert hasattr(daft.data, 'dask')
    da_prime = sps.detrend(da.chunk({'x': 1}), axis=0)
    expected = np.fft.fftn(da_prime, axes=[0])
    npt.assert_almost_equal(daft.values, expected)
def test_power_spectrum_dask():
    """Test the power spectrum function on dask data"""
    N = 16
    dim = ['x', 'y']
    da = xr.DataArray(np.random.rand(2, N, N), dims=['time', 'x', 'y'],
                      coords={'time': range(2), 'x': range(N),
                              'y': range(N)}).chunk({'time': 1})

    # without density scaling the spectrum is just |dft|**2
    ps = xrft.power_spectrum(da, dim=dim, density=False)
    daft = xrft.dft(da, dim=['x', 'y'])
    npt.assert_almost_equal(ps.values, (daft * np.conj(daft)).real.values)

    # with the default scaling, compare against |dft|**2 / N**4 divided by
    # the frequency spacing of every transformed dimension
    ps = xrft.power_spectrum(da, dim=dim, window=True, detrend='constant')
    daft = xrft.dft(da, dim=dim, window=True, detrend='constant')
    test = (daft * np.conj(daft)).real / N**4
    for i in dim:
        test /= daft['freq_' + i + '_spacing']
    npt.assert_almost_equal(ps.values, test)
    # the spectrum must not contain any invalid (masked) value
    npt.assert_almost_equal(np.ma.masked_invalid(ps).mask.sum(), 0.)
def test_cross_spectrum():
    """Test the cross spectrum function"""
    N = 16
    grid = {'x': range(N), 'y': range(N)}
    da = xr.DataArray(np.random.rand(N, N), dims=['x', 'y'], coords=grid)
    da2 = xr.DataArray(np.random.rand(N, N), dims=['x', 'y'], coords=grid)

    cs = xrft.cross_spectrum(da, da2, window=True, density=False,
                             detrend='constant')

    # reference: product of the two windowed, detrended transforms
    dft_options = dict(dim=None, shift=True, detrend='constant', window=True)
    daft = xrft.dft(da, **dft_options)
    daft2 = xrft.dft(da2, **dft_options)

    npt.assert_almost_equal(cs.values, np.real(daft * np.conj(daft2)))
    # the spectrum must not contain any invalid (masked) value
    npt.assert_almost_equal(np.ma.masked_invalid(cs).mask.sum(), 0.)
def test_spacing_tol(test_data_1d):
    """Check that ``spacing_tol`` controls the uneven-spacing error of dft."""
    # The fixture is only copied and loaded here; the checks below use
    # their own array.
    test_data_1d.copy().load()

    # Build a coordinate whose last point is slightly off the regular grid
    Nx = 16
    Lx = 1.0
    x = np.linspace(0, Lx, Nx)
    x[-1] += .001
    uneven = xr.DataArray(np.random.rand(Nx), coords=[x], dims=['x'])

    # A loose tolerance accepts the coordinate without error
    xrft.dft(uneven, spacing_tol=1e-1)

    # A tight tolerance rejects it
    with pytest.raises(ValueError):
        xrft.dft(uneven, spacing_tol=1e-4)
def _master_dataarray(exp, data_dict):
    """Combine the per-case arrays in ``data_dict`` into one DataArray.

    The stacked data gets one leading dimension per experiment case,
    followed by the dimensions of a single case's array. Coordinates,
    attributes and name are taken from the array of the first case
    combination yielded by ``exp.all_cases()``.
    """
    case_data = [exp._case_data[case] for case in exp.cases]
    stacked = _stack_dims(data_dict, case_data, {}, exp)

    # one representative array serves as template for the metadata
    template = data_dict[next(exp.all_cases())]
    name = template.name
    coords = template.to_dataset().coords

    logger.debug("Creating master dataarray")
    for case in exp.cases:
        logger.debug(" " + case)
        coords[case] = exp._case_data[case].vals

    dims = list(exp.cases) + list(template.dims)
    master = DataArray(stacked, coords=coords, dims=dims)
    master = copy_attrs(template, master)
    master.name = name
    return master
def test_xarray_1d(self):
    """IVData built from a 1D DataArray, without and with a time index."""
    values = np.random.randn(10)
    column = values[:, None]

    # no coordinates: rows fall back to 0..9
    dh = IVData(xr.DataArray(values), 'some_variable')
    assert_equal(dh.ndarray, column)
    assert dh.rows == list(np.arange(10))
    assert dh.cols == ['some_variable.0']
    expected = pd.DataFrame(values, columns=dh.cols, index=dh.rows)
    assert_frame_equal(expected, dh.pandas)

    # labelled with a date index: rows follow the index
    index = pd.date_range('2017-01-01', periods=10)
    labelled = xr.DataArray(values, [('time', index)])
    dh = IVData(labelled, 'some_variable')
    assert_equal(dh.ndarray, column)
    assert_series_equal(pd.Series(dh.rows), pd.Series(list(index)))
    assert dh.cols == ['some_variable.0']
    expected = pd.DataFrame(column, columns=dh.cols, index=dh.rows)
    assert_frame_equal(expected, dh.pandas)
def test_xarray_2d(self):
    """IVData built from a 2D DataArray, without and with labelled axes."""
    values = np.random.randn(10, 2)

    # no coordinates: rows fall back to 0..9, columns to 'x.<i>'
    dh = IVData(xr.DataArray(values))
    assert_equal(dh.ndarray, values)
    assert dh.rows == list(np.arange(10))
    assert dh.cols == ['x.0', 'x.1']
    expected = pd.DataFrame(values, columns=dh.cols, index=dh.rows)
    assert_frame_equal(expected, dh.pandas)

    # labelled axes: rows follow the date index, columns the variable names
    index = pd.date_range('2017-01-01', periods=10)
    labelled = xr.DataArray(values,
                            [('time', index),
                             ('variables', ['apple', 'banana'])])
    dh = IVData(labelled)
    assert_equal(dh.ndarray, values)
    assert_series_equal(pd.Series(dh.rows), pd.Series(list(index)))
    assert dh.cols == ['apple', 'banana']
    expected = pd.DataFrame(values, columns=dh.cols, index=dh.rows)
    assert_frame_equal(expected, dh.pandas)