def setup_ep_results(self, times, nodes, links, result_types=None):
"""Set up the results object (or file, etc.) for save_ep_line() calls to use.
The basic implementation sets up a dictionary of pandas DataFrames with the keys
being member names of the ResultsType class. If the items parameter is left blank,
the function will use the items that were specified during object creation.
If this too, was blank, then all results parameters will be saved.
"""
if result_types is None:
result_types = self.items
link_items = [ member.name for member in result_types if member.is_link ]
node_items = [ member.name for member in result_types if member.is_node ]
self.results.node = pd.Panel(items=node_items, major_axis=times, minor_axis=nodes)
self.results.link = pd.Panel(items=link_items, major_axis=times, minor_axis=links)
self.results.time = times
self.results.network_name = self.inp_file
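Note that pd.Panel was deprecated in pandas 0.20 and removed in 1.0, so on modern pandas the node/link containers above need a different shape. A minimal sketch, with hypothetical stand-ins for the times/nodes/links axes, that keeps one DataFrame per result type in a plain dict:

import pandas as pd

# Hypothetical sample axes standing in for the arguments to setup_ep_results().
times = pd.date_range("2020-01-01", periods=4, freq="h")
nodes = ["N1", "N2"]
links = ["L1"]
node_items = ["demand", "head"]
link_items = ["flow"]

# One DataFrame per result type instead of a 3-D Panel (removed in pandas 1.0).
node_results = {name: pd.DataFrame(index=times, columns=nodes, dtype=float)
                for name in node_items}
link_results = {name: pd.DataFrame(index=times, columns=links, dtype=float)
                for name in link_items}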
def parse(self, entry):
    data = pd.read_csv(str(entry),
                       engine="c",
                       sep="\t",
                       index_col=0,
                       parse_dates=True,
                       infer_datetime_format=True)
    if data.index.name is not None:
        data.index.name = data.index.name.lower()
data.columns = list(range(24)) * 3
paneldata = pd.Panel({
"above": data.iloc[:, 0:24],
"all": data.iloc[:, 24:48],
"percent": data.iloc[:, 48:72]
})
paneldata.minor_axis.name = "hour"
return paneldata
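Without Panel, the three 24-hour blocks can live in a single DataFrame with two-level columns. A minimal sketch under the same 72-column layout parse() assumes; the input frame here is synthetic:

import numpy as np
import pandas as pd

# Hypothetical stand-in for the parsed file: 3 days x 72 columns
# (three 24-hour blocks: above, all, percent).
data = pd.DataFrame(np.random.rand(3, 72),
                    index=pd.date_range("2020-01-01", periods=3))
data.columns = list(range(24)) * 3

# Outer column level names the block, inner level is the hour of day.
stacked = pd.concat(
    {"above": data.iloc[:, 0:24],
     "all": data.iloc[:, 24:48],
     "percent": data.iloc[:, 48:72]},
    axis=1,
)
stacked.columns.names = ["block", "hour"]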
def digest_bars(self, history_spec, do_ffill):
"""
    Get the last (history_spec.bar_count - 1) bars from self.digest_panels
    for the requested HistorySpec.
"""
bar_count = history_spec.bar_count
if bar_count == 1:
# slicing with [1 - bar_count:] doesn't work when bar_count == 1,
# so special-casing this.
res = pd.DataFrame(index=[], columns=self.sids, dtype=float)
return res.values, res.index
field = history_spec.field
# Panel axes are (field, dates, sids). We want just the entries for
# the requested field, the last (bar_count - 1) data points, and all
# sids.
digest_panel = self.digest_panels[history_spec.frequency]
frame = digest_panel.get_current(field, raw=True)
if do_ffill:
# Do forward-filling *before* truncating down to the requested
# number of bars. This protects us from losing data if an illiquid
# stock has a gap in its price history.
filled = ffill_digest_frame_from_prior_values(
history_spec.frequency,
history_spec.field,
frame,
self.last_known_prior_values,
            raw=True,
        )
        # Truncate only after we've forward-filled.
indexer = slice(1 - bar_count, None)
return filled[indexer], digest_panel.current_dates()[indexer]
else:
indexer = slice(1 - bar_count, None)
return frame[indexer, :], digest_panel.current_dates()[indexer]
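A toy illustration (made-up prices, with a simple stand-in for ffill_digest_frame_from_prior_values) of why the forward fill has to precede the truncation: if the only trade falls outside the truncated window, filling afterwards has nothing left to propagate.

import numpy as np

prices = np.array([10.0, np.nan, np.nan, np.nan])  # one trade, then a gap

def ffill_1d(a):
    # Propagate the last non-NaN value forward.
    out = a.copy()
    for i in range(1, len(out)):
        if np.isnan(out[i]):
            out[i] = out[i - 1]
    return out

bar_count = 3
indexer = slice(1 - bar_count, None)       # keep the last two bars

fill_then_cut = ffill_1d(prices)[indexer]  # [10., 10.] -- gap repaired
cut_then_fill = ffill_1d(prices[indexer])  # [nan, nan] -- data already lost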
def buffer_panel_minutes(self,
buffer_panel,
earliest_minute=None,
latest_minute=None,
raw=False):
"""
Get the minutes in @buffer_panel between @earliest_minute and
@latest_minute, inclusive.
@buffer_panel can be a RollingPanel or a plain Panel. If a
RollingPanel is supplied, we call `get_current` to extract a Panel
object.
    If no value is specified for @earliest_minute, use all the minutes we
    have up until @latest_minute.
If no value for @latest_minute is specified, use all values up until
the latest minute.
"""
if isinstance(buffer_panel, RollingPanel):
buffer_panel = buffer_panel.get_current(start=earliest_minute,
end=latest_minute,
raw=raw)
return buffer_panel
# Using .ix here rather than .loc because loc requires that the keys
# are actually in the index, whereas .ix returns all the values between
# earliest_minute and latest_minute, which is what we want.
return buffer_panel.ix[:, earliest_minute:latest_minute, :]
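.ix was removed in pandas 1.0. For slices on a sorted DatetimeIndex, .loc is also inexact (the endpoints need not be present in the index), so a modern port of the slice above could look like this sketch; the frame and labels are hypothetical:

import pandas as pd

minutes = pd.date_range("2020-01-02 09:31", periods=5, freq="min")
frame = pd.DataFrame({"AAPL": range(5)}, index=minutes)

# .loc slicing on a monotonic DatetimeIndex tolerates absent endpoints,
# matching the old .ix behavior for label ranges.
window = frame.loc["2020-01-02 09:31:30":"2020-01-02 09:34:30"]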
def _create_buffer(self):
panel = pd.Panel(
items=self.items,
minor_axis=self.minor_axis,
major_axis=range(self.cap),
dtype=self.dtype,
)
return panel
def get_current(self):
"""
Get a Panel that is the current data in view. It is not safe to persist
these objects because internal data might change
"""
where = slice(self._oldest_frame_idx(), self._pos)
major_axis = pd.DatetimeIndex(deepcopy(self.date_buf[where]), tz='utc')
return pd.Panel(self.buffer.values[:, where, :], self.items,
major_axis, self.minor_axis, dtype=self.dtype)
def make_trade_panel_for_asset_info(dates,
asset_info,
price_start,
price_step_by_date,
price_step_by_sid,
volume_start,
volume_step_by_date,
volume_step_by_sid):
"""
locations where assets did not exist.
"""
sids = list(asset_info.index)
price_sid_deltas = np.arange(len(sids), dtype=float) * price_step_by_sid
price_date_deltas = np.arange(len(dates), dtype=float) * price_step_by_date
prices = (price_sid_deltas + price_date_deltas[:, None]) + price_start
volume_sid_deltas = np.arange(len(sids)) * volume_step_by_sid
volume_date_deltas = np.arange(len(dates)) * volume_step_by_date
volumes = (volume_sid_deltas + volume_date_deltas[:, None]) + volume_start
for j, sid in enumerate(sids):
start_date, end_date = asset_info.loc[sid, ['start_date', 'end_date']]
        # Normalize here so that we still generate non-NaN values on the
        # minutes for an asset's last trading day.
for i, date in enumerate(dates.normalize()):
if not (start_date <= date <= end_date):
prices[i, j] = np.nan
volumes[i, j] = 0
# Legacy panel sources use a flipped convention from what we return
# elsewhere.
return pd.Panel(
{
'price': prices,
'volume': volumes,
},
major_axis=dates,
minor_axis=sids,
).transpose(2, 1, 0)
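The per-cell loop above can also be written with NumPy broadcasting over a (dates x sids) liveness mask. A minimal sketch of just that masking step, with hypothetical start/end dates:

import numpy as np
import pandas as pd

dates = pd.date_range("2020-01-01", periods=4)
starts = pd.to_datetime(["2020-01-01", "2020-01-03"]).values  # one per sid
ends = pd.to_datetime(["2020-01-02", "2020-01-04"]).values

prices = np.ones((len(dates), 2))
volumes = np.ones((len(dates), 2))

# True where the asset existed on that (normalized) date.
d = dates.normalize().values[:, None]
alive = (starts[None, :] <= d) & (d <= ends[None, :])
prices[~alive] = np.nan
volumes[~alive] = 0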
def test_basics(self, window=10):
items = ['bar', 'baz', 'foo']
minor = ['A', 'B', 'C', 'D']
rp = MutableIndexRollingPanel(window, items, minor, cap_multiple=2)
dates = pd.date_range('2000-01-01', periods=30, tz='utc')
major_deque = deque(maxlen=window)
frames = {}
for i, date in enumerate(dates):
frame = pd.DataFrame(np.random.randn(3, 4), index=items,
columns=minor)
rp.add_frame(date, frame)
frames[date] = frame
major_deque.append(date)
result = rp.get_current()
expected = pd.Panel(frames, items=list(major_deque),
major_axis=items, minor_axis=minor)
tm.assert_panel_equal(result, expected.swapaxes(0, 1))
def test_close_position_event(self):
pt = perf.PositionTracker(asset_finder=self.env.asset_finder)
dt = pd.Timestamp("1984/03/06 3:00PM")
pos1 = perf.Position(1, amount=np.float64(120.0),
last_sale_date=dt, last_sale_price=3.4)
pos2 = perf.Position(2, amount=np.float64(-100.0),
last_sale_date=dt, last_sale_price=3.4)
pt.update_positions({1: pos1, 2: pos2})
event_type = DATASOURCE_TYPE.CLOSE_POSITION
index = [dt + timedelta(days=1)]
pan = pd.Panel({1: pd.DataFrame({'price': 1, 'volume': 0,
'type': event_type}, index=index),
2: pd.DataFrame({'price': 1, 'volume': 0,
'type': event_type}, index=index),
3: pd.DataFrame({'price': 1, 'volume': 0,
'type': event_type}, index=index)})
source = DataPanelSource(pan)
for i, event in enumerate(source):
txn = pt.maybe_create_close_position_transaction(event)
if event.sid == 1:
# Test owned long
self.assertEqual(-120, txn.amount)
elif event.sid == 2:
# Test owned short
self.assertEqual(100, txn.amount)
elif event.sid == 3:
# Test not-owned SID
self.assertIsNone(txn)
def setUp(self):
self.env = TradingEnvironment()
self.days = self.env.trading_days[:4]
self.panel = pd.Panel({1: pd.DataFrame({
'price': [1, 1, 2, 4], 'volume': [1e9, 1e9, 1e9, 0],
'type': [DATASOURCE_TYPE.TRADE,
DATASOURCE_TYPE.TRADE,
DATASOURCE_TYPE.TRADE,
DATASOURCE_TYPE.CLOSE_POSITION]},
index=self.days)
})
def readGraceData(filename, lat_name, lon_name, data_name, time=None):
'''
This function reads in netcdf data provided by GRACE Tellus
@param filename: Name of file to read in
@param lat_name: Name of latitude data
@param lon_name: Name of longitude data
@param data_name: Name of data product
@param time: Name of time data
'''
nc = Dataset(filename, 'r')
lat_index = nc[lat_name][:]
lon_index = nc[lon_name][:]
data = nc[data_name][:]
    if time is not None:
        time = nc.variables[time]
        date_index = pd.to_datetime(num2date(time[:], units=time.units, calendar=time.calendar))
        return pd.Panel(data=data, items=date_index, major_axis=lat_index, minor_axis=lon_index)
else:
return pd.DataFrame(data = data, columns=lon_index, index=lat_index)
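On pandas releases without Panel, gridded time/lat/lon data like this maps naturally onto xarray. A minimal sketch with synthetic values standing in for the netCDF variables read above:

import numpy as np
import pandas as pd
import xarray as xr

date_index = pd.date_range("2003-01-01", periods=2, freq="MS")
lat_index = np.array([-89.5, 0.0, 89.5])
lon_index = np.array([0.5, 180.5])
data = np.random.rand(2, 3, 2)

# One labeled 3-D array instead of a Panel of (time, lat, lon).
grid = xr.DataArray(
    data,
    coords={"time": date_index, "lat": lat_index, "lon": lon_index},
    dims=("time", "lat", "lon"),
)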
def __init__(self):
"""Initialize parameters of the Interactive Brokers price handler
object.
"""
super(InteractiveBrokersPriceHandler, self).__init__()
self.conn = ibConnection(
clientId=IB.data_handler_id.value, port=IB.port.value
)
self.conn.register(self.__tick_price_handler, message.tickPrice)
if not self.conn.connect():
raise ValueError(
"Odin was unable to connect to the Trader Workstation."
)
# Set the target field to download data from.
today = dt.datetime.today()
open_t, close_t = dt.time(9, 30), dt.time(16)
cur_t = today.time()
    # If today is a weekday and the current time falls within regular
    # trading hours, use the most recently observed price. Otherwise,
    # use the close price.
    if today.weekday() < 5 and open_t <= cur_t <= close_t:
self.field = TickType.LAST
else:
self.field = TickType.CLOSE
# Initialize a pandas panel to store the price data.
self.bar = pd.Panel(items=[PriceFields.current_price.value])
def verify_indices_all_unique(obj):
"""
Check that all axes of a pandas object are unique.
Parameters
----------
obj : pd.Series / pd.DataFrame / pd.Panel
The object to validate.
Returns
-------
obj : pd.Series / pd.DataFrame / pd.Panel
The validated object, unchanged.
Raises
------
ValueError
If any axis has duplicate entries.
"""
axis_names = [
('index',), # Series
('index', 'columns'), # DataFrame
('items', 'major_axis', 'minor_axis') # Panel
    ][obj.ndim - 1]  # ndim=1 -> entry 0 (Series), ndim=2 -> 1, ndim=3 -> 2
for axis_name, index in zip(axis_names, obj.axes):
if index.is_unique:
continue
raise ValueError(
"Duplicate entries in {type}.{axis}: {dupes}.".format(
type=type(obj).__name__,
axis=axis_name,
dupes=sorted(index[index.duplicated()]),
)
)
return obj
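A quick usage sketch, assuming verify_indices_all_unique() above is in scope: a frame with a duplicated column label should raise.

import pandas as pd

ok = pd.DataFrame({"a": [1], "b": [2]})
verify_indices_all_unique(ok)  # returned unchanged

bad = pd.DataFrame([[1, 2]], columns=["a", "a"])
try:
    verify_indices_all_unique(bad)
except ValueError as err:
    print(err)  # Duplicate entries in DataFrame.columns: ['a'].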
def get_peak_info_panel(self):
pn = pd.Panel(OrderedDict([
('Peak Size ({})'.format(self.get_peak_size_units()), self.get_peak_size()),
('Peak Center ({})'.format(self.x_units), self.get_peak_center()),
('FWHM ({})'.format(self.x_units), self.get_peak_fwhm_absolute()),
('FWHM (ratio)', self.get_peak_fwhm_relative()),
]))
pn = pn.swapaxes('items', 'major')
return pn
def stderrs(self):
"""The standard errors of the parameter estimates."""
return DataFrame(self._get('bse'), index=self._result_idx,
columns=self.exog.columns)
# 3d data (return type is a MultiIndex pd.DataFrame)
# Note that pd.Panel was deprecated in 0.20.1
# For models with >1 exogenous variable, these properties consist of an
# nxm vector for each rolling period.
# The "outer" index will be _result_idx (period-ending basis), with the
# inner indices being the individual periods within each outer period.
# --------------------------------------------------------------------------
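A minimal sketch of the MultiIndex DataFrame layout those comments describe, with made-up estimates: one block per rolling period, concatenated so the period end becomes the outer index level.

import pandas as pd

per_period = {
    pd.Timestamp("2020-01-31"): pd.DataFrame([[0.1, 0.2], [0.3, 0.4]],
                                             columns=["x1", "x2"]),
    pd.Timestamp("2020-02-29"): pd.DataFrame([[0.5, 0.6], [0.7, 0.8]],
                                             columns=["x1", "x2"]),
}

# dict keys become the outer ("period-ending") level of the index.
stacked = pd.concat(per_period, names=["period_end", "obs"])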
def keys(self):
"""Get the 'info axis' (see Indexing for more)
This is index for Series, columns for DataFrame and major_axis for
Panel.
"""
return self._info_axis
def iteritems(self):
"""Iterate over (label, values) on info axis
This is index for Series, columns for DataFrame, major_axis for Panel,
and so on.
"""
for h in self._info_axis:
yield h, self[h]
# originally used to get around 2to3's changes to iteritems.
# Now unnecessary. Sidenote: don't want to deprecate this for a while,
# otherwise libraries that use 2to3 will have issues.