def test_master_clock_dim(self):
    attrs = {self._clock_key: 1, self._master_clock_key: 1}
    ds = xr.Dataset(coords={'clock': ('clock', [1, 2], attrs)})
    assert ds.xsimlab.master_clock_dim == 'clock'
    assert ds.xsimlab._master_clock_dim == 'clock'  # cache
    assert ds.xsimlab.master_clock_dim == 'clock'  # get cached value

    ds = xr.Dataset()
    assert ds.xsimlab.master_clock_dim is None
def test_set_master_clock_dim(self):
    ds = xr.Dataset(coords={'clock': [1, 2], 'clock2': [3, 4]})

    ds.xsimlab._set_master_clock_dim('clock')
    assert self._master_clock_key in ds.clock.attrs

    ds.xsimlab._set_master_clock_dim('clock2')
    assert self._master_clock_key not in ds.clock.attrs
    assert self._master_clock_key in ds.clock2.attrs

    with pytest.raises(KeyError):
        ds.xsimlab._set_master_clock_dim('invalid_clock')
def test_set_input_vars(self, model):
    ds = xr.Dataset()

    with pytest.raises(KeyError) as excinfo:
        ds.xsimlab._set_input_vars(model, 'invalid_process', var=1)
    assert "no process named" in str(excinfo.value)

    with pytest.raises(ValueError) as excinfo:
        ds.xsimlab._set_input_vars(model, 'some_process', some_param=0,
                                   invalid_var=1)
    assert "not valid input variables" in str(excinfo.value)

    ds.xsimlab._set_input_vars(model, 'quantity',
                               quantity=('x', np.zeros(10)))
    expected = xr.DataArray(data=np.zeros(10), dims='x')
    assert "quantity__quantity" in ds
    xr.testing.assert_equal(ds['quantity__quantity'], expected)

    # test time and parameter dimensions
    ds.xsimlab._set_input_vars(model, model.some_process, some_param=[1, 2])
    expected = xr.DataArray(data=[1, 2], dims='some_process__some_param',
                            coords={'some_process__some_param': [1, 2]})
    xr.testing.assert_equal(ds['some_process__some_param'], expected)

    del ds['some_process__some_param']
    ds['clock'] = ('clock', [0, 1], {self._master_clock_key: 1})
    ds.xsimlab._set_input_vars(model, 'some_process',
                               some_param=('clock', [1, 2]))
    expected = xr.DataArray(data=[1, 2], dims='clock',
                            coords={'clock': [0, 1]})
    xr.testing.assert_equal(ds['some_process__some_param'], expected)

    # test optional
    ds.xsimlab._set_input_vars(model, 'grid')
    expected = xr.DataArray(data=5)
    xr.testing.assert_equal(ds['grid__x_size'], expected)
def test_set_snapshot_vars(self, model):
    ds = xr.Dataset()
    ds['clock'] = ('clock', [0, 2, 4, 6, 8],
                   {self._clock_key: 1, self._master_clock_key: 1})
    ds['snap_clock'] = ('snap_clock', [0, 4, 8], {self._clock_key: 1})
    ds['not_a_clock'] = ('not_a_clock', [0, 1])

    with pytest.raises(KeyError) as excinfo:
        ds.xsimlab._set_snapshot_vars(model, None, invalid_process='var')
    assert "no process named" in str(excinfo.value)

    with pytest.raises(KeyError) as excinfo:
        ds.xsimlab._set_snapshot_vars(model, None, quantity='invalid_var')
    assert "has no variable" in str(excinfo.value)

    ds.xsimlab._set_snapshot_vars(model, None, grid='x')
    assert ds.attrs[self._snapshot_vars_key] == 'grid__x'

    ds.xsimlab._set_snapshot_vars(model, 'clock',
                                  some_process='some_effect',
                                  quantity='quantity')
    expected = {'some_process__some_effect', 'quantity__quantity'}
    actual = set(ds['clock'].attrs[self._snapshot_vars_key].split(','))
    assert actual == expected

    ds.xsimlab._set_snapshot_vars(model, 'snap_clock',
                                  other_process=('other_effect', 'x2'))
    expected = {'other_process__other_effect', 'other_process__x2'}
    actual = set(ds['snap_clock'].attrs[self._snapshot_vars_key].split(','))
    assert actual == expected

    with pytest.raises(ValueError) as excinfo:
        ds.xsimlab._set_snapshot_vars(model, 'not_a_clock',
                                      quantity='quantity')
    assert "not a valid clock" in str(excinfo.value)
def test_run_multi(self):
    ds = xr.Dataset()

    with pytest.raises(NotImplementedError):
        ds.xsimlab.run_multi()
def test_constructor(self, model, input_dataset):
    ds = xr.Dataset()
    with pytest.raises(ValueError) as excinfo:
        DatasetModelInterface(model, ds)
    assert "missing master clock dimension" in str(excinfo.value)

    invalid_ds = input_dataset.drop('quantity__quantity')
    with pytest.raises(KeyError) as excinfo:
        DatasetModelInterface(model, invalid_ds)
    assert "missing data variables" in str(excinfo.value)
def _set_master_clock_dim(self, dim):
    if dim not in self._obj.coords:
        raise KeyError("Dataset has no %r dimension coordinate. "
                       "To create a new master clock dimension, "
                       "use Dataset.xsimlab.update_clock." % dim)

    if self.master_clock_dim is not None:
        self._obj[self.master_clock_dim].attrs.pop(self._master_clock_key)

    self._obj[dim].attrs[self._clock_key] = np.uint8(True)
    self._obj[dim].attrs[self._master_clock_key] = np.uint8(True)
    self._master_clock_dim = dim
def _set_snapshot_clock(self, dim, data=None, start=0., end=None,
                        step=None, nsteps=None, auto_adjust=True):
    if self.master_clock_dim is None:
        raise ValueError("no master clock dimension/coordinate is defined "
                         "in Dataset. "
                         "Use `Dataset.xsimlab._set_master_clock` first")

    clock_data = self._set_clock_data(dim, data, start, end, step, nsteps)

    da_master_clock = self._obj[self.master_clock_dim]

    if auto_adjust:
        kwargs = {'method': 'nearest'}
    else:
        kwargs = {}

    indexer = {self.master_clock_dim: clock_data}
    kwargs.update(indexer)
    da_snapshot_clock = da_master_clock.sel(**kwargs)

    self._obj[dim] = da_snapshot_clock.rename({self.master_clock_dim: dim})
    # .sel copies variable attributes
    self._obj[dim].attrs.pop(self._master_clock_key)
    for attr_name in ('units', 'calendar'):
        attr_value = da_master_clock.attrs.get(attr_name)
        if attr_value is not None:
            self._obj[dim].attrs[attr_name] = attr_value
def run(self, model=None, safe_mode=True):
    """Run the model.

    Parameters
    ----------
    model : :class:`xsimlab.Model` object, optional
        Reference model. If None, tries to get model from context.
    safe_mode : bool, optional
        If True (default), it is safe to run multiple simulations
        simultaneously. Generally safe mode shouldn't be disabled, except
        in a few cases (e.g., debugging).

    Returns
    -------
    output : Dataset
        Another Dataset with both model inputs and outputs (snapshots).

    """
    model = _maybe_get_model_from_context(model)

    if safe_mode:
        model = model.clone()

    ds_model_interface = DatasetModelInterface(model, self._obj)
    out_ds = ds_model_interface.run_model()
    return out_ds
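
# A minimal usage sketch for the `run` accessor method above. `model` and
# `in_ds` are hypothetical stand-ins for a fully set-up xsimlab.Model and an
# input Dataset that already defines a master clock, inputs and snapshots;
# they are not defined in the snippets above.
out_ds = in_ds.xsimlab.run(model=model)  # safe_mode=True: runs on a clone
out_ds_dbg = in_ds.xsimlab.run(model=model, safe_mode=False)  # e.g. debugging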
def run_multi(self):
    """Run multiple models.

    Not yet implemented.

    See Also
    --------
    :meth:`xarray.Dataset.xsimlab.run`

    """
    # TODO:
    raise NotImplementedError()
def test_version_metadata_with_streaming(self, api, opener):
    np.random.seed(123)

    times = pd.date_range('2000-01-01', '2001-12-31', name='time')
    annual_cycle = np.sin(2 * np.pi * (times.dayofyear / 365.25 - 0.28))

    base = 10 + 15 * np.array(annual_cycle).reshape(-1, 1)
    tmin_values = base + 3 * np.random.randn(annual_cycle.size, 3)
    tmax_values = base + 3 * np.random.randn(annual_cycle.size, 3)

    ds = xr.Dataset({'tmin': (('time', 'location'), tmin_values),
                     'tmax': (('time', 'location'), tmax_values)},
                    {'time': times, 'location': ['IA', 'IN', 'IL']})

    var = api.create('streaming_test')

    with var.get_local_path(
            bumpversion='patch',
            dependencies={'arch1': '0.1.0', 'arch2': '0.2.0'}) as f:
        ds.to_netcdf(f)

    ds.close()
    assert var.get_history()[-1]['dependencies']['arch2'] == '0.2.0'

    tmin_values = base + 10 * np.random.randn(annual_cycle.size, 3)
    ds.update({'tmin': (('time', 'location'), tmin_values)})

    with var.get_local_path(
            bumpversion='patch',
            dependencies={'arch1': '0.1.0', 'arch2': '1.2.0'}) as f:
        with xr.open_dataset(f) as ds:
            mem = ds.load()
            ds.close()
        mem.to_netcdf(f)

    assert var.get_history()[-1]['dependencies']['arch2'] == '1.2.0'
    assert var.get_history()[-1][
        'checksum'] != var.get_history()[-2]['checksum']
def to_netcdf(ds, *args, **kwargs):
    """
    Store the given dataset as a netCDF file

    This function works essentially the same as the usual
    :meth:`xarray.Dataset.to_netcdf` method but can also encode absolute time
    units

    Parameters
    ----------
    ds: xarray.Dataset
        The dataset to store
    %(xarray.Dataset.to_netcdf.parameters)s
    """
    to_update = {}
    for v, obj in six.iteritems(ds.variables):
        units = obj.attrs.get('units', obj.encoding.get('units', None))
        if units == 'day as %Y%m%d.%f' and np.issubdtype(
                obj.dtype, np.datetime64):
            to_update[v] = xr.Variable(
                obj.dims, AbsoluteTimeEncoder(obj), attrs=obj.attrs.copy(),
                encoding=obj.encoding)
            to_update[v].attrs['units'] = units
    if to_update:
        ds = ds.update(to_update, inplace=False)
    return xarray_api.to_netcdf(ds, *args, **kwargs)
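
# Hedged usage sketch for the wrapper above: a datetime64 coordinate whose
# 'units' attribute requests absolute time encoding, which the wrapper
# re-encodes via AbsoluteTimeEncoder. The file name is illustrative and
# `to_netcdf` refers to the wrapper defined above.
import numpy as np
import pandas as pd
import xarray as xr

times = pd.date_range('2000-01-01', periods=4)
ds = xr.Dataset({'t2m': ('time', np.random.rand(4))}, {'time': times})
ds.time.attrs['units'] = 'day as %Y%m%d.%f'
to_netcdf(ds, 'absolute_time.nc')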
def decode_coords(ds, gridfile=None, inplace=True):
    """
    Sets the coordinates and bounds in a dataset

    This static method sets those coordinates and bounds that are marked
    in the netCDF attributes as coordinates in :attr:`ds` (without
    deleting them from the variable attributes, because this information is
    necessary for visualizing the data correctly)

    Parameters
    ----------
    ds: xarray.Dataset
        The dataset to decode
    gridfile: str
        The path to a separate grid file or an xarray.Dataset instance
        which may store the coordinates used in `ds`
    inplace: bool, optional
        If True, `ds` is modified in place

    Returns
    -------
    xarray.Dataset
        `ds` with additional coordinates"""
    def add_attrs(obj):
        if 'coordinates' in obj.attrs:
            extra_coords.update(obj.attrs['coordinates'].split())
        if 'bounds' in obj.attrs:
            extra_coords.add(obj.attrs['bounds'])

    if gridfile is not None and not isinstance(gridfile, xr.Dataset):
        gridfile = open_dataset(gridfile)
    extra_coords = set(ds.coords)
    for k, v in six.iteritems(ds.variables):
        add_attrs(v)
    add_attrs(ds)
    if gridfile is not None:
        ds = ds.update({k: v for k, v in six.iteritems(gridfile.variables)
                        if k in extra_coords}, inplace=inplace)
    ds = ds.set_coords(extra_coords.intersection(ds.variables),
                       inplace=inplace)
    return ds
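
# Usage sketch for decode_coords (file names are placeholders): promote
# coordinate and bounds variables referenced in netCDF attributes to
# `ds.coords`, optionally taking missing ones from a separate grid file.
import xarray as xr

raw = xr.open_dataset('data.nc', decode_coords=False)
decoded = decode_coords(raw, gridfile='grid.nc')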
def get_idims(self, arr, coords=None):
    """Get the coordinates in the :attr:`ds` dataset as int or slice

    This method returns a mapping from the coordinate names of the given
    `arr` to an integer, a slice or an array of integers that represent
    the coordinates in the :attr:`ds` dataset and can be used to extract
    the given `arr` via the :meth:`xarray.Dataset.isel` method.

    Parameters
    ----------
    arr: xarray.DataArray
        The data array for which to get the dimensions as integers, slices
        or lists of integers from the dataset in the :attr:`base` attribute
    coords: iterable, optional
        The coordinate names to consider. If None, all coordinates of
        `arr` are used

    Returns
    -------
    dict
        Mapping from coordinate name to integer, list of integers or slice

    See Also
    --------
    xarray.Dataset.isel, InteractiveArray.idims"""
    if coords is None:
        coord_items = six.iteritems(arr.coords)
    else:
        coord_items = ((label, coord) for label, coord in six.iteritems(
            arr.coords) if label in coords)
    ret = dict(
        (label, get_index_from_coord(coord, self.ds.indexes[label]))
        for label, coord in coord_items if label in self.ds.indexes)
    # handle the coordinates that are not in the dataset
    missing = set(arr.dims).difference(ret)
    if missing:
        warn('Could not get slices for the following dimensions: %r' % (
            missing, ))
    return ret
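
# Illustrative sketch of the round trip get_idims enables. `obj` stands in
# for an instance of the (unshown) class that owns this method and exposes
# the full dataset as `self.ds`; the variable name 't2m' is hypothetical.
sub = obj.ds['t2m'].sel(time='2000-01-02', lon=slice(0, 90))
idims = obj.get_idims(sub)        # e.g. {'time': 1, 'lon': slice(0, 4, 1)}
same = obj.ds['t2m'].isel(**idims)  # recovers the same subset via isel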
def open_dataset(filename_or_obj, decode_cf=True, decode_times=True,
                 decode_coords=True, engine=None, gridfile=None, **kwargs):
    """
    Open an instance of :class:`xarray.Dataset`.

    This method has the same functionality as the :func:`xarray.open_dataset`
    method except that it supports an additional 'gdal' engine to open
    gdal Rasters (e.g. GeoTiffs) and that it supports absolute time units
    like ``'day as %Y%m%d.%f'`` (if `decode_cf` and `decode_times` are True).

    Parameters
    ----------
    %(xarray.open_dataset.parameters.no_engine)s
    engine: {'netcdf4', 'scipy', 'pydap', 'h5netcdf', 'gdal'}, optional
        Engine to use when reading netCDF files. If not provided, the
        default engine is chosen based on available dependencies, with a
        preference for 'netcdf4'.
    %(CFDecoder.decode_coords.parameters.gridfile)s

    Returns
    -------
    xarray.Dataset
        The dataset that contains the variables from `filename_or_obj`"""
    # use the absolute path name (is safer when saving the project)
    if isstring(filename_or_obj) and os.path.exists(filename_or_obj):
        filename_or_obj = os.path.abspath(filename_or_obj)
    if engine == 'gdal':
        from psyplot.gdal_store import GdalStore
        filename_or_obj = GdalStore(filename_or_obj)
        engine = None
    ds = xr.open_dataset(filename_or_obj, decode_cf=decode_cf,
                         decode_coords=False, engine=engine,
                         decode_times=decode_times, **kwargs)
    if decode_cf:
        ds = CFDecoder.decode_ds(
            ds, decode_coords=decode_coords, decode_times=decode_times,
            gridfile=gridfile, inplace=True)
    return ds
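
# Illustrative calls for the wrapper above (file names are placeholders):
ds_raster = open_dataset('dem.tif', engine='gdal')  # GeoTiff via GdalStore
ds_abs = open_dataset('data.nc')  # also decodes 'day as %Y%m%d.%f' units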
def __init__(self, xarray_obj, *args, **kwargs):
    """
    The ``*args`` and ``**kwargs`` are essentially the same as for the
    :class:`xarray.DataArray` method, additional ``**kwargs`` are
    described below.

    Other Parameters
    ----------------
    base: xarray.Dataset
        Default: None. Dataset that serves as the origin of the data
        contained in this DataArray instance. This will be used if you
        want to update the coordinates via the :meth:`update` method. If
        None, this instance will serve as a base as soon as it is needed.
    decoder: psyplot.CFDecoder
        The decoder that decodes the `base` dataset and is used to get the
        bounds. If not given, a new :class:`CFDecoder` is created.
    idims: dict
        Default: None. A dictionary with integer values and/or slices into
        the `base` dataset. If not given, they are determined
        automatically.
    %(InteractiveBase.parameters)s
    """
    self.arr = xarray_obj
    super(InteractiveArray, self).__init__(*args, **kwargs)
    self._registered_updates = {}
    self._new_dims = {}
    self.method = None
def _register_update(self, method='isel', replot=False, dims={}, fmt={},
                     force=False, todefault=False):
    """
    Register new dimensions and formatoptions for updating

    Parameters
    ----------
    method: {'isel', None, 'nearest', ...}
        Selection method of the xarray.Dataset to be used for setting the
        variables from the information in `dims`.
        If `method` is 'isel', the :meth:`xarray.Dataset.isel` method is
        used. Otherwise it sets the `method` parameter for the
        :meth:`xarray.Dataset.sel` method.
    %(setup_coords.parameters.dims)s
    %(InteractiveBase._register_update.parameters)s

    See Also
    --------
    start_update"""
    if self._new_dims and self.method != method:
        raise ValueError(
            "New dimensions were already specified with the %s method! "
            "I cannot choose a new method %s" % (self.method, method))
    else:
        self.method = method
    if 'name' in dims:
        self._new_dims['name'] = dims.pop('name')
    self._new_dims.update(self.decoder.correct_dims(
        next(six.itervalues(self.base_variables)), dims))
    InteractiveBase._register_update(
        self, fmt=fmt, replot=replot or bool(self._new_dims), force=force,
        todefault=todefault)
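
# The 'isel' vs. selection-method distinction documented above maps directly
# onto plain xarray calls; a small self-contained illustration (the dataset
# and names here are unrelated to psyplot):
import numpy as np
import xarray as xr

demo = xr.Dataset({'v': ('x', np.arange(5))},
                  coords={'x': [0.0, 0.5, 1.0, 1.5, 2.0]})
demo.isel(x=2)                      # method='isel': positional indexing
demo.sel(x=0.6, method='nearest')   # other methods are passed on to .sel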
def _open_ds_from_store(fname, store_mod=None, store_cls=None, **kwargs):
    """Open a dataset and return it"""
    if isinstance(fname, xr.Dataset):
        return fname
    if store_mod is not None and store_cls is not None:
        fname = getattr(import_module(store_mod), store_cls)(fname)
    return open_dataset(fname, **kwargs)
def test_update(self):
    """Test the update of an :class:`psyplot.data.ArrayList`"""
    variables, coords = self._from_dataset_test_variables
    ds = xr.Dataset(variables, coords)
    psy.register_plotter('test_plotter', module='something',
                         plotter_name='unimportant',
                         plotter_cls=tp.TestPlotter)
    # add 2 arrays
    psy.plot.test_plotter(ds, name=['v0', 'v1'], t=0)
    # add a list
    psy.plot.test_plotter(ds, name=['v0', 'v1'], t=0, prefer_list=True)

    mp = psy.gcp(True)

    self.assertEqual(len(mp), 3, msg=mp)
    self.assertEqual(len(mp.plotters), 3, msg=mp)

    # update the list
    mp.update(t=1, fmt2='updated')

    for i, plotter in enumerate(mp.plotters):
        self.assertEqual(plotter['fmt2'], 'updated',
                         msg='Plotter of array %i not updated! %s' % (
                             i, mp[i]))

    self.assertEqual(mp[0].time, ds.time[1])
    self.assertEqual(mp[1].time, ds.time[1])
    for data in mp[2]:
        self.assertEqual(data.time, ds.time[1])
def test_1D_cf_bounds(self):
    """Test whether the CF conventions for 1D boundaries are handled
    correctly"""
    final_bounds = np.arange(-180, 181, 30)
    lon = xr.Variable(('lon', ), np.arange(-165, 166, 30),
                      {'bounds': 'lon_bounds'})
    cf_bounds = xr.Variable(('lon', 'bnds'), np.zeros((len(lon), 2)))
    for i in range(len(lon)):
        cf_bounds[i, :] = final_bounds[i:i + 2]
    ds = xr.Dataset(coords={'lon': lon, 'lon_bounds': cf_bounds})
    decoder = psyd.CFDecoder(ds)
    self.assertEqual(list(final_bounds),
                     list(decoder.get_plotbounds(lon)))