python类datetime64()的实例源码

gps.py 文件源码 项目:PyGPS 作者: gregstarr 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def getIntervals(data,sat_num,maxgap=3,maxjump=1.2):
    """
    Scan the phase TEC of one satellite and determine where "good"
    intervals begin and end.

    A sample is usable when L1, L2, C1 and the second pseudorange (C2 when
    c2p2() is true for this satellite, P2 otherwise) are all finite. A new
    interval starts whenever two consecutive usable samples are more than
    `maxgap` index positions apart, or their phase TEC differs by more than
    `maxjump`.

    inputs:
        data - Panel4D with dimensions (parameter,satellite number,time,data/lli/ssi)
        sat_num - the number of the satellite to get good intervals for
        maxgap - maximum index gap between usable samples before starting
                 a new interval
        maxjump - maximum jump in phase TEC before starting a new interval
    output:
        intervals - list of 2-tuples, beginning and end of each "good"
                    interval, taken from data.major_axis. Empty when no
                    sample is usable; a lone usable sample yields one
                    interval whose beginning and end coincide.
    """
    if c2p2(data,sat_num):
        observables = ['L1','L2','C1','C2']
    else:
        observables = ['L1','L2','C1','P2']
    # positions along the time axis where every observable is finite
    finite_values = np.where(np.logical_and.reduce((
                            np.isfinite(data[observables,sat_num,:,'data']).T)))[0]
    if len(finite_values) == 0:
        return []

    phase_tec = 2.85E9*(data['L1',sat_num,:,'data']/f1-data['L2',sat_num,:,'data']/f2)

    index_pairs = []
    beginning = finite_values[0]
    last = finite_values[0]
    for i in finite_values[1:]:
        if i-last > maxgap or abs(phase_tec[i]-phase_tec[last]) > maxjump:
            index_pairs.append((beginning,last))
            beginning = i
        last = i
    # Close the interval that is still open. Doing this after the loop
    # (instead of on the last iteration) also covers the case of a single
    # usable sample, for which the loop body never runs.
    index_pairs.append((beginning,last))

    # translate positional indices into the time labels of the panel
    return [(data.major_axis[start],data.major_axis[stop])
            for start,stop in index_pairs]
gps.py 文件源码 项目:PyGPS 作者: gregstarr 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def getTec(data,sat_num,data_interval,satbias=None):
    """
    Calculate slant TEC using phase TEC shifted by the median difference
    between phase TEC and pseudorange TEC.

    inputs:
        data - Panel4D with dimensions (parameter,satellite number,time,data/lli/ssi)
        sat_num - the number of the satellite to calculate TEC for
        data_interval - the interval made from getInterval(), it's a 2-tuple
                        marking the beginning and the end of a "good" interval
                        of data, each value is a Pandas/numpy datetime64
        satbias - accepted for interface compatibility; not used here
    output:
        (tec, median_error) - the shifted phase TEC over the interval, and
        the spread between the 75% and 25% points of the sorted differences
        divided by sqrt(number of finite differences)
    raises:
        IndexError - when the interval has no finite phase/range difference
    """
    start,stop = data_interval[0],data_interval[1]

    # second pseudorange observable: C2 when c2p2() says so, P2 otherwise
    if c2p2(data,sat_num,data_interval):
        second_range = 'C2'
    else:
        second_range = 'P2'
    range_tec = (2.85E9/3.0E8)*(
                data[second_range,sat_num,start:stop,'data']
                -data['C1',sat_num,start:stop,'data'])

    phase_tec = 2.85E9*(data['L1',sat_num,start:stop,'data']/f1
                        -data['L2',sat_num,start:stop,'data']/f2)

    # Drop NaN/inf BEFORE sorting: comparisons against NaN are always False,
    # so sorting a sequence that still contains NaN leaves it in an
    # undefined order and the median/quartile picks below become unreliable.
    tec_difference = np.asarray(phase_tec-range_tec)
    tec_difference = np.sort(tec_difference[np.isfinite(tec_difference)])
    count = len(tec_difference)

    median_difference = tec_difference[int(count/2)]
    difference_width = tec_difference[int(count*.75)]-tec_difference[int(count*.25)]
    median_error = difference_width/np.sqrt(count)
    tec = phase_tec - median_difference

    return tec,median_error
