def load_embeddings_mp(path, word_dim, processes=None):
    if processes is None:
        processes = multiprocessing.cpu_count()
    pool = mp.Pool(processes, initializer=_mp_initialize,
                   initargs=(word_dim,))
    with open(path, "r") as f:
        # Hand the file to the workers in blocks of `processes * 10000` lines,
        # split into `processes` batches per block.
        iterator = chunks(f, n=processes,
                          k=processes * 10000)
        ret = {}
        for batches in iterator:
            results = pool.map_async(_mp_process, batches)
            results = results.get()
            results = aggregate_dicts(*results)
            ret.update(results)
    pool.close()
    pool.join()
    return ret
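The helpers `chunks`, `aggregate_dicts`, `_mp_initialize` and `_mp_process` are not part of this listing. A minimal sketch of what the two generic helpers might look like, purely as an assumption to make the snippet above self-contained:

from itertools import islice

def chunks(iterable, n, k):
    """Hypothetical helper (not the original): read `iterable` in blocks of up
    to `k` items and split each block into roughly `n` equal batches."""
    iterator = iter(iterable)
    while True:
        block = list(islice(iterator, k))
        if not block:
            return
        step = max(1, -(-len(block) // n))  # ceiling division
        yield [block[i:i + step] for i in range(0, len(block), step)]

def aggregate_dicts(*dicts):
    """Hypothetical helper (not the original): merge several dicts into one."""
    merged = {}
    for d in dicts:
        merged.update(d)
    return merged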
def multiprocess(function, args, n_jobs, random_seed=None):
    """
    Call function with args across n_jobs processes (n_jobs doesn't have to be
    the length of args).
    Arguments:
        function (callable): function to run in each worker
        args (iterable): iterable of argument tuples; [(1, 2), (3, 4)] results
            in [function(1, 2), function(3, 4)]
        n_jobs (int): number of worker processes; must be > 0
        random_seed (int | array): seed applied in the parent before the
            workers are started
    Returns:
        list: results in the same order as args
    """
    if random_seed is not None:
        # Seed the parent process; each worker inherits this random state when
        # it is created. Changes a worker makes to its own random state do not
        # propagate back to the parent.
        seed(random_seed)
    with Pool(n_jobs) as p:
        return p.starmap(function, args)
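A quick usage sketch of the argument-tuple convention (the `add` helper below is hypothetical, just for illustration):

def add(x, y):
    return x + y

if __name__ == "__main__":
    # Two argument tuples -> [add(1, 2), add(3, 4)]
    print(multiprocess(add, [(1, 2), (3, 4)], n_jobs=2))
    # -> [3, 7]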
def main():
# Deterministic Gevent Pool
from gevent.pool import Pool
p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, range(10))]
run2 = [a for a in p.imap_unordered(echo, range(10))]
run3 = [a for a in p.imap_unordered(echo, range(10))]
run4 = [a for a in p.imap_unordered(echo, range(10))]
    print(run1 == run2 == run3 == run4)
# Non Deterministic Process Pool
from multiprocessing.pool import Pool
p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, range(10))]
run2 = [a for a in p.imap_unordered(echo, range(10))]
run3 = [a for a in p.imap_unordered(echo, range(10))]
run4 = [a for a in p.imap_unordered(echo, range(10))]
    print(run1 == run2 == run3 == run4)
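The `echo` worker used above isn't included in this listing; a plausible stand-in (an assumption, not the original) that makes the ordering difference visible is:

import random
import time

def echo(x):
    # A tiny random delay: completion order then varies between runs for the
    # multiprocessing pool, while the gevent pool (no monkey-patching here, so
    # the sleep blocks each greenlet in turn) yields results in a stable order.
    time.sleep(random.random() / 100)
    return x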
def score_reviews(model):
reviews = []
processes = []
num = 1.0
while num <= 5.0:
processes.append({
'string_score': str(num).replace('.', '_'),
'score': num,
'model': model
})
num += 0.5
pool = Pool(8)
for result in pool.imap(run_computation, processes):
reviews.extend(result)
pool.close()
pool.join()
# sort reviews from best to worst
reviews.sort(key=get_second, reverse=True)
reviews.sort(key=get_first, reverse=True)
return reviews
def multithread(fn, args=[[]], pool_type=Pool,
                processes=_cpus, maxtasksperchild=1,
                chunksize=1):
    '''Multithread/multiprocess method using a Pool. Not inherently threadsafe.
    For threadsafe operations, use Managers or Locks.
    Args must be wrapped in their own iterator, as starmap is used for
    multiple arguments.
    Returns a list of the results.'''
    def helper(pool):
        return pool.starmap(fn, args, chunksize=chunksize)
    # ThreadPools do not take a maxtasksperchild argument,
    # so we need to conditionally construct a pool
    if pool_type is Pool:
        with pool_type(processes, maxtasksperchild=maxtasksperchild) as pool:
            results = helper(pool)
    else:
        with pool_type(processes) as pool:
            results = helper(pool)
    return results
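A usage sketch (assuming `_cpus` is set elsewhere in the module, e.g. to `multiprocessing.cpu_count()`; `ThreadPool` comes from `multiprocessing.pool`, and `power` is a hypothetical worker):

from multiprocessing.pool import ThreadPool

def power(base, exp):
    return base ** exp

if __name__ == "__main__":
    # Process pool (default): honours maxtasksperchild
    print(multithread(power, [(2, 3), (4, 2)]))  # [8, 16]
    # Thread pool: constructed without maxtasksperchild
    print(multithread(power, [(2, 3), (4, 2)], pool_type=ThreadPool))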
def run(self):
    # Get paths to all images
    im_files = find_images(join(self.input_dir))
    assert len(im_files) > 0
    if 'augmentation' in self.pipeline.keys():
        print("Starting preprocessing ({} processes)".format(self.processes))
        optimization_pool = Pool(self.processes)
        subprocess = partial(preprocess, params=self)
        results = optimization_pool.map(subprocess, im_files)
    else:
        print("Using previously augmented data")
    # Create training and validation (imbalanced)
    print("Splitting into training/validation")
    try:
        train_imgs, val_imgs = self.train_val_split(listdir(self.augment_dir))
        self.random_sample(train_imgs, val_imgs, classes=DEFAULT_CLASSES)
    except AssertionError:
        print("No images found in one or more classes - unable to split training and validation")
        exit()
def generate_dataset(self, split_dir, mode='training'):
if mode not in ['training', 'testing']:
raise ValueError("Mode must be 'training' or 'testing'")
do_augment = mode == 'training' # we only want to augment the training data
split_df = pd.DataFrame.from_csv(join(split_dir, '{}.csv'.format(mode))) # load splits
data_dir = make_sub_dir(split_dir, mode) # output directory for images
# Make directories for each class of images in advance
classes = [str(l) for l in split_df[self.label].unique()]
for class_name in classes:
make_sub_dir(data_dir, str(class_name))
# Pre-process, augment and randomly sample the training set
print "Preprocessing {} data...".format(mode)
if len(find_images(join(data_dir, '*'))) == 0:
pool = Pool(self.processes)
subprocess = partial(do_preprocess, args={'params': self, 'augment': do_augment, 'out_dir': data_dir})
img_list = list(split_df['full_path'])
_ = pool.map(subprocess, img_list)
self.generate_h5(find_images_by_class(data_dir, classes=classes), join(split_dir, '{}.h5'.format(mode)), split_df,
random_sample=True, classes=classes)
def run_tbbpool(n, body):
"""TBB.Pool"""
from TBB import Pool
global reused_pool, numthreads
if 'reused_pool' not in globals():
log.debug("Creating TBB.Pool(%s)" % numthreads)
reused_pool = Pool(int(numthreads))
reused_pool.map(body, n)
def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
'''
Returns a process pool object
'''
from multiprocessing.pool import Pool
return Pool(processes, initializer, initargs, maxtasksperchild)
def download_chunks(self, max_workers=5):
    print('Will now download chunks.')
    # Ignore SIGINT while the pool forks so the workers inherit SIG_IGN,
    # then restore the original handler so Ctrl-C still reaches the parent.
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    executor = Pool(max_workers)
    signal.signal(signal.SIGINT, original_sigint_handler)
    try:
        r = executor.map_async(self.get, self.urls)
        # Wait up to 12 hours; a finite timeout keeps the blocking get()
        # responsive to KeyboardInterrupt.
        result = list(r.get(43200))
        DownloadResultProcessor.process_and_print(result)
    except KeyboardInterrupt:
        executor.terminate()
    else:
        executor.close()
        executor.join()
def QA_util_MP_process(num):
pool = Pool(num)
return pool
def fill_cache(self, repair_incorrect: bool = False) -> None:
with Pool(processes=multiprocessing.cpu_count()) as pool:
total = len(self.labeled_spectrograms)
not_yet_cached = [s for s in self.labeled_spectrograms if not s.is_cached()]
to_calculate = self.labeled_spectrograms if repair_incorrect else not_yet_cached
log("Filling cache with {} spectrograms: {} already cached, {} to calculate.".format(
total, total - len(not_yet_cached), len(to_calculate)))
for index, labeled_spectrogram in enumerate(to_calculate):
pool.apply_async(_repair_cached_spectrogram_if_incorrect if repair_incorrect else _cache_spectrogram,
(labeled_spectrogram,))
pool.close()
pool.join()
def loadBatch(img_paths):
with Pool(processes=8) as pool:
imgs = pool.map(loadImage, zip(img_paths, range(len(img_paths))))
return np.asarray(imgs)
# Use this for training; instead of loading everything into memory, it only loads chunks
def __init__(self, func, pool_size=4, timeout=None):
# The signal handler for the consumer exists only in the parent
# process. If we don't give children their own noop signal handler,
# any signal propagated to them by the parent will cause them to throw
# an exception and terminate.
super(ProcessPoolWorker, self).__init__(func, pool_size=pool_size,
timeout=timeout)
self.pool = pool.Pool(processes=pool_size,
initializer=init_process_pool)
self.logger = get_logger(__name__)
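`init_process_pool` isn't included in this listing; given the comment above, a minimal sketch of what such an initializer typically does (an assumption, not the original) is:

import signal

def _noop_handler(signum, frame):
    # Deliberately do nothing: the worker keeps running and lets the
    # parent decide how and when to shut the pool down.
    pass

def init_process_pool():
    # Hypothetical sketch of the pool initializer referenced above: give
    # each child its own no-op handlers for the common shutdown signals.
    signal.signal(signal.SIGINT, _noop_handler)
    signal.signal(signal.SIGTERM, _noop_handler)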
def main(directory, convert_directory, target_size, extension):
util.check_required_program_args([directory, convert_directory])
try:
os.mkdir(convert_directory)
except OSError:
pass
supported_extensions = set(['jpg', 'png', 'tiff', 'jpeg', 'tif'])
filenames = [os.path.join(dp, f) for dp, dn, fn in os.walk(directory)
for f in fn if f.split('.')[-1].lower() in supported_extensions]
filenames = sorted(filenames)
print("Resizing images in {} to {}, this takes a while."
"".format(directory, convert_directory))
n = len(filenames)
# process in batches, sometimes weird things happen with Pool on my machine
batchsize = 500
batches = n // batchsize + 1
pool = Pool(N_PROC)
args = []
for f in filenames:
args.append((convert, (directory, convert_directory, f, target_size,
extension)))
for i in range(batches):
print("batch {:>2} / {}".format(i + 1, batches))
pool.map(process, args[i * batchsize: (i + 1) * batchsize])
pool.close()
print('done')
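The `process` callable passed to `pool.map` isn't part of this listing; since each item in `args` is a `(function, argument_tuple)` pair, a plausible dispatcher (an assumption, not the original) looks like this:

def process(job):
    # Hypothetical dispatcher: unpack the (function, argument_tuple) pair
    # built above and apply the function inside the worker process.
    func, func_args = job
    return func(*func_args)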
def main(directory, convert_directory, target_size, extension):
util.check_required_program_args([directory, convert_directory])
try:
os.mkdir(convert_directory)
except OSError:
pass
supported_extensions = set(['jpg', 'png', 'tiff', 'jpeg', 'tif'])
filenames = [os.path.join(dp, f) for dp, dn, fn in os.walk(directory)
for f in fn if f.split('.')[-1].lower() in supported_extensions]
filenames = sorted(filenames)
print("Resizing images in {} to {}, this takes a while."
"".format(directory, convert_directory))
n = len(filenames)
# process in batches, sometimes weird things happen with Pool on my machine
batchsize = 500
batches = n // batchsize + 1
pool = Pool(N_PROC)
args = []
for f in filenames:
args.append((resize, (directory, convert_directory, f, target_size,
extension)))
for i in range(batches):
print("batch {:>2} / {}".format(i + 1, batches))
pool.map(process, args[i * batchsize: (i + 1) * batchsize])
pool.close()
print('done')
def __init__(self, multiprocess=True):
#Determine the center to rotate around
self.center_shift = np.array((params.PIXELS, params.PIXELS)) / 2. - 0.5
self.multiprocess = multiprocess
if self.multiprocess:
self.pool = Pool(4)
def main(directory, convert_directory, test, crop_height, crop_width, extension):
try:
os.mkdir(convert_directory)
except OSError:
pass
supported_extensions = set(['jpg', 'png', 'tiff', 'jpeg', 'tif'])
filenames = [os.path.join(dp, f) for dp, dn, fn in os.walk(directory)
for f in fn if f.split('.')[-1].lower() in supported_extensions]
filenames = sorted(filenames)
print("Resizing images in {} to {}, this takes a while."
"".format(directory, convert_directory))
n = len(filenames)
# process in batches, sometimes weird things happen with Pool on my machine
batchsize = 500
batches = n // batchsize + 1
pool = Pool(N_PROC)
args = []
for f in filenames:
args.append((convert_seg_labels, (directory, convert_directory, f, crop_height, crop_width,
extension)))
for i in range(batches):
print("batch {:>2} / {}".format(i + 1, batches))
pool.map(process, args[i * batchsize: (i + 1) * batchsize])
pool.close()
print('done')
def parallel_variability_analysis(tmodel, kind='reactions', proc_num = BEST_THREAD_RATIO):
"""
WIP.
:param tmodel:
:param kind:
:param proc_num:
:return:
"""
    raise NotImplementedError
objective = tmodel.objective
if kind == Reaction or kind.lower() in ['reaction','reactions']:
these_vars = tmodel.reactions
else:
these_vars = tmodel.get_variables_of_type(kind)
func = partial(_variability_analysis_element, tmodel)
pool = Pool(processes=proc_num)
async_result = pool.map_async(func, these_vars)
pool.close()
pool.join()
# aggregated_result = pd.DataFrame(async_result.get(),
# columns = ['minimize','maximize'])
tmodel.objective = objective
return async_result
# From StandaloneSimilarity.py (project: job-salary-prediction, author: soton-data-mining)
def multiprocessor_batch_calc(self, batch_queue):
    # Predict each batch in a small worker pool, then flatten the
    # per-batch result lists into a single list.
    with Pool(3) as p:
        prediction = p.map(self.predict_batch, batch_queue)
    return list(itertools.chain.from_iterable(prediction))
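For reference, `itertools.chain.from_iterable` is what turns the list of per-batch prediction lists into one flat list:

import itertools

batches = [[0.1, 0.2], [0.3], [0.4, 0.5]]
flat = list(itertools.chain.from_iterable(batches))
print(flat)  # [0.1, 0.2, 0.3, 0.4, 0.5]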
def process_url(url_list):
    # Fetch every URL with a gevent pool of five greenlets.
    # (Equivalent to spawning procFunc for each url and then joining.)
    g = pool.Pool(5)
    g.map(procFunc, url_list)
    g.join()
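`procFunc` is not shown in this listing; for network-bound work like this, a sketch under the usual gevent assumptions (monkey-patching early so the socket I/O actually yields to other greenlets) could be:

from gevent import monkey
monkey.patch_all()  # run before urllib is imported so its I/O becomes cooperative

import urllib.request

def procFunc(url):
    # Hypothetical worker: download one URL and report its size.
    with urllib.request.urlopen(url, timeout=10) as resp:
        body = resp.read()
    print("{}: {} bytes".format(url, len(body)))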