def test_ffill(self):
# test ndim=1
N = 100
s = pd.Series(np.random.randn(N))
mask = random.sample(range(N), 10)
s.iloc[mask] = np.nan
correct = s.ffill().values
test = ffill(s.values)
assert_almost_equal(correct, test)
# test ndim=2
df = pd.DataFrame(np.random.randn(N, N))
df.iloc[mask] = np.nan
correct = df.ffill().values
test = ffill(df.values)
assert_almost_equal(correct, test)
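# The `ffill` helper exercised above is not shown in this snippet. A minimal sketch of
# what a NumPy-only forward fill might look like (an assumption, not the tested code):
import numpy as np

def ffill_sketch(values):
    """Forward-fill NaNs along axis 0 for a 1-D or 2-D float array (sketch)."""
    arr = values.astype(float)
    arr2d = arr.reshape(-1, 1) if arr.ndim == 1 else arr
    # For each cell, index of the most recent non-NaN row at or above it, per column.
    idx = np.where(~np.isnan(arr2d), np.arange(arr2d.shape[0])[:, None], 0)
    np.maximum.accumulate(idx, axis=0, out=idx)
    filled = arr2d[idx, np.arange(arr2d.shape[1])]
    return filled.ravel() if arr.ndim == 1 else filled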
def get_expected_next_event_dates(self, dates):
return pd.DataFrame({
0: get_values_for_date_ranges(zip_with_dates,
next_dates[0],
next_date_intervals[0],
dates),
1: get_values_for_date_ranges(zip_with_dates,
next_dates[1],
next_date_intervals[1],
dates),
2: get_values_for_date_ranges(zip_with_dates,
next_dates[2],
next_date_intervals[2],
dates),
3: get_values_for_date_ranges(zip_with_dates,
next_dates[3],
next_date_intervals[3],
dates),
4: zip_with_dates(dates, ['NaT'] * len(dates)),
}, index=dates)
def get_expected_previous_event_dates(self, dates):
return pd.DataFrame({
0: get_values_for_date_ranges(zip_with_dates,
prev_dates[0],
prev_date_intervals[0],
dates),
1: get_values_for_date_ranges(zip_with_dates,
prev_dates[1],
prev_date_intervals[1],
dates),
2: get_values_for_date_ranges(zip_with_dates,
prev_dates[2],
prev_date_intervals[2],
dates),
3: get_values_for_date_ranges(zip_with_dates,
prev_dates[3],
prev_date_intervals[3],
dates),
4: zip_with_dates(dates, ['NaT'] * len(dates)),
}, index=dates)
def get_vals_for_dates(zip_date_index_with_vals,
vals,
date_intervals,
dates):
return pd.DataFrame({
0: get_values_for_date_ranges(zip_date_index_with_vals,
vals[0],
date_intervals[0],
dates),
1: get_values_for_date_ranges(zip_date_index_with_vals,
vals[1],
date_intervals[1],
dates),
2: get_values_for_date_ranges(zip_date_index_with_vals,
vals[2],
date_intervals[2],
dates),
# Assume the latest of 2 cash values is used if we find out about 2
# announcements that happened on the same day for the same sid.
3: get_values_for_date_ranges(zip_date_index_with_vals,
vals[3],
date_intervals[3],
dates),
4: zip_date_index_with_vals(dates, ['NaN'] * len(dates)),
}, index=dates)
def test_auto_deltas(self):
expr = bz.data(
{'ds': self.df,
'ds_deltas': pd.DataFrame(columns=self.df.columns)},
dshape=var * Record((
('ds', self.dshape.measure),
('ds_deltas', self.dshape.measure),
)),
)
loader = BlazeLoader()
ds = from_blaze(
expr.ds,
loader=loader,
missing_values=self.missing_values,
)
self.assertEqual(len(loader), 1)
exprdata = loader[ds]
self.assertTrue(exprdata.expr.isidentical(expr.ds))
self.assertTrue(exprdata.deltas.isidentical(expr.ds_deltas))
def pipeline_event_loader_args(self, dates):
_, mapping = super(
BlazeCashBuybackAuthLoaderTestCase,
self,
).pipeline_event_loader_args(dates)
return (bz.data(pd.concat(
pd.DataFrame({
BUYBACK_ANNOUNCEMENT_FIELD_NAME:
frame[BUYBACK_ANNOUNCEMENT_FIELD_NAME],
CASH_FIELD_NAME:
frame[CASH_FIELD_NAME],
TS_FIELD_NAME:
frame[TS_FIELD_NAME],
SID_FIELD_NAME: sid,
})
for sid, frame in iteritems(mapping)
).reset_index(drop=True)),)
def file_get_iem_data_frame(path):
"""
Return the IEM samplesheet data as a pandas DataFrame,
which makes slicing operations more convenient.
"""
rows = read_csv_rows(path)
if not rows_are_iem_samplesheet(rows):
raise ValueError("Invalid IEM samplesheet format: %s" % path)
section_gen = rows_iem_section_generator(rows)
for section in section_gen:
if section_is_valid_data(section):
# TODO this appears to be a problem if you have data columns
# with trailing all-blank entries (see CSI-215 fix)
df = pd.DataFrame(data=section.rows[1:], columns=section.rows[0])
# skip trailing rows with an empty Sample_ID
return df[df['Sample_ID'].notnull()]
raise ValueError("Invalid IEM samplesheet format, no data found: %s" % path)
def gen_csv_paths(data_dir, pref):
"""
Generate CSV file from image, contour, and segment file paths.
Args:
data_dir: BBBC006 data directory path.
pref: Prefix (either 'train' or 'test')
"""
filenames = get_png_files(os.path.join(data_dir, 'BBBC006_v1_' + pref))
contours = get_png_files(os.path.join(data_dir, 'BBBC006_v1_contours_'
+ pref))
segments = get_png_files(os.path.join(data_dir, 'BBBC006_v1_segments_'
+ pref))
all_files = [filenames, contours, segments]
pd_arr = pd.DataFrame(all_files).transpose()
pd_arr.to_csv(pref + '.csv', index=False, header=False)
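# `get_png_files` is not defined in this snippet; a plausible (hypothetical) helper would
# simply return the sorted PNG paths found directly in the given directory:
import glob
import os

def get_png_files(directory):
    """Return the sorted list of .png file paths in `directory` (hypothetical helper)."""
    return sorted(glob.glob(os.path.join(directory, '*.png')))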
def kraken_order_book(book_type: str, currency_code: str = 'EUR', coin_code: str = 'XBT'):
"""Kraken specific orderbook retrieval
"""
import krakenex
kraken_api = krakenex.API(key=KRAKEN_API_KEY, secret=KRAKEN_PRIVATE_KEY, conn=krakenex.Connection())
pair = f'X{coin_code}Z{currency_code}'
orders = kraken_api.query_public('Depth', {'pair': pair})
df = pd.DataFrame(
orders['result'][pair][book_type],
columns=['price', 'volume', 'timestamp'])
return df
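# Kraken's public Depth endpoint usually returns price/volume as strings and the
# timestamp as Unix seconds; a small post-processing sketch under those assumptions:
import pandas as pd

def kraken_book_to_numeric(book: pd.DataFrame) -> pd.DataFrame:
    """Cast the string order-book columns to numeric types and parse timestamps."""
    out = book.copy()
    out[['price', 'volume']] = out[['price', 'volume']].apply(pd.to_numeric)
    out['timestamp'] = pd.to_datetime(pd.to_numeric(out['timestamp']), unit='s')
    return out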
def get_irrelevant_cited_papers(bad_papers, db_cursor, papers_table='papers'):
"""Retrieves the papers cited by the irrelevant papers given in input, from a SQL database.
Args:
bad_papers (list of dicts): the list of irrelevant papers, formatted as the output of :func:`data_retrieval.list2paper`
db_cursor (:class:`MySQLdb.cursors.Cursor`): cursor of a SQL database in which there is a papers table
papers_table (string): name of the papers table in the SQL database
Returns:
tuple of tuples: the results of the SQL query
"""
citations = []
for p in bad_papers:
for c in p['citations']:
citations.append([p['index'], c])
citations_df = pd.DataFrame(citations, columns=['citing', 'cited'])
cited = citations_df['cited'].unique()
db_cursor.execute("SELECT id, title, abstract FROM papers p WHERE p.abstract != '' AND p.id IN (" + ','.join(["%s"] * len(cited)) + ")", tuple(cited))
return db_cursor.fetchall()
def parse_fasta(self):
self.ref_id=dict()
self.ref_inf=dict()
i=1
N = 0
ref_inf=np.empty(shape=[0,3])
for seqs in SeqIO.parse(self.ref,'fasta'):
seq_id = seqs.id
self.ref_id[i] = seq_id
seq = str(seqs.seq.upper())
seq_len = len(seq)
self.ref_inf[seq_id]=seq_len
N+=seq.count('N')
ref_inf = np.append(ref_inf,[[i,seq_id,seq_len]],axis=0)
i+=1
self.ref_detail = pd.DataFrame(ref_inf,columns=['Index','Contig','Length(bp)'])
self.N = N
def qualification_filter(self):
"""
Record which contigs from the original FASTA file are qualified and which are not,
using the criterion: length > 20 kb and >= 5 restriction sites.
"""
unqualified = np.empty(shape=[0,3])
qualified = np.empty(shape=[0,4])
rm_dup = self.RcmapTable[['CMapId','ContigLength','NumSites']].drop_duplicates()
for i in self.ref_id.keys():
index = i
name = self.ref_id[i]
length = self.ref_inf[name]
if i not in self.RcmapTable['CMapId'].unique():
unqualified = np.append(unqualified,[[index,name, length]],axis=0)
else:
Id = rm_dup[rm_dup['CMapId']==i].index[0]
sites = rm_dup['NumSites'][Id]
qualified = np.append(qualified,[[index,name,length,sites]],axis=0)
self.unqualified = pd.DataFrame(unqualified, columns=['index','contig','length(bp)'])
self.qualified = pd.DataFrame(qualified, columns=['index','contig','length(bp)','numSites'])
def test_append():
np.random.seed(0)
n = 1000
df = pd.DataFrame({'x': np.random.randint(0, 5, size=n),
'y': np.random.normal(size=n)})
gdf = gd.DataFrame.from_pandas(df)
frags = _fragmented_gdf(gdf, nsplit=13)
# Combine with .append
head = frags[0]
tail = frags[1:]
appended = dgd.from_pygdf(head, npartitions=1)
for each in tail:
appended = appended.append(each)
assert_frame_equal(df, appended.compute().to_pandas())
def test_series_append():
np.random.seed(0)
n = 1000
df = pd.DataFrame({'x': np.random.randint(0, 5, size=n),
'y': np.random.normal(size=n)})
gdf = gd.DataFrame.from_pandas(df)
frags = _fragmented_gdf(gdf, nsplit=13)
frags = [df.x for df in frags]
appending = dgd.from_pygdf(frags[0], npartitions=1)
for frag in frags[1:]:
appending = appending.append(frag)
appended = appending.compute().to_pandas()
assert isinstance(appended, pd.Series)
np.testing.assert_array_equal(appended, df.x)
def test_set_index(nelem):
np.random.seed(0)
# Use unique index range as the sort may not be stable-ordering
x = np.arange(nelem)
np.random.shuffle(x)
df = pd.DataFrame({'x': x,
'y': np.random.randint(0, nelem, size=nelem)})
ddf = dd.from_pandas(df, npartitions=2)
dgdf = dgd.from_dask_dataframe(ddf)
expect = ddf.set_index('x').compute()
got = dgdf.set_index('x').compute().to_pandas()
np.testing.assert_array_equal(got.index.values, expect.index.values)
np.testing.assert_array_equal(got.y.values, expect.y.values)
assert got.columns == expect.columns
def test_groupby_single_key(keygen):
np.random.seed(0)
nelem = 500
npartitions = 10
# Generate the keys
xs = keygen(nelem)
assert xs.size == nelem
df = pd.DataFrame({'x': xs,
'z': np.random.normal(size=nelem) + 1})
gdf = gd.DataFrame.from_pandas(df)
dgf = dgd.from_pygdf(gdf, npartitions=npartitions)
groups = dgf.groupby(by=['x']).count()
got = groups.compute().to_pandas()
# Check against expectation
expect = df.groupby(by=['x'], as_index=False).count()
# Check keys
np.testing.assert_array_equal(got.x, expect.x)
# Check values
np.testing.assert_array_equal(got.z, expect.z)
def store_test_predictions(self, prediction_id='_final'):
"""
Stores the test predictions in a CSV file
:param prediction_id: A simple id appended to the name of the summary for uniqueness
:return: None
"""
# prediction id is usually the step count
print('Storing predictions on Test Data...')
review = []
true_summary = []
generated_summary = []
for i in range(self.test_size):
if not self.checkpointer.is_output_file_present():
review.append(self._index2sentence(self.test_review[i]))
true_summary.append(self._index2sentence(self.true_summary[i]))
if i < (self.test_batch_size * (self.test_size // self.test_batch_size)):
generated_summary.append(self._index2sentence(self.predicted_test_summary[i]))
else:
generated_summary.append('')
prediction_nm = 'generated_summary' + prediction_id
if self.checkpointer.is_output_file_present():
df = pd.read_csv(self.checkpointer.get_result_location(), header=0)
df[prediction_nm] = np.array(generated_summary)
else:
df = pd.DataFrame()
df['review'] = np.array(review)
df['true_summary'] = np.array(true_summary)
df[prediction_nm] = np.array(generated_summary)
df.to_csv(self.checkpointer.get_result_location(), index=False)
print('Stored the predictions. Moving Forward')
if prediction_id == '_final':
print('All done. Exiting..')
print('Exited')
def crawl_for_reviews_and_summary(self, input_file):
"""
Crawl the input dataset
:param input_file: The location of the file containing the txt file dataset
:return: None
"""
self.raw_data_file = input_file
self.df = pd.DataFrame()
self.df['Review'] = self.__crawl_review()
self.df['Summary'] = self.__crawl_summary()
def pearson(X, y):
r = []
p = []
for c in X.columns:
r_, p_ = pearsonr(X[c], y)
r.append(r_)
p.append(p_)
dfr = pd.DataFrame(index=range(1, 1+len(X.columns)))
dfr['pearson'] = r
dfr['pearson_p'] = p
return dfr
def kolmogorov_smirnov(x_train, x_test):
r = []
p = []
for c in x_train.columns:
r_, p_ = ks_2samp(x_train[c], x_test[c])
r.append(r_)
p.append(p_)
dfks = pd.DataFrame(index=range(1, 1 + len(x_train.columns)))
dfks['KS'] = r
dfks['KS_p'] = p
return dfks
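# Usage sketch for the two screening helpers above on synthetic data (the scipy.stats
# imports assumed by pearson() and kolmogorov_smirnov() are repeated here):
import numpy as np
import pandas as pd
from scipy.stats import pearsonr, ks_2samp

rng = np.random.RandomState(0)
X_train = pd.DataFrame(rng.normal(size=(200, 3)), columns=['f1', 'f2', 'f3'])
X_test = pd.DataFrame(rng.normal(size=(100, 3)), columns=['f1', 'f2', 'f3'])
y = 2 * X_train['f1'] + rng.normal(scale=0.1, size=200)
print(pearson(X_train, y))                  # per-feature correlation with the target
print(kolmogorov_smirnov(X_train, X_test))  # per-feature train/test distribution shift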
def add_group_component(self, components, name, group):
"""Adds a component with given name that contains all of the components
in group.
Parameters
----------
components: Dataframe with components.
name: Name of new group component.
group: List of components that form the group.
Returns
-------
Dataframe with components.
"""
new_comp = components[components['component'].isin(set(group))].copy()
new_comp['component'] = name
components = components.append(new_comp)
return components
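# Usage sketch with hypothetical component data: rows belonging to the group are copied
# under the new name, mirroring the method body above (DataFrame.append was current for
# the pandas of that era; newer pandas would use pd.concat):
import pandas as pd

components = pd.DataFrame({
    'component': ['weekly', 'holidays', 'yearly'],
    'value': [0.10, 0.30, 0.20],
})
group = ['weekly', 'holidays']
grouped = components[components['component'].isin(set(group))].copy()
grouped['component'] = 'seasonal'
components = components.append(grouped)  # 'seasonal' rows now duplicate the group members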
def predictive_samples(self, df):
"""Sample from the posterior predictive distribution.
Parameters
----------
df: Dataframe with dates for predictions (column ds), and capacity
(column cap) if logistic growth.
Returns
-------
Dictionary with keys "trend", "seasonal", and "yhat" containing
posterior predictive samples for that component. "seasonal" is the sum
of seasonalities, holidays, and added regressors.
"""
df = self.setup_dataframe(df.copy())
sim_values = self.sample_posterior_predictive(df)
return sim_values
def get_actions(start_date, end_date):
"""
Combine all action records between start_date and end_date.
:param start_date: inclusive lower bound on the action time
:param end_date: exclusive upper bound on the action time
:return: actions: pd.DataFrame
"""
dump_path = './cache/all_action_%s_%s.pkl' % (start_date, end_date)
if os.path.exists(dump_path):
actions = pickle.load(open(dump_path, 'rb'))
else:
action_1 = get_actions_1()
action_2 = get_actions_2()
action_3 = get_actions_3()
actions = pd.concat([action_1, action_2, action_3]) # type: pd.DataFrame
actions = actions[(actions.time >= start_date) & (actions.time < end_date)]
pickle.dump(actions, open(dump_path, 'wb'))
return actions
def get_actions(start_date, end_date):
"""
Combine all action records between start_date and end_date.
:param start_date: inclusive lower bound on the action time
:param end_date: exclusive upper bound on the action time
:return: actions: pd.DataFrame
"""
dump_path = './cache/all_action_%s_%s.csv' % (start_date, end_date)
if os.path.exists(dump_path):
# actions = pickle.load(open(dump_path))
actions = pd.read_csv(dump_path)
else:
action_1 = get_actions_1()
action_2 = get_actions_2()
action_3 = get_actions_3()
actions = pd.concat([action_1, action_2, action_3]) # type: pd.DataFrame
actions = actions[(actions.time >= start_date) & (actions.time < end_date)]
# pickle.dump(actions, open(dump_path, 'w'))
actions.to_csv(dump_path, index=False)
print('action combination finished...')
return actions
def get_actions(start_date, end_date):
"""
Combine all action records between start_date and end_date.
:param start_date: inclusive lower bound on the action time
:param end_date: exclusive upper bound on the action time
:return: actions: pd.DataFrame
"""
dump_path = './cache/all_action_%s_%s.pkl' % (start_date, end_date)
if os.path.exists(dump_path):
actions = pickle.load(open(dump_path, 'rb'))
else:
action_1 = get_actions_1()
action_2 = get_actions_2()
action_3 = get_actions_3()
actions = pd.concat([action_1, action_2, action_3])
actions = actions[(actions.time >= start_date) & (actions.time < end_date)]
pickle.dump(actions, open(dump_path, 'wb'))
return actions
def sample_damage_state(self, Pr):
"""
Sample the damage state using a uniform random variable
Parameters
-----------
Pr : pd.DataFrame
Probability of exceeding a damage state
Returns
-------
damage_state : pd.Series
The damage state of each element
"""
p = pd.Series(data = np.random.uniform(size=Pr.shape[0]), index=Pr.index)
damage_state = pd.Series(data=[None]* Pr.shape[0], index=Pr.index)
for DS_names in Pr.columns:
damage_state[p < Pr[DS_names]] = DS_names
return damage_state
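# Worked sketch of the sampling logic above with hypothetical exceedance probabilities:
# columns are ordered from least to most severe, each value being P(damage >= state).
import numpy as np
import pandas as pd

Pr = pd.DataFrame({'DS1': [0.90, 0.50], 'DS2': [0.40, 0.10]}, index=['pipe_A', 'pipe_B'])
rng = np.random.RandomState(0)
p = pd.Series(rng.uniform(size=Pr.shape[0]), index=Pr.index)
damage_state = pd.Series([None] * Pr.shape[0], index=Pr.index)
for ds in Pr.columns:           # later (more severe) states overwrite earlier ones
    damage_state[p < Pr[ds]] = ds
print(damage_state)             # None means no damage state was exceeded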
def get_stress(self, p=None, tindex=None):
"""Returns the stress or stresses of the time series object as a pandas
DataFrame.
If the time series object has multiple stresses each column
represents a stress.
Returns
-------
stress: pd.DataFrame
Pandas dataframe of the stress(es)
"""
if tindex is not None:
return self.stress[tindex]
else:
return self.stress
def predict(self, df=None):
"""Predict using the prophet model.
Parameters
----------
df: pd.DataFrame with dates for predictions (column ds), and capacity
(column cap) if logistic growth. If not provided, predictions are
made on the history.
Returns
-------
A pd.DataFrame with the forecast components.
"""
if df is None:
df = self.history.copy()
else:
if df.shape[0] == 0:
raise ValueError('Dataframe has no rows.')
df = self.setup_dataframe(df.copy())
df['trend'] = self.predict_trend(df)
seasonal_components = self.predict_seasonal_components(df)
intervals = self.predict_uncertainty(df)
# Drop columns except ds, cap, floor, and trend
cols = ['ds', 'trend']
if 'cap' in df:
cols.append('cap')
if self.logistic_floor:
cols.append('floor')
# Add in forecast components
df2 = pd.concat((df[cols], intervals, seasonal_components), axis=1)
df2['yhat'] = df2['trend'] + df2['seasonal']
return df2
def sample_model(self, df, seasonal_features, iteration):
"""Simulate observations from the extrapolated generative model.
Parameters
----------
df: Prediction dataframe.
seasonal_features: pd.DataFrame of seasonal features.
iteration: Int sampling iteration to use parameters from.
Returns
-------
Dataframe with trend, seasonality, and yhat, each like df['t'].
"""
trend = self.sample_predictive_trend(df, iteration)
beta = self.params['beta'][iteration]
seasonal = np.matmul(seasonal_features.as_matrix(), beta) * self.y_scale
sigma = self.params['sigma_obs'][iteration]
noise = np.random.normal(0, sigma, df.shape[0]) * self.y_scale
return pd.DataFrame({
'yhat': trend + seasonal + noise,
'trend': trend,
'seasonal': seasonal,
})
def make_future_dataframe(self, periods, freq='D', include_history=True):
"""Simulate the trend using the extrapolated generative model.
Parameters
----------
periods: Int number of periods to forecast forward.
freq: Any valid frequency for pd.date_range, such as 'D' or 'M'.
include_history: Boolean to include the historical dates in the data
frame for predictions.
Returns
-------
pd.DataFrame that extends forward from the end of self.history for the
requested number of periods.
"""
last_date = self.history_dates.max()
dates = pd.date_range(
start=last_date,
periods=periods + 1, # An extra in case we include start
freq=freq)
dates = dates[dates > last_date] # Drop start if equals last_date
dates = dates[:periods] # Return correct number of periods
if include_history:
dates = np.concatenate((np.array(self.history_dates), dates))
return pd.DataFrame({'ds': dates})
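# Standalone illustration of the date mechanics above (hypothetical three-day history,
# three forecast periods, daily frequency):
import numpy as np
import pandas as pd

history_dates = pd.to_datetime(['2017-01-01', '2017-01-02', '2017-01-03'])
last_date = history_dates.max()
dates = pd.date_range(start=last_date, periods=3 + 1, freq='D')  # extra row covers the start
dates = dates[dates > last_date][:3]                             # drop the duplicated start date
future = pd.DataFrame({'ds': np.concatenate((np.array(history_dates), dates))})
# future['ds'] now runs 2017-01-01 .. 2017-01-06: the history plus three future days.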