def test_argpartition_empty_array(self):
# check axis handling for multidimensional empty arrays
a = np.array([])
a.shape = (3, 2, 1, 0)
for axis in range(-a.ndim, a.ndim):
msg = 'test empty array argpartition with axis={0}'.format(axis)
assert_equal(np.partition(a, 0, axis=axis),
np.zeros_like(a, dtype=np.intp), msg)
msg = 'test empty array argpartition with axis=None'
assert_equal(np.partition(a, 0, axis=None),
np.zeros_like(a.ravel(), dtype=np.intp), msg)
python类partition()的实例源码
def test_partition_unicode_kind(self):
d = np.arange(10)
k = b'\xc3\xa4'.decode("UTF8")
assert_raises(ValueError, d.partition, 2, kind=k)
assert_raises(ValueError, d.argpartition, 2, kind=k)
def test_partition_out_of_range(self):
# Test out of range values in kth raise an error, gh-5469
d = np.arange(10)
assert_raises(ValueError, d.partition, 10)
assert_raises(ValueError, d.partition, -11)
# Test also for generic type partition, which uses sorting
# and used to not bound check kth
d_obj = np.arange(10, dtype=object)
assert_raises(ValueError, d_obj.partition, 10)
assert_raises(ValueError, d_obj.partition, -11)
def test_partition_empty_array(self):
# check axis handling for multidimensional empty arrays
a = np.array([])
a.shape = (3, 2, 1, 0)
for axis in range(-a.ndim, a.ndim):
msg = 'test empty array partition with axis={0}'.format(axis)
assert_equal(np.partition(a, 0, axis=axis), a, msg)
msg = 'test empty array partition with axis=None'
assert_equal(np.partition(a, 0, axis=None), a.ravel(), msg)
def test_argpartition_empty_array(self):
# check axis handling for multidimensional empty arrays
a = np.array([])
a.shape = (3, 2, 1, 0)
for axis in range(-a.ndim, a.ndim):
msg = 'test empty array argpartition with axis={0}'.format(axis)
assert_equal(np.partition(a, 0, axis=axis),
np.zeros_like(a, dtype=np.intp), msg)
msg = 'test empty array argpartition with axis=None'
assert_equal(np.partition(a, 0, axis=None),
np.zeros_like(a.ravel(), dtype=np.intp), msg)
def test_partition_unicode_kind(self):
d = np.arange(10)
k = b'\xc3\xa4'.decode("UTF8")
assert_raises(ValueError, d.partition, 2, kind=k)
assert_raises(ValueError, d.argpartition, 2, kind=k)
def test_sort_degraded(self):
# test degraded dataset would take minutes to run with normal qsort
d = np.arange(1000000)
do = d.copy()
x = d
# create a median of 3 killer where each median is the sorted second
# last element of the quicksort partition
while x.size > 3:
mid = x.size // 2
x[mid], x[-2] = x[-2], x[mid]
x = x[:-2]
assert_equal(np.sort(d), do)
assert_equal(d[np.argsort(d)], do)
def test_partition_out_of_range(self):
# Test out of range values in kth raise an error, gh-5469
d = np.arange(10)
assert_raises(ValueError, d.partition, 10)
assert_raises(ValueError, d.partition, -11)
# Test also for generic type partition, which uses sorting
# and used to not bound check kth
d_obj = np.arange(10, dtype=object)
assert_raises(ValueError, d_obj.partition, 10)
assert_raises(ValueError, d_obj.partition, -11)
def test_partition_empty_array(self):
# check axis handling for multidimensional empty arrays
a = np.array([])
a.shape = (3, 2, 1, 0)
for axis in range(-a.ndim, a.ndim):
msg = 'test empty array partition with axis={0}'.format(axis)
assert_equal(np.partition(a, 0, axis=axis), a, msg)
msg = 'test empty array partition with axis=None'
assert_equal(np.partition(a, 0, axis=None), a.ravel(), msg)
def test_argpartition_empty_array(self):
# check axis handling for multidimensional empty arrays
a = np.array([])
a.shape = (3, 2, 1, 0)
for axis in range(-a.ndim, a.ndim):
msg = 'test empty array argpartition with axis={0}'.format(axis)
assert_equal(np.partition(a, 0, axis=axis),
np.zeros_like(a, dtype=np.intp), msg)
msg = 'test empty array argpartition with axis=None'
assert_equal(np.partition(a, 0, axis=None),
np.zeros_like(a.ravel(), dtype=np.intp), msg)
def test_partition_fuzz(self):
# a few rounds of random data testing
for j in range(10, 30):
for i in range(1, j - 2):
d = np.arange(j)
np.random.shuffle(d)
d = d % np.random.randint(2, 30)
idx = np.random.randint(d.size)
kth = [0, idx, i, i + 1]
tgt = np.sort(d)[kth]
assert_array_equal(np.partition(d, kth)[kth], tgt,
err_msg="data: %r\n kth: %r" % (d, kth))
def _check_branching(X,Xsamples,restart,threshold=0.25):
    """Decide whether a time series realization opens a new branch.

    The final row of ``X`` is compared with the final row of every series in
    ``Xsamples``. The realization counts as a new branch only if, for every
    stored sample, the second largest absolute difference is at least
    ``threshold`` -- i.e. at least two variables changed. On the first trial
    (``restart == 0``) the realization is always accepted.

    Args:
        X (np.array): current time series data.
        Xsamples (np.array): list of previous branching samples; appended to
            in place when the realization is accepted.
        restart (int): counts number of restart trials.
        threshold (float, optional): sets threshold for attractor
            identification.

    Returns:
        Tuple ``(check, Xsamples)``: ``check`` is True for a branching
        realization, ``Xsamples`` is the (possibly extended) sample list.
    """
    if restart == 0:
        is_new_branch = True
        Xsamples.append(X)
    else:
        # Second largest absolute change of the final state, one entry per
        # stored sample. If we observe all parameters of the system, a new
        # attractor state must involve changes in two variables.
        second_largest_diffs = [
            np.partition(np.absolute(X[-1, :] - previous[-1, :]), -2)[-2]
            for previous in Xsamples
        ]
        is_new_branch = not any(diff < threshold
                                for diff in second_largest_diffs)
        if is_new_branch:
            Xsamples.append(X)
    outcome = 'new branch' if is_new_branch else 'no new branch'
    logg.m('realization {}:'.format(restart), outcome, v=4)
    return is_new_branch, Xsamples
