def test_mag_instantiation(self):
    """Aggregate a one hour window and check the resulting time axis.

    The "report_number" dimension is bounded to 15:00-16:00 on 2017-02-12,
    60 aggregation list entries are expected, and the first column of the
    written "OB_time" variable must stay within 0.25 s of the requested
    bounds with a step of 1 between consecutive values.
    """
    window_start = datetime(2017, 2, 12, 15)
    window_end = datetime(2017, 2, 12, 16)
    # for convenience, datetime bounds are converted according to the
    # index_by units
    bounds = {"min": window_start, "max": window_end}
    self.config.dims["report_number"].update(bounds)

    aggregation = generate_aggregation_list(self.config, self.files)
    self.assertEqual(len(aggregation), 60)
    evaluate_aggregation_list(self.config, aggregation, self.file)

    slack = timedelta(seconds=0.25)
    with nc.Dataset(self.file) as nc_out:
        ob_time = nc_out.variables["OB_time"]
        stamps = ob_time[:, 0]
        out_start, out_end = nc.num2date(stamps[[0, -1]], ob_time.units)
        self.assertGreaterEqual(out_start, window_start - slack)
        self.assertLessEqual(out_end, window_end + slack)
        # The step array is computed once and reused by the three checks.
        steps = np.diff(stamps)
        self.assertAlmostEqual(np.mean(steps), 1, delta=0.001)
        self.assertAlmostEqual(np.max(steps), 1, delta=0.001)
        self.assertAlmostEqual(np.min(steps), 1, delta=0.001)
        window_seconds = int((window_end - window_start).total_seconds())
        self.assertAlmostEqual(window_seconds, stamps.size, delta=1)
# Example source code for Python's num2date()
def test_giving_extra_files(self):
    """Aggregate a half hour window out of the full set of input files.

    The "report_number" dimension is bounded to 15:30-16:00 on 2017-02-12,
    30 aggregation list entries are expected, and the first column of the
    written "OB_time" variable must stay within 0.25 s of the requested
    bounds with a step of 1 between consecutive values.
    """
    window_start = datetime(2017, 2, 12, 15, 30)
    window_end = datetime(2017, 2, 12, 16)
    # for convenience, datetime bounds are converted according to the
    # index_by units
    bounds = {"min": window_start, "max": window_end}
    self.config.dims["report_number"].update(bounds)

    aggregation = generate_aggregation_list(self.config, self.files)
    self.assertEqual(len(aggregation), 30)
    evaluate_aggregation_list(self.config, aggregation, self.file)

    slack = timedelta(seconds=0.25)
    with nc.Dataset(self.file) as nc_out:
        ob_time = nc_out.variables["OB_time"]
        stamps = ob_time[:, 0]
        out_start, out_end = nc.num2date(stamps[[0, -1]], ob_time.units)
        self.assertGreaterEqual(out_start, window_start - slack)
        self.assertLessEqual(out_end, window_end + slack)
        # The step array is computed once and reused by the three checks.
        steps = np.diff(stamps)
        self.assertAlmostEqual(np.mean(steps), 1, delta=0.001)
        self.assertAlmostEqual(np.max(steps), 1, delta=0.001)
        self.assertAlmostEqual(np.min(steps), 1, delta=0.001)
        window_seconds = int((window_end - window_start).total_seconds())
        self.assertAlmostEqual(window_seconds, stamps.size, delta=1)
def test_strict_time(self):
    """Check that the flattened "OB_time" values are evenly spaced and bounded."""
    ob_time = self.output.variables["OB_time"]
    numeric_times = ob_time[:].flatten()
    # Print arrays in full so nothing is elided if they get dumped while
    # debugging.
    np.set_printoptions(threshold=np.inf)

    steps = np.diff(numeric_times)
    for summarize in (np.mean, np.min, np.max):
        self.assertAlmostEqual(summarize(steps), 0.1, delta=0.002)

    datetimes = nc.num2date(numeric_times, ob_time.units)
    # Records hold 10 samples and none may come in before the start time,
    # so the first sample may be up to 0.9 after the aggregation start.
    start_offset = (datetimes[0] - self.start_time).total_seconds()
    self.assertLess(abs(start_offset), 0.91)
    # Likewise at the aggregation end: records are not chopped in the
    # middle, so even if the first sample of the last record is before the
    # end, up to 0.91 may be after.
    end_offset = (datetimes[-10] - self.end_time).total_seconds()
    self.assertLess(abs(end_offset), 0.91)