spark_histogrammar_filler.py 文件源码 项目:Eskapade 作者: KaveIO 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def get_data_type(self, df, col):
        """Return the numpy dtype corresponding to a data frame column

        The column's type is looked up in ``df.dtypes`` (read as name/type
        pairs). The type names 'string' and 'timestamp' are mapped to ``str``
        and ``numpy.datetime64``; any other type is handed to ``numpy.dtype``
        unchanged.

        :param df: input data frame
        :param str col: column
        :returns: numpy dtype of the column
        :raises KeyError: if the column is not in the data frame
        """
        if col not in df.columns:
            raise KeyError('column "{0:s}" not in input dataframe'.format(col))

        declared_type = dict(df.dtypes)[col]

        # type names that numpy does not understand directly
        if declared_type == 'string':
            return np.dtype('str')
        if declared_type == 'timestamp':
            return np.dtype(np.datetime64)
        return np.dtype(declared_type)
value_counter.py 文件源码 项目:Eskapade 作者: KaveIO 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def process_columns(self, df):
        """Prepare columns for histogram filling

        Builds a new frame from the string columns, then adds the datetime
        columns (first passed through ``hf.to_ns``) and the numeric columns,
        each mapped through ``hf.value_to_bin_index``.

        :param df: input (pandas) data frame
        :returns: output (pandas) data frame with converted columns
        :rtype: pandas DataFrame
        """
        # shallow copy of the string columns; converted columns are added to it
        converted = df[self.str_cols].copy(deep=False)

        # step 1: timestamp columns -> nanoseconds
        for ts_col in self.dt_cols:
            self.log().debug('Converting column "%s" of type "%s" to nanosec', ts_col, self.var_dtype[ts_col])
            converted[ts_col] = df[ts_col].apply(hf.to_ns)

        # step 2: numeric and timestamp columns -> bin indices
        for name in self.num_cols + self.dt_cols:
            self.log().debug('Converting column "%s" of type "%s" to index', name, self.var_dtype[name])

            # classify the column by a default-constructed scalar of its dtype
            scalar = df[name].dtype.type()
            is_number = isinstance(scalar, np.number)
            is_timestamp = isinstance(scalar, np.datetime64)

            # column-specific bin specs if configured, otherwise the unit defaults
            if is_number:
                fallback_specs = self._unit_bin_specs
            else:
                fallback_specs = self._unit_timestamp_specs
            specs = self.bin_specs.get(name, fallback_specs)

            # timestamps were already converted in step 1, so read those from
            # the working frame rather than from the input
            source = converted if is_timestamp else df
            converted[name] = source[name].apply(hf.value_to_bin_index, **specs)

        return converted
statistics.py 文件源码 项目:Eskapade 作者: KaveIO 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def get_col_props(var_type):
    """Classify a variable type

    :param var_type: anything accepted by ``numpy.dtype``
    :returns dict: ``dtype`` (the numpy dtype) plus the flags ``is_num``,
        ``is_int`` and ``is_ts``; timestamps count as numeric
    """
    npdtype = np.dtype(var_type)

    # a default-constructed scalar of the dtype is used to test its category
    scalar = npdtype.type()
    is_ts = isinstance(scalar, np.datetime64)

    props = {
        'dtype': npdtype,
        'is_num': is_ts or isinstance(scalar, np.number),
        'is_int': isinstance(scalar, np.integer),
        'is_ts': is_ts,
    }
    return props
dq_helper.py 文件源码 项目:Eskapade 作者: KaveIO 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def to_date_time(val):
    """Convert input to a date/time value

    Thin wrapper around ``pandas.to_datetime`` called with
    ``errors='coerce'``, so input that cannot be parsed comes back as NaT
    rather than raising.

    :param val: value to be evaluated
    :returns: result of ``pandas.to_datetime`` for the input
    """
    converted = pd.to_datetime(val, errors='coerce')
    return converted
trajectory.py 文件源码 项目:postgis-t 作者: postgis-t 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def __getitem__(self, t):
        """Return the (x, y) position stored for time `t`.

        `t` is converted to a numpy datetime64 in the trajectory's unit and
        its string form is looked up in ``self._t`` to find the sample index.

        :param t: time of the requested sample; anything np.datetime64 accepts
        :returns: tuple ``(x, y)``, or None when no sample is stored for `t`
        """
        # numpy takes the unit as the second positional argument of
        # datetime64; it is not a documented keyword.
        t = np.datetime64(t, self.unit)
        try:
            i = self._t[str(t)]
            return self.x[i], self.y[i]
        except KeyError:
            # No sample at exactly this time. Interpolation is not
            # implemented, so the lookup yields None.
            return None
