def optimize_training_parameters(self, n):
    """Grid-search ``nu`` and ``gamma`` for the RBF one-class SVM.

    A classifier is trained for every (nu, gamma) pair on an ``n`` x ``n``
    grid, using the data loaded for the window that starts at
    ``self.min_timestamp`` and spans 365 days plus one hour.  The pair whose
    observed fraction of training samples not labelled ``1`` is closest to
    ``nu`` is kept.  Every evaluated pair is also written to
    ``training_param.csv`` (columns ``nu,gamma,diff``).

    :param n: number of grid points per parameter
    :return: tuple ``(opt_nu, opt_gamma)``; both stay ``None`` when no pair
        produced a difference below the initial 1.0 (e.g. ``n == 0``)
    """
    # data
    from_timestamp = self.min_timestamp
    to_timestamp = (self.min_timestamp
                    + datetime.timedelta(days=365)
                    + datetime.timedelta(hours=1))
    # NOTE(review): "1" is presumably the label filter for normal samples;
    # confirm against load_monitor_data.
    _, train_values = self.load_monitor_data(from_timestamp, to_timestamp, "1")
    train_data = np.array(train_values)[:, 0:5]  # first five columns only

    # parameters
    nu_grid = np.linspace(start=1e-5, stop=1e-2, num=n)
    gamma_grid = np.linspace(start=1e-6, stop=1e-3, num=n)
    opt_diff = 1.0
    opt_nu = None
    opt_gamma = None

    # ``with`` guarantees the log file is closed even if fitting raises.
    with open("training_param.csv", "w") as fw:
        fw.write("nu,gamma,diff\n")
        for nu in nu_grid:
            for gamma in gamma_grid:
                classifier = svm.OneClassSVM(kernel="rbf", nu=nu, gamma=gamma)
                classifier.fit(train_data)
                label = classifier.predict(train_data)
                # fraction of training samples NOT predicted as inliers
                p = 1 - float(np.sum(label == 1.0)) / len(label)
                diff = math.fabs(p - nu)
                if diff < opt_diff:
                    opt_diff = diff
                    opt_nu = nu
                    opt_gamma = gamma
                fw.write(",".join([str(nu), str(gamma), str(diff)]) + "\n")
    return opt_nu, opt_gamma
# Example source code for the Python class OneClassSVM()
def test_oneclass_decision_function():
    """Check that OneClassSVM's decision function sign agrees with predict."""
    clf = svm.OneClassSVM()
    rng = check_random_state(2)

    # Training data: the same noise cloud shifted by +2 and by -2.
    noise = 0.3 * rng.randn(100, 2)
    X_train = np.r_[noise + 2, noise - 2]

    # Regular novel observations, generated the same way.
    noise = 0.3 * rng.randn(20, 2)
    X_test = np.r_[noise + 2, noise - 2]

    # Abnormal novel observations, drawn uniformly between -4 and 4.
    X_outliers = rng.uniform(low=-4, high=4, size=(20, 2))

    # Fit the model.
    clf = svm.OneClassSVM(nu=0.1, kernel="rbf", gamma=0.1)
    clf.fit(X_train)

    # More than 90% of the regular points must be accepted ...
    y_pred_test = clf.predict(X_test)
    inlier_rate = np.mean(y_pred_test == 1)
    assert_greater(inlier_rate, .9)

    # ... and more than 90% of the abnormal points rejected.
    y_pred_outliers = clf.predict(X_outliers)
    outlier_rate = np.mean(y_pred_outliers == -1)
    assert_greater(outlier_rate, .9)

    # A positive decision value must coincide with a predicted label of 1.
    test_positive = (clf.decision_function(X_test) > 0).ravel()
    assert_array_equal(test_positive, y_pred_test == 1)
    outlier_positive = (clf.decision_function(X_outliers) > 0).ravel()
    assert_array_equal(outlier_positive, y_pred_outliers == 1)
def detect_outlier(data_train, measurement):
    """
    Train a one-class SVM and classify a single measurement with it.

    :param data_train: data for training the one class SVM model
    :param measurement: one row from the chill_untested.csv
    :return: predicted label for input measurement (first element of the
             prediction array)
    """
    model = svm.OneClassSVM(kernel="rbf", nu=0.005, gamma=0.00001)
    model.fit(data_train)
    predictions = model.predict(measurement)
    return predictions[0]
