python类Timestamp()的实例源码

tradingcalendar_tse.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_open_and_close(day, early_closes):
    """Return the UTC open and close timestamps for one trading session.

    The open is 9:31 and the close is 16:00 (13:00 when ``day`` is found in
    ``early_closes``), both read as 'US/Eastern' wall-clock time and then
    converted to UTC.

    Parameters
    ----------
    day : object exposing ``year``, ``month`` and ``day`` attributes
        The session date.
    early_closes : container supporting ``in``
        Dates on which the session closes at 13:00 instead of 16:00.

    Returns
    -------
    (market_open, market_close) : tuple of pd.Timestamp
        Both values are expressed in UTC.
    """
    def _eastern_as_utc(hour, minute=0):
        # Build the naive wall-clock time, attach the exchange zone, then
        # express the same instant in UTC.
        wall_clock = datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=hour,
            minute=minute)
        return pd.Timestamp(wall_clock, tz='US/Eastern').tz_convert('UTC')

    opening = _eastern_as_utc(9, 31)
    if day in early_closes:
        closing = _eastern_as_utc(13)
    else:
        closing = _eastern_as_utc(16)
    return opening, closing
tradingcalendar_bmf.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_open_and_close(day, early_closes):
    """Return the UTC open and close timestamps for one trading session.

    Times are read as 'America/Sao_Paulo' wall-clock time. The close is
    always 16:00. The open is 10:00, except on days listed in the
    module-level ``quarta_cinzas`` collection, where it is 13:00 (a late
    start rather than an early close).

    Parameters
    ----------
    day : object exposing ``year``, ``month`` and ``day`` attributes
        The session date.
    early_closes : unused
        Accepted for signature compatibility; this function never reads it.

    Returns
    -------
    (market_open, market_close) : tuple of pd.Timestamp
        Both values are expressed in UTC.
    """
    zone = 'America/Sao_Paulo'

    # Late-start days open at 13:00 local time instead of 10:00.
    if day in quarta_cinzas:
        first_hour = 13
    else:
        first_hour = 10

    local_open = datetime(day.year, day.month, day.day, first_hour, 0)
    local_close = datetime(day.year, day.month, day.day, 16)

    opening = pd.Timestamp(local_open, tz=zone).tz_convert('UTC')
    closing = pd.Timestamp(local_close, tz=zone).tz_convert('UTC')
    return opening, closing
tradingcalendar_china.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_open_and_close(day, early_closes):
    """Return the UTC open and close timestamps for one trading session.

    Both times are read as 'Asia/Shanghai' wall-clock time and converted to
    UTC. Previously the open was localized in 'US/Eastern' while the close
    used 'Asia/Shanghai', which placed the open *after* the close of the
    same session once both were expressed in UTC.

    Parameters
    ----------
    day : object exposing ``year``, ``month`` and ``day`` attributes
        The session date.
    early_closes : container supporting ``in``
        Dates on which the session closes at 13:00 instead of 16:00.

    Returns
    -------
    (market_open, market_close) : tuple of pd.Timestamp
        Both values are expressed in UTC, with ``market_open`` earlier than
        ``market_close``.
    """
    exchange_tz = 'Asia/Shanghai'

    market_open = pd.Timestamp(
        datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=9,
            minute=31),
        tz=exchange_tz).tz_convert('UTC')
    # 1 PM if early close, 4 PM otherwise.
    # NOTE(review): these hours are kept exactly as they were; confirm they
    # match the intended session hours for this exchange.
    close_hour = 13 if day in early_closes else 16
    market_close = pd.Timestamp(
        datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=close_hour),
        tz=exchange_tz).tz_convert('UTC')

    return market_open, market_close
assets.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def was_active(reference_date_value, asset):
    """
    Tell whether `asset` was alive at the instant `reference_date_value`.

    Parameters
    ----------
    reference_date_value : int
        The instant to test, compared directly against the `value`
        attribute of the asset's start and end dates (for a pandas
        Timestamp, `value` is nanoseconds since the epoch).
    asset : Asset
        Object exposing `start_date.value` and `end_date.value`.

    Returns
    -------
    was_active : bool
        True when start <= reference <= end (both bounds inclusive).
    """
    started = asset.start_date.value <= reference_date_value
    # The end date is only consulted once the start bound is satisfied.
    return started and reference_date_value <= asset.end_date.value