def _trimmed_mean_1d(arr, k):
"""Calculate trimmed mean on a 1d array.
Trim values largest than the k'th largest value or smaller than the k'th
smallest value
Parameters
----------
arr: ndarray, shape (n,)
The one-dimensional input array to perform trimmed mean on
k: int
The thresholding order for trimmed mean
Returns
-------
trimmed_mean: float
The trimmed mean calculated
"""
kth_smallest = np.partition(arr, k)[k-1]
kth_largest = -np.partition(-arr, k)[k-1]
cnt = 0
summation = 0.0
for elem in arr:
if elem >= kth_smallest and elem <= kth_largest:
cnt += 1
summation += elem
return summation / cnt
def test_partition_out_of_range(self):
# Test out of range values in kth raise an error, gh-5469
d = np.arange(10)
assert_raises(ValueError, d.partition, 10)
assert_raises(ValueError, d.partition, -11)
# Test also for generic type partition, which uses sorting
# and used to not bound check kth
d_obj = np.arange(10, dtype=object)
assert_raises(ValueError, d_obj.partition, 10)
assert_raises(ValueError, d_obj.partition, -11)
def test_partition_empty_array(self):
# check axis handling for multidimensional empty arrays
a = np.array([])
a.shape = (3, 2, 1, 0)
for axis in range(-a.ndim, a.ndim):
msg = 'test empty array partition with axis={0}'.format(axis)
assert_equal(np.partition(a, 0, axis=axis), a, msg)
msg = 'test empty array partition with axis=None'
assert_equal(np.partition(a, 0, axis=None), a.ravel(), msg)
def test_argpartition_empty_array(self):
# check axis handling for multidimensional empty arrays
a = np.array([])
a.shape = (3, 2, 1, 0)
for axis in range(-a.ndim, a.ndim):
msg = 'test empty array argpartition with axis={0}'.format(axis)
assert_equal(np.partition(a, 0, axis=axis),
np.zeros_like(a, dtype=np.intp), msg)
msg = 'test empty array argpartition with axis=None'
assert_equal(np.partition(a, 0, axis=None),
np.zeros_like(a.ravel(), dtype=np.intp), msg)
def test_partition_unicode_kind(self):
d = np.arange(10)
k = b'\xc3\xa4'.decode("UTF8")
assert_raises(ValueError, d.partition, 2, kind=k)
assert_raises(ValueError, d.argpartition, 2, kind=k)
def prepare(self, simulation):
    """Assign every task in ``self.tasks`` to a host, one task per round.

    A tasks-by-hosts matrix of completion-time estimates (ECT) is built
    first. Each round then picks one task according to ``self.strategy``,
    appends it to ``self.host_tasks`` under the host with the smallest
    estimate for that task, removes the task's row from the matrix, and adds
    the task's estimate to the chosen host's column so later rounds see the
    host as busier.

    Strategies:
        MIN_FIRST: pick the task with the smallest best-host estimate.
        MAX_FIRST: pick the task with the largest best-host estimate.
        SUFFERAGE: pick the task with the largest gap between its second
            best and best host estimates; with a single host this falls
            back to the smallest estimate.

    Args:
        simulation: not used by this method.

    Raises:
        ValueError: if ``self.strategy`` is not one of the strategies above.
    """
    def estimate_completion(task, host):
        # Estimate for running `task` on `host`. When the task's first
        # parent (its stage-in) carries a positive amount, its get_ecomt
        # estimate from the master is added -- presumably the input
        # transfer time; confirm against the task API.
        stage_in = task.parents[0]
        if stage_in.amount > 0:
            return stage_in.get_ecomt(self.master, host) + task.get_eet(host)
        return task.get_eet(host)

    num_tasks = len(self.tasks)
    # build ECT matrix
    ECT = np.zeros((num_tasks, len(self.hosts)))
    for row, task in enumerate(self.tasks):
        for col, host in enumerate(self.hosts):
            ECT[row, col] = estimate_completion(task, host)

    # build schedule
    # task_idx maps the remaining ECT rows back to positions in self.tasks.
    task_idx = np.arange(num_tasks)
    for _ in range(num_tasks):
        min_hosts = np.argmin(ECT, axis=1)
        min_times = ECT[np.arange(ECT.shape[0]), min_hosts]
        if self.strategy == ListHeuristic.MIN_FIRST:
            t = np.argmin(min_times)
        elif self.strategy == ListHeuristic.MAX_FIRST:
            t = np.argmax(min_times)
        elif self.strategy == ListHeuristic.SUFFERAGE:
            if ECT.shape[1] > 1:
                # Column 1 of the row-wise partition is each task's second
                # smallest estimate.
                min2_times = np.partition(ECT, 1)[:, 1]
                sufferages = min2_times - min_times
                t = np.argmax(sufferages)
            else:
                t = np.argmin(min_times)
        else:
            # Without this branch an unknown strategy would silently reuse
            # a stale row index from an earlier iteration.
            raise ValueError(
                "unknown list heuristic strategy: %r" % (self.strategy,))
        task = self.tasks[int(task_idx[t])]
        h = int(min_hosts[t])
        host = self.hosts[h]
        self.host_tasks[host.name].append(task)
        logging.debug("%s -> %s", task.name, host.name)
        task_idx = np.delete(task_idx, t)
        ECT = np.delete(ECT, t, 0)
        # The chosen host is now busy for the duration of this task.
        ECT[:, h] += estimate_completion(task, host)
