python类timedelta64()的实例源码

test_datetime.py 文件源码 项目:aws-lambda-numpy 作者: vitolimandibhrata 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_timedelta_arange(self):
        a = np.arange(3, 10, dtype='m8')
        assert_equal(a.dtype, np.dtype('m8'))
        assert_equal(a, np.timedelta64(0) + np.arange(3, 10))

        a = np.arange(np.timedelta64(3, 's'), 10, 2, dtype='m8')
        assert_equal(a.dtype, np.dtype('m8[s]'))
        assert_equal(a, np.timedelta64(0, 's') + np.arange(3, 10, 2))

        # Step of 0 is disallowed
        assert_raises(ValueError, np.arange, np.timedelta64(0),
                                np.timedelta64(5), 0)
        # Promotion across nonlinear unit boundaries is disallowed
        assert_raises(TypeError, np.arange, np.timedelta64(0, 'D'),
                                np.timedelta64(5, 'M'))
        assert_raises(TypeError, np.arange, np.timedelta64(0, 'Y'),
                                np.timedelta64(5, 'D'))
hrpt.py 文件源码 项目:satpy 作者: pytroll 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def time_seconds(tc_array, year):
    """Return the timestamps encoded by the timecode words.

    Each row of *tc_array* is read as four timecode words:

    * word 0, shifted right by one bit, is the 1-based day of the year;
    * the low 7 bits of word 1 followed by the low 10 bits of words 2
      and 3 form the millisecond count within that day.

    Args:
        tc_array: 2-D array-like with at least four columns of unsigned
            integer words (one row per timestamp).
        year: The year the day numbers refer to.

    Returns:
        A ``datetime64[ms]`` array with one timestamp per row.
    """
    # No explicit copy is needed: every intermediate below is a new array.
    tc_array = np.asarray(tc_array)

    day = tc_array[:, 0] >> 1

    # Widen to 64 bits before shifting so the 27-bit count cannot overflow.
    msecs = (tc_array[:, 1].astype(np.uint64) & 127) * 1024
    msecs += tc_array[:, 2] & 1023
    msecs *= 1024
    msecs += tc_array[:, 3] & 1023

    # datetime64 has no timezone support; a trailing 'Z' in the string only
    # triggers a warning in NumPy, so the start of the year is written as a
    # naive timestamp (the resulting value is the same).
    year_start = np.datetime64(str(year) + '-01-01T00:00:00', 's')

    return (year_start +
            msecs.astype('timedelta64[ms]') +
            (day - 1).astype('timedelta64[D]'))
test_storage.py 文件源码 项目:gnocchi 作者: gnocchixyz 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_corrupted_data(self):
        """Process new measures while both unserialize methods raise.

        A first measure is processed normally; a second one is processed
        with ``AggregatedTimeSerie.unserialize`` and
        ``BoundTimeSerie.unserialize`` patched to raise ``InvalidData``.
        The aggregates of the second measure must still be returned.
        """
        first = incoming.Measure(datetime64(2014, 1, 1, 12, 0, 1), 69)
        self.incoming.add_measures(self.metric.id, [first])
        self.trigger_processing()

        second = incoming.Measure(datetime64(2014, 1, 1, 13, 0, 1), 1)
        self.incoming.add_measures(self.metric.id, [second])

        broken_aggregated = mock.patch(
            'gnocchi.carbonara.AggregatedTimeSerie.unserialize',
            side_effect=carbonara.InvalidData())
        broken_bound = mock.patch(
            'gnocchi.carbonara.BoundTimeSerie.unserialize',
            side_effect=carbonara.InvalidData())
        with broken_aggregated, broken_bound:
            self.trigger_processing()

        measures = self.storage.get_measures(self.metric)
        expected = (
            (datetime64(2014, 1, 1), numpy.timedelta64(1, 'D'), 1),
            (datetime64(2014, 1, 1, 13), numpy.timedelta64(1, 'h'), 1),
            (datetime64(2014, 1, 1, 13), numpy.timedelta64(5, 'm'), 1),
        )
        for point in expected:
            self.assertIn(point, measures)