trajectory.py 文件源码 项目:postgis-t 作者: postgis-t 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __setitem__(self, t, point):
        """Overwrite the position stored for time `t`.

        `t` is converted to a numpy datetime64 in the trajectory's unit and
        its string form is looked up in ``self._t`` to find the sample index.

        :param t: time of the sample to update; anything np.datetime64 accepts
        :param point: sequence whose first two items are the new x and y
        """
        # numpy takes the unit as the second positional argument of
        # datetime64; it is not a documented keyword.
        t = np.datetime64(t, self.unit)
        try:
            i = self._t[str(t)]
            self.x[i] = point[0]
            self.y[i] = point[1]
        except KeyError:
            # No sample at exactly this time. Inserting or interpolating is
            # not implemented, so the assignment is ignored.
            pass
trajectory.py 文件源码 项目:postgis-t 作者: postgis-t 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __to_Timestamp__(self, time):
        """Turn a count of seconds since 1970-01-01 00:00:00 into a numpy datetime."""
        epoch = np.datetime64("1970-01-01 00:00:00")
        one_second = np.timedelta64(1, 's')
        elapsed = time * one_second
        return elapsed + epoch
trajectory.py 文件源码 项目:postgis-t 作者: postgis-t 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def after(self, t):
        """Return the part of the trajectory strictly later than `t`.

        :param t: lower time bound (exclusive); anything np.datetime64 accepts
        :returns: a new Trajectory built from the samples whose time is
            greater than `t`, or None when there is no such sample
        """
        # numpy takes the unit as the second positional argument of
        # datetime64; it is not a documented keyword.
        t = np.datetime64(t, self.unit)
        mask = self.t > t

        # nothing selected
        if not np.any(mask):
            return None

        return Trajectory(self.x[mask], self.y[mask], self.t[mask])
trajectory.py 文件源码 项目:postgis-t 作者: postgis-t 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def during(self, t1, t2):
        """Return the part of the trajectory strictly between `t1` and `t2`.

        :param t1: lower time bound (exclusive); anything np.datetime64 accepts
        :param t2: upper time bound (exclusive); anything np.datetime64 accepts
        :returns: a new Trajectory built from the samples whose time lies
            between the bounds, or None when there is no such sample
        """
        # numpy takes the unit as the second positional argument of
        # datetime64; it is not a documented keyword.
        t1 = np.datetime64(t1, self.unit)
        t2 = np.datetime64(t2, self.unit)

        mask = ((self.t > t1) & (self.t < t2))

        # nothing selected
        if not np.any(mask):
            return None

        return Trajectory(self.x[mask], self.y[mask], self.t[mask])
TestTrajectory.py 文件源码 项目:postgis-t 作者: postgis-t 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_intersection_sem_mock_do_test_2(self):
        """The first three times of the intersected trajectory equal 2000-02-01T00:01:00."""
        rectangle = Polygon([(1, 1), (1, 3), (4, 3), (4, 1), (1, 1)])
        traj = self.traj2.to_Trajectory(self.traj2.intersection_shapely(rectangle))

        # expected instant expressed as seconds since the Unix epoch
        epoch = np.datetime64("1970-01-01 00:00:00")
        elapsed = np.datetime64('2000-02-01T00:01:00') - epoch
        seconds = elapsed / np.timedelta64(1, 's')

        for position in (0, 1, 2):
            assert (np.array_equal(traj.getTime()[position], seconds))
TestTrajectory.py 文件源码 项目:postgis-t 作者: postgis-t 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_startTime(self):
        """begins() returns the first instant as a numpy value, not a datetime."""
        expected = np.array(dt.datetime(year=2000, month=1, day=1, minute=1),
                            dtype='datetime64[{}]'.format('s'))
        unrelated = dt.datetime(year=2000, month=1, day=2, hour=1)

        self.assertTrue(np.array_equal(self.traj.begins(), expected))
        self.assertNotIsInstance(self.traj.begins(), dt.datetime)
        self.assertNotEqual(self.traj.begins(), unrelated)
        self.assertEqual(self.traj.begins(), expected)
TestTrajectory.py 文件源码 项目:postgis-t 作者: postgis-t 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_endTime(self):
        """ends() returns the last instant as a numpy value, not a datetime."""
        dateTest = dt.datetime(year=2000, month=4, day=1, minute=1)
        dateTestNumPy = np.array(dateTest, dtype='datetime64[{}]'.format('s'))
        dateFalse = dt.datetime(year=2000, month=3, day=2, hour=1)

        # assertTrue(value, msg) only checks truthiness -- the second argument
        # is the failure message -- so compare explicitly, as test_startTime does.
        self.assertTrue(np.array_equal(self.traj.ends(), dateTestNumPy))
        self.assertNotIsInstance(self.traj.ends(), dt.datetime)
        self.assertNotEqual(self.traj.ends(), dateFalse)
        self.assertEqual(self.traj.ends(), dateTestNumPy)