def test_main(self):
    """Aggregate one day indexed by "time" and check the output time axis.

    The window runs from 2017-07-14 00:00 to one millisecond before the
    next midnight. A single aggregation list entry is expected, and the
    written "time" variable must lie inside the window with a step of 1
    between consecutive values.
    """
    window_start = datetime(2017, 7, 14, 0, 0)
    window_end = window_start + timedelta(days=1) - timedelta(milliseconds=1)
    time_settings = {
        "index_by": "time",
        # for convenience, datetime bounds are converted according to the
        # index_by units
        "min": window_start,
        "max": window_end,
        "expected_cadence": {"time": 1},
    }
    self.config.dims["time"].update(time_settings)

    aggregation = generate_aggregation_list(self.config, self.files)
    self.assertEqual(len(aggregation), 1)
    evaluate_aggregation_list(self.config, aggregation, self.file)

    with nc.Dataset(self.file) as nc_out:
        time_var = nc_out.variables["time"]
        stamps = time_var[:]
        out_start, out_end = nc.num2date(stamps[[0, -1]], time_var.units)
        self.assertGreaterEqual(out_start, window_start)
        self.assertLessEqual(out_end, window_end)
        # The step array is computed once and reused by the three checks.
        steps = np.diff(stamps)
        self.assertAlmostEqual(np.mean(steps), 1, delta=0.001)
        self.assertAlmostEqual(np.max(steps), 1, delta=0.001)
        self.assertAlmostEqual(np.min(steps), 1, delta=0.001)
        window_seconds = int((window_end - window_start).total_seconds())
        self.assertAlmostEqual(window_seconds, stamps.size, delta=1)
def get_data(lidar_datafile):
    """
    Read lidar data from a netCDF file and store it in a dictionary.

    The two range corrected signals are filtered with percentile derived
    thresholds: values below the 2nd or above the 98th percentile of a
    signal are replaced with NaN.

    :param lidar_datafile: name of the netCDF file to read
    :returns: dictionary with the keys 'Time' (converted with num2date),
        'Longitude', 'Latitude', 'Altitude', 'rangeCorrected_0' and
        'rangeCorrected_1'
    """
    signal_names = ('rangeCorrected_0', 'rangeCorrected_1')
    l_ds = {}
    # The context manager closes the file even when a variable is missing;
    # slicing with [:] pulls the values into memory before that happens.
    with netCDF4.Dataset(lidar_datafile) as ids:
        time_var = ids.variables['Time']
        l_ds['Time'] = netCDF4.num2date(time_var[:], time_var.units)
        for name in ('Longitude', 'Latitude', 'Altitude') + signal_names:
            l_ds[name] = ids.variables[name][:]
    for var in signal_names:
        # Both thresholds are taken before any value is overwritten.
        lower_threshold = np.percentile(l_ds[var], 2)
        upper_threshold = np.percentile(l_ds[var], 98)
        l_ds[var][l_ds[var] < lower_threshold] = np.nan
        l_ds[var][l_ds[var] > upper_threshold] = np.nan
    return l_ds