futures.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def as_of(self, dt):
        """ Build a new FutureChain for this root symbol pinned to a date.

        Parameters
        ----------
        dt : datetime.datetime or pandas.Timestamp or str
            The as-of date for the new chain; it is wrapped in a UTC
            ``Timestamp`` before being handed to the chain.

        Returns
        -------
        FutureChain
            A chain sharing this chain's asset finder, datetime getter and
            root symbol.

        """
        chain_kwargs = dict(
            asset_finder=self._asset_finder,
            get_datetime=self._algorithm_get_datetime,
            root_symbol=self.root_symbol,
            as_of_date=Timestamp(dt, tz='UTC'),
        )
        return FutureChain(**chain_kwargs)
minute_bars.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def last_date_in_output_for_sid(self, sid):
        """
        Look up the last trading day already written for ``sid``.

        Parameters:
        -----------
        sid : int
            Asset identifier.

        Returns:
        --------
        out : pd.Timestamp
            The entry of ``self._trading_days`` for the last day written to
            the output for the given sid, or ``pd.NaT`` when the sizes
            metadata file is missing or holds less than one full day.
        """
        sizes_path = "{0}/close/meta/sizes".format(self.sidpath(sid))
        if not os.path.exists(sizes_path):
            return pd.NaT
        with open(sizes_path, mode='r') as f:
            data = json.load(f)
        # Floor division keeps the day count an integer on Python 3 as well;
        # true division would yield a float, which cannot be used as an
        # index below.
        num_days = data['shape'][0] // self._minutes_per_day
        if num_days == 0:
            # empty container
            return pd.NaT
        return self._trading_days[num_days - 1]
test_assets.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def setUpClass(cls):
        """Create two Future fixtures, write them to a fresh
        TradingEnvironment and expose its asset finder on the class."""
        def utc(date_text):
            return pd.Timestamp(date_text, tz='UTC')

        om_contract = Future(
            2468,
            symbol='OMH15',
            root_symbol='OM',
            notice_date=utc('2014-01-20'),
            expiration_date=utc('2014-02-20'),
            auto_close_date=utc('2014-01-18'),
            tick_size=.01,
            multiplier=500
        )
        cls.future = om_contract

        cl_contract = Future(
            0,
            symbol='CLG06',
            root_symbol='CL',
            start_date=utc('2005-12-01'),
            notice_date=utc('2005-12-20'),
            expiration_date=utc('2006-01-20')
        )
        cls.future2 = cl_contract

        # The identifiers are read back through TestFuture, as before.
        trading_env = TradingEnvironment(load=noop_load)
        identifiers = [TestFuture.future, TestFuture.future2]
        trading_env.write_data(futures_identifiers=identifiers)
        cls.asset_finder = trading_env.asset_finder
test_assets.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_insert_metadata(self):
        """Equity metadata written by sid is retrievable; unknown extra
        fields do not become attributes of the asset."""
        play_metadata = {
            'start_date': '2014-01-01',
            'end_date': '2015-01-01',
            'symbol': "PLAY",
            'foo_data': "FOO",
        }
        self.env.write_data(equities_data={0: play_metadata})
        finder = self.asset_finder_type(self.env.engine)

        # The known fields round-trip through the finder.
        equity = finder.retrieve_asset(0)
        self.assertIsInstance(equity, Equity)
        self.assertEqual('PLAY', equity.symbol)
        expected_end = pd.Timestamp('2015-01-01', tz='UTC')
        self.assertEqual(expected_end, equity.end_date)

        # The unrecognised field must not be exposed on the asset.
        self.assertRaises(AttributeError, getattr, equity, 'foo_data')