def recommend(self, ids: Sequence[int],
              predictions: numpy.ndarray,
              n: int=1, diversity: float=0.5) -> Sequence[int]:
    """Recommends instances to label, favouring small top-two margins.

    Each instance is scored as one minus the gap between its largest and
    second largest prediction, so a small gap gives a high score. The
    scores, together with the instances' features read from the database,
    are handed to ``choose_boltzmann`` to select the instances.

    Notes
    -----
    Assumes predictions are probabilities of positive binary label.

    Parameters
    ----------
    ids
        Sequence of IDs in the unlabelled data pool.
    predictions
        N x 1 x C array of predictions. The ith row must correspond with the
        ith ID in the sequence.
    n
        Number of recommendations to make.
    diversity
        Recommendation diversity in [0, 1]. Passed on, doubled, as the
        ``temperature`` of ``choose_boltzmann``.

    Returns
    -------
    Sequence[int]
        IDs of the instances to label.

    Raises
    ------
    ValueError
        If ``predictions`` does not have exactly one predictor (axis 1).
    """
    num_predictors = predictions.shape[1]
    if num_predictors != 1:
        raise ValueError('Uncertainty sampling must have one predictor')
    assert len(ids) == predictions.shape[0]

    # x* = argmin p(y1^ | x) - p(y2^ | x) where yn^ = argmax p(yn | x)
    # (Settles 2009).
    # Partitioning at -2 along the class axis puts the two largest values
    # of each row into the last two slots, in order.
    by_class = numpy.partition(predictions, -2, axis=2)[:, 0, :]
    best = by_class[:, -1]
    runner_up = by_class[:, -2]
    assert best.shape == (len(ids),)

    margins = best - runner_up
    scores = 1 - margins
    features = self._db.read_features(ids)
    chosen = choose_boltzmann(features, scores, n,
                              temperature=diversity * 2)
    return [ids[i] for i in chosen]
# For safe string-based access to recommender classes.