def read_fgga_na(ifile):
    """
    Read the FGGA data from a NASA Ames file into a DataFrame.

    :param ifile: nasaAmes input filename
    :returns: pandas.DataFrame indexed by 'timestamp' with the columns
        'co2_ppm', 'co2_flag', 'ch4_ppb' and 'ch4_flag', or None when the
        nappy package cannot be imported
    """
    try:
        import nappy
    except Exception:
        # Best effort: any failure while importing nappy is reported and the
        # function gives up, but KeyboardInterrupt and SystemExit are no
        # longer swallowed the way a bare ``except:`` would.
        sys.stdout.write('Can not import nappy ...\n')
        return None
    from collections import OrderedDict

    ds = nappy.openNAFile(ifile)
    ds.readData()
    # An ordered mapping keeps the column order of the resulting frame; the
    # local is not called ``dict`` so the builtin stays usable.
    columns = OrderedDict()
    columns['timestamp'] = netCDF4.num2date(
        ds.X, ds.getIndependentVariable(0)[1])
    for i, v in enumerate(['co2_ppm', 'co2_flag', 'ch4_ppb', 'ch4_flag']):
        columns[v] = ds.V[i]
    return pd.DataFrame(columns).set_index('timestamp')
def readGraceData(filename, lat_name, lon_name, data_name, time=None):
    '''
    This function reads in netcdf data provided by GRACE Tellus

    @param filename: Name of file to read in
    @param lat_name: Name of latitude data
    @param lon_name: Name of longitude data
    @param data_name: Name of data product
    @param time: Name of time data

    @return a pandas Panel whose items are the decoded dates when time is
            given, otherwise a DataFrame with latitudes as index and
            longitudes as columns
    '''
    # The context manager closes the file on every exit path; everything
    # needed afterwards is read into memory inside the block.
    with Dataset(filename, 'r') as nc:
        lat_index = nc[lat_name][:]
        lon_index = nc[lon_name][:]
        data = nc[data_name][:]
        if time is None:
            return pd.DataFrame(data=data, columns=lon_index, index=lat_index)
        time_var = nc.variables[time]
        dates = num2date(time_var[:], units=time_var.units,
                         calendar=time_var.calendar)
    date_index = pd.to_datetime(dates)
    # NOTE(review): pd.Panel was removed in pandas 1.0; it is kept because
    # callers receive a Panel from this function - confirm the pandas
    # version this module targets.
    return pd.Panel(data=data, items=date_index,
                    major_axis=lat_index, minor_axis=lon_index)
def time(self, n):
    """Return the timestamp of time frame n, decoded with num2date."""
    time_variable = self.nc.variables['time']
    frame_value = time_variable[n]
    return num2date(frame_value, time_variable.units)
def __read_var(self, file, var):
    """Load a variable, its grid and its time axis from a netCDF file.

    Sets self.nx, self.ny and self.nt from the configured dimensions,
    self.x and self.y from the configured coordinate variables, self.data
    from ``var`` (squeezed and flipped upside down), self.time from the
    configured time variable and self.Times with the values of self.time
    converted by num2date as seconds since the configured base date.

    Each dimension of ``var`` is read over its full extent unless
    self.config.clip names that dimension, in which case the first two
    entries of the clip value are used as the start and stop index.

    :param file: name of the netCDF file to open
    :param var: name of the variable to read
    """
    ds = Dataset(file, 'r')
    # try/finally makes sure the file is closed even when a read fails.
    try:
        self.nx = len(ds.dimensions[self.config.xdim])
        self.ny = len(ds.dimensions[self.config.ydim])
        self.nt = len(ds.dimensions[self.config.tdim])
        self.x = ds.variables[self.config.xname][:]
        self.y = ds.variables[self.config.yname][:]

        # Sort out the dimensions: start from the full extent of every
        # dimension so the slicing below also works without any clipping.
        extents = {}
        for key, val in list(ds.dimensions.items()):
            extents[key] = (0, len(val))
        if self.config.clip:
            for clipname in self.config.clip:
                # Clip entries for dimensions the file lacks are ignored.
                if clipname in extents:
                    extents[clipname] = self.config.clip[clipname]

        # One slice per dimension of the variable, whatever its rank.
        selection = tuple(
            slice(extents[d][0], extents[d][1])
            for d in ds.variables[var].dimensions
        )
        self.data = np.flipud(np.squeeze(ds.variables[var][selection]))

        self.time = ds.variables[self.config.tname][:]
        units = 'seconds since {}'.format(self.config.basedate)
        self.Times = [
            num2date(t, units, calendar=self.config.calendar)
            for t in self.time
        ]
    finally:
        ds.close()