test_assets.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_consume_asset_as_identifier(self):
        """Asset objects passed as identifiers are stored and come back
        equal, with their end dates intact."""
        eq_end = pd.Timestamp('2012-01-01', tz='UTC')
        fut_end = pd.Timestamp('2008-01-01', tz='UTC')

        equity_asset = Equity(1, symbol="TESTEQ", end_date=eq_end)
        future_asset = Future(200, symbol="TESTFUT", end_date=fut_end)

        # Hand the Asset objects themselves to the writer.
        self.env.write_data(equities_identifiers=[equity_asset],
                            futures_identifiers=[future_asset])
        finder = self.asset_finder_type(self.env.engine)

        expectations = [
            (1, equity_asset, eq_end),
            (200, future_asset, fut_end),
        ]
        # First the retrieved assets must equal the ones we built ...
        for sid, asset, _ in expectations:
            self.assertEqual(asset, finder.retrieve_asset(sid))
        # ... then their end dates must match as well.
        for sid, _, end_date in expectations:
            self.assertEqual(end_date, finder.retrieve_asset(sid).end_date)
test_assets.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_sid_assignment(self):
        """Symbols written without sids get distinct sids assigned when
        ``allow_sid_assignment`` is enabled."""
        # Only symbols are supplied; no sids.
        symbols_only = ['PLAY', 'MSFT']

        lookup_date = normalize_date(pd.Timestamp('2015-07-09', tz='UTC'))

        self.env.write_data(equities_identifiers=symbols_only,
                            allow_sid_assignment=True)

        # Both symbols must resolve, each to its own sid.
        finder = self.asset_finder_type(self.env.engine)
        play_asset = finder.lookup_symbol('PLAY', lookup_date)
        msft_asset = finder.lookup_symbol('MSFT', lookup_date)
        self.assertEqual('PLAY', play_asset.symbol)
        self.assertIsNotNone(play_asset.sid)
        self.assertNotEqual(play_asset.sid, msft_asset.sid)
test_assets.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def test_map_identifier_index_to_sids(self):
        """Asset objects map to their integer sids, in input order."""
        as_of = pd.Timestamp('2014-01-01', tz='UTC')
        finder = self.asset_finder_type(self.env.engine)
        aapl = Equity(1, symbol="AAPL")
        goog = Equity(2, symbol="GOOG")
        clk15 = Future(200, symbol="CLK15")
        clm15 = Future(201, symbol="CLM15")

        # Ascending order: values and element types are both checked.
        mapped = finder.map_identifier_index_to_sids(
            [aapl, goog, clk15, clm15], as_of)
        self.assertListEqual([1, 2, 200, 201], mapped)
        for mapped_sid in mapped:
            self.assertIsInstance(mapped_sid, int)

        # Shuffled order: the output must follow the input ordering.
        mapped = finder.map_identifier_index_to_sids(
            [clm15, goog, clk15, aapl], as_of)
        self.assertListEqual([201, 2, 200, 1], mapped)
test_assets.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_group_by_type(self):
        """``group_by_type`` splits mixed sids into equity and future
        sets, including across overlapping successive queries."""
        equities = make_simple_equity_info(
            range(5),
            start_date=pd.Timestamp('2014-01-01'),
            end_date=pd.Timestamp('2015-01-01'),
        )
        futures = make_commodity_future_info(
            first_sid=6,
            root_symbols=['CL'],
            years=[2014],
        )
        # The queries overlap on purpose so that later ones hit results
        # that are only partially cached.
        queries = [
            ([0, 1, 3], [6, 7]),
            ([0, 2, 3], [7, 10]),
            (list(equities.index), list(futures.index)),
        ]
        with tmp_asset_finder(equities=equities, futures=futures) as finder:
            for equity_sids, future_sids in queries:
                grouped = finder.group_by_type(equity_sids + future_sids)
                expected = {
                    'equity': set(equity_sids),
                    'future': set(future_sids),
                }
                self.assertEqual(grouped, expected)
test_assets.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_repr(self):
        """ Check FutureChain.__repr__ with and without an as-of date.
        """
        def fixed_clock():
            return '2005-12-01'

        plain_chain = FutureChain(self.asset_finder, fixed_clock, 'CL')
        feb_chain = FutureChain(
            self.asset_finder, fixed_clock, 'CL',
            as_of_date=pd.Timestamp('2006-02-01', tz='UTC'))

        # Without an explicit as-of date, none appears in the repr.
        self.assertEqual(repr(plain_chain), "FutureChain(root_symbol='CL')")

        # With one, it is rendered alongside the root symbol.
        expected_feb = ("FutureChain(root_symbol='CL', "
                        "as_of_date='2006-02-01 00:00:00+00:00')")
        self.assertEqual(repr(feb_chain), expected_feb)
