def _gen_init_n_blocks(na, nb, ka, kb):
num_nodes_a = np.arange(na)
n_blocks_a = map(len, np.array_split(num_nodes_a, ka))
num_nodes_b = np.arange(nb)
n_blocks_b = map(len, np.array_split(num_nodes_b, kb))
n_blocks_ = " ".join(map(str, n_blocks_a)) + " " + " ".join(map(str, n_blocks_b))
return n_blocks_
def gen_equal_partition(n, total):
all_nodes = np.arange(total)
n_blocks = list(map(len, np.array_split(all_nodes, n)))
return n_blocks
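Both helpers lean on the fact that np.array_split accepts a section count that does not evenly divide the array. A minimal sketch of the values they return, assuming the two functions above are in scope:

import numpy as np

# splitting 10 items into 3 blocks gives lengths 4, 3 and 3
print(list(map(len, np.array_split(np.arange(10), 3))))  # [4, 3, 3]
print(gen_equal_partition(3, 10))                         # [4, 3, 3]
print(_gen_init_n_blocks(10, 6, 3, 2))                    # "4 3 3 3 3"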
def run_par(self, function, **kwargs):
"""
Run a function on the agents in parallel.
"""
columns = kwargs["columns"] if "columns" in kwargs else self.agents.columns
# Garbage collect before creating new processes.
gc.collect()
return pd.concat(self.pool.imap(partial(function, **kwargs),
np.array_split(self.agents[columns],
self.processes * self.splits)))
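Outside the class, the same pattern can be sketched with a plain multiprocessing pool: np.array_split cuts the agents DataFrame into chunks and imap farms them out. The DataFrame, worker function and pool size below are invented for illustration:

import multiprocessing as mp
from functools import partial

import numpy as np
import pandas as pd

def summarise(chunk, scale=1.0):
    # toy worker: scaled column means of one chunk of agents
    return chunk.mean().to_frame().T * scale

if __name__ == '__main__':
    agents = pd.DataFrame(np.random.rand(1000, 3), columns=list('abc'))
    with mp.Pool(4) as pool:
        # processes * splits chunks, mirroring run_par above; newer pandas may
        # warn here, in which case splitting agents.index and slicing with
        # .iloc is a drop-in alternative
        chunks = np.array_split(agents, 4 * 2)
        result = pd.concat(pool.imap(partial(summarise, scale=1.0), chunks))
    print(result.shape)  # (8, 3): one summary row per chunk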
def split_in_chunks(minibatch, num_splits, flatten_keys=['labels']):
'''Return the splits per device
Return a list of dictionaries, one per device. Each dictionary
contains, for each key, the values that should be allocated on its
device.
'''
# Split the value of each key into chunks
    for k, v in minibatch.items():
        minibatch[k] = np.array_split(v, num_splits)
        if k in flatten_keys:
            minibatch[k] = [el.flatten() for el in minibatch[k]]
    return list(map(dict, zip(*[[(k, v) for v in value]
                                for k, value in minibatch.items()])))
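A small usage sketch for split_in_chunks, assuming a two-key minibatch split across two devices:

import numpy as np

minibatch = {'data': np.zeros((8, 3)), 'labels': np.arange(8).reshape(8, 1)}
per_device = split_in_chunks(minibatch, num_splits=2)
# two dicts, one per device; 'labels' is flattened to 1-D in each
print([d['data'].shape for d in per_device])    # [(4, 3), (4, 3)]
print([d['labels'].shape for d in per_device])  # [(4,), (4,)]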
def chunk_iterator(dataset, chunk_size=1000):
    chunk_indices = np.array_split(np.arange(len(dataset)),
                                   max(1, len(dataset) // chunk_size))
    for chunk_ixs in chunk_indices:
        chunk = dataset[chunk_ixs]
        yield (chunk_ixs, chunk)
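A brief usage sketch of chunk_iterator, with an in-memory array standing in for the dataset:

import numpy as np

data = np.arange(25)
for ixs, chunk in chunk_iterator(data, chunk_size=10):
    # 25 // 10 = 2 index groups of roughly equal size (13 and 12 elements)
    print(ixs.shape, chunk.shape)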
def array_split(ary, indices_or_sections, axis=0):
"""Splits an array into multiple sub arrays along a given axis.
This function is almost equivalent to :func:`cupy.split`. The only
    difference is that this function allows an integer number of sections that
    does not evenly divide the axis.
.. seealso:: :func:`cupy.split` for more detail, :func:`numpy.array_split`
"""
return core.array_split(ary, indices_or_sections, axis)
def split(ary, indices_or_sections, axis=0):
"""Splits an array into multiple sub arrays along a given axis.
Args:
ary (cupy.ndarray): Array to split.
indices_or_sections (int or sequence of ints): A value indicating how
            to divide the axis. If it is an integer, it is treated as the
            number of sections, and the axis is evenly divided. Otherwise,
            the integers indicate indices at which to split. Note that a
            sequence on device memory is not allowed.
axis (int): Axis along which the array is split.
Returns:
A list of sub arrays. Each array is a view of the corresponding input
array.
.. seealso:: :func:`numpy.split`
"""
if ary.ndim <= axis:
raise IndexError('Axis exceeds ndim')
size = ary.shape[axis]
if numpy.isscalar(indices_or_sections):
if size % indices_or_sections != 0:
raise ValueError(
'indices_or_sections must divide the size along the axes.\n'
'If you want to split the array into non-equally-sized '
'arrays, use array_split instead.')
return array_split(ary, indices_or_sections, axis)
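The split / array_split distinction here mirrors NumPy's. A minimal NumPy illustration of why array_split is the more forgiving of the two:

import numpy as np

a = np.arange(10)
print([len(s) for s in np.array_split(a, 3)])  # [4, 3, 3]: uneven sections allowed
try:
    np.split(a, 3)  # 10 is not divisible by 3
except ValueError as err:
    print('split rejects uneven sections:', err)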
def iterbatches(arrays, *, num_batches=None, batch_size=None, shuffle=True, include_final_partial_batch=True):
assert (num_batches is None) != (batch_size is None), 'Provide num_batches or batch_size, but not both'
arrays = tuple(map(np.asarray, arrays))
n = arrays[0].shape[0]
assert all(a.shape[0] == n for a in arrays[1:])
inds = np.arange(n)
if shuffle: np.random.shuffle(inds)
sections = np.arange(0, n, batch_size)[1:] if num_batches is None else num_batches
for batch_inds in np.array_split(inds, sections):
if include_final_partial_batch or len(batch_inds) == batch_size:
yield tuple(a[batch_inds] for a in arrays)
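A short usage sketch of iterbatches, assuming two aligned arrays and a fixed batch size:

import numpy as np

x = np.arange(10).reshape(10, 1)
y = np.arange(10)
for xb, yb in iterbatches((x, y), batch_size=4, shuffle=False):
    # section boundaries fall at 4 and 8, so batch sizes are 4, 4, 2
    print(xb.shape[0], yb.shape[0])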
def trim_data(data, resolution):
r = []
for i in numpy.array_split(data, resolution):
if len(i) > 0:
r.append(numpy.average(i))
return r
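A quick illustration of trim_data's block averaging, assuming a simple ramp signal:

import numpy

# 100 samples reduced to 4 block averages of 25 samples each;
# block means: 12.0, 37.0, 62.0, 87.0
print(trim_data(numpy.arange(100, dtype=float), resolution=4))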
def test_latlon2pix_edges(pix_size_single, origin_point, is_flipped,
num_chunks, chunk_position):
img = make_image(pix_size_single, origin_point, is_flipped,
num_chunks, chunk_position)
chunk_idx = img.chunk_idx
res_x = img._full_res[0]
res_y = img._full_res[1]
pix_size = (img.pixsize_x, img.pixsize_y)
origin = (img._start_lon, img._start_lat)
# compute chunks
lons = np.arange(res_x + 1) * pix_size[0] + origin[0] # right edge +1
all_lats = np.arange(res_y) * pix_size[1] + origin[1]
lats_chunks = np.array_split(all_lats, num_chunks)[chunk_idx]
pix_x = np.concatenate((np.arange(res_x), [res_x - 1]))
pix_y_chunks = range(lats_chunks.shape[0])
if chunk_position == 'end':
pix_y = np.concatenate((pix_y_chunks, [pix_y_chunks[-1]]))
lats = np.concatenate((lats_chunks, [res_y * pix_size[1] + origin[1]]))
else:
pix_y = pix_y_chunks
lats = lats_chunks
d = np.array([[a, b] for a in lons for b in lats])
xy = img.lonlat2pix(d)
true_xy = np.array([[a, b] for a in pix_x for b in pix_y])
assert np.all(xy == true_xy)
def split_cfold(nsamples, k=5, seed=None):
"""
Function that returns indices for splitting data into random folds.
Parameters
----------
nsamples: int
the number of samples in the dataset
k: int, optional
the number of folds
seed: int, optional
random seed to provide to numpy
Returns
-------
cvinds: list
list of arrays of length k, each with approximate shape (nsamples /
k,) of indices. These indices are randomly permuted (without
replacement) of assignments to each fold.
cvassigns: ndarray
array of shape (nsamples,) with each element in [0, k), that can be
used to assign data to a fold. This corresponds to the indices of
cvinds.
"""
np.random.seed(seed)
    pindices = np.random.permutation(nsamples)
    cvinds = np.array_split(pindices, k)
cvassigns = np.zeros(nsamples, dtype=int)
for n, inds in enumerate(cvinds):
cvassigns[inds] = n
return cvinds, cvassigns
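A brief usage sketch of split_cfold with 10 samples and 3 folds:

import numpy as np

cvinds, cvassigns = split_cfold(10, k=3, seed=0)
print([len(f) for f in cvinds])  # [4, 3, 3]: fold sizes from np.array_split
print(cvassigns)                 # fold id in [0, 3) for each of the 10 samples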
def fit(self, x, y, *args, **kwargs):
# set a different random seed for each thread
np.random.seed(self.random_state + mpiops.chunk_index)
if self.parallel:
process_rfs = np.array_split(range(self.forests),
mpiops.chunks)[mpiops.chunk_index]
else:
process_rfs = range(self.forests)
for t in process_rfs:
print('training forest {} using '
'process {}'.format(t, mpiops.chunk_index))
# change random state in each forest
self.kwargs['random_state'] = np.random.randint(0, 10000)
rf = RandomForestTransformed(
target_transform=self.target_transform,
n_estimators=self.n_estimators,
**self.kwargs
)
rf.fit(x, y)
if self.parallel: # used in training
pk_f = join(self.temp_dir,
'rf_model_{}.pk'.format(t))
else: # used when parallel is false, i.e., during x-val
pk_f = join(self.temp_dir,
'rf_model_{}_{}.pk'.format(t, mpiops.chunk_index))
with open(pk_f, 'wb') as fp:
pickle.dump(rf, fp)
if self.parallel:
mpiops.comm.barrier()
# Mark that we are now trained
self._trained = True
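The forest-to-process assignment above is simply an array_split of the forest indices across MPI ranks. A standalone sketch with invented sizes (10 forests over 4 ranks):

import numpy as np

forests, ranks = 10, 4
for rank in range(ranks):
    # rank 0 trains forests [0 1 2], rank 1 [3 4 5], rank 2 [6 7], rank 3 [8 9]
    print(rank, np.array_split(range(forests), ranks)[rank])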
def kmean_distance2(x, C):
"""Compute squared euclidian distance to the nearest cluster centre
Parameters
----------
x : ndarray
(n, d) array of n d-dimensional points
C : ndarray
(k, d) array of k cluster centres
Returns
-------
d2_x : ndarray
(n,) length array of distances from each x to the nearest centre
"""
# To save memory we partition the computation
nsplits = max(1, int(x.shape[0]/distance_partition_size))
splits = np.array_split(x, nsplits)
d2_x = np.empty(x.shape[0])
idx = 0
for x_i in splits:
n_i = x_i.shape[0]
D2_x = scipy.spatial.distance.cdist(x_i, C, metric='sqeuclidean')
d2_x[idx:idx + n_i] = np.amin(D2_x, axis=1)
idx += n_i
return d2_x
def compute_weights(x, C):
"""Number of points in x assigned to each centre c in C
Parameters
----------
x : ndarray
(n, d) array of n d-dimensional points
C : ndarray
(k, d) array of k cluster centres
Returns
-------
weights : ndarray
(k,) length array giving number of x closest to each c in C
"""
nsplits = max(1, int(x.shape[0]/distance_partition_size))
splits = np.array_split(x, nsplits)
closests = np.empty(x.shape[0], dtype=int)
idx = 0
for x_i in splits:
n_i = x_i.shape[0]
D2_x = scipy.spatial.distance.cdist(x_i, C, metric='sqeuclidean')
closests[idx: idx+n_i] = np.argmin(D2_x, axis=1)
idx += n_i
weights = np.bincount(closests, minlength=C.shape[0])
return weights
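A small sanity check of the partitioned-cdist pattern used by kmean_distance2 and compute_weights, assuming both functions and the module-level distance_partition_size constant live in the same module (the constant is set artificially small here to force several splits):

import numpy as np
import scipy.spatial.distance

distance_partition_size = 100  # assumed module-level constant; demo value only

x = np.random.rand(250, 2)
C = np.random.rand(3, 2)

# the chunked result should match a single cdist call over the whole array
d2_ref = np.amin(scipy.spatial.distance.cdist(x, C, 'sqeuclidean'), axis=1)
assert np.allclose(kmean_distance2(x, C), d2_ref)
assert compute_weights(x, C).sum() == 250  # every point assigned to exactly one centre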
def reseed_point(X, C, index):
""" Re-initialise the centre of a class if it loses all its members
This should almost never happen. If it does, find the point furthest
from all the other cluster centres and use that. Maybe a bad idea but
a decent first pass
Parameters
----------
X : ndarray
(n, d) array of points
C : ndarray
(k, d) array of cluster centres
index : int >= 0
        index between 0..k-1 of the cluster that has lost its points
Returns
-------
new_point : ndarray
d-dimensional point for replacing the empty cluster centre.
"""
log.info("Reseeding class with no members")
nsplits = max(1, int(X.shape[0]/distance_partition_size))
splits = np.array_split(X, nsplits)
empty_index = np.ones(C.shape[0], dtype=bool)
empty_index[index] = False
    local_candidate = None
    local_cost = -1.0
    for x_i in splits:
        D2_x = scipy.spatial.distance.cdist(x_i, C, metric='sqeuclidean')
        costs = np.sum(D2_x[:, empty_index], axis=1)
        # candidate: the point in this split furthest from the surviving centres
        potential_idx = np.argmax(costs)
        potential_cost = costs[potential_idx]
        if potential_cost > local_cost:
            local_candidate = x_i[potential_idx]
            local_cost = potential_cost
best_pernode = mpiops.comm.allgather(local_cost)
best_node = np.argmax(best_pernode)
new_point = mpiops.comm.bcast(local_candidate, root=best_node)
return new_point
def __init__(self, shape, bbox, crs, name, n_subchunks, outputdir,
band_tags=None):
# affine
self.A, _, _ = image.bbox2affine(bbox[1, 0], bbox[0, 0],
bbox[0, 1], bbox[1, 1],
shape[0], shape[1])
self.shape = shape
self.outbands = len(band_tags)
self.bbox = bbox
self.name = name
self.outputdir = outputdir
self.n_subchunks = n_subchunks
self.sub_starts = [k[0] for k in np.array_split(
np.arange(self.shape[1]),
mpiops.chunks * self.n_subchunks)]
# file tags don't have spaces
if band_tags:
file_tags = ["_".join(k.lower().split()) for k in band_tags]
else:
file_tags = [str(k) for k in range(self.outbands)]
band_tags = file_tags
if mpiops.chunk_index == 0:
# create a file for each band
self.files = []
for band in range(self.outbands):
output_filename = os.path.join(outputdir, name + "_" +
file_tags[band] + ".tif")
f = rasterio.open(output_filename, 'w', driver='GTiff',
width=self.shape[0], height=self.shape[1],
dtype=np.float32, count=1,
crs=crs,
transform=self.A,
nodata=self.nodata_value)
f.update_tags(1, image_type=band_tags[band])
self.files.append(f)
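The sub_starts computation above just records where each MPI subchunk begins along the image's row axis. A standalone sketch with made-up sizes (103 rows, 4 MPI chunks, 2 subchunks each):

import numpy as np

rows, chunks, n_subchunks = 103, 4, 2
sub_starts = [k[0] for k in np.array_split(np.arange(rows), chunks * n_subchunks)]
# starts of the 8 row blocks: 0, 13, 26, 39, 52, 65, 78, 91
print(sub_starts)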
def gdalaverage(input_dir, out_dir, size):
"""
average data using gdal's averaging method.
Parameters
----------
input_dir: str
input dir name of the tifs that needs to be averaged
out_dir: str
output dir name
size: int, optional
size of kernel
Returns
-------
"""
input_dir = abspath(input_dir)
log.info('Reading tifs from {}'.format(input_dir))
tifs = glob.glob(join(input_dir, '*.tif'))
process_tifs = np.array_split(tifs, mpiops.chunks)[mpiops.chunk_index]
for tif in process_tifs:
data_set = gdal.Open(tif, gdal.GA_ReadOnly)
# band = data_set.GetRasterBand(1)
# data_type = gdal.GetDataTypeName(band.DataType)
# data = band.ReadAsArray()
# no_data_val = band.GetNoDataValue()
# averaged_data = filter_data(data, size, no_data_val)
log.info('Calculated average for {}'.format(basename(tif)))
output_file = join(out_dir, 'average_' + basename(tif))
src_gt = data_set.GetGeoTransform()
tmp_file = '/tmp/tmp_{}.tif'.format(mpiops.chunk_index)
resample_cmd = [TRANSLATE] + [tif, tmp_file] + \
['-tr', str(src_gt[1]*size), str(src_gt[1]*size)] + \
['-r', 'bilinear']
check_call(resample_cmd)
rollback_cmd = [TRANSLATE] + [tmp_file, output_file] + \
['-tr', str(src_gt[1]), str(src_gt[1])]
check_call(rollback_cmd)
log.info('Finished converting {}'.format(basename(tif)))
def mean(input_dir, out_dir, size, func, partitions, mask):
input_dir = abspath(input_dir)
if isdir(input_dir):
log.info('Reading tifs from {}'.format(input_dir))
tifs = glob.glob(join(input_dir, '*.tif'))
else:
assert isfile(input_dir)
tifs = [input_dir]
process_tifs = np.array_split(tifs, mpiops.chunks)[mpiops.chunk_index]
for tif in process_tifs:
log.info('Starting to average {}'.format(basename(tif)))
treat_file(tif, out_dir, size, func, partitions, mask)
log.info('Finished averaging {}'.format(basename(tif)))
def inspect(input_dir, report_file, partitions, extension):
input_dir = abspath(input_dir)
if isdir(input_dir):
log.info('Reading tifs from {}'.format(input_dir))
tifs = glob.glob(join(input_dir, '*.' + extension))
else:
log.info('Reporting geoinfo for {}'.format(input_dir))
tifs = [input_dir]
with open(report_file, 'w', newline='') as csvfile:
writer = csv.writer(csvfile, dialect='excel')
        writer.writerow(['FileName', 'band', 'NoDataValue', 'rows', 'cols',
'Min', 'Max', 'Mean', 'Std',
'DataType', 'Categories', 'NanCount'])
process_tifs = np.array_split(tifs, mpiops.chunks)[mpiops.chunk_index]
stats = [] # process geotiff stats including multibanded geotif
for t in process_tifs:
stats.append(get_stats(t, partitions))
# gather all process geotif stats in stats dict
stats = _join_dicts(stats)
# global gather in root
stats = _join_dicts(mpiops.comm.gather(stats, root=0))
if mpiops.chunk_index == 0:
for k, v in stats.items():
write_rows(v, writer)