def test_combined_time(file_to_combine_setup):
    """Check that the 'time' values decode to the first three hours of 2017-04-29."""
    date_template = '2017-04-29 0{0}:00:00'
    expected = []
    for hour in range(3):
        expected.append(parser.parse(date_template.format(hour)))
    with Dataset(_ids_in_order_nc) as combined:
        time_var = combined.variables['time']
        returned = list(num2date(time_var[:], time_var.units))
    assert expected == returned
def time_from_dataset(nc_dataset):
    """Return the time of a netCDF dataset as a timezone aware date.

    The first value of the 'time' variable is used when that variable
    exists; otherwise the 'model_output_valid_time' attribute is parsed
    after replacing underscores with spaces. A result without tzinfo is
    tagged as UTC.

    Raises ValueError when the dataset has neither source.
    """
    has_time_variable = 'time' in nc_dataset.variables
    # Guard clause: the attributes are only consulted when there is no
    # 'time' variable.
    if not has_time_variable and (
            'model_output_valid_time' not in nc_dataset.ncattrs()):
        raise ValueError('Could not find model output time in netCDF dataset.')

    if has_time_variable:
        time_var = nc_dataset.variables['time']
        units = time_var.units
        first_value = time_var[0]
        date = num2date(first_value, units)
    else:
        valid_time = nc_dataset.model_output_valid_time.replace('_', ' ')
        date = date_parser.parse(valid_time)

    if date.tzinfo is not None:
        return date
    return date.replace(tzinfo=pytz.utc)
def _read_tcoord(self):
    """ Read time coordinate information from netcdf file(s).

    Stores the values of the variable named by self.tcoord, decoded with
    num2date, in self.dates. When self.f matches more than one file the
    variable is wrapped in MFTime first.

    Raises NetCDF4ERROR when the multi-file conversion fails.
    """
    nc = self._opennc(self.f)
    t = nc.variables[self.tcoord]
    if len(glob.glob(self.f)) > 1:
        try:
            self.dates = num2date(MFTime(t)[:], calendar=t.calendar, units=t.units)
        except Exception as err:
            # Binding the exception gives the re-raise below the original
            # error to report; the name was previously undefined there.
            # The parenthesised single-argument print is valid in both
            # Python 2 and Python 3.
            print('netcdf4.MFTime incompatible with NETCDF4. Try concatenating data into a single file.')
            raise NetCDF4ERROR(err)
    else:
        self.dates = num2date(t[:], calendar=t.calendar, units=t.units)
def _read_dates(self):
    """ Read date information from file.

    Stores the decoded 'time' values of the file self.f in
    self.original_dates and their hour, day, month and year components in
    the integer arrays self.hh, self.dd, self.mm and self.yy.
    """
    # The context manager closes the file once the values are in memory.
    with Dataset(self.f) as nc:
        t = nc.variables['time']
        self.original_dates = num2date(t[:], units=t.units)
    # np.int was only an alias of the builtin int and has been removed from
    # NumPy, so the builtin is used directly.
    self.hh = np.array([dt.hour for dt in self.original_dates], dtype=int)
    self.dd = np.array([dt.day for dt in self.original_dates], dtype=int)
    self.mm = np.array([dt.month for dt in self.original_dates], dtype=int)
    self.yy = np.array([dt.year for dt in self.original_dates], dtype=int)