def fit(self, X):
    """Fit an RBF one-class SVM (nu=0.5, gamma=0.9) on ``X``.

    The fitted estimator is stored on ``self.clf`` only after fitting
    succeeds.
    """
    detector = svm.OneClassSVM(nu=0.5, kernel="rbf", gamma=0.9)
    detector.fit(X)
    self.clf = detector
def __init__(self, param_dict=None):
    """Create the wrapped ``OneClassSVM`` from a dict of keyword arguments.

    :param param_dict: keyword arguments forwarded to ``OneClassSVM``;
        ``None`` (the default) means no arguments
    """
    # A literal ``{}`` default is created once and would be shared (and
    # stored on ``self``) by every instance built without arguments.
    if param_dict is None:
        param_dict = {}
    self.param_dict = param_dict
    # Single pre-formatted argument: same output as the Python 2 statement
    # ``print name, dict`` and also valid Python 3.
    print("%s %s" % (self.__class__.__name__, self.param_dict))
    self.cls = OneClassSVM(**param_dict)
def learn_structure(self, samples):
    """Fit a one-class SVM on part of ``samples`` and log test anomalies.

    The samples are split by ``self._generate_train_test_sets`` (called with
    0.75, presumably the training fraction -- confirm against that helper).
    The detector is fitted on the training part, and the number of test
    samples predicted as ``-1`` is logged.

    :param samples: data passed on to ``_generate_train_test_sets``
    :return: the fitted ``svm.OneClassSVM`` instance
    """
    X_train, X_test = self._generate_train_test_sets(samples, 0.75)
    # The original message was missing a space before "samples"
    # ("Training with 75samples; ...").
    logger.info("Training with %s samples; testing with %s samples.",
                len(X_train), len(X_test))
    svm_detector = svm.OneClassSVM(nu=0.95 * OUTLIERS_FRACTION + 0.05,
                                   kernel="rbf", gamma=0.1)
    svm_detector.fit(X_train)
    Y_test = svm_detector.predict(X_test)
    num_anomalies = Y_test[Y_test == -1].size
    logger.info("Found %s anomalies in testing set", num_anomalies)
    return svm_detector
def test_learn_structure(self):
    """learn_structure must hand back an ``svm.OneClassSVM`` instance."""
    samples = self.get_testing_data()
    detector = self.svm.learn_structure(samples)
    self.assertIsInstance(detector, svm.OneClassSVM)
def fit(self):
global isFitted
isFitted = True
print "fit the model"
train = np.array(self.model.data)
X = train[:, 0:2]
y = train[:, 2]
lam = float(self.complexity.get())
gamma = float(self.gamma.get())
coef0 = float(self.coef0.get())
degree = int(self.degree.get())
kernel_map = {0: "linear", 1: "rbf", 2: "poly"}
#if len(np.unique(y)) == 1:
# clf = svm.OneClassSVM(kernel=kernel_map[self.kernel.get()],
# gamma=gamma, coef0=coef0, degree=degree)
# clf.fit(X)
#else:
#mysvm = svm.SVC(kernel=kernel_map[self.kernel.get()], C=1000,
# gamma=gamma, coef0=coef0, degree=degree)
#mysvm.fit(X, y)
#l = 0.1;
clf = komd.KOMD(lam=lam, Kf=kernel_map[self.kernel.get()], rbf_gamma=gamma, poly_deg=degree, poly_coeff=coef0)
clf.fit(X,y)
#print clf.gamma
#global gamma, bias
#gamma = clf.gamma
#bias = clf.bias
if hasattr(clf, 'score'):
print "Accuracy:", clf.score(X, y) * 100
X1, X2, Z = self.decision_surface(clf)
self.model.clf = clf
#self.model.svm = mysvm
self.clf = clf
#self.mysvm = mysvm
self.model.set_surface((X1, X2, Z))
self.model.surface_type = self.surface_type.get()
self.fitted = True
self.model.changed("surface")
def fit(self):
    """Train an SVM on the model's data and publish its decision surface.

    The first two columns of ``self.model.data`` are the features and the
    third is the target.  When the target holds a single distinct value a
    ``svm.OneClassSVM`` is fitted on the features alone; otherwise an
    ``svm.SVC`` is fitted with ``C`` taken from the complexity field.
    """
    print("fit the model")
    samples = np.array(self.model.data)
    X, y = samples[:, 0:2], samples[:, 2]

    # Hyper-parameters, read in the same order as the GUI fields.
    C = float(self.complexity.get())
    gamma = float(self.gamma.get())
    coef0 = float(self.coef0.get())
    degree = int(self.degree.get())
    kernel_map = {0: "linear", 1: "rbf", 2: "poly"}

    single_class = len(np.unique(y)) == 1
    shared = dict(kernel=kernel_map[self.kernel.get()],
                  gamma=gamma, coef0=coef0, degree=degree)
    if not single_class:
        clf = svm.SVC(C=C, **shared)
        clf.fit(X, y)
    else:
        clf = svm.OneClassSVM(**shared)
        clf.fit(X)

    # Only report accuracy when the estimator offers a ``score`` method.
    if hasattr(clf, 'score'):
        print("Accuracy:", clf.score(X, y) * 100)

    grid_x1, grid_x2, grid_z = self.decision_surface(clf)
    self.model.clf = clf
    self.model.set_surface((grid_x1, grid_x2, grid_z))
    self.model.surface_type = self.surface_type.get()
    self.fitted = True
    self.model.changed("surface")
