def regression(nx, ny):
"""
Parameters
==========
specturm : pd.series
Pandas Series object
nodes : list
of nodes to be used for the continuum
Returns
=======
corrected : array
Continuum corrected array
continuum : array
The continuum used to correct the data
x : array
The potentially truncated x values
"""
m, b, r_value, p_value, stderr = ss.linregress(nx, ny)
c = m * nx + b
return c
python类linregress()的实例源码
def test_alpha(self, returns, benchmark, expected):
observed = self.empyrical.alpha(returns, benchmark)
assert_almost_equal(
observed,
expected,
DECIMAL_PLACES)
if len(returns) == len(benchmark):
# Compare to scipy linregress
returns_arr = returns.values
benchmark_arr = benchmark.values
mask = ~np.isnan(returns_arr) & ~np.isnan(benchmark_arr)
slope, intercept, _, _, _ = stats.linregress(benchmark_arr[mask],
returns_arr[mask])
assert_almost_equal(
observed,
intercept * 252,
DECIMAL_PLACES
)
# Alpha/beta translation tests.
def test_beta(self, returns, benchmark, expected):
observed = self.empyrical.beta(returns, benchmark)
assert_almost_equal(
observed,
expected,
DECIMAL_PLACES)
if len(returns) == len(benchmark):
# Compare to scipy linregress
returns_arr = returns.values
benchmark_arr = benchmark.values
mask = ~np.isnan(returns_arr) & ~np.isnan(benchmark_arr)
slope, intercept, _, _, _ = stats.linregress(benchmark_arr[mask],
returns_arr[mask])
assert_almost_equal(
observed,
slope
)
def Regress(self):
if len(self._independent_data) == 0:
print('Loaded no data for contest "%s" party "%s"' % (self._independent, self._party))
return
if len(self._dependent_data) == 0:
print('Loaded no data for contest "%s" party "%s"' % (self._dependent, self._party))
return
y = []
x = []
for precinct,votes in self._independent_data.iteritems():
if precinct not in self._dependent_data:
continue
x.append(votes)
y.append(self._dependent_data[precinct])
if not x or len(x) != len(y):
print('Mismatched or empty data')
return
slope, intercept, r_value, p_value, std_err = stats.linregress(x,y)
print('Slope=%6.4f Intercept=%6.4f R^2=%.4f p=%.6f' % (slope, intercept,
r_value**2, p_value))
self._PrintResiduals(slope, intercept)
def correlation_plot(x, y,
save_path,
title,
xlabel, ylabel):
plt.scatter(x, y)
slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
line_x = np.arange(x.min(), x.max())
line_y = slope*line_x + intercept
plt.plot(line_x, line_y,
label='$%.2fx + %.2f$, $R^2=%.2f$' % (slope, intercept, r_value**2))
plt.legend(loc='best')
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.tight_layout()
plt.savefig(save_path)
plt.clf() # clear figure
plt.close()
def __get_tail_se(self):
""" Method to produce the standard error of the Mack Chainladder
model tail factor
Returns:
This calculation is consistent with the R calculation
MackChainLadder$tail.se
"""
tailse = np.array(self.sigma[-2] / \
np.sqrt(self.full_triangle.iloc[0, -3]**self.alpha[-1]))
if self.chainladder.tail == True:
time_pd = self.__get_tail_weighted_time_period()
fse = np.append(self.fse, tailse)
x = np.array([i + 1 for i in range(len(fse))])
fse_reg = stats.linregress(x, np.log(fse))
tailse = np.append(tailse, np.exp(time_pd * fse_reg[0] + fse_reg[1]))
else:
tailse = np.append(tailse,0)
return tailse
def __get_tail_weighted_time_period(self):
""" Method to approximate the weighted-average development age assuming
exponential tail fit.
Returns: float32
"""
#n = self.triangle.ncol-1
#y = self.f[:n]
#x = np.array([i + 1 for i in range(len(y))])
#ldf_reg = stats.linregress(x, np.log(y - 1))
#time_pd = (np.log(self.f[n] - 1) - ldf_reg[1]) / ldf_reg[0]
n = self.triangle.ncol-1
y = Series(self.f[:n])
x = [num+1 for num, item in enumerate(y)]
y.index = x
x = sm.add_constant((y.index)[y>1])
y = y[y>1]
ldf_reg = sm.OLS(np.log(y-1),x).fit()
time_pd = (np.log(self.f[n] - 1) - ldf_reg.params[0]) / ldf_reg.params[1]
#tail_factor = np.exp(tail_model.params[0] + np.array([i+2 for i in range(n,n+100)]) * tail_model.params[1]).astype(float) + 1)
return time_pd
def getModelDiffs(self, dependentContainer, playoffTeamsOnly=False):
## so generate our model here based on the data provided in each
## stat container, then output a new statContainer object with
## the values of the model diffs
print "Consistency check between %s and %s containers is a %r" % (self.getShortStatName(), dependentContainer.getShortStatName(), consistencyCheck(self, dependentContainer))
x_values = self.getStat(playoffTeamsOnly)
y_values = dependentContainer.getStat(playoffTeamsOnly)
gradient, intercept, r_value, p_value, std_err = stats.linregress( x_values, y_values)
modelDiffs = []
def modelValue(x, gradient, intercept):
return (x*gradient + intercept)
for i in range(0, len(x_values)):
modelDiffs.append(y_values[i] - modelValue(x_values[i], gradient, intercept))
return statContainer("%sby%s Model Diffs" % (dependentContainer.getShortStatName(), self.getShortStatName()), "Deltas from the Model for %s by %s, gradient=%.3f, intercept=%.3f" % (dependentContainer.getLongStatName, self.getShortStatName(), gradient, intercept), modelDiffs, self.getTeamIds(playoffTeamsOnly), self.getTeamNames(playoffTeamsOnly), self.getYears(playoffTeamsOnly), self.getMadePlayoffsList(playoffTeamsOnly))
def corrPlotV(dv_true,dv_pred,sv_true,sv_pred,teamName,outName):
slope, intercept, rvalue, pvalue, std_err = stats.linregress(np.append(dv_true,sv_true),np.append(dv_pred,sv_pred))
plt.scatter(dv_true,dv_pred,label='Diastolic Volume',marker='o',facecolors='none',edgecolors='r')
plt.scatter(sv_true,sv_pred,label='Systolic Volume',marker='o',facecolors='none',edgecolors='b')
plt.xlabel("True Volume (mL)")
plt.ylabel("Predicted Volume (mL)")
plt.title("%s\nCorrelation of Volume Predictions with Test Values" % teamName)
x = np.linspace(0,500,10)
plt.plot(x,x,color='k',label='guide y = x')
plt.plot(x,x*slope+intercept,color = 'k',linestyle='--',label='y=%.2fx+%.2f\n$R^2=$%.3f p=%.2e' % (slope,intercept,rvalue**2,pvalue))
plt.gca().set_xlim((0,500))
plt.gca().set_ylim((0,500))
plt.legend(loc='upper left')
plt.grid()
plt.savefig("%sCorrVols.png" % outName)
plt.close()
#plotting prediction vs truth... EF
def corrPlotEF(true,pred,teamName,outName):
slope, intercept, rvalue, pvalue, std_err = stats.linregress(true*100.0,pred*100.0)
plt.scatter(true*100.0,pred*100.0,marker='x',color='#990099',label="Ejection Fraction")
x = np.linspace(0,90,10)
plt.plot(x,x,color='k',label='guide y = x')
plt.plot(x,x*slope+intercept,color = 'k',linestyle='--',label='y=%.2fx+%.2f\n$R^2=$%.3f p=%.2e' % (slope,intercept,rvalue**2,pvalue))
plt.gca().set_xlim((0,90))
plt.gca().set_ylim((0,90))
plt.xlabel("True Ejection Fraction (%)")
plt.ylabel("Predicted Ejection Fraction (%)")
plt.title("%s\nCorrelation of EF Predictions with Test Values" % teamName)
plt.legend(loc='upper left')
plt.grid()
plt.savefig("%sCorrEF.png" % outName)
plt.close()
##################
#
# Bland - Altman plots
#
#plotting Bland-Altman for dv and sv
representation.py 文件源码
项目:sport_movements_analysis
作者: guillaumeAssogba
项目源码
文件源码
阅读 32
收藏 0
点赞 0
评论 0
def plot2dRegression(x,y, nameX, nameY, namePlot):
model = LinearRegression()
linearModel = model.fit(x, y)
predictModel = linearModel.predict(x)
plt.scatter(x,y, color='g')
plt.plot(x, predictModel, color='k')
plt.xlabel(nameX)
plt.ylabel(nameY)
test = stats.linregress(predictModel,y)
print("The squared of the correlation coefficient R^2 is " + str(test.rvalue**2))
plt.savefig("plot/loadings/"+namePlot, bbox_inches='tight')
plt.show()
return test.rvalue**2
#plot the 2D regression between the performance values and the loadings.
#return the correlation factor: R squared
def plot_corr(param_1, param_2, title="", x_label="", y_label="", legend=[]):
"""
It creates a graph with all the time series of the list parameters.
:param title: Title of the graph.
:param x_label: X label of the graph.
:param y_label: Y label of the graph.
:param legend: Labels to show in the legend.
:param param_1: Parameter to correlate.
:param param_2: Parameter to correlate.
:return: The graph.
"""
slope, intercept, r_value, p_value, std_err = stats.linregress(param_1.values, param_2.values)
fig_correlation, axes = plt.subplots(nrows=1, ncols=1)
axes.plot(param_1.values, param_2.values, marker='.', linestyle="")
axes.plot(param_1.values, param_1.values * slope + intercept)
axes.set_title(title)
axes.set_xlabel(x_label)
axes.set_ylabel(y_label)
legend.append("$y = {:.2f}x+{:.2f}$ $r^2={:.2f}$".format(slope, intercept, r_value ** 2))
axes.legend(legend, loc='best')
return fig_correlation
def test_alpha_beta_equality(self, returns, benchmark):
alpha, beta = self.empyrical(
pandas_only=len(returns) != len(benchmark),
return_types=tuple,
).alpha_beta(returns, benchmark)
assert_almost_equal(
alpha,
self.empyrical.alpha(returns, benchmark),
DECIMAL_PLACES)
assert_almost_equal(
beta,
self.empyrical.beta(returns, benchmark),
DECIMAL_PLACES)
if len(returns) == len(benchmark):
# Compare to scipy linregress
returns_arr = returns.values
benchmark_arr = benchmark.values
mask = ~np.isnan(returns_arr) & ~np.isnan(benchmark_arr)
slope, intercept, _, _, _ = stats.linregress(returns_arr[mask],
benchmark_arr[mask])
assert_almost_equal(
alpha,
intercept
)
assert_almost_equal(
beta,
slope
)
def stability_of_timeseries(returns):
"""Determines R-squared of a linear fit to the cumulative
log returns. Computes an ordinary least squares linear fit,
and returns R-squared.
Parameters
----------
returns : pd.Series or np.ndarray
Daily returns of the strategy, noncumulative.
- See full explanation in :func:`~empyrical.stats.cum_returns`.
Returns
-------
float
R-squared.
"""
if len(returns) < 2:
return np.nan
returns = np.asanyarray(returns)
returns = returns[~np.isnan(returns)]
cum_log_returns = np.log1p(returns).cumsum()
rhat = stats.linregress(np.arange(len(cum_log_returns)),
cum_log_returns)[2]
return rhat ** 2
def calcFashionEvoFunc(pNow):
'''
Calculates a new approximate dynamic rule for the evolution of the proportion
of punks as a linear function and a "shock width".
Parameters
----------
pNow : [float]
List describing the history of the proportion of punks in the population.
Returns
-------
(unnamed) : FashionEvoFunc
A new rule for the evolution of the population punk proportion, based on
the history in input pNow.
'''
pNowX = np.array(pNow)
T = pNowX.size
p_t = pNowX[100:(T-1)]
p_tp1 = pNowX[101:T]
pNextSlope, pNextIntercept, trash1, trash2, trash3 = stats.linregress(p_t,p_tp1)
pPopExp = pNextIntercept + pNextSlope*p_t
pPopErrSq= (pPopExp - p_tp1)**2
pNextStd = np.sqrt(np.mean(pPopErrSq))
print(str(pNextIntercept) + ', ' + str(pNextSlope) + ', ' + str(pNextStd))
return FashionEvoFunc(pNextIntercept,pNextSlope,2*pNextStd)
###############################################################################
###############################################################################
def linear_regression(x_vals, y_vals):
'''
least-squares regression of scipy
'''
a_value, b_value, r_value, p_value, std_err = linregress(x_vals, y_vals)
est_yvals = a_value * x_vals + b_value
k = 1 / a_value
plot.plot(x_vals, est_yvals, label='Least-squares fit, k = ' +
str(round(k)) + ", RSquare = " + str(r_value**2))
def model_test(model, X_test, y_test,outputfile):
#net.load_params_from('/path/to/weights_file')
y_pred = model.predict(X_test)
print stats.linregress(y_test,y_pred[:,0])
hkl.dump([y_pred[:,0],y_test],outputfile)
#save model parameters
def linregress_ting(bst, tuningcurve, n_shuffles=250):
"""perform linear regression on all the events in bst, and return the R^2 values"""
if float(n_shuffles).is_integer:
n_shuffles = int(n_shuffles)
else:
raise ValueError("n_shuffles must be an integer!")
posterior, bdries, mode_pth, mean_pth = decode(bst=bst, ratemap=tuningcurve)
# bdries = np.insert(np.cumsum(bst.lengths), 0, 0)
r2values = np.zeros(bst.n_epochs)
r2values_shuffled = np.zeros((n_shuffles, bst.n_epochs))
for idx in range(bst.n_epochs):
y = mode_pth[bdries[idx]:bdries[idx+1]]
x = np.arange(bdries[idx],bdries[idx+1], step=1)
x = x[~np.isnan(y)]
y = y[~np.isnan(y)]
if len(y) > 0:
slope, intercept, rvalue, pvalue, stderr = stats.linregress(x, y)
r2values[idx] = rvalue**2
else:
r2values[idx] = np.nan #
for ss in range(n_shuffles):
if len(y) > 0:
slope, intercept, rvalue, pvalue, stderr = stats.linregress(np.random.permutation(x), y)
r2values_shuffled[ss, idx] = rvalue**2
else:
r2values_shuffled[ss, idx] = np.nan # event contained NO decoded activity... unlikely or even impossible with current code
# sig_idx = np.argwhere(r2values[0,:] > np.percentile(r2values, q=q, axis=0))
# np.argwhere(((R2[1:,:] >= R2[0,:]).sum(axis=0))/(R2.shape[0]-1)<0.05) # equivalent to above
if n_shuffles > 0:
return r2values, r2values_shuffled
return r2values
def linregress_array(posterior):
"""perform linear regression on the posterior matrix, and return the slope, intercept, and R^2 value"""
mode_pth = get_mode_pth_from_array(posterior)
y = mode_pth
x = np.arange(len(y))
x = x[~np.isnan(y)]
y = y[~np.isnan(y)]
if len(y) > 0:
slope, intercept, rvalue, pvalue, stderr = stats.linregress(x, y)
return slope, intercept, rvalue**2
else:
return np.nan, np.nan, np.nan
def linregress_bst(bst, tuningcurve):
"""perform linear regression on all the events in bst, and return the slopes, intercepts, and R^2 values"""
posterior, bdries, mode_pth, mean_pth = decode(bst=bst, ratemap=tuningcurve)
slopes = np.zeros(bst.n_epochs)
intercepts = np.zeros(bst.n_epochs)
r2values = np.zeros(bst.n_epochs)
for idx in range(bst.n_epochs):
y = mode_pth[bdries[idx]:bdries[idx+1]]
x = np.arange(bdries[idx],bdries[idx+1], step=1)
x = x[~np.isnan(y)]
y = y[~np.isnan(y)]
if len(y) > 0:
slope, intercept, rvalue, pvalue, stderr = stats.linregress(x, y)
slopes[idx] = slope
intercepts[idx] = intercept
r2values[idx] = rvalue**2
else:
slopes[idx] = np.nan
intercepts[idx] = np.nan
r2values[idx] = np.nan #
# if bst.n_epochs == 1:
# return np.asscalar(slopes), np.asscalar(intercepts), np.asscalar(r2values)
return slopes, intercepts, r2values
def __get_tail_sigma(self):
""" Method to produce the sigma of the Mack Chainladder
model tail factor
Returns:
This calculation is consistent with the R calculation
MackChainLadder$tail.sigma
"""
y = np.log(self.sigma[:len(self.triangle.data.columns[:-2])])
x = np.array([i + 1 for i in range(len(y))])
model = stats.linregress(x, y)
tailsigma = np.exp((x[-1] + 1) * model[0] + model[1])
if model[3] > 0.05: # p-vale of slope parameter
y = self.sigma
tailsigma = np.sqrt(
abs(min((y[-1]**4 / y[-2]**2), min(y[-2]**2, y[-1]**2))))
if self.chainladder.tail == True:
time_pd = self.__get_tail_weighted_time_period()
y = np.log(np.append(self.sigma,tailsigma))
x = np.array([i + 1 for i in range(len(y))])
sigma_reg = stats.linregress(x, y)
tailsigma = np.append(tailsigma, np.exp(time_pd * sigma_reg[0] + sigma_reg[1]))
else:
tailsigma = np.append(tailsigma,0)
return tailsigma
def get_stats(twoDarray):
print(np.mean(twoDarray[0]))
print(np.mean(twoDarray[1]))
print(np.var(twoDarray[0]))
print(np.var(twoDarray[1]))
print(np.corrcoef(twoDarray[0], twoDarray[1]))
print(stats.linregress(twoDarray[0], twoDarray[1]))
def wilson(data):
'''
Calculate useful things like mass ratio and systemic velocity.
Parameters
----------
data : list
Radial velocity pairs in a 2D list.
Returns
-------
-slope : float
Mass Ratio of the system. The ratio of the secondary
component mass to the primary.
intercept : float
y-intercept of the line which fits data.
stderr : float
Standard error of the estimated gradient.
'''
from scipy.stats import linregress
# Primary RVs on y.
y = [datum[1] for datum in data if not np.isnan(datum[1]+datum[2])]
# Secondary RVs on x.
x = [datum[2] for datum in data if not np.isnan(datum[1]+datum[2])]
slope, intercept, rvalue, pvalue, stderr = linregress(x,y)
return -slope, intercept, stderr
def load_calibration():
'''Load calibration data and calculate calibration values.'''
# TODO: handle different calibration measurements, not just one for datadir
global p1_cal, p2_cal
try:
# Load the variables in 'calibration.py'.
calglobals = runpy.run_path(
os.path.join(datadir, 'calibration.py')
)
# Store the calibration data and the linear regression.
p1_cal = dict(data=calglobals['p1_data'])
try:
p1_zero_idx = p1_cal['data']['refinputs'].index(0.0)
p1_offset = p1_cal['data']['measurements'][p1_zero_idx]
except IndexError:
p1_offset = 0.0
p1_cal['regression'] = stats.linregress(
np.array(p1_cal['data']['measurements']) - p1_offset,
np.array(p1_cal['data']['refinputs'])
)
p2_cal = dict(data=calglobals['p2_data'])
try:
p2_zero_idx = p2_cal['data']['refinputs'].index(0.0)
p2_offset = p2_cal['data']['measurements'][p2_zero_idx]
except IndexError:
p2_offset = 0.0
p2_cal['regression'] = stats.linregress(
np.array(p2_cal['data']['measurements']) - p2_offset,
np.array(p2_cal['data']['refinputs'])
)
except Exception as e:
# TODO: print info/warning?
print(e)
p1_cal = None
p2_cal = None
print('p1_cal: ', p1_cal)
print('p2_cal: ', p2_cal)
def getLinearModel(x_values, y_values, k=1.0, l=1.0):
gradient, intercept, r_value, p_value, std_err = stats.linregress(x_values,y_values)
y_model = []
grad = k*gradient
interc = l*intercept
for x in x_values:
y = trendline(x, grad, interc)
y_model.append(y)
return y_model
def getLinearModel_rValue(x_values, y_values):
gradient, intercept, r_value, p_value, std_err = stats.linregress(x_values,y_values)
return r_value
def getLinearModel_pValue(x_values, y_values):
gradient, intercept, r_value, p_value, std_err = stats.linregress(x_values,y_values)
return p_value
def test_?_w(self):
""" ?_w = [A][A^T]. See p. 533.
DESCRIPTION: Since A is a free parameter, t will not necessarily recover
the original A. ?_w is what really describes the covariance
between cluster means (see p. 533), so that is what you want
to test - it is 'closer' to the data.
"""
n_experiments = int(np.log10(1000000) / 2)
n_list = [100 ** x for x in range(1, n_experiments + 1)]
n_list = np.array(n_list).astype(float)
n_dims = self.n_dims
n_classes = 30 #self.n_classes
?_w_L1_errors = []
for n in n_list:
A, ?, model = self.experiment(int(n), n_dims, n_classes)
?_w = np.matmul(A, A.T)
?_w_model = np.matmul(model.A, model.A.T)
L1_error = np.abs(?_w - ?_w_model).mean()
abs_? = (np.abs(?_w).mean() + np.abs(?_w_model).mean()) * .5
percent_error = L1_error / abs_? * 100
print('Testing ?_w with {} samples: {} percent error'.format(n,
percent_error))
?_w_L1_errors.append(percent_error)
Y = ?_w_L1_errors
X = [x for x in range(len(?_w_L1_errors))]
slope_of_error_vs_N = linregress(X, Y)[0]
self.assertTrue(slope_of_error_vs_N < 0)
def test_?_b(self):
# """ ?_b = [A][?][A^T]. See p. 533.
#
# DESCRIPTION: Since A and ? are free parameters, they will not necessarily
# recover the original A & ?. ?_b is what really describes
# the covariance between cluster means (see p. 533), so that
# is what you want to test - they are 'closer' to the data".
# """
n_classes_list = [4 ** x for x in range(1, 6)]
n_list = [100 * n for n in n_classes_list]
n_list = np.array(n_list).astype(float)
n_dims = self.n_dims
?_b_L1_errors = []
for n, n_classes in zip(n_list, n_classes_list):
A, ?, model = self.experiment(int(n), n_dims, n_classes)
?_b = np.matmul(np.matmul(A, ?), A.T)
?_b_model = np.matmul(np.matmul(model.A, model.?), model.A.T)
L1_error = np.abs(?_b - ?_b_model).mean()
abs_? = (np.abs(?_b).mean() + np.abs(?_b_model).mean()) * .5
percent_error = L1_error / abs_? * 100
?_b_L1_errors.append(percent_error)
print('Testing ?_b with {} classes: {} percent error'.format(
n_classes, percent_error))
Y = ?_b_L1_errors
X = [x for x in range(len(?_b_L1_errors))]
slope_of_error_vs_N = linregress(X, Y)[0]
self.assertTrue(slope_of_error_vs_N < 0)
def linreg(x,y,units='PW/Sv'):
""" Return linear regression model and plot label """
if len(x) > 1:
slope, intercept, r_value, p_value, std_err = stats.linregress(x,y)
y_model = x * slope + intercept
label = '(%5.3f %s)' % (slope, units)
else:
y_model = None
label = ''
return y_model, label