def test_snapshot_vars(self, model):
ds = xr.Dataset()
ds['clock'] = ('clock', [0, 2, 4, 6, 8],
{self._clock_key: 1, self._master_clock_key: 1})
ds['snap_clock'] = ('snap_clock', [0, 4, 8], {self._clock_key: 1})
# snapshot clock with no snapshot variable (attribute) set
ds['snap_clock2'] = ('snap_clock2', [0, 8], {self._clock_key: 1})
ds.xsimlab._set_snapshot_vars(model, None, grid='x')
ds.xsimlab._set_snapshot_vars(model, 'clock', quantity='quantity')
ds.xsimlab._set_snapshot_vars(model, 'snap_clock',
other_process=('other_effect', 'x2'))
expected = {None: set([('grid', 'x')]),
'clock': set([('quantity', 'quantity')]),
'snap_clock': set([('other_process', 'other_effect'),
('other_process', 'x2')])}
actual = {k: set(v) for k, v in ds.xsimlab.snapshot_vars.items()}
assert actual == expected
def input_dataset():
clock_key = SimlabAccessor._clock_key
mclock_key = SimlabAccessor._master_clock_key
svars_key = SimlabAccessor._snapshot_vars_key
ds = xr.Dataset()
ds['clock'] = ('clock', [0, 2, 4, 6, 8],
{clock_key: np.uint8(True), mclock_key: np.uint8(True)})
ds['out'] = ('out', [0, 4, 8], {clock_key: np.uint8(True)})
ds['grid__x_size'] = ((), 10, {'description': 'grid size'})
ds['quantity__quantity'] = ('x', np.zeros(10),
{'description': 'a quantity'})
ds['some_process__some_param'] = ((), 1, {'description': 'some parameter'})
ds['other_process__other_param'] = ('clock', [1, 2, 3, 4, 5],
{'description': 'other parameter'})
ds['clock'].attrs[svars_key] = 'quantity__quantity'
ds['out'].attrs[svars_key] = ('other_process__other_effect,'
'some_process__some_effect')
ds.attrs[svars_key] = 'grid__x'
return ds
def master_clock_dim(self):
"""Dimension used as master clock for model runs. Returns None
if no dimension is set as master clock.
See Also
--------
:meth:`Dataset.xsimlab.update_clocks`
"""
if self._master_clock_dim is not None:
return self._master_clock_dim
else:
for c in self._obj.coords.values():
if c.attrs.get(self._master_clock_key, False):
dim = c.dims[0]
self._master_clock_dim = dim
return dim
return None
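A short illustration, mirroring the fixtures above (it assumes xsimlab is imported so the accessor is registered): a coordinate is recognised as the master clock purely via the accessor's private attribute keys.

import xarray as xr
ds = xr.Dataset()
# the same attrs the tests above attach to 'clock'
ds['clock'] = ('clock', [0, 2, 4, 6, 8],
               {SimlabAccessor._clock_key: 1,
                SimlabAccessor._master_clock_key: 1})
assert ds.xsimlab.master_clock_dim == 'clock'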
def can_decode(cls, ds, var):
"""
Class method to determine whether the object can be decoded by this
decoder class.
Parameters
----------
ds: xarray.Dataset
The dataset that contains the given `var`
var: xarray.Variable or xarray.DataArray
The array to decode
Returns
-------
bool
True if the decoder can decode the given array `var`. Otherwise
False
Notes
-----
The default implementation returns True for any argument. Override this
method in a subclass to restrict what type of data your decoder can decode
"""
return True
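A minimal sketch of the subclassing the docstring invites; `TimeSeriesDecoder` is a hypothetical name, and only `var.dims` from the xarray API is assumed:

class TimeSeriesDecoder(CFDecoder):
    @classmethod
    def can_decode(cls, ds, var):
        # only claim arrays that carry a 'time' dimension
        return 'time' in var.dims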
def decode_ds(cls, ds, *args, **kwargs):
"""
Class method to decode coordinates and time information
This method interprets absolute time information (stored with units
``'day as %Y%m%d.%f'``) and coordinates
Parameters
----------
%(CFDecoder._decode_ds.parameters)s
Returns
-------
xarray.Dataset
The decoded dataset"""
for decoder_cls in cls._registry + [CFDecoder]:
ds = decoder_cls._decode_ds(ds, *args, **kwargs)
return ds
def init_accessor(self, base=None, idims=None, decoder=None,
*args, **kwargs):
"""
Initialize the accessor instance
This method sets the base dataset, dimension mapping and decoder
Parameters
----------
base: xr.Dataset
The base dataset for the data
idims: dict
A mapping from dimension name to indices. If not provided, it is
calculated when the :attr:`idims` attribute is accessed
decoder: CFDecoder
The decoder of this object
%(InteractiveBase.parameters)s
"""
if base is not None:
self.base = base
self.idims = idims
if decoder is not None:
self.decoder = decoder
super(InteractiveArray, self).__init__(*args, **kwargs)
def test_from_dataset_11_list(self):
"""Test the creation of a list of InteractiveLists"""
variables, coords = self._from_dataset_test_variables
ds = xr.Dataset(variables, coords)
# Create two lists, each containing two arrays of variables v1 and v2.
# In the first list, the xdim dimensions are 0 and 1.
# In the second, the xdim dimensions are both 2
l = self.list_class.from_dataset(
ds, name=[['v1', 'v2']], xdim=[[0, 1], 2], prefer_list=True)
self.assertEqual(len(l), 2)
self.assertIsInstance(l[0], psyd.InteractiveList)
self.assertIsInstance(l[1], psyd.InteractiveList)
self.assertEqual(len(l[0]), 2)
self.assertEqual(len(l[1]), 2)
self.assertEqual(l[0][0].xdim, 0)
self.assertEqual(l[0][1].xdim, 1)
self.assertEqual(l[1][0].xdim, 2)
self.assertEqual(l[1][1].xdim, 2)
def test_to_dataframe(self):
variables, coords = self._from_dataset_test_variables
variables['v1'][:] = np.arange(variables['v1'].size).reshape(
variables['v1'].shape)
ds = xr.Dataset(variables, coords)
l = psyd.InteractiveList.from_dataset(ds, name='v1', t=[0, 1])
l.extend(psyd.InteractiveList.from_dataset(ds, name='v1', t=2,
x=slice(1, 3)),
new_name=True)
self.assertEqual(len(l), 3)
self.assertTrue(all(arr.ndim == 1 for arr in l), msg=l)
df = l.to_dataframe()
self.assertEqual(df.shape, (ds.xdim.size, 3))
self.assertEqual(df.index.values.tolist(), ds.xdim.values.tolist())
self.assertEqual(df[l[0].psy.arr_name].values.tolist(),
ds.v1[0].values.tolist())
self.assertEqual(df[l[1].psy.arr_name].values.tolist(),
ds.v1[1].values.tolist())
self.assertEqual(df[l[2].psy.arr_name].notnull().sum(), 2)
self.assertEqual(
df[l[2].psy.arr_name].values[
df[l[2].psy.arr_name].notnull().values].tolist(),
ds.v1[2, 1:3].values.tolist())
def to_xarray(self):
"""Convert to xarray.Dataset
Returns
-------
xarray.Dataset
"""
import xarray as xr
data_vars = {
"frequencies": xr.DataArray(self.frequencies, dims="bin"),
"errors2": xr.DataArray(self.errors2, dims="bin"),
"bins": xr.DataArray(self.bins, dims=("bin", "x01"))
}
coords = {}
attrs = {
"underflow": self.underflow,
"overflow": self.overflow,
"inner_missed": self.inner_missed,
"keep_missed": self.keep_missed
}
attrs.update(self._meta_data)
# TODO: Add stats
return xr.Dataset(data_vars, coords, attrs)
def from_xarray(cls, arr):
"""Convert form xarray.Dataset
Parameters
----------
arr: xarray.Dataset
The data in xarray representation
"""
kwargs = {'frequencies': arr["frequencies"],
'binning': arr["bins"],
'errors2': arr["errors2"],
'overflow': arr.attrs["overflow"],
'underflow': arr.attrs["underflow"],
'keep_missed': arr.attrs["keep_missed"]}
# TODO: Add stats
return cls(**kwargs)
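Together with `to_xarray` above, this gives a round trip; a hedged sketch in comment form (the histogram class name is an assumption):

# h = Histogram1D(...)                          # any histogram instance
# restored = type(h).from_xarray(h.to_xarray())
# frequencies, errors2, bins and the overflow/underflow attrs survive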
def second_layer_input_matrix(X, models):
'''Build an input matrix for a second-layer model: take the metadata
from X given to the first-layer models and stack the 1-D predictions
of the first-layer models as columns of a new matrix
'''
preds = predict_many(dict(X=X), to_raster=False,
ensemble=models)
example = preds[0].flat
input_matrix = np.empty((example.shape[0], len(preds)))
for j, pred in enumerate(preds):
input_matrix[:, j] = pred.flat.values[:, 0]
attrs = X.attrs.copy()
attrs['old_dims'] = [X[SOIL_MOISTURE].dims] * len(preds)
attrs['canvas'] = X[SOIL_MOISTURE].canvas
tags = [tag for tag, _ in models]
arr = xr.DataArray(input_matrix,
coords=[('space', example.space),
('band', tags)],
dims=('space', 'band'),
attrs=attrs)
return xr.Dataset(dict(flat=arr), attrs=attrs)
def _as_numpy_arrs(self, X, y=None, **kw):
'''Convert X, y to numpy.ndarrays for a scikit-learn method
'''
if isinstance(X, np.ndarray):
return X, y, None
if isinstance(X, xr.Dataset):
X = MLDataset(X)
if hasattr(X, 'has_features'):
if X.has_features(raise_err=False):
pass
else:
X = X.to_features()
row_idx = get_row_index(X)
if hasattr(X, 'to_array') and not isinstance(X, np.ndarray):
X, y = X.to_array(y=y)
# TODO what about row_idx now?
# TODO - if y is not numpy array, then the above lines are needed for y
return X, y, row_idx
def test_ea_search_sklearn_elm_steps(label, do_predict):
'''Test that EaSearchCV can work with numpy, dask.array,
pandas.DataFrame, xarray.Dataset, xarray_filters.MLDataset
'''
from scipy.stats import lognorm
est, make_data, sel, kw = args[label]
parameters = {'kernel': ['linear', 'rbf'],
'C': lognorm(4),}
if isinstance(est, (sk_Pipeline, Pipeline)):
parameters = {'est__{}'.format(k): v
for k, v in parameters.items()}
ea = EaSearchCV(est, parameters,
n_iter=4,
ngen=2,
model_selection=sel,
model_selection_kwargs=kw)
X, y = make_data()
ea.fit(X, y)
if do_predict:
pred = ea.predict(X)
assert isinstance(pred, type(y))
def import_from_netcdf(network, path, skip_time=False):
"""
Import network data from netCDF file or xarray Dataset at `path`.
Parameters
----------
path : string|xr.Dataset
Path to netCDF dataset or instance of xarray Dataset
skip_time : bool, default False
Skip reading in time dependent attributes
"""
assert has_xarray, "xarray must be installed for netCDF support."
basename = os.path.basename(path) if isinstance(path, string_types) else None
with ImporterNetCDF(path=path) as importer:
_import_from_importer(network, importer, basename=basename,
skip_time=skip_time)
def __init__(self,
instance: int,
data: xr.Dataset,
mutable: bool = False):
"""
Create a new _Instance view representing the specified instance of the specified xarray data set.
Parameters
----------
instance: int
The index of the instance in the specified xarray data set
data: xarray.Dataset
The xarray data set containing the instance
mutable: bool, optional
If True, attributes of this instance may be modified. If False (default), any attempt to modify the instance
will result in an AttributeError
"""
self._instance = instance
self._data = data
self._mutable = mutable
def __init__(self,
data: xr.Dataset,
mutable: bool = False):
"""
Create and initialize a new DataSet with the specified parameters.
There should be no reason to invoke this constructor directly. Instead, use
the utility methods for loading a data set from a file or for creating an
empty data set.
Parameters
----------
data: xarray.Dataset
The xarray data set storing the actual data
mutable: bool
True, if modifications to the data set should be allowed, False otherwise
"""
super().__init__()
self._data = data
self._mutable = mutable
def load_netcdf_meta(datafile):
'''
Loads metadata for NetCDF
Parameters:
:datafile: str: Path on disk to NetCDF file
Returns:
:meta: Dictionary of metadata
'''
ras = nc.Dataset(datafile)
attrs = _get_nc_attrs(ras)
sds = _get_subdatasets(ras)
meta = {'meta': attrs,
'layer_meta': sds,
'name': datafile,
'variables': list(ras.variables.keys()),
}
return meta_strings_to_dict(meta)
def read_met_data(params: dict, domain: xr.Dataset) -> xr.Dataset:
"""
Read input meteorological forcings for MetSim.
This method supports ascii, binary, netcdf, and
xarray input pointers. The input source is derived
from the key 'forcing' in the params dictionary.
The format of the data is derived from the 'forcing_fmt'
key in the parameter dictionary.
"""
process_funcs = {
"netcdf": process_nc,
"binary": process_vic,
"ascii": process_vic,
"data": process_nc
}
return process_funcs[params['forcing_fmt']](params, domain)
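An illustrative call; the keys 'forcing' and 'forcing_fmt' come from the function body, while the file name and domain are made up:

params = {'forcing': 'forcings.nc', 'forcing_fmt': 'netcdf'}
met_data = read_met_data(params, domain)  # dispatches to process_nc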
def read_netcdf(data_handle, domain=None, iter_dims=['lat', 'lon'],
start=None, stop=None, calendar='standard',
var_dict=None) -> xr.Dataset:
"""Read in a NetCDF file"""
ds = xr.open_dataset(data_handle)
if var_dict is not None:
ds = ds.rename(var_dict)
if start is not None and stop is not None:
ds = ds.sel(time=slice(start, stop))
dates = ds.indexes['time']
ds['day_of_year'] = xr.Variable(('time', ), dates.dayofyear)
if domain is not None:
ds = ds.sel(**{d: domain[d] for d in iter_dims})
out = ds.load()
ds.close()
return out
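For example, trimming a file to one year while renaming a variable on the way in (the file and variable names are assumptions, not MetSim defaults):

ds = read_netcdf('forcings.nc', start='1950-01-01', stop='1950-12-31',
                 var_dict={'Prec': 'prec'})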
def read_data(data_handle, domain=None, iter_dims=['lat', 'lon'],
start=None, stop=None, calendar='standard',
var_dict=None) -> xr.Dataset:
"""Read data directly from an xarray dataset"""
varlist = list(data_handle.keys())
if var_dict is not None:
data_handle = data_handle.rename(var_dict)
varlist = list(var_dict.values())
if start is not None and stop is not None:
data_handle = data_handle[varlist].sel(time=slice(start, stop))
dates = data_handle.indexes['time']
data_handle['day_of_year'] = xr.Variable(('time', ), dates.dayofyear)
if domain is not None:
data_handle = data_handle.sel(**{d: domain[d] for d in iter_dims})
out = data_handle.load()
data_handle.close()
return out
def test_from_features_dropped_rows(X):
features = X.to_features()
data1 = features.from_features()
# Assert that we get the original Dataset back after X.to_features().from_features()
assert np.array_equal(data1.coords.to_index().values, X.coords.to_index().values)
assert np.allclose(data1.to_xy_arrays()[0], X.to_xy_arrays()[0])
# Drop some rows
features['features'].values[:2, :] = np.nan
zerod_vals_copy = features['features'].values.copy()  # Copy NaN positions for testing later on
features = features.dropna(features['features'].dims[0])
# Convert back to original dataset, padding NaN values into the proper locations if necessary
data2 = features.from_features()
# Assert that the coords are correct, and NaNs are in the right places
if np.nan in data2.to_xy_arrays()[0]:
assert np.array_equal(data2.coords.to_index().values, data1.coords.to_index().values)
assert np.allclose(data2.to_xy_arrays()[0], zerod_vals_copy, equal_nan=True)
def create_master(self, var, data=None, **kwargs):
""" Convenience function to create a master dataset for a
given experiment.
Parameters
----------
var : Var or str
A Var object containing the information about the variable
being processed or a string indicating its name for inference
when creating the master dataset
data : dict (optional, unless var is a string)
Dictionary of dictionaries/dataset containing the variable data
to be collected into a master dataset
Returns
-------
A Dataset with all the data, collapsed onto additional dimensions
for each case in the Experiment.
"""
return create_master(self, var, data, **kwargs)
def _make_dataset(varname, seed=None, **var_kws):
rs = np.random.RandomState(seed)
_dims = {'time': 10, 'x': 5, 'y': 5}
_dim_keys = ('time', 'x', 'y')
ds = xr.Dataset()
ds['time'] = ('time', pd.date_range('2000-01-01', periods=_dims['time']))
ds['x'] = np.linspace(0, 10, _dims['x'])
ds['y'] = np.linspace(0, 10, _dims['y'])
data = rs.normal(size=tuple(_dims[d] for d in _dim_keys))
ds[varname] = (_dim_keys, data)
ds.coords['numbers'] = ('time',
np.array(range(_dims['time']), dtype='int64'))
return ds
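Example use of this helper; the seed makes the random field reproducible across test runs, and the shape follows from `_dims` above:

ds = _make_dataset('tas', seed=42)
assert ds['tas'].shape == (10, 5, 5)  # (time, x, y)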
def test_new_geometric_median():
from datacube_stats.statistics import NewGeomedianStatistic
arr = np.random.random((5, 100, 100))
dataarray = xr.DataArray(arr, dims=('time', 'y', 'x'), coords={'time': list(range(5))})
dataset = xr.Dataset(data_vars={'band1': dataarray, 'band2': dataarray})
new_geomedian_stat = NewGeomedianStatistic()
result = new_geomedian_stat.compute(dataset)
assert isinstance(result, xr.Dataset)
assert result.band1.dims == result.band2.dims == ('y', 'x')
# The two bands had the same inputs, so should have the same result
assert (result.band1 == result.band2).all()
def two_band_eo_dataset(draw):
crs, height, width, times = draw(dataset_shape())
coordinates = {dim: np.arange(size) for dim, size in zip(crs.dimensions, (height, width))}
coordinates['time'] = times
dimensions = ('time',) + crs.dimensions
shape = (len(times), height, width)
arr = np.random.random_sample(size=shape)
data1 = xr.DataArray(arr,
dims=dimensions,
coords=coordinates,
attrs={'crs': crs})
arr = np.random.random_sample(size=shape)
data2 = xr.DataArray(arr,
dims=dimensions,
coords=coordinates,
attrs={'crs': crs})
name1, name2 = draw(st.lists(variable_name, min_size=2, max_size=2, unique=True))
dataset = xr.Dataset(data_vars={name1: data1, name2: data2},
attrs={'crs': crs})
return dataset
def test_normalised_difference_stats(dataset, output_name):
var1, var2 = list(dataset.data_vars)
ndstat = NormalisedDifferenceStats(var1, var2, output_name)
result = ndstat.compute(dataset)
assert isinstance(result, xr.Dataset)
assert 'time' not in result.dims
assert dataset.crs == result.crs
expected_output_varnames = set(f'{output_name}_{stat_name}' for stat_name in ndstat.stats)
assert set(result.data_vars) == expected_output_varnames
# Check the measurements() function raises an error on bad input_measurements
with pytest.raises(StatsConfigurationError):
invalid_names = [{'name': 'foo'}]
ndstat.measurements(invalid_names)
# Check the measurements() function returns something reasonable
input_measurements = [{'name': name} for name in (var1, var2)]
output_measurements = ndstat.measurements(input_measurements)
measurement_names = set(m['name'] for m in output_measurements)
assert expected_output_varnames == measurement_names
def compute(self, data):
is_integer_type = np.issubdtype(data.water.dtype, np.integer)
if not is_integer_type:
raise StatsProcessingError("Attempting to count bit flags on non-integer data. Provided data is: {}"
.format(data.water))
# 128 == clear and wet, 132 == clear and wet and masked for sea
# The PQ sea mask that we use is dodgy and should be ignored. It excludes lots of useful data
wet = ((data.water == 128) | (data.water == 132)).sum(dim='time')
dry = ((data.water == 0) | (data.water == 4)).sum(dim='time')
clear = wet + dry
with np.errstate(divide='ignore', invalid='ignore'):
frequency = wet / clear
if self.freq_only:
return xarray.Dataset({'frequency': frequency}, attrs=dict(crs=data.crs))
else:
return xarray.Dataset({'count_wet': wet,
'count_clear': clear,
'frequency': frequency}, attrs=dict(crs=data.crs))
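A worked sanity check of the bit-flag logic above (128/132 count as wet, 0/4 as dry, anything else as neither):

water = xarray.DataArray(np.array([128, 132, 0, 4, 2]).reshape(5, 1),
                         dims=['time', 'x'])
wet = ((water == 128) | (water == 132)).sum(dim='time')  # -> [2]
dry = ((water == 0) | (water == 4)).sum(dim='time')      # -> [2]
# frequency = wet / (wet + dry) = 0.5; the stray value 2 is ignored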
def load_data(sub_tile_slice, sources):
"""
Load a masked chunk of data from the datacube, based on a specification and list of datasets in `sources`.
:param sub_tile_slice: A portion of a tile, tuple coordinates
:param sources: a dictionary containing `data`, `spec` and `masks`
:return: :class:`xarray.Dataset` containing loaded data. Will be indexed and sorted by time.
"""
datasets = [load_masked_data(sub_tile_slice, source_prod)
for source_prod in sources] # list of datasets
datasets = _remove_emptys(datasets)
if len(datasets) == 0:
raise EmptyChunkException()
# TODO: Add check for compatible data variable attributes
# flags_definition between pq products is different and is silently dropped
datasets = xarray.concat(datasets, dim='time') # Copies all the data
if len(datasets.time) == 0:
raise EmptyChunkException()
# sort along time dim
return datasets.isel(time=datasets.time.argsort()) # Copies all the data again
def test_filter_accessor():
ds = xr.Dataset(data_vars={'var1': ('x', [1, 2]), 'var2': ('y', [3, 4])},
coords={'x': [1, 2], 'y': [3, 4]})
filtered = ds.filter(lambda var: 'x' in var.dims)
assert 'var1' in filtered and 'var2' not in filtered
assert 'x' in filtered.coords and 'y' not in filtered.coords
def test_clock_coords(self):
ds = xr.Dataset(
coords={
'mclock': ('mclock', [0, 1, 2],
{self._clock_key: 1, self._master_clock_key: 1}),
'sclock': ('sclock', [0, 2], {self._clock_key: 1}),
'no_clock': ('no_clock', [3, 4])
}
)
assert set(ds.xsimlab.clock_coords) == {'mclock', 'sclock'}