def testMetadata(ncattributes, files):
answer = list([])
# metadata variables... (ncattributes)
#print "files: ", len(files)
content = dict()
count = 0
for f in files:
print f
nc_fid = netCDF4.Dataset( f,'r')
for v in ncattributes:
checkVariable(nc_fid,v,answer,count)
# print "variables failed = " , count
nc_fid.close()
for a in nc_fid.ncattrs():
# content[a] = str(nc_fid.getncattr(a))
content[str(a).replace(".","_")] = str(nc_fid.getncattr(a))
return answer,content
python类Dataset()的实例源码
def __init__(self, grid_def, description=''):
with nc.Dataset(grid_def) as f:
# Select points from double density horizontal grid. Only
# need t-points.
try:
x_t = f.variables['nav_lon'][:]
y_t = f.variables['nav_lat'][:]
z = f.variables['deptht'][:]
except KeyError:
x_t = f.variables['lon'][:]
y_t = f.variables['lat'][:]
z = f.variables['depth'][:]
mask = np.zeros_like(f.variables['tmask'], dtype=bool)
mask[f.variables['tmask'][:] == 0.0] = True
super(OrasGrid, self).__init__(x_t, y_t, mask_t=mask,
levels=zdescription)
def __init__(self, num_lons=128, num_lats=64, num_levels=1,
mask_file=None, description=''):
levels = range(num_levels)
self.type = 'Spectral'
self.full_name = 'T42'
if mask_file:
with nc.Dataset(mask_file) as f:
try:
mask = np.round(f.variables['WGOCN'][0, 0, :, :-1])
except KeyError as e:
print("Error: var WGOCN not in {}.".format(mask_file),
file=sys.stderr)
raise e
else:
# Default is all unmasked.
mask = np.zeros((num_lats, num_lons))
assert mask.shape[0] == num_lats
assert mask.shape[1] == num_lons
super(T42Grid, self).__init__(num_lons, num_lats, mask_t=mask,
levels=levels, description=description)
def regrid(regrid_weights, src_data, dest_grid):
"""
Regrid a single time index of data.
"""
print('Horizontal regridding ...')
# Destination arrays
dest_data = np.ndarray((dest_grid.num_lat_points,
dest_grid.num_lon_points))
with nc.Dataset(regrid_weights) as wf:
n_s = wf.dimensions['n_s'].size
n_b = wf.dimensions['n_b'].size
row = wf.variables['row'][:]
col = wf.variables['col'][:]
s = wf.variables['S'][:]
dest_data[:, :] = apply_weights(src_data[:, :], dest_data.shape,
n_s, n_b, row, col, s)
return dest_data
def regrid(regrid_weights, src_data, dest_grid):
"""
Regrid a single time index of data.
"""
print('Horizontal regridding ...')
# Destination arrays
dest_data = np.ndarray((dest_grid.num_levels, dest_grid.num_lat_points,
dest_grid.num_lon_points))
with nc.Dataset(regrid_weights) as wf:
n_s = wf.dimensions['n_s'].size
n_b = wf.dimensions['n_b'].size
row = wf.variables['row'][:]
col = wf.variables['col'][:]
s = wf.variables['S'][:]
for l in range(src_data.shape[0]):
dest_data[l, :, :] = apply_weights(src_data[l, :, :], dest_data.shape[1:],
n_s, n_b, row, col, s)
return dest_data
def __init__(self, grid_def, description=''):
with nc.Dataset(grid_def) as f:
# Get lon and lat
x_t = f.variables['lon'][:]
y_t = f.variables['lat'][:]
try:
z = f.variables['depth'][:]
except KeyError:
z = f.variables['level'][:]
try:
mask = f.variables['practical_salinity'][0, :, :, :].mask[:]
except KeyError:
mask = f.variables['s_an'][0, :, :, :].mask[:]
super(WoaGrid, self).__init__(x_t, y_t, z, mask, description)
def __init__(self, grid_def, description=''):
with nc.Dataset(grid_def) as f:
# Select points from double density horizontal grid. Only
# need t-points.
x_t = f.variables['lon'][:]
y_t = f.variables['lat'][:]
try:
z = f.variables['level'][:]
except KeyError:
z = f.variables['depth'][:]
try:
mask = f.variables['pottmp'][0, :].mask[:]
except KeyError:
mask = f.variables['POT'][0, :].mask[:]
super(GodasGrid, self).__init__(x_t, y_t, z, mask, description)
def __init__(self, h_grid_def, v_grid_def, mask_file, description):
with nc.Dataset(h_grid_def) as f:
# Select points from double density horizontal grid. Only
# need t-points.
x_t = f.variables['x'][1::2,1::2]
y_t = f.variables['y'][1::2,1::2]
self.x_vt = f.variables['x'][:]
self.y_vt = f.variables['y'][:]
with nc.Dataset(v_grid_def) as f:
# Only take cell centres.
z = f.variables['zeta'][1::2]
if mask_file is None:
mask = np.zeros_like(x_t, dtype=bool)
else:
with nc.Dataset(mask_file) as f:
mask = np.zeros_like(f.variables['mask'], dtype=bool)
mask[f.variables['mask'][:] == 0.0] = True
super(MomGrid, self).__init__(x_t, y_t, z, mask, description)
def get_time_origin(filename):
"""
Parse time.units to find the start/origin date of the file. Return a
datetime.date object.
"""
date_search_strings = ['\d{4}-\d{2}-\d{2}','\d{4}-\d{1}-\d{2}',
'\d{4}-\d{2}-\d{1}','\d{4}-\d{1}-\d{1}']
with nc.Dataset(filename) as f:
time_var = f.variables['time']
assert 'months since' in time_var.units or \
'days since' in time_var.units or \
'hours since' in time_var.units, \
"Time units doesn't have expected format: {}".format(time_var.units)
for ds in date_search_strings:
m = re.search(ds, time_var.units)
if m is not None:
break
assert m is not None
date = dt.datetime.strptime(m.group(0), '%Y-%m-%d')
return dt.date(date.year, date.month, date.day)
def write_nemo_output_at_time(filename, var_name, var_longname, var_units,
var_data, time_idx, time_pt, write_ic=False):
with nc.Dataset(filename, 'r+') as f:
if not f.variables.has_key(var_name):
var = f.createVariable(var_name, 'f8', ('time_counter', 'z', 'y', 'x'))
var.long_name = var_longname
var.units = var_units
var = f.variables[var_name]
if write_ic:
var[0, :] = var_data[:]
f.variables['time_counter'][0] = time_pt
else:
var[time_idx, :] = var_data[:]
f.variables['time_counter'][time_idx] = time_pt
def _create_file(filename, mask, lat, lon):
"""create the netcdf file to store the srex mask"""
import netCDF4 as nc
with nc.Dataset(filename, 'w') as ncf:
ncf.createDimension('lat', size=lat.size)
ncf.createDimension('lon', size=lon.size)
ncf.createVariable('lat', 'f', 'lat')
ncf.createVariable('lon', 'f', 'lon')
ncf.createVariable('mask', 'f', ('lat', 'lon'))
ncf.variables['lat'][:] = lat
ncf.variables['lon'][:] = lon
ncf.variables['mask'][:] = mask
# create unique filename for the mask
def readGraceData(filename, lat_name, lon_name, data_name, time=None):
'''
This function reads in netcdf data provided by GRACE Tellus
@param filename: Name of file to read in
@param lat_name: Name of latitude data
@param lon_name: Name of longitude data
@param data_name: Name of data product
@param time: Name of time data
'''
nc = Dataset(filename, 'r')
lat_index = nc[lat_name][:]
lon_index = nc[lon_name][:]
data = nc[data_name][:]
if time != None:
time = nc.variables[time]
date_index = pd.to_datetime(num2date(time[:],units=time.units,calendar=time.calendar))
return pd.Panel(data=data, items=date_index,major_axis=lat_index, minor_axis=lon_index)
else:
return pd.DataFrame(data = data, columns=lon_index, index=lat_index)
def nc_to_dataframe(nc_fname,
columns=slice(None)):
"""
Return a pandas data frame containing the information in the
netCDF file *nc_fname*. Return it and a mapping to the header
metadata. Use *columns* to select columns (via a list of column
names).
"""
root = Dataset(nc_fname)
data = {}
data.update({dim: root[dim][:] for dim in root.dimensions if dim != 'time'})
index = data['time'] = map(fromJ2000, root['time'][:])
return (PD.DataFrame(data=data, index=index)[columns],
{x: getattr(root, x) for x in root.ncattrs()})
def test_1D_cf_bounds(self):
"""Test whether the CF Conventions for 1D bounaries are correct"""
final_bounds = np.arange(-180, 181, 30)
lon = xr.Variable(('lon', ), np.arange(-165, 166, 30),
{'bounds': 'lon_bounds'})
cf_bounds = xr.Variable(('lon', 'bnds'), np.zeros((len(lon), 2)))
for i in range(len(lon)):
cf_bounds[i, :] = final_bounds[i:i+2]
ds = xr.Dataset(coords={'lon': lon, 'lon_bounds': cf_bounds})
decoder = psyd.CFDecoder(ds)
self.assertEqual(list(final_bounds),
list(decoder.get_plotbounds(lon)))
def test_1D_bounds_calculation(self):
"""Test whether the 1D cell boundaries are calculated correctly"""
final_bounds = np.arange(-180, 181, 30)
lon = xr.Variable(('lon', ), np.arange(-165, 166, 30))
ds = xr.Dataset(coords={'lon': lon})
decoder = psyd.CFDecoder(ds)
self.assertEqual(list(final_bounds),
list(decoder.get_plotbounds(lon)))
def _filter_test_ds(self):
return xr.Dataset(
{'v0': xr.Variable(('ydim', 'xdim'), np.zeros((4, 4)),
attrs={'test': 1, 'test2': 1}),
'v1': xr.Variable(('xdim', ), np.zeros(4), attrs={'test': 2,
'test2': 2}),
'v2': xr.Variable(('xdim', ), np.zeros(4), attrs={'test': 3,
'test2': 3})},
{'ydim': xr.Variable(('ydim', ), np.arange(1, 5)),
'xdim': xr.Variable(('xdim', ), np.arange(4))})
def test_from_dataset_01_basic(self):
"""test creation without any additional information"""
variables, coords = self._from_dataset_test_variables
ds = xr.Dataset(variables, coords)
l = self.list_class.from_dataset(ds)
self.assertEqual(len(l), 4)
self.assertEqual(set(l.names), set(variables))
for arr in l:
self.assertEqual(arr.dims, variables[arr.name].dims,
msg="Wrong dimensions for variable " + arr.name)
self.assertEqual(arr.shape, variables[arr.name].shape,
msg="Wrong shape for variable " + arr.name)
def test_from_dataset_02_name(self):
"""Test the from_dataset creation method with selected names"""
variables, coords = self._from_dataset_test_variables
ds = xr.Dataset(variables, coords)
l = self.list_class.from_dataset(ds, name="v2")
self.assertEqual(len(l), 1)
self.assertEqual(set(l.names), {"v2"})
for arr in l:
self.assertEqual(arr.dims, variables[arr.name].dims,
msg="Wrong dimensions for variable " + arr.name)
self.assertEqual(arr.shape, variables[arr.name].shape,
msg="Wrong shape for variable " + arr.name)
def test_from_dataset_04_exact_selection(self):
"""Test the from_dataset creation method with selected names"""
variables, coords = self._from_dataset_test_variables
ds = xr.Dataset(variables, coords)
l = self.list_class.from_dataset(ds, ydim=2, method=None,
name=['v0', 'v2'])
self.assertEqual(len(l), 2)
self.assertEqual(set(l.names), {'v0', 'v2'})
for arr in l:
self.assertEqual(arr.ydim, 2,
msg="Wrong ydim slice for " + arr.name)
def test_from_dataset_05_exact_array_selection(self):
"""Test the from_dataset creation method with selected names"""
variables, coords = self._from_dataset_test_variables
ds = xr.Dataset(variables, coords)
l = self.list_class.from_dataset(ds, ydim=[[2, 3]], method=None,
name=['v0', 'v2'])
self.assertEqual(len(l), 2)
self.assertEqual(set(l.names), {'v0', 'v2'})
for arr in l:
self.assertEqual(arr.ydim.values.tolist(), [2, 3],
msg="Wrong ydim slice for " + arr.name)