test_sources.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_minute(self):
        """Minute-frequency RandomWalkSource events stay inside the
        requested window, fall on NYSE trading days and keep positive
        prices."""
        np.random.seed(123)
        start_prices = {0: 100,
                        1: 500}
        start = pd.Timestamp('1990-01-01', tz='UTC')
        end = pd.Timestamp('1991-01-01', tz='UTC')
        source = RandomWalkSource(start_prices=start_prices,
                                  calendar=calendar_nyse, start=start,
                                  end=end)
        # Use the public class name: ``pd.lib`` was a private module that
        # newer pandas releases no longer expose.
        self.assertIsInstance(source.start, pd.Timestamp)
        self.assertIsInstance(source.end, pd.Timestamp)

        for event in source:
            self.assertIn(event.sid, start_prices.keys())
            # Truncating to midnight gives the calendar day of the event.
            self.assertIn(event.dt.replace(minute=0, hour=0),
                          calendar_nyse.trading_days)
            self.assertGreater(event.dt, start)
            self.assertLess(event.dt, end)
            self.assertGreater(event.price, 0,
                               "price should never go negative.")
            # The message is built from adjacent literals so that no
            # source indentation leaks into it.
            self.assertTrue(13 <= event.dt.hour <= 21,
                            "event.dt.hour == %i, not during market "
                            "hours." % event.dt.hour)
test_sources.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_day(self):
        """Daily-frequency RandomWalkSource events stay inside the
        requested window, fall on NYSE trading days at hour 0 and keep
        positive prices."""
        np.random.seed(123)
        start_prices = {0: 100,
                        1: 500}
        start = pd.Timestamp('1990-01-01', tz='UTC')
        end = pd.Timestamp('1992-01-01', tz='UTC')
        source = RandomWalkSource(start_prices=start_prices,
                                  calendar=calendar_nyse, start=start,
                                  end=end, freq='daily')
        # Use the public class name: ``pd.lib`` was a private module that
        # newer pandas releases no longer expose.
        self.assertIsInstance(source.start, pd.Timestamp)
        self.assertIsInstance(source.end, pd.Timestamp)

        for event in source:
            self.assertIn(event.sid, start_prices.keys())
            # Truncating to midnight gives the calendar day of the event.
            self.assertIn(event.dt.replace(minute=0, hour=0),
                          calendar_nyse.trading_days)
            self.assertGreater(event.dt, start)
            self.assertLess(event.dt, end)
            self.assertGreater(event.price, 0,
                               "price should never go negative.")
            self.assertEqual(event.dt.hour, 0)
test_algorithm.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_get_environment(self):
        """``get_environment()`` returns the platform by default and the
        full environment mapping when asked for ``'*'``."""
        expected_env = dict(
            arena='backtest',
            data_frequency='minute',
            start=pd.Timestamp('2006-01-03 14:31:00+0000', tz='UTC'),
            end=pd.Timestamp('2006-01-04 21:00:00+0000', tz='UTC'),
            capital_base=100000.0,
            platform='zipline',
        )

        def initialize(algo):
            # The assertions run inside the algorithm's initialize hook.
            self.assertEqual('zipline', algo.get_environment())
            self.assertEqual(expected_env, algo.get_environment('*'))

        def handle_data(algo, data):
            pass

        algorithm = TradingAlgorithm(initialize=initialize,
                                     handle_data=handle_data,
                                     sim_params=self.sim_params,
                                     env=self.env)
        algorithm.run(self.source)