def _read_dates(self):
    """ Read date information from file.

    Stores the decoded 'time' values of the file self.f in
    self.original_dates and their hour, day, month and year components in
    the integer arrays self.hh, self.dd, self.mm and self.yy.
    """
    # The context manager closes the file once the values are in memory.
    with Dataset(self.f) as nc:
        t = nc.variables['time']
        self.original_dates = num2date(t[:], units=t.units)
    # np.int was only an alias of the builtin int and has been removed from
    # NumPy, so the builtin is used directly.
    self.hh = np.array([dt.hour for dt in self.original_dates], dtype=int)
    self.dd = np.array([dt.day for dt in self.original_dates], dtype=int)
    self.mm = np.array([dt.month for dt in self.original_dates], dtype=int)
    self.yy = np.array([dt.year for dt in self.original_dates], dtype=int)
def _read_dates(self):
    """ Read date information from file.

    Stores the decoded 'time' values of the file self.f in
    self.original_dates and their hour, day, month and year components in
    the integer arrays self.hh, self.dd, self.mm and self.yy.
    """
    # The context manager closes the file once the values are in memory.
    with Dataset(self.f) as nc:
        t = nc.variables['time']
        self.original_dates = num2date(t[:], units=t.units)
    # np.int was only an alias of the builtin int and has been removed from
    # NumPy, so the builtin is used directly.
    self.hh = np.array([dt.hour for dt in self.original_dates], dtype=int)
    self.dd = np.array([dt.day for dt in self.original_dates], dtype=int)
    self.mm = np.array([dt.month for dt in self.original_dates], dtype=int)
    self.yy = np.array([dt.year for dt in self.original_dates], dtype=int)
def get_ncdates(nc, tvar='time'):
    """ Return dates from a netCDF time coordinate.

    :param nc: open dataset exposing a ``variables`` mapping
    :param tvar: name of the time variable
    :returns: all values of the variable converted by num2date
    """
    t = nc.variables[tvar]
    # Not every file carries a calendar attribute; fall back to 'standard',
    # which is also num2date's own default, instead of raising
    # AttributeError.
    calendar = getattr(t, 'calendar', 'standard')
    return num2date(t[:], t.units, calendar=calendar)
def _read_dates(self):
    """ Read date information from file.

    Stores the decoded 'time' values of the file self.f in
    self.original_dates and their hour, day, month and year components in
    the integer arrays self.hh, self.dd, self.mm and self.yy.
    """
    # The context manager closes the file once the values are in memory.
    with Dataset(self.f) as nc:
        t = nc.variables['time']
        self.original_dates = num2date(t[:], units=t.units)
    # np.int was only an alias of the builtin int and has been removed from
    # NumPy, so the builtin is used directly.
    self.hh = np.array([dt.hour for dt in self.original_dates], dtype=int)
    self.dd = np.array([dt.day for dt in self.original_dates], dtype=int)
    self.mm = np.array([dt.month for dt in self.original_dates], dtype=int)
    self.yy = np.array([dt.year for dt in self.original_dates], dtype=int)
def _read_dates(self):
    """ Read date information from file.

    Stores the decoded 'time' values of the file self.f in
    self.original_dates and their hour, day, month and year components in
    the integer arrays self.hh, self.dd, self.mm and self.yy.
    """
    # The context manager closes the file once the values are in memory.
    with Dataset(self.f) as nc:
        t = nc.variables['time']
        self.original_dates = num2date(t[:], units=t.units)
    # np.int was only an alias of the builtin int and has been removed from
    # NumPy, so the builtin is used directly.
    self.hh = np.array([dt.hour for dt in self.original_dates], dtype=int)
    self.dd = np.array([dt.day for dt in self.original_dates], dtype=int)
    self.mm = np.array([dt.month for dt in self.original_dates], dtype=int)
    self.yy = np.array([dt.year for dt in self.original_dates], dtype=int)
