def main():
    """
    Main entry point for the script.
    """
    comm = MPI.COMM_WORLD
    opts = getoptions(True)
    opts['threads'] = comm.Get_size()

    logout = "mpiOutput-{}.log".format(comm.Get_rank())

    # For MPI jobs, do something sane with logging.
    setuplogger(logging.ERROR, logout, opts['log'])

    config = Config()

    if comm.Get_size() < 2:
        logging.error("Must run MPI job with at least 2 processes")
        sys.exit(1)

    myhost = MPI.Get_processor_name()
    logging.info("Nodename: %s", myhost)

    processjobs(config, opts, comm.Get_rank(), comm)

    logging.info("Rank: %s FINISHED", comm.Get_rank())
def get_device(comm, num_masters=1, gpu_limit=-1, gpu_for_master=False):
    """Arguments:
        comm: MPI intracommunicator containing all processes
        num_masters: number of processes that will be assigned as masters
        gpu_limit: maximum number of GPUs to use on one host
        gpu_for_master: whether master processes should be given a GPU
    Returns device name 'cpu' or 'gpuN' appropriate for use with theano"""
    rank = comm.Get_rank()
    if gpu_for_master:
        gpu_ranks = range(comm.Get_size())
    else:
        gpu_ranks = get_worker_ranks(comm, num_masters)

    # Get the ranks of the other processes that share the same host
    # and determine which GPU to take on the host.
    host = MPI.Get_processor_name()
    hosts = comm.allgather(host)
    workers_sharing_host = [i for i in gpu_ranks if hosts[i] == host]
    if rank in workers_sharing_host:
        worker_id = workers_sharing_host.index(rank)
    else:
        worker_id = -1

    # get_num_gpus will fail if CUDA is not installed, so we short circuit
    # if 0 GPUs are requested.
    if gpu_limit == 0:
        return 'cpu'
    max_gpu = get_num_gpus() - 1
    if gpu_limit > 0:
        max_gpu = min(max_gpu, gpu_limit - 1)
    if worker_id < 0:  # or worker_id > max_gpu:
        return 'cpu'
    else:
        return 'gpu%d' % (worker_id % (max_gpu + 1))
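# A typical call site for get_device (a sketch; the exact THEANO_FLAGS handling is
# an assumption and not part of the original snippet, and get_worker_ranks /
# get_num_gpus are helpers from the same project):
import os
from mpi4py import MPI

def select_theano_device():
    comm = MPI.COMM_WORLD
    device = get_device(comm, num_masters=1, gpu_limit=2)
    # The returned string ('cpu' or 'gpuN') targets the old Theano backend, which
    # reads the device from THEANO_FLAGS before `import theano`.
    os.environ['THEANO_FLAGS'] = 'device=%s,floatX=float32' % device
    return device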
def get_id_within_node(comm=None):
    from mpi4py import MPI
    if comm is None:
        comm = MPI.COMM_WORLD
    rank = comm.rank
    nodename = MPI.Get_processor_name()
    nodelist = comm.allgather(nodename)
    # Count how many lower-ranked processes report the same hostname.
    return len([i for i in nodelist[:rank] if i == nodename])
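# The same node-local index can also be obtained with an MPI-3 shared-memory split,
# which avoids the allgather; this is an alternative sketch, not part of the
# original snippet:
from mpi4py import MPI

def get_id_within_node_split(comm=None):
    if comm is None:
        comm = MPI.COMM_WORLD
    # Split into one sub-communicator per shared-memory node; the rank within
    # that sub-communicator is the node-local id.
    local_comm = comm.Split_type(MPI.COMM_TYPE_SHARED)
    local_rank = local_comm.Get_rank()
    local_comm.Free()
    return local_rank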
def __init__(self, parallel=True, capture=False, print_worker_log=True):
    if not run_mpi:
        print("NewParallel warning: mpi4py could not be loaded")
        print("\tany instances of NewParallel will run in serial")
        self.parallel = False
    else:
        self.parallel = parallel

    self.capture = capture
    self.print_worker_log = print_worker_log

    if self.parallel:
        self.processor_name = MPI.Get_processor_name()
        self.comm = MPI.COMM_WORLD
        self.size = self.comm.Get_size()
        self.rank = self.comm.Get_rank()
        self.status = MPI.Status()
        # define MPI message tags
        self.tags = enum('READY', 'DONE', 'EXIT', 'START', 'ERROR')
        if self.size == 1:
            self.parallel = False
            print("NewParallel warning: only one core found")
            print("\tany instances of NewParallel will run in serial")
    else:
        self.size = 1
        self.rank = 0

    self.task_count = 0
    self.task_list = None
    self.use_master_update = False
    self.update_interval = None
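# The `enum` helper used for the message tags above is not included in this snippet;
# a common recipe (an assumption, not necessarily the project's exact implementation)
# maps each name to an increasing integer:
def enum(*sequential):
    """Build a simple namespace such that READY == 0, DONE == 1, and so on."""
    enums = dict(zip(sequential, range(len(sequential))))
    return type('Enum', (), enums)

# Example: tags = enum('READY', 'DONE', 'EXIT', 'START', 'ERROR'); tags.READY == 0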
def spawn_load(self):
    '''Spawn a parallel loading process using MPI.'''
    if not para_load:
        return

    from mpi4py import MPI
    import os
    import sys

    hostname = MPI.Get_processor_name()
    mpiinfo = MPI.Info.Create()
    # Using key='host' would give an "all nodes filled" error because the spawn
    # needs an additional slot; also, the hostname must match exactly what appears
    # in the output of --display-allocation.
    if hostname != hostname.split('.')[0]:
        hostname = hostname.split('.')[0]
    mpiinfo.Set(key='add-host', value=hostname)
    num_spawn = 1
    if "CPULIST_train" in os.environ:
        # see https://gist.github.com/lebedov/eadce02a320d10f0e81c
        # print os.environ['CPULIST_train']
        envstr = ""
        # for key, value in dict(os.environ).iteritems():
        #     envstr += '%s=%s\n' % (key, value)
        envstr += 'CPULIST_train=%s\n' % os.environ['CPULIST_train']
        mpiinfo.Set(key='env', value=envstr)
    ninfo = mpiinfo.Get_nkeys()
    # print ninfo
    mpicommand = sys.executable
    file_dir = os.path.dirname(os.path.realpath(__file__))  # the dir of imagenet.py
    self.icomm = MPI.COMM_SELF.Spawn(mpicommand,
                                     args=[file_dir + '/proc_load_mpi.py'],
                                     info=mpiinfo, maxprocs=num_spawn)
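# On the spawned side (proc_load_mpi.py), the child process would typically attach
# to the parent through the inter-communicator returned by MPI.Comm.Get_parent();
# this is a minimal sketch of that handshake, not the project's actual loader:
from mpi4py import MPI

def child_main():
    parent = MPI.Comm.Get_parent()  # inter-communicator back to the spawning process
    if parent == MPI.COMM_NULL:
        return  # not started via Spawn
    # ... exchange work and loaded data over `parent` here ...
    parent.Disconnect()

if __name__ == "__main__":
    child_main()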