def _iter(self):
    slept = 0  # accumulated time spent sleeping, i.e. without messaging
    # Wait for any incoming message.
    while not self.comm.Iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG):
        if slept == 0.5:
            secs_since_start = round(time.time() - self.start, 1)
            logger.info('Sleeping @ %s s.', secs_since_start)
        # TODO: stats!
        # observe RAM usage
        # observe free disk space
        # observe ...
        # make sure this runs every ~60 seconds
        if self.numworkers == 0:
            self._shutdown()
        # TODO: think about a better use of `slept`
        slept += self._manage_io()
    if slept > 0.5:
        logger.info("Slept for %s seconds.", slept - 0.5)
    # TODO: catch MPI send errors properly
    try:
        self._process_mpi()
    except IndexError as e:
        logger.info('IndexError happened: %s', e)
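For reference, the Iprobe polling pattern above reduces to the following self-contained sketch; the payload and tag values are arbitrary, and it assumes at least two ranks (run with e.g. mpiexec -n 2):

import time
from mpi4py import MPI

comm = MPI.COMM_WORLD
if comm.Get_rank() == 0:
    # Poll for any message from any rank, sleeping between probes.
    while not comm.Iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG):
        time.sleep(0.1)
    status = MPI.Status()
    msg = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
    print("got", msg, "from rank", status.Get_source())
else:
    comm.send("hello", dest=0, tag=0)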
def recv(self, obj=None, tag=MPI.ANY_TAG, source=None, buffer=False, status=None, comm=None):
"""Wrapper around MPI.recv/Recv. Returns the received object.
Params:
obj: variable into which the received object should be placed
tag: string indicating which MPI tag should be received
source: integer rank of the message source. Defaults to self.parent_rank
    buffer: True if the object should be received into a single-segment buffer
        (e.g. for numpy arrays) using MPI.Recv rather than MPI.recv
status: MPI status object that is filled with received status information
comm: MPI communicator to use. Defaults to self.parent_comm"""
if comm is None:
comm = self.parent_comm
if source is None:
if self.parent_rank is None:
raise Error("Attempting to receive %s from parent, but parent rank is None" % tag)
source = self.parent_rank
tag_num = self.lookup_mpi_tag(tag)
    if buffer:
        comm.Recv(obj, source=source, tag=tag_num, status=status)
    else:
        obj = comm.recv(source=source, tag=tag_num, status=status)
    return obj
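To illustrate the buffer distinction the docstring describes (a standalone sketch, not part of the class above): MPI.Recv fills a pre-allocated buffer in place, while MPI.recv unpickles and returns a new object. The ranks and tags here are arbitrary.

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
if comm.Get_rank() == 0:
    comm.Send(np.arange(4, dtype='d'), dest=1, tag=7)  # buffer path, no pickling
    comm.send({'a': 1}, dest=1, tag=8)                 # pickle path
elif comm.Get_rank() == 1:
    buf = np.empty(4, dtype='d')          # receiver pre-allocates the buffer
    comm.Recv(buf, source=0, tag=7)       # fills buf in place
    obj = comm.recv(source=0, tag=8)      # returns a new object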
def _fit_slave(self, X, y):
"""Pipeline evaluation.
Parameters
----------
X : array of float, shape : n_samples x n_features, default : ()
The input data matrix.
"""
try:
while True:
status_ = MPI.Status()
received = COMM.recv(source=0, tag=MPI.ANY_TAG, status=status_)
# check the tag of the received message
if status_.tag == EXIT:
return
# do the work
i, (train_index, test_index) = received
# if self.verbose:
# print("[{} {}]: Performing experiment {}".format(
# NAME, RANK, i))
cv_results_ = _worker(self, i, X, y, train_index, test_index)
# if self.verbose:
# print("[{} {}]: Experiment {} completed".format(
# NAME, RANK, i))
COMM.send(cv_results_, dest=0, tag=0)
    except Exception as exc:  # StandardError no longer exists in Python 3
        warnings.warn("Quitting ... TB: %s" % str(exc))
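For context, the master side of this tag protocol might look like the sketch below. It assumes the same COMM communicator and EXIT tag constant as the snippet above, dispatches at most one task per worker, and is not the project's actual implementation.

def _fit_master(self, cv_splits):
    # Hypothetical dispatcher: one (i, (train_index, test_index)) task per
    # worker, then an EXIT-tagged message so _fit_slave returns.
    status_ = MPI.Status()
    nprocs = COMM.Get_size()
    tasks = list(enumerate(cv_splits))[:nprocs - 1]
    for rank, task in zip(range(1, nprocs), tasks):
        COMM.send(task, dest=rank, tag=0)
    results = [COMM.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status_)
               for _ in tasks]
    # Release every worker, including any that received no task.
    for rank in range(1, nprocs):
        COMM.send(None, dest=rank, tag=EXIT)
    return results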
def _worker(self, clf):
"""Worker node's operation.
Receiving tasks from the master to process and sending the result back
Parameters
----------
clf: classification function
the classifier to be used in cross validation
Returns
-------
None
"""
    logger.debug(
        'worker %d is running, waiting for tasks from master at rank %d',
        MPI.COMM_WORLD.Get_rank(), self.master_rank
    )
comm = MPI.COMM_WORLD
status = MPI.Status()
    while True:
task = comm.recv(source=self.master_rank,
tag=MPI.ANY_TAG,
status=status)
        if status.Get_tag():
            # A nonzero tag from the master signals this worker to stop.
            break
comm.send(self._voxel_scoring(task, clf),
dest=self.master_rank)
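Since the loop above exits on any nonzero tag, the master can release the workers with a helper along these lines (hypothetical, not from the source project):

def _terminate_workers(self):
    # Any nonzero tag makes the worker loop above break.
    comm = MPI.COMM_WORLD
    for rank in range(comm.Get_size()):
        if rank != self.master_rank:
            comm.send(None, dest=rank, tag=1)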
def wait(self, callback=None):
"""Tell the workers to wait and listen for the master process. This is
called automatically when using :meth:`MPIPool.map` and doesn't need to
be called by the user.
"""
if self.is_master():
return
worker = self.comm.rank
status = MPI.Status()
while True:
log.log(_VERBOSE, "Worker {0} waiting for task".format(worker))
task = self.comm.recv(source=self.master, tag=MPI.ANY_TAG,
status=status)
if task is None:
log.log(_VERBOSE, "Worker {0} told to quit work".format(worker))
break
func, arg = task
log.log(_VERBOSE, "Worker {0} got task {1} with tag {2}"
.format(worker, arg, status.tag))
result = func(arg)
log.log(_VERBOSE, "Worker {0} sending answer {1} with tag {2}"
.format(worker, result, status.tag))
self.comm.ssend(result, self.master, status.tag)
if callback is not None:
callback()
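The usual driver pattern for a pool like this is sketched below; it assumes an MPIPool class that also provides map() and close(), and a module-level task function (lambdas generally do not survive pickling across ranks).

import sys

def square(x):
    return x ** 2

pool = MPIPool()
if not pool.is_master():
    pool.wait()    # workers block here until told to quit
    sys.exit(0)
results = pool.map(square, range(100))  # master farms out the tasks
pool.close()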
def wait(self):
"""
If this isn't the master process, wait for instructions.
"""
if self.is_master():
raise RuntimeError("Master node told to await jobs.")
status = MPI.Status()
while True:
# Event loop.
# Sit here and await instructions.
if self.debug:
print("Worker {0} waiting for task.".format(self.rank))
# Blocking receive to wait for instructions.
task = self.comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
if self.debug:
print("Worker {0} got task {1} with tag {2}."
.format(self.rank, type(task), status.tag))
# Check if message is special sentinel signaling end.
# If so, stop.
if isinstance(task, _close_pool_message):
if self.debug:
print("Worker {0} told to quit.".format(self.rank))
break
# Check if message is special type containing new function
# to be applied
if isinstance(task, _function_wrapper):
self.function = task.function
if self.debug:
print("Worker {0} replaced its task function: {1}."
.format(self.rank, self.function))
continue
# If not a special message, just run the known function on
# the input and return it asynchronously.
result = self.function(task)
if self.debug:
print("Worker {0} sending answer {1} with tag {2}."
.format(self.rank, type(result), status.tag))
self.comm.isend(result, dest=0, tag=status.tag)
# Kill the process?
if self.exit_on_end:
sys.exit()
def wait(self):
"""
If this isn't the master process, wait for instructions.
"""
if self.is_master():
raise RuntimeError("Master node told to await jobs.")
status = MPI.Status()
while True:
# Event loop.
# Sit here and await instructions.
if self.debug:
print("Worker {0} waiting for task.".format(self.rank))
# Blocking receive to wait for instructions.
task = self.comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
if self.debug:
print("Worker {0} got task {1} with tag {2}."
.format(self.rank, task, status.tag))
# Check if message is special sentinel signaling end.
# If so, stop.
if isinstance(task, _close_pool_message):
if self.debug:
print("Worker {0} told to quit.".format(self.rank))
break
# Check if message is special type containing new function
# to be applied
if isinstance(task, _function_wrapper):
self.function = task.function
if self.debug:
print("Worker {0} replaced its task function: {1}."
.format(self.rank, self.function))
continue
# If not a special message, just run the known function on
# the input and return it asynchronously.
result = self.function(task)
if self.debug:
print("Worker {0} sending answer {1} with tag {2}."
.format(self.rank, result, status.tag))
self.comm.isend(result, dest=0, tag=status.tag)
def orchestrate_map(self, pds_id):
    """Orchestrates the workers to perform a map function.
    This works by keeping track of the workers who haven't finished
    executing, waiting for them to request the next chunk of data when
    they are free, responding with the data, and finally sending them a
    sentinel signalling that they can exit.
    """
    is_map_done = [i in self.master_node_ranks for i in range(self.size)]
    status = MPI.Status()
    # Copy the data to a pending store so the master's PDS data is not
    # emptied as the workers consume it.
    self.pds_pending_store[pds_id] = list(self.pds_store[pds_id])
    # While some ranks haven't finished...
    while sum(is_map_done) < self.size:
        # Wait for a request from any rank.
data_request = self.comm.recv(
source=MPI.ANY_SOURCE,
tag=MPI.ANY_TAG,
status=status,
)
        request_from_rank = status.source
        if data_request != pds_id:
            print("Ignoring stale PDS data request from",
                  request_from_rank, ":", data_request, "/", pds_id)
            continue
        # Local reference so we don't keep doing dict lookups.
        current_pds_items = self.pds_pending_store[pds_id]
        num_current_pds_items = len(current_pds_items)
        # Everyone has already exhausted all the data:
        # send a sentinel and mark the node as finished.
        if num_current_pds_items == 0:
            self.comm.send(None, dest=request_from_rank, tag=pds_id)
            is_map_done[request_from_rank] = True
        else:
            # Build the chunk to send. Pop items off and tag each with an
            # index so the results can be sorted back into order later;
            # never pop more items than remain.
            chunk_to_send = []
            for i in range(min(self.chunk_size, num_current_pds_items)):
                chunk_to_send += [(num_current_pds_items - i, current_pds_items.pop())]
self.comm.send(chunk_to_send, dest=request_from_rank, tag=pds_id)
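The worker side implied by this orchestration could look like the following sketch: each worker sends the pds_id back as a data request and processes chunks until it receives the None sentinel. The function name and func argument are placeholders, and the master is assumed to run on rank 0.

def _worker_run_map(self, pds_id, func):
    results = []
    while True:
        # Request the next chunk from the master.
        self.comm.send(pds_id, dest=0, tag=pds_id)
        chunk = self.comm.recv(source=0, tag=pds_id)
        if chunk is None:  # sentinel: no data left, we may exit
            break
        # Each item is (sort_key, payload); keep the key for later sorting.
        results += [(key, func(item)) for key, item in chunk]
    return results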
def iter(self,log=None,np=None):
'''
The iteration of the Lanczos.
Parameters
----------
log : Log, optional
The log file to record the iteration information.
    np : int, optional
        The number of subprocesses used to perform the iteration.
    '''
    t0 = time.time()
    if self.method == 'S' and (np is None or np <= 0):
        vecs, Qs = self.controllers['vecs'], self.controllers['Qs']
        for i, lanczos in enumerate(self.controllers['lczs']):
            ts = time.time()
            while lanczos.niter < lanczos.maxiter and not lanczos.stop:
                lanczos.iter()
                Qs[i, :, lanczos.niter-1] = vecs.dot(lanczos.vectors[lanczos.niter-1])
            te = time.time()
            if log: log << '%s%s%s' % ('\b'*30 if i > 0 else '', ('%s/%s(%.2es/%.3es)' % (i+1, len(Qs), te-ts, te-t0)).center(30), '\b'*30 if i == len(Qs)-1 else '')
    elif self.method == 'B':
        lanczos = self.controllers['lanczos']
        for i in range(lanczos.maxiter):  # xrange in the original Python 2 code
            ts = time.time()
            lanczos.iter()
            te = time.time()
            if log: log << '%s%s%s' % ('\b'*30 if i > 0 else '', ('%s/%s(%.2es/%.3es)' % (i+1, lanczos.maxiter, te-ts, te-t0)).center(30), '\b'*30 if i == lanczos.maxiter-1 else '')
    elif self.method == 'S' and np is not None:
        path, Qs = os.path.dirname(os.path.realpath(__file__)), self.controllers['Qs']
        # Distribute the Lanczos instances round-robin over the np subprocesses.
        datas = [[self.controllers['vecs'], [], []] for _ in range(np)]
        for i, lanczos in enumerate(self.controllers['lczs']):
            datas[i % np][1].append(lanczos)
            datas[i % np][2].append(i)
        comm = MPI.COMM_SELF.Spawn(sys.executable, args=['%s/edbgf.py' % path], maxprocs=np)
        for i, data in enumerate(datas):
            comm.send(data, dest=i, tag=0)
        info, ic, nc = MPI.Status(), 0, 0
        while nc < np:
            data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=info)
            if info.Get_tag() == 0:
                # Tag 0: a subprocess has finished and returned its results.
                for index, (_T_, P, niter), Q in data:
                    lanczos = self.controllers['lczs'][index]
                    lanczos._T_, lanczos.P, lanczos.niter = _T_, P, niter
                    Qs[index, :, :] = Q
                nc += 1
            else:
                # Any other tag: a progress report (index, elapsed time).
                ic, (index, t) = ic+1, data
                if log: log << '%s%s%s' % ('\b'*30 if ic > 1 else '', ('%s/%s(%.2es/%.3es)' % (ic, len(Qs), t, time.time()-t0)).center(30), '\b'*30 if ic == len(Qs) else '')
        comm.Disconnect()
    else:
        raise ValueError('BGF iter error: method not supported.')
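Reduced to essentials, the Spawn/tag protocol of the parallel branch works like the sketch below. The child script name is hypothetical; in it, the children would obtain the intercommunicator via MPI.Comm.Get_parent(), send progress reports with tag 1, and send their final result with tag 0, mirroring the Get_tag() dispatch above.

from mpi4py import MPI
import sys

comm = MPI.COMM_SELF.Spawn(sys.executable, args=['child.py'], maxprocs=2)
for i in range(2):
    comm.send(('work', i), dest=i, tag=0)
info, nc = MPI.Status(), 0
while nc < 2:
    data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=info)
    if info.Get_tag() == 0:   # tag 0: a child's final result
        nc += 1
    else:                     # any other tag: a progress report
        print('progress:', data)
comm.Disconnect()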