test_factory.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_load_bars_from_yahoo(self):
        """``load_bars_from_yahoo`` returns the requested stocks and OHLCV
        fields over the expected date span, and rejects a reversed
        range."""
        stocks = ['AAPL', 'GE']
        start = pd.datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc)
        end = pd.datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc)
        data = load_bars_from_yahoo(stocks=stocks, start=start, end=end)

        # First and last rows of the date axis.
        first_bar = pd.Timestamp('1993-01-04 00:00:00+0000')
        last_bar = pd.Timestamp('2001-12-31 00:00:00+0000')
        assert data.major_axis[0] == first_bar
        assert data.major_axis[-1] == last_bar

        for symbol in stocks:
            assert symbol in data.items

        expected_fields = ['open', 'high', 'low', 'close', 'volume', 'price']
        for field in expected_fields:
            assert field in data.minor_axis

        # Swapping start and end must trip an assertion in the loader.
        with np.testing.assert_raises(AssertionError):
            load_bars_from_yahoo(stocks=stocks, start=end, end=start)
request.py 文件源码 项目:pyaddepar 作者: lobnek 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def view(self, view_id, portfolio_id, portfolio_type, start_date=None,
         end_date=None):
        """Request the JSON results of a portfolio view.

        :param view_id: identifier inserted into the request path
        :param portfolio_id: value sent as the ``portfolio_id`` parameter
        :param portfolio_type: a ``PortfolioType`` member; its ``value`` is
            sent as the ``portfolio_type`` parameter
        :param start_date: object with ``strftime``; defaults to today,
            evaluated when the method is called
        :param end_date: object with ``strftime``; defaults to today,
            evaluated when the method is called
        :returns: whatever ``self.get`` returns for the built request
        :raises AssertionError: if ``portfolio_type`` is not a
            ``PortfolioType``
        """
        assert isinstance(portfolio_type, PortfolioType)

        # Resolve "today" per call. A ``pd.Timestamp("today")`` default in
        # the signature is evaluated once, when the function is defined,
        # and would go stale in a long-running process.
        today = pd.Timestamp("today")
        if start_date is None:
            start_date = today
        if end_date is None:
            end_date = today

        params = [
            ("portfolio_id", portfolio_id),
            ("portfolio_type", portfolio_type.value),
            ("output_type", OutputType.JSON.value),
            ("start_date", start_date.strftime("%Y-%m-%d")),
            ("end_date", end_date.strftime("%Y-%m-%d")),
        ]
        query = '&'.join("{key}={value}".format(key=key, value=value)
                         for key, value in params)

        request = "portfolio/views/{view}/results?".format(view=view_id) + query

        return self.get(request=request)
functions.py 文件源码 项目:Eskapade 作者: KaveIO 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def to_date_time(dt, tz_in=None, tz_out=None):
    """Convert value to date/time object

    Conversion is best-effort: any value that cannot be parsed, localized
    or converted yields ``None`` instead of raising.

    :param dt: value representing a date/time (parsed by pandas.Timestamp)
    :param tz_in: time zone to localize data/time value to (parsed by pandas.Timestamp.tz_localize)
    :param tz_out: time zone to convert data/time value into (parsed by pandas.Timestamp.tz_convert)
    :returns: date/time object, or None if ``dt`` is None or conversion fails
    :rtype: datetime.datetime
    """

    if dt is None:
        return None
    try:
        ts = pd.Timestamp(dt)
        if tz_in:
            ts = ts.tz_localize(tz_in)
        if tz_out:
            ts = ts.tz_convert(tz_out)
        # ``to_pydatetime`` is the supported conversion; the old
        # ``to_datetime`` method no longer exists on Timestamp, and the
        # resulting AttributeError used to be swallowed below, turning
        # every call into ``None``.
        return ts.to_pydatetime()
    except Exception:
        # Catch ordinary errors only, so KeyboardInterrupt and SystemExit
        # still propagate.
        return None
functions.py 文件源码 项目:Eskapade 作者: KaveIO 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def to_timestamp(dt, tz_in=None):
    """Convert value to Unix timestamp (ns)

    Conversion is best-effort: any value that cannot be parsed or localized
    yields ``None`` instead of raising.

    :param dt: value representing a date/time (parsed by pandas.Timestamp)
    :param tz_in: time zone to localize data/time value to (parsed by pandas.Timestamp.tz_localize)
    :returns: Unix timestamp (ns), or None if ``dt`` is None or conversion fails
    :rtype: int
    """

    if dt is None:
        return None
    try:
        ts = pd.Timestamp(dt)
        if tz_in:
            ts = ts.tz_localize(tz_in)
        return ts.value
    except Exception:
        # Catch ordinary errors only, so KeyboardInterrupt and SystemExit
        # still propagate.
        return None
