def check_commands(self, event):
l = len(self.sched_commands)
curr_time = datetime.datetime.now()
i = 0
    while i < l and curr_time >= self.sched_commands[i][0]:
        logging.info(u'exec command: i=%s, time=%s, command=%s' % (i, curr_time, self.sched_commands[i][1].__name__))
arg = self.sched_commands[i][2]
self.sched_commands[i][1](**arg)
i += 1
if i>0:
del self.sched_commands[0:i]
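For reference, a minimal sketch of the queue format this loop assumes: each entry is a (run_time, callback, kwargs) tuple, and the queue is kept sorted by run_time so the scan can stop at the first future command (put_command is assumed to maintain that order).

import datetime

def log_eod():
    print('running end-of-day tasks')

# hypothetical queue; check_commands fires every entry whose
# run_time <= now by calling callback(**kwargs), then deletes it
queue = []
run_at = datetime.datetime.now() + datetime.timedelta(minutes=5)
queue.append((run_at, log_eod, {}))
queue.sort(key=lambda entry: entry[0])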
def day_switch(self, event):
newday = event.dict['date']
if newday <= self.scur_day:
return
        self.logger.info('switching trading day from %s to %s, resetting tick_id (was %s) to 0' % (self.scur_day, newday, self.tick_id))
if not self.eod_flag:
self.run_eod()
self.scur_day = newday
self.tick_id = 0
self.timer_count = 0
super(Agent, self).mkt_data_sod(newday)
self.eod_flag = False
eod_time = datetime.datetime.combine(newday, datetime.time(15, 20, 0))
self.put_command(eod_time, self.run_eod)
def run_tick(self, event):  # process an incoming market data tick
tick = event.dict['data']
if self.live_trading:
now_ticknum = get_tick_num(datetime.datetime.now())
cur_ticknum = get_tick_num(tick.timestamp)
if abs(cur_ticknum - now_ticknum)> self.realtime_tick_diff:
            self.logger.warning('tick timestamp differs from system time by more than the realtime_tick_diff threshold, inst=%s, ticknum=%s, now_ticknum=%s' % (tick.instID, cur_ticknum, now_ticknum))
if not self.update_instrument(tick):
return
self.update_min_bar(tick)
inst = tick.instID
for key in self.inst2spread[inst]:
self.trade_manager.check_pending_trades(key)
self.trade_manager.check_pending_trades(inst)
self.trade_manager.process_trades()
def ctp_qry_instruments(self, event):
dtime = datetime.datetime.now()
min_id = get_min_id(dtime)
if min_id < 250:
gateway = self.type2gateway['CTP']
gateway.qry_commands.append(gateway.tdApi.qryInstrument)
def load_from_yahoo(indexes=None,
stocks=None,
start=None,
end=None,
adjusted=True):
"""
Loads price data from Yahoo into a dataframe for each of the indicated
securities. By default, 'price' is taken from Yahoo's 'Adjusted Close',
which removes the impact of splits and dividends. If the argument
'adjusted' is False, then the non-adjusted 'close' field is used instead.
:param indexes: Financial indexes to load.
:type indexes: dict
:param stocks: Stock closing prices to load.
:type stocks: list
:param start: Retrieve prices from start date on.
:type start: datetime
:param end: Retrieve prices until end date.
:type end: datetime
:param adjusted: Adjust the price for splits and dividends.
:type adjusted: bool
"""
data = _load_raw_yahoo_data(indexes, stocks, start, end)
if adjusted:
close_key = 'Adj Close'
else:
close_key = 'Close'
df = pd.DataFrame({key: d[close_key] for key, d in iteritems(data)})
df.index = df.index.tz_localize(pytz.utc)
return df
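An illustrative call (tickers and dates are placeholders; the Yahoo endpoint this loader targets has since been retired, so treat this as a sketch of the intended usage):

import datetime
import pytz

df = load_from_yahoo(stocks=['AAPL', 'MSFT'],
                     start=datetime.datetime(2010, 1, 1, tzinfo=pytz.utc),
                     end=datetime.datetime(2012, 1, 1, tzinfo=pytz.utc),
                     adjusted=True)
print(df.head())  # one adjusted-close column per ticker, UTC-indexed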
def _chunk_to_dataframe(self):
n = self._current_row_in_chunk_index
m = self._current_row_in_file_index
ix = range(m - n, m)
rslt = pd.DataFrame(index=ix)
js, jb = 0, 0
for j in range(self.column_count):
name = self.column_names[j]
if self.column_types[j] == b'd':
rslt[name] = self._byte_chunk[jb, :].view(
dtype=self.byte_order + 'd')
rslt[name] = np.asarray(rslt[name], dtype=np.float64)
if self.convert_dates and (self.column_formats[j] == "MMDDYY"):
epoch = pd.datetime(1960, 1, 1)
rslt[name] = epoch + pd.to_timedelta(rslt[name], unit='d')
jb += 1
elif self.column_types[j] == b's':
rslt[name] = self._string_chunk[js, :]
rslt[name] = rslt[name].apply(lambda x: x.rstrip(b'\x00 '))
if self.encoding is not None:
rslt[name] = rslt[name].apply(
lambda x: x.decode(encoding=self.encoding))
if self.blank_missing:
ii = rslt[name].str.len() == 0
rslt.loc[ii, name] = np.nan
js += 1
else:
raise ValueError("unknown column type %s" %
self.column_types[j])
return rslt
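The MMDDYY branch above converts SAS date counts (days since the SAS epoch, 1960-01-01) into timestamps; the same arithmetic in isolation:

import pandas as pd

sas_days = pd.Series([0.0, 10000.0, 20000.0])
epoch = pd.Timestamp(1960, 1, 1)  # modern spelling of pd.datetime(1960, 1, 1)
dates = epoch + pd.to_timedelta(sas_days, unit='d')
print(dates.tolist())  # 1960-01-01, 1987-05-19, 2014-10-04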
def setUp(self):
self.df = pd.DataFrame(np.random.randn(1000, 4),
columns=['A', 'B', 'AdmitDTS',
'LastLoadDTS'])
self.df['AdmitDTS'] = pd.datetime(2015, 5, 20)
def setUp(self):
self.df = pd.DataFrame(np.random.randn(1000, 2),
columns=['AdmitDTS',
'LastLoadDTS'])
# generate load date
self.df['LastLoadDTS'] = pd.datetime(2015, 5, 20)
# generate datetime objects for admit date
        admit = pd.Series(index=range(1000), dtype=object)  # pd.Series(1000) built a one-element series
delta = pd.datetime(2015, 5, 20) - pd.datetime(2015, 5, 1)
int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
for i in range(1000):
random_second = randrange(int_delta)
admit[i] = pd.datetime(2015, 5, 1) + timedelta(
seconds=random_second)
self.df['AdmitDTS'] = admit
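The per-row loop above can also be written vectorised; a sketch that produces the same uniform spread of admit times without 1000 Python-level assignments:

import numpy as np
import pandas as pd

n = 1000
span = pd.Timestamp(2015, 5, 20) - pd.Timestamp(2015, 5, 1)
random_seconds = np.random.randint(0, int(span.total_seconds()), size=n)
admit = pd.Series(pd.Timestamp(2015, 5, 1) +
                  pd.to_timedelta(random_seconds, unit='s'))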
def get_nyears_back(raw_data, back=1):
    """Return the rows from N years back, re-labelled with the current period."""
all_periods = raw_data.index.get_level_values(0).unique()
l=[]
for period in all_periods:
ly = pd.datetime(period.year-back, period.month, period.day)
if ly in all_periods:
data_ly = raw_data.loc[[ly]].copy()
data_ly.index = pd.MultiIndex.from_product([[period], data_ly.index.get_level_values(1)],
names=data_ly.index.names)
l.append(data_ly)
else:
pass
new_data = pd.concat(l)
return new_data
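A small usage sketch showing the expected input shape, a (period, asset) MultiIndex with illustrative values:

import pandas as pd

idx = pd.MultiIndex.from_product(
    [[pd.Timestamp(2016, 12, 31), pd.Timestamp(2017, 12, 31)], ['A', 'B']],
    names=['period', 'asset'])
raw = pd.DataFrame({'value': [1.0, 2.0, 3.0, 4.0]}, index=idx)
# the 2017-12-31 rows of the result carry the 2016-12-31 values
print(get_nyears_back(raw, back=1))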
def sort_dividend(divs):
"""
    Sort dividend records chronologically and index them by time.
    :param divs: list of dividend dicts, each with a Unix 'time' field
    :return: DataFrame sorted by and indexed on 'time' (None if divs is empty)
"""
if len(divs) > 0:
df = pd.DataFrame(divs)
df = df.sort_values(by='time')
df.time = df.time.apply(lambda x: pd.datetime.utcfromtimestamp(x))
df = df.set_index('time')
return df
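An illustrative call (field names and values invented; 'time' holds Unix timestamps, as the utcfromtimestamp conversion implies):

divs = [
    {'time': 1466380800, 'per_share': 0.57},  # 2016-06-20 UTC
    {'time': 1458432000, 'per_share': 0.52},  # 2016-03-20 UTC
]
df = sort_dividend(divs)
print(df)  # indexed by datetime, earliest dividend first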
def read_nox(ifile):
_date = datetime.datetime.strptime(os.path.basename(ifile).split('_')[1], '%y%m%d')
year = _date.year
month = _date.month
day = _date.day
    nox_dateparse = lambda x: pd.datetime(year, month, day) + \
        datetime.timedelta(seconds=int((float(x) % 1) * 86400.))
df_nox = pd.read_csv(ifile, parse_dates=[0], date_parser=nox_dateparse)
df_nox = df_nox.set_index('TheTime') # Setting index
t = df_nox.index.values
df_nox['timestamp'] = t.astype('datetime64[s]') # Converting index data type
df_nox = df_nox[['timestamp', 'no_conc', 'no2_conc', 'nox_conc']]
df_nox[df_nox < 0] = np.nan
return df_nox
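The nox_dateparse lambda above treats the time column as a fraction of a day past midnight; the same conversion in isolation:

import datetime

base = datetime.datetime(2016, 7, 1)  # date decoded from the filename
frac = 0.5208333                      # 12:30:00 expressed as a day fraction
ts = base + datetime.timedelta(seconds=int((frac % 1) * 86400.))
print(ts)  # 2016-07-01 12:29:59 (the int() truncation drops sub-second detail)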
def __init__(self, df, Q, st, end='', lab='', excs=None, excf=None):
    # avoid mutable list defaults; [0, 0, 0] is the intended fallback
    excs = [0, 0, 0] if excs is None else excs
    excf = [0, 0, 0] if excf is None else excf
    self.ymd = [datetime.now().year, datetime.now().month, datetime.now().day]
if end == '':
end = self.ymd
if lab == '':
self.Qlab = 'Discharge'
else:
self.Qlab = lab
self.Qz = df[Q][0]
self.rec_results = self.recession(df, Q, st, end, excs, excf)
def test_analyze_frequency_happy(self):
df = pd.DataFrame({
'date':
list(pd.date_range(pd.datetime(2015, 6, 15), pd.datetime(2015, 6, 18))) +
list(pd.date_range(pd.datetime(2015, 6, 1, 1), pd.datetime(2015, 6, 4, 1))),
'series_key': ['a'] * 4 + ['b'] * 4
})
ts_config = {'date_split_col': 'date', 'series_key_cols': ['series_key']}
frequency = analyze_frequency(df, ts_config)
self.assertEqual(frequency, pd.Timedelta(days=1))
def test_analyse_frquency_exception(self):
df = pd.DataFrame({
'date':
list(pd.date_range(pd.datetime(2015, 6, 15), pd.datetime(2015, 6, 18))) +
list(pd.date_range(pd.datetime(2015, 6, 1, 1), pd.datetime(2015, 6, 4, 1))),
'series_key': ['a'] * 8
})
ts_config = {'date_split_col': 'date', 'series_key_cols': ['series_key']}
with self.assertRaises(ValueError):
analyze_frequency(df, ts_config)
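analyze_frequency itself is not part of this collection, but the two tests pin down its contract: one inferred spacing per series key, and a ValueError when a single series mixes spacings. A hypothetical implementation consistent with that contract:

import pandas as pd

def analyze_frequency_sketch(df, ts_config):
    date_col = ts_config['date_split_col']
    keys = ts_config['series_key_cols']
    # per-series gaps between consecutive observations
    deltas = (df.sort_values(date_col)
                .groupby(keys)[date_col]
                .diff()
                .dropna()
                .unique())
    if len(deltas) != 1:
        raise ValueError('inconsistent sampling frequency: %s' % deltas)
    return pd.Timedelta(deltas[0])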
def test_medians_no_series_keys(self):
guac = test_util.load_dataset('bike_sharing', target='count')
guac.make_time_series('datetime', prediction_length=1, frequency=pd.DateOffset(hours=1))
medians = HistoricalMedians([1], guac.config, guac.logger)
out = medians.execute(guac.data)
out.df = out.df.sort_values('datetime')
self.assertTrue(np.isnan(out.df['count_median_1'].iloc[0]))
self.assertAlmostEqual(out.df['count_median_1'].iloc[1], 16, delta=1)
def load_data(indexes=None,stockList=None,start=None,end=None,adjusted=True):
"""
load stocks from Mongo
"""
assert indexes is not None or stockList is not None, """
must specify stockList or indexes"""
if start is None:
start = "1990-01-01"
    if end is not None:
        startdate = datetime.datetime.strptime(start, "%Y-%m-%d")
        enddate = datetime.datetime.strptime(end, "%Y-%m-%d")
        assert startdate < enddate, "start date is later than end date."
data = OrderedDict()
    l = LoadDataCVS(constants.IP, constants.PORT)
l.Conn()
if stockList=="hs300" or stockList=="zz500" or stockList=="sz50" or stockList=="all":
stocks=l.getstocklist(stockList)
else:
stocks=stockList
if stocks is not None:
for stock in stocks:
stkd= l.getstockdaily(stock,start,end)
data[stock] = stkd
if indexes is not None:
for name, ticker in iteritems(indexes):
logger.info('Loading index: {} ({})'.format(name, ticker))
            stkd = l.getindexdaily(ticker, start, end)
data[name] = stkd
panel = pd.Panel(data)
panel.minor_axis = ['open', 'high', 'low', 'close', 'volume', 'price','change','code']
panel.major_axis = panel.major_axis.tz_localize(pytz.utc)
#close the connection
l.Close()
# Adjust data
if adjusted:
adj_cols = ['open', 'high', 'low', 'close']
for ticker in panel.items:
ratio = (panel[ticker]['price'] / panel[ticker]['close'])
ratio_filtered = ratio.fillna(0).values
for col in adj_cols:
panel[ticker][col] *= ratio_filtered
return panel
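An illustrative call (the 'hs300' shortcut resolves a constituent list on the data server; pd.Panel is long deprecated, so this mirrors the era of the code above):

panel = load_data(stockList='hs300', start='2015-01-01', end='2016-01-01')
close = panel.minor_xs('close')  # DataFrame of close prices: dates x tickers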
def _load_raw_yahoo_data(indexes=None, stocks=None, start=None, end=None):
"""Load closing prices from yahoo finance.
:Optional:
indexes : dict (Default: {'SPX': '^GSPC'})
Financial indexes to load.
stocks : list (Default: ['AAPL', 'GE', 'IBM', 'MSFT',
'XOM', 'AA', 'JNJ', 'PEP', 'KO'])
Stock closing prices to load.
start : datetime (Default: datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc))
Retrieve prices from start date on.
end : datetime (Default: datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc))
Retrieve prices until end date.
:Note:
This is based on code presented in a talk by Wes McKinney:
http://wesmckinney.com/files/20111017/notebook_output.pdf
"""
assert indexes is not None or stocks is not None, """
must specify stocks or indexes"""
if start is None:
start = pd.datetime(1990, 1, 1, 0, 0, 0, 0, pytz.utc)
if start is not None and end is not None:
assert start < end, "start date is later than end date."
data = OrderedDict()
if stocks is not None:
for stock in stocks:
logger.info('Loading stock: {}'.format(stock))
stock_pathsafe = stock.replace(os.path.sep, '--')
cache_filename = "{stock}-{start}-{end}.csv".format(
stock=stock_pathsafe,
start=start,
end=end).replace(':', '-')
cache_filepath = get_cache_filepath(cache_filename)
if os.path.exists(cache_filepath):
stkd = pd.DataFrame.from_csv(cache_filepath)
else:
stkd = DataReader(stock, 'yahoo', start, end).sort_index()
stkd.to_csv(cache_filepath)
data[stock] = stkd
if indexes is not None:
for name, ticker in iteritems(indexes):
logger.info('Loading index: {} ({})'.format(name, ticker))
stkd = DataReader(ticker, 'yahoo', start, end).sort_index()
data[name] = stkd
return data
def load_bars_from_yahoo(indexes=None,
stocks=None,
start=None,
end=None,
adjusted=True):
"""
Loads data from Yahoo into a panel with the following
column names for each indicated security:
- open
- high
- low
- close
- volume
- price
Note that 'price' is Yahoo's 'Adjusted Close', which removes the
impact of splits and dividends. If the argument 'adjusted' is True, then
the open, high, low, and close values are adjusted as well.
:param indexes: Financial indexes to load.
:type indexes: dict
:param stocks: Stock closing prices to load.
:type stocks: list
:param start: Retrieve prices from start date on.
:type start: datetime
:param end: Retrieve prices until end date.
:type end: datetime
:param adjusted: Adjust open/high/low/close for splits and dividends.
The 'price' field is always adjusted.
:type adjusted: bool
"""
data = _load_raw_yahoo_data(indexes, stocks, start, end)
panel = pd.Panel(data)
# Rename columns
panel.minor_axis = ['open', 'high', 'low', 'close', 'volume', 'price']
panel.major_axis = panel.major_axis.tz_localize(pytz.utc)
# Adjust data
if adjusted:
adj_cols = ['open', 'high', 'low', 'close']
for ticker in panel.items:
ratio = (panel[ticker]['price'] / panel[ticker]['close'])
ratio_filtered = ratio.fillna(0).values
for col in adj_cols:
panel[ticker][col] *= ratio_filtered
return panel
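The back-adjustment above scales open/high/low/close by price / close, i.e. adjusted close over raw close; the arithmetic on a single toy bar:

raw_close = 100.0
adj_close = 50.0  # e.g. after a 2-for-1 split
ratio = adj_close / raw_close
raw_bar = {'open': 98.0, 'high': 101.0, 'low': 97.0, 'close': raw_close}
adj_bar = {k: v * ratio for k, v in raw_bar.items()}
print(adj_bar)  # {'open': 49.0, 'high': 50.5, 'low': 48.5, 'close': 50.0}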
def create_bdew_load_profiles(self, dt_index, slp_types, holidays=None):
"""Calculates the hourly electricity load profile in MWh/h of a region.
"""
# define file path of slp csv data
file_path = os.path.join(self.datapath, 'selp_series.csv')
# Read standard load profile series from csv file
selp_series = pd.read_csv(file_path)
tmp_df = selp_series
# Create an index to merge. The year and month will be ignored only the
# time index is necessary.
index = pd.date_range(
pd.datetime(2007, 1, 1, 0), periods=2016, freq='15Min')
tmp_df.set_index(index, inplace=True)
# Create empty DataFrame to take the results.
new_df = pd.DataFrame(index=dt_index, columns=slp_types).fillna(0)
new_df = add_weekdays2df(new_df, holidays=holidays,
holiday_is_sunday=True)
new_df['hour'] = dt_index.hour + 1
new_df['minute'] = dt_index.minute
time_df = new_df[['date', 'hour', 'minute', 'weekday']].copy()
tmp_df[slp_types] = tmp_df[slp_types].astype(float)
# Inner join the slps on the time_df to the slp's for a whole year
tmp_df['hour_of_day'] = tmp_df.index.hour + 1
tmp_df['minute_of_hour'] = tmp_df.index.minute
left_cols = ['hour_of_day', 'minute_of_hour', 'weekday']
right_cols = ['hour', 'minute', 'weekday']
tmp_df = tmp_df.reset_index()
tmp_df.pop('index')
for p in self.seasons.keys():
a = pd.datetime(self.year, self.seasons[p][0],
self.seasons[p][1], 0, 0)
b = pd.datetime(self.year, self.seasons[p][2],
self.seasons[p][3], 23, 59)
new_df.update(pd.DataFrame.merge(
tmp_df[tmp_df['period'] == p[:-1]], time_df[a:b],
left_on=left_cols, right_on=right_cols,
how='inner', left_index=True).sort_index().drop(
['hour_of_day'], 1))
new_df.drop('date', axis=1, inplace=True)
return new_df.div(new_df.sum(axis=0), axis=1)
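A hedged usage sketch: bdew_obj stands in for an instance of the class defining this method, with datapath, year, and seasons set as the code above expects; 'h0' (households) and 'g0' (commercial) are standard BDEW profile codes.

import pandas as pd

dt_index = pd.date_range('2017-01-01 00:00', '2017-12-31 23:45', freq='15Min')
profiles = bdew_obj.create_bdew_load_profiles(dt_index, ['h0', 'g0'])  # bdew_obj: hypothetical instance
print(profiles.sum())  # each SLP column is normalised to sum to 1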