def all_reduce_params(sent_shared_params, rec_buffers, average_cnt=1):
    """Sum each shared parameter across all MPI ranks and write it back.

    For every pair from ``sent_shared_params``/``rec_buffers``, the shared
    variable is copied host-side, all-reduced (SUM) over ``MPI.COMM_WORLD``
    into the receive buffer, optionally divided by ``average_cnt``, and
    stored back into the shared variable.

    Returns a ``(communication_seconds, gpu2cpu_copy_seconds)`` tuple of
    accumulated wall-clock timings.
    """
    from mpi4py import MPI
    world = MPI.COMM_WORLD
    total_comm = 0.0
    total_copy = 0.0
    for shared_var, recv_buf in zip(sent_shared_params, rec_buffers):
        tic = time.time()
        host_val = shared_var.get_value()
        total_copy += time.time() - tic

        tic = time.time()
        world.Allreduce([host_val, MPI.FLOAT], [recv_buf, MPI.FLOAT], op=MPI.SUM)
        total_comm += time.time() - tic

        # Only divide when there is actually more than one contribution;
        # the elementwise division is comparatively expensive.
        if average_cnt != 1:
            recv_buf = recv_buf / average_cnt

        tic = time.time()
        shared_var.set_value(recv_buf)
        total_copy += time.time() - tic
    return total_comm, total_copy