test_storage.py 文件源码 项目:gnocchi 作者: gnocchixyz 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_aborted_initial_processing(self):
        """Re-run processing after a first run that failed while storing.

        The first run has ``_store_unaggregated_timeserie`` patched to
        raise; the second run must not log any error and must expose the
        aggregates for the measure.
        """
        measure = incoming.Measure(datetime64(2014, 1, 1, 12, 0, 1), 5)
        self.incoming.add_measures(self.metric.id, [measure])

        failing_store = mock.patch.object(
            self.storage, '_store_unaggregated_timeserie',
            side_effect=Exception)
        with failing_store:
            # Whether or not the failure propagates is irrelevant here.
            try:
                self.trigger_processing()
            except Exception:
                pass

        with mock.patch('gnocchi.storage.LOG') as LOG:
            self.trigger_processing()
            self.assertFalse(LOG.error.called)

        measures = self.storage.get_measures(self.metric)
        expected = (
            (datetime64(2014, 1, 1), numpy.timedelta64(1, 'D'), 5.0),
            (datetime64(2014, 1, 1, 12), numpy.timedelta64(1, 'h'), 5.0),
            (datetime64(2014, 1, 1, 12), numpy.timedelta64(5, 'm'), 5.0),
        )
        for point in expected:
            self.assertIn(point, measures)
test_storage.py 文件源码 项目:gnocchi 作者: gnocchixyz 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_add_measures_update_subset_split(self):
        """Check that one late measure triggers a single 1-minute store.

        After an initial batch is processed, one more measure is added and
        the calls made to ``_store_metric_measures`` are inspected.
        """
        m, m_sql = self._create_metric('medium')

        # One measure every two minutes over the first two hours of the day.
        measures = []
        for hour in six.moves.range(2):
            for minute in six.moves.range(0, 60, 2):
                measures.append(incoming.Measure(
                    datetime64(2014, 1, 6, hour, minute, 0), 100))
        self.incoming.add_measures(m.id, measures)
        self.trigger_processing([str(m.id)])

        # add measure to end, in same aggregate time as last point.
        late = incoming.Measure(datetime64(2014, 1, 6, 1, 58, 1), 100)
        self.incoming.add_measures(m.id, [late])

        with mock.patch.object(
                self.storage, '_store_metric_measures') as store_mock:
            # should only resample last aggregate
            self.trigger_processing([str(m.id)])

        def is_minute_mean_store(call):
            # policy is 60 points and split is 48. should only update 2nd half
            args = call[1]
            return (args[0] == m_sql
                    and args[2] == 'mean'
                    and args[1].sampling == numpy.timedelta64(1, 'm'))

        matching = [call for call in store_mock.mock_calls
                    if is_minute_mean_store(call)]
        self.assertEqual(1, len(matching))
test_carbonara.py 文件源码 项目:gnocchi 作者: gnocchixyz 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_74_percentile_serialized(self):
        """Resample with '74pct', round-trip through serialization, merge."""
        minute = numpy.timedelta64(60, 's')
        first_stamp = datetime64(2014, 1, 1, 12, 0, 0)

        def resampled():
            # Build a fresh three-point series each time and resample it.
            raw = carbonara.TimeSerie.from_tuples(
                [(datetime64(2014, 1, 1, 12, 0, 0), 3),
                 (datetime64(2014, 1, 1, 12, 0, 4), 5),
                 (datetime64(2014, 1, 1, 12, 0, 9), 6)])
            return self._resample(raw, minute, '74pct')

        ts = resampled()
        self.assertEqual(1, len(ts))
        self.assertEqual(5.48, ts[first_stamp][1])

        # Serialize and unserialize
        key = ts.get_split_key()
        _offset, payload = ts.serialize(key)
        saved_ts = carbonara.AggregatedTimeSerie.unserialize(
            payload, key, '74pct')

        ts = resampled()
        saved_ts.merge(ts)

        # NOTE(review): these assertions inspect ``ts`` rather than the
        # merged ``saved_ts`` -- confirm that this is intended.
        self.assertEqual(1, len(ts))
        self.assertEqual(5.48, ts[first_stamp][1])
