def _update_ready(self):
    if self.comm.rank == 0:
        while self.comm.Iprobe(source=MPI.ANY_SOURCE, tag=MPI_TAG_READY):
            buf = numpy.empty(1, dtype='i')
            self.comm.Recv([buf, MPI.INT], source=MPI.ANY_SOURCE, tag=MPI_TAG_READY)
            rank = buf[0]
            self._closing_ranks.append(rank)
            log_debug(logger, self._log_prefix + "Received closing signal from rank %i (%i/%i)" % (rank, len(self._closing_ranks), self.comm.size))
        if len(self._closing_ranks) == self.comm.size:
            for i in range(1, self.comm.size):
                send_buf = numpy.array(1, dtype='i')
                self.comm.Send([send_buf, MPI.INT], dest=i, tag=MPI_TAG_READY)
                recv_buf = numpy.empty(1, dtype='i')
                self.comm.Recv([recv_buf, MPI.INT], source=i, tag=MPI_TAG_READY)
            self._ready = True
            log_debug(logger, self._log_prefix + "Master sent out ready signals to slaves")
    else:
        if self.comm.Iprobe(source=0, tag=MPI_TAG_READY):
            recv_buf = numpy.empty(1, dtype='i')
            self.comm.Recv([recv_buf, MPI.INT], source=0, tag=MPI_TAG_READY)
            send_buf = numpy.array(1, dtype='i')
            self.comm.Send([send_buf, MPI.INT], dest=0, tag=MPI_TAG_READY)
            self._ready = True
            log_debug(logger, self._log_prefix + "Slave rank %i received ready signals from master" % self.comm.rank)
def gather_array(data, mpi_comm, root=0, shape=0, dtype='float32'):
    # gather 1D or 2D numpy arrays
    assert isinstance(data, numpy.ndarray)
    assert len(data.shape) < 3
    # first we pass the data size
    size = data.size
    sizes = mpi_comm.gather(size, root=root) or []
    # now we pass the data
    displacements = [int(sum(sizes[:i])) for i in range(len(sizes))]
    if dtype == 'float32':
        gdata = numpy.empty(int(sum(sizes)), dtype=numpy.float32)
        mpi_comm.Gatherv([data.flatten(), size, MPI.FLOAT], [gdata, (sizes, displacements), MPI.FLOAT], root=root)
    elif dtype == 'float64':
        gdata = numpy.empty(int(sum(sizes)), dtype=numpy.float64)
        mpi_comm.Gatherv([data.flatten(), size, MPI.DOUBLE], [gdata, (sizes, displacements), MPI.DOUBLE], root=root)
    elif dtype == 'int32':
        gdata = numpy.empty(int(sum(sizes)), dtype=numpy.int32)
        mpi_comm.Gatherv([data.flatten(), size, MPI.INT], [gdata, (sizes, displacements), MPI.INT], root=root)
    elif dtype == 'int64':
        gdata = numpy.empty(int(sum(sizes)), dtype=numpy.int64)
        mpi_comm.Gatherv([data.flatten(), size, MPI.LONG], [gdata, (sizes, displacements), MPI.LONG], root=root)
    if len(data.shape) == 1:
        return gdata
    else:
        if shape == 0:
            num_lines = data.shape[0]
            if num_lines > 0:
                return gdata.reshape((num_lines, gdata.size//num_lines))
            else:
                return gdata.reshape((0, data.shape[1]))
        if shape == 1:
            num_columns = data.shape[1]
            if num_columns > 0:
                return gdata.reshape((gdata.size//num_columns, num_columns))
            else:
                return gdata.reshape((gdata.shape[0], 0))
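
# Hedged usage sketch for gather_array above (names are illustrative): each
# rank contributes a 2-row float32 block and rank 0 receives all rows stacked.
from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
local_block = numpy.full((2, 3), comm.rank, dtype=numpy.float32)
gathered = gather_array(local_block, comm, root=0, shape=1, dtype='float32')
if comm.rank == 0:
    # shape=1 keeps the column count (3), so the result is (2 * comm.size, 3)
    print(gathered.shape)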
def all_gather_array(data, mpi_comm, shape=0, dtype='float32'):
    # gather 1D or 2D numpy arrays
    assert isinstance(data, numpy.ndarray)
    assert len(data.shape) < 3
    # first we pass the data size
    size = data.size
    sizes = mpi_comm.allgather(size) or []
    # now we pass the data
    displacements = [int(sum(sizes[:i])) for i in range(len(sizes))]
    if dtype == 'float32':
        gdata = numpy.empty(int(sum(sizes)), dtype=numpy.float32)
        mpi_comm.Allgatherv([data.flatten(), size, MPI.FLOAT], [gdata, (sizes, displacements), MPI.FLOAT])
    elif dtype == 'int32':
        gdata = numpy.empty(int(sum(sizes)), dtype=numpy.int32)
        mpi_comm.Allgatherv([data.flatten(), size, MPI.INT], [gdata, (sizes, displacements), MPI.INT])
    if len(data.shape) == 1:
        return gdata
    else:
        if shape == 0:
            num_lines = data.shape[0]
            if num_lines > 0:
                return gdata.reshape((num_lines, gdata.size//num_lines))
            else:
                return gdata.reshape((0, data.shape[1]))
        if shape == 1:
            num_columns = data.shape[1]
            if num_columns > 0:
                return gdata.reshape((gdata.size//num_columns, num_columns))
            else:
                return gdata.reshape((gdata.shape[0], 0))
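
# Hedged usage sketch for all_gather_array above: each rank contributes a 1D
# int32 array of a different length and every rank receives the concatenation.
from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
local_ids = numpy.arange(comm.rank + 1, dtype=numpy.int32)  # rank r contributes r+1 values
all_ids = all_gather_array(local_ids, comm, dtype='int32')
# every rank now holds the same 1D array of length 1 + 2 + ... + comm.size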
def run(self):
    """Wait for new data until node receives a terminate or a test message.

    In the beginning, the node is waiting for new batches distributed by
    :class:`MPIGridSearchCVMaster._scatter_work`.

    After the grid search has been completed, the node either receives data
    from :func:`_fit_and_score_with_parameters` to evaluate the estimator
    given the parameters determined during grid-search, or is asked
    to terminate. Stop messages are MPI_MSG_TERMINATE or MPI_MSG_TEST.
    """
    task_desc = self._task_desc
    while True:
        comm.Bcast([task_desc, MPI.INT], root=0)
        if task_desc[1] == MPI_MSG_TERMINATE:
            LOG.debug("Node %d received terminate message", comm_rank)
            return
        if task_desc[1] == MPI_MSG_CV:
            self._run_grid_search()
        elif task_desc[1] == MPI_MSG_TEST:
            self._run_train_test()
            break
        else:
            raise ValueError('unknown task with id %d' % task_desc[1])
    LOG.debug("Node %d is terminating", comm_rank)
def run(self, train_X, train_y):
    # tell the slaves that they should do a hyper-parameter search
    self._task_desc[0] = 0
    self._task_desc[1] = MPI_MSG_CV

    comm.Bcast([self._task_desc, MPI.INT], root=0)
    comm.bcast((train_X, train_y), root=0)

    self._data_X = train_X
    self._data_y = train_y

    root_result_batch = self._scatter_work()
    return self._gather_work(root_result_batch)
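
# Minimal standalone sketch of the coordination pattern used by the two run()
# methods above: rank 0 first broadcasts a small integer task descriptor with
# the uppercase Bcast (raw MPI.INT buffer), then broadcasts the Python payload
# with the lowercase, pickle-based bcast. The message code is an assumption.
from mpi4py import MPI
import numpy

MPI_MSG_CV = 1  # assumed value for this sketch
comm = MPI.COMM_WORLD
task_desc = numpy.zeros(2, dtype=numpy.intc)  # numpy.intc matches MPI.INT
if comm.rank == 0:
    task_desc[1] = MPI_MSG_CV
comm.Bcast([task_desc, MPI.INT], root=0)
payload = comm.bcast(("train_X", "train_y") if comm.rank == 0 else None, root=0)
if task_desc[1] == MPI_MSG_CV:
    pass  # every rank would now start its share of the cross-validation work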
def _fit_and_score_with_parameters(X, y, cv, best_parameters):
    """Distribute work of non-nested cross-validation across slave nodes."""
    # tell slaves testing phase is next
    _task_desc = numpy.empty(2, dtype=int)
    _task_desc[1] = MPI_MSG_TEST

    comm.Bcast([_task_desc, MPI.INT], root=0)
    comm.bcast((X, y), root=0)

    # Compatibility with sklearn > 0.18 TODO
    _splitted_cv = [(a, b) for a, b in cv.split(X, y)]

    assert comm_size >= len(_splitted_cv)

    for i, (train_index, test_index) in enumerate(_splitted_cv):
        fold_id = i + 1
        LOG.info("Testing fold %d", fold_id)

        parameters = best_parameters.loc[fold_id, :].to_dict()
        work_item = (fold_id, train_index, test_index, parameters)

        comm.send(work_item, dest=fold_id, tag=MPI_TAG_TRAIN_TEST_DATA)

    scores = {}
    for i in range(len(_splitted_cv)):
        fold_id, test_result = comm.recv(source=MPI.ANY_SOURCE,
                                         tag=MPI_TAG_RESULT)
        scores[fold_id] = test_result

    # Tell all nodes to terminate
    for i in range(len(_splitted_cv), comm_size):
        comm.send((0, None), dest=i, tag=MPI_TAG_TRAIN_TEST_DATA)

    return pandas.Series(scores)
def numpy_to_MPI_typemap(np_type):
    from mpi4py import MPI
    typemap = {
        np.dtype(np.float64): MPI.DOUBLE,
        np.dtype(np.float32): MPI.FLOAT,
        np.dtype(np.int_)   : MPI.INT,   # np.int was removed in NumPy 1.24; np.int_ keeps the default-integer key
        np.dtype(np.int8)   : MPI.CHAR,
        np.dtype(np.uint8)  : MPI.UNSIGNED_CHAR,
        np.dtype(np.int32)  : MPI.INT,
        np.dtype(np.uint32) : MPI.UNSIGNED_INT,
    }
    return typemap[np_type]
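
# Hedged usage sketch for numpy_to_MPI_typemap above: look up the MPI datatype
# matching an array's dtype before a buffer-based (uppercase) Send/Recv.
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
payload = np.zeros(8, dtype=np.float32)
mpi_type = numpy_to_MPI_typemap(payload.dtype)  # -> MPI.FLOAT
if comm.rank == 0 and comm.size > 1:
    comm.Send([payload, mpi_type], dest=1, tag=0)
elif comm.rank == 1:
    comm.Recv([payload, mpi_type], source=0, tag=0)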
def get_idxs_thread(comm, npoints):
    """Get indices for this processor using Scatterv.

    Note:
    -----
    Uppercase mpi4py functions require everything to be in C-compatible
    types or they will return garbage!
    """
    size = comm.Get_size()
    rank = comm.Get_rank()

    npoints_thread = np.zeros(size, dtype=np.intc)
    offsets_thread = np.zeros(size, dtype=np.intc)

    for idx in range(size):
        npoints_thread[idx] = npoints // size
        offsets_thread[idx] = sum(npoints_thread[:idx])

    for idx in range(npoints % size):
        npoints_thread[idx] += 1
        offsets_thread[idx + 1:] += 1

    npoints_thread = tuple(npoints_thread)
    offsets_thread = tuple(offsets_thread)

    idxs_thread = np.zeros(npoints_thread[rank], dtype=np.intc)
    idxs = np.arange(npoints, dtype=np.intc)
    comm.Scatterv((idxs, npoints_thread, offsets_thread, MPI.INT), idxs_thread, root=0)
    return idxs_thread, npoints_thread, offsets_thread
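
# Hedged usage sketch for get_idxs_thread above: split npoints indices across
# the ranks with Scatterv; every rank gets a contiguous, near-equal chunk.
from mpi4py import MPI

comm = MPI.COMM_WORLD
idxs_thread, npoints_thread, offsets_thread = get_idxs_thread(comm, npoints=10)
# with 4 ranks the counts are (3, 3, 2, 2) and the offsets (0, 3, 6, 8);
# rank 1, for example, receives the indices [3, 4, 5]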
def _expand_poll(self):
    #log_debug(logger, self._log_prefix + "Polling for stack expansion")
    expand = False
    if self.comm.Iprobe(source=(self.comm.rank-1) % self.comm.size, tag=MPI_TAG_EXPAND):
        buf1 = numpy.empty(1, dtype='i')
        self.comm.Recv([buf1, MPI.INT], source=(self.comm.rank-1) % self.comm.size, tag=MPI_TAG_EXPAND)
        buf2 = numpy.array(1, dtype='i')
        self.comm.Send([buf2, MPI.INT], dest=(self.comm.rank+1) % self.comm.size, tag=MPI_TAG_EXPAND)
        expand = True
    # Sending of expansion signal needed?
    elif self._i >= self._stack_length:
        buf1 = numpy.array(1, dtype='i')
        req_s = self.comm.Isend([buf1, MPI.INT], dest=(self.comm.rank+1) % self.comm.size, tag=MPI_TAG_EXPAND)
        buf2 = numpy.empty(1, dtype='i')
        req_r = self.comm.Irecv([buf2, MPI.INT], source=(self.comm.rank-1) % self.comm.size, tag=MPI_TAG_EXPAND)
        while True:
            sent = req_s.Test()
            received = req_r.Test()
            if sent and received:
                break
            time.sleep(0.01)
        expand = True
    if expand:
        log_debug(logger, self._log_prefix + "Do stack expansion")
        self.comm.Barrier()
        self._sync_i_max()
        self._expand_stacks_mpi()
    else:
        pass
        #log_debug(logger, self._log_prefix + "No stack expansion")
def _close_signal(self):
    if self.comm.rank == 0:
        self._closing_ranks = [0]
    else:
        buf = numpy.array(self.comm.rank, dtype="i")
        self.comm.Send([buf, MPI.INT], dest=0, tag=MPI_TAG_READY)
        log_debug(logger, self._log_prefix + "Rank %i sent closing signal to master" % (self.comm.rank))
def _sync_i_max(self):
    sendbuf = numpy.array(self._i_max, dtype='i')
    recvbuf = numpy.empty(1, dtype='i')
    log_debug(logger, self._log_prefix + "Entering allreduce with maximum index %i" % (self._i_max))
    self.comm.Allreduce([sendbuf, MPI.INT], [recvbuf, MPI.INT], op=MPI.MAX)
    self._i_max = recvbuf[0]
    log_debug(logger, self._log_prefix + "After reduce: i_max = %i" % self._i_max)