def evaluate_svm(train_data, train_labels, test_data, test_labels, n_jobs=-1):
"""
Evaluates a representation using a Linear SVM
It uses 3-fold cross validation for selecting the C parameter
:param train_data: training feature matrix
:param train_labels: training labels
:param test_data: test feature matrix
:param test_labels: test labels
:param n_jobs: number of parallel jobs for the grid search (-1 uses all available cores)
:return: the test accuracy
"""
# Scale data to 0-1
scaler = MinMaxScaler()
train_data = scaler.fit_transform(train_data)
test_data = scaler.transform(test_data)
parameters = {'kernel': ['linear'], 'C': [0.0001, 0.001, 0.01, 0.1, 1, 10, 100, 1000, 10000, 100000]}
model = svm.SVC(max_iter=10000)
clf = GridSearchCV(model, parameters, n_jobs=n_jobs, cv=3)  # sklearn.model_selection.GridSearchCV; the old sklearn.grid_search module was removed in 0.20
clf.fit(train_data, train_labels)
lin_svm_test = clf.score(test_data, test_labels)
return lin_svm_test
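A minimal usage sketch for evaluate_svm with synthetic data; the imports below reflect the current scikit-learn layout and are an assumption about this snippet's surrounding module.

import numpy as np
from sklearn import svm
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.preprocessing import MinMaxScaler

rng = np.random.RandomState(0)
X = rng.rand(200, 20)                      # hypothetical feature matrix
y = (X[:, 0] + X[:, 1] > 1.0).astype(int)  # hypothetical binary labels
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.25, random_state=0)
print('linear SVM test accuracy:', evaluate_svm(X_tr, y_tr, X_te, y_te, n_jobs=1))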
def build_ensemble(**kwargs):
"""Generate ensemble."""
ens = SuperLearner(**kwargs)
prep = {'Standard Scaling': [StandardScaler()],
'Min Max Scaling': [MinMaxScaler()],
'No Preprocessing': []}
est = {'Standard Scaling':
[ElasticNet(), Lasso(), KNeighborsRegressor()],
'Min Max Scaling':
[SVR()],
'No Preprocessing':
[RandomForestRegressor(random_state=SEED),
GradientBoostingRegressor()]}
ens.add(est, prep)
ens.add(GradientBoostingRegressor(), meta=True)
return ens
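A brief usage sketch, assuming mlens' SuperLearner and the scikit-learn estimators above are importable, SEED is defined at module level, and SuperLearner accepts the folds and random_state keyword arguments forwarded through **kwargs; the regression data is synthetic.

import numpy as np

SEED = 42
rng = np.random.RandomState(SEED)
X = rng.rand(100, 4)
y = X.dot(rng.rand(4)) + 0.1 * rng.rand(100)   # toy regression target

ens = build_ensemble(folds=2, random_state=SEED)
ens.fit(X, y)
print(ens.predict(X)[:5])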
def test_replicability():
"""Make sure it can be seeded properly."""
X = iris.data # Use the iris features.
X = MinMaxScaler().fit_transform(X)
ae1 = Autoencoder(hidden_units=(1,),
n_epochs=1000,
random_state=4556,
learning_rate=1e-2,
keep_prob=1.0)
Xenc1 = ae1.fit_transform(X)
ae2 = Autoencoder(hidden_units=(1,),
n_epochs=1000,
random_state=4556,
learning_rate=1e-2,
keep_prob=1.0)
Xenc2 = ae2.fit_transform(X)
assert_array_almost_equal(Xenc1, Xenc2)
def train_model(self):
# scale
scaler = MinMaxScaler(feature_range=(0, 1))
dataset = scaler.fit_transform(self.data)
# split into train and test sets
train_size = int(len(dataset) * 0.95)
train, test = dataset[0:train_size, :], dataset[train_size:len(dataset), :]
look_back = 5
trainX, trainY = self.create_dataset(train, look_back)
# reshape input to be [samples, time steps, features]
trainX = numpy.reshape(trainX, (trainX.shape[0], 1, trainX.shape[1]))
# create and fit the LSTM network
model = Sequential()
model.add(LSTM(6, input_shape=(1, look_back)))  # Keras 2 API; older versions used input_dim=look_back
model.add(Dense(1))
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(trainX, trainY, epochs=100, batch_size=1, verbose=2)  # 'nb_epoch' was renamed to 'epochs' in Keras 2
return model
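train_model calls a create_dataset helper that is not shown in this snippet; a hypothetical version consistent with how it is used here (each sample holds look_back consecutive values and the target is the value that follows the window) could look like this.

import numpy

def create_dataset(dataset, look_back=5):
    # sliding-window helper (hypothetical): X[i] = dataset[i:i+look_back], y[i] = dataset[i+look_back]
    dataX, dataY = [], []
    for i in range(len(dataset) - look_back):
        dataX.append(dataset[i:i + look_back, 0])
        dataY.append(dataset[i + look_back, 0])
    return numpy.array(dataX), numpy.array(dataY)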
def load_dataset(datasource: str) -> (numpy.ndarray, MinMaxScaler):
"""
The function loads dataset from given file name and uses MinMaxScaler to transform data
:param datasource: file name of data source
:return: tuple of dataset and the used MinMaxScaler
"""
# load the dataset
dataframe = pandas.read_csv(datasource, usecols=[1])
dataframe = dataframe.ffill()  # forward-fill missing values; fillna(method='pad') is deprecated in recent pandas
dataset = dataframe.values
dataset = dataset.astype('float32')
plt.plot(dataset)
plt.show()
# normalize the dataset
scaler = MinMaxScaler(feature_range=(0, 1))
dataset = scaler.fit_transform(dataset)
return dataset, scaler
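A short usage sketch; 'prices.csv' is a hypothetical file whose second column holds the series, and the returned scaler is what later maps model predictions back to the original scale.

dataset, scaler = load_dataset('prices.csv')
print(dataset.shape, dataset.min(), dataset.max())   # values now lie in [0, 1]
# predictions on the scaled series can be undone with scaler.inverse_transform(predictions)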
def train(self, train_size=0.8, k_folds=5):
# retrieve data from DB and pre-process
self._get_data()
# perform train/test split
self._get_train_test_split(train_size=train_size)
# define text pre-processing pipeline
text_pipeline = Pipeline([
('extract_text', DFColumnExtractor(TEXT_FEATURES)),
('vect', TfidfVectorizer(tokenizer=twitter_tokenizer))
])
# define pipeline for pre-processing of numeric features
numeric_pipeline = Pipeline([
('extract_nums', DFColumnExtractor(NON_TEXT_FEATURES)),
('scaler', MinMaxScaler())
])
# combine both steps into a single pipeline
pipeline = Pipeline([
('features', FeatureUnion([
('text_processing', text_pipeline),
('num_processing', numeric_pipeline)
])),
('clf', self._estimator)
])
self.logger.info('Fitting model hyperparameters with {0}-fold CV'.format(k_folds))
gs = GridSearchCV(pipeline, self.params, n_jobs=-1, cv=k_folds)
X = self.data.iloc[self.train_inds_, :]
y = self.data[LABEL].values[self.train_inds_]
gs.fit(X, y)
self.logger.info('Mean cross-validation accuracy is {0}'.format(gs.best_score_))
self.gs_ = gs
self.model_ = gs.best_estimator_
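DFColumnExtractor is a project-specific transformer that pulls the relevant columns out of the DataFrame before each sub-pipeline. With plain scikit-learn, the same text-plus-numeric split can be sketched with ColumnTransformer; the column names and toy data below are hypothetical.

import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler

toy = pd.DataFrame({'text': ['good movie', 'bad movie', 'great film', 'awful film'],
                    'score': [0.9, 0.1, 0.8, 0.2]})
labels = [1, 0, 1, 0]

features = ColumnTransformer([
    ('text', TfidfVectorizer(), 'text'),   # a single column name feeds TfidfVectorizer 1-D text
    ('num', MinMaxScaler(), ['score']),    # a list of columns keeps the numeric block 2-D
])
clf = Pipeline([('features', features), ('clf', LogisticRegression())])
clf.fit(toy, labels)
print(clf.predict(toy))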
def test_cutoff_inside_a_pipeline(data):
minmax_scaler = preprocessing.MinMaxScaler()
dsapp_cutoff = CutOff()
pipeline = Pipeline([
('minmax_scaler', minmax_scaler),
('dsapp_cutoff', dsapp_cutoff)
])
pipeline.fit(data['X_train'], data['y_train'])
X_fake_new_data = data['X_test'][-1,:].reshape(1,-1) + 0.5
mms = preprocessing.MinMaxScaler().fit(data['X_train'])
assert np.all(( mms.transform(X_fake_new_data) > 1 ) == (pipeline.transform(X_fake_new_data) == 1))
def test_dsapp_lr(data):
dsapp_lr = ScaledLogisticRegression()
dsapp_lr.fit(data['X_train'], data['y_train'])
minmax_scaler = preprocessing.MinMaxScaler()
dsapp_cutoff = CutOff()
lr = linear_model.LogisticRegression()
pipeline = Pipeline([
('minmax_scaler', minmax_scaler),
('dsapp_cutoff', dsapp_cutoff),
('lr', lr)
])
pipeline.fit(data['X_train'], data['y_train'])
assert np.all(dsapp_lr.predict(data['X_test']) == pipeline.predict(data['X_test']))
def get_input(self):
# Input data.
# Load the training, validation and test data into constants that are
# attached to the graph.
self.x_train, self.y_train,self.x_validation,self.y_validation = self.get_train_validationset()
self.x_train, self.y_train, self.x_validation, self.y_validation = self.x_train.to_numpy(), self.y_train.to_numpy().reshape((-1, 1)),\
self.x_validation.to_numpy(), self.y_validation.to_numpy().reshape((-1, 1))  # DataFrame.as_matrix() was removed in pandas 1.0
# self.x_train, self.y_train,self.x_validation,self.y_validation = self.x_train.astype(np.float32), self.y_train.astype(np.float32),\
# self.x_validation.astype(np.float32),self.y_validation.astype(np.float32)
sc = MinMaxScaler()
sc.fit(self.x_train)
self.x_train= sc.transform(self.x_train)
self.x_validation= sc.transform(self.x_validation)
self.inputlayer_num = len(self.get_used_features())
self.outputlayer_num = 1
# Input placeholders
with tf.name_scope('input'):
self.x = tf.placeholder(tf.float32, [None, self.inputlayer_num], name='x-input')
self.y_true = tf.placeholder(tf.float32, [None, self.outputlayer_num ], name='y-input')
self.keep_prob = tf.placeholder(tf.float32, name='drop_out')
return
def test_df_values(self):
est1 = dpp.MinMaxScaler()
est2 = dpp.MinMaxScaler()
result_ar = est1.fit_transform(X)
result_df = est2.fit_transform(df)
for attr in ['data_min_', 'data_max_', 'data_range_',
'scale_', 'min_']:
assert_eq_ar(getattr(est1, attr), getattr(est2, attr).values)
assert_eq_ar(est1.transform(X), est2.transform(X))
assert_eq_ar(est1.transform(df).values, est2.transform(X))
assert_eq_ar(est1.transform(X), est2.transform(df).values)
assert_eq_ar(result_ar, result_df.values)
def _pp_min_max_scale(df):
"""
Min-max scale the feature columns; the last column is treated as the target and left unscaled.
"""
print(" start minmax scaling...")
# drop the 'id' and 'price_date' columns (left commented out)
# df = df.drop(['id', 'price_date'], axis=1)
# remember the original index and column names
index = df.index
columns = df.columns
# min-max scale the feature columns (everything except the last column)
feature_scaled = preprocessing.MinMaxScaler().fit_transform(df.iloc[:, :-1])
target = np.array(df.iloc[:, -1])
target.shape = (len(target), 1)
# stack the scaled features X and the target y back together (the pandas DataFrame becomes a numpy ndarray here)
df_scaled = pd.DataFrame(np.hstack((feature_scaled, target)))
# restore the original index and column names
df_scaled.index = index
df_scaled.columns = columns
print(" minmax scaling finished.")
return df_scaled
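A tiny usage sketch with a hypothetical three-column frame whose last column is the target.

import pandas as pd

toy = pd.DataFrame({'f1': [1.0, 2.0, 3.0],
                    'f2': [10.0, 20.0, 40.0],
                    'price': [100.0, 110.0, 95.0]})
print(_pp_min_max_scale(toy))   # 'f1' and 'f2' are rescaled to [0, 1]; 'price' passes through unchanged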
def load_norm_stats(stats_file, dim, method="MVN"):
#### load norm stats ####
io_funcs = BinaryIOCollection()
norm_matrix, frame_number = io_funcs.load_binary_file_frame(stats_file, dim)
assert frame_number==2
if method=="MVN":
scaler = preprocessing.StandardScaler()
scaler.mean_ = norm_matrix[0, :]
scaler.scale_ = norm_matrix[1, :]
elif method=="MINMAX":
scaler = preprocessing.MinMaxScaler(feature_range=(0.01, 0.99))
scaler.min_ = norm_matrix[0, :]
scaler.scale_ = norm_matrix[1, :]
else:
raise ValueError("Unsupported normalization method: {0}".format(method))
return scaler
def get_term_topic(self, X):
n_features = X.shape[1]
id2word = self.vocabulary_
word2topic = {}
with open('word_topic.txt', 'r', encoding='utf-8') as f:
for line in f:
strs = line.strip('\n').split('\t')
word2topic[strs[0]] = strs[2]
topic = np.zeros((len(id2word),))
for i, key in enumerate(id2word):
if key in word2topic:
topic[id2word[key]] = float(word2topic[key])  # the file stores topic weights as strings
else:
print(key)
topic = preprocessing.MinMaxScaler().fit_transform(topic.reshape(-1, 1)).ravel()  # MinMaxScaler expects a 2-D array
# topic = sp.spdiags(topic, diags=0, m=n_features,
# n=n_features, format='csr')
return topic
def __init__(self, mins=None, maxs=None):
from sklearn.preprocessing import MinMaxScaler
self.scaler_ = MinMaxScaler()
if mins is not None:
assert isinstance(mins, np.ndarray)
if mins.ndim == 1:
mins = mins.reshape(1, -1)
self.scaler_.partial_fit(mins)
self.mins_ = mins
else:
self.mins_ = None
if maxs is not None:
assert isinstance(maxs, np.ndarray)
if maxs.ndim == 1:
maxs = maxs.reshape(1, -1)
self.scaler_.partial_fit(maxs)
self.maxs_ = maxs
else:
self.maxs_ = None
if self.mins_ is not None and self.maxs_ is not None:
self.fitted_ = True
else:
self.fitted_ = False
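A short sketch of how supplying explicit bounds pins the scaling range: partial_fit on the mins and maxs rows means transform later maps mins to 0 and maxs to 1 regardless of the data it sees. 'BoundedScaler' is a hypothetical name for the class this __init__ belongs to.

import numpy as np

bs = BoundedScaler(mins=np.array([0.0, -1.0]), maxs=np.array([10.0, 1.0]))
print(bs.fitted_)                                     # True: both bounds were supplied
print(bs.scaler_.transform(np.array([[5.0, 0.0]])))   # -> [[0.5, 0.5]]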
def applyFeatures(dataset, delta):
"""
applies rolling-mean and delayed-return features to the dataframe, drops the rows made NaN by the look-back window, and min-max scales all columns
"""
columns = dataset.columns
close = columns[-3]
returns = columns[-1]
for n in delta:
addFeatures(dataset, close, returns, n)
dataset = dataset.drop(dataset.index[0:max(delta)]) #drop NaN due to delta spanning
# normalize columns
scaler = preprocessing.MinMaxScaler()
return pd.DataFrame(scaler.fit_transform(dataset),\
columns=dataset.columns, index=dataset.index)
def prepare_faces():
data = sklearn.datasets.fetch_olivetti_faces(data_home='../data', shuffle=False)
X = data.data
y = data.target
X = np.split(X, 40)
y = np.split(y, 40)
X_train = [x[0:7, :] for x in X]
X_test = [x[7:, :] for x in X]
y_train = [a[0:7] for a in y]
y_test = [a[7:] for a in y]
X_train = np.concatenate(X_train)
X_test = np.concatenate(X_test)
y_train = pd.Series(np.concatenate(y_train))
y_test = pd.Series(np.concatenate(y_test))
scaler = MinMaxScaler(feature_range=(-1, 1))
X_train = pd.DataFrame(scaler.fit_transform(X_train))
X_test = pd.DataFrame(scaler.transform(X_test))
return X_train, y_train, X_test, y_test, scaler
def prepare_faces():
data = sklearn.datasets.fetch_olivetti_faces(data_home='../data', shuffle=False)
X = data.data
y = data.target
X = np.split(X, 40)
y = np.split(y, 40)
X_train = [x[0:7, :] for x in X]
X_test = [x[7:, :] for x in X]
y_train = [a[0:7] for a in y]
y_test = [a[7:] for a in y]
X_train = np.concatenate(X_train)
X_test = np.concatenate(X_test)
y_train = np.concatenate(y_train)
y_test = np.concatenate(y_test)
scaler = MinMaxScaler(feature_range=(-1, 1))
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
return X_train, y_train, X_test, y_test, scaler
def test_persistence():
"""Make sure we can pickle it."""
X = iris.data # Use the iris features.
X = MinMaxScaler().fit_transform(X)
ae = Autoencoder(hidden_units=(1,),
n_epochs=1000,
random_state=4556,
learning_rate=1e-2,
keep_prob=1.0)
Xenc = ae.fit_transform(X)
b = BytesIO()
pickle.dump(ae, b)
ae_pickled = pickle.loads(b.getvalue())
Xenc_pickled = ae_pickled.transform(X)
assert_array_almost_equal(Xenc, Xenc_pickled)
def test_monitor_ae():
"""Test the monitor keyword."""
# Use the iris features.
X = iris.data
X = MinMaxScaler().fit_transform(X)
ae = Autoencoder(hidden_units=(3, 2,),
n_epochs=7500,
random_state=4556,
learning_rate=DEFAULT_LEARNING_RATE,
keep_prob=1.0,
hidden_activation=tf.nn.sigmoid,
encoding_activation=tf.nn.sigmoid,
output_activation=tf.nn.sigmoid)
def _monitor(epoch, est, stats):
assert epoch <= 1000, "The autoencoder has been running too long!"
if stats['loss'] < 0.2:
assert epoch > 10, "The autoencoder returned too soon!"
return True
else:
return False
ae.fit(X, monitor=_monitor)
def extract_train_and_validation_data(self,num_labels):
data = pd.read_csv(self.train_data_filename, header=0).values
# convert to Numpy array forms
feature_vec = data[0::,1::]
labels = data[0::,0]
# min-max scale each sample (the double transpose applies the scaler per row rather than per feature)
min_max_scaler = preprocessing.MinMaxScaler()
feature_vec = min_max_scaler.fit_transform(feature_vec.T).T
# convert to one hot form for labels
labels_onehot = (np.arange(num_labels) == labels[:, None]).astype(np.float32)
# divide data into train and validation data
self.train_X, self.val_X, self.train_y, self.val_y = train_test_split(\
feature_vec, labels_onehot,
test_size=0.2, random_state=42)
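MinMaxScaler normally rescales each column (feature); the double transpose above applies it per sample instead, so every example's values end up spanning [0, 1]. A small illustration of the difference on made-up numbers:

import numpy as np
from sklearn.preprocessing import MinMaxScaler

X = np.array([[0.0, 5.0, 10.0],
              [2.0, 4.0, 6.0]])
print(MinMaxScaler().fit_transform(X))      # per feature: each column spans [0, 1]
print(MinMaxScaler().fit_transform(X.T).T)  # per sample: each row spans [0, 1]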
def get_today_data_for_MLP(code):
'''
:param code: stock code
:return: feature vector X for the most recent trading day
'''
import numpy as np
data_path = "./data/stock_data/"
oneDayLine, date = load_data_from_tushare(data_path + str(code) + '.csv')
volumn, volumn_dates = load_volume_from_tushare(data_path + str(code) + '.csv')
daynum = 5
X = []
ef = Extract_Features()
for i in range(daynum, len(date)):
X_delta = [oneDayLine[k] - oneDayLine[k - 1] for k in range(i - daynum, i)] + \
[volumn[k] for k in range(i - daynum, i)] + \
[float(ef.parse_weekday(date[i]))] + \
[float(ef.lunar_month(date[i]))] + \
[ef.rrr(date[i])] + \
[ef.MoneySupply(date[i])]
X.append(X_delta)
X = preprocessing.MinMaxScaler().fit_transform(X)
return np.array(X[-1])
def compute_preprocessor(self,method):
self.data={}
if method=='none':
self.data=self.orig_data
elif method=='min_max':
transform=preprocessing.MinMaxScaler()
self.data['X_train']=transform.fit_transform(self.orig_data['X_train'])
self.data['X_val']=transform.transform(self.orig_data['X_val'])
self.data['X_test']=transform.transform(self.orig_data['X_test'])
elif method=='scaled':
self.data['X_train']=preprocessing.scale(self.orig_data['X_train'])
self.data['X_val']=preprocessing.scale(self.orig_data['X_val'])
self.data['X_test']=preprocessing.scale(self.orig_data['X_test'])
elif method=='normalized':
self.data['X_train']=preprocessing.normalize(self.orig_data['X_train'])
self.data['X_val']=preprocessing.normalize(self.orig_data['X_val'])
self.data['X_test']=preprocessing.normalize(self.orig_data['X_test'])
self.data['y_train']=self.orig_data['y_train']
self.data['y_val']=self.orig_data['y_val']
self.data['y_test']=self.orig_data['y_test']
def compute_preprocessor(self,method):
self.data={}
if method=='min_max':
transform=preprocessing.MinMaxScaler()
self.data['X_train']=transform.fit_transform(self.orig_data['X_train'])
self.data['X_val']=transform.transform(self.orig_data['X_val'])
self.data['X_test']=transform.transform(self.orig_data['X_test'])
elif method=='scaled':
self.data['X_train']=preprocessing.scale(self.orig_data['X_train'])
self.data['X_val']=preprocessing.scale(self.orig_data['X_val'])
self.data['X_test']=preprocessing.scale(self.orig_data['X_test'])
elif method=='normalized':
self.data['X_train']=preprocessing.normalize(self.orig_data['X_train'])
self.data['X_val']=preprocessing.normalize(self.orig_data['X_val'])
self.data['X_test']=preprocessing.normalize(self.orig_data['X_test'])
self.data['y_train']=self.orig_data['y_train']
self.data['y_val']=self.orig_data['y_val']
self.data['y_test']=self.orig_data['y_test']
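The branches above behave quite differently: MinMaxScaler maps each feature into [0, 1] using the range seen at fit time, preprocessing.scale standardises each feature to zero mean and unit variance, and preprocessing.normalize rescales each sample to unit norm. A quick illustration on made-up data; note that, unlike the 'min_max' branch, the 'scaled' and 'normalized' branches compute their statistics on each split independently rather than reusing the training-set fit.

import numpy as np
from sklearn import preprocessing

X = np.array([[1.0, 10.0],
              [2.0, 20.0],
              [3.0, 30.0]])
print(preprocessing.MinMaxScaler().fit_transform(X))  # per-feature range scaling to [0, 1]
print(preprocessing.scale(X))                         # per-feature standardisation (mean 0, unit variance)
print(preprocessing.normalize(X))                     # per-sample scaling to unit L2 norm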
def get_train_test( X, pca_order = 10):
X = X.astype('float32')
scaler = MinMaxScaler(feature_range=(0, 1))
X = scaler.fit_transform(X.reshape(-1,1)).reshape( X.shape)
if pca_order > 0:
pca = PCA(pca_order)  # use the requested number of principal components
X = pca.fit_transform(X)
X = pca.inverse_transform(X)
n_samples = X.shape[0]
train_size = int(n_samples * 0.67)
test_size = n_samples - train_size
train, test = X[0:train_size,:], X[train_size:n_samples,:]
return train, test, scaler
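A usage sketch on a hypothetical multivariate series; the fitted scaler is returned alongside the splits so later predictions can be mapped back to the original scale.

import numpy as np

series = np.cumsum(np.random.rand(200, 5), axis=0)    # hypothetical 5-feature time series
train, test, scaler = get_train_test(series, pca_order=3)
print(train.shape, test.shape)                        # roughly a 67/33 split along time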