test_carbonara.py 文件源码 项目:gnocchi 作者: gnocchixyz 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_aggregation_std_with_unique(self):
        """Check 'std' resampling for one-point and multi-point buckets."""
        minute = numpy.timedelta64(60, 's')

        # A single point is expected to yield no aggregated value at all.
        lonely = carbonara.TimeSerie.from_tuples(
            [(datetime64(2014, 1, 1, 12, 0, 0), 3)])
        lonely = self._resample(lonely, minute, 'std')
        self.assertEqual(0, len(lonely), lonely.values)

        # Three points in the first minute, a single one in the next.
        points = [
            (datetime64(2014, 1, 1, 12, 0, 0), 3),
            (datetime64(2014, 1, 1, 12, 0, 4), 6),
            (datetime64(2014, 1, 1, 12, 0, 9), 5),
            (datetime64(2014, 1, 1, 12, 1, 6), 9),
        ]
        ts = self._resample(
            carbonara.TimeSerie.from_tuples(points), minute, "std")

        self.assertEqual(1, len(ts))
        first_bucket = ts[datetime64(2014, 1, 1, 12, 0, 0)]
        self.assertEqual(1.5275252316519465, first_bucket[1])
test_carbonara.py 文件源码 项目:gnocchi 作者: gnocchixyz 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_serialize(self):
        """Serialize an aggregated series and compare after unserializing."""
        agg_dict = {'sampling': numpy.timedelta64(500, 'ms'), 'agg': 'mean'}
        bound = carbonara.BoundTimeSerie(block_size=agg_dict['sampling'])

        points = numpy.array(
            [(datetime64(2014, 1, 1, 12, 0, 0, 1234), 3),
             (datetime64(2014, 1, 1, 12, 0, 0, 321), 6),
             (datetime64(2014, 1, 1, 12, 1, 4, 234), 5),
             (datetime64(2014, 1, 1, 12, 1, 9, 32), 7),
             (datetime64(2014, 1, 1, 12, 2, 12, 532), 1)],
            dtype=carbonara.TIMESERIES_ARRAY_DTYPE)
        on_truncate = functools.partial(
            self._resample_and_merge, agg_dict=agg_dict)
        bound.set_values(points, before_truncate_callback=on_truncate)

        # presumably the callback stores its result under 'return' --
        # confirm against _resample_and_merge
        aggregated = agg_dict['return']
        key = aggregated.get_split_key()
        _offset, payload = aggregated.serialize(key)
        restored = carbonara.AggregatedTimeSerie.unserialize(
            payload, key, 'mean')
        self.assertEqual(aggregated, restored)
test_carbonara.py 文件源码 项目:gnocchi 作者: gnocchixyz 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_no_truncation(self):
        """Feed two points per minute and check the aggregate count grows.

        After the i-th pair of points has been set, fetching the aggregated
        series must return exactly ``i`` entries.
        """
        agg_dict = {'sampling': numpy.timedelta64(60, 's'), 'agg': 'mean'}
        bound = carbonara.BoundTimeSerie()

        for i in six.moves.range(1, 11):
            # Two consecutive seconds within minute ``i``.
            for shift in (0, 1):
                sample = numpy.array(
                    [(datetime64(2014, 1, 1, 12, i, i + shift),
                      float(i + shift))],
                    dtype=carbonara.TIMESERIES_ARRAY_DTYPE)
                bound.set_values(
                    sample,
                    before_truncate_callback=functools.partial(
                        self._resample_and_merge, agg_dict=agg_dict))
            fetched = list(agg_dict['return'].fetch())
            self.assertEqual(i, len(fetched))
test_carbonara.py 文件源码 项目:gnocchi 作者: gnocchixyz 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_split_key(self):
        """Check split keys built from a timestamp and a sampling."""
        make_key = carbonara.SplitKey.from_timestamp_and_sampling
        hourly = numpy.timedelta64(3600, 's')
        epoch_year = numpy.datetime64("1970")

        cases = (
            (numpy.datetime64("2014-10-07"),
             numpy.datetime64("2015-01-01T15:03"),
             hourly),
            (numpy.datetime64("2014-12-31 18:00"),
             numpy.datetime64("2015-01-01 15:03:58"),
             numpy.timedelta64(58, 's')),
        )
        for expected, timestamp, sampling in cases:
            self.assertEqual(expected, make_key(timestamp, sampling))

        # Keys must also support ordering against plain datetime64 values.
        key = make_key(numpy.datetime64("2015-01-01 15:03"), hourly)
        self.assertGreater(key, epoch_year)
        self.assertGreaterEqual(key, epoch_year)
