def print_stats(data):
    data = np.array(data)
    desc = stats.describe(data)
    print('# of observations:', desc.nobs)
    print('min: %d\nmax: %d' % desc.minmax)
    print('mean: %.1f' % desc.mean)
    # print('variance: %.1f' % desc.variance)
    print('stdev: %.1f' % math.sqrt(desc.variance))
    print('percentiles')
    for p in PERCENTILES:
        print('%6.2f' % p, ' ', end='')
    print()
    for p in stats.scoreatpercentile(data, PERCENTILES):
        print('%6d' % p, ' ', end='')
    print()
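A minimal usage sketch for this helper, assuming the module imports numpy as np, scipy.stats as stats, and math, and defines a module-level PERCENTILES constant; the percentile values and sample data below are assumptions for illustration only.

# Hypothetical setup for illustration; PERCENTILES is assumed, not taken from the original module.
import math
import numpy as np
from scipy import stats

PERCENTILES = [5, 25, 50, 75, 95]  # assumed values

print_stats([12, 15, 9, 22, 30, 18, 25, 11])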
def eliminate_incorrectly_segmented(scans, masks):
    skip = dim // 2  # to change, see below ...
    sxm = scans * masks
    # version 3: -750 gives one more (for 0_3, d4); -600 gives 15 more than -900
    near_air_thresh = (-900 - MIN_BOUND) / (MAX_BOUND - MIN_BOUND) - PIXEL_MEAN
    # near_air_thresh is 0.08628 for -840, 0.067 for -867, 0.1148 for -800
    cnt = 0
    for i in range(sxm.shape[0]):
        #sx = sxm[i, skip]
        sx = sxm[i]
        mx = masks[i]
        if np.sum(mx) > 0:  # only check non-blanks (keep blanks)
            sx_max = np.max(sx)
            if sx_max <= near_air_thresh:
                cnt += 1
                print("Entry, count # and max: ", i, cnt, sx_max)
                print(stats.describe(sx, axis=None))
                #plt.imshow(sx, cmap='gray')
                plt.imshow(sx[0, skip], cmap='gray')  # selecting the mid entry
                plt.show()
    s_eliminate = np.max(sxm, axis=(1, 2, 3, 4)) <= near_air_thresh  # 3d
    s_preserve = np.max(sxm, axis=(1, 2, 3, 4)) > near_air_thresh    # 3d
    s_eliminate_sum = sum(s_eliminate)
    s_preserve_sum = sum(s_preserve)
    print("Eliminate, preserve =", s_eliminate_sum, s_preserve_sum)
    masks = masks[s_preserve]
    scans = scans[s_preserve]
    del sxm
    return scans, masks
# the following 3 functions to read LUNA files are from: https://www.kaggle.com/arnavkj95/data-science-bowl-2017/candidate-generation-and-luna16-preprocessing/notebook
def scipy_describe(x, **kwargs):
    print('Start scipy_describe')
    band_arr = getattr(x, TEMP_BAND)
    cols = ('var', 'skew', 'kurt', 'min', 'max', 'median', 'std', 'np_skew')
    inter = tuple(combinations(range(len(cols)), 2))
    cols = cols + tuple((cols[i], cols[j]) for i, j in inter)
    num_cols = len(cols)
    num_rows = np.prod(band_arr.shape[1:])
    new_arr = np.empty((num_rows, num_cols))
    for row, (i, j) in enumerate(product(*(range(s) for s in band_arr.values.shape[1:]))):
        values = band_arr.values[:, i, j]
        d = describe(values)
        t = (d.variance, d.skewness, d.kurtosis, d.minmax[0], d.minmax[1])
        median = np.median(values)
        std = np.std(values)
        non_param_skew = (d.mean - median) / std
        r = t + (median, std, non_param_skew)
        interact = tuple(r[i] * r[j] for i, j in inter)
        new_arr[row, :] = r + interact
    attrs = copy.deepcopy(x.attrs)
    attrs.update(kwargs)
    da = xr.DataArray(new_arr,
                      coords=[('space', np.arange(num_rows)),
                              ('band', np.arange(num_cols))],
                      dims=('space', 'band'),
                      attrs=attrs)
    return ElmStore({'flat': da}, attrs=attrs, add_canvas=False)
def _describe_and_check(txt, xs, ss):
    d = stats.describe(xs)
    print(txt)
    print('Mean: {}'.format(d.mean))
    print('Var : {}'.format(d.variance))
    print('Skew: {}'.format(d.skewness))
    print('Kurt: {}'.format(d.kurtosis))
    assert_allclose([d.mean, d.variance, d.skewness, d.kurtosis],
                    ss, rtol=5e-2, atol=5e-2)
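A minimal sketch of calling this test helper, assuming scipy.stats and numpy.testing.assert_allclose are available at module level; the sample size and seed are assumptions. For a standard normal sample, the expected mean, variance, skewness, and (excess) kurtosis are 0, 1, 0, 0.

# Illustrative call only; the seed and sample size are assumptions.
import numpy as np
from scipy import stats
from numpy.testing import assert_allclose

xs = np.random.RandomState(0).standard_normal(100000)
_describe_and_check('standard normal sample', xs, [0.0, 1.0, 0.0, 0.0])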
def run_profiling(self, num_loops, num_neighbors, age_proximity):
    """Execute the k_nearest_neighbors query num_loops times and return the timing statistics.

    Args:
        num_loops: number of loops for which we query the server
        num_neighbors: number of neighbors to query for
        age_proximity: maximum difference between a candidate neighbor's age and the user's

    Returns:
        The scipy.stats.describe() summary of the per-query running times.
    """
    print('profiling over ', num_loops, ' times')
    random_latitudes = random.uniform(-90, 90, num_loops)
    random_longitudes = random.uniform(-180, 180, num_loops)
    time_list = []
    for i in tqdm(range(len(random_latitudes))):
        start_time = time.perf_counter()  # time.clock() was removed in Python 3.8
        kd_store.k_nearest_neighbors({'name': 'bla bla', 'age': 23, 'latitude': random_latitudes[i] / 2,
                                      'longitude': random_longitudes[i]}, num_neighbors, age_proximity)
        end_time = time.perf_counter()
        time_list.append(end_time - start_time)
    # get the timing statistics
    stats_desc = stats.describe(time_list)
    frac_times_exceeded = len(np.where(np.array(time_list) >= 1)[0]) / len(time_list)
    print('\nfraction of times with delay > 1 is: ', frac_times_exceeded, '\n')
    print('\nStats:\n', stats_desc)
    return stats_desc
def _describe(idxes, values):
    d = scipy_describe(values)
    t = (d.variance, d.skewness, d.kurtosis, d.minmax[0], d.minmax[1])
    median = np.median(values)
    std = np.std(values)
    non_param_skew = (d.mean - median) / std
    r = t + (median, std, non_param_skew)
    return np.array(r)[idxes]
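A minimal sketch of calling this helper, assuming scipy_describe here is scipy.stats.describe imported under that alias (an assumption; in its original project it may be bound differently). The index order of the returned vector is variance, skewness, kurtosis, min, max, median, std, non-parametric skew.

# Illustrative call; assumes scipy_describe is scipy.stats.describe imported under that name.
import numpy as np
from scipy.stats import describe as scipy_describe

values = np.random.RandomState(1).gamma(2.0, size=500)
# pick variance (index 0), median (index 5), and the non-parametric skew (index 7)
print(_describe([0, 5, 7], values))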
def getConfidenceInterval(data, percent=0.95, distribution="t"):
    n, min_max, mean, var, skew, kurt = stats.describe(data)
    std = np.sqrt(var)
    if distribution == "t":
        R = stats.t.interval(percent, len(data) - 1, loc=mean,
                             scale=std / math.sqrt(len(data)))
    else:
        R = stats.norm.interval(
            percent, loc=mean, scale=std / math.sqrt(len(data)))
    error = (R[1] - R[0]) / 2
    return mean, error
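A minimal usage sketch, assuming the module imports math, numpy as np, and scipy.stats as stats; the sample values are made up for illustration.

# Illustrative only; the sample values are assumptions.
import math
import numpy as np
from scipy import stats

sample = [10.2, 9.8, 10.5, 10.1, 9.9, 10.3, 10.0, 9.7]
mean, error = getConfidenceInterval(sample, percent=0.95, distribution="t")
print('mean = %.2f +/- %.2f (95%% CI)' % (mean, error))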
PortfolioOptimizationOfIndexFunds.py — source file from the project Test-stock-prediction-algorithms by timestocome
def print_statistics(array):
    sta = scs.describe(array)
    print("%14s %15s" % ('statistic', 'value'))
    print(30 * '-')
    print("%14s %15.5f" % ('size', sta[0]))
    print("%14s %15.5f" % ('min', sta[1][0]))
    print("%14s %15.5f" % ('max', sta[1][1]))
    print("%14s %15.5f" % ('mean', sta[2]))
    print("%14s %15.5f" % ('std', np.sqrt(sta[3])))
    print("%14s %15.5f" % ('skew', sta[4]))
    print("%14s %15.5f" % ('kurtosis', sta[5]))
def hu_describe(data, uid, part=""):
if len(data) == 0:
nanid = -7777
d = { "vol_%s" % part: nanid,
"min_%s" % part: nanid,
"max_%s" % part: nanid,
"mean_%s" % part: nanid,
"variance_%s" % part: nanid,
"skewness_%s" % part: nanid,
"kurtosis_%s" % part: nanid
}
else:
desc = stats.describe(data, axis=None, nan_policy='omit') #default policy is 'propagate'
#names = ["nobs", "min", "max", "mean", "variance", "skewness", "kurtosis"]
d = { "vol_%s" % part: desc.nobs,
"min_%s" % part: desc.minmax[0],
"max_%s" % part: desc.minmax[1],
"mean_%s" % part: desc.mean,
"variance_%s" % part: desc.variance,
"skewness_%s" % part: desc.skewness,
"kurtosis_%s" % part: desc.kurtosis
}
#columns = ["id", "n_volume_%s" % part, "hu_min_%s" % part, "hu_nmax_%s" % part, "hu_mean_%s" % part, "hu_variance_%s" % part,"hu_skewness_%s" % part, "hu_kurtosis_%s" % part]
#d = [uid, desc.nobs, desc.minmax[0], desc.minmax[1], desc.mean, desc.variance, desc.skewness, desc.kurtosis]
#columns = sorted(d.keys())
df = pd.DataFrame(d, index=[uid])
#df = pd.DataFrame.from_records(d, columns=columns, index=["id"])
#df.reset_index(level=0, inplace=True)
#df.sort_index(axis=1)
#df.index.name = "id"
#df = pd.DataFrame.from_dict(d, orient='index')
return df
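A minimal usage sketch, assuming the module imports scipy.stats as stats and pandas as pd; the Hounsfield-unit-like values, uid, and part name are made up for illustration.

# Illustrative call; the input values and identifiers are assumptions.
import numpy as np
import pandas as pd
from scipy import stats

hu_values = np.random.RandomState(7).uniform(-1000, 400, size=1000)  # made-up HU-like values
df = hu_describe(hu_values, uid="scan_0001", part="nodule")
print(df)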
def run_all_tests(args=None):
    global STATUS_COUNTER
    env = parse_env_vars()
    if args is None:
        parser = build_cli_parser()
        args = parser.parse_args()
        args.config_dir = None
    if not args.dask_scheduler:
        args.dask_scheduler = env.get('DASK_SCHEDULER', '10.0.0.10:8786')
    if not args.dask_clients or 'ALL' in args.dask_clients:
        args.dask_clients = [c for c in DASK_CLIENTS if c != 'ALL']
    logger.info('Running run_all_tests with args: {}'.format(args))
    assert os.path.exists(args.repo_dir)
    for client in args.dask_clients:
        eedp = os.path.join(args.elm_examples_path, 'example_data')
        if not os.path.exists(eedp):
            eedp = os.environ.get('ELM_EXAMPLE_DATA_PATH')
        new_env = {'DASK_SCHEDULER': args.dask_scheduler or '',
                   'DASK_CLIENT': client,
                   'ELM_EXAMPLE_DATA_PATH': eedp}
        if not args.skip_pytest:
            run_all_unit_tests(args.repo_dir, new_env,
                               pytest_mark=args.pytest_mark)
        if not args.skip_scripts:
            run_all_example_scripts(new_env, path=os.path.join(args.elm_examples_path, 'scripts'),
                                    glob_pattern=args.glob_pattern)
        if not args.skip_configs:
            run_all_example_configs(new_env, path=os.path.join(args.elm_examples_path, 'configs'),
                                    large_test_mode=args.add_large_test_settings,
                                    glob_pattern=args.glob_pattern)
    failed_unit_tests = STATUS_COUNTER.get('unit_tests') != 'ok' and not args.skip_pytest
    if STATUS_COUNTER.get('fail') or failed_unit_tests:
        raise ValueError('Tests failed {}'.format(STATUS_COUNTER))
    print('ETIMES', ETIMES)
    speed_up_fracs = {k: [] for k in args.dask_clients if k != 'SERIAL'}
    for fname in ETIMES:
        if fname == 'unit_tests':
            continue
        if ETIMES[fname].get("SERIAL"):
            base = ETIMES[fname]['SERIAL']
            for k, v in ETIMES[fname].items():
                if k == 'SERIAL':
                    continue
                speed_up_fracs[k].append((base - v) / base)
    speed_up_fracs_summary = {k: describe(np.array(v))
                              for k, v in speed_up_fracs.items()}
    print('speed_up_fracs {}'.format(speed_up_fracs))
    print('Speed up summary {}'.format(speed_up_fracs_summary))
    print('STATUS', STATUS_COUNTER)
def xgboost_cv():
    # date range for the training-set features
    train_start_date = '2016-02-15'
    train_end_date = '2016-03-15'
    # date range used to build the training-set labels
    test_start_date = '2016-03-16'
    test_end_date = '2016-03-20'
    # date ranges for the submission set
    # features
    sub_start_date = '2016-03-21'
    sub_end_date = '2016-04-02'
    # labels
    sub_test_start_date = '2016-04-03'
    sub_test_end_date = '2016-04-08'
    user_index, training_data, label = make_train_set(train_start_date, train_end_date, test_start_date, test_end_date)
    # split into training and validation sets
    X_train, X_test, y_train, y_test = train_test_split(training_data, label, test_size=0.2, random_state=0)
    dtrain = xgb.DMatrix(X_train.values, label=y_train)
    dtest = xgb.DMatrix(X_test.values, label=y_test)
    param = {'max_depth': 10, 'eta': 0.05, 'silent': 1, 'objective': 'binary:logistic'}
    num_round = 166
    param['nthread'] = 5
    param['eval_metric'] = "auc"
    plst = list(param.items())
    evallist = [(dtest, 'eval'), (dtrain, 'train')]
    bst = xgb.train(plst, dtrain, num_round, evallist)
    sub_user_index, sub_trainning_data, sub_label = make_train_set(sub_start_date, sub_end_date,
                                                                   sub_test_start_date, sub_test_end_date)
    sub_trainning_data = xgb.DMatrix(sub_trainning_data.values)
    y = bst.predict(sub_trainning_data)
    y_mean = stats.describe(y).mean
    # plt.hist(y)
    # plt.show()
    pred = sub_user_index.copy()
    y_true = sub_user_index.copy()
    pred['label'] = y
    y_true['label'] = sub_label
    pred = pred[pred['label'] >= 0.04]
    y_true = y_true[y_true['label'] == 1]
    report(pred, y_true)