def _read_dates(self):
    """ Read date information from file.

    Stores the decoded 'time' values of the file self.f in
    self.original_dates and their hour, day, month and year components in
    the integer arrays self.hh, self.dd, self.mm and self.yy.
    """
    # The context manager closes the file once the values are in memory.
    with Dataset(self.f) as nc:
        t = nc.variables['time']
        self.original_dates = num2date(t[:], units=t.units)
    # np.int was only an alias of the builtin int and has been removed from
    # NumPy, so the builtin is used directly.
    self.hh = np.array([dt.hour for dt in self.original_dates], dtype=int)
    self.dd = np.array([dt.day for dt in self.original_dates], dtype=int)
    self.mm = np.array([dt.month for dt in self.original_dates], dtype=int)
    self.yy = np.array([dt.year for dt in self.original_dates], dtype=int)
def get_ncdates(nc, tvar='time'):
    """ Return dates from a netCDF time coordinate.

    :param nc: open dataset exposing a ``variables`` mapping
    :param tvar: name of the time variable
    :returns: all values of the variable converted by num2date
    """
    t = nc.variables[tvar]
    # Not every file carries a calendar attribute; fall back to 'standard',
    # which is also num2date's own default, instead of raising
    # AttributeError.
    calendar = getattr(t, 'calendar', 'standard')
    return num2date(t[:], t.units, calendar=calendar)
def test_time(self):
    """Check spacing and bounds of the "L1a_SciData_TimeStamp" output values."""
    stamp_var = self.output.variables["L1a_SciData_TimeStamp"]
    numeric_times = stamp_var[:]
    # timestamps on SEIS seem pretty well behaved: the deltas allowed here
    # are small, but the timestamps are almost absolutely regular
    steps = np.diff(numeric_times)
    for summarize in (np.mean, np.min, np.max):
        self.assertAlmostEqual(summarize(steps), 1, delta=0.001)

    datetimes = nc.num2date(numeric_times, stamp_var.units)
    start_offset = (datetimes[0] - self.start_time).total_seconds()
    self.assertLessEqual(abs(start_offset), 1)
    end_offset = (datetimes[-1] - self.end_time).total_seconds()
    self.assertLessEqual(abs(end_offset), 1)
def test_time(self):
    """Check spacing and bounds of the first "L1a_SciData_TimeStamp" column."""
    stamp_var = self.output.variables["L1a_SciData_TimeStamp"]
    numeric_times = stamp_var[:, 0]
    steps = np.diff(numeric_times)
    for summarize in (np.mean, np.min, np.max):
        self.assertAlmostEqual(summarize(steps), 1, delta=0.01)

    datetimes = nc.num2date(numeric_times, stamp_var.units)
    start_offset = (datetimes[0] - self.start_time).total_seconds()
    self.assertLess(abs(start_offset), 0.1)
    end_offset = (datetimes[-1] - self.end_time).total_seconds()
    self.assertLess(abs(end_offset), 0.1)
def test_time(self):
    """Check spacing and bounds of the "L1a_SciData_TimeStamp" output values."""
    stamp_var = self.output.variables["L1a_SciData_TimeStamp"]
    numeric_times = stamp_var[:]
    steps = np.diff(numeric_times)
    for summarize in (np.mean, np.min, np.max):
        self.assertAlmostEqual(summarize(steps), 1, delta=0.01)

    datetimes = nc.num2date(numeric_times, stamp_var.units)
    start_offset = (datetimes[0] - self.start_time).total_seconds()
    self.assertLess(abs(start_offset), 0.1)
    end_offset = (datetimes[-1] - self.end_time).total_seconds()
    self.assertLess(abs(end_offset), 0.1)
def test_get_start_time(self):
    """Check the first "OB_time" index value found in the test input file."""
    index_settings = {
        "index_by": "OB_time",
        "other_dim_inds": {"number_samples_per_report": 0},
    }
    self.config.dims["report_number"].update(index_settings)
    node = InputFileNode(self.config, test_input_file)
    first_value = node.get_first_of_index_by(self.config.dims["report_number"])
    start_found = num2date(first_value, "seconds since 2000-01-01 12:00:00")
    expected = datetime(2017, 2, 12, 14, 59, 59, 900905)
    self.assertEqual(start_found, expected)
