def compute(self, data, selected_indexes):
    """Return a variable name and the :class:`xarray.Variable` to add.

    This implementation performs no work and returns ``None``.

    NOTE(review): the original one-line summary was cut off after
    "to add in to the"; where the variable is added is not visible here.
    Presumably other ``compute`` implementations supply the real
    ``(name, variable)`` pair -- confirm against the enclosing class.
    """
    return None
# Example source code for the Python class Variable()
def compute(self, data, selected_indexes):
    """Return the variable name and the day offsets of the selected times.

    The values of ``data.time`` picked by `selected_indexes` are measured
    against ``self._since`` (converted with :func:`numpy.datetime64`),
    truncated to whole days and stored as ``int16``.

    Returns
    -------
    tuple
        ``(self._var_name, xarray.Variable)`` where the variable has the
        dimensions ``('y', 'x')``.
    """
    # NOTE(review): presumably `selected_indexes` is a 2-D (y, x) index
    # array into the time axis -- confirm against the caller.
    picked_times = data.time.values[selected_indexes]
    elapsed = picked_times - np.datetime64(self._since)
    whole_days = elapsed.astype('timedelta64[D]')
    # NOTE(review): int16 only covers about +/-32767 days; confirm that
    # this span is sufficient for the data being processed.
    day_counts = whole_days.astype('int16')
    return self._var_name, xarray.Variable(('y', 'x'), day_counts)
def compute(self, data, selected_indexes):
    """Return the variable name and the selected observation times.

    The values of ``data.time`` picked by `selected_indexes` are passed
    through ``datetime64_to_inttime`` and wrapped in an
    :class:`xarray.Variable` with the dimensions ``('y', 'x')``.

    Returns
    -------
    tuple
        ``(self._var_name, xarray.Variable)``
    """
    # NOTE(review): presumably `selected_indexes` is a 2-D (y, x) index
    # array into the time axis -- confirm against the caller.
    picked_times = data.time.values[selected_indexes]
    encoded_times = datetime64_to_inttime(picked_times)
    variable = xarray.Variable(('y', 'x'), encoded_times)
    return self._var_name, variable
def compute(self, data, selected_indexes):
    """Return the variable name and the selected ``source`` values.

    The values of ``data.source`` picked by `selected_indexes` are wrapped
    in an :class:`xarray.Variable` with the dimensions ``('y', 'x')``.

    Returns
    -------
    tuple
        ``(self._var_name, xarray.Variable)``
    """
    name = self._var_name
    # NOTE(review): presumably `selected_indexes` is a 2-D (y, x) index
    # array -- confirm against the caller.
    picked_sources = data.source.values[selected_indexes]
    return name, xarray.Variable(('y', 'x'), picked_sources)
def _check_triangular_bounds(self, var, coords=None, axis='x', nans=None):
    """
    Checks whether the bounds in the variable attribute are triangular

    The coordinate of `var` for the given `axis` is looked up through
    :meth:`get_variable_by_axis`. Its ``'bounds'`` attribute names the
    bounds variable, which is then fetched from the coordinates of the
    dataset in the :attr:`ds` attribute.

    Parameters
    ----------
    var: xarray.Variable or xarray.DataArray
        The variable to check
    coords: dict
        Coordinates to use. If None, the coordinates of the dataset in the
        :attr:`ds` attribute are used.
    axis: {'x', 'y'}
        The spatial axis to check
    nans: {None, 'skip', 'only'}
        Determines whether values with nan shall be left (None), skipped
        (``'skip'``) or shall be the only one returned (``'only'``)

    Returns
    -------
    bool or None
        True, if unstructured (i.e. the last dimension of the bounds has
        more than 2 entries), None if it could not be determined
    xarray.Coordinate or None
        the bounds coordinate (if existent)

    Raises
    ------
    ValueError
        If the coordinate has a ``'bounds'`` attribute and `nans` is not
        one of None, ``'skip'`` or ``'only'``"""
    coord = self.get_variable_by_axis(var, axis, coords=coords)
    if coord is None:
        return None, None

    bounds_name = coord.attrs.get('bounds')
    if bounds_name is None:
        return None, None

    if nans not in (None, 'skip', 'only'):
        raise ValueError(
            "`nans` must be either None, 'skip', or 'only'! "
            "Not {0}!".format(str(nans)))

    bounds = self.ds.coords.get(bounds_name)
    if bounds is None:
        # The attribute names a variable that is not among the dataset
        # coordinates. Bail out here instead of failing on ``None.sel``
        # or ``None[...]`` further down.
        return None, None

    if coords is not None:
        # Only select along the dimensions the bounds variable really has
        bounds = bounds.sel(**{
            key: coords[key]
            for key in set(coords).intersection(bounds.dims)})

    if nans == 'skip':
        bounds = bounds[~np.isnan(var.values)]
    elif nans == 'only':
        bounds = bounds[np.isnan(var.values)]

    return bounds.shape[-1] > 2, bounds
