def test_stacked_regressor(self):
bclf = LinearRegression()
clfs = [RandomForestRegressor(n_estimators=50, random_state=1),
GradientBoostingRegressor(n_estimators=25, random_state=1),
Ridge(random_state=1)]
# Friedman1
X, y = datasets.make_friedman1(n_samples=1200,
random_state=1,
noise=1.0)
X_train, y_train = X[:200], y[:200]
X_test, y_test = X[200:], y[200:]
sr = StackedRegressor(bclf,
clfs,
n_folds=3,
verbose=0,
oob_score_flag=True)
sr.fit(X_train, y_train)
mse = mean_squared_error(y_test, sr.predict(X_test))
assert_less(mse, 6.0)
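# For reference: the StackedRegressor above comes from a third-party stacking package,
# but since scikit-learn 0.22 a comparable ensemble ships as sklearn.ensemble.StackingRegressor.
# A minimal sketch of the same setup (same Friedman1 split, illustrative only):
from sklearn.datasets import make_friedman1
from sklearn.ensemble import (StackingRegressor, RandomForestRegressor,
                              GradientBoostingRegressor)
from sklearn.linear_model import LinearRegression, Ridge

X, y = make_friedman1(n_samples=1200, random_state=1, noise=1.0)
stack = StackingRegressor(
    estimators=[('rf', RandomForestRegressor(n_estimators=50, random_state=1)),
                ('gb', GradientBoostingRegressor(n_estimators=25, random_state=1)),
                ('ridge', Ridge(random_state=1))],
    final_estimator=LinearRegression(),
    cv=3)  # out-of-fold predictions feed the meta-model, like n_folds=3 above
stack.fit(X[:200], y[:200])
print(((stack.predict(X[200:]) - y[200:]) ** 2).mean())  # test MSE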
def test_fwls_regressor(self):
feature_func = lambda x: np.ones(x.shape)
bclf = LinearRegression()
clfs = [RandomForestRegressor(n_estimators=50, random_state=1),
GradientBoostingRegressor(n_estimators=25, random_state=1),
Ridge(random_state=1)]
# Friedman1
X, y = datasets.make_friedman1(n_samples=1200,
random_state=1,
noise=1.0)
X_train, y_train = X[:200], y[:200]
X_test, y_test = X[200:], y[200:]
sr = FWLSRegressor(bclf,
clfs,
feature_func,
n_folds=3,
verbose=0,
oob_score_flag=True)
sr.fit(X_train, y_train)
mse = mean_squared_error(y_test, sr.predict(X_test))
assert_less(mse, 6.0)
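# In feature-weighted linear stacking (FWLS, Sill et al. 2009) the blending weights vary
# per sample as linear functions of meta-features, so the meta-model sees every product of
# a meta-feature with a base prediction; the all-ones feature_func above therefore reduces
# FWLS to plain linear stacking. A self-contained sketch of that design matrix
# (illustrative names, not the stacked_generalization API):
import numpy as np
from sklearn.linear_model import LinearRegression
n = 100
preds = np.random.rand(n, 3)                              # out-of-fold base predictions
feats = np.column_stack([np.ones(n), np.random.rand(n)])  # meta-features f(x)
design = np.hstack([preds * feats[:, [i]] for i in range(feats.shape[1])])  # (n, 6)
y_meta = np.random.rand(n)
LinearRegression().fit(design, y_meta)  # one weight per (meta-feature, base model) pair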
def scatter_regresion_Plot(X, Y, testName):
plt.scatter(X, Y, c = 'b', label = '_nolegend_', s = 1)
X = X.reshape(-1, 1)
Y = Y.reshape(-1, 1)
R2 = r2_score(X, Y)
regr = linear_model.LinearRegression()
regr.fit(X, Y)
plt.plot(X, regr.predict(X), "--", label = 'Regression', color = 'r')
plt.title(testName + ' ($R^2$: ' + "{0:.3f}".format(R2) + ")", fontsize = 14)
plt.xlabel('True Values', fontsize = 12, weight = 'bold')
plt.ylabel('Predicted Values', fontsize = 12, weight = 'bold')
plt.legend(loc = 'upper left', bbox_to_anchor = (0, 1.0), fancybox = True, shadow = True, fontsize = 10)
plt.subplots_adjust(left = 0.2, right = 0.9, bottom = 0.05, top = 0.97, wspace = 0.15, hspace = 0.3)
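# Minimal usage sketch for the plotting helper above (synthetic data; assumes
# matplotlib.pyplot as plt, numpy as np, and sklearn's r2_score/linear_model imported):
import numpy as np
y_true = np.random.normal(size=200)
y_pred = y_true + np.random.normal(scale=0.3, size=200)
scatter_regresion_Plot(y_true, y_pred, 'Synthetic test')
plt.show()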
def model_cross_valid(X,Y):
    seed = 7
    # shuffle=True is required when random_state is set (scikit-learn >= 0.24)
    kfold = model_selection.KFold(n_splits=10, shuffle=True, random_state=seed)
    def build_model(model_name):
        model = model_name()
        return model
    scoring = 'neg_mean_squared_error'
    # TODO: also try random forest, boosting, LSTM, GBDT
    for model_name in [LinearRegression,ElasticNet]:
    #for model_name in [LinearRegression,Ridge,Lasso,ElasticNet,KNeighborsRegressor,DecisionTreeRegressor,SVR,RandomForestRegressor,AdaBoostRegressor,GradientBoostingRegressor]:
        model = build_model(model_name)
        results = model_selection.cross_val_score(model, X, Y, cv=kfold, scoring=scoring)
        print(model_name.__name__, results.mean())
def fit_lr(train_X, train_y, test_X):
"""
Use linear regression to predict.
:param train_X:
:param train_y:
:param test_X:
:return:
"""
lr = LinearRegression()
lr.fit(train_X, train_y)
yhat_train = lr.predict(train_X)
yhat_test = lr.predict(test_X)
model = "LR int %.2f coefs %s" % (lr.intercept_, pprint(lr.coef_))
return model, yhat_train, yhat_test
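# Minimal usage sketch for fit_lr (hypothetical shapes and coefficients):
import numpy as np
rng = np.random.RandomState(0)
train_X, test_X = rng.rand(100, 3), rng.rand(20, 3)
train_y = train_X @ np.array([1.0, -2.0, 0.5]) + 0.1 * rng.randn(100)
desc, yhat_train, yhat_test = fit_lr(train_X, train_y, test_X)
print(desc)  # e.g. "LR int 0.05 coefs [ 0.99 -2.01  0.51]"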
def __init__(self, model, statistics_calc, backend, n_samples = 1000, seed = None):
self.model = model
self.statistics_calc = statistics_calc
self.backend = backend
self.rng = np.random.RandomState(seed)
self.model.prior.reseed(self.rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32))
# main algorithm
seed_arr = self.rng.randint(1, n_samples*n_samples, size=n_samples, dtype=np.int32)
seed_pds = self.backend.parallelize(seed_arr)
sample_parameters_statistics_pds = self.backend.map(self._sample_parameter_statistics, seed_pds)
sample_parameters_and_statistics = self.backend.collect(sample_parameters_statistics_pds)
sample_parameters, sample_statistics = [list(t) for t in zip(*sample_parameters_and_statistics)]
sample_parameters = np.array(sample_parameters)
sample_statistics = np.concatenate(sample_statistics)
self.coefficients_learnt = np.zeros(shape=(sample_parameters.shape[1],sample_statistics.shape[1]))
regr = linear_model.LinearRegression(fit_intercept=True)
for ind in range(sample_parameters.shape[1]):
regr.fit(sample_statistics, sample_parameters[:,ind])
self.coefficients_learnt[ind,:] = regr.coef_
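# This constructor appears to implement semi-automatic summary selection for ABC
# (Fearnhead & Prangle): simulate (parameter, statistics) pairs, then regress each
# parameter on the raw statistics so the learned coefficient rows define new summaries.
# A standalone sketch of just the regression step, with synthetic stand-in data:
import numpy as np
from sklearn.linear_model import LinearRegression
rng = np.random.RandomState(1)
sample_parameters = rng.rand(500, 2)                        # 500 draws of 2 parameters
sample_statistics = np.hstack([sample_parameters @ rng.rand(2, 5),
                               0.1 * rng.randn(500, 5)])    # 10 raw statistics
coefficients_learnt = np.zeros((sample_parameters.shape[1], sample_statistics.shape[1]))
regr = LinearRegression(fit_intercept=True)
for ind in range(sample_parameters.shape[1]):
    regr.fit(sample_statistics, sample_parameters[:, ind])
    coefficients_learnt[ind, :] = regr.coef_
# learned summaries: s_hat = sample_statistics @ coefficients_learnt.T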
def calculate_residual_correlation_matrix(returns):
# find the market return constraining on the selected companies (first PCA)
# regress each stock on that and find correlation of residuals
    returns_matrix = returns.to_numpy().transpose()  # DataFrame.as_matrix() was removed in pandas 1.0
covar_matrix = np.cov(returns_matrix)
pca = decomposition.PCA(n_components=1)
pca.fit(covar_matrix)
X = pca.transform(covar_matrix)
regr = linear_model.LinearRegression()
dim = covar_matrix.shape[1]
res = np.zeros(shape=(dim,dim))
for x in range(0, dim):
regr = linear_model.LinearRegression()
regr = regr.fit(X, covar_matrix[:,x])
res[:,x] = covar_matrix[:,x] - regr.predict(X)
res_corr = np.corrcoef(res)
return pd.DataFrame(res_corr, index = returns.columns, columns = returns.columns)
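# Minimal usage sketch with a random returns DataFrame (hypothetical tickers; assumes
# numpy as np, pandas as pd, and sklearn's decomposition/linear_model imported as above):
import numpy as np
import pandas as pd
rng = np.random.RandomState(0)
returns = pd.DataFrame(0.01 * rng.randn(250, 4), columns=['AAA', 'BBB', 'CCC', 'DDD'])
print(calculate_residual_correlation_matrix(returns).round(2))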
def fit_regression(X, y, regression_class=LinearRegression, regularization_const=.001):
    '''
    Fit a regression class from scikit-learn to a dataset (X, y). A regularization
    constant is required when the regression class is Lasso or Ridge.
    X (pandas DataFrame): The data.
    y (pandas DataFrame or Series): The answers.
    regression_class (class): One of sklearn.linear_model.[LinearRegression, Ridge, Lasso].
    regularization_const: The regularization value (regularization parameter) for Ridge or Lasso.
        Called alpha by scikit-learn for interface reasons.
    Return:
    tuple, (the_fitted_regressor, mean(cross_val_score)).
    '''
if regression_class is LinearRegression:
predictor = regression_class()
else:
predictor = regression_class(alpha=regularization_const, normalize=True)
predictor.fit(X, y)
cross_scores = cross_val_score(predictor, X, y=y, scoring='neg_mean_squared_error')
    cross_scores_corrected = np.sqrt(-1 * cross_scores)  # scikit-learn returns negative MSE, so negate and take the root
return (predictor, np.mean(cross_scores_corrected))
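# Minimal usage sketch (hypothetical data; note the normalize= keyword used in the
# Ridge/Lasso branch was removed in scikit-learn 1.2, so that branch targets older releases):
import numpy as np
import pandas as pd
from sklearn.linear_model import Ridge
rng = np.random.RandomState(0)
X = pd.DataFrame(rng.rand(100, 3), columns=['a', 'b', 'c'])
y = 2 * X['a'] - X['b'] + 0.05 * rng.randn(100)
model, mean_rmse = fit_regression(X, y, regression_class=Ridge, regularization_const=0.01)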
def test_least_square_model(prostate_data):
from esl_model.ch3.models import LeastSquareModel
train_x, train_y, test_x, test_y, features = prostate_data
lsm = LeastSquareModel(train_x=train_x, train_y=train_y, features_name=features)
lsm.pre_processing()
lsm.train()
print(lsm.beta_hat)
print('rss:',lsm.rss)
print('F-statistic', lsm.F_statistic(remove_cols=['age', 'lcp', 'gleason', 'pgg45']))
print('z-score', lsm.z_score)
result = lsm.test(test_x, test_y)
print('test error: ', result.mse)
from sklearn.linear_model import LinearRegression
lr = LinearRegression()
lr.fit(train_x, train_y)
print('std error', result.std_error)
assert np.isclose(result.mse, np.mean(((lr.predict(test_x)) - test_y) **2))
def rolling_beta(X, y, idx, window=100):
assert len(X) == len(y)
out_dates = []
out_beta = []
model_ols = linear_model.LinearRegression()
for iStart in range(0, len(X) - window):
iEnd = iStart + window
_x = X[iStart:iEnd].values.reshape(-1, 1)
_y = y[iStart:iEnd].values.reshape(-1, 1)
model_ols.fit(_x, _y)
# store output
out_dates.append(idx[iEnd])
out_beta.append(model_ols.coef_[0][0])
return pd.DataFrame({'beta': out_beta}, index=out_dates)
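# Minimal usage sketch with synthetic price series (hypothetical index):
import numpy as np
import pandas as pd
idx = pd.date_range('2020-01-01', periods=300)
X = pd.Series(np.random.randn(300).cumsum(), index=idx)
y = 1.5 * X + pd.Series(np.random.randn(300), index=idx)
print(rolling_beta(X, y, idx, window=100).head())  # one beta per rolling 100-step window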
def test_linear_regressor(self):
for dtype in self.number_data_type.keys():
scikit_model = LinearRegression(normalize=True)
data = self.scikit_data['data'].astype(dtype)
target = self.scikit_data['target'].astype(dtype)
scikit_model, spec = self._sklearn_setup(scikit_model, dtype, data, target)
test_data = data[0].reshape(1, -1)
coreml_model = create_model(spec)
try:
self.assertEqual(scikit_model.predict(test_data)[0].dtype,
type(coreml_model.predict({'data': test_data})['target']))
self.assertAlmostEqual(scikit_model.predict(test_data)[0],
coreml_model.predict({'data': test_data})['target'],
msg="{} != {} for Dtype: {}".format(
scikit_model.predict(test_data)[0],
coreml_model.predict({'data': test_data})['target'],
dtype
)
)
except RuntimeError:
print("{} not supported. ".format(dtype))
def setUpClass(self):
"""
Set up the unit test by loading the dataset and training a model.
"""
if not(HAS_SKLEARN):
return
scikit_data = load_boston()
feature_names = scikit_data.feature_names
scikit_model = LinearRegression()
scikit_model.fit(scikit_data['data'], scikit_data['target'])
# Save the data and the model
self.scikit_data = scikit_data
self.scikit_model = scikit_model
def setUpClass(self):
"""
Set up the unit test by loading the dataset and training a model.
"""
if not HAS_SKLEARN:
return
scikit_data = load_boston()
feature_names = scikit_data.feature_names
scikit_model = Pipeline(steps = [
('linear' , LinearRegression())
])
scikit_model.fit(scikit_data['data'], scikit_data['target'])
# Save the data and the model
self.scikit_data = scikit_data
self.scikit_model = scikit_model
def test_linear_regression_evaluation(self):
"""
Check that the evaluation results are the same in scikit learn and coremltools
"""
input_names = self.scikit_data.feature_names
df = pd.DataFrame(self.scikit_data.data, columns=input_names)
for normalize_value in (True, False):
cur_model = LinearRegression(normalize=normalize_value)
cur_model.fit(self.scikit_data['data'], self.scikit_data['target'])
spec = convert(cur_model, input_names, 'target')
df['prediction'] = cur_model.predict(self.scikit_data.data)
metrics = evaluate_regressor(spec, df)
        self.assertAlmostEqual(metrics['max_error'], 0)  # assertAlmostEquals is a deprecated alias
def find_parameters_w(X, Y):
"""Find the parameter values w for the model which best fits X and Y.
Args:
X: A 2-dimensional numpy array representing the independent variables
in the linear regression model.
Y: A numpy array of floats representing the dependent variables in the
linear regression model.
Returns:
A tuple (w0, w1, w2, w3, w4) representing the parameter values w.
"""
clf = linear_model.LinearRegression()
clf.fit(X, Y)
w0 = clf.intercept_
w1, w2, w3, w4 = clf.coef_
return w0, w1, w2, w3, w4
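# Minimal usage sketch (hypothetical 4-feature data; assumes sklearn's linear_model import):
import numpy as np
X = np.random.rand(50, 4)
Y = X @ np.array([1.0, 2.0, -1.0, 0.5]) + 3.0
w0, w1, w2, w3, w4 = find_parameters_w(X, Y)  # w0 ~ 3.0, (w1..w4) ~ (1, 2, -1, 0.5)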
def predict_price(dates, prices, x):
dates = np.reshape(dates, (len(dates),1)) # converting to matrix of n X 1
prices = np.reshape(prices, (len(prices),1))
linear_mod = linear_model.LinearRegression() # defining the linear regression model
linear_mod.fit(dates, prices) # fitting the data points in the model
plt.scatter(dates, prices, color= 'black', label= 'Data') # plotting the initial datapoints
plt.plot(dates, linear_mod.predict(dates), color= 'red', label= 'Linear model') # plotting the line made by linear regression
plt.xlabel('Date')
plt.ylabel('Price')
plt.title('Linear Regression')
plt.legend()
plt.show()
return linear_mod.predict(x)[0][0], linear_mod.coef_[0][0], linear_mod.intercept_[0]
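# Minimal usage sketch (hypothetical data; x must be 2-D for predict, e.g. [[31]]):
dates = list(range(1, 31))
prices = [10.0 + 0.5 * d for d in dates]
predicted, slope, intercept = predict_price(dates, prices, [[31]])
print(predicted, slope, intercept)  # ~25.5, ~0.5, ~10.0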
def prepare_fit_model_for_factors(model_type, x_train, y_train):
"""
    Given a model type and training data, train and return a fit model.
    Args:
        model_type (str): 'classification' or 'regression'
        x_train: training features
        y_train: training target
    Returns:
        (sklearn.base.BaseEstimator): A fit model, or None if model_type is unrecognized.
"""
if model_type == 'classification':
algorithm = LogisticRegression()
elif model_type == 'regression':
algorithm = LinearRegression()
else:
algorithm = None
if algorithm is not None:
algorithm.fit(x_train, y_train)
return algorithm
def regression_murder(year): # applies linear regression on murder rates
murder = pd.DataFrame()
dates = crime_rate_df.index.values.tolist()
murder['label'] = crime_rate_df['Murder and\nnonnegligent \nmanslaughter']
prediction_size = int(0.1 * len(murder))
X = np.array(dates)
y = np.array(murder['label'])
    y = y.reshape(len(X), 1)  # reshape returns a new array, so the result must be assigned
y_train = y[:-prediction_size]
X_train = X[:-prediction_size]
clf = LinearRegression()
clf.fit(X_train.reshape(-1, 1), y_train)
regression_line = [clf.predict(X_train[i].reshape(1, -1)) for i in range(len(X_train))]
    print(clf.predict(np.array(year).reshape(1, -1)))  # predict expects a 2-D array
plt.scatter(X_train, y_train)
plt.plot(X_train, regression_line)
plt.show()
def linear_regression():
lr = LinearRegression()
lr.fit(X_train, y_train)
# Look at predictions on training and validation set
print("RMSE on Training set :", rmse_cv(lr, train_split, y).mean())
y_train_pred = lr.predict(train_split)
print('rmsle calculate by self:', rmsle(list(np.exp(y) - 1), list(np.exp(y_train_pred) - 1)))
plt.scatter(y_train_pred, y_train_pred - y, c="blue", marker="s", label="Training data")
plt.title("Linear regression")
plt.xlabel("Predicted values")
plt.ylabel("Residuals")
plt.legend(loc="upper left")
plt.hlines(y=0, xmin=10.5, xmax=13.5, color="red")
plt.show()
# Plot predictions
plt.scatter(y_train_pred, y, c="blue", marker="s", label="Training data")
plt.title("Linear regression")
plt.xlabel("Predicted values")
plt.ylabel("Real values")
plt.legend(loc="upper left")
plt.plot([10.5, 13.5], [10.5, 13.5], c="red")
plt.show()
return lr
def main():
diabetes = datasets.load_diabetes()
diabetes_X = diabetes.data[:, np.newaxis, 2]
diabetes_X_train = diabetes_X[:-20]
diabetes_X_test = diabetes_X[-20:]
diabetes_y_train = diabetes.target[:-20]
diabetes_y_test = diabetes.target[-20:]
regr = linear_model.LinearRegression()
regr.fit(diabetes_X_train, diabetes_y_train)
print('Coefficients: \n', regr.coef_)
print("Mean squared error: %.2f" %
np.mean((regr.predict(diabetes_X_test) - diabetes_y_test)**2))
print('Variance score: %.2f' % regr.score(diabetes_X_test, diabetes_y_test))
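# Note: LinearRegression.score() returns the coefficient of determination, so the
# "Variance score" printed above is the R^2 on the 20 held-out samples.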
def test_parameter_estimation_low_memory(self):
X = np.random.uniform(0, 4, 1000)
y = X + np.random.normal(0, 1, 1000)
m = BayesianBootstrapBagging(LinearRegression(), 10000, 1000, low_mem=True)
m.fit(X.reshape(-1, 1), y)
coef_samples = [b.coef_ for b in m.base_models_]
intercept_samples = [b.intercept_ for b in m.base_models_]
self.assertAlmostEqual(np.mean(coef_samples), 1, delta=0.3)
l, r = central_credible_interval(coef_samples, alpha=0.05)
self.assertLess(l, 1)
self.assertGreater(r, 1)
l, r = highest_density_interval(coef_samples, alpha=0.05)
self.assertLess(l, 1)
self.assertGreater(r, 1)
self.assertAlmostEqual(np.mean(intercept_samples), 0, delta=0.3)
l, r = central_credible_interval(intercept_samples, alpha=0.05)
self.assertLess(l, 0)
self.assertGreater(r, 0)
self.assertAlmostEqual(np.mean(intercept_samples), 0, delta=0.3)
l, r = highest_density_interval(intercept_samples, alpha=0.05)
self.assertLess(l, 0)
self.assertGreater(r, 0)
def test_parameter_estimation(self):
X = np.random.uniform(0, 4, 1000)
y = X + np.random.normal(0, 1, 1000)
m = BayesianBootstrapBagging(LinearRegression(), 10000, 1000, low_mem=False)
m.fit(X.reshape(-1, 1), y)
coef_samples = [b.coef_ for b in m.base_models_]
intercept_samples = [b.intercept_ for b in m.base_models_]
self.assertAlmostEqual(np.mean(coef_samples), 1, delta=0.3)
l, r = central_credible_interval(coef_samples, alpha=0.05)
self.assertLess(l, 1)
self.assertGreater(r, 1)
l, r = highest_density_interval(coef_samples, alpha=0.05)
self.assertLess(l, 1)
self.assertGreater(r, 1)
self.assertAlmostEqual(np.mean(intercept_samples), 0, delta=0.3)
l, r = central_credible_interval(intercept_samples, alpha=0.05)
self.assertLess(l, 0)
self.assertGreater(r, 0)
self.assertAlmostEqual(np.mean(intercept_samples), 0, delta=0.3)
l, r = highest_density_interval(intercept_samples, alpha=0.05)
self.assertLess(l, 0)
self.assertGreater(r, 0)
def train_regressor(options, embed_map, wordvecs, worddict):
"""
Return regressor to map word2vec to RNN word space
"""
# Gather all words from word2vec that appear in wordvecs
d = defaultdict(lambda : 0)
    for w in embed_map.vocab.keys():  # .vocab is the gensim < 4.0 KeyedVectors API
d[w] = 1
shared = OrderedDict()
count = 0
    for w in list(worddict.keys())[:options['n_words']-2]:  # list() makes the slice work on Python 3
if d[w] > 0:
shared[w] = count
count += 1
# Get the vectors for all words in 'shared'
w2v = numpy.zeros((len(shared), 300), dtype='float32')
sg = numpy.zeros((len(shared), options['dim_word']), dtype='float32')
for w in shared.keys():
w2v[shared[w]] = embed_map[w]
sg[shared[w]] = wordvecs[w]
clf = LinearRegression()
clf.fit(w2v, sg)
return clf
def test_stacking():
model = Regressor(estimator=LinearRegression, parameters={}, dataset=RealDataset)
ds = model.stack(10)
assert ds.X_train.shape[0] == model.dataset.X_train.shape[0]
assert ds.X_test.shape[0] == model.dataset.X_test.shape[0]
assert ds.y_train.shape[0] == model.dataset.y_train.shape[0]
model = Regressor(estimator=LinearRegression, parameters={}, dataset=RealDataset)
ds = model.stack(10, full_test=False)
assert np.isnan(ds.X_train).sum() == 0
assert ds.X_train.shape[0] == model.dataset.X_train.shape[0]
assert ds.X_test.shape[0] == model.dataset.X_test.shape[0]
assert ds.y_train.shape[0] == model.dataset.y_train.shape[0]
model = Regressor(estimator=LinearRegression, parameters={}, dataset=RealDataset)
model.dataset.load()
ds = model.stack(10, full_test=False)
# Check cache
assert np.isnan(ds.X_train).sum() == 0
assert ds.X_train.shape[0] == model.dataset.X_train.shape[0]
assert ds.X_test.shape[0] == model.dataset.X_test.shape[0]
assert ds.y_train.shape[0] == model.dataset.y_train.shape[0]
def _get_trend(cls, log, starting_date):
"""Get commit count trend based on log.
:param log: a log on which the trend should be computed
:param starting_date: starting date of log
:return: computed trend
"""
records = [0]
date = starting_date
for entry in log:
if entry['author']['date'] > date + cls._SECONDS_PER_DAY:
date += cls._SECONDS_PER_DAY
records.append(0)
records[-1] += 1
lr = LinearRegression()
lr.fit(np.array(range(len(records))).reshape(-1, 1), np.array(records))
return lr.coef_[0]
def linear_model_manual(prediction_value):
data = pd.read_csv('E://Spyder/LinearRegression/data/data.csv')
X_tem = []
Y_tem = []
for X_data ,Y_data in zip(data['x'],data['y']):
X_tem.append(int(X_data))
Y_tem.append(float(Y_data))
X_parameters = np.array(X_tem)
Y_parameters = np.array(Y_tem)
xy = X_parameters*Y_parameters
xy_avg = xy.mean()
x_avg = X_parameters.mean()
y_avg = Y_parameters.mean()
x_square = X_parameters*X_parameters
x_square_avg = x_square.mean()
predictions = {}
#Method of least squares
predictions['coefficient'] = (xy_avg - x_avg*y_avg) / (x_square_avg - x_avg*x_avg)
predictions['intercept'] = y_avg - predictions['coefficient']*x_avg
#prediction_result
predictions['predictions_result'] = predictions['intercept'] + predictions['coefficient']*prediction_value
return predictions
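# A quick cross-check of the closed-form slope/intercept above against scikit-learn,
# using hypothetical in-memory data instead of the hard-coded CSV path:
import numpy as np
from sklearn.linear_model import LinearRegression
x = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
y = 2.0 * x + 1.0
slope = (np.mean(x * y) - x.mean() * y.mean()) / (np.mean(x * x) - x.mean() ** 2)
intercept = y.mean() - slope * x.mean()
ref = LinearRegression().fit(x.reshape(-1, 1), y)
assert np.isclose(slope, ref.coef_[0]) and np.isclose(intercept, ref.intercept_)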
def linear_model_multivariate():
#coefficient = (X_trans*X)^-1 * X_trans * y
data = pd.read_csv('E://Spyder/LinearRegression/data/data.csv')
X_tem = []
Y_tem = []
linearModel={}
for X_data ,Y_data in zip(data['x'],data['y']):
X_tem.append(int(X_data))
Y_tem.append(float(Y_data))
X_parameters = np.ones((len(X_tem),2))
for i in range(len(X_tem)):
X_parameters[i][0] = X_tem[i]
Y_parameters = np.array(Y_tem)
# Formula
# coefficient = inv(X.T*X) * X.T * y
coefficient = np.dot(np.dot(np.linalg.inv(np.dot(X_parameters.T,X_parameters)),X_parameters.T),Y_parameters)
avg_X = X_parameters.mean(axis = 0)
    intercept = Y_parameters.mean() - coefficient[0] * avg_X[0]  # y_mean - slope * x_mean; equals coefficient[1], since the column of ones already fits the intercept
linearModel['coefficient'] = coefficient
linearModel['intercept'] = intercept
return linearModel
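# The normal-equation solution above can be sanity-checked against np.linalg.lstsq,
# which solves the same least-squares problem more stably (hypothetical data):
import numpy as np
x = np.array([1.0, 2.0, 3.0, 4.0])
A = np.column_stack([x, np.ones_like(x)])    # the same [x, 1] design as X_parameters
y = 3.0 * x - 2.0
coef_ne = np.linalg.inv(A.T @ A) @ A.T @ y   # normal equations
coef_ls, *_ = np.linalg.lstsq(A, y, rcond=None)
assert np.allclose(coef_ne, coef_ls)         # slope 3.0, intercept -2.0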
def get_loss():
#Calculate the loss the linear_model
data = pd.read_csv('E://Spyder/LinearRegression/data/data.csv')
X_tem = []
Y_tem = []
for X_data ,Y_data in zip(data['x'],data['y']):
X_tem.append([int(X_data)])
Y_tem.append(float(Y_data))
x_data = np.array(X_tem)
y_data = np.array(Y_tem)
regr = linear_model.LinearRegression()
regr.fit(x_data,y_data)
loss = np.sum((y_data - regr.predict(x_data)) ** 2)
return loss
#Function to show the result of linear fit model
# Source: representation.py, project sport_movements_analysis, author guillaumeAssogba
def plot2dRegression(x,y, nameX, nameY, namePlot):
model = LinearRegression()
linearModel = model.fit(x, y)
predictModel = linearModel.predict(x)
plt.scatter(x,y, color='g')
plt.plot(x, predictModel, color='k')
plt.xlabel(nameX)
plt.ylabel(nameY)
test = stats.linregress(predictModel,y)
print("The squared of the correlation coefficient R^2 is " + str(test.rvalue**2))
plt.savefig("plot/loadings/"+namePlot, bbox_inches='tight')
plt.show()
return test.rvalue**2
#plot the 2D regression between the performance values and the loadings.
#return the correlation factor: R squared