def test_oneclass():
    """Check a default OneClassSVM's predictions and fitted attributes."""
    clf = svm.OneClassSVM()
    clf.fit(X)

    predictions = clf.predict(T)
    assert_array_almost_equal(predictions, [-1, -1, -1])

    expected_intercept = [-1.008]
    assert_array_almost_equal(clf.intercept_, expected_intercept, decimal=3)

    expected_dual_coef = [[0.632, 0.233, 0.633, 0.234, 0.632, 0.633]]
    assert_array_almost_equal(clf.dual_coef_, expected_dual_coef, decimal=3)

    # Reading ``coef_`` on this estimator is expected to raise ValueError.
    assert_raises(ValueError, lambda: clf.coef_)
def test_immutable_coef_property():
    """Check that primal coef modifications are not silently ignored."""
    supervised = (svm.SVC, svm.NuSVC, svm.SVR, svm.NuSVR)
    fitted = [estimator(kernel='linear').fit(iris.data, iris.target)
              for estimator in supervised]
    # OneClassSVM is fitted without targets.
    fitted.append(svm.OneClassSVM(kernel='linear').fit(iris.data))

    replacement = (0, 0)
    for model in fitted:
        # Rebinding the attribute must fail ...
        assert_raises(AttributeError, model.__setattr__, 'coef_', np.arange(3))
        # ... and so must writing into the returned array.
        assert_raises((RuntimeError, ValueError),
                      model.coef_.__setitem__, replacement, 0)