utils.py 文件源码 项目:empyrical 作者: quantopian 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_utc_timestamp(dt):
    """
    Return the given date(s) expressed in UTC.

    The input is first passed through ``pd.to_datetime``. It is then
    localized to UTC; if localizing raises ``TypeError``, it is converted
    to UTC instead.
    Parameters
    ----------
    dt : Timestamp/DatetimeIndex
        the date(s) to be converted
    Returns
    -------
    same type as input
        date(s) converted to UTC
    """

    parsed = pd.to_datetime(dt)
    try:
        return parsed.tz_localize('UTC')
    except TypeError:
        # Localizing was rejected, so convert the value instead.
        return parsed.tz_convert('UTC')
train_tensorflow.py 文件源码 项目:tianchi_power 作者: lvniqi 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def predict_tf_once(day,start_date = '2016-10-1'):
    """Compute and save the prediction for one day offset.

    Writes a one-row CSV to ``./result/predict_part/<day>.csv`` and prints
    and returns ``(day, real)``, where ``real`` is the rounded grand total
    of the weighted features.

    NOTE(review): this relies on ``map`` returning a list (Python 2
    semantics) -- ``all_dataset`` is reused for every user id and a ``map``
    result is used to select columns. Confirm the target interpreter
    before porting.

    :param day: integer day offset; also selects the percent-model CSV
    :param start_date: date label used to slice every dataset and as the
        base of the output column label
    :returns: tuple ``(day, real)``
    """
    all_dataset = get_dataset(day)
    # Restrict every dataset to the rows labelled start_date.
    all_dataset = map(lambda x:x.ix[start_date:start_date],all_dataset)
    # One flattened feature vector per user id.
    y_p_features = map(lambda user_id:tf_percent_model.resample_x_y_(all_dataset,user_id)[0].reshape(-1),get_full_user_ids())
    y_p_features_df = pd.DataFrame(y_p_features,index = get_full_user_ids())
    percent = pd.DataFrame.from_csv('./features/tensorflow_model/percent_model/%d.csv'%day)
    #percent = pd.DataFrame.from_csv('./features/tensorflow_model/percent_model/%d.csv'%2)
    #%%
    # Keep only the 'percent#<i>' columns, for i in range(_feature_length).
    percent = percent[map(lambda x:'percent#%d'%x,range(_feature_length))]
    t = pd.DataFrame(index = percent.index)
    # Column label is start_date shifted by (day - 1) days; the value is the
    # row-wise sum of features * percent.
    t[pd.Timestamp(start_date)+pd.Timedelta('%dd'%(day-1))] = (np.array(y_p_features_df)*percent).sum(axis=1)
    t = t.T
    t.to_csv('./result/predict_part/%d.csv'%day)
    # Grand total over all rows and columns, rounded to an int.
    real = int(np.round((np.array(y_p_features_df)*percent).sum().sum()))
    print (day,real)
    return (day,real)
train_tensorflow.py 文件源码 项目:tianchi_power 作者: lvniqi 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def predict_tf_all(path = None):
    """Run ``predict_tf_once`` for days 1..31 and write the combined results.

    Two CSV files are written: the per-day totals (to ``path``) and the
    concatenation of the per-day part files (to
    ``./result/predict_part/<timestamp>.csv``).

    :param path: destination of the totals CSV; when None, a name built
        from the current time is used under ``./result/``
    """
    p = m_Pool(31)
    result_list = p.map(predict_tf_once,range(1,32))
    p.close()
    p.join()
    print('writing...')
    result_df = pd.DataFrame(index = range(1))
    for day,result in result_list:
        # Column label is the zero-padded day appended to '201610'.
        result_df['201610%02d' % day] = result
    result_df = result_df.T
    result_df.columns = ['predict_power_consumption']
    # Build the time-based tag unconditionally: it also names the combined
    # part file below, so it must exist even when the caller supplies
    # ``path`` (previously that case raised NameError).
    date = str(pd.Timestamp(time.ctime())).replace(' ','_').replace(':','_')
    if path is None:
        path = './result/'+date+'.csv'
    result_df.to_csv(path,index_label='predict_date')

    parts = [pd.DataFrame.from_csv('./result/predict_part/%d.csv'%day)
             for day in range(1,32)]
    t = pd.concat(parts)
    t.to_csv('./result/predict_part/'+date+'.csv')
