def test_select_best(self):
"""
Test the select best fit estimator
"""
X, y = ANSCOMBE[1]
X = np.array(X)
y = np.array(y)
X = X[:,np.newaxis]
model = fit_select_best(X, y)
self.assertIsNotNone(model)
self.assertIsInstance(model, Pipeline)
X, y = ANSCOMBE[3]
X = np.array(X)
y = np.array(y)
X = X[:,np.newaxis]
model = fit_select_best(X, y)
self.assertIsNotNone(model)
self.assertIsInstance(model, LinearRegression)
def test_estimator_instance(self):
"""
Test that isestimator works for instances
"""
models = (
LinearRegression(),
LogisticRegression(),
KMeans(),
LSHForest(),
PCA(),
RidgeCV(),
LassoCV(),
RandomForestClassifier(),
)
for model in models:
self.assertTrue(isestimator(model))
def test_estimator_class(self):
"""
Test that isestimator works for classes
"""
models = (
LinearRegression,
LogisticRegression,
KMeans,
LSHForest,
PCA,
RidgeCV,
LassoCV,
RandomForestClassifier,
)
for model in models:
self.assertTrue(inspect.isclass(model))
self.assertTrue(isestimator(model))
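For reference, a minimal sketch of what a helper like isestimator can reduce to (an assumption about its internals, not the library's actual code): it accepts both estimator classes and instances by checking against sklearn's BaseEstimator.
import inspect

from sklearn.base import BaseEstimator

def isestimator_sketch(model):
    # Accept both estimator classes and instances (illustrative only)
    if inspect.isclass(model):
        return issubclass(model, BaseEstimator)
    return isinstance(model, BaseEstimator)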
def test_clusterer_enforcement(self):
"""
Assert that only clustering estimators can be passed to cluster viz
"""
nomodels = [
SVC, SVR, Ridge, RidgeCV, LinearRegression, RandomForestClassifier
]
for nomodel in nomodels:
with self.assertRaises(YellowbrickTypeError):
visualizer = ClusteringScoreVisualizer(nomodel())
models = [
KMeans, MiniBatchKMeans, AffinityPropagation, MeanShift, DBSCAN, Birch
]
for model in models:
try:
visualizer = ClusteringScoreVisualizer(model())
except YellowbrickTypeError:
self.fail("could not pass clustering estimator to visualizer")
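The enforcement being tested amounts to a type check at construction time; a rough sketch of the idea (an assumption about the visualizer's internals), using sklearn's ClusterMixin as the marker class:
from sklearn.base import ClusterMixin

def check_clusterer(model):
    # Reject non-clustering estimators, as ClusteringScoreVisualizer is expected to
    if not isinstance(model, ClusterMixin):
        raise TypeError("%r is not a clustering estimator" % model)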
# Source: Step 4 Final Strategy V1 with 4 factors.py (project: Market-Neutral-Model, author: SunJiaxuan)
def GetBeta(f,*args):
FactorValue = f(*args)
stock = args[0]
date = args[1]
    # Fetch about 30 calendar days (roughly 20 business days) of opening prices
tempprice = get_price(list(stock), date, "{:%Y-%m-%d}".format(datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(days=30)), frequency='1d', fields=None)['OpeningPx']
tempreturn = np.log(tempprice.iloc[-1]/tempprice.iloc[0])
FactorValue = pd.DataFrame(FactorValue)
DataAll = pd.concat([FactorValue,tempreturn],axis = 1)
DataAll = DataAll.dropna()
DataAll.columns = ['f','p']
regr = linear_model.LinearRegression()
regr.fit(np.transpose(np.matrix(DataAll['f'])), np.transpose(np.matrix(DataAll['p'])))
return regr.coef_
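The np.matrix transposes in GetBeta can be avoided by passing a single-column DataFrame as X. A minimal self-contained sketch of the same regression step, with synthetic data standing in for the platform's FactorValue and get_price outputs:
import numpy as np
import pandas as pd
from sklearn import linear_model

rng = np.random.default_rng(0)
factor = pd.Series(rng.normal(size=100), name='f')  # stand-in for FactorValue
fwd_ret = pd.Series(0.5 * factor + rng.normal(scale=0.1, size=100), name='p')  # stand-in for tempreturn

data = pd.concat([factor, fwd_ret], axis=1).dropna()
regr = linear_model.LinearRegression()
regr.fit(data[['f']], data['p'])  # 2-D X and 1-D y, no transposes needed
print(regr.coef_)  # slope close to 0.5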
def GetResiduals(stock,enddate):
Xinput = [EquityOCFP(stock,enddate), EquitySize(stock,enddate), RSIIndividual(stock,enddate), Min130Day(stock,enddate)]
X = pd.concat(Xinput, axis=1)
date = enddate
tempprice = get_price(list(stock), date, "{:%Y-%m-%d}".format(datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(days=30)), frequency='1d', fields=None)['OpeningPx']
y = np.log(tempprice.iloc[-1]/tempprice.iloc[0])
DataAll = pd.concat([X,y],axis = 1)
DataAll = DataAll.dropna()
regr = linear_model.LinearRegression()
    regr.fit(DataAll.iloc[:, 0:4], DataAll.iloc[:, 4])
    residuals = regr.predict(DataAll.iloc[:, 0:4]) - DataAll.iloc[:, 4].values
    residuals = pd.DataFrame(data=residuals, index=DataAll.index)
residuals.columns = [enddate]
return residuals
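The same pattern with four regressors; a synthetic sketch of the residual computation (the column names are hypothetical stand-ins for the four factor helpers above):
import numpy as np
import pandas as pd
from sklearn import linear_model

rng = np.random.default_rng(1)
X = pd.DataFrame(rng.normal(size=(60, 4)), columns=['ocfp', 'size', 'rsi', 'min130'])
y = pd.Series(rng.normal(size=60), name='ret')

data = pd.concat([X, y], axis=1).dropna()
regr = linear_model.LinearRegression()
regr.fit(data.iloc[:, 0:4], data.iloc[:, 4])
resid = regr.predict(data.iloc[:, 0:4]) - data.iloc[:, 4].values  # fitted minus actual, as in the original
residuals = pd.DataFrame(resid, index=data.index)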
# This function is used by the later functions
def getDataSet(self, max_value_threshold = 1000, train_length_threshold = 30):
try:
return self.data_set
    except AttributeError:
self.__gen_data_set(max_value_threshold = max_value_threshold,
train_length_threshold = train_length_threshold)
return self.data_set
# def __gen_model(self, model = LinearRegression()):
# X_train, y_train, _ = self.getDataSet(10000, 60)
# model.fit(X_train, y_train)
# if self.ifPlotTrain:
# y_pred = model.predict(X_train)
# df = pd.DataFrame(np.hstack((y_train.reshape(-1,1), y_pred.reshape(-1,1))))
# df.columns = ['Train', 'Predict']
# df[:60].plot()
# plt.title('train_all')
# fig = plt.gcf()
# fig.savefig('./img/train_all.png')
# plt.close(fig)
# self.model = model
def analysis():
mysql_cn= pymysql.connect(host='10.25.0.119', port=3306,user='root', passwd='111111', db='music')
df = pd.read_sql('''
SELECT COUNT(*) as plays, ds from user_actions JOIN songs
on user_actions.song_id = songs.song_id
WHERE ds >= '20150805' AND ds <= '20150830' AND action_type = '1'
AND artist_id = 'c026b84e8f23a7741d9b670e3d8973f0'
GROUP BY artist_id, ds
ORDER BY ds
    ''', mysql_cn)
    X = np.arange(26)
df.columns = ['plays', 'ds']
y = df['plays'].values
    print(X, y)
    model = LinearRegression()
    model.fit(X.reshape(-1, 1), y)
    x = np.arange(26, 50)
    Y = model.predict(x.reshape(-1, 1))
    df = pd.DataFrame(Y)
    print(Y)
df.plot()
plt.show()
mysql_cn.close()
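Stripped of the database access, the forecasting step in analysis() is a plain linear-trend extrapolation; a sketch with synthetic play counts:
import numpy as np
from sklearn.linear_model import LinearRegression

days = np.arange(26).reshape(-1, 1)
plays = 100 + 3 * days.ravel() + np.random.randn(26)  # synthetic daily play counts
model = LinearRegression().fit(days, plays)
forecast = model.predict(np.arange(26, 50).reshape(-1, 1))  # extrapolate the next 24 days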
def test_pink_noise_slope():
n_points = 10000
fs = 500.0
try:
from sklearn.linear_model import LinearRegression
except ImportError:
return True
# test the slope
for slope in [1, 1.5, 2]:
noise = pink_noise(n_points, slope=slope)
spec = Spectrum(fs=fs)
psd = spec.periodogram(noise).T
freq = np.linspace(0, fs / 2., psd.size)[:, None]
# linear regression fit in the log domain
reg = LinearRegression()
reg.fit(np.log10(freq[1:]), np.log10(psd[1:]))
assert_almost_equal(reg.coef_[0][0], -slope, decimal=1)
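The assertion relies on the fact that a 1/f**slope spectrum is a straight line with slope -slope in log-log coordinates; the same fit can be done without scikit-learn, here on an ideal spectrum:
import numpy as np

freq = np.linspace(1.0, 250.0, 1000)
psd = freq ** -2.0  # ideal 1/f^2 power spectrum
slope, intercept = np.polyfit(np.log10(freq), np.log10(psd), 1)
print(slope)  # -2.0 up to floating-point error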
def mlr_val( RM, yE, disp = True, graph = True, rate = 2, more_train = True, center = None):
"""
    Validation is performed according to the given ratio.
"""
RMt, yEt, RMv, yEv = jchem.get_valid_mode_data( RM, yE, rate = rate, more_train = more_train, center = center)
clf = linear_model.LinearRegression()
clf.fit( RMt, yEt)
print('Training result')
mlr_show( clf, RMt, yEt, disp = disp, graph = graph)
print('Validation result')
r_sqr, RMSE = mlr_show( clf, RMv, yEv, disp = disp, graph = graph)
return r_sqr, RMSE
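jchem.get_valid_mode_data and mlr_show are project helpers not shown here; a rough stand-in for the same train/validate flow using sklearn's own split (an assumption about what the helpers do, not their actual code):
import numpy as np
from sklearn import linear_model
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split

X = np.random.rand(100, 5)
y = X @ np.array([1.0, -2.0, 0.5, 0.0, 3.0]) + 0.1 * np.random.randn(100)
X_tr, X_val, y_tr, y_val = train_test_split(X, y, test_size=0.33)

clf = linear_model.LinearRegression().fit(X_tr, y_tr)
r_sqr = r2_score(y_val, clf.predict(X_val))
RMSE = np.sqrt(mean_squared_error(y_val, clf.predict(X_val)))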
def cv_train_test( xMa, yVa, tr, ts):
"""
    Regression and testing are performed on the given data
with cross-validation streams
"""
xM = xMa[ tr, :]
yV = yVa[ tr, 0]
clf = linear_model.LinearRegression()
clf.fit( xM, yV)
# The testing information is extracted.
xM_test = xMa[ ts, :]
yV_test = yVa[ ts, 0]
return yV_test.A1, clf.predict( xM_test).ravel()
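The .A1 in the return value implies xMa and yVa are np.matrix objects in the original project. A standalone sketch of the same per-fold step with plain ndarrays, which modern NumPy prefers:
import numpy as np
from sklearn import linear_model
from sklearn.model_selection import KFold

X = np.random.rand(40, 3)
y = np.random.rand(40)
for tr, ts in KFold(n_splits=5).split(X):
    clf = linear_model.LinearRegression().fit(X[tr], y[tr])
    y_true, y_pred = y[ts], clf.predict(X[ts])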
def gs_param( model, X, y, param_grid, n_splits=5, shuffle=True, n_jobs=-1, graph=False):
"""
gs = gs_param( model, X, y, param_grid, n_splits=5, shuffle=True, n_jobs=-1)
Inputs
======
model = svm.SVC(), or linear_model.LinearRegression(), for example
    param_grid = {"C": np.logspace(-2,2,5)}
"""
#print(xM.shape, yVc.shape)
kf5_c = model_selection.KFold( n_splits=n_splits, shuffle=shuffle)
gs = model_selection.GridSearchCV( model, param_grid, cv=kf5_c, n_jobs=n_jobs)
gs.fit( X, y)
if graph:
plt.plot( gs.cv_results_["mean_train_score"], label='E[Train]')
plt.plot( gs.cv_results_["mean_test_score"], label='E[Test]')
plt.legend(loc=0)
plt.grid()
return gs
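An example call matching the docstring. Note that recent scikit-learn versions only populate mean_train_score when GridSearchCV is constructed with return_train_score=True, so the graph branch may need that flag:
import numpy as np
from sklearn import datasets, svm

X, y = datasets.load_iris(return_X_y=True)
gs = gs_param(svm.SVC(), X, y, {"C": np.logspace(-2, 2, 5)}, graph=False)
print(gs.best_params_)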
def cv_pilot_only(self):
"""
    Cross-validation scores are evaluated using LOO.
SNRpilot is equal to SNR, which is SNRdata.
"""
yT_a = self.rx_p["yT_a"]
x_a = self.rx_p["x_a"]
lm = linear_model.LinearRegression()
scores = codes.cross_val_score_loo( lm, yT_a, x_a)
    # Output is stored with environment variables.
pdi = pd.DataFrame()
pdi["model"] = ["LinearRegression"]
pdi["alpha"] = [0]
pdi["metric"] = ["mean_squared_error"]
pdi["E[scores]"] = [np.mean(scores)]
pdi["std[scores]"] = [np.std(scores)]
pdi["scores"] = [scores]
return pdi
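codes.cross_val_score_loo is a project helper; assuming it computes leave-one-out scores, the sklearn equivalent would look roughly like this:
import numpy as np
from sklearn import linear_model
from sklearn.model_selection import LeaveOneOut, cross_val_score

yT_a = np.random.rand(20, 3)  # stand-in for the pilot observations
x_a = np.random.rand(20)      # stand-in for the transmitted symbols
scores = cross_val_score(linear_model.LinearRegression(), yT_a, x_a,
                         cv=LeaveOneOut(), scoring='neg_mean_squared_error')
print(np.mean(scores), np.std(scores))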
def cv_pilot_reg_only(self, alpha = 0):
model = self.model
yT_a = self.rx_p["yT_a"]
x_a = self.rx_p["x_a"]
# kf = KFold()
# loo = cross_validation.LeaveOneOut( x_a.shape[0])
if alpha == 0:
lm = linear_model.LinearRegression()
else:
lm = getattr( linear_model, model)(alpha)
scores = codes.cross_val_score_loo( lm, yT_a, x_a)
    # Output is stored with environment variables.
pdi = pd.DataFrame()
pdi["model"] = [model]
pdi["alpha"] = [alpha]
pdi["metric"] = ["mean_squared_error"]
pdi["E[scores]"] = [np.mean(np.power(scores,2))] # MSE
pdi["std[scores]"] = ["t.b.d."]
pdi["scores"] = [scores]
return pdi
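The getattr dispatch above resolves the model by name and passes alpha positionally; for example, with self.model set to 'Ridge':
from sklearn import linear_model

lm = getattr(linear_model, "Ridge")(0.5)  # equivalent to linear_model.Ridge(alpha=0.5)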
def Beta(self):
    # Daily risk-free log return implied by a 3.637% annual rate
    rf_daily = math.log(0.03637 / 365.0 + 1)
    df1 = self.sharedf
    df1['change'] = df1['change'] - rf_daily
    df2 = ShareClass().GetDayData(code='000001', zs=True)
coef = []
intercept = []
residues=[]
ret= pandas.merge(df1,df2,how='inner',on='date')
array2 = []
if len(ret) > 252:
for z in range(0, 252):
            array2.append(0.5 ** ((252 - z - 1) / 63.0))  # 63-day half-life decay weight
for z in range(0, 251):
coef.append(numpy.NaN)
intercept.append(numpy.NaN)
residues.append(numpy.NaN)
for c in range(252, len(ret)+1):
array=[]
for x in ret[c - 252:c]['change_x']:
array.append([x])
clf = linear_model.LinearRegression()
clf.fit(X=array, y=ret[c - 252:c]["change_y"], sample_weight=array2)
coef.append(float(clf.coef_))
residues.append(clf._residues)
intercept.append(float(clf.intercept_))
ret['beta'] = coef
ret['alpha'] = intercept
ret['residues'] = residues
return ret[['date','beta','alpha','residues']]
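The sample weights built in the inner loop form an exponential decay with a 63-day half-life across the 252-day window, so the most recent day gets weight 1; a vectorized equivalent:
import numpy as np

window, half_life = 252, 63
weights = 0.5 ** ((window - 1 - np.arange(window)) / half_life)
# weights[-1] == 1.0 for the newest observation; stepping back 63 days halves the weight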
def define_model(self):
#if self.modeltype == "AR" :
# return statsmodels.tsa.ar_model.AR(max_order=self.parameters['max_order'])
if self.modeltype == "RandomForest" :
return ensemble.RandomForestRegressor(n_estimators=self.parameters['n_estimators'])
#return ensemble.RandomForestClassifier(
# n_estimators=self.parameters['n_estimators'])
elif self.modeltype == "LinearRegression" :
return linear_model.LinearRegression()
elif self.modeltype == "Lasso" :
return linear_model.Lasso(
alpha=self.parameters['alpha'])
elif self.modeltype == "ElasticNet" :
return linear_model.ElasticNet(
alpha=self.parameters['alpha'],
l1_ratio=self.parameters['l1_ratio'])
elif self.modeltype == "SVR" :
return SVR(
C=self.parameters['C'],
epsilon=self.parameters['epsilon'],
kernel=self.parameters['kernel'])
#elif self.modeltype == 'StaticModel':
# return StaticModel (
# parameters=self.parameters
# )
#elif self.modeltype == 'AdvancedStaticModel':
# return AdvancedStaticModel (
# parameters=self.parameters
# )
# elif self.modeltype == 'SGDRegressor' :
# print(self.parameters)
# return linear_model.SGDRegressor(
# loss=self.parameters['loss'],
# penalty=self.parameters['penalty'],
# l1_ratio=self.parameters['l1_ratio'])
else:
raise ConfigError("Unsupported model {0}".format(self.modeltype))
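The same dispatch can be written as a dict of factories, which keeps each new model type to a one-line change; an illustrative alternative, not the original class:
from sklearn import ensemble, linear_model

FACTORIES = {
    "LinearRegression": lambda p: linear_model.LinearRegression(),
    "Lasso": lambda p: linear_model.Lasso(alpha=p['alpha']),
    "RandomForest": lambda p: ensemble.RandomForestRegressor(n_estimators=p['n_estimators']),
}
model = FACTORIES["Lasso"]({'alpha': 0.1})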
def regressionDistance(vec1,vec2):
regr = linear_model.LinearRegression()
regr.fit(np.asarray(vec1).reshape(len(vec1),1),np.asarray(vec2))
return regr.coef_
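Despite its name, the value returned is a fitted slope rather than a metric distance: it is asymmetric in its arguments and can be negative. For example:
print(regressionDistance([1, 2, 3, 4], [2, 4, 6, 8]))  # ~ array([2.])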
def outofsample_extensions(method='linear-regression'):
# Load the data and init seeds
train_data, train_labels, test_data, test_labels = load_mnist()
np.random.seed(1)
sklearn.utils.check_random_state(1)
n_train_samples = 5000
# Learn a new space using Isomap
isomap = Isomap(n_components=10, n_neighbors=20)
train_data_isomap = np.float32(isomap.fit_transform(train_data[:n_train_samples, :]))
if method == 'linear-regression':
# Use linear regression to provide baseline out-of-sample extensions
proj = LinearRegression()
proj.fit(np.float64(train_data[:n_train_samples, :]), np.float64(train_data_isomap))
acc = evaluate_svm(proj.predict(train_data[:n_train_samples, :]), train_labels[:n_train_samples],
proj.predict(test_data), test_labels)
elif method == 'c-ISOMAP-10d' or method == 'c-ISOMAP-20d':
# Use the SEF to provide out-of-sample extensions
if method == 'c-ISOMAP-10d':
proj = LinearSEF(train_data.shape[1], output_dimensionality=10)
proj.cuda()
else:
proj = LinearSEF(train_data.shape[1], output_dimensionality=20)
proj.cuda()
loss = proj.fit(data=train_data[:n_train_samples, :], target_data=train_data_isomap, target='copy',
epochs=50, batch_size=128, verbose=True, learning_rate=0.001, regularizer_weight=1)
acc = evaluate_svm(proj.transform(train_data[:n_train_samples, :]), train_labels[:n_train_samples],
proj.transform(test_data), test_labels)
print("Method: ", method, " Test accuracy: ", 100 * acc, " %")
def outofsample_extensions(method=None, dataset=None):
np.random.seed(1)
sklearn.utils.check_random_state(1)
train_data, train_labels, test_data, test_labels = dataset_loader(dataset, seed=1)
# Learn a new space using Isomap
isomap = Isomap(n_components=10, n_neighbors=20)
train_data_isomap = np.float32(isomap.fit_transform(train_data))
if method == 'linear-regression':
from sklearn.preprocessing import StandardScaler
std = StandardScaler()
train_data = std.fit_transform(train_data)
test_data = std.transform(test_data)
# Use linear regression to provide baseline out-of-sample extensions
proj = LinearRegression()
proj.fit(np.float64(train_data), np.float64(train_data_isomap))
acc = evaluate_svm(proj.predict(train_data), train_labels,
proj.predict(test_data), test_labels)
elif method == 'c-ISOMAP-10d' or method == 'c-ISOMAP-20d':
# Use the SEF to provide out-of-sample extensions
if method == 'c-ISOMAP-10d':
proj = LinearSEF(train_data.shape[1], output_dimensionality=10)
proj.cuda()
else:
proj = LinearSEF(train_data.shape[1], output_dimensionality=20)
proj.cuda()
loss = proj.fit(data=train_data, target_data=train_data_isomap, target='copy',
epochs=50, batch_size=1024, verbose=False, learning_rate=0.001, regularizer_weight=1)
acc = evaluate_svm(proj.transform(train_data), train_labels,
proj.transform(test_data), test_labels)
print("Method: ", method, " Test accuracy: ", 100 * acc, " %")