def test_get_end_time(self):
    """Check the last "OB_time" index value found in the test input file."""
    index_settings = {
        "index_by": "OB_time",
        "other_dim_inds": {"number_samples_per_report": 0},
    }
    self.config.dims["report_number"].update(index_settings)
    node = InputFileNode(self.config, test_input_file)
    last_value = node.get_last_of_index_by(self.config.dims["report_number"])
    end_found = num2date(last_value, "seconds since 2000-01-01 12:00:00")
    expected = datetime(2017, 2, 12, 15, 0, 58, 900926)
    self.assertEqual(end_found, expected)
def test_get_end_time_with_cadence(self):
    """Check the last "OB_time" index value when an expected cadence is set."""
    index_settings = {
        "index_by": "OB_time",
        "other_dim_inds": {"number_samples_per_report": 0},
        "expected_cadence": {
            "report_number": 1,
            "number_samples_per_report": 10,
        },
    }
    self.config.dims["report_number"].update(index_settings)
    node = InputFileNode(self.config, test_input_file)
    last_value = node.get_last_of_index_by(self.config.dims["report_number"])
    end_found = num2date(last_value, "seconds since 2000-01-01 12:00:00")
    expected = datetime(2017, 2, 12, 15, 0, 58, 900926)
    self.assertEqual(end_found, expected)
def test_get_start_time(self):
    """Check the first "time" index value found in the other input file."""
    index_settings = {"index_by": "time"}
    self.config.dims["time"].update(index_settings)
    node = InputFileNode(self.config, another_input_file)
    first_value = node.get_first_of_index_by(self.config.dims["time"])
    start_found = num2date(first_value, "seconds since 2000-01-01 12:00:00")
    expected = datetime(2017, 4, 14, 20, 27, 59, 900871)
    self.assertEqual(start_found, expected)
def test_get_end_time(self):
    """Check the last "time" index value found in the other input file."""
    index_settings = {"index_by": "time"}
    self.config.dims["time"].update(index_settings)
    node = InputFileNode(self.config, another_input_file)
    last_value = node.get_last_of_index_by(self.config.dims["time"])
    end_found = num2date(last_value, "seconds since 2000-01-01 12:00:00")
    expected = datetime(2017, 4, 14, 20, 28, 59, 800611)
    self.assertEqual(end_found, expected)
def test_time(self):
    """Check spacing and bounds of the first "OB_time" column of the output."""
    ob_time = self.output.variables["OB_time"]
    numeric_times = ob_time[:, 0].flatten()
    steps = np.diff(numeric_times)
    for summarize in (np.mean, np.min, np.max):
        self.assertAlmostEqual(summarize(steps), 1, delta=0.02)

    datetimes = nc.num2date(numeric_times, ob_time.units)
    start_offset = (datetimes[0] - self.start_time).total_seconds()
    self.assertLess(abs(start_offset), 1)
    end_offset = (datetimes[-1] - self.end_time).total_seconds()
    self.assertLess(abs(end_offset), 1)
def test_time(self):
    """Check spacing and bounds of the "time" output values.

    Consecutive values must differ by 0.1 (within 0.002), and the first
    and last decoded values must fall inside the start/end window, less
    than 0.1 s away from its respective bound.
    """
    time_var = self.output.variables["time"]
    numeric_times = time_var[:]
    steps = np.diff(numeric_times)
    for summarize in (np.mean, np.min, np.max):
        self.assertAlmostEqual(summarize(steps), 0.1, delta=0.002)

    datetimes = nc.num2date(numeric_times, time_var.units)
    first_stamp = datetimes[0]
    self.assertLess(abs((first_stamp - self.start_time).total_seconds()), 0.1)
    self.assertGreaterEqual(first_stamp, self.start_time)
    last_stamp = datetimes[-1]
    self.assertLess(abs((last_stamp - self.end_time).total_seconds()), 0.1)
    self.assertLessEqual(last_stamp, self.end_time)