dataframe_client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_write_points_from_dataframe(self):
        """A two-row DataFrame is posted as the expected line-protocol
        body, with or without an explicit ``tags=None``."""
        epoch = pd.Timestamp('1970-01-01 00:00+00:00')
        row_times = [epoch, epoch + timedelta(hours=1)]
        column_names = ["column_one", "column_two", "column_three"]
        dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
                                 index=row_times,
                                 columns=column_names)
        expected = (
            b"foo column_one=\"1\",column_two=1i,column_three=1.0 0\n"
            b"foo column_one=\"2\",column_two=2i,column_three=2.0 "
            b"3600000000000\n"
        )

        with requests_mock.Mocker() as m:
            m.register_uri(requests_mock.POST,
                           "http://localhost:8086/write",
                           status_code=204)

            cli = DataFrameClient(database='db')

            # Same body expected with default tags and with tags=None.
            for extra_kwargs in ({}, {'tags': None}):
                cli.write_points(dataframe, 'foo', **extra_kwargs)
                self.assertEqual(m.last_request.body, expected)
dataframe_client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_write_points_from_dataframe(self):
        """A DataFrame keyed by series name is posted as the expected
        JSON series payload, with a trailing ``time`` column."""
        epoch = pd.Timestamp('1970-01-01 00:00+00:00')
        row_times = [epoch, epoch + timedelta(hours=1)]
        column_names = ["column_one", "column_two", "column_three"]
        dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
                                 index=row_times,
                                 columns=column_names)
        foo_series = {
            "points": [
                ["1", 1, 1.0, 0],
                ["2", 2, 2.0, 3600]
            ],
            "name": "foo",
            "columns": ["column_one", "column_two", "column_three", "time"]
        }
        points = [foo_series]

        with requests_mock.Mocker() as m:
            m.register_uri(requests_mock.POST,
                           "http://localhost:8086/db/db/series")

            cli = DataFrameClient(database='db')
            cli.write_points({"foo": dataframe})

            sent_payload = json.loads(m.last_request.body)
            self.assertListEqual(sent_payload, points)
dataframe_client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_write_points_from_dataframe_with_float_nan(self):
        """A NaN cell in the DataFrame is sent as ``null`` (``None`` after
        JSON decoding) in the posted series payload."""
        epoch = pd.Timestamp('1970-01-01 00:00+00:00')
        row_times = [epoch, epoch + timedelta(hours=1)]
        column_names = ["column_one", "column_two", "column_three"]
        dataframe = pd.DataFrame(data=[[1, float("NaN"), 1.0], [2, 2, 2.0]],
                                 index=row_times,
                                 columns=column_names)
        foo_series = {
            "points": [
                [1, None, 1.0, 0],
                [2, 2, 2.0, 3600]
            ],
            "name": "foo",
            "columns": ["column_one", "column_two", "column_three", "time"]
        }
        points = [foo_series]

        with requests_mock.Mocker() as m:
            m.register_uri(requests_mock.POST,
                           "http://localhost:8086/db/db/series")

            cli = DataFrameClient(database='db')
            cli.write_points({"foo": dataframe})

            sent_payload = json.loads(m.last_request.body)
            self.assertListEqual(sent_payload, points)
dataframe_client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_write_points_from_dataframe_with_numeric_column_names(self):
        """Default integer column labels are sent as the strings '0', '1',
        '2' in the posted series payload."""
        epoch = pd.Timestamp('1970-01-01 00:00+00:00')
        row_times = [epoch, epoch + timedelta(hours=1)]
        # No ``columns`` argument, so the frame has numeric column labels.
        dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
                                 index=row_times)
        foo_series = {
            "points": [
                ["1", 1, 1.0, 0],
                ["2", 2, 2.0, 3600]
            ],
            "name": "foo",
            "columns": ['0', '1', '2', "time"]
        }
        points = [foo_series]

        with requests_mock.Mocker() as m:
            m.register_uri(requests_mock.POST,
                           "http://localhost:8086/db/db/series")

            cli = DataFrameClient(database='db')
            cli.write_points({"foo": dataframe})

            sent_payload = json.loads(m.last_request.body)
            self.assertListEqual(sent_payload, points)
