def _create_daily_stats(self, perfs):
# create daily and cumulative stats dataframe
daily_perfs = []
# TODO: the loop here could overwrite expected properties
# of daily_perf. Could potentially raise or log a
# warning.
for perf in perfs:
if 'daily_perf' in perf:
perf['daily_perf'].update(
perf['daily_perf'].pop('recorded_vars')
)
perf['daily_perf'].update(perf['cumulative_risk_metrics'])
daily_perfs.append(perf['daily_perf'])
else:
self.risk_report = perf
    # note: np.datetime64 takes no ``utc`` keyword on current numpy; a
    # tz-aware pandas index gives the same UTC-stamped frame
    daily_dts = pd.DatetimeIndex(
        [perf['period_close'] for perf in daily_perfs], tz='UTC')
daily_stats = pd.DataFrame(daily_perfs, index=daily_dts)
return daily_stats
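The TODO above matters because dict.update silently overwrites: a recorded_vars entry named like an existing daily_perf field would clobber it. A minimal sketch of that flattening step, with made-up keys:

d = {'returns': 0.01, 'recorded_vars': {'leverage': 1.2}}
d.update(d.pop('recorded_vars'))
print(d)  # {'returns': 0.01, 'leverage': 1.2}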
def drop_inconsistent_keys(self, columns, obj):
"""Drop inconsistent keys
Drop inconsistent keys from a ValueCounts or Histogram object.
:param list columns: columns key to retrieve desired datatypes
:param object obj: ValueCounts or Histogram object to drop inconsistent keys from
"""
# has array been converted first? if so, set correct comparison
# datatype
comp_dtype = []
for col in columns:
dt = np.dtype(self.var_dtype[col]).type()
is_converted = isinstance(
dt, np.number) or isinstance(
dt, np.datetime64)
if is_converted:
comp_dtype.append(np.int64)
else:
comp_dtype.append(self.var_dtype[col])
# keep only keys of types in comp_dtype
obj.remove_keys_of_inconsistent_type(prefered_key_type=comp_dtype)
return obj
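For reference, the int64 comparison type chosen above reflects how numpy stores converted datetimes: a datetime64 scalar is an int64 count of its unit since the Unix epoch. A quick check:

import numpy as np

ts = np.datetime64('2017-01-01T12:00:00')   # second resolution
print(ts.astype(np.int64))                  # 1483272000 seconds since the epoch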
def categorize_columns(self, df):
"""Categorize columns of dataframe by data type
:param df: input (pandas) data frame
"""
# check presence and data type of requested columns
# sort columns into numerical, timestamp and category based
for c in self.columns:
for col in c:
if col not in df.columns:
raise KeyError('column "{0:s}" not in dataframe "{1:s}"'.format(col, self.read_key))
dt = self.get_data_type(df, col)
if col not in self.var_dtype:
self.var_dtype[col] = dt.type
if (self.var_dtype[col] is np.string_) or (self.var_dtype[col] is np.object_):
self.var_dtype[col] = str
if not any(dt in types for types in (STRING_SUBSTR, NUMERIC_SUBSTR, TIME_SUBSTR)):
raise TypeError('cannot process column "{0:s}" of data type "{1:s}"'.format(col, str(dt)))
is_number = isinstance(dt.type(), np.number)
is_timestamp = isinstance(dt.type(), np.datetime64)
colset = self.num_cols if is_number else self.dt_cols if is_timestamp else self.str_cols
if col not in colset:
colset.append(col)
self.log().debug('Data type of column "%s" is "%s"', col, self.var_dtype[col])
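A self-contained sketch of the numeric/timestamp/string split used above, assuming plain pandas dtypes (the surrounding link class and its registries are not reproduced here):

import numpy as np
import pandas as pd

df = pd.DataFrame({'x': [1.5, 2.5],
                   't': pd.to_datetime(['2017-01-01', '2017-01-02']),
                   's': ['a', 'b']})
for col in df.columns:
    dt = df[col].dtype
    if np.issubdtype(dt, np.number):
        kind = 'numeric'
    elif np.issubdtype(dt, np.datetime64):
        kind = 'timestamp'
    else:
        kind = 'string/other'
    print(col, kind)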
def _dtype(self, c):
n = ':'.join(c)
if n in self.var_dtype:
return self.var_dtype[n]
    # ranking in order is: float, int (datetime64 counts as int), short, byte, bool
elif any(self.var_dtype[col] == np.dtype(float) for col in c if col in self.var_dtype):
return np.dtype(float)
elif any(self.var_dtype[col] == np.dtype(int) for col in c if col in self.var_dtype):
return np.dtype(int)
elif any(self.var_dtype[col] == np.datetime64 for col in c if col in self.var_dtype):
return np.dtype(int)
elif any(self.var_dtype[col] == np.dtype('short') for col in c if col in self.var_dtype):
return np.dtype('short')
elif any(self.var_dtype[col] == np.dtype('byte') for col in c if col in self.var_dtype):
return np.dtype('byte')
elif any(self.var_dtype[col] == np.dtype(bool) for col in c if col in self.var_dtype):
return np.dtype(bool)
# default is float
return self._default_dtype
def test_make_scale_and_datetimes():
def correct_scale(scale, name):
return scale.__class__.__name__ == name
# cpython
x = pd.Series([datetime(year, 1, 1) for year in [2010, 2026, 2015]])
assert correct_scale(make_scale('x', x), 'scale_x_datetime')
assert correct_scale(make_scale('color', x), 'scale_color_datetime')
assert correct_scale(make_scale('fill', x), 'scale_fill_datetime')
assert correct_scale(make_scale('size', x), 'scale_size_datetime')
assert correct_scale(make_scale('alpha', x), 'scale_alpha_datetime')
# numpy
x = pd.Series([np.datetime64(i*10, 'D') for i in range(1, 10)])
assert correct_scale(make_scale('x', x), 'scale_x_datetime')
assert correct_scale(make_scale('color', x), 'scale_color_datetime')
assert correct_scale(make_scale('fill', x), 'scale_fill_datetime')
assert correct_scale(make_scale('size', x), 'scale_size_datetime')
assert correct_scale(make_scale('alpha', x), 'scale_alpha_datetime')
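For reference, np.datetime64(n, 'D') means n days after the Unix epoch, so the numpy series above starts at 1970-01-11:

import numpy as np

print(np.datetime64(10, 'D'))  # 1970-01-11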
def get_price_yahoo(self,ticker):
try:
        if Cache.data_request_delay is not None:
time.sleep(Cache.data_request_delay)
base_url = "http://ichart.finance.yahoo.com/table.csv?s="
        file_name = os.path.join(os.path.dirname(__file__), "data",
                                 "yahoo_" + ticker + "_" + str(Pricing_Database.current_date))
urllib.request.urlretrieve(base_url+ticker, file_name)
df = pd.read_csv(file_name, index_col=False, header=0)
df['Open'] = df['Open']*df['Adj Close']/df['Close']
df['High'] = df['High']*df['Adj Close']/df['Close']
df['Low'] = df['Low']*df['Adj Close']/df['Close']
df['Close'] = df['Adj Close']
df['Date'] = df['Date'].apply(lambda x:np.datetime64(x))
df = df[['Date','Open', 'High', 'Low', 'Close', 'Volume']]
df = df.set_index(['Date'])
df.dropna(inplace=True)
df = df.iloc[::-1]
return df
except Exception as e:
print("YAHOO/"+ticker)
raise e
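A toy check of the back-adjustment above: each OHLC field is scaled by Adj Close / Close, so, for example, a 2:1 split halves the raw prices.

import pandas as pd

df = pd.DataFrame({'Open': [10.0], 'Close': [12.0], 'Adj Close': [6.0]})
df['Open'] = df['Open'] * df['Adj Close'] / df['Close']
print(df['Open'].iloc[0])  # 5.0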
def __init__(self, x, y, t):
    # a bare ``except`` here would swallow the length error itself, so
    # only catch the TypeError raised when the inputs have no len()
    try:
        if len(x) != len(y) or len(y) != len(t):
            raise ValueError('arrays x, y and t must be the same length')
    except TypeError:
        raise TypeError('attributes x, y and t must be array-like')
self.unit = 's'
self.x = np.array(x, dtype='f8')
self.y = np.array(y, dtype='f8')
self.t = np.array(t, dtype='datetime64[{}]'.format(self.unit))
self.seconds = (self.t - np.datetime64("1970-01-01T00:00:00")) / np.timedelta64(1, 's')
self._t = {str(v): i for i, v in enumerate(t)}
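The seconds computation above in isolation: subtracting the epoch from a datetime64 array yields timedelta64 values, and dividing by a one-second timedelta converts them to floats.

import numpy as np

t = np.array(['2000-01-01T00:00:00'], dtype='datetime64[s]')
secs = (t - np.datetime64('1970-01-01T00:00:00')) / np.timedelta64(1, 's')
print(secs)  # [9.466848e+08]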
def test_scalar_none_comparison(self):
# Scalars should still just return False and not give a warnings.
# The comparisons are flagged by pep8, ignore that.
with warnings.catch_warnings(record=True) as w:
warnings.filterwarnings('always', '', FutureWarning)
assert_(not np.float32(1) == None)
assert_(not np.str_('test') == None)
# This is dubious (see below):
assert_(not np.datetime64('NaT') == None)
assert_(np.float32(1) != None)
assert_(np.str_('test') != None)
# This is dubious (see below):
assert_(np.datetime64('NaT') != None)
assert_(len(w) == 0)
# For documentation purposes, this is why the datetime is dubious.
# At the time of deprecation this was no behaviour change, but
# it has to be considered when the deprecations are done.
assert_(np.equal(np.datetime64('NaT'), None))
def test_datetime_nat_casting(self):
a = np.array('NaT', dtype='M8[D]')
b = np.datetime64('NaT', '[D]')
# Arrays
assert_equal(a.astype('M8[s]'), np.array('NaT', dtype='M8[s]'))
assert_equal(a.astype('M8[ms]'), np.array('NaT', dtype='M8[ms]'))
assert_equal(a.astype('M8[M]'), np.array('NaT', dtype='M8[M]'))
assert_equal(a.astype('M8[Y]'), np.array('NaT', dtype='M8[Y]'))
assert_equal(a.astype('M8[W]'), np.array('NaT', dtype='M8[W]'))
# Scalars -> Scalars
assert_equal(np.datetime64(b, '[s]'), np.datetime64('NaT', '[s]'))
assert_equal(np.datetime64(b, '[ms]'), np.datetime64('NaT', '[ms]'))
assert_equal(np.datetime64(b, '[M]'), np.datetime64('NaT', '[M]'))
assert_equal(np.datetime64(b, '[Y]'), np.datetime64('NaT', '[Y]'))
assert_equal(np.datetime64(b, '[W]'), np.datetime64('NaT', '[W]'))
# Arrays -> Scalars
assert_equal(np.datetime64(a, '[s]'), np.datetime64('NaT', '[s]'))
assert_equal(np.datetime64(a, '[ms]'), np.datetime64('NaT', '[ms]'))
assert_equal(np.datetime64(a, '[M]'), np.datetime64('NaT', '[M]'))
assert_equal(np.datetime64(a, '[Y]'), np.datetime64('NaT', '[Y]'))
assert_equal(np.datetime64(a, '[W]'), np.datetime64('NaT', '[W]'))
def test_pydatetime_creation(self):
a = np.array(['1960-03-12', datetime.date(1960, 3, 12)], dtype='M8[D]')
assert_equal(a[0], a[1])
a = np.array(['1999-12-31', datetime.date(1999, 12, 31)], dtype='M8[D]')
assert_equal(a[0], a[1])
a = np.array(['2000-01-01', datetime.date(2000, 1, 1)], dtype='M8[D]')
assert_equal(a[0], a[1])
# Will fail if the date changes during the exact right moment
a = np.array(['today', datetime.date.today()], dtype='M8[D]')
assert_equal(a[0], a[1])
# datetime.datetime.now() returns local time, not UTC
#a = np.array(['now', datetime.datetime.now()], dtype='M8[s]')
#assert_equal(a[0], a[1])
# we can give a datetime.date time units
assert_equal(np.array(datetime.date(1960, 3, 12), dtype='M8[s]'),
np.array(np.datetime64('1960-03-12T00:00:00')))
def test_datetime_y2038(self):
# Test parsing on either side of the Y2038 boundary
a = np.datetime64('2038-01-19T03:14:07')
assert_equal(a.view(np.int64), 2**31 - 1)
a = np.datetime64('2038-01-19T03:14:08')
assert_equal(a.view(np.int64), 2**31)
# Test parsing on either side of the Y2038 boundary with
# a manually specified timezone offset
with assert_warns(DeprecationWarning):
a = np.datetime64('2038-01-19T04:14:07+0100')
assert_equal(a.view(np.int64), 2**31 - 1)
with assert_warns(DeprecationWarning):
a = np.datetime64('2038-01-19T04:14:08+0100')
assert_equal(a.view(np.int64), 2**31)
# Test parsing a date after Y2038
a = np.datetime64('2038-01-20T13:21:14')
assert_equal(str(a), '2038-01-20T13:21:14')
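The boundary values in the test come straight from the storage format: a datetime64[s] is a 64-bit count of seconds, and 2**31 - 1 seconds lands exactly on the classic Y2038 rollover instant.

import numpy as np

print(np.datetime64(2**31 - 1, 's'))  # 2038-01-19T03:14:07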
def format_time(x):
"""Formats date values
This function formats :class:`datetime.datetime` and
:class:`datetime.timedelta` objects (and the corresponding numpy objects)
using the :func:`xarray.core.formatting.format_timestamp` and the
:func:`xarray.core.formatting.format_timedelta` functions.
Parameters
----------
x: object
The value to format. If not a time object, the value is returned
Returns
-------
str or `x`
Either the formatted time object or the initial `x`"""
if isinstance(x, (datetime64, datetime)):
return format_timestamp(x)
elif isinstance(x, (timedelta64, timedelta)):
return format_timedelta(x)
elif isinstance(x, ndarray):
return list(x) if x.ndim else x[()]
return x
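Hypothetical usage, assuming the module-level imports of the original file (datetime64, timedelta64, ndarray and the xarray formatting helpers) are in scope:

from datetime import datetime

format_time(datetime(2017, 1, 1))  # -> formatted timestamp string
format_time('not a time')          # -> returned unchanged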
def _parase_fq_factor(code, start, end):
symbol = _code_to_symbol(code)
request = Request(ct.HIST_FQ_FACTOR_URL%(ct.P_TYPE['http'],
ct.DOMAINS['vsf'], symbol))
text = urlopen(request, timeout=10).read()
text = text[1:len(text)-1]
    # the endpoint returns pseudo-JSON; quote the bare keys so json.loads accepts it
    text = text.replace('{_', '{"')
text = text.replace('total', '"total"')
text = text.replace('data', '"data"')
text = text.replace(':"', '":"')
text = text.replace('",_', '","')
text = text.replace('_', '-')
text = json.loads(text)
df = pd.DataFrame({'date':list(text['data'].keys()), 'factor':list(text['data'].values())})
df['date'] = df['date'].map(_fun_except) # for null case
    if df['date'].dtypes == object:
df['date'] = df['date'].astype(np.datetime64)
df = df.drop_duplicates('date')
df['factor'] = df['factor'].astype(float)
return df
def test_netcdf_monitor_single_time_all_vars():
try:
assert not os.path.isfile('out.nc')
monitor = NetCDFMonitor('out.nc')
monitor.store(state)
assert not os.path.isfile('out.nc') # not set to write on store
monitor.write()
assert os.path.isfile('out.nc')
with xr.open_dataset('out.nc') as ds:
assert len(ds.data_vars.keys()) == 2
assert 'air_temperature' in ds.data_vars.keys()
assert ds.data_vars['air_temperature'].attrs['units'] == 'degK'
assert tuple(ds.data_vars['air_temperature'].shape) == (1, nx, ny, nz)
assert 'air_pressure' in ds.data_vars.keys()
assert ds.data_vars['air_pressure'].attrs['units'] == 'Pa'
assert tuple(ds.data_vars['air_pressure'].shape) == (1, nx, ny, nz)
assert len(ds['time']) == 1
assert ds['time'][0] == np.datetime64(state['time'])
finally: # make sure we remove the output file
if os.path.isfile('out.nc'):
os.remove('out.nc')
def test_too_late_start() -> None:
"""Model start after last release in file"""
config = {
'start_time': np.datetime64('2015-05-02 12'),
'stop_time': np.datetime64('2015-05-03 12'),
'particle_release_file': 'release.rls',
'release_format': ['mult', 'release_time', 'X'],
'release_dtype': dict(mult=int, release_time=np.datetime64, X=float),
'release_type': 'discrete',
'dt': 3600,
'particle_variables': []
}
# Make a release file
with open('release.rls', mode='w') as f:
f.write('2 2015-04-01 100\n')
# Release should quit with SystemExit
with pytest.raises(SystemExit):
ParticleReleaser(config)
# Clean up
os.remove('release.rls')
def test_filter_date():
t = Table()
t.a = np.random.rand(10)
t.b = pd.date_range('2000-01-01', freq='D', periods=10)
t.c = np.array([1, 2])
t.add_column('d', np.array([1, 2]), align='bottom')
thres1 = np.array(['2000-01-03'], dtype=np.datetime64)
thres2 = np.array(['2000-01-05'], dtype=np.datetime64)
t1 = t.filter(t.b >= thres1)
assert np.all(t1.c.values == np.array([]))
assert np.all(t1.d.values == np.array([1, 2]))
assert np.all(t1.a.values == t.a.values[2:])
t1 = t.filter((t.b >= thres1) & (t.b <= thres2))
assert np.all(t1.c.values == np.array([]))
assert np.all(t1.d.values == np.array([]))
assert np.all(t1.a.values == t.a.values[2:5])
t1 = t.filter(t.b.date_range(fr=thres1, to=thres2))
assert np.all(t1.c.values == np.array([]))
assert np.all(t1.d.values == np.array([]))
assert np.all(t1.a.values == t.a.values[2:5])
def __init__(self):
    self.ipo = ts.new_stocks()
    #print ipo.info()
    # parse the listing-date column into datetime64
    self.ipo['ipo_date'] = self.ipo['ipo_date'].astype('datetime64[ns]')
    #print ipo.info()
    self.start = self.ipo['ipo_date'].values[-1]
    self.end = self.ipo['ipo_date'].values[0]
    print(type(self.end))
    #self.start_d = datetime.datetime.strptime(self.start, '%Y-%m-%d')
    #self.end_d = datetime.datetime.strptime(self.end, '%Y-%m-%d')
    #print(type(self.start_d))
    #period = self.start_d + datetime.timedelta(days=30)
    #print(period.strftime('%Y-%m-%d'))
    #print(ipo[ipo['ipo_date'] < np.datetime64(period)])
def how_to_use():
# Set date range for training/test data
train_start_date = np.datetime64("2012-01-01")
train_end_date = np.datetime64("2015-12-31")
test_start_date = np.datetime64("2016-01-01")
test_end_date = np.datetime64("2016-08-31")
    # Download latest data
stockdata = StockData()
stockdata.download()
# How to train
model = NikkeiModel([], "YourModelName")
model.prepare_training_data(train_start_date, train_end_date)
model.train()
# How to evaluate
model.prepare_test_data(test_start_date, test_end_date)
model.evaluate()
model.backtest()
# How to predict
n225_open = 16500 # Today's N225 Open value
model.predict(n225_open, np.datetime64("today"), downloadData=False)
Source: nanops.py, from project PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda (author: SignalMedia)
def unique1d(values):
"""
Hash table-based unique
"""
if np.issubdtype(values.dtype, np.floating):
table = _hash.Float64HashTable(len(values))
uniques = np.array(table.unique(_ensure_float64(values)),
dtype=np.float64)
elif np.issubdtype(values.dtype, np.datetime64):
table = _hash.Int64HashTable(len(values))
uniques = table.unique(_ensure_int64(values))
uniques = uniques.view('M8[ns]')
elif np.issubdtype(values.dtype, np.timedelta64):
table = _hash.Int64HashTable(len(values))
uniques = table.unique(_ensure_int64(values))
uniques = uniques.view('m8[ns]')
elif np.issubdtype(values.dtype, np.integer):
table = _hash.Int64HashTable(len(values))
uniques = table.unique(_ensure_int64(values))
else:
table = _hash.PyObjectHashTable(len(values))
uniques = table.unique(_ensure_object(values))
return uniques
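The datetime64 branch works because M8[ns] values are plain int64 under a view, so hashing the integers and viewing the uniques back is lossless. The same idea with numpy alone:

import numpy as np

a = np.array(['2017-01-01', '2017-01-02', '2017-01-01'], dtype='M8[ns]')
uniq = np.unique(a.view('i8')).view('M8[ns]')
print(uniq)  # ['2017-01-01' '2017-01-02']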
Source: internals.py, from project PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda (author: SignalMedia)
def _astype(self, dtype, mgr=None, **kwargs):
"""
    these automatically copy, so copy=True has no effect;
    raises on exception if raise == True
"""
# if we are passed a datetime64[ns, tz]
if com.is_datetime64tz_dtype(dtype):
dtype = DatetimeTZDtype(dtype)
values = self.values
if getattr(values, 'tz', None) is None:
values = DatetimeIndex(values).tz_localize('UTC')
values = values.tz_convert(dtype.tz)
return self.make_block(values)
# delegate
return super(DatetimeBlock, self)._astype(dtype=dtype, **kwargs)
Source: common.py, from project PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda (author: SignalMedia)
def _infer_fill_value(val):
"""
infer the fill value for the nan/NaT from the provided
scalar/ndarray/list-like if we are a NaT, return the correct dtyped
element to provide proper block construction
"""
if not is_list_like(val):
val = [val]
val = np.array(val, copy=False)
if is_datetimelike(val):
return np.array('NaT', dtype=val.dtype)
elif is_object_dtype(val.dtype):
dtype = lib.infer_dtype(_ensure_object(val))
if dtype in ['datetime', 'datetime64']:
return np.array('NaT', dtype=_NS_DTYPE)
elif dtype in ['timedelta', 'timedelta64']:
return np.array('NaT', dtype=_TD_DTYPE)
return np.nan
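NaT is the datetime/timedelta analogue of NaN, and it has a properly dtyped array form, which is exactly what the function returns for datetime-like input:

import numpy as np

print(np.array('NaT', dtype='datetime64[ns]'))   # NaT
print(np.array('NaT', dtype='timedelta64[ns]'))  # NaT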
Source: test_timeseries.py, from project PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda (author: SignalMedia)
def test_normalize(self):
rng = date_range('1/1/2000 9:30', periods=10, freq='D')
result = rng.normalize()
expected = date_range('1/1/2000', periods=10, freq='D')
self.assertTrue(result.equals(expected))
rng_ns = pd.DatetimeIndex(np.array([1380585623454345752,
1380585612343234312]).astype(
"datetime64[ns]"))
rng_ns_normalized = rng_ns.normalize()
expected = pd.DatetimeIndex(np.array([1380585600000000000,
1380585600000000000]).astype(
"datetime64[ns]"))
self.assertTrue(rng_ns_normalized.equals(expected))
self.assertTrue(result.is_normalized)
self.assertFalse(rng.is_normalized)
Source: test_timeseries.py, from project PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda (author: SignalMedia)
def test_timestamp_compare_scalars(self):
# case where ndim == 0
lhs = np.datetime64(datetime(2013, 12, 6))
rhs = Timestamp('now')
nat = Timestamp('nat')
ops = {'gt': 'lt',
'lt': 'gt',
'ge': 'le',
'le': 'ge',
'eq': 'eq',
'ne': 'ne'}
for left, right in ops.items():
left_f = getattr(operator, left)
right_f = getattr(operator, right)
expected = left_f(lhs, rhs)
result = right_f(rhs, lhs)
self.assertEqual(result, expected)
expected = left_f(rhs, nat)
result = right_f(nat, rhs)
self.assertEqual(result, expected)
Source: test_tslib.py, from project PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda (author: SignalMedia)
def test_barely_oob_dts(self):
one_us = np.timedelta64(1).astype('timedelta64[us]')
# By definition we can't go out of bounds in [ns], so we
# convert the datetime64s to [us] so we can go out of bounds
min_ts_us = np.datetime64(Timestamp.min).astype('M8[us]')
max_ts_us = np.datetime64(Timestamp.max).astype('M8[us]')
# No error for the min/max datetimes
Timestamp(min_ts_us)
Timestamp(max_ts_us)
# One us less than the minimum is an error
self.assertRaises(ValueError, Timestamp, min_ts_us - one_us)
# One us more than the maximum is an error
self.assertRaises(ValueError, Timestamp, max_ts_us + one_us)
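For context, the nanosecond-resolution bounds the test converts away from are exposed on Timestamp directly:

import pandas as pd

print(pd.Timestamp.min)  # ~1677-09-21
print(pd.Timestamp.max)  # ~2262-04-11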
def datetime(x):
"""
Helper function to convert list of string objects to np.datetime64 objects.
"""
return np.array(x, dtype=np.datetime64)
def process_line(line):
dt = np.datetime64(line["dt"]).astype(np.int64)
sid = line["sid"]
open_p = float(line["open"])
high_p = float(line["high"])
low_p = float(line["low"])
close_p = float(line["close"])
volume = int(line["volume"])
return (dt, sid, open_p, high_p, low_p, close_p, volume)
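Example call with a dict literal standing in for a parsed record (field names as consumed above). Note that the integer produced for dt depends on the unit numpy infers from the input: '2017-01-03' parses at day resolution, giving days since the epoch.

line = {'dt': '2017-01-03', 'sid': 1, 'open': '10.0', 'high': '11.0',
        'low': '9.5', 'close': '10.5', 'volume': '1000'}
print(process_line(line))  # (17169, 1, 10.0, 11.0, 9.5, 10.5, 1000)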