test_carbonara.py 文件源码 项目:gnocchi 作者: gnocchixyz 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_split(self):
        """Split a resampled series and check the number and size of parts."""
        sampling = numpy.timedelta64(5, 's')
        points = 100000

        # One point per second starting at the epoch; value == offset.
        stamps = [datetime.datetime.utcfromtimestamp(second)
                  for second in six.moves.range(points)]
        ts = carbonara.TimeSerie.from_data(
            timestamps=stamps,
            values=list(six.moves.range(points)))
        agg = self._resample(ts, sampling, 'mean')

        grouped_points = list(agg.split())
        per_split = carbonara.SplitKey.POINTS_PER_SPLIT

        aggregated_count = points / sampling.astype(float)
        self.assertEqual(math.ceil(aggregated_count / per_split),
                         len(grouped_points))

        first_key, first_chunk = grouped_points[0][0], grouped_points[0][1]
        self.assertEqual("0.0", str(carbonara.SplitKey(first_key, 0)))
        # 3600 × 5s = 5 hours
        self.assertEqual(datetime64(1970, 1, 1, 5), grouped_points[1][0])
        self.assertEqual(per_split, len(first_chunk))
test_carbonara.py 文件源码 项目:gnocchi 作者: gnocchixyz 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_from_timeseries(self):
        """Rebuild an aggregated series from its split parts and compare."""
        sampling = numpy.timedelta64(5, 's')
        points = 100000

        # One point per second starting at the epoch; value == offset.
        stamps = [datetime.datetime.utcfromtimestamp(second)
                  for second in six.moves.range(points)]
        ts = carbonara.TimeSerie.from_data(
            timestamps=stamps,
            values=list(six.moves.range(points)))
        agg = self._resample(ts, sampling, 'mean')

        # Keep only the second element of every item produced by split().
        pieces = [item[1] for item in agg.split()]

        rebuilt = carbonara.AggregatedTimeSerie.from_timeseries(
            pieces,
            sampling=agg.sampling,
            max_size=agg.max_size,
            aggregation_method=agg.aggregation_method)
        self.assertEqual(agg, rebuilt)
subject.py 文件源码 项目:mimic3-benchmarks 作者: YerevaNN 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def add_hours_elpased_to_events(events, dt, remove_charttime=True):
    """Add an ``HOURS`` column with the time elapsed since *dt*.

    Args:
        events: DataFrame with a datetime-like ``CHARTTIME`` column. It is
            modified in place.
        dt: Reference timestamp subtracted from every ``CHARTTIME``.
        remove_charttime: When true, drop the ``CHARTTIME`` column after
            ``HOURS`` has been computed.

    Returns:
        The same *events* object, for chaining.
    """
    # Divide the whole timedelta column at once instead of calling a Python
    # lambda per row; the resulting float seconds are the same.
    elapsed_seconds = (events.CHARTTIME - dt) / np.timedelta64(1, 's')
    events['HOURS'] = elapsed_seconds / 60. / 60
    if remove_charttime:
        del events['CHARTTIME']
    return events
mimic3csv.py 文件源码 项目:mimic3-benchmarks 作者: YerevaNN 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def add_age_to_icustays(stays):
    """Add an ``AGE`` column (in 365-day years) to *stays*.

    The age is the time between ``DOB`` and ``INTIME``. Rows whose computed
    age is negative get the fixed value 90.

    Args:
        stays: DataFrame with datetime-like ``INTIME`` and ``DOB`` columns.
            It is modified in place.

    Returns:
        The same *stays* object, for chaining.
    """
    elapsed_seconds = (stays.INTIME - stays.DOB) / np.timedelta64(1, 's')
    stays['AGE'] = elapsed_seconds / 60. / 60 / 24 / 365
    # NOTE(review): negative ages presumably come from shifted birth dates
    # of very old patients -- confirm against the dataset documentation.
    # A single .loc assignment replaces the removed ``.ix`` indexer and
    # avoids chained assignment on a column copy.
    stays.loc[stays.AGE < 0, 'AGE'] = 90
    return stays
