def get_analysis(self, instrument):
"""??????????"""
# pd.set_option("display.max_rows", len(x))
ohlc_data = self.feed_list[0].bar.df
ohlc_data.set_index("date", inplace=True)
ohlc_data.index = pd.DatetimeIndex(ohlc_data.index)
dbal = self.fill.balance.df
start = dbal.index[0]
end = dbal.index[-1]
capital = self.fill.initial_cash
tlog = self.get_tlog(instrument)
tlog = tlog[tlog["units"] != 0]
tlog.reset_index(drop=True, inplace=True)
analysis = stats(ohlc_data, tlog, dbal, start, end, capital)
print(dict_to_table(analysis))
def df_to_string(df):
"""
Create a formatted str representation of the DataFrame.
Parameters
----------
df: DataFrame
Returns
-------
str
"""
pd.set_option('display.expand_frame_repr', False)
    pd.set_option('display.precision', 8)
pd.set_option('display.width', 1000)
pd.set_option('display.max_colwidth', 1000)
return df.to_string()
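A side note: df_to_string leaves all four display options changed globally after it returns. Where that side effect is unwanted, pd.option_context scopes the settings to a with-block; here is a minimal sketch of that alternative (function name is hypothetical, options as above):

def df_to_string_scoped(df):
    """Formatted str representation of df, without mutating global options."""
    import pandas as pd
    with pd.option_context('display.expand_frame_repr', False,
                           'display.precision', 8,
                           'display.width', 1000,
                           'display.max_colwidth', 1000):
        return df.to_string()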
def run(self):
#pandas.set_option('display.width', 200)
#s_code,code,dateline,chg_m,chg,open,close,high,low,last_close,name ma5 ma89 dif difma
self.makeMa()
#print self.df
#last5 = self.df[self.df['dateline'] == int(self.setting['end'])]
#print last5
#sys.exit()
tmp = self.df[(self.df['dateline'] == int(self.setting['end'])) & (self.df['dif'] > self.df['difma'])]
#print tmp
#sys.exit()
for code in tmp.values:
#print code
#sys.exit()
        if code[13] >= code[14]:
            print("%s,%s,DMA,%s" % (code[0], code[10], code[3]))
#print code
#sys.exit()
#print self.df
#dif = ma(close,5) - ma(close,89)
#difma = ma(dif,36)
def ACO(self, df):
"""
Helper indicator
:param df:
:return:
"""
df_mid_points = (df['High'] + df['Low']) / 2
mid_points = Data.toFloatArray(df_mid_points)
longav = tl.SMA(np.array(mid_points), timeperiod=40)
shortav = tl.SMA(np.array(mid_points), timeperiod=15)
A0 = longav - shortav
Mavg = tl.SMA(A0, timeperiod=15)
AcResult = tl.SMA(Mavg - A0, timeperiod=15)
signals = np.diff(AcResult)
return signals
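For readers without TA-Lib installed, the same oscillator can be reproduced with pandas rolling means (an equivalent sketch, not the original implementation; the function name is hypothetical and the window sizes match the timeperiod arguments above):

import numpy as np
import pandas as pd

def aco_signals(df):
    """Accelerator-oscillator-style signals from High/Low mid-points, pandas-only."""
    mid = (df['High'] + df['Low']) / 2
    a0 = mid.rolling(40).mean() - mid.rolling(15).mean()  # long SMA minus short SMA
    mavg = a0.rolling(15).mean()
    ac_result = (mavg - a0).rolling(15).mean()
    return np.diff(ac_result.values)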
# if __name__ == "__main__":
# np.set_printoptions(threshold=np.nan)
# pd.set_option("display.max_rows", 280)
# dt = Data()
# df = dt.getCSVData()
# #ACOscillator(df)
# ACOscillator(df)
def show_domain_stats(log, output, top=50):
log['Domain'] = log['url'].apply(get_domain)
by_domain = log.groupby('Domain')
top_domains = (
by_domain.count().sort_values('url', ascending=False)['url'].index)
stats_by_domain = pd.DataFrame(index=top_domains)
stats_by_domain['Pages'] = by_domain.count()['url']
stats_by_domain['Total Score'] = by_domain.sum()['score'].astype(int)
stats_by_domain['Mean Score'] = by_domain.mean()['score']
stats_by_domain['Max Depth'] = by_domain.max()['depth']
stats_by_domain['Median Depth'] = by_domain.median()['depth'].astype(int)
print()
pages = stats_by_domain['Pages']
    print('Top {} domain stats (covering {:.1%} of pages)'
          .format(top, pages[:top].sum() / pages.sum()))
pd.set_option('display.width', 1000)
print(stats_by_domain[:top])
if output:
filename = '{}-by-domain.csv'.format(output)
stats_by_domain.to_csv(filename)
print()
print('Saved domain stats to {}'.format(filename))
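The five per-domain columns above can also be built in a single pass with named aggregation (an equivalent sketch; requires pandas >= 0.25, and uses snake_case labels instead of the display names above):

by_domain = log.groupby('Domain')
stats_by_domain = by_domain.agg(
    pages=('url', 'count'),
    total_score=('score', 'sum'),
    mean_score=('score', 'mean'),
    max_depth=('depth', 'max'),
    median_depth=('depth', 'median'),
).sort_values('pages', ascending=False)
stats_by_domain = stats_by_domain.astype({'total_score': int, 'median_depth': int})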
def cleanup(me):
if hasattr(me, "_pd_display_maxcolwidth"):
pd.set_option('display.max_colwidth', me._pd_display_maxcolwidth)
engine, session, handler, patchers = me.engine, me.session, me.handler, me.patchers
if me.engine:
if me.session:
try:
me.session.rollback()
me.session.close()
            except Exception:
                pass
try:
Base.metadata.drop_all(me.engine)
        except Exception:
            pass
for patcher in patchers:
patcher.stop()
hndls = query_logger.handlers[:]
handler.close()
for h in hndls:
if h is handler:
query_logger.removeHandler(h)
def convert_tables(self):
"""
Based on the confidence score, convert xmap file and two corresponding cmap files
into "pandas table".
"""
    pd.set_option('display.width', 200)
    with open('%s.table' % self.name, 'a') as xmap_table:
        with open(self.xmap) as xmap:
            for line in xmap:
                if line.startswith('#h'):
                    header = line[3:]
                    xmap_table.write(header)
                if line[0] != '#':
                    xmap_table.write(line)
    with open('%s.rtable' % self.name, 'a') as rcmap_table:
        with open(self.rcmap) as rcmap:
            for line in rcmap:
                if line.startswith('#h'):
                    header = line[3:]
                    rcmap_table.write(header)
                if line[0] != '#':
                    rcmap_table.write(line)
    with open('%s.qtable' % self.name, 'a') as qcmap_table:
        with open(self.qcmap) as qcmap:
            for line in qcmap:
                if line.startswith('#h'):
                    header = line[3:]
                    qcmap_table.write(header)
                if line[0] != '#':
                    qcmap_table.write(line)
self.XmapTable = pd.read_table('%s.table' % self.name)
headers_x = ['RefContigID','RefStartPos','RefEndPos','QryContigID','QryStartPos',
'QryEndPos','Orientation', 'Confidence','QryLen','RefLen', 'Alignment']
self.filtered_XmapTable = self.XmapTable[self.XmapTable['Confidence']>=self.confidence_score][headers_x].reset_index(drop=True)
headers_r = ['CMapId','ContigLength','NumSites','SiteID','Position']
self.RcmapTable = pd.read_table('%s.rtable' % self.name)[headers_r]
headers_q = ['CMapId','ContigLength','NumSites','SiteID','Position','Coverage']
self.QcmapTable = pd.read_table('%s.qtable' % self.name)[headers_q]
os.remove('%s.table' % self.name)
os.remove('%s.rtable' % self.name)
os.remove('%s.qtable' % self.name)
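The three with-blocks above differ only in their file names; a compact refactor sketch of the same comment-stripping step (helper name is hypothetical):

def _strip_comments(src_path, dst_path):
    """Keep the '#h' column-header line (minus its prefix), drop other '#' lines."""
    with open(src_path) as src, open(dst_path, 'a') as dst:
        for line in src:
            if line.startswith('#h'):
                dst.write(line[3:])
            elif not line.startswith('#'):
                dst.write(line)

# Usage inside convert_tables:
# _strip_comments(self.xmap, '%s.table' % self.name)
# _strip_comments(self.rcmap, '%s.rtable' % self.name)
# _strip_comments(self.qcmap, '%s.qtable' % self.name)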
def get_results(filename, seed_image_id):
pd.set_option('display.max_rows', 10000)
start_time = time.time()
df = pd.read_csv(filename)
# temp_key, key, ground_truth, prediction, result
# 00000, 00000, 43, 1095, 0.3076
if len(df.index) == 0:
# Wow, it could not find anything. The image must be mud...
return []
del df['temp_key']
df.prediction = df.ground_truth - (df.prediction - 1000)
    df_plus = df[df.prediction >= 0]
    df_neg = df[df.prediction < 0].copy()  # copy so the += below modifies a frame, not a view
    df_neg.prediction += 360
    df = pd.concat([df_plus, df_neg])
del df['ground_truth']
df = df.groupby(['key', 'prediction']).result.sum().reset_index()
filtered_results = []
for image_id, image_results in df.groupby(['key']):
top_result_index = image_results['result'].idxmax()
        angle = image_results.loc[top_result_index, 'prediction']
        max_value = image_results.loc[top_result_index, 'result']
        filtered_results.append([seed_image_id, image_id, int(angle), max_value])
    print('Done reading results, with slow python, for seed image ID: %s in %s seconds'
          % (seed_image_id, time.time() - start_time))
return filtered_results
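The df_plus/df_neg split implements a 360-degree wrap-around for negative angles; assuming predictions stay within one revolution of the truth, the same normalisation is a one-liner with Python's modulo operator, which returns non-negative results for negative operands (a sketch, same column names):

df.prediction = (df.ground_truth - (df.prediction - 1000)) % 360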
def _gen_summary(self, col_width=50):
pd.set_option('display.max_colwidth', -1)
song_name = '<p style="color:{0};">{1}</p>'.format(attr_color[self.live.attr], self.live.name)
df_head = pd.DataFrame({'Song Name': [song_name]})
df_head['Difficulty'] = self.live.difficulty
df_head['Score'] = int(self.global_status['cum_score'])
df_head['Cover Rate'] = '{0:.2f}%'.format(100*(self.simul_result['timing_sec'] <= self.simul_result['judge_end_time']).mean())
df_head['Max Combo'] = self.simul_result['combo'].max()
for accr in accuracy_list:
df_head[accr] = self.global_status['note_stat'][accr]
card = ['<img src="{0}" width={1} />'.format(icon_path(card.card_id, card.idolized), col_width) for card in self.card_list]
summary, keys = [], ['base_score', 'score', 'hp', 'judge', 'weak_judge']
for i in range(len(card)):
temp = {k:getattr(self.skill_tracker[i], 'cum_'+k) for k in keys}
temp['card'] = card[i]
summary.append(temp)
df = pd.DataFrame(summary, columns=['card']+keys)
df = df.append(pd.DataFrame(df.sum()).transpose())
df['base_score'] = df['base_score'].apply(lambda x: '<p>{0}</p>'.format(int(x)))
df['score'] = df['score'].apply(lambda x: '<p>{0}</p>'.format(int(x)))
df['hp'] = df['hp'].apply(lambda x: '<p>{0}</p>'.format(int(x)))
df['judge'] = df['judge'].apply(lambda x: '<p>{0}</p>'.format(round(x,1)))
df['weak_judge'] = df['weak_judge'].apply(lambda x: '<p>{0}</p>'.format(round(x,1)))
df.index = ['<p>{0}</p>'.format(x) for x in ['L1', 'L2', 'L3', 'L4', 'C', 'R4', 'R3', 'R2', 'R1', 'Total']]
df.loc['<p>Total</p>', 'card'] = ''
html_code = df_head.to_html(escape=False, index=False) + df.transpose().to_html(escape=False)
return HTML(html_code)
def get_turbine_types(print_out=True, **kwargs):
r"""
Get the names of all possible wind turbine types for which the power
coefficient curve or power curve is provided in the data files in
the directory windpowerlib/data.
Parameters
----------
print_out : boolean
Directly prints the list of types if set to True. Default: True.
Examples
--------
>>> from windpowerlib import wind_turbine
>>> turbines = wind_turbine.get_turbine_types(print_out=False)
>>> print(turbines[turbines["turbine_id"].str.contains("ENERCON")].iloc[0])
turbine_id ENERCON E 101 3000
p_nom 3000000
Name: 25, dtype: object
"""
df = read_turbine_data(**kwargs)
if print_out:
pd.set_option('display.max_rows', len(df))
print(df[['turbine_id', 'p_nom']])
pd.reset_option('display.max_rows')
return df[['turbine_id', 'p_nom']]
def show_heat_map(self):
    pd.set_option('display.precision', 2)
plt.figure(figsize=(20, 6))
sns.heatmap(self.data.corr(), square=True)
plt.xticks(rotation=90)
plt.yticks(rotation=360)
plt.suptitle("Correlation Heatmap")
plt.show()
def show_heat_map_to(self, target='sentiment'):
correlations = self.data.corr()[target].sort_values(ascending=False)
plt.figure(figsize=(40, 6))
correlations.drop(target).plot.bar()
    pd.set_option('display.precision', 2)
plt.xticks(rotation=90, fontsize=7)
plt.yticks(rotation=360)
plt.suptitle('The Heatmap of Correlation With ' + target)
plt.show()
def print_table(table, name=None, fmt=None):
"""
Pretty print a pandas DataFrame.
Uses HTML output if running inside Jupyter Notebook, otherwise
formatted text output.
Parameters
----------
table : pandas.Series or pandas.DataFrame
Table to pretty-print.
name : str, optional
Table name to display in upper left corner.
fmt : str, optional
Formatter to use for displaying table elements.
E.g. '{0:.2f}%' for displaying 100 as '100.00%'.
Restores original setting after displaying.
"""
if isinstance(table, pd.Series):
table = pd.DataFrame(table)
if fmt is not None:
prev_option = pd.get_option('display.float_format')
pd.set_option('display.float_format', lambda x: fmt.format(x))
if name is not None:
table.columns.name = name
display(table)
if fmt is not None:
pd.set_option('display.float_format', prev_option)
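One caveat in the snippet above: if display(table) raises, the saved float_format is never written back. A minimal hardening sketch of the same save/restore logic using try/finally:

if fmt is not None:
    prev_option = pd.get_option('display.float_format')
    pd.set_option('display.float_format', lambda x: fmt.format(x))
try:
    if name is not None:
        table.columns.name = name
    display(table)
finally:
    if fmt is not None:
        pd.set_option('display.float_format', prev_option)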
def get_pretty_stats(stats, recorded_cols=None, num_rows=10):
"""
Format and print the last few rows of a statistics DataFrame.
See the pyfolio project for the data structure.
Parameters
----------
stats: list[Object]
An array of statistics for the period.
num_rows: int
The number of rows to display on the screen.
Returns
-------
str
"""
if isinstance(stats, pd.DataFrame):
stats = stats.T.to_dict().values()
df, columns = prepare_stats(stats, recorded_cols=recorded_cols)
pd.set_option('display.expand_frame_repr', False)
    pd.set_option('display.precision', 8)
pd.set_option('display.width', 1000)
pd.set_option('display.max_colwidth', 1000)
formatters = {
'returns': lambda returns: "{0:.4f}".format(returns),
}
return df.tail(num_rows).to_string(
columns=columns,
formatters=formatters
)
def run(self):
sql_data = "select * FROM s_stock_runtime WHERE dateline =20160607 and s_code='sh600774' "
tmpdf = pandas.read_sql(sql_data, self.mysql.db)
pandas.set_option('display.width', 400)
res = {}
for i in range(len(tmpdf)):
item = tmpdf.iloc[i]
#inf = ''
if item.s_code not in res.keys():
res[item.s_code] = {'B': 0, 'S': 0}
if item.B_1_volume > 100000:
res[item.s_code]['B'] += 1
if item.B_2_volume > 100000:
res[item.s_code]['B'] += 1
if item.B_3_volume > 100000:
res[item.s_code]['B'] += 1
if item.B_4_volume > 100000:
res[item.s_code]['B'] += 1
if item.B_5_volume > 100000:
res[item.s_code]['B'] += 1
if item.S_1_volume > 100000:
res[item.s_code]['S'] += 1
if item.S_2_volume > 100000:
#print item
res[item.s_code]['S'] += 1
if item.S_3_volume > 100000:
res[item.s_code]['S'] += 1
if item.S_4_volume > 100000:
res[item.s_code]['S'] += 1
if item.S_5_volume > 100000:
res[item.s_code]['S'] += 1
    print(res)
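The ten volume checks above differ only in the column name; an equivalent sketch driven by the B_n_volume / S_n_volume naming convention used in the table:

for side in ('B', 'S'):
    for level in range(1, 6):
        if item['%s_%d_volume' % (side, level)] > 100000:
            res[item.s_code][side] += 1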
def make_state_page(df, conn, keyname='CA', bucketname='www.jobs.com'):
    '''Ingest a table and publish it to the S3 website bucket.'''
# fix issue with printing the entire dataframe
pd.set_option('display.max_colwidth', -1)
website_bucket = conn.get_bucket(bucketname)
html = df.to_html(
formatters=dict(
title=markdown
),
escape=False,
index=True
) +" postings last updated "+str(datetime.now().strftime("%Y-%m-%d %H:%M"))
    html = ('<!DOCTYPE html><HTML><head><link rel="stylesheet" '
            'href="http://s3.amazonaws.com/www.jobs.com/style.css">'
            '</head><body>{}</body></HTML>').format(html)
send_to_s3(keyname=keyname, bucket=website_bucket, html=html)
return None
def test_headers(measure_type="Scoring"):
import pandas as pd
pd.set_option('display.max_columns', None)
nba_player = NBA_player("203382", "Baynes, Aron", "Aron Baynes")
nba_player.getPlayerStats(measure_type=measure_type)
df = pd.DataFrame(columns = nba_player.header)
df.loc[0] = nba_player.getPlayerStats(measure_type=measure_type)[0][1]
print(df)
return nba_player
def set_pandas_print_options():
# w, h = pd.util.terminal.get_terminal_size()
# set output options for regression tests on a wide terminal
pd.set_option('display.width', 100)
    # reduce precision to avoid overly sensitive tests due to rounding:
pd.set_option('display.precision', 6)
Source: test_multi.py, from project PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda by SignalMedia.
def test_format_sparse_config(self):
warn_filters = warnings.filters
warnings.filterwarnings('ignore', category=FutureWarning,
module=".*format")
# GH1538
pd.set_option('display.multi_sparse', False)
result = self.index.format()
self.assertEqual(result[1], 'foo two')
self.reset_display_options()
warnings.filters = warn_filters
Source: test_indexing.py, from project PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda by SignalMedia.
def test_underlying_data_conversion(self):
# GH 4080
df = DataFrame(dict((c, [1, 2, 3]) for c in ['a', 'b', 'c']))
df.set_index(['a', 'b', 'c'], inplace=True)
s = Series([1], index=[(2, 2, 2)])
df['val'] = 0
df
df['val'].update(s)
expected = DataFrame(
dict(a=[1, 2, 3], b=[1, 2, 3], c=[1, 2, 3], val=[0, 1, 0]))
expected.set_index(['a', 'b', 'c'], inplace=True)
tm.assert_frame_equal(df, expected)
# GH 3970
# these are chained assignments as well
pd.set_option('chained_assignment', None)
df = DataFrame({"aa": range(5), "bb": [2.2] * 5})
df["cc"] = 0.0
ck = [True] * len(df)
df["bb"].iloc[0] = .13
# TODO: unused
df_tmp = df.iloc[ck] # noqa
df["bb"].iloc[0] = .15
self.assertEqual(df['bb'].iloc[0], 0.15)
pd.set_option('chained_assignment', 'raise')
# GH 3217
df = DataFrame(dict(a=[1, 3], b=[np.nan, 2]))
df['c'] = np.nan
df['c'].update(pd.Series(['foo'], index=[0]))
expected = DataFrame(dict(a=[1, 3], b=[np.nan, 2], c=['foo', np.nan]))
tm.assert_frame_equal(df, expected)
Source: test_pytables.py, from project PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda by SignalMedia.
def test_api_default_format(self):
# default_format option
with ensure_clean_store(self.path) as store:
df = tm.makeDataFrame()
pandas.set_option('io.hdf.default_format', 'fixed')
_maybe_remove(store, 'df')
store.put('df', df)
self.assertFalse(store.get_storer('df').is_table)
self.assertRaises(ValueError, store.append, 'df2', df)
pandas.set_option('io.hdf.default_format', 'table')
_maybe_remove(store, 'df')
store.put('df', df)
self.assertTrue(store.get_storer('df').is_table)
_maybe_remove(store, 'df2')
store.append('df2', df)
self.assertTrue(store.get_storer('df').is_table)
pandas.set_option('io.hdf.default_format', None)
with ensure_clean_path(self.path) as path:
df = tm.makeDataFrame()
pandas.set_option('io.hdf.default_format', 'fixed')
df.to_hdf(path, 'df')
with get_store(path) as store:
self.assertFalse(store.get_storer('df').is_table)
self.assertRaises(ValueError, df.to_hdf, path, 'df2', append=True)
pandas.set_option('io.hdf.default_format', 'table')
df.to_hdf(path, 'df3')
with HDFStore(path) as store:
self.assertTrue(store.get_storer('df3').is_table)
df.to_hdf(path, 'df4', append=True)
with HDFStore(path) as store:
self.assertTrue(store.get_storer('df4').is_table)
pandas.set_option('io.hdf.default_format', None)
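The global io.hdf.default_format option can also be overridden per call through the format argument, which avoids the set/reset dance exercised by the test above (a minimal sketch using the standard pandas API; requires the PyTables package):

import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3]})
df.to_hdf('example.h5', 'df_fixed', format='fixed')   # fixed layout: fast, not appendable
df.to_hdf('example.h5', 'df_table', format='table')   # table layout: queryable, appendable
df.to_hdf('example.h5', 'df_table', format='table', append=True)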
Source: testing.py, from project PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda by SignalMedia.
def setUpClass(cls):
pd.set_option('chained_assignment', 'raise')
def __show_results_in_table(self):
"""Show results in pandas DataFrame format."""
df = super(ViewPresenter, self)._prepare_dataframe()
if df is not None:
# Better visualization in command line
        pd.set_option('display.expand_frame_repr', False)
pd.set_option('display.max_columns', 999)
display(df)
def print_full(x):
"""Print all rows in Pandas DataFrame x."""
pd.set_option('display.max_rows', len(x))
print(x)
pd.reset_option('display.max_rows')
def update_html(df, metadb_timestamp):
pd.set_option('display.max_colwidth', -1)
table_html = df.to_html(formatters={
"doi": format_doi, "gse": format_gse}, escape=False, index=False, justify="left", classes="table table-striped table-bordered")
html_template_str = unicode(open("output_template.html").read())
n_overdue = df.shape[0]
final_html = html_template_str.format(date_updated=datetime.date.today(), metageo_timestamp=metadb_timestamp,
n_overdue=n_overdue, table_html=table_html, tracking_script=tracking_script)
with open("docs/index.html", "w") as f:
f.write(final_html.encode("utf-8"))
def update_html(df, sradb_timestamp):
pd.set_option('display.max_colwidth', -1)
table_html = df.to_html(formatters={
"doi": format_doi, "srx": format_srx}, escape=False, index=False, justify="left", classes="table table-striped table-bordered")
html_template_str = unicode(open("sra_template.html").read())
n_overdue = df.shape[0]
final_html = html_template_str.format(date_updated=datetime.date.today(), sradb_timestamp=sradb_timestamp,
n_overdue=n_overdue, table_html=table_html, tracking_script=tracking_script)
with open("docs/sra.html", "w") as f:
f.write(final_html.encode("utf-8"))
def format_pandas(opts=pandas_options):
try:
import pandas as pd
for key, val in opts.items():
pd.set_option(key, val)
except ImportError:
return
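The pandas_options mapping referenced in the default argument is defined elsewhere in that project; a hypothetical example of its shape (the option names are real pandas options, the values are illustrative):

pandas_options = {
    'display.width': 120,
    'display.max_rows': 200,
    'display.max_colwidth': 80,
    'display.precision': 6,
}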
def print_full(df):
'''
print all rows of pd.DataFrame
'''
pd.set_option('display.max_rows', len(df))
print('\n')
print(df)
pd.reset_option('display.max_rows')
# TODO:
def printSeries(series, label, header='', asStr=False):
"""
    Print a `series` of values, with a given `label`.
    :param series: (convertible to pandas Series) the values
    :param label: (str) a label to print for the data
    :param header: (str) a header line to print above the data
    :param asStr: (bool) if True, return the formatted string instead of printing it
    :return: (str or None) the formatted string if `asStr` is True, else None
    """
    import pandas as pd
    if isinstance(series, pd.DataFrame):
df = series
df = df.T
else:
df = pd.DataFrame(pd.Series(series)) # DF is more convenient for printing
df.columns = [label]
    oldPrecision = pd.get_option('display.precision')
    pd.set_option('display.precision', 5)
    s = "%s\n%s" % (header, df.T)
    pd.set_option('display.precision', oldPrecision)
if asStr:
return s
else:
print(s)
def _predict(args, cell):
schema, features = _local_predict.get_model_schema_and_features(args['model'])
headers = [x['name'] for x in schema]
img_cols = []
for k, v in six.iteritems(features):
if v['transform'] in ['image_to_vec']:
img_cols.append(v['source_column'])
data = args['data']
df = _local_predict.get_prediction_results(
args['model'], data, headers, img_cols=img_cols, cloud=False,
show_image=not args['no_show_image'])
def _show_img(img_bytes):
return '<img src="data:image/png;base64,' + img_bytes + '" />'
def _truncate_text(text):
return (text[:37] + '...') if isinstance(text, six.string_types) and len(text) > 40 else text
    # Truncate text explicitly here because we will set display.max_colwidth to -1.
    # This applies to image columns too, but those are overridden with _show_img() later.
    formatters = {x: _truncate_text for x in df.columns if df[x].dtype == object}
if not args['no_show_image'] and img_cols:
formatters.update({x + '_image': _show_img for x in img_cols})
# Set display.max_colwidth to -1 so we can display images.
old_width = pd.get_option('display.max_colwidth')
pd.set_option('display.max_colwidth', -1)
try:
IPython.display.display(IPython.display.HTML(
df.to_html(formatters=formatters, escape=False, index=False)))
finally:
pd.set_option('display.max_colwidth', old_width)