test_multiarray.py 文件源码 项目:radar 作者: amoose136 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_datetime(self):
        """lexsort orders datetime64 and timedelta64 keys like plain numbers."""
        for dtype_name in ('datetime64[D]', 'timedelta64[D]'):
            # primary key is constant, so the secondary key decides the order
            primary = np.array([0,0,0], dtype=dtype_name)
            secondary = np.array([2,1,0], dtype=dtype_name)
            order = np.lexsort((secondary, primary))
            assert_array_equal(order, np.array([2, 1, 0]))
test_deprecations.py 文件源码 项目:radar 作者: amoose136 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_string(self):
        """datetime64 strings carrying a timezone suffix are reported as deprecated."""
        for tz_string in ('2000-01-01T00+01', '2000-01-01T00Z'):
            self.assert_deprecated(np.datetime64, args=(tz_string,))
test_deprecations.py 文件源码 项目:radar 作者: amoose136 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_datetime(self):
        """Building a datetime64 from a timezone-aware datetime is reported as deprecated."""
        eastern = pytz.timezone('US/Eastern')
        aware_stamp = datetime.datetime(2000, 1, 1, 0, 0, tzinfo=eastern)
        self.assert_deprecated(np.datetime64, args=(aware_stamp,))
test_datetime.py 文件源码 项目:radar 作者: amoose136 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_datetime_dtype_creation(self):
        """Check datetime64/timedelta64 dtype construction from type strings."""
        all_units = ('Y', 'M', 'W', 'D',
                     'h', 'm', 's', 'ms', 'us',
                     'ns', 'ps', 'fs', 'as')

        # the short codes and the long names must give equal dtypes
        for code in all_units:
            assert_(np.dtype('M8[750%s]' % code) ==
                    np.dtype('datetime64[750%s]' % code))
            assert_(np.dtype('m8[%s]' % code) ==
                    np.dtype('timedelta64[%s]' % code))

        # Generic units shouldn't add [] to the end
        assert_equal(str(np.dtype("M8")), "datetime64")

        # Should be possible to specify the endianness
        # (same five checks for the datetime code, then the timedelta code)
        for kind in ("M8", "m8"):
            assert_equal(np.dtype("=" + kind), np.dtype(kind))
            assert_equal(np.dtype("=" + kind + "[s]"), np.dtype(kind + "[s]"))
            assert_(np.dtype(">" + kind) == np.dtype(kind) or
                    np.dtype("<" + kind) == np.dtype(kind))
            assert_(np.dtype(">" + kind + "[D]") == np.dtype(kind + "[D]") or
                    np.dtype("<" + kind + "[D]") == np.dtype(kind + "[D]"))
            assert_(np.dtype(">" + kind) != np.dtype("<" + kind))

        # Check that the parser rejects bad datetime types
        rejected = ('M8[badunit]', 'm8[badunit]',
                    'M8[YY]', 'm8[YY]',
                    'm4', 'M7', 'm7', 'M16', 'm16')
        for bad_spec in rejected:
            assert_raises(TypeError, np.dtype, bad_spec)
test_datetime.py 文件源码 项目:radar 作者: amoose136 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_datetime_scalar_construction_timezone(self):
        """Strings with an explicit timezone still parse, but warn as deprecated."""
        # (string with timezone suffix, equivalent string without one)
        cases = (
            ('2000-01-01T00Z', '2000-01-01T00'),
            ('2000-01-01T00-08', '2000-01-01T08'),
        )
        for with_tz, without_tz in cases:
            with assert_warns(DeprecationWarning):
                assert_equal(np.datetime64(with_tz),
                             np.datetime64(without_tz))
test_datetime.py 文件源码 项目:radar 作者: amoose136 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_datetime_array_find_type(self):
        """Check the dtype np.array picks for date-like and mixed inputs."""
        # a datetime64 scalar keeps its unit
        month_stamp = np.datetime64('1970-01-01', 'M')
        assert_equal(np.array([month_stamp]).dtype, np.dtype('M8[M]'))

        py_values = (datetime.date(1970, 1, 1),
                     datetime.datetime(1970, 1, 1, 12, 30, 40))

        # at the moment, we don't automatically convert these to datetime64
        for value in py_values:
            assert_equal(np.array([value]).dtype, np.dtype('O'))

        # find "supertype" for non-dates and dates
        flag = np.bool_(True)
        for value in (np.datetime64('1970-01-01', 'M'),) + py_values:
            assert_equal(np.array([flag, value]).dtype, np.dtype('O'))


问题


面经


文章

微信
公众号

扫码关注公众号