datamatrix.py 文件源码 项目:OpenAPS 作者: medicinexlab 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _make_actual_bg_array(bg_df, start_index, end_index, prediction_start_time):
    """Collect the recorded BG values and their times after a start offset.

    Rows are visited from ``start_index`` down to ``end_index`` (inclusive).
    For each row the time in minutes relative to the row at ``start_index``
    is computed; rows whose time is not greater than
    ``prediction_start_time`` are skipped. For the remaining rows the BG is
    read from ``row['openaps']['enacted']['bg']``, falling back to
    ``row['openaps']['suggested']['bg']``; rows with neither are skipped.

    Args:
        bg_df: DataFrame with ``created_at`` and ``openaps`` columns.
        start_index: Positional index of the first (reference) row.
        end_index: Positional index of the last row; must not exceed
            ``start_index``.
        prediction_start_time: Offset in minutes; only later rows are kept.

    Returns:
        A ``(time_bg_array, actual_bg_array)`` tuple of equal-length float
        arrays holding the kept times and BG values in visiting order.
    """
    total_len = start_index - end_index + 1
    time_bg_array = np.zeros(total_len)
    actual_bg_array = np.zeros(total_len)
    array_index = 0
    miss = 0

    for df_index in range(start_index, end_index - 1, -1):
        row = bg_df.iloc[df_index]
        # Keep track of the time starting at 0 at the start_index
        elapsed = ((row['created_at'] - bg_df.iloc[start_index]['created_at'])
                   / np.timedelta64(1, 'm'))

        if not elapsed > prediction_start_time:
            miss += 1
            continue

        for source in ('enacted', 'suggested'):
            try:
                actual_bg_array[array_index] = row['openaps'][source]['bg']
            except (KeyError, IndexError, TypeError, ValueError):
                # Missing or unusable entry: try the next source. Only
                # lookup/conversion failures are treated as "no data".
                continue
            time_bg_array[array_index] = elapsed
            array_index += 1
            break
        else:
            # If a miss, don't move to the next index and instead add one
            # to the number missed
            miss += 1

    # Remove the number of missed data
    time_bg_array = np.resize(time_bg_array, total_len - miss)
    actual_bg_array = np.resize(actual_bg_array, total_len - miss)

    return time_bg_array, actual_bg_array


#Returns true if the data lies in a data gap, so it will not be used
Data_API.py 文件源码 项目:QTS_Research 作者: geome-mitbbs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def add_pricing_date(i=0,in_place=True):
    """Shift the pricing date by *i* days.

    With ``in_place`` true, ``Pricing_Database.pricing_date`` is updated
    and ``None`` is returned; otherwise the shifted date is returned and
    the stored date is left alone.
    """
    day_offset = np.timedelta64(i, 'D')
    if not in_place:
        return Pricing_Database.pricing_date + day_offset
    Pricing_Database.pricing_date += day_offset
    return None
Data_API.py 文件源码 项目:QTS_Research 作者: geome-mitbbs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def add_date(npdate,i=0):
    """Return *npdate* moved forward by *i* days (backward if negative)."""
    day_offset = np.timedelta64(i, 'D')
    return npdate + day_offset
Data_API.py 文件源码 项目:QTS_Research 作者: geome-mitbbs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def date_diff(dt1,dt2):
    """Return the number of days from *dt1* to *dt2* as a float.

    An ``int`` argument is treated as a day offset and converted to a date
    through ``add_pricing_date(..., in_place=False)`` first.
    """
    endpoints = [dt1, dt2]
    for position, value in enumerate(endpoints):
        if isinstance(value, int):
            endpoints[position] = add_pricing_date(value, in_place=False)

    start, end = endpoints
    one_day = np.timedelta64(1, 'D')
    return (end - start) / one_day
trajectory.py 文件源码 项目:postgis-t 作者: postgis-t 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __to_Timestamp__(self, time):
        """Convert *time*, a count of seconds since 1970-01-01, to datetime64."""
        epoch = np.datetime64("1970-01-01 00:00:00")
        one_second = np.timedelta64(1, 's')
        return time * one_second + epoch
TestTrajectory.py 文件源码 项目:postgis-t 作者: postgis-t 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_intersection_sem_mock_do_test_2(self):
        """Intersect ``traj2`` with a rectangle and check the first times.

        The first three time entries of the resulting trajectory must all
        equal 2000-02-01T00:01:00 expressed as seconds since 1970-01-01.
        """
        corners = [(1, 1), (1, 3), (4, 3), (4, 1), (1, 1)]
        poly = Polygon(corners)
        response = self.traj2.intersection_shapely(poly)

        traj = self.traj2.to_Trajectory(response)

        epoch = np.datetime64("1970-01-01 00:00:00")
        time = np.datetime64('2000-02-01T00:01:00')
        seconds = (time - epoch) / np.timedelta64(1, 's')

        for position in range(3):
            assert (np.array_equal(traj.getTime()[position], seconds))


问题


面经


文章

微信
公众号

扫码关注公众号