def check_svm_model_equal(dense_svm, sparse_svm, X_train, y_train, X_test):
    """Assert that a dense-fitted and a sparse-fitted SVM agree.

    ``dense_svm`` is fitted on ``X_train.toarray()`` and ``sparse_svm`` on
    ``X_train`` itself; their fitted attributes, predictions and decision
    values are then compared.  Finally, feeding sparse test data to the
    dense-fitted model must raise ``ValueError`` with the expected message.
    """
    dense_svm.fit(X_train.toarray(), y_train)
    test_is_sparse = sparse.isspmatrix(X_test)
    X_test_dense = X_test.toarray() if test_is_sparse else X_test
    sparse_svm.fit(X_train, y_train)

    # Fitted attributes of the sparse model stay sparse and match the dense ones.
    sparse_sv = sparse_svm.support_vectors_
    assert_true(sparse.issparse(sparse_sv))
    sparse_dual = sparse_svm.dual_coef_
    assert_true(sparse.issparse(sparse_dual))
    assert_array_almost_equal(dense_svm.support_vectors_,
                              sparse_svm.support_vectors_.toarray())
    assert_array_almost_equal(dense_svm.dual_coef_,
                              sparse_svm.dual_coef_.toarray())
    if dense_svm.kernel == "linear":
        assert_true(sparse.issparse(sparse_svm.coef_))
        assert_array_almost_equal(dense_svm.coef_,
                                  sparse_svm.coef_.toarray())
    assert_array_almost_equal(dense_svm.support_, sparse_svm.support_)

    # Predictions and decision values agree for sparse and dense test input.
    assert_array_almost_equal(dense_svm.predict(X_test_dense),
                              sparse_svm.predict(X_test))
    assert_array_almost_equal(dense_svm.decision_function(X_test_dense),
                              sparse_svm.decision_function(X_test))
    assert_array_almost_equal(dense_svm.decision_function(X_test_dense),
                              sparse_svm.decision_function(X_test_dense))

    if not isinstance(dense_svm, svm.OneClassSVM):
        assert_array_almost_equal(dense_svm.predict_proba(X_test_dense),
                                  sparse_svm.predict_proba(X_test), 4)
        msg = "cannot use sparse input in 'SVC' trained on dense data"
    else:
        msg = "cannot use sparse input in 'OneClassSVM' trained on dense data"

    if test_is_sparse:
        assert_raise_message(ValueError, msg, dense_svm.predict, X_test)
def predict(self, nu, gamma):
    """Label the samples after the first year and write them back to disk.

    Test samples are loaded from ``min_timestamp + 365 days`` up to
    ``max_timestamp``; training samples from ``min_timestamp`` up to
    365 days plus one hour later.  For each test sample, in order, the
    one-class SVM is refitted on the current training rows and the sample
    is classified.  A sample labelled ``1`` gets outlier probability 0.0
    and is appended to the training rows; any other label gets 1.0.
    Finally the matching rows of ``self.monitor_file`` are rewritten with
    the new label and outlier probability; other rows are kept unchanged.

    :param nu: ``nu`` parameter of the RBF ``OneClassSVM``
    :param gamma: ``gamma`` parameter of the RBF ``OneClassSVM``
    """
    # classifier
    classifier = svm.OneClassSVM(kernel="rbf", nu=nu, gamma=gamma)
    one_year = datetime.timedelta(days=365)

    # data for test
    test_timestamps, test_values = self.load_monitor_data(
        self.min_timestamp + one_year, self.max_timestamp, "nan")
    test_data = np.array(test_values)[:, 0:5]

    # data for train
    train_end = self.min_timestamp + one_year + datetime.timedelta(hours=1)
    _, train_values = self.load_monitor_data(self.min_timestamp, train_end, "1")

    for i, test_timestamp in enumerate(test_timestamps):
        # Refit every round: accepted samples extend the training rows.
        train_data = np.array(train_values)[:, 0:5]
        classifier.fit(train_data)
        # ``i:i + 1`` keeps the sample two-dimensional (1 x n_features);
        # recent scikit-learn releases reject a bare 1-D sample.
        label = classifier.predict(test_data[i:i + 1])[0]
        test_values[i][5] = int(label)
        if label == 1:
            test_values[i][6] = 0.0
            train_values.append(test_values[i])
        else:
            test_values[i][6] = 1.0
        # Pre-formatted single argument: same output as the Python 2
        # statement ``print a, b, c`` and also valid Python 3.
        print("%s %s %s" % (test_timestamp, label, test_values[i]))

    # write result into monitor file
    with open(self.monitor_file, "r") as fr:
        header = fr.readline()
        lines = fr.readlines()

    # Timestamp -> row index, built once instead of scanning the list for
    # every line.  ``setdefault`` keeps the first occurrence, like
    # ``list.index`` did.  Assumes the timestamps are hashable datetimes.
    row_of = {}
    for idx, test_timestamp in enumerate(test_timestamps):
        row_of.setdefault(test_timestamp, idx)

    with open(self.monitor_file, "w") as fw:  # update monitor file
        fw.write(header)
        for line in lines:
            timestamp = datetime.datetime.strptime(
                line.strip().split(",")[0], "%Y-%m-%d %H:%M:%S")
            idx = row_of.get(timestamp)
            if idx is None:
                fw.write(line)
                continue
            value = test_values[idx]
            m = [
                str(timestamp),
                str(value[0]),       # temperature
                str(value[1]),       # ph
                str(value[2]),       # conductivity
                str(value[3]),       # orp
                str(value[4]),       # do
                str(int(value[5])),  # label
                str(value[6]),       # outlier probability
                str(value[7]),       # event probability
            ]
            fw.write(",".join(m) + "\n")
