def save_images(nifti_files, anat, roi_dict, out_dir, **kwargs):
    '''Saves multiple nifti images using multiprocessing.

    Args:
        nifti_files (list): list of nifti file paths.
        anat (nipy.core.api.image.image.Image): anatomical image.
        roi_dict (dict): dictionary of cluster dictionaries.
        out_dir (str): output directory path.
        **kwargs: extra keyword arguments.
    '''
    p = mp.Pool(30)
    idx = [int(f.split('/')[-1].split('.')[0]) for f in nifti_files]
    args_iter = itertools.izip(nifti_files,
                               itertools.repeat(anat),
                               [roi_dict[i] for i in idx],
                               [path.join(out_dir, '%d.png' % i) for i in idx],
                               idx)
    p.map(save_helper, args_iter)
    p.close()
    p.join()
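A hypothetical call sketch (the paths, the ROI dictionary, and load_anat are placeholders; the module is assumed to import multiprocessing as mp, itertools, and os.path as path, and to define save_helper). Note that each file's basename stem must be an integer key of roi_dict, because idx is parsed from the filename:

# Hypothetical usage; filenames need integer stems because idx is parsed from them.
nifti_files = ['/data/clusters/0.nii', '/data/clusters/1.nii']
roi_dict = {0: {'cluster': 'a'}, 1: {'cluster': 'b'}}
anat = load_anat('/data/anat.nii')   # placeholder loader returning a nipy Image
save_images(nifti_files, anat, roi_dict, '/data/out')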
Python multiprocessing.Pool() example source code
def multiprocess_csv_import(work_list, settings, logger):
    pool = multiprocessing.Pool(processes=settings['max_concurrent_processes'])
    num_jobs = len(work_list)
    results = pool.imap_unordered(run_csv_import_multiprocessing, [[w, settings] for w in work_list])
    pool.close()
    pool.join()
    result_list = list(results)
    num_results = len(result_list)
    if num_jobs > num_results:
        logger.warning("\t- A MULTIPROCESSING PROCESS FAILED WITHOUT AN ERROR\nACTION: Check the record counts")
    for result in result_list:
        if result != "SUCCESS":
            logger.info(result)
def multiprocess_list(mp_type, work_list, settings, logger):
    pool = multiprocessing.Pool(processes=settings['max_concurrent_processes'])
    num_jobs = len(work_list)
    if mp_type == "sql":
        results = pool.imap_unordered(run_sql_multiprocessing, [[w, settings] for w in work_list])
    else:
        results = pool.imap_unordered(run_command_line, work_list)
    pool.close()
    pool.join()
    result_list = list(results)
    num_results = len(result_list)
    if num_jobs > num_results:
        logger.warning("\t- A MULTIPROCESSING PROCESS FAILED WITHOUT AN ERROR\nACTION: Check the record counts")
    for result in result_list:
        if result != "SUCCESS":
            logger.info(result)
def multiprocess_shapefile_load(work_list, settings, logger):
    pool = multiprocessing.Pool(processes=settings['max_concurrent_processes'])
    num_jobs = len(work_list)
    results = pool.imap_unordered(intermediate_shapefile_load_step, [[w, settings] for w in work_list])
    pool.close()
    pool.join()
    result_list = list(results)
    num_results = len(result_list)
    if num_jobs > num_results:
        logger.warning("\t- A MULTIPROCESSING PROCESS FAILED WITHOUT AN ERROR\nACTION: Check the record counts")
    for result in result_list:
        if result != "SUCCESS":
            logger.info(result)
def check_data_valid(data, startval, endval=None):
    if endval is None:
        endval = len(data)
    chunksize = 10000000
    startval = int(startval)
    endval = int(endval)
    offsets = np.arange(0, len(data), chunksize)
    args = []
    result = True
    for offset in offsets:
        s = startval + offset
        e = min(s + chunksize, endval)
        nelems = e - s
        test_chunk = data[offset:offset + nelems]
        args.append((s, e, test_chunk))
    pool = mp.Pool()
    result = all(pool.map(_check_chunk, args))
    pool.terminate()
    return result
def metrics(period, resolution, tag, threads, metricsfile):
    try:
        pool = Pool(processes=threads)
        period_seconds = period * 3600
        resolution_seconds = resolution * 3600
        if metricsfile:
            with open(metricsfile) as fp:
                m = json.loads(fp.read().replace('][', ','))
        else:
            m = metrics_api.get_tag_metrics(tag_name=tag, **context.settings)
        click.echo(click.style('Found: %s metrics', fg='green') % (len(m)))
        expire = partial(_expire_metric_path, period_seconds, resolution_seconds, tag)
        expired_paths = tqdm(pool.imap_unordered(expire, m))
        expired_paths = sum(filter(None, expired_paths))
        click.echo(click.style('Expired: %s metric paths', fg='green') % expired_paths)
    except Exception, e:
        print 'Cleanup metrics failed. %s' % e
    finally:
        pool.terminate()
        pool.join()
def easy_par(f, sequence):
    from multiprocessing import Pool
    poolsize = len(sequence)
    if poolsize > 16:
        poolsize = 16
    pool = Pool(processes=poolsize)
    try:
        # f is given sequence. guaranteed to be in order
        cleaned = False
        result = pool.map(f, sequence)
        cleaned = [x for x in result if x is not None]
        #cleaned = asarray(cleaned)
        # not optimal but safe
    except KeyboardInterrupt:
        pool.terminate()
    except Exception as e:
        print('got exception: %r' % (e,))
        if not args.force:
            print("Terminating the pool")
            pool.terminate()
    finally:
        pool.close()
        pool.join()
    return cleaned
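A minimal usage sketch for easy_par, assuming a picklable top-level worker and a module-level args object with a force attribute, which the exception handler above reads:

import argparse

def square(n):
    return n * n   # must be a top-level (picklable) function

args = argparse.Namespace(force=False)   # stand-in for the module-level CLI args

if __name__ == '__main__':
    print(easy_par(square, range(8)))    # [0, 1, 4, 9, 16, 25, 36, 49]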
def load_bundle(filename, regex="^\d+", chunk=None):
    """ Load graphs from zip archive """
    pool = Pool()
    archive = zf.ZipFile(filename)
    # Determine entries and select subset if requested
    entries = archive.namelist()
    if chunk:
        entries = list(set(entries) & set(chunk))
    entries = [(archive, entry) for entry in entries]
    # Load entries in parallel
    func = partial(load_bundle_entry, regex=re.compile(regex))
    items = pool.map(func, entries)
    items = filter(lambda (g, l): g is not None, items)
    graphs, labels = zip(*items)
    archive.close()
    pool.close()
    pool.join()
    return graphs, labels
def fun_page(page_id, onoma):
    pp = Pool(50)
    mega_list = []
    start = time.time()
    pst = p(page_id, access_token)
    n = 50
    group_post = group(pst, n)
    temp = 0
    for j in group_post:
        temp += len(j)
        print(str(temp) + '/' + str(len(pst)))
        re = pp.map(pros, list(j))
        for jj in re:
            mega_list.append(jj)
    duration = (time.time() - start) / float(60)
    print("Time:" + str(duration) + 'min')
    with open(onoma, 'w') as f:
        json.dump(mega_list, f)
    return mega_list
def query_shards(self, query):
    """
    Return the result of applying shard[query] for each shard in self.shards,
    as a sequence.

    If PARALLEL_SHARDS is set, the shards are queried in parallel, using
    the multiprocessing module.
    """
    args = zip([query] * len(self.shards), self.shards)
    if PARALLEL_SHARDS and PARALLEL_SHARDS > 1:
        logger.debug("spawning %i query processes" % PARALLEL_SHARDS)
        pool = multiprocessing.Pool(PARALLEL_SHARDS)
        result = pool.imap(query_shard, args, chunksize=1 + len(args) / PARALLEL_SHARDS)
    else:
        # serial processing, one shard after another
        pool = None
        result = imap(query_shard, args)
    return pool, result
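Callers are expected to drain the returned iterator and then dispose of the pool themselves; a sketch of that contract (index and query are hypothetical names):

pool, shard_results = index.query_shards(query)   # hypothetical caller
merged = [hits for hits in shard_results]         # consume the (possibly lazy) imap iterator first
if pool is not None:
    pool.terminate()                              # only the parallel branch returns a live pool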
def process_pool():
    p = Pool(10)
    start = time.time()
    #q1=Queue.Queue()
    manager = Manager()
    q = manager.Queue()
    print "main start ", start
    for i in xrange(10):
        p.apply_async(sub_pool, args=(q,))
    p.close()
    p.join()
    end = time.time()
    print "process done at ", end
    #print q
    print q.get()
    '''
    while q1.empty() == False:
        d = q1.get(True)
        print d
    '''
def pool_map():
    x = [i for i in range(50) if i % 2 == 0]
    #print x
    start = time.time()
    '''
    for i in x:
        single(i)
    print "time used ", time.time() - start
    '''
    #using multiprocess
    p = Pool(2)
    s = p.map(single, x)
    p.close()
    p.join()
    print s
    print len(s)
    print "end. Time used: ", time.time() - start
def transTest():
    # number of worker processes; keep it close to the CPU core count
    numprocs = 8
    pool = multiprocessing.Pool(processes=numprocs)
    #pool.apply_async(func=tts_baidu.fileToVoice, args=("BBCHeadline.txt", "BBCHeadline.wav", "en"))
    #pool.apply_async(func=tts_baidu.fileToVoice, args=("1984.txt", "1984.wav", "en"))
    #pool.apply_async(func=tts_baidu.fileToVoice, args=("emma.txt", "emma.wav", "en"))
    #pool.apply_async(func=tts_baidu.fileToVoice, args=("Home.2009.eng.txt", "Home.2009.eng.wav", "en"))
    #pool.apply_async(func=tts_baidu.fileToVoice, args=("StrayBirds.txt", "StrayBirds.wav", "en"))
    #pool.apply_async(func=tts_baidu.fileToVoice, args=("????.txt", "????.wav", "zh"))
    #pool.apply_async(func=tts_baidu.fileToVoice, args=("???????.txt", "???????.wav", "zh"))
    #pool.apply_async(func=tts_baidu.fileToVoice, args=("????????.txt", "????????.wav", "zh"))
    pool.apply_async(func=tts_baidu.fileToVoice, args=("????.txt", "????.wav", "zh"))
    #pool.apply_async(func=tts_baidu.fileToVoice, args=("????????.txt", "????????.wav", "zh"))
    #pool.apply_async(func=tts_baidu.fileToVoice, args=("???????.txt", "???????.wav", "zh"))
    #pool.apply_async(func=tts_baidu.fileToVoice, args=("??????.txt", "??????.wav", "zh"))
    pool.close()
    pool.join()
def trans_novs():
    # 249 novels gaga
    # number of worker processes; keep it close to the CPU core count
    count = 0
    numprocs = 8
    pool = multiprocessing.Pool(processes=numprocs)
    files = os.listdir("./nov")
    for filename in files:
        if not os.path.isdir(filename):
            if filename.endswith("txt"):
                fname = "./nov/" + filename
                #print fname
                pool.apply_async(func=tts_baidu.fileToVoice, args=(fname, fname + ".wav", "zh"))
    pool.close()
    pool.join()
    #print "finish all ,total handle TTS GET() :", count
def main():
    input_dir, output_dir = getDirs()
    table_list = listFiles(input_dir)
    concurrency = cpu_count()
    print 'Using {0:d} Processes'.format(concurrency)
    pool = Pool(concurrency)
    # perform the passed in write action (function) for each csv row
    time_capture = TimeCapture(time.time())
    results = pool.map(
        multiprocess,
        izip(repeat(output_dir),
             [copy.deepcopy(time_capture) for i in range(len(table_list))],
             table_list,
             repeat(write)))
    time_capture.end(1)
    pool.close()
    pool.join()
    print 'Finished Successfully!'
    displayResults(results, time_capture.total_time)
def get_dissimilarity_matrix(U, V, X, n, error_list, beta, alpha_w, alpha_e_avg_t, alpha_n0, maxconn):
    row_size = X.shape[0]
    col_size = X.shape[1]
    channel_count = X.shape[2]
    alpha = get_alpha(n, error_list, alpha_w, alpha_e_avg_t, alpha_n0)
    cluster_number = V.shape[0]
    D = np.zeros((row_size, col_size, cluster_number))
    index_arr = np.array([[k, l] for k in xrange(row_size) for l in xrange(col_size)], dtype='int32')
    U_new = U.reshape(row_size * col_size, cluster_number, order='F')
    data_inputs = [0 for i in xrange(0, row_size * col_size)]
    for i in xrange(0, row_size * col_size):
        x = index_arr[i][0]
        y = index_arr[i][1]
        data_inputs[i] = [U_new, V, X[x][y], x, y, alpha, beta[x * row_size + y, :]]
    pool = Pool(maxconn)
    outputs = pool.map(compute_cluster_distances_pool, data_inputs)
    pool.close()
    pool.join()
    for i in xrange(0, row_size * col_size):
        x = index_arr[i][0]
        y = index_arr[i][1]
        D[x][y] = outputs[i]
    return D
def load_files_parallel(feature_files, load_function, processes, **kwargs):
    """
    Function for loading feature files in parallel.

    :param feature_files: The collection of files to load.
    :param load_function: The function used to load the objects.
    :param processes: The number of processes to use for loading the feature files.
    :param kwargs: Keyword arguments which will be sent to the load function.
    :return: A list of loaded feature data frames or numpy arrays.
    """
    logging.info("Reading files in parallel")
    pool = multiprocessing.Pool(processes)
    try:
        # Create a partial function with the keyword arguments set. This is done since the parallel map from
        # multiprocessing expects a function which takes a single argument.
        partial_load_and_pivot = partial(load_function, **kwargs)
        segment_frames = pool.map(partial_load_and_pivot, feature_files)
    finally:
        pool.close()
    return segment_frames
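The partial() trick generalizes: any loader that takes keyword arguments can be adapted for Pool.map, which passes exactly one positional argument per item. A self-contained sketch under that assumption (read_rows and the file names are placeholders, not part of the project above):

import multiprocessing
from functools import partial

def read_rows(path, sep=","):
    # placeholder loader standing in for load_function
    with open(path) as f:
        return [line.rstrip("\n").split(sep) for line in f]

if __name__ == "__main__":
    files = ["a.csv", "b.csv", "c.csv"]        # hypothetical inputs
    loader = partial(read_rows, sep=";")       # freeze the keyword argument
    with multiprocessing.Pool(2) as pool:
        frames = pool.map(loader, files)       # each worker calls read_rows(path, sep=";")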
From: Similarity Metrics - In Parallel.py (project: Parallel-Processing-Nadig, author: madhug-nadig)
def parallel_jaccard_similarity(self, x, y):
    p = 16
    pool = mp.Pool(processes=p)
    chunk_X = []
    chunk_Y = []
    # split x and y into chunks of size p so each worker handles one chunk
    for i in range(0, len(x), p):
        chunk_X.append(x[i:i + p])
        chunk_Y.append(y[i:i + p])
    s = time.perf_counter()
    intersection_cardinality = sum(pool.starmap(self.interc_card_locl, zip(chunk_X, chunk_Y)))
    union_cardinality = sum(pool.starmap(self.union_card_locl, zip(chunk_X, chunk_Y)))
    print(intersection_cardinality, union_cardinality)
    e = time.perf_counter()
    print("Parallel Jaccard Exec Time: ", e - s)
    return intersection_cardinality / float(union_cardinality)
def boot(self):
    p = Pool(self.cores)
    result = p.map(self.do_work, range(self.br))
    p.close()
    p.join()
    return result

def jk(self):
    p = Pool(self.cores)
    base = np.arange(0, len(self.data))
    self.indices = list(np.delete(base, i) for i in base)
    result = p.map(self.do_work_jk, range(self.br))
    p.close()
    p.join()
    return result
def get_container_id_mapping(pool, compose_cmd):
    service_names = subprocess.check_output(
        compose_cmd + ["config", "--services"]
    )
    service_names = service_names.strip().decode("utf-8").split("\n")
    id_mapping = {
        name: pool.apply_async(pool_container_id, (name, compose_cmd))
        for name in service_names
    }
    while not all(future.ready() for future in id_mapping.values()):
        time.sleep(0.1)
    for name, future in list(id_mapping.items()):
        if not future.successful():
            raise RuntimeError("Cannot get ID of service {0}".format(name))
        id_mapping[name] = future.get()
    return id_mapping
def test_pool_worker_lifetime_early_close(self):
    # Issue #10332: closing a pool whose workers have limited lifetimes
    # before all the tasks completed would make join() hang.
    p = multiprocessing.Pool(3, maxtasksperchild=1)
    results = []
    for i in range(6):
        results.append(p.apply_async(sqr, (i, 0.3)))
    p.close()
    p.join()
    # check the results
    for (j, res) in enumerate(results):
        self.assertEqual(res.get(), sqr(j))

#
# Test of creating a customized manager class
#
def _repopulate_pool(self):
    """Bring the number of pool processes up to the specified number,
    for use after reaping workers which have exited.
    """
    for i in range(self._processes - len(self._pool)):
        # changed worker -> clean_worker
        args = (self._inqueue, self._outqueue,
                self._initializer,
                self._initargs, self._maxtasksperchild)
        if hasattr(self, '_wrap_exception'):
            args += (self._wrap_exception,)
        w = self.Process(target=clean_worker, args=args)
        self._pool.append(w)
        w.name = w.name.replace('Process', 'PoolWorker')
        w.daemon = True
        w.start()
        util.debug('added worker')
def __init__(self, comm=None, loadbalance=False, debug=False,
             wait_on_start=True, exit_on_end=True,
             cores_per_task=1, **kwargs):
    if MPI is None:
        raise ImportError("Please install mpi4py")
    self.comm = MPI.COMM_WORLD if comm is None else comm
    self.rank = self.comm.Get_rank()
    if cores_per_task > 1:
        self.size = max(1, self.comm.Get_size() // cores_per_task)
    else:
        self.size = self.comm.Get_size() - 1
    self.function = _error_function
    self.loadbalance = loadbalance
    self.debug = debug
    if self.size == 0:
        raise ValueError("Tried to create an MPI pool, but there "
                         "was only one MPI process available. "
                         "Need at least two.")
    self.exit_on_end = exit_on_end
    # Enter main loop for workers?
    if wait_on_start:
        if self.is_worker():
            self.wait()
def Pool(pool='AnyPool', **kwargs):
    '''
    Chooses between the different pools.
    If ``pool == 'AnyPool'``, chooses based on availability.
    '''
    if pool == 'MPIPool':
        return MPIPool(**kwargs)
    elif pool == 'MultiPool':
        return MultiPool(**kwargs)
    elif pool == 'SerialPool':
        return SerialPool(**kwargs)
    elif pool == 'AnyPool':
        if MPIPool.enabled():
            return MPIPool(**kwargs)
        elif MultiPool.enabled():
            return MultiPool(**kwargs)
        else:
            return SerialPool(**kwargs)
    else:
        raise ValueError('Invalid pool ``%s``.' % pool)
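A brief usage sketch of the chooser; it assumes the selected pool exposes the map/close interface of multiprocessing.Pool (square is a placeholder task):

def square(n):
    return n * n   # placeholder task

pool = Pool('AnyPool')                        # MPI if available, else multiprocessing, else serial
try:
    results = pool.map(square, range(10))     # assumes a Pool-like .map()
finally:
    pool.close()                              # assumes a Pool-like .close()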
def load_embeddings_mp(path, word_dim, processes=None):
    if processes is None:
        processes = multiprocessing.cpu_count()
    pool = mp.Pool(processes, initializer=_mp_initialize,
                   initargs=(word_dim,))
    with open(path, "r") as f:
        iterator = chunks(f, n=processes,
                          k=processes * 10000)
        ret = {}
        for batches in iterator:
            results = pool.map_async(_mp_process, batches)
            results = results.get()
            results = aggregate_dicts(*results)
            ret.update(results)
        return ret
def process_profilelog(fn, pout=None):
    # Either call with a list of filenames and set pout or a filename and optionally pout.
    if not pout:
        pout = fn + '.processed'
    pout = open(pout, 'w')

    import pstats
    if isinstance(fn, list):
        p = pstats.Stats(*fn, stream=pout)
    else:
        p = pstats.Stats(fn, stream=pout)
    p.sort_stats('time')
    p.print_stats()
    p.print_callers()
    p.sort_stats('cumulative')
    p.print_stats()

    pout.flush()
    pout.close()
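The comment documents two calling conventions; both forms, sketched with hypothetical .prof filenames:

# Single profile: the output path defaults to 'build.prof.processed'
process_profilelog('build.prof')

# Merged profiles: pout must be given, since fn + '.processed' would fail on a list
process_profilelog(['task-1.prof', 'task-2.prof'], pout='combined.processed')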
#
# Was present to work around multiprocessing pool bugs in python < 2.7.3
#
def multiprocessingpool(*args, **kwargs):
    import multiprocessing.pool
    #import multiprocessing.util
    #multiprocessing.util.log_to_stderr(10)
    # Deal with a multiprocessing bug where signals to the processes would be delayed until the work
    # completes. Putting in a timeout means the signals (like SIGINT/SIGTERM) get processed.
    def wrapper(func):
        def wrap(self, timeout=None):
            return func(self, timeout=timeout if timeout is not None else 1e100)
        return wrap
    multiprocessing.pool.IMapIterator.next = wrapper(multiprocessing.pool.IMapIterator.next)

    return multiprocessing.Pool(*args, **kwargs)
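A hedged usage sketch: the factory is a drop-in replacement for multiprocessing.Pool, and the huge default timeout patched onto IMapIterator.next is what lets SIGINT/SIGTERM interrupt a blocking imap() consumption loop (work is a placeholder task):

def work(n):
    return n * n   # placeholder task

if __name__ == '__main__':
    pool = multiprocessingpool(4)
    try:
        for value in pool.imap(work, range(100)):   # next() now polls with a timeout, so signals get through
            print(value)
    finally:
        pool.close()
        pool.join()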
def test_imap_handle_iterable_exception(self):
    if self.TYPE == 'manager':
        self.skipTest('test not appropriate for {}'.format(self.TYPE))

    it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
    for i in range(3):
        self.assertEqual(next(it), i*i)
    self.assertRaises(SayWhenError, it.next)

    # SayWhenError seen at start of problematic chunk's results
    it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
    for i in range(6):
        self.assertEqual(next(it), i*i)
    self.assertRaises(SayWhenError, it.next)

    it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
    for i in range(4):
        self.assertEqual(next(it), i*i)
    self.assertRaises(SayWhenError, it.next)