def quantile_mapping(input_data, data_to_match, mask=None,
                     alpha=0.4, beta=0.4):
    '''Quantile-map `input_data` onto the values of `data_to_match`.

    For every unmasked grid cell the input time series is ranked, and each
    rank is replaced by the value interpolated (``np.interp``) from the
    sorted `data_to_match` series at the matching plotting position.

    Parameters
    ----------
    input_data : xr.DataArray
        Data to map. ``time`` must be its first axis.
    data_to_match : xr.DataArray
        Data supplying the target values. ``time`` must be its first axis
        and the remaining shape must equal that of `input_data`.
    mask : xr.Variable, optional
        Cells to process (non-zero entries). If None, the mask is the
        not-null cells of the first time step of `input_data`.
    alpha, beta : float
        Parameters of the plotting positions
        ``(k - alpha) / (n + 1 - alpha - beta)`` for ``k = 1..n``.

    Returns
    -------
    xr.DataArray
        Mapped data with the dims, coords, attrs and name of `input_data`;
        cells outside the mask are filled with NaN.
    '''
    assert input_data.get_axis_num('time') == 0
    assert data_to_match.get_axis_num('time') == 0
    assert input_data.shape[1:] == data_to_match.shape[1:]

    # Build a mask when none was provided
    if mask is None:
        first_step = input_data.isel(time=0, drop=True)
        mask = xr.Variable(first_step.dims, ~da.isnull(first_step))

    def plotting_positions(count):
        # Plotting positions for a series of `count` time steps
        ranks = np.arange(1, count + 1)
        return (ranks - alpha) / (count + 1. - alpha - beta)

    # quantiles for the input data and for the obs, respectively
    x1 = plotting_positions(len(input_data['time']))
    x0 = plotting_positions(len(data_to_match['time']))

    def qmap(data, like, mask):
        # Sort once with numpy for all cells, then loop over unmasked cells
        order = np.argsort(data, axis=0)
        targets = np.sort(like, axis=0)

        rows, cols = mask.nonzero()

        mapped = np.full_like(data, np.nan)
        for row, col in zip(rows, cols):
            # Sorted observations at this cell
            cell_targets = targets[:, row, col]
            # Indices that would sort the input data at this cell
            cell_order = order[:, row, col]
            # TODO: handle edges
            mapped[cell_order, row, col] = np.interp(x1, x0, cell_targets)

        return mapped

    raw_input = input_data.data
    if isinstance(raw_input, da.Array):
        # dask arrays
        result = da.map_blocks(qmap, raw_input, data_to_match.data,
                               mask.data, chunks=raw_input.chunks,
                               name='qmap')
    else:
        # numpy arrays
        result = qmap(raw_input, data_to_match.data, mask.data)

    return xr.DataArray(result, dims=input_data.dims,
                        coords=input_data.coords, attrs=input_data.attrs,
                        name=input_data.name)