# --- Example snippets using mpi4py's MPI.FLOAT datatype ---
def gather_array(data, mpi_comm, root=0, shape=0, dtype='float32'):
    """Gather a 1D or 2D numpy array from every rank onto ``root``.

    Parameters
    ----------
    data : numpy.ndarray
        The local 1D or 2D contribution of this rank.
    mpi_comm :
        MPI communicator providing ``gather`` and ``Gatherv``.
    root : int
        Rank that receives the concatenated data.
    shape : int
        Reshape mode for 2D input: 0 keeps the local row count and widens
        the columns; 1 keeps the local column count and stacks the rows.
    dtype : str
        One of 'float32', 'float64', 'int32', 'int64'.

    Returns
    -------
    numpy.ndarray
        The gathered (and possibly reshaped) array; only meaningful on
        ``root`` for a rooted gather.

    Raises
    ------
    ValueError
        If ``dtype`` is not one of the supported names.
    """
    assert isinstance(data, numpy.ndarray)
    assert len(data.shape) < 3
    # First exchange the per-rank element counts (gather returns None on
    # non-root ranks, hence the ``or []`` fallback).
    size = data.size
    sizes = mpi_comm.gather(size, root=root) or []
    # Receive offsets are the running prefix sums of the per-rank sizes.
    displacements = [int(sum(sizes[:i])) for i in range(len(sizes))]
    # Compare dtype names with '=='; the original used 'is', which only
    # worked by accident of string-literal interning.
    if dtype == 'float32':
        gdata = numpy.empty(int(sum(sizes)), dtype=numpy.float32)
        mpi_type = MPI.FLOAT
    elif dtype == 'float64':
        gdata = numpy.empty(int(sum(sizes)), dtype=numpy.float64)
        mpi_type = MPI.DOUBLE
    elif dtype == 'int32':
        gdata = numpy.empty(int(sum(sizes)), dtype=numpy.int32)
        mpi_type = MPI.INT
    elif dtype == 'int64':
        gdata = numpy.empty(int(sum(sizes)), dtype=numpy.int64)
        mpi_type = MPI.LONG
    else:
        # Previously an unknown dtype fell through every branch and crashed
        # later with a NameError on 'gdata'; fail fast and explicitly.
        raise ValueError("unsupported dtype: %r" % (dtype,))
    mpi_comm.Gatherv([data.flatten(), size, mpi_type],
                     [gdata, (sizes, displacements), mpi_type], root=root)
    if len(data.shape) == 1:
        return gdata
    if shape == 0:
        num_lines = data.shape[0]
        if num_lines > 0:
            return gdata.reshape((num_lines, gdata.size // num_lines))
        # BUG FIX: gdata is 1-D here, so the original ``gdata.shape[1]``
        # raised IndexError; the column count must come from the 2-D input.
        return gdata.reshape((0, data.shape[1]))
    if shape == 1:
        num_columns = data.shape[1]
        if num_columns > 0:
            return gdata.reshape((gdata.size // num_columns, num_columns))
        return gdata.reshape((gdata.shape[0], 0))
    # NOTE: as in the original, any other ``shape`` value returns None.
def all_gather_array(data, mpi_comm, shape=0, dtype='float32'):
    """All-gather a 1D or 2D numpy array so every rank gets the result.

    Parameters
    ----------
    data : numpy.ndarray
        The local 1D or 2D contribution of this rank.
    mpi_comm :
        MPI communicator providing ``allgather`` and ``Allgatherv``.
    shape : int
        Reshape mode for 2D input: 0 keeps the local row count and widens
        the columns; 1 keeps the local column count and stacks the rows.
    dtype : str
        One of 'float32', 'float64', 'int32', 'int64' (float64/int64 added
        for parity with ``gather_array``).

    Returns
    -------
    numpy.ndarray
        The gathered (and possibly reshaped) array, on every rank.

    Raises
    ------
    ValueError
        If ``dtype`` is not one of the supported names.
    """
    assert isinstance(data, numpy.ndarray)
    assert len(data.shape) < 3
    # First exchange the per-rank element counts.
    size = data.size
    sizes = mpi_comm.allgather(size) or []
    # Receive offsets are the running prefix sums of the per-rank sizes.
    displacements = [int(sum(sizes[:i])) for i in range(len(sizes))]
    # Compare dtype names with '=='; the original used 'is', which only
    # worked by accident of string-literal interning.
    if dtype == 'float32':
        gdata = numpy.empty(int(sum(sizes)), dtype=numpy.float32)
        mpi_type = MPI.FLOAT
    elif dtype == 'float64':
        gdata = numpy.empty(int(sum(sizes)), dtype=numpy.float64)
        mpi_type = MPI.DOUBLE
    elif dtype == 'int32':
        gdata = numpy.empty(int(sum(sizes)), dtype=numpy.int32)
        mpi_type = MPI.INT
    elif dtype == 'int64':
        gdata = numpy.empty(int(sum(sizes)), dtype=numpy.int64)
        mpi_type = MPI.LONG
    else:
        # Previously an unknown dtype fell through every branch and crashed
        # later with a NameError on 'gdata'; fail fast and explicitly.
        raise ValueError("unsupported dtype: %r" % (dtype,))
    mpi_comm.Allgatherv([data.flatten(), size, mpi_type],
                        [gdata, (sizes, displacements), mpi_type])
    if len(data.shape) == 1:
        return gdata
    if shape == 0:
        num_lines = data.shape[0]
        if num_lines > 0:
            return gdata.reshape((num_lines, gdata.size // num_lines))
        # BUG FIX: gdata is 1-D here, so the original ``gdata.shape[1]``
        # raised IndexError; the column count must come from the 2-D input.
        return gdata.reshape((0, data.shape[1]))
    if shape == 1:
        num_columns = data.shape[1]
        if num_columns > 0:
            return gdata.reshape((gdata.size // num_columns, num_columns))
        return gdata.reshape((gdata.shape[0], 0))
    # NOTE: as in the original, any other ``shape`` value returns None.
def get_mpi_type(data_type):
    """Map a numpy dtype name string to the matching mpi4py datatype.

    Supported names: 'int16', 'uint16', 'float32', 'int32'.

    Raises
    ------
    ValueError
        For unsupported names. (The original silently returned None,
        deferring the failure to the eventual MPI call site.)
    """
    if data_type == 'int16':
        return MPI.SHORT
    elif data_type == 'uint16':
        return MPI.UNSIGNED_SHORT
    elif data_type == 'float32':
        return MPI.FLOAT
    elif data_type == 'int32':
        return MPI.INT
    raise ValueError("unsupported data type: %r" % (data_type,))
def do_sendrecv(comm, glist, wlist, dest):
    """Pairwise exchange with rank ``dest``: send each gradient buffer
    while simultaneously receiving into the matching weight buffer.
    """
    for grad_shared, weight_shared in zip(glist, wlist):
        grad_buf = grad_shared.container.value
        weight_buf = weight_shared.container.value
        # sync() presumably flushes pending device-side writes before MPI
        # touches the raw buffers — confirm against the container type.
        grad_buf.sync()
        weight_buf.sync()
        comm.Sendrecv(sendbuf=[bufint(grad_buf), MPI.FLOAT], dest=dest,
                      recvbuf=[bufint(weight_buf), MPI.FLOAT], source=dest)
def numpy_to_MPI_typemap(np_type):
    """Translate a numpy dtype into the equivalent mpi4py datatype.

    Raises
    ------
    KeyError
        If the dtype has no registered MPI counterpart.
    """
    from mpi4py import MPI
    typemap = {
        np.dtype(np.float64) : MPI.DOUBLE,
        np.dtype(np.float32) : MPI.FLOAT,
        # BUG FIX: the original keyed on np.dtype(np.int), which was removed
        # in NumPy 1.24 and, on LP64 platforms, aliased int64 to the 32-bit
        # MPI.INT. 64-bit integers now map to 64-bit MPI types instead.
        np.dtype(np.int8)    : MPI.CHAR,
        np.dtype(np.uint8)   : MPI.UNSIGNED_CHAR,
        np.dtype(np.int32)   : MPI.INT,
        np.dtype(np.uint32)  : MPI.UNSIGNED_INT,
        np.dtype(np.int64)   : MPI.LONG,
        np.dtype(np.uint64)  : MPI.UNSIGNED_LONG,
    }
    return typemap[np_type]