def mpi_moments(x, axis=0):
x = np.asarray(x, dtype='float64')
newshape = list(x.shape)
newshape.pop(axis)
n = np.prod(newshape,dtype=int)
totalvec = np.zeros(n*2+1, 'float64')
addvec = np.concatenate([x.sum(axis=axis).ravel(),
np.square(x).sum(axis=axis).ravel(),
np.array([x.shape[axis]],dtype='float64')])
MPI.COMM_WORLD.Allreduce(addvec, totalvec, op=MPI.SUM)
sum = totalvec[:n]
sumsq = totalvec[n:2*n]
count = totalvec[2*n]
if count == 0:
mean = np.empty(newshape); mean[:] = np.nan
std = np.empty(newshape); std[:] = np.nan
else:
mean = sum/count
std = np.sqrt(np.maximum(sumsq/count - np.square(mean),0))
return mean, std, count
python类SUM的实例源码
def mpi_sum(value):
global_sum = np.zeros(1, dtype='float64')
local_sum = np.sum(np.array(value)).astype('float64')
MPI.COMM_WORLD.Reduce(local_sum, global_sum, op=MPI.SUM)
return global_sum[0]
def norm(self,x):
""" norm = sum(x*x) """
nbduplicates = (self.np0*self.mp0)/(self.np*self.mp)
# computenorm is done in Fortran
self.typenorm='l2'
t0 = time()
# MPI.COMM_WORLD.Barrier()
if self.typenorm=='l2':
local_sum = computenorm(self.msk,x,self.nh)
t1 = time()
self.time['norm']+=t1-t0
self.ncalls['norm']+=1
z=MPI.COMM_WORLD.allreduce(local_sum,op=MPI.SUM)/ nbduplicates
z=sqrt(z)
t0 = time()
self.time['reduce']+=t0-t1
self.ncalls['reduce']+=1
if self.typenorm=='inf':
local_z= computemax(self.msk,x,self.nh)
t1 = time()
self.time['norm']+=t1-t0
self.ncalls['norm']+=1
z=MPI.COMM_WORLD.allreduce(local_z,op=MPI.MAX)
t0 = time()
self.time['reduce']+=t0-t1
self.ncalls['reduce']+=1
return z
def all_reduce_params(sent_shared_params, rec_buffers, average_cnt = 1):
from mpi4py import MPI
mpi_communicator = MPI.COMM_WORLD
commu_time = 0.0
gpu2cpu_cp_time = 0.0
for (sent_model, rec_model) in zip(sent_shared_params, rec_buffers):
cp_start = time.time()
model_val = sent_model.get_value()
gpu2cpu_cp_time += time.time() - cp_start
commu_start = time.time()
mpi_communicator.Allreduce([model_val, MPI.FLOAT], [rec_model, MPI.FLOAT], op=MPI.SUM)
commu_time += time.time() - commu_start
if average_cnt != 1: #try to avoid dividing since it is very cost
rec_model = rec_model / average_cnt
cp_start = time.time()
sent_model.set_value(rec_model)
gpu2cpu_cp_time += time.time() - cp_start
return commu_time, gpu2cpu_cp_time
def __init__(self, comm, matrix_thread, k, **kargs):
self.comm = comm
self.matrix_thread = matrix_thread
self.nrows_thread = np.shape(self.matrix_thread)[0]
n = comm.allreduce(self.nrows_thread, op=MPI.SUM)
tp = matrix_thread.dtype.char
_ArpackParams.__init__(self, n, k, tp, **kargs) # arpack.py l.311
self.bmat = 'I'
self.B = lambda x: x
self.workd = np.zeros(3 * n, self.tp)
self.workl = np.zeros(self.ncv * (self.ncv + 8), self.tp)
ltr = _type_conv[self.tp]
if ltr not in ["s", "d"]:
raise ValueError("Input matrix is not real-valued.")
self._arpack_solver = _arpack.__dict__[ltr + 'saupd']
self._arpack_extract = _arpack.__dict__[ltr + 'seupd']
self.ipntr = np.zeros(11, "int")
self.iterate_infodict = _SAUPD_ERRORS[ltr]
def mpi_moments(x, axis=0):
x = np.asarray(x, dtype='float64')
newshape = list(x.shape)
newshape.pop(axis)
n = np.prod(newshape,dtype=int)
totalvec = np.zeros(n*2+1, 'float64')
addvec = np.concatenate([x.sum(axis=axis).ravel(),
np.square(x).sum(axis=axis).ravel(),
np.array([x.shape[axis]],dtype='float64')])
MPI.COMM_WORLD.Allreduce(addvec, totalvec, op=MPI.SUM)
sum = totalvec[:n]
sumsq = totalvec[n:2*n]
count = totalvec[2*n]
if count == 0:
mean = np.empty(newshape); mean[:] = np.nan
std = np.empty(newshape); std[:] = np.nan
else:
mean = sum/count
std = np.sqrt(np.maximum(sumsq/count - np.square(mean),0))
return mean, std, count
def update(self, x):
x = x.astype('float64')
n = int(np.prod(self.shape))
totalvec = np.zeros(n*2+1, 'float64')
addvec = np.concatenate([x.sum(axis=0).ravel(), np.square(x).sum(axis=0).ravel(), np.array([len(x)],dtype='float64')])
MPI.COMM_WORLD.Allreduce(addvec, totalvec, op=MPI.SUM)
self.incfiltparams(totalvec[0:n].reshape(self.shape), totalvec[n:2*n].reshape(self.shape), totalvec[2*n])
def update(self, localg, stepsize):
if self.t % 100 == 0:
self.check_synced()
localg = localg.astype('float32')
globalg = np.zeros_like(localg)
self.comm.Allreduce(localg, globalg, op=MPI.SUM)
if self.scale_grad_by_procs:
globalg /= self.comm.Get_size()
self.t += 1
a = stepsize * np.sqrt(1 - self.beta2**self.t)/(1 - self.beta1**self.t)
self.m = self.beta1 * self.m + (1 - self.beta1) * globalg
self.v = self.beta2 * self.v + (1 - self.beta2) * (globalg * globalg)
step = (- a) * self.m / (np.sqrt(self.v) + self.epsilon)
self.setfromflat(self.getflat() + step)
def _eval_models(self, models):
n = models.shape[0]
if self._mpi:
fit = np.zeros(n)
fit_mpi = np.zeros_like(fit)
self._mpi_comm.Barrier()
self._mpi_comm.Bcast([ models, MPI.DOUBLE ], root = 0)
for i in np.arange(self._mpi_rank, n, self._mpi_size):
fit_mpi[i] = self._func(self._unstandardize(models[i,:]))
self._mpi_comm.Barrier()
self._mpi_comm.Allreduce([ fit_mpi, MPI.DOUBLE ], [ fit, MPI.DOUBLE ],
op = MPI.SUM)
else:
fit = np.array([ self._func(self._unstandardize(models[i,:])) for i in range(n) ])
return fit
def nodal_values(self, f):
""" Convert dolfin function to nodal values. """
farray = f.vector().array()
fdim = len(farray)/len(self.indices)
farray = farray.reshape((len(self.indices), fdim))
arr = np.zeros((len(self.nodes), fdim))
arr_loc = np.zeros_like(arr)
for i, fval in zip(self.indices, farray):
arr_loc[i, :] = fval
comm.Allreduce(arr_loc, arr, op=MPI.SUM)
return arr
def sum(self, x_local):
return self._reduce(x_local, MPI.SUM)
def sum_at_root(self, x_local):
return self._reduce_at_root(x_local, MPI.SUM)
def mpi_average_gradients(self,arr,num_replicas=None):
if num_replicas == None:
num_replicas = self.num_workers
if self.task_index >= num_replicas:
arr *= 0.0
arr_global = np.empty_like(arr)
self.comm.Allreduce(arr,arr_global,op=MPI.SUM)
arr_global /= num_replicas
return arr_global
def mpi_sum_scalars(self,val,num_replicas=None):
if num_replicas == None:
num_replicas = self.num_workers
if self.task_index >= num_replicas:
val *= 0.0
val_global = 0.0
val_global = self.comm.allreduce(val,op=MPI.SUM)
return val_global
def update(self, x):
x = x.astype('float64')
n = int(np.prod(self.shape))
totalvec = np.zeros(n*2+1, 'float64')
addvec = np.concatenate([x.sum(axis=0).ravel(), np.square(x).sum(axis=0).ravel(), np.array([len(x)],dtype='float64')])
MPI.COMM_WORLD.Allreduce(addvec, totalvec, op=MPI.SUM)
self.incfiltparams(totalvec[0:n].reshape(self.shape), totalvec[n:2*n].reshape(self.shape), totalvec[2*n])
def update(self, localg, stepsize):
if self.t % 100 == 0:
self.check_synced()
localg = localg.astype('float32')
globalg = np.zeros_like(localg)
self.comm.Allreduce(localg, globalg, op=MPI.SUM)
if self.scale_grad_by_procs:
globalg /= self.comm.Get_size()
self.t += 1
a = stepsize * np.sqrt(1 - self.beta2**self.t)/(1 - self.beta1**self.t)
self.m = self.beta1 * self.m + (1 - self.beta1) * globalg
self.v = self.beta2 * self.v + (1 - self.beta2) * (globalg * globalg)
step = (- a) * self.m / (np.sqrt(self.v) + self.epsilon)
self.setfromflat(self.getflat() + step)
def _init_w_transforms(data, features, random_states, comm=MPI.COMM_SELF):
"""Initialize the mappings (Wi) for the SRM with random orthogonal matrices.
Parameters
----------
data : list of 2D arrays, element i has shape=[voxels_i, samples]
Each element in the list contains the fMRI data of one subject.
features : int
The number of features in the model.
random_states : list of `RandomState`s
One `RandomState` instance per subject.
comm : mpi4py.MPI.Intracomm
The MPI communicator containing the data
Returns
-------
w : list of array, element i has shape=[voxels_i, features]
The initialized orthogonal transforms (mappings) :math:`W_i` for each
subject.
voxels : list of int
A list with the number of voxels per subject.
Note
----
This function assumes that the numpy random number generator was
initialized.
Not thread safe.
"""
w = []
subjects = len(data)
voxels = np.empty(subjects, dtype=int)
# Set Wi to a random orthogonal voxels by features matrix
for subject in range(subjects):
if data[subject] is not None:
voxels[subject] = data[subject].shape[0]
rnd_matrix = random_states[subject].random_sample((
voxels[subject], features))
q, r = np.linalg.qr(rnd_matrix)
w.append(q)
else:
voxels[subject] = 0
w.append(None)
voxels = comm.allreduce(voxels, op=MPI.SUM)
return w, voxels
def fit(self, X, y=None):
"""Compute the probabilistic Shared Response Model
Parameters
----------
X : list of 2D arrays, element i has shape=[voxels_i, samples]
Each element in the list contains the fMRI data of one subject.
y : not used
"""
logger.info('Starting Probabilistic SRM')
# Check the number of subjects
if len(X) <= 1:
raise ValueError("There are not enough subjects "
"({0:d}) to train the model.".format(len(X)))
# Check for input data sizes
number_subjects = len(X)
number_subjects_vec = self.comm.allgather(number_subjects)
for rank in range(self.comm.Get_size()):
if number_subjects_vec[rank] != number_subjects:
raise ValueError(
"Not all ranks have same number of subjects")
# Collect size information
shape0 = np.zeros((number_subjects,), dtype=np.int)
shape1 = np.zeros((number_subjects,), dtype=np.int)
for subject in range(number_subjects):
if X[subject] is not None:
assert_all_finite(X[subject])
shape0[subject] = X[subject].shape[0]
shape1[subject] = X[subject].shape[1]
shape0 = self.comm.allreduce(shape0, op=MPI.SUM)
shape1 = self.comm.allreduce(shape1, op=MPI.SUM)
# Check if all subjects have same number of TRs
number_trs = np.min(shape1)
for subject in range(number_subjects):
if shape1[subject] < self.features:
raise ValueError(
"There are not enough samples to train the model with "
"{0:d} features.".format(self.features))
if shape1[subject] != number_trs:
raise ValueError("Different number of samples between subjects"
".")
# Run SRM
self.sigma_s_, self.w_, self.mu_, self.rho2_, self.s_ = self._srm(X)
return self
def create_bcs(Lx, Ly, mesh, grid_spacing, rad, num_obstacles,
surface_charge, solutes, enable_NS, enable_EC,
p_lagrange, V_lagrange,
**namespace):
""" The boundaries and boundary conditions are defined here. """
data = np.loadtxt(
"meshes/periodic_porous_Lx{}_Ly{}_rad{}_N{}_dx{}.dat".format(
Lx, Ly, rad, num_obstacles, grid_spacing))
centroids = data[:, :2]
rad = data[:, 2]
# Find a single node to pin pressure to
x_loc = np.array(mesh.coordinates())
x_proc = np.zeros((size, 2))
ids_notboun = np.logical_and(
x_loc[:, 0] > x_loc[:, 0].min() + df.DOLFIN_EPS,
x_loc[:, 1] > x_loc[:, 1].min() + df.DOLFIN_EPS)
x_loc = x_loc[ids_notboun, :]
d2 = (x_loc[:, 0]+Lx/2)**2 + (x_loc[:, 1]+Ly/2)**2
x_bottomleft = x_loc[d2 == d2.min()][0]
x_proc[rank, :] = x_bottomleft
x_pin = np.zeros_like(x_proc)
comm.Allreduce(x_proc, x_pin, op=MPI.SUM)
x_pin = x_pin[x_pin[:, 0] == x_pin[:, 0].min(), :][0]
info("Pinning point: {}".format(x_pin))
pin_code = ("x[0] < {x}+{eps} && "
"x[0] > {x}-{eps} && "
"x[1] > {y}-{eps} && "
"x[1] < {y}+{eps}").format(
x=x_pin[0], y=x_pin[1], eps=1e-3)
boundaries = dict(
obstacles=[Obstacles(Lx, centroids, rad, grid_spacing)]
)
# Allocating the boundary dicts
bcs = dict()
bcs_pointwise = dict()
for boundary in boundaries:
bcs[boundary] = dict()
noslip = Fixed((0., 0.))
if enable_NS:
bcs["obstacles"]["u"] = noslip
if not p_lagrange:
bcs_pointwise["p"] = (0., pin_code)
if enable_EC:
bcs["obstacles"]["V"] = Charged(surface_charge)
if not V_lagrange:
bcs_pointwise["V"] = (0., pin_code)
return boundaries, bcs, bcs_pointwise
def create_bcs(Lx, Ly, mesh,
surface_charge, solutes, enable_NS, enable_EC,
V_lagrange, p_lagrange,
**namespace):
""" The boundaries and boundary conditions are defined here. """
# Find a single node to pin pressure to
x_loc = np.array(mesh.coordinates())
x_proc = np.zeros((size, 2))
ids_notboun = np.logical_and(
x_loc[:, 0] > x_loc[:, 0].min() + df.DOLFIN_EPS,
x_loc[:, 1] > x_loc[:, 1].min() + df.DOLFIN_EPS)
x_loc = x_loc[ids_notboun, :]
d2 = (x_loc[:, 0]-Lx/2)**2 + (x_loc[:, 1]-Ly/2)**2
x_bottomleft = x_loc[d2 == d2.min()][0]
x_proc[rank, :] = x_bottomleft
x_pin = np.zeros_like(x_proc)
comm.Allreduce(x_proc, x_pin, op=MPI.SUM)
x_pin = x_pin[x_pin[:, 0] == x_pin[:, 0].min(), :][0]
info("Pinning point: {}".format(x_pin))
pin_code = ("x[0] < {x}+{eps} && "
"x[0] > {x}-{eps} && "
"x[1] > {y}-{eps} && "
"x[1] < {y}+{eps}").format(
x=x_pin[0], y=x_pin[1], eps=1e-3)
boundaries = dict(
obstacles=[Wall()]
)
# Allocating the boundary dicts
bcs = dict()
bcs_pointwise = dict()
for boundary in boundaries:
bcs[boundary] = dict()
noslip = Fixed((0., 0.))
if enable_NS:
bcs["obstacles"]["u"] = noslip
if not p_lagrange:
bcs_pointwise["p"] = (0., pin_code)
if enable_EC:
bcs["obstacles"]["V"] = Charged(surface_charge)
if not V_lagrange:
bcs_pointwise["V"] = (0., pin_code)
return boundaries, bcs, bcs_pointwise