def collect_all_XY(self, root=0):
if self.mpi_comm is None:
XY = [self.obslayer.Y.copy()]
for l in self.layers: XY.append(l.X.copy())
return XY
else:
from mpi4py import MPI
from GPy.core.parameterization.variational import NormalPosterior
N,D = self.Y.shape
N_list = np.array(self.mpi_comm.allgather(N))
N_all = np.sum(N_list)
Y_all = np.empty((N_all,D)) if self.mpi_comm.rank==root else None
self.mpi_comm.Gatherv([self.Y, MPI.DOUBLE], [Y_all, (N_list*D, None), MPI.DOUBLE], root=root)
if self.mpi_comm.rank==root:
XY = [Y_all]
for l in self.layers:
Q = l.X.shape[1]
X_mean_all = np.empty((N_all,Q)) if self.mpi_comm.rank==root else None
self.mpi_comm.Gatherv([l.X.mean.values, MPI.DOUBLE], [X_mean_all, (N_list*Q, None), MPI.DOUBLE], root=root)
X_var_all = np.empty((N_all,Q)) if self.mpi_comm.rank==root else None
self.mpi_comm.Gatherv([l.X.variance.values, MPI.DOUBLE], [X_var_all, (N_list*Q, None), MPI.DOUBLE], root=root)
if self.mpi_comm.rank==root:
XY.append(NormalPosterior(X_mean_all, X_var_all))
if self.mpi_comm.rank==root: return XY
else: return None
python类DOUBLE的实例源码
def gather_array(data, mpi_comm, root=0, shape=0, dtype='float32'):
# gather 1D or 2D numpy arrays
assert isinstance(data, numpy.ndarray)
assert len(data.shape) < 3
# first we pass the data size
size = data.size
sizes = mpi_comm.gather(size, root=root) or []
# now we pass the data
displacements = [int(sum(sizes[:i])) for i in range(len(sizes))]
if dtype is 'float32':
gdata = numpy.empty(int(sum(sizes)), dtype=numpy.float32)
mpi_comm.Gatherv([data.flatten(), size, MPI.FLOAT], [gdata, (sizes, displacements), MPI.FLOAT], root=root)
elif dtype is 'float64':
gdata = numpy.empty(int(sum(sizes)), dtype=numpy.float64)
mpi_comm.Gatherv([data.flatten(), size, MPI.DOUBLE], [gdata, (sizes, displacements), MPI.DOUBLE], root=root)
elif dtype is 'int32':
gdata = numpy.empty(int(sum(sizes)), dtype=numpy.int32)
mpi_comm.Gatherv([data.flatten(), size, MPI.INT], [gdata, (sizes, displacements), MPI.INT], root=root)
elif dtype is 'int64':
gdata = numpy.empty(int(sum(sizes)), dtype=numpy.int64)
mpi_comm.Gatherv([data.flatten(), size, MPI.LONG], [gdata, (sizes, displacements), MPI.LONG], root=root)
if len(data.shape) == 1:
return gdata
else:
if shape == 0:
num_lines = data.shape[0]
if num_lines > 0:
return gdata.reshape((num_lines, gdata.size//num_lines))
else:
return gdata.reshape((0, gdata.shape[1]))
if shape == 1:
num_columns = data.shape[1]
if num_columns > 0:
return gdata.reshape((gdata.size//num_columns, num_columns))
else:
return gdata.reshape((gdata.shape[0], 0))
def _eval_models(self, models):
n = models.shape[0]
if self._mpi:
fit = np.zeros(n)
fit_mpi = np.zeros_like(fit)
self._mpi_comm.Barrier()
self._mpi_comm.Bcast([ models, MPI.DOUBLE ], root = 0)
for i in np.arange(self._mpi_rank, n, self._mpi_size):
fit_mpi[i] = self._func(self._unstandardize(models[i,:]))
self._mpi_comm.Barrier()
self._mpi_comm.Allreduce([ fit_mpi, MPI.DOUBLE ], [ fit, MPI.DOUBLE ],
op = MPI.SUM)
else:
fit = np.array([ self._func(self._unstandardize(models[i,:])) for i in range(n) ])
return fit
def gather(self,x,y):
# if self.myrank==0:
# print("gather",x.shape,self.sbuff.shape,self.rbuff.shape,self.np,self.mp,self.n1,self.m1)
for k in range(self.nbtimes):
self.localcomm.Allgatherv(x.ravel(),
[self.rbuff,self.sizes,self.offsets,MPI.DOUBLE])
b = self.rbuff.reshape( (self.mp,self.np,self.m,self.n))
buffertodomain(b,y,self.nh,self.m1,self.n1)
def numpy_to_MPI_typemap(np_type):
from mpi4py import MPI
typemap = {
np.dtype(np.float64) : MPI.DOUBLE,
np.dtype(np.float32) : MPI.FLOAT,
np.dtype(np.int) : MPI.INT,
np.dtype(np.int8) : MPI.CHAR,
np.dtype(np.uint8) : MPI.UNSIGNED_CHAR,
np.dtype(np.int32) : MPI.INT,
np.dtype(np.uint32) : MPI.UNSIGNED_INT,
}
return typemap[np_type]
def initialize(self, comm, config, args):
rank = comm.Get_rank()
size = comm.Get_size()
self.config = config
self.args = args
filename = args.struct_file[0]
self.struct_filename = filename
self.npoints,self.natoms = coord_reader.get_nframes_natoms(filename)
if coord_reader.supports_parallel_reading(filename):
# read coordinates in parallel
self.idxs_thread, self.npoints_per_thread, self.offsets_per_thread = p_index.get_idxs_thread(comm, self.npoints)
coords_thread = coord_reader.get_coordinates(filename, idxs=self.idxs_thread)
coords_ravel = coords_thread.ravel()
ravel_lengths, ravel_offsets = p_index.get_ravel_offsets(self.npoints_per_thread,self.natoms)
coordstemp = np.zeros(self.npoints*3*self.natoms, dtype='float')
start = MPI.Wtime()
comm.Allgatherv(coords_ravel, (coordstemp, ravel_lengths, ravel_offsets, MPI.DOUBLE))
self.coords = coordstemp.reshape((self.npoints,3,self.natoms))
else:
# serial reading
if rank == 0:
self.coords = coord_reader.get_coordinates(filename)
else:
self.coords = np.zeros((self.npoints,3,self.natoms),dtype=np.double)
comm.Bcast(self.coords, root=0)
logging.info('input coordinates loaded')
self.initialize_local_scale()
self.initialize_weights()
self.initialize_metric()
def _write_solocache_group_to_file(self, data_dict, group_prefix="/"):
if self._is_master() and group_prefix != "/" and group_prefix not in self._f:
self._f.create_group(group_prefix)
keys = data_dict.keys()
keys.sort()
for k in keys:
name = group_prefix + str(k)
if isinstance(data_dict[k], dict):
self._write_solocache_group_to_file(data_dict[k], group_prefix=name+"/")
else:
(data, op) = data_dict[k]
if op is not None:
if numpy.isscalar(data):
sendobj = numpy.array(data)
else:
sendobj = data
recvobj = numpy.empty_like(data)
log_debug(logger, self._log_prefix + "Reducing data %s" % (name))
self.comm.Reduce(
[sendobj, MPI.DOUBLE],
[recvobj, MPI.DOUBLE],
op = op,
root = 0
)
data = recvobj
if self._is_master():
log_debug(logger, self._log_prefix + "Writing data %s" % (name))
self._write_to_f(name, data)
def _gather_local_posterior(self, use_gather,
gather_size, gather_offset):
"""Gather/Gatherv local posterior
Parameters
----------
comm : object
MPI communication group
use_gather : boolean
Whether to use Gather or Gatherv
gather_size : 1D array
The size of each local posterior
gather_offset : 1D array
The offset of each local posterior
Returns
-------
HTFA
Returns the instance itself.
Notes
-----
We use numpy array rather than generic Python objects for MPI
communication because Gatherv is only supported for the former.
https://pythonhosted.org/mpi4py/usrman/tutorial.html
"""
if use_gather:
self.comm.Gather(self.local_posterior_,
self.gather_posterior, root=0)
else:
target = [
self.gather_posterior,
gather_size,
gather_offset,
MPI.DOUBLE]
self.comm.Gatherv(self.local_posterior_, target)
return self
def combine_global(grid,x):
""" return the global x array living on 'grid' """
comm = MPI.COMM_WORLD
nh = grid.nh
nv = grid.nv
mv = grid.mv
N = nv*mv
np = grid.np0
mp = grid.mp0
n= grid.n
m= grid.m
di,dj=n,m
di,dj=nv+2,mv+2
nv0 = di*np
mv0 = dj*mp
sizes = ones(np*mp)*N
offsets=arange(np*mp).reshape((mp,np)).ravel()*N
buff_loc=x.ravel().copy()
buff_loc[grid.msk.ravel()==0]=NaN
buff_glo=zeros(N*np*mp)
comm.Allgatherv(buff_loc,[buff_glo,sizes,offsets,MPI.DOUBLE])
I,J=meshgrid(arange(nv),arange(mv))
xglo = zeros((mv0,nv0))
for j in range(mp):
for i in range(np):
k=i+j*np
xglo[J+j*dj,I+i*di]=buff_glo[k*N:(k+1)*N].reshape((mv,nv))
return xglo
#----------------------------------------
# to check how the matrix looks like on the various grids
# interesting!