def __init__(self, num_class=2):
    """
    :type num_class: int
    :rtype: None
    :
    : Reads the database name from '../../.dbname', sets the collection
    : and table names and builds the one-step classification pipeline:
    : an SVC when ``num_class - 1`` is non-zero, a OneClassSVM otherwise.
    :
    """
    self.__ctrl__ = None
    self.__case__ = None
    with open('../../.dbname', 'r') as f:
        self.__DB_NAME__ = json.load(f)['dbname']

    # MongoDB collections
    self.__MG_DOCS_COLL__ = 'raw-docs'            # raw docs
    self.__MG_SENTS_COLL__ = 'bag-of-sents'       # raw sentences
    self.__MG_TOKENS_COLL__ = 'sample-tokens'     # clean tokens (words)

    # PostgreSQL tables
    self.__PG_STATS_TBL__ = 'stats'               # stylometric features
    self.__PG_RESULTS_TBL__ = 'results_' + str(num_class) + 'class'  # cross val results
    self.__PG_PROBAS_TBL__ = 'probabilities'      # cross val probabilities

    if num_class - 1:
        classifier = SVC(probability=True,
                         kernel='poly',
                         degree=2,
                         class_weight='balanced')
    else:
        classifier = OneClassSVM(kernel='rbf',
                                 nu=0.7,
                                 gamma=1./250)
    # Scaler steps (StandardScaler / MinMaxScaler / Normalizer) are
    # currently disabled; the pipeline holds the classifier only.
    self.__model__ = Pipeline([('classifier', classifier)])

    classifier_name = self.__model__.named_steps['classifier'].__class__.__name__
    print('Instantiated classifier %s.' % classifier_name)

    self.__io__ = DBIO(MG_DB_NAME=self.__DB_NAME__,
                       PG_DB_NAME=self.__DB_NAME__)
    self.__tagger__ = None      # initialise if re-creating samples
    self.__bootstrap__ = None   # initialise in fit
def fit(self, author1, author2, wts1=None, wts2=None,
bootstrap=False, verbose=False):
"""
:type author1: str
:type author2: str
:type wts1: str/List[str]
:type wts2: str/List[str]
:type verbose:bool
:rtype: bool
:
: Prepares databases and tables/collections.
:
"""
self.__bootstrap__ = bootstrap
cases = []
for i, (author, wts) in enumerate([(author1, wts1), (author2, wts2)]):
if not wts:
wts = [wt.encode('ascii') \
for wt in self.__io__.mg_distinct(self.__MG_DOCS_COLL__,
'type',
{ 'author':author } )]
if not isinstance(wts, list):
wts = [wts]
cases += (author, wts, (1,-1)[i]), # use 1, -1 to match output
# from sklearn's OneClassSVM
self.__ctrl__ = cases[0] # assign label 1 in y vector
self.__case__ = cases[1] # assign be label 0 in y vector
self.__MG_TOKENS_COLL__ += '-' + cases[0][0] + \
'-' + cases[1][0] + \
'-' + \
''.join(wt[:3] for wt in cases[0][1]) + \
'-' + \
''.join(wt[:3] for wt in cases[1][1]) + \
'-' + \
('nobs','bs')[bootstrap]
self.__PG_STATS_TBL__ += '_' + cases[0][0] + \
'_' + cases[1][0] + \
'_' + \
''.join(wt[:3] for wt in cases[0][1]) + \
'_' + \
''.join(wt[:3] for wt in cases[1][1]) + \
'_' + \
('nobs','bs')[bootstrap]
if verbose:
print 'Control:', self.__ctrl__
print 'Case: ', self.__case__
print 'Saving tokens to', self.__MG_TOKENS_COLL__
print 'Saving stats to', self.__PG_STATS_TBL__
return self.__prep_sents__(verbose=verbose) # err in preparing sentences