dataframe_client_test.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_datetime_to_epoch(self):
        """``_datetime_to_epoch`` scales the epoch value according to the
        requested ``time_precision``."""
        cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
        timestamp = pd.Timestamp('2013-01-01 00:00:00.000+00:00')

        # No precision given.
        self.assertEqual(cli._datetime_to_epoch(timestamp), 1356998400.0)

        # Explicit precisions, checked in this order.
        # NOTE(review): 'm' is expected to give the same value as 'ms'.
        expected_by_precision = (
            ('s', 1356998400.0),
            ('m', 1356998400000.0),
            ('ms', 1356998400000.0),
            ('u', 1356998400000000.0),
        )
        for precision, expected_epoch in expected_by_precision:
            self.assertEqual(
                cli._datetime_to_epoch(timestamp, time_precision=precision),
                expected_epoch
            )
test_integration.py 文件源码 项目:pymapd 作者: mapd 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_load_columnar_pandas_all(self, con, all_types_table):
        """Load a two-row DataFrame with one column per supported type
        through ``load_table_columnar``. Skipped when pandas is missing."""
        pd = pytest.importorskip("pandas")
        import numpy as np

        def two_values(dtype):
            return np.array([0, 1], dtype=dtype)

        column_order = ['boolean_', 'smallint_', 'int_', 'bigint_', 'float_',
                        'double_', 'varchar_', 'text_', 'time_', 'timestamp_',
                        'date_']
        column_values = {
            "boolean_": [True, False],
            "smallint_": two_values(np.int8),
            "int_": two_values(np.int32),
            "bigint_": two_values(np.int64),
            "float_": two_values(np.float32),
            "double_": two_values(np.float64),
            "varchar_": ["a", "b"],
            "text_": ['a', 'b'],
            "time_": [datetime.time(0, 11, 59), datetime.time(13)],
            "timestamp_": [pd.Timestamp("2016"), pd.Timestamp("2017")],
            "date_": [datetime.date(2016, 1, 1), datetime.date(2017, 1, 1)],
        }
        data = pd.DataFrame(column_values, columns=column_order)
        con.load_table_columnar(all_types_table, data, preserve_index=False)
test_integration.py 文件源码 项目:pymapd 作者: mapd 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_load_table_creates(self, con, not_a_table):
        """Load a two-row DataFrame with ``create=True`` into a table name
        supplied by the ``not_a_table`` fixture. Skipped when pandas is
        missing."""
        pd = pytest.importorskip("pandas")
        import numpy as np

        def two_values(dtype):
            return np.array([0, 1], dtype=dtype)

        # NOTE(review): "smallint_cast" is supplied below but is not in
        # this column list, so it is not among the selected columns --
        # confirm whether it was meant to be included.
        column_order = ['boolean_', 'smallint_', 'int_', 'bigint_', 'float_',
                        'double_', 'varchar_', 'text_', 'time_', 'timestamp_',
                        'date_']
        column_values = {
            "boolean_": [True, False],
            "smallint_cast": two_values(np.int8),
            "smallint_": two_values(np.int16),
            "int_": two_values(np.int32),
            "bigint_": two_values(np.int64),
            "float_": two_values(np.float32),
            "double_": two_values(np.float64),
            "varchar_": ["a", "b"],
            "text_": ['a', 'b'],
            "time_": [datetime.time(0, 11, 59), datetime.time(13)],
            "timestamp_": [pd.Timestamp("2016"), pd.Timestamp("2017")],
            "date_": [datetime.date(2016, 1, 1), datetime.date(2017, 1, 1)],
        }
        data = pd.DataFrame(column_values, columns=column_order)
        con.load_table(not_a_table, data, create=True)


问题


面经


文章

微信
公众号

扫码关注公众号