def test_get_schema():
'''Should identify schema from id dimension in netCDF file.'''
tempdir = tempfile.gettempdir()
v1_0_dim = 'station'
v1_1_dim = 'feature_id'
v1_0_file = os.path.join(tempdir, 'v1_0.nc')
with Dataset(v1_0_file, 'w') as nc:
dim = nc.createDimension(v1_0_dim, None)
expected = constants.SCHEMAv1_0
returned = nwm_data.get_schema(nc)
assert expected == returned
os.remove(v1_0_file)
v1_1_file = os.path.join(tempdir, 'v1_1.nc')
with Dataset(v1_1_file, 'w') as nc:
dim = nc.createDimension(v1_1_dim, None)
expected = constants.SCHEMAv1_1
returned = nwm_data.get_schema(nc)
assert expected == returned
os.remove(v1_1_file)
def polyNetcdf(poly, nc, v):
ncf = Dataset(nc)
# affine transform(px width, row rotation, UL x-coord, col rotation, px height, UL y-coord)
affine = Affine(0.1, 0.0, -180.0, 0.0, -0.1, 90.0)
ndims = ncf[v].ndim
if ndims == 2:
array = ncf[v][:,:]
ncf.close()
stats = (extractDate(nc), zonal_stats(poly, array, affine=affine, nodata=-9999.0, stats=['median'])[0]['median'])
return [stats]
elif ndims == 3: # assume it's time/lat/lon
#print 'N-dims:', ndims
cdftime = utime(ncf['time'].units)
stats = []
for i in range(ncf[v].shape[0]):
array = ncf[v][i,:,:]
dt = cdftime.num2date(ncf['time'][i]).strftime('%Y-%m-%d')
stats.append((dt, zonal_stats(poly, array, affine=affine, nodata=-9999.0, stats=['median'])[0]['median']))
ncf.close()
#print stats
return stats
else:
        print("Don't know what to do with %s dimensions!" % ndims)
sys.exit()
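A minimal usage sketch for polyNetcdf; the file name and polygon below are hypothetical, and the geometry can be anything rasterstats accepts (a GeoJSON-like dict, a shapely geometry, or a path to a vector file):
poly = {'type': 'Polygon',
        'coordinates': [[(-71.0, 42.0), (-70.0, 42.0), (-70.0, 43.0),
                         (-71.0, 43.0), (-71.0, 42.0)]]}
for date, median in polyNetcdf(poly, 'precip_20170429.nc', 'precip'):  # hypothetical file
    print(date, median)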
def pointNetcdf(pt, nc, v):
ncf = Dataset(nc)
# affine transform(px width, row rotation, UL x-coord, col rotation, px height, UL y-coord)
affine = Affine(0.1, 0.0, -180.0, 0.0, -0.1, 90.0)
ndims = ncf[v].ndim
if ndims == 2:
array = ncf[v][:,:]
ncf.close()
stats = (extractDate(nc), point_query(pt, array, affine=affine, nodata=-9999.0)[0])
return [stats]
elif ndims == 3: # assume it's time/lat/lon
#print 'N-dims:', ndims
cdftime = utime(ncf['time'].units)
stats = []
for i in range(ncf[v].shape[0]):
array = ncf[v][i,:,:]
dt = cdftime.num2date(ncf['time'][i]).strftime('%Y-%m-%d')
stats.append((dt, point_query(pt, array, affine=affine, nodata=-9999.0)[0]))
ncf.close()
#print stats
return stats
else:
        print("Don't know what to do with %s dimensions!" % ndims)
sys.exit()
def setup_netcdf_root(root,
header,
N):
"""
Add variables, dimensions (all of size *N), and attributes from
*config* to :class:`Dataset` *root* and return *root*.
"""
# add global attributes
root.source = header.Source_of_Data
root.station = header.Station_Name
root.code = header.IAGA_CODE
root.latitude = header.Geodetic_Latitude
root.longitude = header.Geodetic_Longitude
root.elevation = header.Elevation
root.reported = header.Reported
root.sensor_orientation = header.Sensor_Orientation
root.data_interval_type = header.Data_Interval_Type
root.data_type = header.Data_Type
root.creation_time_utc = str(datetime.utcnow())
# add dimensions
time_dim = root.createDimension('time', N)
codes = ['X', 'Y', 'Z', 'F', 'H', 'D']
for code in codes:
dim = root.createDimension(code, N)
# add variables
time = root.createVariable('time', 'f8', ('time',))
time.standard_name = 'time'
time.units = 'seconds since 2000-01-01 12:00:00.0'
time.calendar = 'gregorian'
for code in codes:
var = root.createVariable(code,
'f8',
(code,),
zlib=True)
var.standard_name = NAME_MAP[code]
var.units = UNITS_MAP[code]
return root
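A usage sketch, assuming a hypothetical header object whose attributes match the IAGA-2002-style names read above, and that NAME_MAP and UNITS_MAP are defined at module level:
from netCDF4 import Dataset
root = Dataset('magnetometer.nc', 'w')
root = setup_netcdf_root(root, header, N=86400)  # header is a hypothetical metadata object
root.close()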
def test_from_dataset_11_list(self):
"""Test the creation of a list of InteractiveLists"""
variables, coords = self._from_dataset_test_variables
ds = xr.Dataset(variables, coords)
# Create two lists, each containing two arrays of variables v1 and v2.
# In the first list, the xdim dimensions are 0 and 1.
# In the second, the xdim dimensions are both 2
l = self.list_class.from_dataset(
ds, name=[['v1', 'v2']], xdim=[[0, 1], 2], prefer_list=True)
self.assertEqual(len(l), 2)
self.assertIsInstance(l[0], psyd.InteractiveList)
self.assertIsInstance(l[1], psyd.InteractiveList)
self.assertEqual(len(l[0]), 2)
self.assertEqual(len(l[1]), 2)
self.assertEqual(l[0][0].xdim, 0)
self.assertEqual(l[0][1].xdim, 1)
self.assertEqual(l[1][0].xdim, 2)
self.assertEqual(l[1][1].xdim, 2)
def test_to_dataframe(self):
variables, coords = self._from_dataset_test_variables
variables['v1'][:] = np.arange(variables['v1'].size).reshape(
variables['v1'].shape)
ds = xr.Dataset(variables, coords)
l = psyd.InteractiveList.from_dataset(ds, name='v1', t=[0, 1])
l.extend(psyd.InteractiveList.from_dataset(ds, name='v1', t=2,
x=slice(1, 3)),
new_name=True)
self.assertEqual(len(l), 3)
self.assertTrue(all(arr.ndim == 1 for arr in l), msg=l)
df = l.to_dataframe()
self.assertEqual(df.shape, (ds.xdim.size, 3))
self.assertEqual(df.index.values.tolist(), ds.xdim.values.tolist())
self.assertEqual(df[l[0].psy.arr_name].values.tolist(),
ds.v1[0].values.tolist())
self.assertEqual(df[l[1].psy.arr_name].values.tolist(),
ds.v1[1].values.tolist())
self.assertEqual(df[l[2].psy.arr_name].notnull().sum(), 2)
self.assertEqual(
df[l[2].psy.arr_name].values[
df[l[2].psy.arr_name].notnull().values].tolist(),
ds.v1[2, 1:3].values.tolist())
def initialize_file(vs, ncfile, create_time_dimension=True):
"""
Define standard grid in netcdf file
"""
if not isinstance(ncfile, Dataset):
raise TypeError("Argument needs to be a netCDF4 Dataset")
for dim in variables.BASE_DIMENSIONS:
var = vs.variables[dim]
dimsize = variables.get_dimensions(vs, var.dims[::-1], include_ghosts=False)[0]
nc_dim = add_dimension(vs, dim, dimsize, ncfile)
initialize_variable(vs, dim, var, ncfile)
write_variable(vs, dim, var, getattr(vs, dim), ncfile)
if create_time_dimension:
nc_dim_time = ncfile.createDimension("Time", None)
nc_dim_var_time = ncfile.createVariable("Time", "f8", ("Time",))
nc_dim_var_time.long_name = "Time"
nc_dim_var_time.units = "days"
nc_dim_var_time.time_origin = "01-JAN-1900 00:00:00"
def threaded_io(vs, filepath, mode):
"""
If using IO threads, start a new thread to write the netCDF data to disk.
"""
if vs.use_io_threads:
_wait_for_disk(vs, filepath)
_io_locks[filepath].clear()
nc_dataset = Dataset(filepath, mode)
try:
yield nc_dataset
finally:
if vs.use_io_threads:
io_thread = threading.Thread(target=_write_to_disk, args=(vs, nc_dataset, filepath))
io_thread.start()
else:
_write_to_disk(vs, nc_dataset, filepath)
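Because threaded_io yields the open Dataset and defers the write-back, it is presumably decorated with contextlib.contextmanager where it is defined; a usage sketch under that assumption, with vs the hypothetical state object used throughout these snippets:
with threaded_io(vs, 'snapshot.nc', 'a') as nc_dataset:
    nc_dataset.variables['Time'][-1] = vs.time  # vs.time is hypothetical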
def write(self):
"""
Write all cached states to the NetCDF file, and clear the cache.
This will append to any existing NetCDF file.
Raises
------
InvalidStateError
If cached states do not all have the same quantities
as every other cached and written state.
"""
with nc4.Dataset(self._filename, self._write_mode) as dataset:
self._ensure_cached_state_keys_compatible_with_dataset(dataset)
time_list, state_list = self._get_ordered_times_and_states()
self._ensure_time_exists(dataset, time_list[0])
it_start = dataset.dimensions['time'].size
it_end = it_start + len(time_list)
append_times_to_dataset(time_list, dataset, self._time_units)
all_states = combine_states(state_list)
for name, value in all_states.items():
ensure_variable_exists(dataset, name, value)
dataset.variables[name][
it_start:it_end, :] = value.values[:, :]
self._cached_state_dict = {}
def get_rad_data(self, inp_file):
unzip_file = self.unZip(inp_file)
if self.return_code != 0:
return
try:
if sys.platform == 'win32' or sys.platform == 'cygwin':
cdf_file = Dataset(unzip_file, 'r')
else:
cdf_file = NetCDFFile(unzip_file, 'r')
    except Exception:
self.decodeError(inp_file)
return
# Variable Description Units
# -------- ----------------------------------- --------
# swgnt Surface net downward shortwave flux W m-2
self.lati = cdf_file.variables[self.vars['latitude']][:]
self.longi = cdf_file.variables[self.vars['longitude']][:]
self.tims = cdf_file.variables['time'][:]
self.swgnt += self.getGHI(cdf_file.variables[self.vars['swgnt']])
cdf_file.close()
def test_README(self):
sim_filename = self.run_with_output("wxgen sim -db examples/database.nc -n 10 -t 100")
file = netCDF4.Dataset(sim_filename, 'r')
self.assertTrue("time" in file.dimensions)
self.assertTrue("ensemble_member" in file.dimensions)
self.assertEqual(100, file.dimensions["time"].size)
self.assertEqual(10, file.dimensions["ensemble_member"].size)
file.close()
truth_filename = self.run_with_output("wxgen truth -db examples/database.nc")
file = netCDF4.Dataset(truth_filename, 'r')
self.assertTrue("time" in file.dimensions)
self.assertTrue("ensemble_member" in file.dimensions)
self.assertEqual(729, file.dimensions["time"].size)
self.assertEqual(1, file.dimensions["ensemble_member"].size)
file.close()
self.run_with_image("wxgen verif %s %s -m timeseries" % (sim_filename, truth_filename))
self.run_with_image("wxgen verif %s %s -m variance" % (sim_filename, truth_filename))
time.sleep(1)
for filename in [sim_filename, truth_filename]:
remove(filename)
def anim(filename,varname,cax):
template = filename+'_%03i.nc'
nc0 = Dataset(template%(0),'r') # open proc 0 nc file
nt = len(nc0.dimensions['t'])
nc0.close()
def animate(i):
global kt
time,z2d=read(filename,varname,kt)
im.set_data(z2d)
ti.set_text('%.0f '%time)
kt +=5
return im,ti
fig=plt.figure()
ax = fig.add_subplot(111)
time,z2d=read(filename,varname,0)
im=ax.imshow(z2d,vmin=cax[0],vmax=cax[1])
ti=ax.text(100,-50,'%.0f '%time)
print('launch the animation')
global kt
kt = 0
ani = animation.FuncAnimation(fig, animate,arange(nt),interval=5, blit=True)
plt.show()
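A usage sketch; the filename prefix and color limits are hypothetical, and read() is assumed to be defined in the same module:
anim('vortex_run', 'vorticity', cax=[-1.0, 1.0])  # opens vortex_run_000.nc first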
def create_diag(self,diag):
nc = Dataset(self.diagfile,'w',format='NETCDF4')
nc.createDimension('t',None) # define dimensions ('None' is record dim)
d = nc.createVariable('t','f',('t',))
d.long_name = 'model time'
d = nc.createVariable('kt','i',('t',))
d.long_name = 'model iteration'
for v in self.list_diag:
d = nc.createVariable(v,'f',('t',))
d.long_name = v
nc.close()
self.kdiag = 0
# set up internal buffer to avoid too frequent disk access
self.ndiags = len(self.list_diag)+2
self.buffersize = 10
self.buffer=zeros((self.buffersize,self.ndiags))
def write(self,tend,t,dt,kt,tnextdiag,tnexthis,var):
nh = self.nh
nc = Dataset(self.restart_file,'w')
nc.setncattr('tend',tend)
nc.setncattr('t',t)
nc.setncattr('dt',dt)
nc.setncattr('kt',kt)
nc.setncattr('tnextdiag',tnextdiag)
nc.setncattr('tnexthis',tnexthis)
nc.createDimension('x',self.nxl)
nc.createDimension('y',self.nyl)
for v in self.varname_list:
nc.createVariable(v,'d',('y','x')) # save in double precision
z2d = var.get(v)
nc.variables[v][:,:]=z2d[:,:]
nc.close()
def plot_numvisc(diagfile):
plt.figure()
nc = Dataset(diagfile)
t=nc.variables['t'][:]
ke=nc.variables['ke'][:]
dkdt=np.diff(ke)/np.diff(t)
ens=nc.variables['enstrophy'][:]
ensm=0.5*(ens[1:]+ens[:-1])
# deltake[visc,res]=-(ke[-1]-ke[0])
# deltaens[visc,res]=max(medfilt(ens,21))-ens[5]
visc_tseries = -dkdt/ensm*4.4*np.pi
visc_num = max(visc_tseries[t[1:]>0.02])
#print('N=%4i / visc = %4.1e / num = %4.2e'%(N[res],Kdiff[visc],visc_num[res]))
plt.semilogy(t[1:],visc_tseries)
plt.xlabel('time')
plt.ylabel('viscosity (-(1/2V)dE/dt)')
plt.grid('on')
plt.show()
def read(t_MaskInfo):
c_FilterDir = mospat_inc_filters.c_FilterDir + 'REGION_MASK/'
c_AllMaskNames = t_MaskInfo['c_FilterName']
c_AllFileNames = t_MaskInfo['c_FilterFile']
c_AllVarNames = t_MaskInfo['c_FilterVar']
nmask = len(IncF.c_RegionMask)
for imask in range(nmask):
c_Mask = IncF.c_RegionMask[imask]
# Identifying the idx of mask of interest
idx_mask = [i for i, x in enumerate(c_AllMaskNames) if x == c_Mask][0]
c_FileName = c_FilterDir + c_AllFileNames[idx_mask]
c_VarName = c_AllVarNames[idx_mask]
# READING FILE WITH MASK
        file = nc.Dataset(c_FileName, 'r')
        f_lat = np.array(file.variables['lat'])
        f_lon = np.array(file.variables['lon'])
        f_mask = np.array(np.squeeze(file.variables[c_VarName]))
        file.close()
    return f_mask, f_lat, f_lon
def load_grid(nc):
"""
Get a SGrid object from a netCDF4.Dataset or file/URL.
:param str or netCDF4.Dataset nc: a netCDF4 Dataset or URL/filepath
to the netCDF file
:return: SGrid object
:rtype: sgrid.SGrid
"""
if isinstance(nc, Dataset):
pass
else:
nc = Dataset(nc, 'r')
return SGrid.load_grid(nc)
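A usage sketch; 'sgrid_example.nc' is a hypothetical file carrying SGRID-compliant grid metadata:
grid = load_grid('sgrid_example.nc')  # an open netCDF4.Dataset or an OPeNDAP URL also works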
def get_dataset(ncfile, dataset=None):
"""
Utility to create a netCDF4 Dataset from a filename, list of filenames,
or just pass it through if it's already a netCDF4.Dataset
if dataset is not None, it should be a valid netCDF4 Dataset object,
    and it will simply be returned.
"""
if dataset is not None:
return dataset
if isinstance(ncfile, nc4.Dataset):
return ncfile
    elif isinstance(ncfile, collections.abc.Iterable) and len(ncfile) == 1:
return nc4.Dataset(ncfile[0])
elif isstring(ncfile):
return nc4.Dataset(ncfile)
else:
return nc4.MFDataset(ncfile)
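A sketch of the accepted input forms (filenames are hypothetical):
ds = get_dataset('single.nc')             # one path -> netCDF4.Dataset
ds = get_dataset(['single.nc'])           # one-element list -> netCDF4.Dataset
ds = get_dataset(['p1.nc', 'p2.nc'])      # several paths -> netCDF4.MFDataset
ds = get_dataset(None, dataset=ds)        # an existing Dataset is passed straight through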
def is_valid_mesh(nc, varname):
"""
determine if the given variable name is a valid mesh definition
:param nc: a netCDF4 Dataset to check
:param varname: name of the candidate mesh variable
"""
try:
mesh_var = nc.variables[varname]
except KeyError:
return False
    try:
        if (mesh_var.cf_role.strip() == 'mesh_topology' and
                int(mesh_var.topology_dimension) == 2):
            return True
    except AttributeError:
        # not a valid mesh variable
        return False
    return False  # attributes present but not a 2D mesh topology
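A sketch that scans an open file for UGRID mesh variables, assuming nc is an already-open netCDF4.Dataset:
mesh_names = [name for name in nc.variables if is_valid_mesh(nc, name)]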
# Defining properties of various connectivity arrays
# so that the same code can load all of them.
def file_to_subset_setup(request):
ids = [2, 4, 6]
flows = [3.1, -9999.0, 5.0]
date = '2017-04-29_00:00:00'
flows = ma.masked_array(flows, mask=[0, 1, 0]) # explicit mask
with Dataset(_file_to_subset, 'w') as nc:
nc.model_output_valid_time = date
dim = nc.createDimension('feature_id', 3)
id_var = nc.createVariable('feature_id', 'i', ('feature_id',))
id_var[:] = ids
flow_var = nc.createVariable('streamflow', 'f', ('feature_id',),
fill_value=-9999.0)
flow_var[:] = flows
extra_var = nc.createVariable('extra_var', 'i', ('feature_id',))
extra_var[:] = [1, 2, 3]
def file_to_subset_teardown():
os.remove(_file_to_subset)
request.addfinalizer(file_to_subset_teardown)
def test_time_from_variable():
'''Should read date from time variable.'''
tempdir = tempfile.gettempdir()
nc_file = os.path.join(tempdir, 'test_time_from_variable.nc')
date_obj = parser.parse('2017-04-29 04:00:00')
units = 'minutes since 1970-01-01 00:00:00 UTC'
nc_date = round(date2num(date_obj, units))
with Dataset(nc_file, 'w') as nc:
dim = nc.createDimension('time', 1)
time_var = nc.createVariable('time', 'i', ('time',))
time_var[:] = [nc_date]
time_var.units = units
with Dataset(nc_file, 'r') as nc:
expected = date_obj.replace(tzinfo=pytz.utc)
returned = nwm_data.time_from_dataset(nc)
assert expected == returned
os.remove(nc_file)
def files_to_cube_setup(request):
date_template = '2017-04-29_0{0}:00:00'
for i, nc_file in enumerate(_files_to_cube):
date = date_template.format(i)
flows = [flow * (i + 1) for flow in _flows_template]
if i == 1:
flows[1] = -9999.0 # one way of masking data
elif i == 2:
flows = ma.masked_array(flows, mask=[0, 1, 0]) # explicit mask
with Dataset(nc_file, 'w') as nc:
nc.model_output_valid_time = date
dim = nc.createDimension('feature_id', 3)
id_var = nc.createVariable('feature_id', 'i', ('feature_id',))
id_var[:] = _ids
flow_var = nc.createVariable('streamflow', 'f', ('feature_id',),
fill_value=-9999.0)
flow_var[:] = flows
def files_to_cube_teardown():
for nc_file in _files_to_cube:
os.remove(nc_file)
request.addfinalizer(files_to_cube_teardown)
def file_to_read_streamflow_setup(request):
ids = [2, 4, 6]
flows = [1.3, -9999.0, 5.1]
date = '2017-04-29_04:00:00'
flows = ma.masked_array(flows, mask=[0, 1, 0]) # explicit mask
with Dataset(_file_to_read_streamflow, 'w') as nc:
nc.model_output_valid_time = date
dim = nc.createDimension('feature_id', 3)
id_var = nc.createVariable('feature_id', 'i', ('feature_id',))
id_var[:] = ids
flow_var = nc.createVariable('streamflow', 'f', ('feature_id',),
fill_value=-9999.0)
flow_var[:] = flows
def file_to_read_streamflow_teardown():
os.remove(_file_to_read_streamflow)
request.addfinalizer(file_to_read_streamflow_teardown)
def load_netcdf_meta(datafile):
'''
Loads metadata for NetCDF
Parameters:
:datafile: str: Path on disk to NetCDF file
Returns:
:meta: Dictionary of metadata
'''
ras = nc.Dataset(datafile)
attrs = _get_nc_attrs(ras)
sds = _get_subdatasets(ras)
    meta = {'meta': attrs,
            'layer_meta': sds,
            'name': datafile,
            'variables': list(ras.variables.keys()),
            }
    ras.close()
    return meta_strings_to_dict(meta)
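A usage sketch; the path is hypothetical, and _get_nc_attrs, _get_subdatasets, and meta_strings_to_dict are assumed to be defined alongside this helper:
meta = load_netcdf_meta('/tmp/example.nc')
print(meta['variables'])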
def to_netcdf(self, path):
"""
Store the model in a netCDF file that attempts to be displayable
using standard tools and loosely follows the WRF 'standard'.
:param path: the path where to store the model
"""
import netCDF4
d = netCDF4.Dataset(path, 'w', format='NETCDF4')
d0, d1, k = self.m_ext.shape
d.createDimension('fuel_moisture_classes_stag', k)
d.createDimension('south_north', d0)
d.createDimension('west_east', d1)
ncfmc = d.createVariable('FMC_GC', 'f4', ('south_north', 'west_east','fuel_moisture_classes_stag'))
ncfmc[:,:,:] = self.m_ext
ncfmc_cov = d.createVariable('FMC_COV', 'f4', ('south_north', 'west_east','fuel_moisture_classes_stag', 'fuel_moisture_classes_stag'))
ncfmc_cov[:,:,:,:] = self.P
d.close()
def from_netcdf(cls, path):
"""
Construct a fuel moisture model from data stored in a netCDF file.
:param path: the path to the netCDF4 file
"""
import netCDF4
print "reading from netCDF file", path
d = netCDF4.Dataset(path)
ncfmc = d.variables['FMC_GC'][:,:,:]
print "FuelMoistureModel.from_netcdf: reading FMC_GC shape",ncfmc.shape
d0, d1, k = ncfmc.shape
P = d.variables['FMC_COV'][:,:,:,:]
Tk = np.array([1.0, 10.0, 100.0]) * 3600
fm = FuelMoistureModel(ncfmc[:,:,:k-2], Tk)
fm.m_ext[:,:,k-2:] = ncfmc[:,:,k-2:]
fm.P[:,:,:,:] = P
return fm
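A round-trip sketch, assuming model is an existing FuelMoistureModel instance:
model.to_netcdf('moisture_state.nc')
restored = FuelMoistureModel.from_netcdf('moisture_state.nc')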
def create_netcdf(savef, dts, dat):
""" Write Florida current data to netcdf file """
dataset = Dataset(savef, 'w', format='NETCDF4_CLASSIC')
# Create time coordinate
tdim = dataset.createDimension('time', None)
time = dataset.createVariable('time',np.float64,(tdim.name,))
time.units = 'hours since 0001-01-01 00:00:00.0'
time.calendar = 'gregorian'
time[:] = date2num(dts, time.units, calendar=time.calendar)
# Create data variable
fc = dataset.createVariable('florida_current_transport',np.float64,(tdim.name),fill_value=1.e20)
fc.units = 'Sv'
fc[:] = dat
# close file
    print('SAVING: %s' % savef)
dataset.close()
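A usage sketch with synthetic hourly timestamps and transport values (all hypothetical):
from datetime import datetime, timedelta
dts = [datetime(2017, 4, 29) + timedelta(hours=n) for n in range(24)]
dat = [32.0] * 24  # Florida Current transport in Sverdrups
create_netcdf('florida_current.nc', dts, dat)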
def write_to_netcdf(self, ncfile):
""" Write observation data to netcdf file """
# Open ncfile and create coords
dataset = Dataset(ncfile, 'w', format='NETCDF4_CLASSIC')
tdim = dataset.createDimension('time', None)
# Create time coordinate
time = dataset.createVariable('time',np.float64,(tdim.name,))
time.units = 'hours since 0001-01-01 00:00:00.0'
time.calendar = 'gregorian'
time[:] = date2num(self.dates, time.units, calendar=time.calendar)
# Create variables
fc = dataset.createVariable('florida_current_transport',np.float64,(tdim.name,))
fc.units = 'Sv'
fc[:] = self.fc
# Close file
        print('SAVING: %s' % ncfile)
dataset.close()