def predict(self, X):
    check_is_fitted(self, ['X_', 'y_'])
    X = check_array(X)
    X = DynamicBayesianClassifier._first_col(X)
    return self._predict(X)
Python check_array() example source code
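All of the snippets below revolve around sklearn.utils.check_array. As a quick orientation, here is a minimal, self-contained sketch of what the validator does with the keyword arguments that recur in these examples (accept_sparse, ensure_2d, allow_nd, dtype); the input data is made up for illustration.

import numpy as np
from sklearn.utils import check_array

# A plain 2-D numeric array passes through (dtype defaults to 'numeric').
X = check_array([[1, 2], [3, 4]])

# ensure_2d=False lets 1-D vectors through, e.g. for label arrays.
y = check_array([0, 1, 1], ensure_2d=False, dtype=None)

# Invalid input raises ValueError, e.g. NaNs under the default
# force_all_finite=True.
try:
    check_array(np.array([[1.0, np.nan]]))
except ValueError as exc:
    print("rejected:", exc)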
def parallel_fit(self, X, y, client_token=None, est_timeout=None):
    self.n_outputs_ = 1
    self.classes_ = np.array(np.unique(check_array(y, ensure_2d=False,
                                                   allow_nd=True, dtype=None)))
    if est_timeout is None:
        est_timeout = int(1e6)

    # Store X and y data for workers to use.
    with open(self.X_file.name, 'wb') as outfile:
        pickle.dump(X, outfile, pickle.HIGHEST_PROTOCOL)
    with open(self.y_file.name, 'wb') as outfile:
        pickle.dump(y, outfile, pickle.HIGHEST_PROTOCOL)

    sigopt_procs = []
    for build_args in self.estimator_build_args:
        # Run a separate Python process for each estimator. These processes
        # are wrapped in the `timeout` command to capture the case where a
        # single observation never completes.
        sigopt_procs.append(Popen([
            "timeout", str(est_timeout + 10), "python", sklearn_fit.__file__,
            "--opt_timeout", str(est_timeout),
            "--estimator", build_args['estimator'],
            "--X_file", build_args['X_file'], "--y_file", build_args['y_file'],
            "--client_token", client_token,
            "--output_file", build_args['output_file']
        ]))
    exit_codes = [p.wait() for p in sigopt_procs]
    return_codes_args = zip(exit_codes, self.estimator_build_args)

    # Remove estimators that errored or timed out.
    valid_est_args = [rc_args[1] for rc_args in return_codes_args
                      if rc_args[0] == 0]

    # Load valid estimators back into memory.
    for est_arg in valid_est_args:
        with open(est_arg['output_file'], 'rb') as infile:
            clf = pickle.load(infile)
            self.estimator_ensemble.append(clf)
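The wrap-in-timeout pattern above is worth isolating. A minimal sketch, assuming GNU coreutils `timeout` is on the PATH; it exits with code 124 when the child is killed, so any nonzero exit marks a worker's output as unusable:

from subprocess import Popen

# Spawn a child that sleeps longer than its 2-second budget.
proc = Popen(["timeout", "2", "python", "-c", "import time; time.sleep(10)"])
exit_code = proc.wait()

# GNU timeout returns 124 on expiry; parallel_fit above simply drops any
# worker whose exit code is nonzero.
print("timed out" if exit_code == 124 else "exit code %d" % exit_code)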
def transform(self, X, mask=None):
    """Reduce X to the selected features.

    Parameters
    ----------
    X : array of shape [n_samples, n_features]
        The input samples.

    Returns
    -------
    X_r : array of shape [n_samples, n_selected_features]
        The input samples with only the selected features.
    """
    X = check_array(X, accept_sparse='csr')
    if mask is None:
        mask = self.get_support()
    if not mask.any():
        warn("No features were selected: either the data is"
             " too noisy or the selection test too strict.",
             UserWarning)
        return np.empty(0).reshape((X.shape[0], 0))
    if len(mask) != X.shape[1]:
        raise ValueError("X has a different shape than during fitting.")
    return X[:, self.safe_mask(X, mask)]
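For context, a hedged usage sketch of how a selector's boolean support mask drives transform; SelectKBest here merely stands in for whatever selector class the snippet above belongs to:

import numpy as np
from sklearn.datasets import load_iris
from sklearn.feature_selection import SelectKBest, f_classif

X, y = load_iris(return_X_y=True)
selector = SelectKBest(f_classif, k=2).fit(X, y)

mask = selector.get_support()   # boolean mask, len == n_features
X_r = selector.transform(X)     # keeps only the selected columns
print(mask, X_r.shape)          # e.g. [False False  True  True] (150, 2)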
def predict(self, x):
    check_is_fitted(self, "coef_")
    x = check_array(x)
    return ((self.intercept_ + x @ self.coef_nominator_)
            / (1 + x @ self.coef_denominator_))
def predict(self, X):
    # scikit-learn input checks
    X = check_array(X)
    return np.array([self._predict(x) for x in X])
def fit(self, x, y):
    x = check_array(x)
    _, self.n_out = y.reshape(y.shape[0], -1).shape
    _, n_features = x.shape
    terminals = [Symbol("x_{}".format(i)) for i in range(n_features)]
    self.pset = create_pset(self.operators + terminals + self.constants)
    cls = Cartesian(str(hash(self)), self.pset, n_rows=self.n_rows,
                    n_columns=self.n_columns, n_out=self.n_out,
                    n_back=self.n_back)
    self.res = oneplus(evaluate(x, y, self.metric),
                       random_state=self.random_state, cls=cls,
                       lambda_=self.lambda_, max_iter=self.max_iter,
                       max_nfev=self.max_nfev, f_tol=self.f_tol,
                       n_jobs=self.n_jobs, seed=self.seed)
    self.model = compile(self.res.expr)
    return self
def predict(self, X):
    """Predict class for every sample in X.

    Parameters
    ----------
    X : array-like of shape = [n_samples, n_features_idx]
        The input samples.

    Returns
    -------
    y : array of shape = [n_samples]
    """
    check_is_fitted(self, 'tree_')
    X = check_array(X)
    n_features = X.shape[1]
    if n_features != self.n_features_:
        raise ValueError("Number of features of the model must "
                         "match the input. Model n_features is {} and "
                         "input n_features is {}."
                         .format(self.n_features_, n_features))
    X_ = np.empty(X.shape)
    for i in range(self.n_features_):
        if self.is_numerical_[i]:
            X_[:, i] = X[:, i]
        else:
            try:
                X_[:, i] = self.X_encoders_[i].transform(X[:, i])
            except ValueError:
                raise ValueError('New attribute value not found in '
                                 'train data.')
    y = self.builder_._predict(self.tree_, X_)
    return self.y_encoder_.inverse_transform(y)
def estimate_seasonal_differencing_term(self, x):
    """Estimate the seasonal differencing term.

    Parameters
    ----------
    x : array-like, shape=(n_samples,)
        The time series vector.
    """
    if not self._base_case(x):
        return 0

    # ensure vector
    x = column_or_1d(check_array(
        x, ensure_2d=False, dtype=DTYPE,
        force_all_finite=True))  # type: np.ndarray

    n = x.shape[0]
    m = int(self.m)

    if n < 2 * m + 5:
        return 0

    chstat = self._sd_test(x, m)
    crit_vals = c(0.4617146, 0.7479655, 1.0007818,
                  1.2375350, 1.4625240, 1.6920200,
                  1.9043096, 2.1169602, 2.3268562,
                  2.5406922, 2.7391007)

    if m <= 12:
        return int(chstat > crit_vals[m - 2])  # R does m - 1...
    if m == 24:
        return int(chstat > 5.098624)
    if m == 52:
        return int(chstat > 10.341416)
    if m == 365:
        return int(chstat > 65.44445)

    return int(chstat > 0.269 * (m ** 0.928))
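A hedged usage sketch: this method reads like pmdarima's Canova-Hansen test, so the class name CHTest and its m= constructor argument below are assumptions about the surrounding library. The return value is the recommended seasonal differencing order D, either 0 or 1:

import numpy as np
# CHTest and its m= argument are assumed from the surrounding library.
from pmdarima.arima import CHTest

x = np.random.RandomState(0).rand(120)   # synthetic monthly-like series
D = CHTest(m=12).estimate_seasonal_differencing_term(x)
print(D)  # 0 or 1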
def _my_lrap(y_true, y_score):
    """Simple implementation of label ranking average precision"""
    check_consistent_length(y_true, y_score)
    y_true = check_array(y_true)
    y_score = check_array(y_score)
    n_samples, n_labels = y_true.shape
    score = np.empty((n_samples, ))
    for i in range(n_samples):
        # The best rank corresponds to 1. Ranks higher than 1 are worse.
        # The best inverse ranking corresponds to n_labels.
        unique_rank, inv_rank = np.unique(y_score[i], return_inverse=True)
        n_ranks = unique_rank.size
        rank = n_ranks - inv_rank

        # Ranks need to be corrected to account for ties,
        # e.g. a two-way tie for rank 1 means both labels get rank 2.
        corr_rank = np.bincount(rank, minlength=n_ranks + 1).cumsum()
        rank = corr_rank[rank]

        relevant = y_true[i].nonzero()[0]
        if relevant.size == 0 or relevant.size == n_labels:
            score[i] = 1
            continue

        score[i] = 0.
        for label in relevant:
            # Count the number of relevant labels with better
            # (smaller) rank.
            n_ranked_above = sum(rank[r] <= rank[label] for r in relevant)

            # Weight by the rank of the actual label.
            score[i] += n_ranked_above / rank[label]
        score[i] /= relevant.size

    return score.mean()
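A quick sanity check with made-up data: sklearn.metrics.label_ranking_average_precision_score computes the same quantity and should agree with the reference implementation above.

import numpy as np
from sklearn.metrics import label_ranking_average_precision_score

y_true = np.array([[1, 0, 0], [0, 0, 1]])
y_score = np.array([[0.75, 0.5, 1.0], [1.0, 0.2, 0.1]])

# Both should print 0.416... for this toy example.
print(label_ranking_average_precision_score(y_true, y_score))
print(_my_lrap(y_true, y_score))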
def _check_rows_and_columns(a, b):
    """Unpacks the row and column arrays and checks their shape."""
    check_consistent_length(*a)
    check_consistent_length(*b)
    checks = lambda x: check_array(x, ensure_2d=False)
    a_rows, a_cols = map(checks, a)
    b_rows, b_cols = map(checks, b)
    return a_rows, a_cols, b_rows, b_cols
def predict(self, X):
    X = check_array(X)
    return np.ones(X.shape[0])
def predict(self, X):
    if not hasattr(self, 'coef_'):
        raise CorrectNotFittedError("estimator is not fitted yet")
    X = check_array(X)
    return np.ones(X.shape[0])
def predict(self, X):
    """Perform classification on an array of test vectors X.

    Parameters
    ----------
    X : array-like, shape = (n_samples, n_features)

    Returns
    -------
    C : array, shape = (n_samples,)
        Predicted target values for X; values are from ``classes_``.
    """
    check_is_fitted(self, ["classes_", "n_classes_"])
    X = check_array(X)
    return self.base_estimator_.predict(X)
def run(self, data):
    """Compute biclustering.

    Parameters
    ----------
    data : numpy.ndarray
    """
    data = check_array(data, dtype=np.double, copy=True)
    self._validate_parameters()

    num_rows, num_cols = data.shape
    biclusters = []

    for i, j in combinations(range(num_rows), 2):
        cols, corr = self._find_cols(data[i], data[j])

        if len(cols) >= self.min_cols and corr >= self.correlation_threshold:
            rows = [i, j]

            for k, r in enumerate(data):
                if k != i and k != j and self._accept(data, rows, cols, r):
                    rows.append(k)

            b = Bicluster(rows, cols)

            if not self._exists(biclusters, b):
                biclusters.append(b)

    return Biclustering(biclusters)
def run(self, data):
    """Compute biclustering.

    Parameters
    ----------
    data : numpy.ndarray
    """
    data = check_array(data, dtype=self._data_type, copy=True)
    self._validate_parameters()

    if self.__sleep:
        sleep(1)

    # Some executables require the number of rows and columns of the
    # dataset as an input argument.
    self._num_rows, self._num_cols = data.shape

    # Create a temp dir to store the executable's inputs and outputs.
    os.mkdir(self.__tmp_dir)
    self._write_data(data)
    os.system(self.__exec_comm.format(**self.__dict__))
    biclustering = self._parse_output()

    # Remove the temp dir.
    shutil.rmtree(self.__tmp_dir)

    return biclustering
def run(self, data):
    """Compute biclustering.

    Parameters
    ----------
    data : numpy.ndarray
    """
    data = check_array(data, dtype=np.double, copy=True)
    self._validate_parameters()

    residuals = np.copy(data)
    num_rows, num_cols = residuals.shape
    biclusters, layers = [], []

    if self.fit_background_layer:
        background_layer = self._create_layer(residuals)
        layers.append(background_layer)
        residuals -= background_layer
        biclusters.append(Bicluster(np.arange(num_rows), np.arange(num_cols)))

    for i in range(self.num_biclusters):
        rows, cols, bicluster_layer = self._fit_layer(residuals)

        if len(rows) == 0 or len(cols) == 0 or not self._is_significant(residuals, bicluster_layer):
            break

        residuals[rows[:, np.newaxis], cols] -= bicluster_layer
        layers.append(bicluster_layer)
        biclusters.append(Bicluster(rows, cols))

    self._back_fitting(residuals, layers, biclusters)

    biclustering = Biclustering(biclusters)

    # The background layer's full-matrix bicluster is an implementation
    # detail; drop it from the reported result (the list is shared with
    # the Biclustering object just created).
    if self.fit_background_layer:
        biclusters.pop(0)

    return biclustering
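The loop above peels one fitted "layer" off the residual matrix per bicluster, in the spirit of the plaid model. A toy sketch of just the peeling step, with made-up row/column indices and layer values:

import numpy as np

residuals = np.arange(16, dtype=float).reshape(4, 4)
rows = np.array([1, 3])                       # rows of a found bicluster
cols = np.array([0, 2])                       # columns of a found bicluster
layer = np.full((rows.size, cols.size), 2.0)  # fitted layer values

# Subtract the fitted layer only on the bicluster's row/column grid;
# rows[:, np.newaxis] broadcasts against cols to index that grid.
residuals[rows[:, np.newaxis], cols] -= layer
print(residuals)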
def run(self, data):
    """Compute biclustering.

    Parameters
    ----------
    data : numpy.ndarray
    """
    # np.bool was removed in NumPy 1.24; the builtin bool is equivalent here.
    data = check_array(data, dtype=bool, copy=True)
    self._validate_parameters()

    data = [np.packbits(row) for row in data]
    biclusters = []
    patterns_found = set()

    for ri, rj in combinations(data, 2):
        pattern = np.bitwise_and(ri, rj)
        pattern_cols = sum(popcount(int(n)) for n in pattern)

        if pattern_cols >= self.min_cols and self._is_new(patterns_found, pattern):
            rows = [k for k, r in enumerate(data) if self._match(pattern, r)]

            if len(rows) >= self.min_rows:
                cols = np.where(np.unpackbits(pattern) == 1)[0]
                biclusters.append(Bicluster(rows, cols))

    return Biclustering(biclusters)
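The packed-bits trick above is compact, so here is a minimal, self-contained sketch of the same idea; the popcount helper is external in the original, and a plain bin().count fallback stands in for it here:

import numpy as np

row_i = np.array([1, 1, 0, 1, 0, 1, 0, 0], dtype=bool)
row_j = np.array([1, 0, 0, 1, 0, 1, 1, 0], dtype=bool)

# Pack 8 boolean columns into one byte each, then AND the packed rows to
# keep only the columns where both rows are 1.
pattern = np.bitwise_and(np.packbits(row_i), np.packbits(row_j))

# Count surviving columns (a popcount) and recover their indices.
n_cols = sum(bin(int(b)).count("1") for b in pattern)
cols = np.where(np.unpackbits(pattern) == 1)[0]
print(n_cols, cols)   # 3 [0 3 5]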
def run(self, data):
    """Compute biclustering.

    Parameters
    ----------
    data : numpy.ndarray
    """
    # np.int and np.bool were removed in NumPy 1.24; the builtins work here.
    data = check_array(data, dtype=int, copy=True)
    self._validate_parameters()

    num_remaining_rows, num_cols = data.shape
    remaining_rows = np.ones(num_remaining_rows, bool)
    biclusters = []

    for i in range(self.num_biclusters):
        indices = np.where(remaining_rows)[0]
        b = self._find_motif(data, indices)
        biclusters.append(b)

        remaining_rows[b.rows] = False
        num_remaining_rows -= len(b.rows)

        if num_remaining_rows == 0:
            break

    return Biclustering(biclusters)
def run(self, data):
    """Compute biclustering.

    Parameters
    ----------
    data : numpy.ndarray
    """
    data = check_array(data, dtype=np.double, copy=True)
    self._validate_parameters()

    data = scale(data)

    if self.transform:
        data = np.sign(data) * np.log(1 + np.abs(data))
        data = scale(data)

    biclusters = []

    for i in range(self.num_biclusters):
        best, avg, score = max((self._find_bicluster(data)
                                for _ in range(self.randomized_searches)),
                               key=itemgetter(-1))

        if score < self.score_threshold:
            break

        data[np.ix_(best.rows, best.cols)] -= avg
        biclusters.append(best)

    return Biclustering(biclusters)