def test_timedelta_arange(self):
a = np.arange(3, 10, dtype='m8')
assert_equal(a.dtype, np.dtype('m8'))
assert_equal(a, np.timedelta64(0) + np.arange(3, 10))
a = np.arange(np.timedelta64(3, 's'), 10, 2, dtype='m8')
assert_equal(a.dtype, np.dtype('m8[s]'))
assert_equal(a, np.timedelta64(0, 's') + np.arange(3, 10, 2))
# Step of 0 is disallowed
assert_raises(ValueError, np.arange, np.timedelta64(0),
np.timedelta64(5), 0)
# Promotion across nonlinear unit boundaries is disallowed
assert_raises(TypeError, np.arange, np.timedelta64(0, 'D'),
np.timedelta64(5, 'M'))
assert_raises(TypeError, np.arange, np.timedelta64(0, 'Y'),
np.timedelta64(5, 'D'))
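A quick standalone sketch (plain numpy, outside the test harness) of the unit rule the test above asserts: linear units promote freely, while month- and year-based units have no fixed length in days and cannot mix with them.
import numpy as np

# Linear units share a common base and compare/convert freely.
assert np.timedelta64(1, 's') == np.timedelta64(1000, 'ms')

# Nonlinear units do not: a month has no fixed length in days, so
# combining them raises TypeError, mirroring the arange checks above.
try:
    np.timedelta64(1, 'M') + np.timedelta64(1, 'D')
except TypeError:
    pass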
def time_seconds(tc_array, year):
    """Return an array of np.datetime64 decoded from the raw timecode words."""
    tc_array = np.array(tc_array, copy=True)
    word = tc_array[:, 0]
    day = word >> 1  # day of year sits in the high bits of word 0
    word = tc_array[:, 1].astype(np.uint64)
    # Milliseconds of day are packed as 7 + 10 + 10 bits across words 1-3.
    msecs = ((127) & word) * 1024
    word = tc_array[:, 2]
    msecs += word & 1023
    msecs *= 1024
    word = tc_array[:, 3]
    msecs += word & 1023
    return (np.datetime64(
        str(year) + '-01-01T00:00:00Z', 's') +
        msecs[:].astype('timedelta64[ms]') +
        (day - 1)[:].astype('timedelta64[D]'))
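A hypothetical round-trip for the unpacking above; the timecode values below are invented for illustration and follow the 7 + 10 + 10 bit layout.
import numpy as np

# 12:00:00.000 on day 1 -> 43200000 ms, split into 7/10/10-bit fields:
# 43200000 = 41 * 1024 * 1024 + 203 * 1024 + 512; day 1 is stored as 1 << 1.
tc = np.array([[1 << 1, 41, 203, 512]], dtype=np.uint64)
assert time_seconds(tc, 2014)[0] == np.datetime64('2014-01-01T12:00:00.000')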
def test_corrupted_data(self):
self.incoming.add_measures(self.metric.id, [
incoming.Measure(datetime64(2014, 1, 1, 12, 0, 1), 69),
])
self.trigger_processing()
self.incoming.add_measures(self.metric.id, [
incoming.Measure(datetime64(2014, 1, 1, 13, 0, 1), 1),
])
with mock.patch('gnocchi.carbonara.AggregatedTimeSerie.unserialize',
side_effect=carbonara.InvalidData()):
with mock.patch('gnocchi.carbonara.BoundTimeSerie.unserialize',
side_effect=carbonara.InvalidData()):
self.trigger_processing()
m = self.storage.get_measures(self.metric)
self.assertIn((datetime64(2014, 1, 1),
numpy.timedelta64(1, 'D'), 1), m)
self.assertIn((datetime64(2014, 1, 1, 13),
numpy.timedelta64(1, 'h'), 1), m)
self.assertIn((datetime64(2014, 1, 1, 13),
numpy.timedelta64(5, 'm'), 1), m)
def test_aborted_initial_processing(self):
self.incoming.add_measures(self.metric.id, [
incoming.Measure(datetime64(2014, 1, 1, 12, 0, 1), 5),
])
with mock.patch.object(self.storage, '_store_unaggregated_timeserie',
side_effect=Exception):
try:
self.trigger_processing()
except Exception:
pass
with mock.patch('gnocchi.storage.LOG') as LOG:
self.trigger_processing()
self.assertFalse(LOG.error.called)
m = self.storage.get_measures(self.metric)
self.assertIn((datetime64(2014, 1, 1),
numpy.timedelta64(1, 'D'), 5.0), m)
self.assertIn((datetime64(2014, 1, 1, 12),
numpy.timedelta64(1, 'h'), 5.0), m)
self.assertIn((datetime64(2014, 1, 1, 12),
numpy.timedelta64(5, 'm'), 5.0), m)
def test_add_measures_update_subset_split(self):
m, m_sql = self._create_metric('medium')
measures = [
incoming.Measure(datetime64(2014, 1, 6, i, j, 0), 100)
for i in six.moves.range(2) for j in six.moves.range(0, 60, 2)]
self.incoming.add_measures(m.id, measures)
self.trigger_processing([str(m.id)])
# add measure to end, in same aggregate time as last point.
self.incoming.add_measures(m.id, [
incoming.Measure(datetime64(2014, 1, 6, 1, 58, 1), 100)])
with mock.patch.object(self.storage, '_store_metric_measures') as c:
# should only resample last aggregate
self.trigger_processing([str(m.id)])
count = 0
for call in c.mock_calls:
# policy is 60 points and split is 48. should only update 2nd half
args = call[1]
if (args[0] == m_sql
and args[2] == 'mean'
and args[1].sampling == numpy.timedelta64(1, 'm')):
count += 1
self.assertEqual(1, count)
def test_74_percentile_serialized(self):
ts = carbonara.TimeSerie.from_tuples(
[(datetime64(2014, 1, 1, 12, 0, 0), 3),
(datetime64(2014, 1, 1, 12, 0, 4), 5),
(datetime64(2014, 1, 1, 12, 0, 9), 6)])
ts = self._resample(ts, numpy.timedelta64(60, 's'), '74pct')
self.assertEqual(1, len(ts))
self.assertEqual(5.48, ts[datetime64(2014, 1, 1, 12, 0, 0)][1])
# Serialize and unserialize
key = ts.get_split_key()
o, s = ts.serialize(key)
saved_ts = carbonara.AggregatedTimeSerie.unserialize(
s, key, '74pct')
ts = carbonara.TimeSerie.from_tuples(
[(datetime64(2014, 1, 1, 12, 0, 0), 3),
(datetime64(2014, 1, 1, 12, 0, 4), 5),
(datetime64(2014, 1, 1, 12, 0, 9), 6)])
ts = self._resample(ts, numpy.timedelta64(60, 's'), '74pct')
saved_ts.merge(ts)
self.assertEqual(1, len(ts))
self.assertEqual(5.48, ts[datetime64(2014, 1, 1, 12, 0, 0)][1])
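The 5.48 above is plain linear interpolation over the three points in the bucket; a numpy one-liner reproduces it outside the carbonara machinery (assuming the default interpolation):
import numpy as np

# rank = 0.74 * (3 - 1) = 1.48 -> 5 + 0.48 * (6 - 5) = 5.48
assert np.isclose(np.percentile([3, 5, 6], 74), 5.48)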
def test_aggregation_std_with_unique(self):
ts = carbonara.TimeSerie.from_tuples(
[(datetime64(2014, 1, 1, 12, 0, 0), 3)])
ts = self._resample(ts, numpy.timedelta64(60, 's'), 'std')
self.assertEqual(0, len(ts), ts.values)
ts = carbonara.TimeSerie.from_tuples(
[(datetime64(2014, 1, 1, 12, 0, 0), 3),
(datetime64(2014, 1, 1, 12, 0, 4), 6),
(datetime64(2014, 1, 1, 12, 0, 9), 5),
(datetime64(2014, 1, 1, 12, 1, 6), 9)])
ts = self._resample(ts, numpy.timedelta64(60, 's'), "std")
self.assertEqual(1, len(ts))
self.assertEqual(1.5275252316519465,
ts[datetime64(2014, 1, 1, 12, 0, 0)][1])
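The expected value is the sample standard deviation (ddof=1) of the three points landing in the 12:00 bucket; single-point buckets like 12:01 are dropped because std needs at least two samples.
import numpy as np

# sqrt(((3 - 14/3)**2 + (6 - 14/3)**2 + (5 - 14/3)**2) / 2)
assert np.isclose(np.std([3, 6, 5], ddof=1), 1.5275252316519465)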
def test_serialize(self):
ts = {'sampling': numpy.timedelta64(500, 'ms'), 'agg': 'mean'}
tsb = carbonara.BoundTimeSerie(block_size=ts['sampling'])
tsb.set_values(numpy.array([
(datetime64(2014, 1, 1, 12, 0, 0, 1234), 3),
(datetime64(2014, 1, 1, 12, 0, 0, 321), 6),
(datetime64(2014, 1, 1, 12, 1, 4, 234), 5),
(datetime64(2014, 1, 1, 12, 1, 9, 32), 7),
(datetime64(2014, 1, 1, 12, 2, 12, 532), 1)],
dtype=carbonara.TIMESERIES_ARRAY_DTYPE),
before_truncate_callback=functools.partial(
self._resample_and_merge, agg_dict=ts))
key = ts['return'].get_split_key()
o, s = ts['return'].serialize(key)
self.assertEqual(ts['return'],
carbonara.AggregatedTimeSerie.unserialize(
s, key, 'mean'))
def test_no_truncation(self):
ts = {'sampling': numpy.timedelta64(60, 's'), 'agg': 'mean'}
tsb = carbonara.BoundTimeSerie()
for i in six.moves.range(1, 11):
tsb.set_values(numpy.array([
(datetime64(2014, 1, 1, 12, i, i), float(i))],
dtype=carbonara.TIMESERIES_ARRAY_DTYPE),
before_truncate_callback=functools.partial(
self._resample_and_merge, agg_dict=ts))
tsb.set_values(numpy.array([
(datetime64(2014, 1, 1, 12, i, i + 1), float(i + 1))],
dtype=carbonara.TIMESERIES_ARRAY_DTYPE),
before_truncate_callback=functools.partial(
self._resample_and_merge, agg_dict=ts))
self.assertEqual(i, len(list(ts['return'].fetch())))
def test_split_key(self):
self.assertEqual(
numpy.datetime64("2014-10-07"),
carbonara.SplitKey.from_timestamp_and_sampling(
numpy.datetime64("2015-01-01T15:03"),
numpy.timedelta64(3600, 's')))
self.assertEqual(
numpy.datetime64("2014-12-31 18:00"),
carbonara.SplitKey.from_timestamp_and_sampling(
numpy.datetime64("2015-01-01 15:03:58"),
numpy.timedelta64(58, 's')))
key = carbonara.SplitKey.from_timestamp_and_sampling(
numpy.datetime64("2015-01-01 15:03"),
numpy.timedelta64(3600, 's'))
self.assertGreater(key, numpy.datetime64("1970"))
self.assertGreaterEqual(key, numpy.datetime64("1970"))
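The expected keys follow from plain epoch arithmetic, assuming carbonara's 3600 points per split: a split spans sampling × 3600, and the key is the timestamp floored to that span.
import numpy as np

span = np.timedelta64(3600, 's') * 3600            # 12,960,000 s per split
ts = np.datetime64("2015-01-01T15:03").astype('datetime64[s]')
epoch = np.datetime64(0, 's')
key = epoch + ((ts - epoch) // span) * span        # floor to the split span
assert key == np.datetime64("2014-10-07")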
def test_split(self):
sampling = numpy.timedelta64(5, 's')
points = 100000
ts = carbonara.TimeSerie.from_data(
timestamps=list(map(datetime.datetime.utcfromtimestamp,
six.moves.range(points))),
values=list(six.moves.range(points)))
agg = self._resample(ts, sampling, 'mean')
grouped_points = list(agg.split())
self.assertEqual(
math.ceil((points / sampling.astype(float))
/ carbonara.SplitKey.POINTS_PER_SPLIT),
len(grouped_points))
self.assertEqual("0.0",
str(carbonara.SplitKey(grouped_points[0][0], 0)))
# 3600 × 5s = 5 hours
self.assertEqual(datetime64(1970, 1, 1, 5),
grouped_points[1][0])
self.assertEqual(carbonara.SplitKey.POINTS_PER_SPLIT,
len(grouped_points[0][1]))
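A sketch of the arithmetic behind those assertions, again assuming 3600 points per split:
import math
import numpy as np

aggregated = 100000 / np.timedelta64(5, 's').astype(float)       # 20000 points
assert math.ceil(aggregated / 3600) == 6                         # splits
assert np.timedelta64(3600 * 5, 's') == np.timedelta64(5, 'h')   # span per split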
def test_from_timeseries(self):
sampling = numpy.timedelta64(5, 's')
points = 100000
ts = carbonara.TimeSerie.from_data(
timestamps=list(map(datetime.datetime.utcfromtimestamp,
six.moves.range(points))),
values=list(six.moves.range(points)))
agg = self._resample(ts, sampling, 'mean')
split = [t[1] for t in list(agg.split())]
self.assertEqual(agg,
carbonara.AggregatedTimeSerie.from_timeseries(
split,
sampling=agg.sampling,
max_size=agg.max_size,
aggregation_method=agg.aggregation_method))
def add_hours_elpased_to_events(events, dt, remove_charttime=True):
    # Dividing a timedelta by np.timedelta64(1, 's') yields float seconds;
    # divide by 60 twice to get hours.
    events['HOURS'] = (events.CHARTTIME - dt).apply(
        lambda s: s / np.timedelta64(1, 's')) / 60. / 60
    if remove_charttime:
        del events['CHARTTIME']
    return events
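A hypothetical call to make the unit conversion concrete (column names follow the snippet; the data is invented):
import numpy as np
import pandas as pd

events = pd.DataFrame({'CHARTTIME': pd.to_datetime(['2014-01-01 13:30:00'])})
out = add_hours_elpased_to_events(events, pd.Timestamp('2014-01-01 12:00:00'))
assert out['HOURS'].iloc[0] == 1.5   # 5400 s / 60 / 60 = 1.5 hours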
def add_age_to_icustays(stays):
    # Same seconds trick as above, then seconds -> years.
    stays['AGE'] = (stays.INTIME - stays.DOB).apply(
        lambda s: s / np.timedelta64(1, 's')) / 60. / 60 / 24 / 365
    # Negative ages come from de-identified dates of birth; clamp them to 90.
    stays.loc[stays.AGE < 0, 'AGE'] = 90
    return stays
def _make_actual_bg_array(bg_df, start_index, end_index, prediction_start_time):
    total_len = start_index - end_index + 1
    time_bg_array = np.zeros(total_len)
    actual_bg_array = np.zeros(total_len)
    array_index = 0
    miss = 0
    for df_index in range(start_index, end_index - 1, -1):
        # Keep track of the time starting at 0 at the start_index
        time = (bg_df.iloc[df_index]['created_at']
                - bg_df.iloc[start_index]['created_at']) / np.timedelta64(1, 'm')
        if time > prediction_start_time:
            time_bg_array[array_index] = time
            try:
                actual_bg_array[array_index] = bg_df.iloc[df_index]['openaps']['enacted']['bg']
                array_index += 1
                last_time = time
            except (KeyError, TypeError):
                try:
                    actual_bg_array[array_index] = bg_df.iloc[df_index]['openaps']['suggested']['bg']
                    array_index += 1
                    last_time = time
                except (KeyError, TypeError):
                    # On a miss, stay at the same index and count the miss
                    miss += 1
        else:
            miss += 1
    # Trim off the slots left empty by missed data
    time_bg_array = np.resize(time_bg_array, total_len - miss)
    actual_bg_array = np.resize(actual_bg_array, total_len - miss)
    return time_bg_array, actual_bg_array
#Returns true if the data lies in a data gap, so it will not be used
def add_pricing_date(i=0, in_place=True):
    if in_place:
        Pricing_Database.pricing_date += np.timedelta64(i, 'D')
        return None
    else:
        return Pricing_Database.pricing_date + np.timedelta64(i, 'D')
def add_date(npdate, i=0):
    return npdate + np.timedelta64(i, 'D')
def date_diff(dt1, dt2):
    if isinstance(dt1, int):
        dt1 = add_pricing_date(dt1, in_place=False)
    if isinstance(dt2, int):
        dt2 = add_pricing_date(dt2, in_place=False)
    return (dt2 - dt1) / np.timedelta64(1, 'D')
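A numpy-only sketch of the two helpers above (the integer branch of date_diff needs Pricing_Database and is skipped here):
import numpy as np

d1 = np.datetime64('2014-01-01')
d2 = add_date(d1, 30)              # 2014-01-31
assert date_diff(d1, d2) == 30.0   # (d2 - d1) / np.timedelta64(1, 'D')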
def __to_Timestamp__(self, time):
    # seconds since the Unix epoch -> np.datetime64
    return time * np.timedelta64(1, 's') + np.datetime64("1970-01-01 00:00:00")
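The same arithmetic round-trips, which is what the intersection test below relies on (a self-free sketch, not the class method):
import numpy as np

time = np.datetime64('2000-02-01T00:01:00')
seconds = (time - np.datetime64("1970-01-01 00:00:00")) / np.timedelta64(1, 's')
assert seconds * np.timedelta64(1, 's') + np.datetime64("1970-01-01 00:00:00") == time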
def test_intersection_sem_mock_do_test_2(self):
poly = Polygon([(1, 1), (1, 3), (4, 3), (4, 1), (1, 1)])
response = self.traj2.intersection_shapely(poly)
traj = self.traj2.to_Trajectory(response)
time = np.datetime64('2000-02-01T00:01:00')
seconds = (time - np.datetime64("1970-01-01 00:00:00")) / np.timedelta64(1, 's')
assert (np.array_equal(traj.getTime()[0], seconds))
assert (np.array_equal(traj.getTime()[1], seconds))
assert (np.array_equal(traj.getTime()[2], seconds))