def _iter(self):
    sleeped = 0  # measure time spent waiting without any messaging
    # Waiting for any input
    while not self.comm.Iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG):
        if sleeped == 0.5:
            secs_since_start = round(time.time() - self.start, 1)
            logger.info('Sleeping @ %s s.', secs_since_start)
            # TODO: stats!
            # observe ram usage
            # observe free disk space
            # observe ...
            # make sure executed every 60? seconds
            if self.numworkers == 0:
                self._shutdown()
        # TODO: think about better use of sleeped
        sleeped += self._manage_io()
    if sleeped > 0.5:
        logger.info("Slept for %s seconds.", sleeped - 0.5)
    # TODO: catch mpi.send errors properly
    try:
        self._process_mpi()
    except IndexError as e:
        logger.info('IndexError happened: %s', e)
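The probe-then-receive idiom used above can also be written as a small standalone helper. The sketch below is only an illustration with hypothetical names (`poll_for_message`, `idle_sleep`), assuming a plain mpi4py communicator rather than the class in the snippet.

from mpi4py import MPI
import time

def poll_for_message(comm, idle_sleep=0.1):
    """Spin until any message is pending, then receive it together with its origin."""
    status = MPI.Status()
    while not comm.Iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG):
        time.sleep(idle_sleep)  # back off instead of busy-waiting
    msg = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
    return msg, status.Get_source(), status.Get_tag()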
def run(self, model):
    if self.comm is None:
        print('Server communicator not initialized')
        return
    print('server started')
    while True:
        # Wait for the next request from a client
        request = self.comm.recv(source=MPI.ANY_SOURCE, tag=199)
        # Do the processing work and formulate a reply
        reply = self.process_request(model, request['id'],
                                     request['rank'], request['message'])
        # Send the reply back to the client
        self.comm.send(reply, dest=request['rank'], tag=200)
        # Do some action work after the reply has been sent
        self.action_after(model, request['id'],
                          request['rank'], request['message'])
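A matching client side is not shown, but from the tags and fields the server uses (tag 199 for requests, tag 200 for replies, and the `id`/`rank`/`message` keys) it could plausibly look like the sketch below; `ask_server` and `model_id` are placeholder names.

from mpi4py import MPI

def ask_server(comm, server_rank, model_id, message):
    request = {'id': model_id, 'rank': comm.Get_rank(), 'message': message}
    comm.send(request, dest=server_rank, tag=199)  # tag the server recv()s on
    return comm.recv(source=server_rank, tag=200)  # tag the server replies with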
def _update_ready(self):
    if self.comm.rank == 0:
        while self.comm.Iprobe(source=MPI.ANY_SOURCE, tag=MPI_TAG_READY):
            buf = numpy.empty(1, dtype='i')
            self.comm.Recv([buf, MPI.INT], source=MPI.ANY_SOURCE, tag=MPI_TAG_READY)
            rank = buf[0]
            self._closing_ranks.append(rank)
            log_debug(logger, self._log_prefix + "Received closing signal from rank %i (%i/%i)" % (rank, len(self._closing_ranks), self.comm.size))
        if len(self._closing_ranks) == self.comm.size:
            for i in range(1, self.comm.size):
                send_buf = numpy.array(1, dtype='i')
                self.comm.Send([send_buf, MPI.INT], dest=i, tag=MPI_TAG_READY)
                recv_buf = numpy.empty(1, dtype='i')
                self.comm.Recv([recv_buf, MPI.INT], source=i, tag=MPI_TAG_READY)
            self._ready = True
            log_debug(logger, self._log_prefix + "Master sent out ready signals to slaves")
    else:
        if self.comm.Iprobe(source=0, tag=MPI_TAG_READY):
            recv_buf = numpy.empty(1, dtype='i')
            self.comm.Recv([recv_buf, MPI.INT], source=0, tag=MPI_TAG_READY)
            send_buf = numpy.array(1, dtype='i')
            self.comm.Send([send_buf, MPI.INT], dest=0, tag=MPI_TAG_READY)
            self._ready = True
            log_debug(logger, self._log_prefix + "Slave rank %i received ready signals from master" % self.comm.rank)
def bcast(self, obj=None, root=None):
    """Same as broadcast, but handles an unknown root among the nodes."""
    size = self.cart_comm.Get_size()
    if size == 1:
        return obj
    if root is None:
        obj = self.cart_comm.recv(source=MPI.ANY_SOURCE)
    else:
        for dest in range(size):
            if dest != root:
                self.cart_comm.send(obj, dest)
    return obj
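The method is presumably called asymmetrically: the rank that owns the object passes its own rank as `root`, while every other rank passes `root=None` and blocks on the receive. A hedged usage sketch, where `grid` stands in for whatever object defines `bcast` and `owner_rank`/`payload` are placeholders:

def share_value(grid, owner_rank, payload=None):
    if grid.cart_comm.Get_rank() == owner_rank:
        return grid.bcast(payload, root=owner_rank)  # root sends to every other rank
    return grid.bcast(root=None)                     # everyone else receives it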
def _fit_and_score_with_parameters(X, y, cv, best_parameters):
    """Distribute work of non-nested cross-validation across slave nodes."""
    # Tell slaves that the testing phase is next
    _task_desc = numpy.empty(2, dtype=int)
    _task_desc[1] = MPI_MSG_TEST

    comm.Bcast([_task_desc, MPI.INT], root=0)
    comm.bcast((X, y), root=0)

    # Compatibility with sklearn > 0.18 TODO
    _splitted_cv = [(a, b) for a, b in cv.split(X, y)]

    assert comm_size >= len(_splitted_cv)

    for i, (train_index, test_index) in enumerate(_splitted_cv):
        fold_id = i + 1
        LOG.info("Testing fold %d", fold_id)

        parameters = best_parameters.loc[fold_id, :].to_dict()
        work_item = (fold_id, train_index, test_index, parameters)
        comm.send(work_item, dest=fold_id, tag=MPI_TAG_TRAIN_TEST_DATA)

    scores = {}
    for i in range(len(_splitted_cv)):
        fold_id, test_result = comm.recv(source=MPI.ANY_SOURCE,
                                         tag=MPI_TAG_RESULT)
        scores[fold_id] = test_result

    # Tell all remaining nodes to terminate
    for i in range(len(_splitted_cv), comm_size):
        comm.send((0, None), dest=i, tag=MPI_TAG_TRAIN_TEST_DATA)

    return pandas.Series(scores)
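The worker side of this exchange is not part of the snippet. Judging only from the messages sent above (a `(fold_id, train_index, test_index, parameters)` work item, a `(fold_id, score)` reply, and a `(0, None)` sentinel), a compatible worker loop could be sketched as follows; `evaluate_fold` is a placeholder for the real fit-and-score routine.

def _worker_test_loop(comm, X, y, evaluate_fold, tag_data, tag_result):
    while True:
        fold_id, *payload = comm.recv(source=0, tag=tag_data)
        if fold_id == 0:  # (0, None) sentinel: no more work
            break
        train_index, test_index, parameters = payload
        score = evaluate_fold(X, y, train_index, test_index, parameters)
        comm.send((fold_id, score), dest=0, tag=tag_result)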
def process_messages(self, count_arr, recorder=None):
    if recorder: recorder.start()

    status = MPI.Status()
    s = self.comm.Iprobe(source=MPI.ANY_SOURCE, tag=700, status=status)
    # if self.test: print '%d probed, got %s' % (self.rank, s)

    while s:
        src_rank = status.source
        self.comm.Recv(buf=count_arr, source=src_rank, tag=700, status=status)
        self.gpucomm, src_gpurank, self_gpurank = self.get_gpucomm_with(src_rank)
        if self.test: print('%d merging with %d' % (self.rank, src_rank))
        self._merge_params_from(src_gpurank, src_rank)
        s = self.comm.Iprobe(source=MPI.ANY_SOURCE, tag=700, status=status)
        if self.test: print('%d probed again, got %s' % (self.rank, s))

    if recorder: recorder.end('comm')
def push_message(self, dest_rank, count_arr, recorder=None):
    '''
    push message:
    push params_i and alpha_i to the chosen rank
    '''
    # Detect if any other worker is pushing to self at the same time, to prevent deadlock
    while self.comm.Iprobe(source=MPI.ANY_SOURCE, tag=700):
        if self.test: print('a potential deadlock prevented')
        self.process_messages(count_arr, recorder)

    if recorder: recorder.start()

    # 0. blocking request
    if self.test: print('%d pushing msg to %d' % (self.rank, dest_rank))
    self.comm.Send(buf=count_arr, dest=dest_rank, tag=700)
    if self.test: print('%d requested to %d' % (self.rank, dest_rank))

    # 1. push
    self.gpucomm, dest_gpurank, self_gpurank = self.get_gpucomm_with(dest_rank)
    self._push_params(self_gpurank, dest_rank)
    if self.test: print('%d msg pushed' % self.rank)

    if recorder: recorder.end('comm')
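`push_message` and `process_messages` form the two halves of a pairwise exchange: a worker first drains any incoming pushes, then pushes its own parameters to one peer. How often and to whom is decided by the surrounding training loop, which is not shown; the sketch below uses an invented interval and a random peer purely for illustration.

import numpy as np

def training_step(worker, count_arr, step, exchange_every=10):
    worker.process_messages(count_arr)             # serve peers that are pushing to us
    if step % exchange_every == 0:
        peer = np.random.randint(worker.comm.size)
        if peer != worker.rank:
            worker.push_message(peer, count_arr)   # then push our params to one peer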
def recv_any_from_child(self, status):
    """Receives any message from any child. Returns the provided status object,
    populated with information about the received message."""
    self.recv(tag='any', source=MPI.ANY_SOURCE, status=status, comm=self.child_comm)
    return status
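The caller typically inspects the populated status object to find out which child sent the message and with what tag. In plain mpi4py (outside this wrapper), that pattern looks roughly like the sketch below; `handle` is a hypothetical callback.

from mpi4py import MPI

def dispatch_one(comm, handle):
    status = MPI.Status()
    msg = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
    handle(msg, source=status.Get_source(), tag=status.Get_tag())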
def orchestrate_map(self, pds_id):
    """Orchestrates the slaves/workers to perform a map function

    This works by keeping track of the workers who haven't finished executing,
    waiting for them to request the next chunk of data when they are free,
    responding to them with the data and then sending them a Sentinel
    signalling that they can exit.
    """
    is_map_done = [True if i in self.master_node_ranks else False for i in range(self.size)]
    status = MPI.Status()

    # Copy it to the pending store. This is so when the master accesses
    # the PDS data it's not empty.
    self.pds_pending_store[pds_id] = list(self.pds_store[pds_id])

    # While we have some ranks that haven't finished
    while sum(is_map_done) < self.size:
        # Wait for a request from anyone
        data_request = self.comm.recv(
            source=MPI.ANY_SOURCE,
            tag=MPI.ANY_TAG,
            status=status,
        )
        request_from_rank = status.source

        if data_request != pds_id:
            print("Ignoring stale PDS data request from",
                  request_from_rank, ":", data_request, "/", pds_id)
            continue

        # Pointer so we don't have to keep doing dict lookups
        current_pds_items = self.pds_pending_store[pds_id]
        num_current_pds_items = len(current_pds_items)

        # Everyone's already exhausted all the data.
        # Send a sentinel and mark the node as finished
        if num_current_pds_items == 0:
            self.comm.send(None, dest=request_from_rank, tag=pds_id)
            is_map_done[request_from_rank] = True
        else:
            # Create the chunk of data to send. Pop off items and tag them with an id
            # so we can sort them later.
            chunk_to_send = []
            for i in range(self.chunk_size):
                chunk_to_send += [(num_current_pds_items - i, current_pds_items.pop())]
            self.comm.send(chunk_to_send, dest=request_from_rank, tag=pds_id)
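The worker loop that this orchestrator talks to is not shown. Based on the docstring and the messages above (the worker sends the `pds_id` it wants data for, then receives either a chunk of `(index, item)` pairs or the `None` sentinel), it could be sketched like this; `func` stands for the mapped function and the request tag is an assumption.

def _worker_map_loop(comm, pds_id, func, master_rank=0):
    results = []
    while True:
        comm.send(pds_id, dest=master_rank, tag=pds_id)  # request the next chunk
        chunk = comm.recv(source=master_rank, tag=pds_id)
        if chunk is None:                                # sentinel: no data left
            return results
        results.extend((index, func(item)) for index, item in chunk)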
def iter(self, log=None, np=None):
    '''
    The iteration of the Lanczos.

    Parameters
    ----------
    log : Log, optional
        The log file used to record the iteration information.
    np : int, optional
        The number of subprocesses used to perform the iteration.
    '''
    t0 = time.time()
    if self.method == 'S' and (np is None or np <= 0):
        vecs, Qs = self.controllers['vecs'], self.controllers['Qs']
        for i, lanczos in enumerate(self.controllers['lczs']):
            ts = time.time()
            while lanczos.niter < lanczos.maxiter and not lanczos.stop:
                lanczos.iter()
                Qs[i, :, lanczos.niter-1] = vecs.dot(lanczos.vectors[lanczos.niter-1])
            te = time.time()
            if log: log << '%s%s%s' % ('\b'*30 if i > 0 else '', ('%s/%s(%.2es/%.3es)' % (i+1, len(Qs), te-ts, te-t0)).center(30), '\b'*30 if i == len(Qs)-1 else '')
    elif self.method == 'B':
        lanczos = self.controllers['lanczos']
        for i in range(lanczos.maxiter):
            ts = time.time()
            lanczos.iter()
            te = time.time()
            if log: log << '%s%s%s' % ('\b'*30 if i > 0 else '', ('%s/%s(%.2es/%.3es)' % (i+1, lanczos.maxiter, te-ts, te-t0)).center(30), '\b'*30 if i == lanczos.maxiter-1 else '')
    elif self.method == 'S' and np is not None:
        path, Qs = os.path.dirname(os.path.realpath(__file__)), self.controllers['Qs']
        datas = [[self.controllers['vecs'], [], []] for i in range(np)]
        for i, lanczos in enumerate(self.controllers['lczs']):
            datas[i % np][1].append(lanczos)
            datas[i % np][2].append(i)
        comm = MPI.COMM_SELF.Spawn(sys.executable, args=['%s/edbgf.py' % path], maxprocs=np)
        for i, data in enumerate(datas):
            comm.send(data, dest=i, tag=0)
        info, ic, nc = MPI.Status(), 0, 0
        while nc < np:
            data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=info)
            if info.Get_tag() == 0:
                for index, (_T_, P, niter), Q in data:
                    lanczos = self.controllers['lczs'][index]
                    lanczos._T_, lanczos.P, lanczos.niter = _T_, P, niter
                    Qs[index, :, :] = Q
                nc += 1
            else:
                ic, (index, t) = ic + 1, data
                if log: log << '%s%s%s' % ('\b'*30 if ic > 1 else '', ('%s/%s(%.2es/%.3es)' % (ic, len(Qs), t, time.time()-t0)).center(30), '\b'*30 if ic == len(Qs) else '')
        comm.Disconnect()
    else:
        raise ValueError('BGF iter error: not supported.')
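The spawned `edbgf.py` script itself is not included here. From the parent side of the protocol (data sent with tag 0, progress reports expected on a non-zero tag, results returned with tag 0), its main loop can only be guessed at; the sketch below is such a guess, with `run_lanczos` as a placeholder for the actual block-Lanczos iteration.

from mpi4py import MPI
import time

def edbgf_worker(run_lanczos):
    parent = MPI.Comm.Get_parent()
    vecs, lczs, indices = parent.recv(source=0, tag=0)         # data sent by the master
    results = []
    for lanczos, index in zip(lczs, indices):
        ts = time.time()
        Q = run_lanczos(lanczos, vecs)                         # fills lanczos._T_, .P, .niter and returns Q
        parent.send((index, time.time() - ts), dest=0, tag=1)  # progress report on a non-zero tag
        results.append((index, (lanczos._T_, lanczos.P, lanczos.niter), Q))
    parent.send(results, dest=0, tag=0)                        # final results on tag 0
    parent.Disconnect()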