def mpi_moments(x, axis=0):
x = np.asarray(x, dtype='float64')
newshape = list(x.shape)
newshape.pop(axis)
n = np.prod(newshape,dtype=int)
totalvec = np.zeros(n*2+1, 'float64')
addvec = np.concatenate([x.sum(axis=axis).ravel(),
np.square(x).sum(axis=axis).ravel(),
np.array([x.shape[axis]],dtype='float64')])
MPI.COMM_WORLD.Allreduce(addvec, totalvec, op=MPI.SUM)
sum = totalvec[:n]
sumsq = totalvec[n:2*n]
count = totalvec[2*n]
if count == 0:
mean = np.empty(newshape); mean[:] = np.nan
std = np.empty(newshape); std[:] = np.nan
else:
mean = sum/count
std = np.sqrt(np.maximum(sumsq/count - np.square(mean),0))
return mean, std, count
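A minimal usage sketch for the helper above, assuming mpi4py and numpy are available and mpi_moments is importable from this module; each rank passes its local batch and gets back the moments pooled over all ranks (launch with e.g. mpirun -n 4):

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
np.random.seed(comm.Get_rank())
local_batch = np.random.randn(16, 4)        # per-rank data, shape (16, 4)
mean, std, count = mpi_moments(local_batch, axis=0)
if comm.Get_rank() == 0:
    # mean/std have shape (4,); count == 16 * comm.Get_size()
    print(mean, std, count)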
def __init__(self, comm=None, loadbalance=False, debug=False,
wait_on_start = True, exit_on_end = True,
cores_per_task = 1, **kwargs):
if MPI is None:
raise ImportError("Please install mpi4py")
self.comm = MPI.COMM_WORLD if comm is None else comm
self.rank = self.comm.Get_rank()
if cores_per_task > 1:
self.size = max(1, self.comm.Get_size() // cores_per_task)
else:
self.size = self.comm.Get_size() - 1
self.function = _error_function
self.loadbalance = loadbalance
self.debug = debug
if self.size == 0:
raise ValueError("Tried to create an MPI pool, but there "
"was only one MPI process available. "
"Need at least two.")
self.exit_on_end = exit_on_end
# Enter main loop for workers?
if wait_on_start:
if self.is_worker():
self.wait()
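A hedged sketch of how a pool like this is typically driven, in the emcee/schwimmbad MPIPool style; the class name MPIPool and its map()/close() methods are assumptions about code not shown in this snippet:

def square(x):
    return x * x

# With wait_on_start=True the workers block in wait() inside __init__ and
# (with exit_on_end) exit when the master shuts the pool down, so only the
# master rank reaches the lines below.
pool = MPIPool()
results = pool.map(square, range(8))   # assumed map() API, master-side only
print(results)
pool.close()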
def test_runningmeanstd():
comm = MPI.COMM_WORLD
np.random.seed(0)
for (triple,axis) in [
((np.random.randn(3), np.random.randn(4), np.random.randn(5)),0),
((np.random.randn(3,2), np.random.randn(4,2), np.random.randn(5,2)),0),
((np.random.randn(2,3), np.random.randn(2,4), np.random.randn(2,4)),1),
]:
x = np.concatenate(triple, axis=axis)
ms1 = [x.mean(axis=axis), x.std(axis=axis), x.shape[axis]]
ms2 = mpi_moments(triple[comm.Get_rank()],axis=axis)
for (a1,a2) in zipsame(ms1, ms2):
print(a1, a2)
assert np.allclose(a1, a2)
print("ok!")
def is_mpi_env():
"""
Test if current environment is MPI or not
"""
try:
import mpi4py
except ImportError:
return False
try:
import mpi4py.MPI
except ImportError:
return False
if mpi4py.MPI.COMM_WORLD.size == 1 and mpi4py.MPI.COMM_WORLD.rank == 0:
return False
return True
def smart_read_json(mpi_mode, json_file):
"""
read json file under mpi and multi-processing environment
"""
if not mpi_mode:
json_obj = read_json_file(json_file)
else:
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
json_obj = read_json_file(json_file)
else:
json_obj = None
json_obj = comm.bcast(json_obj, root=0)
return json_obj
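A small, hypothetical driver combining the two helpers above: rank 0 reads the file once and the parsed object is broadcast to every rank (the file name config.json is illustrative):

if __name__ == "__main__":
    mpi_mode = is_mpi_env()
    # Every rank ends up with the same parsed object.
    config = smart_read_json(mpi_mode, "config.json")
    print(config)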
def smooth(self,x,b,nite):
for l in range(self.nbsmooth):
for k in range(nite):
t0 = time()
# MPI.COMM_WORLD.Barrier()
# we apply the smoothing twice
smoothtwicewitha(self.msk,self.A,x,b,self.omega,self.yo)
# smoothoncewitha(self.msk,self.A,x,b,self.omega,self.yo)
# smoothoncewitha(self.msk,self.A,x,b,self.omega,self.yo)
t1 = time()
self.time['smooth']+=t1-t0
self.ncalls['smooth']+=1
# MPI.COMM_WORLD.Barrier()
t0 = time()
self.time['barrier']+=t0-t1
self.ncalls['barrier']+=1
self.halo.fill(x)
t1 = time()
self.time['halo']+=t1-t0
self.ncalls['halo']+=1
def residual(self,x,b,r):
for l in range(self.nbresidual):
t0 = time()
# MPI.COMM_WORLD.Barrier()
computeresidualwitha(self.msk,self.A,x,b,r)
t1 = time()
self.time['res']+=t1-t0
self.ncalls['res']+=1
# MPI.COMM_WORLD.Barrier()
t0 = time()
self.time['barrier']+=t0-t1
self.ncalls['barrier']+=1
self.halo.fill(r)
t1 = time()
self.time['halo']+=t1-t0
self.ncalls['halo']+=1
def sum(self,x):
""" sum(x) """
t0 = time()
# MPI.COMM_WORLD.Barrier()
nbduplicates = (self.np0*self.mp0)/(self.np*self.mp)
local_sum = computesum(self.msk,x,self.nh)
t1 = time()
self.time['sum']+=t1-t0
self.ncalls['sum']+=1
global_sum=array(MPI.COMM_WORLD.allgather(local_sum))
t0 = time()
self.time['reduce']+=t0-t1
self.ncalls['reduce']+=1
return global_sum.sum() / nbduplicates
#----------------------------------------
def coarsetofine(coarse,fine,x,y):
""" input = x is on coarse / output = y is on fine """
# these are function outside the object Grid
# because it works on two different grids ...
for k in range(coarse.nbcoarsetofine):
if coarse.flag=='peak':
t0 = time()
coarse.subdom.split(x,y)
t1 = time()
coarse.time['split']+=t1-t0
coarse.ncalls['split']+=1
else:
t0 = time()
interpolate(fine.msk,coarse.msk,x,fine.nh,y)
t1 = time()
fine.time['interpolate']+=t1-t0
fine.ncalls['interpolate']+=1
if coarse.nh<=2:
fine.halo.fill(y)
t0 = time()
fine.time['halo']+=t0-t1
fine.ncalls['halo']+=1
            # else there are enough points in the halo to skip the fill!
# MPI.COMM_WORLD.Barrier()
def __init__(self, master_node_ranks=[0]):
self.comm = MPI.COMM_WORLD
self.size = self.comm.Get_size()
self.rank = self.comm.Get_rank()
if self.size < 2:
raise ValueError('A minimum of 2 ranks are required for the MPI backend')
#Set the global backend
globals()['backend'] = self
#Call the appropriate constructors and pass the required data
if self.rank == 0:
super().__init__(master_node_ranks)
else:
super().__init__()
        raise Exception("Slaves exited main loop.")
def setUpModule():
    '''
    If an exception is raised in setUpModule, none of the tests in the
    module will be run.
    This is useful because the slaves run in a while loop after
    initialization, only responding to the master's commands, and never
    execute anything else.
    On termination of the master, the slaves call quit(), which raises
    SystemExit.
    Because of this behaviour of setUpModule, no unit tests run on the
    slaves, and we only need to write unit tests from the master's point
    of view.
    '''
global rank,backend_mpi
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
backend_mpi = BackendMPI()
def __init__(self):
# Delayed ImportError of mpi4py
if not HAS_MPI4PY:
        raise ImportError("No module named mpi4py, you must install mpi4py to use MPIMigration!")
super(MPIMigration, self).__init__()
self.comm = MPI.COMM_WORLD
self.pid = self.comm.rank
if self.pid == 0:
self.source = self.comm.size - 1
else:
self.source = self.comm.rank - 1
self.dest = (self.comm.rank + 1) % (self.comm.size)
self.all_stars = None
# -----------------------------------------------------------------
def all_reduce_params(sent_shared_params, rec_buffers, average_cnt = 1):
from mpi4py import MPI
mpi_communicator = MPI.COMM_WORLD
commu_time = 0.0
gpu2cpu_cp_time = 0.0
for (sent_model, rec_model) in zip(sent_shared_params, rec_buffers):
cp_start = time.time()
model_val = sent_model.get_value()
gpu2cpu_cp_time += time.time() - cp_start
commu_start = time.time()
mpi_communicator.Allreduce([model_val, MPI.FLOAT], [rec_model, MPI.FLOAT], op=MPI.SUM)
commu_time += time.time() - commu_start
        if average_cnt != 1:  # try to avoid dividing since it is very costly
rec_model = rec_model / average_cnt
cp_start = time.time()
sent_model.set_value(rec_model)
gpu2cpu_cp_time += time.time() - cp_start
return commu_time, gpu2cpu_cp_time
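The helper above averages Theano shared parameters across workers with Allreduce. A stripped-down NumPy sketch of the same pattern, with no Theano dependency (shapes and names are illustrative):

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()

# Each worker holds its own copy of the parameters.
local_params = [np.random.rand(4, 3).astype('float32'),
                np.random.rand(3).astype('float32')]
averaged = [np.empty_like(p) for p in local_params]

for send, recv in zip(local_params, averaged):
    # Element-wise sum across all workers, then one division for the mean.
    comm.Allreduce([send, MPI.FLOAT], [recv, MPI.FLOAT], op=MPI.SUM)
    recv /= size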
def get_times(reader, readPath):
"""Read the time values associated with the precursor database."""
# Grab the existing times and sort them
if reader == "foamFile":
dataDir = os.path.join(readPath, "postProcessing", "sampledSurface")
times = os.listdir(dataDir)
times = np.sort(times)
elif reader == "hdf5":
# Set the readPath to the file itself
readPath = h5py.File(readPath, 'r', driver='mpio', comm=MPI.COMM_WORLD)
times = readPath["velocity"]["times"][:]
readPath.close()
else:
raise ValueError("Unknown reader: "+reader)
return times
def get_y_prec(reader, readPath):
"""Read the mean velocity profile of the precursor
and the total number of points in the y direction.
"""
if reader == "foamFile":
uMeanTimes = os.listdir(os.path.join(readPath, "postProcessing",
"collapsedFields"))
y = np.genfromtxt(os.path.join(readPath, "postProcessing",
"collapsedFields",
uMeanTimes[-1],
"UMean_X.xy"))[:, 0]
elif reader == "hdf5":
readPath = h5py.File(readPath, 'r', driver='mpio', comm=MPI.COMM_WORLD)
y = readPath["points"]["pointsY"][:, 0]
readPath.close()
else:
raise ValueError("Unknown reader: "+reader)
return y
def test_consolidate(periodic=False, ndim=2):
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
pts, le, re, ls = make_points(20, ndim, leafsize=3)
Tpara0 = cykdtree.PyParallelKDTree(pts, le, re, leafsize=ls,
periodic=periodic)
Tpara = Tpara0.consolidate()
if rank == 0:
if False:
from cykdtree.plot import plot2D_serial
plot2D_serial(Tpara, label_boxes=True,
plotfile='test_consolidate.png')
Tseri = cykdtree.PyKDTree(pts, le, re, leafsize=ls,
periodic=periodic)
Tpara.assert_equal(Tseri, strict_idx=False)
else:
assert(Tpara is None)
def test_consolidate_edges(periodic=False, ndim=2):
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
pts, le, re, ls = make_points(20, ndim, leafsize=3)
Tpara = cykdtree.PyParallelKDTree(pts, le, re, leafsize=ls,
periodic=periodic)
LEpara, REpara = Tpara.consolidate_edges()
if rank == 0:
Tseri = cykdtree.PyKDTree(pts, le, re, leafsize=ls,
periodic=periodic)
LEseri, REseri = Tseri.consolidate_edges()
else:
LEseri, REseri = None, None
LEseri, REseri = comm.bcast((LEseri, REseri), root=0)
np.testing.assert_allclose(LEpara, LEseri)
np.testing.assert_allclose(REpara, REseri)
def test_parallel_distribute(ndim=2):
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
npts = 50
if rank == 0:
pts = np.random.rand(npts, ndim).astype('float64')
else:
pts = None
total_pts = comm.bcast(pts, root=0)
local_pts, local_idx = parallel_utils.py_parallel_distribute(pts)
    npts_local = npts // size  # integer division keeps the shape check below exact
if rank < (npts%size):
npts_local += 1
assert_equal(local_pts.shape, (npts_local, ndim))
assert_equal(local_idx.shape, (npts_local, ))
np.testing.assert_array_equal(total_pts[local_idx], local_pts)
def test_parallel_pivot_value(ndim=2, npts=50):
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
if rank == 0:
pts = np.random.rand(npts, ndim).astype('float64')
else:
pts = None
total_pts = comm.bcast(pts, root=0)
local_pts, local_idx = parallel_utils.py_parallel_distribute(pts)
pivot_dim = ndim-1
piv = parallel_utils.py_parallel_pivot_value(local_pts, pivot_dim)
nmax = (7*npts/10 + 6)
assert(np.sum(total_pts[:, pivot_dim] < piv) <= nmax)
assert(np.sum(total_pts[:, pivot_dim] > piv) <= nmax)
    # Not equivalent because each process does not have a multiple of 5 points
# if rank == 0:
# pp, idx = utils.py_pivot(total_pts, pivot_dim)
# np.testing.assert_approx_equal(piv, total_pts[idx[pp], pivot_dim])
def test_calc_rounds():
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
# Get answers
ans_nrounds = int(np.ceil(np.log2(size))) + 1
ans_src_round = 0
curr_rank = rank
curr_size = size
while curr_rank != 0:
split_rank = parallel_utils.py_calc_split_rank(curr_size)
if curr_rank < split_rank:
curr_size = split_rank
curr_rank = curr_rank
else:
curr_size = curr_size - split_rank
curr_rank = curr_rank - split_rank
ans_src_round += 1
# Test
nrounds, src_round = parallel_utils.py_calc_rounds()
assert_equal(nrounds, ans_nrounds)
assert_equal(src_round, ans_src_round)
def test_kdtree_parallel_distribute(ndim=2, npts=50):
total_npts = npts
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
if rank == 0:
total_pts = np.random.rand(total_npts, ndim).astype('float64')
else:
total_pts = None
pts, idx, le, re, ple, pre = parallel_utils.py_kdtree_parallel_distribute(total_pts)
total_pts = comm.bcast(total_pts, root=0)
assert_equal(pts.shape[0], idx.size)
np.testing.assert_array_equal(pts, total_pts[idx,:])
for d in range(ndim):
assert_less_equal(pts[:,d], re[d])
assert_less_equal(le[d], pts[:,d])
def __init__(self, comm=None):
_import_mpi()
if comm is None:
comm = MPI.COMM_WORLD
self.comm = comm
self.master = 0
self.rank = self.comm.Get_rank()
self.workers = set(range(self.comm.size))
self.workers.discard(self.master)
self.size = self.comm.Get_size() - 1
if self.size == 0:
raise ValueError("Tried to create an MPI pool, but there "
"was only one MPI process available. "
"Need at least two.")
def create_result_dir(prefix='result'):
comm = MPI.COMM_WORLD
if comm.rank == 0:
result_dir = 'results/{}_{}_0'.format(
prefix, time.strftime('%Y-%m-%d_%H-%M-%S'))
while os.path.exists(result_dir):
i = result_dir.split('_')[-1]
            result_dir = re.sub('_[0-9]+$', '_{}'.format(int(i) + 1), result_dir)
if not os.path.exists(result_dir):
os.makedirs(result_dir)
else:
result_dir = None
result_dir = comm.bcast(result_dir, root=0)
if not os.path.exists(result_dir):
os.makedirs(result_dir)
return result_dir
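A brief usage sketch for create_result_dir (the prefix value is illustrative); rank 0 creates the directory and every rank receives the same broadcast path:

out_dir = create_result_dir(prefix='cifar10')
if MPI.COMM_WORLD.rank == 0:
    print('writing results to', out_dir)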
def main():
"""
main entry point for script
"""
comm = MPI.COMM_WORLD
opts = getoptions(True)
opts['threads'] = comm.Get_size()
logout = "mpiOutput-{}.log".format(comm.Get_rank())
# For MPI jobs, do something sane with logging.
setuplogger(logging.ERROR, logout, opts['log'])
config = Config()
if comm.Get_size() < 2:
logging.error("Must run MPI job with at least 2 processes")
sys.exit(1)
myhost = MPI.Get_processor_name()
logging.info("Nodename: %s", myhost)
processjobs(config, opts, comm.Get_rank(), comm)
logging.info("Rank: %s FINISHED", comm.Get_rank())
def get_mpi():
"""
Helper that returns the mpi communicator, rank and size.
"""
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
return comm, rank, size
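A quick usage sketch for get_mpi, splitting a list of work items round-robin across ranks (the task list is illustrative):

comm, rank, size = get_mpi()
tasks = list(range(10))           # illustrative work items
my_tasks = tasks[rank::size]      # each rank takes every size-th item
print("rank", rank, "handles", my_tasks)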
def enabled():
    '''
    Return True when running under MPI with more than one process.
    '''
if MPI is not None:
if MPI.COMM_WORLD.size > 1:
return True
return False
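A short, hedged example of gating a code path on the helper above; it relies only on MPI.COMM_WORLD.rank and .size, which the snippet already uses:

if enabled():
    # Parallel path: only rank 0 reports.
    if MPI.COMM_WORLD.rank == 0:
        print("running on", MPI.COMM_WORLD.size, "MPI processes")
else:
    print("running serially")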
def update(self, x):
x = x.astype('float64')
n = int(np.prod(self.shape))
totalvec = np.zeros(n*2+1, 'float64')
addvec = np.concatenate([x.sum(axis=0).ravel(), np.square(x).sum(axis=0).ravel(), np.array([len(x)],dtype='float64')])
MPI.COMM_WORLD.Allreduce(addvec, totalvec, op=MPI.SUM)
self.incfiltparams(totalvec[0:n].reshape(self.shape), totalvec[n:2*n].reshape(self.shape), totalvec[2*n])